README
Description
Feed multiple node.js streams sequentially into one (Writable or Duplex) stream.
Requirements
- node.js -- v0.10.0 or newer
Install
npm install conveyor
Examples
- Pass HTTP requests to an echo stream:
var TransformStream = require('stream').Transform,
http = require('http');
var Conveyor = require('conveyor');
var stream = new TransformStream();
stream._transform = function(chunk, encoding, cb) {
this.push(chunk);
cb();
};
var c = new Conveyor(stream),
TOTAL = 10,
count = 0;
http.createServer(function(req, res) {
if (++count === TOTAL)
this.close();
c.push(req, res);
}).listen(8080, function() {
for (var i = 0; i < TOTAL; ++i) {
http.request({
host: '127.0.0.1',
port: 8080,
method: 'POST'
}, function(res) {
var b = '';
res.setEncoding('utf8');
res.on('data', function(d) {
b += d;
}).on('end', function() {
console.log(b);
});
}).end('Hello from request #' + (i + 1));
}
});
// output:
// Hello from request #1
// Hello from request #2
// Hello from request #3
// Hello from request #4
// Hello from request #5
// Hello from request #6
// Hello from request #7
// Hello from request #8
// Hello from request #9
// Hello from request #10
- Pass HTTP requests to an Writable stream:
var WritableStream = require('stream').Writable,
http = require('http');
var Conveyor = require('conveyor');
var stream = new WritableStream();
stream._write = function(chunk, encoding, cb) {
console.log(chunk.toString());
cb();
};
var c = new Conveyor(stream),
TOTAL = 10,
count = 0;
http.createServer(function(req, res) {
if (++count === TOTAL)
this.close();
c.push(req, function() {
// this req stream finished
res.end();
});
}).listen(8080, function() {
for (var i = 0; i < TOTAL; ++i) {
http.request({
host: '127.0.0.1',
port: 8080,
method: 'POST'
}, function(res) {
res.resume();
}).end('Hello from request #' + (i + 1));
}
});
// output (assuming 1-chunk requests):
// Hello from request #1
// Hello from request #2
// Hello from request #3
// Hello from request #4
// Hello from request #5
// Hello from request #6
// Hello from request #7
// Hello from request #8
// Hello from request #9
// Hello from request #10
API
Conveyor is an EventEmitter
Conveyor events
- end() - Emitted after end() is called and all streams have been processed.
Conveyor methods
(constructor)(< Writable >dest, < object >config) - Creates and returns a new Dicer instance with the following valid
config
settings:- max - integer - This is the max queue size.
push(< Readable >stream[, < Writable >pipeStream][, < object >pipeStreamOpts][, < function >callback]) - boolean - Pushes (appends)
stream
to the queue. IfpipeStream
is set, data (fromdest
passed to the constructor) will be piped to this stream with optionalpipeStreamOpts
pipe settings.callback
is called oncestream
has ended anddest
is drained. The return value is false ifstream
could not be enqueued due to the queue being full.unshift(< Readable >stream[, < Writable >pipeStream][, < object >pipeStreamOpts][, < function >callback]) - boolean - Identical to push() except it unshifts (prepends)
stream
to the queue.