From 8bf80bbb9d2e1c0d9e4a1c0803d1b9d6cf49cecf Mon Sep 17 00:00:00 2001 From: Andreas Galazis Date: Sat, 22 Jul 2017 15:18:48 +0300 Subject: [PATCH 1/5] Added support for flushing the remainder of the stream, updated dependencies, tests and examples --- examples/index.js | 36 +++++++++++++++++++++--------------- lib/index.js | 26 ++++++++++++++++++++------ package.json | 6 +++--- test/test.js | 38 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 82 insertions(+), 24 deletions(-) diff --git a/examples/index.js b/examples/index.js index efb871e..8f6a1f0 100644 --- a/examples/index.js +++ b/examples/index.js @@ -2,22 +2,28 @@ var eventStream = require( 'event-stream' ), cStream = require( '../lib' ); // Create some data... -var data = new Array( 1000 ); -for ( var i = 0; i < data.length; i++ ) { - data[ i ] = Math.random(); +function createArray(size){ + var data = new Array( size ); + for ( var i = 0; i < data.length; i++ ) { + data[ i ] = i; + } + return data; +} +function setupExample(transformFactory, size){ + const randomArray = createArray(size); + const stream = eventStream.readArray(randomArray); + return stream.pipe( transformFactory.stream() ) + .pipe( eventStream.map( function( d, clbk ){ + clbk( null, 'stream'+size+': '+JSON.stringify( d )+'\n' ); + })) + } - -// Create a readable stream: -var readStream = eventStream.readArray( data ); // Create a new chunkify stream: -var stream = cStream() - .numValues( 10 ) - .stream(); - +var transformFactory = cStream() + .numValues( 10 ); +var exampleSizes = [5,10,11,1000] +for (var i=0; i-1){ + this.push(buffer.slice(0,i+1) ) + } + clbk(); + } + return { onData:onData, flush: flush } } // end FUNCTION onData() @@ -123,14 +136,15 @@ * @returns {object} through stream */ Stream.prototype.stream = function() { - return through2( {'objectMode': true}, onData( this._numValues ) ); + var transformation=transform( this._numValues ) + return through2( {'objectMode': true}, transformation.onData , transformation.flush ); }; // end METHOD stream() - // EXPORTS // module.exports = function createStream() { return new Stream(); }; -})(); \ No newline at end of file +})(); + diff --git a/package.json b/package.json index 8b15d9e..d85398b 100644 --- a/package.json +++ b/package.json @@ -36,12 +36,12 @@ "url": "https://github.com/flow-io/flow-chunkify/issues" }, "dependencies": { - "through2": "^0.5.1" + "through2": "^2.0.3" }, "devDependencies": { - "chai": "1.x.x", + "chai": "4.x.x", "event-stream": "^3.1.7", - "mocha": "1.x.x" + "mocha": "3.x.x" }, "licenses": [ { diff --git a/test/test.js b/test/test.js index f0a10a1..d89e2bb 100644 --- a/test/test.js +++ b/test/test.js @@ -170,5 +170,43 @@ describe( 'flow-chunkify', function tests() { done(); } // end FUNCTION onRead() }); + it( 'should push the rest of the data on source close', function test( done ) { + var data, expected, cStream, NUMVALUES = 3; + + // Simulate some data that do no match chunk size + data = [ 2, 2, 2, 3, 3, 3, 4, 4, 4, 5, 5, 5, 6, 6 ]; + + // Expected values: + expected = [ [2,2,2], [3,3,3], [4,4,4], [5,5,5], [6,6]]; + + // Create a new chunkify stream: + cStream = chunkStream() + .numValues( NUMVALUES ) + .stream(); + + // Mock reading from the stream: + utils.readStream( cStream, onRead ); + + // Mock piping a data to the stream: + utils.writeStream( data, cStream ); + + return; + /** + * FUNCTION: onRead( error, actual ) + * Read event handler. Checks for errors and compares streamed data to expected data. + */ + function onRead( error, actual ) { + expect( error ).to.not.exist; + + for ( var i = 0; i < expected.length; i++ ) { + assert.deepEqual( + actual[ i ], + expected[ i ] + ); + } + + done(); + } // end FUNCTION onRead() + }); }); \ No newline at end of file From c63ad9e16eabfd8b0d613ae70aa2fd103ae512fc Mon Sep 17 00:00:00 2001 From: Andreas Galazis Date: Sat, 22 Jul 2017 15:44:36 +0300 Subject: [PATCH 2/5] fixed formatting --- examples/index.js | 6 +++--- lib/index.js | 22 +++++++++++----------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/examples/index.js b/examples/index.js index 8f6a1f0..dcbb144 100644 --- a/examples/index.js +++ b/examples/index.js @@ -13,9 +13,9 @@ function setupExample(transformFactory, size){ const randomArray = createArray(size); const stream = eventStream.readArray(randomArray); return stream.pipe( transformFactory.stream() ) - .pipe( eventStream.map( function( d, clbk ){ - clbk( null, 'stream'+size+': '+JSON.stringify( d )+'\n' ); - })) + .pipe( eventStream.map( function( d, clbk ){ + clbk( null, 'stream'+size+': '+JSON.stringify( d )+'\n' ); + })) } diff --git a/lib/index.js b/lib/index.js index 3d6461f..5f6a028 100644 --- a/lib/index.js +++ b/lib/index.js @@ -79,19 +79,19 @@ }; // end FUNCTION onData() /** - * FUNCTION: onData( newVal, encoding, clbk ) - * flush function. Handles the remaining data before end event. + * FUNCTION: flush( clbk ) + * flush function. Pushes the rest of the data on source close. * - * @param {Function} clbk - callback to invoke after handling streamed data. Function accepts two arguments: [ error, chunk ]. + * @param {Function} clbk - callback to invoke after flush. Function accepts one argument: [ error ]. */ - function flush(clbk){ - if(i>-1){ - this.push(buffer.slice(0,i+1) ) - } + function flush(clbk) { + if( i>-1 ) { + this.push( buffer.slice( 0, i+1 ) ) + } clbk(); - } + } // end FUNCTION flush() return { onData:onData, flush: flush } - } // end FUNCTION onData() + } // end FUNCTION transform() // STREAM // @@ -136,8 +136,8 @@ * @returns {object} through stream */ Stream.prototype.stream = function() { - var transformation=transform( this._numValues ) - return through2( {'objectMode': true}, transformation.onData , transformation.flush ); + var transformation = transform( this._numValues ) + return through2( {'objectMode': true}, transformation.onData, transformation.flush ); }; // end METHOD stream() From 261791da5a5301ad70fdfb1e229dd1c2a9a0c27d Mon Sep 17 00:00:00 2001 From: Andreas Galazis Date: Sat, 22 Jul 2017 15:47:59 +0300 Subject: [PATCH 3/5] fixed formatting --- lib/index.js | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/index.js b/lib/index.js index 5f6a028..b4335e0 100644 --- a/lib/index.js +++ b/lib/index.js @@ -90,6 +90,7 @@ } clbk(); } // end FUNCTION flush() + return { onData:onData, flush: flush } } // end FUNCTION transform() From 292dc9e17ef6b1d7c2973c3c33fe7a683143654e Mon Sep 17 00:00:00 2001 From: Andreas Galazis Date: Sat, 22 Jul 2017 15:51:16 +0300 Subject: [PATCH 4/5] added missing exports comment --- lib/index.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/index.js b/lib/index.js index b4335e0..bf9875b 100644 --- a/lib/index.js +++ b/lib/index.js @@ -90,7 +90,7 @@ } clbk(); } // end FUNCTION flush() - + return { onData:onData, flush: flush } } // end FUNCTION transform() @@ -141,7 +141,7 @@ return through2( {'objectMode': true}, transformation.onData, transformation.flush ); }; // end METHOD stream() - + // EXPORTS // module.exports = function createStream() { return new Stream(); From 199a87de5c3263e56ecaff545d7742aab66fcea9 Mon Sep 17 00:00:00 2001 From: Andreas Galazis Date: Sat, 22 Jul 2017 16:13:15 +0300 Subject: [PATCH 5/5] added comments in examples --- examples/index.js | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/examples/index.js b/examples/index.js index dcbb144..9cadf0e 100644 --- a/examples/index.js +++ b/examples/index.js @@ -1,7 +1,7 @@ var eventStream = require( 'event-stream' ), cStream = require( '../lib' ); -// Create some data... +// Create some data for given size... function createArray(size){ var data = new Array( size ); for ( var i = 0; i < data.length; i++ ) { @@ -9,21 +9,36 @@ function createArray(size){ } return data; } + +// sets up an example given a chankify transformation +// factory and example data size function setupExample(transformFactory, size){ + + // create an array of given size const randomArray = createArray(size); + + // create a stream out of it const stream = eventStream.readArray(randomArray); + + // pipe ste stream to chunkify return stream.pipe( transformFactory.stream() ) + //improve output format .pipe( eventStream.map( function( d, clbk ){ clbk( null, 'stream'+size+': '+JSON.stringify( d )+'\n' ); })) } -// Create a new chunkify stream: +// Create a new chunkify stream factory: var transformFactory = cStream() .numValues( 10 ); + +// set the size of your examples here var exampleSizes = [5,10,11,1000] + +// setup examples fro all the sizes for (var i=0; i