[ICO]NameLast modifiedSizeDescription
[PARENTDIR]Parent Directory  -  
[DIR]utils/2023-06-15 18:18 -  
[TXT]README.md1985-10-26 08:15 14Kd7c1522 post receive test [كارل مبارك]
[   ]index.js1985-10-26 08:15 6.1K 
[   ]package.json2023-06-15 18:21 1.6K 
[   ]LICENSE1985-10-26 08:15 1.4K 
[   ]defs.js1985-10-26 08:15 735  
# stream-chain [![NPM version][npm-img]][npm-url]

[npm-img]:      https://img.shields.io/npm/v/stream-chain.svg
[npm-url]:      https://npmjs.org/package/stream-chain

`stream-chain` creates a chain of streams out of regular functions, asynchronous functions, generator functions, and existing streams, while properly handling [backpressure](https://nodejs.org/en/docs/guides/backpressuring-in-streams/). The resulting chain is represented as a [Duplex](https://nodejs.org/api/stream.html#stream_class_stream_duplex) stream, which can be combined with other streams the usual way. It eliminates a boilerplate helping to concentrate on functionality without losing the performance especially make it easy to build object mode data processing pipelines.

Originally `stream-chain` was used internally with [stream-fork](https://www.npmjs.com/package/stream-fork) and [stream-json](https://www.npmjs.com/package/stream-json) to create flexible data processing pipelines.

`stream-chain` is a lightweight, no-dependencies micro-package. It is distributed under New BSD license.

## Intro

```js
const Chain = require('stream-chain');

const fs = require('fs');
const zlib = require('zlib');
const {Transform} = require('stream');

// the chain will work on a stream of number objects
const chain = new Chain([
  // transforms a value
  x => x * x,
  // returns several values
  x => [x - 1, x, x + 1],
  // waits for an asynchronous operation
  async x => await getTotalFromDatabaseByKey(x),
  // returns multiple values with a generator
  function* (x) {
    for (let i = x; i > 0; --i) {
      yield i;
    }
    return 0;
  },
  // filters out even values
  x => x % 2 ? x : null,
  // uses an arbitrary transform stream
  new Transform({
    writableObjectMode: true,
    transform(x, _, callback) {
      // transform to text
      callback(null, x.toString());
    }
  }),
  // compress
  zlib.createGzip()
]);
// log errors
chain.on('error', error => console.log(error));
// use the chain, and save the result to a file
dataSource.pipe(chain).pipe(fs.createWriteStream('output.txt.gz'));
```

Making processing pipelines appears to be easy: just chain functions one after another, and we are done. Real life pipelines filter objects out and/or produce more objects out of a few ones. On top of that we have to deal with asynchronous operations, while processing or producing data: networking, databases, files, user responses, and so on. Unequal number of values per stage, and unequal throughput of stages introduced problems like [backpressure](https://nodejs.org/en/docs/guides/backpressuring-in-streams/), which requires algorithms implemented by [streams](https://nodejs.org/api/stream.html).

While a lot of API improvements were made to make streams easy to use, in reality, a lot of boilerplate is required when creating a pipeline. `stream-chain` eliminates most of it.

## Installation

```bash
npm i --save stream-chain
# or: yarn add stream-chain
```

## Documentation

`Chain`, which is returned by `require('stream-chain')`, is based on [Duplex](https://nodejs.org/api/stream.html#stream_class_stream_duplex). It chains its dependents in a single pipeline optionally binding `error` events.

Many details about this package can be discovered by looking at test files located in `tests/` and in the source code (`index.js`).

### Constructor: `new Chain(fns[, options])`

The constructor accepts the following arguments:

* `fns` is an array of functions arrays or stream instances.
  * If a value is a function, a [Transform](https://nodejs.org/api/stream.html#stream_class_stream_transform) stream is created, which calls this function with two parameters: `chunk` (an object), and an optional `encoding`. See [Node's documentation](https://nodejs.org/api/stream.html#stream_transform_transform_chunk_encoding_callback) for more details on those parameters. The function will be called in the context of the created stream.
    * If it is a regular function, it can return:
      * Regular value:
        * *(deprecated since 2.1.0)* Array of values to pass several or zero values to the next stream as they are.
          ```js
          // produces no values:
          x => []
          // produces two values:
          x => [x, x + 1]
          // produces one array value:
          x => [[x, x + 1]]
          ```
        * Single value.
          * If it is `undefined` or `null`, no value shall be passed.
          * Otherwise, the value will be passed to the next stream.
          ```js
          // produces no values:
          x => null
          x => undefined
          // produces one value:
          x => x
          ```
      * Special value:
        * If it is an instance of [Promise](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise) or "thenable" (an object with a method called `then()`), it will be waited for. Its result should be a regular value.
          ```js
          // delays by 0.5s:
          x => new Promise(
            resolve => setTimeout(() => resolve(x), 500))
          ```
        * If it is an instance of a generator or "nextable" (an object with a method called `next()`), it will be iterated according to the [generator](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Generator) protocol. The results should be regular values.
          ```js
          // produces multiple values:
          class Nextable {
            constructor(x) {
              this.x = x;
              this.i = -1;
            }
            next() {
              return {
                done:  this.i <= 1,
                value: this.x + this.i++
              };
            }
          }
          x => new Nextable(x)
          ```
          `next()` can return a `Promise` according to the [asynchronous generator](https://zaiste.net/nodejs_10_asynchronous_iteration_async_generators/) protocol.
      * Any thrown exception will be caught and passed to a callback function effectively generating an error event.
        ```js
        // fails
        x => { throw new Error('Bad!'); }
        ```
  * If it is an asynchronous function, it can return a regular value.
    * In essence, it is covered under "special values" as a function that returns a promise.
    ```js
    // delays by 0.5s:
    async x => {
      await new Promise(resolve => setTimeout(() => resolve(), 500));
      return x;
    }
    ```
  * If it is a generator function, each yield should produce a regular value.
    * In essence, it is covered under "special values" as a function that returns a generator object.
    ```js
    // produces multiple values:
    function* (x) {
      for (let i = -1; i <= 1; ++i) {
        if (i) yield x + i;
      }
      return x;
    }
    ```
  * *(since 2.2.0)* If it is an asynchronous generator function, each yield should produce a regular value.
    * In essence, it is covered under "special values" as a function that returns a generator object.
    ```js
    // produces multiple values:
    async function* (x) {
      for (let i = -1; i <= 1; ++i) {
        if (i) {
          await new Promise(resolve => setTimeout(() => resolve(), 50));
          yield x + i;
        }
      }
      return x;
    }
    ```
  * *(since 2.1.0)* If a value is an array, it is assumed to be an array of regular functions.
    Their values are passed in a chain. All values (including `null`, `undefined`, and arrays) are allowed
    and passed without modifications. The last value is a subject to processing defined above for regular functions.
    * Empty arrays are ignored.
    * If any function returns a value produced by `Chain.final(value)` (see below), it terminates the chain using
      `value` as the final value of the chain.
    * This feature bypasses streams. It is implemented for performance reasons.
  * If a value is a valid stream, it is included as is in the pipeline.
    * [Transform](https://nodejs.org/api/stream.html#stream_class_stream_transform).
    * [Duplex](https://nodejs.org/api/stream.html#stream_class_stream_duplex).
    * The very first stream can be [Readable](https://nodejs.org/api/stream.html#stream_class_stream_readable).
      * In this case a `Chain` instance ignores all possible writes to the front, and ends when the first stream ends.
    * The very last stream can be [Writable](https://nodejs.org/api/stream.html#stream_class_stream_writable).
      * In this case a `Chain` instance does not produce any output, and finishes when the last stream finishes.
      * Because `'data'` event is not used in this case, the instance resumes itself automatically. Read about it in Node's documentation:
        * [Two modes](https://nodejs.org/api/stream.html#stream_two_modes).
        * [readable.resume()](https://nodejs.org/api/stream.html#stream_readable_resume).
* `options` is an optional object detailed in the [Node's documentation](https://nodejs.org/api/stream.html#stream_new_stream_duplex_options).
  * If `options` is not specified, or falsy, it is assumed to be:
    ```js
    {writableObjectMode: true, readableObjectMode: true}
    ```
  * Always make sure that `writableObjectMode` is the same as the corresponding object mode of the first stream, and `readableObjectMode` is the same as the corresponding object mode of the last stream.
    * Eventually both these modes can be deduced, but Node does not define the standard way to determine it, so currently it cannot be done reliably.
  * Additionally the following custom properties are recognized:
    * `skipEvents` is an optional flag. If it is falsy (the default), `'error'` events from all streams are forwarded to the created instance. If it is truthy, no event forwarding is made. A user can always do so externally or in a constructor of derived classes.

An instance can be used to attach handlers for stream events.

```js
const chain = new Chain([x => x * x, x => [x - 1, x, x + 1]]);
chain.on('error', error => console.error(error));
dataSource.pipe(chain);
```

### Properties

Following public properties are available:

* `streams` is an array of streams created by a constructor. Its values either [Transform](https://nodejs.org/api/stream.html#stream_class_stream_transform) streams that use corresponding functions from a constructor parameter, or user-provided streams. All streams are piped sequentially starting from the beginning.
* `input` is the beginning of the pipeline. Effectively it is the first item of `streams`.
* `output` is the end of the pipeline. Effectively it is the last item of `streams`.

Generally, a `Chain` instance should be used to represent a chain:

```js
const chain = new Chain([
  x => x * x,
  x => [x - 1, x, x + 1],
  new Transform({
    writableObjectMode: true,
    transform(chunk, _, callback) {
      callback(null, chunk.toString());
    }
  })
]);
dataSource
  .pipe(chain);
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('output.txt.gz'));
```

But in some cases `input` and `output` provide a better control over how a data processing pipeline should be organized:

```js
chain.output
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('output.txt.gz'));
dataSource.pipe(chain.input);
```

Please select what style you want to use, and never mix them together with the same object.

### Static methods

Following static methods are available:

* `chain(fns[, options)` is a helper factory function, which has the same arguments as the constructor and returns a `Chain` instance.
  ```js
  const {chain} = require('stream-chain');

  // simple
  dataSource
    .pipe(chain([x => x * x, x => [x - 1, x, x + 1]]));

  // all inclusive
  chain([
    dataSource,
    x => x * x,
    x => [x - 1, x, x + 1],
    zlib.createGzip(),
    fs.createWriteStream('output.txt.gz')
  ])
  ```
* *(since 2.1.0)* `final(value)` is a helper factory function, which can be used in by chained functions (see above the array of functions).
  It returns a special value, which terminates the chain and uses the passed value as the result of the chain.
  ```js
  const {chain, final} = require('stream-chain');

  // simple
  dataSource
    .pipe(chain([[x => x * x, x => 2 * x + 1]]));
  // faster than [x => x * x, x => 2 * x + 1]

  // final
  dataSource
    .pipe(chain([[
      x => x * x,
      x => final(x),
      x => 2 * x + 1
    ]]));
  // the same as [[x => x * x, x => x]]
  // the same as [[x => x * x]]
  // the same as [x => x * x]

  // final as a terminator
  dataSource
    .pipe(chain([[
      x => x * x,
      x => final(),
      x => 2 * x + 1
    ]]));
  // produces no values, because the final value is undefined,
  // which is interpreted as "no value shall be passed"
  // see the doc above

  // final() as a filter
  dataSource
    .pipe(chain([[
      x => x * x,
      x => x % 2 ? final() : x,
      x => 2 * x + 1
    ]]));
  // only even values are passed, odd values are ignored

  // if you want to be really performant...
  const none = final();
  dataSource
    .pipe(chain([[
      x => x * x,
      x => x % 2 ? none : x,
      x => 2 * x + 1
    ]]));
  ```
* *(since 2.1.0)* `many(array)` is a helper factory function, which is used to wrap arrays to be interpreted as multiple values returned from a function.
  At the moment it is redundant: you can use a simple array to indicate that, but a naked array is being deprecated and in future versions it will be passed as is.
  The thinking is that using `many()` is better indicates the intention. Additionally, in the future versions it will be used by array of functions (see above).
  ```js
  const {chain, many} = require('stream-chain');

  dataSource
    .pipe(chain([x => many([x, x + 1, x + 2])]));
  // currently the same as [x => [x, x + 1, x + 2]]
  ```

## Release History

- 2.2.5 *Relaxed the definition of a stream (thx [Rich Hodgkins](https://github.com/rhodgkins)).*
- 2.2.4 *Bugfix: wrong `const`-ness in the async generator branch (thx [Patrick Pang](https://github.com/patrickpang)).*
- 2.2.3 *Technical release. No need to upgrade.*
- 2.2.2 *Technical release. No need to upgrade.*
- 2.2.1 *Technical release: new symbols namespace, explicit license (thx [Keen Yee Liau](https://github.com/kyliau)), added Greenkeeper.*
- 2.2.0 *Added utilities: `take`, `takeWhile`, `skip`, `skipWhile`, `fold`, `scan`, `Reduce`, `comp`.*
- 2.1.0 *Added simple transducers, dropped Node 6.*
- 2.0.3 *Added TypeScript typings and the badge.*
- 2.0.2 *Workaround for Node 6: use `'finish'` event instead of `_final()`.*
- 2.0.1 *Improved documentation.*
- 2.0.0 *Upgraded to use Duplex instead of EventEmitter as the base.*
- 1.0.3 *Improved documentation.*
- 1.0.2 *Better README.*
- 1.0.1 *Fixed the README.*
- 1.0.0 *The initial release.*