Skip to content

Commit

Permalink
lib: refactor wrap_js_stream for ES6/readability
Browse files Browse the repository at this point in the history
PR-URL: #16158
Reviewed-By: Colin Ihrig <cjihrig@gmail.com>
Reviewed-By: Franziska Hinkelmann <franziska.hinkelmann@gmail.com>
Reviewed-By: Tobias Nießen <tniessen@tnie.de>
  • Loading branch information
addaleax committed Oct 19, 2017
1 parent 542e94c commit 33b4320
Show file tree
Hide file tree
Showing 3 changed files with 180 additions and 183 deletions.
359 changes: 178 additions & 181 deletions lib/internal/wrap_js_stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,216 +8,213 @@ const uv = process.binding('uv');
const debug = util.debuglog('stream_wrap');
const errors = require('internal/errors');

function StreamWrap(stream) {
const handle = new JSStream();

this.stream = stream;

this._list = null;

const self = this;
handle.close = function(cb) {
debug('close');
self.doClose(cb);
};
handle.isAlive = function() {
return self.isAlive();
};
handle.isClosing = function() {
return self.isClosing();
};
handle.onreadstart = function() {
return self.readStart();
};
handle.onreadstop = function() {
return self.readStop();
};
handle.onshutdown = function(req) {
return self.doShutdown(req);
};
handle.onwrite = function(req, bufs) {
return self.doWrite(req, bufs);
};

this.stream.pause();
this.stream.on('error', function onerror(err) {
self.emit('error', err);
});
this.stream.on('data', function ondata(chunk) {
if (typeof chunk === 'string' || this._readableState.objectMode === true) {
// Make sure that no further `data` events will happen
this.pause();
this.removeListener('data', ondata);

self.emit('error', new errors.Error('ERR_STREAM_WRAP'));
return;
}

debug('data', chunk.length);
if (self._handle)
self._handle.readBuffer(chunk);
});
this.stream.once('end', function onend() {
debug('end');
if (self._handle)
self._handle.emitEOF();
});

Socket.call(this, {
handle: handle
});
}
util.inherits(StreamWrap, Socket);
module.exports = StreamWrap;

// require('_stream_wrap').StreamWrap
StreamWrap.StreamWrap = StreamWrap;

StreamWrap.prototype.isAlive = function isAlive() {
return true;
};

StreamWrap.prototype.isClosing = function isClosing() {
return !this.readable || !this.writable;
};

StreamWrap.prototype.readStart = function readStart() {
this.stream.resume();
return 0;
};

StreamWrap.prototype.readStop = function readStop() {
this.stream.pause();
return 0;
};

