From 51d20db2384704bea55f9e83bca76ede42aa9c67 Mon Sep 17 00:00:00 2001 From: Mark Strefford Date: Sat, 18 Jan 2014 20:50:49 +0000 Subject: [PATCH] Allows different paths for hadoop and hadoop-streaming.jar MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Modified to work with Cloudera distribution which has hadoop-streaming.jar in a different directory, and also doesn’t work with wildcards in the .jar filename. --- job_description.js | 220 +++++++++++++++++++++++---------------------- 1 file changed, 114 insertions(+), 106 deletions(-) diff --git a/job_description.js b/job_description.js index 0dc9517..518bb76 100644 --- a/job_description.js +++ b/job_description.js @@ -1,7 +1,7 @@ var fs = require('fs'); var exec = require('child_process').exec; -var JobDescription = function() { +var JobDescription = function() { this.filesToCompress = []; this.mappers = []; this.reducers = []; @@ -14,43 +14,51 @@ JobDescription.prototype.validate = function(cb) { console.log("** validating"); var notifications = {errors:[], warnings:[]}; if(this.mappers.length === 0) - notifications.errors.push("ERROR: No mapper function has been assigned"); + notifications.errors.push("ERROR: No mapper function has been assigned"); if(this.configuration.hadoopHome == null) - notifications.errors.push("ERROR: HADOOP_HOME not set"); + notifications.errors.push("ERROR: HADOOP_HOME not set"); + + //Added to permit seperate directory for hadoop*streaming*.jar + if(this.configuration.hadoopStreamingHome == null) + var hadoopStreamingHome = hadoopHome + "/contrib/streaming"; + + // Added to fix any issues with wildcards in hadoop*streaming*.jar + if(this.configuration.hadoopStreamingJar == null) + var hadoopStreamingJar = "hadoop*streaming*.jar"; if(this.configuration.input == null) - notifications.errors.push("ERROR: input not set"); + notifications.errors.push("ERROR: input not set"); if(this.configuration.output == null) - notifications.errors.push("ERROR: output not set"); + notifications.errors.push("ERROR: output not set"); // add more errors and warnings if(notifications.errors.length > 0) { - console.log("* validation fail"); - console.log(notifications.errors); - cb("Validation fail", notifications.errors); + console.log("* validation fail"); + console.log(notifications.errors); + cb("Validation fail", notifications.errors); } else { - console.log("* validation ok"); - cb(null, notifications); + console.log("* validation ok"); + cb(null, notifications); } }; JobDescription.prototype.generatePackageDotJSON = function(cb) { var pkg = {name: this.configuration.name.replace(/[^a-zA-Z0-9]/g,"_").toLowerCase(), - version: "1.0.0", - dependencies: this.dependencies, - engine: "node >=0.6.0"}; + version: "1.0.0", + dependencies: this.dependencies, + engine: "node >=0.6.0"}; this.packageJSONPath = this.jobWorkingDirectory+"/package.json"; this.filesToCompress.push("package.json"); fs.writeFile(this.packageJSONPath, JSON.stringify(pkg), function (err) { - if (err !== null) { - console.log('(!!) error writing package.json : ' + err); - } - cb(err); + if (err !== null) { + console.log('(!!) error writing package.json : ' + err); + } + cb(err); }); }; @@ -58,18 +66,18 @@ JobDescription.prototype.generateShellScripts = function(cb) { var that = this; var mapperCommand = "#!/usr/bin/env bash\n tar -zxvf compressed.tar.gz > /dev/null && node mapper.js"; fs.writeFile(this.mapperShellScriptPath, mapperCommand, function (err) { - if (err !== null) { - console.log('(!!) error writing package.json : ' + err); - cb(err); - } else { - var reducerCommand = "#!/usr/bin/env bash\n tar -zxvf compressed.tar.gz > /dev/null && node reducer.js"; - fs.writeFile(that.reducerShellScriptPath, reducerCommand, function (err) { - if (err !== null) { - console.log('(!!) error writing package.json : ' + err); - } - cb(err); - }); - } + if (err !== null) { + console.log('(!!) error writing package.json : ' + err); + cb(err); + } else { + var reducerCommand = "#!/usr/bin/env bash\n tar -zxvf compressed.tar.gz > /dev/null && node reducer.js"; + fs.writeFile(that.reducerShellScriptPath, reducerCommand, function (err) { + if (err !== null) { + console.log('(!!) error writing package.json : ' + err); + } + cb(err); + }); + } }); }; @@ -78,7 +86,7 @@ JobDescription.prototype.installLocalModules = function(cb) { var command = "cd "+that.jobWorkingDirectory+" && npm install"; console.log("** Installing local modules"); exec(command, function(err, stdout, stderr) { - cb(err); + cb(err); }); }; @@ -87,7 +95,7 @@ JobDescription.prototype.compressFiles = function(cb) { var command = "cd "+this.jobWorkingDirectory+" && tar --exclude compressed.tar.gz -zcvf "+this.compressedPath +" "+this.filesToCompress.join(" "); console.log("** Compressing package"); exec(command, function(err, stdout, sterr) { - cb(err); + cb(err); }); }; @@ -104,57 +112,57 @@ JobDescription.prototype.generate = function(index, cb) { this.filesToCompress = this.filesToCompress.concat(["mapper.js", "reducer.js", "mapper.sh", "reducer.sh"]); if (this.dependencies != null) { - this.filesToCompress.push("node_modules"); + this.filesToCompress.push("node_modules"); } console.log("* generated files in: "+this.jobWorkingDirectory); var that = this, data; - exec("mkdir -p "+this.jobWorkingDirectory, - function (error, stdout, stderr) { - if (error !== null) { - console.log('(!!) exec error: ' + error); - cb(error); - } else { - data = fs.readFileSync(__dirname+"/timothy/mapper_template.js", "utf8"); - data = data.replace("//@MAPPER_HERE", "this.map="+that.mappers[index]+";"); - - if(that.setupFn != null) - data = data.replace("//@LOCALS_HERE","this.setup = "+that.setupFn.toString()+"; setup();"); - - fs.writeFileSync(that.mapperPath, data, "utf8"); - - data = fs.readFileSync(__dirname+"/timothy/reducer_template.js", "utf8"); - data = data.replace("//@REDUCER_HERE", "this.reduce="+that.reducers[index]+";"); - - if(that.setupFn != null) - data = data.replace("//@LOCALS_HERE","this.setup = "+that.setupFn.toString()+"; setup();"); - - fs.writeFileSync(that.reducerPath, data, "utf8"); - - that.generatePackageDotJSON(function(error){ - if(error) { - cb(error, that); - } else { - that.installLocalModules(function(error) { - if(error) { - cb(error, that); - } else { - that.generateShellScripts(function(error){ - if(error) { - cb(error, that); - } else { - that.compressFiles(function(error){ - cb(error, that); - }); - } - }); - } - }); - } - }); - } - }); + exec("mkdir -p "+this.jobWorkingDirectory, + function (error, stdout, stderr) { + if (error !== null) { + console.log('(!!) exec error: ' + error); + cb(error); + } else { + data = fs.readFileSync(__dirname+"/timothy/mapper_template.js", "utf8"); + data = data.replace("//@MAPPER_HERE", "this.map="+that.mappers[index]+";"); + + if(that.setupFn != null) + data = data.replace("//@LOCALS_HERE","this.setup = "+that.setupFn.toString()+"; setup();"); + + fs.writeFileSync(that.mapperPath, data, "utf8"); + + data = fs.readFileSync(__dirname+"/timothy/reducer_template.js", "utf8"); + data = data.replace("//@REDUCER_HERE", "this.reduce="+that.reducers[index]+";"); + + if(that.setupFn != null) + data = data.replace("//@LOCALS_HERE","this.setup = "+that.setupFn.toString()+"; setup();"); + + fs.writeFileSync(that.reducerPath, data, "utf8"); + + that.generatePackageDotJSON(function(error){ + if(error) { + cb(error, that); + } else { + that.installLocalModules(function(error) { + if(error) { + cb(error, that); + } else { + that.generateShellScripts(function(error){ + if(error) { + cb(error, that); + } else { + that.compressFiles(function(error){ + cb(error, that); + }); + } + }); + } + }); + } + }); + } + }); }; JobDescription.prototype.execute = function(index, cb) { @@ -162,60 +170,60 @@ JobDescription.prototype.execute = function(index, cb) { var input, output; if(this.mappers.length > 1) - jobName += " Stage " + index; + jobName += " Stage " + index; if(index > 0 && this.mappers.length > 1) - input = this.configuration.output + "_stage_" + (index - 1); - else - input = this.configuration.input; + input = this.configuration.output + "_stage_" + (index - 1); + else + input = this.configuration.input; if(index < (this.mappers.length -1) ) - output = this.configuration.output + "_stage_" + index; + output = this.configuration.output + "_stage_" + index; else - output = this.configuration.output; + output = this.configuration.output; console.log("* executing"); - var command = this.configuration.hadoopHome+"/bin/hadoop jar "+this.configuration.hadoopHome+"/contrib/streaming/hadoop*streaming*.jar "; - command += "-D mapred.job.name='"+jobName+"' "; + var command = this.configuration.hadoopHome+"/bin/hadoop jar "+this.configuration.hadoopStreamingHome+"/"+this.configuration.hadoopStreamingJar; + command += " -D mapred.job.name='"+jobName+"' "; if(this.configuration.numMapTasks != null) - command += "-D mapred.map.tasks="+this.configuration.numMapTasks+" "; + command += "-D mapred.map.tasks="+this.configuration.numMapTasks+" "; if(this.configuration.numRedTasks != null) - command += "-D mapred.reduce.tasks="+this.configuration.numMapTasks+" "; + command += "-D mapred.reduce.tasks="+this.configuration.numMapTasks+" "; for(var prop in this.configuration) { - if(prop.indexOf(".") != null && prop.split(".").length > 2) { - command += "-D "+prop+"="+this.configuration[prop]+" "; - } + if(prop.indexOf(".") != null && prop.split(".").length > 2) { + command += "-D "+prop+"="+this.configuration[prop]+" "; + } } command += "-files "+this.compressedPath+","+this.mapperShellScriptPath+","+this.reducerShellScriptPath+" "; if(this.configuration.config) - command += "-conf '"+this.configuration.config+"' "; + command += "-conf '"+this.configuration.config+"' "; command += "-inputformat '"+this.configuration.inputFormat+"' "; command += "-outputformat '"+this.configuration.outputFormat+"' "; if(input.constructor === Array) { - for(var i=0; i