Skip to content

Commit

Permalink
Fix switchMap and concatMap not forwarding Push before Start (#69)
Browse files Browse the repository at this point in the history
* Fix switchMap and concatMap not forwarding Push before Start

This is technically incorrect, but a lot of asynchronous
sources will Push eagerly, which means Start doesn't
always get to talk first.

* Readd state.innerActive guard to switchMap
  • Loading branch information
kitten authored Jan 13, 2020
1 parent ee50165 commit a66720c
Showing 1 changed file with 25 additions and 23 deletions.
48 changes: 25 additions & 23 deletions src/Wonka_operators.re
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,11 @@ let concatMap = (f: (. 'a) => sourceT('b)): operatorT('a, 'b) =>
ended: false,
};

let rec applyInnerSource = innerSource =>
let rec applyInnerSource = innerSource => {
state.innerActive = true;
innerSource((. signal) =>
switch (signal) {
| Start(tb) =>
state.innerActive = true;
state.innerTalkback = tb;
state.innerPulled = false;
tb(. Pull);
Expand All @@ -231,6 +231,7 @@ let concatMap = (f: (. 'a) => sourceT('b)): operatorT('a, 'b) =>
| End => ()
}
);
};

source((. signal) =>
switch (signal) {
Expand Down Expand Up @@ -822,33 +823,34 @@ let switchMap = (f: (. 'a) => sourceT('b)): operatorT('a, 'b) =>
ended: false,
};

let applyInnerSource = innerSource =>
let applyInnerSource = innerSource => {
state.innerActive = true;
innerSource((. signal) =>
switch (signal) {
| Start(tb) =>
state.innerActive = true;
state.innerTalkback = tb;
state.innerPulled = false;
tb(. Pull);
| Push(_) when state.innerActive =>
sink(. signal);
if (!state.innerPulled) {
state.innerTalkback(. Pull);
} else {
if (state.innerActive) {
switch (signal) {
| Start(tb) =>
state.innerTalkback = tb;
state.innerPulled = false;
};
| Push(_) => ()
| End when state.innerActive =>
state.innerActive = false;
if (state.ended) {
tb(. Pull);
| Push(_) =>
sink(. signal);
} else if (!state.outerPulled) {
state.outerPulled = true;
state.outerTalkback(. Pull);
if (!state.innerPulled) {
state.innerTalkback(. Pull);
} else {
state.innerPulled = false;
};
| End =>
state.innerActive = false;
if (state.ended) {
sink(. signal);
} else if (!state.outerPulled) {
state.outerPulled = true;
state.outerTalkback(. Pull);
};
};
| End => ()
}
);
};

source((. signal) =>
switch (signal) {
Expand Down

0 comments on commit a66720c

Please sign in to comment.