/ live.thehmm.nl / back / node_modules / stream-chain /

[ICO]NameLast modifiedSizeDescription
[PARENTDIR]Parent Directory  -  
[DIR]utils/2 years ago -  
[   ]LICENSE40 years ago1.4K 
[TXT]README.md40 years ago 14Kd7c1522 post receive test [كارل مبارك]
[   ]defs.js40 years ago735  
[   ]index.js40 years ago6.1K 
[   ]package.json2 years ago1.6K 
README.md

stream-chain NPM version

stream-chain creates a chain of streams out of regular functions, asynchronous functions, generator functions, and existing streams, while properly handling backpressure. The resulting chain is represented as a Duplex stream, which can be combined with other streams the usual way. It eliminates a boilerplate helping to concentrate on functionality without losing the performance especially make it easy to build object mode data processing pipelines.

Originally stream-chain was used internally with stream-fork and stream-json to create flexible data processing pipelines.

stream-chain is a lightweight, no-dependencies micro-package. It is distributed under New BSD license.

Intro

const Chain = require('stream-chain');

const fs = require('fs');
const zlib = require('zlib');
const {Transform} = require('stream');

// the chain will work on a stream of number objects
const chain = new Chain([
  // transforms a value
  x => x * x,
  // returns several values
  x => [x - 1, x, x + 1],
  // waits for an asynchronous operation
  async x => await getTotalFromDatabaseByKey(x),
  // returns multiple values with a generator
  function* (x) {
    for (let i = x; i > 0; --i) {
      yield i;
    }
    return 0;
  },
  // filters out even values
  x => x % 2 ? x : null,
  // uses an arbitrary transform stream
  new Transform({
    writableObjectMode: true,
    transform(x, _, callback) {
      // transform to text
      callback(null, x.toString());
    }
  }),
  // compress
  zlib.createGzip()
]);
// log errors
chain.on('error', error => console.log(error));
// use the chain, and save the result to a file
dataSource.pipe(chain).pipe(fs.createWriteStream('output.txt.gz'));

Making processing pipelines appears to be easy: just chain functions one after another, and we are done. Real life pipelines filter objects out and/or produce more objects out of a few ones. On top of that we have to deal with asynchronous operations, while processing or producing data: networking, databases, files, user responses, and so on. Unequal number of values per stage, and unequal throughput of stages introduced problems like backpressure, which requires algorithms implemented by streams.

While a lot of API improvements were made to make streams easy to use, in reality, a lot of boilerplate is required when creating a pipeline. stream-chain eliminates most of it.

Installation

npm i --save stream-chain
# or: yarn add stream-chain

Documentation

Chain, which is returned by require('stream-chain'), is based on Duplex. It chains its dependents in a single pipeline optionally binding error events.

Many details about this package can be discovered by looking at test files located in tests/ and in the source code (index.js).

Constructor: new Chain(fns[, options])

The constructor accepts the following arguments:

An instance can be used to attach handlers for stream events.

const chain = new Chain([x => x * x, x => [x - 1, x, x + 1]]);
chain.on('error', error => console.error(error));
dataSource.pipe(chain);

Properties

Following public properties are available:

Generally, a Chain instance should be used to represent a chain:

const chain = new Chain([
  x => x * x,
  x => [x - 1, x, x + 1],
  new Transform({
    writableObjectMode: true,
    transform(chunk, _, callback) {
      callback(null, chunk.toString());
    }
  })
]);
dataSource
  .pipe(chain);
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('output.txt.gz'));

But in some cases input and output provide a better control over how a data processing pipeline should be organized:

chain.output
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('output.txt.gz'));
dataSource.pipe(chain.input);

Please select what style you want to use, and never mix them together with the same object.

Static methods

Following static methods are available:

Release History

Apache/2.4.38 (Debian) Server at www.karls.computer Port 80