Skip to content
This repository has been archived by the owner on Aug 30, 2024. It is now read-only.

Commit

Permalink
sorted the generator from array in the middle of a stream
Browse files Browse the repository at this point in the history
  • Loading branch information
ildella committed Nov 26, 2023
1 parent 4285811 commit 617929d
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 24 deletions.
19 changes: 19 additions & 0 deletions fusto/exstream-generators.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
const __ = require('exstream.js')

const arrayMapping = ({
// zeroBased = true,
prefix = '',
items,
}) => (push, next) => {
// console.log({items})
items.map((item, index) => {
// console.log({prefix, item, index})
// const index = zeroBased ? i : i + 1
push({item: `${prefix}${item}`, index})
next()
})
// console.log('DONE.')
push(__.nil)
}

module.exports = {arrayMapping}
2 changes: 2 additions & 0 deletions fusto/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ const extreamErrorToJson = require('./extream-error-to-json')
const websocketStreamSource = require('./websocket-stream-source')
const webReadableStream = require('./web-readable-stream')
const streams = require('./streams')
const generators = require('./exstream-generators')

module.exports = {
...exstreamExtras,
extreamErrorToJson,
generators,
streams,
webReadableStream,
websocketStreamSource,
Expand Down
33 changes: 9 additions & 24 deletions tests/fusto/generators.test.js
Original file line number Diff line number Diff line change
@@ -1,24 +1,10 @@
const {__, nil} = require('../../fusto')
const {__, generators: {arrayMapping}} = require('../../fusto')

const items = ['a', 'b']

const generator = ({
// zeroBased = true,
prefix = '',
} = {}) => (push, next) => {
// console.log({items})
items.map((item, index) => {
// console.log({prefix, item, index})
// const index = zeroBased ? i : i + 1
push({item: `${prefix}${item}`, index})
next()
})
// console.log('DONE.')
push(nil)
}
const generator = arrayMapping

test('index generator', async () => {
const stream = __(generator())
const stream = __(generator({items}))
const values = await stream.toPromise()
expect(values).toEqual([
{item: 'a', index: 0},
Expand All @@ -27,7 +13,7 @@ test('index generator', async () => {
})

test('prefix and zeroBased', async () => {
const stream = __(generator({prefix: 'doc:'}))
const stream = __(generator({items, prefix: 'doc:'}))
const values = await stream.toPromise()
expect(values).toEqual([
{item: 'doc:a', index: 0},
Expand All @@ -38,7 +24,7 @@ test('prefix and zeroBased', async () => {
test('index generator in the middle of the stream', async () => {
const stream = __([1, 2, 3])
const values = await stream
.through(__(generator()))
.through(__(generator({items})))
.toPromise()
expect(values).toEqual([
{item: 'a', index: 0},
Expand All @@ -49,16 +35,15 @@ test('index generator in the middle of the stream', async () => {
test('should generate 6 elements', async () => {
const stream = __([1, 2, 3])
const values = await stream
.map(index => __(generator({prefix: index})))
.merge()
.map(index => __(generator({items, prefix: index})))
.merge(1, true) // true is required only when we have parallelism
.toPromise()
// console.log(values)
expect(values).toEqual([
{item: '1a', index: 0},
{item: '2a', index: 0},
{item: '3a', index: 0},
{item: '1b', index: 1},
{item: '2a', index: 0},
{item: '2b', index: 1},
{item: '3a', index: 0},
{item: '3b', index: 1},
])
})

0 comments on commit 617929d

Please sign in to comment.