A port of pull-stream to TypeScript for use with Deno.
Print a series of numbers to the console:
import { map } from "https://raw.githubusercontent.com/crabmusket/deno_pull_streams/v0.2/lib/map.ts"
import { values } from "https://raw.githubusercontent.com/crabmusket/deno_pull_streams/v0.2/lib/values.ts"
import { each } from "https://raw.githubusercontent.com/crabmusket/deno_pull_streams/v0.2/lib/each.ts"
let numbers = values([1, 2, 3]);
let increment = map(x => x + 1);
let log = each(x => console.log(x))
log(increment(numbers));
Print a series of random numbers using infinite streams!
import { Source } from "https://raw.githubusercontent.com/crabmusket/deno_pull_streams/v0.2/lib/types.ts"
import { take } from "https://raw.githubusercontent.com/crabmusket/deno_pull_streams/v0.2/lib/take.ts"
import { each } from "https://raw.githubusercontent.com/crabmusket/deno_pull_streams/v0.2/lib/each.ts"
function randoms(): Source<number> {
return function(end, cont) {
if (end) {
cont(end, undefined);
} else {
cont(null, Math.random());
}
};
}
let first5 = take(5);
let log = each(x => console.log(x))
log(first5(randoms));
See examples for more.
- types.ts describes the core abstractions of pull streams with generic types.
- values.ts creates a Source from an array of values.
- map.ts transforms each element in a stream, either keeping their type or returning an entirely new type.
- take.ts takes a limited number of elements from a stream, converting an infinite stream into a finite one.
- each.ts simply calls a callback on each element of the stream it encounters. If the callback returns a Promise,
each
will wait for the promise to resolve before continuing. - reduce.ts reduces all values of a stream into a single value, returning a Promise so you can
await
on the result. - iterate.ts converts a stream into an async iterator for use with
for await
syntax. See examples/countdown.ts.
See lib/types.ts for a readable introduction to the core abstractions of pull streams. Then take a look at examples/basics.ts for a walk through creating your first pull stream pipeline.
From the blog of Dominic Tarr, creator of pull-stream:
node.js streams have the concept of a “writable” stream. A writable is a passive object that accepts data, like a couch-potato watching the TV. They must actively use the TV remote to send an explicit signal to the TV when they want to stop or slow down. A pull-stream sink is a reader. it is like a book-worm reading a book - they are active and can read faster or slower. They don’t need to send an explicit signal to slow down, they just act slower.
I got into pull streams when I needed to stream large JSON files from disk into a database, but existing libraries couldn't usefully handle back-pressure (e.g. stop loading data from disk while rows were inserted into the database).
Pull streams are elegant and fun!
The most important feature of pull streams is that they are pulled: the sink, the reader, controls the speed of the flow. Sometimes that doesn't fit your use-case, and it may be simpler to just map over an ordinary stream.
If the consumer of data is expected to be faster than the producer (e.g. streaming from the network to a file on disk), pull streams' advantage is lessened. That said, there are few disadvantages to using pull streams even in those cases - so if you're comfortable with them, go right ahead!
- To add quality type declarations throughout, and tweak the APIs to be more friendly to statically-typed usage and type inference.
- To make APIs that use Promises and async iteration. Internally, the structure of pull streams still works on callbacks and recursion, but sinks like reduce now use Promises to play nicely with
await
. - To make it compatible with ESM imports/Deno, including providing ready-to-go stream helpers that work with Deno's API and standard library.
- To learn more about how pull streams work by implementing them.
A key feature of the pull-stream
library is its pull
helper function which can combine pull streams in an ergonomic fashion.
Instead of writing
sink(through2(through1(source())))
the pull
helper lets you write this:
pull(
source(),
through1,
through2,
sink
)
It can even combine streams in a smart way, detecting their types by the number of arguments they take:
let through = pull(through1, through2);
pull(
source(),
through,
sink
)
I have decided not to try to port this function. I may create some kind of fluent 'builder' helper for those who really want a 'forwards' API, but I think it's fairly straightforward, and better for static typing, to stick with the more 'functional' interface. Here's an example of composing throughs:
let through1: Through<number> = filter(n => n > 10);
let through2 = take(5);
// pass the `s` argument explicitly:
let through = s => through2(through1(s));
sink(through(source()));
- Implement more of the
pull-stream
library abstractions. - Implement Deno specific library streams (e.g. a file reading stream, UTF-8 chunking, what else?).
- Tests!!
- Sort into folders like the pull-stream repo?
- Any API changes?
- Decide on whether sinks should use Promise internally, or loop like drain
- Should creating
Sink
s be encouraged (reduce(cb, 0)(source)
), or should they be "inlined" to match the functionalish API (reduce(cb, 0, source)
)? The former is better for reusing components, the latter is better for writing pipelines as expressions.