diff --git a/index.js b/index.js index 24d3a71..9d2bb43 100644 --- a/index.js +++ b/index.js @@ -45,6 +45,7 @@ Yajob.prototype.put = function (attrs, opts) { opts = opts || {}; opts.schedule = opts.schedule || new Date(Date.now() + this._delay); opts.priority = opts.priority || 0; + opts.meta = opts.meta || {}; if (!Array.isArray(attrs)) { attrs = [attrs]; @@ -56,7 +57,8 @@ Yajob.prototype.put = function (attrs, opts) { attempts: 0, attrs, scheduledAt: opts.schedule, - priority: opts.priority + priority: opts.priority, + meta: opts.meta }; } @@ -65,6 +67,28 @@ Yajob.prototype.put = function (attrs, opts) { return jobs.then(c => c.insert(attrs.map(attrsToJob))); }; +Yajob.prototype.replace = function (attrs, opts) { + opts = opts || {}; + opts.schedule = opts.schedule || new Date(Date.now() + this._delay); + opts.priority = opts.priority || 0; + opts.meta = opts.meta || {}; + + function attrsToJob(attrs) { + return { + status: Yajob.status.new, + attempts: 0, + attrs, + scheduledAt: opts.schedule, + priority: opts.priority, + meta: opts.meta + }; + } + + const jobs = this._db.then(db => db.collection(this._tag)); + + return jobs.then(c => c.update({status: Yajob.status.new, attrs}, attrsToJob(attrs), {upsert: true, w: 1})); +}; + Yajob.prototype.take = function (count) { count = count || 1; @@ -111,7 +135,7 @@ Yajob.prototype.take = function (count) { try { for (let i = 0; i < batch.length; i++) { const job = batch[i]; - const done = yield job.attrs; + const done = yield Object.assign(job.attrs, job.meta); if (done === false) { const status = job.attempts < maxTrys ? Yajob.status.new : Yajob.status.failed; diff --git a/readme.md b/readme.md index f8f97d5..f60f3d6 100644 --- a/readme.md +++ b/readme.md @@ -28,6 +28,54 @@ for (var mail of yield mails.take(100)) { Processed jobs removed from queue, when for-loop is ended or broken (either with `break` or exception). +### Updating pending events with metadata + +You may also attach metadata to future job and update as follows: + +```js +const yajob = require('yajob'); +const mails = yajob('localhost/queuedb') + .tag('mails'); + +var d = new Date(); +d.setHours(24,0,0,0); + +mails.put({ + from: 'floatdrop@gmail.com', + to: 'nodejs-dev@dev-null.com' +}, { + meta: {body: 'You have 1 new notification'}, + schedule: d +}); +// => Promise + +// Meanwhile, a new notification comes in + +mails.replace({ + from: 'floatdrop@gmail.com', + to: 'nodejs-dev@dev-null.com' +}, { + meta: {body: 'You have 2 new notification'}, + schedule: d +}); + +// Now, when you take the job in the future: + +let job = yield mails.take(); +console.log(job); +``` + +This would print out: +``` +{ + from: 'floatdrop@gmail.com', + to: 'nodejs-dev@dev-null.com', + body: 'You have 2 new notification' +} +``` + +This will only send out a single email with the new body. + ### Skip jobs In some cases you will need to skip taken job. To do this pass into generator `false` value: @@ -55,13 +103,13 @@ const important = queue.tag('mail').sort({priority: -1}); Returns instance of queue, that stores data in MongoDB. -##### uri -Type: `String` +##### uri +Type: `String` MongoDB URI string. -##### options -Type: `Object` +##### options +Type: `Object` MongoDB [MongoClient.connect options](http://mongodb.github.io/node-mongodb-native/2.1/api/MongoClient.html). @@ -82,6 +130,23 @@ Type: `Object` * `schedule` - `Date`, when job should be available to `take` * `priority` - `Number`, that represents priority of job + * `meta` - `Object`, optional metadata attached to job and returned in taken object + +### replace(attrs, [options]) + +Update a pending job in the queue. Returns `Promise`. + +##### attrs +Type: `Object` + +Data, that will be attached to job. + +##### options +Type: `Object` + + * `schedule` - `Date`, when job should be available to `take` + * `priority` - `Number`, that represents priority of job + * `meta` - `Object`, optional metadata attached to job and returned in taken object ### take([count]) @@ -90,7 +155,7 @@ Returns `Promise` that resolves to a `Generator`, that will emit jobs one by one After all jobs are taken from batch - they are considered `done` and removed from queue. ##### count -Type: `Number` +Type: `Number` Default: `1` Maximum number of jobs to take from one batch request. diff --git a/test/put.js b/test/put.js index f7c8054..8d4cedf 100644 --- a/test/put.js +++ b/test/put.js @@ -15,6 +15,20 @@ test('put should add job to queue', async t => { } }); +test('put should add job to queue with meta', async t => { + const queueDb = await new QueueDb(); + const queue = yajob(queueDb.uri); + + try { + await queue.put({test: 'message'}, {meta: {param: 1}}); + const job = await queueDb.db.collection('default').find().toArray(); + t.same(job[0].attrs, {test: 'message'}); + t.same(job[0].meta, {param: 1}); + } finally { + await queueDb.close(); + } +}); + test('put take an Array as argument', async t => { const queueDb = await new QueueDb(); const queue = yajob(queueDb.uri); diff --git a/test/remove.js b/test/remove.js index 7685d15..06d4e29 100644 --- a/test/remove.js +++ b/test/remove.js @@ -15,3 +15,17 @@ test('removes job', async t => { await queueDb.close(); } }); + +test('removes job with meta', async t => { + const queueDb = await new QueueDb(); + const queue = yajob(queueDb.uri); + + try { + await queue.put({test: 'wow'}, {meta: {param: 1}}); + await queue.remove({test: 'wow'}); + const jobs = await queueDb.db.collection('default').find().toArray(); + t.is(jobs.length, 0, 'should remove job from queue'); + } finally { + await queueDb.close(); + } +}); diff --git a/test/replace.js b/test/replace.js new file mode 100644 index 0000000..6080e0c --- /dev/null +++ b/test/replace.js @@ -0,0 +1,35 @@ +import test from 'ava'; +import yajob from '..'; +import {QueueDb} from './_utils'; + +test('replace should add job to queue then update it', async t => { + const queueDb = await new QueueDb(); + const queue = yajob(queueDb.uri); + + try { + await queue.put({test: 'message'}, {meta: {param: 1}}); + const job = await queueDb.db.collection('default').find().toArray(); + t.same(job[0].attrs, {test: 'message'}); + t.same(job[0].meta, {param: 1}); + await queue.replace({test: 'message'}, {meta: {param: 2}}); + const job2 = await queueDb.db.collection('default').find().toArray(); + t.same(job2[0].attrs, {test: 'message'}); + t.same(job2[0].meta, {param: 2}); + } finally { + await queueDb.close(); + } +}); + +test('replace should add job to queue if it does not exist', async t => { + const queueDb = await new QueueDb(); + const queue = yajob(queueDb.uri); + + try { + await queue.replace({test: 'message'}, {meta: {param: 2}}); + const job2 = await queueDb.db.collection('default').find().toArray(); + t.same(job2[0].attrs, {test: 'message'}); + t.same(job2[0].meta, {param: 2}); + } finally { + await queueDb.close(); + } +}); diff --git a/test/take.js b/test/take.js index f8ec1a7..cb0b9b1 100644 --- a/test/take.js +++ b/test/take.js @@ -22,6 +22,26 @@ test('take one', async t => { } }); +test('take one with meta', async t => { + const queueDb = await new QueueDb(); + const queue = yajob(queueDb.uri); + + try { + await queue.put({test: 'wow'}, {meta: {param: 1}}); + + const promise = queue.take(); + t.is(typeof promise.then, 'function', 'should return a Promise'); + + const taken = Array.from(await promise); + t.same(taken, [{test: 'wow', param: 1}]); + + const jobs = Array.from(queue.take()); + t.is(jobs.length, 0, 'should remove job from queue'); + } finally { + await queueDb.close(); + } +}); + test('take two', async t => { const queueDb = await new QueueDb(); const queue = yajob(queueDb.uri);