Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions hrm-domain/lambdas/search-index-consumer/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ export const handler = async (event: SQSEvent): Promise<SQSBatchResponse> => {
console.warn(
`[generalised-search-${indexType}] ${result.error}. Account SID: ${accountSid}, Message ID: ${messageId}.`,
result.message,
result,
);
documentsWithErrors.push(resultItem);
return;
Expand Down
83 changes: 56 additions & 27 deletions hrm-domain/lambdas/search-index-consumer/payloadToIndex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@
* along with this program. If not, see https://www.gnu.org/licenses/.
*/

import { type IndexClient, getClient } from '@tech-matters/elasticsearch-client';
import {
type IndexClient,
type IndexDocumentResponse,
type UpdateDocumentResponse,
getClient,
} from '@tech-matters/elasticsearch-client';
import {
type IndexPayload,
hrmIndexConfiguration,
Expand All @@ -24,8 +29,29 @@ import type {
PayloadsByAccountSid,
PayloadsByIndex,
} from './messagesToPayloads';
import { type HrmAccountId, newErr, newOkFromData, isErr } from '@tech-matters/types';
import { HrmIndexProcessorError } from '@tech-matters/job-errors';
import { type HrmAccountId, newErr, isErr } from '@tech-matters/types';

const tryOperationWithRetry = async ({
callback,
retryCount = 0,
}: {
callback: () => Promise<IndexDocumentResponse> | Promise<UpdateDocumentResponse>;
retryCount?: number;
}) => {
const result = await callback();
if (isErr(result) && retryCount < 3) {
if (
result.extraProperties?.statusCode === 409 &&
(result.extraProperties.originalError as Error).message?.includes(
'version_conflict_engine_exception',
)
) {
return tryOperationWithRetry({ callback, retryCount: retryCount + 1 });
}
}

return result;
};

const handleIndexPayload =
({
Expand All @@ -42,47 +68,56 @@ const handleIndexPayload =
try {
switch (payloadWithMeta.indexHandler) {
case 'indexDocument': {
const result = await client.indexDocument({
id: documentId.toString(),
document: payloadWithMeta.payload,
autocreate: true,
const result = await tryOperationWithRetry({
callback: () =>
client.indexDocument({
id: documentId.toString(),
document: payloadWithMeta.payload,
autocreate: true,
}),
});

return {
accountSid,
indexType,
messageId,
result: newOkFromData(result),
result,
};
}
case 'updateDocument': {
const result = await client.updateDocument({
id: documentId.toString(),
document: payloadWithMeta.payload,
autocreate: true,
docAsUpsert: true,
const result = await tryOperationWithRetry({
callback: () =>
client.updateDocument({
id: documentId.toString(),
document: payloadWithMeta.payload,
autocreate: true,
docAsUpsert: true,
}),
});

return {
accountSid,
indexType,
messageId,
result: newOkFromData(result),
result,
};
}
case 'updateScript': {
const result = await client.updateScript({
document: payloadWithMeta.payload,
id: documentId.toString(),
autocreate: true,
scriptedUpsert: true,
const result = await tryOperationWithRetry({
callback: () =>
client.updateScript({
document: payloadWithMeta.payload,
id: documentId.toString(),
autocreate: true,
scriptedUpsert: true,
}),
});

return {
accountSid,
indexType,
messageId,
result: newOkFromData(result),
result,
};
}
case 'deleteDocument': {
Expand All @@ -106,17 +141,11 @@ const handleIndexPayload =
accountSid,
indexType,
messageId,
result: newOkFromData(result),
result,
};
}
}
} catch (err) {
console.error(
new HrmIndexProcessorError('handleIndexPayload: Failed to process index request'),
err,
payloadWithMeta,
);

return {
accountSid,
indexType,
Expand Down
2 changes: 2 additions & 0 deletions packages/elasticsearch-client/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ export * from './src/client';
export * from './src/createIndex';
export * from './src/deleteIndex';
export * from './src/indexDocument';
export * from './src/updateDocument';
export * from './src/deleteDocument';
export * from './src/executeBulk';
export * from './src/search';
export * from './src/suggest';
Expand Down
34 changes: 22 additions & 12 deletions packages/elasticsearch-client/src/indexDocument.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import { IndexResponse } from '@elastic/elasticsearch/lib/api/types';
import { PassThroughConfig } from './client';
import createIndex from './createIndex';
import { newErr, newOk, TResult } from '@tech-matters/types';

export type IndexDocumentExtraParams<T> = {
id: string;
Expand All @@ -24,7 +25,7 @@ export type IndexDocumentExtraParams<T> = {
};

export type IndexDocumentParams<T> = PassThroughConfig<T> & IndexDocumentExtraParams<T>;
export type IndexDocumentResponse = IndexResponse;
export type IndexDocumentResponse = TResult<'IndexDocumentError', IndexResponse>;

export const indexDocument = async <T>({
client,
Expand All @@ -34,19 +35,28 @@ export const indexDocument = async <T>({
indexConfig,
autocreate = false,
}: IndexDocumentParams<T>): Promise<IndexDocumentResponse> => {
if (autocreate) {
// const exists = await client.indices.exists({ index });
// NOTE: above check is already performed in createIndex
await createIndex({ client, index, indexConfig });
}
try {
if (autocreate) {
// const exists = await client.indices.exists({ index });
// NOTE: above check is already performed in createIndex
await createIndex({ client, index, indexConfig });
}

const convertedDocument = indexConfig.convertToIndexDocument(document, index);
const convertedDocument = indexConfig.convertToIndexDocument(document, index);

return client.index({
index,
id,
document: convertedDocument,
});
const response = await client.index({
index,
id,
document: convertedDocument,
});
return newOk({ data: response });
} catch (error) {
return newErr({
error: 'IndexDocumentError',
message: error instanceof Error ? error.message : String(error),
extraProperties: { ...(error as any)?.meta, originalError: error },
});
}
};

export default indexDocument;
85 changes: 53 additions & 32 deletions packages/elasticsearch-client/src/updateDocument.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import { UpdateResponse } from '@elastic/elasticsearch/lib/api/types';
import { PassThroughConfig } from './client';
import createIndex from './createIndex';
import { newErr, newOk, TResult } from '@tech-matters/types';

type UpdateParams<T> = { id: string; document: T; autocreate?: boolean };

Expand All @@ -24,7 +25,7 @@ export type UpdateDocumentExtraParams<T> = UpdateParams<T> & {
};

export type UpdateDocumentParams<T> = PassThroughConfig<T> & UpdateDocumentExtraParams<T>;
export type UpdateDocumentResponse = UpdateResponse;
export type UpdateDocumentResponse = TResult<'UpdateDocumentError', UpdateResponse>;

export const updateDocument = async <T>({
client,
Expand All @@ -35,20 +36,30 @@ export const updateDocument = async <T>({
docAsUpsert = false,
autocreate = false,
}: UpdateDocumentParams<T>): Promise<UpdateDocumentResponse> => {
if (docAsUpsert && autocreate) {
// const exists = await client.indices.exists({ index });
// NOTE: above check is already performed in createIndex
await createIndex({ client, index, indexConfig });
}
try {
if (docAsUpsert && autocreate) {
// const exists = await client.indices.exists({ index });
// NOTE: above check is already performed in createIndex
await createIndex({ client, index, indexConfig });
}

const documentUpdate = indexConfig.convertToIndexDocument(document, index);

const documentUpdate = indexConfig.convertToIndexDocument(document, index);
const response = await client.update({
index,
id,
doc: documentUpdate,
doc_as_upsert: docAsUpsert,
});

return client.update({
index,
id,
doc: documentUpdate,
doc_as_upsert: docAsUpsert,
});
return newOk({ data: response });
} catch (error) {
return newErr({
error: 'UpdateDocumentError',
message: error instanceof Error ? error.message : String(error),
extraProperties: { ...(error as any)?.meta, originalError: error },
});
}
};

export type UpdateScriptExtraParams<T> = UpdateParams<T> & { scriptedUpsert?: boolean };
Expand All @@ -63,26 +74,36 @@ export const updateScript = async <T>({
scriptedUpsert = false,
autocreate = false,
}: UpdateScriptParams<T>): Promise<UpdateDocumentResponse> => {
if (!indexConfig.convertToScriptUpdate) {
throw new Error(`updateScript error: convertToScriptDocument not provided`);
}
try {
if (!indexConfig.convertToScriptUpdate) {
throw new Error(`updateScript error: convertToScriptDocument not provided`);
}

if (scriptedUpsert && autocreate) {
// const exists = await client.indices.exists({ index });
// NOTE: above check is already performed in createIndex
await createIndex({ client, index, indexConfig });
}
if (scriptedUpsert && autocreate) {
// const exists = await client.indices.exists({ index });
// NOTE: above check is already performed in createIndex
await createIndex({ client, index, indexConfig });
}

const { documentUpdate, scriptUpdate } = indexConfig.convertToScriptUpdate(
document,
index,
);

const { documentUpdate, scriptUpdate } = indexConfig.convertToScriptUpdate(
document,
index,
);
const response = await client.update({
index,
id,
script: scriptUpdate,
upsert: documentUpdate,
scripted_upsert: scriptedUpsert,
});

return client.update({
index,
id,
script: scriptUpdate,
upsert: documentUpdate,
scripted_upsert: scriptedUpsert,
});
return newOk({ data: response });
} catch (error) {
return newErr({
error: 'UpdateDocumentError',
message: error instanceof Error ? error.message : String(error),
extraProperties: { ...(error as any)?.meta, originalError: error },
});
}
};
Loading