This is a proposal. It does not match the code as of writing.
This describes the minimum contract that Stream objects must adhere to in order to properly interoperate with pipes.
The parent class for all stream objects. Implements the pipe
method, and a default "pass-through" no-op filter interface.
Streams that inherit from the Stream base class SHOULD override methods with their implementation-specific functionality, as appropriate, but the methods on the base class will provide a bare minimum amount of functionality.
- writable=false, readable=false
  The base class is neither writable nor readable by default. One or both of these must be set on the child instances.
- bool write(chunk, callback=null)
  The base class write method emits a data event with the provided chunk, calls the callback function if supplied, and returns true.
  If called after end(), then it MUST throw. Otherwise, it MUST NOT throw.
  The callback function MUST be called asynchronously.
- bool end(chunk=null, callback=null)
  Writes the supplied chunk if provided, sets ended=true, emits end, and returns true.
  Calls the supplied callback on nextTick.
  If called more than once, then it MUST throw. Otherwise, it MUST NOT throw.
  The callback function MUST be called asynchronously.
- bool flush(callback=null)
  Returns true, and calls the supplied callback on nextTick.
  If called after end(), then it MUST throw. Otherwise, it MUST NOT throw.
  Note: Calling Stream.flush() does not indicate that an entire pipe chain of Streams is flushed, but merely that this particular stream's write queue is empty.
  The callback function MUST be called asynchronously.
- destroy(cb)
  Emits destroy, and then end. Calls the supplied callback on nextTick.
  Since destroy() may be called multiple times, it MUST be idempotent, and do nothing if destroy() has already been called on the stream.
- pause(), resume()
  Emits pause or resume, respectively.
- pipe()
  Proxies data and end events from this (the readable stream) to the writable dest stream, managing backpressure and event proxying appropriately.
      Event ----------------> Result
      src.on('data') -------> dest.write(data)
      src.on('end') --------> dest.end()
      dest.on('pause') -----> src.pause()
      dest.on('resume') ----> src.resume()
      ! dest.write(c) ------> src.pause()
      dest.on('destroy') ---> src.destroy()
      dest.on('error', e) --> src.emit('error', e)
      src.pipe(dest) -------> dest.emit('pipe', src)
  The dest writable stream MUST be returned.
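The event table for pipe() can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the code this proposal describes: the class name SketchStream and the paused and destroyed fields are hypothetical, and only the rows listed in the table are wired up (no drain handling is shown, since the table does not list it).

```javascript
const EventEmitter = require('events');

// Hypothetical base class; only what the pipe() table needs is implemented.
class SketchStream extends EventEmitter {
  constructor() {
    super();
    this.paused = false;
    this.destroyed = false;
  }

  pause() {
    this.paused = true;
    this.emit('pause');
  }

  resume() {
    this.paused = false;
    this.emit('resume');
  }

  destroy() {
    if (this.destroyed) return; // idempotent: later calls do nothing
    this.destroyed = true;
    this.emit('destroy');
    this.emit('end');
  }

  pipe(dest) {
    const src = this;
    src.on('data', (chunk) => {
      // A false return from write() asks the source to back off.
      if (dest.write(chunk) === false) src.pause();
    });
    src.on('end', () => dest.end());
    dest.on('pause', () => src.pause());
    dest.on('resume', () => src.resume());
    dest.on('destroy', () => src.destroy());
    dest.on('error', (e) => src.emit('error', e));
    dest.emit('pipe', src);
    return dest; // returning dest is what makes A.pipe(B).pipe(C) chain
  }
}
```

The destination passed to pipe() is assumed to provide write() and end(); the sketch leaves those to the writable implementation.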
Writable Streams SHOULD be an instanceof the Stream base class, but
this is optional.
Writable Streams MUST implement the following members and methods:
- writable=true
  MUST have a writable flag set to true.
  After being destroyed or ended, this flag MUST be set to false.
- bool write(chunk, callback=null)
  If the chunk can be written entirely without buffering, then the function SHOULD return true.
  If the chunk must be stored in a write queue, and the upstream readable stream should be paused, then the write method MUST return false. If write returns false then it MUST emit a drain event when the queue is emptied.
  If a callback function is provided, then it MUST be called when the chunk has been completely written, and MUST NOT be called prior to nextTick.
  If write is called after end, then it SHOULD throw.
  - "write queue"
    The specifics of queueing or buffering data chunks on writable streams are an implementation detail which may vary from stream to stream, and need not be any sort of externally visible "queue".
    Semantically, a false return from write() means "Please do not continue to write to this stream, because doing so may have adverse effects". However, it is a strictly advisory message.
- bool end(chunk=null, callback=null)
  The end method indicates that no more write calls will follow.
  If a chunk is passed in as the first argument then it MUST be passed to this.write(). If a callback is provided as the last argument, then it MUST be called when the stream's write queue is completely flushed, or on nextTick. The return value MUST be false if there are buffered chunks in the write queue, and SHOULD be true if the queue is empty.
  If end() returns false, then it MUST emit a drain event at some point in the future.
- destroy(callback=null)
  Clean up all worker threads, file descriptors, open sockets, or whatever else may be associated with the stream, and immediately emit destroy to indicate that it is no longer valid.
  Note that destroy() is destructive, and should only be called when it is acceptable to lose data, for example, to forcibly shut down a TCP stream when the response is finished.
  If there is no work for a destroy method to perform, then it MUST at least emit the destroy event.
  If a callback is provided, then it MUST be called when the cleanup is complete, but MUST NOT be called before nextTick.
Writable streams SHOULD implement the following functions:
- bool flush(callback=null)
  Make a best effort to push out any pending writes, even if doing so would degrade performance or be otherwise sub-optimal.
  Return value MUST be false if there are still pending writes that could not yet be flushed. Return value SHOULD be true if there are no additional data chunks waiting to be processed.
  If a callback is provided, then it MUST be called once the write queue is emptied, and MUST NOT be called before nextTick.
Writable streams MUST implement the following events:
- emit('drain')
  If a write() call returns false, then a drain event MUST be emitted when the queue is drained.
- emit('destroy')
  Emitted when destroy() is called.
Writable Streams SHOULD implement the following events:
- emit('close')
  When all the underlying machinery of a stream is completely cleaned up, sockets closed, threads completed, file descriptors closed, then a writable stream SHOULD emit a close event.
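To make the write queue, drain, and destroy rules concrete, here is a minimal sketch of a writable stream that buffers every chunk and "writes" the whole queue on the next tick. The class name QueueSink and its queue and written fields are assumptions made for illustration; end(), flush(), and the close event are left out to keep the example short.

```javascript
const EventEmitter = require('events');

// Hypothetical sink: queues every chunk and writes the queue on the next tick.
class QueueSink extends EventEmitter {
  constructor() {
    super();
    this.writable = true;
    this.destroyed = false;
    this.queue = [];   // pending { chunk, callback } entries
    this.written = []; // chunks that have been "written"
  }

  write(chunk, callback = null) {
    if (!this.writable) throw new Error('write after end or destroy');
    this.queue.push({ chunk, callback });
    // Schedule one flush for the whole queue.
    if (this.queue.length === 1) process.nextTick(() => this._flush());
    // The chunk was buffered, so advise the writer to wait for drain.
    return false;
  }

  _flush() {
    if (this.destroyed) return;
    const pending = this.queue;
    this.queue = [];
    for (const { chunk, callback } of pending) {
      this.written.push(chunk);
      if (callback) callback(); // never runs before nextTick
    }
    // write() returned false, so a drain event is owed.
    this.emit('drain');
  }

  destroy(callback = null) {
    if (this.destroyed) return; // idempotent
    this.destroyed = true;
    this.writable = false;
    this.queue = []; // destructive: pending chunks are dropped
    this.emit('destroy');
    if (callback) process.nextTick(callback);
  }
}
```

A stream that can write without buffering would return true from write() instead and would then owe no drain event.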
Readable Streams SHOULD be an instanceof the Stream base class, but
this is optional.
Readable Streams MUST implement the following members and methods:
- readable=true
  MUST have a readable flag set to true.
  After being destroyed or ended, this flag MUST be set to false.
- pause()
  When pause() is called, the stream SHOULD stop emitting any data events.
- resume()
  If paused, then resume() MUST cause data events to begin emitting again.
- pipe(dest)
  See above under Stream base class.
  Readable Streams MAY override the pipe() method from the Stream base class, but SHOULD call Stream.prototype.pipe.call(this, dest) at some point.
  The dest writable stream MUST be returned.
- destroy(callback=null)
  Clean up all worker threads, file descriptors, open sockets, or whatever else may be associated with the stream. The stream MUST emit destroy when the destroy() method is called.
  Note that destroy() is destructive, and should only be called when it is acceptable to lose data, for example, to forcibly shut down a TCP stream when the response is finished.
  This method MUST be idempotent, and MUST NOT throw or have other deleterious effects when called multiple times on the same object.
  An example destroy() method:
      MyReadStream.prototype.destroy = function(callback) {
        this.fd = null;
        this._binding = null;
        Stream.prototype.destroy.call(this, callback);
      };
Readable Streams MUST implement the following events:
- emit('data', chunk)
  Whenever data is available, if not paused, then the stream MUST emit a data event with the data as an argument.
  data events MUST NOT be emitted without a chunk argument. data events MAY be emitted while paused.
- emit('end')
  When no more data is going to be emitted, the stream MUST emit an end event.
  This event MUST be emitted eventually by all readable streams, even in cases where error is emitted, or destroy() is called.
Readable Streams SHOULD implement the following events:
- emit('close')
  When all the underlying machinery of a stream is completely cleaned up, sockets closed, threads completed, file descriptors closed, then a readable stream SHOULD emit a close event.
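As an illustration of the readable contract, the following sketch emits the items of an array one per tick, honors pause() and resume(), and still emits end when destroyed. The class ArraySource and its fields are hypothetical. Restarting on nextTick inside resume() is a design choice of this sketch, not something the text above requires: it means listeners attached right after resume() do not miss data.

```javascript
const EventEmitter = require('events');

// Hypothetical readable stream that emits array items, one per tick.
class ArraySource extends EventEmitter {
  constructor(items) {
    super();
    this.readable = true;
    this.paused = false;
    this.destroyed = false;
    this.scheduled = false;
    this.items = items.slice();
    this._schedule();
  }

  pause() {
    this.paused = true;
  }

  resume() {
    if (!this.paused) return;
    this.paused = false;
    this._schedule(); // data flows again, but never before resume() returns
  }

  _schedule() {
    if (this.scheduled) return; // keep at most one pending tick
    this.scheduled = true;
    process.nextTick(() => {
      this.scheduled = false;
      this._next();
    });
  }

  _next() {
    if (this.paused || !this.readable) return;
    if (this.items.length === 0) return this._finish();
    this.emit('data', this.items.shift());
    this._schedule();
  }

  _finish() {
    this.readable = false;
    this.emit('end');
  }

  destroy(callback = null) {
    if (this.destroyed) return; // idempotent
    this.destroyed = true;
    this.items = []; // destructive: remaining data is dropped
    this.emit('destroy');
    if (this.readable) this._finish(); // end is still guaranteed
    if (callback) process.nextTick(callback);
  }
}
```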
Filter Streams are streams that are both readable and writable, where a
write() method call corresponds to a data event being emitted at
some later time.
In order to properly proxy the events used by Stream.pipe, Filter streams SHOULD have the following additions to the ReadableStream and WritableStream APIs:
- allowHalfOpen=true
  The allowHalfOpen flag MUST be set to true for filter streams.
- pause() -> pause event
  The pause() method, in addition to halting the flow of data events, SHOULD emit a pause event so that upstream writers can be paused before the backpressure builds up.
- resume() -> resume event
  The resume() method, in addition to allowing a paused stream to begin flowing data events again, SHOULD emit a resume event, so that upstream writers can begin sending data before the drain events propagate back.
- destroy(cb)
  Filter streams MUST emit the destroy event like a writable stream, in addition to emitting the end event like a readable stream.
  Since destroy() may be called multiple times, it MUST be idempotent, and do nothing if destroy() has already been called on the stream.
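A filter that follows these additions might look like the sketch below. The UpperCaseFilter class, its buffer, and its needDrain flag are assumptions made for this example, and end() is omitted for brevity. Chunks written while paused are held back, and write() keeps buffering while older chunks are pending so that output order is preserved.

```javascript
const EventEmitter = require('events');

// Hypothetical filter: upper-cases each chunk written to it.
class UpperCaseFilter extends EventEmitter {
  constructor() {
    super();
    this.readable = true;
    this.writable = true;
    this.allowHalfOpen = true;
    this.paused = false;
    this.destroyed = false;
    this.needDrain = false;
    this.buffer = []; // transformed chunks held back while paused
  }

  write(chunk) {
    if (!this.writable) throw new Error('write after end or destroy');
    const out = String(chunk).toUpperCase();
    if (this.paused || this.buffer.length > 0) {
      this.buffer.push(out);
      this.needDrain = true;
      return false; // advisory: a drain event will follow
    }
    this.emit('data', out);
    return true;
  }

  pause() {
    if (this.paused) return;
    this.paused = true;
    this.emit('pause'); // lets upstream writers stop early
  }

  resume() {
    if (!this.paused) return;
    this.paused = false;
    this.emit('resume'); // lets upstream writers restart before drain
    while (!this.paused && this.buffer.length > 0) {
      this.emit('data', this.buffer.shift());
    }
    if (this.needDrain && this.buffer.length === 0 && !this.destroyed) {
      this.needDrain = false;
      this.emit('drain');
    }
  }

  destroy(cb = null) {
    if (this.destroyed) return; // idempotent
    this.destroyed = true;
    this.readable = this.writable = false;
    this.buffer = [];
    this.emit('destroy'); // like a writable stream
    this.emit('end');     // like a readable stream
    if (cb) process.nextTick(cb);
  }
}
```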
Duplex Streams are streams that are both readable and writable, where
the write() calls are messages to another agent, and the data
events are messages from that same agent.
In these cases, it may make sense to exist in a half-duplex state, where either the readable or the writable aspect of the stream is preserved, but the other is destroyed.
- bool allowHalfOpen
  The allowHalfOpen flag MUST be set to true if the stream supports a half-open state.
- close(chunk=null, callback=null)
  Semantically, this is sugar for:
      stream.end(chunk, function() {
        stream.destroy(callback);
      });
  However, when it is known that a duplex stream will be completely destroyed after the write queue is empty, then the close() method SHOULD be used, so that implementation-specific optimizations can be executed. For example, a TCP stream could stop reading on the socket file descriptor.
- end(chunk=null, callback=null)
  On Duplex Streams, the end() method means that no more data will be written. However, it does not correspond directly to an end event being emitted, if the stream can exist in a half-open state.
  If allowHalfOpen is not set to true, then this MUST cause the stream to be fully destroyed once the write queue is flushed.
- destroy(cb)
  The destroy method MUST forcibly shut down both sides of the stream, and emit a destroy event immediately. If a callback is supplied, then it MUST be called when the stream destruction is complete, and MUST NOT be called before nextTick.
  Since destroy() may be called multiple times, it MUST be idempotent, and do nothing if destroy() has already been called on the stream.
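The default meaning of close() on a duplex stream, ending the writable side and then destroying the stream, can be sketched as below. RecordingDuplex and its log array are hypothetical and exist only to make the call order visible. A real duplex stream would talk to another agent, and could use close() as the place to apply its own optimizations.

```javascript
const EventEmitter = require('events');

// Hypothetical duplex stream that records what happens to it.
class RecordingDuplex extends EventEmitter {
  constructor() {
    super();
    this.readable = true;
    this.writable = true;
    this.allowHalfOpen = false;
    this.destroyed = false;
    this.log = [];
  }

  end(chunk = null, callback = null) {
    if (chunk !== null) this.log.push('write:' + chunk);
    this.writable = false;
    this.log.push('end');
    // Nothing is queued in this sketch, so the callback runs on nextTick.
    if (callback) process.nextTick(callback);
    return true;
  }

  destroy(callback = null) {
    if (this.destroyed) return; // idempotent
    this.destroyed = true;
    this.readable = this.writable = false;
    this.log.push('destroy');
    this.emit('destroy');
    if (callback) process.nextTick(callback);
  }

  // Default close(): flush the final chunk, then tear down both sides.
  close(chunk = null, callback = null) {
    this.end(chunk, () => this.destroy(callback));
  }
}
```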
Some guidelines and summary information for the Stream code contract:
- Use the pipe() method to connect streams.
  This is the best way to ensure that errors and events are handled appropriately.
- Errors propagate back up the pipe() chain.
  In the case where you have a readable stream A, a readable/writable filter B, and a writable stream C, you'd do this:
      A.pipe(B).pipe(C);
  If C emits an error event, then it will be proxied to the B object, and then to the A object. So, the best way to catch all errors is to do:
      A.pipe(B).pipe(C);
      A.on('error', handleErrors);
- Readable Streams: end MUST be emitted at some point.
  Even if the stream was destroyed, or encountered an error, the end event must be guaranteed to be emitted at some point in the lifetime of the stream object.
- Event propagation in circular pipe chains
  Consider this Stream.pipe chain:
      A.pipe(B).pipe(C).pipe(A)
  where A is a duplex stream, and B and C are filters. A more realistic situation where this can occur:
      net.createServer(function (socket) {
        socket.pipe(rot13Filter).pipe(GzipFilter).pipe(socket);
      });
  So, the incoming data from the socket is rot13'd, and then gzipped, and then sent back to the client.
  Consider what happens when an error event is raised in stream C:
      C.emit("error")
      B.emit("error")
      A.emit("error")
  At this point, presumably we catch and handle the error. However, according to the Stream.pipe semantics, it will still cause another error event to be emitted by C, since C is upstream from A.
  A similar problem can be seen in a simple echo server duplex stream:
      net.createServer(function (socket) {
        socket.pipe(socket);
      });
  Any error event on the socket will cause another error event to be emitted on the same object, recursing until the call stack overflows with a RangeError.
  The same behavior would occur with destroy events. However, the semantics are such that the cycle will be broken by the fact that calling stream.destroy() a second time must do nothing, not even emit destroy again.
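A small sketch shows how idempotent destroy() breaks the cycle. LoopStream and destroyCount are hypothetical names, only the destroy row of the pipe() table is wired up, and the end event that destroy() would also emit is left out to keep the focus on the cycle.

```javascript
const EventEmitter = require('events');

// Hypothetical stream that implements only destroy() and the destroy
// row of the pipe() table.
class LoopStream extends EventEmitter {
  constructor() {
    super();
    this.destroyed = false;
    this.destroyCount = 0; // how many times destroy was actually emitted
  }

  destroy() {
    if (this.destroyed) return; // second call: nothing, not even an event
    this.destroyed = true;
    this.destroyCount++;
    this.emit('destroy');
  }

  pipe(dest) {
    dest.on('destroy', () => this.destroy());
    return dest;
  }
}

const a = new LoopStream();
const b = new LoopStream();
const c = new LoopStream();
a.pipe(b).pipe(c).pipe(a); // circular chain

c.destroy(); // propagates C -> B -> A, then stops when it reaches C again
console.log(a.destroyCount, b.destroyCount, c.destroyCount); // prints "1 1 1"
```

Without the early return in destroy(), the same wiring would recurse indefinitely, just as the error case does.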
Writable Streams
- bool write(chunk, callback=null)
  Isn't write(chunk, encoding, callback) a part of this spec?
- emit('drain')
  Even if a write() does not return false, can a drain event be emitted?
Readable Streams
- resume()
  Can a data event be emitted before resume() returns?
  Example: if resume() emits a (suspended) data event immediately, it is lost (fs.ReadStream works this way).
  I think it should be emitted asynchronously.