From d31a393ae680f69c9d2e2486a41b0b98399cf2da Mon Sep 17 00:00:00 2001 From: Marcelo Rodrigues de Jesus Date: Fri, 23 May 2025 15:45:54 -0300 Subject: [PATCH 1/7] Fixed issue that causes error on parallel task execution on Postgres backend. --- backend/postgres/postgres.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/postgres/postgres.go b/backend/postgres/postgres.go index cf119de8..e8c9b6f2 100644 --- a/backend/postgres/postgres.go +++ b/backend/postgres/postgres.go @@ -311,7 +311,7 @@ func (be *postgresBackend) CompleteOrchestrationWorkItem(ctx context.Context, wi builder := strings.Builder{} builder.WriteString("INSERT INTO NewTasks (InstanceID, EventPayload) VALUES ") for i := 0; i < newActivityCount; i++ { - builder.WriteString(fmt.Sprintf("($%d, $%d)", 3*i+1, 3*i+2)) + builder.WriteString(fmt.Sprintf("($%d, $%d)", 2*i+1, 2*i+2)) if i < newActivityCount-1 { builder.WriteString(", ") } From 0ffee530d6fecb63dbb8299e1e2722ce7da68f27 Mon Sep 17 00:00:00 2001 From: Marcelo Rodrigues Date: Wed, 12 Nov 2025 19:03:11 -0300 Subject: [PATCH 2/7] Added order by over sequence number to prioritize older instances and tasks --- backend/postgres/postgres.go | 2 ++ backend/sqlite/sqlite.go | 2 ++ 2 files changed, 4 insertions(+) diff --git a/backend/postgres/postgres.go b/backend/postgres/postgres.go index a2c23325..bb7232ce 100644 --- a/backend/postgres/postgres.go +++ b/backend/postgres/postgres.go @@ -778,6 +778,7 @@ func (be *postgresBackend) GetOrchestrationWorkItem(ctx context.Context) (*backe SELECT 1 FROM NewEvents E WHERE E.InstanceID = I.InstanceID AND (E.VisibleTime IS NULL OR E.VisibleTime < $4) ) + ORDER BY SequenceNumber ASC LIMIT 1 ) RETURNING InstanceID`, be.workerName, // LockedBy for Instances table @@ -864,6 +865,7 @@ func (be *postgresBackend) GetActivityWorkItem(ctx context.Context) (*backend.Ac WHERE SequenceNumber = ( SELECT SequenceNumber FROM NewTasks T WHERE T.LockExpiration IS NULL OR T.LockExpiration < $3 + ORDER BY SequenceNumber ASC LIMIT 1 ) RETURNING SequenceNumber, InstanceID, EventPayload`, be.workerName, diff --git a/backend/sqlite/sqlite.go b/backend/sqlite/sqlite.go index 83433422..b3013d41 100644 --- a/backend/sqlite/sqlite.go +++ b/backend/sqlite/sqlite.go @@ -763,6 +763,7 @@ func (be *sqliteBackend) GetOrchestrationWorkItem(ctx context.Context) (*backend SELECT 1 FROM NewEvents E WHERE E.[InstanceID] = I.[InstanceID] AND (E.[VisibleTime] IS NULL OR E.[VisibleTime] < ?) ) + ORDER BY [SequenceNumber] ASC LIMIT 1 ) RETURNING [InstanceID]`, be.workerName, // LockedBy for Instances table @@ -852,6 +853,7 @@ func (be *sqliteBackend) GetActivityWorkItem(ctx context.Context) (*backend.Acti WHERE [SequenceNumber] = ( SELECT [SequenceNumber] FROM NewTasks T WHERE T.[LockExpiration] IS NULL OR T.[LockExpiration] < ? + ORDER BY [SequenceNumber] ASC LIMIT 1 ) RETURNING [SequenceNumber], [InstanceID], [EventPayload]`, be.workerName, From b443bea7dd4562d01a7fe3a3049104fb99ef78f7 Mon Sep 17 00:00:00 2001 From: Marcelo Rodrigues Date: Fri, 14 Nov 2025 12:32:52 -0300 Subject: [PATCH 3/7] Fixed table alias issue. Applied resource to avoid race conditions. --- backend/postgres/postgres.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/backend/postgres/postgres.go b/backend/postgres/postgres.go index bb7232ce..7de78900 100644 --- a/backend/postgres/postgres.go +++ b/backend/postgres/postgres.go @@ -778,8 +778,9 @@ func (be *postgresBackend) GetOrchestrationWorkItem(ctx context.Context) (*backe SELECT 1 FROM NewEvents E WHERE E.InstanceID = I.InstanceID AND (E.VisibleTime IS NULL OR E.VisibleTime < $4) ) - ORDER BY SequenceNumber ASC + ORDER BY I.SequenceNumber ASC LIMIT 1 + FOR UPDATE SKIP LOCKED ) RETURNING InstanceID`, be.workerName, // LockedBy for Instances table newLockExpiration, // Updated LockExpiration for Instances table @@ -865,8 +866,9 @@ func (be *postgresBackend) GetActivityWorkItem(ctx context.Context) (*backend.Ac WHERE SequenceNumber = ( SELECT SequenceNumber FROM NewTasks T WHERE T.LockExpiration IS NULL OR T.LockExpiration < $3 - ORDER BY SequenceNumber ASC + ORDER BY T.SequenceNumber ASC LIMIT 1 + FOR UPDATE SKIP LOCKED ) RETURNING SequenceNumber, InstanceID, EventPayload`, be.workerName, newLockExpiration, From 48ba21fcf42665dfc389de7dd89505ffd3f38813 Mon Sep 17 00:00:00 2001 From: Marcelo Rodrigues Date: Fri, 14 Nov 2025 13:26:01 -0300 Subject: [PATCH 4/7] Fixed table alias issue for sqlite besides postgres --- backend/sqlite/sqlite.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/backend/sqlite/sqlite.go b/backend/sqlite/sqlite.go index b3013d41..0a15b71b 100644 --- a/backend/sqlite/sqlite.go +++ b/backend/sqlite/sqlite.go @@ -763,7 +763,7 @@ func (be *sqliteBackend) GetOrchestrationWorkItem(ctx context.Context) (*backend SELECT 1 FROM NewEvents E WHERE E.[InstanceID] = I.[InstanceID] AND (E.[VisibleTime] IS NULL OR E.[VisibleTime] < ?) ) - ORDER BY [SequenceNumber] ASC + ORDER BY I.[SequenceNumber] ASC LIMIT 1 ) RETURNING [InstanceID]`, be.workerName, // LockedBy for Instances table @@ -853,7 +853,7 @@ func (be *sqliteBackend) GetActivityWorkItem(ctx context.Context) (*backend.Acti WHERE [SequenceNumber] = ( SELECT [SequenceNumber] FROM NewTasks T WHERE T.[LockExpiration] IS NULL OR T.[LockExpiration] < ? - ORDER BY [SequenceNumber] ASC + ORDER BY T.[SequenceNumber] ASC LIMIT 1 ) RETURNING [SequenceNumber], [InstanceID], [EventPayload]`, be.workerName, From 0f7ec06c2ea20bc7db98243c0c264032a410a496 Mon Sep 17 00:00:00 2001 From: Marcelo Rodrigues Date: Fri, 14 Nov 2025 17:36:28 -0300 Subject: [PATCH 5/7] Fixed wrong order by over nonexistent column Instances.SequenceNumber for sqlite. --- backend/sqlite/sqlite.go | 1 - 1 file changed, 1 deletion(-) diff --git a/backend/sqlite/sqlite.go b/backend/sqlite/sqlite.go index 0a15b71b..c39b14f3 100644 --- a/backend/sqlite/sqlite.go +++ b/backend/sqlite/sqlite.go @@ -763,7 +763,6 @@ func (be *sqliteBackend) GetOrchestrationWorkItem(ctx context.Context) (*backend SELECT 1 FROM NewEvents E WHERE E.[InstanceID] = I.[InstanceID] AND (E.[VisibleTime] IS NULL OR E.[VisibleTime] < ?) ) - ORDER BY I.[SequenceNumber] ASC LIMIT 1 ) RETURNING [InstanceID]`, be.workerName, // LockedBy for Instances table From 1405b00656b56a8d637a10056f7258ff7880a906 Mon Sep 17 00:00:00 2001 From: Marcelo Rodrigues Date: Fri, 14 Nov 2025 17:52:54 -0300 Subject: [PATCH 6/7] Created index to improve queries with ORDER BY Instances.SequenceNumber --- backend/postgres/schema.sql | 3 +++ 1 file changed, 3 insertions(+) diff --git a/backend/postgres/schema.sql b/backend/postgres/schema.sql index 153a5ff6..435fc3ab 100644 --- a/backend/postgres/schema.sql +++ b/backend/postgres/schema.sql @@ -18,6 +18,9 @@ CREATE TABLE IF NOT EXISTS Instances ( ParentInstanceID TEXT NULL ); +-- This index is used to improve queries with ORDER BY Instances.SequenceNumber +CREATE INDEX IF NOT EXISTS IX_Instances_SequenceNumber ON Instances(SequenceNumber); + -- This index is used by LockNext and Purge logic CREATE INDEX IF NOT EXISTS IX_Instances_RuntimeStatus ON Instances(RuntimeStatus); From e0762cf003e2516394211b7e8ef45d2ca39c7406 Mon Sep 17 00:00:00 2001 From: Marcelo Rodrigues de Jesus Date: Fri, 13 Feb 2026 16:39:51 -0300 Subject: [PATCH 7/7] - implemented Backend Start and Stop methods - adapted Backend CreateTaskHub and DeleteTaskHub to call Start and Stop methods instead of directly manage backend database --- backend/postgres/postgres.go | 40 ++++++++++++++++++++++---------- backend/sqlite/sqlite.go | 44 ++++++++++++++++++++++-------------- 2 files changed, 55 insertions(+), 29 deletions(-) diff --git a/backend/postgres/postgres.go b/backend/postgres/postgres.go index 7de78900..4b23b1e0 100644 --- a/backend/postgres/postgres.go +++ b/backend/postgres/postgres.go @@ -79,16 +79,16 @@ func NewPostgresBackend(opts *PostgresOptions, logger backend.Logger) backend.Ba } // CreateTaskHub creates the postgres database and applies the schema -func (be *postgresBackend) CreateTaskHub(context.Context) error { - pool, err := pgxpool.NewWithConfig(context.Background(), be.options.PgOptions) - if err != nil { - be.logger.Error("CreateTaskHub", "failed to create a new postgres pool", err) - return err +func (be *postgresBackend) CreateTaskHub(ctx context.Context) error { + if err := be.Start(ctx); err != nil { + be.logger.Error("CreateTaskHub", "failed to start the backend", err) + return fmt.Errorf("failed to start the backend: %w", err) } - be.db = pool + // Initialize database - if _, err := be.db.Exec(context.Background(), schema); err != nil { - panic(fmt.Errorf("failed to initialize the database: %w", err)) + if _, err := be.db.Exec(ctx, schema); err != nil { + be.logger.Error("CreateTaskHub", "failed to initialize the database", err) + return fmt.Errorf("failed to initialize the database: %w", err) } return nil @@ -120,8 +120,10 @@ func (be *postgresBackend) DeleteTaskHub(ctx context.Context) error { return fmt.Errorf("failed to drop NewTasks table: %w", err) } - be.db.Close() - be.db = nil + if err := be.Stop(ctx); err != nil { + be.logger.Error("DeleteTaskHub", "failed to stop the backend", err) + return fmt.Errorf("failed to stop the backend: %w", err) + } return nil } @@ -989,12 +991,26 @@ func (be *postgresBackend) PurgeOrchestrationState(ctx context.Context, id api.I } // Start implements backend.Backend -func (*postgresBackend) Start(context.Context) error { +func (be *postgresBackend) Start(ctx context.Context) error { + if be.db == nil { + pool, err := pgxpool.NewWithConfig(ctx, be.options.PgOptions) + if err != nil { + be.logger.Error("Start", "failed to create a new postgres pool", err) + return fmt.Errorf("failed to create a new postgres pool %w", err) + } + be.db = pool + } + return nil } // Stop implements backend.Backend -func (*postgresBackend) Stop(context.Context) error { +func (be *postgresBackend) Stop(context.Context) error { + if be.db != nil { + be.db.Close() + be.db = nil + } + return nil } diff --git a/backend/sqlite/sqlite.go b/backend/sqlite/sqlite.go index c39b14f3..fb09705a 100644 --- a/backend/sqlite/sqlite.go +++ b/backend/sqlite/sqlite.go @@ -86,30 +86,21 @@ func NewSqliteBackend(opts *SqliteOptions, logger backend.Logger) backend.Backen } // CreateTaskHub creates the sqlite database and applies the schema -func (be *sqliteBackend) CreateTaskHub(context.Context) error { - db, err := sql.Open("sqlite", be.dsn) - if err != nil { - panic(fmt.Errorf("failed to open the database: %w", err)) +func (be *sqliteBackend) CreateTaskHub(ctx context.Context) error { + if err := be.Start(ctx); err != nil { + return fmt.Errorf("failed to start the backend: %w", err) } // Initialize database - if _, err := db.Exec(schema); err != nil { - panic(fmt.Errorf("failed to initialize the database: %w", err)) + if _, err := be.db.Exec(schema); err != nil { + return fmt.Errorf("failed to initialize the database: %w", err) } - // TODO: This is to avoid SQLITE_BUSY errors when there are concurrent - // operations on the database. However, it can hurt performance. - // We should consider removing this and looking for alternate - // solutions if sqlite performance becomes a problem for users. - db.SetMaxOpenConns(1) - - be.db = db - return nil } func (be *sqliteBackend) DeleteTaskHub(ctx context.Context) error { - be.db = nil + be.Stop(ctx) if be.options.FilePath == "" { // In-memory DB @@ -978,12 +969,31 @@ func (be *sqliteBackend) PurgeOrchestrationState(ctx context.Context, id api.Ins } // Start implements backend.Backend -func (*sqliteBackend) Start(context.Context) error { +func (be *sqliteBackend) Start(context.Context) error { + if be.db == nil { + db, err := sql.Open("sqlite", be.dsn) + if err != nil { + return fmt.Errorf("failed to open the database: %w", err) + } + + // TODO: This is to avoid SQLITE_BUSY errors when there are concurrent + // operations on the database. However, it can hurt performance. + // We should consider removing this and looking for alternate + // solutions if sqlite performance becomes a problem for users. + db.SetMaxOpenConns(1) + + be.db = db + } + return nil } // Stop implements backend.Backend -func (*sqliteBackend) Stop(context.Context) error { +func (be *sqliteBackend) Stop(context.Context) error { + if be.db != nil { + be.db = nil + } + return nil }