From e74f7d1cc05d625d7332b266f238769de6ab4339 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Malthe=20J=C3=B8rgensen?= Date: Fri, 10 Jun 2022 12:57:15 +0200 Subject: [PATCH] Add `release_on_start` task configuration Passing `release_on_start=True` to a singleton task makes the task release the lock when the job starts running, rather than when it has finished. Note: This is only available in celery versions 5.2 and above since it uses the `before_start`-method on the celery Task object which was introduced in version 5.2. --- README.md | 21 +++++++++++++++++++++ celery_singleton/singleton.py | 11 +++++++++-- tests/test_singleton.py | 31 +++++++++++++++++++++++++++++++ 3 files changed, 61 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index f2b5b10..77a8411 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,8 @@ This is a baseclass for celery tasks that ensures only one instance of the task - [Task configuration](#task-configuration) - [unique\_on](#uniqueon) - [raise\_on\_duplicate](#raiseonduplicate) + - [lock\_expiry](#lockexpiry) + - [release\_on\_start](#releaseonstart) - [App Configuration](#app-configuration) - [Testing](#testing) - [Contribute](#contribute) @@ -184,6 +186,25 @@ assert task1 != task2 # These are two separate task instances This option can be applied globally in the [app config](#app-configuration) with `singleton_lock_expiry`. Task option supersedes the app config. +### release\_on\_start +_Warning: This is only available in celery versions 5.2 and above_ + +By default, the lock that a singleton task holds is released when the job is finished. By setting `release_on_start=True` on the task, the lock will be released before the task is run and is thus only held while the job is in the queue. + +```python +@app.task(base=Singleton, release_on_start=True) +def runs_for_3_seconds(): + self.time.sleep(3) + + +task1 = runs_for_3_seconds.delay() +time.sleep(1) +# task1 has started and is still running +task2 = runs_for_3_seconds.delay() + +assert task1 != task2 # These are two separate task instances +``` + ## App Configuration diff --git a/celery_singleton/singleton.py b/celery_singleton/singleton.py index 83d433e..9058d61 100644 --- a/celery_singleton/singleton.py +++ b/celery_singleton/singleton.py @@ -21,6 +21,7 @@ class Singleton(BaseTask): unique_on = None raise_on_duplicate = None lock_expiry = None + release_on_start = False @property def _raise_on_duplicate(self): @@ -145,8 +146,14 @@ def on_duplicate(self, existing_task_id): ) return self.AsyncResult(existing_task_id) + def before_start(self, task_id, args, kwargs): + if self.release_on_start: + self.release_lock(task_args=args, task_kwargs=kwargs) + def on_failure(self, exc, task_id, args, kwargs, einfo): - self.release_lock(task_args=args, task_kwargs=kwargs) + if not self.release_on_start: + self.release_lock(task_args=args, task_kwargs=kwargs) def on_success(self, retval, task_id, args, kwargs): - self.release_lock(task_args=args, task_kwargs=kwargs) + if not self.release_on_start: + self.release_lock(task_args=args, task_kwargs=kwargs) diff --git a/tests/test_singleton.py b/tests/test_singleton.py index 9efff1e..029c22f 100644 --- a/tests/test_singleton.py +++ b/tests/test_singleton.py @@ -8,6 +8,7 @@ from celery_singleton.singleton import Singleton, clear_locks from celery_singleton import util, DuplicateTaskError from celery_singleton.backends.redis import RedisBackend +from celery import version_info as celery_version from celery_singleton.backends import get_backend from celery_singleton.config import Config @@ -369,3 +370,33 @@ def simple_task(*args): mock_lock.assert_called_once_with( simple_task.singleton_backend, lock, result.task_id, expiry=60 ) + + +class TestReleaseOnStart: + @pytest.mark.skipif(celery_version < (5, 2), reason="Feature requires least celery-5.2") + def test__release_on_start( + self, scoped_app, celery_worker + ) : + with scoped_app: + @celery_worker.app.task(base=Singleton, release_on_start=True) + def queue_only_task(a): + time.sleep(1) + return a * 2 + + celery_worker.reload() # So task is registered + + # `task1` is in queue, but not started + task1 = queue_only_task.apply_async(args=[1], countdown=1) + time.sleep(0.05) # small delay for on_success + # `task2` is in queue, but not started + task2 = queue_only_task.apply_async(args=[1]) + # Both tasks were in the queue + assert task1 == task2 + + time.sleep(1.10) + # `task1` is now started (and the lock is released) + task3 = queue_only_task.apply_async(args=[1]) + time.sleep(0.05) # small delay for on_success + task3.get() + # `task1` had started executing, so `task3` was free to be scheduled + assert task1 != task3