Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 88 additions & 8 deletions backend/workers/workflows/externalPushEntityWorkflow.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import * as PgDrizzle from '@effect/sql-drizzle/Pg'
import { Activity, Workflow } from '@effect/workflow'
import { processMutation } from '@openfaith/adapter-core/chains/syncEngine.chain'
import {
Expand All @@ -11,10 +12,20 @@ import {
RelationshipProcessingError,
TokenKey,
} from '@openfaith/adapter-core/server'
import { adapterDetailsTable } from '@openfaith/db'
import { CRUDMutation, CRUDOp } from '@openfaith/domain'
import { PcoAdapterManagerLayer } from '@openfaith/pco/server'
import { InternalManagerLive } from '@openfaith/server/live/internalManagerLive'
import { Effect, Layer, Schema } from 'effect'
import { and, eq } from 'drizzle-orm'
import { Array, Effect, Layer, pipe, Schema } from 'effect'

// Registry of adapter layers indexed by adapter name.
// Add new adapters here as they become available.
const adapterLayerRegistry = {
pco: PcoAdapterManagerLayer,
} as const

type AdapterName = keyof typeof adapterLayerRegistry

// Define the workflow payload schema
const ExternalPushEntityPayload = Schema.Struct({
Expand Down Expand Up @@ -69,6 +80,7 @@ export const ExternalPushEntityWorkflowLayer = ExternalPushEntityWorkflow.toLaye
),
execute: Effect.gen(function* () {
const attempt = yield* Activity.CurrentAttempt
const db = yield* PgDrizzle.PgDrizzle

yield* Effect.annotateLogs(
Effect.log(`📊 Syncing external data for entity: ${entityName}`),
Expand All @@ -81,14 +93,82 @@ export const ExternalPushEntityWorkflowLayer = ExternalPushEntityWorkflow.toLaye
},
)

// Process each mutation using the new chain function - no error mapping needed!
yield* Effect.forEach(mutations, (mutationData) => processMutation(mutationData.op), {
concurrency: 'unbounded',
// Query all enabled adapters for this organization
const enabledAdapters = yield* db
.select({ adapter: adapterDetailsTable.adapter })
.from(adapterDetailsTable)
.where(
and(
eq(adapterDetailsTable.orgId, tokenKey),
eq(adapterDetailsTable.enabled, true),
),
)
.pipe(
Effect.tapError((error) =>
Effect.logError('Failed to query enabled adapters, skipping external push', {
error,
entityName,
tokenKey,
}),
),
Effect.orElse(() => Effect.succeed([] as Array<{ adapter: string }>)),
)

yield* Effect.log('Found enabled adapters for org', {
adapters: pipe(
enabledAdapters,
Array.map((a) => a.adapter),
),
entityName,
orgId: tokenKey,
})
}).pipe(
Effect.provide(Layer.mergeAll(PcoAdapterManagerLayer, InternalManagerLive)),
Effect.provideService(TokenKey, tokenKey),
),

if (enabledAdapters.length === 0) {
yield* Effect.log('No enabled adapters found, skipping external push', {
entityName,
orgId: tokenKey,
})
return
}

// Send mutations to each enabled adapter that has a registered layer
yield* Effect.forEach(
enabledAdapters,
({ adapter }) =>
Effect.gen(function* () {
const adapterLayer = adapterLayerRegistry[adapter as AdapterName]

if (!adapterLayer) {
yield* Effect.log(
`No layer registered for adapter: ${adapter}, skipping`,
{
adapter,
entityName,
tokenKey,
},
)
return
}

yield* Effect.log(`Processing mutations for adapter: ${adapter}`, {
adapter,
entityName,
mutationCount: mutations.length,
})

// Process each mutation op through this adapter
yield* Effect.forEach(
mutations,
(mutationData) => processMutation(mutationData.op),
{ concurrency: 'unbounded' },
).pipe(
Effect.provide(Layer.mergeAll(adapterLayer, InternalManagerLive)),
Effect.provideService(TokenKey, tokenKey),
)
}),
{ concurrency: 'unbounded' },
)
}),
name: 'SyncExternalEntityData',
}).pipe(Activity.retry({ times: 3 }))

Expand Down