Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ Whether you’re running <b>background jobs</b>, designing an <b>event-driven ar
- **Isolated Queues per Processor**: Each processor gets its own dedicated queue and DLQ, ensuring full isolation and predictable behavior across services.
- **Schema Validation**: Optional JSON Schema validation powered by AJV for safer message handling and data integrity.
- **Concurrent Consumers**: Scale either horizontally (multiple instances) or vertically (multiple consumers per queue, leveraging RabbitMQ prefetch) to maximize throughput and efficiency.
- **RabbitMQ Durability & Acknowledgements**: Leverages RabbitMQs persistent storage and acknowledgment model to guarantee at-least-once delivery, even across restarts and failures.
- **RabbitMQ Durability & Acknowledgements**: Leverages RabbitMQ's persistent storage and acknowledgment model to guarantee at-least-once delivery, even across restarts and failures.
- **Custom Logging**: Plug in your own logger or use the default console logger for full control over message visibility.

## Installation
Expand Down Expand Up @@ -212,7 +212,24 @@ RunMQ can leverage RabbitMQ policies to manage the delay between attempts, it's
#### Benefits
- Flexible and easy management of retry delays
- Reduces operational overhead
- Fully compatible with RunMQ’s retry and DLQ mechanisms
- Fully compatible with RunMQ's retry and DLQ mechanisms

### Queue Metadata Storage

RunMQ automatically stores queue metadata (such as max retries and creation timestamp) using RabbitMQ's parameters API. This enables external tools and dashboards to discover RunMQ-managed queues and understand their configuration without direct access to the application code.

When a processor is configured, RunMQ creates a metadata parameter that stores:
- **Version**: Schema version for future-proof migrations.
- **Max Retries**: The configured retry limit for the queue.
- **Created At**: ISO 8601 timestamp when the queue was first configured.
- **Updated At**: ISO 8601 timestamp when the configuration was last changed (if applicable).

#### Benefits
- **Dashboard Integration**: External monitoring tools and dashboards can query RabbitMQ's management API to retrieve queue metadata and display topology information (e.g., "10 retries with 5s delay, then to DLQ").
- **Self-Documenting Queues**: Queue configurations are discoverable directly from RabbitMQ, without needing access to application source code.
- **Automatic Updates**: When processor configuration changes, metadata is automatically updated while preserving the original creation timestamp.

> **Note**: This feature requires RabbitMQ Management Plugin to be enabled for external tools to query the metadata parameters and for the parameters to be set.

### Custom Logger

Expand Down
5 changes: 3 additions & 2 deletions src/core/RunMQ.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {RabbitMQMessageProperties} from "@src/core/message/RabbitMQMessageProper
export class RunMQ {
private readonly client: RabbitMQClientAdapter;
private readonly config: RunMQConnectionConfig;
private readonly consumer: RunMQConsumerCreator;
private publisher: RunMQPublisher | undefined
private readonly logger: RunMQLogger
private retryAttempts: number = 0;
Expand All @@ -28,6 +29,7 @@ export class RunMQ {
maxReconnectAttempts: config.maxReconnectAttempts ?? DEFAULTS.MAX_RECONNECT_ATTEMPTS,
};
this.client = new RabbitMQClientAdapter(this.config, this.logger);
this.consumer = new RunMQConsumerCreator(this.client, this.logger, this.config.management);
}

/**
Expand All @@ -50,8 +52,7 @@ export class RunMQ {
* @param processor The function that will process the incoming messages
*/
public async process<T = Record<string, never>>(topic: string, config: RunMQProcessorConfiguration, processor: (message: RunMQMessageContent<T>) => Promise<void>) {
const consumer = new RunMQConsumerCreator(this.client, this.logger, this.config.management);
await consumer.createConsumer<T>(new ConsumerConfiguration(topic, config, processor))
await this.consumer.createConsumer<T>(new ConsumerConfiguration(topic, config, processor))
}

