This repository was archived by the owner on Apr 30, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 35
Queue groups with random distribution #102
Open
tylertreat-wf
wants to merge
20
commits into
Workiva:master
Choose a base branch
from
tylertreat-wf:queue-groups-random
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
20 commits
Select commit
Hold shift + click to select a range
13e6ecc
Add select_queue function for queue group support.
tylertreat-wf 772f5a6
Wire up select_queue with Async's get_queue.
tylertreat-wf f68eac2
Add queue group support to Message.
tylertreat-wf bd77828
Correct documentation.
tylertreat-wf a16d822
Add default queue for queue group selection.
tylertreat-wf 1697364
Use memcache to improve queue selection performance.
tylertreat-wf c1827f9
Add queue group example.
tylertreat-wf 4973713
Remove optimal queue group selection.
tylertreat-wf 2c7febe
Update queue group demo.
tylertreat-wf 3d6f12a
Remove unused queue weight constants.
tylertreat-wf ed97eef
Add unit tests for thread local cache.
tylertreat-wf 817601c
Remove unneeded check for when queue group has 1 queue.
tylertreat-wf 3a6f853
Use not check instead of size check on list.
tylertreat-wf e32b144
Move queue group code into new queues module
tylertreat-wf e01f9ef
Move queue counts to config
tylertreat-wf f643555
Correct queue group intro example
tylertreat-wf 53fbe0f
Merge branch 'master' of https://github.com/WebFilings/furious into q…
tylertreat-wf e853d67
Check to ensure queue_counts property is a dict
tylertreat-wf 45cda54
Use local context instead of creating a new thread local
tylertreat-wf b445a68
Add test case for when queue_counts is not specified
tylertreat-wf File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,48 @@ | ||
| # | ||
| # Copyright 2013 WebFilings, LLC | ||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
| # | ||
|
|
||
| """A very basic async example using a queue group. | ||
|
|
||
| This will insert 500 async tasks to run the example_function in a queue | ||
| group. | ||
| """ | ||
|
|
||
| import logging | ||
|
|
||
| import webapp2 | ||
|
|
||
|
|
||
| class QueueGroupIntroHandler(webapp2.RequestHandler): | ||
| """Demonstrate the creation and insertion of 500 furious tasks to be run | ||
| in a queue group with random distribution.""" | ||
| def get(self): | ||
| from furious import context | ||
|
|
||
| with context.new() as ctx: | ||
| for i in xrange(500): | ||
| ctx.add(target=example_function, args=[i], | ||
| queue_group='workers') | ||
|
|
||
| logging.info('500 Async jobs inserted.') | ||
|
|
||
| self.response.out.write('Successfully inserted 500 Async jobs.') | ||
|
|
||
|
|
||
| def example_function(*args, **kwargs): | ||
| """This function is called by furious tasks to demonstrate usage.""" | ||
| logging.info('example_function executed with args: %r, kwargs: %r', | ||
| args, kwargs) | ||
| return args |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,74 @@ | ||
| # | ||
| # Copyright 2012 WebFilings, LLC | ||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
| # | ||
|
|
||
| from random import shuffle | ||
|
|
||
| from furious.config import get_config | ||
|
|
||
| QUEUE_COUNTS = 'queue_counts' | ||
|
|
||
|
|
||
| def select_queue(queue_group): | ||
| """Select an optimal queue to run a task in from the given queue group. By | ||
| default, this simply randomly selects a queue from the group. | ||
|
|
||
| TODO: leverage the taskqueue API to try and determine the best queue to | ||
| use via a kwarg. | ||
| """ | ||
|
|
||
| if not queue_group: | ||
| return None | ||
|
|
||
| queue_counts = get_config().get(QUEUE_COUNTS) | ||
| if not isinstance(queue_counts, dict): | ||
| raise Exception('%s config property must be a dict.' % QUEUE_COUNTS) | ||
|
|
||
| queue_count = queue_counts.get(queue_group, 0) | ||
|
|
||
| if not isinstance(queue_count, int) or queue_count <= 0: | ||
| raise Exception('Queue group must have at least 1 queue.') | ||
|
|
||
| queue_lists = _get_from_cache('queues', default={}) | ||
| group_queues = queue_lists.setdefault(queue_group, []) | ||
|
|
||
| if not group_queues: | ||
| group_queues.extend('%s-%d' % (queue_group, i) | ||
| for i in xrange(queue_count)) | ||
|
|
||
| shuffle(group_queues) | ||
|
|
||
| return group_queues.pop() | ||
|
|
||
|
|
||
| def _get_from_cache(key, default=None): | ||
| """Fetch the value for the given key from the thread local context. If the | ||
| key does not exist, set it for the given default value and return the | ||
| default. | ||
| """ | ||
| from furious.context._local import get_local_context | ||
|
|
||
| if not key: | ||
| return default | ||
|
|
||
| local_context = get_local_context() | ||
|
|
||
| if hasattr(local_context, key): | ||
| return getattr(local_context, key) | ||
|
|
||
| setattr(local_context, key, default) | ||
|
|
||
| return default | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -62,13 +62,33 @@ def test_get_config(self): | |
|
|
||
| example_yaml = str('secret_key: "blah"\n' | ||
| 'persistence: bubble\n' | ||
| 'task_system: flah\n') | ||
| 'task_system: flah\n' | ||
| 'queue_counts:\n' | ||
| ' queue-group: 5') | ||
|
|
||
| my_config = _parse_yaml_config(example_yaml) | ||
|
|
||
| self.assertEqual(my_config, {'secret_key': 'blah', | ||
| 'persistence': 'bubble', | ||
| 'task_system': 'flah', | ||
| 'queue_counts': {'queue-group': 5}}) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about adding another test where you check for the expected result when queue_counts is not specified. |
||
|
|
||
| def test_get_config_no_queue_counts(self): | ||
| """Ensure a config contents without queue_counts specified produces the | ||
| expected dictionary. | ||
| """ | ||
| from furious.config import _parse_yaml_config | ||
|
|
||
| example_yaml = str('secret_key: "blah"\n' | ||
| 'persistence: bubble\n' | ||
| 'task_system: flah') | ||
|
|
||
| my_config = _parse_yaml_config(example_yaml) | ||
|
|
||
| self.assertEqual(my_config, {'secret_key': 'blah', | ||
| 'persistence': 'bubble', | ||
| 'task_system': 'flah'}) | ||
| 'task_system': 'flah', | ||
| 'queue_counts': {}}) | ||
|
|
||
| def test_get_configured_persistence_exists(self): | ||
| """Ensure a chosen persistence module is selected.""" | ||
|
|
||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would say if we're splitting this into two lines, we may as well test that
queue_countsis in fact a dict as well. Otherwise this could blow up. Having a meaningful message as to why the exception is raised would be good.