diff --git a/README.md b/README.md
index c9af641..13cc547 100644
--- a/README.md
+++ b/README.md
@@ -23,7 +23,7 @@ Whether you’re running background jobs, designing an event-driven ar
- **Isolated Queues per Processor**: Each processor gets its own dedicated queue and DLQ, ensuring full isolation and predictable behavior across services.
- **Schema Validation**: Optional JSON Schema validation powered by AJV for safer message handling and data integrity.
- **Concurrent Consumers**: Scale either horizontally (multiple instances) or vertically (multiple consumers per queue, leveraging RabbitMQ prefetch) to maximize throughput and efficiency.
-- **RabbitMQ Durability & Acknowledgements**: Leverages RabbitMQ’s persistent storage and acknowledgment model to guarantee at-least-once delivery, even across restarts and failures.
+- **RabbitMQ Durability & Acknowledgements**: Leverages RabbitMQ's persistent storage and acknowledgment model to guarantee at-least-once delivery, even across restarts and failures.
- **Custom Logging**: Plug in your own logger or use the default console logger for full control over message visibility.
## Installation
@@ -212,7 +212,24 @@ RunMQ can leverage RabbitMQ policies to manage the delay between attempts, it's
#### Benefits
- Flexible and easy management of retry delays
- Reduces operational overhead
-- Fully compatible with RunMQ’s retry and DLQ mechanisms
+- Fully compatible with RunMQ's retry and DLQ mechanisms
+
+### Queue Metadata Storage
+
+RunMQ automatically stores queue metadata (such as max retries and creation timestamp) using RabbitMQ's parameters API. This enables external tools and dashboards to discover RunMQ-managed queues and understand their configuration without direct access to the application code.
+
+When a processor is configured, RunMQ creates a metadata parameter that stores:
+- **Version**: Schema version for future-proof migrations.
+- **Max Retries**: The configured retry limit for the queue.
+- **Created At**: ISO 8601 timestamp when the queue was first configured.
+- **Updated At**: ISO 8601 timestamp when the configuration was last changed (if applicable).
+
+#### Benefits
+- **Dashboard Integration**: External monitoring tools and dashboards can query RabbitMQ's management API to retrieve queue metadata and display topology information (e.g., "10 retries with 5s delay, then to DLQ").
+- **Self-Documenting Queues**: Queue configurations are discoverable directly from RabbitMQ, without needing access to application source code.
+- **Automatic Updates**: When processor configuration changes, metadata is automatically updated while preserving the original creation timestamp.
+
+> **Note**: This feature requires RabbitMQ Management Plugin to be enabled for external tools to query the metadata parameters and for the parameters to be set.
### Custom Logger
diff --git a/src/core/RunMQ.ts b/src/core/RunMQ.ts
index dc89cc7..b98bece 100644
--- a/src/core/RunMQ.ts
+++ b/src/core/RunMQ.ts
@@ -15,6 +15,7 @@ import {RabbitMQMessageProperties} from "@src/core/message/RabbitMQMessageProper
export class RunMQ {
private readonly client: RabbitMQClientAdapter;
private readonly config: RunMQConnectionConfig;
+ private readonly consumer: RunMQConsumerCreator;
private publisher: RunMQPublisher | undefined
private readonly logger: RunMQLogger
private retryAttempts: number = 0;
@@ -28,6 +29,7 @@ export class RunMQ {
maxReconnectAttempts: config.maxReconnectAttempts ?? DEFAULTS.MAX_RECONNECT_ATTEMPTS,
};
this.client = new RabbitMQClientAdapter(this.config, this.logger);
+ this.consumer = new RunMQConsumerCreator(this.client, this.logger, this.config.management);
}
/**
@@ -50,8 +52,7 @@ export class RunMQ {
* @param processor The function that will process the incoming messages
*/
public async process>(topic: string, config: RunMQProcessorConfiguration, processor: (message: RunMQMessageContent) => Promise) {
- const consumer = new RunMQConsumerCreator(this.client, this.logger, this.config.management);
- await consumer.createConsumer(new ConsumerConfiguration(topic, config, processor))
+ await this.consumer.createConsumer(new ConsumerConfiguration(topic, config, processor))
}
/**
diff --git a/src/core/constants/index.ts b/src/core/constants/index.ts
index eda2e29..9270943 100644
--- a/src/core/constants/index.ts
+++ b/src/core/constants/index.ts
@@ -5,6 +5,7 @@ export const Constants = {
RETRY_DELAY_QUEUE_PREFIX: RUNMQ_PREFIX + "retry_delay_",
DLQ_QUEUE_PREFIX: RUNMQ_PREFIX + "dlq_",
MESSAGE_TTL_OPERATOR_POLICY_PREFIX: RUNMQ_PREFIX + "message_ttl_operator_policy",
+ METADATA_STORE_PREFIX: RUNMQ_PREFIX + "metadata_",
}
export const DEFAULTS = {
diff --git a/src/core/consumer/RunMQConsumerCreator.ts b/src/core/consumer/RunMQConsumerCreator.ts
index 224301f..f01ecb3 100644
--- a/src/core/consumer/RunMQConsumerCreator.ts
+++ b/src/core/consumer/RunMQConsumerCreator.ts
@@ -15,11 +15,13 @@ import {ConsumerCreatorUtils} from "@src/core/consumer/ConsumerCreatorUtils";
import {RunMQPublisherCreator} from "@src/core/publisher/RunMQPublisherCreator";
import {AMQPChannel, AMQPClient, RabbitMQManagementConfig} from "@src/types";
import {RunMQTTLPolicyManager} from "@src/core/management/Policies/RunMQTTLPolicyManager";
+import {RunMQMetadataManager} from "@src/core/management/Policies/RunMQMetadataManager";
import {RunMQException} from "@src/core/exceptions/RunMQException";
import {Exceptions} from "@src/core/exceptions/Exceptions";
export class RunMQConsumerCreator {
private ttlPolicyManager: RunMQTTLPolicyManager;
+ private metadataManager: RunMQMetadataManager;
constructor(
private client: AMQPClient,
@@ -27,17 +29,28 @@ export class RunMQConsumerCreator {
managementConfig?: RabbitMQManagementConfig
) {
this.ttlPolicyManager = new RunMQTTLPolicyManager(logger, managementConfig);
+ this.metadataManager = new RunMQMetadataManager(logger, managementConfig);
}
public async createConsumer(consumerConfiguration: ConsumerConfiguration) {
await this.ttlPolicyManager.initialize();
+ await this.metadataManager.initialize();
await this.assertQueues(consumerConfiguration);
await this.bindQueues(consumerConfiguration);
+ await this.storeMetadata(consumerConfiguration);
for (let i = 0; i < consumerConfiguration.processorConfig.consumersCount; i++) {
await this.runProcessor(consumerConfiguration);
}
}
+ private async storeMetadata(consumerConfiguration: ConsumerConfiguration): Promise {
+ const maxRetries = consumerConfiguration.processorConfig.attempts ?? DEFAULTS.PROCESSING_ATTEMPTS;
+
+ await this.metadataManager.apply(
+ consumerConfiguration.processorConfig.name,
+ maxRetries
+ );
+ }
private async runProcessor(consumerConfiguration: ConsumerConfiguration): Promise {
const consumerChannel = await this.getProcessorChannel();
diff --git a/src/core/management/Policies/RabbitMQMetadata.ts b/src/core/management/Policies/RabbitMQMetadata.ts
new file mode 100644
index 0000000..92d626d
--- /dev/null
+++ b/src/core/management/Policies/RabbitMQMetadata.ts
@@ -0,0 +1,36 @@
+import {Constants} from "@src/core/constants";
+import {METADATA_SCHEMA_VERSION, RunMQQueueMetadata} from "@src/core/management/Policies/RunMQQueueMetadata";
+
+/**
+ * Creates metadata objects for storing queue configuration.
+ * Uses RabbitMQ parameters API for storage, which allows custom JSON data.
+ */
+export class RabbitMQMetadata {
+ /**
+ * Creates metadata for a queue.
+ * @param maxRetries - Maximum retry attempts configured for the queue
+ * @param existingMetadata - Optional existing metadata to preserve createdAt
+ * @returns RunMQQueueMetadata object
+ */
+ static createMetadataFor(
+ maxRetries: number,
+ existingMetadata?: RunMQQueueMetadata
+ ): RunMQQueueMetadata {
+ const now = new Date().toISOString();
+
+ return {
+ version: METADATA_SCHEMA_VERSION,
+ maxRetries,
+ createdAt: existingMetadata?.createdAt ?? now,
+ ...(existingMetadata ? { updatedAt: now } : {})
+ };
+ }
+
+ /**
+ * Gets the parameter name for a given queue.
+ */
+ static getParameterName(queueName: string): string {
+ return Constants.METADATA_STORE_PREFIX + queueName;
+ }
+}
+
diff --git a/src/core/management/Policies/RunMQMetadataManager.ts b/src/core/management/Policies/RunMQMetadataManager.ts
new file mode 100644
index 0000000..ca6f752
--- /dev/null
+++ b/src/core/management/Policies/RunMQMetadataManager.ts
@@ -0,0 +1,137 @@
+import {RabbitMQManagementClient} from "@src/core/management/RabbitMQManagementClient";
+import {RunMQLogger} from "@src/core/logging/RunMQLogger";
+import {RabbitMQManagementConfig} from "@src";
+import {RabbitMQMetadata} from "@src/core/management/Policies/RabbitMQMetadata";
+import {RunMQQueueMetadata} from "@src/core/management/Policies/RunMQQueueMetadata";
+
+/**
+ * Manages metadata for RunMQ queues.
+ * Stores queue configuration metadata using RabbitMQ parameters API.
+ */
+export class RunMQMetadataManager {
+ private readonly managementClient: RabbitMQManagementClient | null = null;
+ private isManagementPluginEnabled = false;
+
+ constructor(
+ private logger: RunMQLogger,
+ private managementConfig?: RabbitMQManagementConfig
+ ) {
+ if (this.managementConfig) {
+ this.managementClient = new RabbitMQManagementClient(this.managementConfig, this.logger);
+ }
+ }
+
+ /**
+ * Initialize the manager by checking if management plugin is available.
+ */
+ public async initialize(): Promise {
+ if (this.isManagementPluginEnabled) {
+ return;
+ }
+ if (!this.managementClient) {
+ this.logger.warn("Management client not configured - metadata storage disabled");
+ return;
+ }
+
+ this.isManagementPluginEnabled = await this.managementClient.checkManagementPluginEnabled();
+
+ if (!this.isManagementPluginEnabled) {
+ this.logger.warn("RabbitMQ management plugin is not enabled - metadata storage disabled");
+ } else {
+ this.logger.info("RunMQ metadata storage initialized");
+ }
+ }
+
+ /**
+ * Store or update metadata for a queue.
+ * If metadata already exists, preserves createdAt and sets updatedAt.
+ *
+ * @param queueName - The name of the queue
+ * @param maxRetries - Maximum retry attempts
+ * @returns true if metadata was stored successfully, false otherwise
+ */
+ public async apply(
+ queueName: string,
+ maxRetries: number,
+ ): Promise {
+ if (!this.isManagementPluginEnabled || !this.managementClient) {
+ this.logger.warn(`Cannot store metadata for queue '${queueName}' - management plugin not available`);
+ return false;
+ }
+
+ try {
+ const existingMetadata = await this.getMetadata(queueName);
+ const metadata = RabbitMQMetadata.createMetadataFor(
+ maxRetries,
+ existingMetadata ?? undefined
+ );
+
+ const paramName = RabbitMQMetadata.getParameterName(queueName);
+
+ const success = await this.managementClient.setParameter(
+ paramName,
+ metadata
+ );
+
+ if (success) {
+ const action = existingMetadata ? "Updated" : "Created";
+ this.logger.info(`${action} metadata for queue: ${queueName}`);
+ return true;
+ }
+
+ this.logger.error(`Failed to store metadata for queue: ${queueName}`);
+ return false;
+ } catch (error) {
+ this.logger.error(`Error storing metadata for queue ${queueName}: ${error}`);
+ return false;
+ }
+ }
+
+ /**
+ * Get metadata for a queue.
+ *
+ * @param queueName - The name of the queue
+ * @returns The queue metadata or null if not found
+ */
+ public async getMetadata(
+ queueName: string,
+ ): Promise {
+ if (!this.isManagementPluginEnabled || !this.managementClient) {
+ return null;
+ }
+
+ try {
+ const paramName = RabbitMQMetadata.getParameterName(queueName);
+
+ return await this.managementClient.getParameter(
+ paramName
+ );
+ } catch (error) {
+ this.logger.warn(`Failed to get metadata for queue ${queueName}: ${error}`);
+ return null;
+ }
+ }
+
+ /**
+ * Delete metadata for a queue.
+ *
+ * @param queueName - The name of the queue
+ */
+ public async cleanup(queueName: string): Promise {
+ if (!this.isManagementPluginEnabled || !this.managementClient) {
+ return;
+ }
+
+ const paramName = RabbitMQMetadata.getParameterName(queueName);
+ await this.managementClient.deleteParameter(paramName);
+ this.logger.info(`Deleted metadata for queue: ${queueName}`);
+ }
+
+ /**
+ * Check if management plugin is enabled and metadata storage is available.
+ */
+ public isEnabled(): boolean {
+ return this.isManagementPluginEnabled;
+ }
+}
+
diff --git a/src/core/management/Policies/RunMQQueueMetadata.ts b/src/core/management/Policies/RunMQQueueMetadata.ts
new file mode 100644
index 0000000..ba508f8
--- /dev/null
+++ b/src/core/management/Policies/RunMQQueueMetadata.ts
@@ -0,0 +1,34 @@
+/**
+ * Metadata stored for each RunMQ-managed queue.
+ * This metadata is stored as a RabbitMQ operator policy with lowest priority.
+ */
+export interface RunMQQueueMetadata {
+ /**
+ * Schema version of the metadata object.
+ * Used for future migrations if metadata structure changes.
+ */
+ version: number;
+
+ /**
+ * Maximum number of retry attempts before message goes to DLQ.
+ */
+ maxRetries: number;
+
+ /**
+ * ISO 8601 timestamp when the metadata was first created.
+ */
+ createdAt: string;
+
+ /**
+ * ISO 8601 timestamp when the metadata was last updated.
+ * Only present if metadata has been updated after creation.
+ */
+ updatedAt?: string;
+}
+
+/**
+ * Current version of the metadata schema.
+ * Increment this when making breaking changes to RunMQQueueMetadata.
+ */
+export const METADATA_SCHEMA_VERSION = 0;
+
diff --git a/src/core/management/Policies/RunMQTTLPolicyManager.ts b/src/core/management/Policies/RunMQTTLPolicyManager.ts
index 687d092..6e3a549 100644
--- a/src/core/management/Policies/RunMQTTLPolicyManager.ts
+++ b/src/core/management/Policies/RunMQTTLPolicyManager.ts
@@ -18,6 +18,9 @@ export class RunMQTTLPolicyManager {
}
public async initialize(): Promise {
+ if (this.isManagementPluginEnabled) {
+ return;
+ }
if (!this.managementClient) {
this.logger.warn("Management client not configured");
return;
diff --git a/src/core/management/RabbitMQManagementClient.ts b/src/core/management/RabbitMQManagementClient.ts
index 597a77b..b040b68 100644
--- a/src/core/management/RabbitMQManagementClient.ts
+++ b/src/core/management/RabbitMQManagementClient.ts
@@ -16,7 +16,7 @@ export class RabbitMQManagementClient {
public async createOrUpdateOperatorPolicy(vhost: string, policy: RabbitMQOperatorPolicy): Promise {
try {
const url = `${this.config.url}/api/operator-policies/${vhost}/${encodeURIComponent(policy.name)}`;
-
+
const response = await fetch(url, {
method: 'PUT',
headers: {
@@ -48,7 +48,7 @@ export class RabbitMQManagementClient {
public async getOperatorPolicy(vhost: string, policyName: string): Promise {
try {
const url = `${this.config.url}/api/operator-policies/${vhost}/${encodeURIComponent(policyName)}`;
-
+
const response = await fetch(url, {
method: 'GET',
headers: {
@@ -75,7 +75,7 @@ export class RabbitMQManagementClient {
public async deleteOperatorPolicy(vhost: string, policyName: string): Promise {
try {
const url = `${this.config.url}/api/operator-policies/${vhost}/${encodeURIComponent(policyName)}`;
-
+
const response = await fetch(url, {
method: 'DELETE',
headers: {
@@ -100,7 +100,7 @@ export class RabbitMQManagementClient {
public async checkManagementPluginEnabled(): Promise {
try {
const url = `${this.config.url}/api/overview`;
-
+
const response = await fetch(url, {
method: 'GET',
headers: {
@@ -114,4 +114,108 @@ export class RabbitMQManagementClient {
return false;
}
}
+
+ /**
+ * Creates or updates a RabbitMQ parameter.
+ * Parameters are custom key-value stores that can hold any JSON data.
+ *
+ * @param name - The parameter name
+ * @param value - The parameter value (any JSON-serializable object)
+ */
+ public async setParameter(
+ name: string,
+ value: T
+ ): Promise {
+ try {
+ const url = `${this.config.url}/api/global-parameters/${encodeURIComponent(name)}`;
+
+ const response = await fetch(url, {
+ method: 'PUT',
+ headers: {
+ 'Content-Type': 'application/json',
+ 'Authorization': this.getAuthHeader()
+ },
+ body: JSON.stringify({value})
+ });
+
+ if (!response.ok) {
+ const error = await response.text();
+ this.logger.error(`Failed to set parameter ${name}: ${response.status} - ${error}`);
+ return false;
+ }
+
+ this.logger.info(`Successfully set parameter: ${name}`);
+ return true;
+ } catch (error) {
+ this.logger.error(`Error setting parameter: ${error}`);
+ return false;
+ }
+ }
+
+ /**
+ * Gets a RabbitMQ parameter.
+ *
+ * @param name - The parameter name
+ */
+ public async getParameter(
+ name: string
+ ): Promise {
+ try {
+ const url = `${this.config.url}/api/global-parameters/${encodeURIComponent(name)}`;
+
+ const response = await fetch(url, {
+ method: 'GET',
+ headers: {
+ 'Authorization': this.getAuthHeader()
+ }
+ });
+
+ if (!response.ok) {
+ if (response.status === 404) {
+ return null;
+ }
+ const error = await response.text();
+ this.logger.error(`Failed to get parameter ${name}: ${response.status} - ${error}`);
+ return null;
+ }
+
+ const data = await response.json();
+ return data.value as T;
+ } catch (error) {
+ this.logger.error(`Error getting parameter: ${error}`);
+ return null;
+ }
+ }
+
+ /**
+ * Deletes a RabbitMQ parameter.
+ *
+ * @param name - The parameter name
+ */
+ public async deleteParameter(
+ name: string
+ ): Promise {
+ try {
+ const url = `${this.config.url}/api/global-parameters/${encodeURIComponent(name)}`;
+
+ const response = await fetch(url, {
+ method: 'DELETE',
+ headers: {
+ 'Authorization': this.getAuthHeader()
+ }
+ });
+
+ if (!response.ok && response.status !== 404) {
+ const error = await response.text();
+ this.logger.error(`Failed to delete parameter ${name}: ${response.status} - ${error}`);
+ return false;
+ }
+
+ this.logger.info(`Successfully deleted parameter: ${name}`);
+ return true;
+ } catch (error) {
+ this.logger.error(`Error deleting parameter: ${error}`);
+ return false;
+ }
+ }
}
\ No newline at end of file
diff --git a/src/index.ts b/src/index.ts
index 294c286..5ef89b6 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -10,4 +10,6 @@ export {
RabbitMQManagementConfig,
} from "./types";
-export {RunMQLogger} from "./core/logging/RunMQLogger";
\ No newline at end of file
+export {RunMQLogger} from "./core/logging/RunMQLogger";
+
+export {RunMQQueueMetadata} from "./core/management/Policies/RunMQQueueMetadata";
diff --git a/tests/e2e/RunMQMetadataManager.e2e.test.ts b/tests/e2e/RunMQMetadataManager.e2e.test.ts
new file mode 100644
index 0000000..8902836
--- /dev/null
+++ b/tests/e2e/RunMQMetadataManager.e2e.test.ts
@@ -0,0 +1,279 @@
+import {RunMQMetadataManager} from "@src/core/management/Policies/RunMQMetadataManager";
+import {RabbitMQManagementClient} from "@src/core/management/RabbitMQManagementClient";
+import {RabbitMQMetadata} from "@src/core/management/Policies/RabbitMQMetadata";
+import {MockedRunMQLogger} from "@tests/mocks/MockedRunMQLogger";
+import {RabbitMQManagementConfigExample} from "@tests/Examples/RabbitMQManagementConfigExample";
+import {RunMQUtils} from "@src/core/utils/RunMQUtils";
+import {METADATA_SCHEMA_VERSION} from "@src/core/management/Policies/RunMQQueueMetadata";
+
+describe('RunMQMetadataManager E2E Tests', () => {
+ const validConfig = RabbitMQManagementConfigExample.valid();
+ let metadataManager: RunMQMetadataManager;
+ let managementClient: RabbitMQManagementClient;
+ const testQueueNames: string[] = [];
+
+ beforeEach(async () => {
+ jest.clearAllMocks();
+ managementClient = new RabbitMQManagementClient(validConfig, MockedRunMQLogger);
+ metadataManager = new RunMQMetadataManager(MockedRunMQLogger, validConfig);
+ await metadataManager.initialize()
+ });
+
+ afterEach(async () => {
+ for (const queueName of testQueueNames) {
+ const paramName = RabbitMQMetadata.getParameterName(queueName);
+ await managementClient.deleteParameter(paramName);
+ }
+ testQueueNames.length = 0;
+ await RunMQUtils.delay(100);
+ });
+
+ describe('Initialization', () => {
+ it('should initialize successfully when management plugin is enabled', async () => {
+ await metadataManager.initialize();
+ expect(metadataManager.isEnabled()).toBe(true);
+ expect(MockedRunMQLogger.info).toHaveBeenCalledWith(
+ "RunMQ metadata storage initialized"
+ );
+ });
+
+ it('should disable when management plugin is not accessible', async () => {
+ const invalidManager = new RunMQMetadataManager(
+ MockedRunMQLogger,
+ RabbitMQManagementConfigExample.invalid()
+ );
+ await invalidManager.initialize();
+
+ expect(invalidManager.isEnabled()).toBe(false);
+ expect(MockedRunMQLogger.warn).toHaveBeenCalledWith(
+ "RabbitMQ management plugin is not enabled - metadata storage disabled"
+ );
+ });
+
+ it('should disable when no management config provided', async () => {
+ const noConfigManager = new RunMQMetadataManager(MockedRunMQLogger);
+ await noConfigManager.initialize();
+
+ expect(noConfigManager.isEnabled()).toBe(false);
+ expect(MockedRunMQLogger.warn).toHaveBeenCalledWith(
+ "Management client not configured - metadata storage disabled"
+ );
+ });
+ });
+
+ describe('Metadata Creation', () => {
+ it('should create metadata parameter for a new queue', async () => {
+ const queueName = `test_queue_${Date.now()}`;
+ testQueueNames.push(queueName);
+
+ const result = await metadataManager.apply(queueName, 5);
+
+ expect(result).toBe(true);
+ expect(MockedRunMQLogger.info).toHaveBeenCalledWith(
+ `Created metadata for queue: ${queueName}`
+ );
+
+ // Verify metadata was stored correctly
+ const metadata = await metadataManager.getMetadata(queueName);
+ expect(metadata).not.toBeNull();
+ expect(metadata?.version).toBe(METADATA_SCHEMA_VERSION);
+ expect(metadata?.maxRetries).toBe(5);
+ expect(metadata?.createdAt).toBeDefined();
+ expect(metadata?.updatedAt).toBeUndefined();
+ });
+
+ it('should create metadata with correct maxRetries value', async () => {
+ const queueName = `test_queue_retries_${Date.now()}`;
+ testQueueNames.push(queueName);
+
+ await metadataManager.apply(queueName, 10);
+
+ const metadata = await metadataManager.getMetadata(queueName);
+ expect(metadata?.maxRetries).toBe(10);
+ });
+
+ it('should create metadata with valid ISO timestamp', async () => {
+ const queueName = `test_queue_timestamp_${Date.now()}`;
+ testQueueNames.push(queueName);
+
+ const beforeCreate = new Date().toISOString();
+ await metadataManager.apply(queueName, 3);
+ const afterCreate = new Date().toISOString();
+
+ const metadata = await metadataManager.getMetadata(queueName);
+ expect(metadata?.createdAt).toBeDefined();
+
+ // Verify the timestamp is within expected range
+ const createdAt = new Date(metadata!.createdAt);
+ expect(createdAt.getTime()).toBeGreaterThanOrEqual(new Date(beforeCreate).getTime() - 1000);
+ expect(createdAt.getTime()).toBeLessThanOrEqual(new Date(afterCreate).getTime() + 1000);
+ });
+ });
+
+ describe('Metadata Update', () => {
+ it('should update metadata while preserving createdAt', async () => {
+ const queueName = `test_queue_update_${Date.now()}`;
+ testQueueNames.push(queueName);
+
+ // Create initial metadata
+ await metadataManager.apply(queueName, 5);
+ const initialMetadata = await metadataManager.getMetadata(queueName);
+ const originalCreatedAt = initialMetadata?.createdAt;
+
+ // Wait a bit to ensure different timestamp
+ await RunMQUtils.delay(100);
+
+ // Update metadata
+ const updateResult = await metadataManager.apply(queueName, 10);
+
+ expect(updateResult).toBe(true);
+ expect(MockedRunMQLogger.info).toHaveBeenCalledWith(
+ `Updated metadata for queue: ${queueName}`
+ );
+
+ const updatedMetadata = await metadataManager.getMetadata(queueName);
+ expect(updatedMetadata?.maxRetries).toBe(10);
+ expect(updatedMetadata?.createdAt).toBe(originalCreatedAt);
+ expect(updatedMetadata?.updatedAt).toBeDefined();
+ expect(updatedMetadata?.updatedAt).not.toBe(updatedMetadata?.createdAt);
+ });
+
+ it('should track updatedAt on subsequent updates', async () => {
+ const queueName = `test_queue_multi_update_${Date.now()}`;
+ testQueueNames.push(queueName);
+
+ // Create and then update twice
+ await metadataManager.apply(queueName, 3);
+ await RunMQUtils.delay(50);
+ await metadataManager.apply(queueName, 5);
+ const firstUpdate = await metadataManager.getMetadata(queueName);
+ const firstUpdatedAt = firstUpdate?.updatedAt;
+
+ await RunMQUtils.delay(50);
+ await metadataManager.apply(queueName, 7);
+ const secondUpdate = await metadataManager.getMetadata(queueName);
+
+ expect(secondUpdate?.maxRetries).toBe(7);
+ expect(secondUpdate?.updatedAt).toBeDefined();
+ // The updatedAt should be different from the first update
+ expect(new Date(secondUpdate!.updatedAt!).getTime())
+ .toBeGreaterThanOrEqual(new Date(firstUpdatedAt!).getTime());
+ });
+ });
+
+ describe('Metadata Retrieval', () => {
+ it('should retrieve existing metadata', async () => {
+ const queueName = `test_queue_retrieve_${Date.now()}`;
+ testQueueNames.push(queueName);
+
+ await metadataManager.apply(queueName, 8);
+
+ const metadata = await metadataManager.getMetadata(queueName);
+
+ expect(metadata).not.toBeNull();
+ expect(metadata?.version).toBe(METADATA_SCHEMA_VERSION);
+ expect(metadata?.maxRetries).toBe(8);
+ });
+
+ it('should return null for non-existent queue metadata', async () => {
+ const metadata = await metadataManager.getMetadata('non_existent_queue_12345');
+
+ expect(metadata).toBeNull();
+ });
+
+ it('should retrieve metadata from custom vhost', async () => {
+ // Note: This test assumes the default vhost is used
+ // In a real environment, you might need to create a custom vhost first
+ const queueName = `test_queue_vhost_${Date.now()}`;
+ testQueueNames.push(queueName);
+
+ await metadataManager.apply(queueName, 4);
+
+ const metadata = await metadataManager.getMetadata(queueName);
+
+ expect(metadata).not.toBeNull();
+ expect(metadata?.maxRetries).toBe(4);
+ });
+ });
+
+ describe('Metadata Cleanup', () => {
+ it('should delete metadata for a queue', async () => {
+ const queueName = `test_queue_cleanup_${Date.now()}`;
+ // Don't add to testQueueNames since we're cleaning up in the test
+
+ await metadataManager.apply(queueName, 5);
+
+ // Verify it exists
+ const beforeCleanup = await metadataManager.getMetadata(queueName);
+ expect(beforeCleanup).not.toBeNull();
+
+ // Cleanup
+ await metadataManager.cleanup(queueName);
+ expect(MockedRunMQLogger.info).toHaveBeenCalledWith(
+ `Deleted metadata for queue: ${queueName}`
+ );
+
+ // Verify it's gone
+ const afterCleanup = await metadataManager.getMetadata(queueName);
+ expect(afterCleanup).toBeNull();
+ });
+
+ it('should handle cleanup of non-existent metadata gracefully', async () => {
+ // Should not throw
+ await expect(metadataManager.cleanup('non_existent_queue_cleanup'))
+ .resolves.not.toThrow();
+ });
+ });
+
+ describe('Edge Cases', () => {
+ it('should handle queue names with special characters', async () => {
+ const queueName = `test.queue-with_special.chars_${Date.now()}`;
+ testQueueNames.push(queueName);
+
+ const result = await metadataManager.apply(queueName, 3);
+
+ expect(result).toBe(true);
+
+ const metadata = await metadataManager.getMetadata(queueName);
+ expect(metadata?.maxRetries).toBe(3);
+ });
+
+ it('should handle zero retries', async () => {
+ const queueName = `test_queue_zero_retries_${Date.now()}`;
+ testQueueNames.push(queueName);
+
+ await metadataManager.apply(queueName, 0);
+
+ const metadata = await metadataManager.getMetadata(queueName);
+ expect(metadata?.maxRetries).toBe(0);
+ });
+
+ it('should handle large retry numbers', async () => {
+ const queueName = `test_queue_large_retries_${Date.now()}`;
+ testQueueNames.push(queueName);
+
+ await metadataManager.apply(queueName, 1000);
+
+ const metadata = await metadataManager.getMetadata(queueName);
+ expect(metadata?.maxRetries).toBe(1000);
+ });
+ });
+
+ describe('Integration with RabbitMQ Parameters API', () => {
+ it('should store metadata as RabbitMQ parameter', async () => {
+ const queueName = `test_queue_param_${Date.now()}`;
+ testQueueNames.push(queueName);
+
+ await metadataManager.apply(queueName, 5);
+
+ // Directly verify via management client
+ const paramName = RabbitMQMetadata.getParameterName(queueName);
+ const storedValue = await managementClient.getParameter(paramName);
+
+ expect(storedValue).not.toBeNull();
+ expect((storedValue as any).maxRetries).toBe(5);
+ expect((storedValue as any).version).toBe(METADATA_SCHEMA_VERSION);
+ });
+ });
+});
+
diff --git a/tests/unit/core/consumer/RunMQConsumerCreator.test.ts b/tests/unit/core/consumer/RunMQConsumerCreator.test.ts
index 68c18a0..eaa5200 100644
--- a/tests/unit/core/consumer/RunMQConsumerCreator.test.ts
+++ b/tests/unit/core/consumer/RunMQConsumerCreator.test.ts
@@ -7,9 +7,11 @@ import {RunMQConsumerCreator} from "@src/core/consumer/RunMQConsumerCreator";
import {MockedRunMQLogger} from "@tests/mocks/MockedRunMQLogger";
import {MockedAMQPClient} from "@tests/mocks/MockedAMQPClient";
import {RunMQTTLPolicyManager} from "@src/core/management/Policies/RunMQTTLPolicyManager";
+import {RunMQMetadataManager} from "@src/core/management/Policies/RunMQMetadataManager";
import {RunMQException} from "@src/core/exceptions/RunMQException";
jest.mock('@src/core/management/Policies/RunMQTTLPolicyManager');
+jest.mock('@src/core/management/Policies/RunMQMetadataManager');
describe('RunMQConsumerCreator Unit Tests', () => {
const mockedChannel = new MockedAMQPChannel();
@@ -18,6 +20,10 @@ describe('RunMQConsumerCreator Unit Tests', () => {
initialize: jest.fn(),
apply: jest.fn()
};
+ const mockMetadataManager = {
+ initialize: jest.fn(),
+ apply: jest.fn()
+ };
const testProcessorConfig = RunMQProcessorConfigurationExample.random(
'testProcessor',
@@ -41,8 +47,11 @@ describe('RunMQConsumerCreator Unit Tests', () => {
beforeEach(() => {
jest.clearAllMocks();
jest.mocked(RunMQTTLPolicyManager).mockImplementation(() => mockTTLPolicyManager as any);
+ jest.mocked(RunMQMetadataManager).mockImplementation(() => mockMetadataManager as any);
mockTTLPolicyManager.initialize.mockResolvedValue(undefined);
mockTTLPolicyManager.apply.mockResolvedValue(true);
+ mockMetadataManager.initialize.mockResolvedValue(undefined);
+ mockMetadataManager.apply.mockResolvedValue(true);
consumerCreator = new RunMQConsumerCreator(mockedClient, MockedRunMQLogger, undefined);
});
@@ -178,5 +187,68 @@ describe('RunMQConsumerCreator Unit Tests', () => {
);
});
});
+
+ describe('metadata storage', () => {
+ it('should initialize metadata policy manager', async () => {
+ await consumerCreator.createConsumer(testConsumerConfig);
+
+ expect(mockMetadataManager.initialize).toHaveBeenCalled();
+ });
+
+ it('should store metadata with maxRetries from processor config', async () => {
+ await consumerCreator.createConsumer(testConsumerConfig);
+
+ expect(mockMetadataManager.apply).toHaveBeenCalledWith(
+ testProcessorConfig.name,
+ testProcessorConfig.attempts
+ );
+ });
+
+ it('should use default processing attempts when not specified', async () => {
+ const configWithoutAttempts = new ConsumerConfiguration(
+ 'test.topic',
+ {
+ name: 'noAttemptsProcessor',
+ consumersCount: 1
+ },
+ jest.fn()
+ );
+
+ await consumerCreator.createConsumer(configWithoutAttempts);
+
+ expect(mockMetadataManager.apply).toHaveBeenCalledWith(
+ 'noAttemptsProcessor',
+ DEFAULTS.PROCESSING_ATTEMPTS
+ );
+ });
+
+ it('should store metadata with custom maxRetries', async () => {
+ const customAttempts = 15;
+ const configWithCustomAttempts = new ConsumerConfiguration(
+ 'test.topic',
+ {
+ name: 'customAttemptsProcessor',
+ consumersCount: 1,
+ attempts: customAttempts
+ },
+ jest.fn()
+ );
+
+ await consumerCreator.createConsumer(configWithCustomAttempts);
+
+ expect(mockMetadataManager.apply).toHaveBeenCalledWith(
+ 'customAttemptsProcessor',
+ customAttempts
+ );
+ });
+
+ it('should continue consumer creation even if metadata storage fails', async () => {
+ mockMetadataManager.apply.mockResolvedValue(false);
+
+ await consumerCreator.createConsumer(testConsumerConfig);
+
+ expect(mockedChannel.consume).toHaveBeenCalled();
+ });
+ });
});
});
\ No newline at end of file
diff --git a/tests/unit/core/management/RabbitMQMetadata.test.ts b/tests/unit/core/management/RabbitMQMetadata.test.ts
new file mode 100644
index 0000000..6b79335
--- /dev/null
+++ b/tests/unit/core/management/RabbitMQMetadata.test.ts
@@ -0,0 +1,63 @@
+import {RabbitMQMetadata} from "@src/core/management/Policies/RabbitMQMetadata";
+import {Constants} from "@src/core/constants";
+import {METADATA_SCHEMA_VERSION, RunMQQueueMetadata} from "@src/core/management/Policies/RunMQQueueMetadata";
+
+describe('RabbitMQMetadata', () => {
+ const mockDate = '2024-01-15T10:30:00.000Z';
+
+ beforeEach(() => {
+ jest.useFakeTimers();
+ jest.setSystemTime(new Date(mockDate));
+ });
+
+ afterEach(() => {
+ jest.useRealTimers();
+ });
+
+ describe('createMetadataFor', () => {
+ it('should create metadata with correct version', () => {
+ const metadata = RabbitMQMetadata.createMetadataFor(3);
+
+ expect(metadata.version).toBe(METADATA_SCHEMA_VERSION);
+ });
+
+ it('should include maxRetries in metadata', () => {
+ const maxRetries = 10;
+ const metadata = RabbitMQMetadata.createMetadataFor( maxRetries);
+
+ expect(metadata.maxRetries).toBe(maxRetries);
+ });
+
+ it('should set createdAt when no existing metadata provided', () => {
+ const metadata = RabbitMQMetadata.createMetadataFor( 3);
+
+ expect(metadata.createdAt).toBe(mockDate);
+ expect(metadata.updatedAt).toBeUndefined();
+ });
+
+ it('should preserve createdAt and set updatedAt when existing metadata provided', () => {
+ const existingCreatedAt = '2023-01-01T00:00:00.000Z';
+ const existingMetadata: RunMQQueueMetadata = {
+ version: 0,
+ maxRetries: 3,
+ createdAt: existingCreatedAt
+ };
+
+ const metadata = RabbitMQMetadata.createMetadataFor( 5, existingMetadata);
+
+ expect(metadata.createdAt).toBe(existingCreatedAt);
+ expect(metadata.updatedAt).toBe(mockDate);
+ expect(metadata.maxRetries).toBe(5);
+ });
+ });
+
+ describe('getParameterName', () => {
+ it('should return correct parameter name', () => {
+ const queueName = 'test_processor';
+ const paramName = RabbitMQMetadata.getParameterName(queueName);
+
+ expect(paramName).toBe(Constants.METADATA_STORE_PREFIX + queueName);
+ });
+ });
+});
+
diff --git a/tests/unit/core/management/RunMQMetadataManager.test.ts b/tests/unit/core/management/RunMQMetadataManager.test.ts
new file mode 100644
index 0000000..4ae3b84
--- /dev/null
+++ b/tests/unit/core/management/RunMQMetadataManager.test.ts
@@ -0,0 +1,271 @@
+import {RunMQMetadataManager} from "@src/core/management/Policies/RunMQMetadataManager";
+import {RabbitMQManagementClient} from "@src/core/management/RabbitMQManagementClient";
+import {RunMQConsoleLogger} from "@src/core/logging/RunMQConsoleLogger";
+import {RabbitMQManagementConfigExample} from "@tests/Examples/RabbitMQManagementConfigExample";
+import {RabbitMQMetadata} from "@src/core/management/Policies/RabbitMQMetadata";
+import {METADATA_SCHEMA_VERSION} from "@src/core/management/Policies/RunMQQueueMetadata";
+
+jest.mock("@src/core/management/RabbitMQManagementClient");
+
+describe('RunMQMetadataManager', () => {
+ let metadataManager: RunMQMetadataManager;
+ let logger: RunMQConsoleLogger;
+ let mockManagementClient: jest.Mocked;
+
+ const mockDate = '2024-01-15T10:30:00.000Z';
+
+ beforeEach(() => {
+ jest.useFakeTimers();
+ jest.setSystemTime(new Date(mockDate));
+
+ logger = new RunMQConsoleLogger();
+ jest.spyOn(logger, 'info').mockImplementation();
+ jest.spyOn(logger, 'warn').mockImplementation();
+ jest.spyOn(logger, 'error').mockImplementation();
+
+ mockManagementClient = new RabbitMQManagementClient(
+ RabbitMQManagementConfigExample.valid(),
+ logger
+ ) as jest.Mocked;
+ });
+
+ afterEach(() => {
+ jest.useRealTimers();
+ jest.clearAllMocks();
+ });
+
+ describe('without management config', () => {
+ beforeEach(() => {
+ metadataManager = new RunMQMetadataManager(logger);
+ });
+
+ it('should initialize without management client', async () => {
+ await metadataManager.initialize();
+
+ expect(logger.warn).toHaveBeenCalledWith(
+ "Management client not configured - metadata storage disabled"
+ );
+ });
+
+ it('should return false when applying metadata without management config', async () => {
+ await metadataManager.initialize();
+ const result = await metadataManager.apply('test-queue', 5);
+
+ expect(result).toBe(false);
+ });
+
+ it('should return null when getting metadata without management config', async () => {
+ await metadataManager.initialize();
+ const result = await metadataManager.getMetadata('test-queue');
+
+ expect(result).toBeNull();
+ });
+
+ it('should report as disabled', async () => {
+ await metadataManager.initialize();
+ expect(metadataManager.isEnabled()).toBe(false);
+ });
+ });
+
+ describe('with management config', () => {
+ beforeEach(() => {
+ metadataManager = new RunMQMetadataManager(
+ logger,
+ RabbitMQManagementConfigExample.valid()
+ );
+ (metadataManager as any).managementClient = mockManagementClient;
+ });
+
+ describe('initialize', () => {
+ it('should check if management plugin is enabled', async () => {
+ mockManagementClient.checkManagementPluginEnabled.mockResolvedValue(true);
+
+ await metadataManager.initialize();
+
+ expect(mockManagementClient.checkManagementPluginEnabled).toHaveBeenCalled();
+ expect(logger.info).toHaveBeenCalledWith("RunMQ metadata storage initialized");
+ });
+
+ it('should log warning when management plugin is not enabled', async () => {
+ mockManagementClient.checkManagementPluginEnabled.mockResolvedValue(false);
+
+ await metadataManager.initialize();
+
+ expect(logger.warn).toHaveBeenCalledWith(
+ "RabbitMQ management plugin is not enabled - metadata storage disabled"
+ );
+ });
+ });
+
+ describe('apply', () => {
+ it('should create metadata parameter for new queue', async () => {
+ mockManagementClient.checkManagementPluginEnabled.mockResolvedValue(true);
+ mockManagementClient.getParameter.mockResolvedValue(null);
+ mockManagementClient.setParameter.mockResolvedValue(true);
+
+ await metadataManager.initialize();
+ const result = await metadataManager.apply('test_queue', 5);
+
+ expect(result).toBe(true);
+ expect(mockManagementClient.setParameter).toHaveBeenCalledWith(
+ RabbitMQMetadata.getParameterName('test_queue'),
+ expect.objectContaining({
+ version: METADATA_SCHEMA_VERSION,
+ maxRetries: 5,
+ createdAt: mockDate
+ })
+ );
+ expect(logger.info).toHaveBeenCalledWith("Created metadata for queue: test_queue");
+ });
+
+ it('should update existing metadata preserving createdAt', async () => {
+ const existingCreatedAt = '2023-01-01T00:00:00.000Z';
+ const existingMetadata = {
+ version: 0,
+ maxRetries: 3,
+ createdAt: existingCreatedAt
+ };
+
+ mockManagementClient.checkManagementPluginEnabled.mockResolvedValue(true);
+ mockManagementClient.getParameter.mockResolvedValue(existingMetadata);
+ mockManagementClient.setParameter.mockResolvedValue(true);
+
+ await metadataManager.initialize();
+ const result = await metadataManager.apply('test_queue', 10);
+
+ expect(result).toBe(true);
+ expect(mockManagementClient.setParameter).toHaveBeenCalledWith(
+ RabbitMQMetadata.getParameterName('test_queue'),
+ expect.objectContaining({
+ maxRetries: 10,
+ createdAt: existingCreatedAt,
+ updatedAt: mockDate
+ })
+ );
+ expect(logger.info).toHaveBeenCalledWith("Updated metadata for queue: test_queue");
+ });
+
+ it('should return false when management plugin is not enabled', async () => {
+ mockManagementClient.checkManagementPluginEnabled.mockResolvedValue(false);
+
+ await metadataManager.initialize();
+ const result = await metadataManager.apply('test_queue', 5);
+
+ expect(result).toBe(false);
+ expect(logger.warn).toHaveBeenCalledWith(
+ expect.stringContaining("Cannot store metadata for queue 'test_queue'")
+ );
+ });
+
+ it('should return false when parameter creation fails', async () => {
+ mockManagementClient.checkManagementPluginEnabled.mockResolvedValue(true);
+ mockManagementClient.getParameter.mockResolvedValue(null);
+ mockManagementClient.setParameter.mockResolvedValue(false);
+
+ await metadataManager.initialize();
+ const result = await metadataManager.apply('test_queue', 5);
+
+ expect(result).toBe(false);
+ expect(logger.error).toHaveBeenCalledWith(
+ "Failed to store metadata for queue: test_queue"
+ );
+ });
+
+ it('should use custom vhost when provided', async () => {
+ mockManagementClient.checkManagementPluginEnabled.mockResolvedValue(true);
+ mockManagementClient.getParameter.mockResolvedValue(null);
+ mockManagementClient.setParameter.mockResolvedValue(true);
+
+ await metadataManager.initialize();
+ await metadataManager.apply('test_queue', 5);
+
+ expect(mockManagementClient.setParameter).toHaveBeenCalledWith(
+ expect.any(String),
+ expect.any(Object)
+ );
+ });
+ });
+
+ describe('getMetadata', () => {
+ it('should return metadata when parameter exists', async () => {
+ const expectedMetadata = {
+ version: 0,
+ maxRetries: 5,
+ createdAt: mockDate
+ };
+
+ mockManagementClient.checkManagementPluginEnabled.mockResolvedValue(true);
+ mockManagementClient.getParameter.mockResolvedValue(expectedMetadata);
+
+ await metadataManager.initialize();
+ const result = await metadataManager.getMetadata('test_queue');
+
+ expect(result).toEqual(expectedMetadata);
+ });
+
+ it('should return null when parameter does not exist', async () => {
+ mockManagementClient.checkManagementPluginEnabled.mockResolvedValue(true);
+ mockManagementClient.getParameter.mockResolvedValue(null);
+
+ await metadataManager.initialize();
+ const result = await metadataManager.getMetadata('test_queue');
+
+ expect(result).toBeNull();
+ });
+
+ it('should return null when management plugin is not enabled', async () => {
+ mockManagementClient.checkManagementPluginEnabled.mockResolvedValue(false);
+
+ await metadataManager.initialize();
+ const result = await metadataManager.getMetadata('test_queue');
+
+ expect(result).toBeNull();
+ });
+ });
+
+ describe('cleanup', () => {
+ it('should delete metadata parameter', async () => {
+ mockManagementClient.checkManagementPluginEnabled.mockResolvedValue(true);
+ mockManagementClient.deleteParameter.mockResolvedValue(true);
+
+ await metadataManager.initialize();
+ await metadataManager.cleanup('test_queue');
+
+ expect(mockManagementClient.deleteParameter).toHaveBeenCalledWith(
+ RabbitMQMetadata.getParameterName('test_queue')
+ );
+ expect(logger.info).toHaveBeenCalledWith(
+ "Deleted metadata for queue: test_queue"
+ );
+ });
+
+ it('should not call delete when management plugin is not enabled', async () => {
+ mockManagementClient.checkManagementPluginEnabled.mockResolvedValue(false);
+
+ await metadataManager.initialize();
+ await metadataManager.cleanup('test_queue');
+
+ expect(mockManagementClient.deleteParameter).not.toHaveBeenCalled();
+ });
+ });
+
+ describe('isEnabled', () => {
+ it('should return true when management plugin is enabled', async () => {
+ mockManagementClient.checkManagementPluginEnabled.mockResolvedValue(true);
+
+ await metadataManager.initialize();
+
+ expect(metadataManager.isEnabled()).toBe(true);
+ });
+
+ it('should return false when management plugin is not enabled', async () => {
+ mockManagementClient.checkManagementPluginEnabled.mockResolvedValue(false);
+
+ await metadataManager.initialize();
+
+ expect(metadataManager.isEnabled()).toBe(false);
+ });
+ });
+ });
+});
+