From ee0ea8c3d2d8b19c17dd4e5e389d7351fb2ddfba Mon Sep 17 00:00:00 2001 From: Slava Nadvorny Date: Tue, 29 Jul 2014 12:48:45 +0300 Subject: [PATCH] v1.1.0. Use node cluster load balancer. This is a fix for an issue when mixdown cluster works behind nginx reversed proxy. Master process accepted connection and sent it to workers process. Pipelined http requests were lost en route. Browsers did fallback from pipelined mode to keep-alive only so no issue with browser directly connecting to mixdown. This implementation uses cluster built-in socket sharing and load balancing. --- index.js | 147 ++++++++++++++++++-------------------------------- lib/master.js | 102 ----------------------------------- lib/worker.js | 54 +++++++++++++------ package.json | 2 +- 4 files changed, 92 insertions(+), 213 deletions(-) delete mode 100644 lib/master.js diff --git a/index.js b/index.js index ddd262d..12dd5b4 100644 --- a/index.js +++ b/index.js @@ -1,118 +1,86 @@ var _ = require('lodash'); -var util = require('util'); var cluster = require('cluster'); -var mixdownMaster = require('./lib/master.js'); -var mixdownWorker = require('./lib/worker.js'); -var path = require('path'); -var packageJSON = require(path.join(process.cwd(), '/package.json')); +var Worker = require('./lib/worker.js'); // Export the factory -module.exports.create = function(mixdown, options) { - var main = new Main(mixdown, options); - return main; +module.exports.create = function (mixdown, options) { + return new Main(mixdown, options); }; -var Main = function(mixdown, options) { - // instance attrs - this.server = null; - this.workers = {}; // if this is a master, then we'll load this with child processes. - this.socket = null; +var Main = function (mixdownConfig, serverOptions) { + this.mixdown = mixdownConfig; - this.master = null; // if this is a master, then we'll set this delegate. - this.worker = null; // if this is a worker, then we'll set this delegate. - - // passed configs. - this.mixdown = mixdown; - - this.options = _.defaults(options || {}, { + this.options = _.defaults(serverOptions || {}, { cluster: { on: false } }); }; -var logServerInfo = function(server,message) { - var hmap = _.map(server.mixdown.apps, function(app){ - return _.pick(app, 'vhosts', 'id'); +var logServerInfo = function (server, message) { + var hmap = _.map(server.mixdown.apps, function (app) { + return _.pick(app, 'vhosts', 'id'); }); var address = server.server && server.server.address(); - logger.info(message || 'Server Information. ', address|| ' ', hmap); + logger.info(message || 'Server Information. ', address || ' ', hmap); }; -Main.prototype.createMaster = function(callback) { +Main.prototype.createMaster = function (callback) { var self = this; - // start server. Sets up server, port, and starts the app. - self.master = new mixdownMaster(self.workers, self.options, self.mixdown); + if (!self.options.cluster.on) { + logger.info('Starting master server pid: ' + process.pid); + var master = new Worker(self.mixdown, self.options); + } - self.master.start(function(err, data) { - if (err) { - logger.error("Could not start server. Stopping process.", err); - process.exit(); - } - else { - self.socket = data.socket; - self.server = data.server; - logServerInfo(self, 'Server started successfully.'); - typeof(callback) === 'function' ? callback(err, self) : null; - } - }); + logServerInfo(self, 'Server started successfully.'); + + if (_.isFunction(callback)) callback(null); }; -Main.prototype.stop = function(callback) { +Main.prototype.stop = function () { throw new Error('stop() not implemented on server. TODO.'); }; -Main.prototype.start = function(callback) { +Main.prototype.start = function (callback) { var self = this; var mixdown = this.mixdown; + var options = this.options; // this reload listener just logs the reload info. - mixdown.on('reload', function() { + mixdown.on('reload', function () { logServerInfo(self, 'Mixdown reloaded. '); }); - // Start cluster. - var clusterConfig = mixdown.main.options.cluster || {}; - - if(clusterConfig.on){ - var numChidrenToSpawn = clusterConfig.workers || require('os').cpus().length; + if (options.cluster.on) { + // Start cluster. + var numChildrenToSpawn = options.cluster.workers || require('os').cpus().length; - if(cluster.isMaster){ - logger.info("Using cluster"); - //cluser is on, and this is the master! - logger.info("Starting master with " + numChidrenToSpawn + " workers"); + if (cluster.isMaster) { + logger.info("Using cluster."); + //cluster is on, and this is the master! + logger.info("Starting master with " + numChildrenToSpawn + " workers."); // spawn n workers - for (var i = 0; i < numChidrenToSpawn; i++) { - (function(){ - var child = cluster.fork(); - - child.once('message',function(message){ - if(message == 'ready'){ - - self.workers[child.process.pid] = child; - logger.debug('initial child ready'); - } - }); - - })(); + for (var i = 0; i < numChildrenToSpawn; i++) { + var child = cluster.fork(); + logger.debug('Initializing worker pid: ' + child.process.pid); } // Add application kill signals. var signals = ['SIGINT', 'SIGTERM', 'SIGQUIT']; - _.each(signals, function(sig) { + _.each(signals, function (sig) { - process.on(sig, function() { + process.on(sig, function () { - _.each(cluster.workers, function(child) { + _.each(cluster.workers, function (child) { child.destroy(); // send suicide signal }); // create function to check self all workers are dead. - var checkExit = function() { - if (_.keys(cluster.workers).length == 0) { + var checkExit = function () { + if (_.keys(cluster.workers).length === 0) { process.exit(); } else { @@ -125,48 +93,37 @@ Main.prototype.start = function(callback) { }); }); - cluster.on('disconnect',function(worker) { - delete self.workers[worker.process.pid]; - logger.info('worker '+worker.process.pid+' disconnected'); + cluster.on('disconnect', function (worker) { + logger.info('Worker ' + worker.process.pid + ' disconnected.'); }); - cluster.on('exit', function(worker) { - - // remove the child from the tracked running list.. - delete self.workers[worker.process.pid]; + cluster.on('exit', function (worker) { // if it purposely destroyed itself, then do no re-spawn. - if(!worker.suicide){ - logger.error('Worker exited unexpectedly. Spawning new worker'); - // spawn new child + if (!worker.suicide) { + logger.error('Worker exited unexpectedly. Spawning new worker'); var child = cluster.fork(); - - child.on('message',function(message){ - if(message == 'ready'){ - logger.debug('respawned child ready id: ' + child.process.pid); - self.workers[child.process.pid] = child; - } - }); + logger.debug('Restarting child with pid: ' + child.process.pid + '...'); } }); self.createMaster(callback); - } - else { - //cluser is on, and this is a worker! - logger.info("new worker Worker id: "+process.pid); + } + else if (cluster.isWorker) { + //cluster is on, and this is a worker. + logger.info("I'm a new worker Worker pid: " + process.pid); try { - self.worker = new mixdownWorker(mixdown); + var worker = new Worker(mixdown, options); } - catch(e) { - typeof(callback) === 'function' ? callback(e, self) : null; + catch (e) { + if (_.isFunction(callback)) callback(e, self); } } } - else { - //cluster isn't running so create a master server. + else if (!options.cluster.on) { + logger.info("Standalone (non-clustered) server."); self.createMaster(callback); } }; diff --git a/lib/master.js b/lib/master.js deleted file mode 100644 index 2095ad9..0000000 --- a/lib/master.js +++ /dev/null @@ -1,102 +0,0 @@ -var _ = require('lodash'); -var fs = require('fs'); -var net = require('net'); -var util = require('util'); -var assert = require('assert'); -var Worker = require('./worker.js'); - -var Master = function(workers, options,mixdown) { - - this.options = options || {}; - this.currentIndex = 0; - this.server = null; - this.socket = null; - this.workers = workers; - this.mixdown = mixdown; - - if(!options.cluster.on){ - this.worker = new Worker(mixdown); - } -}; - -Master.prototype.getWorker = function() { - var pids = Object.keys(this.workers); - - if (this.currentIndex >= pids.length) { - this.currentIndex = 0; - } - - return this.workers[pids[this.currentIndex++]]; -}; - -Master.prototype.distribute = function(socket) { - var worker = this.getWorker() - var self = this; - - if(!worker){ - //TODO::make a correct http error - socket.end('HTTP 1.1 500 Internal Server Error\nContent-Length:0'); - } - else{ - self.handoff(worker, socket); - } -}; - -Master.prototype.handoff = function(worker, socket) { - var self = this; - worker.send('socket', socket); -}; - -// create the server. -Master.prototype.start = function(callback) { - - var self = this; - var clusterEnabled = self.options.cluster.on; - - // Set port from server config. - var listen = this.options.listen || {}; - - if (listen.type !== 'unix' && !isNaN(process.env.MIXDOWN_PORT)) { - listen.port = process.env.MIXDOWN_PORT; - } - - var lp = listen.type === 'unix' ? listen.path : listen.port; - - var create = function() { - - logger.info('Starting master server'); - - self.server = net.createServer(function() { return; }); - - self.server.listen(lp, function(err) { - callback(err, _.pick(self, 'server', 'socket')); - }); - - // remove the defaults so we don't parse things 2x - self.server.removeAllListeners('connection'); - - // add our connection handler: - // if cluster is used, then the socket is distributed to a worker. - // if cluster is not used, then handle in master. - self.server.on('connection', function(socket) { - if (clusterEnabled) { - socket.pause(); - self.distribute(socket); - } - else { - self.worker.handleMessage('socket', socket); - } - }); - }; - - // remove old file descriptor if needed. - if (listen.type === 'unix') { - fs.unlink(lp, create); - } - else { - create(); - //self.on('request',self.worker.handleRequest.bind(self.worker)); - } -}; - -module.exports = Master; diff --git a/lib/worker.js b/lib/worker.js index abc7ce3..3f26444 100644 --- a/lib/worker.js +++ b/lib/worker.js @@ -2,18 +2,49 @@ var _ = require('lodash'); var http = require('http'); var util = require('util'); var events = require('events'); +var fs = require('fs'); -var Worker = function(mixdown) { - this.vhosts = {}; - this.mixdown = mixdown; +var Worker = function(mixdown, options, callback) { + var self = this; + self.vhosts = {}; + self.mixdown = mixdown; + self.options = options; this.on('request', this.handleRequest); - // send message support. we listen for a socket. - process.on('message', this.handleMessage.bind(this)); + /** + * Setting up connection in cluster child process actually shares socket between child processes. + * There is no need to load-balance or dispatch requests between nodes. + * + * @see http://nodejs.org/api/cluster.html#cluster_cluster + */ + (function setupConnection() { + var listen = self.options.listen || {}; + + if (listen.type !== 'unix' && !isNaN(process.env.MIXDOWN_PORT)) { + listen.port = process.env.MIXDOWN_PORT; + } + + var lp = listen.type === 'unix' ? listen.path : listen.port; - // check if the external Config is enabled and starting listening if so. - var self = this; + var startListening = function () { + + self.server = http.createServer(); + self.server.on('connection', function (socket) { + http._connectionListener.call(self, socket); + }); + + self.server.listen(lp, callback); + }; + + // remove socket file descriptor before connecting. + if (listen.type === 'unix') { + fs.unlink(lp, startListening); + } + else { + startListening(); + } + })(); //this is a bit of a problem, it is an async function in the constructor //also i don't think it's necessary as mixdown will do this? @@ -52,7 +83,7 @@ var Worker = function(mixdown) { }); } }); - + //need to do this incase process.send is not defined //which will be the case when cluster is off if(process.send){ @@ -129,11 +160,4 @@ Worker.prototype.handleRequest = function(req, res) { } }; -Worker.prototype.handleMessage = function(message, socket) { - if (message === 'socket') { - // call the http connection parsing stuff here. - http._connectionListener.call(this, socket); - } -}; - module.exports = Worker; \ No newline at end of file diff --git a/package.json b/package.json index 4a960b1..3ebecb3 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "mixdown-server", - "version": "1.0.6", + "version": "1.1.0", "main": "index", "description": "Simple server for activiting a site.", "keywords": [