-
Notifications
You must be signed in to change notification settings - Fork 5
/
merge.ts
38 lines (34 loc) · 1016 Bytes
/
merge.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import { Producer, Talkback } from './types';
/**
 * Merges several producers into a single producer that forwards the signals
 * of all of them to one sink.
 *
 * Signal handling, as implemented below:
 * - The sink is greeted (type `0`) exactly once, when the first source greets,
 *   and receives a talkback that terminates every source still running.
 * - A type `2` signal with a truthy payload from any source (conventionally an
 *   error) terminates all other sources and is forwarded to the sink at once.
 * - A type `2` signal without a payload marks that source as completed; the
 *   sink is completed only after every source has completed.
 * - Every other signal is forwarded to the sink unchanged.
 * - Once the merged stream has ended (sink talkback called, error forwarded,
 *   or all sources completed) nothing further is delivered to the sink, and a
 *   source that greets late is terminated immediately.
 * - With no sources at all, the sink is greeted and then completed right away
 *   instead of being left waiting forever.
 *
 * @param sources - Producers to subscribe to, in order.
 * @returns A producer emitting the interleaved signals of all `sources`.
 */
export function merge<T>(...sources: Producer<T>[]): Producer<T> {
  return (_, sink) => {
    const n = sources.length;
    const sourceTalkbacks: Array<Talkback | undefined> = new Array(n);
    let ended = false;
    let startCount = 0;
    let endCount = 0;

    // Talkback handed to the sink: terminates every live source, at most once.
    const talkback: Talkback = (_, d) => {
      if (ended) return;
      ended = true;
      for (let i = 0; i < n; i++) {
        sourceTalkbacks[i]?.(2, d);
      }
    };

    // Nothing to wait for: greet and complete immediately, unless the sink
    // already terminated the stream from inside its greeting handler.
    if (n === 0) {
      sink(0, talkback);
      if (!ended) {
        ended = true;
        sink(2);
      }
      return;
    }

    for (let i = 0; i < n; i++) {
      // A source may have errored, or the sink may have unsubscribed,
      // synchronously while earlier sources were being subscribed.
      if (ended) return;
      sources[i](0, (t, d) => {
        if (t === 0) {
          if (ended) {
            // Greeting arrived after the merged stream ended: release the
            // source right away rather than leaving it running.
            const lateTalkback: Talkback | undefined = d;
            lateTalkback?.(2);
            return;
          }
          // Store the talkback before greeting the sink so that a sink which
          // unsubscribes synchronously can still terminate this source.
          sourceTalkbacks[i] = d;
          if (++startCount === 1) sink(0, talkback);
        } else if (ended) {
          // The stream is over; swallow stray signals from the source.
          return;
        } else if (t === 2 && d) {
          // Termination with a payload: stop the other sources, then forward.
          ended = true;
          for (let j = 0; j < n; j++) {
            if (j !== i) sourceTalkbacks[j]?.(2);
          }
          sink(2, d);
        } else if (t === 2) {
          // Plain completion of one source; complete the sink after the last.
          sourceTalkbacks[i] = void 0;
          if (++endCount === n) {
            ended = true;
            sink(2);
          }
        } else {
          sink(t, d);
        }
      });
    }
  };
}