From 13e6ecc037140f14f45ce0d439913307a249d32d Mon Sep 17 00:00:00 2001 From: Tyler Treat Date: Mon, 5 Aug 2013 16:33:54 -0500 Subject: [PATCH 01/19] Add select_queue function for queue group support. --- furious/async.py | 73 +++++++++++++++++++++ furious/tests/test_async.py | 125 ++++++++++++++++++++++++++++++++++++ 2 files changed, 198 insertions(+) diff --git a/furious/async.py b/furious/async.py index 8cee01f..dea6538 100644 --- a/furious/async.py +++ b/furious/async.py @@ -88,6 +88,8 @@ def run_me(*args, **kwargs): MAX_DEPTH = 100 MAX_RESTARTS = 10 DISABLE_RECURSION_CHECK = -1 +TASK_WEIGHT = 0.6 +EXECUTED_WEIGHT = 0.4 DEFAULT_RETRY_OPTIONS = { 'task_retry_limit': MAX_RESTARTS @@ -448,6 +450,77 @@ def wrapper(*args, **kwargs): return real_decorator +# Use a thread local cache to optimize performance in select_queue. +from threading import local +_queue_group_cache = local() +_queue_group_cache.lists = {} + + +def select_queue(queue_group, queue_count=1, random=True): + """Select an optimal queue to run a task in from the given queue group. By + default, this simply randomly selects a queue from the group, otherwise it + leverages the taskqueue API to try and determine the best queue to use. The + queue_count kwarg indicates the number of queues allocated to the group. + """ + + if not queue_group: + return ASYNC_DEFAULT_QUEUE + + if queue_count <= 0: + raise Exception('Queue group must have at least 1 queue.') + + if queue_count == 1: + return '%s-0' % queue_group + + if random: + group_queues = _queue_group_cache.lists.setdefault(queue_group, []) + + if len(group_queues) == 0: + from random import shuffle + + group_queues.extend('%s-%d' % (queue_group, i) + for i in xrange(queue_count)) + + shuffle(group_queues) + + return group_queues.pop() + + return _calculate_optimal_queue(queue_group, queue_count) + + +def _calculate_optimal_queue(queue_group, queue_count): + """Determine an approximate optimal queue from the given queue group. 
The + queue_count argument indicates the number of queues allocated to the group. + """ + + from google.appengine.api import taskqueue + + # Asynchronously fetch the stats for every queue in the group. + queue_stats = taskqueue.QueueStatistics.fetch( + [taskqueue.Queue(name='%s-%d' % (queue_group, i)) + for i in xrange(queue_count)]) + + # Apply ranks to the queues. + ranks = [(_calculate_queue_rank(queue_stat), queue_stat.queue.name) + for queue_stat in queue_stats] + + # Sort on rank and return the best one (lowest rank score). + ranks.sort(key=lambda tup: tup[0]) + + # The second value of the tuple is the queue name. + return ranks[0][1] + + +def _calculate_queue_rank(queue_stats): + """Calculate a ranking for the given QueueStatistics object such that the + lower the rank, the more "optimal" the queue is. + """ + + # TODO: This is just a rudimentary ranking formula - may want to revisit. + return queue_stats.tasks * TASK_WEIGHT - \ + queue_stats.executed_last_minute * EXECUTED_WEIGHT + + def _check_options(options): """Make sure no one passes something not allowed in.""" if not options: diff --git a/furious/tests/test_async.py b/furious/tests/test_async.py index 036a82b..73cbc4b 100644 --- a/furious/tests/test_async.py +++ b/furious/tests/test_async.py @@ -18,6 +18,8 @@ import unittest +from mock import call +from mock import Mock from mock import patch @@ -852,3 +854,126 @@ def test_has_type(self): self.assertIsInstance(result, MessageProcessor) + +class TestSelectQueue(unittest.TestCase): + """Ensure select_queue() works correctly.""" + + def setUp(self): + from threading import local + from furious import async + + async._queue_group_cache = local() + async._queue_group_cache.lists = {} + + def test_none(self): + """Ensure that if the queue group is None, the default queue is + returned. 
+ """ + from furious.async import ASYNC_DEFAULT_QUEUE + from furious.async import select_queue + + actual = select_queue(None) + + self.assertEqual(actual, ASYNC_DEFAULT_QUEUE) + + def test_invalid_queue_count(self): + """Ensure that an exception is raised when a bad queue count is given. + """ + from furious.async import select_queue + + with self.assertRaises(Exception) as context: + select_queue('foo-queue', queue_count=0) + + self.assertEqual(context.exception.message, + 'Queue group must have at least 1 queue.') + + def test_single_queue(self): + """Ensure that if the queue group passed in has a single queue, that + queue is returned. + """ + from furious.async import select_queue + + queue_group = 'foo-queue' + expected = '%s-0' % queue_group + + actual = select_queue(queue_group) + + self.assertEqual(actual, expected) + + @patch('random.shuffle') + def test_random(self, mock_shuffle): + """Ensure that a random queue is selected from the group.""" + from furious.async import select_queue + + queue_group = 'foo-queue' + queue_count = 5 + + actual = select_queue(queue_group, queue_count=queue_count) + + self.assertEqual(actual, 'foo-queue-4') + + # TODO: assert call args + self.assertTrue(mock_shuffle.called) + + @patch('furious.async._calculate_optimal_queue') + def test_optimal_selection(self, mock_optimal_queue): + """Ensure that select_queue calls through to _calculate_optimal_queue + when random=False. 
+ """ + from furious.async import select_queue + + queue_group = 'foo-queue' + queue_count = 5 + expected = '%s-0' % queue_group + mock_optimal_queue.return_value = expected + + actual = select_queue(queue_group, random=False, + queue_count=queue_count) + + self.assertEqual(actual, expected) + mock_optimal_queue.assert_called_once_with(queue_group, queue_count) + + @patch('furious.async._calculate_queue_rank') + @patch('google.appengine.api.taskqueue.QueueStatistics.fetch') + def test_calculate_optimal_queue(self, mock_fetch, mock_queue_rank): + """Ensure _calculate_optimal_queue returns the queue with the highest + rank. + """ + from furious.async import _calculate_optimal_queue + + queue_group = 'foo-queue' + queue_count = 5 + + mock_queues = [] + for i in range(queue_count): + mock_queue = Mock() + mock_queue.name = '%s-%d' % (queue_group, i) + mock_queues.append(mock_queue) + + mock_stats = [Mock(queue=mock_queue) for mock_queue in mock_queues] + mock_fetch.return_value = mock_stats + mock_queue_rank.side_effect = [2, 4, 3, 1, 0] + + actual = _calculate_optimal_queue(queue_group, queue_count) + + self.assertEqual(actual, '%s-4' % queue_group) + mock_calls = [call(mock_stat) for mock_stat in mock_stats] + self.assertEqual(mock_queue_rank.call_args_list, mock_calls) + + def test_calculate_queue_rank(self): + """Ensure _calculate_queue_rank returns the correct value.""" + from furious.async import EXECUTED_WEIGHT + from furious.async import TASK_WEIGHT + from furious.async import _calculate_queue_rank + + tasks = 5 + executed = 10 + queue_stats = Mock() + queue_stats.tasks = tasks + queue_stats.executed_last_minute = executed + + expected = TASK_WEIGHT * tasks - EXECUTED_WEIGHT * executed + + actual = _calculate_queue_rank(queue_stats) + + self.assertEqual(actual, expected) From 772f5a616999926a2d368d2506f648c0a437f3da Mon Sep 17 00:00:00 2001 From: Tyler Treat Date: Mon, 5 Aug 2013 16:34:46 -0500 Subject: [PATCH 02/19] Wire up select_queue with Async's get_queue. 
This adds support for queue groups when creating an Async. --- furious/async.py | 6 ++++++ furious/tests/test_async.py | 19 +++++++++++++++++++ 2 files changed, 25 insertions(+) diff --git a/furious/async.py b/furious/async.py index dea6538..a5fd324 100644 --- a/furious/async.py +++ b/furious/async.py @@ -261,6 +261,12 @@ def get_headers(self): def get_queue(self): """Return the queue the task should run in.""" + queue_group = self._options.get('queue_group') + if queue_group: + return select_queue(queue_group[0], + queue_count=queue_group[1], + random=queue_group[2]) + return self._options.get('queue', ASYNC_DEFAULT_QUEUE) def get_task_args(self): diff --git a/furious/tests/test_async.py b/furious/tests/test_async.py index 73cbc4b..f454c73 100644 --- a/furious/tests/test_async.py +++ b/furious/tests/test_async.py @@ -329,6 +329,25 @@ def test_get_default_queue(self): self.assertEqual('default', job.get_queue()) + @patch('furious.async.select_queue') + def test_get_queue_group_queue(self, mock_select_queue): + """Ensure get_queue calls through to select_queue when a queue group + is given. + """ + from furious.async import Async + + queue_group = 'foo-queue' + queue_count = 5 + random = False + + job = Async('nonexistant', + queue_group=(queue_group, queue_count, random)) + + expected = '%s-2' % queue_group + mock_select_queue.return_value = expected + + self.assertEqual(expected, job.get_queue()) + def test_get_task_args(self): """Ensure get_task_args returns the job task_args.""" from furious.async import Async From f68eac21b323f9ae3a1c85cfdc32b7b094f405d7 Mon Sep 17 00:00:00 2001 From: Tyler Treat Date: Mon, 5 Aug 2013 16:43:58 -0500 Subject: [PATCH 03/19] Add queue group support to Message. 
--- furious/batcher.py | 6 ++++++ furious/tests/test_batcher.py | 18 ++++++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/furious/batcher.py b/furious/batcher.py index b104aab..8707ddc 100644 --- a/furious/batcher.py +++ b/furious/batcher.py @@ -21,6 +21,7 @@ from google.appengine.api import memcache from .async import Async +from .async import select_queue MESSAGE_DEFAULT_QUEUE = 'default-pull' MESSAGE_PROCESSOR_NAME = 'processor' @@ -45,6 +46,11 @@ def update_options(self, **options): def get_queue(self): """Return the queue the task should run in.""" + queue_group = self._options.get('queue_group') + if queue_group: + return select_queue(queue_group[0], queue_count=queue_group[1], + random=queue_group[2]) + return self._options.get('queue', MESSAGE_DEFAULT_QUEUE) def get_task_args(self): diff --git a/furious/tests/test_batcher.py b/furious/tests/test_batcher.py index 5ad3d9e..932a316 100644 --- a/furious/tests/test_batcher.py +++ b/furious/tests/test_batcher.py @@ -71,6 +71,24 @@ def test_get_default_queue(self): self.assertEqual('default-pull', message.get_queue()) + @patch('furious.batcher.select_queue') + def test_get_queue_group_queue(self, mock_select_queue): + """Ensure get_queue calls through to select_queue when a queue group + is given. + """ + from furious.batcher import Message + + queue_group = 'foo-queue' + queue_count = 5 + random = False + + message = Message(queue_group=(queue_group, queue_count, random)) + + expected = '%s-2' % queue_group + mock_select_queue.return_value = expected + + self.assertEqual(expected, message.get_queue()) + def test_get_task_args(self): """Ensure get_task_args returns the message task_args.""" from furious.batcher import Message From bd77828684ec9c92ed3c4a9bc42a8c93ea4bd18f Mon Sep 17 00:00:00 2001 From: Tyler Treat Date: Mon, 5 Aug 2013 16:51:49 -0500 Subject: [PATCH 04/19] Correct documentation. We're no longer asynchronously fetching queue statistics. 
--- furious/async.py | 1 - 1 file changed, 1 deletion(-) diff --git a/furious/async.py b/furious/async.py index a5fd324..06fcb0c 100644 --- a/furious/async.py +++ b/furious/async.py @@ -501,7 +501,6 @@ def _calculate_optimal_queue(queue_group, queue_count): from google.appengine.api import taskqueue - # Asynchronously fetch the stats for every queue in the group. queue_stats = taskqueue.QueueStatistics.fetch( [taskqueue.Queue(name='%s-%d' % (queue_group, i)) for i in xrange(queue_count)]) From a16d8220bf5a2a7c82f5355da8aab09cad905ae9 Mon Sep 17 00:00:00 2001 From: Tyler Treat Date: Mon, 5 Aug 2013 17:05:25 -0500 Subject: [PATCH 05/19] Add default queue for queue group selection. This allows us to fall back to an appropriate default queue when something goes wrong during queue selection for both Messages and Asyncs. --- furious/async.py | 12 +++++++----- furious/batcher.py | 6 ++++-- furious/tests/test_async.py | 6 ++++-- furious/tests/test_batcher.py | 4 +++- 4 files changed, 18 insertions(+), 10 deletions(-) diff --git a/furious/async.py b/furious/async.py index 06fcb0c..b73920b 100644 --- a/furious/async.py +++ b/furious/async.py @@ -263,9 +263,10 @@ def get_queue(self): """Return the queue the task should run in.""" queue_group = self._options.get('queue_group') if queue_group: - return select_queue(queue_group[0], - queue_count=queue_group[1], - random=queue_group[2]) + queue_count = self._options.get('queue_count', 1) + random = self._options.get('random', True) + return select_queue(queue_group, queue_count=queue_count, + random=random) return self._options.get('queue', ASYNC_DEFAULT_QUEUE) @@ -462,7 +463,8 @@ def wrapper(*args, **kwargs): _queue_group_cache.lists = {} -def select_queue(queue_group, queue_count=1, random=True): +def select_queue(queue_group, queue_count=1, random=True, + default=ASYNC_DEFAULT_QUEUE): """Select an optimal queue to run a task in from the given queue group. 
By default, this simply randomly selects a queue from the group, otherwise it leverages the taskqueue API to try and determine the best queue to use. The @@ -470,7 +472,7 @@ def select_queue(queue_group, queue_count=1, random=True): """ if not queue_group: - return ASYNC_DEFAULT_QUEUE + return default if queue_count <= 0: raise Exception('Queue group must have at least 1 queue.') diff --git a/furious/batcher.py b/furious/batcher.py index 8707ddc..bef41db 100644 --- a/furious/batcher.py +++ b/furious/batcher.py @@ -48,8 +48,10 @@ def get_queue(self): """Return the queue the task should run in.""" queue_group = self._options.get('queue_group') if queue_group: - return select_queue(queue_group[0], queue_count=queue_group[1], - random=queue_group[2]) + queue_count = self._options.get('queue_count', 1) + random = self._options.get('random', True) + return select_queue(queue_group, queue_count=queue_count, + random=random, default=MESSAGE_DEFAULT_QUEUE) return self._options.get('queue', MESSAGE_DEFAULT_QUEUE) diff --git a/furious/tests/test_async.py b/furious/tests/test_async.py index f454c73..148bab2 100644 --- a/furious/tests/test_async.py +++ b/furious/tests/test_async.py @@ -335,13 +335,15 @@ def test_get_queue_group_queue(self, mock_select_queue): is given. """ from furious.async import Async + from furious.async import ASYNC_DEFAULT_QUEUE queue_group = 'foo-queue' queue_count = 5 random = False - job = Async('nonexistant', - queue_group=(queue_group, queue_count, random)) + job = Async('nonexistant', queue_group=queue_group, + queue_count=queue_count, random=random, + default=ASYNC_DEFAULT_QUEUE) expected = '%s-2' % queue_group mock_select_queue.return_value = expected diff --git a/furious/tests/test_batcher.py b/furious/tests/test_batcher.py index 932a316..fa31859 100644 --- a/furious/tests/test_batcher.py +++ b/furious/tests/test_batcher.py @@ -77,12 +77,14 @@ def test_get_queue_group_queue(self, mock_select_queue): is given. 
""" from furious.batcher import Message + from furious.batcher import MESSAGE_DEFAULT_QUEUE queue_group = 'foo-queue' queue_count = 5 random = False - message = Message(queue_group=(queue_group, queue_count, random)) + message = Message(queue_group=queue_group, queue_count=queue_count, + random=random, default=MESSAGE_DEFAULT_QUEUE) expected = '%s-2' % queue_group mock_select_queue.return_value = expected From 1697364ac9e46697b9615cd4b4258c52a7e83f96 Mon Sep 17 00:00:00 2001 From: Tyler Treat Date: Wed, 7 Aug 2013 09:36:56 -0500 Subject: [PATCH 06/19] Use memcache to improve queue selection performance. Temporarily cache the QueueStatistics to reduce RPCs. --- furious/async.py | 80 +++++++++++++++++++++++++++++++++++++----------- 1 file changed, 62 insertions(+), 18 deletions(-) diff --git a/furious/async.py b/furious/async.py index b73920b..b59aec1 100644 --- a/furious/async.py +++ b/furious/async.py @@ -457,12 +457,6 @@ def wrapper(*args, **kwargs): return real_decorator -# Use a thread local cache to optimize performance in select_queue. -from threading import local -_queue_group_cache = local() -_queue_group_cache.lists = {} - - def select_queue(queue_group, queue_count=1, random=True, default=ASYNC_DEFAULT_QUEUE): """Select an optimal queue to run a task in from the given queue group. By @@ -481,7 +475,8 @@ def select_queue(queue_group, queue_count=1, random=True, return '%s-0' % queue_group if random: - group_queues = _queue_group_cache.lists.setdefault(queue_group, []) + lists = _get_from_cache('queues', default={}) + group_queues = lists.setdefault(queue_group, []) if len(group_queues) == 0: from random import shuffle @@ -501,31 +496,80 @@ def _calculate_optimal_queue(queue_group, queue_count): queue_count argument indicates the number of queues allocated to the group. 
""" + from google.appengine.api import memcache from google.appengine.api import taskqueue - queue_stats = taskqueue.QueueStatistics.fetch( - [taskqueue.Queue(name='%s-%d' % (queue_group, i)) - for i in xrange(queue_count)]) + STATS_NAMESPACE = 'queuestats' + + queues = ['%s-%d' % (queue_group, i) for i in xrange(queue_count)] + queue_dict = memcache.get_multi(queues, namespace=STATS_NAMESPACE) + + if len(queue_dict) != len(queues): + # Fetch the QueueStatistics and cache them for 30 seconds. + queue_stats = taskqueue.QueueStatistics.fetch(queues) + stats_and_counts = [(stats, 0) for stats in queue_stats] + queue_dict = dict(zip(queues, stats_and_counts)) + memcache.set_multi(queue_dict, namespace=STATS_NAMESPACE, time=30) # Apply ranks to the queues. - ranks = [(_calculate_queue_rank(queue_stat), queue_stat.queue.name) - for queue_stat in queue_stats] + ranks = [(queue, _calculate_queue_rank(*queue_dict.get(queue))) + for queue in queues] # Sort on rank and return the best one (lowest rank score). - ranks.sort(key=lambda tup: tup[0]) + ranks.sort(key=lambda tup: tup[1]) - # The second value of the tuple is the queue name. - return ranks[0][1] + # The first value of the tuple is the queue name. + queue = ranks[0][0] + stats, assigned = queue_dict.get(queue) + assigned += 1 + memcache.set(queue, (stats, assigned), namespace=STATS_NAMESPACE, time=30) -def _calculate_queue_rank(queue_stats): + return queue + + +def _calculate_queue_rank(queue_stats, assigned): """Calculate a ranking for the given QueueStatistics object such that the lower the rank, the more "optimal" the queue is. """ + if not queue_stats: + return 0 + + executed_last_minute = queue_stats.executed_last_minute + if not executed_last_minute: + executed_last_minute = 0 + # TODO: This is just a rudimentary ranking formula - may want to revisit. 
- return queue_stats.tasks * TASK_WEIGHT - \ - queue_stats.executed_last_minute * EXECUTED_WEIGHT + # Also, it's ineffective when adding tasks to a context because this is + # called for every task before any are inserted, so a queue will get the + # same rank even if it has had tasks added. + rank = (queue_stats.tasks + assigned) * TASK_WEIGHT - \ + executed_last_minute * EXECUTED_WEIGHT + + print '%s: %d' % (queue_stats.queue.name, rank) + + +# Use a thread local cache to optimize performance in select_queue. +from threading import local +_cache = local() + + +def _get_from_cache(key, default=None): + """Fetch the value for the given key from the thread local cache. If the + key does not exist, set it for the given default value and return the + default. + """ + + if not key: + return default + + if hasattr(_cache, key): + return getattr(_cache, key) + + setattr(_cache, key, default) + + return default def _check_options(options): From c1827f9e95583e3af630aea467bbc4fd647dfa84 Mon Sep 17 00:00:00 2001 From: Tyler Treat Date: Wed, 7 Aug 2013 09:38:01 -0500 Subject: [PATCH 07/19] Add queue group example. 
--- example/__init__.py | 2 ++ example/queue_group_intro.py | 57 ++++++++++++++++++++++++++++++++++++ queue.yaml | 17 +++++++++++ 3 files changed, 76 insertions(+) create mode 100644 example/queue_group_intro.py diff --git a/example/__init__.py b/example/__init__.py index 1d35440..3415285 100644 --- a/example/__init__.py +++ b/example/__init__.py @@ -35,6 +35,7 @@ from .context_intro import ContextIntroHandler from .grep import GrepHandler from .simple_workflow import SimpleWorkflowHandler +from .queue_group_intro import QueueGroupIntroHandler config = { 'webapp2_extras.jinja2': { @@ -55,5 +56,6 @@ ('/batcher/run', BatcherHandler), ('/batcher/stats', BatcherStatsHandler), ('/grep', GrepHandler), + ('/queue_group', QueueGroupIntroHandler) ], config=config) diff --git a/example/queue_group_intro.py b/example/queue_group_intro.py new file mode 100644 index 0000000..973e639 --- /dev/null +++ b/example/queue_group_intro.py @@ -0,0 +1,57 @@ +# +# Copyright 2013 WebFilings, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""A very basic async example using a queue group. + +This will insert 500 async tasks to run the example_function in a queue +group. 
+""" + +import logging + +import webapp2 + + +class QueueGroupIntroHandler(webapp2.RequestHandler): + """Demonstrate the creation and insertion of 500 furious tasks to be run + in a queue group with optimal queue selection enabled.""" + def get(self): + from furious.async import Async + + random = self.request.get('random') + if random and random == '1': + random = True + else: + random = False + + for i in xrange(500): + async_task = Async(target=example_function, args=[i], + queue_group='workers', queue_count=4, + random=random) + async_task.start() + + logging.info('500 Async jobs inserted.') + + self.response.out.write('Successfully inserted 500 Async jobs.') + + +def example_function(*args, **kwargs): + """This function is called by furious tasks to demonstrate usage.""" + logging.info('example_function executed with args: %r, kwargs: %r', + args, kwargs) + import time + time.sleep(2) + return args diff --git a/queue.yaml b/queue.yaml index cdc630f..3c388db 100644 --- a/queue.yaml +++ b/queue.yaml @@ -4,6 +4,23 @@ queue: rate: 100/s bucket_size: 100 +# PUSH QUEUE GROUP +- name: workers-0 + rate: 100/s + bucket_size: 100 + +- name: workers-1 + rate: 100/s + bucket_size: 100 + +- name: workers-2 + rate: 100/s + bucket_size: 100 + +- name: workers-3 + rate: 100/s + bucket_size: 100 + # PULL QUEUES - name: default-pull mode: pull From 49737135bf9e6473e308a19afd1be2ef6b351729 Mon Sep 17 00:00:00 2001 From: Tyler Treat Date: Wed, 7 Aug 2013 09:56:42 -0500 Subject: [PATCH 08/19] Remove optimal queue group selection. Selecting an optimal queue from a group via the taskqueue API is looking like it will add a lot of code, so for now let's keep things lightweight and only use random queue selection. 
--- furious/async.py | 94 +++---------------- furious/batcher.py | 3 +- furious/tests/test_async.py | 69 +------------- furious/tests/test_batcher.py | 3 +- .../tests/test_stubs/appengine/test_queues.py | 6 +- 5 files changed, 23 insertions(+), 152 deletions(-) diff --git a/furious/async.py b/furious/async.py index b59aec1..bb92631 100644 --- a/furious/async.py +++ b/furious/async.py @@ -264,9 +264,7 @@ def get_queue(self): queue_group = self._options.get('queue_group') if queue_group: queue_count = self._options.get('queue_count', 1) - random = self._options.get('random', True) - return select_queue(queue_group, queue_count=queue_count, - random=random) + return select_queue(queue_group, queue_count=queue_count) return self._options.get('queue', ASYNC_DEFAULT_QUEUE) @@ -457,12 +455,12 @@ def wrapper(*args, **kwargs): return real_decorator -def select_queue(queue_group, queue_count=1, random=True, - default=ASYNC_DEFAULT_QUEUE): +def select_queue(queue_group, queue_count=1, default=ASYNC_DEFAULT_QUEUE): """Select an optimal queue to run a task in from the given queue group. By - default, this simply randomly selects a queue from the group, otherwise it - leverages the taskqueue API to try and determine the best queue to use. The - queue_count kwarg indicates the number of queues allocated to the group. + default, this simply randomly selects a queue from the group. + + TODO: leverage the taskqueue API to try and determine the best queue to + use via a kwarg. 
""" if not queue_group: @@ -474,83 +472,21 @@ def select_queue(queue_group, queue_count=1, random=True, if queue_count == 1: return '%s-0' % queue_group - if random: - lists = _get_from_cache('queues', default={}) - group_queues = lists.setdefault(queue_group, []) - - if len(group_queues) == 0: - from random import shuffle - - group_queues.extend('%s-%d' % (queue_group, i) - for i in xrange(queue_count)) - - shuffle(group_queues) - - return group_queues.pop() - - return _calculate_optimal_queue(queue_group, queue_count) - - -def _calculate_optimal_queue(queue_group, queue_count): - """Determine an approximate optimal queue from the given queue group. The - queue_count argument indicates the number of queues allocated to the group. - """ - - from google.appengine.api import memcache - from google.appengine.api import taskqueue - - STATS_NAMESPACE = 'queuestats' - - queues = ['%s-%d' % (queue_group, i) for i in xrange(queue_count)] - queue_dict = memcache.get_multi(queues, namespace=STATS_NAMESPACE) - - if len(queue_dict) != len(queues): - # Fetch the QueueStatistics and cache them for 30 seconds. - queue_stats = taskqueue.QueueStatistics.fetch(queues) - stats_and_counts = [(stats, 0) for stats in queue_stats] - queue_dict = dict(zip(queues, stats_and_counts)) - memcache.set_multi(queue_dict, namespace=STATS_NAMESPACE, time=30) - - # Apply ranks to the queues. - ranks = [(queue, _calculate_queue_rank(*queue_dict.get(queue))) - for queue in queues] - - # Sort on rank and return the best one (lowest rank score). - ranks.sort(key=lambda tup: tup[1]) - - # The first value of the tuple is the queue name. - queue = ranks[0][0] - - stats, assigned = queue_dict.get(queue) - assigned += 1 - memcache.set(queue, (stats, assigned), namespace=STATS_NAMESPACE, time=30) - - return queue - - -def _calculate_queue_rank(queue_stats, assigned): - """Calculate a ranking for the given QueueStatistics object such that the - lower the rank, the more "optimal" the queue is. 
- """ + queue_lists = _get_from_cache('queues', default={}) + group_queues = queue_lists.setdefault(queue_group, []) - if not queue_stats: - return 0 + if len(group_queues) == 0: + from random import shuffle - executed_last_minute = queue_stats.executed_last_minute - if not executed_last_minute: - executed_last_minute = 0 + group_queues.extend('%s-%d' % (queue_group, i) + for i in xrange(queue_count)) - # TODO: This is just a rudimentary ranking formula - may want to revisit. - # Also, it's ineffective when adding tasks to a context because this is - # called for every task before any are inserted, so a queue will get the - # same rank even if it has had tasks added. - rank = (queue_stats.tasks + assigned) * TASK_WEIGHT - \ - executed_last_minute * EXECUTED_WEIGHT + shuffle(group_queues) - print '%s: %d' % (queue_stats.queue.name, rank) + return group_queues.pop() -# Use a thread local cache to optimize performance in select_queue. +# Use a thread local cache to optimize performance and queue distribution. 
from threading import local _cache = local() diff --git a/furious/batcher.py b/furious/batcher.py index bef41db..8ab850f 100644 --- a/furious/batcher.py +++ b/furious/batcher.py @@ -49,9 +49,8 @@ def get_queue(self): queue_group = self._options.get('queue_group') if queue_group: queue_count = self._options.get('queue_count', 1) - random = self._options.get('random', True) return select_queue(queue_group, queue_count=queue_count, - random=random, default=MESSAGE_DEFAULT_QUEUE) + default=MESSAGE_DEFAULT_QUEUE) return self._options.get('queue', MESSAGE_DEFAULT_QUEUE) diff --git a/furious/tests/test_async.py b/furious/tests/test_async.py index 148bab2..923c1a4 100644 --- a/furious/tests/test_async.py +++ b/furious/tests/test_async.py @@ -339,11 +339,9 @@ def test_get_queue_group_queue(self, mock_select_queue): queue_group = 'foo-queue' queue_count = 5 - random = False job = Async('nonexistant', queue_group=queue_group, - queue_count=queue_count, random=random, - default=ASYNC_DEFAULT_QUEUE) + queue_count=queue_count, default=ASYNC_DEFAULT_QUEUE) expected = '%s-2' % queue_group mock_select_queue.return_value = expected @@ -883,8 +881,7 @@ def setUp(self): from threading import local from furious import async - async._queue_group_cache = local() - async._queue_group_cache.lists = {} + async._cache = local() def test_none(self): """Ensure that if the queue group is None, the default queue is @@ -936,65 +933,3 @@ def test_random(self, mock_shuffle): # TODO: assert call args self.assertTrue(mock_shuffle.called) - @patch('furious.async._calculate_optimal_queue') - def test_optimal_selection(self, mock_optimal_queue): - """Ensure that select_queue calls through to _calculate_optimal_queue - when random=False. 
- """ - from furious.async import select_queue - - queue_group = 'foo-queue' - queue_count = 5 - expected = '%s-0' % queue_group - mock_optimal_queue.return_value = expected - - actual = select_queue(queue_group, random=False, - queue_count=queue_count) - - self.assertEqual(actual, expected) - mock_optimal_queue.assert_called_once_with(queue_group, queue_count) - - @patch('furious.async._calculate_queue_rank') - @patch('google.appengine.api.taskqueue.QueueStatistics.fetch') - def test_calculate_optimal_queue(self, mock_fetch, mock_queue_rank): - """Ensure _calculate_optimal_queue returns the queue with the highest - rank. - """ - from furious.async import _calculate_optimal_queue - - queue_group = 'foo-queue' - queue_count = 5 - - mock_queues = [] - for i in range(queue_count): - mock_queue = Mock() - mock_queue.name = '%s-%d' % (queue_group, i) - mock_queues.append(mock_queue) - - mock_stats = [Mock(queue=mock_queue) for mock_queue in mock_queues] - mock_fetch.return_value = mock_stats - mock_queue_rank.side_effect = [2, 4, 3, 1, 0] - - actual = _calculate_optimal_queue(queue_group, queue_count) - - self.assertEqual(actual, '%s-4' % queue_group) - mock_calls = [call(mock_stat) for mock_stat in mock_stats] - self.assertEqual(mock_queue_rank.call_args_list, mock_calls) - - def test_calculate_queue_rank(self): - """Ensure _calculate_queue_rank returns the correct value.""" - from furious.async import EXECUTED_WEIGHT - from furious.async import TASK_WEIGHT - from furious.async import _calculate_queue_rank - - tasks = 5 - executed = 10 - queue_stats = Mock() - queue_stats.tasks = tasks - queue_stats.executed_last_minute = executed - - expected = TASK_WEIGHT * tasks - EXECUTED_WEIGHT * executed - - actual = _calculate_queue_rank(queue_stats) - - self.assertEqual(actual, expected) diff --git a/furious/tests/test_batcher.py b/furious/tests/test_batcher.py index fa31859..306d6ed 100644 --- a/furious/tests/test_batcher.py +++ b/furious/tests/test_batcher.py @@ -81,10 +81,9 
@@ def test_get_queue_group_queue(self, mock_select_queue): queue_group = 'foo-queue' queue_count = 5 - random = False message = Message(queue_group=queue_group, queue_count=queue_count, - random=random, default=MESSAGE_DEFAULT_QUEUE) + default=MESSAGE_DEFAULT_QUEUE) expected = '%s-2' % queue_group mock_select_queue.return_value = expected diff --git a/furious/tests/test_stubs/appengine/test_queues.py b/furious/tests/test_stubs/appengine/test_queues.py index 4c0c3c4..de6f685 100644 --- a/furious/tests/test_stubs/appengine/test_queues.py +++ b/furious/tests/test_stubs/appengine/test_queues.py @@ -469,7 +469,8 @@ def test_get_push_queue_names(self): names = get_push_queue_names(self.taskqueue_service) - self.assertEqual(names, ['default']) + self.assertEqual(names, ['default', 'workers-0', 'workers-1', + 'workers-2', 'workers-3']) def test_get_queue_names(self): """Ensure the correct queue names are returned from get_queue_names.""" @@ -478,7 +479,8 @@ def test_get_queue_names(self): names = get_queue_names(self.taskqueue_service) - self.assertEqual(names, ['default', 'default-pull']) + self.assertEqual(names, ['default', 'default-pull', 'workers-0', + 'workers-1', 'workers-2', 'workers-3']) @attr('slow') From 2c7febe470d0c07dfcf54c6b3d9eb84565a1ba2c Mon Sep 17 00:00:00 2001 From: Tyler Treat Date: Wed, 7 Aug 2013 10:30:00 -0500 Subject: [PATCH 09/19] Update queue group demo. 
--- example/queue_group_intro.py | 21 ++++++--------------- 1 file changed, 6 insertions(+), 15 deletions(-) diff --git a/example/queue_group_intro.py b/example/queue_group_intro.py index 973e639..2409c9d 100644 --- a/example/queue_group_intro.py +++ b/example/queue_group_intro.py @@ -27,21 +27,14 @@ class QueueGroupIntroHandler(webapp2.RequestHandler): """Demonstrate the creation and insertion of 500 furious tasks to be run - in a queue group with optimal queue selection enabled.""" + in a queue group with random distribution.""" def get(self): - from furious.async import Async + from furious import context - random = self.request.get('random') - if random and random == '1': - random = True - else: - random = False - - for i in xrange(500): - async_task = Async(target=example_function, args=[i], - queue_group='workers', queue_count=4, - random=random) - async_task.start() + with context.new() as ctx: + for i in xrange(500): + ctx.add(target=example_function, args=[i], + queue_group='workers', queue_count=4) logging.info('500 Async jobs inserted.') @@ -52,6 +45,4 @@ def example_function(*args, **kwargs): """This function is called by furious tasks to demonstrate usage.""" logging.info('example_function executed with args: %r, kwargs: %r', args, kwargs) - import time - time.sleep(2) return args From 3d6f12a72d442fc694a5c8f95e141aab8eb1d85a Mon Sep 17 00:00:00 2001 From: Tyler Treat Date: Wed, 7 Aug 2013 10:32:37 -0500 Subject: [PATCH 10/19] Remove unused queue weight constants. 
--- furious/async.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/furious/async.py b/furious/async.py index bb92631..6f42751 100644 --- a/furious/async.py +++ b/furious/async.py @@ -88,8 +88,6 @@ def run_me(*args, **kwargs): MAX_DEPTH = 100 MAX_RESTARTS = 10 DISABLE_RECURSION_CHECK = -1 -TASK_WEIGHT = 0.6 -EXECUTED_WEIGHT = 0.4 DEFAULT_RETRY_OPTIONS = { 'task_retry_limit': MAX_RESTARTS From ed97eef41012247ab7d28db4ac13937a441bbf7e Mon Sep 17 00:00:00 2001 From: Tyler Treat Date: Wed, 7 Aug 2013 10:58:21 -0500 Subject: [PATCH 11/19] Add unit tests for thread local cache. --- furious/tests/test_async.py | 72 ++++++++++++++++++++++++++++++++++--- 1 file changed, 67 insertions(+), 5 deletions(-) diff --git a/furious/tests/test_async.py b/furious/tests/test_async.py index 923c1a4..00d5716 100644 --- a/furious/tests/test_async.py +++ b/furious/tests/test_async.py @@ -918,18 +918,80 @@ def test_single_queue(self): self.assertEqual(actual, expected) - @patch('random.shuffle') - def test_random(self, mock_shuffle): - """Ensure that a random queue is selected from the group.""" + @patch('furious.async._get_from_cache') + def test_random_from_cache(self, mock_cache): + """Ensure that a random queue is selected from the group when there are + cached queues. + """ from furious.async import select_queue queue_group = 'foo-queue' queue_count = 5 + expected = '%s-4' % queue_group + + mock_cache.return_value = {queue_group: [expected]} actual = select_queue(queue_group, queue_count=queue_count) - self.assertEqual(actual, 'foo-queue-4') + self.assertEqual(actual, expected) + mock_cache.assert_called_once_with('queues', default={}) + + @patch('random.shuffle') + @patch('furious.async._get_from_cache') + def test_random_no_cache(self, mock_cache, mock_shuffle): + """Ensure that a random queue is selected from the group when there are + no cached queues. 
+ """ + from furious.async import select_queue + + queue_group = 'foo-queue' + queue_count = 5 + expected = '%s-4' % queue_group + + mock_cache.return_value = {} - # TODO: assert call args + actual = select_queue(queue_group, queue_count=queue_count) + + self.assertEqual(actual, expected) + mock_cache.assert_called_once_with('queues', default={}) self.assertTrue(mock_shuffle.called) + def test_get_from_cache_no_key(self): + """Ensure the default value is returned when the key is None.""" + from furious.async import _get_from_cache + + key = None + expected = 'default' + + actual = _get_from_cache(key, default=expected) + + self.assertEqual(actual, expected) + + def test_get_from_cache_present(self): + """Ensure the correct value is returned when the key is present.""" + from furious.async import _cache + from furious.async import _get_from_cache + + key = 'key' + expected = 'value' + setattr(_cache, key, expected) + + actual = _get_from_cache(key) + + self.assertEqual(actual, expected) + + def test_get_from_cache_not_present(self): + """Ensure the default value is returned and set when the key is not + present. + """ + from furious.async import _cache + from furious.async import _get_from_cache + + key = 'key' + expected = 'default' + + actual = _get_from_cache(key, default=expected) + + self.assertEqual(actual, expected) + self.assertTrue(hasattr(_cache, key)) + From 817601c8668006a7033347497127c7b98afec0e9 Mon Sep 17 00:00:00 2001 From: Tyler Treat Date: Wed, 7 Aug 2013 11:14:47 -0500 Subject: [PATCH 12/19] Remove unneeded check for when queue group has 1 queue. There's no reason to have this extra check since it will be handled without issue. 
--- furious/async.py | 3 --- furious/tests/test_async.py | 13 ------------- 2 files changed, 16 deletions(-) diff --git a/furious/async.py b/furious/async.py index 6f42751..8112a77 100644 --- a/furious/async.py +++ b/furious/async.py @@ -467,9 +467,6 @@ def select_queue(queue_group, queue_count=1, default=ASYNC_DEFAULT_QUEUE): if queue_count <= 0: raise Exception('Queue group must have at least 1 queue.') - if queue_count == 1: - return '%s-0' % queue_group - queue_lists = _get_from_cache('queues', default={}) group_queues = queue_lists.setdefault(queue_group, []) diff --git a/furious/tests/test_async.py b/furious/tests/test_async.py index 00d5716..a83d6df 100644 --- a/furious/tests/test_async.py +++ b/furious/tests/test_async.py @@ -905,19 +905,6 @@ def test_invalid_queue_count(self): self.assertEqual(context.exception.message, 'Queue group must have at least 1 queue.') - def test_single_queue(self): - """Ensure that if the queue group passed in has a single queue, that - queue is returned. - """ - from furious.async import select_queue - - queue_group = 'foo-queue' - expected = '%s-0' % queue_group - - actual = select_queue(queue_group) - - self.assertEqual(actual, expected) - @patch('furious.async._get_from_cache') def test_random_from_cache(self, mock_cache): """Ensure that a random queue is selected from the group when there are From 3a6f853d1003e1db84c9c088bfe3dffdcde8b250 Mon Sep 17 00:00:00 2001 From: Tyler Treat Date: Wed, 7 Aug 2013 14:35:20 -0500 Subject: [PATCH 13/19] Use not check instead of size check on list. 
--- furious/async.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/furious/async.py b/furious/async.py index 8112a77..354d91d 100644 --- a/furious/async.py +++ b/furious/async.py @@ -470,7 +470,7 @@ def select_queue(queue_group, queue_count=1, default=ASYNC_DEFAULT_QUEUE): queue_lists = _get_from_cache('queues', default={}) group_queues = queue_lists.setdefault(queue_group, []) - if len(group_queues) == 0: + if not group_queues: from random import shuffle group_queues.extend('%s-%d' % (queue_group, i) From e32b144b247889c379356a76921e582ad5b43ce8 Mon Sep 17 00:00:00 2001 From: Tyler Treat Date: Tue, 20 Aug 2013 11:18:44 -0500 Subject: [PATCH 14/19] Move queue group code into new queues module The queues module will be used for any queue selection logic and dealing with queue groups. --- furious/async.py | 52 +-------------- furious/batcher.py | 5 +- furious/queues.py | 66 ++++++++++++++++++ furious/tests/test_async.py | 111 ------------------------------ furious/tests/test_queues.py | 126 +++++++++++++++++++++++++++++++++++ 5 files changed, 196 insertions(+), 164 deletions(-) create mode 100644 furious/queues.py create mode 100644 furious/tests/test_queues.py diff --git a/furious/async.py b/furious/async.py index 354d91d..f2eec40 100644 --- a/furious/async.py +++ b/furious/async.py @@ -77,6 +77,8 @@ def run_me(*args, **kwargs): from .job_utils import path_to_reference from .job_utils import reference_to_path +from .queues import select_queue + from . import errors @@ -453,56 +455,6 @@ def wrapper(*args, **kwargs): return real_decorator -def select_queue(queue_group, queue_count=1, default=ASYNC_DEFAULT_QUEUE): - """Select an optimal queue to run a task in from the given queue group. By - default, this simply randomly selects a queue from the group. - - TODO: leverage the taskqueue API to try and determine the best queue to - use via a kwarg. 
- """ - - if not queue_group: - return default - - if queue_count <= 0: - raise Exception('Queue group must have at least 1 queue.') - - queue_lists = _get_from_cache('queues', default={}) - group_queues = queue_lists.setdefault(queue_group, []) - - if not group_queues: - from random import shuffle - - group_queues.extend('%s-%d' % (queue_group, i) - for i in xrange(queue_count)) - - shuffle(group_queues) - - return group_queues.pop() - - -# Use a thread local cache to optimize performance and queue distribution. -from threading import local -_cache = local() - - -def _get_from_cache(key, default=None): - """Fetch the value for the given key from the thread local cache. If the - key does not exist, set it for the given default value and return the - default. - """ - - if not key: - return default - - if hasattr(_cache, key): - return getattr(_cache, key) - - setattr(_cache, key, default) - - return default - - def _check_options(options): """Make sure no one passes something not allowed in.""" if not options: diff --git a/furious/batcher.py b/furious/batcher.py index 8ab850f..a5df1a1 100644 --- a/furious/batcher.py +++ b/furious/batcher.py @@ -21,7 +21,7 @@ from google.appengine.api import memcache from .async import Async -from .async import select_queue +from .queues import select_queue MESSAGE_DEFAULT_QUEUE = 'default-pull' MESSAGE_PROCESSOR_NAME = 'processor' @@ -49,8 +49,7 @@ def get_queue(self): queue_group = self._options.get('queue_group') if queue_group: queue_count = self._options.get('queue_count', 1) - return select_queue(queue_group, queue_count=queue_count, - default=MESSAGE_DEFAULT_QUEUE) + return select_queue(queue_group, queue_count=queue_count) return self._options.get('queue', MESSAGE_DEFAULT_QUEUE) diff --git a/furious/queues.py b/furious/queues.py new file mode 100644 index 0000000..678f80f --- /dev/null +++ b/furious/queues.py @@ -0,0 +1,66 @@ +# +# Copyright 2012 WebFilings, LLC +# +# Licensed under the Apache License, Version 2.0 (the 
"License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from random import shuffle +from threading import local + + +def select_queue(queue_group, queue_count=1): + """Select an optimal queue to run a task in from the given queue group. By + default, this simply randomly selects a queue from the group. + + TODO: leverage the taskqueue API to try and determine the best queue to + use via a kwarg. + """ + + if not queue_group: + return None + + if queue_count <= 0: + raise Exception('Queue group must have at least 1 queue.') + + queue_lists = _get_from_cache('queues', default={}) + group_queues = queue_lists.setdefault(queue_group, []) + + if not group_queues: + group_queues.extend('%s-%d' % (queue_group, i) + for i in xrange(queue_count)) + + shuffle(group_queues) + + return group_queues.pop() + + +# Use a thread local cache to optimize performance and queue distribution. +_cache = local() + + +def _get_from_cache(key, default=None): + """Fetch the value for the given key from the thread local cache. If the + key does not exist, set it for the given default value and return the + default. 
+ """ + + if not key: + return default + + if hasattr(_cache, key): + return getattr(_cache, key) + + setattr(_cache, key, default) + + return default + diff --git a/furious/tests/test_async.py b/furious/tests/test_async.py index a83d6df..f969373 100644 --- a/furious/tests/test_async.py +++ b/furious/tests/test_async.py @@ -18,8 +18,6 @@ import unittest -from mock import call -from mock import Mock from mock import patch @@ -873,112 +871,3 @@ def test_has_type(self): self.assertIsInstance(result, MessageProcessor) - -class TestSelectQueue(unittest.TestCase): - """Ensure select_queue() works correctly.""" - - def setUp(self): - from threading import local - from furious import async - - async._cache = local() - - def test_none(self): - """Ensure that if the queue group is None, the default queue is - returned. - """ - from furious.async import ASYNC_DEFAULT_QUEUE - from furious.async import select_queue - - actual = select_queue(None) - - self.assertEqual(actual, ASYNC_DEFAULT_QUEUE) - - def test_invalid_queue_count(self): - """Ensure that an exception is raised when a bad queue count is given. - """ - from furious.async import select_queue - - with self.assertRaises(Exception) as context: - select_queue('foo-queue', queue_count=0) - - self.assertEqual(context.exception.message, - 'Queue group must have at least 1 queue.') - - @patch('furious.async._get_from_cache') - def test_random_from_cache(self, mock_cache): - """Ensure that a random queue is selected from the group when there are - cached queues. 
- """ - from furious.async import select_queue - - queue_group = 'foo-queue' - queue_count = 5 - expected = '%s-4' % queue_group - - mock_cache.return_value = {queue_group: [expected]} - - actual = select_queue(queue_group, queue_count=queue_count) - - self.assertEqual(actual, expected) - mock_cache.assert_called_once_with('queues', default={}) - - @patch('random.shuffle') - @patch('furious.async._get_from_cache') - def test_random_no_cache(self, mock_cache, mock_shuffle): - """Ensure that a random queue is selected from the group when there are - no cached queues. - """ - from furious.async import select_queue - - queue_group = 'foo-queue' - queue_count = 5 - expected = '%s-4' % queue_group - - mock_cache.return_value = {} - - actual = select_queue(queue_group, queue_count=queue_count) - - self.assertEqual(actual, expected) - mock_cache.assert_called_once_with('queues', default={}) - self.assertTrue(mock_shuffle.called) - - def test_get_from_cache_no_key(self): - """Ensure the default value is returned when the key is None.""" - from furious.async import _get_from_cache - - key = None - expected = 'default' - - actual = _get_from_cache(key, default=expected) - - self.assertEqual(actual, expected) - - def test_get_from_cache_present(self): - """Ensure the correct value is returned when the key is present.""" - from furious.async import _cache - from furious.async import _get_from_cache - - key = 'key' - expected = 'value' - setattr(_cache, key, expected) - - actual = _get_from_cache(key) - - self.assertEqual(actual, expected) - - def test_get_from_cache_not_present(self): - """Ensure the default value is returned and set when the key is not - present. 
- """ - from furious.async import _cache - from furious.async import _get_from_cache - - key = 'key' - expected = 'default' - - actual = _get_from_cache(key, default=expected) - - self.assertEqual(actual, expected) - self.assertTrue(hasattr(_cache, key)) - diff --git a/furious/tests/test_queues.py b/furious/tests/test_queues.py new file mode 100644 index 0000000..0804611 --- /dev/null +++ b/furious/tests/test_queues.py @@ -0,0 +1,126 @@ +# +# Copyright 2012 WebFilings, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import unittest + +from mock import patch + + +class TestSelectQueue(unittest.TestCase): + """Ensure select_queue() works correctly.""" + + def setUp(self): + from threading import local + from furious import async + + async._cache = local() + + def test_none(self): + """Ensure that None is returned when the queue group is None.""" + from furious.async import select_queue + + actual = select_queue(None) + + self.assertEqual(actual, None) + + def test_invalid_queue_count(self): + """Ensure that an exception is raised when a bad queue count is given. 
+ """ + from furious.async import select_queue + + with self.assertRaises(Exception) as context: + select_queue('foo-queue', queue_count=0) + + self.assertEqual(context.exception.message, + 'Queue group must have at least 1 queue.') + + @patch('furious.queues._get_from_cache') + def test_random_from_cache(self, mock_cache): + """Ensure that a random queue is selected from the group when there are + cached queues. + """ + from furious.async import select_queue + + queue_group = 'foo-queue' + queue_count = 5 + expected = '%s-4' % queue_group + + mock_cache.return_value = {queue_group: [expected]} + + actual = select_queue(queue_group, queue_count=queue_count) + + self.assertEqual(actual, expected) + mock_cache.assert_called_once_with('queues', default={}) + + @patch('furious.queues.shuffle') + @patch('furious.queues._get_from_cache') + def test_random_no_cache(self, mock_cache, mock_shuffle): + """Ensure that a random queue is selected from the group when there are + no cached queues. + """ + from furious.async import select_queue + + queue_group = 'foo-queue' + queue_count = 5 + expected = '%s-4' % queue_group + + mock_cache.return_value = {} + + actual = select_queue(queue_group, queue_count=queue_count) + + self.assertEqual(actual, expected) + mock_cache.assert_called_once_with('queues', default={}) + self.assertTrue(mock_shuffle.called) + + def test_get_from_cache_no_key(self): + """Ensure the default value is returned when the key is None.""" + from furious.queues import _get_from_cache + + key = None + expected = 'default' + + actual = _get_from_cache(key, default=expected) + + self.assertEqual(actual, expected) + + def test_get_from_cache_present(self): + """Ensure the correct value is returned when the key is present.""" + from furious.queues import _cache + from furious.queues import _get_from_cache + + key = 'key' + expected = 'value' + setattr(_cache, key, expected) + + actual = _get_from_cache(key) + + self.assertEqual(actual, expected) + + def 
test_get_from_cache_not_present(self): + """Ensure the default value is returned and set when the key is not + present. + """ + from furious.queues import _cache + from furious.queues import _get_from_cache + + key = 'key' + expected = 'default' + + actual = _get_from_cache(key, default=expected) + + self.assertEqual(actual, expected) + self.assertTrue(hasattr(_cache, key)) + From e01f9ef3b246544e2b856531582e52a8a616c3cd Mon Sep 17 00:00:00 2001 From: Tyler Treat Date: Tue, 20 Aug 2013 12:30:13 -0500 Subject: [PATCH 15/19] Move queue counts to config The queue counts indicate the number of queues a queue group has. Rather than passing this information into select_queue, we will make it configurable in furious.yaml. --- furious/async.py | 3 +-- furious/batcher.py | 3 +-- furious/config.py | 3 ++- furious/queues.py | 9 ++++++++- furious/tests/test_config.py | 7 +++++-- furious/tests/test_queues.py | 26 ++++++++++++++++++++------ 6 files changed, 37 insertions(+), 14 deletions(-) diff --git a/furious/async.py b/furious/async.py index f2eec40..163e7ea 100644 --- a/furious/async.py +++ b/furious/async.py @@ -263,8 +263,7 @@ def get_queue(self): """Return the queue the task should run in.""" queue_group = self._options.get('queue_group') if queue_group: - queue_count = self._options.get('queue_count', 1) - return select_queue(queue_group, queue_count=queue_count) + return select_queue(queue_group) return self._options.get('queue', ASYNC_DEFAULT_QUEUE) diff --git a/furious/batcher.py b/furious/batcher.py index a5df1a1..27b2d80 100644 --- a/furious/batcher.py +++ b/furious/batcher.py @@ -48,8 +48,7 @@ def get_queue(self): """Return the queue the task should run in.""" queue_group = self._options.get('queue_group') if queue_group: - queue_count = self._options.get('queue_count', 1) - return select_queue(queue_group, queue_count=queue_count) + return select_queue(queue_group) return self._options.get('queue', MESSAGE_DEFAULT_QUEUE) diff --git a/furious/config.py
b/furious/config.py index 476876e..818bfca 100644 --- a/furious/config.py +++ b/furious/config.py @@ -131,7 +131,8 @@ def default_config(): return {'secret_key': '931b8-i-f44330b4a5-am-3b9b733f-not-secure-043e96882', 'persistence': 'ndb', - 'task_system': 'appengine_taskqueue'} + 'task_system': 'appengine_taskqueue', + 'queue_counts': {}} def _load_yaml_config(path=None): diff --git a/furious/queues.py b/furious/queues.py index 678f80f..9f3cb1a 100644 --- a/furious/queues.py +++ b/furious/queues.py @@ -17,8 +17,12 @@ from random import shuffle from threading import local +from furious.config import get_config -def select_queue(queue_group, queue_count=1): +QUEUE_COUNTS = 'queue_counts' + + +def select_queue(queue_group): """Select an optimal queue to run a task in from the given queue group. By default, this simply randomly selects a queue from the group. @@ -29,6 +33,9 @@ def select_queue(queue_group, queue_count=1): if not queue_group: return None + queue_counts = get_config().get(QUEUE_COUNTS) + queue_count = queue_counts.get(queue_group, 0) + if queue_count <= 0: raise Exception('Queue group must have at least 1 queue.') diff --git a/furious/tests/test_config.py b/furious/tests/test_config.py index 45bf924..b3597fc 100644 --- a/furious/tests/test_config.py +++ b/furious/tests/test_config.py @@ -62,13 +62,16 @@ def test_get_config(self): example_yaml = str('secret_key: "blah"\n' 'persistence: bubble\n' - 'task_system: flah\n') + 'task_system: flah\n' + 'queue_counts:\n' + ' queue-group: 5') my_config = _parse_yaml_config(example_yaml) self.assertEqual(my_config, {'secret_key': 'blah', 'persistence': 'bubble', - 'task_system': 'flah'}) + 'task_system': 'flah', + 'queue_counts': {'queue-group': 5}}) def test_get_configured_persistence_exists(self): """Ensure a chosen persistence module is selected.""" diff --git a/furious/tests/test_queues.py b/furious/tests/test_queues.py index 0804611..9b5aad3 100644 --- a/furious/tests/test_queues.py +++ 
b/furious/tests/test_queues.py @@ -36,19 +36,26 @@ def test_none(self): self.assertEqual(actual, None) - def test_invalid_queue_count(self): + @patch('furious.queues.get_config') + def test_invalid_queue_count(self, mock_get_config): """Ensure that an exception is raised when a bad queue count is given. """ from furious.async import select_queue + queue_group = 'foo-queue' + + mock_get_config.return_value = {'queue_counts': + {queue_group: 0}} + with self.assertRaises(Exception) as context: - select_queue('foo-queue', queue_count=0) + select_queue(queue_group) self.assertEqual(context.exception.message, 'Queue group must have at least 1 queue.') + @patch('furious.queues.get_config') @patch('furious.queues._get_from_cache') - def test_random_from_cache(self, mock_cache): + def test_random_from_cache(self, mock_cache, mock_get_config): """Ensure that a random queue is selected from the group when there are cached queues. """ @@ -58,16 +65,20 @@ def test_random_from_cache(self, mock_cache): queue_count = 5 expected = '%s-4' % queue_group + mock_get_config.return_value = {'queue_counts': + {queue_group: queue_count}} + mock_cache.return_value = {queue_group: [expected]} - actual = select_queue(queue_group, queue_count=queue_count) + actual = select_queue(queue_group) self.assertEqual(actual, expected) mock_cache.assert_called_once_with('queues', default={}) + @patch('furious.queues.get_config') @patch('furious.queues.shuffle') @patch('furious.queues._get_from_cache') - def test_random_no_cache(self, mock_cache, mock_shuffle): + def test_random_no_cache(self, mock_cache, mock_shuffle, mock_get_config): """Ensure that a random queue is selected from the group when there are no cached queues. 
""" @@ -77,9 +88,12 @@ def test_random_no_cache(self, mock_cache, mock_shuffle): queue_count = 5 expected = '%s-4' % queue_group + mock_get_config.return_value = {'queue_counts': + {queue_group: queue_count}} + mock_cache.return_value = {} - actual = select_queue(queue_group, queue_count=queue_count) + actual = select_queue(queue_group) self.assertEqual(actual, expected) mock_cache.assert_called_once_with('queues', default={}) From f643555d8ab4df496265013d44ab72c9d199dab3 Mon Sep 17 00:00:00 2001 From: Tyler Treat Date: Tue, 20 Aug 2013 13:35:29 -0500 Subject: [PATCH 16/19] Correct queue group intro example We are no longer passing in queue counts. --- example/queue_group_intro.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/example/queue_group_intro.py b/example/queue_group_intro.py index 2409c9d..f2194a9 100644 --- a/example/queue_group_intro.py +++ b/example/queue_group_intro.py @@ -34,7 +34,7 @@ def get(self): with context.new() as ctx: for i in xrange(500): ctx.add(target=example_function, args=[i], - queue_group='workers', queue_count=4) + queue_group='workers') logging.info('500 Async jobs inserted.') From e853d67e30c382dcdf0bab158e415f1ea3978822 Mon Sep 17 00:00:00 2001 From: Tyler Treat Date: Mon, 26 Aug 2013 16:05:18 -0500 Subject: [PATCH 17/19] Check to ensure queue_counts property is a dict This adds an additional check in select_queue to ensure the configured queue_counts property is a dictionary. --- furious/queues.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/furious/queues.py b/furious/queues.py index 9f3cb1a..31ba77a 100644 --- a/furious/queues.py +++ b/furious/queues.py @@ -34,9 +34,12 @@ def select_queue(queue_group): return None queue_counts = get_config().get(QUEUE_COUNTS) + if not isinstance(queue_counts, dict): + raise Exception('%s config property must be a dict.' 
% QUEUE_COUNTS) + queue_count = queue_counts.get(queue_group, 0) - if queue_count <= 0: + if not isinstance(queue_count, int) or queue_count <= 0: raise Exception('Queue group must have at least 1 queue.') queue_lists = _get_from_cache('queues', default={}) From 45cda54a9d23267f8a276416e8ae290432da5e72 Mon Sep 17 00:00:00 2001 From: Tyler Treat Date: Mon, 26 Aug 2013 16:06:31 -0500 Subject: [PATCH 18/19] Use local context instead of creating a new thread local Piggyback off of _local.get_local_context() to store queue group caching instead of creating a separate thread local variable. --- furious/queues.py | 16 +++++++--------- furious/tests/test_queues.py | 37 +++++++++++++++++++++++++++--------- 2 files changed, 35 insertions(+), 18 deletions(-) diff --git a/furious/queues.py b/furious/queues.py index 31ba77a..8381e6c 100644 --- a/furious/queues.py +++ b/furious/queues.py @@ -15,7 +15,6 @@ # from random import shuffle -from threading import local from furious.config import get_config @@ -54,23 +53,22 @@ def select_queue(queue_group): return group_queues.pop() -# Use a thread local cache to optimize performance and queue distribution. -_cache = local() - - def _get_from_cache(key, default=None): - """Fetch the value for the given key from the thread local cache. If the + """Fetch the value for the given key from the thread local context. If the key does not exist, set it for the given default value and return the default. 
""" + from furious.context._local import get_local_context if not key: return default - if hasattr(_cache, key): - return getattr(_cache, key) + local_context = get_local_context() + + if hasattr(local_context, key): + return getattr(local_context, key) - setattr(_cache, key, default) + setattr(local_context, key, default) return default diff --git a/furious/tests/test_queues.py b/furious/tests/test_queues.py index 9b5aad3..f5bfaf2 100644 --- a/furious/tests/test_queues.py +++ b/furious/tests/test_queues.py @@ -18,15 +18,15 @@ from mock import patch +from nose.plugins.attrib import attr + class TestSelectQueue(unittest.TestCase): """Ensure select_queue() works correctly.""" - def setUp(self): - from threading import local - from furious import async - - async._cache = local() + def tearDown(self): + from furious.context import _local + _local._clear_context() def test_none(self): """Ensure that None is returned when the queue group is None.""" @@ -53,6 +53,23 @@ def test_invalid_queue_count(self, mock_get_config): self.assertEqual(context.exception.message, 'Queue group must have at least 1 queue.') + @patch('furious.queues.get_config') + def test_invalid_queue_count_config(self, mock_get_config): + """Ensure that an exception is raised when a bad queue count config is + provided. 
+ """ + from furious.async import select_queue + + queue_group = 'foo-queue' + + mock_get_config.return_value = {'queue_counts': 'woops'} + + with self.assertRaises(Exception) as context: + select_queue(queue_group) + + self.assertEqual(context.exception.message, + 'queue_counts config property must be a dict.') + @patch('furious.queues.get_config') @patch('furious.queues._get_from_cache') def test_random_from_cache(self, mock_cache, mock_get_config): @@ -110,24 +127,26 @@ def test_get_from_cache_no_key(self): self.assertEqual(actual, expected) + @attr('slow') def test_get_from_cache_present(self): """Ensure the correct value is returned when the key is present.""" - from furious.queues import _cache + from furious.context._local import get_local_context from furious.queues import _get_from_cache key = 'key' expected = 'value' - setattr(_cache, key, expected) + setattr(get_local_context(), key, expected) actual = _get_from_cache(key) self.assertEqual(actual, expected) + @attr('slow') def test_get_from_cache_not_present(self): """Ensure the default value is returned and set when the key is not present. 
""" - from furious.queues import _cache + from furious.context._local import get_local_context from furious.queues import _get_from_cache key = 'key' @@ -136,5 +155,5 @@ def test_get_from_cache_not_present(self): actual = _get_from_cache(key, default=expected) self.assertEqual(actual, expected) - self.assertTrue(hasattr(_cache, key)) + self.assertTrue(hasattr(get_local_context(), key)) From b445a68ee2905deff4b3419427b56add9ba59297 Mon Sep 17 00:00:00 2001 From: Tyler Treat Date: Mon, 26 Aug 2013 16:07:35 -0500 Subject: [PATCH 19/19] Add test case for when queue_counts is not specified --- furious/tests/test_config.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/furious/tests/test_config.py b/furious/tests/test_config.py index b3597fc..dc8f32c 100644 --- a/furious/tests/test_config.py +++ b/furious/tests/test_config.py @@ -73,6 +73,23 @@ def test_get_config(self): 'task_system': 'flah', 'queue_counts': {'queue-group': 5}}) + def test_get_config_no_queue_counts(self): + """Ensure a config contents without queue_counts specified produces the + expected dictionary. + """ + from furious.config import _parse_yaml_config + + example_yaml = str('secret_key: "blah"\n' + 'persistence: bubble\n' + 'task_system: flah') + + my_config = _parse_yaml_config(example_yaml) + + self.assertEqual(my_config, {'secret_key': 'blah', + 'persistence': 'bubble', + 'task_system': 'flah', + 'queue_counts': {}}) + def test_get_configured_persistence_exists(self): """Ensure a chosen persistence module is selected.""" from furious.config import _parse_yaml_config