-
Notifications
You must be signed in to change notification settings - Fork 35
Change _insert_tasks to use add_async #112
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -247,21 +247,22 @@ def _insert_tasks(tasks, queue, transactional=False): | |
| if not tasks: | ||
| return 0 | ||
|
|
||
| try: | ||
| taskqueue.Queue(name=queue).add(tasks, transactional=transactional) | ||
| return len(tasks) | ||
| except (taskqueue.BadTaskStateError, | ||
| taskqueue.TaskAlreadyExistsError, | ||
| taskqueue.TombstonedTaskError, | ||
| taskqueue.TransientError): | ||
| count = len(tasks) | ||
| if count <= 1: | ||
| return 0 | ||
|
|
||
| inserted = _insert_tasks(tasks[:count / 2], queue, transactional) | ||
| inserted += _insert_tasks(tasks[count / 2:], queue, transactional) | ||
|
|
||
| return inserted | ||
| inserted = len(tasks) | ||
|
|
||
| # NOTE: I don't believe we need all these exceptions on here anymore, but | ||
| # I'm going to leave them just to be safe. | ||
| for task in tasks: | ||
| try: | ||
| taskqueue.Queue(name=queue).add_async( | ||
| task, transactional=transactional) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The issue with never calling |
||
| except (taskqueue.BadTaskStateError, | ||
| taskqueue.TaskAlreadyExistsError, | ||
| taskqueue.TombstonedTaskError, | ||
| taskqueue.TransientError): | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I presume these errors would not actually be raised by the async call? Instead, you could probably call |
||
| # TODO: Not sure if this is correct anymore. Any thoughts? | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See the above comment. |
||
| inserted -= 1 | ||
|
|
||
| return inserted | ||
|
|
||
|
|
||
| def _task_batcher(tasks, batch_size=None): | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd like to see more statistics on this. Just some basic scenario testing....
Old style (batch and split)
All Async
Batch and Async (take X tasks and split into Y batches and insert that batches async)
Batch and Async on failure (Do the batch insert and if it fails then fallback to all async)
Batch and Async with Async on Failure (take X tasks and split into Y batches and insert that batches async and if those fail split into single tasks to insert async)
Then run those scenarios against 1, 10, 100, 1000 tasks, etc. Also run with some tasks failing and hitting the different split scenarios.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FYI @tannermiller-wf @ericolson-wf @johnlockwood-wf