Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ jobs:
tool: nextest

- name: Test
run: cargo nextest run --all-features --no-fail-fast
run: |
cargo test --lib --no-default-features
cargo nextest run --all-features --no-fail-fast

- name: Lint
run: cargo clippy --all-features --workspace --all-targets -- -D warnings
Expand Down
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ rust-version = "1.75"

[workspace.dependencies]
oxanus = { path = "oxanus" }
oxanus-macros = { path = "oxanus-macros" }
oxanus-macros = { version = "0.8.0", path = "oxanus-macros" }
Copy link
Collaborator Author

@tyt2y3 tyt2y3 Jan 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's just easier for the macro and the main crate sharing the same version

87 changes: 63 additions & 24 deletions oxanus-macros/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ struct OxanusArgs {
context: Option<Path>,
error: Option<Path>,
registry: Option<Path>,
max_retries: Option<u32>,
max_retries: Option<MaxRetries>,
retry_delay: Option<RetryDelay>,
unique_id: Option<UniqueIdSpec>,
on_conflict: Option<Ident>,
Expand All @@ -34,6 +34,14 @@ enum UniqueIdSpec {
CustomFunc(Path),
}

#[derive(Debug)]
enum MaxRetries {
/// #[max_retries = 3]
Value(u32),
/// #[max_retries = mymod::func]
CustomFunc(Path),
}

#[derive(Debug)]
enum RetryDelay {
/// #[retry_delay = 3]
Expand All @@ -48,27 +56,38 @@ struct Cron {
queue: Option<Path>,
}

impl FromMeta for RetryDelay {
fn from_meta(meta: &Meta) -> darling::Result<Self> {
match meta {
Meta::NameValue(nv) => match &nv.value {
Expr::Lit(syn::ExprLit {
lit: syn::Lit::Int(lit),
..
}) => {
let value = lit.base10_parse::<u64>()?;
Ok(RetryDelay::Value(value))
macro_rules! impl_from_meta_for_num_or_path {
($ty:ty, $int:ty, $name:literal) => {
impl FromMeta for $ty {
fn from_meta(meta: &Meta) -> darling::Result<Self> {
match meta {
Meta::NameValue(nv) => match &nv.value {
Expr::Lit(syn::ExprLit {
lit: syn::Lit::Int(lit),
..
}) => {
let value = lit.base10_parse::<$int>()?;
Ok(Self::Value(value))
}
Expr::Path(expr_path) => Ok(Self::CustomFunc(expr_path.path.clone())),
other => Err(Error::custom(format!(
"Unsupported {} value: {other:?}",
$name
))),
},
_ => Err(Error::custom(format!(
"{} must be a name-value attribute",
$name
))),
}
Expr::Path(expr_path) => Ok(RetryDelay::CustomFunc(expr_path.path.clone())),
other => Err(Error::custom(format!(
"Unsupported retry_delay value: {other:?}",
))),
},
_ => Err(Error::custom("retry_delay must be a name-value attribute")),
}
}
}
};
}

impl_from_meta_for_num_or_path!(MaxRetries, u32, "max_retries");
impl_from_meta_for_num_or_path!(RetryDelay, u64, "retry_delay");

impl FromMeta for UniqueIdSpec {
fn from_meta(meta: &Meta) -> darling::Result<Self> {
match meta {
Expand Down Expand Up @@ -146,11 +165,7 @@ pub fn expand_derive_worker(input: DeriveInput) -> TokenStream {
};

let max_retries = match args.max_retries {
Some(max_retries) => quote! {
fn max_retries(&self) -> u32 {
#max_retries
}
},
Some(max_retries) => expand_max_retries(max_retries),
None => quote!(),
};

Expand Down Expand Up @@ -235,6 +250,25 @@ pub fn expand_derive_worker(input: DeriveInput) -> TokenStream {
}
}

fn expand_max_retries(max_retries: MaxRetries) -> TokenStream {
match max_retries {
MaxRetries::Value(value) => {
quote! {
fn max_retries(&self) -> u32 {
#value
}
}
}
MaxRetries::CustomFunc(func) => {
quote! {
fn max_retries(&self) -> u32 {
#func(self)
}
}
}
}
}

fn expand_retry_delay(retry_delay: RetryDelay) -> TokenStream {
match retry_delay {
RetryDelay::Value(value) => {
Expand All @@ -257,9 +291,14 @@ fn expand_retry_delay(retry_delay: RetryDelay) -> TokenStream {
fn expand_unique_id(spec: UniqueIdSpec, fields: &Fields) -> TokenStream {
let formatter = match spec {
UniqueIdSpec::Shorthand(fmt) => {
let fmt_str = fmt.value();
let args = fields.iter().filter_map(|f| {
let name = f.ident.as_ref()?;
Some(quote!(#name = self.#name))
if fmt_str.contains(&format!("{{{name}}}")) {
Some(quote!(#name = self.#name))
} else {
None
}
});

quote! {
Expand Down
2 changes: 1 addition & 1 deletion oxanus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ rand = "^0.9"
testresult = "^0.4"
tracing-subscriber = { version = "^0.3", features = ["env-filter"] }

oxanus = { path = ".", features = ["registry"] }
oxanus = { path = ".", default-features = false, features = ["registry"] }

[[bench]]
name = "concurrency"
Expand Down
7 changes: 7 additions & 0 deletions oxanus/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,13 @@ impl<DT, ET> Config<DT, ET> {
{
self.registry.has_registered::<W>()
}

pub fn has_registered_cron_worker<W>(&self) -> bool
where
W: Worker<Context = DT, Error = ET>,
{
self.registry.has_registered_cron::<W>()
}
}

#[cfg(any(target_os = "linux", target_os = "macos"))]
Expand Down
13 changes: 10 additions & 3 deletions oxanus/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ mod tests {
#[oxanus(unique_id = "test_worker_{id}")]
struct TestWorkerUniqueId {
id: i32,
_1: i32,
}

impl TestWorkerUniqueId {
Expand All @@ -156,13 +157,13 @@ mod tests {
}
}

assert_eq!(TestWorkerUniqueId { id: 1 }.max_retries(), 2);
assert_eq!(TestWorkerUniqueId { id: 1, _1: 0 }.max_retries(), 2);
assert_eq!(
TestWorkerUniqueId { id: 1 }.unique_id().unwrap(),
TestWorkerUniqueId { id: 1, _1: 0 }.unique_id().unwrap(),
"test_worker_1"
);
assert_eq!(
TestWorkerUniqueId { id: 12 }.unique_id().unwrap(),
TestWorkerUniqueId { id: 12, _1: 0 }.unique_id().unwrap(),
"test_worker_12"
);

Expand Down Expand Up @@ -218,6 +219,7 @@ mod tests {
#[derive(Serialize, Deserialize, oxanus::Worker)]
#[oxanus(unique_id = Self::unique_id)]
#[oxanus(retry_delay = Self::retry_delay)]
#[oxanus(max_retries = Self::max_retries)]
struct TestWorkerCustomUniqueId {
id: i32,
task: TestWorkerNestedTask,
Expand All @@ -235,6 +237,10 @@ mod tests {
fn retry_delay(&self, retries: u32) -> u64 {
retries as u64 * 2
}

fn max_retries(&self) -> u32 {
9
}
}

assert_eq!(
Expand All @@ -256,6 +262,7 @@ mod tests {
assert_eq!(Worker::unique_id(&worker2).unwrap(), "worker_id_2_task_22");
assert_eq!(worker2.retry_delay(1), 2);
assert_eq!(worker2.retry_delay(2), 4);
assert_eq!(worker2.max_retries(), 9);
}

#[tokio::test]
Expand Down
7 changes: 7 additions & 0 deletions oxanus/src/worker_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,13 @@ impl<DT, ET> WorkerRegistry<DT, ET> {
self.jobs.contains_key(type_name::<T>())
}

pub fn has_registered_cron<T>(&self) -> bool
where
T: Worker<Context = DT, Error = ET>,
{
self.schedules.contains_key(type_name::<T>())
}

pub fn build(
&self,
name: &str,
Expand Down
1 change: 1 addition & 0 deletions oxanus/tests/integration/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ mod dead;
mod drain;
mod dynamic;
mod panic;
#[cfg(feature = "macros")]
mod registry;
mod retry;
mod shared;
Expand Down
17 changes: 10 additions & 7 deletions oxanus/tests/integration/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,16 @@ pub async fn test_registry() -> TestResult {
// no need to manually register, here we verify they were registered
assert!(config.has_registered_queue::<QueueTwo>());
assert!(config.has_registered_worker::<WorkerCounter>());
assert!(config.has_registered_worker::<CronWorkerCounter>());

let worker = WorkerCounter {
key: "test_worker:counter".to_owned(),
};

storage.enqueue(QueueTwo, worker).await?;
assert!(config.has_registered_cron_worker::<CronWorkerCounter>());

storage
.enqueue(
QueueTwo,
WorkerCounter {
key: "test_worker:counter".to_owned(),
},
)
.await?;

oxanus::run(config, ctx).await?;

Expand Down