StreamWrap.prototype.doShutdown = function doShutdown(req) {
const self = this;
const handle = this._handle;
const item = this._enqueue('shutdown', req);

this.stream.end(function() {
// Ensure that write was dispatched
setImmediate(function() {
if (!self._dequeue(item))
/* This class serves as a wrapper for when the C++ side of Node wants access
* to a standard JS stream. For example, TLS or HTTP do not operate on network
* resources conceptually, although that is the common case and what we are
* optimizing for; in theory, they are completely composable and can work with
* any stream resource they see.
*
* For the common case, i.e. a TLS socket wrapping around a net.Socket, we
* can skip going through the JS layer and let TLS access the raw C++ handle
* of a net.Socket. The flipside of this is that, to maintain composability,
* we need a way to create "fake" net.Socket instances that call back into a
* "real" JavaScript stream. JSStreamWrap is exactly this.
*/
class JSStreamWrap extends Socket {
constructor(stream) {
const handle = new JSStream();
handle.close = (cb) => {
debug('close');
this.doClose(cb);
};
handle.isAlive = () => this.isAlive();
handle.isClosing = () => this.isClosing();
handle.onreadstart = () => this.readStart();
handle.onreadstop = () => this.readStop();
handle.onshutdown = (req) => this.doShutdown(req);
handle.onwrite = (req, bufs) => this.doWrite(req, bufs);

stream.pause();
stream.on('error', (err) => this.emit('error', err));
const ondata = (chunk) => {
if (typeof chunk === 'string' ||
stream._readableState.objectMode === true) {
// Make sure that no further `data` events will happen.
stream.pause();
stream.removeListener('data', ondata);

this.emit('error', new errors.Error('ERR_STREAM_WRAP'));
return;
}

handle.finishShutdown(req, 0);
debug('data', chunk.length);
if (this._handle)
this._handle.readBuffer(chunk);
};
stream.on('data', ondata);
stream.once('end', () => {
debug('end');
if (this._handle)
this._handle.emitEOF();
});
});
return 0;
};

StreamWrap.prototype.doWrite = function doWrite(req, bufs) {
const self = this;
const handle = self._handle;
super({ handle, manualStart: true });
this.stream = stream;
this._list = null;
this.read(0);
}

var pending = bufs.length;
// Legacy
static get StreamWrap() {
return JSStreamWrap;
}

// Queue the request to be able to cancel it
const item = self._enqueue('write', req);
isAlive() {
return true;
}

self.stream.cork();
for (var n = 0; n < bufs.length; n++)
self.stream.write(bufs[n], done);
self.stream.uncork();
isClosing() {
return !this.readable || !this.writable;
}

function done(err) {
if (!err && --pending !== 0)
return;
readStart() {
this.stream.resume();
return 0;
}

// Ensure that this is called once in case of error
pending = 0;
readStop() {
this.stream.pause();
return 0;
}

let errCode = 0;
if (err) {
const code = uv[`UV_${err.code}`];
errCode = (err.code && code) ? code : uv.UV_EPIPE;
}
doShutdown(req) {
const handle = this._handle;
const item = this._enqueue('shutdown', req);

// Ensure that write was dispatched
setImmediate(function() {
// Do not invoke callback twice
if (!self._dequeue(item))
return;
this.stream.end(() => {
// Ensure that write was dispatched
setImmediate(() => {
if (!this._dequeue(item))
return;

handle.doAfterWrite(req);
handle.finishWrite(req, errCode);
handle.finishShutdown(req, 0);
});
});
return 0;
}

return 0;
};
doWrite(req, bufs) {
const self = this;
const handle = this._handle;

function QueueItem(type, req) {
this.type = type;
this.req = req;
this.prev = this;
this.next = this;
}
var pending = bufs.length;

StreamWrap.prototype._enqueue = function _enqueue(type, req) {
const item = new QueueItem(type, req);
if (this._list === null) {
this._list = item;
return item;
}
// Queue the request to be able to cancel it
const item = this._enqueue('write', req);

this.stream.cork();
for (var n = 0; n < bufs.length; n++)
this.stream.write(bufs[n], done);
this.stream.uncork();

function done(err) {
if (!err && --pending !== 0)
return;

// Ensure that this is called once in case of error
pending = 0;

item.next = this._list.next;
item.prev = this._list;
item.next.prev = item;
item.prev.next = item;
let errCode = 0;
if (err) {
const code = uv[`UV_${err.code}`];
errCode = (err.code && code) ? code : uv.UV_EPIPE;
}

return item;
};
// Ensure that write was dispatched
setImmediate(function() {
// Do not invoke callback twice
if (!self._dequeue(item))
return;

StreamWrap.prototype._dequeue = function _dequeue(item) {
assert(item instanceof QueueItem);
handle.doAfterWrite(req);
handle.finishWrite(req, errCode);
});
}

var next = item.next;
var prev = item.prev;
return 0;
}

if (next === null && prev === null)
return false;
_enqueue(type, req) {
const item = new QueueItem(type, req);
if (this._list === null) {
this._list = item;
return item;
}

item.next = null;
item.prev = null;
item.next = this._list.next;
item.prev = this._list;
item.next.prev = item;
item.prev.next = item;

if (next === item) {
prev = null;
next = null;
} else {
prev.next = next;
next.prev = prev;
return item;
}

if (this._list === item)
this._list = next;
_dequeue(item) {
assert(item instanceof QueueItem);

return true;
};
var next = item.next;
var prev = item.prev;

StreamWrap.prototype.doClose = function doClose(cb) {
const self = this;
const handle = self._handle;
if (next === null && prev === null)
return false;

setImmediate(function() {
while (self._list !== null) {
const item = self._list;
const req = item.req;
self._dequeue(item);
item.next = null;
item.prev = null;

const errCode = uv.UV_ECANCELED;
if (item.type === 'write') {
handle.doAfterWrite(req);
handle.finishWrite(req, errCode);
} else if (item.type === 'shutdown') {
handle.finishShutdown(req, errCode);
}
if (next === item) {
prev = null;
next = null;
} else {
prev.next = next;
next.prev = prev;
}

// Should be already set by net.js
assert(self._handle === null);
cb();
});
};
if (this._list === item)
this._list = next;

return true;
}

doClose(cb) {
const handle = this._handle;

setImmediate(() => {
while (this._list !== null) {
const item = this._list;
const req = item.req;
this._dequeue(item);

const errCode = uv.UV_ECANCELED;
if (item.type === 'write') {
handle.doAfterWrite(req);
handle.finishWrite(req, errCode);
} else if (item.type === 'shutdown') {
handle.finishShutdown(req, errCode);
}
}

// Should be already set by net.js
assert.strictEqual(this._handle, null);
cb();
});
}
}

function QueueItem(type, req) {
this.type = type;
this.req = req;
this.prev = this;
this.next = this;
}

module.exports = JSStreamWrap;
Loading

0 comments on commit 33b4320

Please sign in to comment.