From 9f99f089c76a00ceba7266304e49ce1644e86c07 Mon Sep 17 00:00:00 2001 From: Fawzi Essam Date: Sat, 28 Feb 2026 01:13:40 +0200 Subject: [PATCH 1/3] Feat(#17): Adding Metadata manager and update consumer creator to store max retries Signed-off-by: Fawzi Essam --- README.md | 21 +- src/core/constants/index.ts | 1 + src/core/consumer/RunMQConsumerCreator.ts | 13 + .../management/Policies/RabbitMQMetadata.ts | 36 +++ .../Policies/RunMQMetadataManager.ts | 137 +++++++++ .../management/Policies/RunMQQueueMetadata.ts | 34 +++ .../management/RabbitMQManagementClient.ts | 112 ++++++- src/index.ts | 4 +- tests/e2e/RunMQMetadataManager.e2e.test.ts | 279 ++++++++++++++++++ .../consumer/RunMQConsumerCreator.test.ts | 72 +++++ .../core/management/RabbitMQMetadata.test.ts | 63 ++++ .../management/RunMQMetadataManager.test.ts | 271 +++++++++++++++++ 12 files changed, 1036 insertions(+), 7 deletions(-) create mode 100644 src/core/management/Policies/RabbitMQMetadata.ts create mode 100644 src/core/management/Policies/RunMQMetadataManager.ts create mode 100644 src/core/management/Policies/RunMQQueueMetadata.ts create mode 100644 tests/e2e/RunMQMetadataManager.e2e.test.ts create mode 100644 tests/unit/core/management/RabbitMQMetadata.test.ts create mode 100644 tests/unit/core/management/RunMQMetadataManager.test.ts diff --git a/README.md b/README.md index c9af641..13cc547 100644 --- a/README.md +++ b/README.md @@ -23,7 +23,7 @@ Whether you’re running background jobs, designing an event-driven ar - **Isolated Queues per Processor**: Each processor gets its own dedicated queue and DLQ, ensuring full isolation and predictable behavior across services. - **Schema Validation**: Optional JSON Schema validation powered by AJV for safer message handling and data integrity. - **Concurrent Consumers**: Scale either horizontally (multiple instances) or vertically (multiple consumers per queue, leveraging RabbitMQ prefetch) to maximize throughput and efficiency. -- **RabbitMQ Durability & Acknowledgements**: Leverages RabbitMQ’s persistent storage and acknowledgment model to guarantee at-least-once delivery, even across restarts and failures. +- **RabbitMQ Durability & Acknowledgements**: Leverages RabbitMQ's persistent storage and acknowledgment model to guarantee at-least-once delivery, even across restarts and failures. - **Custom Logging**: Plug in your own logger or use the default console logger for full control over message visibility. ## Installation @@ -212,7 +212,24 @@ RunMQ can leverage RabbitMQ policies to manage the delay between attempts, it's #### Benefits - Flexible and easy management of retry delays - Reduces operational overhead -- Fully compatible with RunMQ’s retry and DLQ mechanisms +- Fully compatible with RunMQ's retry and DLQ mechanisms + +### Queue Metadata Storage + +RunMQ automatically stores queue metadata (such as max retries and creation timestamp) using RabbitMQ's parameters API. This enables external tools and dashboards to discover RunMQ-managed queues and understand their configuration without direct access to the application code. + +When a processor is configured, RunMQ creates a metadata parameter that stores: +- **Version**: Schema version for future-proof migrations. +- **Max Retries**: The configured retry limit for the queue. +- **Created At**: ISO 8601 timestamp when the queue was first configured. +- **Updated At**: ISO 8601 timestamp when the configuration was last changed (if applicable). + +#### Benefits +- **Dashboard Integration**: External monitoring tools and dashboards can query RabbitMQ's management API to retrieve queue metadata and display topology information (e.g., "10 retries with 5s delay, then to DLQ"). +- **Self-Documenting Queues**: Queue configurations are discoverable directly from RabbitMQ, without needing access to application source code. +- **Automatic Updates**: When processor configuration changes, metadata is automatically updated while preserving the original creation timestamp. + +> **Note**: This feature requires RabbitMQ Management Plugin to be enabled for external tools to query the metadata parameters and for the parameters to be set. ### Custom Logger diff --git a/src/core/constants/index.ts b/src/core/constants/index.ts index eda2e29..9270943 100644 --- a/src/core/constants/index.ts +++ b/src/core/constants/index.ts @@ -5,6 +5,7 @@ export const Constants = { RETRY_DELAY_QUEUE_PREFIX: RUNMQ_PREFIX + "retry_delay_", DLQ_QUEUE_PREFIX: RUNMQ_PREFIX + "dlq_", MESSAGE_TTL_OPERATOR_POLICY_PREFIX: RUNMQ_PREFIX + "message_ttl_operator_policy", + METADATA_STORE_PREFIX: RUNMQ_PREFIX + "metadata_", } export const DEFAULTS = { diff --git a/src/core/consumer/RunMQConsumerCreator.ts b/src/core/consumer/RunMQConsumerCreator.ts index 224301f..f01ecb3 100644 --- a/src/core/consumer/RunMQConsumerCreator.ts +++ b/src/core/consumer/RunMQConsumerCreator.ts @@ -15,11 +15,13 @@ import {ConsumerCreatorUtils} from "@src/core/consumer/ConsumerCreatorUtils"; import {RunMQPublisherCreator} from "@src/core/publisher/RunMQPublisherCreator"; import {AMQPChannel, AMQPClient, RabbitMQManagementConfig} from "@src/types"; import {RunMQTTLPolicyManager} from "@src/core/management/Policies/RunMQTTLPolicyManager"; +import {RunMQMetadataManager} from "@src/core/management/Policies/RunMQMetadataManager"; import {RunMQException} from "@src/core/exceptions/RunMQException"; import {Exceptions} from "@src/core/exceptions/Exceptions"; export class RunMQConsumerCreator { private ttlPolicyManager: RunMQTTLPolicyManager; + private metadataManager: RunMQMetadataManager; constructor( private client: AMQPClient, @@ -27,17 +29,28 @@ export class RunMQConsumerCreator { managementConfig?: RabbitMQManagementConfig ) { this.ttlPolicyManager = new RunMQTTLPolicyManager(logger, managementConfig); + this.metadataManager = new RunMQMetadataManager(logger, managementConfig); } public async createConsumer(consumerConfiguration: ConsumerConfiguration) { await this.ttlPolicyManager.initialize(); + await this.metadataManager.initialize(); await this.assertQueues(consumerConfiguration); await this.bindQueues(consumerConfiguration); + await this.storeMetadata(consumerConfiguration); for (let i = 0; i < consumerConfiguration.processorConfig.consumersCount; i++) { await this.runProcessor(consumerConfiguration); } } + private async storeMetadata(consumerConfiguration: ConsumerConfiguration): Promise { + const maxRetries = consumerConfiguration.processorConfig.attempts ?? DEFAULTS.PROCESSING_ATTEMPTS; + + await this.metadataManager.apply( + consumerConfiguration.processorConfig.name, + maxRetries + ); + } private async runProcessor(consumerConfiguration: ConsumerConfiguration): Promise { const consumerChannel = await this.getProcessorChannel(); diff --git a/src/core/management/Policies/RabbitMQMetadata.ts b/src/core/management/Policies/RabbitMQMetadata.ts new file mode 100644 index 0000000..92d626d --- /dev/null +++ b/src/core/management/Policies/RabbitMQMetadata.ts @@ -0,0 +1,36 @@ +import {Constants} from "@src/core/constants"; +import {METADATA_SCHEMA_VERSION, RunMQQueueMetadata} from "@src/core/management/Policies/RunMQQueueMetadata"; + +/** + * Creates metadata objects for storing queue configuration. + * Uses RabbitMQ parameters API for storage, which allows custom JSON data. + */ +export class RabbitMQMetadata { + /** + * Creates metadata for a queue. + * @param maxRetries - Maximum retry attempts configured for the queue + * @param existingMetadata - Optional existing metadata to preserve createdAt + * @returns RunMQQueueMetadata object + */ + static createMetadataFor( + maxRetries: number, + existingMetadata?: RunMQQueueMetadata + ): RunMQQueueMetadata { + const now = new Date().toISOString(); + + return { + version: METADATA_SCHEMA_VERSION, + maxRetries, + createdAt: existingMetadata?.createdAt ?? now, + ...(existingMetadata ? { updatedAt: now } : {}) + }; + } + + /** + * Gets the parameter name for a given queue. + */ + static getParameterName(queueName: string): string { + return Constants.METADATA_STORE_PREFIX + queueName; + } +} + diff --git a/src/core/management/Policies/RunMQMetadataManager.ts b/src/core/management/Policies/RunMQMetadataManager.ts new file mode 100644 index 0000000..33d776a --- /dev/null +++ b/src/core/management/Policies/RunMQMetadataManager.ts @@ -0,0 +1,137 @@ +import {RabbitMQManagementClient} from "@src/core/management/RabbitMQManagementClient"; +import {RunMQLogger} from "@src/core/logging/RunMQLogger"; +import {RabbitMQManagementConfig} from "@src"; +import {RabbitMQMetadata} from "@src/core/management/Policies/RabbitMQMetadata"; +import {RunMQQueueMetadata} from "@src/core/management/Policies/RunMQQueueMetadata"; + +/** + * Manages metadata for RunMQ queues. + * Stores queue configuration metadata using RabbitMQ parameters API. + */ +export class RunMQMetadataManager { + private readonly managementClient: RabbitMQManagementClient | null = null; + private isManagementPluginEnabled = false; + + constructor( + private logger: RunMQLogger, + private managementConfig?: RabbitMQManagementConfig + ) { + if (this.managementConfig) { + this.managementClient = new RabbitMQManagementClient(this.managementConfig, this.logger); + } + } + + /** + * Initialize the manager by checking if management plugin is available. + */ + public async initialize(): Promise { + if (!this.managementClient) { + this.logger.warn("Management client not configured - metadata storage disabled"); + return; + } + + this.isManagementPluginEnabled = await this.managementClient.checkManagementPluginEnabled(); + + if (!this.isManagementPluginEnabled) { + this.logger.warn("RabbitMQ management plugin is not enabled - metadata storage disabled"); + } else { + this.logger.info("RunMQ metadata storage initialized"); + } + } + + /** + * Store or update metadata for a queue. + * If metadata already exists, preserves createdAt and sets updatedAt. + * + * @param queueName - The name of the queue + * @param maxRetries - Maximum retry attempts + * @param vhost - Virtual host (default: %2F for /) + * @returns true if metadata was stored successfully, false otherwise + */ + public async apply( + queueName: string, + maxRetries: number, + vhost: string = "%2F" + ): Promise { + if (!this.isManagementPluginEnabled || !this.managementClient) { + this.logger.warn(`Cannot store metadata for queue '${queueName}' - management plugin not available`); + return false; + } + + try { + const existingMetadata = await this.getMetadata(queueName); + + const metadata = RabbitMQMetadata.createMetadataFor( + maxRetries, + existingMetadata ?? undefined + ); + + const paramName = RabbitMQMetadata.getParameterName(queueName); + + const success = await this.managementClient.setParameter( + paramName, + metadata + ); + + if (success) { + const action = existingMetadata ? "Updated" : "Created"; + this.logger.info(`${action} metadata for queue: ${queueName}`); + return true; + } + + this.logger.error(`Failed to store metadata for queue: ${queueName}`); + return false; + } catch (error) { + this.logger.error(`Error storing metadata for queue ${queueName}: ${error}`); + return false; + } + } + + /** + * Get metadata for a queue. + * + * @param queueName - The name of the queue + * @returns The queue metadata or null if not found + */ + public async getMetadata( + queueName: string, + ): Promise { + if (!this.isManagementPluginEnabled || !this.managementClient) { + return null; + } + + try { + const paramName = RabbitMQMetadata.getParameterName(queueName); + + return await this.managementClient.getParameter( + paramName + ); + } catch (error) { + this.logger.warn(`Failed to get metadata for queue ${queueName}: ${error}`); + return null; + } + } + + /** + * Delete metadata for a queue. + * + * @param queueName - The name of the queue + */ + public async cleanup(queueName: string): Promise { + if (!this.isManagementPluginEnabled || !this.managementClient) { + return; + } + + const paramName = RabbitMQMetadata.getParameterName(queueName); + await this.managementClient.deleteParameter(paramName); + this.logger.info(`Deleted metadata for queue: ${queueName}`); + } + + /** + * Check if management plugin is enabled and metadata storage is available. + */ + public isEnabled(): boolean { + return this.isManagementPluginEnabled; + } +} + diff --git a/src/core/management/Policies/RunMQQueueMetadata.ts b/src/core/management/Policies/RunMQQueueMetadata.ts new file mode 100644 index 0000000..ba508f8 --- /dev/null +++ b/src/core/management/Policies/RunMQQueueMetadata.ts @@ -0,0 +1,34 @@ +/** + * Metadata stored for each RunMQ-managed queue. + * This metadata is stored as a RabbitMQ operator policy with lowest priority. + */ +export interface RunMQQueueMetadata { + /** + * Schema version of the metadata object. + * Used for future migrations if metadata structure changes. + */ + version: number; + + /** + * Maximum number of retry attempts before message goes to DLQ. + */ + maxRetries: number; + + /** + * ISO 8601 timestamp when the metadata was first created. + */ + createdAt: string; + + /** + * ISO 8601 timestamp when the metadata was last updated. + * Only present if metadata has been updated after creation. + */ + updatedAt?: string; +} + +/** + * Current version of the metadata schema. + * Increment this when making breaking changes to RunMQQueueMetadata. + */ +export const METADATA_SCHEMA_VERSION = 0; + diff --git a/src/core/management/RabbitMQManagementClient.ts b/src/core/management/RabbitMQManagementClient.ts index 597a77b..b040b68 100644 --- a/src/core/management/RabbitMQManagementClient.ts +++ b/src/core/management/RabbitMQManagementClient.ts @@ -16,7 +16,7 @@ export class RabbitMQManagementClient { public async createOrUpdateOperatorPolicy(vhost: string, policy: RabbitMQOperatorPolicy): Promise { try { const url = `${this.config.url}/api/operator-policies/${vhost}/${encodeURIComponent(policy.name)}`; - + const response = await fetch(url, { method: 'PUT', headers: { @@ -48,7 +48,7 @@ export class RabbitMQManagementClient { public async getOperatorPolicy(vhost: string, policyName: string): Promise { try { const url = `${this.config.url}/api/operator-policies/${vhost}/${encodeURIComponent(policyName)}`; - + const response = await fetch(url, { method: 'GET', headers: { @@ -75,7 +75,7 @@ export class RabbitMQManagementClient { public async deleteOperatorPolicy(vhost: string, policyName: string): Promise { try { const url = `${this.config.url}/api/operator-policies/${vhost}/${encodeURIComponent(policyName)}`; - + const response = await fetch(url, { method: 'DELETE', headers: { @@ -100,7 +100,7 @@ export class RabbitMQManagementClient { public async checkManagementPluginEnabled(): Promise { try { const url = `${this.config.url}/api/overview`; - + const response = await fetch(url, { method: 'GET', headers: { @@ -114,4 +114,108 @@ export class RabbitMQManagementClient { return false; } } + + /** + * Creates or updates a RabbitMQ parameter. + * Parameters are custom key-value stores that can hold any JSON data. + * + * @param name - The parameter name + * @param value - The parameter value (any JSON-serializable object) + */ + public async setParameter( + name: string, + value: T + ): Promise { + try { + const url = `${this.config.url}/api/global-parameters/${encodeURIComponent(name)}`; + + const response = await fetch(url, { + method: 'PUT', + headers: { + 'Content-Type': 'application/json', + 'Authorization': this.getAuthHeader() + }, + body: JSON.stringify({value}) + }); + + if (!response.ok) { + const error = await response.text(); + this.logger.error(`Failed to set parameter ${name}: ${response.status} - ${error}`); + return false; + } + + this.logger.info(`Successfully set parameter: ${name}`); + return true; + } catch (error) { + this.logger.error(`Error setting parameter: ${error}`); + return false; + } + } + + /** + * Gets a RabbitMQ parameter. + * + * @param name - The parameter name + */ + public async getParameter( + name: string + ): Promise { + try { + const url = `${this.config.url}/api/global-parameters/${encodeURIComponent(name)}`; + + const response = await fetch(url, { + method: 'GET', + headers: { + 'Authorization': this.getAuthHeader() + } + }); + + if (!response.ok) { + if (response.status === 404) { + return null; + } + const error = await response.text(); + this.logger.error(`Failed to get parameter ${name}: ${response.status} - ${error}`); + return null; + } + + const data = await response.json(); + return data.value as T; + } catch (error) { + this.logger.error(`Error getting parameter: ${error}`); + return null; + } + } + + /** + * Deletes a RabbitMQ parameter. + * + * @param name - The parameter name + */ + public async deleteParameter( + name: string + ): Promise { + try { + const url = `${this.config.url}/api/global-parameters/${encodeURIComponent(name)}`; + + const response = await fetch(url, { + method: 'DELETE', + headers: { + 'Authorization': this.getAuthHeader() + } + }); + + if (!response.ok && response.status !== 404) { + const error = await response.text(); + this.logger.error(`Failed to delete parameter ${name}: ${response.status} - ${error}`); + return false; + } + + this.logger.info(`Successfully deleted parameter: ${name}`); + return true; + } catch (error) { + this.logger.error(`Error deleting parameter: ${error}`); + return false; + } + } } \ No newline at end of file diff --git a/src/index.ts b/src/index.ts index 294c286..5ef89b6 100644 --- a/src/index.ts +++ b/src/index.ts @@ -10,4 +10,6 @@ export { RabbitMQManagementConfig, } from "./types"; -export {RunMQLogger} from "./core/logging/RunMQLogger"; \ No newline at end of file +export {RunMQLogger} from "./core/logging/RunMQLogger"; + +export {RunMQQueueMetadata} from "./core/management/Policies/RunMQQueueMetadata"; diff --git a/tests/e2e/RunMQMetadataManager.e2e.test.ts b/tests/e2e/RunMQMetadataManager.e2e.test.ts new file mode 100644 index 0000000..077b7d2 --- /dev/null +++ b/tests/e2e/RunMQMetadataManager.e2e.test.ts @@ -0,0 +1,279 @@ +import {RunMQMetadataManager} from "@src/core/management/Policies/RunMQMetadataManager"; +import {RabbitMQManagementClient} from "@src/core/management/RabbitMQManagementClient"; +import {RabbitMQMetadata} from "@src/core/management/Policies/RabbitMQMetadata"; +import {MockedRunMQLogger} from "@tests/mocks/MockedRunMQLogger"; +import {RabbitMQManagementConfigExample} from "@tests/Examples/RabbitMQManagementConfigExample"; +import {RunMQUtils} from "@src/core/utils/RunMQUtils"; +import {METADATA_SCHEMA_VERSION} from "@src/core/management/Policies/RunMQQueueMetadata"; + +describe('RunMQMetadataManager E2E Tests', () => { + const validConfig = RabbitMQManagementConfigExample.valid(); + let metadataManager: RunMQMetadataManager; + let managementClient: RabbitMQManagementClient; + const testQueueNames: string[] = []; + + beforeEach(async () => { + jest.clearAllMocks(); + managementClient = new RabbitMQManagementClient(validConfig, MockedRunMQLogger); + metadataManager = new RunMQMetadataManager(MockedRunMQLogger, validConfig); + await metadataManager.initialize() + }); + + afterEach(async () => { + for (const queueName of testQueueNames) { + const paramName = RabbitMQMetadata.getParameterName(queueName); + await managementClient.deleteParameter(paramName); + } + testQueueNames.length = 0; + await RunMQUtils.delay(100); + }); + + describe('Initialization', () => { + it('should initialize successfully when management plugin is enabled', async () => { + await metadataManager.initialize(); + expect(metadataManager.isEnabled()).toBe(true); + expect(MockedRunMQLogger.info).toHaveBeenCalledWith( + "RunMQ metadata storage initialized" + ); + }); + + it('should disable when management plugin is not accessible', async () => { + const invalidManager = new RunMQMetadataManager( + MockedRunMQLogger, + RabbitMQManagementConfigExample.invalid() + ); + await invalidManager.initialize(); + + expect(invalidManager.isEnabled()).toBe(false); + expect(MockedRunMQLogger.warn).toHaveBeenCalledWith( + "RabbitMQ management plugin is not enabled - metadata storage disabled" + ); + }); + + it('should disable when no management config provided', async () => { + const noConfigManager = new RunMQMetadataManager(MockedRunMQLogger); + await noConfigManager.initialize(); + + expect(noConfigManager.isEnabled()).toBe(false); + expect(MockedRunMQLogger.warn).toHaveBeenCalledWith( + "Management client not configured - metadata storage disabled" + ); + }); + }); + + describe('Metadata Creation', () => { + it('should create metadata parameter for a new queue', async () => { + const queueName = `test_queue_${Date.now()}`; + testQueueNames.push(queueName); + + const result = await metadataManager.apply(queueName, 5); + + expect(result).toBe(true); + expect(MockedRunMQLogger.info).toHaveBeenCalledWith( + `Created metadata for queue: ${queueName}` + ); + + // Verify metadata was stored correctly + const metadata = await metadataManager.getMetadata(queueName); + expect(metadata).not.toBeNull(); + expect(metadata?.version).toBe(METADATA_SCHEMA_VERSION); + expect(metadata?.maxRetries).toBe(5); + expect(metadata?.createdAt).toBeDefined(); + expect(metadata?.updatedAt).toBeUndefined(); + }); + + it('should create metadata with correct maxRetries value', async () => { + const queueName = `test_queue_retries_${Date.now()}`; + testQueueNames.push(queueName); + + await metadataManager.apply(queueName, 10); + + const metadata = await metadataManager.getMetadata(queueName); + expect(metadata?.maxRetries).toBe(10); + }); + + it('should create metadata with valid ISO timestamp', async () => { + const queueName = `test_queue_timestamp_${Date.now()}`; + testQueueNames.push(queueName); + + const beforeCreate = new Date().toISOString(); + await metadataManager.apply(queueName, 3); + const afterCreate = new Date().toISOString(); + + const metadata = await metadataManager.getMetadata(queueName); + expect(metadata?.createdAt).toBeDefined(); + + // Verify the timestamp is within expected range + const createdAt = new Date(metadata!.createdAt); + expect(createdAt.getTime()).toBeGreaterThanOrEqual(new Date(beforeCreate).getTime() - 1000); + expect(createdAt.getTime()).toBeLessThanOrEqual(new Date(afterCreate).getTime() + 1000); + }); + }); + + describe('Metadata Update', () => { + it('should update metadata while preserving createdAt', async () => { + const queueName = `test_queue_update_${Date.now()}`; + testQueueNames.push(queueName); + + // Create initial metadata + await metadataManager.apply(queueName, 5); + const initialMetadata = await metadataManager.getMetadata(queueName); + const originalCreatedAt = initialMetadata?.createdAt; + + // Wait a bit to ensure different timestamp + await RunMQUtils.delay(100); + + // Update metadata + const updateResult = await metadataManager.apply(queueName, 10); + + expect(updateResult).toBe(true); + expect(MockedRunMQLogger.info).toHaveBeenCalledWith( + `Updated metadata for queue: ${queueName}` + ); + + const updatedMetadata = await metadataManager.getMetadata(queueName); + expect(updatedMetadata?.maxRetries).toBe(10); + expect(updatedMetadata?.createdAt).toBe(originalCreatedAt); + expect(updatedMetadata?.updatedAt).toBeDefined(); + expect(updatedMetadata?.updatedAt).not.toBe(updatedMetadata?.createdAt); + }); + + it('should track updatedAt on subsequent updates', async () => { + const queueName = `test_queue_multi_update_${Date.now()}`; + testQueueNames.push(queueName); + + // Create and then update twice + await metadataManager.apply(queueName, 3); + await RunMQUtils.delay(50); + await metadataManager.apply(queueName, 5); + const firstUpdate = await metadataManager.getMetadata(queueName); + const firstUpdatedAt = firstUpdate?.updatedAt; + + await RunMQUtils.delay(50); + await metadataManager.apply(queueName, 7); + const secondUpdate = await metadataManager.getMetadata(queueName); + + expect(secondUpdate?.maxRetries).toBe(7); + expect(secondUpdate?.updatedAt).toBeDefined(); + // The updatedAt should be different from the first update + expect(new Date(secondUpdate!.updatedAt!).getTime()) + .toBeGreaterThanOrEqual(new Date(firstUpdatedAt!).getTime()); + }); + }); + + describe('Metadata Retrieval', () => { + it('should retrieve existing metadata', async () => { + const queueName = `test_queue_retrieve_${Date.now()}`; + testQueueNames.push(queueName); + + await metadataManager.apply(queueName, 8); + + const metadata = await metadataManager.getMetadata(queueName); + + expect(metadata).not.toBeNull(); + expect(metadata?.version).toBe(METADATA_SCHEMA_VERSION); + expect(metadata?.maxRetries).toBe(8); + }); + + it('should return null for non-existent queue metadata', async () => { + const metadata = await metadataManager.getMetadata('non_existent_queue_12345'); + + expect(metadata).toBeNull(); + }); + + it('should retrieve metadata from custom vhost', async () => { + // Note: This test assumes the default vhost is used + // In a real environment, you might need to create a custom vhost first + const queueName = `test_queue_vhost_${Date.now()}`; + testQueueNames.push(queueName); + + await metadataManager.apply(queueName, 4, "%2F"); + + const metadata = await metadataManager.getMetadata(queueName); + + expect(metadata).not.toBeNull(); + expect(metadata?.maxRetries).toBe(4); + }); + }); + + describe('Metadata Cleanup', () => { + it('should delete metadata for a queue', async () => { + const queueName = `test_queue_cleanup_${Date.now()}`; + // Don't add to testQueueNames since we're cleaning up in the test + + await metadataManager.apply(queueName, 5); + + // Verify it exists + const beforeCleanup = await metadataManager.getMetadata(queueName); + expect(beforeCleanup).not.toBeNull(); + + // Cleanup + await metadataManager.cleanup(queueName); + expect(MockedRunMQLogger.info).toHaveBeenCalledWith( + `Deleted metadata for queue: ${queueName}` + ); + + // Verify it's gone + const afterCleanup = await metadataManager.getMetadata(queueName); + expect(afterCleanup).toBeNull(); + }); + + it('should handle cleanup of non-existent metadata gracefully', async () => { + // Should not throw + await expect(metadataManager.cleanup('non_existent_queue_cleanup')) + .resolves.not.toThrow(); + }); + }); + + describe('Edge Cases', () => { + it('should handle queue names with special characters', async () => { + const queueName = `test.queue-with_special.chars_${Date.now()}`; + testQueueNames.push(queueName); + + const result = await metadataManager.apply(queueName, 3); + + expect(result).toBe(true); + + const metadata = await metadataManager.getMetadata(queueName); + expect(metadata?.maxRetries).toBe(3); + }); + + it('should handle zero retries', async () => { + const queueName = `test_queue_zero_retries_${Date.now()}`; + testQueueNames.push(queueName); + + await metadataManager.apply(queueName, 0); + + const metadata = await metadataManager.getMetadata(queueName); + expect(metadata?.maxRetries).toBe(0); + }); + + it('should handle large retry numbers', async () => { + const queueName = `test_queue_large_retries_${Date.now()}`; + testQueueNames.push(queueName); + + await metadataManager.apply(queueName, 1000); + + const metadata = await metadataManager.getMetadata(queueName); + expect(metadata?.maxRetries).toBe(1000); + }); + }); + + describe('Integration with RabbitMQ Parameters API', () => { + it('should store metadata as RabbitMQ parameter', async () => { + const queueName = `test_queue_param_${Date.now()}`; + testQueueNames.push(queueName); + + await metadataManager.apply(queueName, 5); + + // Directly verify via management client + const paramName = RabbitMQMetadata.getParameterName(queueName); + const storedValue = await managementClient.getParameter(paramName); + + expect(storedValue).not.toBeNull(); + expect((storedValue as any).maxRetries).toBe(5); + expect((storedValue as any).version).toBe(METADATA_SCHEMA_VERSION); + }); + }); +}); + diff --git a/tests/unit/core/consumer/RunMQConsumerCreator.test.ts b/tests/unit/core/consumer/RunMQConsumerCreator.test.ts index 68c18a0..eaa5200 100644 --- a/tests/unit/core/consumer/RunMQConsumerCreator.test.ts +++ b/tests/unit/core/consumer/RunMQConsumerCreator.test.ts @@ -7,9 +7,11 @@ import {RunMQConsumerCreator} from "@src/core/consumer/RunMQConsumerCreator"; import {MockedRunMQLogger} from "@tests/mocks/MockedRunMQLogger"; import {MockedAMQPClient} from "@tests/mocks/MockedAMQPClient"; import {RunMQTTLPolicyManager} from "@src/core/management/Policies/RunMQTTLPolicyManager"; +import {RunMQMetadataManager} from "@src/core/management/Policies/RunMQMetadataManager"; import {RunMQException} from "@src/core/exceptions/RunMQException"; jest.mock('@src/core/management/Policies/RunMQTTLPolicyManager'); +jest.mock('@src/core/management/Policies/RunMQMetadataManager'); describe('RunMQConsumerCreator Unit Tests', () => { const mockedChannel = new MockedAMQPChannel(); @@ -18,6 +20,10 @@ describe('RunMQConsumerCreator Unit Tests', () => { initialize: jest.fn(), apply: jest.fn() }; + const mockMetadataManager = { + initialize: jest.fn(), + apply: jest.fn() + }; const testProcessorConfig = RunMQProcessorConfigurationExample.random( 'testProcessor', @@ -41,8 +47,11 @@ describe('RunMQConsumerCreator Unit Tests', () => { beforeEach(() => { jest.clearAllMocks(); jest.mocked(RunMQTTLPolicyManager).mockImplementation(() => mockTTLPolicyManager as any); + jest.mocked(RunMQMetadataManager).mockImplementation(() => mockMetadataManager as any); mockTTLPolicyManager.initialize.mockResolvedValue(undefined); mockTTLPolicyManager.apply.mockResolvedValue(true); + mockMetadataManager.initialize.mockResolvedValue(undefined); + mockMetadataManager.apply.mockResolvedValue(true); consumerCreator = new RunMQConsumerCreator(mockedClient, MockedRunMQLogger, undefined); }); @@ -178,5 +187,68 @@ describe('RunMQConsumerCreator Unit Tests', () => { ); }); }); + + describe('metadata storage', () => { + it('should initialize metadata policy manager', async () => { + await consumerCreator.createConsumer(testConsumerConfig); + + expect(mockMetadataManager.initialize).toHaveBeenCalled(); + }); + + it('should store metadata with maxRetries from processor config', async () => { + await consumerCreator.createConsumer(testConsumerConfig); + + expect(mockMetadataManager.apply).toHaveBeenCalledWith( + testProcessorConfig.name, + testProcessorConfig.attempts + ); + }); + + it('should use default processing attempts when not specified', async () => { + const configWithoutAttempts = new ConsumerConfiguration( + 'test.topic', + { + name: 'noAttemptsProcessor', + consumersCount: 1 + }, + jest.fn() + ); + + await consumerCreator.createConsumer(configWithoutAttempts); + + expect(mockMetadataManager.apply).toHaveBeenCalledWith( + 'noAttemptsProcessor', + DEFAULTS.PROCESSING_ATTEMPTS + ); + }); + + it('should store metadata with custom maxRetries', async () => { + const customAttempts = 15; + const configWithCustomAttempts = new ConsumerConfiguration( + 'test.topic', + { + name: 'customAttemptsProcessor', + consumersCount: 1, + attempts: customAttempts + }, + jest.fn() + ); + + await consumerCreator.createConsumer(configWithCustomAttempts); + + expect(mockMetadataManager.apply).toHaveBeenCalledWith( + 'customAttemptsProcessor', + customAttempts + ); + }); + + it('should continue consumer creation even if metadata storage fails', async () => { + mockMetadataManager.apply.mockResolvedValue(false); + + await consumerCreator.createConsumer(testConsumerConfig); + + expect(mockedChannel.consume).toHaveBeenCalled(); + }); + }); }); }); \ No newline at end of file diff --git a/tests/unit/core/management/RabbitMQMetadata.test.ts b/tests/unit/core/management/RabbitMQMetadata.test.ts new file mode 100644 index 0000000..6b79335 --- /dev/null +++ b/tests/unit/core/management/RabbitMQMetadata.test.ts @@ -0,0 +1,63 @@ +import {RabbitMQMetadata} from "@src/core/management/Policies/RabbitMQMetadata"; +import {Constants} from "@src/core/constants"; +import {METADATA_SCHEMA_VERSION, RunMQQueueMetadata} from "@src/core/management/Policies/RunMQQueueMetadata"; + +describe('RabbitMQMetadata', () => { + const mockDate = '2024-01-15T10:30:00.000Z'; + + beforeEach(() => { + jest.useFakeTimers(); + jest.setSystemTime(new Date(mockDate)); + }); + + afterEach(() => { + jest.useRealTimers(); + }); + + describe('createMetadataFor', () => { + it('should create metadata with correct version', () => { + const metadata = RabbitMQMetadata.createMetadataFor(3); + + expect(metadata.version).toBe(METADATA_SCHEMA_VERSION); + }); + + it('should include maxRetries in metadata', () => { + const maxRetries = 10; + const metadata = RabbitMQMetadata.createMetadataFor( maxRetries); + + expect(metadata.maxRetries).toBe(maxRetries); + }); + + it('should set createdAt when no existing metadata provided', () => { + const metadata = RabbitMQMetadata.createMetadataFor( 3); + + expect(metadata.createdAt).toBe(mockDate); + expect(metadata.updatedAt).toBeUndefined(); + }); + + it('should preserve createdAt and set updatedAt when existing metadata provided', () => { + const existingCreatedAt = '2023-01-01T00:00:00.000Z'; + const existingMetadata: RunMQQueueMetadata = { + version: 0, + maxRetries: 3, + createdAt: existingCreatedAt + }; + + const metadata = RabbitMQMetadata.createMetadataFor( 5, existingMetadata); + + expect(metadata.createdAt).toBe(existingCreatedAt); + expect(metadata.updatedAt).toBe(mockDate); + expect(metadata.maxRetries).toBe(5); + }); + }); + + describe('getParameterName', () => { + it('should return correct parameter name', () => { + const queueName = 'test_processor'; + const paramName = RabbitMQMetadata.getParameterName(queueName); + + expect(paramName).toBe(Constants.METADATA_STORE_PREFIX + queueName); + }); + }); +}); + diff --git a/tests/unit/core/management/RunMQMetadataManager.test.ts b/tests/unit/core/management/RunMQMetadataManager.test.ts new file mode 100644 index 0000000..04d399f --- /dev/null +++ b/tests/unit/core/management/RunMQMetadataManager.test.ts @@ -0,0 +1,271 @@ +import {RunMQMetadataManager} from "@src/core/management/Policies/RunMQMetadataManager"; +import {RabbitMQManagementClient} from "@src/core/management/RabbitMQManagementClient"; +import {RunMQConsoleLogger} from "@src/core/logging/RunMQConsoleLogger"; +import {RabbitMQManagementConfigExample} from "@tests/Examples/RabbitMQManagementConfigExample"; +import {RabbitMQMetadata} from "@src/core/management/Policies/RabbitMQMetadata"; +import {METADATA_SCHEMA_VERSION} from "@src/core/management/Policies/RunMQQueueMetadata"; + +jest.mock("@src/core/management/RabbitMQManagementClient"); + +describe('RunMQMetadataManager', () => { + let metadataManager: RunMQMetadataManager; + let logger: RunMQConsoleLogger; + let mockManagementClient: jest.Mocked; + + const mockDate = '2024-01-15T10:30:00.000Z'; + + beforeEach(() => { + jest.useFakeTimers(); + jest.setSystemTime(new Date(mockDate)); + + logger = new RunMQConsoleLogger(); + jest.spyOn(logger, 'info').mockImplementation(); + jest.spyOn(logger, 'warn').mockImplementation(); + jest.spyOn(logger, 'error').mockImplementation(); + + mockManagementClient = new RabbitMQManagementClient( + RabbitMQManagementConfigExample.valid(), + logger + ) as jest.Mocked; + }); + + afterEach(() => { + jest.useRealTimers(); + jest.clearAllMocks(); + }); + + describe('without management config', () => { + beforeEach(() => { + metadataManager = new RunMQMetadataManager(logger); + }); + + it('should initialize without management client', async () => { + await metadataManager.initialize(); + + expect(logger.warn).toHaveBeenCalledWith( + "Management client not configured - metadata storage disabled" + ); + }); + + it('should return false when applying metadata without management config', async () => { + await metadataManager.initialize(); + const result = await metadataManager.apply('test-queue', 5); + + expect(result).toBe(false); + }); + + it('should return null when getting metadata without management config', async () => { + await metadataManager.initialize(); + const result = await metadataManager.getMetadata('test-queue'); + + expect(result).toBeNull(); + }); + + it('should report as disabled', async () => { + await metadataManager.initialize(); + expect(metadataManager.isEnabled()).toBe(false); + }); + }); + + describe('with management config', () => { + beforeEach(() => { + metadataManager = new RunMQMetadataManager( + logger, + RabbitMQManagementConfigExample.valid() + ); + (metadataManager as any).managementClient = mockManagementClient; + }); + + describe('initialize', () => { + it('should check if management plugin is enabled', async () => { + mockManagementClient.checkManagementPluginEnabled.mockResolvedValue(true); + + await metadataManager.initialize(); + + expect(mockManagementClient.checkManagementPluginEnabled).toHaveBeenCalled(); + expect(logger.info).toHaveBeenCalledWith("RunMQ metadata storage initialized"); + }); + + it('should log warning when management plugin is not enabled', async () => { + mockManagementClient.checkManagementPluginEnabled.mockResolvedValue(false); + + await metadataManager.initialize(); + + expect(logger.warn).toHaveBeenCalledWith( + "RabbitMQ management plugin is not enabled - metadata storage disabled" + ); + }); + }); + + describe('apply', () => { + it('should create metadata parameter for new queue', async () => { + mockManagementClient.checkManagementPluginEnabled.mockResolvedValue(true); + mockManagementClient.getParameter.mockResolvedValue(null); + mockManagementClient.setParameter.mockResolvedValue(true); + + await metadataManager.initialize(); + const result = await metadataManager.apply('test_queue', 5); + + expect(result).toBe(true); + expect(mockManagementClient.setParameter).toHaveBeenCalledWith( + RabbitMQMetadata.getParameterName('test_queue'), + expect.objectContaining({ + version: METADATA_SCHEMA_VERSION, + maxRetries: 5, + createdAt: mockDate + }) + ); + expect(logger.info).toHaveBeenCalledWith("Created metadata for queue: test_queue"); + }); + + it('should update existing metadata preserving createdAt', async () => { + const existingCreatedAt = '2023-01-01T00:00:00.000Z'; + const existingMetadata = { + version: 0, + maxRetries: 3, + createdAt: existingCreatedAt + }; + + mockManagementClient.checkManagementPluginEnabled.mockResolvedValue(true); + mockManagementClient.getParameter.mockResolvedValue(existingMetadata); + mockManagementClient.setParameter.mockResolvedValue(true); + + await metadataManager.initialize(); + const result = await metadataManager.apply('test_queue', 10); + + expect(result).toBe(true); + expect(mockManagementClient.setParameter).toHaveBeenCalledWith( + RabbitMQMetadata.getParameterName('test_queue'), + expect.objectContaining({ + maxRetries: 10, + createdAt: existingCreatedAt, + updatedAt: mockDate + }) + ); + expect(logger.info).toHaveBeenCalledWith("Updated metadata for queue: test_queue"); + }); + + it('should return false when management plugin is not enabled', async () => { + mockManagementClient.checkManagementPluginEnabled.mockResolvedValue(false); + + await metadataManager.initialize(); + const result = await metadataManager.apply('test_queue', 5); + + expect(result).toBe(false); + expect(logger.warn).toHaveBeenCalledWith( + expect.stringContaining("Cannot store metadata for queue 'test_queue'") + ); + }); + + it('should return false when parameter creation fails', async () => { + mockManagementClient.checkManagementPluginEnabled.mockResolvedValue(true); + mockManagementClient.getParameter.mockResolvedValue(null); + mockManagementClient.setParameter.mockResolvedValue(false); + + await metadataManager.initialize(); + const result = await metadataManager.apply('test_queue', 5); + + expect(result).toBe(false); + expect(logger.error).toHaveBeenCalledWith( + "Failed to store metadata for queue: test_queue" + ); + }); + + it('should use custom vhost when provided', async () => { + mockManagementClient.checkManagementPluginEnabled.mockResolvedValue(true); + mockManagementClient.getParameter.mockResolvedValue(null); + mockManagementClient.setParameter.mockResolvedValue(true); + + await metadataManager.initialize(); + await metadataManager.apply('test_queue', 5, 'custom_vhost'); + + expect(mockManagementClient.setParameter).toHaveBeenCalledWith( + expect.any(String), + expect.any(Object) + ); + }); + }); + + describe('getMetadata', () => { + it('should return metadata when parameter exists', async () => { + const expectedMetadata = { + version: 0, + maxRetries: 5, + createdAt: mockDate + }; + + mockManagementClient.checkManagementPluginEnabled.mockResolvedValue(true); + mockManagementClient.getParameter.mockResolvedValue(expectedMetadata); + + await metadataManager.initialize(); + const result = await metadataManager.getMetadata('test_queue'); + + expect(result).toEqual(expectedMetadata); + }); + + it('should return null when parameter does not exist', async () => { + mockManagementClient.checkManagementPluginEnabled.mockResolvedValue(true); + mockManagementClient.getParameter.mockResolvedValue(null); + + await metadataManager.initialize(); + const result = await metadataManager.getMetadata('test_queue'); + + expect(result).toBeNull(); + }); + + it('should return null when management plugin is not enabled', async () => { + mockManagementClient.checkManagementPluginEnabled.mockResolvedValue(false); + + await metadataManager.initialize(); + const result = await metadataManager.getMetadata('test_queue'); + + expect(result).toBeNull(); + }); + }); + + describe('cleanup', () => { + it('should delete metadata parameter', async () => { + mockManagementClient.checkManagementPluginEnabled.mockResolvedValue(true); + mockManagementClient.deleteParameter.mockResolvedValue(true); + + await metadataManager.initialize(); + await metadataManager.cleanup('test_queue'); + + expect(mockManagementClient.deleteParameter).toHaveBeenCalledWith( + RabbitMQMetadata.getParameterName('test_queue') + ); + expect(logger.info).toHaveBeenCalledWith( + "Deleted metadata for queue: test_queue" + ); + }); + + it('should not call delete when management plugin is not enabled', async () => { + mockManagementClient.checkManagementPluginEnabled.mockResolvedValue(false); + + await metadataManager.initialize(); + await metadataManager.cleanup('test_queue'); + + expect(mockManagementClient.deleteParameter).not.toHaveBeenCalled(); + }); + }); + + describe('isEnabled', () => { + it('should return true when management plugin is enabled', async () => { + mockManagementClient.checkManagementPluginEnabled.mockResolvedValue(true); + + await metadataManager.initialize(); + + expect(metadataManager.isEnabled()).toBe(true); + }); + + it('should return false when management plugin is not enabled', async () => { + mockManagementClient.checkManagementPluginEnabled.mockResolvedValue(false); + + await metadataManager.initialize(); + + expect(metadataManager.isEnabled()).toBe(false); + }); + }); + }); +}); + From 5e0e387900131a1a944cb243be190740546b631c Mon Sep 17 00:00:00 2001 From: Fawzi Essam Date: Sat, 28 Feb 2026 01:37:55 +0200 Subject: [PATCH 2/3] update tests Signed-off-by: Fawzi Essam --- src/core/management/Policies/RunMQMetadataManager.ts | 3 --- tests/e2e/RunMQMetadataManager.e2e.test.ts | 2 +- tests/unit/core/management/RunMQMetadataManager.test.ts | 2 +- 3 files changed, 2 insertions(+), 5 deletions(-) diff --git a/src/core/management/Policies/RunMQMetadataManager.ts b/src/core/management/Policies/RunMQMetadataManager.ts index 33d776a..e5667ab 100644 --- a/src/core/management/Policies/RunMQMetadataManager.ts +++ b/src/core/management/Policies/RunMQMetadataManager.ts @@ -45,13 +45,11 @@ export class RunMQMetadataManager { * * @param queueName - The name of the queue * @param maxRetries - Maximum retry attempts - * @param vhost - Virtual host (default: %2F for /) * @returns true if metadata was stored successfully, false otherwise */ public async apply( queueName: string, maxRetries: number, - vhost: string = "%2F" ): Promise { if (!this.isManagementPluginEnabled || !this.managementClient) { this.logger.warn(`Cannot store metadata for queue '${queueName}' - management plugin not available`); @@ -60,7 +58,6 @@ export class RunMQMetadataManager { try { const existingMetadata = await this.getMetadata(queueName); - const metadata = RabbitMQMetadata.createMetadataFor( maxRetries, existingMetadata ?? undefined diff --git a/tests/e2e/RunMQMetadataManager.e2e.test.ts b/tests/e2e/RunMQMetadataManager.e2e.test.ts index 077b7d2..8902836 100644 --- a/tests/e2e/RunMQMetadataManager.e2e.test.ts +++ b/tests/e2e/RunMQMetadataManager.e2e.test.ts @@ -187,7 +187,7 @@ describe('RunMQMetadataManager E2E Tests', () => { const queueName = `test_queue_vhost_${Date.now()}`; testQueueNames.push(queueName); - await metadataManager.apply(queueName, 4, "%2F"); + await metadataManager.apply(queueName, 4); const metadata = await metadataManager.getMetadata(queueName); diff --git a/tests/unit/core/management/RunMQMetadataManager.test.ts b/tests/unit/core/management/RunMQMetadataManager.test.ts index 04d399f..4ae3b84 100644 --- a/tests/unit/core/management/RunMQMetadataManager.test.ts +++ b/tests/unit/core/management/RunMQMetadataManager.test.ts @@ -177,7 +177,7 @@ describe('RunMQMetadataManager', () => { mockManagementClient.setParameter.mockResolvedValue(true); await metadataManager.initialize(); - await metadataManager.apply('test_queue', 5, 'custom_vhost'); + await metadataManager.apply('test_queue', 5); expect(mockManagementClient.setParameter).toHaveBeenCalledWith( expect.any(String), From dec91974f796958b4af0e42e1f6a6000d60cc2f0 Mon Sep 17 00:00:00 2001 From: Fawzi Essam Date: Sat, 28 Feb 2026 01:53:45 +0200 Subject: [PATCH 3/3] chore: shared instance of consumer creator Signed-off-by: Fawzi Essam --- src/core/RunMQ.ts | 5 +++-- src/core/management/Policies/RunMQMetadataManager.ts | 3 +++ src/core/management/Policies/RunMQTTLPolicyManager.ts | 3 +++ 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/src/core/RunMQ.ts b/src/core/RunMQ.ts index dc89cc7..b98bece 100644 --- a/src/core/RunMQ.ts +++ b/src/core/RunMQ.ts @@ -15,6 +15,7 @@ import {RabbitMQMessageProperties} from "@src/core/message/RabbitMQMessageProper export class RunMQ { private readonly client: RabbitMQClientAdapter; private readonly config: RunMQConnectionConfig; + private readonly consumer: RunMQConsumerCreator; private publisher: RunMQPublisher | undefined private readonly logger: RunMQLogger private retryAttempts: number = 0; @@ -28,6 +29,7 @@ export class RunMQ { maxReconnectAttempts: config.maxReconnectAttempts ?? DEFAULTS.MAX_RECONNECT_ATTEMPTS, }; this.client = new RabbitMQClientAdapter(this.config, this.logger); + this.consumer = new RunMQConsumerCreator(this.client, this.logger, this.config.management); } /** @@ -50,8 +52,7 @@ export class RunMQ { * @param processor The function that will process the incoming messages */ public async process>(topic: string, config: RunMQProcessorConfiguration, processor: (message: RunMQMessageContent) => Promise) { - const consumer = new RunMQConsumerCreator(this.client, this.logger, this.config.management); - await consumer.createConsumer(new ConsumerConfiguration(topic, config, processor)) + await this.consumer.createConsumer(new ConsumerConfiguration(topic, config, processor)) } /** diff --git a/src/core/management/Policies/RunMQMetadataManager.ts b/src/core/management/Policies/RunMQMetadataManager.ts index e5667ab..ca6f752 100644 --- a/src/core/management/Policies/RunMQMetadataManager.ts +++ b/src/core/management/Policies/RunMQMetadataManager.ts @@ -25,6 +25,9 @@ export class RunMQMetadataManager { * Initialize the manager by checking if management plugin is available. */ public async initialize(): Promise { + if (this.isManagementPluginEnabled) { + return; + } if (!this.managementClient) { this.logger.warn("Management client not configured - metadata storage disabled"); return; diff --git a/src/core/management/Policies/RunMQTTLPolicyManager.ts b/src/core/management/Policies/RunMQTTLPolicyManager.ts index 687d092..6e3a549 100644 --- a/src/core/management/Policies/RunMQTTLPolicyManager.ts +++ b/src/core/management/Policies/RunMQTTLPolicyManager.ts @@ -18,6 +18,9 @@ export class RunMQTTLPolicyManager { } public async initialize(): Promise { + if (this.isManagementPluginEnabled) { + return; + } if (!this.managementClient) { this.logger.warn("Management client not configured"); return;