-
Notifications
You must be signed in to change notification settings - Fork 9
Open
Description
- I inserted 1, 2, 3, 4, 5, 6 messages
- The maximum number of jobs for consumers is 1
- Out-of-order consumption results 2, 3, 1, 4, 5, 6
async brachMsg(conversationId: string, messages: any | Array<Message>, min: number = 0, max: number = 0, sendTopic: boolean = false,msgType?: 'task') {
const msgCount = messages.length;
let sendCount = 0;
const isRoom = conversationId.indexOf('R:') >= -1 ? 1 : 0;
let noticeMessage = null;
let noticeIndex = 0;
// IS ROOM
if (isRoom === 1) {
let isText = 0;
const msgNotice = messages.filter((msg: Message, index: number) => {
if (isText >= 1) {
return;
}
if (msg.type == 'text' && msg.content && msg?.isNotice == 1) {
noticeMessage = msg;
noticeIndex = index;
isText++;
}
});
if (noticeMessage) {
await this.messageQueue.createJob({
type: QueueEunm.SEND_MESSAGE, 'priority': msgType =='task' ? Priority.LOW : Priority.CRITICAL, data: {
conversationId,
message: noticeMessage,
min,
max,
sendTopic,
msgCount
}
});
}
}
for (let index = 0; index < messages.length; index++) {
const message: Message = messages[index];
if (index !== noticeIndex && !noticeMessage) {
await this.messageQueue.createJob({
type: QueueEunm.SEND_MESSAGE, 'priority': msgType =='task' ? Priority.LOW : Priority.CRITICAL, data: {
conversationId,
message,
min,
max,
sendTopic,
msgCount
}
});
sendCount++;
}
}
return sendCount;
}
How to solve the sorting of a large number of concurrent insert jobs?
Metadata
Metadata
Assignees
Labels
No labels