Skip to content
This repository was archived by the owner on Apr 30, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 16 additions & 15 deletions furious/context/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,21 +247,22 @@ def _insert_tasks(tasks, queue, transactional=False):
if not tasks:
return 0

try:
taskqueue.Queue(name=queue).add(tasks, transactional=transactional)
return len(tasks)
except (taskqueue.BadTaskStateError,
taskqueue.TaskAlreadyExistsError,
taskqueue.TombstonedTaskError,
taskqueue.TransientError):
count = len(tasks)
if count <= 1:
return 0

inserted = _insert_tasks(tasks[:count / 2], queue, transactional)
inserted += _insert_tasks(tasks[count / 2:], queue, transactional)

return inserted
inserted = len(tasks)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to see more statistics on this. Just some basic scenario testing....

Old style (batch and split)
All Async
Batch and Async (take X tasks and split into Y batches and insert that batches async)
Batch and Async on failure (Do the batch insert and if it fails then fallback to all async)
Batch and Async with Async on Failure (take X tasks and split into Y batches and insert that batches async and if those fail split into single tasks to insert async)

Then run those scenarios against 1, 10, 100, 1000 tasks, etc. Also run with some tasks failing and hitting the different split scenarios.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

# NOTE: I don't believe we need all these exceptions on here anymore, but
# I'm going to leave them just to be safe.
for task in tasks:
try:
taskqueue.Queue(name=queue).add_async(
task, transactional=transactional)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue with never calling get_result is that some, or all, of the inserts might have failed due to, for example, TransientErrors (in which case you should retry the insert). However, this won't "block" here, so the user's code will continue running with the assumption all tasks were inserted even though some, or all, were not in fact inserted.

except (taskqueue.BadTaskStateError,
taskqueue.TaskAlreadyExistsError,
taskqueue.TombstonedTaskError,
taskqueue.TransientError):
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I presume these errors would not actually be raised by the async call? Instead, you could probably call get_result on the async result(s), and actually get an accurate picture (and count) of what happened.

# TODO: Not sure if this is correct anymore. Any thoughts?
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See the above comment.

inserted -= 1

return inserted


def _task_batcher(tasks, batch_size=None):
Expand Down
20 changes: 6 additions & 14 deletions furious/tests/context/test_auto_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

from google.appengine.ext import testbed

from mock import Mock
from mock import patch


Expand Down Expand Up @@ -47,7 +46,7 @@ def tearDown(self):
os.environ.clear()
os.environ.update(self._orig_environ)

@patch('google.appengine.api.taskqueue.Queue.add', auto_spec=True)
@patch('google.appengine.api.taskqueue.Queue.add_async', auto_spec=True)
def test_add_job_to_context_multiple_batches(self, queue_add_mock):
"""Ensure adding more tasks than the batch_size causes multiple batches
to get inserted.
Expand Down Expand Up @@ -76,18 +75,14 @@ def test_add_job_to_context_multiple_batches(self, queue_add_mock):
self.assertIsInstance(job2, Async)
queue_add_mock.assert_called_once()
#Ensure only two tasks were inserted
tasks_added = queue_add_mock.call_args[0][0]
self.assertEqual(2, len(tasks_added))
self.assertEqual(2, len(queue_add_mock.call_args_list))

# Ensure the third job was inserted when the context exited.
self.assertIsInstance(job3, Async)
# Ensure add has now been called twice.
self.assertEqual(2, queue_add_mock.call_count)
# Ensure only one task was inserted
tasks_added = queue_add_mock.call_args[0][0]
self.assertEqual(1, len(tasks_added))
# Ensure add_async has now been called three times.
self.assertEqual(3, queue_add_mock.call_count)

@patch('google.appengine.api.taskqueue.Queue.add', auto_spec=True)
@patch('google.appengine.api.taskqueue.Queue.add_async', auto_spec=True)
def test_add_job_to_context_batch_size_unspecified(self, queue_add_mock):
"""When batch_size is None or 0, the default behavior of Context is
used. All the tasks are added to the queue when the context is exited.
Expand All @@ -108,11 +103,8 @@ def test_add_job_to_context_batch_size_unspecified(self, queue_add_mock):
# Ensure no batches of tasks are inserted yet.
self.assertFalse(queue_add_mock.called)

