pipeline-pipe
Parallel transform and some utilities for Node Object Stream lovers
Why
- Parallel transform accepting async function
- Fixes mafintosh/parallel-transform/issues/4 to work with import { pipeline } from 'stream'
- TypeScript definitions, backed by a reimplementation in pure TypeScript
- Adds tests
- Utility functions
- Blog post
Install
npm install pipeline-pipe
pipe(fn, opts)
Example usage:
// Example: scrape HTML pages and store their titles in a DB
const { pipeline, Readable } = require('stream');
const pipe = require('pipeline-pipe');

pipeline(
  Readable.from([1, 2, 3]),

  // Request HTML asynchronously, up to 16 requests in parallel
  pipe(async postId => {
    const json = await getPost(postId);
    return json;
  }, 16),

  // Synchronous transformation, like Array.prototype.map
  pipe(json => parseHTML(json.postBody).document.title),

  // Synchronous filtering, like Array.prototype.filter
  pipe(title => title.includes('important') ? title : null),

  // Asynchronous, up to 4 calls in parallel
  pipe(async title => {
    const result = await storeInDB(title);
    console.info(result);
  }, 4),

  (err) => console.info('All done!')
);
Types:
import { Transform, TransformOptions } from 'stream';

type ParallelTransformOptions =
  | number
  | TransformOptions & { maxParallel?: number, ordered?: boolean };

export default function pipe(
  fn: (data: any) => Promise<any> | any,
  opts?: ParallelTransformOptions,
): Transform;
Option property | Default value | Description |
---|---|---|
maxParallel | 10 | Maximum number of parallel executions. |
ordered | true | Whether to preserve the order of streamed chunks. |
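To make the two options concrete, here is a minimal sketch in plain promises of what "at most maxParallel calls in flight, results in input order" means. It is not the library's implementation and involves no streams; `mapWithLimit` is a hypothetical helper written only for illustration.

```javascript
// Hypothetical helper, not part of pipeline-pipe: runs `fn` over `items`
// with at most `maxParallel` calls in flight and returns results in input order.
async function mapWithLimit(items, fn, maxParallel = 10) {
  const results = new Array(items.length);
  let nextIndex = 0;

  // Each worker repeatedly claims the next unprocessed index until none remain.
  async function worker() {
    while (nextIndex < items.length) {
      const index = nextIndex++;
      results[index] = await fn(items[index]);
    }
  }

  const workerCount = Math.min(maxParallel, items.length);
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}
```

Results are stored by index, so a slow early item does not change the output order, which mirrors the behavior described for ordered: true.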
A number can be passed to opts. pipe(fn, 20) is the same as pipe(fn, {maxParallel: 20}).
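The shorthand can be pictured as a small normalization step. The sketch below assumes the defaults listed in the table above; `normalizeOptions` is a hypothetical name, and the library may handle this differently internally.

```javascript
// Hypothetical helper, not the library's code: expands the numeric shorthand
// and fills in the documented defaults (maxParallel: 10, ordered: true).
function normalizeOptions(opts) {
  const defaults = { maxParallel: 10, ordered: true };
  if (typeof opts === 'number') {
    return { ...defaults, maxParallel: opts };
  }
  return { ...defaults, ...opts };
}

console.log(normalizeOptions(20));                 // { maxParallel: 20, ordered: true }
console.log(normalizeOptions({ ordered: false })); // { maxParallel: 10, ordered: false }
console.log(normalizeOptions());                   // { maxParallel: 10, ordered: true }
```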
Some utility functions
pipeline(stream, stream, ...)
A promisified version of require('stream').pipeline. Equivalent to:
const { promisify } = require('util');
const { pipeline: _pipeline } = require('stream');
const pipeline = promisify(_pipeline);
Example:
const { pipeline, pipe } = require('pipeline-pipe');

// await needs an async context, e.g. the body of an async function
await pipeline(
  readable,
  pipe(chunk => chunk.replace('a', 'z')),
  pipe(chunk => storeInDB(chunk)),
);
console.log('All done!');
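Since the promisified pipeline rejects when any stream fails, errors can be handled with try/catch. The sketch below uses only Node's standard library (the promisify equivalent shown earlier) so that it runs without pipeline-pipe installed; `createFailingSink` and `run` are hypothetical names used for illustration.

```javascript
const { promisify } = require('util');
const { pipeline: _pipeline, Readable, Writable } = require('stream');

const pipeline = promisify(_pipeline);

// Hypothetical sink that fails on the chunk 2, to simulate a failing step.
function createFailingSink() {
  return new Writable({
    objectMode: true,
    write(chunk, _encoding, callback) {
      callback(chunk === 2 ? new Error('boom') : null);
    },
  });
}

async function run() {
  try {
    await pipeline(Readable.from([1, 2, 3]), createFailingSink());
    return 'All done!';
  } catch (err) {
    // The stream error surfaces here as a rejected promise.
    return `Failed: ${err.message}`;
  }
}

run().then((message) => console.log(message));
```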
concat(size)
Concatenates sequential chunks into arrays of the specified size. This is useful when you post data in batches, as with the Elasticsearch Bulk API.
Example:
const { pipeline, Readable } = require('stream');
const { concat, pipe } = require('pipeline-pipe');

pipeline(
  Readable.from([1, 2, 3, 4, 5]),
  concat(2),
  pipe(console.log), // [ 1, 2 ]
                     // [ 3, 4 ]
                     // [ 5 ]
  (err) => console.info('All done!'),
);
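For intuition, the batching behavior can be approximated with a plain object-mode Transform from Node's standard library. This is a sketch under the assumption that the last, shorter batch is emitted when the input ends (as in the example output above); `concatSketch` and `collect` are hypothetical names, not the library's code.

```javascript
const { Readable, Transform } = require('stream');

// Hypothetical stand-in for concat(size): buffers chunks and emits arrays of `size`.
function concatSketch(size) {
  let buffer = [];
  return new Transform({
    objectMode: true,
    transform(chunk, _encoding, callback) {
      buffer.push(chunk);
      if (buffer.length >= size) {
        this.push(buffer);
        buffer = [];
      }
      callback();
    },
    // Emit the final, possibly shorter, batch when the input ends.
    flush(callback) {
      if (buffer.length > 0) this.push(buffer);
      callback();
    },
  });
}

// Collects every chunk of an object stream into an array.
async function collect(stream) {
  const chunks = [];
  for await (const chunk of stream) chunks.push(chunk);
  return chunks;
}

collect(Readable.from([1, 2, 3, 4, 5]).pipe(concatSketch(2)))
  .then((batches) => console.log(batches)); // [ [ 1, 2 ], [ 3, 4 ], [ 5 ] ]
```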
split()
Creates a Transform that splits each incoming Array chunk into its elements and passes them one by one to subsequent streams.
const { pipeline, Readable } = require('stream');
const { split, pipe } = require('pipeline-pipe');

pipeline(
  Readable.from([1, 2, 3]),
  pipe(page => getPostsByPage(page)),
  pipe(json => json.posts),            // Returns an array of posts
  split(),                             // Splits the array into individual posts
  pipe(post => storeInDB(post.title)), // Now the argument is a single post
  (err) => console.info('All done!')
);
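The splitting step can likewise be approximated with a standard-library Transform. This is only a sketch of the idea, assuming every incoming chunk is an array; `splitSketch` and `collect` are hypothetical names, not the library's code.

```javascript
const { Readable, Transform } = require('stream');

// Hypothetical stand-in for split(): re-emits each element of an array chunk
// as its own chunk.
function splitSketch() {
  return new Transform({
    objectMode: true,
    transform(chunk, _encoding, callback) {
      for (const item of chunk) this.push(item);
      callback();
    },
  });
}

// Collects every chunk of an object stream into an array.
async function collect(stream) {
  const chunks = [];
  for await (const chunk of stream) chunks.push(chunk);
  return chunks;
}

collect(Readable.from([[1, 2], [3]]).pipe(splitSketch()))
  .then((items) => console.log(items)); // [ 1, 2, 3 ]
```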
License
MIT