diff --git a/furious/context/context.py b/furious/context/context.py index 1328f62..1a81bea 100644 --- a/furious/context/context.py +++ b/furious/context/context.py @@ -247,21 +247,22 @@ def _insert_tasks(tasks, queue, transactional=False): if not tasks: return 0 - try: - taskqueue.Queue(name=queue).add(tasks, transactional=transactional) - return len(tasks) - except (taskqueue.BadTaskStateError, - taskqueue.TaskAlreadyExistsError, - taskqueue.TombstonedTaskError, - taskqueue.TransientError): - count = len(tasks) - if count <= 1: - return 0 - - inserted = _insert_tasks(tasks[:count / 2], queue, transactional) - inserted += _insert_tasks(tasks[count / 2:], queue, transactional) - - return inserted + inserted = len(tasks) + + # NOTE: I don't believe we need all these exceptions on here anymore, but + # I'm going to leave them just to be safe. + for task in tasks: + try: + taskqueue.Queue(name=queue).add_async( + task, transactional=transactional) + except (taskqueue.BadTaskStateError, + taskqueue.TaskAlreadyExistsError, + taskqueue.TombstonedTaskError, + taskqueue.TransientError): + # TODO: Not sure if this is correct anymore. Any thoughts? + inserted -= 1 + + return inserted def _task_batcher(tasks, batch_size=None): diff --git a/furious/tests/context/test_auto_context.py b/furious/tests/context/test_auto_context.py index 09d3716..e19e6dd 100644 --- a/furious/tests/context/test_auto_context.py +++ b/furious/tests/context/test_auto_context.py @@ -18,7 +18,6 @@ from google.appengine.ext import testbed -from mock import Mock from mock import patch @@ -47,7 +46,7 @@ def tearDown(self): os.environ.clear() os.environ.update(self._orig_environ) - @patch('google.appengine.api.taskqueue.Queue.add', auto_spec=True) + @patch('google.appengine.api.taskqueue.Queue.add_async', auto_spec=True) def test_add_job_to_context_multiple_batches(self, queue_add_mock): """Ensure adding more tasks than the batch_size causes multiple batches to get inserted. @@ -76,18 +75,14 @@ def test_add_job_to_context_multiple_batches(self, queue_add_mock): self.assertIsInstance(job2, Async) queue_add_mock.assert_called_once() #Ensure only two tasks were inserted - tasks_added = queue_add_mock.call_args[0][0] - self.assertEqual(2, len(tasks_added)) + self.assertEqual(2, len(queue_add_mock.call_args_list)) # Ensure the third job was inserted when the context exited. self.assertIsInstance(job3, Async) - # Ensure add has now been called twice. - self.assertEqual(2, queue_add_mock.call_count) - # Ensure only one task was inserted - tasks_added = queue_add_mock.call_args[0][0] - self.assertEqual(1, len(tasks_added)) + # Ensure add_async has now been called three times. + self.assertEqual(3, queue_add_mock.call_count) - @patch('google.appengine.api.taskqueue.Queue.add', auto_spec=True) + @patch('google.appengine.api.taskqueue.Queue.add_async', auto_spec=True) def test_add_job_to_context_batch_size_unspecified(self, queue_add_mock): """When batch_size is None or 0, the default behavior of Context is used. All the tasks are added to the queue when the context is exited. @@ -108,11 +103,8 @@ def test_add_job_to_context_batch_size_unspecified(self, queue_add_mock): # Ensure no batches of tasks are inserted yet. self.assertFalse(queue_add_mock.called) - # Ensure the list of tasks added when the context exited. - self.assertEqual(1, queue_add_mock.call_count) # Ensure the three tasks were inserted - tasks_added = queue_add_mock.call_args[0][0] - self.assertEqual(3, len(tasks_added)) + self.assertEqual(3, len(queue_add_mock.call_args_list)) @patch('google.appengine.api.taskqueue.Queue.add', auto_spec=True) def test_no_jobs(self, queue_add_mock): diff --git a/furious/tests/context/test_context.py b/furious/tests/context/test_context.py index 16142b2..66463df 100644 --- a/furious/tests/context/test_context.py +++ b/furious/tests/context/test_context.py @@ -13,12 +13,12 @@ # See the License for the specific language governing permissions and # limitations under the License. # - import unittest from google.appengine.ext import testbed from mock import Mock +from mock import call from mock import patch @@ -123,7 +123,7 @@ def wrapper(): self.assertRaises(TestError, wrapper) self.assertEqual(0, queue_add_mock.call_count) - @patch('google.appengine.api.taskqueue.Queue.add', auto_spec=True) + @patch('google.appengine.api.taskqueue.Queue.add_async', auto_spec=True) def test_nested_context_works(self, queue_add_mock): """Ensure adding a job works.""" from furious.async import Async @@ -140,7 +140,7 @@ def test_nested_context_works(self, queue_add_mock): self.assertEqual(1, ctx2.insert_success) self.assertEqual(2, queue_add_mock.call_count) - @patch('google.appengine.api.taskqueue.Queue.add', auto_spec=True) + @patch('google.appengine.api.taskqueue.Queue.add_async', auto_spec=True) def test_add_multiple_jobs_to_context_works(self, queue_add_mock): """Ensure adding multiple jobs works.""" from furious.context import Context @@ -149,8 +149,8 @@ def test_add_multiple_jobs_to_context_works(self, queue_add_mock): for _ in range(10): ctx.add('test', args=[1, 2]) - queue_add_mock.assert_called_once() - self.assertEqual(10, len(queue_add_mock.call_args[0][0])) + self.assertEqual(10, queue_add_mock.call_count) + self.assertEqual(10, len(queue_add_mock.call_args_list)) self.assertEqual(10, ctx.insert_success) @patch('google.appengine.api.taskqueue.Queue', auto_spec=True) @@ -162,7 +162,8 @@ def test_added_to_correct_queue(self, queue_mock): ctx.add('test', args=[1, 2], queue='A') ctx.add('test', args=[1, 2], queue='A') - queue_mock.assert_called_once_with(name='A') + self.assertEqual(queue_mock.call_args_list, + [call(name='A') for _ in xrange(2)]) self.assertEqual(2, ctx.insert_success) def test_add_jobs_to_multiple_queues(self): @@ -175,12 +176,11 @@ def test_add_jobs_to_multiple_queues(self): class AwesomeQueue(Queue): def __init__(self, *args, **kwargs): super(AwesomeQueue, self).__init__(*args, **kwargs) + self.queue_name = kwargs.get('name') - queue_registry[kwargs.get('name')] = self - self._calls = [] - - def add(self, *args, **kwargs): - self._calls.append((args, kwargs)) + def add_async(self, *args, **kwargs): + queue_registry.setdefault( + self.queue_name, []).append((args, kwargs)) with patch('google.appengine.api.taskqueue.Queue', AwesomeQueue): with Context() as ctx: @@ -189,22 +189,18 @@ def add(self, *args, **kwargs): ctx.add('test', args=[1, 2], queue='B') ctx.add('test', args=[1, 2], queue='C') - self.assertEqual(2, len(queue_registry['A']._calls[0][0][0])) - self.assertEqual(1, len(queue_registry['B']._calls[0][0][0])) - self.assertEqual(1, len(queue_registry['C']._calls[0][0][0])) + self.assertEqual(2, len(queue_registry['A'])) + self.assertEqual(1, len(queue_registry['B'])) + self.assertEqual(1, len(queue_registry['C'])) self.assertEqual(4, ctx.insert_success) - @patch('google.appengine.api.taskqueue.Queue.add', auto_spec=True) + @patch('google.appengine.api.taskqueue.Queue.add_async', auto_spec=True) def test_add_task_fails(self, queue_add_mock): """Ensure insert_failed and insert_success are calculated correctly.""" from google.appengine.api.taskqueue import TaskAlreadyExistsError from furious.context import Context - def queue_add(tasks, transactional=False): - if len(tasks) != 2: - raise TaskAlreadyExistsError() - - queue_add_mock.side_effect = queue_add + queue_add_mock.side_effect = [None, None, TaskAlreadyExistsError()] with Context() as ctx: ctx.add('test', args=[1, 2], queue='A') @@ -402,20 +398,25 @@ def test_queue_name_is_honored(self, queue_mock): from furious.context.context import _insert_tasks inserted = _insert_tasks((None,), 'AbCd') + queue_mock.assert_called_once_with(name='AbCd') self.assertEqual(1, inserted) - @patch('google.appengine.api.taskqueue.Queue.add', auto_spec=True) + @patch('google.appengine.api.taskqueue.Queue.add_async', auto_spec=True) def test_tasks_are_passed_along(self, queue_add_mock): """Ensure the list of tasks are passed along.""" + from functools import partial from furious.context.context import _insert_tasks - inserted = _insert_tasks(('A', 1, 'B', 'joe'), 'AbCd') - queue_add_mock.assert_called_once_with(('A', 1, 'B', 'joe'), - transactional=False) + tasks = ['A', 1, 'B', 'joe'] + + inserted = _insert_tasks(tasks, 'AbCd') + + calls = map(partial(call, transactional=False), tasks) + queue_add_mock.assert_has_calls(calls) self.assertEqual(4, inserted) - @patch('google.appengine.api.taskqueue.Queue.add', auto_spec=True) + @patch('google.appengine.api.taskqueue.Queue.add_async', auto_spec=True) def test_task_add_error_TransientError(self, queue_add_mock): """Ensure a TransientError doesn't get raised from add.""" from furious.context.context import _insert_tasks @@ -423,29 +424,14 @@ def test_task_add_error_TransientError(self, queue_add_mock): def raise_error(*args, **kwargs): from google.appengine.api import taskqueue raise taskqueue.TransientError() - queue_add_mock.side_effect = raise_error inserted = _insert_tasks(('A',), 'AbCd') - queue_add_mock.assert_called_once_with(('A',), transactional=False) - self.assertEqual(0, inserted) - - @patch('google.appengine.api.taskqueue.Queue.add', auto_spec=True) - def test_batches_get_split_TransientError(self, queue_add_mock): - """Ensure a batches get split and retried on TransientErrors.""" - from furious.context.context import _insert_tasks - - def raise_error(*args, **kwargs): - from google.appengine.api import taskqueue - raise taskqueue.TransientError() - - queue_add_mock.side_effect = raise_error - inserted = _insert_tasks(('A', 1, 'B'), 'AbCd') - self.assertEqual(5, queue_add_mock.call_count) + queue_add_mock.assert_called_once_with('A', transactional=False) self.assertEqual(0, inserted) - @patch('google.appengine.api.taskqueue.Queue.add', auto_spec=True) + @patch('google.appengine.api.taskqueue.Queue.add_async', auto_spec=True) def test_task_add_error_BadTaskStateError(self, queue_add_mock): """Ensure a BadTaskStateError doesn't get raised from add.""" from furious.context.context import _insert_tasks @@ -453,29 +439,14 @@ def test_task_add_error_BadTaskStateError(self, queue_add_mock): def raise_error(*args, **kwargs): from google.appengine.api import taskqueue raise taskqueue.BadTaskStateError() - queue_add_mock.side_effect = raise_error inserted = _insert_tasks(('A',), 'AbCd') - queue_add_mock.assert_called_once_with(('A',), transactional=False) - self.assertEqual(0, inserted) - - @patch('google.appengine.api.taskqueue.Queue.add', auto_spec=True) - def test_batches_get_split_BadTaskStateError(self, queue_add_mock): - """Ensure a batches get split and retried on BadTaskStateErrors.""" - from furious.context.context import _insert_tasks - - def raise_error(*args, **kwargs): - from google.appengine.api import taskqueue - raise taskqueue.BadTaskStateError() - - queue_add_mock.side_effect = raise_error - inserted = _insert_tasks(('A', 1, 'B'), 'AbCd') - self.assertEqual(5, queue_add_mock.call_count) + queue_add_mock.assert_called_once_with('A', transactional=False) self.assertEqual(0, inserted) - @patch('google.appengine.api.taskqueue.Queue.add', auto_spec=True) + @patch('google.appengine.api.taskqueue.Queue.add_async', auto_spec=True) def test_task_add_error_TaskAlreadyExistsError(self, queue_add_mock): """Ensure a TaskAlreadyExistsError doesn't get raised from add.""" from furious.context.context import _insert_tasks @@ -483,30 +454,14 @@ def test_task_add_error_TaskAlreadyExistsError(self, queue_add_mock): def raise_error(*args, **kwargs): from google.appengine.api import taskqueue raise taskqueue.TaskAlreadyExistsError() - queue_add_mock.side_effect = raise_error inserted = _insert_tasks(('A',), 'AbCd') - queue_add_mock.assert_called_once_with(('A',), transactional=False) - self.assertEqual(0, inserted) - - @patch('google.appengine.api.taskqueue.Queue.add', auto_spec=True) - def test_batches_get_split_TaskAlreadyExistsError(self, queue_add_mock): - """Ensure a batches get split and retried on TaskAlreadyExistsErrors. - """ - from furious.context.context import _insert_tasks - - def raise_error(*args, **kwargs): - from google.appengine.api import taskqueue - raise taskqueue.TaskAlreadyExistsError() - - queue_add_mock.side_effect = raise_error - inserted = _insert_tasks(('A', 1, 'B'), 'AbCd') - self.assertEqual(5, queue_add_mock.call_count) + queue_add_mock.assert_called_once_with('A', transactional=False) self.assertEqual(0, inserted) - @patch('google.appengine.api.taskqueue.Queue.add', auto_spec=True) + @patch('google.appengine.api.taskqueue.Queue.add_async', auto_spec=True) def test_task_add_error_TombstonedTaskError(self, queue_add_mock): """Ensure a TombstonedTaskError doesn't get raised from add.""" from furious.context.context import _insert_tasks @@ -514,26 +469,11 @@ def test_task_add_error_TombstonedTaskError(self, queue_add_mock): def raise_error(*args, **kwargs): from google.appengine.api import taskqueue raise taskqueue.TombstonedTaskError() - queue_add_mock.side_effect = raise_error inserted = _insert_tasks(('A',), 'AbCd') - queue_add_mock.assert_called_once_with(('A',), transactional=False) - self.assertEqual(0, inserted) - - @patch('google.appengine.api.taskqueue.Queue.add', auto_spec=True) - def test_batches_get_split_TombstonedTaskError(self, queue_add_mock): - """Ensure a batches get split and retried on TombstonedTaskErrors.""" - from furious.context.context import _insert_tasks - - def raise_error(*args, **kwargs): - from google.appengine.api import taskqueue - raise taskqueue.TombstonedTaskError() - - queue_add_mock.side_effect = raise_error - inserted = _insert_tasks(('A', 1, 'B'), 'AbCd') - self.assertEqual(5, queue_add_mock.call_count) + queue_add_mock.assert_called_once_with('A', transactional=False) self.assertEqual(0, inserted)