From f6b125a2491d753c0e809e29dd3b7730bd9e94cd Mon Sep 17 00:00:00 2001 From: Gianfranco Paoloni Date: Tue, 10 Feb 2026 11:11:35 -0300 Subject: [PATCH 1/2] chore: wrap elasticsearch index & update operations in result type --- packages/elasticsearch-client/index.d.ts | 2 + .../elasticsearch-client/src/indexDocument.ts | 34 +++++--- .../src/updateDocument.ts | 85 ++++++++++++------- 3 files changed, 77 insertions(+), 44 deletions(-) diff --git a/packages/elasticsearch-client/index.d.ts b/packages/elasticsearch-client/index.d.ts index 49973c9d8..e1dafeb81 100644 --- a/packages/elasticsearch-client/index.d.ts +++ b/packages/elasticsearch-client/index.d.ts @@ -25,6 +25,8 @@ export * from './src/client'; export * from './src/createIndex'; export * from './src/deleteIndex'; export * from './src/indexDocument'; +export * from './src/updateDocument'; +export * from './src/deleteDocument'; export * from './src/executeBulk'; export * from './src/search'; export * from './src/suggest'; diff --git a/packages/elasticsearch-client/src/indexDocument.ts b/packages/elasticsearch-client/src/indexDocument.ts index 12c7b5e54..e2645260f 100644 --- a/packages/elasticsearch-client/src/indexDocument.ts +++ b/packages/elasticsearch-client/src/indexDocument.ts @@ -16,6 +16,7 @@ import { IndexResponse } from '@elastic/elasticsearch/lib/api/types'; import { PassThroughConfig } from './client'; import createIndex from './createIndex'; +import { newErr, newOk, TResult } from '@tech-matters/types'; export type IndexDocumentExtraParams = { id: string; @@ -24,7 +25,7 @@ export type IndexDocumentExtraParams = { }; export type IndexDocumentParams = PassThroughConfig & IndexDocumentExtraParams; -export type IndexDocumentResponse = IndexResponse; +export type IndexDocumentResponse = TResult<'IndexDocumentError', IndexResponse>; export const indexDocument = async ({ client, @@ -34,19 +35,28 @@ export const indexDocument = async ({ indexConfig, autocreate = false, }: IndexDocumentParams): Promise => { - if (autocreate) { - // const exists = await client.indices.exists({ index }); - // NOTE: above check is already performed in createIndex - await createIndex({ client, index, indexConfig }); - } + try { + if (autocreate) { + // const exists = await client.indices.exists({ index }); + // NOTE: above check is already performed in createIndex + await createIndex({ client, index, indexConfig }); + } - const convertedDocument = indexConfig.convertToIndexDocument(document, index); + const convertedDocument = indexConfig.convertToIndexDocument(document, index); - return client.index({ - index, - id, - document: convertedDocument, - }); + const response = await client.index({ + index, + id, + document: convertedDocument, + }); + return newOk({ data: response }); + } catch (error) { + return newErr({ + error: 'IndexDocumentError', + message: error instanceof Error ? error.message : String(error), + extraProperties: { ...(error as any)?.meta, originalError: error }, + }); + } }; export default indexDocument; diff --git a/packages/elasticsearch-client/src/updateDocument.ts b/packages/elasticsearch-client/src/updateDocument.ts index 7dba5b051..7961d3518 100644 --- a/packages/elasticsearch-client/src/updateDocument.ts +++ b/packages/elasticsearch-client/src/updateDocument.ts @@ -16,6 +16,7 @@ import { UpdateResponse } from '@elastic/elasticsearch/lib/api/types'; import { PassThroughConfig } from './client'; import createIndex from './createIndex'; +import { newErr, newOk, TResult } from '@tech-matters/types'; type UpdateParams = { id: string; document: T; autocreate?: boolean }; @@ -24,7 +25,7 @@ export type UpdateDocumentExtraParams = UpdateParams & { }; export type UpdateDocumentParams = PassThroughConfig & UpdateDocumentExtraParams; -export type UpdateDocumentResponse = UpdateResponse; +export type UpdateDocumentResponse = TResult<'UpdateDocumentError', UpdateResponse>; export const updateDocument = async ({ client, @@ -35,20 +36,30 @@ export const updateDocument = async ({ docAsUpsert = false, autocreate = false, }: UpdateDocumentParams): Promise => { - if (docAsUpsert && autocreate) { - // const exists = await client.indices.exists({ index }); - // NOTE: above check is already performed in createIndex - await createIndex({ client, index, indexConfig }); - } + try { + if (docAsUpsert && autocreate) { + // const exists = await client.indices.exists({ index }); + // NOTE: above check is already performed in createIndex + await createIndex({ client, index, indexConfig }); + } + + const documentUpdate = indexConfig.convertToIndexDocument(document, index); - const documentUpdate = indexConfig.convertToIndexDocument(document, index); + const response = await client.update({ + index, + id, + doc: documentUpdate, + doc_as_upsert: docAsUpsert, + }); - return client.update({ - index, - id, - doc: documentUpdate, - doc_as_upsert: docAsUpsert, - }); + return newOk({ data: response }); + } catch (error) { + return newErr({ + error: 'UpdateDocumentError', + message: error instanceof Error ? error.message : String(error), + extraProperties: { ...(error as any)?.meta, originalError: error }, + }); + } }; export type UpdateScriptExtraParams = UpdateParams & { scriptedUpsert?: boolean }; @@ -63,26 +74,36 @@ export const updateScript = async ({ scriptedUpsert = false, autocreate = false, }: UpdateScriptParams): Promise => { - if (!indexConfig.convertToScriptUpdate) { - throw new Error(`updateScript error: convertToScriptDocument not provided`); - } + try { + if (!indexConfig.convertToScriptUpdate) { + throw new Error(`updateScript error: convertToScriptDocument not provided`); + } - if (scriptedUpsert && autocreate) { - // const exists = await client.indices.exists({ index }); - // NOTE: above check is already performed in createIndex - await createIndex({ client, index, indexConfig }); - } + if (scriptedUpsert && autocreate) { + // const exists = await client.indices.exists({ index }); + // NOTE: above check is already performed in createIndex + await createIndex({ client, index, indexConfig }); + } + + const { documentUpdate, scriptUpdate } = indexConfig.convertToScriptUpdate( + document, + index, + ); - const { documentUpdate, scriptUpdate } = indexConfig.convertToScriptUpdate( - document, - index, - ); + const response = await client.update({ + index, + id, + script: scriptUpdate, + upsert: documentUpdate, + scripted_upsert: scriptedUpsert, + }); - return client.update({ - index, - id, - script: scriptUpdate, - upsert: documentUpdate, - scripted_upsert: scriptedUpsert, - }); + return newOk({ data: response }); + } catch (error) { + return newErr({ + error: 'UpdateDocumentError', + message: error instanceof Error ? error.message : String(error), + extraProperties: { ...(error as any)?.meta, originalError: error }, + }); + } }; From d750848832a2adaaf077678a1cd42ddcabc12dce Mon Sep 17 00:00:00 2001 From: Gianfranco Paoloni Date: Tue, 10 Feb 2026 11:12:45 -0300 Subject: [PATCH 2/2] chore: search-index-consumer - handle version conflict race condition --- .../lambdas/search-index-consumer/index.ts | 1 + .../search-index-consumer/payloadToIndex.ts | 83 +++++++++++++------ 2 files changed, 57 insertions(+), 27 deletions(-) diff --git a/hrm-domain/lambdas/search-index-consumer/index.ts b/hrm-domain/lambdas/search-index-consumer/index.ts index f986b3659..a90793d97 100644 --- a/hrm-domain/lambdas/search-index-consumer/index.ts +++ b/hrm-domain/lambdas/search-index-consumer/index.ts @@ -43,6 +43,7 @@ export const handler = async (event: SQSEvent): Promise => { console.warn( `[generalised-search-${indexType}] ${result.error}. Account SID: ${accountSid}, Message ID: ${messageId}.`, result.message, + result, ); documentsWithErrors.push(resultItem); return; diff --git a/hrm-domain/lambdas/search-index-consumer/payloadToIndex.ts b/hrm-domain/lambdas/search-index-consumer/payloadToIndex.ts index 7317f22f6..98d375f01 100644 --- a/hrm-domain/lambdas/search-index-consumer/payloadToIndex.ts +++ b/hrm-domain/lambdas/search-index-consumer/payloadToIndex.ts @@ -14,7 +14,12 @@ * along with this program. If not, see https://www.gnu.org/licenses/. */ -import { type IndexClient, getClient } from '@tech-matters/elasticsearch-client'; +import { + type IndexClient, + type IndexDocumentResponse, + type UpdateDocumentResponse, + getClient, +} from '@tech-matters/elasticsearch-client'; import { type IndexPayload, hrmIndexConfiguration, @@ -24,8 +29,29 @@ import type { PayloadsByAccountSid, PayloadsByIndex, } from './messagesToPayloads'; -import { type HrmAccountId, newErr, newOkFromData, isErr } from '@tech-matters/types'; -import { HrmIndexProcessorError } from '@tech-matters/job-errors'; +import { type HrmAccountId, newErr, isErr } from '@tech-matters/types'; + +const tryOperationWithRetry = async ({ + callback, + retryCount = 0, +}: { + callback: () => Promise | Promise; + retryCount?: number; +}) => { + const result = await callback(); + if (isErr(result) && retryCount < 3) { + if ( + result.extraProperties?.statusCode === 409 && + (result.extraProperties.originalError as Error).message?.includes( + 'version_conflict_engine_exception', + ) + ) { + return tryOperationWithRetry({ callback, retryCount: retryCount + 1 }); + } + } + + return result; +}; const handleIndexPayload = ({ @@ -42,47 +68,56 @@ const handleIndexPayload = try { switch (payloadWithMeta.indexHandler) { case 'indexDocument': { - const result = await client.indexDocument({ - id: documentId.toString(), - document: payloadWithMeta.payload, - autocreate: true, + const result = await tryOperationWithRetry({ + callback: () => + client.indexDocument({ + id: documentId.toString(), + document: payloadWithMeta.payload, + autocreate: true, + }), }); return { accountSid, indexType, messageId, - result: newOkFromData(result), + result, }; } case 'updateDocument': { - const result = await client.updateDocument({ - id: documentId.toString(), - document: payloadWithMeta.payload, - autocreate: true, - docAsUpsert: true, + const result = await tryOperationWithRetry({ + callback: () => + client.updateDocument({ + id: documentId.toString(), + document: payloadWithMeta.payload, + autocreate: true, + docAsUpsert: true, + }), }); return { accountSid, indexType, messageId, - result: newOkFromData(result), + result, }; } case 'updateScript': { - const result = await client.updateScript({ - document: payloadWithMeta.payload, - id: documentId.toString(), - autocreate: true, - scriptedUpsert: true, + const result = await tryOperationWithRetry({ + callback: () => + client.updateScript({ + document: payloadWithMeta.payload, + id: documentId.toString(), + autocreate: true, + scriptedUpsert: true, + }), }); return { accountSid, indexType, messageId, - result: newOkFromData(result), + result, }; } case 'deleteDocument': { @@ -106,17 +141,11 @@ const handleIndexPayload = accountSid, indexType, messageId, - result: newOkFromData(result), + result, }; } } } catch (err) { - console.error( - new HrmIndexProcessorError('handleIndexPayload: Failed to process index request'), - err, - payloadWithMeta, - ); - return { accountSid, indexType,