# Ensure the list of tasks added when the context exited.
self.assertEqual(1, queue_add_mock.call_count)
# Ensure the three tasks were inserted
tasks_added = queue_add_mock.call_args[0][0]
self.assertEqual(3, len(tasks_added))
self.assertEqual(3, len(queue_add_mock.call_args_list))

@patch('google.appengine.api.taskqueue.Queue.add', auto_spec=True)
def test_no_jobs(self, queue_add_mock):
Expand Down
126 changes: 33 additions & 93 deletions furious/tests/context/test_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#

import unittest

from google.appengine.ext import testbed

from mock import Mock
from mock import call
from mock import patch


Expand Down Expand Up @@ -123,7 +123,7 @@ def wrapper():
self.assertRaises(TestError, wrapper)
self.assertEqual(0, queue_add_mock.call_count)

@patch('google.appengine.api.taskqueue.Queue.add', auto_spec=True)
@patch('google.appengine.api.taskqueue.Queue.add_async', auto_spec=True)
def test_nested_context_works(self, queue_add_mock):
"""Ensure adding a job works."""
from furious.async import Async
Expand All @@ -140,7 +140,7 @@ def test_nested_context_works(self, queue_add_mock):
self.assertEqual(1, ctx2.insert_success)
self.assertEqual(2, queue_add_mock.call_count)

@patch('google.appengine.api.taskqueue.Queue.add', auto_spec=True)
@patch('google.appengine.api.taskqueue.Queue.add_async', auto_spec=True)
def test_add_multiple_jobs_to_context_works(self, queue_add_mock):
"""Ensure adding multiple jobs works."""
from furious.context import Context
Expand All @@ -149,8 +149,8 @@ def test_add_multiple_jobs_to_context_works(self, queue_add_mock):
for _ in range(10):
ctx.add('test', args=[1, 2])

queue_add_mock.assert_called_once()
self.assertEqual(10, len(queue_add_mock.call_args[0][0]))
self.assertEqual(10, queue_add_mock.call_count)
self.assertEqual(10, len(queue_add_mock.call_args_list))
self.assertEqual(10, ctx.insert_success)

@patch('google.appengine.api.taskqueue.Queue', auto_spec=True)
Expand All @@ -162,7 +162,8 @@ def test_added_to_correct_queue(self, queue_mock):
ctx.add('test', args=[1, 2], queue='A')
ctx.add('test', args=[1, 2], queue='A')

queue_mock.assert_called_once_with(name='A')
self.assertEqual(queue_mock.call_args_list,
[call(name='A') for _ in xrange(2)])
self.assertEqual(2, ctx.insert_success)

def test_add_jobs_to_multiple_queues(self):
Expand All @@ -175,12 +176,11 @@ def test_add_jobs_to_multiple_queues(self):
class AwesomeQueue(Queue):
def __init__(self, *args, **kwargs):
super(AwesomeQueue, self).__init__(*args, **kwargs)
self.queue_name = kwargs.get('name')

queue_registry[kwargs.get('name')] = self
self._calls = []

def add(self, *args, **kwargs):
self._calls.append((args, kwargs))
def add_async(self, *args, **kwargs):
queue_registry.setdefault(
self.queue_name, []).append((args, kwargs))

with patch('google.appengine.api.taskqueue.Queue', AwesomeQueue):
with Context() as ctx:
Expand All @@ -189,22 +189,18 @@ def add(self, *args, **kwargs):
ctx.add('test', args=[1, 2], queue='B')
ctx.add('test', args=[1, 2], queue='C')

