diff --git a/backend/workers/workflows/externalPushEntityWorkflow.ts b/backend/workers/workflows/externalPushEntityWorkflow.ts index 42e61ad4..8c4c4c2a 100644 --- a/backend/workers/workflows/externalPushEntityWorkflow.ts +++ b/backend/workers/workflows/externalPushEntityWorkflow.ts @@ -1,3 +1,4 @@ +import * as PgDrizzle from '@effect/sql-drizzle/Pg' import { Activity, Workflow } from '@effect/workflow' import { processMutation } from '@openfaith/adapter-core/chains/syncEngine.chain' import { @@ -11,10 +12,20 @@ import { RelationshipProcessingError, TokenKey, } from '@openfaith/adapter-core/server' +import { adapterDetailsTable } from '@openfaith/db' import { CRUDMutation, CRUDOp } from '@openfaith/domain' import { PcoAdapterManagerLayer } from '@openfaith/pco/server' import { InternalManagerLive } from '@openfaith/server/live/internalManagerLive' -import { Effect, Layer, Schema } from 'effect' +import { and, eq } from 'drizzle-orm' +import { Array, Effect, Layer, pipe, Schema } from 'effect' + +// Registry of adapter layers indexed by adapter name. +// Add new adapters here as they become available. +const adapterLayerRegistry = { + pco: PcoAdapterManagerLayer, +} as const + +type AdapterName = keyof typeof adapterLayerRegistry // Define the workflow payload schema const ExternalPushEntityPayload = Schema.Struct({ @@ -69,6 +80,7 @@ export const ExternalPushEntityWorkflowLayer = ExternalPushEntityWorkflow.toLaye ), execute: Effect.gen(function* () { const attempt = yield* Activity.CurrentAttempt + const db = yield* PgDrizzle.PgDrizzle yield* Effect.annotateLogs( Effect.log(`📊 Syncing external data for entity: ${entityName}`), @@ -81,14 +93,82 @@ export const ExternalPushEntityWorkflowLayer = ExternalPushEntityWorkflow.toLaye }, ) - // Process each mutation using the new chain function - no error mapping needed! - yield* Effect.forEach(mutations, (mutationData) => processMutation(mutationData.op), { - concurrency: 'unbounded', + // Query all enabled adapters for this organization + const enabledAdapters = yield* db + .select({ adapter: adapterDetailsTable.adapter }) + .from(adapterDetailsTable) + .where( + and( + eq(adapterDetailsTable.orgId, tokenKey), + eq(adapterDetailsTable.enabled, true), + ), + ) + .pipe( + Effect.tapError((error) => + Effect.logError('Failed to query enabled adapters, skipping external push', { + error, + entityName, + tokenKey, + }), + ), + Effect.orElse(() => Effect.succeed([] as Array<{ adapter: string }>)), + ) + + yield* Effect.log('Found enabled adapters for org', { + adapters: pipe( + enabledAdapters, + Array.map((a) => a.adapter), + ), + entityName, + orgId: tokenKey, }) - }).pipe( - Effect.provide(Layer.mergeAll(PcoAdapterManagerLayer, InternalManagerLive)), - Effect.provideService(TokenKey, tokenKey), - ), + + if (enabledAdapters.length === 0) { + yield* Effect.log('No enabled adapters found, skipping external push', { + entityName, + orgId: tokenKey, + }) + return + } + + // Send mutations to each enabled adapter that has a registered layer + yield* Effect.forEach( + enabledAdapters, + ({ adapter }) => + Effect.gen(function* () { + const adapterLayer = adapterLayerRegistry[adapter as AdapterName] + + if (!adapterLayer) { + yield* Effect.log( + `No layer registered for adapter: ${adapter}, skipping`, + { + adapter, + entityName, + tokenKey, + }, + ) + return + } + + yield* Effect.log(`Processing mutations for adapter: ${adapter}`, { + adapter, + entityName, + mutationCount: mutations.length, + }) + + // Process each mutation op through this adapter + yield* Effect.forEach( + mutations, + (mutationData) => processMutation(mutationData.op), + { concurrency: 'unbounded' }, + ).pipe( + Effect.provide(Layer.mergeAll(adapterLayer, InternalManagerLive)), + Effect.provideService(TokenKey, tokenKey), + ) + }), + { concurrency: 'unbounded' }, + ) + }), name: 'SyncExternalEntityData', }).pipe(Activity.retry({ times: 3 }))