diff --git a/packages/message-broker/package.json b/packages/message-broker/package.json index e6265ba..9b26d93 100644 --- a/packages/message-broker/package.json +++ b/packages/message-broker/package.json @@ -1,6 +1,6 @@ { "name": "@user-office-software/duo-message-broker", - "version": "1.7.0", + "version": "1.8.0", "description": "", "author": "SWAP", "license": "ISC", diff --git a/packages/message-broker/src/index.ts b/packages/message-broker/src/index.ts index 86c97a5..1cffe45 100644 --- a/packages/message-broker/src/index.ts +++ b/packages/message-broker/src/index.ts @@ -49,6 +49,8 @@ export class RabbitMQMessageBroker implements MessageBroker { private connection: Connection | null = null; private channel: Channel | null = null; private messageBuffer: Message[] = []; + private broadcastBuffer: Message[] = []; + private exchangeBuffer: Message[] = []; async listenOnBroadcast(cb: ConsumerCallback) { if (this.channel) { @@ -124,6 +126,8 @@ export class RabbitMQMessageBroker implements MessageBroker { if (!this.channel) { logger.logWarn('Channel is not available', { type, msg }); + this.broadcastBuffer.push({ queue: Queue.BROADCAST, type, msg }); + return; } @@ -178,6 +182,8 @@ export class RabbitMQMessageBroker implements MessageBroker { if (!this.channel) { logger.logWarn('Channel is not available', { type, msg }); + this.exchangeBuffer.push({ queue: exchangeName as Queue, type, msg }); + return; } @@ -296,7 +302,11 @@ export class RabbitMQMessageBroker implements MessageBroker { private scheduleReconnect() { logger.logInfo( - 'RabbitMQMessageBroker: Trying to reconnecting after 5 sec', + `RabbitMQMessageBroker: Trying to reconnecting after 5 sec there are: + ${this.messageBuffer.length} messages in the message buffer + ${this.exchangeBuffer.length} messages in the exchange buffer + ${this.broadcastBuffer.length} messages in the broadcast buffer + `, {} ); @@ -478,12 +488,20 @@ export class RabbitMQMessageBroker implements MessageBroker { } private flushMessages() { - if (this.messageBuffer.length === 0) { + if ( + this.messageBuffer.length === 0 && + this.exchangeBuffer.length === 0 && + this.broadcastBuffer.length === 0 + ) { return; } logger.logInfo( - `RabbitMQMessageBroker: flushMessage triggered, buffered messages: ${this.messageBuffer.length}`, + `RabbitMQMessageBroker: flushMessage triggered. + Buffered messages: ${this.messageBuffer.length} + Buffered exchange messages: ${this.exchangeBuffer.length} + Buffered broadcast messages: ${this.broadcastBuffer.length} + `, {} ); @@ -493,5 +511,19 @@ export class RabbitMQMessageBroker implements MessageBroker { messageBuffer.forEach(({ queue, type, msg }) => { this.sendMessage(queue, type, msg); }); + + const exchangeBuffer = this.exchangeBuffer; + this.exchangeBuffer = []; + + exchangeBuffer.forEach(({ queue, type, msg }) => { + this.sendMessageToExchange(queue, type, msg); + }); + + const broadcastBuffer = this.broadcastBuffer; + this.broadcastBuffer = []; + + broadcastBuffer.forEach(({ type, msg }) => { + this.sendBroadcast(type, msg); + }); } }