Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 28 additions & 3 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,20 @@ var destroyer = function(self, end) {
}
}

var getStateLength = function(state) {
if (state.buffer.length) {
// Since node 6.3.0 state.buffer is a BufferList not an array
if (state.buffer.head) {
return state.buffer.head.data.length
}

return state.buffer[0].length
}

return state.length
}


var end = function(ws, fn) {
if (!ws) return fn()
if (ws._writableState && ws._writableState.finished) return fn()
Expand Down Expand Up @@ -46,6 +60,7 @@ var Duplexify = function(writable, readable, opts) {
this._unread = null
this._ended = false

this.closed = false
this.destroyed = false

if (writable) this.setWritable(writable)
Expand Down Expand Up @@ -130,6 +145,13 @@ Duplexify.prototype.setReadable = function(readable) {
self.push(null)
}

var onclose = function() {
if (!self.closed) {
self.closed = true
self.emit('close')
}
}

var clear = function() {
self._readable2.removeListener('readable', onreadable)
self._readable2.removeListener('end', onend)
Expand All @@ -141,6 +163,7 @@ Duplexify.prototype.setReadable = function(readable) {
this._readable2 = readable._readableState ? readable : toStreams2(readable)
this._readable2.on('readable', onreadable)
this._readable2.on('end', onend)
this._readable2.on('close', onclose)
this._unread = clear

this._forward()
Expand All @@ -157,8 +180,7 @@ Duplexify.prototype._forward = function() {

var data
var state = this._readable2._readableState

while ((data = this._readable2.read(state.buffer.length ? state.buffer[0].length : state.length)) !== null) {
while ((data = this._readable2.read(getStateLength(state))) !== null) {
this._drained = this.push(data)
}

Expand Down Expand Up @@ -188,7 +210,10 @@ Duplexify.prototype._destroy = function(err) {
if (this._writable && this._writable.destroy) this._writable.destroy()
}

this.emit('close')
if (!this.closed) {
this.closed = true
this.emit('close')
}
}

Duplexify.prototype._write = function(data, enc, cb) {
Expand Down
72 changes: 70 additions & 2 deletions test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ var tape = require('tape')
var through = require('through2')
var concat = require('concat-stream')
var duplexify = require('./')
var tcp = require('net')

tape('passthrough', function(t) {
t.plan(2)
Expand Down Expand Up @@ -261,9 +262,76 @@ tape('close', function(t) {
var dup = duplexify(passthrough, passthrough)
var ok = false

passthrough.emit('close')
dup.on('close', function() {
t.ok(true, 'should forward close')
t.end()
})
})

passthrough.emit('close')
})

tape('close is bubbled up on both ends - destroy on listener', function(t) {
var listener
var counter = 0

listener = tcp.createServer(function(socket) {
var dup = duplexify(socket, socket)

socket.on('close', count)
dup.on('close', count)

dup.destroy()
})

listener.listen(0)

var socket = tcp.connect(listener.address())
var dup = duplexify(socket, socket)

socket.on('close', count)
dup.on('close', count)

function count() {
if (++counter === 4) {
return listener.close(t.end)
}
}
})

tape('close is bubbled up on both ends - destroy on dialer', function(t) {
var listener
var counter = 0
t.plan(2)
listener = tcp.createServer(function(socket) {
var dup = duplexify(socket, socket)

dup.once('data', function(chunk) {
t.same(chunk, Buffer('hello world'))
})

socket.on('close', count)
dup.on('close', count)
})

listener.listen(0)

var socket = tcp.connect(listener.address())
var dup = duplexify(socket, socket)

dup.write(Buffer('hello world'))

socket.on('close', count)
dup.on('close', count)

setTimeout(function () {
dup.destroy()
}, 100)

function count() {
if (++counter === 4) {
return listener.close(function() {
t.ok(true)
})
}
}
})