-
Notifications
You must be signed in to change notification settings - Fork 0
/
splitStreamByGroups.js
36 lines (32 loc) · 1.26 KB
/
splitStreamByGroups.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import lookaheadGenerator from "./lookaheadGenerator.js";
async function* filterStreamByGroup(laStream) {
// console.log('filterStreamByGroup()');
for await (const record of laStream) {
// console.log(record);
yield record;
// console.log('instance of group: ' + record.groupId);
// process.stdout.write(record.groupId + laStream.next.groupId + '-');
if (laStream.lookAhead === undefined || laStream.lookAhead.groupId !== record.groupId) {
break;
}
}
}
export default async function* splitStreamByGroups(inputStream) {
// console.log('splitStreamByGroups()');
var laStream = await lookaheadGenerator(inputStream);
// console.log(laStream.hasNext);
// console.log(laStream.lookAhead);
while(laStream.hasNext) {
const currGroupId = laStream.lookAhead.groupId;
// console.log('beginning of group: ' + currGroupId);
yield {
groupId: currGroupId,
queryStream: filterStreamByGroup(laStream)
};
// console.log(laStream.hasNext);
// console.log(laStream.lookAhead);
while(laStream.hasNext && laStream.lookAhead.groupId === currGroupId) {
await new Promise(resolve => setImmediate(resolve));
}
}
}