From c57ccd60aeaff141e7cbddccae9924df9848d36e Mon Sep 17 00:00:00 2001 From: Tanner Miller Date: Mon, 7 Oct 2013 13:06:56 -0500 Subject: [PATCH] Change _insert_tasks to use add_async In _insert_tasks(), we now add each task individually using Queue.add_async(), instead of all at once with Queue.add(). This allows us to insert each task only once and not have to worry about splitting and retrying until we find the bad tasks. Unfortunately this method prevents us from determining exactly how many tasks were successfully inserted. This will help however when a large number of duplicated tasks are trying to be added and it keeps splitting instead of just quitting. --- furious/context/context.py | 31 ++--- furious/tests/context/test_auto_context.py | 20 +--- furious/tests/context/test_context.py | 126 ++++++--------------- 3 files changed, 55 insertions(+), 122 deletions(-) diff --git a/furious/context/context.py b/furious/context/context.py index 1328f62..1a81bea 100644 --- a/furious/context/context.py +++ b/furious/context/context.py @@ -247,21 +247,22 @@ def _insert_tasks(tasks, queue, transactional=False): if not tasks: return 0 - try: - taskqueue.Queue(name=queue).add(tasks, transactional=transactional) - return len(tasks) - except (taskqueue.BadTaskStateError, - taskqueue.TaskAlreadyExistsError, - taskqueue.TombstonedTaskError, - taskqueue.TransientError): - count = len(tasks) - if count <= 1: - return 0 - - inserted = _insert_tasks(tasks[:count / 2], queue, transactional) - inserted += _insert_tasks(tasks[count / 2:], queue, transactional) - - return inserted + inserted = len(tasks) + + # NOTE: I don't believe we need all these exceptions on here anymore, but + # I'm going to leave them just to be safe. + for task in tasks: + try: + taskqueue.Queue(name=queue).add_async( + task, transactional=transactional) + except (taskqueue.BadTaskStateError, + taskqueue.TaskAlreadyExistsError, + taskqueue.TombstonedTaskError, + taskqueue.TransientError): + # TODO: Not sure if this is correct anymore. Any thoughts? + inserted -= 1 + + return inserted def _task_batcher(tasks, batch_size=None): diff --git a/furious/tests/context/test_auto_context.py b/furious/tests/context/test_auto_context.py index 09d3716..e19e6dd 100644 --- a/furious/tests/context/test_auto_context.py +++ b/furious/tests/context/test_auto_context.py @@ -18,7 +18,6 @@ from google.appengine.ext import testbed -from mock import Mock from mock import patch @@ -47,7 +46,7 @@ def tearDown(self): os.environ.clear() os.environ.update(self._orig_environ) - @patch('google.appengine.api.taskqueue.Queue.add', auto_spec=True) + @patch('google.appengine.api.taskqueue.Queue.add_async', auto_spec=True) def test_add_job_to_context_multiple_batches(self, queue_add_mock): """Ensure adding more tasks than the batch_size causes multiple batches to get inserted. @@ -76,18 +75,14 @@ def test_add_job_to_context_multiple_batches(self, queue_add_mock): self.assertIsInstance(job2, Async) queue_add_mock.assert_called_once() #Ensure only two tasks were inserted - tasks_added = queue_add_mock.call_args[0][0] - self.assertEqual(2, len(tasks_added)) + self.assertEqual(2, len(queue_add_mock.call_args_list)) # Ensure the third job was inserted when the context exited. self.assertIsInstance(job3, Async) - # Ensure add has now been called twice. - self.assertEqual(2, queue_add_mock.call_count) - # Ensure only one task was inserted - tasks_added = queue_add_mock.call_args[0][0] - self.assertEqual(1, len(tasks_added)) + # Ensure add_async has now been called three times. + self.assertEqual(3, queue_add_mock.call_count) - @patch('google.appengine.api.taskqueue.Queue.add', auto_spec=True) + @patch('google.appengine.api.taskqueue.Queue.add_async', auto_spec=True) def test_add_job_to_context_batch_size_unspecified(self, queue_add_mock): """When batch_size is None or 0, the default behavior of Context is used. All the tasks are added to the queue when the context is exited. @@ -108,11 +103,8 @@ def test_add_job_to_context_batch_size_unspecified(self, queue_add_mock): # Ensure no batches of tasks are inserted yet. self.assertFalse(queue_add_mock.called) - # Ensure the list of tasks added when the context exited. - self.assertEqual(1, queue_add_mock.call_count) # Ensure the three tasks were inserted - tasks_added = queue_add_mock.call_args[0][0] - self.assertEqual(3, len(tasks_added)) + self.assertEqual(3, len(queue_add_mock.call_args_list)) @patch('google.appengine.api.taskqueue.Queue.add', auto_spec=True) def test_no_jobs(self, queue_add_mock): diff --git a/furious/tests/context/test_context.py b/furious/tests/context/test_context.py index 16142b2..66463df 100644 --- a/furious/tests/context/test_context.py +++ b/furious/tests/context/test_context.py @@ -13,12 +13,12 @@ # See the License for the specific language governing permissions and # limitations under the License. # - import unittest from google.appengine.ext import testbed from mock import Mock +from mock import call from mock import patch @@ -123,7 +123,7 @@ def wrapper(): self.assertRaises(TestError, wrapper) self.assertEqual(0, queue_add_mock.call_count) - @patch('google.appengine.api.taskqueue.Queue.add', auto_spec=True) + @patch('google.appengine.api.taskqueue.Queue.add_async', auto_spec=True) def test_nested_context_works(self, queue_add_mock): """Ensure adding a job works.""" from furious.async import Async @@ -140,7 +140,7 @@ def test_nested_context_works(self, queue_add_mock): self.assertEqual(1, ctx2.insert_success) self.assertEqual(2, queue_add_mock.call_count) - @patch('google.appengine.api.taskqueue.Queue.add', auto_spec=True) + @patch('google.appengine.api.taskqueue.Queue.add_async', auto_spec=True) def test_add_multiple_jobs_to_context_works(self, queue_add_mock): """Ensure adding multiple jobs works.""" from furious.context import Context @@ -149,8 +149,8 @@ def test_add_multiple_jobs_to_context_works(self, queue_add_mock): for _ in range(10): ctx.add('test', args=[1, 2]) - queue_add_mock.assert_called_once() - self.assertEqual(10, len(queue_add_mock.call_args[0][0])) + self.assertEqual(10, queue_add_mock.call_count) + self.assertEqual(10, len(queue_add_mock.call_args_list)) self.assertEqual(10, ctx.insert_success) @patch('google.appengine.api.taskqueue.Queue', auto_spec=True) @@ -162,7 +162,8 @@ def test_added_to_correct_queue(self, queue_mock): ctx.add('test', args=[1, 2], queue='A') ctx.add('test', args=[1, 2], queue='A') - queue_mock.assert_called_once_with(name='A') + self.assertEqual(queue_mock.call_args_list, + [call(name='A') for _ in xrange(2)]) self.assertEqual(2, ctx.insert_success) def test_add_jobs_to_multiple_queues(self): @@ -175,12 +176,11 @@ def test_add_jobs_to_multiple_queues(self): class AwesomeQueue(Queue): def __init__(self, *args, **kwargs): super(AwesomeQueue, self).__init__(*args, **kwargs) + self.queue_name = kwargs.get('name') - queue_registry[kwargs.get('name')] = self - self._calls = [] - - def add(self, *args, **kwargs): - self._calls.append((args, kwargs)) + def add_async(self, *args, **kwargs): + queue_registry.setdefault( + self.queue_name, []).append((args, kwargs)) with patch('google.appengine.api.taskqueue.Queue', AwesomeQueue): with Context() as ctx: @@ -189,22 +189,18 @@ def add(self, *args, **kwargs): ctx.add('test', args=[1, 2], queue='B') ctx.add('test', args=[1, 2], queue='C') - self.assertEqual(2, len(queue_registry['A']._calls[0][0][0])) - self.assertEqual(1, len(queue_registry['B']._calls[0][0][0])) - self.assertEqual(1, len(queue_registry['C']._calls[0][0][0])) + self.assertEqual(2, len(queue_registry['A'])) + self.assertEqual(1, len(queue_registry['B'])) + self.assertEqual(1, len(queue_registry['C'])) self.assertEqual(4, ctx.insert_success) - @patch('google.appengine.api.taskqueue.Queue.add', auto_spec=True) + @patch('google.appengine.api.taskqueue.Queue.add_async', auto_spec=True) def test_add_task_fails(self, queue_add_mock): """Ensure insert_failed and insert_success are calculated correctly.""" from google.appengine.api.taskqueue import TaskAlreadyExistsError from furious.context import Context - def queue_add(tasks, transactional=False): - if len(tasks) != 2: - raise TaskAlreadyExistsError() - - queue_add_mock.side_effect = queue_add + queue_add_mock.side_effect = [None, None, TaskAlreadyExistsError()] with Context() as ctx: ctx.add('test', args=[1, 2], queue='A') @@ -402,20 +398,25 @@ def test_queue_name_is_honored(self, queue_mock): from furious.context.context import _insert_tasks inserted = _insert_tasks((None,), 'AbCd') + queue_mock.assert_called_once_with(name='AbCd') self.assertEqual(1, inserted) - @patch('google.appengine.api.taskqueue.Queue.add', auto_spec=True) + @patch('google.appengine.api.taskqueue.Queue.add_async', auto_spec=True) def test_tasks_are_passed_along(self, queue_add_mock): """Ensure the list of tasks are passed along.""" + from functools import partial from furious.context.context import _insert_tasks - inserted = _insert_tasks(('A', 1, 'B', 'joe'), 'AbCd') - queue_add_mock.assert_called_once_with(('A', 1, 'B', 'joe'), - transactional=False) + tasks = ['A', 1, 'B', 'joe'] + + inserted = _insert_tasks(tasks, 'AbCd') + + calls = map(partial(call, transactional=False), tasks) + queue_add_mock.assert_has_calls(calls) self.assertEqual(4, inserted) - @patch('google.appengine.api.taskqueue.Queue.add', auto_spec=True) + @patch('google.appengine.api.taskqueue.Queue.add_async', auto_spec=True) def test_task_add_error_TransientError(self, queue_add_mock): """Ensure a TransientError doesn't get raised from add.""" from furious.context.context import _insert_tasks @@ -423,29 +424,14 @@ def test_task_add_error_TransientError(self, queue_add_mock): def raise_error(*args, **kwargs): from google.appengine.api import taskqueue raise taskqueue.TransientError() - queue_add_mock.side_effect = raise_error inserted = _insert_tasks(('A',), 'AbCd') - queue_add_mock.assert_called_once_with(('A',), transactional=False) - self.assertEqual(0, inserted) - - @patch('google.appengine.api.taskqueue.Queue.add', auto_spec=True) - def test_batches_get_split_TransientError(self, queue_add_mock): - """Ensure a batches get split and retried on TransientErrors.""" - from furious.context.context import _insert_tasks - - def raise_error(*args, **kwargs): - from google.appengine.api import taskqueue - raise taskqueue.TransientError() - - queue_add_mock.side_effect = raise_error - inserted = _insert_tasks(('A', 1, 'B'), 'AbCd') - self.assertEqual(5, queue_add_mock.call_count) + queue_add_mock.assert_called_once_with('A', transactional=False) self.assertEqual(0, inserted) - @patch('google.appengine.api.taskqueue.Queue.add', auto_spec=True) + @patch('google.appengine.api.taskqueue.Queue.add_async', auto_spec=True) def test_task_add_error_BadTaskStateError(self, queue_add_mock): """Ensure a BadTaskStateError doesn't get raised from add.""" from furious.context.context import _insert_tasks @@ -453,29 +439,14 @@ def test_task_add_error_BadTaskStateError(self, queue_add_mock): def raise_error(*args, **kwargs): from google.appengine.api import taskqueue raise taskqueue.BadTaskStateError() - queue_add_mock.side_effect = raise_error inserted = _insert_tasks(('A',), 'AbCd') - queue_add_mock.assert_called_once_with(('A',), transactional=False) - self.assertEqual(0, inserted) - - @patch('google.appengine.api.taskqueue.Queue.add', auto_spec=True) - def test_batches_get_split_BadTaskStateError(self, queue_add_mock): - """Ensure a batches get split and retried on BadTaskStateErrors.""" - from furious.context.context import _insert_tasks - - def raise_error(*args, **kwargs): - from google.appengine.api import taskqueue - raise taskqueue.BadTaskStateError() - - queue_add_mock.side_effect = raise_error - inserted = _insert_tasks(('A', 1, 'B'), 'AbCd') - self.assertEqual(5, queue_add_mock.call_count) + queue_add_mock.assert_called_once_with('A', transactional=False) self.assertEqual(0, inserted) - @patch('google.appengine.api.taskqueue.Queue.add', auto_spec=True) + @patch('google.appengine.api.taskqueue.Queue.add_async', auto_spec=True) def test_task_add_error_TaskAlreadyExistsError(self, queue_add_mock): """Ensure a TaskAlreadyExistsError doesn't get raised from add.""" from furious.context.context import _insert_tasks @@ -483,30 +454,14 @@ def test_task_add_error_TaskAlreadyExistsError(self, queue_add_mock): def raise_error(*args, **kwargs): from google.appengine.api import taskqueue raise taskqueue.TaskAlreadyExistsError() - queue_add_mock.side_effect = raise_error inserted = _insert_tasks(('A',), 'AbCd') - queue_add_mock.assert_called_once_with(('A',), transactional=False) - self.assertEqual(0, inserted) - - @patch('google.appengine.api.taskqueue.Queue.add', auto_spec=True) - def test_batches_get_split_TaskAlreadyExistsError(self, queue_add_mock): - """Ensure a batches get split and retried on TaskAlreadyExistsErrors. - """ - from furious.context.context import _insert_tasks - - def raise_error(*args, **kwargs): - from google.appengine.api import taskqueue - raise taskqueue.TaskAlreadyExistsError() - - queue_add_mock.side_effect = raise_error - inserted = _insert_tasks(('A', 1, 'B'), 'AbCd') - self.assertEqual(5, queue_add_mock.call_count) + queue_add_mock.assert_called_once_with('A', transactional=False) self.assertEqual(0, inserted) - @patch('google.appengine.api.taskqueue.Queue.add', auto_spec=True) + @patch('google.appengine.api.taskqueue.Queue.add_async', auto_spec=True) def test_task_add_error_TombstonedTaskError(self, queue_add_mock): """Ensure a TombstonedTaskError doesn't get raised from add.""" from furious.context.context import _insert_tasks @@ -514,26 +469,11 @@ def test_task_add_error_TombstonedTaskError(self, queue_add_mock): def raise_error(*args, **kwargs): from google.appengine.api import taskqueue raise taskqueue.TombstonedTaskError() - queue_add_mock.side_effect = raise_error inserted = _insert_tasks(('A',), 'AbCd') - queue_add_mock.assert_called_once_with(('A',), transactional=False) - self.assertEqual(0, inserted) - - @patch('google.appengine.api.taskqueue.Queue.add', auto_spec=True) - def test_batches_get_split_TombstonedTaskError(self, queue_add_mock): - """Ensure a batches get split and retried on TombstonedTaskErrors.""" - from furious.context.context import _insert_tasks - - def raise_error(*args, **kwargs): - from google.appengine.api import taskqueue - raise taskqueue.TombstonedTaskError() - - queue_add_mock.side_effect = raise_error - inserted = _insert_tasks(('A', 1, 'B'), 'AbCd') - self.assertEqual(5, queue_add_mock.call_count) + queue_add_mock.assert_called_once_with('A', transactional=False) self.assertEqual(0, inserted)