Skip to content

Latest commit



325 lines (273 loc) · 13.8 KB

File metadata and controls

325 lines (273 loc) · 13.8 KB


The power of streams, without the overhead.
Extremely small and simple reactive programming library.

npm travis build dependencies codecov coverage semantic-release Commitizen friendly

✨ Features:

  • Extremely small.
  • Familiar interface (mostly replicating RxJS's API)
  • Modular. Add as little or as much functionality as you want
  • Lazy streams (only active once subscribed to)
  • Only "hot" streams
  • Pipeable operators support
  • Fantasy Observable compliant
  • More than 50 operators and factories available

🔧 Installation

Assuming you use npm as your package manager:

npm install --save stream-lite

This package includes both a CommonJS and ES2015 based modules.
You can use them from Node environment or if you are building for the browser you can use a module bundler like Webpack, Browserify, or Rollup.
If you want to experiment and play around with Amnis without a module bundler or you don't use one - that's OK. The stream-lite npm package includes precompiled production and development UMD builds in the umd folder. You can just drop a UMD build as a <script> tag on a page. The UMD builds make stream-lite available as a global variable and include all the functionality.

<script type="application/javascript" src=""></script>

📦🔨 Import and Usage

The following guide assumes you use ES2015+ but you don't have to.
There are a few different ways of importing the functionality that you need.

To import the entire set of functionality:

import Stream from 'stream-lite'
import 'stream/add/all'

  .map(x => x * 2)
  .subscribe(x => console.log(x))

However the stream-lite module encourages shipping only the scripts that you will actually use.
One way of doing that is to add to Stream's prototype only the methods you actually need:

import Stream from 'stream-lite'
import 'stream-lite/add/statics/of'
import 'stream-lite/add/operators/map'

  .map(x => x * 2)
  .subscribe(x => console.log(x))

To not add static methods to Stream, you can import them as pure functions:

import {of} from 'stream-lite/statics'
import 'stream-lite/add/operators/map'

  .map(x => x * 2)
  .subscribe(x => console.log(x))

To take this one step further and avoid patching the Stream's prototype altogether, you can use pipeable operators.
If you are not familiar with pipeable operators you can learn about them here.

import {of} from 'stream-lite/statics'
import {map, filter} from 'stream-lite/operators'

  filter(x => x % 2 === 0),
  map(x => x * 2)
  .subscribe(x => console.log(x))

The stream-lite core also provides a pipeable subscribe for your convenience.
So you can bring the subscribe method inside your pipe and write the code above like so:

import {subscribe} from 'stream-lite'
import {of} from 'stream-lite/statics'
import {map, filter} from 'stream-lite/operators'

  filter(x => x % 2 === 0),
  map(x => x * 2),
  subscribe(x => console.log(x))

Or if you use the proposed JavaScript pipe operator:

import {subscribe} from 'stream-lite'
import {of} from 'stream-lite/statics'
import {map, filter} from 'stream-lite/operators'

  |> filter(x => x % 2 === 0)
  |> map(x => x * 2)
  |> subscribe(x => console.log(x))

Please note: This additional syntax requires transpiler support.

🎉 Size

The stream-lite package is built to bring as little overhead to your project as possible.


The core of the library includes the create function and a few prototype methods, like subscribe and pipe.
This core is ~1KB gzipped.


A common usage will probably include around 15 most common methods and operators, which should bring about 1.8KB to your app if you use tree-shaking. 😍


If for some reason you feel the need to import all available operators and factories, that option is also available.
That includes more than 50 operators and factories, and will make your app heavier by about 3.5KB gzipped.


The vast majority of factories and operators are too similar to the API of RxJS, so most links will point you to RxJS documentation.
However there are some that don't exist in RxJS or ones with a different API. Those are marked with an astrix (*) and their documentation you will find below.
Operators marked with 🚩 are also available as statics.


Methods and Operators


You can call it in two different ways:
Either passing three callbacks in this following order: next, error, complete.

import {of} from 'stream-lite/statics'

of(1, 2, 3).subscribe(
  x => console.log(x),
  err => console.error("There's an error!", err),
  () => console.log("We're done")

Or passing a subscription object with the next, error, complete functions as keys.

import {of} from 'stream-lite/statics'

of(1, 2, 3).subscribe({
  next: x => console.log(x),
  error: err => console.error("There's an error!", err),
  complete: () => console.log("We're done")

You can also use a pipeable version of subscribe:

import {subscribe} from 'stream-lite'
import {of} from 'stream-lite/statics'
import {map} from 'stream-lite/operators'

  map(x => x * 2),
  subscribe(x => console.log(x))


This is the only thing that is included in the core object exported from stream-lite. Most use-cases for creating a stream involve calling other factory functions, like fromEvent or fromPromise, etc.
Those are all abstractions on top of the create factory. Usually you want to use those. However, sometimes you may need more control and the way you achieve that in stream-lite is different from RxJS.

Creating a stream with a producer

A Producer is a simple JavaScript object with start and stop functions. start function will be called when the first subscriber subscribes to it. stop function will be called when the last subscriber unsubscribes or the stream completes or it errors. Here is an example of a producer that is used inside the interval factory (except with step parameter hard-coded):

const producer = {
  counter: 0,
  id: 0,
  start: function(consumer) { = setInterval(() =>, 1000)
  stop: function() {

Armed with that producer we can now easily create a new stream:

import {create} from 'stream-lite'

const myIntervalStream = create(producer) 

When subscribed to it will start emitting values every second.

Creating a stream without a producer

Sometimes you just want to create an empty stream and manually push values into it. You can achieve this functionality by calling create with no parameters:

import {create} from 'stream-lite'

const manuallyControlledStream = create() 

manuallyControlledStream.subscribe(x => console.log(x))
// logs 1, 2

This is sort of similar to how you would use RxJs's Subject.


Equivalent to calling RxJS's from with an array.


Equivalent to calling RxJS's from with an observable.


Same as RxJS's _throw but with a friendlier name.


Mostly equivalent to calling RxJS's scan except currently it requires an initial value parameter.


Alias: mergeMap.

Equivalent to RxJS's flatMap without the support for an optional 3rd parameter concurrent.


Simply an alias for combineLatest.


Allows to emit an extra parameter in addition to one emitted from source stream.

import {subscribe} from 'stream-lite'
import {of} from 'stream-lite/statics'
import {withValue} from 'stream-lite/operators'

  |> withValue(x => x / 2)
  |> subscribe(x => console.log(x)) // logs [4, 2]

🙏 License