`QueueHandler` is a high-level, type-safe wrapper for Cloudflare Queues in Workers. It uses Zod for runtime validation and static type inference, so every message entering your Worker is checked against a predefined schema before your business logic runs.
The following example shows how to define a schema, set up context, and register handlers with `QueueHandler`.
```typescript
// queue.ts
import { QueueHandler } from 'hono-utils/queue';
import { z } from 'zod';
import { ContextFn, MessageHandlers } from './types';
import { DB } from './yourDb';

// 1. Define your message schema
export const QueueSchema = z.object({
  email: z.object({
    to: z.string().email(),
    subject: z.string(),
    body: z.string(),
  }),
  log: z.object({
    level: z.enum(['debug', 'info', 'warn', 'error']),
    msg: z.string(),
  }),
});

type Environment = {
  db_url: string;
};
type QueueData = z.infer<typeof QueueSchema>;
type Context = {
  db: DB;
};

// 2. Define your dependency injection (Context)
export const setContext: ContextFn<Environment, Context> = (
  env,
  { eventId }
) => ({
  db: new DB(env.db_url, eventId),
});

export type Handlers = MessageHandlers<QueueData, Context>;

// 3. Implement specific logic handlers
const emailHandler: Handlers['email'] = async (content, { metadata }) => {
  console.log('📧 emailHandler invoked', { content, metadata });
  // YOUR LOGIC HERE
};

const logHandler: Handlers['log'] = async (content, { metadata, context }) => {
  console.log('📝 logHandler invoked', { content, metadata });
  await context.db.insert('logs', { level: content.level, msg: content.msg });
};

// 4. Orchestrate and Export
export const queueHandler = new QueueHandler({
  setContext,
  schema: QueueSchema,
})
  .addHandler('email', emailHandler)
  .addHandler('log', logHandler);

export const queue = queueHandler.getConsumer();
export const producer = queueHandler.getProducer();
export type QueueProducer = ReturnType<typeof queueHandler.getProducer>;
```

```typescript
// app.ts
import { Hono } from 'hono'
import { queue } from 'hono-utils';
import { queueHandler, type QueueProducer } from './queue';

type Env = {
  // Hono reads Worker bindings from the `Bindings` key
  Bindings: {
    QUEUE: Queue
  },
  Variables: {
    QUEUE: QueueProducer
  }
}

const app = new Hono<Env>()

// Expose the typed producer on `c.var.QUEUE`
app.use('*', queue({
  name: 'QUEUE',
  queueHandler,
}))

app.get('/email', async (c) => {
  await c.var.QUEUE('email', {
    to: 'test@example.com',
    subject: 'Test',
    body: 'Test',
  })
  return c.text(`Email sent!`)
})

export default app
```

In your Worker's entry point (`index.ts`), export the consumer function.
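```typescript
// index.ts
import { queue } from './queue';
import app from './app'; // app.ts uses a default export

export default {
  queue,
  fetch: app.fetch,
};
```

The `QueueHandler` constructor initializes the handler with validation and context rules. It accepts the following options: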
| Property | Type | Description |
|---|---|---|
| `schema` | `ZodObject` | The Zod schema defining all valid message keys and their payloads. |
| `setContext` | `Function` | A factory function that generates shared resources (DB, config) for handlers. |
`addHandler(handlerName, handler)` registers a processing function for a specific message type. This method is chainable.
- `handlerName`: A key from your `QueueSchema` (e.g., `'email'`).
- `handler`: An async function that receives the validated `content` and an object containing `context` and `metadata`.
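
Chaining works when each registration call returns the instance it was called on. The sketch below illustrates that pattern together with key-checked payload types. It is not the library's implementation: `SketchRegistry`, `SketchPayloads`, and `registered` are hypothetical names used only for illustration.

```typescript
// Hypothetical illustration of chainable, key-checked registration.
// None of these names come from hono-utils.
type SketchPayloads = {
  email: { to: string; subject: string };
  log: { level: 'info' | 'error'; msg: string };
};

class SketchRegistry {
  // Stored handlers are type-erased; the typed addHandler signature guards the inputs.
  private readonly handlers = new Map<
    keyof SketchPayloads,
    (content: never) => Promise<void>
  >();

  // Returning `this` is what allows .addHandler(...).addHandler(...).
  addHandler<K extends keyof SketchPayloads>(
    handlerName: K,
    handler: (content: SketchPayloads[K]) => Promise<void>,
  ): this {
    this.handlers.set(handlerName, handler);
    return this;
  }

  // List the registered handler names in insertion order.
  registered(): string[] {
    return Array.from(this.handlers.keys());
  }
}

const sketchRegistry = new SketchRegistry()
  .addHandler('email', async (content) => {
    console.log('email to', content.to);
  })
  .addHandler('log', async (content) => {
    console.log(content.level, content.msg);
  });
```

Because `handlerName` is constrained to the payload keys, a call such as `.addHandler('sms', ...)` would be rejected at compile time in this sketch.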
`getConsumer()` returns an asynchronous function compatible with the Cloudflare Worker `queue` export.
> [!IMPORTANT]
> The consumer uses `Promise.allSettled` to process batches. If one message fails, it does not prevent other messages in the same batch from being acknowledged or retried independently.
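
To make the independent settlement concrete, here is a minimal sketch of per-message handling with `Promise.allSettled`. It assumes that a fulfilled handler leads to `ack()` and a rejected one to `retry()`; the source does not spell out that mapping. `SketchMessage` and `settleBatch` are hypothetical names, and only the `ack()`/`retry()` methods mirror Cloudflare's queue message API.

```typescript
// Hypothetical message shape: only ack() and retry() mirror Cloudflare's message API.
type SketchMessage = {
  id: string;
  body: string;
  ack(): void;
  retry(): void;
};

// Process every message independently; a rejection affects only its own message.
async function settleBatch(
  messages: SketchMessage[],
  handle: (body: string) => Promise<void>,
): Promise<string[]> {
  // allSettled never rejects, so one failing handler cannot abort the batch.
  const results = await Promise.allSettled(
    messages.map((message) => handle(message.body)),
  );

  return results.map((result, index) => {
    const message = messages[index];
    if (result.status === 'fulfilled') {
      message.ack(); // assumption: success acknowledges the message
      return `${message.id}:ack`;
    }
    message.retry(); // assumption: failure schedules a retry
    return `${message.id}:retry`;
  });
}
```

With `Promise.all` instead, the first rejection would short-circuit and the outcome of the remaining messages would be lost to the caller.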
`getProducer()` generates a typed producer function that pushes messages into a specific Cloudflare Queue. It automatically generates a unique `eventId` using cuid2.
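
As a rough illustration of what a typed producer does, the sketch below wraps a queue's `send` call and attaches an `eventId`. The envelope fields (`handler`, `content`, `eventId`), the `SketchQueue` type, and `createSketchProducer` are assumptions made for this example, not the library's wire format or API. The `createId` parameter stands in for the cuid2 generator so the sketch has no dependencies.

```typescript
// Hypothetical envelope and queue types; field names are assumptions.
type SketchEnvelope<K extends string, C> = {
  handler: K;
  content: C;
  eventId: string;
};
type SketchQueue = { send(body: unknown): Promise<void> };

type SketchProducerPayloads = {
  email: { to: string; subject: string; body: string };
  log: { level: 'debug' | 'info' | 'warn' | 'error'; msg: string };
};

// Build a producer bound to one queue. `createId` stands in for cuid2.
function createSketchProducer(queue: SketchQueue, createId: () => string) {
  return async <K extends keyof SketchProducerPayloads>(
    handler: K,
    content: SketchProducerPayloads[K],
  ): Promise<string> => {
    const envelope: SketchEnvelope<K, SketchProducerPayloads[K]> = {
      handler,
      content,
      eventId: createId(),
    };
    await queue.send(envelope);
    return envelope.eventId; // returned here only so callers can correlate logs
  };
}
```

Tying `content` to the `handler` key means a call like `produce('log', { to: '...' })` fails type checking before anything reaches the queue.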
- Ingestion: Cloudflare delivers a `MessageBatch` to the worker.
- Validation: The `QueueHandler` iterates through messages. It identifies the `handler` key and uses the corresponding Zod sub-schema (e.g., `QueueSchema.shape.email`) to `.parse()` the `content`.
- Context Creation: `setContext` is invoked to provide the handler with necessary dependencies.
- Execution: The registered handler is executed.
- Settlement: All results are settled via `allSettled`. If validation fails or a handler is missing, it logs a warning and moves to the next message.
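
The steps above can be sketched as a small dependency-free pipeline. This is an illustration under stated assumptions, not the library's code: plain `parse` functions stand in for Zod's `.parse()`, and `SketchCtx`, `SketchIncoming`, `validated`, and `processBatch` are hypothetical names.

```typescript
// Hypothetical stand-ins for the lifecycle; `parse` plays the role of Zod's .parse().
type SketchCtx = { trace: string[] };
type SketchIncoming = { handler: string; content: unknown };
type SketchStep = (content: unknown) => (ctx: SketchCtx) => Promise<void>;

// Pair a validator with a handler so the handler only sees validated content.
function validated<C>(
  parse: (input: unknown) => C,
  run: (content: C, ctx: SketchCtx) => Promise<void>,
): SketchStep {
  return (content) => {
    const valid = parse(content); // Validation: throws on bad input
    return (ctx) => run(valid, ctx); // Execution: deferred until context exists
  };
}

async function processBatch(
  batch: SketchIncoming[], // Ingestion: the delivered messages
  steps: Record<string, SketchStep>,
  setContext: () => SketchCtx,
): Promise<string[]> {
  const settled = await Promise.allSettled(
    batch.map(async (message) => {
      const step = steps[message.handler];
      if (!step) throw new Error(`no handler for "${message.handler}"`);
      const execute = step(message.content); // Validation
      const ctx = setContext(); // Context Creation
      await execute(ctx); // Execution
    }),
  );

  // Settlement: log a warning for each failure and carry on with the rest.
  const warnings: string[] = [];
  settled.forEach((result) => {
    if (result.status === 'rejected') {
      const warning = String(result.reason);
      console.warn(warning);
      warnings.push(warning);
    }
  });
  return warnings;
}
```

In this sketch a missing handler and a validation failure both surface as rejected results, so neither stops the remaining messages in the batch from being processed.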