diff --git a/Cargo.lock b/Cargo.lock index 412fce8..e121550 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -286,6 +286,12 @@ version = "0.15.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" +[[package]] +name = "dtoa" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c3cf4824e2d5f025c7b531afcb2325364084a16806f6d47fbc1f5fbd9960590" + [[package]] name = "equivalent" version = "1.0.2" @@ -758,6 +764,7 @@ dependencies = [ "inventory", "oxanus", "oxanus-macros", + "prometheus-client", "rand", "redis 1.0.2", "sentry-core", @@ -880,6 +887,29 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prometheus-client" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4500adecd7af8e0e9f4dbce15cfee07ce913fbf6ad605cc468b83f2d531ee94" +dependencies = [ + "dtoa", + "itoa", + "parking_lot", + "prometheus-client-derive-encode", +] + +[[package]] +name = "prometheus-client-derive-encode" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9adf1691c04c0a5ff46ff8f262b58beb07b0dbb61f96f9f54f6cbd82106ed87f" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "quote" version = "1.0.43" diff --git a/oxanus/Cargo.toml b/oxanus/Cargo.toml index 0173c82..ff676b0 100644 --- a/oxanus/Cargo.toml +++ b/oxanus/Cargo.toml @@ -19,6 +19,7 @@ sentry = ["sentry-core"] tracing-instrument = [] macros = ["oxanus-macros"] registry = ["inventory", "oxanus-macros?/registry"] +prometheus = ["prometheus-client"] [dependencies] async-trait = "0.1" @@ -29,6 +30,7 @@ redis = { version = "^1.0", features = ["aio", "tokio-comp"] } futures = { version = "^0.3" } gethostname = "^1.0" inventory = { version = "0.3", optional = true } +prometheus-client = { version = "0.24", optional = true } sentry-core = { version = "^0.46", optional = true } serde = 
{ version = "^1.0.218", features = ["derive"] } serde_json = { version = "^1.0", features = ["preserve_order"] } diff --git a/oxanus/README.md b/oxanus/README.md index acc4458..1cf274b 100644 --- a/oxanus/README.md +++ b/oxanus/README.md @@ -149,3 +149,13 @@ Configuration is done through the [`Config`] builder, which allows you to: Oxanus uses a custom error type [`OxanusError`] that covers all possible error cases in the library. Workers can define their own error type that implements `std::error::Error`. +### Prometheus Metrics + +Enable the `prometheus` feature to expose metrics: + +```rust +let metrics = storage.metrics().await?; +let output = metrics.encode_to_string()?; +// Serve `output` on your metrics endpoint +``` + diff --git a/oxanus/src/lib.rs b/oxanus/src/lib.rs index 26a8226..6aaa8ed 100644 --- a/oxanus/src/lib.rs +++ b/oxanus/src/lib.rs @@ -87,6 +87,7 @@ mod launcher; mod queue; mod result_collector; mod semaphores_map; +mod stats; mod storage; mod storage_builder; mod storage_internal; @@ -99,6 +100,9 @@ mod worker_registry; #[cfg(feature = "registry")] mod registry; +#[cfg(feature = "prometheus")] +pub mod prometheus; + #[cfg(test)] mod test_helper; diff --git a/oxanus/src/prometheus.rs b/oxanus/src/prometheus.rs new file mode 100644 index 0000000..668d37f --- /dev/null +++ b/oxanus/src/prometheus.rs @@ -0,0 +1,432 @@ +//! Prometheus metrics integration for Oxanus. +//! +//! This module provides Prometheus metrics based on the [`Stats`] from the storage. +//! +//! # Example +//! +//! ```rust,ignore +//! use oxanus::Storage; +//! +//! async fn example(storage: &Storage) -> Result<(), oxanus::OxanusError> { +//! let metrics = storage.metrics().await?; +//! +//! // Encode metrics to text format +//! let output = metrics.encode_to_string()?; +//! println!("{}", output); +//! Ok(()) +//! } +//! 
``` + +use prometheus_client::{ + encoding::{EncodeLabelSet, text::encode}, + metrics::{family::Family, gauge::Gauge}, + registry::Registry, +}; +use std::sync::atomic::{AtomicI64, AtomicU64}; + +use crate::stats::Stats; + +/// Label set for queue-level metrics. +#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)] +pub struct QueueLabels { + /// The queue key/name. + pub queue: String, +} + +/// Label set for dynamic sub-queue metrics. +#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)] +pub struct DynamicQueueLabels { + /// The parent queue key/name. + pub queue: String, + /// The dynamic queue suffix. + pub suffix: String, +} + +/// Label set for process-level metrics. +#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)] +pub struct ProcessLabels { + /// The hostname of the process. + pub hostname: String, + /// The process ID. + pub pid: String, +} + +/// Prometheus metrics for Oxanus job queue. +/// +/// This struct holds all the Prometheus metrics and the registry. +/// Use [`PrometheusMetrics::from_stats()`] to create an instance from storage stats. +pub struct PrometheusMetrics { + registry: Registry, +} + +impl PrometheusMetrics { + /// Creates a new [`PrometheusMetrics`] instance from the provided stats. + #[must_use] + pub fn from_stats(stats: &Stats) -> Self { + Self::from_stats_with_prefix(stats, "oxanus") + } + + /// Creates a new [`PrometheusMetrics`] instance from the provided stats with a custom prefix. 
+    #[must_use]
+    pub fn from_stats_with_prefix(stats: &Stats, prefix: &str) -> Self {
+        let mut registry = Registry::with_prefix(prefix);
+
+        // Global metrics
+        let jobs_total = Gauge::<i64, AtomicI64>::default();
+        jobs_total.set(stats.global.jobs as i64);
+        registry.register(
+            "jobs_total",
+            "Total number of jobs (enqueued + scheduled)",
+            jobs_total,
+        );
+
+        let enqueued_total = Gauge::<i64, AtomicI64>::default();
+        enqueued_total.set(stats.global.enqueued as i64);
+        registry.register(
+            "enqueued_total",
+            "Total number of jobs currently enqueued",
+            enqueued_total,
+        );
+
+        let processed_total = Gauge::<i64, AtomicI64>::default();
+        processed_total.set(stats.global.processed);
+        registry.register(
+            "processed_total",
+            "Total number of jobs processed",
+            processed_total,
+        );
+
+        let dead_total = Gauge::<i64, AtomicI64>::default();
+        dead_total.set(stats.global.dead as i64);
+        registry.register("dead_total", "Total number of dead jobs", dead_total);
+
+        let scheduled_total = Gauge::<i64, AtomicI64>::default();
+        scheduled_total.set(stats.global.scheduled as i64);
+        registry.register(
+            "scheduled_total",
+            "Total number of scheduled jobs",
+            scheduled_total,
+        );
+
+        let retries_total = Gauge::<i64, AtomicI64>::default();
+        retries_total.set(stats.global.retries as i64);
+        registry.register(
+            "retries_total",
+            "Total number of jobs in retry queue",
+            retries_total,
+        );
+
+        let latency_max_seconds = Gauge::<f64, AtomicU64>::default();
+        latency_max_seconds.set(stats.global.latency_s_max);
+        registry.register(
+            "latency_max_seconds",
+            "Maximum latency across all queues in seconds",
+            latency_max_seconds,
+        );
+
+        // Queue metrics
+        let queue_enqueued = Family::<QueueLabels, Gauge<i64, AtomicI64>>::default();
+        let queue_processed = Family::<QueueLabels, Gauge<i64, AtomicI64>>::default();
+        let queue_succeeded = Family::<QueueLabels, Gauge<i64, AtomicI64>>::default();
+        let queue_panicked = Family::<QueueLabels, Gauge<i64, AtomicI64>>::default();
+        let queue_failed = Family::<QueueLabels, Gauge<i64, AtomicI64>>::default();
+        let queue_latency_seconds = Family::<QueueLabels, Gauge<f64, AtomicU64>>::default();
+
+        // Dynamic sub-queue metrics
+        let dynamic_queue_enqueued = Family::<DynamicQueueLabels, Gauge<i64, AtomicI64>>::default();
+        let dynamic_queue_processed =
+            Family::<DynamicQueueLabels, Gauge<i64, AtomicI64>>::default();
+        let dynamic_queue_succeeded =
+            Family::<DynamicQueueLabels, Gauge<i64, AtomicI64>>::default();
+        let dynamic_queue_panicked = Family::<DynamicQueueLabels, Gauge<i64, AtomicI64>>::default();
+        let dynamic_queue_failed = Family::<DynamicQueueLabels, Gauge<i64, AtomicI64>>::default();
+        let dynamic_queue_latency_seconds =
+            Family::<DynamicQueueLabels, Gauge<f64, AtomicU64>>::default();
+
+        // Set queue values
+        for queue_stats in &stats.queues {
+            let labels = QueueLabels {
+                queue: queue_stats.key.clone(),
+            };
+
+            queue_enqueued
+                .get_or_create(&labels)
+                .set(queue_stats.enqueued as i64);
+            queue_processed
+                .get_or_create(&labels)
+                .set(queue_stats.processed);
+            queue_succeeded
+                .get_or_create(&labels)
+                .set(queue_stats.succeeded);
+            queue_panicked
+                .get_or_create(&labels)
+                .set(queue_stats.panicked);
+            queue_failed.get_or_create(&labels).set(queue_stats.failed);
+            queue_latency_seconds
+                .get_or_create(&labels)
+                .set(queue_stats.latency_s);
+
+            // Set dynamic sub-queue values
+            for dyn_stats in &queue_stats.queues {
+                let dyn_labels = DynamicQueueLabels {
+                    queue: queue_stats.key.clone(),
+                    suffix: dyn_stats.suffix.clone(),
+                };
+
+                dynamic_queue_enqueued
+                    .get_or_create(&dyn_labels)
+                    .set(dyn_stats.enqueued as i64);
+                dynamic_queue_processed
+                    .get_or_create(&dyn_labels)
+                    .set(dyn_stats.processed);
+                dynamic_queue_succeeded
+                    .get_or_create(&dyn_labels)
+                    .set(dyn_stats.succeeded);
+                dynamic_queue_panicked
+                    .get_or_create(&dyn_labels)
+                    .set(dyn_stats.panicked);
+                dynamic_queue_failed
+                    .get_or_create(&dyn_labels)
+                    .set(dyn_stats.failed);
+                dynamic_queue_latency_seconds
+                    .get_or_create(&dyn_labels)
+                    .set(dyn_stats.latency_s);
+            }
+        }
+
+        // Register queue metrics
+        registry.register(
+            "queue_enqueued",
+            "Number of jobs enqueued per queue",
+            queue_enqueued,
+        );
+        registry.register(
+            "queue_processed_total",
+            "Total number of jobs processed per queue",
+            queue_processed,
+        );
+        registry.register(
+            "queue_succeeded_total",
+            "Total number of jobs succeeded per queue",
+            queue_succeeded,
+        );
+        registry.register(
+            "queue_panicked_total",
+            "Total number of jobs panicked per queue",
+            queue_panicked,
+        );
+        registry.register(
+            "queue_failed_total",
+            "Total number of jobs failed per queue",
+            queue_failed,
+        );
+        registry.register(
+            "queue_latency_seconds",
+            "Current latency per queue in seconds",
+            queue_latency_seconds,
+        );
+
+        // Register dynamic queue metrics
+        registry.register(
+            "dynamic_queue_enqueued",
+            "Number of jobs enqueued per dynamic sub-queue",
+            dynamic_queue_enqueued,
+        );
+        registry.register(
+            "dynamic_queue_processed_total",
+            "Total number of jobs processed per dynamic sub-queue",
+            dynamic_queue_processed,
+        );
+        registry.register(
+            "dynamic_queue_succeeded_total",
+            "Total number of jobs succeeded per dynamic sub-queue",
+            dynamic_queue_succeeded,
+        );
+        registry.register(
+            "dynamic_queue_panicked_total",
+            "Total number of jobs panicked per dynamic sub-queue",
+            dynamic_queue_panicked,
+        );
+        registry.register(
+            "dynamic_queue_failed_total",
+            "Total number of jobs failed per dynamic sub-queue",
+            dynamic_queue_failed,
+        );
+        registry.register(
+            "dynamic_queue_latency_seconds",
+            "Current latency per dynamic sub-queue in seconds",
+            dynamic_queue_latency_seconds,
+        );
+
+        // Process metrics
+        let process_heartbeat_timestamp = Family::<ProcessLabels, Gauge<i64, AtomicI64>>::default();
+        let process_started_timestamp = Family::<ProcessLabels, Gauge<i64, AtomicI64>>::default();
+
+        let processes_count = Gauge::<i64, AtomicI64>::default();
+        processes_count.set(stats.processes.len() as i64);
+
+        for process in &stats.processes {
+            let labels = ProcessLabels {
+                hostname: process.hostname.clone(),
+                pid: process.pid.to_string(),
+            };
+
+            process_heartbeat_timestamp
+                .get_or_create(&labels)
+                .set(process.heartbeat_at);
+            process_started_timestamp
+                .get_or_create(&labels)
+                .set(process.started_at);
+        }
+
+        registry.register(
+            "process_heartbeat_timestamp_seconds",
+            "Last heartbeat timestamp per process",
+            process_heartbeat_timestamp,
+        );
+        registry.register(
+            "process_started_timestamp_seconds",
+            "Start timestamp per process",
+            process_started_timestamp,
+        );
+        registry.register(
+            "processes_count",
+            "Number of active Oxanus processes",
+            processes_count,
+        );
+
+        Self { registry }
+    }
+
+    /// Returns a reference to the underlying Prometheus registry.
+    ///
+    /// This can be used for custom encoding or to add additional metrics.
+    #[must_use]
+    pub fn registry(&self) -> &Registry {
+        &self.registry
+    }
+
+    /// Encodes the metrics to the `OpenMetrics` text format.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if encoding fails.
+    pub fn encode(&self, writer: &mut String) -> Result<(), std::fmt::Error> {
+        encode(writer, &self.registry)
+    }
+
+    /// Encodes the metrics and returns them as a string.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if encoding fails.
+    pub fn encode_to_string(&self) -> Result<String, std::fmt::Error> {
+        let mut buffer = String::new();
+        self.encode(&mut buffer)?;
+        Ok(buffer)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::stats::{DynamicQueueStats, Process, QueueStats, Stats, StatsGlobal};
+
+    fn create_test_stats() -> Stats {
+        Stats {
+            global: StatsGlobal {
+                jobs: 100,
+                enqueued: 50,
+                processed: 200,
+                dead: 5,
+                scheduled: 30,
+                retries: 10,
+                latency_s_max: 2.5,
+            },
+            processes: vec![Process {
+                hostname: "test-host".to_string(),
+                pid: 12345,
+                heartbeat_at: 1700000000,
+                started_at: 1699999000,
+            }],
+            processing: vec![],
+            queues: vec![
+                QueueStats {
+                    key: "default".to_string(),
+                    enqueued: 30,
+                    processed: 150,
+                    succeeded: 140,
+                    panicked: 2,
+                    failed: 8,
+                    latency_s: 1.5,
+                    queues: vec![],
+                },
+                QueueStats {
+                    key: "priority".to_string(),
+                    enqueued: 20,
+                    processed: 50,
+                    succeeded: 48,
+                    panicked: 0,
+                    failed: 2,
+                    latency_s: 0.5,
+                    queues: vec![DynamicQueueStats {
+                        suffix: "user_123".to_string(),
+                        enqueued: 5,
+                        processed: 10,
+                        succeeded: 9,
+                        panicked: 0,
+                        failed: 1,
+                        latency_s: 0.3,
+                    }],
+                },
+            ],
+        }
+    }
+
+    #[test]
+    fn test_prometheus_metrics_from_stats() {
+        let stats = create_test_stats();
+        let metrics = PrometheusMetrics::from_stats(&stats);
+
+        // Verify metrics can be encoded
+        let output = 
metrics.encode_to_string().expect("encoding should succeed"); + assert!(!output.is_empty()); + } + + #[test] + fn test_prometheus_metrics_with_prefix() { + let stats = create_test_stats(); + let metrics = PrometheusMetrics::from_stats_with_prefix(&stats, "my_app"); + let output = metrics.encode_to_string().expect("encoding should succeed"); + assert!(output.contains("my_app_")); + } + + #[test] + fn test_prometheus_metrics_encode() { + let stats = create_test_stats(); + let metrics = PrometheusMetrics::from_stats(&stats); + + let output = metrics.encode_to_string().expect("encoding should succeed"); + + // Check that metrics are present in the output + assert!(output.contains("oxanus_jobs_total")); + assert!(output.contains("oxanus_enqueued_total")); + assert!(output.contains("oxanus_processed_total")); + assert!(output.contains("oxanus_dead_total")); + assert!(output.contains("oxanus_scheduled_total")); + assert!(output.contains("oxanus_retries_total")); + assert!(output.contains("oxanus_queue_enqueued")); + assert!(output.contains("oxanus_processes_count")); + + // Check that queue labels are present + assert!(output.contains("queue=\"default\"")); + assert!(output.contains("queue=\"priority\"")); + + // Check that dynamic queue labels are present + assert!(output.contains("suffix=\"user_123\"")); + + // Check that process labels are present + assert!(output.contains("hostname=\"test-host\"")); + assert!(output.contains("pid=\"12345\"")); + } +} diff --git a/oxanus/src/stats.rs b/oxanus/src/stats.rs new file mode 100644 index 0000000..c5aa6a3 --- /dev/null +++ b/oxanus/src/stats.rs @@ -0,0 +1,111 @@ +//! Stats types for Oxanus job queue monitoring. + +use serde::{Deserialize, Serialize}; + +use crate::job_envelope::JobEnvelope; + +/// Overall statistics for the Oxanus job queue system. +#[derive(Debug, Clone, Serialize)] +pub struct Stats { + /// Global aggregate statistics. + pub global: StatsGlobal, + /// List of active processes. 
+    pub processes: Vec<Process>,
+    /// Jobs currently being processed.
+    pub processing: Vec<StatsProcessing>,
+    /// Per-queue statistics.
+    pub queues: Vec<QueueStats>,
+}
+
+/// Global aggregate statistics.
+#[derive(Debug, Clone, Serialize)]
+pub struct StatsGlobal {
+    /// Total number of jobs (enqueued + scheduled).
+    pub jobs: usize,
+    /// Number of jobs currently enqueued.
+    pub enqueued: usize,
+    /// Total number of jobs processed.
+    pub processed: i64,
+    /// Number of dead jobs.
+    pub dead: usize,
+    /// Number of scheduled jobs.
+    pub scheduled: usize,
+    /// Number of jobs in retry queue.
+    pub retries: usize,
+    /// Maximum latency across all queues in seconds.
+    pub latency_s_max: f64,
+}
+
+/// Information about a job currently being processed.
+#[derive(Debug, Clone, Serialize)]
+pub struct StatsProcessing {
+    /// The process ID handling the job.
+    pub process_id: String,
+    /// The job envelope being processed.
+    pub job_envelope: JobEnvelope,
+}
+
+/// Statistics for a specific queue.
+#[derive(Debug, Clone, Serialize)]
+pub struct QueueStats {
+    /// The queue key/name.
+    pub key: String,
+
+    /// Number of jobs currently enqueued.
+    pub enqueued: usize,
+    /// Total number of jobs processed.
+    pub processed: i64,
+    /// Total number of jobs succeeded.
+    pub succeeded: i64,
+    /// Total number of jobs panicked.
+    pub panicked: i64,
+    /// Total number of jobs failed.
+    pub failed: i64,
+    /// Current latency in seconds.
+    pub latency_s: f64,
+
+    /// Dynamic sub-queue statistics (if any).
+    #[serde(skip_serializing_if = "Vec::is_empty")]
+    pub queues: Vec<DynamicQueueStats>,
+}
+
+/// Statistics for a dynamic sub-queue.
+#[derive(Debug, Clone, Serialize)]
+pub struct DynamicQueueStats {
+    /// The dynamic queue suffix.
+    pub suffix: String,
+
+    /// Number of jobs currently enqueued.
+    pub enqueued: usize,
+    /// Total number of jobs processed.
+    pub processed: i64,
+    /// Total number of jobs succeeded.
+    pub succeeded: i64,
+    /// Total number of jobs panicked.
+    pub panicked: i64,
+    /// Total number of jobs failed.
+ pub failed: i64, + /// Current latency in seconds. + pub latency_s: f64, +} + +/// Information about an Oxanus worker process. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Process { + /// The hostname where the process is running. + pub hostname: String, + /// The process ID. + pub pid: u32, + /// Last heartbeat timestamp (Unix timestamp). + pub heartbeat_at: i64, + /// Process start timestamp (Unix timestamp). + pub started_at: i64, +} + +impl Process { + /// Returns a unique identifier for the process. + #[must_use] + pub fn id(&self) -> String { + format!("{}-{}", self.hostname, self.pid) + } +} diff --git a/oxanus/src/storage.rs b/oxanus/src/storage.rs index 90dbd96..9a2c840 100644 --- a/oxanus/src/storage.rs +++ b/oxanus/src/storage.rs @@ -4,11 +4,15 @@ use crate::{ error::OxanusError, job_envelope::{JobEnvelope, JobId}, queue::Queue, + stats::{Process, Stats}, storage_builder::StorageBuilder, - storage_internal::{Process, Stats, StorageInternal}, + storage_internal::StorageInternal, worker::Worker, }; +#[cfg(feature = "prometheus")] +use crate::prometheus::PrometheusMetrics; + /// Storage provides the main interface for job management in Oxanus. /// /// It handles all job operations including enqueueing, scheduling, and monitoring. @@ -256,4 +260,29 @@ impl Storage { pub fn namespace(&self) -> &str { self.internal.namespace() } + + /// Returns Prometheus metrics based on the current stats. + /// + /// # Returns + /// + /// A [`PrometheusMetrics`] instance containing all current metrics, + /// or an [`OxanusError`] if fetching stats fails. 
+    ///
+    /// # Examples
+    ///
+    /// ```rust,ignore
+    /// use oxanus::Storage;
+    ///
+    /// async fn example(storage: &Storage) -> Result<(), oxanus::OxanusError> {
+    ///     let metrics = storage.metrics().await?;
+    ///     let output = metrics.encode_to_string()?;
+    ///     println!("{}", output);
+    ///     Ok(())
+    /// }
+    /// ```
+    #[cfg(feature = "prometheus")]
+    pub async fn metrics(&self) -> Result<PrometheusMetrics, OxanusError> {
+        let stats = self.stats().await?;
+        Ok(PrometheusMetrics::from_stats(&stats))
+    }
 }
