From c8b329a880266977ef9f6f56907bf24d45c6e1ad Mon Sep 17 00:00:00 2001 From: Chris Tsang Date: Tue, 13 Jan 2026 00:09:07 +0000 Subject: [PATCH 1/8] Run unit tests with macros disabled --- .github/workflows/test.yml | 4 +++- Cargo.lock | 4 ++-- oxanus-macros/src/worker.rs | 7 ++++++- oxanus/Cargo.toml | 2 +- oxanus/src/worker.rs | 7 ++++--- oxanus/tests/integration/main.rs | 1 + 6 files changed, 17 insertions(+), 8 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index f6f1312..0d701e5 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -33,7 +33,9 @@ jobs: tool: nextest - name: Test - run: cargo nextest run --all-features --no-fail-fast + run: | + cargo test --lib --no-default-features + cargo nextest run --all-features --no-fail-fast - name: Lint run: cargo clippy --all-features --workspace --all-targets -- -D warnings diff --git a/Cargo.lock b/Cargo.lock index 3a37605..045fdb0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -745,7 +745,7 @@ checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" [[package]] name = "oxanus" -version = "0.7.1" +version = "0.8.0" dependencies = [ "async-trait", "chrono", @@ -774,7 +774,7 @@ dependencies = [ [[package]] name = "oxanus-macros" -version = "0.7.0" +version = "0.8.0" dependencies = [ "darling", "heck", diff --git a/oxanus-macros/src/worker.rs b/oxanus-macros/src/worker.rs index 9a6f317..7864f6d 100644 --- a/oxanus-macros/src/worker.rs +++ b/oxanus-macros/src/worker.rs @@ -257,9 +257,14 @@ fn expand_retry_delay(retry_delay: RetryDelay) -> TokenStream { fn expand_unique_id(spec: UniqueIdSpec, fields: &Fields) -> TokenStream { let formatter = match spec { UniqueIdSpec::Shorthand(fmt) => { + let fmt_str = fmt.value(); let args = fields.iter().filter_map(|f| { let name = f.ident.as_ref()?; - Some(quote!(#name = self.#name)) + if fmt_str.contains(&format!("{{{name}}}")) { + Some(quote!(#name = self.#name)) + } else { + None + } }); quote! { diff --git a/oxanus/Cargo.toml b/oxanus/Cargo.toml index 300ab69..0aa0a1e 100644 --- a/oxanus/Cargo.toml +++ b/oxanus/Cargo.toml @@ -47,7 +47,7 @@ rand = "^0.9" testresult = "^0.4" tracing-subscriber = { version = "^0.3", features = ["env-filter"] } -oxanus = { path = ".", features = ["registry"] } +oxanus = { path = ".", default-features = false, features = ["registry"] } [[bench]] name = "concurrency" diff --git a/oxanus/src/worker.rs b/oxanus/src/worker.rs index b50f3db..9cddd5d 100644 --- a/oxanus/src/worker.rs +++ b/oxanus/src/worker.rs @@ -148,6 +148,7 @@ mod tests { #[oxanus(unique_id = "test_worker_{id}")] struct TestWorkerUniqueId { id: i32, + _1: i32, } impl TestWorkerUniqueId { @@ -156,13 +157,13 @@ mod tests { } } - assert_eq!(TestWorkerUniqueId { id: 1 }.max_retries(), 2); + assert_eq!(TestWorkerUniqueId { id: 1, _1: 0 }.max_retries(), 2); assert_eq!( - TestWorkerUniqueId { id: 1 }.unique_id().unwrap(), + TestWorkerUniqueId { id: 1, _1: 0 }.unique_id().unwrap(), "test_worker_1" ); assert_eq!( - TestWorkerUniqueId { id: 12 }.unique_id().unwrap(), + TestWorkerUniqueId { id: 12, _1: 0 }.unique_id().unwrap(), "test_worker_12" ); diff --git a/oxanus/tests/integration/main.rs b/oxanus/tests/integration/main.rs index 6a1358d..ae242f5 100644 --- a/oxanus/tests/integration/main.rs +++ b/oxanus/tests/integration/main.rs @@ -3,6 +3,7 @@ mod dead; mod drain; mod dynamic; mod panic; +#[cfg(feature = "macros")] mod registry; mod retry; mod shared; From 002da440a9c6cedd2fc6e1283740883291bf1ab1 Mon Sep 17 00:00:00 2001 From: Chris Tsang Date: Tue, 13 Jan 2026 12:10:57 +0000 Subject: [PATCH 2/8] Allow register cron worker with schedule override --- oxanus/examples/cron_w_err.rs | 2 +- oxanus/src/config.rs | 13 ++++++++++--- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/oxanus/examples/cron_w_err.rs b/oxanus/examples/cron_w_err.rs index b029b67..efeb985 100644 --- a/oxanus/examples/cron_w_err.rs +++ b/oxanus/examples/cron_w_err.rs @@ -42,7 +42,7 @@ pub async fn main() -> Result<(), oxanus::OxanusError> { let ctx = oxanus::Context::value(WorkerContext {}); let storage = oxanus::Storage::builder().build_from_env()?; let config = oxanus::Config::new(&storage) - .register_cron_worker::(QueueDynamic(2)) + .register_cron_worker::("", QueueDynamic(2)) .with_graceful_shutdown(tokio::signal::ctrl_c()); oxanus::run(config, ctx).await?; diff --git a/oxanus/src/config.rs b/oxanus/src/config.rs index 8846c8b..11d256e 100644 --- a/oxanus/src/config.rs +++ b/oxanus/src/config.rs @@ -70,13 +70,20 @@ impl Config { } /// Register a cron worker with a dynamic queue - pub fn register_cron_worker(mut self, queue: impl Queue) -> Self + pub fn register_cron_worker(mut self, cron_schedule: &str, queue: impl Queue) -> Self where W: Worker + serde::de::DeserializeOwned + 'static, { self.register_queue_with(queue.config()); - let schedule = W::cron_schedule().expect("Cron Worker must have cron_schedule defined"); - self.registry.register_cron::(&schedule, queue.key()); + let schedule_string; + let schedule = if cron_schedule == "" { + schedule_string = + W::cron_schedule().expect("Cron Worker must have cron_schedule defined"); + &schedule_string + } else { + cron_schedule + }; + self.registry.register_cron::(schedule, queue.key()); self } From 7bf61a13227e8d18223597c5c759c052f72370c0 Mon Sep 17 00:00:00 2001 From: Chris Tsang Date: Tue, 13 Jan 2026 16:23:06 +0000 Subject: [PATCH 3/8] max_retries support custom function --- oxanus-macros/src/worker.rs | 80 ++++++++++++++++++++++++++----------- oxanus/src/worker.rs | 6 +++ 2 files changed, 63 insertions(+), 23 deletions(-) diff --git a/oxanus-macros/src/worker.rs b/oxanus-macros/src/worker.rs index 7864f6d..3cc6f8a 100644 --- a/oxanus-macros/src/worker.rs +++ b/oxanus-macros/src/worker.rs @@ -12,7 +12,7 @@ struct OxanusArgs { context: Option, error: Option, registry: Option, - max_retries: Option, + max_retries: Option, retry_delay: Option, unique_id: Option, on_conflict: Option, @@ -34,6 +34,14 @@ enum UniqueIdSpec { CustomFunc(Path), } +#[derive(Debug)] +enum MaxRetries { + /// #[max_retries = 3] + Value(u32), + /// #[max_retries = mymod::func] + CustomFunc(Path), +} + #[derive(Debug)] enum RetryDelay { /// #[retry_delay = 3] @@ -48,27 +56,38 @@ struct Cron { queue: Option, } -impl FromMeta for RetryDelay { - fn from_meta(meta: &Meta) -> darling::Result { - match meta { - Meta::NameValue(nv) => match &nv.value { - Expr::Lit(syn::ExprLit { - lit: syn::Lit::Int(lit), - .. - }) => { - let value = lit.base10_parse::()?; - Ok(RetryDelay::Value(value)) +macro_rules! impl_from_meta_for_num_or_path { + ($ty:ty, $int:ty, $name:literal) => { + impl FromMeta for $ty { + fn from_meta(meta: &Meta) -> darling::Result { + match meta { + Meta::NameValue(nv) => match &nv.value { + Expr::Lit(syn::ExprLit { + lit: syn::Lit::Int(lit), + .. + }) => { + let value = lit.base10_parse::<$int>()?; + Ok(Self::Value(value)) + } + Expr::Path(expr_path) => Ok(Self::CustomFunc(expr_path.path.clone())), + other => Err(Error::custom(format!( + "Unsupported {} value: {other:?}", + $name + ))), + }, + _ => Err(Error::custom(format!( + "{} must be a name-value attribute", + $name + ))), } - Expr::Path(expr_path) => Ok(RetryDelay::CustomFunc(expr_path.path.clone())), - other => Err(Error::custom(format!( - "Unsupported retry_delay value: {other:?}", - ))), - }, - _ => Err(Error::custom("retry_delay must be a name-value attribute")), + } } - } + }; } +impl_from_meta_for_num_or_path!(MaxRetries, u32, "max_retries"); +impl_from_meta_for_num_or_path!(RetryDelay, u64, "retry_delay"); + impl FromMeta for UniqueIdSpec { fn from_meta(meta: &Meta) -> darling::Result { match meta { @@ -146,11 +165,7 @@ pub fn expand_derive_worker(input: DeriveInput) -> TokenStream { }; let max_retries = match args.max_retries { - Some(max_retries) => quote! { - fn max_retries(&self) -> u32 { - #max_retries - } - }, + Some(max_retries) => expand_max_retries(max_retries), None => quote!(), }; @@ -235,6 +250,25 @@ pub fn expand_derive_worker(input: DeriveInput) -> TokenStream { } } +fn expand_max_retries(max_retries: MaxRetries) -> TokenStream { + match max_retries { + MaxRetries::Value(value) => { + quote! { + fn max_retries(&self) -> u32 { + #value + } + } + } + MaxRetries::CustomFunc(func) => { + quote! { + fn max_retries(&self) -> u32 { + #func(self) + } + } + } + } +} + fn expand_retry_delay(retry_delay: RetryDelay) -> TokenStream { match retry_delay { RetryDelay::Value(value) => { diff --git a/oxanus/src/worker.rs b/oxanus/src/worker.rs index 9cddd5d..5c59dfe 100644 --- a/oxanus/src/worker.rs +++ b/oxanus/src/worker.rs @@ -219,6 +219,7 @@ mod tests { #[derive(Serialize, Deserialize, oxanus::Worker)] #[oxanus(unique_id = Self::unique_id)] #[oxanus(retry_delay = Self::retry_delay)] + #[oxanus(max_retries = Self::max_retries)] struct TestWorkerCustomUniqueId { id: i32, task: TestWorkerNestedTask, @@ -236,6 +237,10 @@ mod tests { fn retry_delay(&self, retries: u32) -> u64 { retries as u64 * 2 } + + fn max_retries(&self) -> u32 { + 9 + } } assert_eq!( @@ -257,6 +262,7 @@ mod tests { assert_eq!(Worker::unique_id(&worker2).unwrap(), "worker_id_2_task_22"); assert_eq!(worker2.retry_delay(1), 2); assert_eq!(worker2.retry_delay(2), 4); + assert_eq!(worker2.max_retries(), 9); } #[tokio::test] From b834d8542529149e55af71cac8b7cec6114f9728 Mon Sep 17 00:00:00 2001 From: Chris Tsang Date: Tue, 13 Jan 2026 18:32:54 +0000 Subject: [PATCH 4/8] clippy --- oxanus/src/config.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/oxanus/src/config.rs b/oxanus/src/config.rs index 11d256e..b26aff7 100644 --- a/oxanus/src/config.rs +++ b/oxanus/src/config.rs @@ -76,7 +76,7 @@ impl Config { { self.register_queue_with(queue.config()); let schedule_string; - let schedule = if cron_schedule == "" { + let schedule = if cron_schedule.is_empty() { schedule_string = W::cron_schedule().expect("Cron Worker must have cron_schedule defined"); &schedule_string From d20e5cf070f201db545fa6d9f1911821bacc74e7 Mon Sep 17 00:00:00 2001 From: Chris Tsang Date: Tue, 13 Jan 2026 23:04:49 +0000 Subject: [PATCH 5/8] Add has_registered_cron_worker --- oxanus/src/config.rs | 7 +++++++ oxanus/src/worker_registry.rs | 7 +++++++ 2 files changed, 14 insertions(+) diff --git a/oxanus/src/config.rs b/oxanus/src/config.rs index b26aff7..da274c6 100644 --- a/oxanus/src/config.rs +++ b/oxanus/src/config.rs @@ -122,6 +122,13 @@ impl Config { { self.registry.has_registered::() } + + pub fn has_registered_cron_worker(&self) -> bool + where + W: Worker, + { + self.registry.has_registered_cron::() + } } #[cfg(any(target_os = "linux", target_os = "macos"))] diff --git a/oxanus/src/worker_registry.rs b/oxanus/src/worker_registry.rs index f291d81..d82371b 100644 --- a/oxanus/src/worker_registry.rs +++ b/oxanus/src/worker_registry.rs @@ -115,6 +115,13 @@ impl WorkerRegistry { self.jobs.contains_key(type_name::()) } + pub fn has_registered_cron(&self) -> bool + where + T: Worker, + { + self.schedules.contains_key(type_name::()) + } + pub fn build( &self, name: &str, From e7c9d166e84ae3aa79f6aced17219cda42fa4a27 Mon Sep 17 00:00:00 2001 From: Chris Tsang Date: Tue, 13 Jan 2026 23:52:20 +0000 Subject: [PATCH 6/8] Tweak testcase --- oxanus/tests/integration/registry.rs | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/oxanus/tests/integration/registry.rs b/oxanus/tests/integration/registry.rs index 1b6c330..4e0a177 100644 --- a/oxanus/tests/integration/registry.rs +++ b/oxanus/tests/integration/registry.rs @@ -54,13 +54,16 @@ pub async fn test_registry() -> TestResult { // no need to manually register, here we verify they were registered assert!(config.has_registered_queue::()); assert!(config.has_registered_worker::()); - assert!(config.has_registered_worker::()); - - let worker = WorkerCounter { - key: "test_worker:counter".to_owned(), - }; - - storage.enqueue(QueueTwo, worker).await?; + assert!(config.has_registered_cron_worker::()); + + storage + .enqueue( + QueueTwo, + WorkerCounter { + key: "test_worker:counter".to_owned(), + }, + ) + .await?; oxanus::run(config, ctx).await?; From aeb18d94ed00b2c99958eb64ab5bd21462d5b9b8 Mon Sep 17 00:00:00 2001 From: Chris Tsang Date: Tue, 20 Jan 2026 16:08:24 +0000 Subject: [PATCH 7/8] Restore register_cron_worker API --- oxanus/examples/cron_w_err.rs | 2 +- oxanus/src/config.rs | 13 +++---------- 2 files changed, 4 insertions(+), 11 deletions(-) diff --git a/oxanus/examples/cron_w_err.rs b/oxanus/examples/cron_w_err.rs index efeb985..b029b67 100644 --- a/oxanus/examples/cron_w_err.rs +++ b/oxanus/examples/cron_w_err.rs @@ -42,7 +42,7 @@ pub async fn main() -> Result<(), oxanus::OxanusError> { let ctx = oxanus::Context::value(WorkerContext {}); let storage = oxanus::Storage::builder().build_from_env()?; let config = oxanus::Config::new(&storage) - .register_cron_worker::("", QueueDynamic(2)) + .register_cron_worker::(QueueDynamic(2)) .with_graceful_shutdown(tokio::signal::ctrl_c()); oxanus::run(config, ctx).await?; diff --git a/oxanus/src/config.rs b/oxanus/src/config.rs index da274c6..1643708 100644 --- a/oxanus/src/config.rs +++ b/oxanus/src/config.rs @@ -70,20 +70,13 @@ impl Config { } /// Register a cron worker with a dynamic queue - pub fn register_cron_worker(mut self, cron_schedule: &str, queue: impl Queue) -> Self + pub fn register_cron_worker(mut self, queue: impl Queue) -> Self where W: Worker + serde::de::DeserializeOwned + 'static, { self.register_queue_with(queue.config()); - let schedule_string; - let schedule = if cron_schedule.is_empty() { - schedule_string = - W::cron_schedule().expect("Cron Worker must have cron_schedule defined"); - &schedule_string - } else { - cron_schedule - }; - self.registry.register_cron::(schedule, queue.key()); + let schedule = W::cron_schedule().expect("Cron Worker must have cron_schedule defined"); + self.registry.register_cron::(&schedule, queue.key()); self } From 3e01cc81887abd69baeeed9df74ec99c7d0d73dc Mon Sep 17 00:00:00 2001 From: Chris Tsang Date: Tue, 20 Jan 2026 16:08:49 +0000 Subject: [PATCH 8/8] Add version to cargo.toml --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index c4df950..f703e9d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,4 +8,4 @@ rust-version = "1.75" [workspace.dependencies] oxanus = { path = "oxanus" } -oxanus-macros = { path = "oxanus-macros" } +oxanus-macros = { version = "0.8.0", path = "oxanus-macros" }