Skip to content
3 changes: 0 additions & 3 deletions .npmignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,3 @@ build/Release
# Dependency directory
# https://www.npmjs.org/doc/misc/npm-faq.html#should-i-check-my-node_modules-folder-into-git
node_modules

# Source target
src
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"name": "beanstalkd-worker",
"version": "1.1.0",
"description": "High level library for running beanstalkd workers in Node.js",
"main": "lib/index.js",
"main": "src/index.js",
"dependencies": {
"babel-runtime": "^5.8.25",
"beanstalkd": "^2.0.1",
Expand Down
18 changes: 9 additions & 9 deletions src/index.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import spawn from './spawn';
import Job from './job';
import Tube from './tube';
import _ from 'lodash';
import Beanstalkd from 'beanstalkd';
import Promise from 'bluebird';

export default class BeanstalkdWorker {
const spawn = require('./spawn');
const Job = require('./job');
const Tube = require('./tube');
const _ = require('lodash');
const Beanstalkd = require('beanstalkd').default;
const Promise = require('bluebird');

module.exports = class BeanstalkdWorker {
constructor(host, port, options = {}) {
this.host = host;
this.port = port;
Expand Down Expand Up @@ -102,4 +102,4 @@ export default class BeanstalkdWorker {

return _.some(tubes, tube => tube.working());
}
}
};
8 changes: 5 additions & 3 deletions src/job.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import Promise from 'bluebird';
import debug from 'debug';
const Promise = require('bluebird');
const debug = require('debug');

export default class Job {
class Job {
constructor(worker, tube, id) {
this.worker = worker;
this.tube = tube;
Expand Down Expand Up @@ -47,3 +47,5 @@ Job.status = function (stats) {
if (!stats) return 'success';
return stats.state;
};

module.exports = Job;
6 changes: 3 additions & 3 deletions src/spawn.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import Job from './job';
const Job = require('./job');

export default function spawn(worker, tube, payload, options = {}) {
module.exports = function spawn(worker, tube, payload, options = {}) {
if (!payload) {
throw new Error('Job has no payload. Use an explicit empty payload ({}) if that is your intention');
}
Expand Down Expand Up @@ -38,4 +38,4 @@ export default function spawn(worker, tube, payload, options = {}) {
tube.debug('spawned job: ' + id);
return new Job(worker, tube, id);
});
}
};
10 changes: 5 additions & 5 deletions src/tube.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import Watcher from './tube/watcher';
import debug from 'debug';
import Promise from 'bluebird';
const Watcher = require('./tube/watcher');
const debug = require('debug');
const Promise = require('bluebird');

export default class Tube {
module.exports = class Tube {
constructor(worker, name) {
if (typeof name !== 'string') throw new Error('Tube name must be a string');

Expand Down Expand Up @@ -68,4 +68,4 @@ export default class Tube {
working() {
return this.watchers.some(watcher => watcher.$current);
}
}
};
13 changes: 9 additions & 4 deletions src/tube/watcher.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import Promise from 'bluebird';
import {default as WatcherJob, DELAYED} from './watcher/job';
const Promise = require('bluebird');
const WatcherJob = require('./watcher/job');

const { DELAYED } = WatcherJob;

const RESERVE_TIMEOUT = 30 * 1000;

export default class Watcher {
module.exports = class Watcher {
constructor(tube, index, handler, options = {}) {
this.tube = tube;
this.index = index;
Expand All @@ -13,6 +15,7 @@ export default class Watcher {
this.backoff = this.options.backoff || {};
this.backoff.initial = this.backoff.initial || 60 * 1000;
this.backoff.exponential = this.backoff.exponential || 1.5;
this.reconnectBackoff = this.options.reconnectBackoff || 1000;
}

async connection() {
Expand Down Expand Up @@ -70,9 +73,11 @@ export default class Watcher {
await Promise.delay(500);
}
this.debug(`reserve error ${err.toString()}`);
await Promise.delay(this.reconnectBackoff);
} finally {
this.$current = null;
this.loop();
// await Promise.resolve(setTimeout(() => this.loop(), this.reconnectBackoff);
}
}

Expand Down Expand Up @@ -125,4 +130,4 @@ export default class Watcher {
// Destroy job from beanstalkd if we completed successfully
await job._destroy();
}
}
};
15 changes: 8 additions & 7 deletions src/tube/watcher/job.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import Job from '../../job';
import _ from 'lodash';
import Promise from 'bluebird';
const Job = require('../../job');
const _ = require('lodash');
const Promise = require('bluebird');


function defer() {
var resolve, reject;
Expand All @@ -15,13 +16,11 @@ function defer() {
};
}

export const DELAYED = 1;

/*
* A job class with extra commands only available to the connection that has reserved the job
*/

export default class WatcherJob extends Job {
module.exports = class WatcherJob extends Job {
constructor(worker, tube, id, client) {
super(worker, tube, id);

Expand Down Expand Up @@ -156,4 +155,6 @@ export default class WatcherJob extends Job {
this.debug(`failed to release: ${err.toString()}`);
}
}
}
};

module.exports.DELAYED = 1;
2 changes: 1 addition & 1 deletion test/unit/watcher/job.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ var chai = require('chai')
, expect = chai.expect
, sinon = require('sinon')
, Promise = require('bluebird')
, WatcherJob = require('tube/watcher/job').default;
, WatcherJob = require('tube/watcher/job');

describe('WatcherJob', function () {
beforeEach(function () {
Expand Down
4 changes: 2 additions & 2 deletions test/unit/watcher/run.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ var chai = require('chai')
, expect = chai.expect
, sinon = require('sinon')
, Promise = require('bluebird')
, WatcherJob = require('tube/watcher/job').default
, WatcherJob = require('tube/watcher/job')
, Watcher = require('tube/watcher');

describe('Watcher', function () {
Expand All @@ -20,7 +20,7 @@ describe('Watcher', function () {
this.destroyStub = this.sinon.stub(WatcherJob.prototype, '_destroy').resolves();
this.releaseStub = this.sinon.stub(WatcherJob.prototype, '_release').resolves();

this.handler = this.sinon.stub().resolves(),
this.handler = this.sinon.stub().resolves();
this.watcher = new Watcher({burstable: {}}, 0, this.handler, {
maxTries: 3
});
Expand Down