From 2244e88ac18ab44feaeca42c07f900b717dbe651 Mon Sep 17 00:00:00 2001 From: Xavier Verges Date: Tue, 1 May 2018 14:22:36 +0200 Subject: [PATCH 1/4] Do not change the size of documents when updating them --- pymjq/jobqueue.py | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/pymjq/jobqueue.py b/pymjq/jobqueue.py index a82c43c..770a063 100644 --- a/pymjq/jobqueue.py +++ b/pymjq/jobqueue.py @@ -5,11 +5,17 @@ class JobQueue: + # Capped collection documents can not have its size updated + # https://docs.mongodb.com/manual/core/capped-collections/#document-size + DONE = 'done'.ljust(10, '_') + WAITING = 'waiting'.ljust(10, '_') + WORKING = 'working'.ljust(10, '_') + def __init__(self, db, silent=False): """ Return an instance of a JobQueue. Initialization requires one argument, the database, since we use one jobqueue collection to cover all - sites in an installation/database. The second + sites in an installation/database. The second argument specifies if to print status while waiting for new job, the default value is False""" self.db = db @@ -42,11 +48,11 @@ def valid(self): def next(self): """ Runs the next job in the queue. """ - cursor = self.q.find({'status': 'waiting'}, + cursor = self.q.find({'status': self.WAITING}, tailable=True) if cursor: row = cursor.next() - row['status'] = 'done' + row['status'] = self.DONE row['ts']['started'] = datetime.now() row['ts']['done'] = datetime.now() self.q.save(row) @@ -61,7 +67,7 @@ def pub(self, data=None): ts={'created': datetime.now(), 'started': datetime.now(), 'done': datetime.now()}, - status='waiting', + status=self.WAITING, data=data) try: self.q.insert(doc, manipulate=False) @@ -72,15 +78,15 @@ def pub(self, data=None): def __iter__(self): """ Iterates through all docs in the queue andw aits for new jobs when queue is empty. """ - cursor = self.q.find({'status': 'waiting'}, tailable=True) + cursor = self.q.find({'status': self.WAITING}, tailable=True) while 1: try: row = cursor.next() try: result = self.q.update({'_id': row['_id'], - 'status': 'waiting'}, + 'status': self.WAITING}, {'$set': { - 'status': 'working', + 'status': self.WORKING, 'ts.started': datetime.now() } }) @@ -90,7 +96,7 @@ def __iter__(self): print ('---') print ('Working on job:') yield row - row['status'] = 'done' + row['status'] = self.DONE row['ts']['done'] = datetime.now() self.q.save(row) except: @@ -100,7 +106,7 @@ def __iter__(self): def queue_count(self): """ Returns the number of jobs waiting in the queue. """ - cursor = self.q.find({'status': 'waiting'}) + cursor = self.q.find({'status': self.WAITING}) if cursor: return cursor.count() From 0e581a361a33342c9b9639ef5a8fc2241f8e72dc Mon Sep 17 00:00:00 2001 From: accraze Date: Mon, 5 Feb 2018 11:50:17 -0800 Subject: [PATCH 2/4] adding test teardown to drop collection --- pymjq/test.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pymjq/test.py b/pymjq/test.py index 6568128..41f7e48 100644 --- a/pymjq/test.py +++ b/pymjq/test.py @@ -15,6 +15,9 @@ def setUpClass(cls): client.pymongo_test.jobqueue.drop() cls.db = client.pymongo_test + def tearDown(self): + self.db['jobqueue'].drop() + def test_init(self): jq = JobQueue(self.db) self.assertTrue(jq.valid()) From d1f9c51036f235af6883b55ce1e5bf2cfbd90db5 Mon Sep 17 00:00:00 2001 From: Xavier Verges Date: Tue, 1 May 2018 18:09:03 +0200 Subject: [PATCH 3/4] Iterator tests + support for pymongo 3+ --- pymjq/jobqueue.py | 45 +++++++++++++++++++++++++++++---------------- pymjq/test.py | 29 +++++++++++++++-------------- 2 files changed, 44 insertions(+), 30 deletions(-) diff --git a/pymjq/jobqueue.py b/pymjq/jobqueue.py index 770a063..e84d0d4 100644 --- a/pymjq/jobqueue.py +++ b/pymjq/jobqueue.py @@ -11,7 +11,7 @@ class JobQueue: WAITING = 'waiting'.ljust(10, '_') WORKING = 'working'.ljust(10, '_') - def __init__(self, db, silent=False): + def __init__(self, db, silent=False, iterator_wait=None): """ Return an instance of a JobQueue. Initialization requires one argument, the database, since we use one jobqueue collection to cover all @@ -19,11 +19,19 @@ def __init__(self, db, silent=False): argument specifies if to print status while waiting for new job, the default value is False""" self.db = db - self.silent=silent if not self._exists(): print ('Creating jobqueue collection.') self._create() self.q = self.db['jobqueue'] + self.iterator_wait = iterator_wait + if self.iterator_wait is None: + def deafult_iterator_wait(): + time.sleep(5) + if not silent: + print ('waiting!') + return True + + self.iterator_wait = deafult_iterator_wait def _create(self, capped=True): """ Creates a Capped Collection. """ @@ -35,6 +43,11 @@ def _create(self, capped=True): except: raise Exception('Collection "jobqueue" already created') + def _find_opts(self): + if hasattr(pymongo.CursorType, 'TAILABLE_AWAIT'): + return {'cursor_type': pymongo.CursorType.TAILABLE_AWAIT} + return {'Tailable': True} + def _exists(self): """ Ensures that the jobqueue collection exists in the DB. """ return 'jobqueue' in self.db.collection_names() @@ -49,7 +62,7 @@ def valid(self): def next(self): """ Runs the next job in the queue. """ cursor = self.q.find({'status': self.WAITING}, - tailable=True) + **self._find_opts()) if cursor: row = cursor.next() row['status'] = self.DONE @@ -78,19 +91,21 @@ def pub(self, data=None): def __iter__(self): """ Iterates through all docs in the queue andw aits for new jobs when queue is empty. """ - cursor = self.q.find({'status': self.WAITING}, tailable=True) - while 1: + cursor = self.q.find({'status': self.WAITING}, + **self._find_opts()) + get_next = True + while get_next: try: row = cursor.next() try: - result = self.q.update({'_id': row['_id'], - 'status': self.WAITING}, - {'$set': { - 'status': self.WORKING, - 'ts.started': datetime.now() - } - }) - except OperationFailure: + self.q.update({'_id': row['_id'], + 'status': self.WAITING}, + {'$set': { + 'status': self.WORKING, + 'ts.started': datetime.now() + } + }) + except pymongo.errors.OperationFailure: print ('Job Failed!!') continue print ('---') @@ -100,9 +115,7 @@ def __iter__(self): row['ts']['done'] = datetime.now() self.q.save(row) except: - time.sleep(5) - if not self.silent: - print ('waiting!') + get_next = self.iterator_wait() def queue_count(self): """ Returns the number of jobs waiting in the queue. """ diff --git a/pymjq/test.py b/pymjq/test.py index 41f7e48..e60c4c3 100644 --- a/pymjq/test.py +++ b/pymjq/test.py @@ -22,7 +22,6 @@ def test_init(self): jq = JobQueue(self.db) self.assertTrue(jq.valid()) self.assertRaises(Exception, jq._create) - jq.clear_queue() def test_valid(self): jq = JobQueue(self.db) @@ -30,7 +29,6 @@ def test_valid(self): jq._create(capped=False) self.assertFalse(jq.valid()) self.assertRaises(Exception, jq._create) - jq.clear_queue() def test_publish(self): jq = JobQueue(self.db) @@ -48,19 +46,22 @@ def test_next(self): jq.pub(job) row = jq.next() self.assertEquals(row['data']['message'], 'hello world!') - jq.clear_queue() - # def test_iter(self): - # jq = JobQueue(self.db) - # job = {'message': 'hello world!'} - # jq.pub(job) - # for job in jq: - # if job: - # self.assertTrue(True, "Found job") - # jq.clear_queue() - # return - # self.assertEquals(False, "No jobs found!") - # jq.clear_queue() + def test_iter(self): + NUM_JOBS = 3 + num_jobs_queued = [NUM_JOBS] + def iterator_wait(): + num_jobs_queued[0] -= 1 + return num_jobs_queued[0] < 0 + jq = JobQueue(self.db, iterator_wait=iterator_wait) + for ii in range(1, NUM_JOBS + 1): + job = {'message': 'I am # ' + str(ii)} + jq.pub(job) + num_jobs_done = 0 + for job in jq: + print job['data']['message'] + num_jobs_done += 1 + self.assertEquals(num_jobs_done, NUM_JOBS) if __name__ == '__main__': From f07bde6b3cbdddc841ee65a4ecb6cdef1a731023 Mon Sep 17 00:00:00 2001 From: Xavier Verges Date: Tue, 1 May 2018 18:54:46 +0200 Subject: [PATCH 4/4] Previous commit broke PyMongo 2.x --- pymjq/jobqueue.py | 4 ++-- pymjq/test.py | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/pymjq/jobqueue.py b/pymjq/jobqueue.py index e84d0d4..64fcd57 100644 --- a/pymjq/jobqueue.py +++ b/pymjq/jobqueue.py @@ -44,8 +44,8 @@ def _create(self, capped=True): raise Exception('Collection "jobqueue" already created') def _find_opts(self): - if hasattr(pymongo.CursorType, 'TAILABLE_AWAIT'): - return {'cursor_type': pymongo.CursorType.TAILABLE_AWAIT} + if hasattr(pymongo, 'CursorType'): + return {'cursor_type': pymongo.CursorType.TAILABLE_AWAIT} # pylint: disable=no-member return {'Tailable': True} def _exists(self): diff --git a/pymjq/test.py b/pymjq/test.py index e60c4c3..2cd3ac7 100644 --- a/pymjq/test.py +++ b/pymjq/test.py @@ -46,6 +46,7 @@ def test_next(self): jq.pub(job) row = jq.next() self.assertEquals(row['data']['message'], 'hello world!') + self.assertEquals(jq.queue_count(), 0) def test_iter(self): NUM_JOBS = 3