README
Node Streams
Install
npm install node-streams
buffer
Transform stream that buffers all chunks until the `wait` function
invokes its callback, then pushes the buffered chunks as an array.
(options: TransformOptions) => (wait: WaitFn) => Transform
WaitFn = (callback: () => void) => CancelFn
The `buffered` stream calls the `wait` function each time it needs a timeout,
so it is possible to provide a different period each time.
import { buffer } from 'node-streams'
// we have the streams with `objectMode` set to true
declare var inStream: ReadableStream
declare var outStream: WritableStream
const wait100ms = (callback: () => void) => {
const id = setTimeout(callback, 100)
return () => clearTimeout(id)
}
const buffered = buffer({
objectMode: true // set up standard TransformOptions
})(
wait100ms // buffer for 100ms then push accumulated data
)
inStream
.pipe(buffered)
.pipe(outStream)
bufferTime
buffers all chunks for provided time period. Pushes buffered chunks array.
(options: TransformOptions) => (ms: number) => Transform
import { bufferTime } from 'node-streams'
// we have the streams with `objectMode` set to true
declare var inStream: ReadableStream
declare var outStream: WritableStream
const buffered = bufferTime({
objectMode: true // set up standard TransformOptions
})(
100 // buffer for 100ms then push accumulated data
)
inStream
.pipe(buffered)
.pipe(outStream)
combine
delivers latest values from all streams as an array, ends when any of streams does
(options: ReadableOptions) => (...streams: ReadableStream[]) => ReadableStream
import { combine } from 'node-streams'
// we have the streams
declare var stream0: ReadableStream // [..1..2..3..4..]
declare var stream1: ReadableStream // ['a'...'b']
// create combined readable
const combined = combine({
objectMode: true // provide standard ReadableOptions
})(stream0, stream1)
combined
.on('data', ([value0, value1]) => {
// [undefined, 'a']..[1, 'a']..[2, 'a'].[2, 'b']
})
.on('end', () => {})
concat
Concatenates all streams, subscribing to the next one once the previous has ended.
(options: ReadableOptions) => (...streams: ReadableStream[]) => ReadableStream
import { concat } from 'node-streams'
// we have the streams
declare var stream0: ReadableStream // [..1..2]
declare var stream1: ReadableStream // [..'a'..'b']
// create combined readable
const combined = concat({
objectMode: true // provide standard ReadableOptions
})(stream0, stream1)
combined
.on('data', (value) => {
// [..1..2..'a'..'b']
})
.on('end', () => {})
debounce
skips all fast arriving data, until idle period, after that pushes last received chunk.
(options: TransformOptions) => (wait: WaitFn) => Transform
The `debounced` stream cancels and re-arms the `wait` function
on every chunk received.
WaitFn = (cb: () => void) => CancelFn
The provided `wait` function must properly release its timeout resources
when the returned `CancelFn` is called.
import { debounce } from 'node-streams'
// we have the streams in `objectMode`
declare var inStream: ReadableStream
declare var outStream: WritableStream
// we have `setTimeout` function with baked in 10ms time
// CAUTION: wait function should be able to cancel timeout
const wait10ms = (callback: () => void) => {
const id = setTimeout(callback, 10)
return () => clearTimeout(id)
}
const debounced = debounce({
objectMode: true // set up standard TransformOptions
})(
wait10ms // debounce stream for 10ms
)
inStream
.pipe(debounced)
.pipe(outStream)
debounceTime
skips all fast arriving data, until idle period, after that pushes last received chunk.
(options: TransformOptions) => (ms: number) => Transform
import { debounceTime } from 'node-streams'
// we have the streams in `objectMode`
declare var inStream: ReadableStream
declare var outStream: WritableStream
const debounced = debounceTime({
objectMode: true // set up standard TransformOptions
})(
10 // debounce stream for 10ms
)
inStream
.pipe(debounced)
.pipe(outStream)
delay
(options: TransformOptions) => (ms: number) => Transform
import { delay } from 'node-streams'
// we have the streams in `objectMode`
declare var inStream: ReadableStream
declare var outStream: WritableStream
const delayed = delay({
objectMode: true // set up standard TransformOptions
})(
100 // delay stream events up to 100ms
)
inStream
.pipe(delayed)
.pipe(outStream)
distinct
(options: TransformOptions) => (isEqual: (a: T, b: T) => boolean) => Transform
import { distinct } from 'node-streams'
// we have the streams in `objectMode`
declare var inStream: ReadableStream // [1..2..2..3..3..2..1]
declare var outStream: WritableStream
const unique = distinct({
objectMode: true // set up standard TransformOptions
})(
(a, b) => a === b // provide compare function
)
inStream
.pipe(unique)
.pipe(outStream) // [1..2....3....2..1]
distinctUntilChanged
(options: TransformOptions) => Transform
import { distinctUntilChanged } from 'node-streams'
// we have the streams in `objectMode`
declare var inStream: ReadableStream // [1..2..2..3..3..2..1]
declare var outStream: WritableStream
const unique = distinctUntilChanged({
objectMode: true // set up standard TransformOptions
})
inStream
.pipe(unique)
.pipe(outStream) // [1..2....3....2..1]
empty
(options: ReadableOptions) => Readable
import { empty } from 'node-streams'
// we have the streams in `objectMode`
declare var inStream: ReadableStream
declare var outStream: WritableStream
(myCondition
? inStream
: empty({})
).pipe(outStream)
filter
(options: TransformOptions) => (predicate: (arg: T) => boolean) => Transform
import { filter } from 'node-streams'
// we have the streams in `objectMode`
declare var inStream: ReadableStream // [1..2..3..4..5..6..7]
declare var outStream: WritableStream
const filtered = filter({
objectMode: true // provide standard TransformOptions
})(
a => a % 2 === 0 // is even
)
inStream
.pipe(filtered)
.pipe(outStream) // [..2....4....6..]
first
(options: TransformOptions) => Transform
import { first } from 'node-streams'
// we have the streams in `objectMode`
declare var inStream: ReadableStream // [1..2..3..4..5..6..7]
declare var outStream: WritableStream
const firstTransform = first({
objectMode: true // provide standard TransformOptions
})
inStream
.pipe(firstTransform)
.pipe(outStream) // [1]
from
(options: ReadableOptions) => (iterable: Iterable<T>) => Readable
import { from } from 'node-streams'
const myStream = from({
objectMode: true // provide standard ReadableOptions
})(
[1, 2, 3, 4, 5] // provide Iterable
)
myStream
.on('data', () => {}) // subscribe and get the data
.on('end', () => {})
last
(options: TransformOptions) => Transform
import { last } from 'node-streams'
// we have the streams in `objectMode`
declare var inStream: ReadableStream // [1..2..3..4..5..6..7]
declare var outStream: WritableStream
const lastTransform = last({
objectMode: true // provide standard TransformOptions
})
inStream
.pipe(lastTransform)
.pipe(outStream) // [7]
map
(options: TransformOptions) => (xf: (value: T) => R) => Transform
import { map } from 'node-streams'
// we have the streams in `objectMode`
declare var inStream: ReadableStream // [1..2..3..4]
declare var outStream: WritableStream
const mapped = map({
objectMode: true // provide standard TransformOptions
})(
x => x * 2
)
inStream
.pipe(mapped)
.pipe(outStream) // [2..4..6..8]
merge
(options: ReadableOptions) => (...streams: ReadableStream[]) => Readable
import { merge } from 'node-streams'
// we have the streams in `objectMode`
declare var stream0: ReadableStream // [1...2...3...4]
declare var stream1: ReadableStream // [..'a'.....'b'...]
const merged = merge({
objectMode: true // provide standard ReadableOptions
})(
stream0,
stream1
)
merged
.on('data', () => {}) // [1..'a'..2..3..'b'..4..]
.on('end', () => {})
of
(options: ReadableOptions) => (...values: T[]) => Readable
import { of } from 'node-streams'
const myStream = of({
objectMode: true // provide standard ReadableOptions
})(
1, 2, 3, 4
)
myStream
.on('data', () => {}) // [1.2.3.4]
.on('end', () => {})
ofAsync
(options: ReadableOptions) => (wait: WaitFn) => (...values: T[]) => Readable
WaitFn = (callback: () => void) => UnsubscribeFn
import { ofAsync } from 'node-streams'
// we have custom wait function
const wait100ms = (cb: () => void) => {
const id = setTimeout(cb, 100)
return () => clearTimeout(id)
}
const myStream = ofAsync({
objectMode: true // provide standard ReadableOptions
})(
wait100ms // set WaitFn
)(
1, 2, 3, 4, 5 // provide values
)
myStream
.on('data', () => {}) // [1...2...3...4...5]
.on('end', () => {})
ofTime
(options: ReadableOptions) => (ms: number) => (...values: T[]) => Readable
import { ofTime } from 'node-streams'
const myStream = ofTime({
objectMode: true // provide standard ReadableOptions
})(
100 // provide time in milliseconds
)(
1, 2, 3, 4 // values to stream
)
myStream
.on('data', () => {}) // [1...2...3...4]
.on('end', () => {})
pipe
(...streams: Array<ReadWriteStream | ReadWriteStream[]>) => ReadWriteStream[]
import { pipe } from 'node-streams'
// we have the following streams
declare var inStream: ReadableStream
declare var outStream: WritableStream
declare var tripleValues: TransformStream
declare var isEvenValues: TransformStream
declare var takeFirstValue: TransformStream
const tripleEven = pipe(
tripleValues,
isEvenValues
)
const firstTripleEven = pipe(
tripleEven,
takeFirstValue
)
pipe(
inStream,
firstTripleEven,
outStream
)
pluck
(opts: TransformOptions) => (propName: string) => Transform
import { pluck } from 'node-streams'
// we have the following streams in "objectMode"
declare var inStream: ReadableStream
declare var outStream: WritableStream
const pluckMyProp = pluck({
objectMode: true // provide standard TransformOptions
})(
'my-prop' // property name
)
inStream
.pipe(pluckMyProp)
.pipe(outStream)
reduce
(options: TransformOptions) => (reducer: (state: S, value: T) => S) => Transform
import { reduce } from 'node-streams'
// we have the following streams in "objectMode"
declare var inStream: ReadableStream // [1..2..3..4]
declare var outStream: WritableStream
// we have the following reducer
const addAll = (acc = 0, value: number) => acc + value
const reduceTransform = reduce({
objectMode: true // provide standard TransformOptions
})(
addAll // set the reducer
)
inStream
.pipe(reduceTransform)
.pipe(outStream) // [............10]
scan
(options: TransformOptions) => (reducer: (state: S, value: T) => S) => Transform
import { scan } from 'node-streams'
// we have the following streams in "objectMode"
declare var inStream: ReadableStream // [1..2..3..4]
declare var outStream: WritableStream
// we have the following reducer
const addAll = (acc = 0, value: number) => acc + value
const scanTransform = scan({
objectMode: true // provide standard TransformOptions
})(
addAll
)
inStream
.pipe(scanTransform)
.pipe(outStream) // [1..3..6..10]
side
(options: TransformOptions) => (sideEffect: (value: T) => void) => Transform
import { side } from 'node-streams'
// we have the following streams in "objectMode"
declare var inStream: ReadableStream // [1..2..3..4]
declare var outStream: WritableStream
const sideEffect = side({
objectMode: true // provide standard TransformOptions
})(
console.log
)
inStream
.pipe(sideEffect)
.pipe(outStream)
skip
(options: TransformOptions) => (numSkip: number) => Transform
import { skip } from 'node-streams'
// we have the following streams in "objectMode"
declare var inStream: ReadableStream // [1..2..3..4]
declare var outStream: WritableStream
const skipTransform = skip({
objectMode: true // provide standard TransformOptions
})(
2 // skip 2 chunks
)
inStream
.pipe(skipTransform)
.pipe(outStream) // [....3..4]
startWith
(options: ReadableOptions) => (...values: T[]) => (readable: ReadableStream) => Readable
import { startWith } from 'node-streams'
// we have the following streams in "objectMode"
declare var inStream: ReadableStream // [1..2..3..4]
declare var outStream: WritableStream
const prependedReadable = startWith({
objectMode: true // provide standard ReadableOptions
})(
'a', 'b', 'c'
)(
inStream // provide the stream to prepend
)
prependedReadable.pipe(outStream) // ['a'.'b'.'c'.1..2..3..4]
take
(options: TransformOptions) => (numTake: number) => Transform
import { take } from 'node-streams'
// we have the following streams in "objectMode"
declare var inStream: ReadableStream // [1..2..3..4]
declare var outStream: WritableStream
const takeTransform = take({
objectMode: true // provide standard TransformOptions
})(
2 // take 2 chunks
)
inStream
.pipe(takeTransform)
.pipe(outStream) // [1..2]
throttle
(options: TransformOptions) => (wait: WaitFn) => Transform
import { throttle } from 'node-streams'
// we have the streams in `objectMode`
declare var inStream: ReadableStream
declare var outStream: WritableStream
// we have `setTimeout` function with baked in 10ms time
// CAUTION: wait function should be able to cancel timeout
const wait10ms = (callback: () => void) => {
const id = setTimeout(callback, 10)
return () => clearTimeout(id)
}
const throttled = throttle({
objectMode: true // set up standard TransformOptions
})(
wait10ms // throttle stream for 10ms
)
inStream
.pipe(throttled)
.pipe(outStream)
throttleTime
(options: TransformOptions) => (ms: number) => Transform
import { throttleTime } from 'node-streams'
// we have the streams in `objectMode`
declare var inStream: ReadableStream
declare var outStream: WritableStream
const throttled = throttleTime({
objectMode: true // set up standard TransformOptions
})(
10 // throttle stream for 10ms
)
inStream
.pipe(throttled)
.pipe(outStream)
withLatest
(options: ReadableOptions) => (...streams: ReadableStream[]) => (mainStream: ReadableStream) => Readable
import { withLatest } from 'node-streams'
// we have the streams in `objectMode`
declare var mainStream: ReadableStream // [1..2..3]
declare var stream0: ReadableStream // ['a'..'b']
declare var stream1: ReadableStream // [true............false]
const combined = withLatest({
objectMode: true // standard ReadableOptions
})(
stream0,
stream1 // streams to take the latest values
)(
mainStream // mainStream to sync with
)
combined
.on('data', () => {}) // [[1, 'a', true]..[2, 'a', true]..[3, 'b', true]]
.on('end', () => {})
zip
(options: ReadableOptions) => (...streams: ReadableStream[]) => Readable
import { zip } from 'node-streams'
// we have the streams in `objectMode`
declare var stream0: ReadableStream // ['a'..'b']
declare var stream1: ReadableStream // [true............false]
const combined = zip({
objectMode: true // standard ReadableOptions
})(
stream0,
stream1 // streams to combine
)
combined
.on('data', () => {}) // [['a', true]..........['b', false]]
.on('end', () => {})
subscribe
({ next, error?, complete? }: IObserver) => (...streams: ReadableStream[]) => UnsubscribeFn
type IObserver = {
next: (value: T) => void,
error?: (e: Error) => void,
complete?: () => void
}
type UnsubscribeFn = () => void
import { subscribe } from 'node-streams'
// we have the following streams
declare var stream0: ReadableStream // [1..2..3]
declare var stream1: ReadableStream // [..'a'..'b'..]
const unsub = subscribe({
next: console.log // [1..'a'..2..3..'b'..]
})(
stream0,
stream1
)
subscribeEx
({ next, error?, complete? }: IObserverEx) => (...streams: ReadableStream[]) => UnsubscribeFn
type EmitterValue = {
value: T,
index: number,
emitter: EventEmitter,
emitterIndex: number,
event: string
}
type IObserverEx = {
next: (value: EmitterValue) => void,
error?: (e: Error) => void,
complete?: () => void
}
type UnsubscribeFn = () => void
import { subscribeEx } from 'node-streams'
// we have the following streams
declare var stream0: ReadableStream // [1..2..3]
declare var stream1: ReadableStream // [..'a'..'b'..]
const unsub = subscribeEx({
next: ({value, index, emitter, emitterIndex, event}) => console.log(`value ${value} from stream ${emitterIndex}`)
})(
stream0,
stream1
)
subscribeReadable
({ next, error?, complete? }: IObserver) => (...streams: ReadableStream[]) => UnsubscribeFn
import { subscribeReadable } from 'node-streams'
// we have the following streams
declare var stream0: ReadableStream // [1..2..3]
declare var stream1: ReadableStream // [..'a'..'b'..]
const unsub = subscribeReadable({
next: console.log // [1..'a'..2..3..'b'..]
})(
stream0,
stream1
)
subscribeReadableEx
({ next, error?, complete? }: IObserverEx) => (...streams: ReadableStream[]) => UnsubscribeFn
import { subscribeReadableEx } from 'node-streams'
// we have the following streams
declare var stream0: ReadableStream // [1..2..3]
declare var stream1: ReadableStream // [..'a'..'b'..]
const unsub = subscribeReadableEx({
next: ({value, index, emitter, emitterIndex, event}) => console.log(`value ${value} from stream ${emitterIndex}`)
})(
stream0,
stream1
)