Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 28 additions & 12 deletions backend/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,16 @@ func NewPostgresBackend(opts *PostgresOptions, logger backend.Logger) backend.Ba
}

// CreateTaskHub creates the postgres database and applies the schema
func (be *postgresBackend) CreateTaskHub(context.Context) error {
pool, err := pgxpool.NewWithConfig(context.Background(), be.options.PgOptions)
if err != nil {
be.logger.Error("CreateTaskHub", "failed to create a new postgres pool", err)
return err
func (be *postgresBackend) CreateTaskHub(ctx context.Context) error {
if err := be.Start(ctx); err != nil {
be.logger.Error("CreateTaskHub", "failed to start the backend", err)
return fmt.Errorf("failed to start the backend: %w", err)
}
be.db = pool

// Initialize database
if _, err := be.db.Exec(context.Background(), schema); err != nil {
panic(fmt.Errorf("failed to initialize the database: %w", err))
if _, err := be.db.Exec(ctx, schema); err != nil {
be.logger.Error("CreateTaskHub", "failed to initialize the database", err)
return fmt.Errorf("failed to initialize the database: %w", err)
}

return nil
Expand Down Expand Up @@ -120,8 +120,10 @@ func (be *postgresBackend) DeleteTaskHub(ctx context.Context) error {
return fmt.Errorf("failed to drop NewTasks table: %w", err)
}

be.db.Close()
be.db = nil
if err := be.Stop(ctx); err != nil {
be.logger.Error("DeleteTaskHub", "failed to stop the backend", err)
return fmt.Errorf("failed to stop the backend: %w", err)
}

return nil
}
Expand Down Expand Up @@ -989,12 +991,26 @@ func (be *postgresBackend) PurgeOrchestrationState(ctx context.Context, id api.I
}

// Start implements backend.Backend
func (*postgresBackend) Start(context.Context) error {
func (be *postgresBackend) Start(ctx context.Context) error {
if be.db == nil {
pool, err := pgxpool.NewWithConfig(ctx, be.options.PgOptions)
if err != nil {
be.logger.Error("Start", "failed to create a new postgres pool", err)
return fmt.Errorf("failed to create a new postgres pool %w", err)
}
be.db = pool
}

return nil
}

// Stop implements backend.Backend
func (*postgresBackend) Stop(context.Context) error {
func (be *postgresBackend) Stop(context.Context) error {
if be.db != nil {
be.db.Close()
be.db = nil
}

return nil
}

Expand Down
44 changes: 27 additions & 17 deletions backend/sqlite/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,30 +86,21 @@ func NewSqliteBackend(opts *SqliteOptions, logger backend.Logger) backend.Backen
}

// CreateTaskHub creates the sqlite database and applies the schema
func (be *sqliteBackend) CreateTaskHub(context.Context) error {
db, err := sql.Open("sqlite", be.dsn)
if err != nil {
panic(fmt.Errorf("failed to open the database: %w", err))
func (be *sqliteBackend) CreateTaskHub(ctx context.Context) error {
if err := be.Start(ctx); err != nil {
return fmt.Errorf("failed to start the backend: %w", err)
}

// Initialize database
if _, err := db.Exec(schema); err != nil {
panic(fmt.Errorf("failed to initialize the database: %w", err))
if _, err := be.db.Exec(schema); err != nil {
return fmt.Errorf("failed to initialize the database: %w", err)
}

// TODO: This is to avoid SQLITE_BUSY errors when there are concurrent
// operations on the database. However, it can hurt performance.
// We should consider removing this and looking for alternate
// solutions if sqlite performance becomes a problem for users.
db.SetMaxOpenConns(1)

be.db = db

return nil
}

func (be *sqliteBackend) DeleteTaskHub(ctx context.Context) error {
be.db = nil
be.Stop(ctx)

if be.options.FilePath == "" {
// In-memory DB
Expand Down Expand Up @@ -978,12 +969,31 @@ func (be *sqliteBackend) PurgeOrchestrationState(ctx context.Context, id api.Ins
}

// Start implements backend.Backend
func (*sqliteBackend) Start(context.Context) error {
func (be *sqliteBackend) Start(context.Context) error {
if be.db == nil {
db, err := sql.Open("sqlite", be.dsn)
if err != nil {
return fmt.Errorf("failed to open the database: %w", err)
}

// TODO: This is to avoid SQLITE_BUSY errors when there are concurrent
// operations on the database. However, it can hurt performance.
// We should consider removing this and looking for alternate
// solutions if sqlite performance becomes a problem for users.
db.SetMaxOpenConns(1)

be.db = db
}

return nil
}

// Stop implements backend.Backend
func (*sqliteBackend) Stop(context.Context) error {
func (be *sqliteBackend) Stop(context.Context) error {
if be.db != nil {
be.db = nil
}

return nil
}

Expand Down
Loading