self.assertEqual(2, len(queue_registry['A']._calls[0][0][0]))
self.assertEqual(1, len(queue_registry['B']._calls[0][0][0]))
self.assertEqual(1, len(queue_registry['C']._calls[0][0][0]))
self.assertEqual(2, len(queue_registry['A']))
self.assertEqual(1, len(queue_registry['B']))
self.assertEqual(1, len(queue_registry['C']))
self.assertEqual(4, ctx.insert_success)

@patch('google.appengine.api.taskqueue.Queue.add', auto_spec=True)
@patch('google.appengine.api.taskqueue.Queue.add_async', auto_spec=True)
def test_add_task_fails(self, queue_add_mock):
"""Ensure insert_failed and insert_success are calculated correctly."""
from google.appengine.api.taskqueue import TaskAlreadyExistsError
from furious.context import Context

def queue_add(tasks, transactional=False):
if len(tasks) != 2:
raise TaskAlreadyExistsError()

queue_add_mock.side_effect = queue_add
queue_add_mock.side_effect = [None, None, TaskAlreadyExistsError()]

with Context() as ctx:
ctx.add('test', args=[1, 2], queue='A')
Expand Down Expand Up @@ -402,138 +398,82 @@ def test_queue_name_is_honored(self, queue_mock):
from furious.context.context import _insert_tasks

inserted = _insert_tasks((None,), 'AbCd')

queue_mock.assert_called_once_with(name='AbCd')
self.assertEqual(1, inserted)

@patch('google.appengine.api.taskqueue.Queue.add', auto_spec=True)
@patch('google.appengine.api.taskqueue.Queue.add_async', auto_spec=True)
def test_tasks_are_passed_along(self, queue_add_mock):
"""Ensure the list of tasks are passed along."""
from functools import partial
from furious.context.context import _insert_tasks

inserted = _insert_tasks(('A', 1, 'B', 'joe'), 'AbCd')
queue_add_mock.assert_called_once_with(('A', 1, 'B', 'joe'),
transactional=False)
tasks = ['A', 1, 'B', 'joe']

inserted = _insert_tasks(tasks, 'AbCd')

calls = map(partial(call, transactional=False), tasks)
queue_add_mock.assert_has_calls(calls)
self.assertEqual(4, inserted)

@patch('google.appengine.api.taskqueue.Queue.add', auto_spec=True)
@patch('google.appengine.api.taskqueue.Queue.add_async', auto_spec=True)
def test_task_add_error_TransientError(self, queue_add_mock):
"""Ensure a TransientError doesn't get raised from add."""
from furious.context.context import _insert_tasks

def raise_error(*args, **kwargs):
from google.appengine.api import taskqueue
raise taskqueue.TransientError()

queue_add_mock.side_effect = raise_error

inserted = _insert_tasks(('A',), 'AbCd')
queue_add_mock.assert_called_once_with(('A',), transactional=False)
self.assertEqual(0, inserted)

@patch('google.appengine.api.taskqueue.Queue.add', auto_spec=True)
def test_batches_get_split_TransientError(self, queue_add_mock):
"""Ensure a batches get split and retried on TransientErrors."""
from furious.context.context import _insert_tasks

def raise_error(*args, **kwargs):
from google.appengine.api import taskqueue
raise taskqueue.TransientError()

queue_add_mock.side_effect = raise_error

inserted = _insert_tasks(('A', 1, 'B'), 'AbCd')
self.assertEqual(5, queue_add_mock.call_count)
queue_add_mock.assert_called_once_with('A', transactional=False)
self.assertEqual(0, inserted)

@patch('google.appengine.api.taskqueue.Queue.add', auto_spec=True)
@patch('google.appengine.api.taskqueue.Queue.add_async', auto_spec=True)
def test_task_add_error_BadTaskStateError(self, queue_add_mock):
"""Ensure a BadTaskStateError doesn't get raised from add."""
from furious.context.context import _insert_tasks

