From a1ef050a1bb103226b723052330a52e5ee9a6b21 Mon Sep 17 00:00:00 2001 From: L4B0MB4 Date: Sat, 30 Nov 2024 00:22:43 +0100 Subject: [PATCH 1/3] Refactor event polling to use callback for user event processing --- cmd/queryer/main.go | 2 +- pkg/query/eventpolling/polling.go | 22 +++++++--------------- pkg/query/eventpolling/userevents.go | 24 ++++++++++++++++++++++++ 3 files changed, 32 insertions(+), 16 deletions(-) create mode 100644 pkg/query/eventpolling/userevents.go diff --git a/cmd/queryer/main.go b/cmd/queryer/main.go index 9bbf802..2537512 100644 --- a/cmd/queryer/main.go +++ b/cmd/queryer/main.go @@ -42,7 +42,7 @@ func main() { h := httphandler.NewHttpHandler(uc, aut) eventPolling := eventpolling.NewEventPolling(c, eventRepo, userRepo) - go eventPolling.PollEvents() + go eventPolling.PollEvents(eventPolling.ProcessUserEvent) h.Start() } diff --git a/pkg/query/eventpolling/polling.go b/pkg/query/eventpolling/polling.go index fde2372..8578429 100644 --- a/pkg/query/eventpolling/polling.go +++ b/pkg/query/eventpolling/polling.go @@ -4,9 +4,8 @@ import ( "time" "github.com/L4B0MB4/EVTSRC/pkg/client" - "github.com/L4B0MB4/PRYVT/identification/pkg/aggregates" + "github.com/L4B0MB4/EVTSRC/pkg/models" "github.com/L4B0MB4/PRYVT/identification/pkg/query/store/repository" - "github.com/google/uuid" "github.com/rs/zerolog/log" ) @@ -23,7 +22,7 @@ func NewEventPolling(client *client.EventSourcingHttpClient, eventRepo *reposito return &EventPolling{client: client, eventRepo: eventRepo, userRepo: userRepo} } -func (ep *EventPolling) PollEvents() { +func (ep *EventPolling) PollEvents(callback func(event models.Event) error) { hadMoreThenZeroEvents := true for { @@ -44,18 +43,11 @@ func (ep *EventPolling) PollEvents() { } for _, event := range events { - if event.AggregateType == "user" { - ua, err := aggregates.NewUserAggregate(uuid.MustParse(event.AggregateId)) - if err != nil { - log.Err(err).Msg("Error while creating user aggregate") - break - } - uI := aggregates.GetUserModelFromAggregate(ua) - err = ep.userRepo.AddOrReplaceUser(uI) - if err != nil { - log.Err(err).Msg("Error while adding or replacing user") - break - } + + err := callback(event) + if err != nil { + log.Err(err).Msg("Error while processing event") + break } } if len(events) == 0 { diff --git a/pkg/query/eventpolling/userevents.go b/pkg/query/eventpolling/userevents.go new file mode 100644 index 0000000..5b4de05 --- /dev/null +++ b/pkg/query/eventpolling/userevents.go @@ -0,0 +1,24 @@ +package eventpolling + +import ( + "github.com/L4B0MB4/EVTSRC/pkg/models" + "github.com/L4B0MB4/PRYVT/identification/pkg/aggregates" + "github.com/google/uuid" + "github.com/rs/zerolog/log" +) + +func (ep *EventPolling) ProcessUserEvent(event models.Event) error { + if event.AggregateType == "user" { + ua, err := aggregates.NewUserAggregate(uuid.MustParse(event.AggregateId)) + if err != nil { + return err + } + uI := aggregates.GetUserModelFromAggregate(ua) + err = ep.userRepo.AddOrReplaceUser(uI) + if err != nil { + log.Err(err).Msg("Error while processing user event") + return err + } + } + return nil +} From 6b932f8f56eb39c07c13c2978ec54d77a319e1ed Mon Sep 17 00:00:00 2001 From: L4B0MB4 Date: Sat, 30 Nov 2024 00:40:25 +0100 Subject: [PATCH 2/3] Refactor event handling by introducing UserEventHandler and updating event polling to use it --- cmd/queryer/main.go | 7 +++++-- .../userevents.go => eventhandling/user.go} | 17 ++++++++++++++--- pkg/query/eventpolling/event_handler.go | 7 +++++++ pkg/query/eventpolling/polling.go | 17 ++++++++--------- 4 files changed, 34 insertions(+), 14 deletions(-) rename pkg/query/{eventpolling/userevents.go => eventhandling/user.go} (53%) create mode 100644 pkg/query/eventpolling/event_handler.go diff --git a/cmd/queryer/main.go b/cmd/queryer/main.go index 2537512..2ceaf9b 100644 --- a/cmd/queryer/main.go +++ b/cmd/queryer/main.go @@ -4,6 +4,7 @@ import ( "os" "github.com/L4B0MB4/EVTSRC/pkg/client" + "github.com/L4B0MB4/PRYVT/identification/pkg/query/eventhandling" "github.com/L4B0MB4/PRYVT/identification/pkg/query/eventpolling" "github.com/L4B0MB4/PRYVT/identification/pkg/query/httphandler" "github.com/L4B0MB4/PRYVT/identification/pkg/query/httphandler/controller" @@ -41,8 +42,10 @@ func main() { aut := auth.NewAuthMiddleware(tokenManager) h := httphandler.NewHttpHandler(uc, aut) - eventPolling := eventpolling.NewEventPolling(c, eventRepo, userRepo) - go eventPolling.PollEvents(eventPolling.ProcessUserEvent) + userEventHandler := eventhandling.NewUserEventHandler(userRepo) + + eventPolling := eventpolling.NewEventPolling(c, eventRepo, userEventHandler) + go eventPolling.PollEvents() h.Start() } diff --git a/pkg/query/eventpolling/userevents.go b/pkg/query/eventhandling/user.go similarity index 53% rename from pkg/query/eventpolling/userevents.go rename to pkg/query/eventhandling/user.go index 5b4de05..bab128c 100644 --- a/pkg/query/eventpolling/userevents.go +++ b/pkg/query/eventhandling/user.go @@ -1,20 +1,31 @@ -package eventpolling +package eventhandling import ( "github.com/L4B0MB4/EVTSRC/pkg/models" "github.com/L4B0MB4/PRYVT/identification/pkg/aggregates" + "github.com/L4B0MB4/PRYVT/identification/pkg/query/store/repository" "github.com/google/uuid" "github.com/rs/zerolog/log" ) -func (ep *EventPolling) ProcessUserEvent(event models.Event) error { +type UserEventHandler struct { + userRepo *repository.UserRepository +} + +func NewUserEventHandler(userRepo *repository.UserRepository) *UserEventHandler { + return &UserEventHandler{ + userRepo: userRepo, + } +} + +func (eh *UserEventHandler) HandleEvent(event models.Event) error { if event.AggregateType == "user" { ua, err := aggregates.NewUserAggregate(uuid.MustParse(event.AggregateId)) if err != nil { return err } uI := aggregates.GetUserModelFromAggregate(ua) - err = ep.userRepo.AddOrReplaceUser(uI) + err = eh.userRepo.AddOrReplaceUser(uI) if err != nil { log.Err(err).Msg("Error while processing user event") return err diff --git a/pkg/query/eventpolling/event_handler.go b/pkg/query/eventpolling/event_handler.go new file mode 100644 index 0000000..b729347 --- /dev/null +++ b/pkg/query/eventpolling/event_handler.go @@ -0,0 +1,7 @@ +package eventpolling + +import "github.com/L4B0MB4/EVTSRC/pkg/models" + +type EventHanlder interface { + HandleEvent(event models.Event) error +} diff --git a/pkg/query/eventpolling/polling.go b/pkg/query/eventpolling/polling.go index 8578429..90bfc45 100644 --- a/pkg/query/eventpolling/polling.go +++ b/pkg/query/eventpolling/polling.go @@ -4,25 +4,24 @@ import ( "time" "github.com/L4B0MB4/EVTSRC/pkg/client" - "github.com/L4B0MB4/EVTSRC/pkg/models" "github.com/L4B0MB4/PRYVT/identification/pkg/query/store/repository" "github.com/rs/zerolog/log" ) type EventPolling struct { - client *client.EventSourcingHttpClient - eventRepo *repository.EventRepository - userRepo *repository.UserRepository + client *client.EventSourcingHttpClient + eventRepo *repository.EventRepository + eventHandler EventHanlder } -func NewEventPolling(client *client.EventSourcingHttpClient, eventRepo *repository.EventRepository, userRepo *repository.UserRepository) *EventPolling { - if client == nil || eventRepo == nil || userRepo == nil { +func NewEventPolling(client *client.EventSourcingHttpClient, eventRepo *repository.EventRepository, eventHandler EventHanlder) *EventPolling { + if client == nil || eventRepo == nil || eventHandler == nil { return nil } - return &EventPolling{client: client, eventRepo: eventRepo, userRepo: userRepo} + return &EventPolling{client: client, eventRepo: eventRepo, eventHandler: eventHandler} } -func (ep *EventPolling) PollEvents(callback func(event models.Event) error) { +func (ep *EventPolling) PollEvents() { hadMoreThenZeroEvents := true for { @@ -44,7 +43,7 @@ func (ep *EventPolling) PollEvents(callback func(event models.Event) error) { for _, event := range events { - err := callback(event) + err := ep.eventHandler.HandleEvent(event) if err != nil { log.Err(err).Msg("Error while processing event") break From bfc1de20ace8574f5a957df19f993a23cc9ba2a0 Mon Sep 17 00:00:00 2001 From: L4B0MB4 Date: Sat, 30 Nov 2024 00:46:21 +0100 Subject: [PATCH 3/3] Refactor event polling by removing legacy implementation and updating event repository initialization --- cmd/queryer/main.go | 5 +- go.mod | 2 +- go.sum | 2 + pkg/query/eventpolling/event_handler.go | 7 -- pkg/query/eventpolling/polling.go | 66 ------------------- .../store/repository/event_repository.go | 66 ------------------- 6 files changed, 6 insertions(+), 142 deletions(-) delete mode 100644 pkg/query/eventpolling/event_handler.go delete mode 100644 pkg/query/eventpolling/polling.go delete mode 100644 pkg/query/store/repository/event_repository.go diff --git a/cmd/queryer/main.go b/cmd/queryer/main.go index 2ceaf9b..5dac3c2 100644 --- a/cmd/queryer/main.go +++ b/cmd/queryer/main.go @@ -5,12 +5,13 @@ import ( "github.com/L4B0MB4/EVTSRC/pkg/client" "github.com/L4B0MB4/PRYVT/identification/pkg/query/eventhandling" - "github.com/L4B0MB4/PRYVT/identification/pkg/query/eventpolling" "github.com/L4B0MB4/PRYVT/identification/pkg/query/httphandler" "github.com/L4B0MB4/PRYVT/identification/pkg/query/httphandler/controller" "github.com/L4B0MB4/PRYVT/identification/pkg/query/store" "github.com/L4B0MB4/PRYVT/identification/pkg/query/store/repository" "github.com/PRYVT/utils/pkg/auth" + "github.com/PRYVT/utils/pkg/eventpolling" + utilsRepo "github.com/PRYVT/utils/pkg/store/repository" "github.com/rs/zerolog" "github.com/rs/zerolog/log" ) @@ -36,7 +37,7 @@ func main() { log.Error().Err(err).Msg("Unsuccessful initialization of token manager") return } - eventRepo := repository.NewEventRepository(conn) + eventRepo := utilsRepo.NewEventRepository(conn) userRepo := repository.NewUserRepository(conn) uc := controller.NewUserController(userRepo, tokenManager) aut := auth.NewAuthMiddleware(tokenManager) diff --git a/go.mod b/go.mod index 9dbc8af..e9f2919 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require github.com/mattn/go-sqlite3 v1.14.24 require ( github.com/L4B0MB4/EVTSRC v0.4.5 // indirect - github.com/PRYVT/utils v0.1.2 // indirect + github.com/PRYVT/utils v0.2.0 // indirect github.com/bytedance/sonic v1.12.2 // indirect github.com/bytedance/sonic/loader v0.2.0 // indirect github.com/cloudwego/base64x v0.1.4 // indirect diff --git a/go.sum b/go.sum index 1588445..fd86ff0 100644 --- a/go.sum +++ b/go.sum @@ -22,6 +22,8 @@ github.com/PRYVT/utils v0.1.1 h1:Y/WHkTHID0T40O2XfzlLM1QsDyQWE8FtH8ncaU4ags8= github.com/PRYVT/utils v0.1.1/go.mod h1:b7zk2FAGwJ8BPJx2JQ8qd+bA59g5EY7Y1vZQPWZHK3s= github.com/PRYVT/utils v0.1.2 h1:U9qhq+18iIblQDrM4I0fmJkvlZ+BCY+DIjjKI4ebtlk= github.com/PRYVT/utils v0.1.2/go.mod h1:b7zk2FAGwJ8BPJx2JQ8qd+bA59g5EY7Y1vZQPWZHK3s= +github.com/PRYVT/utils v0.2.0 h1:hWdHchXlGOYlJ1nfMmGffq/EjFn3ncvzTgsGCLUpiEE= +github.com/PRYVT/utils v0.2.0/go.mod h1:j61GmoyWWXgnCq/laZTIJm4yhD0PreLDMZnYQqjSv7w= github.com/bytedance/sonic v1.11.6 h1:oUp34TzMlL+OY1OUWxHqsdkgC/Zfc85zGqw9siXjrc0= github.com/bytedance/sonic v1.11.6/go.mod h1:LysEHSvpvDySVdC2f87zGWf6CIKJcAvqab1ZaiQtds4= github.com/bytedance/sonic v1.12.2 h1:oaMFuRTpMHYLpCntGca65YWt5ny+wAceDERTkT2L9lg= diff --git a/pkg/query/eventpolling/event_handler.go b/pkg/query/eventpolling/event_handler.go deleted file mode 100644 index b729347..0000000 --- a/pkg/query/eventpolling/event_handler.go +++ /dev/null @@ -1,7 +0,0 @@ -package eventpolling - -import "github.com/L4B0MB4/EVTSRC/pkg/models" - -type EventHanlder interface { - HandleEvent(event models.Event) error -} diff --git a/pkg/query/eventpolling/polling.go b/pkg/query/eventpolling/polling.go deleted file mode 100644 index 90bfc45..0000000 --- a/pkg/query/eventpolling/polling.go +++ /dev/null @@ -1,66 +0,0 @@ -package eventpolling - -import ( - "time" - - "github.com/L4B0MB4/EVTSRC/pkg/client" - "github.com/L4B0MB4/PRYVT/identification/pkg/query/store/repository" - "github.com/rs/zerolog/log" -) - -type EventPolling struct { - client *client.EventSourcingHttpClient - eventRepo *repository.EventRepository - eventHandler EventHanlder -} - -func NewEventPolling(client *client.EventSourcingHttpClient, eventRepo *repository.EventRepository, eventHandler EventHanlder) *EventPolling { - if client == nil || eventRepo == nil || eventHandler == nil { - return nil - } - return &EventPolling{client: client, eventRepo: eventRepo, eventHandler: eventHandler} -} - -func (ep *EventPolling) PollEvents() { - - hadMoreThenZeroEvents := true - for { - if hadMoreThenZeroEvents { - time.Sleep(100 * time.Millisecond) - } else { - time.Sleep(500 * time.Millisecond) - } - eId, err := ep.eventRepo.GetLastEvent() - if err != nil { - log.Err(err).Msg("Error while getting last events") - continue - } - events, err := ep.client.GetEventsSince(eId, 2) - if err != nil { - log.Err(err).Msg("Error while polling events") - continue - } - - for _, event := range events { - - err := ep.eventHandler.HandleEvent(event) - if err != nil { - log.Err(err).Msg("Error while processing event") - break - } - } - if len(events) == 0 { - hadMoreThenZeroEvents = true - continue - } - hadMoreThenZeroEvents = false - //will this break the db consistency if there are going to be multiple instances of this service? - // probably but if we dont a volume (that both instances use as a db file) this should be fine - err = ep.eventRepo.ReplaceEvent(events[len(events)-1].Id) - if err != nil { - log.Err(err).Msg("Error while replacing event") - break - } - } - -} diff --git a/pkg/query/store/repository/event_repository.go b/pkg/query/store/repository/event_repository.go deleted file mode 100644 index 01a5646..0000000 --- a/pkg/query/store/repository/event_repository.go +++ /dev/null @@ -1,66 +0,0 @@ -package repository - -import ( - "database/sql" -) - -type EventRepository struct { - db *sql.DB -} - -func NewEventRepository(db *sql.DB) *EventRepository { - if db == nil { - return nil - } - return &EventRepository{db: db} -} - -func (repo *EventRepository) ReplaceEvent(newEventId string) error { - tx, err := repo.db.Begin() - if err != nil { - return err - } - - // Delete old event - stmt, err := tx.Prepare("DELETE FROM events") - if err != nil { - tx.Rollback() - return err - } - defer stmt.Close() - - _, err = stmt.Exec() - if err != nil { - tx.Rollback() - return err - } - - // Save new event - stmt, err = tx.Prepare("INSERT INTO events (id) VALUES (?)") - if err != nil { - tx.Rollback() - return err - } - defer stmt.Close() - - _, err = stmt.Exec(newEventId) - if err != nil { - tx.Rollback() - return err - } - - return tx.Commit() -} - -func (repo *EventRepository) GetLastEvent() (string, error) { - var eventId string - err := repo.db.QueryRow("SELECT id FROM events LIMIT 1").Scan(&eventId) - if err != nil { - if err == sql.ErrNoRows { - return "0", nil - } - - return "", err - } - return eventId, nil -}