From 76b43209e381948b7cb18b321f3bf31ce8f7a1ed Mon Sep 17 00:00:00 2001 From: dhulke Date: Sun, 18 Jan 2026 14:49:48 -0300 Subject: [PATCH 01/13] feat(tracing): integrate OpenTelemetry tracing with new observability services --- .../federation-matrix/src/FederationMatrix.ts | 110 ++++++++++++++++++ 1 file changed, 110 insertions(+) diff --git a/ee/packages/federation-matrix/src/FederationMatrix.ts b/ee/packages/federation-matrix/src/FederationMatrix.ts index a8445c43af10b..9e81e0a7751ed 100644 --- a/ee/packages/federation-matrix/src/FederationMatrix.ts +++ b/ee/packages/federation-matrix/src/FederationMatrix.ts @@ -13,6 +13,7 @@ import type { EventID, FileMessageType, PresenceState } from '@rocket.chat/feder import { Logger } from '@rocket.chat/logger'; import { Users, Subscriptions, Messages, Rooms, Settings } from '@rocket.chat/models'; import { addSpanAttributes, traced, tracedClass } from '@rocket.chat/tracing'; +import { addSpanAttributes, traced, tracedClass } from '@rocket.chat/tracing'; import emojione from 'emojione'; import { createOrUpdateFederatedUser } from './helpers/createOrUpdateFederatedUser'; @@ -28,6 +29,115 @@ export const fileTypes: Record = { file: 'm.file', }; +/** helper to validate the username format */ +export function validateFederatedUsername(mxid: string): mxid is UserID { + if (!mxid.startsWith('@')) return false; + + const parts = mxid.substring(1).split(':'); + if (parts.length < 2) return false; + + const localpart = parts[0]; + const domainAndPort = parts.slice(1).join(':'); + + const localpartRegex = /^(?:[a-z0-9._\-]|=[0-9a-fA-F]{2}){1,255}$/; + if (!localpartRegex.test(localpart)) return false; + + const [domain, port] = domainAndPort.split(':'); + + const hostnameRegex = /^(?=.{1,253}$)([a-z0-9](?:[a-z0-9\-]{0,61}[a-z0-9])?)(?:\.[a-z0-9](?:[a-z0-9\-]{0,61}[a-z0-9])?)*$/i; + const ipv4Regex = /^(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)(\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)){3}$/; + const ipv6Regex = /^\[([0-9a-f:.]+)\]$/i; + + if (!(hostnameRegex.test(domain) || ipv4Regex.test(domain) || ipv6Regex.test(domain))) { + return false; + } + + if (port !== undefined) { + const portNum = Number(port); + if (!/^[0-9]+$/.test(port) || portNum < 1 || portNum > 65535) { + return false; + } + } + + return true; +} +export const extractDomainFromMatrixUserId = (mxid: string): string => { + const separatorIndex = mxid.indexOf(':', 1); + if (separatorIndex === -1) { + throw new Error(`Invalid federated username: ${mxid}`); + } + return mxid.substring(separatorIndex + 1); +}; + +/** + * Extract the username and the servername from a matrix user id + * if the serverName is the same as the serverName in the mxid, return only the username (rocket.chat regular username) + * otherwise, return the full mxid and the servername + */ +export const getUsernameServername = (mxid: string, serverName: string): [mxid: string, serverName: string, isLocal: boolean] => { + const senderServerName = extractDomainFromMatrixUserId(mxid); + // if the serverName is the same as the serverName in the mxid, return only the username (rocket.chat regular username) + if (serverName === senderServerName) { + const separatorIndex = mxid.indexOf(':', 1); + if (separatorIndex === -1) { + throw new Error(`Invalid federated username: ${mxid}`); + } + return [mxid.substring(1, separatorIndex), senderServerName, true]; // removers also the @ + } + + return [mxid, senderServerName, false]; +}; +/** + * Helper function to create a federated user + * + * Because of historical reasons, we can have users only with federated flag but no federation object + * So we need to upsert the user with the federation object + */ +export async function createOrUpdateFederatedUser(options: { username: string; name?: string; origin: string }): Promise { + const { username, name = username, origin } = options; + + // TODO: Have a specific method to handle this upsert + const user = await Users.findOneAndUpdate( + { + username, + }, + { + $set: { + username, + name: name || username, + type: 'user' as const, + status: UserStatus.OFFLINE, + active: true, + roles: ['user'], + requirePasswordChange: false, + federated: true, + federation: { + version: 1, + mui: username, + origin, + }, + _updatedAt: new Date(), + }, + $setOnInsert: { + createdAt: new Date(), + }, + }, + { + upsert: true, + projection: { _id: 1, username: 1 }, + returnDocument: 'after', + }, + ); + + if (!user) { + throw new Error(`Failed to create or update federated user: ${username}`); + } + + return user; +} + +export { generateEd25519RandomSecretKey } from '@rocket.chat/federation-sdk'; + @tracedClass({ type: 'service' }) export class FederationMatrix extends ServiceClass implements IFederationMatrixService { protected name = 'federation-matrix'; From 0a3a4ee2bd91f0cb08d7ea31138866aa13fe55ff Mon Sep 17 00:00:00 2001 From: dhulke Date: Sun, 18 Jan 2026 15:27:00 -0300 Subject: [PATCH 02/13] refactor: clean up and simplify username validation logic in FederationMatrix --- .../federation-matrix/src/FederationMatrix.ts | 111 +----------------- 1 file changed, 1 insertion(+), 110 deletions(-) diff --git a/ee/packages/federation-matrix/src/FederationMatrix.ts b/ee/packages/federation-matrix/src/FederationMatrix.ts index 9e81e0a7751ed..e78d80c8fe8db 100644 --- a/ee/packages/federation-matrix/src/FederationMatrix.ts +++ b/ee/packages/federation-matrix/src/FederationMatrix.ts @@ -29,115 +29,6 @@ export const fileTypes: Record = { file: 'm.file', }; -/** helper to validate the username format */ -export function validateFederatedUsername(mxid: string): mxid is UserID { - if (!mxid.startsWith('@')) return false; - - const parts = mxid.substring(1).split(':'); - if (parts.length < 2) return false; - - const localpart = parts[0]; - const domainAndPort = parts.slice(1).join(':'); - - const localpartRegex = /^(?:[a-z0-9._\-]|=[0-9a-fA-F]{2}){1,255}$/; - if (!localpartRegex.test(localpart)) return false; - - const [domain, port] = domainAndPort.split(':'); - - const hostnameRegex = /^(?=.{1,253}$)([a-z0-9](?:[a-z0-9\-]{0,61}[a-z0-9])?)(?:\.[a-z0-9](?:[a-z0-9\-]{0,61}[a-z0-9])?)*$/i; - const ipv4Regex = /^(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)(\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)){3}$/; - const ipv6Regex = /^\[([0-9a-f:.]+)\]$/i; - - if (!(hostnameRegex.test(domain) || ipv4Regex.test(domain) || ipv6Regex.test(domain))) { - return false; - } - - if (port !== undefined) { - const portNum = Number(port); - if (!/^[0-9]+$/.test(port) || portNum < 1 || portNum > 65535) { - return false; - } - } - - return true; -} -export const extractDomainFromMatrixUserId = (mxid: string): string => { - const separatorIndex = mxid.indexOf(':', 1); - if (separatorIndex === -1) { - throw new Error(`Invalid federated username: ${mxid}`); - } - return mxid.substring(separatorIndex + 1); -}; - -/** - * Extract the username and the servername from a matrix user id - * if the serverName is the same as the serverName in the mxid, return only the username (rocket.chat regular username) - * otherwise, return the full mxid and the servername - */ -export const getUsernameServername = (mxid: string, serverName: string): [mxid: string, serverName: string, isLocal: boolean] => { - const senderServerName = extractDomainFromMatrixUserId(mxid); - // if the serverName is the same as the serverName in the mxid, return only the username (rocket.chat regular username) - if (serverName === senderServerName) { - const separatorIndex = mxid.indexOf(':', 1); - if (separatorIndex === -1) { - throw new Error(`Invalid federated username: ${mxid}`); - } - return [mxid.substring(1, separatorIndex), senderServerName, true]; // removers also the @ - } - - return [mxid, senderServerName, false]; -}; -/** - * Helper function to create a federated user - * - * Because of historical reasons, we can have users only with federated flag but no federation object - * So we need to upsert the user with the federation object - */ -export async function createOrUpdateFederatedUser(options: { username: string; name?: string; origin: string }): Promise { - const { username, name = username, origin } = options; - - // TODO: Have a specific method to handle this upsert - const user = await Users.findOneAndUpdate( - { - username, - }, - { - $set: { - username, - name: name || username, - type: 'user' as const, - status: UserStatus.OFFLINE, - active: true, - roles: ['user'], - requirePasswordChange: false, - federated: true, - federation: { - version: 1, - mui: username, - origin, - }, - _updatedAt: new Date(), - }, - $setOnInsert: { - createdAt: new Date(), - }, - }, - { - upsert: true, - projection: { _id: 1, username: 1 }, - returnDocument: 'after', - }, - ); - - if (!user) { - throw new Error(`Failed to create or update federated user: ${username}`); - } - - return user; -} - -export { generateEd25519RandomSecretKey } from '@rocket.chat/federation-sdk'; - @tracedClass({ type: 'service' }) export class FederationMatrix extends ServiceClass implements IFederationMatrixService { protected name = 'federation-matrix'; @@ -302,7 +193,7 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS throw new Error('Creator not found in members list'); } - const roomId = await federationSDK.createDirectMessage({ + const roomId = await federationSDK.createDirectMessageRoom({ creatorUserId: userIdSchema.parse(`@${creator.username}:${this.serverName}`), members: members .filter((member) => member._id !== creatorId) From ae917acb2711eaf1d2c303a9ba86aecc079dc3ee Mon Sep 17 00:00:00 2001 From: dhulke Date: Sat, 31 Jan 2026 17:11:22 -0300 Subject: [PATCH 03/13] feat(federation): implement comprehensive metrics for federated rooms, messages, and events --- .../app/metrics/server/lib/collectMetrics.ts | 86 ++++++++- apps/meteor/app/metrics/server/lib/metrics.ts | 108 +++++++++++ .../ee/server/hooks/federation/index.ts | 80 +++++++-- ee/packages/federation-matrix/package.json | 1 + .../src/api/_matrix/transactions.ts | 10 ++ .../federation-matrix/src/events/member.ts | 79 +++++--- .../federation-matrix/src/events/message.ts | 81 +++++++++ .../src/helpers/metricsHelpers.ts | 170 ++++++++++++++++++ 8 files changed, 582 insertions(+), 33 deletions(-) create mode 100644 ee/packages/federation-matrix/src/helpers/metricsHelpers.ts diff --git a/apps/meteor/app/metrics/server/lib/collectMetrics.ts b/apps/meteor/app/metrics/server/lib/collectMetrics.ts index 8628a52fe4096..e37907934753b 100644 --- a/apps/meteor/app/metrics/server/lib/collectMetrics.ts +++ b/apps/meteor/app/metrics/server/lib/collectMetrics.ts @@ -1,6 +1,6 @@ import http from 'http'; -import { Statistics } from '@rocket.chat/models'; +import { Rooms, Statistics, Users } from '@rocket.chat/models'; import { tracerSpan } from '@rocket.chat/tracing'; import connect from 'connect'; import { Facts } from 'meteor/facts-base'; @@ -86,6 +86,90 @@ const setPrometheusData = async (): Promise => { metrics.totalLivechatAgents.set(statistics.totalLivechatAgents); metrics.pushQueue.set(statistics.pushQueue || 0); + + // Federation statistics + await collectFederationMetrics(); +}; + +/** + * Collects federation-related gauge metrics by querying the database. + * This includes counts of federated rooms and users. + */ +const collectFederationMetrics = async (): Promise => { + try { + // Check if federation is enabled before collecting metrics + const federationEnabled = settings.get('Federation_Matrix_enabled'); + if (!federationEnabled) { + return; + } + + const serverName = settings.get('Federation_Matrix_homeserver_domain') || ''; + + // Count federated rooms by type + const federatedChannels = await Rooms.col.countDocuments({ + 't': 'c', + 'federation.mrid': { $exists: true }, + }); + const federatedPrivateGroups = await Rooms.col.countDocuments({ + 't': 'p', + 'federation.mrid': { $exists: true }, + }); + const federatedDirect = await Rooms.col.countDocuments({ + 't': 'd', + 'federation.mrid': { $exists: true }, + }); + + metrics.totalFederatedChannels.set(federatedChannels); + metrics.totalFederatedPrivateGroups.set(federatedPrivateGroups); + metrics.totalFederatedDirectMessages.set(federatedDirect); + + // Count federated rooms by origin + const federatedRoomsByOrigin = await Rooms.col + .aggregate<{ _id: { room_type: string; origin: string }; count: number }>([ + { $match: { 'federation.mrid': { $exists: true } } }, + { + $group: { + _id: { room_type: '$t', origin: '$federation.origin' }, + count: { $sum: 1 }, + }, + }, + ]) + .toArray(); + + // Reset all federated room gauges before setting new values + metrics.totalFederatedRooms.reset(); + + for (const { _id, count } of federatedRoomsByOrigin) { + const isLocal = _id.origin === serverName; + metrics.totalFederatedRooms.set( + { + room_type: _id.room_type, + origin: isLocal ? 'local' : _id.origin || 'unknown', + }, + count, + ); + } + + // Count federated users by origin + const federatedUsersByOrigin = await Users.col + .aggregate<{ + _id: string; + count: number; + }>([ + { $match: { 'federated': true, 'federation.origin': { $exists: true } } }, + { $group: { _id: '$federation.origin', count: { $sum: 1 } } }, + ]) + .toArray(); + + // Reset federated users gauge before setting new values + metrics.totalFederatedUsers.reset(); + + for (const { _id: origin, count } of federatedUsersByOrigin) { + metrics.totalFederatedUsers.set({ origin: origin || 'unknown' }, count); + } + } catch (error) { + SystemLogger.error({ msg: 'Error collecting federation metrics', error }); + } }; const app = connect(); diff --git a/apps/meteor/app/metrics/server/lib/metrics.ts b/apps/meteor/app/metrics/server/lib/metrics.ts index 36967954a8dbb..9fad9be5cc4a7 100644 --- a/apps/meteor/app/metrics/server/lib/metrics.ts +++ b/apps/meteor/app/metrics/server/lib/metrics.ts @@ -246,6 +246,114 @@ export const metrics = { help: 'Time taken in seconds for an item to be processed for the first time by Omni queues', percentiles, }), + + // Federation metrics - Counters + federatedRoomsCreated: new client.Counter({ + name: 'rocketchat_federation_rooms_created', + labelNames: ['room_type'], // 'c', 'p', 'd' + help: 'Total federated rooms created (locally created)', + }), + + federatedRoomsJoined: new client.Counter({ + name: 'rocketchat_federation_rooms_joined', + labelNames: ['room_type', 'origin'], // origin = remote server domain + help: 'Total federated rooms joined (users invited to rooms created elsewhere)', + }), + + federatedMessagesSent: new client.Counter({ + name: 'rocketchat_federation_messages_sent', + labelNames: ['room_type', 'message_type'], // message_type: 'text', 'file', 'encrypted' + help: 'Total federated messages sent', + }), + + federatedMessagesReceived: new client.Counter({ + name: 'rocketchat_federation_messages_received', + labelNames: ['room_type', 'message_type', 'origin'], + help: 'Total federated messages received', + }), + + federationEventsProcessed: new client.Counter({ + name: 'rocketchat_federation_events_processed', + labelNames: ['event_type', 'direction'], // 'message', 'membership', 'reaction', etc. | 'incoming', 'outgoing' + help: 'Total federation events processed', + }), + + federationEventsFailed: new client.Counter({ + name: 'rocketchat_federation_events_failed', + labelNames: ['event_type', 'direction', 'error_type'], + help: 'Total federation events that failed to process', + }), + + // Federation metrics - Gauges + totalFederatedRooms: new client.Gauge({ + name: 'rocketchat_federation_rooms_total', + labelNames: ['room_type', 'origin'], // origin = 'local' | remote domain + help: 'Current count of federated rooms', + }), + + totalFederatedChannels: new client.Gauge({ + name: 'rocketchat_federation_channels_total', + help: 'Total federated public channels', + }), + + totalFederatedPrivateGroups: new client.Gauge({ + name: 'rocketchat_federation_private_groups_total', + help: 'Total federated private groups', + }), + + totalFederatedDirectMessages: new client.Gauge({ + name: 'rocketchat_federation_direct_total', + help: 'Total federated direct message rooms', + }), + + totalFederatedUsers: new client.Gauge({ + name: 'rocketchat_federation_users_total', + labelNames: ['origin'], // origin = remote server domain + help: 'Current count of federated users', + }), + + // Federation metrics - Performance (Duration Summaries) + federationMessageSendDuration: new client.Summary({ + name: 'rocketchat_federation_message_send_duration_seconds', + labelNames: ['message_type', 'room_type'], + help: 'Time to send a message to Matrix federation SDK (awaited operations only)', + percentiles, + }), + + federationTransactionProcessDuration: new client.Summary({ + name: 'rocketchat_federation_transaction_process_duration_seconds', + labelNames: ['pdu_count', 'edu_count', 'origin'], + help: 'Time to process incoming federation transaction', + percentiles, + }), + + federationIncomingMessageProcessDuration: new client.Summary({ + name: 'rocketchat_federation_incoming_message_process_duration_seconds', + labelNames: ['message_type'], + help: 'Time to process incoming federated message', + percentiles, + }), + + federationRoomCreateDuration: new client.Summary({ + name: 'rocketchat_federation_room_create_duration_seconds', + labelNames: ['room_type'], + help: 'Time to create a federated room', + percentiles, + }), + + federationRoomJoinDuration: new client.Summary({ + name: 'rocketchat_federation_room_join_duration_seconds', + labelNames: ['origin'], + help: 'Time to join a federated room (invite acceptance)', + percentiles, + }), + + federationUserInviteDuration: new client.Summary({ + name: 'rocketchat_federation_user_invite_duration_seconds', + labelNames: ['room_type'], + help: 'Time to invite a user to a federated room', + percentiles, + }), }; // Metrics diff --git a/apps/meteor/ee/server/hooks/federation/index.ts b/apps/meteor/ee/server/hooks/federation/index.ts index f717c7e0847b3..2bb9465be1012 100644 --- a/apps/meteor/ee/server/hooks/federation/index.ts +++ b/apps/meteor/ee/server/hooks/federation/index.ts @@ -4,6 +4,7 @@ import type { IRoomNativeFederated, IMessage, IRoom, IUser } from '@rocket.chat/ import { validateFederatedUsername } from '@rocket.chat/federation-matrix'; import { Rooms } from '@rocket.chat/models'; +import { metrics } from '../../../../app/metrics/server'; import { callbacks } from '../../../../server/lib/callbacks'; import { afterLeaveRoomCallback } from '../../../../server/lib/callbacks/afterLeaveRoomCallback'; import { afterRemoveFromRoomCallback } from '../../../../server/lib/callbacks/afterRemoveFromRoomCallback'; @@ -25,7 +26,13 @@ callbacks.add('federation.afterCreateFederatedRoom', async (room, { owner, origi const federatedRoomId = room?.federation?.mrid; if (!federatedRoomId) { - await FederationMatrix.createRoom(room, owner); + const endTimer = metrics.federationRoomCreateDuration.startTimer({ room_type: room.t }); + try { + await FederationMatrix.createRoom(room, owner); + metrics.federatedRoomsCreated.inc({ room_type: room.t }); + } finally { + endTimer(); + } } else { // TODO unify how to get server // matrix room was already created and passed @@ -43,11 +50,16 @@ callbacks.add('federation.afterCreateFederatedRoom', async (room, { owner, origi } // TODO this won't be neeeded once we receive all state events at ee/packages/federation-matrix/src/events/member.ts - await FederationMatrix.inviteUsersToRoom( - federationRoom, - members.filter((member) => member !== owner.username), - owner, - ); + const inviteEndTimer = metrics.federationUserInviteDuration.startTimer({ room_type: room.t }); + try { + await FederationMatrix.inviteUsersToRoom( + federationRoom, + members.filter((member) => member !== owner.username), + owner, + ); + } finally { + inviteEndTimer(); + } }); callbacks.add( @@ -62,11 +74,33 @@ callbacks.add( // If message is federated, it will save external_message_id like into the message object // if this prop exists here it should not be sent to the federation to avoid loops if (!message.federation?.eventId) { - await FederationMatrix.sendMessage(message, room, user); + const messageType = message.files && message.files.length > 0 ? 'file' : 'text'; + const endTimer = metrics.federationMessageSendDuration.startTimer({ + message_type: messageType, + room_type: room.t, + }); + try { + await FederationMatrix.sendMessage(message, room, user); + metrics.federatedMessagesSent.inc({ + room_type: room.t, + message_type: messageType, + }); + metrics.federationEventsProcessed.inc({ + event_type: 'message', + direction: 'outgoing', + }); + } finally { + endTimer(); + } } } catch (error) { // Log the error but don't prevent the message from being sent locally console.error('[sendMessage] Failed to send message to Native Federation:', error); + metrics.federationEventsFailed.inc({ + event_type: 'message', + direction: 'outgoing', + error_type: error instanceof Error ? error.constructor.name : 'Unknown', + }); } }, callbacks.priority.HIGH, @@ -122,7 +156,12 @@ beforeAddUserToRoom.add( return; } - await FederationMatrix.inviteUsersToRoom(room, [user.username], inviter); + const endTimer = metrics.federationUserInviteDuration.startTimer({ room_type: room.t }); + try { + await FederationMatrix.inviteUsersToRoom(room, [user.username], inviter); + } finally { + endTimer(); + } // after invite is sent we create the invite subscriptions // TODO this may be not needed if we receive the emit for the invite event from matrix @@ -146,7 +185,20 @@ callbacks.add( return; } if (FederationActions.shouldPerformFederationAction(params.room)) { - await FederationMatrix.sendReaction(message._id, params.reaction, params.user); + try { + await FederationMatrix.sendReaction(message._id, params.reaction, params.user); + metrics.federationEventsProcessed.inc({ + event_type: 'reaction', + direction: 'outgoing', + }); + } catch (error) { + metrics.federationEventsFailed.inc({ + event_type: 'reaction', + direction: 'outgoing', + error_type: error instanceof Error ? error.constructor.name : 'Unknown', + }); + throw error; + } } }, callbacks.priority.HIGH, @@ -269,8 +321,14 @@ callbacks.add( // as per federation.beforeCreateDirectMessage we create a DM without federation data because we still don't have it. if (!room.federation.mrid) { - // so after the DM is created we call the federation to create the DM on Matrix side and then updated the reference here - await FederationMatrix.createDirectMessageRoom(room, params.members, params.creatorId); + const endTimer = metrics.federationRoomCreateDuration.startTimer({ room_type: 'd' }); + try { + // so after the DM is created we call the federation to create the DM on Matrix side and then updated the reference here + await FederationMatrix.createDirectMessageRoom(room, params.members, params.creatorId); + metrics.federatedRoomsCreated.inc({ room_type: 'd' }); + } finally { + endTimer(); + } } }, callbacks.priority.HIGH, diff --git a/ee/packages/federation-matrix/package.json b/ee/packages/federation-matrix/package.json index 4518af0e943ea..6a4faf7b64f43 100644 --- a/ee/packages/federation-matrix/package.json +++ b/ee/packages/federation-matrix/package.json @@ -33,6 +33,7 @@ "marked": "^16.1.2", "mongodb": "6.16.0", "pino": "^8.21.0", + "prom-client": "^15.1.3", "reflect-metadata": "^0.2.2", "sanitize-html": "~2.17.0", "tsyringe": "^4.10.0", diff --git a/ee/packages/federation-matrix/src/api/_matrix/transactions.ts b/ee/packages/federation-matrix/src/api/_matrix/transactions.ts index fdc5d03f05561..3957865edb238 100644 --- a/ee/packages/federation-matrix/src/api/_matrix/transactions.ts +++ b/ee/packages/federation-matrix/src/api/_matrix/transactions.ts @@ -6,6 +6,7 @@ import { addSpanAttributes } from '@rocket.chat/tracing'; import { canAccessResourceMiddleware } from '../middlewares/canAccessResource'; import { isAuthenticatedMiddleware } from '../middlewares/isAuthenticated'; +import { federationMetrics, bucketizePduCount, bucketizeEduCount } from '../../helpers/metricsHelpers'; const SendTransactionParamsSchema = { type: 'object', @@ -358,9 +359,17 @@ export const getMatrixTransactionsRoutes = () => { 'federation.edu_types': Array.from(eduTypes).join(','), }); + // Start duration timer for transaction processing + const endTimer = federationMetrics.federationTransactionProcessDuration.startTimer({ + pdu_count: bucketizePduCount(pdus.length), + edu_count: bucketizeEduCount(edus.length), + origin: body.origin, + }); + try { await federationSDK.processIncomingTransaction(body); } catch (error: any) { + endTimer(); // TODO custom error types? if (error.message === 'too-many-concurrent-transactions') { return { @@ -378,6 +387,7 @@ export const getMatrixTransactionsRoutes = () => { }; } + endTimer(); return { body: { pdus: {}, diff --git a/ee/packages/federation-matrix/src/events/member.ts b/ee/packages/federation-matrix/src/events/member.ts index a78c23861f0b8..ac2d5158b5a73 100644 --- a/ee/packages/federation-matrix/src/events/member.ts +++ b/ee/packages/federation-matrix/src/events/member.ts @@ -6,6 +6,7 @@ import { Rooms, Subscriptions, Users } from '@rocket.chat/models'; import { createOrUpdateFederatedUser } from '../helpers/createOrUpdateFederatedUser'; import { getUsernameServername } from '../helpers/getUsernameServername'; +import { federationMetrics, extractOriginFromMatrixRoomId } from '../helpers/metricsHelpers'; const logger = new Logger('federation-matrix:member'); @@ -193,38 +194,64 @@ async function handleInvite({ if (room.t === 'd') { await Room.updateDirectMessageRoomName(room); } + + federationMetrics.federationEventsProcessed.inc({ + event_type: 'membership', + direction: 'incoming', + }); } async function handleJoin({ room_id: roomId, state_key: userId, }: HomeserverEventSignatures['homeserver.matrix.membership']['event']): Promise { - const joiningUser = await getOrCreateFederatedUser(userId); - if (!joiningUser?.username) { - throw new Error(`Failed to get or create joining user: ${userId}`); - } + const roomOrigin = extractOriginFromMatrixRoomId(roomId); + const endTimer = federationMetrics.federationRoomJoinDuration.startTimer({ origin: roomOrigin }); - const room = await Rooms.findOneFederatedByMrid(roomId); - if (!room) { - throw new Error(`Room not found while joining user ${userId} to room ${roomId}`); - } + try { + const joiningUser = await getOrCreateFederatedUser(userId); + if (!joiningUser?.username) { + throw new Error(`Failed to get or create joining user: ${userId}`); + } - const subscription = await Subscriptions.findOneByRoomIdAndUserId(room._id, joiningUser._id); - if (!subscription) { - throw new Error(`Subscription not found while joining user ${userId} to room ${roomId}`); - } + const room = await Rooms.findOneFederatedByMrid(roomId); + if (!room) { + throw new Error(`Room not found while joining user ${userId} to room ${roomId}`); + } - // update room name for DMs - if (room.t === 'd') { - await Room.updateDirectMessageRoomName(room, [subscription._id]); - } + const subscription = await Subscriptions.findOneByRoomIdAndUserId(room._id, joiningUser._id); + if (!subscription) { + throw new Error(`Subscription not found while joining user ${userId} to room ${roomId}`); + } - if (!subscription.status) { - logger.info('User is already joined to the room, skipping...'); - return; - } + // update room name for DMs + if (room.t === 'd') { + await Room.updateDirectMessageRoomName(room, [subscription._id]); + } - await Room.performAcceptRoomInvite(room, subscription, joiningUser); + if (!subscription.status) { + logger.info('User is already joined to the room, skipping...'); + return; + } + + await Room.performAcceptRoomInvite(room, subscription, joiningUser); + + // Increment counter for rooms joined from external servers + const serverName = federationSDK.getConfig('serverName'); + if (roomOrigin !== serverName) { + federationMetrics.federatedRoomsJoined.inc({ + room_type: room.t, + origin: roomOrigin, + }); + } + + federationMetrics.federationEventsProcessed.inc({ + event_type: 'membership', + direction: 'incoming', + }); + } finally { + endTimer(); + } } async function handleLeave({ @@ -251,6 +278,11 @@ async function handleLeave({ await Room.updateDirectMessageRoomName(room); } + federationMetrics.federationEventsProcessed.inc({ + event_type: 'membership', + direction: 'incoming', + }); + // TODO check if there are no pending invites to the room, and if so, delete the room } @@ -275,6 +307,11 @@ export function member() { } } catch (err) { logger.error({ msg: 'Failed to process Matrix membership event', err }); + federationMetrics.federationEventsFailed.inc({ + event_type: 'membership', + direction: 'incoming', + error_type: err instanceof Error ? err.constructor.name : 'Unknown', + }); } }); } diff --git a/ee/packages/federation-matrix/src/events/message.ts b/ee/packages/federation-matrix/src/events/message.ts index 2d15fd7e302cf..d29c427ab1509 100644 --- a/ee/packages/federation-matrix/src/events/message.ts +++ b/ee/packages/federation-matrix/src/events/message.ts @@ -6,6 +6,7 @@ import { Users, Rooms, Messages } from '@rocket.chat/models'; import { fileTypes } from '../FederationMatrix'; import { toInternalMessageFormat, toInternalQuoteMessageFormat } from '../helpers/message.parsers'; +import { federationMetrics, extractOriginFromMatrixUserId, determineMessageType } from '../helpers/metricsHelpers'; import { MatrixMediaService } from '../services/MatrixMediaService'; const logger = new Logger('federation-matrix:message'); @@ -112,6 +113,11 @@ async function handleMediaMessage( export function message() { federationSDK.eventEmitterService.on('homeserver.matrix.message', async ({ event, event_id: eventId }) => { + const messageType = determineMessageType(event); + const endTimer = federationMetrics.federationIncomingMessageProcessDuration.startTimer({ + message_type: messageType, + }); + try { const { msgtype, body } = event.content; const messageBody = body.toString(); @@ -133,6 +139,7 @@ export function message() { } const serverName = federationSDK.getConfig('serverName'); + const origin = extractOriginFromMatrixUserId(event.sender); const relation = event.content['m.relates_to']; @@ -227,6 +234,17 @@ export function message() { thread, ts: new Date(event.origin_server_ts), }); + + // Track received message + federationMetrics.federatedMessagesReceived.inc({ + room_type: room.t, + message_type: messageType, + origin, + }); + federationMetrics.federationEventsProcessed.inc({ + event_type: 'message', + direction: 'incoming', + }); return; } @@ -261,12 +279,36 @@ export function message() { ts: new Date(event.origin_server_ts), }); } + + // Track received message + federationMetrics.federatedMessagesReceived.inc({ + room_type: room.t, + message_type: messageType, + origin, + }); + federationMetrics.federationEventsProcessed.inc({ + event_type: 'message', + direction: 'incoming', + }); } catch (err) { logger.error({ msg: 'Error processing Matrix message', err }); + federationMetrics.federationEventsFailed.inc({ + event_type: 'message', + direction: 'incoming', + error_type: err instanceof Error ? err.constructor.name : 'Unknown', + }); + } finally { + endTimer(); } }); federationSDK.eventEmitterService.on('homeserver.matrix.encrypted', async ({ event, event_id: eventId }) => { + const messageType = 'encrypted'; + const endTimer = federationMetrics.federationIncomingMessageProcessDuration.startTimer({ + message_type: messageType, + }); + const origin = extractOriginFromMatrixUserId(event.sender); + try { if (!event.content.ciphertext) { logger.debug('No message content found in event'); @@ -363,6 +405,17 @@ export function message() { thread, ts: new Date(event.origin_server_ts), }); + + // Track received encrypted message + federationMetrics.federatedMessagesReceived.inc({ + room_type: room.t, + message_type: messageType, + origin, + }); + federationMetrics.federationEventsProcessed.inc({ + event_type: 'message', + direction: 'incoming', + }); return; } @@ -377,8 +430,26 @@ export function message() { thread, ts: new Date(event.origin_server_ts), }); + + // Track received encrypted message + federationMetrics.federatedMessagesReceived.inc({ + room_type: room.t, + message_type: messageType, + origin, + }); + federationMetrics.federationEventsProcessed.inc({ + event_type: 'message', + direction: 'incoming', + }); } catch (err) { logger.error({ msg: 'Error processing Matrix message', err }); + federationMetrics.federationEventsFailed.inc({ + event_type: 'message', + direction: 'incoming', + error_type: err instanceof Error ? err.constructor.name : 'Unknown', + }); + } finally { + endTimer(); } }); @@ -409,8 +480,18 @@ export function message() { } await Message.deleteMessage(user, rcMessage); + + federationMetrics.federationEventsProcessed.inc({ + event_type: 'redaction', + direction: 'incoming', + }); } catch (err) { logger.error({ msg: 'Failed to process Matrix removal redaction', err }); + federationMetrics.federationEventsFailed.inc({ + event_type: 'redaction', + direction: 'incoming', + error_type: err instanceof Error ? err.constructor.name : 'Unknown', + }); } }); } diff --git a/ee/packages/federation-matrix/src/helpers/metricsHelpers.ts b/ee/packages/federation-matrix/src/helpers/metricsHelpers.ts new file mode 100644 index 0000000000000..adb1a7400a6bb --- /dev/null +++ b/ee/packages/federation-matrix/src/helpers/metricsHelpers.ts @@ -0,0 +1,170 @@ +import type { FileMessageType } from '@rocket.chat/federation-sdk'; +import client from 'prom-client'; + +import { fileTypes } from '../FederationMatrix'; + +const percentiles = [0.01, 0.1, 0.5, 0.9, 0.95, 0.99, 1]; + +/** + * Gets an existing metric from the registry or creates it if it doesn't exist. + * This ensures we don't get duplicate registration errors when the same metric + * is accessed from different parts of the application. + */ +function getOrCreateMetric( + name: string, + createFn: () => T, +): T { + const existing = client.register.getSingleMetric(name); + if (existing) { + return existing as T; + } + return createFn(); +} + +/** + * Federation metrics for incoming operations. + * These use getOrCreateMetric to safely access metrics that may already + * be registered by the meteor app's metrics.ts. + */ +export const federationMetrics = { + /** Counter for messages received from other federated servers */ + get federatedMessagesReceived() { + return getOrCreateMetric('rocketchat_federation_messages_received', () => + new client.Counter({ + name: 'rocketchat_federation_messages_received', + labelNames: ['room_type', 'message_type', 'origin'], + help: 'Total federated messages received', + }), + ); + }, + + /** Counter for rooms joined (users invited to rooms created elsewhere) */ + get federatedRoomsJoined() { + return getOrCreateMetric('rocketchat_federation_rooms_joined', () => + new client.Counter({ + name: 'rocketchat_federation_rooms_joined', + labelNames: ['room_type', 'origin'], + help: 'Total federated rooms joined (users invited to rooms created elsewhere)', + }), + ); + }, + + /** Counter for federation events processed */ + get federationEventsProcessed() { + return getOrCreateMetric('rocketchat_federation_events_processed', () => + new client.Counter({ + name: 'rocketchat_federation_events_processed', + labelNames: ['event_type', 'direction'], + help: 'Total federation events processed', + }), + ); + }, + + /** Counter for failed federation events */ + get federationEventsFailed() { + return getOrCreateMetric('rocketchat_federation_events_failed', () => + new client.Counter({ + name: 'rocketchat_federation_events_failed', + labelNames: ['event_type', 'direction', 'error_type'], + help: 'Total federation events that failed to process', + }), + ); + }, + + /** Duration to process incoming federation transaction */ + get federationTransactionProcessDuration() { + return getOrCreateMetric('rocketchat_federation_transaction_process_duration_seconds', () => + new client.Summary({ + name: 'rocketchat_federation_transaction_process_duration_seconds', + labelNames: ['pdu_count', 'edu_count', 'origin'], + help: 'Time to process incoming federation transaction', + percentiles, + }), + ); + }, + + /** Duration to process incoming federated message */ + get federationIncomingMessageProcessDuration() { + return getOrCreateMetric('rocketchat_federation_incoming_message_process_duration_seconds', () => + new client.Summary({ + name: 'rocketchat_federation_incoming_message_process_duration_seconds', + labelNames: ['message_type'], + help: 'Time to process incoming federated message', + percentiles, + }), + ); + }, + + /** Duration to join a federated room (invite acceptance) */ + get federationRoomJoinDuration() { + return getOrCreateMetric('rocketchat_federation_room_join_duration_seconds', () => + new client.Summary({ + name: 'rocketchat_federation_room_join_duration_seconds', + labelNames: ['origin'], + help: 'Time to join a federated room (invite acceptance)', + percentiles, + }), + ); + }, +}; + +/** + * Extracts the origin server domain from a Matrix room ID. + * @example extractOriginFromMatrixRoomId('!room:matrix.org') // 'matrix.org' + */ +export function extractOriginFromMatrixRoomId(roomId: string): string { + return roomId.split(':').pop() || 'unknown'; +} + +/** + * Extracts the origin server domain from a Matrix user ID. + * @example extractOriginFromMatrixUserId('@user:matrix.org') // 'matrix.org' + */ +export function extractOriginFromMatrixUserId(userId: string): string { + return userId.split(':').pop() || 'unknown'; +} + +/** + * Determines the message type from a Matrix event for metrics labeling. + * @returns 'text' | 'file' | 'encrypted' + */ +export function determineMessageType(event: { + type?: string; + content?: { msgtype?: string }; +}): 'text' | 'file' | 'encrypted' { + if (event.type === 'm.room.encrypted') { + return 'encrypted'; + } + + const msgtype = event.content?.msgtype; + if (msgtype && Object.values(fileTypes).includes(msgtype as FileMessageType)) { + return 'file'; + } + + return 'text'; +} + +/** + * Bucketizes PDU count for metrics labeling to avoid high cardinality. + * Groups counts into buckets: 0, 1, 2-5, 6-10, 11-50, 51+ + */ +export function bucketizePduCount(count: number): string { + if (count === 0) return '0'; + if (count === 1) return '1'; + if (count <= 5) return '2-5'; + if (count <= 10) return '6-10'; + if (count <= 50) return '11-50'; + return '51+'; +} + +/** + * Bucketizes EDU count for metrics labeling to avoid high cardinality. + * Groups counts into buckets: 0, 1, 2-5, 6-10, 11+ + */ +export function bucketizeEduCount(count: number): string { + if (count === 0) return '0'; + if (count === 1) return '1'; + if (count <= 5) return '2-5'; + if (count <= 10) return '6-10'; + return '11+'; +} From 48a14791f5a42bd49b934d4f5cb38dd4b2bda33a Mon Sep 17 00:00:00 2001 From: dhulke Date: Sat, 31 Jan 2026 18:35:57 -0300 Subject: [PATCH 04/13] fix(FederationMatrix): remove duplicate import of tracing utilities --- ee/packages/federation-matrix/src/FederationMatrix.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/ee/packages/federation-matrix/src/FederationMatrix.ts b/ee/packages/federation-matrix/src/FederationMatrix.ts index e78d80c8fe8db..61b21e84dc0bc 100644 --- a/ee/packages/federation-matrix/src/FederationMatrix.ts +++ b/ee/packages/federation-matrix/src/FederationMatrix.ts @@ -13,7 +13,6 @@ import type { EventID, FileMessageType, PresenceState } from '@rocket.chat/feder import { Logger } from '@rocket.chat/logger'; import { Users, Subscriptions, Messages, Rooms, Settings } from '@rocket.chat/models'; import { addSpanAttributes, traced, tracedClass } from '@rocket.chat/tracing'; -import { addSpanAttributes, traced, tracedClass } from '@rocket.chat/tracing'; import emojione from 'emojione'; import { createOrUpdateFederatedUser } from './helpers/createOrUpdateFederatedUser'; From bec8c2a1dd83c0f63514338dc0f265545d222c89 Mon Sep 17 00:00:00 2001 From: dhulke Date: Sun, 1 Feb 2026 13:34:57 -0300 Subject: [PATCH 05/13] chore: update OpenTelemetry API version and add prom-client dependency in yarn.lock; modify direct message room creation method in FederationMatrix --- .../federation-matrix/src/FederationMatrix.ts | 2 +- yarn.lock | 13 ++++++++++++- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/ee/packages/federation-matrix/src/FederationMatrix.ts b/ee/packages/federation-matrix/src/FederationMatrix.ts index 61b21e84dc0bc..a8445c43af10b 100644 --- a/ee/packages/federation-matrix/src/FederationMatrix.ts +++ b/ee/packages/federation-matrix/src/FederationMatrix.ts @@ -192,7 +192,7 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS throw new Error('Creator not found in members list'); } - const roomId = await federationSDK.createDirectMessageRoom({ + const roomId = await federationSDK.createDirectMessage({ creatorUserId: userIdSchema.parse(`@${creator.username}:${this.serverName}`), members: members .filter((member) => member._id !== creatorId) diff --git a/yarn.lock b/yarn.lock index fc3cdb206a58c..021e1782ad527 100644 --- a/yarn.lock +++ b/yarn.lock @@ -5300,7 +5300,7 @@ __metadata: languageName: node linkType: hard -"@opentelemetry/api@npm:^1.3.0, @opentelemetry/api@npm:^1.9.0": +"@opentelemetry/api@npm:^1.3.0, @opentelemetry/api@npm:^1.4.0, @opentelemetry/api@npm:^1.9.0": version: 1.9.0 resolution: "@opentelemetry/api@npm:1.9.0" checksum: 10/a607f0eef971893c4f2ee2a4c2069aade6ec3e84e2a1f5c2aac19f65c5d9eeea41aa72db917c1029faafdd71789a1a040bdc18f40d63690e22ccae5d7070f194 @@ -8345,6 +8345,7 @@ __metadata: mongodb: "npm:6.16.0" pino: "npm:^8.21.0" pino-pretty: "npm:^7.6.1" + prom-client: "npm:^15.1.3" reflect-metadata: "npm:^0.2.2" sanitize-html: "npm:~2.17.0" tsyringe: "npm:^4.10.0" @@ -30503,6 +30504,16 @@ __metadata: languageName: node linkType: hard +"prom-client@npm:^15.1.3": + version: 15.1.3 + resolution: "prom-client@npm:15.1.3" + dependencies: + "@opentelemetry/api": "npm:^1.4.0" + tdigest: "npm:^0.1.1" + checksum: 10/eba75e15ab896845d39359e3a4d6f7913ea05339b3122d8dde8c8c374669ad1a1d1ab2694ab2101c420bd98086a564e4f2a18aa29018fc14a4732e57c1c19aec + languageName: node + linkType: hard + "prometheus-gc-stats@npm:^1.1.0": version: 1.1.0 resolution: "prometheus-gc-stats@npm:1.1.0" From f37872845926c646caec1d9160df2e5ecd217042 Mon Sep 17 00:00:00 2001 From: dhulke Date: Sun, 1 Feb 2026 14:23:11 -0300 Subject: [PATCH 06/13] feat(metrics): enhance FederationMatrix with detailed outgoing metrics for room creation, message sending, and user invitations --- .../federation-matrix/src/FederationMatrix.ts | 236 ++++++++++++++---- .../src/helpers/metricsHelpers.ts | 236 +++++++++++++----- 2 files changed, 363 insertions(+), 109 deletions(-) diff --git a/ee/packages/federation-matrix/src/FederationMatrix.ts b/ee/packages/federation-matrix/src/FederationMatrix.ts index a8445c43af10b..ef811e4c6aeed 100644 --- a/ee/packages/federation-matrix/src/FederationMatrix.ts +++ b/ee/packages/federation-matrix/src/FederationMatrix.ts @@ -18,6 +18,7 @@ import emojione from 'emojione'; import { createOrUpdateFederatedUser } from './helpers/createOrUpdateFederatedUser'; import { extractDomainFromMatrixUserId } from './helpers/extractDomainFromMatrixUserId'; import { toExternalMessageFormat, toExternalQuoteMessageFormat } from './helpers/message.parsers'; +import { federationMetrics, determineOutgoingMessageType } from './helpers/metricsHelpers'; import { validateFederatedUsername } from './helpers/validateFederatedUsername'; import { MatrixMediaService } from './services/MatrixMediaService'; @@ -118,6 +119,8 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS throw new Error('Room is not a public or private room'); } + const endTimer = federationMetrics.federationRoomCreateDuration.startTimer({ room_type: room.t }); + try { const matrixUserId = userIdSchema.parse(`@${owner.username}:${this.serverName}`); const roomName = room.name || room.fname || 'Untitled Room'; @@ -143,12 +146,23 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS // Members are NOT invited here - invites are sent via beforeAddUserToRoom callback. + // Increment success metrics + federationMetrics.federatedRoomsCreated.inc({ room_type: room.t }); + federationMetrics.federationEventsProcessed.inc({ event_type: 'room_create', direction: 'outgoing' }); + this.logger.debug({ msg: 'Room creation completed successfully', roomId: room._id }); return matrixRoomResult; } catch (err) { + federationMetrics.federationEventsFailed.inc({ + event_type: 'room_create', + direction: 'outgoing', + error_type: err instanceof Error ? err.constructor.name : 'Unknown', + }); this.logger.error({ msg: 'Failed to create room', err }); throw err; + } finally { + endTimer(); } } @@ -184,6 +198,8 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS creatorId, })) async createDirectMessageRoom(room: IRoom, members: IUser[], creatorId: IUser['_id']): Promise { + const endTimer = federationMetrics.federationRoomCreateDuration.startTimer({ room_type: 'd' }); + try { this.logger.debug({ msg: 'Creating direct message room in Matrix', roomId: room._id, memberCount: members.length }); @@ -210,10 +226,21 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS origin: this.serverName, }); + // Increment success metrics + federationMetrics.federatedRoomsCreated.inc({ room_type: 'd' }); + federationMetrics.federationEventsProcessed.inc({ event_type: 'room_create', direction: 'outgoing' }); + this.logger.debug({ roomId: room._id, msg: 'Direct message room creation completed successfully' }); } catch (err) { + federationMetrics.federationEventsFailed.inc({ + event_type: 'room_create', + direction: 'outgoing', + error_type: err instanceof Error ? err.constructor.name : 'Unknown', + }); this.logger.error({ msg: 'Failed to create direct message room', err }); throw err; + } finally { + endTimer(); } } @@ -356,9 +383,11 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS hasAttachments: Boolean(message?.attachments?.length), })) async sendMessage(message: IMessage, room: IRoomNativeFederated, user: IUser): Promise { + const messageType = determineOutgoingMessageType(message); + const endTimer = federationMetrics.federationOutgoingMessageSendDuration.startTimer({ message_type: messageType }); + try { const userMui = isUserNativeFederated(user) ? user.federation.mui : `@${user.username}:${this.serverName}`; - const messageType = message.files && message.files.length > 0 ? 'file' : 'text'; // Add runtime attributes for computed values addSpanAttributes({ @@ -385,10 +414,21 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS await Messages.setFederationEventIdById(message._id, result.eventId); + // Increment success metrics + federationMetrics.federatedMessagesSent.inc({ room_type: room.t, message_type: messageType }); + federationMetrics.federationEventsProcessed.inc({ event_type: 'message', direction: 'outgoing' }); + this.logger.debug({ msg: 'Message sent to Matrix successfully', eventId: result.eventId }); } catch (err) { + federationMetrics.federationEventsFailed.inc({ + event_type: 'message', + direction: 'outgoing', + error_type: err instanceof Error ? err.constructor.name : 'Unknown', + }); this.logger.error({ msg: 'Failed to send message to Matrix', err }); throw err; + } finally { + endTimer(); } } @@ -452,8 +492,15 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS // TODO message.u?.username is not the user who removed the message const eventId = await federationSDK.redactMessage(roomIdSchema.parse(matrixRoomId), eventIdSchema.parse(matrixEventId)); + federationMetrics.federationEventsProcessed.inc({ event_type: 'redaction', direction: 'outgoing' }); + this.logger.debug({ msg: 'Message redaction sent to Matrix successfully', eventId }); } catch (err) { + federationMetrics.federationEventsFailed.inc({ + event_type: 'redaction', + direction: 'outgoing', + error_type: err instanceof Error ? err.constructor.name : 'Unknown', + }); this.logger.error({ msg: 'Failed to send redaction to Matrix', err }); throw err; } @@ -466,6 +513,8 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS inviterUsername: inviter?.username, })) async inviteUsersToRoom(room: IRoomNativeFederated, matrixUsersUsername: string[], inviter: IUser): Promise { + const endTimer = federationMetrics.federationInviteSendDuration.startTimer({ room_type: room.t }); + try { const inviterUserId = `@${inviter.username}:${this.serverName}`; @@ -493,9 +542,20 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS ); }), ); + + // Increment invite counter for each user invited + federationMetrics.federatedInvitesSent.inc({ room_type: room.t }, matrixUsersUsername.length); + federationMetrics.federationEventsProcessed.inc({ event_type: 'membership', direction: 'outgoing' }); } catch (err) { + federationMetrics.federationEventsFailed.inc({ + event_type: 'membership', + direction: 'outgoing', + error_type: err instanceof Error ? err.constructor.name : 'Unknown', + }); this.logger.error({ msg: 'Failed to invite a user to Matrix', err }); throw err; + } finally { + endTimer(); } } @@ -548,8 +608,16 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS await Messages.setFederationReactionEventId(user.username || '', messageId, reaction, eventId); + federationMetrics.federatedReactionsSent.inc({ action: 'add' }); + federationMetrics.federationEventsProcessed.inc({ event_type: 'reaction', direction: 'outgoing' }); + this.logger.debug({ eventId, msg: 'Reaction sent to Matrix successfully' }); } catch (err) { + federationMetrics.federationEventsFailed.inc({ + event_type: 'reaction', + direction: 'outgoing', + error_type: err instanceof Error ? err.constructor.name : 'Unknown', + }); this.logger.error({ msg: 'Failed to send reaction to Matrix', err }); throw err; } @@ -606,9 +674,17 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS } await Messages.unsetFederationReactionEventId(eventId, messageId, reaction); + + federationMetrics.federatedReactionsSent.inc({ action: 'remove' }); + federationMetrics.federationEventsProcessed.inc({ event_type: 'reaction', direction: 'outgoing' }); break; } } catch (err) { + federationMetrics.federationEventsFailed.inc({ + event_type: 'reaction', + direction: 'outgoing', + error_type: err instanceof Error ? err.constructor.name : 'Unknown', + }); this.logger.error({ msg: 'Failed to remove reaction from Matrix', err }); throw err; } @@ -653,8 +729,15 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS await federationSDK.leaveRoom(roomIdSchema.parse(room.federation.mrid), userIdSchema.parse(actualMatrixUserId)); + federationMetrics.federationEventsProcessed.inc({ event_type: 'membership', direction: 'outgoing' }); + this.logger.info({ msg: 'User left Matrix room successfully', username: user.username, roomId: room.federation.mrid }); } catch (err) { + federationMetrics.federationEventsFailed.inc({ + event_type: 'membership', + direction: 'outgoing', + error_type: err instanceof Error ? err.constructor.name : 'Unknown', + }); this.logger.error({ msg: 'Failed to leave room in Matrix', err }); throw err; } @@ -691,6 +774,8 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS `Kicked by ${userWhoRemoved.username}`, ); + federationMetrics.federationEventsProcessed.inc({ event_type: 'membership', direction: 'outgoing' }); + this.logger.info({ msg: 'User was kicked from Matrix room', kickedUsername: removedUser.username, @@ -698,6 +783,11 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS performedBy: userWhoRemoved.username, }); } catch (err) { + federationMetrics.federationEventsFailed.inc({ + event_type: 'membership', + direction: 'outgoing', + error_type: err instanceof Error ? err.constructor.name : 'Unknown', + }); this.logger.error({ msg: 'Failed to kick user from Matrix room', err }); throw err; } @@ -737,8 +827,15 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS eventIdSchema.parse(matrixEventId), ); + federationMetrics.federationEventsProcessed.inc({ event_type: 'message_edit', direction: 'outgoing' }); + this.logger.debug({ msg: 'Message updated in Matrix successfully', eventId }); } catch (err) { + federationMetrics.federationEventsFailed.inc({ + event_type: 'message_edit', + direction: 'outgoing', + error_type: err instanceof Error ? err.constructor.name : 'Unknown', + }); this.logger.error({ msg: 'Failed to update message in Matrix', err }); throw err; } @@ -750,19 +847,30 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS username: user?.username, })) async updateRoomName(rid: string, displayName: string, user: IUser): Promise { - const room = await Rooms.findOneById(rid); - if (!room || !isRoomNativeFederated(room)) { - throw new Error(`No Matrix room mapping found for room ${rid}`); - } + try { + const room = await Rooms.findOneById(rid); + if (!room || !isRoomNativeFederated(room)) { + throw new Error(`No Matrix room mapping found for room ${rid}`); + } - if (isUserNativeFederated(user)) { - this.logger.debug('Only local users can change the name of a room, ignoring action'); - return; - } + if (isUserNativeFederated(user)) { + this.logger.debug('Only local users can change the name of a room, ignoring action'); + return; + } - const userMui = `@${user.username}:${this.serverName}`; + const userMui = `@${user.username}:${this.serverName}`; - await federationSDK.updateRoomName(roomIdSchema.parse(room.federation.mrid), displayName, userIdSchema.parse(userMui)); + await federationSDK.updateRoomName(roomIdSchema.parse(room.federation.mrid), displayName, userIdSchema.parse(userMui)); + + federationMetrics.federationEventsProcessed.inc({ event_type: 'room_update', direction: 'outgoing' }); + } catch (err) { + federationMetrics.federationEventsFailed.inc({ + event_type: 'room_update', + direction: 'outgoing', + error_type: err instanceof Error ? err.constructor.name : 'Unknown', + }); + throw err; + } } @traced((room: IRoomNativeFederated, topic: string, user: Pick) => ({ @@ -776,14 +884,25 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS topic: string, user: Pick, ): Promise { - if (isUserNativeFederated(user)) { - this.logger.debug('Only local users can change the topic of a room, ignoring action'); - return; - } + try { + if (isUserNativeFederated(user)) { + this.logger.debug('Only local users can change the topic of a room, ignoring action'); + return; + } + + const userMui = `@${user.username}:${this.serverName}`; - const userMui = `@${user.username}:${this.serverName}`; + await federationSDK.setRoomTopic(roomIdSchema.parse(room.federation.mrid), userIdSchema.parse(userMui), topic); - await federationSDK.setRoomTopic(roomIdSchema.parse(room.federation.mrid), userIdSchema.parse(userMui), topic); + federationMetrics.federationEventsProcessed.inc({ event_type: 'room_update', direction: 'outgoing' }); + } catch (err) { + federationMetrics.federationEventsFailed.inc({ + event_type: 'room_update', + direction: 'outgoing', + error_type: err instanceof Error ? err.constructor.name : 'Unknown', + }); + throw err; + } } @traced((room: IRoomNativeFederated, senderId: string, userId: string, role: string) => ({ @@ -868,6 +987,8 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS const userMui = isUserNativeFederated(localUser) ? localUser.federation.mui : `@${localUser.username}:${this.serverName}`; void federationSDK.sendTypingNotification(room.federation.mrid, userMui, isTyping); + + federationMetrics.federationEventsProcessed.inc({ event_type: 'typing', direction: 'outgoing' }); } @traced((matrixIds: string[]) => ({ @@ -926,50 +1047,61 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS action, })) async handleInvite(roomId: IRoom['_id'], userId: IUser['_id'], action: 'accept' | 'reject'): Promise { - const subscription = await Subscriptions.findInvitedSubscription(roomId, userId); - if (!subscription) { - throw new Error('No subscription found or user does not have permission to accept or reject this invite'); - } + try { + const subscription = await Subscriptions.findInvitedSubscription(roomId, userId); + if (!subscription) { + throw new Error('No subscription found or user does not have permission to accept or reject this invite'); + } - const room = await Rooms.findOneById(roomId); - if (!room || !isRoomNativeFederated(room)) { - throw new Error('Room not found or not federated'); - } + const room = await Rooms.findOneById(roomId); + if (!room || !isRoomNativeFederated(room)) { + throw new Error('Room not found or not federated'); + } - const user = await Users.findOneById(userId); - if (!user) { - throw new Error('User not found'); - } + const user = await Users.findOneById(userId); + if (!user) { + throw new Error('User not found'); + } - if (!user.username) { - throw new Error('User username not found'); - } + if (!user.username) { + throw new Error('User username not found'); + } - // TODO: should use common function to get matrix user ID - const matrixUserId = isUserNativeFederated(user) ? user.federation.mui : `@${user.username}:${this.serverName}`; + // TODO: should use common function to get matrix user ID + const matrixUserId = isUserNativeFederated(user) ? user.federation.mui : `@${user.username}:${this.serverName}`; - // Add runtime attributes after querying room and user - addSpanAttributes({ - matrixRoomId: room.federation.mrid, - matrixUserId, - username: user.username, - isNativeFederatedUser: isUserNativeFederated(user), - }); + // Add runtime attributes after querying room and user + addSpanAttributes({ + matrixRoomId: room.federation.mrid, + matrixUserId, + username: user.username, + isNativeFederatedUser: isUserNativeFederated(user), + }); - if (action === 'accept') { - await federationSDK.acceptInvite(room.federation.mrid, matrixUserId); - } + if (action === 'accept') { + await federationSDK.acceptInvite(room.federation.mrid, matrixUserId); + } - if (action === 'reject') { - try { - await federationSDK.rejectInvite(room.federation.mrid, matrixUserId); - } catch (err) { - if (err instanceof FederationRequestError && err.response.status === 403) { - return Room.performUserRemoval(room, user); + if (action === 'reject') { + try { + await federationSDK.rejectInvite(room.federation.mrid, matrixUserId); + } catch (err) { + if (err instanceof FederationRequestError && err.response.status === 403) { + return Room.performUserRemoval(room, user); + } + this.logger.error({ msg: 'Failed to reject invite in Matrix', err }); + throw err; } - this.logger.error({ msg: 'Failed to reject invite in Matrix', err }); - throw err; } + + federationMetrics.federationEventsProcessed.inc({ event_type: 'membership', direction: 'outgoing' }); + } catch (err) { + federationMetrics.federationEventsFailed.inc({ + event_type: 'membership', + direction: 'outgoing', + error_type: err instanceof Error ? err.constructor.name : 'Unknown', + }); + throw err; } } } diff --git a/ee/packages/federation-matrix/src/helpers/metricsHelpers.ts b/ee/packages/federation-matrix/src/helpers/metricsHelpers.ts index adb1a7400a6bb..b7f5cd0f60364 100644 --- a/ee/packages/federation-matrix/src/helpers/metricsHelpers.ts +++ b/ee/packages/federation-matrix/src/helpers/metricsHelpers.ts @@ -1,3 +1,4 @@ +import type { IMessage } from '@rocket.chat/core-typings'; import type { FileMessageType } from '@rocket.chat/federation-sdk'; import client from 'prom-client'; @@ -10,10 +11,7 @@ const percentiles = [0.01, 0.1, 0.5, 0.9, 0.95, 0.99, 1]; * This ensures we don't get duplicate registration errors when the same metric * is accessed from different parts of the application. */ -function getOrCreateMetric( - name: string, - createFn: () => T, -): T { +function getOrCreateMetric(name: string, createFn: () => T): T { const existing = client.register.getSingleMetric(name); if (existing) { return existing as T; @@ -22,88 +20,204 @@ function getOrCreateMetric( } /** - * Federation metrics for incoming operations. + * Federation metrics for both incoming and outgoing operations. * These use getOrCreateMetric to safely access metrics that may already * be registered by the meteor app's metrics.ts. */ export const federationMetrics = { + // ===================================== + // INCOMING METRICS + // ===================================== + /** Counter for messages received from other federated servers */ get federatedMessagesReceived() { - return getOrCreateMetric('rocketchat_federation_messages_received', () => - new client.Counter({ - name: 'rocketchat_federation_messages_received', - labelNames: ['room_type', 'message_type', 'origin'], - help: 'Total federated messages received', - }), + return getOrCreateMetric( + 'rocketchat_federation_messages_received', + () => + new client.Counter({ + name: 'rocketchat_federation_messages_received', + labelNames: ['room_type', 'message_type', 'origin'], + help: 'Total federated messages received', + }), ); }, /** Counter for rooms joined (users invited to rooms created elsewhere) */ get federatedRoomsJoined() { - return getOrCreateMetric('rocketchat_federation_rooms_joined', () => - new client.Counter({ - name: 'rocketchat_federation_rooms_joined', - labelNames: ['room_type', 'origin'], - help: 'Total federated rooms joined (users invited to rooms created elsewhere)', - }), + return getOrCreateMetric( + 'rocketchat_federation_rooms_joined', + () => + new client.Counter({ + name: 'rocketchat_federation_rooms_joined', + labelNames: ['room_type', 'origin'], + help: 'Total federated rooms joined (users invited to rooms created elsewhere)', + }), ); }, - /** Counter for federation events processed */ + /** Counter for federation events processed (both incoming and outgoing) */ get federationEventsProcessed() { - return getOrCreateMetric('rocketchat_federation_events_processed', () => - new client.Counter({ - name: 'rocketchat_federation_events_processed', - labelNames: ['event_type', 'direction'], - help: 'Total federation events processed', - }), + return getOrCreateMetric( + 'rocketchat_federation_events_processed', + () => + new client.Counter({ + name: 'rocketchat_federation_events_processed', + labelNames: ['event_type', 'direction'], + help: 'Total federation events processed', + }), ); }, - /** Counter for failed federation events */ + /** Counter for failed federation events (both incoming and outgoing) */ get federationEventsFailed() { - return getOrCreateMetric('rocketchat_federation_events_failed', () => - new client.Counter({ - name: 'rocketchat_federation_events_failed', - labelNames: ['event_type', 'direction', 'error_type'], - help: 'Total federation events that failed to process', - }), + return getOrCreateMetric( + 'rocketchat_federation_events_failed', + () => + new client.Counter({ + name: 'rocketchat_federation_events_failed', + labelNames: ['event_type', 'direction', 'error_type'], + help: 'Total federation events that failed to process', + }), ); }, /** Duration to process incoming federation transaction */ get federationTransactionProcessDuration() { - return getOrCreateMetric('rocketchat_federation_transaction_process_duration_seconds', () => - new client.Summary({ - name: 'rocketchat_federation_transaction_process_duration_seconds', - labelNames: ['pdu_count', 'edu_count', 'origin'], - help: 'Time to process incoming federation transaction', - percentiles, - }), + return getOrCreateMetric( + 'rocketchat_federation_transaction_process_duration_seconds', + () => + new client.Summary({ + name: 'rocketchat_federation_transaction_process_duration_seconds', + labelNames: ['pdu_count', 'edu_count', 'origin'], + help: 'Time to process incoming federation transaction', + percentiles, + }), ); }, /** Duration to process incoming federated message */ get federationIncomingMessageProcessDuration() { - return getOrCreateMetric('rocketchat_federation_incoming_message_process_duration_seconds', () => - new client.Summary({ - name: 'rocketchat_federation_incoming_message_process_duration_seconds', - labelNames: ['message_type'], - help: 'Time to process incoming federated message', - percentiles, - }), + return getOrCreateMetric( + 'rocketchat_federation_incoming_message_process_duration_seconds', + () => + new client.Summary({ + name: 'rocketchat_federation_incoming_message_process_duration_seconds', + labelNames: ['message_type'], + help: 'Time to process incoming federated message', + percentiles, + }), ); }, /** Duration to join a federated room (invite acceptance) */ get federationRoomJoinDuration() { - return getOrCreateMetric('rocketchat_federation_room_join_duration_seconds', () => - new client.Summary({ - name: 'rocketchat_federation_room_join_duration_seconds', - labelNames: ['origin'], - help: 'Time to join a federated room (invite acceptance)', - percentiles, - }), + return getOrCreateMetric( + 'rocketchat_federation_room_join_duration_seconds', + () => + new client.Summary({ + name: 'rocketchat_federation_room_join_duration_seconds', + labelNames: ['origin'], + help: 'Time to join a federated room (invite acceptance)', + percentiles, + }), + ); + }, + + // ===================================== + // OUTGOING METRICS + // ===================================== + + /** Counter for messages sent to federated rooms */ + get federatedMessagesSent() { + return getOrCreateMetric( + 'rocketchat_federation_messages_sent', + () => + new client.Counter({ + name: 'rocketchat_federation_messages_sent', + labelNames: ['room_type', 'message_type'], + help: 'Total messages sent to federated rooms', + }), + ); + }, + + /** Counter for federated rooms created */ + get federatedRoomsCreated() { + return getOrCreateMetric( + 'rocketchat_federation_rooms_created', + () => + new client.Counter({ + name: 'rocketchat_federation_rooms_created', + labelNames: ['room_type'], + help: 'Total federated rooms created', + }), + ); + }, + + /** Counter for federation invites sent */ + get federatedInvitesSent() { + return getOrCreateMetric( + 'rocketchat_federation_invites_sent', + () => + new client.Counter({ + name: 'rocketchat_federation_invites_sent', + labelNames: ['room_type'], + help: 'Total federation invites sent', + }), + ); + }, + + /** Counter for reactions sent/removed */ + get federatedReactionsSent() { + return getOrCreateMetric( + 'rocketchat_federation_reactions_sent', + () => + new client.Counter({ + name: 'rocketchat_federation_reactions_sent', + labelNames: ['action'], + help: 'Total reactions sent or removed via federation', + }), + ); + }, + + /** Duration to send a message via federation */ + get federationOutgoingMessageSendDuration() { + return getOrCreateMetric( + 'rocketchat_federation_outgoing_message_send_duration_seconds', + () => + new client.Summary({ + name: 'rocketchat_federation_outgoing_message_send_duration_seconds', + labelNames: ['message_type'], + help: 'Time to send a message via federation', + percentiles, + }), + ); + }, + + /** Duration to create a federated room */ + get federationRoomCreateDuration() { + return getOrCreateMetric( + 'rocketchat_federation_room_create_duration_seconds', + () => + new client.Summary({ + name: 'rocketchat_federation_room_create_duration_seconds', + labelNames: ['room_type'], + help: 'Time to create a federated room', + percentiles, + }), + ); + }, + + /** Duration to send an invitation via federation */ + get federationInviteSendDuration() { + return getOrCreateMetric( + 'rocketchat_federation_invite_send_duration_seconds', + () => + new client.Summary({ + name: 'rocketchat_federation_invite_send_duration_seconds', + labelNames: ['room_type'], + help: 'Time to send an invitation via federation', + percentiles, + }), ); }, }; @@ -125,13 +239,10 @@ export function extractOriginFromMatrixUserId(userId: string): string { } /** - * Determines the message type from a Matrix event for metrics labeling. + * Determines the message type from a Matrix event for metrics labeling (incoming). * @returns 'text' | 'file' | 'encrypted' */ -export function determineMessageType(event: { - type?: string; - content?: { msgtype?: string }; -}): 'text' | 'file' | 'encrypted' { +export function determineMessageType(event: { type?: string; content?: { msgtype?: string } }): 'text' | 'file' | 'encrypted' { if (event.type === 'm.room.encrypted') { return 'encrypted'; } @@ -144,6 +255,17 @@ export function determineMessageType(event: { return 'text'; } +/** + * Determines the message type from a Rocket.Chat message for outgoing metrics labeling. + * @returns 'text' | 'file' + */ +export function determineOutgoingMessageType(message: IMessage): 'text' | 'file' { + if (message.files && message.files.length > 0) { + return 'file'; + } + return 'text'; +} + /** * Bucketizes PDU count for metrics labeling to avoid high cardinality. * Groups counts into buckets: 0, 1, 2-5, 6-10, 11-50, 51+ From 132f7649fca84bfde7deb7c15a8fd803a2d302b3 Mon Sep 17 00:00:00 2001 From: dhulke Date: Sun, 1 Feb 2026 15:06:24 -0300 Subject: [PATCH 07/13] refactor(metrics): remove unused federation metrics and simplify metric tracking in FederationMatrix callbacks --- apps/meteor/app/metrics/server/lib/metrics.ts | 82 +------------------ .../ee/server/hooks/federation/index.ts | 80 +++--------------- 2 files changed, 12 insertions(+), 150 deletions(-) diff --git a/apps/meteor/app/metrics/server/lib/metrics.ts b/apps/meteor/app/metrics/server/lib/metrics.ts index 9fad9be5cc4a7..23b091221c928 100644 --- a/apps/meteor/app/metrics/server/lib/metrics.ts +++ b/apps/meteor/app/metrics/server/lib/metrics.ts @@ -247,44 +247,7 @@ export const metrics = { percentiles, }), - // Federation metrics - Counters - federatedRoomsCreated: new client.Counter({ - name: 'rocketchat_federation_rooms_created', - labelNames: ['room_type'], // 'c', 'p', 'd' - help: 'Total federated rooms created (locally created)', - }), - - federatedRoomsJoined: new client.Counter({ - name: 'rocketchat_federation_rooms_joined', - labelNames: ['room_type', 'origin'], // origin = remote server domain - help: 'Total federated rooms joined (users invited to rooms created elsewhere)', - }), - - federatedMessagesSent: new client.Counter({ - name: 'rocketchat_federation_messages_sent', - labelNames: ['room_type', 'message_type'], // message_type: 'text', 'file', 'encrypted' - help: 'Total federated messages sent', - }), - - federatedMessagesReceived: new client.Counter({ - name: 'rocketchat_federation_messages_received', - labelNames: ['room_type', 'message_type', 'origin'], - help: 'Total federated messages received', - }), - - federationEventsProcessed: new client.Counter({ - name: 'rocketchat_federation_events_processed', - labelNames: ['event_type', 'direction'], // 'message', 'membership', 'reaction', etc. | 'incoming', 'outgoing' - help: 'Total federation events processed', - }), - - federationEventsFailed: new client.Counter({ - name: 'rocketchat_federation_events_failed', - labelNames: ['event_type', 'direction', 'error_type'], - help: 'Total federation events that failed to process', - }), - - // Federation metrics - Gauges + // Federation metrics - Gauges (used by collectFederationMetrics) totalFederatedRooms: new client.Gauge({ name: 'rocketchat_federation_rooms_total', labelNames: ['room_type', 'origin'], // origin = 'local' | remote domain @@ -311,49 +274,6 @@ export const metrics = { labelNames: ['origin'], // origin = remote server domain help: 'Current count of federated users', }), - - // Federation metrics - Performance (Duration Summaries) - federationMessageSendDuration: new client.Summary({ - name: 'rocketchat_federation_message_send_duration_seconds', - labelNames: ['message_type', 'room_type'], - help: 'Time to send a message to Matrix federation SDK (awaited operations only)', - percentiles, - }), - - federationTransactionProcessDuration: new client.Summary({ - name: 'rocketchat_federation_transaction_process_duration_seconds', - labelNames: ['pdu_count', 'edu_count', 'origin'], - help: 'Time to process incoming federation transaction', - percentiles, - }), - - federationIncomingMessageProcessDuration: new client.Summary({ - name: 'rocketchat_federation_incoming_message_process_duration_seconds', - labelNames: ['message_type'], - help: 'Time to process incoming federated message', - percentiles, - }), - - federationRoomCreateDuration: new client.Summary({ - name: 'rocketchat_federation_room_create_duration_seconds', - labelNames: ['room_type'], - help: 'Time to create a federated room', - percentiles, - }), - - federationRoomJoinDuration: new client.Summary({ - name: 'rocketchat_federation_room_join_duration_seconds', - labelNames: ['origin'], - help: 'Time to join a federated room (invite acceptance)', - percentiles, - }), - - federationUserInviteDuration: new client.Summary({ - name: 'rocketchat_federation_user_invite_duration_seconds', - labelNames: ['room_type'], - help: 'Time to invite a user to a federated room', - percentiles, - }), }; // Metrics diff --git a/apps/meteor/ee/server/hooks/federation/index.ts b/apps/meteor/ee/server/hooks/federation/index.ts index 2bb9465be1012..f717c7e0847b3 100644 --- a/apps/meteor/ee/server/hooks/federation/index.ts +++ b/apps/meteor/ee/server/hooks/federation/index.ts @@ -4,7 +4,6 @@ import type { IRoomNativeFederated, IMessage, IRoom, IUser } from '@rocket.chat/ import { validateFederatedUsername } from '@rocket.chat/federation-matrix'; import { Rooms } from '@rocket.chat/models'; -import { metrics } from '../../../../app/metrics/server'; import { callbacks } from '../../../../server/lib/callbacks'; import { afterLeaveRoomCallback } from '../../../../server/lib/callbacks/afterLeaveRoomCallback'; import { afterRemoveFromRoomCallback } from '../../../../server/lib/callbacks/afterRemoveFromRoomCallback'; @@ -26,13 +25,7 @@ callbacks.add('federation.afterCreateFederatedRoom', async (room, { owner, origi const federatedRoomId = room?.federation?.mrid; if (!federatedRoomId) { - const endTimer = metrics.federationRoomCreateDuration.startTimer({ room_type: room.t }); - try { - await FederationMatrix.createRoom(room, owner); - metrics.federatedRoomsCreated.inc({ room_type: room.t }); - } finally { - endTimer(); - } + await FederationMatrix.createRoom(room, owner); } else { // TODO unify how to get server // matrix room was already created and passed @@ -50,16 +43,11 @@ callbacks.add('federation.afterCreateFederatedRoom', async (room, { owner, origi } // TODO this won't be neeeded once we receive all state events at ee/packages/federation-matrix/src/events/member.ts - const inviteEndTimer = metrics.federationUserInviteDuration.startTimer({ room_type: room.t }); - try { - await FederationMatrix.inviteUsersToRoom( - federationRoom, - members.filter((member) => member !== owner.username), - owner, - ); - } finally { - inviteEndTimer(); - } + await FederationMatrix.inviteUsersToRoom( + federationRoom, + members.filter((member) => member !== owner.username), + owner, + ); }); callbacks.add( @@ -74,33 +62,11 @@ callbacks.add( // If message is federated, it will save external_message_id like into the message object // if this prop exists here it should not be sent to the federation to avoid loops if (!message.federation?.eventId) { - const messageType = message.files && message.files.length > 0 ? 'file' : 'text'; - const endTimer = metrics.federationMessageSendDuration.startTimer({ - message_type: messageType, - room_type: room.t, - }); - try { - await FederationMatrix.sendMessage(message, room, user); - metrics.federatedMessagesSent.inc({ - room_type: room.t, - message_type: messageType, - }); - metrics.federationEventsProcessed.inc({ - event_type: 'message', - direction: 'outgoing', - }); - } finally { - endTimer(); - } + await FederationMatrix.sendMessage(message, room, user); } } catch (error) { // Log the error but don't prevent the message from being sent locally console.error('[sendMessage] Failed to send message to Native Federation:', error); - metrics.federationEventsFailed.inc({ - event_type: 'message', - direction: 'outgoing', - error_type: error instanceof Error ? error.constructor.name : 'Unknown', - }); } }, callbacks.priority.HIGH, @@ -156,12 +122,7 @@ beforeAddUserToRoom.add( return; } - const endTimer = metrics.federationUserInviteDuration.startTimer({ room_type: room.t }); - try { - await FederationMatrix.inviteUsersToRoom(room, [user.username], inviter); - } finally { - endTimer(); - } + await FederationMatrix.inviteUsersToRoom(room, [user.username], inviter); // after invite is sent we create the invite subscriptions // TODO this may be not needed if we receive the emit for the invite event from matrix @@ -185,20 +146,7 @@ callbacks.add( return; } if (FederationActions.shouldPerformFederationAction(params.room)) { - try { - await FederationMatrix.sendReaction(message._id, params.reaction, params.user); - metrics.federationEventsProcessed.inc({ - event_type: 'reaction', - direction: 'outgoing', - }); - } catch (error) { - metrics.federationEventsFailed.inc({ - event_type: 'reaction', - direction: 'outgoing', - error_type: error instanceof Error ? error.constructor.name : 'Unknown', - }); - throw error; - } + await FederationMatrix.sendReaction(message._id, params.reaction, params.user); } }, callbacks.priority.HIGH, @@ -321,14 +269,8 @@ callbacks.add( // as per federation.beforeCreateDirectMessage we create a DM without federation data because we still don't have it. if (!room.federation.mrid) { - const endTimer = metrics.federationRoomCreateDuration.startTimer({ room_type: 'd' }); - try { - // so after the DM is created we call the federation to create the DM on Matrix side and then updated the reference here - await FederationMatrix.createDirectMessageRoom(room, params.members, params.creatorId); - metrics.federatedRoomsCreated.inc({ room_type: 'd' }); - } finally { - endTimer(); - } + // so after the DM is created we call the federation to create the DM on Matrix side and then updated the reference here + await FederationMatrix.createDirectMessageRoom(room, params.members, params.creatorId); } }, callbacks.priority.HIGH, From f357f672734bddbdd83a7671f77284cf457a8fa8 Mon Sep 17 00:00:00 2001 From: dhulke Date: Sun, 1 Feb 2026 15:11:52 -0300 Subject: [PATCH 08/13] refactor(transactions): remove duplicate import of federation metrics to streamline code --- ee/packages/federation-matrix/src/api/_matrix/transactions.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ee/packages/federation-matrix/src/api/_matrix/transactions.ts b/ee/packages/federation-matrix/src/api/_matrix/transactions.ts index 3957865edb238..17f18bd4a5363 100644 --- a/ee/packages/federation-matrix/src/api/_matrix/transactions.ts +++ b/ee/packages/federation-matrix/src/api/_matrix/transactions.ts @@ -4,9 +4,9 @@ import { Router } from '@rocket.chat/http-router'; import { ajv } from '@rocket.chat/rest-typings/dist/v1/Ajv'; import { addSpanAttributes } from '@rocket.chat/tracing'; +import { federationMetrics, bucketizePduCount, bucketizeEduCount } from '../../helpers/metricsHelpers'; import { canAccessResourceMiddleware } from '../middlewares/canAccessResource'; import { isAuthenticatedMiddleware } from '../middlewares/isAuthenticated'; -import { federationMetrics, bucketizePduCount, bucketizeEduCount } from '../../helpers/metricsHelpers'; const SendTransactionParamsSchema = { type: 'object', From 3a61407d7ce973a69e161361ff40fb49ed1cde4f Mon Sep 17 00:00:00 2001 From: dhulke Date: Sun, 1 Feb 2026 22:17:17 -0300 Subject: [PATCH 09/13] refactor(metrics): remove unused incoming metrics and streamline metric tracking in FederationMatrix --- .../src/api/_matrix/transactions.ts | 10 -- .../federation-matrix/src/events/member.ts | 81 +++------- .../federation-matrix/src/events/message.ts | 89 +---------- .../src/helpers/metricsHelpers.ts | 140 +----------------- 4 files changed, 29 insertions(+), 291 deletions(-) diff --git a/ee/packages/federation-matrix/src/api/_matrix/transactions.ts b/ee/packages/federation-matrix/src/api/_matrix/transactions.ts index 17f18bd4a5363..fdc5d03f05561 100644 --- a/ee/packages/federation-matrix/src/api/_matrix/transactions.ts +++ b/ee/packages/federation-matrix/src/api/_matrix/transactions.ts @@ -4,7 +4,6 @@ import { Router } from '@rocket.chat/http-router'; import { ajv } from '@rocket.chat/rest-typings/dist/v1/Ajv'; import { addSpanAttributes } from '@rocket.chat/tracing'; -import { federationMetrics, bucketizePduCount, bucketizeEduCount } from '../../helpers/metricsHelpers'; import { canAccessResourceMiddleware } from '../middlewares/canAccessResource'; import { isAuthenticatedMiddleware } from '../middlewares/isAuthenticated'; @@ -359,17 +358,9 @@ export const getMatrixTransactionsRoutes = () => { 'federation.edu_types': Array.from(eduTypes).join(','), }); - // Start duration timer for transaction processing - const endTimer = federationMetrics.federationTransactionProcessDuration.startTimer({ - pdu_count: bucketizePduCount(pdus.length), - edu_count: bucketizeEduCount(edus.length), - origin: body.origin, - }); - try { await federationSDK.processIncomingTransaction(body); } catch (error: any) { - endTimer(); // TODO custom error types? if (error.message === 'too-many-concurrent-transactions') { return { @@ -387,7 +378,6 @@ export const getMatrixTransactionsRoutes = () => { }; } - endTimer(); return { body: { pdus: {}, diff --git a/ee/packages/federation-matrix/src/events/member.ts b/ee/packages/federation-matrix/src/events/member.ts index ac2d5158b5a73..f58d2bc0e4572 100644 --- a/ee/packages/federation-matrix/src/events/member.ts +++ b/ee/packages/federation-matrix/src/events/member.ts @@ -6,7 +6,6 @@ import { Rooms, Subscriptions, Users } from '@rocket.chat/models'; import { createOrUpdateFederatedUser } from '../helpers/createOrUpdateFederatedUser'; import { getUsernameServername } from '../helpers/getUsernameServername'; -import { federationMetrics, extractOriginFromMatrixRoomId } from '../helpers/metricsHelpers'; const logger = new Logger('federation-matrix:member'); @@ -95,7 +94,7 @@ async function getOrCreateFederatedRoom({ function getJoinRuleType(strippedState: PduForType<'m.room.join_rules'>[]): 'p' | 'c' | 'd' { const joinRulesState = strippedState?.find((state: PduForType<'m.room.join_rules'>) => state.type === 'm.room.join_rules'); - // as per the spec, users need to be invited to join a room, unless the room’s join rules state otherwise. + // as per the spec, users need to be invited to join a room, unless the room's join rules state otherwise. if (!joinRulesState) { return 'p'; } @@ -194,64 +193,38 @@ async function handleInvite({ if (room.t === 'd') { await Room.updateDirectMessageRoomName(room); } - - federationMetrics.federationEventsProcessed.inc({ - event_type: 'membership', - direction: 'incoming', - }); } async function handleJoin({ room_id: roomId, state_key: userId, }: HomeserverEventSignatures['homeserver.matrix.membership']['event']): Promise { - const roomOrigin = extractOriginFromMatrixRoomId(roomId); - const endTimer = federationMetrics.federationRoomJoinDuration.startTimer({ origin: roomOrigin }); - - try { - const joiningUser = await getOrCreateFederatedUser(userId); - if (!joiningUser?.username) { - throw new Error(`Failed to get or create joining user: ${userId}`); - } - - const room = await Rooms.findOneFederatedByMrid(roomId); - if (!room) { - throw new Error(`Room not found while joining user ${userId} to room ${roomId}`); - } - - const subscription = await Subscriptions.findOneByRoomIdAndUserId(room._id, joiningUser._id); - if (!subscription) { - throw new Error(`Subscription not found while joining user ${userId} to room ${roomId}`); - } - - // update room name for DMs - if (room.t === 'd') { - await Room.updateDirectMessageRoomName(room, [subscription._id]); - } + const joiningUser = await getOrCreateFederatedUser(userId); + if (!joiningUser?.username) { + throw new Error(`Failed to get or create joining user: ${userId}`); + } - if (!subscription.status) { - logger.info('User is already joined to the room, skipping...'); - return; - } + const room = await Rooms.findOneFederatedByMrid(roomId); + if (!room) { + throw new Error(`Room not found while joining user ${userId} to room ${roomId}`); + } - await Room.performAcceptRoomInvite(room, subscription, joiningUser); + const subscription = await Subscriptions.findOneByRoomIdAndUserId(room._id, joiningUser._id); + if (!subscription) { + throw new Error(`Subscription not found while joining user ${userId} to room ${roomId}`); + } - // Increment counter for rooms joined from external servers - const serverName = federationSDK.getConfig('serverName'); - if (roomOrigin !== serverName) { - federationMetrics.federatedRoomsJoined.inc({ - room_type: room.t, - origin: roomOrigin, - }); - } + // update room name for DMs + if (room.t === 'd') { + await Room.updateDirectMessageRoomName(room, [subscription._id]); + } - federationMetrics.federationEventsProcessed.inc({ - event_type: 'membership', - direction: 'incoming', - }); - } finally { - endTimer(); + if (!subscription.status) { + logger.info('User is already joined to the room, skipping...'); + return; } + + await Room.performAcceptRoomInvite(room, subscription, joiningUser); } async function handleLeave({ @@ -278,11 +251,6 @@ async function handleLeave({ await Room.updateDirectMessageRoomName(room); } - federationMetrics.federationEventsProcessed.inc({ - event_type: 'membership', - direction: 'incoming', - }); - // TODO check if there are no pending invites to the room, and if so, delete the room } @@ -307,11 +275,6 @@ export function member() { } } catch (err) { logger.error({ msg: 'Failed to process Matrix membership event', err }); - federationMetrics.federationEventsFailed.inc({ - event_type: 'membership', - direction: 'incoming', - error_type: err instanceof Error ? err.constructor.name : 'Unknown', - }); } }); } diff --git a/ee/packages/federation-matrix/src/events/message.ts b/ee/packages/federation-matrix/src/events/message.ts index d29c427ab1509..ca619b0c3feb6 100644 --- a/ee/packages/federation-matrix/src/events/message.ts +++ b/ee/packages/federation-matrix/src/events/message.ts @@ -6,7 +6,6 @@ import { Users, Rooms, Messages } from '@rocket.chat/models'; import { fileTypes } from '../FederationMatrix'; import { toInternalMessageFormat, toInternalQuoteMessageFormat } from '../helpers/message.parsers'; -import { federationMetrics, extractOriginFromMatrixUserId, determineMessageType } from '../helpers/metricsHelpers'; import { MatrixMediaService } from '../services/MatrixMediaService'; const logger = new Logger('federation-matrix:message'); @@ -113,11 +112,6 @@ async function handleMediaMessage( export function message() { federationSDK.eventEmitterService.on('homeserver.matrix.message', async ({ event, event_id: eventId }) => { - const messageType = determineMessageType(event); - const endTimer = federationMetrics.federationIncomingMessageProcessDuration.startTimer({ - message_type: messageType, - }); - try { const { msgtype, body } = event.content; const messageBody = body.toString(); @@ -139,11 +133,10 @@ export function message() { } const serverName = federationSDK.getConfig('serverName'); - const origin = extractOriginFromMatrixUserId(event.sender); const relation = event.content['m.relates_to']; - // SPEC: For example, an m.thread relationship type denotes that the event is part of a “thread” of messages and should be rendered as such. + // SPEC: For example, an m.thread relationship type denotes that the event is part of a "thread" of messages and should be rendered as such. const hasRelation = relation && 'rel_type' in relation; const isThreadMessage = hasRelation && relation.rel_type === 'm.thread'; @@ -151,7 +144,7 @@ export function message() { const threadRootEventId = isThreadMessage && relation.event_id; // SPEC: Though rich replies form a relationship to another event, they do not use rel_type to create this relationship. - // Instead, a subkey named m.in_reply_to is used to describe the reply’s relationship, + // Instead, a subkey named m.in_reply_to is used to describe the reply's relationship, const isRichReply = relation && !('rel_type' in relation) && 'm.in_reply_to' in relation; const quoteMessageEventId = isRichReply && relation['m.in_reply_to']?.event_id; @@ -234,17 +227,6 @@ export function message() { thread, ts: new Date(event.origin_server_ts), }); - - // Track received message - federationMetrics.federatedMessagesReceived.inc({ - room_type: room.t, - message_type: messageType, - origin, - }); - federationMetrics.federationEventsProcessed.inc({ - event_type: 'message', - direction: 'incoming', - }); return; } @@ -279,36 +261,12 @@ export function message() { ts: new Date(event.origin_server_ts), }); } - - // Track received message - federationMetrics.federatedMessagesReceived.inc({ - room_type: room.t, - message_type: messageType, - origin, - }); - federationMetrics.federationEventsProcessed.inc({ - event_type: 'message', - direction: 'incoming', - }); } catch (err) { logger.error({ msg: 'Error processing Matrix message', err }); - federationMetrics.federationEventsFailed.inc({ - event_type: 'message', - direction: 'incoming', - error_type: err instanceof Error ? err.constructor.name : 'Unknown', - }); - } finally { - endTimer(); } }); federationSDK.eventEmitterService.on('homeserver.matrix.encrypted', async ({ event, event_id: eventId }) => { - const messageType = 'encrypted'; - const endTimer = federationMetrics.federationIncomingMessageProcessDuration.startTimer({ - message_type: messageType, - }); - const origin = extractOriginFromMatrixUserId(event.sender); - try { if (!event.content.ciphertext) { logger.debug('No message content found in event'); @@ -328,7 +286,7 @@ export function message() { const relation = event.content['m.relates_to']; - // SPEC: For example, an m.thread relationship type denotes that the event is part of a “thread” of messages and should be rendered as such. + // SPEC: For example, an m.thread relationship type denotes that the event is part of a "thread" of messages and should be rendered as such. const hasRelation = relation && 'rel_type' in relation; const isThreadMessage = hasRelation && relation.rel_type === 'm.thread'; @@ -336,7 +294,7 @@ export function message() { const threadRootEventId = isThreadMessage && relation.event_id; // SPEC: Though rich replies form a relationship to another event, they do not use rel_type to create this relationship. - // Instead, a subkey named m.in_reply_to is used to describe the reply’s relationship, + // Instead, a subkey named m.in_reply_to is used to describe the reply's relationship, const isRichReply = relation && !('rel_type' in relation) && 'm.in_reply_to' in relation; const quoteMessageEventId = isRichReply && relation['m.in_reply_to']?.event_id; @@ -405,17 +363,6 @@ export function message() { thread, ts: new Date(event.origin_server_ts), }); - - // Track received encrypted message - federationMetrics.federatedMessagesReceived.inc({ - room_type: room.t, - message_type: messageType, - origin, - }); - federationMetrics.federationEventsProcessed.inc({ - event_type: 'message', - direction: 'incoming', - }); return; } @@ -430,26 +377,8 @@ export function message() { thread, ts: new Date(event.origin_server_ts), }); - - // Track received encrypted message - federationMetrics.federatedMessagesReceived.inc({ - room_type: room.t, - message_type: messageType, - origin, - }); - federationMetrics.federationEventsProcessed.inc({ - event_type: 'message', - direction: 'incoming', - }); } catch (err) { logger.error({ msg: 'Error processing Matrix message', err }); - federationMetrics.federationEventsFailed.inc({ - event_type: 'message', - direction: 'incoming', - error_type: err instanceof Error ? err.constructor.name : 'Unknown', - }); - } finally { - endTimer(); } }); @@ -480,18 +409,8 @@ export function message() { } await Message.deleteMessage(user, rcMessage); - - federationMetrics.federationEventsProcessed.inc({ - event_type: 'redaction', - direction: 'incoming', - }); } catch (err) { logger.error({ msg: 'Failed to process Matrix removal redaction', err }); - federationMetrics.federationEventsFailed.inc({ - event_type: 'redaction', - direction: 'incoming', - error_type: err instanceof Error ? err.constructor.name : 'Unknown', - }); } }); } diff --git a/ee/packages/federation-matrix/src/helpers/metricsHelpers.ts b/ee/packages/federation-matrix/src/helpers/metricsHelpers.ts index b7f5cd0f60364..5155b24b041cb 100644 --- a/ee/packages/federation-matrix/src/helpers/metricsHelpers.ts +++ b/ee/packages/federation-matrix/src/helpers/metricsHelpers.ts @@ -1,9 +1,6 @@ import type { IMessage } from '@rocket.chat/core-typings'; -import type { FileMessageType } from '@rocket.chat/federation-sdk'; import client from 'prom-client'; -import { fileTypes } from '../FederationMatrix'; - const percentiles = [0.01, 0.1, 0.5, 0.9, 0.95, 0.99, 1]; /** @@ -20,41 +17,14 @@ function getOrCreateMetric(name: string, createFn: () = } /** - * Federation metrics for both incoming and outgoing operations. - * These use getOrCreateMetric to safely access metrics that may already - * be registered by the meteor app's metrics.ts. + * Federation metrics for outgoing operations. + * Incoming metrics are now collected by the SDK's EventEmitterService. */ export const federationMetrics = { // ===================================== - // INCOMING METRICS + // OUTGOING METRICS // ===================================== - /** Counter for messages received from other federated servers */ - get federatedMessagesReceived() { - return getOrCreateMetric( - 'rocketchat_federation_messages_received', - () => - new client.Counter({ - name: 'rocketchat_federation_messages_received', - labelNames: ['room_type', 'message_type', 'origin'], - help: 'Total federated messages received', - }), - ); - }, - - /** Counter for rooms joined (users invited to rooms created elsewhere) */ - get federatedRoomsJoined() { - return getOrCreateMetric( - 'rocketchat_federation_rooms_joined', - () => - new client.Counter({ - name: 'rocketchat_federation_rooms_joined', - labelNames: ['room_type', 'origin'], - help: 'Total federated rooms joined (users invited to rooms created elsewhere)', - }), - ); - }, - /** Counter for federation events processed (both incoming and outgoing) */ get federationEventsProcessed() { return getOrCreateMetric( @@ -81,52 +51,6 @@ export const federationMetrics = { ); }, - /** Duration to process incoming federation transaction */ - get federationTransactionProcessDuration() { - return getOrCreateMetric( - 'rocketchat_federation_transaction_process_duration_seconds', - () => - new client.Summary({ - name: 'rocketchat_federation_transaction_process_duration_seconds', - labelNames: ['pdu_count', 'edu_count', 'origin'], - help: 'Time to process incoming federation transaction', - percentiles, - }), - ); - }, - - /** Duration to process incoming federated message */ - get federationIncomingMessageProcessDuration() { - return getOrCreateMetric( - 'rocketchat_federation_incoming_message_process_duration_seconds', - () => - new client.Summary({ - name: 'rocketchat_federation_incoming_message_process_duration_seconds', - labelNames: ['message_type'], - help: 'Time to process incoming federated message', - percentiles, - }), - ); - }, - - /** Duration to join a federated room (invite acceptance) */ - get federationRoomJoinDuration() { - return getOrCreateMetric( - 'rocketchat_federation_room_join_duration_seconds', - () => - new client.Summary({ - name: 'rocketchat_federation_room_join_duration_seconds', - labelNames: ['origin'], - help: 'Time to join a federated room (invite acceptance)', - percentiles, - }), - ); - }, - - // ===================================== - // OUTGOING METRICS - // ===================================== - /** Counter for messages sent to federated rooms */ get federatedMessagesSent() { return getOrCreateMetric( @@ -222,39 +146,6 @@ export const federationMetrics = { }, }; -/** - * Extracts the origin server domain from a Matrix room ID. - * @example extractOriginFromMatrixRoomId('!room:matrix.org') // 'matrix.org' - */ -export function extractOriginFromMatrixRoomId(roomId: string): string { - return roomId.split(':').pop() || 'unknown'; -} - -/** - * Extracts the origin server domain from a Matrix user ID. - * @example extractOriginFromMatrixUserId('@user:matrix.org') // 'matrix.org' - */ -export function extractOriginFromMatrixUserId(userId: string): string { - return userId.split(':').pop() || 'unknown'; -} - -/** - * Determines the message type from a Matrix event for metrics labeling (incoming). - * @returns 'text' | 'file' | 'encrypted' - */ -export function determineMessageType(event: { type?: string; content?: { msgtype?: string } }): 'text' | 'file' | 'encrypted' { - if (event.type === 'm.room.encrypted') { - return 'encrypted'; - } - - const msgtype = event.content?.msgtype; - if (msgtype && Object.values(fileTypes).includes(msgtype as FileMessageType)) { - return 'file'; - } - - return 'text'; -} - /** * Determines the message type from a Rocket.Chat message for outgoing metrics labeling. * @returns 'text' | 'file' @@ -265,28 +156,3 @@ export function determineOutgoingMessageType(message: IMessage): 'text' | 'file' } return 'text'; } - -/** - * Bucketizes PDU count for metrics labeling to avoid high cardinality. - * Groups counts into buckets: 0, 1, 2-5, 6-10, 11-50, 51+ - */ -export function bucketizePduCount(count: number): string { - if (count === 0) return '0'; - if (count === 1) return '1'; - if (count <= 5) return '2-5'; - if (count <= 10) return '6-10'; - if (count <= 50) return '11-50'; - return '51+'; -} - -/** - * Bucketizes EDU count for metrics labeling to avoid high cardinality. - * Groups counts into buckets: 0, 1, 2-5, 6-10, 11+ - */ -export function bucketizeEduCount(count: number): string { - if (count === 0) return '0'; - if (count === 1) return '1'; - if (count <= 5) return '2-5'; - if (count <= 10) return '6-10'; - return '11+'; -} From b5e4a108b46fe58b7f53d929c23c9f30e8bce629 Mon Sep 17 00:00:00 2001 From: dhulke Date: Sun, 1 Feb 2026 22:27:46 -0300 Subject: [PATCH 10/13] refactor(metrics): remove federation metrics collection to streamline metric tracking --- .../app/metrics/server/lib/collectMetrics.ts | 84 ------------------- 1 file changed, 84 deletions(-) diff --git a/apps/meteor/app/metrics/server/lib/collectMetrics.ts b/apps/meteor/app/metrics/server/lib/collectMetrics.ts index e37907934753b..7decb5c71dd30 100644 --- a/apps/meteor/app/metrics/server/lib/collectMetrics.ts +++ b/apps/meteor/app/metrics/server/lib/collectMetrics.ts @@ -86,90 +86,6 @@ const setPrometheusData = async (): Promise => { metrics.totalLivechatAgents.set(statistics.totalLivechatAgents); metrics.pushQueue.set(statistics.pushQueue || 0); - - // Federation statistics - await collectFederationMetrics(); -}; - -/** - * Collects federation-related gauge metrics by querying the database. - * This includes counts of federated rooms and users. - */ -const collectFederationMetrics = async (): Promise => { - try { - // Check if federation is enabled before collecting metrics - const federationEnabled = settings.get('Federation_Matrix_enabled'); - if (!federationEnabled) { - return; - } - - const serverName = settings.get('Federation_Matrix_homeserver_domain') || ''; - - // Count federated rooms by type - const federatedChannels = await Rooms.col.countDocuments({ - 't': 'c', - 'federation.mrid': { $exists: true }, - }); - const federatedPrivateGroups = await Rooms.col.countDocuments({ - 't': 'p', - 'federation.mrid': { $exists: true }, - }); - const federatedDirect = await Rooms.col.countDocuments({ - 't': 'd', - 'federation.mrid': { $exists: true }, - }); - - metrics.totalFederatedChannels.set(federatedChannels); - metrics.totalFederatedPrivateGroups.set(federatedPrivateGroups); - metrics.totalFederatedDirectMessages.set(federatedDirect); - - // Count federated rooms by origin - const federatedRoomsByOrigin = await Rooms.col - .aggregate<{ _id: { room_type: string; origin: string }; count: number }>([ - { $match: { 'federation.mrid': { $exists: true } } }, - { - $group: { - _id: { room_type: '$t', origin: '$federation.origin' }, - count: { $sum: 1 }, - }, - }, - ]) - .toArray(); - - // Reset all federated room gauges before setting new values - metrics.totalFederatedRooms.reset(); - - for (const { _id, count } of federatedRoomsByOrigin) { - const isLocal = _id.origin === serverName; - metrics.totalFederatedRooms.set( - { - room_type: _id.room_type, - origin: isLocal ? 'local' : _id.origin || 'unknown', - }, - count, - ); - } - - // Count federated users by origin - const federatedUsersByOrigin = await Users.col - .aggregate<{ - _id: string; - count: number; - }>([ - { $match: { 'federated': true, 'federation.origin': { $exists: true } } }, - { $group: { _id: '$federation.origin', count: { $sum: 1 } } }, - ]) - .toArray(); - - // Reset federated users gauge before setting new values - metrics.totalFederatedUsers.reset(); - - for (const { _id: origin, count } of federatedUsersByOrigin) { - metrics.totalFederatedUsers.set({ origin: origin || 'unknown' }, count); - } - } catch (error) { - SystemLogger.error({ msg: 'Error collecting federation metrics', error }); - } }; const app = connect(); From 70a3246d9eab5fbd93b3b877c78d07a81f50384c Mon Sep 17 00:00:00 2001 From: dhulke Date: Sun, 1 Feb 2026 22:28:19 -0300 Subject: [PATCH 11/13] refactor(metrics): remove unused Rooms and Users imports to streamline metrics collection --- apps/meteor/app/metrics/server/lib/collectMetrics.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/meteor/app/metrics/server/lib/collectMetrics.ts b/apps/meteor/app/metrics/server/lib/collectMetrics.ts index 7decb5c71dd30..8628a52fe4096 100644 --- a/apps/meteor/app/metrics/server/lib/collectMetrics.ts +++ b/apps/meteor/app/metrics/server/lib/collectMetrics.ts @@ -1,6 +1,6 @@ import http from 'http'; -import { Rooms, Statistics, Users } from '@rocket.chat/models'; +import { Statistics } from '@rocket.chat/models'; import { tracerSpan } from '@rocket.chat/tracing'; import connect from 'connect'; import { Facts } from 'meteor/facts-base'; From d7860c57c22f6988630ba6478a84a1f19a56e99f Mon Sep 17 00:00:00 2001 From: dhulke Date: Sun, 1 Feb 2026 22:28:59 -0300 Subject: [PATCH 12/13] refactor(metrics): remove additional federation metrics to further streamline metrics collection --- apps/meteor/app/metrics/server/lib/metrics.ts | 28 ------------------- 1 file changed, 28 deletions(-) diff --git a/apps/meteor/app/metrics/server/lib/metrics.ts b/apps/meteor/app/metrics/server/lib/metrics.ts index 23b091221c928..36967954a8dbb 100644 --- a/apps/meteor/app/metrics/server/lib/metrics.ts +++ b/apps/meteor/app/metrics/server/lib/metrics.ts @@ -246,34 +246,6 @@ export const metrics = { help: 'Time taken in seconds for an item to be processed for the first time by Omni queues', percentiles, }), - - // Federation metrics - Gauges (used by collectFederationMetrics) - totalFederatedRooms: new client.Gauge({ - name: 'rocketchat_federation_rooms_total', - labelNames: ['room_type', 'origin'], // origin = 'local' | remote domain - help: 'Current count of federated rooms', - }), - - totalFederatedChannels: new client.Gauge({ - name: 'rocketchat_federation_channels_total', - help: 'Total federated public channels', - }), - - totalFederatedPrivateGroups: new client.Gauge({ - name: 'rocketchat_federation_private_groups_total', - help: 'Total federated private groups', - }), - - totalFederatedDirectMessages: new client.Gauge({ - name: 'rocketchat_federation_direct_total', - help: 'Total federated direct message rooms', - }), - - totalFederatedUsers: new client.Gauge({ - name: 'rocketchat_federation_users_total', - labelNames: ['origin'], // origin = remote server domain - help: 'Current count of federated users', - }), }; // Metrics From 3b199efcc34bdf5c01368c2bb5109d79d428de75 Mon Sep 17 00:00:00 2001 From: dhulke Date: Tue, 3 Feb 2026 11:33:14 -0300 Subject: [PATCH 13/13] refactor(events): enhance event handling in FederationMatrix by adding type signatures and improving error handling --- .../federation-matrix/src/events/edu.ts | 26 +-- .../federation-matrix/src/events/member.ts | 12 +- .../federation-matrix/src/events/message.ts | 45 +++-- .../federation-matrix/src/events/ping.ts | 15 +- .../federation-matrix/src/events/reaction.ts | 26 +-- .../federation-matrix/src/events/room.ts | 175 ++++++++++-------- 6 files changed, 164 insertions(+), 135 deletions(-) diff --git a/ee/packages/federation-matrix/src/events/edu.ts b/ee/packages/federation-matrix/src/events/edu.ts index 2235e8479d811..773f90e39479b 100644 --- a/ee/packages/federation-matrix/src/events/edu.ts +++ b/ee/packages/federation-matrix/src/events/edu.ts @@ -1,14 +1,15 @@ import { api } from '@rocket.chat/core-services'; import { UserStatus } from '@rocket.chat/core-typings'; -import { federationSDK } from '@rocket.chat/federation-sdk'; +import { federationSDK, type HomeserverEventSignatures } from '@rocket.chat/federation-sdk'; import { Logger } from '@rocket.chat/logger'; import { Rooms, Users } from '@rocket.chat/models'; const logger = new Logger('federation-matrix:edu'); export const edus = async () => { - federationSDK.eventEmitterService.on('homeserver.matrix.typing', async (data) => { - try { + federationSDK.eventEmitterService.on( + 'homeserver.matrix.typing', + async (data: HomeserverEventSignatures['homeserver.matrix.typing']) => { const matrixRoom = await Rooms.findOne({ 'federation.mrid': data.room_id }, { projection: { _id: 1 } }); if (!matrixRoom) { logger.debug({ msg: 'No bridged room found for Matrix room_id', roomId: data.room_id }); @@ -20,13 +21,13 @@ export const edus = async () => { isTyping: data.typing, roomId: matrixRoom._id, }); - } catch (err) { - logger.error({ msg: 'Error handling Matrix typing event', err }); - } - }); + }, + (err: Error) => logger.error({ msg: 'Error handling Matrix typing event', err }), + ); - federationSDK.eventEmitterService.on('homeserver.matrix.presence', async (data) => { - try { + federationSDK.eventEmitterService.on( + 'homeserver.matrix.presence', + async (data: HomeserverEventSignatures['homeserver.matrix.presence']) => { const matrixUser = await Users.findOneByUsername(data.user_id); if (!matrixUser) { logger.debug({ msg: 'No federated user found for Matrix user_id', userId: data.user_id }); @@ -67,8 +68,7 @@ export const edus = async () => { previousStatus: undefined, }); logger.debug({ msg: 'Updated presence for user from Matrix federation', userId: matrixUser._id, status }); - } catch (err) { - logger.error({ msg: 'Error handling Matrix presence event', err }); - } - }); + }, + (err: Error) => logger.error({ msg: 'Error handling Matrix presence event', err }), + ); }; diff --git a/ee/packages/federation-matrix/src/events/member.ts b/ee/packages/federation-matrix/src/events/member.ts index f58d2bc0e4572..792b5eb6e1ded 100644 --- a/ee/packages/federation-matrix/src/events/member.ts +++ b/ee/packages/federation-matrix/src/events/member.ts @@ -255,8 +255,9 @@ async function handleLeave({ } export function member() { - federationSDK.eventEmitterService.on('homeserver.matrix.membership', async ({ event }) => { - try { + federationSDK.eventEmitterService.on( + 'homeserver.matrix.membership', + async ({ event }: HomeserverEventSignatures['homeserver.matrix.membership']) => { switch (event.content.membership) { case 'invite': await handleInvite(event); @@ -273,8 +274,7 @@ export function member() { default: logger.warn({ msg: 'Unknown membership type', membership: event.content.membership }); } - } catch (err) { - logger.error({ msg: 'Failed to process Matrix membership event', err }); - } - }); + }, + (err: Error) => logger.error({ msg: 'Failed to process Matrix membership event', err }), + ); } diff --git a/ee/packages/federation-matrix/src/events/message.ts b/ee/packages/federation-matrix/src/events/message.ts index ca619b0c3feb6..f1659a5b6cd7c 100644 --- a/ee/packages/federation-matrix/src/events/message.ts +++ b/ee/packages/federation-matrix/src/events/message.ts @@ -1,6 +1,13 @@ import { FederationMatrix, Message, MeteorService } from '@rocket.chat/core-services'; import type { IUser, IRoom, FileAttachmentProps } from '@rocket.chat/core-typings'; -import { type FileMessageType, type MessageType, type FileMessageContent, type EventID, federationSDK } from '@rocket.chat/federation-sdk'; +import { + type FileMessageType, + type MessageType, + type FileMessageContent, + type EventID, + federationSDK, + type HomeserverEventSignatures, +} from '@rocket.chat/federation-sdk'; import { Logger } from '@rocket.chat/logger'; import { Users, Rooms, Messages } from '@rocket.chat/models'; @@ -111,8 +118,9 @@ async function handleMediaMessage( } export function message() { - federationSDK.eventEmitterService.on('homeserver.matrix.message', async ({ event, event_id: eventId }) => { - try { + federationSDK.eventEmitterService.on( + 'homeserver.matrix.message', + async ({ event, event_id: eventId }: HomeserverEventSignatures['homeserver.matrix.message']) => { const { msgtype, body } = event.content; const messageBody = body.toString(); @@ -261,13 +269,13 @@ export function message() { ts: new Date(event.origin_server_ts), }); } - } catch (err) { - logger.error({ msg: 'Error processing Matrix message', err }); - } - }); + }, + (err: Error) => logger.error({ msg: 'Error processing Matrix message', err }), + ); - federationSDK.eventEmitterService.on('homeserver.matrix.encrypted', async ({ event, event_id: eventId }) => { - try { + federationSDK.eventEmitterService.on( + 'homeserver.matrix.encrypted', + async ({ event, event_id: eventId }: HomeserverEventSignatures['homeserver.matrix.encrypted']) => { if (!event.content.ciphertext) { logger.debug('No message content found in event'); return; @@ -377,13 +385,13 @@ export function message() { thread, ts: new Date(event.origin_server_ts), }); - } catch (err) { - logger.error({ msg: 'Error processing Matrix message', err }); - } - }); + }, + (err: Error) => logger.error({ msg: 'Error processing Matrix message', err }), + ); - federationSDK.eventEmitterService.on('homeserver.matrix.redaction', async ({ event }) => { - try { + federationSDK.eventEmitterService.on( + 'homeserver.matrix.redaction', + async ({ event }: HomeserverEventSignatures['homeserver.matrix.redaction']) => { const redactedEventId = event.redacts; if (!redactedEventId) { logger.debug('No redacts field in redaction event'); @@ -409,8 +417,7 @@ export function message() { } await Message.deleteMessage(user, rcMessage); - } catch (err) { - logger.error({ msg: 'Failed to process Matrix removal redaction', err }); - } - }); + }, + (err: Error) => logger.error({ msg: 'Failed to process Matrix removal redaction', err }), + ); } diff --git a/ee/packages/federation-matrix/src/events/ping.ts b/ee/packages/federation-matrix/src/events/ping.ts index 204a49fdf891d..9142bb2405fa4 100644 --- a/ee/packages/federation-matrix/src/events/ping.ts +++ b/ee/packages/federation-matrix/src/events/ping.ts @@ -1,7 +1,14 @@ -import { federationSDK } from '@rocket.chat/federation-sdk'; +import { federationSDK, type HomeserverEventSignatures } from '@rocket.chat/federation-sdk'; +import { Logger } from '@rocket.chat/logger'; + +const logger = new Logger('federation-matrix:ping'); export const ping = async () => { - federationSDK.eventEmitterService.on('homeserver.ping', async (data) => { - console.log('Message received from homeserver', data); - }); + federationSDK.eventEmitterService.on( + 'homeserver.ping', + async (data: HomeserverEventSignatures['homeserver.ping']) => { + logger.debug({ msg: 'Message received from homeserver', data }); + }, + (err: Error) => logger.error({ msg: 'Error handling homeserver ping', err }), + ); }; diff --git a/ee/packages/federation-matrix/src/events/reaction.ts b/ee/packages/federation-matrix/src/events/reaction.ts index 2871f0d9f5409..a1aa80e4714ad 100644 --- a/ee/packages/federation-matrix/src/events/reaction.ts +++ b/ee/packages/federation-matrix/src/events/reaction.ts @@ -1,5 +1,5 @@ import { Message, FederationMatrix } from '@rocket.chat/core-services'; -import { federationSDK } from '@rocket.chat/federation-sdk'; +import { federationSDK, type HomeserverEventSignatures } from '@rocket.chat/federation-sdk'; import { Logger } from '@rocket.chat/logger'; import { Users, Messages } from '@rocket.chat/models'; // Rooms import emojione from 'emojione'; @@ -7,8 +7,9 @@ import emojione from 'emojione'; const logger = new Logger('federation-matrix:reaction'); export function reaction() { - federationSDK.eventEmitterService.on('homeserver.matrix.reaction', async ({ event, event_id: eventId }) => { - try { + federationSDK.eventEmitterService.on( + 'homeserver.matrix.reaction', + async ({ event, event_id: eventId }: HomeserverEventSignatures['homeserver.matrix.reaction']) => { const isSetReaction = event.content?.['m.relates_to']; const reactionTargetEventId = isSetReaction?.event_id; @@ -41,13 +42,13 @@ export function reaction() { const reactionEmoji = emojione.toShort(reactionKey); await Message.reactToMessage(user._id, reactionEmoji, rcMessage._id, true); await Messages.setFederationReactionEventId(internalUsername, rcMessage._id, reactionEmoji, eventId); - } catch (err) { - logger.error({ msg: 'Failed to process Matrix reaction', err }); - } - }); + }, + (err: Error) => logger.error({ msg: 'Failed to process Matrix reaction', err }), + ); - federationSDK.eventEmitterService.on('homeserver.matrix.redaction', async ({ event }) => { - try { + federationSDK.eventEmitterService.on( + 'homeserver.matrix.redaction', + async ({ event }: HomeserverEventSignatures['homeserver.matrix.redaction']) => { const redactedEventId = event.redacts; if (!redactedEventId) { logger.debug('No redacts field in redaction event'); @@ -85,8 +86,7 @@ export function reaction() { const reactionEmoji = emojione.toShort(reactionKey); await Message.reactToMessage(user._id, reactionEmoji, rcMessage._id, false); await Messages.unsetFederationReactionEventId(redactedEventId, rcMessage._id, reactionEmoji); - } catch (err) { - logger.error({ msg: 'Failed to process Matrix reaction redaction', err }); - } - }); + }, + (err: Error) => logger.error({ msg: 'Failed to process Matrix reaction redaction', err }), + ); } diff --git a/ee/packages/federation-matrix/src/events/room.ts b/ee/packages/federation-matrix/src/events/room.ts index 23b3688fd9373..101cd90e7a73b 100644 --- a/ee/packages/federation-matrix/src/events/room.ts +++ b/ee/packages/federation-matrix/src/events/room.ts @@ -1,87 +1,102 @@ import { Room } from '@rocket.chat/core-services'; -import { federationSDK } from '@rocket.chat/federation-sdk'; +import { federationSDK, type HomeserverEventSignatures } from '@rocket.chat/federation-sdk'; +import { Logger } from '@rocket.chat/logger'; import { Rooms, Users } from '@rocket.chat/models'; import { getUsernameServername } from '../helpers/getUsernameServername'; +const logger = new Logger('federation-matrix:room'); + export function room() { - federationSDK.eventEmitterService.on('homeserver.matrix.room.name', async ({ event }) => { - const { - room_id: roomId, - content: { name }, - sender: userId, - } = event; - - const localRoomId = await Rooms.findOne({ 'federation.mrid': roomId }, { projection: { _id: 1 } }); - if (!localRoomId) { - throw new Error(`mapped room not found: ${roomId}`); - } - - const localUserId = await Users.findOneByUsername(userId, { projection: { _id: 1 } }); - if (!localUserId) { - throw new Error(`mapped user not found: ${userId}`); - } - - await Room.saveRoomName(localRoomId._id, localUserId._id, name); - }); - - federationSDK.eventEmitterService.on('homeserver.matrix.room.topic', async ({ event }) => { - const { - room_id: roomId, - content: { topic }, - sender: userId, - } = event; - - const localRoomId = await Rooms.findOne({ 'federation.mrid': roomId }, { projection: { _id: 1 } }); - if (!localRoomId) { - throw new Error('mapped room not found'); - } - - const localUser = await Users.findOneByUsername(userId, { projection: { _id: 1, federation: 1, federated: 1 } }); - if (!localUser) { - throw new Error('mapped user not found'); - } - - await Room.saveRoomTopic(localRoomId._id, topic, { - _id: localUser._id, - username: userId, - federation: localUser.federation, - federated: localUser.federated, - }); - }); - - federationSDK.eventEmitterService.on('homeserver.matrix.room.role', async (data) => { - const { room_id: roomId, user_id: userId, sender_id: senderId, role } = data; - - const localRoomId = await Rooms.findOne({ 'federation.mrid': roomId }, { projection: { _id: 1 } }); - if (!localRoomId) { - throw new Error('mapped room not found'); - } - - const serverName = federationSDK.getConfig('serverName'); - - const [allegedUsernameLocal, , allegedUserLocalIsLocal] = getUsernameServername(userId, serverName); - const localUserId = allegedUserLocalIsLocal && (await Users.findOneByUsername(allegedUsernameLocal, { projection: { _id: 1 } })); - - if (!allegedUserLocalIsLocal) { - return; - } - - if (!localUserId) { - throw new Error('mapped user not found'); - } - - const [senderUsername, , senderIsLocal] = getUsernameServername(senderId, serverName); - - if (senderIsLocal) { - return; - } - - const localSenderId = await Users.findOneByUsername(senderUsername, { projection: { _id: 1 } }); - if (!localSenderId) { - throw new Error('mapped user not found'); - } - - await Room.addUserRoleRoomScoped(localSenderId._id, localUserId._id, localRoomId._id, role); - }); + federationSDK.eventEmitterService.on( + 'homeserver.matrix.room.name', + async ({ event }: HomeserverEventSignatures['homeserver.matrix.room.name']) => { + const { + room_id: roomId, + content: { name }, + sender: userId, + } = event; + + const localRoomId = await Rooms.findOne({ 'federation.mrid': roomId }, { projection: { _id: 1 } }); + if (!localRoomId) { + throw new Error(`mapped room not found: ${roomId}`); + } + + const localUserId = await Users.findOneByUsername(userId, { projection: { _id: 1 } }); + if (!localUserId) { + throw new Error(`mapped user not found: ${userId}`); + } + + await Room.saveRoomName(localRoomId._id, localUserId._id, name); + }, + (err: Error) => logger.error({ msg: 'Failed to process Matrix room name event', err }), + ); + + federationSDK.eventEmitterService.on( + 'homeserver.matrix.room.topic', + async ({ event }: HomeserverEventSignatures['homeserver.matrix.room.topic']) => { + const { + room_id: roomId, + content: { topic }, + sender: userId, + } = event; + + const localRoomId = await Rooms.findOne({ 'federation.mrid': roomId }, { projection: { _id: 1 } }); + if (!localRoomId) { + throw new Error('mapped room not found'); + } + + const localUser = await Users.findOneByUsername(userId, { projection: { _id: 1, federation: 1, federated: 1 } }); + if (!localUser) { + throw new Error('mapped user not found'); + } + + await Room.saveRoomTopic(localRoomId._id, topic, { + _id: localUser._id, + username: userId, + federation: localUser.federation, + federated: localUser.federated, + }); + }, + (err: Error) => logger.error({ msg: 'Failed to process Matrix room topic event', err }), + ); + + federationSDK.eventEmitterService.on( + 'homeserver.matrix.room.role', + async (data: HomeserverEventSignatures['homeserver.matrix.room.role']) => { + const { room_id: roomId, user_id: userId, sender_id: senderId, role } = data; + + const localRoomId = await Rooms.findOne({ 'federation.mrid': roomId }, { projection: { _id: 1 } }); + if (!localRoomId) { + throw new Error('mapped room not found'); + } + + const serverName = federationSDK.getConfig('serverName'); + + const [allegedUsernameLocal, , allegedUserLocalIsLocal] = getUsernameServername(userId, serverName); + const localUserId = allegedUserLocalIsLocal && (await Users.findOneByUsername(allegedUsernameLocal, { projection: { _id: 1 } })); + + if (!allegedUserLocalIsLocal) { + return; + } + + if (!localUserId) { + throw new Error('mapped user not found'); + } + + const [senderUsername, , senderIsLocal] = getUsernameServername(senderId, serverName); + + if (senderIsLocal) { + return; + } + + const localSenderId = await Users.findOneByUsername(senderUsername, { projection: { _id: 1 } }); + if (!localSenderId) { + throw new Error('mapped user not found'); + } + + await Room.addUserRoleRoomScoped(localSenderId._id, localUserId._id, localRoomId._id, role); + }, + (err: Error) => logger.error({ msg: 'Failed to process Matrix room role event', err }), + ); }