From 048ec46ccdebd6d7e4977722f0616a13ee6cca04 Mon Sep 17 00:00:00 2001 From: David Dias Date: Thu, 23 Jun 2016 11:46:09 +0100 Subject: [PATCH 1/2] add tests to check for the close event --- index.js | 14 ++++++++++++- test.js | 64 ++++++++++++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 75 insertions(+), 3 deletions(-) diff --git a/index.js b/index.js index 7ae4473..92bd339 100644 --- a/index.js +++ b/index.js @@ -46,6 +46,7 @@ var Duplexify = function(writable, readable, opts) { this._unread = null this._ended = false + this.closed = false this.destroyed = false if (writable) this.setWritable(writable) @@ -130,6 +131,13 @@ Duplexify.prototype.setReadable = function(readable) { self.push(null) } + var onclose = function() { + if (!self.closed) { + self.closed = true + self.emit('close') + } + } + var clear = function() { self._readable2.removeListener('readable', onreadable) self._readable2.removeListener('end', onend) @@ -141,6 +149,7 @@ Duplexify.prototype.setReadable = function(readable) { this._readable2 = readable._readableState ? readable : toStreams2(readable) this._readable2.on('readable', onreadable) this._readable2.on('end', onend) + this._readable2.on('close', onclose) this._unread = clear this._forward() @@ -188,7 +197,10 @@ Duplexify.prototype._destroy = function(err) { if (this._writable && this._writable.destroy) this._writable.destroy() } - this.emit('close') + if (!this.closed) { + this.closed = true + this.emit('close') + } } Duplexify.prototype._write = function(data, enc, cb) { diff --git a/test.js b/test.js index 91f2063..96467a0 100644 --- a/test.js +++ b/test.js @@ -2,6 +2,7 @@ var tape = require('tape') var through = require('through2') var concat = require('concat-stream') var duplexify = require('./') +var tcp = require('net') tape('passthrough', function(t) { t.plan(2) @@ -261,9 +262,68 @@ tape('close', function(t) { var dup = duplexify(passthrough, passthrough) var ok = false - passthrough.emit('close') dup.on('close', function() { t.ok(true, 'should forward close') t.end() }) -}) \ No newline at end of file + + passthrough.emit('close') +}) + +tape('close is bubbled up on both ends - destroy on listener', function(t) { + var listener + var counter = 0 + + listener = tcp.createServer(function(socket) { + var dup = duplexify(socket, socket) + + socket.on('close', count) + dup.on('close', count) + + dup.destroy() + }) + + listener.listen(0) + + var socket = tcp.connect(listener.address()) + var dup = duplexify(socket, socket) + + socket.on('close', count) + dup.on('close', count) + + function count() { + if (++counter === 4) { + return listener.close(t.end) + } + } +}) + +tape('close is bubbled up on both ends - destroy on dialer', function(t) { + var listener + var counter = 0 + + listener = tcp.createServer(function(socket) { + var dup = duplexify(socket, socket) + + socket.on('close', count) + dup.on('close', count) + }) + + listener.listen(0) + + var socket = tcp.connect(listener.address()) + var dup = duplexify(socket, socket) + + socket.on('close', count) + dup.on('close', count) + + setTimeout(function () { + dup.destroy() + }, 100) + + function count() { + if (++counter === 4) { + return listener.close(t.end) + } + } +}) From 2977545bfaccb8c29022bc40e76bffe91c0e25d4 Mon Sep 17 00:00:00 2001 From: Friedel Ziegelmayer Date: Fri, 8 Jul 2016 12:59:46 +0200 Subject: [PATCH 2/2] fix: add compatability with node@6.3.0 streams fixes #6 --- index.js | 17 +++++++++++++++-- test.js | 12 ++++++++++-- 2 files changed, 25 insertions(+), 4 deletions(-) diff --git a/index.js b/index.js index 92bd339..78799bd 100644 --- a/index.js +++ b/index.js @@ -16,6 +16,20 @@ var destroyer = function(self, end) { } } +var getStateLength = function(state) { + if (state.buffer.length) { + // Since node 6.3.0 state.buffer is a BufferList not an array + if (state.buffer.head) { + return state.buffer.head.data.length + } + + return state.buffer[0].length + } + + return state.length +} + + var end = function(ws, fn) { if (!ws) return fn() if (ws._writableState && ws._writableState.finished) return fn() @@ -166,8 +180,7 @@ Duplexify.prototype._forward = function() { var data var state = this._readable2._readableState - - while ((data = this._readable2.read(state.buffer.length ? state.buffer[0].length : state.length)) !== null) { + while ((data = this._readable2.read(getStateLength(state))) !== null) { this._drained = this.push(data) } diff --git a/test.js b/test.js index 96467a0..91e92a3 100644 --- a/test.js +++ b/test.js @@ -301,10 +301,14 @@ tape('close is bubbled up on both ends - destroy on listener', function(t) { tape('close is bubbled up on both ends - destroy on dialer', function(t) { var listener var counter = 0 - + t.plan(2) listener = tcp.createServer(function(socket) { var dup = duplexify(socket, socket) + dup.once('data', function(chunk) { + t.same(chunk, Buffer('hello world')) + }) + socket.on('close', count) dup.on('close', count) }) @@ -314,6 +318,8 @@ tape('close is bubbled up on both ends - destroy on dialer', function(t) { var socket = tcp.connect(listener.address()) var dup = duplexify(socket, socket) + dup.write(Buffer('hello world')) + socket.on('close', count) dup.on('close', count) @@ -323,7 +329,9 @@ tape('close is bubbled up on both ends - destroy on dialer', function(t) { function count() { if (++counter === 4) { - return listener.close(t.end) + return listener.close(function() { + t.ok(true) + }) } } })