Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/message-broker/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@user-office-software/duo-message-broker",
"version": "1.7.0",
"version": "1.8.0",
"description": "",
"author": "SWAP",
"license": "ISC",
Expand Down
38 changes: 35 additions & 3 deletions packages/message-broker/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ export class RabbitMQMessageBroker implements MessageBroker {
private connection: Connection | null = null;
private channel: Channel | null = null;
private messageBuffer: Message[] = [];
private broadcastBuffer: Message[] = [];
private exchangeBuffer: Message[] = [];

async listenOnBroadcast(cb: ConsumerCallback) {
if (this.channel) {
Expand Down Expand Up @@ -124,6 +126,8 @@ export class RabbitMQMessageBroker implements MessageBroker {
if (!this.channel) {
logger.logWarn('Channel is not available', { type, msg });

this.broadcastBuffer.push({ queue: Queue.BROADCAST, type, msg });

return;
}

Expand Down Expand Up @@ -178,6 +182,8 @@ export class RabbitMQMessageBroker implements MessageBroker {
if (!this.channel) {
logger.logWarn('Channel is not available', { type, msg });

this.exchangeBuffer.push({ queue: exchangeName as Queue, type, msg });

return;
}

Expand Down Expand Up @@ -296,7 +302,11 @@ export class RabbitMQMessageBroker implements MessageBroker {

private scheduleReconnect() {
logger.logInfo(
'RabbitMQMessageBroker: Trying to reconnecting after 5 sec',
`RabbitMQMessageBroker: Trying to reconnecting after 5 sec there are:
${this.messageBuffer.length} messages in the message buffer
${this.exchangeBuffer.length} messages in the exchange buffer
${this.broadcastBuffer.length} messages in the broadcast buffer
`,
{}
);

Expand Down Expand Up @@ -478,12 +488,20 @@ export class RabbitMQMessageBroker implements MessageBroker {
}

private flushMessages() {
if (this.messageBuffer.length === 0) {
if (
this.messageBuffer.length === 0 &&
this.exchangeBuffer.length === 0 &&
this.broadcastBuffer.length === 0
) {
return;
}

logger.logInfo(
`RabbitMQMessageBroker: flushMessage triggered, buffered messages: ${this.messageBuffer.length}`,
`RabbitMQMessageBroker: flushMessage triggered.
Buffered messages: ${this.messageBuffer.length}
Buffered exchange messages: ${this.exchangeBuffer.length}
Buffered broadcast messages: ${this.broadcastBuffer.length}
`,
{}
);

Expand All @@ -493,5 +511,19 @@ export class RabbitMQMessageBroker implements MessageBroker {
messageBuffer.forEach(({ queue, type, msg }) => {
this.sendMessage(queue, type, msg);
});

const exchangeBuffer = this.exchangeBuffer;
this.exchangeBuffer = [];

exchangeBuffer.forEach(({ queue, type, msg }) => {
this.sendMessageToExchange(queue, type, msg);
});

const broadcastBuffer = this.broadcastBuffer;
this.broadcastBuffer = [];

broadcastBuffer.forEach(({ type, msg }) => {
this.sendBroadcast(type, msg);
});
}
}