diff --git a/example/__init__.py b/example/__init__.py index 1d35440..3415285 100644 --- a/example/__init__.py +++ b/example/__init__.py @@ -35,6 +35,7 @@ from .context_intro import ContextIntroHandler from .grep import GrepHandler from .simple_workflow import SimpleWorkflowHandler +from .queue_group_intro import QueueGroupIntroHandler config = { 'webapp2_extras.jinja2': { @@ -55,5 +56,6 @@ ('/batcher/run', BatcherHandler), ('/batcher/stats', BatcherStatsHandler), ('/grep', GrepHandler), + ('/queue_group', QueueGroupIntroHandler) ], config=config) diff --git a/example/queue_group_intro.py b/example/queue_group_intro.py new file mode 100644 index 0000000..f2194a9 --- /dev/null +++ b/example/queue_group_intro.py @@ -0,0 +1,48 @@ +# +# Copyright 2013 WebFilings, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""A very basic async example using a queue group. + +This will insert 500 async tasks to run the example_function in a queue +group. 
+""" + +import logging + +import webapp2 + + +class QueueGroupIntroHandler(webapp2.RequestHandler): + """Insert 500 furious tasks targeting example_function into the 'workers' + queue group on GET, then write a success message to the response.""" + def get(self): + from furious import context + + with context.new() as ctx: + for i in xrange(500): + ctx.add(target=example_function, args=[i], + queue_group='workers') + + logging.info('500 Async jobs inserted.') + + self.response.out.write('Successfully inserted 500 Async jobs.') + + +def example_function(*args, **kwargs): + """Task target for the demo: log args and kwargs, then return args.""" + logging.info('example_function executed with args: %r, kwargs: %r', + args, kwargs) + return args diff --git a/furious/async.py b/furious/async.py index 8cee01f..163e7ea 100644 --- a/furious/async.py +++ b/furious/async.py @@ -77,6 +77,8 @@ def run_me(*args, **kwargs): from .job_utils import path_to_reference from .job_utils import reference_to_path +from .queues import select_queue + from . 
import errors @@ -259,6 +261,10 @@ def get_headers(self): def get_queue(self): """Return the queue the task should run in.""" + queue_group = self._options.get('queue_group') + if queue_group: + return select_queue(queue_group) + return self._options.get('queue', ASYNC_DEFAULT_QUEUE) def get_task_args(self): diff --git a/furious/batcher.py b/furious/batcher.py index 8835af2..7942b38 100644 --- a/furious/batcher.py +++ b/furious/batcher.py @@ -22,6 +22,7 @@ from google.appengine.runtime.apiproxy_errors import DeadlineExceededError from .async import Async +from .queues import select_queue MESSAGE_DEFAULT_QUEUE = 'default-pull' MESSAGE_PROCESSOR_NAME = 'processor' @@ -46,6 +47,10 @@ def update_options(self, **options): def get_queue(self): """Return the queue the task should run in.""" + queue_group = self._options.get('queue_group') + if queue_group: + return select_queue(queue_group) + return self._options.get('queue', MESSAGE_DEFAULT_QUEUE) def get_task_args(self): diff --git a/furious/config.py b/furious/config.py index 476876e..818bfca 100644 --- a/furious/config.py +++ b/furious/config.py @@ -131,7 +131,8 @@ def default_config(): return {'secret_key': '931b8-i-f44330b4a5-am-3b9b733f-not-secure-043e96882', 'persistence': 'ndb', - 'task_system': 'appengine_taskqueue'} + 'task_system': 'appengine_taskqueue', + 'queue_counts': {}} def _load_yaml_config(path=None): diff --git a/furious/queues.py b/furious/queues.py new file mode 100644 index 0000000..8381e6c --- /dev/null +++ b/furious/queues.py @@ -0,0 +1,74 @@ +# +# Copyright 2012 WebFilings, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from random import shuffle + +from furious.config import get_config + +QUEUE_COUNTS = 'queue_counts' + + +def select_queue(queue_group): + """Select a random queue from the given queue group (None if no group). + Raises TypeError/ValueError when the queue_counts config is invalid. + + TODO: leverage the taskqueue API to try and determine the best queue to + use via a kwarg. + """ + + if not queue_group: + return None + + queue_counts = get_config().get(QUEUE_COUNTS) + if not isinstance(queue_counts, dict): + raise TypeError('%s config property must be a dict.' % QUEUE_COUNTS) + + queue_count = queue_counts.get(queue_group, 0) + + if not isinstance(queue_count, int) or queue_count <= 0: + raise ValueError('Queue group must have at least 1 queue.') + + queue_lists = _get_from_cache('queues', default={}) + group_queues = queue_lists.setdefault(queue_group, []) + + if not group_queues: + group_queues.extend('%s-%d' % (queue_group, i) + for i in xrange(queue_count)) + + shuffle(group_queues) + + return group_queues.pop() + + +def _get_from_cache(key, default=None): + """Fetch the value for the given key from the thread local context. If the + key does not exist, set it to the given default value and return the + default. 
+ """ + from furious.context._local import get_local_context + + if not key: + return default + + local_context = get_local_context() + + if hasattr(local_context, key): + return getattr(local_context, key) + + setattr(local_context, key, default) + + return default + diff --git a/furious/tests/test_async.py b/furious/tests/test_async.py index 036a82b..f969373 100644 --- a/furious/tests/test_async.py +++ b/furious/tests/test_async.py @@ -327,6 +327,25 @@ def test_get_default_queue(self): self.assertEqual('default', job.get_queue()) + @patch('furious.async.select_queue') + def test_get_queue_group_queue(self, mock_select_queue): + """Ensure get_queue calls through to select_queue when a queue group + is given. + """ + from furious.async import Async + from furious.async import ASYNC_DEFAULT_QUEUE + + queue_group = 'foo-queue' + queue_count = 5 + + job = Async('nonexistant', queue_group=queue_group, + queue_count=queue_count, default=ASYNC_DEFAULT_QUEUE) + + expected = '%s-2' % queue_group + mock_select_queue.return_value = expected + + self.assertEqual(expected, job.get_queue()) + def test_get_task_args(self): """Ensure get_task_args returns the job task_args.""" from furious.async import Async diff --git a/furious/tests/test_batcher.py b/furious/tests/test_batcher.py index 7185a38..c5084c4 100644 --- a/furious/tests/test_batcher.py +++ b/furious/tests/test_batcher.py @@ -71,6 +71,25 @@ def test_get_default_queue(self): self.assertEqual('default-pull', message.get_queue()) + @patch('furious.batcher.select_queue') + def test_get_queue_group_queue(self, mock_select_queue): + """Ensure get_queue calls through to select_queue when a queue group + is given. 
+ """ + from furious.batcher import Message + from furious.batcher import MESSAGE_DEFAULT_QUEUE + + queue_group = 'foo-queue' + queue_count = 5 + + message = Message(queue_group=queue_group, queue_count=queue_count, + default=MESSAGE_DEFAULT_QUEUE) + + expected = '%s-2' % queue_group + mock_select_queue.return_value = expected + + self.assertEqual(expected, message.get_queue()) + def test_get_task_args(self): """Ensure get_task_args returns the message task_args.""" from furious.batcher import Message diff --git a/furious/tests/test_config.py b/furious/tests/test_config.py index 45bf924..dc8f32c 100644 --- a/furious/tests/test_config.py +++ b/furious/tests/test_config.py @@ -62,13 +62,33 @@ def test_get_config(self): example_yaml = str('secret_key: "blah"\n' 'persistence: bubble\n' - 'task_system: flah\n') + 'task_system: flah\n' + 'queue_counts:\n' + ' queue-group: 5') + + my_config = _parse_yaml_config(example_yaml) + + self.assertEqual(my_config, {'secret_key': 'blah', + 'persistence': 'bubble', + 'task_system': 'flah', + 'queue_counts': {'queue-group': 5}}) + + def test_get_config_no_queue_counts(self): + """Ensure config contents without queue_counts yield the defaulted + queue_counts entry (an empty dict) alongside the given values. 
+ """ + from furious.config import _parse_yaml_config + + example_yaml = str('secret_key: "blah"\n' + 'persistence: bubble\n' + 'task_system: flah') my_config = _parse_yaml_config(example_yaml) self.assertEqual(my_config, {'secret_key': 'blah', 'persistence': 'bubble', - 'task_system': 'flah'}) + 'task_system': 'flah', + 'queue_counts': {}}) def test_get_configured_persistence_exists(self): """Ensure a chosen persistence module is selected.""" diff --git a/furious/tests/test_queues.py b/furious/tests/test_queues.py new file mode 100644 index 0000000..f5bfaf2 --- /dev/null +++ b/furious/tests/test_queues.py @@ -0,0 +1,159 @@ +# +# Copyright 2012 WebFilings, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import unittest + +from mock import patch + +from nose.plugins.attrib import attr + + +class TestSelectQueue(unittest.TestCase): + """Ensure select_queue() works correctly.""" + + def tearDown(self): + from furious.context import _local + _local._clear_context() + + def test_none(self): + """Ensure that None is returned when the queue group is None.""" + from furious.queues import select_queue + + actual = select_queue(None) + + self.assertEqual(actual, None) + + @patch('furious.queues.get_config') + def test_invalid_queue_count(self, mock_get_config): + """Ensure that an exception is raised when a bad queue count is given. 
+ """ + from furious.queues import select_queue + + queue_group = 'foo-queue' + + mock_get_config.return_value = {'queue_counts': + {queue_group: 0}} + + with self.assertRaises(Exception) as context: + select_queue(queue_group) + + self.assertEqual(context.exception.message, + 'Queue group must have at least 1 queue.') + + @patch('furious.queues.get_config') + def test_invalid_queue_count_config(self, mock_get_config): + """Ensure that an exception is raised when a bad queue count config is + provided. + """ + from furious.queues import select_queue + + queue_group = 'foo-queue' + + mock_get_config.return_value = {'queue_counts': 'woops'} + + with self.assertRaises(Exception) as context: + select_queue(queue_group) + + self.assertEqual(context.exception.message, + 'queue_counts config property must be a dict.') + + @patch('furious.queues.get_config') + @patch('furious.queues._get_from_cache') + def test_random_from_cache(self, mock_cache, mock_get_config): + """Ensure that a random queue is selected from the group when there are + cached queues. + """ + from furious.queues import select_queue + + queue_group = 'foo-queue' + queue_count = 5 + expected = '%s-4' % queue_group + + mock_get_config.return_value = {'queue_counts': + {queue_group: queue_count}} + + mock_cache.return_value = {queue_group: [expected]} + + actual = select_queue(queue_group) + + self.assertEqual(actual, expected) + mock_cache.assert_called_once_with('queues', default={}) + + @patch('furious.queues.get_config') + @patch('furious.queues.shuffle') + @patch('furious.queues._get_from_cache') + def test_random_no_cache(self, mock_cache, mock_shuffle, mock_get_config): + """Ensure that a random queue is selected from the group when there are + no cached queues. 
+ """ + from furious.queues import select_queue + + queue_group = 'foo-queue' + queue_count = 5 + expected = '%s-4' % queue_group + + mock_get_config.return_value = {'queue_counts': + {queue_group: queue_count}} + + mock_cache.return_value = {} + + actual = select_queue(queue_group) + + self.assertEqual(actual, expected) + mock_cache.assert_called_once_with('queues', default={}) + self.assertTrue(mock_shuffle.called) + + def test_get_from_cache_no_key(self): + """Ensure the default value is returned when the key is None.""" + from furious.queues import _get_from_cache + + key = None + expected = 'default' + + actual = _get_from_cache(key, default=expected) + + self.assertEqual(actual, expected) + + @attr('slow') + def test_get_from_cache_present(self): + """Ensure the correct value is returned when the key is present.""" + from furious.context._local import get_local_context + from furious.queues import _get_from_cache + + key = 'key' + expected = 'value' + setattr(get_local_context(), key, expected) + + actual = _get_from_cache(key) + + self.assertEqual(actual, expected) + + @attr('slow') + def test_get_from_cache_not_present(self): + """Ensure the default value is returned and set when the key is not + present. 
+ """ + from furious.context._local import get_local_context + from furious.queues import _get_from_cache + + key = 'key' + expected = 'default' + + actual = _get_from_cache(key, default=expected) + + self.assertEqual(actual, expected) + self.assertTrue(hasattr(get_local_context(), key)) + diff --git a/furious/tests/test_stubs/appengine/test_queues.py b/furious/tests/test_stubs/appengine/test_queues.py index 4c0c3c4..de6f685 100644 --- a/furious/tests/test_stubs/appengine/test_queues.py +++ b/furious/tests/test_stubs/appengine/test_queues.py @@ -469,7 +469,8 @@ def test_get_push_queue_names(self): names = get_push_queue_names(self.taskqueue_service) - self.assertEqual(names, ['default']) + self.assertEqual(names, ['default', 'workers-0', 'workers-1', + 'workers-2', 'workers-3']) def test_get_queue_names(self): """Ensure the correct queue names are returned from get_queue_names.""" @@ -478,7 +479,8 @@ def test_get_queue_names(self): names = get_queue_names(self.taskqueue_service) - self.assertEqual(names, ['default', 'default-pull']) + self.assertEqual(names, ['default', 'default-pull', 'workers-0', + 'workers-1', 'workers-2', 'workers-3']) @attr('slow') diff --git a/queue.yaml b/queue.yaml index cdc630f..3c388db 100644 --- a/queue.yaml +++ b/queue.yaml @@ -4,6 +4,23 @@ queue: rate: 100/s bucket_size: 100 +# PUSH QUEUE GROUP +- name: workers-0 + rate: 100/s + bucket_size: 100 + +- name: workers-1 + rate: 100/s + bucket_size: 100 + +- name: workers-2 + rate: 100/s + bucket_size: 100 + +- name: workers-3 + rate: 100/s + bucket_size: 100 + # PULL QUEUES - name: default-pull mode: pull