Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 37 additions & 16 deletions examples/index.js
Original file line number Diff line number Diff line change
@@ -1,23 +1,44 @@
var eventStream = require( 'event-stream' ),
cStream = require( '../lib' );

// Create some data...
var data = new Array( 1000 );
for ( var i = 0; i < data.length; i++ ) {
data[ i ] = Math.random();
// Create some data for given size...
function createArray(size){
var data = new Array( size );
for ( var i = 0; i < data.length; i++ ) {
data[ i ] = i;
}
return data;
}

// Create a readable stream:
var readStream = eventStream.readArray( data );
// sets up an example given a chankify transformation
// factory and example data size
function setupExample(transformFactory, size){

// create an array of given size
const randomArray = createArray(size);

// create a stream out of it
const stream = eventStream.readArray(randomArray);

// Create a new chunkify stream:
var stream = cStream()
.numValues( 10 )
.stream();
// pipe ste stream to chunkify
return stream.pipe( transformFactory.stream() )
//improve output format
.pipe( eventStream.map( function( d, clbk ){
clbk( null, 'stream'+size+': '+JSON.stringify( d )+'\n' );
}))

}

// Create a new chunkify stream factory:
var transformFactory = cStream()
.numValues( 10 );

// Pipe the data:
readStream.pipe( stream )
.pipe( eventStream.map( function( d, clbk ){
clbk( null, JSON.stringify( d )+'\n' );
}))
.pipe( process.stdout );
// set the size of your examples here
var exampleSizes = [5,10,11,1000]

// setup examples fro all the sizes
for (var i=0; i<exampleSizes.length; i++){
setupExample(transformFactory, exampleSizes[i])
// pipe the data to stdout
.pipe( process.stdout );
}
29 changes: 22 additions & 7 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
var // Through module:
through2 = require( 'through2' );


// FUNCTIONS //

/**
Expand All @@ -53,7 +52,7 @@
* @param {Number} numValues - number of values to chunk
* @returns {Function} callback
*/
function onData( numValues ) {
function transform( numValues ) {
var i = -1, buffer;

// Initialize the buffer array used for chunking:
Expand All @@ -68,7 +67,7 @@
* @param {String} encoding
* @param {Function} clbk - callback to invoke after handling streamed data. Function accepts two arguments: [ error, chunk ].
*/
return function onData( newVal, encoding, clbk ) {
function onData( newVal, encoding, clbk ) {
i += 1;
buffer[ i ] = newVal;
if ( i === numValues-1 ) {
Expand All @@ -78,7 +77,22 @@
}
clbk();
}; // end FUNCTION onData()
} // end FUNCTION onData()

/**
* FUNCTION: flush( clbk )
* flush function. Pushes the rest of the data on source close.
*
* @param {Function} clbk - callback to invoke after flush. Function accepts one argument: [ error ].
*/
function flush(clbk) {
if( i>-1 ) {
this.push( buffer.slice( 0, i+1 ) )
}
clbk();
} // end FUNCTION flush()

return { onData:onData, flush: flush }
} // end FUNCTION transform()


// STREAM //
Expand Down Expand Up @@ -123,14 +137,15 @@
* @returns {object} through stream
*/
Stream.prototype.stream = function() {
return through2( {'objectMode': true}, onData( this._numValues ) );
var transformation = transform( this._numValues )
return through2( {'objectMode': true}, transformation.onData, transformation.flush );
}; // end METHOD stream()


// EXPORTS //

module.exports = function createStream() {
return new Stream();
};

})();
})();

6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@
"url": "https://github.com/flow-io/flow-chunkify/issues"
},
"dependencies": {
"through2": "^0.5.1"
"through2": "^2.0.3"
},
"devDependencies": {
"chai": "1.x.x",
"chai": "4.x.x",
"event-stream": "^3.1.7",
"mocha": "1.x.x"
"mocha": "3.x.x"
},
"licenses": [
{
Expand Down
38 changes: 38 additions & 0 deletions test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -170,5 +170,43 @@ describe( 'flow-chunkify', function tests() {
done();
} // end FUNCTION onRead()
});
it( 'should push the rest of the data on source close', function test( done ) {
var data, expected, cStream, NUMVALUES = 3;

// Simulate some data that do no match chunk size
data = [ 2, 2, 2, 3, 3, 3, 4, 4, 4, 5, 5, 5, 6, 6 ];

// Expected values:
expected = [ [2,2,2], [3,3,3], [4,4,4], [5,5,5], [6,6]];

// Create a new chunkify stream:
cStream = chunkStream()
.numValues( NUMVALUES )
.stream();

// Mock reading from the stream:
utils.readStream( cStream, onRead );

// Mock piping a data to the stream:
utils.writeStream( data, cStream );

return;

/**
* FUNCTION: onRead( error, actual )
* Read event handler. Checks for errors and compares streamed data to expected data.
*/
function onRead( error, actual ) {
expect( error ).to.not.exist;

for ( var i = 0; i < expected.length; i++ ) {
assert.deepEqual(
actual[ i ],
expected[ i ]
);
}

done();
} // end FUNCTION onRead()
});
});