From f1ed3d0c12d3cda9921d27ce673c81c1cb25b78a Mon Sep 17 00:00:00 2001 From: Jeff P Date: Fri, 2 Nov 2018 09:32:34 -0700 Subject: [PATCH 01/15] Prevent pegged cpu when unable to reconnect Implements a 1 second wait time when reconnecting to the beanstalkd server. In situations where beanstalkd is unavailable, the worker will reconnect so quickly that it consumes all CPU resources. The reconnect time of 1s is configurable. --- src/tube/watcher.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/tube/watcher.js b/src/tube/watcher.js index 5a09cf8..79c132c 100644 --- a/src/tube/watcher.js +++ b/src/tube/watcher.js @@ -13,6 +13,7 @@ export default class Watcher { this.backoff = this.options.backoff || {}; this.backoff.initial = this.backoff.initial || 60 * 1000; this.backoff.exponential = this.backoff.exponential || 1.5; + this.reconnectBackoff = this.options.reconnectBackoff || 1000; } async connection() { @@ -72,7 +73,7 @@ export default class Watcher { this.debug(`reserve error ${err.toString()}`); } finally { this.$current = null; - this.loop(); + setTimeout(() => this.loop(), this.reconnectBackoff); } } From 12bed5cc013ec09b888186c9dbd2049dee82a945 Mon Sep 17 00:00:00 2001 From: Jeff P Date: Tue, 27 Nov 2018 14:27:44 -0800 Subject: [PATCH 02/15] preinstall --- package.json | 1 + 1 file changed, 1 insertion(+) diff --git a/package.json b/package.json index 63ded9e..e382d01 100644 --- a/package.json +++ b/package.json @@ -31,6 +31,7 @@ }, "scripts": { "prepublish": "npm run check && npm run build", + "preinstall": "npm run build", "check": "npm run lint && npm run test:unit", "lint": "eslint src", "build": "rm -rf lib/* && babel src --ignore test --optional bluebirdCoroutines,runtime --out-dir lib", From 8715a02ce8faca95a4596cc4d9f75bb8459ae7ec Mon Sep 17 00:00:00 2001 From: Jeff P Date: Tue, 27 Nov 2018 14:29:59 -0800 Subject: [PATCH 03/15] Update package.json --- package.json | 1 - 1 file changed, 1 deletion(-) diff --git a/package.json b/package.json index e382d01..63ded9e 100644 --- a/package.json +++ b/package.json @@ -31,7 +31,6 @@ }, "scripts": { "prepublish": "npm run check && npm run build", - "preinstall": "npm run build", "check": "npm run lint && npm run test:unit", "lint": "eslint src", "build": "rm -rf lib/* && babel src --ignore test --optional bluebirdCoroutines,runtime --out-dir lib", From bb6bf927693afcee5647d75835c4d01908b944be Mon Sep 17 00:00:00 2001 From: Jeff P Date: Tue, 27 Nov 2018 14:32:01 -0800 Subject: [PATCH 04/15] Update .npmignore --- .npmignore | 3 --- 1 file changed, 3 deletions(-) diff --git a/.npmignore b/.npmignore index 1a486f5..123ae94 100644 --- a/.npmignore +++ b/.npmignore @@ -25,6 +25,3 @@ build/Release # Dependency directory # https://www.npmjs.org/doc/misc/npm-faq.html#should-i-check-my-node_modules-folder-into-git node_modules - -# Source target -src \ No newline at end of file From 5a2f75b9e72ea8df050885a1b406f146ca5947f6 Mon Sep 17 00:00:00 2001 From: Jeff P Date: Tue, 27 Nov 2018 14:32:25 -0800 Subject: [PATCH 05/15] updated import without babel --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 63ded9e..ea3b758 100644 --- a/package.json +++ b/package.json @@ -2,7 +2,7 @@ "name": "beanstalkd-worker", "version": "1.1.0", "description": "High level library for running beanstalkd workers in Node.js", - "main": "lib/index.js", + "main": "src/index.js", "dependencies": { "babel-runtime": "^5.8.25", "beanstalkd": "^2.0.1", From 4bff77ce38c483d127447fe44085f3f003f6adfa Mon Sep 17 00:00:00 2001 From: Jeff P Date: Tue, 27 Nov 2018 14:36:47 -0800 Subject: [PATCH 06/15] regular imports - index --- src/index.js | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/index.js b/src/index.js index ea452b9..f497378 100644 --- a/src/index.js +++ b/src/index.js @@ -1,11 +1,11 @@ -import spawn from './spawn'; -import Job from './job'; -import Tube from './tube'; -import _ from 'lodash'; -import Beanstalkd from 'beanstalkd'; -import Promise from 'bluebird'; - -export default class BeanstalkdWorker { +const spawn = require('./spawn'); +const Job = require('./job'); +const Tube = require('./tube'); +const _ = require('lodash'); +const Beanstalkd = require('beanstalkd'); +const Promise = require('bluebird'); + +module.exports = class BeanstalkdWorker { constructor(host, port, options = {}) { this.host = host; this.port = port; From 70e395760cc8b8f2605fd61a52fc585f3eb00efa Mon Sep 17 00:00:00 2001 From: Jeff P Date: Tue, 27 Nov 2018 14:37:27 -0800 Subject: [PATCH 07/15] regular exports = job --- src/job.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/job.js b/src/job.js index d9ef51a..c77c5a1 100644 --- a/src/job.js +++ b/src/job.js @@ -1,7 +1,7 @@ -import Promise from 'bluebird'; -import debug from 'debug'; +const Promise = require('bluebird'); +const debug = require('debug'); -export default class Job { +module.exports = class Job { constructor(worker, tube, id) { this.worker = worker; this.tube = tube; From 9c9d7967d55f660f7770441ae44d0dfee9bb151e Mon Sep 17 00:00:00 2001 From: Jeff P Date: Tue, 27 Nov 2018 14:37:50 -0800 Subject: [PATCH 08/15] Update spawn.js --- src/spawn.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/spawn.js b/src/spawn.js index 6c86399..9a8b960 100644 --- a/src/spawn.js +++ b/src/spawn.js @@ -1,6 +1,6 @@ -import Job from './job'; +const Job = require('./job'); -export default function spawn(worker, tube, payload, options = {}) { +module.exports = function spawn(worker, tube, payload, options = {}) { if (!payload) { throw new Error('Job has no payload. Use an explicit empty payload ({}) if that is your intention'); } From c7fcb7bef346480ae02bc2ee1d0d1b332a924d97 Mon Sep 17 00:00:00 2001 From: Jeff P Date: Tue, 27 Nov 2018 14:38:34 -0800 Subject: [PATCH 09/15] Update tube.js --- src/tube.js | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/tube.js b/src/tube.js index a924951..e5d5379 100644 --- a/src/tube.js +++ b/src/tube.js @@ -1,8 +1,8 @@ -import Watcher from './tube/watcher'; -import debug from 'debug'; -import Promise from 'bluebird'; +const Watcher = require('./tube/watcher'); +const debug = require('debug'); +const Promise = require('bluebird'); -export default class Tube { +module.exports = class Tube { constructor(worker, name) { if (typeof name !== 'string') throw new Error('Tube name must be a string'); From 4a29834e22ed9aefd5a733448a66eceb617de684 Mon Sep 17 00:00:00 2001 From: Jeff P Date: Tue, 27 Nov 2018 14:39:24 -0800 Subject: [PATCH 10/15] Update watcher.js --- src/tube/watcher.js | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/tube/watcher.js b/src/tube/watcher.js index 79c132c..b3f8eb1 100644 --- a/src/tube/watcher.js +++ b/src/tube/watcher.js @@ -1,9 +1,11 @@ -import Promise from 'bluebird'; -import {default as WatcherJob, DELAYED} from './watcher/job'; +const Promise = require('bluebird'); +const WatcherJob = require('./watcher/job'); + +const { DELAYED } = WatcherJob; const RESERVE_TIMEOUT = 30 * 1000; -export default class Watcher { +module.exports = class Watcher { constructor(tube, index, handler, options = {}) { this.tube = tube; this.index = index; From ed212673ae08e86446f8eeef72b1a70eaec03d8c Mon Sep 17 00:00:00 2001 From: Jeff P Date: Tue, 27 Nov 2018 14:40:19 -0800 Subject: [PATCH 11/15] Update job.js --- src/tube/watcher/job.js | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/tube/watcher/job.js b/src/tube/watcher/job.js index b26330e..da22a0c 100644 --- a/src/tube/watcher/job.js +++ b/src/tube/watcher/job.js @@ -1,6 +1,6 @@ -import Job from '../../job'; -import _ from 'lodash'; -import Promise from 'bluebird'; +const Job = require('../../job'); +const _ = require('lodash'); +const Promise = require('bluebird'); function defer() { var resolve, reject; @@ -15,13 +15,11 @@ function defer() { }; } -export const DELAYED = 1; - /* * A job class with extra commands only available to the connection that has reserved the job */ -export default class WatcherJob extends Job { +module.exports =class WatcherJob extends Job { constructor(worker, tube, id, client) { super(worker, tube, id); @@ -157,3 +155,5 @@ export default class WatcherJob extends Job { } } } + +module.exports.DELAYED = 1; From 95932145937ea5f68e808c9e5dcfe7658f57630c Mon Sep 17 00:00:00 2001 From: Jeff P Date: Tue, 27 Nov 2018 14:41:27 -0800 Subject: [PATCH 12/15] Update job.js --- src/job.js | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/job.js b/src/job.js index c77c5a1..c3a8fff 100644 --- a/src/job.js +++ b/src/job.js @@ -1,7 +1,7 @@ const Promise = require('bluebird'); const debug = require('debug'); -module.exports = class Job { +class Job { constructor(worker, tube, id) { this.worker = worker; this.tube = tube; @@ -47,3 +47,5 @@ Job.status = function (stats) { if (!stats) return 'success'; return stats.state; }; + +module.exports = Job; From 991b972a1ab7cada2811a99d863a005437ecac9a Mon Sep 17 00:00:00 2001 From: Jeff P Date: Tue, 27 Nov 2018 14:44:01 -0800 Subject: [PATCH 13/15] Update index.js --- src/index.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/index.js b/src/index.js index f497378..6e6d18c 100644 --- a/src/index.js +++ b/src/index.js @@ -2,7 +2,7 @@ const spawn = require('./spawn'); const Job = require('./job'); const Tube = require('./tube'); const _ = require('lodash'); -const Beanstalkd = require('beanstalkd'); +const Beanstalkd = require('beanstalkd').default; const Promise = require('bluebird'); module.exports = class BeanstalkdWorker { From 9443a92af54771cce216a18f5a0d208974b26fa8 Mon Sep 17 00:00:00 2001 From: Jeff P Date: Tue, 27 Nov 2018 14:50:26 -0800 Subject: [PATCH 14/15] allow reconnection to occur without rethrowing an error when reserve fails, this tube watcher never re-acquires a connection --- src/tube/watcher.js | 1 + 1 file changed, 1 insertion(+) diff --git a/src/tube/watcher.js b/src/tube/watcher.js index b3f8eb1..9f5a534 100644 --- a/src/tube/watcher.js +++ b/src/tube/watcher.js @@ -73,6 +73,7 @@ module.exports = class Watcher { await Promise.delay(500); } this.debug(`reserve error ${err.toString()}`); + throw err; // rethrow to allow a reconnection to happen } finally { this.$current = null; setTimeout(() => this.loop(), this.reconnectBackoff); From 2b5ed790569f82f9138a55561b5766a3ad4468df Mon Sep 17 00:00:00 2001 From: Ken Hunt Date: Fri, 22 Feb 2019 17:51:39 -0600 Subject: [PATCH 15/15] Add backoff delay when the connection is lost so that we don't busy-loop the node process into uselessness. --- src/index.js | 2 +- src/spawn.js | 2 +- src/tube.js | 2 +- src/tube/watcher.js | 7 ++++--- src/tube/watcher/job.js | 5 +++-- test/unit/watcher/job.test.js | 2 +- test/unit/watcher/run.test.js | 4 ++-- 7 files changed, 13 insertions(+), 11 deletions(-) diff --git a/src/index.js b/src/index.js index 6e6d18c..864a908 100644 --- a/src/index.js +++ b/src/index.js @@ -102,4 +102,4 @@ module.exports = class BeanstalkdWorker { return _.some(tubes, tube => tube.working()); } -} +}; diff --git a/src/spawn.js b/src/spawn.js index 9a8b960..8015af8 100644 --- a/src/spawn.js +++ b/src/spawn.js @@ -38,4 +38,4 @@ module.exports = function spawn(worker, tube, payload, options = {}) { tube.debug('spawned job: ' + id); return new Job(worker, tube, id); }); -} +}; diff --git a/src/tube.js b/src/tube.js index e5d5379..e6fd704 100644 --- a/src/tube.js +++ b/src/tube.js @@ -68,4 +68,4 @@ module.exports = class Tube { working() { return this.watchers.some(watcher => watcher.$current); } -} +}; diff --git a/src/tube/watcher.js b/src/tube/watcher.js index 9f5a534..8edd980 100644 --- a/src/tube/watcher.js +++ b/src/tube/watcher.js @@ -73,10 +73,11 @@ module.exports = class Watcher { await Promise.delay(500); } this.debug(`reserve error ${err.toString()}`); - throw err; // rethrow to allow a reconnection to happen + await Promise.delay(this.reconnectBackoff); } finally { this.$current = null; - setTimeout(() => this.loop(), this.reconnectBackoff); + this.loop(); + // await Promise.resolve(setTimeout(() => this.loop(), this.reconnectBackoff); } } @@ -129,4 +130,4 @@ module.exports = class Watcher { // Destroy job from beanstalkd if we completed successfully await job._destroy(); } -} +}; diff --git a/src/tube/watcher/job.js b/src/tube/watcher/job.js index da22a0c..1102e1a 100644 --- a/src/tube/watcher/job.js +++ b/src/tube/watcher/job.js @@ -2,6 +2,7 @@ const Job = require('../../job'); const _ = require('lodash'); const Promise = require('bluebird'); + function defer() { var resolve, reject; var promise = new Promise(function () { @@ -19,7 +20,7 @@ function defer() { * A job class with extra commands only available to the connection that has reserved the job */ -module.exports =class WatcherJob extends Job { +module.exports = class WatcherJob extends Job { constructor(worker, tube, id, client) { super(worker, tube, id); @@ -154,6 +155,6 @@ module.exports =class WatcherJob extends Job { this.debug(`failed to release: ${err.toString()}`); } } -} +}; module.exports.DELAYED = 1; diff --git a/test/unit/watcher/job.test.js b/test/unit/watcher/job.test.js index 1d68585..fc00e45 100644 --- a/test/unit/watcher/job.test.js +++ b/test/unit/watcher/job.test.js @@ -2,7 +2,7 @@ var chai = require('chai') , expect = chai.expect , sinon = require('sinon') , Promise = require('bluebird') - , WatcherJob = require('tube/watcher/job').default; + , WatcherJob = require('tube/watcher/job'); describe('WatcherJob', function () { beforeEach(function () { diff --git a/test/unit/watcher/run.test.js b/test/unit/watcher/run.test.js index 4793879..7eac281 100644 --- a/test/unit/watcher/run.test.js +++ b/test/unit/watcher/run.test.js @@ -2,7 +2,7 @@ var chai = require('chai') , expect = chai.expect , sinon = require('sinon') , Promise = require('bluebird') - , WatcherJob = require('tube/watcher/job').default + , WatcherJob = require('tube/watcher/job') , Watcher = require('tube/watcher'); describe('Watcher', function () { @@ -20,7 +20,7 @@ describe('Watcher', function () { this.destroyStub = this.sinon.stub(WatcherJob.prototype, '_destroy').resolves(); this.releaseStub = this.sinon.stub(WatcherJob.prototype, '_release').resolves(); - this.handler = this.sinon.stub().resolves(), + this.handler = this.sinon.stub().resolves(); this.watcher = new Watcher({burstable: {}}, 0, this.handler, { maxTries: 3 });