Skip to content
This repository was archived by the owner on Apr 30, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
13e6ecc
Add select_queue function for queue group support.
tylertreat-wf Aug 5, 2013
772f5a6
Wire up select_queue with Async's get_queue.
tylertreat-wf Aug 5, 2013
f68eac2
Add queue group support to Message.
tylertreat-wf Aug 5, 2013
bd77828
Correct documentation.
tylertreat-wf Aug 5, 2013
a16d822
Add default queue for queue group selection.
tylertreat-wf Aug 5, 2013
1697364
Use memcache to improve queue selection performance.
tylertreat-wf Aug 7, 2013
c1827f9
Add queue group example.
tylertreat-wf Aug 7, 2013
4973713
Remove optimal queue group selection.
tylertreat-wf Aug 7, 2013
2c7febe
Update queue group demo.
tylertreat-wf Aug 7, 2013
3d6f12a
Remove unused queue weight constants.
tylertreat-wf Aug 7, 2013
ed97eef
Add unit tests for thread local cache.
tylertreat-wf Aug 7, 2013
817601c
Remove unneeded check for when queue group has 1 queue.
tylertreat-wf Aug 7, 2013
3a6f853
Use not check instead of size check on list.
tylertreat-wf Aug 7, 2013
e32b144
Move queue group code into new queues module
tylertreat-wf Aug 20, 2013
e01f9ef
Move queue counts to config
tylertreat-wf Aug 20, 2013
f643555
Correct queue group intro example
tylertreat-wf Aug 20, 2013
53fbe0f
Merge branch 'master' of https://github.com/WebFilings/furious into q…
tylertreat-wf Aug 26, 2013
e853d67
Check to ensure queue_counts property is a dict
tylertreat-wf Aug 26, 2013
45cda54
Use local context instead of creating a new thread local
tylertreat-wf Aug 26, 2013
b445a68
Add test case for when queue_counts is not specified
tylertreat-wf Aug 26, 2013
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions example/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from .context_intro import ContextIntroHandler
from .grep import GrepHandler
from .simple_workflow import SimpleWorkflowHandler
from .queue_group_intro import QueueGroupIntroHandler

config = {
'webapp2_extras.jinja2': {
Expand All @@ -55,5 +56,6 @@
('/batcher/run', BatcherHandler),
('/batcher/stats', BatcherStatsHandler),
('/grep', GrepHandler),
('/queue_group', QueueGroupIntroHandler)
], config=config)

48 changes: 48 additions & 0 deletions example/queue_group_intro.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#
# Copyright 2013 WebFilings, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""A very basic async example using a queue group.

This will insert 500 async tasks to run the example_function in a queue
group.
"""

import logging

import webapp2


class QueueGroupIntroHandler(webapp2.RequestHandler):
"""Demonstrate the creation and insertion of 500 furious tasks to be run
in a queue group with random distribution."""
def get(self):
from furious import context

with context.new() as ctx:
for i in xrange(500):
ctx.add(target=example_function, args=[i],
queue_group='workers')

logging.info('500 Async jobs inserted.')

self.response.out.write('Successfully inserted 500 Async jobs.')


def example_function(*args, **kwargs):
"""This function is called by furious tasks to demonstrate usage."""
logging.info('example_function executed with args: %r, kwargs: %r',
args, kwargs)
return args
6 changes: 6 additions & 0 deletions furious/async.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ def run_me(*args, **kwargs):
from .job_utils import path_to_reference
from .job_utils import reference_to_path

from .queues import select_queue

from . import errors


Expand Down Expand Up @@ -259,6 +261,10 @@ def get_headers(self):

def get_queue(self):
"""Return the queue the task should run in."""
queue_group = self._options.get('queue_group')
if queue_group:
return select_queue(queue_group)

return self._options.get('queue', ASYNC_DEFAULT_QUEUE)

def get_task_args(self):
Expand Down
5 changes: 5 additions & 0 deletions furious/batcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from google.appengine.runtime.apiproxy_errors import DeadlineExceededError

from .async import Async
from .queues import select_queue

MESSAGE_DEFAULT_QUEUE = 'default-pull'
MESSAGE_PROCESSOR_NAME = 'processor'
Expand All @@ -46,6 +47,10 @@ def update_options(self, **options):

def get_queue(self):
"""Return the queue the task should run in."""
queue_group = self._options.get('queue_group')
if queue_group:
return select_queue(queue_group)

return self._options.get('queue', MESSAGE_DEFAULT_QUEUE)

def get_task_args(self):
Expand Down
3 changes: 2 additions & 1 deletion furious/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ def default_config():
return {'secret_key':
'931b8-i-f44330b4a5-am-3b9b733f-not-secure-043e96882',
'persistence': 'ndb',
'task_system': 'appengine_taskqueue'}
'task_system': 'appengine_taskqueue',
'queue_counts': {}}


def _load_yaml_config(path=None):
Expand Down
74 changes: 74 additions & 0 deletions furious/queues.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
#
# Copyright 2012 WebFilings, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from random import shuffle

from furious.config import get_config

QUEUE_COUNTS = 'queue_counts'


def select_queue(queue_group):
"""Select an optimal queue to run a task in from the given queue group. By
default, this simply randomly selects a queue from the group.