def raise_error(*args, **kwargs):
from google.appengine.api import taskqueue
raise taskqueue.BadTaskStateError()

queue_add_mock.side_effect = raise_error

inserted = _insert_tasks(('A',), 'AbCd')
queue_add_mock.assert_called_once_with(('A',), transactional=False)
self.assertEqual(0, inserted)

@patch('google.appengine.api.taskqueue.Queue.add', auto_spec=True)
def test_batches_get_split_BadTaskStateError(self, queue_add_mock):
"""Ensure a batches get split and retried on BadTaskStateErrors."""
from furious.context.context import _insert_tasks

def raise_error(*args, **kwargs):
from google.appengine.api import taskqueue
raise taskqueue.BadTaskStateError()

queue_add_mock.side_effect = raise_error

inserted = _insert_tasks(('A', 1, 'B'), 'AbCd')
self.assertEqual(5, queue_add_mock.call_count)
queue_add_mock.assert_called_once_with('A', transactional=False)
self.assertEqual(0, inserted)

@patch('google.appengine.api.taskqueue.Queue.add', auto_spec=True)
@patch('google.appengine.api.taskqueue.Queue.add_async', auto_spec=True)
def test_task_add_error_TaskAlreadyExistsError(self, queue_add_mock):
"""Ensure a TaskAlreadyExistsError doesn't get raised from add."""
from furious.context.context import _insert_tasks

def raise_error(*args, **kwargs):
from google.appengine.api import taskqueue
raise taskqueue.TaskAlreadyExistsError()

queue_add_mock.side_effect = raise_error

inserted = _insert_tasks(('A',), 'AbCd')
queue_add_mock.assert_called_once_with(('A',), transactional=False)
self.assertEqual(0, inserted)

@patch('google.appengine.api.taskqueue.Queue.add', auto_spec=True)
def test_batches_get_split_TaskAlreadyExistsError(self, queue_add_mock):
"""Ensure a batches get split and retried on TaskAlreadyExistsErrors.
"""
from furious.context.context import _insert_tasks

def raise_error(*args, **kwargs):
from google.appengine.api import taskqueue
raise taskqueue.TaskAlreadyExistsError()

queue_add_mock.side_effect = raise_error

inserted = _insert_tasks(('A', 1, 'B'), 'AbCd')
self.assertEqual(5, queue_add_mock.call_count)
queue_add_mock.assert_called_once_with('A', transactional=False)
self.assertEqual(0, inserted)

@patch('google.appengine.api.taskqueue.Queue.add', auto_spec=True)
@patch('google.appengine.api.taskqueue.Queue.add_async', auto_spec=True)
def test_task_add_error_TombstonedTaskError(self, queue_add_mock):
"""Ensure a TombstonedTaskError doesn't get raised from add."""
from furious.context.context import _insert_tasks

def raise_error(*args, **kwargs):
from google.appengine.api import taskqueue
raise taskqueue.TombstonedTaskError()

queue_add_mock.side_effect = raise_error

inserted = _insert_tasks(('A',), 'AbCd')
queue_add_mock.assert_called_once_with(('A',), transactional=False)
self.assertEqual(0, inserted)

@patch('google.appengine.api.taskqueue.Queue.add', auto_spec=True)
def test_batches_get_split_TombstonedTaskError(self, queue_add_mock):
"""Ensure a batches get split and retried on TombstonedTaskErrors."""
from furious.context.context import _insert_tasks

def raise_error(*args, **kwargs):
from google.appengine.api import taskqueue
raise taskqueue.TombstonedTaskError()

queue_add_mock.side_effect = raise_error

inserted = _insert_tasks(('A', 1, 'B'), 'AbCd')
self.assertEqual(5, queue_add_mock.call_count)
queue_add_mock.assert_called_once_with('A', transactional=False)
self.assertEqual(0, inserted)


Expand Down