diff --git a/cmd/queryer/main.go b/cmd/queryer/main.go index 9bbf802..5dac3c2 100644 --- a/cmd/queryer/main.go +++ b/cmd/queryer/main.go @@ -4,12 +4,14 @@ import ( "os" "github.com/L4B0MB4/EVTSRC/pkg/client" - "github.com/L4B0MB4/PRYVT/identification/pkg/query/eventpolling" + "github.com/L4B0MB4/PRYVT/identification/pkg/query/eventhandling" "github.com/L4B0MB4/PRYVT/identification/pkg/query/httphandler" "github.com/L4B0MB4/PRYVT/identification/pkg/query/httphandler/controller" "github.com/L4B0MB4/PRYVT/identification/pkg/query/store" "github.com/L4B0MB4/PRYVT/identification/pkg/query/store/repository" "github.com/PRYVT/utils/pkg/auth" + "github.com/PRYVT/utils/pkg/eventpolling" + utilsRepo "github.com/PRYVT/utils/pkg/store/repository" "github.com/rs/zerolog" "github.com/rs/zerolog/log" ) @@ -35,13 +37,15 @@ func main() { log.Error().Err(err).Msg("Unsuccessful initialization of token manager") return } - eventRepo := repository.NewEventRepository(conn) + eventRepo := utilsRepo.NewEventRepository(conn) userRepo := repository.NewUserRepository(conn) uc := controller.NewUserController(userRepo, tokenManager) aut := auth.NewAuthMiddleware(tokenManager) h := httphandler.NewHttpHandler(uc, aut) - eventPolling := eventpolling.NewEventPolling(c, eventRepo, userRepo) + userEventHandler := eventhandling.NewUserEventHandler(userRepo) + + eventPolling := eventpolling.NewEventPolling(c, eventRepo, userEventHandler) go eventPolling.PollEvents() h.Start() diff --git a/go.mod b/go.mod index 9dbc8af..e9f2919 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require github.com/mattn/go-sqlite3 v1.14.24 require ( github.com/L4B0MB4/EVTSRC v0.4.5 // indirect - github.com/PRYVT/utils v0.1.2 // indirect + github.com/PRYVT/utils v0.2.0 // indirect github.com/bytedance/sonic v1.12.2 // indirect github.com/bytedance/sonic/loader v0.2.0 // indirect github.com/cloudwego/base64x v0.1.4 // indirect diff --git a/go.sum b/go.sum index 1588445..fd86ff0 100644 --- a/go.sum +++ b/go.sum @@ -22,6 +22,8 @@ github.com/PRYVT/utils v0.1.1 h1:Y/WHkTHID0T40O2XfzlLM1QsDyQWE8FtH8ncaU4ags8= github.com/PRYVT/utils v0.1.1/go.mod h1:b7zk2FAGwJ8BPJx2JQ8qd+bA59g5EY7Y1vZQPWZHK3s= github.com/PRYVT/utils v0.1.2 h1:U9qhq+18iIblQDrM4I0fmJkvlZ+BCY+DIjjKI4ebtlk= github.com/PRYVT/utils v0.1.2/go.mod h1:b7zk2FAGwJ8BPJx2JQ8qd+bA59g5EY7Y1vZQPWZHK3s= +github.com/PRYVT/utils v0.2.0 h1:hWdHchXlGOYlJ1nfMmGffq/EjFn3ncvzTgsGCLUpiEE= +github.com/PRYVT/utils v0.2.0/go.mod h1:j61GmoyWWXgnCq/laZTIJm4yhD0PreLDMZnYQqjSv7w= github.com/bytedance/sonic v1.11.6 h1:oUp34TzMlL+OY1OUWxHqsdkgC/Zfc85zGqw9siXjrc0= github.com/bytedance/sonic v1.11.6/go.mod h1:LysEHSvpvDySVdC2f87zGWf6CIKJcAvqab1ZaiQtds4= github.com/bytedance/sonic v1.12.2 h1:oaMFuRTpMHYLpCntGca65YWt5ny+wAceDERTkT2L9lg= diff --git a/pkg/query/eventhandling/user.go b/pkg/query/eventhandling/user.go new file mode 100644 index 0000000..bab128c --- /dev/null +++ b/pkg/query/eventhandling/user.go @@ -0,0 +1,35 @@ +package eventhandling + +import ( + "github.com/L4B0MB4/EVTSRC/pkg/models" + "github.com/L4B0MB4/PRYVT/identification/pkg/aggregates" + "github.com/L4B0MB4/PRYVT/identification/pkg/query/store/repository" + "github.com/google/uuid" + "github.com/rs/zerolog/log" +) + +type UserEventHandler struct { + userRepo *repository.UserRepository +} + +func NewUserEventHandler(userRepo *repository.UserRepository) *UserEventHandler { + return &UserEventHandler{ + userRepo: userRepo, + } +} + +func (eh *UserEventHandler) HandleEvent(event models.Event) error { + if event.AggregateType == "user" { + ua, err := aggregates.NewUserAggregate(uuid.MustParse(event.AggregateId)) + if err != nil { + return err + } + uI := aggregates.GetUserModelFromAggregate(ua) + err = eh.userRepo.AddOrReplaceUser(uI) + if err != nil { + log.Err(err).Msg("Error while processing user event") + return err + } + } + return nil +} diff --git a/pkg/query/eventpolling/polling.go b/pkg/query/eventpolling/polling.go deleted file mode 100644 index fde2372..0000000 --- a/pkg/query/eventpolling/polling.go +++ /dev/null @@ -1,75 +0,0 @@ -package eventpolling - -import ( - "time" - - "github.com/L4B0MB4/EVTSRC/pkg/client" - "github.com/L4B0MB4/PRYVT/identification/pkg/aggregates" - "github.com/L4B0MB4/PRYVT/identification/pkg/query/store/repository" - "github.com/google/uuid" - "github.com/rs/zerolog/log" -) - -type EventPolling struct { - client *client.EventSourcingHttpClient - eventRepo *repository.EventRepository - userRepo *repository.UserRepository -} - -func NewEventPolling(client *client.EventSourcingHttpClient, eventRepo *repository.EventRepository, userRepo *repository.UserRepository) *EventPolling { - if client == nil || eventRepo == nil || userRepo == nil { - return nil - } - return &EventPolling{client: client, eventRepo: eventRepo, userRepo: userRepo} -} - -func (ep *EventPolling) PollEvents() { - - hadMoreThenZeroEvents := true - for { - if hadMoreThenZeroEvents { - time.Sleep(100 * time.Millisecond) - } else { - time.Sleep(500 * time.Millisecond) - } - eId, err := ep.eventRepo.GetLastEvent() - if err != nil { - log.Err(err).Msg("Error while getting last events") - continue - } - events, err := ep.client.GetEventsSince(eId, 2) - if err != nil { - log.Err(err).Msg("Error while polling events") - continue - } - - for _, event := range events { - if event.AggregateType == "user" { - ua, err := aggregates.NewUserAggregate(uuid.MustParse(event.AggregateId)) - if err != nil { - log.Err(err).Msg("Error while creating user aggregate") - break - } - uI := aggregates.GetUserModelFromAggregate(ua) - err = ep.userRepo.AddOrReplaceUser(uI) - if err != nil { - log.Err(err).Msg("Error while adding or replacing user") - break - } - } - } - if len(events) == 0 { - hadMoreThenZeroEvents = true - continue - } - hadMoreThenZeroEvents = false - //will this break the db consistency if there are going to be multiple instances of this service? - // probably but if we dont a volume (that both instances use as a db file) this should be fine - err = ep.eventRepo.ReplaceEvent(events[len(events)-1].Id) - if err != nil { - log.Err(err).Msg("Error while replacing event") - break - } - } - -} diff --git a/pkg/query/store/repository/event_repository.go b/pkg/query/store/repository/event_repository.go deleted file mode 100644 index 01a5646..0000000 --- a/pkg/query/store/repository/event_repository.go +++ /dev/null @@ -1,66 +0,0 @@ -package repository - -import ( - "database/sql" -) - -type EventRepository struct { - db *sql.DB -} - -func NewEventRepository(db *sql.DB) *EventRepository { - if db == nil { - return nil - } - return &EventRepository{db: db} -} - -func (repo *EventRepository) ReplaceEvent(newEventId string) error { - tx, err := repo.db.Begin() - if err != nil { - return err - } - - // Delete old event - stmt, err := tx.Prepare("DELETE FROM events") - if err != nil { - tx.Rollback() - return err - } - defer stmt.Close() - - _, err = stmt.Exec() - if err != nil { - tx.Rollback() - return err - } - - // Save new event - stmt, err = tx.Prepare("INSERT INTO events (id) VALUES (?)") - if err != nil { - tx.Rollback() - return err - } - defer stmt.Close() - - _, err = stmt.Exec(newEventId) - if err != nil { - tx.Rollback() - return err - } - - return tx.Commit() -} - -func (repo *EventRepository) GetLastEvent() (string, error) { - var eventId string - err := repo.db.QueryRow("SELECT id FROM events LIMIT 1").Scan(&eventId) - if err != nil { - if err == sql.ErrNoRows { - return "0", nil - } - - return "", err - } - return eventId, nil -}