A mini-framework for building resilient asynchronous AMQP/RabbitMQ consumer applications in Rust, inspired by Axum's ergonomic handler pattern.
Built on top of amqprs and tokio.
- Axum-style handlers -- plain async functions become message handlers via compile-time extraction
- Automatic connection recovery -- background IO loop with reconnection and a pre-allocated channel pool for publishing
- Consumer resilience -- consumers automatically recover from channel/connection failures
- Flexible serialization -- JSON, MessagePack, BSON, raw bytes, or runtime content-type dispatch via
Auto<T> - Dependency injection -- global state and per-delivery metadata extractors
- Result-driven actions -- handler return types control ack/nack, publish, or RPC reply behavior
- Distributed tracing -- optional OpenTelemetry trace propagation through AMQP headers
- Graceful shutdown -- signal-based shutdown that drains in-flight deliveries
- RPC support -- built-in Direct Reply-To pattern
- Test utilities -- testcontainers-based integration test harness
Add streameroo to your Cargo.toml:
[dependencies]
streameroo = "0.4.2"The default features enable tokio and json (serde_json). See Feature Flags for additional options.
Any DeserializeOwned struct works with the built-in Json<T> wrapper:
#[derive(Debug, Deserialize)]
struct MyEvent {
hello: String,
}A handler is any async function whose parameters are extractors followed by a final event parameter:
async fn my_handler(
pool: State<PgPool>,
redelivered: Redelivered,
event: Json<MyEvent>,
) -> anyhow::Result<()> {
tracing::info!(?event, "received");
Ok(())
}pool-- shared state injected from the applicationContextredelivered-- extractor for the AMQPredeliveredflagevent-- the deserialized message body (last parameter, must implementAMQPDecode)- Returning
Ok(())acknowledges the delivery; returningErrnacks with requeue
use streameroo::amqp::*;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let args = amqprs::connection::OpenConnectionArguments::new(
"localhost", 5672, "guest", "guest",
);
let connection = AMQPConnection::connect(args).await?;
let mut context = Context::new();
context.data(pgpool); // register any shared state
let mut app = Streameroo::new(connection, context, "my-consumer");
app.with_graceful_shutdown(tokio::signal::ctrl_c());
app.consume(my_handler, "my-queue", 10).await?;
app.join().await;
Ok(())
}The third argument to consume sets both the number of concurrent handler tasks and the prefetch count. For full control over QoS and consume options, use consume_with_options.
The AMQPHandler trait is automatically implemented for async functions with up to 13 extractor parameters plus one event parameter. You never implement it manually.
async fn handler(
// 0..13 extractors (impl FromDeliveryContext)
state: StateOwned<Arc<AtomicU8>>,
exchange: Exchange,
redelivered: Redelivered,
// last parameter: the event (impl AMQPDecode)
event: Json<MyEvent>,
) -> anyhow::Result<()> {
// ...
Ok(())
}Constraints:
- The last parameter must implement
AMQPDecode(the message payload) - All preceding parameters must implement
FromDeliveryContext - The error type must implement
Into<Box<dyn Error + Send + Sync>>(e.g.anyhow::Error) - The return type
Tmust implementAMQPResult - The function must be
Clone + Send + Sync + 'static
Extractors pull data from the delivery context and are used as handler parameters (before the event).
| Extractor | Inner Type | Description |
|---|---|---|
State<T> |
&'static T |
Static reference to shared state from Context |
StateOwned<T> |
T |
Cloned copy of shared state (requires T: Clone) |
Exchange |
String |
The exchange the message was published to |
RoutingKey |
String |
The routing key of the delivery |
ReplyTo |
Option<String> |
The reply-to property, if present |
DeliveryTag |
u64 |
The AMQP delivery tag |
Redelivered |
bool |
Whether this is a redelivery |
BasicProperties |
BasicProperties |
The full AMQP properties (cloned) |
Channel |
Channel |
The AMQP channel that received the delivery |
All wrapper extractors implement Deref to their inner type and provide into_inner().
The handler's return type controls what happens after successful execution. The trait provides a manual() flag: when false (the default), the framework auto-acks on success.
| Return Type | Behavior |
|---|---|
() |
No-op. Framework auto-acks on Ok, nacks with requeue on Err. |
Publish<E> |
Publishes E to a specified exchange/routing_key, then auto-acks. |
PublishReply<E> |
Publishes E to the reply-to address (RPC pattern), then auto-acks. |
DeliveryAction |
Manual ack/nack control. Framework does not auto-ack. |
Publishes a message to another queue after handling:
async fn forward_handler(
event: Json<MyEvent>,
) -> anyhow::Result<Publish<Json<MyEvent>>> {
Ok(Publish::new(event, "", "forwarded-queue"))
}The Publish struct also exposes options and properties fields for full control over the publish arguments.
Implements the RPC reply-to pattern:
async fn rpc_handler(
event: Json<Request>,
) -> anyhow::Result<PublishReply<Json<Response>>> {
let response = process(event.into_inner());
Ok(PublishReply::new(Json(response)))
}If no reply-to header is present on the incoming message, the reply is silently discarded.
For fine-grained ack/nack control:
async fn manual_handler(
event: Json<MyEvent>,
) -> Result<DeliveryAction, Infallible> {
if should_requeue(&event) {
Ok(DeliveryAction::Nack { requeue: true, multiple: false })
} else {
Ok(DeliveryAction::Ack { multiple: false })
}
}The framework distinguishes error types to decide requeue behavior:
| Error Source | Behavior |
|---|---|
Decode failure (Error::Event) |
Nack without requeue (would fail again) |
Handler error (Error::Handler) |
Nack with requeue (assumed transient) |
| Result action failure | Nack with requeue |
The event module provides format-agnostic serialization traits:
pub trait Decode: Sized {
type Error: std::error::Error + Send + Sync + 'static;
fn decode(data: Vec<u8>) -> Result<Self, Self::Error>;
}
pub trait Encode {
type Error: std::error::Error + Send + Sync + 'static;
fn encode(&self) -> Result<Vec<u8>, Self::Error>;
fn content_type() -> Option<&'static str> { None }
}The content_type() associated function is used by the publishing system to automatically set the AMQP content_type property on outgoing messages.
| Type | Feature | Content-Type | Decode | Encode |
|---|---|---|---|---|
Json<T> |
json (default) |
application/json |
T: DeserializeOwned |
T: Serialize |
MsgPack<T> |
msgpack |
application/msgpack |
T: DeserializeOwned |
T: Serialize |
Bson<T> |
bson |
application/bson |
T: DeserializeOwned |
T: Serialize |
serde_json::Value |
json |
application/json |
Yes | Yes |
Vec<u8> |
always | none | passthrough | passthrough |
bytes::Bytes |
bytes |
none | decode only | -- |
All wrapper types implement Deref/DerefMut to the inner T and provide into_inner().
Auto<T> selects the deserializer at runtime based on the message's content_type AMQP property:
async fn handler(event: Auto<MyEvent>) -> anyhow::Result<()> {
// Works with JSON, MsgPack, or BSON depending on the content-type header
Ok(())
}Supported content-type values:
| Header Value | Decoder | Feature Required |
|---|---|---|
application/json or json |
serde_json | json (default) |
application/msgpack or msgpack |
rmp_serde | msgpack |
application/bson or bson |
bson | bson |
Returns an error if the content-type header is missing or unsupported. Auto<T> is decode-only; it does not implement Encode.
AMQPConnection is a Clone-able handle that wraps the raw connection behind an actor-like IO loop:
let args = OpenConnectionArguments::new("localhost", 5672, "guest", "guest");
let connection = AMQPConnection::connect(args).await?;Key behaviors:
- Channel pool -- pre-allocates 10 channels for publishing, used in round-robin
- Automatic reconnection -- on connection failure, retries every 3 seconds indefinitely, re-opening all pool channels on success
- RPC timeout -- all operations (open channel, publish) have a 10-second timeout
Public API:
connect(args)-- create a new connection with IO loopopen_channel()-- open a new channel (for consuming or advanced operations)basic_publish(properties, data, args)-- publish via the channel pool
Extension trait implemented for both amqprs::Channel and AMQPConnection:
pub trait ChannelExt {
fn publish<E: Encode>(&self, exchange: &str, routing_key: &str, event: E) -> ...;
fn publish_with_options<E: Encode>(&self, args, properties, event: E) -> ...;
fn direct_rpc<E: Encode, T: Decode>(&mut self, exchange: &str, routing_key: &str, timeout: Duration, event: E) -> ...;
}publish and publish_with_options automatically set the content_type property from the Encode implementation when not already present. With the telemetry feature enabled, they also inject OpenTelemetry trace context into AMQP headers.
Implements the RabbitMQ Direct Reply-To pattern:
let response: Json<MyResponse> = channel
.direct_rpc(
"",
&queue,
Duration::from_secs(5),
Json(MyRequest { data: "hello".into() }),
)
.await?;Register a shutdown signal (any Future) to stop all consumers:
app.with_graceful_shutdown(tokio::signal::ctrl_c());When the signal fires, all consumer loops are notified. In-flight handler tasks are awaited before the consumer channels are closed. Call app.join().await to block until all consumers have fully stopped.
You can also obtain a manual shutdown handle:
let handle = app.shutdown_handle();
// Later, from anywhere:
handle.notify_waiters();Enable distributed tracing with the telemetry feature:
streameroo = { version = "0.4", features = ["telemetry"] }- Producer side: when publishing via
ChannelExt, aSpanKind::Producerspan is created and trace context (W3Ctraceparent/tracestate) is injected into AMQP message headers - Consumer side: when a delivery arrives, trace context is extracted from AMQP headers and set as the parent of a
SpanKind::Consumerspan that wraps the handler execution
This creates unbroken traces across producer -> broker -> consumer boundaries, visible in any OpenTelemetry-compatible backend (Jaeger, Grafana Tempo, Honeycomb, Datadog, etc.).
Consumer and producer spans include:
otel.name--{exchange}.{routing_key}otel.kind--ConsumerorProduceramqp.exchange,amqp.routing_keyamqp.correlation_id,amqp.reply_to,amqp.content_typedelivery_tag(consumer only)
The library does not initialize a tracing subscriber or OpenTelemetry pipeline. Your application is responsible for setting up the OTel exporter. The library uses opentelemetry::global::get_text_map_propagator for context propagation, so standard OTEL_* environment variables apply:
OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317
OTEL_EXPORTER_OTLP_PROTOCOL=grpc
OTEL_PROPAGATORS=tracecontext,baggageA compose.yaml is included in the repository for running a local Jaeger instance:
docker compose up -d
# Jaeger UI at http://localhost:16686The telemetry module is public when the feature is enabled:
use streameroo::amqp::telemetry;
telemetry::inject_context(&otel_context, &mut headers); // producer: inject into AMQP headers
let ctx = telemetry::extract_context(&headers); // consumer: extract from AMQP headers
let span = telemetry::make_span_from_delivery_context(&delivery_ctx);
let span = telemetry::make_span_from_properties(&props, SpanKind::Producer, "exchange", "key");Convenience macro for building amqprs::FieldTable values (e.g. for queue arguments):
use streameroo::field_table;
use streameroo::amqp::XQueueType;
let args = field_table!(
("x-queue-type", XQueueType::Quorum),
("x-delivery-limit", amqprs::FieldValue::u(5)),
);Enum for RabbitMQ queue types, convertible to FieldValue:
pub enum XQueueType { Classic, Quorum, Stream }Converts a HashMap<String, String> into a FieldTable:
let map = HashMap::from([("key".into(), "value".into())]);
let table = table_from_map(&map);| Feature | Default | Description |
|---|---|---|
tokio |
yes | Tokio async runtime |
json |
yes | Json<T> wrapper + serde_json::Value support |
msgpack |
no | MsgPack<T> wrapper via MessagePack |
bson |
no | Bson<T> wrapper via BSON |
bytes |
no | Decode impl for bytes::Bytes |
telemetry |
no | OpenTelemetry distributed tracing through AMQP headers |
amqp-test |
no | Test utilities: AMQPTest context, start_rabbitmq(), consume_next() |
Enable the amqp-test feature to access the test harness in your own integration tests:
[dev-dependencies]
streameroo = { version = "0.4", features = ["amqp-test"] }
test-context = "0.3"use streameroo::amqp::amqp_test::AMQPTest;
use test_context::test_context;
#[test_context(AMQPTest)]
#[tokio::test]
async fn my_integration_test(ctx: &mut AMQPTest) {
let channel = ctx.connection.open_channel().await.unwrap();
// declare queues, publish messages, consume, assert...
}AMQPTest automatically spins up a RabbitMQ container via testcontainers, creates a connection, and tears everything down after the test. The consume_next helper on AMQPConnection consumes and acks a single message from a queue:
let result: Json<MyEvent> = ctx.connection.consume_next("my-queue").await;Apache-2.0