summaryrefslogtreecommitdiffstats
path: root/testing/xpcshell/node-http2/lib/protocol/stream.js
diff options
context:
space:
mode:
Diffstat (limited to 'testing/xpcshell/node-http2/lib/protocol/stream.js')
-rw-r--r--testing/xpcshell/node-http2/lib/protocol/stream.js659
1 files changed, 659 insertions, 0 deletions
diff --git a/testing/xpcshell/node-http2/lib/protocol/stream.js b/testing/xpcshell/node-http2/lib/protocol/stream.js
new file mode 100644
index 000000000..6d520b949
--- /dev/null
+++ b/testing/xpcshell/node-http2/lib/protocol/stream.js
@@ -0,0 +1,659 @@
+var assert = require('assert');
+
+// The Stream class
+// ================
+
+// Stream is a [Duplex stream](https://nodejs.org/api/stream.html#stream_class_stream_duplex)
+// subclass that implements the [HTTP/2 Stream](https://tools.ietf.org/html/rfc7540#section-5)
+// concept. It has two 'sides': one that is used by the user to send/receive data (the `stream`
+// object itself) and one that is used by a Connection to read/write frames to/from the other peer
+// (`stream.upstream`).
+
+var Duplex = require('stream').Duplex;
+
+exports.Stream = Stream;
+
+// Public API
+// ----------
+
+// * **new Stream(log, connection)**: create a new Stream
+//
+// * **Event: 'headers' (headers)**: signals incoming headers
+//
+// * **Event: 'promise' (stream, headers)**: signals an incoming push promise
+//
+// * **Event: 'priority' (priority)**: signals a priority change. `priority` is a number between 0
+// (highest priority) and 2^31-1 (lowest priority). Default value is 2^30.
+//
+// * **Event: 'error' (type)**: signals an error
+//
+// * **headers(headers)**: send headers
+//
+// * **promise(headers): Stream**: promise a stream
+//
+// * **priority(priority)**: set the priority of the stream. Priority can be changed by the peer
+// too, but once it is set locally, it can not be changed remotely.
+//
+// * **reset(error)**: reset the stream with an error code
+//
+// * **upstream**: a [Flow](flow.js) that is used by the parent connection to write/read frames
+// that are to be sent/arrived to/from the peer and are related to this stream.
+//
+// Headers are always in the [regular node.js header format][1].
+// [1]: https://nodejs.org/api/http.html#http_message_headers
+
+// Constructor
+// -----------
+
+// The main aspects of managing the stream are:
+function Stream(log, connection) {
+ Duplex.call(this);
+
+ // * logging
+ this._log = log.child({ component: 'stream', s: this });
+
+ // * receiving and sending stream management commands
+ this._initializeManagement();
+
+ // * sending and receiving frames to/from the upstream connection
+ this._initializeDataFlow();
+
+ // * maintaining the state of the stream (idle, open, closed, etc.) and error detection
+ this._initializeState();
+
+ this.connection = connection;
+}
+
+Stream.prototype = Object.create(Duplex.prototype, { constructor: { value: Stream } });
+
+// Managing the stream
+// -------------------
+
+// the default stream priority is 2^30
+var DEFAULT_PRIORITY = Math.pow(2, 30);
+var MAX_PRIORITY = Math.pow(2, 31) - 1;
+
+// PUSH_PROMISE and HEADERS are forwarded to the user through events.
+Stream.prototype._initializeManagement = function _initializeManagement() {
+ this._resetSent = false;
+ this._priority = DEFAULT_PRIORITY;
+ this._letPeerPrioritize = true;
+};
+
+Stream.prototype.promise = function promise(headers) {
+ var stream = new Stream(this._log, this.connection);
+ stream._priority = Math.min(this._priority + 1, MAX_PRIORITY);
+ this._pushUpstream({
+ type: 'PUSH_PROMISE',
+ flags: {},
+ stream: this.id,
+ promised_stream: stream,
+ headers: headers
+ });
+ return stream;
+};
+
+Stream.prototype._onPromise = function _onPromise(frame) {
+ this.emit('promise', frame.promised_stream, frame.headers);
+};
+
+Stream.prototype.headers = function headers(headers) {
+ this._pushUpstream({
+ type: 'HEADERS',
+ flags: {},
+ stream: this.id,
+ headers: headers
+ });
+};
+
+Stream.prototype._onHeaders = function _onHeaders(frame) {
+ if (frame.priority !== undefined) {
+ this.priority(frame.priority, true);
+ }
+ this.emit('headers', frame.headers);
+};
+
+Stream.prototype.priority = function priority(priority, peer) {
+ if ((peer && this._letPeerPrioritize) || !peer) {
+ if (!peer) {
+ this._letPeerPrioritize = false;
+
+ var lastFrame = this.upstream.getLastQueuedFrame();
+ if (lastFrame && ((lastFrame.type === 'HEADERS') || (lastFrame.type === 'PRIORITY'))) {
+ lastFrame.priority = priority;
+ } else {
+ this._pushUpstream({
+ type: 'PRIORITY',
+ flags: {},
+ stream: this.id,
+ priority: priority
+ });
+ }
+ }
+
+ this._log.debug({ priority: priority }, 'Changing priority');
+ this.emit('priority', priority);
+ this._priority = priority;
+ }
+};
+
+Stream.prototype._onPriority = function _onPriority(frame) {
+ this.priority(frame.priority, true);
+};
+
+// Resetting the stream. Normally, an endpoint SHOULD NOT send more than one RST_STREAM frame for
+// any stream.
+Stream.prototype.reset = function reset(error) {
+ if (!this._resetSent) {
+ this._resetSent = true;
+ this._pushUpstream({
+ type: 'RST_STREAM',
+ flags: {},
+ stream: this.id,
+ error: error
+ });
+ }
+};
+
+// Specify an alternate service for the origin of this stream
+Stream.prototype.altsvc = function altsvc(host, port, protocolID, maxAge, origin) {
+ var stream;
+ if (origin) {
+ stream = 0;
+ } else {
+ stream = this.id;
+ }
+ this._pushUpstream({
+ type: 'ALTSVC',
+ flags: {},
+ stream: stream,
+ host: host,
+ port: port,
+ protocolID: protocolID,
+ origin: origin,
+ maxAge: maxAge
+ });
+};
+
+// Data flow
+// ---------
+
+// The incoming and the generated outgoing frames are received/transmitted on the `this.upstream`
+// [Flow](flow.html). The [Connection](connection.html) object instantiating the stream will read
+// and write frames to/from it. The stream itself is a regular [Duplex stream][1], and is used by
+// the user to write or read the body of the request.
+// [1]: https://nodejs.org/api/stream.html#stream_class_stream_duplex
+
+// upstream side stream user side
+//
+// +------------------------------------+
+// | |
+// +------------------+ |
+// | upstream | |
+// | | |
+// +--+ | +--|
+// read() | | _send() | _write() | | write(buf)
+// <--------------|B |<--------------|--------------| B|<------------
+// | | | | |
+// frames +--+ | +--| buffers
+// | | | | |
+// -------------->|B |---------------|------------->| B|------------>
+// write(frame) | | _receive() | _read() | | read()
+// +--+ | +--|
+// | | |
+// | | |
+// +------------------+ |
+// | |
+// +------------------------------------+
+//
+// B: input or output buffer
+
+var Flow = require('./flow').Flow;
+
+Stream.prototype._initializeDataFlow = function _initializeDataFlow() {
+ this.id = undefined;
+
+ this._ended = false;
+
+ this.upstream = new Flow();
+ this.upstream._log = this._log;
+ this.upstream._send = this._send.bind(this);
+ this.upstream._receive = this._receive.bind(this);
+ this.upstream.write = this._writeUpstream.bind(this);
+ this.upstream.on('error', this.emit.bind(this, 'error'));
+
+ this.on('finish', this._finishing);
+};
+
+Stream.prototype._pushUpstream = function _pushUpstream(frame) {
+ this.upstream.push(frame);
+ this._transition(true, frame);
+};
+
+// Overriding the upstream's `write` allows us to act immediately instead of waiting for the input
+// queue to empty. This is important in case of control frames.
+Stream.prototype._writeUpstream = function _writeUpstream(frame) {
+ this._log.debug({ frame: frame }, 'Receiving frame');
+
+ var moreNeeded = Flow.prototype.write.call(this.upstream, frame);
+
+ // * Transition to a new state if that's the effect of receiving the frame
+ this._transition(false, frame);
+
+ // * If it's a control frame. Call the appropriate handler method.
+ if (frame.type === 'HEADERS') {
+ if (this._processedHeaders && !frame.flags['END_STREAM']) {
+ this.emit('error', 'PROTOCOL_ERROR');
+ }
+ this._processedHeaders = true;
+ this._onHeaders(frame);
+ } else if (frame.type === 'PUSH_PROMISE') {
+ this._onPromise(frame);
+ } else if (frame.type === 'PRIORITY') {
+ this._onPriority(frame);
+ } else if (frame.type === 'ALTSVC') {
+ // TODO
+ } else if (frame.type === 'BLOCKED') {
+ // TODO
+ }
+
+ // * If it's an invalid stream level frame, emit error
+ else if ((frame.type !== 'DATA') &&
+ (frame.type !== 'WINDOW_UPDATE') &&
+ (frame.type !== 'RST_STREAM')) {
+ this._log.error({ frame: frame }, 'Invalid stream level frame');
+ this.emit('error', 'PROTOCOL_ERROR');
+ }
+
+ return moreNeeded;
+};
+
+// The `_receive` method (= `upstream._receive`) gets called when there's an incoming frame.
+Stream.prototype._receive = function _receive(frame, ready) {
+ // * If it's a DATA frame, then push the payload into the output buffer on the other side.
+ // Call ready when the other side is ready to receive more.
+ if (!this._ended && (frame.type === 'DATA')) {
+ var moreNeeded = this.push(frame.data);
+ if (!moreNeeded) {
+ this._receiveMore = ready;
+ }
+ }
+
+ // * Any frame may signal the end of the stream with the END_STREAM flag
+ if (!this._ended && (frame.flags.END_STREAM || (frame.type === 'RST_STREAM'))) {
+ this.push(null);
+ this._ended = true;
+ }
+
+ // * Postpone calling `ready` if `push()` returned a falsy value
+ if (this._receiveMore !== ready) {
+ ready();
+ }
+};
+
+// The `_read` method is called when the user side is ready to receive more data. If there's a
+// pending write on the upstream, then call its pending ready callback to receive more frames.
+Stream.prototype._read = function _read() {
+ if (this._receiveMore) {
+ var receiveMore = this._receiveMore;
+ delete this._receiveMore;
+ receiveMore();
+ }
+};
+
+// The `write` method gets called when there's a write request from the user.
+Stream.prototype._write = function _write(buffer, encoding, ready) {
+ // * Chunking is done by the upstream Flow.
+ var moreNeeded = this._pushUpstream({
+ type: 'DATA',
+ flags: {},
+ stream: this.id,
+ data: buffer
+ });
+
+ // * Call ready when upstream is ready to receive more frames.
+ if (moreNeeded) {
+ ready();
+ } else {
+ this._sendMore = ready;
+ }
+};
+
+// The `_send` (= `upstream._send`) method is called when upstream is ready to receive more frames.
+// If there's a pending write on the user side, then call its pending ready callback to receive more
+// writes.
+Stream.prototype._send = function _send() {
+ if (this._sendMore) {
+ var sendMore = this._sendMore;
+ delete this._sendMore;
+ sendMore();
+ }
+};
+
+// When the stream is finishing (the user calls `end()` on it), then we have to set the `END_STREAM`
+// flag on the last frame. If there's no frame in the queue, or if it doesn't support this flag,
+// then we create a 0 length DATA frame. We could do this all the time, but putting the flag on an
+// existing frame is a nice optimization.
+var emptyBuffer = new Buffer(0);
+Stream.prototype._finishing = function _finishing() {
+ var endFrame = {
+ type: 'DATA',
+ flags: { END_STREAM: true },
+ stream: this.id,
+ data: emptyBuffer
+ };
+ var lastFrame = this.upstream.getLastQueuedFrame();
+ if (lastFrame && ((lastFrame.type === 'DATA') || (lastFrame.type === 'HEADERS'))) {
+ this._log.debug({ frame: lastFrame }, 'Marking last frame with END_STREAM flag.');
+ lastFrame.flags.END_STREAM = true;
+ this._transition(true, endFrame);
+ } else {
+ this._pushUpstream(endFrame);
+ }
+};
+
+// [Stream States](https://tools.ietf.org/html/rfc7540#section-5.1)
+// ----------------
+//
+// +--------+
+// PP | | PP
+// ,--------| idle |--------.
+// / | | \
+// v +--------+ v
+// +----------+ | +----------+
+// | | | H | |
+// ,---| reserved | | | reserved |---.
+// | | (local) | v | (remote) | |
+// | +----------+ +--------+ +----------+ |
+// | | ES | | ES | |
+// | | H ,-------| open |-------. | H |
+// | | / | | \ | |
+// | v v +--------+ v v |
+// | +----------+ | +----------+ |
+// | | half | | | half | |
+// | | closed | | R | closed | |
+// | | (remote) | | | (local) | |
+// | +----------+ | +----------+ |
+// | | v | |
+// | | ES / R +--------+ ES / R | |
+// | `----------->| |<-----------' |
+// | R | closed | R |
+// `-------------------->| |<--------------------'
+// +--------+
+
+// Streams begin in the IDLE state and transitions happen when there's an incoming or outgoing frame
+Stream.prototype._initializeState = function _initializeState() {
+ this.state = 'IDLE';
+ this._initiated = undefined;
+ this._closedByUs = undefined;
+ this._closedWithRst = undefined;
+ this._processedHeaders = false;
+};
+
+// Only `_setState` should change `this.state` directly. It also logs the state change and notifies
+// interested parties using the 'state' event.
+Stream.prototype._setState = function transition(state) {
+ assert(this.state !== state);
+ this._log.debug({ from: this.state, to: state }, 'State transition');
+ this.state = state;
+ this.emit('state', state);
+};
+
+// A state is 'active' if the stream in that state counts towards the concurrency limit. Streams
+// that are in the "open" state, or either of the "half closed" states count toward this limit.
+function activeState(state) {
+ return ((state === 'HALF_CLOSED_LOCAL') || (state === 'HALF_CLOSED_REMOTE') || (state === 'OPEN'));
+}
+
+// `_transition` is called every time there's an incoming or outgoing frame. It manages state
+// transitions, and detects stream errors. A stream error is always caused by a frame that is not
+// allowed in the current state.
+Stream.prototype._transition = function transition(sending, frame) {
+ var receiving = !sending;
+ var connectionError;
+ var streamError;
+
+ var DATA = false, HEADERS = false, PRIORITY = false, ALTSVC = false, BLOCKED = false;
+ var RST_STREAM = false, PUSH_PROMISE = false, WINDOW_UPDATE = false;
+ switch(frame.type) {
+ case 'DATA' : DATA = true; break;
+ case 'HEADERS' : HEADERS = true; break;
+ case 'PRIORITY' : PRIORITY = true; break;
+ case 'RST_STREAM' : RST_STREAM = true; break;
+ case 'PUSH_PROMISE' : PUSH_PROMISE = true; break;
+ case 'WINDOW_UPDATE': WINDOW_UPDATE = true; break;
+ case 'ALTSVC' : ALTSVC = true; break;
+ case 'BLOCKED' : BLOCKED = true; break;
+ }
+
+ var previousState = this.state;
+
+ switch (this.state) {
+ // All streams start in the **idle** state. In this state, no frames have been exchanged.
+ //
+ // * Sending or receiving a HEADERS frame causes the stream to become "open".
+ //
+ // When the HEADERS frame contains the END_STREAM flags, then two state transitions happen.
+ case 'IDLE':
+ if (HEADERS) {
+ this._setState('OPEN');
+ if (frame.flags.END_STREAM) {
+ this._setState(sending ? 'HALF_CLOSED_LOCAL' : 'HALF_CLOSED_REMOTE');
+ }
+ this._initiated = sending;
+ } else if (sending && RST_STREAM) {
+ this._setState('CLOSED');
+ } else if (PRIORITY) {
+ /* No state change */
+ } else {
+ connectionError = 'PROTOCOL_ERROR';
+ }
+ break;
+
+ // A stream in the **reserved (local)** state is one that has been promised by sending a
+ // PUSH_PROMISE frame.
+ //
+ // * The endpoint can send a HEADERS frame. This causes the stream to open in a "half closed
+ // (remote)" state.
+ // * Either endpoint can send a RST_STREAM frame to cause the stream to become "closed". This
+ // releases the stream reservation.
+ // * An endpoint may receive PRIORITY frame in this state.
+ // * An endpoint MUST NOT send any other type of frame in this state.
+ case 'RESERVED_LOCAL':
+ if (sending && HEADERS) {
+ this._setState('HALF_CLOSED_REMOTE');
+ } else if (RST_STREAM) {
+ this._setState('CLOSED');
+ } else if (PRIORITY) {
+ /* No state change */
+ } else {
+ connectionError = 'PROTOCOL_ERROR';
+ }
+ break;
+
+ // A stream in the **reserved (remote)** state has been reserved by a remote peer.
+ //
+ // * Either endpoint can send a RST_STREAM frame to cause the stream to become "closed". This
+ // releases the stream reservation.
+ // * Receiving a HEADERS frame causes the stream to transition to "half closed (local)".
+ // * An endpoint MAY send PRIORITY frames in this state to reprioritize the stream.
+ // * Receiving any other type of frame MUST be treated as a stream error of type PROTOCOL_ERROR.
+ case 'RESERVED_REMOTE':
+ if (RST_STREAM) {
+ this._setState('CLOSED');
+ } else if (receiving && HEADERS) {
+ this._setState('HALF_CLOSED_LOCAL');
+ } else if (BLOCKED || PRIORITY) {
+ /* No state change */
+ } else {
+ connectionError = 'PROTOCOL_ERROR';
+ }
+ break;
+
+ // The **open** state is where both peers can send frames. In this state, sending peers observe
+ // advertised stream level flow control limits.
+ //
+ // * From this state either endpoint can send a frame with a END_STREAM flag set, which causes
+ // the stream to transition into one of the "half closed" states: an endpoint sending a
+ // END_STREAM flag causes the stream state to become "half closed (local)"; an endpoint
+ // receiving a END_STREAM flag causes the stream state to become "half closed (remote)".
+ // * Either endpoint can send a RST_STREAM frame from this state, causing it to transition
+ // immediately to "closed".
+ case 'OPEN':
+ if (frame.flags.END_STREAM) {
+ this._setState(sending ? 'HALF_CLOSED_LOCAL' : 'HALF_CLOSED_REMOTE');
+ } else if (RST_STREAM) {
+ this._setState('CLOSED');
+ } else {
+ /* No state change */
+ }
+ break;
+
+ // A stream that is **half closed (local)** cannot be used for sending frames.
+ //
+ // * A stream transitions from this state to "closed" when a frame that contains a END_STREAM
+ // flag is received, or when either peer sends a RST_STREAM frame.
+ // * An endpoint MAY send or receive PRIORITY frames in this state to reprioritize the stream.
+ // * WINDOW_UPDATE can be sent by a peer that has sent a frame bearing the END_STREAM flag.
+ case 'HALF_CLOSED_LOCAL':
+ if (RST_STREAM || (receiving && frame.flags.END_STREAM)) {
+ this._setState('CLOSED');
+ } else if (BLOCKED || ALTSVC || receiving || PRIORITY || (sending && WINDOW_UPDATE)) {
+ /* No state change */
+ } else {
+ connectionError = 'PROTOCOL_ERROR';
+ }
+ break;
+
+ // A stream that is **half closed (remote)** is no longer being used by the peer to send frames.
+ // In this state, an endpoint is no longer obligated to maintain a receiver flow control window
+ // if it performs flow control.
+ //
+ // * If an endpoint receives additional frames for a stream that is in this state it MUST
+ // respond with a stream error of type STREAM_CLOSED.
+ // * A stream can transition from this state to "closed" by sending a frame that contains a
+ // END_STREAM flag, or when either peer sends a RST_STREAM frame.
+ // * An endpoint MAY send or receive PRIORITY frames in this state to reprioritize the stream.
+ // * A receiver MAY receive a WINDOW_UPDATE frame on a "half closed (remote)" stream.
+ case 'HALF_CLOSED_REMOTE':
+ if (RST_STREAM || (sending && frame.flags.END_STREAM)) {
+ this._setState('CLOSED');
+ } else if (BLOCKED || ALTSVC || sending || PRIORITY || (receiving && WINDOW_UPDATE)) {
+ /* No state change */
+ } else {
+ connectionError = 'PROTOCOL_ERROR';
+ }
+ break;
+
+ // The **closed** state is the terminal state.
+ //
+ // * An endpoint MUST NOT send frames on a closed stream. An endpoint that receives a frame
+ // after receiving a RST_STREAM or a frame containing a END_STREAM flag on that stream MUST
+ // treat that as a stream error of type STREAM_CLOSED.
+ // * WINDOW_UPDATE, PRIORITY or RST_STREAM frames can be received in this state for a short
+ // period after a frame containing an END_STREAM flag is sent. Until the remote peer receives
+ // and processes the frame bearing the END_STREAM flag, it might send either frame type.
+ // Endpoints MUST ignore WINDOW_UPDATE frames received in this state, though endpoints MAY
+ // choose to treat WINDOW_UPDATE frames that arrive a significant time after sending
+ // END_STREAM as a connection error of type PROTOCOL_ERROR.
+ // * If this state is reached as a result of sending a RST_STREAM frame, the peer that receives
+ // the RST_STREAM might have already sent - or enqueued for sending - frames on the stream
+ // that cannot be withdrawn. An endpoint that sends a RST_STREAM frame MUST ignore frames that
+ // it receives on closed streams after it has sent a RST_STREAM frame. An endpoint MAY choose
+ // to limit the period over which it ignores frames and treat frames that arrive after this
+ // time as being in error.
+ // * An endpoint might receive a PUSH_PROMISE frame after it sends RST_STREAM. PUSH_PROMISE
+ // causes a stream to become "reserved". If promised streams are not desired, a RST_STREAM
+ // can be used to close any of those streams.
+ case 'CLOSED':
+ if (PRIORITY || (sending && RST_STREAM) ||
+ (receiving && WINDOW_UPDATE) ||
+ (receiving && this._closedByUs &&
+ (this._closedWithRst || RST_STREAM || ALTSVC))) {
+ /* No state change */
+ } else {
+ streamError = 'STREAM_CLOSED';
+ }
+ break;
+ }
+
+ // Noting that the connection was closed by the other endpoint. It may be important in edge cases.
+ // For example, when the peer tries to cancel a promised stream, but we already sent every data
+ // on it, then the stream is in CLOSED state, yet we want to ignore the incoming RST_STREAM.
+ if ((this.state === 'CLOSED') && (previousState !== 'CLOSED')) {
+ this._closedByUs = sending;
+ this._closedWithRst = RST_STREAM;
+ }
+
+ // Sending/receiving a PUSH_PROMISE
+ //
+ // * Sending a PUSH_PROMISE frame marks the associated stream for later use. The stream state
+ // for the reserved stream transitions to "reserved (local)".
+ // * Receiving a PUSH_PROMISE frame marks the associated stream as reserved by the remote peer.
+ // The state of the stream becomes "reserved (remote)".
+ if (PUSH_PROMISE && !connectionError && !streamError) {
+ /* This assertion must hold, because _transition is called immediately when a frame is written
+ to the stream. If it would be called when a frame gets out of the input queue, the state
+ of the reserved could have been changed by then. */
+ assert(frame.promised_stream.state === 'IDLE', frame.promised_stream.state);
+ frame.promised_stream._setState(sending ? 'RESERVED_LOCAL' : 'RESERVED_REMOTE');
+ frame.promised_stream._initiated = sending;
+ }
+
+ // Signaling how sending/receiving this frame changes the active stream count (-1, 0 or +1)
+ if (this._initiated) {
+ var change = (activeState(this.state) - activeState(previousState));
+ if (sending) {
+ frame.count_change = change;
+ } else {
+ frame.count_change(change);
+ }
+ } else if (sending) {
+ frame.count_change = 0;
+ }
+
+ // Common error handling.
+ if (connectionError || streamError) {
+ var info = {
+ error: connectionError,
+ frame: frame,
+ state: this.state,
+ closedByUs: this._closedByUs,
+ closedWithRst: this._closedWithRst
+ };
+
+ // * When sending something invalid, throwing an exception, since it is probably a bug.
+ if (sending) {
+ this._log.error(info, 'Sending illegal frame.');
+ return this.emit('error', new Error('Sending illegal frame (' + frame.type + ') in ' + this.state + ' state.'));
+ }
+
+ // * In case of a serious problem, emitting and error and letting someone else handle it
+ // (e.g. closing the connection)
+ // * When receiving something invalid, sending an RST_STREAM using the `reset` method.
+ // This will automatically cause a transition to the CLOSED state.
+ else {
+ this._log.error(info, 'Received illegal frame.');
+ if (connectionError) {
+ this.emit('connectionError', connectionError);
+ } else {
+ this.reset(streamError);
+ this.emit('error', streamError);
+ }
+ }
+ }
+};
+
+// Bunyan serializers
+// ------------------
+
+exports.serializers = {};
+
+var nextId = 0;
+exports.serializers.s = function(stream) {
+ if (!('_id' in stream)) {
+ stream._id = nextId;
+ nextId += 1;
+ }
+ return stream._id;
+};