diff --git a/index.js b/index.js index ddd262d..12dd5b4 100644 --- a/index.js +++ b/index.js @@ -1,118 +1,86 @@ var _ = require('lodash'); -var util = require('util'); var cluster = require('cluster'); -var mixdownMaster = require('./lib/master.js'); -var mixdownWorker = require('./lib/worker.js'); -var path = require('path'); -var packageJSON = require(path.join(process.cwd(), '/package.json')); +var Worker = require('./lib/worker.js'); // Export the factory -module.exports.create = function(mixdown, options) { - var main = new Main(mixdown, options); - return main; +module.exports.create = function (mixdown, options) { + return new Main(mixdown, options); }; -var Main = function(mixdown, options) { - // instance attrs - this.server = null; - this.workers = {}; // if this is a master, then we'll load this with child processes. - this.socket = null; +var Main = function (mixdownConfig, serverOptions) { + this.mixdown = mixdownConfig; - this.master = null; // if this is a master, then we'll set this delegate. - this.worker = null; // if this is a worker, then we'll set this delegate. - - // passed configs. - this.mixdown = mixdown; - - this.options = _.defaults(options || {}, { + this.options = _.defaults(serverOptions || {}, { cluster: { on: false } }); }; -var logServerInfo = function(server,message) { - var hmap = _.map(server.mixdown.apps, function(app){ - return _.pick(app, 'vhosts', 'id'); +var logServerInfo = function (server, message) { + var hmap = _.map(server.mixdown.apps, function (app) { + return _.pick(app, 'vhosts', 'id'); }); var address = server.server && server.server.address(); - logger.info(message || 'Server Information. ', address|| ' ', hmap); + logger.info(message || 'Server Information. ', address || ' ', hmap); }; -Main.prototype.createMaster = function(callback) { +Main.prototype.createMaster = function (callback) { var self = this; - // start server. Sets up server, port, and starts the app. - self.master = new mixdownMaster(self.workers, self.options, self.mixdown); + if (!self.options.cluster.on) { + logger.info('Starting master server pid: ' + process.pid); + var master = new Worker(self.mixdown, self.options); + } - self.master.start(function(err, data) { - if (err) { - logger.error("Could not start server. Stopping process.", err); - process.exit(); - } - else { - self.socket = data.socket; - self.server = data.server; - logServerInfo(self, 'Server started successfully.'); - typeof(callback) === 'function' ? callback(err, self) : null; - } - }); + logServerInfo(self, 'Server started successfully.'); + + if (_.isFunction(callback)) callback(null); }; -Main.prototype.stop = function(callback) { +Main.prototype.stop = function () { throw new Error('stop() not implemented on server. TODO.'); }; -Main.prototype.start = function(callback) { +Main.prototype.start = function (callback) { var self = this; var mixdown = this.mixdown; + var options = this.options; // this reload listener just logs the reload info. - mixdown.on('reload', function() { + mixdown.on('reload', function () { logServerInfo(self, 'Mixdown reloaded. '); }); - // Start cluster. - var clusterConfig = mixdown.main.options.cluster || {}; - - if(clusterConfig.on){ - var numChidrenToSpawn = clusterConfig.workers || require('os').cpus().length; + if (options.cluster.on) { + // Start cluster. + var numChildrenToSpawn = options.cluster.workers || require('os').cpus().length; - if(cluster.isMaster){ - logger.info("Using cluster"); - //cluser is on, and this is the master! - logger.info("Starting master with " + numChidrenToSpawn + " workers"); + if (cluster.isMaster) { + logger.info("Using cluster."); + //cluster is on, and this is the master! + logger.info("Starting master with " + numChildrenToSpawn + " workers."); // spawn n workers - for (var i = 0; i < numChidrenToSpawn; i++) { - (function(){ - var child = cluster.fork(); - - child.once('message',function(message){ - if(message == 'ready'){ - - self.workers[child.process.pid] = child; - logger.debug('initial child ready'); - } - }); - - })(); + for (var i = 0; i < numChildrenToSpawn; i++) { + var child = cluster.fork(); + logger.debug('Initializing worker pid: ' + child.process.pid); } // Add application kill signals. var signals = ['SIGINT', 'SIGTERM', 'SIGQUIT']; - _.each(signals, function(sig) { + _.each(signals, function (sig) { - process.on(sig, function() { + process.on(sig, function () { - _.each(cluster.workers, function(child) { + _.each(cluster.workers, function (child) { child.destroy(); // send suicide signal }); // create function to check self all workers are dead. - var checkExit = function() { - if (_.keys(cluster.workers).length == 0) { + var checkExit = function () { + if (_.keys(cluster.workers).length === 0) { process.exit(); } else { @@ -125,48 +93,37 @@ Main.prototype.start = function(callback) { }); }); - cluster.on('disconnect',function(worker) { - delete self.workers[worker.process.pid]; - logger.info('worker '+worker.process.pid+' disconnected'); + cluster.on('disconnect', function (worker) { + logger.info('Worker ' + worker.process.pid + ' disconnected.'); }); - cluster.on('exit', function(worker) { - - // remove the child from the tracked running list.. - delete self.workers[worker.process.pid]; + cluster.on('exit', function (worker) { // if it purposely destroyed itself, then do no re-spawn. - if(!worker.suicide){ - logger.error('Worker exited unexpectedly. Spawning new worker'); - // spawn new child + if (!worker.suicide) { + logger.error('Worker exited unexpectedly. Spawning new worker'); var child = cluster.fork(); - - child.on('message',function(message){ - if(message == 'ready'){ - logger.debug('respawned child ready id: ' + child.process.pid); - self.workers[child.process.pid] = child; - } - }); + logger.debug('Restarting child with pid: ' + child.process.pid + '...'); } }); self.createMaster(callback); - } - else { - //cluser is on, and this is a worker! - logger.info("new worker Worker id: "+process.pid); + } + else if (cluster.isWorker) { + //cluster is on, and this is a worker. + logger.info("I'm a new worker Worker pid: " + process.pid); try { - self.worker = new mixdownWorker(mixdown); + var worker = new Worker(mixdown, options); } - catch(e) { - typeof(callback) === 'function' ? callback(e, self) : null; + catch (e) { + if (_.isFunction(callback)) callback(e, self); } } } - else { - //cluster isn't running so create a master server. + else if (!options.cluster.on) { + logger.info("Standalone (non-clustered) server."); self.createMaster(callback); } }; diff --git a/lib/master.js b/lib/master.js deleted file mode 100644 index 2095ad9..0000000 --- a/lib/master.js +++ /dev/null @@ -1,102 +0,0 @@ -var _ = require('lodash'); -var fs = require('fs'); -var net = require('net'); -var util = require('util'); -var assert = require('assert'); -var Worker = require('./worker.js'); - -var Master = function(workers, options,mixdown) { - - this.options = options || {}; - this.currentIndex = 0; - this.server = null; - this.socket = null; - this.workers = workers; - this.mixdown = mixdown; - - if(!options.cluster.on){ - this.worker = new Worker(mixdown); - } -}; - -Master.prototype.getWorker = function() { - var pids = Object.keys(this.workers); - - if (this.currentIndex >= pids.length) { - this.currentIndex = 0; - } - - return this.workers[pids[this.currentIndex++]]; -}; - -Master.prototype.distribute = function(socket) { - var worker = this.getWorker() - var self = this; - - if(!worker){ - //TODO::make a correct http error - socket.end('HTTP 1.1 500 Internal Server Error\nContent-Length:0'); - } - else{ - self.handoff(worker, socket); - } -}; - -Master.prototype.handoff = function(worker, socket) { - var self = this; - worker.send('socket', socket); -}; - -// create the server. -Master.prototype.start = function(callback) { - - var self = this; - var clusterEnabled = self.options.cluster.on; - - // Set port from server config. - var listen = this.options.listen || {}; - - if (listen.type !== 'unix' && !isNaN(process.env.MIXDOWN_PORT)) { - listen.port = process.env.MIXDOWN_PORT; - } - - var lp = listen.type === 'unix' ? listen.path : listen.port; - - var create = function() { - - logger.info('Starting master server'); - - self.server = net.createServer(function() { return; }); - - self.server.listen(lp, function(err) { - callback(err, _.pick(self, 'server', 'socket')); - }); - - // remove the defaults so we don't parse things 2x - self.server.removeAllListeners('connection'); - - // add our connection handler: - // if cluster is used, then the socket is distributed to a worker. - // if cluster is not used, then handle in master. - self.server.on('connection', function(socket) { - if (clusterEnabled) { - socket.pause(); - self.distribute(socket); - } - else { - self.worker.handleMessage('socket', socket); - } - }); - }; - - // remove old file descriptor if needed. - if (listen.type === 'unix') { - fs.unlink(lp, create); - } - else { - create(); - //self.on('request',self.worker.handleRequest.bind(self.worker)); - } -}; - -module.exports = Master; diff --git a/lib/worker.js b/lib/worker.js index abc7ce3..3f26444 100644 --- a/lib/worker.js +++ b/lib/worker.js @@ -2,18 +2,49 @@ var _ = require('lodash'); var http = require('http'); var util = require('util'); var events = require('events'); +var fs = require('fs'); -var Worker = function(mixdown) { - this.vhosts = {}; - this.mixdown = mixdown; +var Worker = function(mixdown, options, callback) { + var self = this; + self.vhosts = {}; + self.mixdown = mixdown; + self.options = options; this.on('request', this.handleRequest); - // send message support. we listen for a socket. - process.on('message', this.handleMessage.bind(this)); + /** + * Setting up connection in cluster child process actually shares socket between child processes. + * There is no need to load-balance or dispatch requests between nodes. + * + * @see http://nodejs.org/api/cluster.html#cluster_cluster + */ + (function setupConnection() { + var listen = self.options.listen || {}; + + if (listen.type !== 'unix' && !isNaN(process.env.MIXDOWN_PORT)) { + listen.port = process.env.MIXDOWN_PORT; + } + + var lp = listen.type === 'unix' ? listen.path : listen.port; - // check if the external Config is enabled and starting listening if so. - var self = this; + var startListening = function () { + + self.server = http.createServer(); + self.server.on('connection', function (socket) { + http._connectionListener.call(self, socket); + }); + + self.server.listen(lp, callback); + }; + + // remove socket file descriptor before connecting. + if (listen.type === 'unix') { + fs.unlink(lp, startListening); + } + else { + startListening(); + } + })(); //this is a bit of a problem, it is an async function in the constructor //also i don't think it's necessary as mixdown will do this? @@ -52,7 +83,7 @@ var Worker = function(mixdown) { }); } }); - + //need to do this incase process.send is not defined //which will be the case when cluster is off if(process.send){ @@ -129,11 +160,4 @@ Worker.prototype.handleRequest = function(req, res) { } }; -Worker.prototype.handleMessage = function(message, socket) { - if (message === 'socket') { - // call the http connection parsing stuff here. - http._connectionListener.call(this, socket); - } -}; - module.exports = Worker; \ No newline at end of file diff --git a/package.json b/package.json index 4a960b1..3ebecb3 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "mixdown-server", - "version": "1.0.6", + "version": "1.1.0", "main": "index", "description": "Simple server for activiting a site.", "keywords": [