diff --git a/oxanus/src/storage_internal.rs b/oxanus/src/storage_internal.rs
index b3e106a..72cdc39 100644
--- a/oxanus/src/storage_internal.rs
+++ b/oxanus/src/storage_internal.rs
@@ -1,6 +1,5 @@
 use chrono::{DateTime, Utc};
 use deadpool_redis::redis::{self, AsyncCommands};
-use serde::{Deserialize, Serialize};
 use std::{
     collections::{HashMap, HashSet},
     num::NonZero,
@@ -11,6 +10,7 @@ use crate::{
     OxanusError,
     job_envelope::{JobConflictStrategy, JobEnvelope, JobId},
     result_collector::{JobResult, JobResultKind},
+    stats::{DynamicQueueStats, Process, QueueStats, Stats, StatsGlobal, StatsProcessing},
     storage_keys::StorageKeys,
     worker_registry::CronJob,
 };
@@ -25,72 +25,6 @@ pub(crate) struct StorageInternal {
     started_at: i64,
 }
 
-#[derive(Debug, Clone, Serialize)]
-pub struct Stats {
-    pub global: StatsGlobal,
-    pub processes: Vec<Process>,
-    pub processing: Vec<StatsProcessing>,
-    pub queues: Vec<QueueStats>,
-}
-
-#[derive(Debug, Clone, Serialize)]
-pub struct StatsGlobal {
-    pub jobs: usize,
-    pub enqueued: usize,
-    pub processed: i64,
-    pub dead: usize,
-    pub scheduled: usize,
-    pub retries: usize,
-    pub latency_s_max: f64,
-}
-
-#[derive(Debug, Clone, Serialize)]
-pub struct StatsProcessing {
-    pub process_id: String,
-    pub job_envelope: JobEnvelope,
-}
-
-#[derive(Debug, Clone, Serialize)]
-pub struct QueueStats {
-    pub key: String,
-
-    pub enqueued: usize,
-    pub processed: i64,
-    pub succeeded: i64,
-    pub panicked: i64,
-    pub failed: i64,
-    pub latency_s: f64,
-
-    #[serde(skip_serializing_if = "Vec::is_empty")]
-    pub queues: Vec<DynamicQueueStats>,
-} - -#[derive(Debug, Clone, Serialize)] -pub struct DynamicQueueStats { - pub suffix: String, - - pub enqueued: usize, - pub processed: i64, - pub succeeded: i64, - pub panicked: i64, - pub failed: i64, - pub latency_s: f64, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct Process { - pub hostname: String, - pub pid: u32, - pub heartbeat_at: i64, - pub started_at: i64, -} - -impl Process { - pub fn id(&self) -> String { - format!("{}-{}", self.hostname, self.pid) - } -} - enum JobEnqueueAction { Default, Skip,