/**
Expand Down
1 change: 1 addition & 0 deletions src/core/constants/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export const Constants = {
RETRY_DELAY_QUEUE_PREFIX: RUNMQ_PREFIX + "retry_delay_",
DLQ_QUEUE_PREFIX: RUNMQ_PREFIX + "dlq_",
MESSAGE_TTL_OPERATOR_POLICY_PREFIX: RUNMQ_PREFIX + "message_ttl_operator_policy",
METADATA_STORE_PREFIX: RUNMQ_PREFIX + "metadata_",
}

export const DEFAULTS = {
Expand Down
13 changes: 13 additions & 0 deletions src/core/consumer/RunMQConsumerCreator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,29 +15,42 @@ import {ConsumerCreatorUtils} from "@src/core/consumer/ConsumerCreatorUtils";
import {RunMQPublisherCreator} from "@src/core/publisher/RunMQPublisherCreator";
import {AMQPChannel, AMQPClient, RabbitMQManagementConfig} from "@src/types";
import {RunMQTTLPolicyManager} from "@src/core/management/Policies/RunMQTTLPolicyManager";
import {RunMQMetadataManager} from "@src/core/management/Policies/RunMQMetadataManager";
import {RunMQException} from "@src/core/exceptions/RunMQException";
import {Exceptions} from "@src/core/exceptions/Exceptions";

export class RunMQConsumerCreator {
private ttlPolicyManager: RunMQTTLPolicyManager;
private metadataManager: RunMQMetadataManager;

constructor(
private client: AMQPClient,
private logger: RunMQLogger,
managementConfig?: RabbitMQManagementConfig
) {
this.ttlPolicyManager = new RunMQTTLPolicyManager(logger, managementConfig);
this.metadataManager = new RunMQMetadataManager(logger, managementConfig);
}

public async createConsumer<T>(consumerConfiguration: ConsumerConfiguration<T>) {
await this.ttlPolicyManager.initialize();
await this.metadataManager.initialize();
await this.assertQueues<T>(consumerConfiguration);
await this.bindQueues<T>(consumerConfiguration);
await this.storeMetadata<T>(consumerConfiguration);
for (let i = 0; i < consumerConfiguration.processorConfig.consumersCount; i++) {
await this.runProcessor<T>(consumerConfiguration);
}
}

private async storeMetadata<T>(consumerConfiguration: ConsumerConfiguration<T>): Promise<void> {
const maxRetries = consumerConfiguration.processorConfig.attempts ?? DEFAULTS.PROCESSING_ATTEMPTS;

await this.metadataManager.apply(
consumerConfiguration.processorConfig.name,
maxRetries
);
}

private async runProcessor<T>(consumerConfiguration: ConsumerConfiguration<T>): Promise<void> {
const consumerChannel = await this.getProcessorChannel();
Expand Down
36 changes: 36 additions & 0 deletions src/core/management/Policies/RabbitMQMetadata.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import {Constants} from "@src/core/constants";
import {METADATA_SCHEMA_VERSION, RunMQQueueMetadata} from "@src/core/management/Policies/RunMQQueueMetadata";

/**
* Creates metadata objects for storing queue configuration.
* Uses RabbitMQ parameters API for storage, which allows custom JSON data.
*/
export class RabbitMQMetadata {
/**
* Creates metadata for a queue.
* @param maxRetries - Maximum retry attempts configured for the queue
* @param existingMetadata - Optional existing metadata to preserve createdAt
* @returns RunMQQueueMetadata object
*/
static createMetadataFor(
maxRetries: number,
existingMetadata?: RunMQQueueMetadata
): RunMQQueueMetadata {
const now = new Date().toISOString();

return {
version: METADATA_SCHEMA_VERSION,
maxRetries,
createdAt: existingMetadata?.createdAt ?? now,
...(existingMetadata ? { updatedAt: now } : {})
};
}

/**
* Gets the parameter name for a given queue.
*/
static getParameterName(queueName: string): string {
return Constants.METADATA_STORE_PREFIX + queueName;
}
}

137 changes: 137 additions & 0 deletions src/core/management/Policies/RunMQMetadataManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
import {RabbitMQManagementClient} from "@src/core/management/RabbitMQManagementClient";
import {RunMQLogger} from "@src/core/logging/RunMQLogger";
import {RabbitMQManagementConfig} from "@src";
import {RabbitMQMetadata} from "@src/core/management/Policies/RabbitMQMetadata";
import {RunMQQueueMetadata} from "@src/core/management/Policies/RunMQQueueMetadata";