TODO: leverage the taskqueue API to try and determine the best queue to
use via a kwarg.
"""

if not queue_group:
return None

queue_counts = get_config().get(QUEUE_COUNTS)
if not isinstance(queue_counts, dict):
raise Exception('%s config property must be a dict.' % QUEUE_COUNTS)

queue_count = queue_counts.get(queue_group, 0)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would say if we're splitting this into two lines, we may as well test that queue_counts is in fact a dict as well. Otherwise this could blow up. Having a meaningful message as to why the exception is raised would be good.


if not isinstance(queue_count, int) or queue_count <= 0:
raise Exception('Queue group must have at least 1 queue.')

queue_lists = _get_from_cache('queues', default={})
group_queues = queue_lists.setdefault(queue_group, [])

if not group_queues:
group_queues.extend('%s-%d' % (queue_group, i)
for i in xrange(queue_count))

shuffle(group_queues)

return group_queues.pop()


def _get_from_cache(key, default=None):
"""Fetch the value for the given key from the thread local context. If the
key does not exist, set it for the given default value and return the
default.
"""
from furious.context._local import get_local_context

if not key:
return default

local_context = get_local_context()

if hasattr(local_context, key):
return getattr(local_context, key)

setattr(local_context, key, default)

return default

19 changes: 19 additions & 0 deletions furious/tests/test_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,25 @@ def test_get_default_queue(self):

self.assertEqual('default', job.get_queue())

@patch('furious.async.select_queue')
def test_get_queue_group_queue(self, mock_select_queue):
"""Ensure get_queue calls through to select_queue when a queue group
is given.
"""
from furious.async import Async
from furious.async import ASYNC_DEFAULT_QUEUE

queue_group = 'foo-queue'
queue_count = 5

job = Async('nonexistant', queue_group=queue_group,
queue_count=queue_count, default=ASYNC_DEFAULT_QUEUE)

expected = '%s-2' % queue_group
mock_select_queue.return_value = expected

self.assertEqual(expected, job.get_queue())

def test_get_task_args(self):
"""Ensure get_task_args returns the job task_args."""
from furious.async import Async
Expand Down
19 changes: 19 additions & 0 deletions furious/tests/test_batcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,25 @@ def test_get_default_queue(self):

self.assertEqual('default-pull', message.get_queue())

@patch('furious.batcher.select_queue')
def test_get_queue_group_queue(self, mock_select_queue):
"""Ensure get_queue calls through to select_queue when a queue group
is given.
"""
from furious.batcher import Message
from furious.batcher import MESSAGE_DEFAULT_QUEUE

queue_group = 'foo-queue'
queue_count = 5

message = Message(queue_group=queue_group, queue_count=queue_count,
default=MESSAGE_DEFAULT_QUEUE)

expected = '%s-2' % queue_group
mock_select_queue.return_value = expected

self.assertEqual(expected, message.get_queue())

def test_get_task_args(self):
"""Ensure get_task_args returns the message task_args."""
from furious.batcher import Message
Expand Down
24 changes: 22 additions & 2 deletions furious/tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,33 @@ def test_get_config(self):

example_yaml = str('secret_key: "blah"\n'
'persistence: bubble\n'
'task_system: flah\n')
'task_system: flah\n'
'queue_counts:\n'
' queue-group: 5')

my_config = _parse_yaml_config(example_yaml)

self.assertEqual(my_config, {'secret_key': 'blah',
'persistence': 'bubble',
'task_system': 'flah',
'queue_counts': {'queue-group': 5}})
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about adding another test where you check for the expected result when queue_counts is not specified.


def test_get_config_no_queue_counts(self):
"""Ensure a config contents without queue_counts specified produces the
expected dictionary.
"""
from furious.config import _parse_yaml_config

example_yaml = str('secret_key: "blah"\n'
'persistence: bubble\n'
'task_system: flah')

my_config = _parse_yaml_config(example_yaml)

self.assertEqual(my_config, {'secret_key': 'blah',
'persistence': 'bubble',
'task_system': 'flah'})
'task_system': 'flah',
'queue_counts': {}})

def test_get_configured_persistence_exists(self):
"""Ensure a chosen persistence module is selected."""
Expand Down
Loading