-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathmulticast.ts
65 lines (58 loc) · 1.59 KB
/
multicast.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import { Producer, StrictSink } from './types';
/**
 * Shares one subscription to `source` among any number of sinks.
 *
 * The first sink to subscribe triggers `source(0, …)`; every message the
 * source emits afterwards is forwarded to all currently registered sinks.
 * Values emitted during the current tick are buffered in `lasts` and replayed
 * to sinks that subscribe after the source has greeted, so same-tick late
 * subscribers do not miss them. The buffer is cleared on the next microtask.
 *
 * When every sink has unsubscribed, the upstream is terminated with
 * `talkback(2)` exactly once and the internal state is reset, so a later
 * subscriber opens a fresh subscription to `source`.
 *
 * @param source The producer to share.
 * @returns A producer that multiplexes `source` to all of its subscribers.
 */
export function multicast<T>(source: Producer<T>): Producer<T> {
  // Registered sinks. A slot is set to `undefined` when its sink unsubscribes
  // and is compacted away on the next emission.
  let sinks: Array<StrictSink<T> | undefined> = [];
  // Talkback handed over by the source in its greeting; its type is declared
  // outside this block, hence `any`.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  let talkback: any;
  // Values emitted during the current tick, replayed to late subscribers.
  let lasts: T[] = [];
  // True once the source has greeted and until the shared subscription is torn down.
  let started = false;
  // True between calling `source(0, …)` and receiving its greeting. Prevents a
  // second upstream subscription when the source greets asynchronously.
  let connecting = false;
  // True while a microtask that clears `lasts` is pending.
  let timerActive = false;

  // Builds the talkback given to one sink. NOTE: the returned function takes
  // no arguments, so ANY call to it (whatever the message type) unsubscribes.
  const mkTalkback = (sink: StrictSink<T>) => () => {
    const index = sinks.indexOf(sink);
    // Already unsubscribed (or removed by a teardown): nothing to do. Without
    // this guard the code would write to `sinks[-1]`.
    if (index === -1) return;
    sinks[index] = undefined;

    // Allow others to subscribe in the same iteration of the JS event loop
    queueMicrotask(() => {
      // `started` is false if an earlier microtask of this tick already tore
      // the subscription down; this keeps the upstream from being terminated twice.
      if (!started || !sinks.every(x => x === undefined)) return;

      const upstream = talkback;
      sinks = [];
      lasts = [];
      started = false;
      talkback = undefined;
      upstream(2);
    });
  };

  return (_, sink) => {
    sinks.push(sink);

    if (started) {
      // Source is live: greet immediately and replay this tick's values.
      sink(0, mkTalkback(sink));
      for (let i = 0; i < lasts.length; i++) {
        sink(1, lasts[i]);
      }
      return;
    }

    // An upstream subscription is already pending; this sink will be greeted
    // together with the others once the source greets.
    if (connecting) return;

    connecting = true;
    source(0, (t, d) => {
      if (t === 0) {
        connecting = false;
        started = true;
        talkback = d;
        // Only greet sinks registered before the greeting. A sink that
        // subscribes re-entrantly while this loop runs is greeted by the
        // `started` branch above and must not be greeted a second time.
        const pendingCount = sinks.length;
        for (let i = 0; i < pendingCount; i++) {
          const pending = sinks[i];
          if (pending) pending(0, mkTalkback(pending));
        }
        return;
      }

      if (t === 1) {
        lasts.push(d);
        if (!timerActive) {
          timerActive = true;
          // Drop the replay buffer once the current tick is over.
          Promise.resolve().then(() => {
            lasts = [];
            timerActive = false;
          });
        }
      }

      // Forward the message and note whether any slot has been vacated.
      let hasDeleted = false;
      for (let i = 0; i < sinks.length; i++) {
        const current = sinks[i];
        if (current) current(t, d);
        else hasDeleted = true;
      }
      if (hasDeleted) {
        sinks = sinks.filter(x => x !== undefined);
      }
    });
  };
}