/**
* Manages metadata for RunMQ queues.
* Stores queue configuration metadata using RabbitMQ parameters API.
*/
export class RunMQMetadataManager {
private readonly managementClient: RabbitMQManagementClient | null = null;
private isManagementPluginEnabled = false;

constructor(
private logger: RunMQLogger,
private managementConfig?: RabbitMQManagementConfig
) {
if (this.managementConfig) {
this.managementClient = new RabbitMQManagementClient(this.managementConfig, this.logger);
}
}

/**
* Initialize the manager by checking if management plugin is available.
*/
public async initialize(): Promise<void> {
if (this.isManagementPluginEnabled) {
return;
}
if (!this.managementClient) {
this.logger.warn("Management client not configured - metadata storage disabled");
return;
}

this.isManagementPluginEnabled = await this.managementClient.checkManagementPluginEnabled();

if (!this.isManagementPluginEnabled) {
this.logger.warn("RabbitMQ management plugin is not enabled - metadata storage disabled");
} else {
this.logger.info("RunMQ metadata storage initialized");
}
}

/**
* Store or update metadata for a queue.
* If metadata already exists, preserves createdAt and sets updatedAt.
*
* @param queueName - The name of the queue
* @param maxRetries - Maximum retry attempts
* @returns true if metadata was stored successfully, false otherwise
*/
public async apply(
queueName: string,
maxRetries: number,
): Promise<boolean> {
if (!this.isManagementPluginEnabled || !this.managementClient) {
this.logger.warn(`Cannot store metadata for queue '${queueName}' - management plugin not available`);
return false;
}

try {
const existingMetadata = await this.getMetadata(queueName);
const metadata = RabbitMQMetadata.createMetadataFor(
maxRetries,
existingMetadata ?? undefined
);

const paramName = RabbitMQMetadata.getParameterName(queueName);

const success = await this.managementClient.setParameter(
paramName,
metadata
);

if (success) {
const action = existingMetadata ? "Updated" : "Created";
this.logger.info(`${action} metadata for queue: ${queueName}`);
return true;
}

this.logger.error(`Failed to store metadata for queue: ${queueName}`);
return false;
} catch (error) {
this.logger.error(`Error storing metadata for queue ${queueName}: ${error}`);
return false;
}
}

/**
* Get metadata for a queue.
*
* @param queueName - The name of the queue
* @returns The queue metadata or null if not found
*/
public async getMetadata(
queueName: string,
): Promise<RunMQQueueMetadata | null> {
if (!this.isManagementPluginEnabled || !this.managementClient) {
return null;
}

try {
const paramName = RabbitMQMetadata.getParameterName(queueName);

return await this.managementClient.getParameter<RunMQQueueMetadata>(
paramName
);
} catch (error) {
this.logger.warn(`Failed to get metadata for queue ${queueName}: ${error}`);
return null;
}
}

/**
* Delete metadata for a queue.
*
* @param queueName - The name of the queue
*/
public async cleanup(queueName: string): Promise<void> {
if (!this.isManagementPluginEnabled || !this.managementClient) {
return;
}

const paramName = RabbitMQMetadata.getParameterName(queueName);
await this.managementClient.deleteParameter(paramName);
this.logger.info(`Deleted metadata for queue: ${queueName}`);
}

/**
* Check if management plugin is enabled and metadata storage is available.
*/
public isEnabled(): boolean {
return this.isManagementPluginEnabled;
}
}

34 changes: 34 additions & 0 deletions src/core/management/Policies/RunMQQueueMetadata.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/**
* Metadata stored for each RunMQ-managed queue.
* This metadata is stored as a RabbitMQ operator policy with lowest priority.
*/
export interface RunMQQueueMetadata {
/**
* Schema version of the metadata object.
* Used for future migrations if metadata structure changes.
*/
version: number;

/**
* Maximum number of retry attempts before message goes to DLQ.
*/
maxRetries: number;

/**
* ISO 8601 timestamp when the metadata was first created.
*/
createdAt: string;

/**
* ISO 8601 timestamp when the metadata was last updated.
* Only present if metadata has been updated after creation.
*/
updatedAt?: string;
}

/**
* Current version of the metadata schema.
* Increment this when making breaking changes to RunMQQueueMetadata.
*/
export const METADATA_SCHEMA_VERSION = 0;

3 changes: 3 additions & 0 deletions src/core/management/Policies/RunMQTTLPolicyManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ export class RunMQTTLPolicyManager {
}

public async initialize(): Promise<void> {
if (this.isManagementPluginEnabled) {
return;
}
if (!this.managementClient) {
this.logger.warn("Management client not configured");
return;
Expand Down
Loading