diff --git a/README.md b/README.md index 80abeadf..65174668 100644 --- a/README.md +++ b/README.md @@ -627,6 +627,7 @@ gog gmail delegates remove --email delegate@example.com gog gmail watch start --topic projects/

/topics/ --label INBOX gog gmail watch serve --bind 127.0.0.1 --token --hook-url http://127.0.0.1:18789/hooks/agent gog gmail watch serve --bind 0.0.0.0 --verify-oidc --oidc-email --hook-url +gog gmail watch serve --bind 127.0.0.1 --token --fetch-delay 5 --hook-url http://127.0.0.1:18789/hooks/agent gog gmail watch serve --bind 127.0.0.1 --token --exclude-labels SPAM,TRASH --hook-url http://127.0.0.1:18789/hooks/agent gog gmail history --since ``` @@ -634,6 +635,7 @@ gog gmail history --since Gmail watch (Pub/Sub push): - Create Pub/Sub topic + push subscription (OIDC preferred; shared token ok for dev). - Full flow + payload details: `docs/watch.md`. +- `watch serve --fetch-delay` defaults to `3s` and helps avoid Gmail History indexing races after push delivery. - `watch serve --exclude-labels` defaults to `SPAM,TRASH`; IDs are case-sensitive. ### Email Tracking diff --git a/docs/watch.md b/docs/watch.md index 81b00eca..b2163f65 100644 --- a/docs/watch.md +++ b/docs/watch.md @@ -48,6 +48,7 @@ gog gmail watch serve \ [--verify-oidc] [--oidc-email ] [--oidc-audience ] \ [--token ] \ [--hook-url ] [--hook-token ] \ + [--fetch-delay ] \ [--include-body] [--max-bytes ] [--exclude-labels ] \ [--history-types ...] [--save-hook] @@ -61,6 +62,7 @@ Notes: - `watch serve` uses stored hook if `--hook-url` not provided. - `watch serve --exclude-labels` defaults to `SPAM,TRASH`; set to an empty string to disable. - Exclude label IDs are matched exactly (case-sensitive opaque IDs). +- `watch serve --fetch-delay` delays Gmail history fetch after each push (default `3s`) to avoid indexing races; accepts seconds (`5`) or Go durations (`5s`). - `watch serve --history-types` accepts `messageAdded`, `messageDeleted`, `labelAdded`, `labelRemoved` (repeatable or comma-separated). Default: `messageAdded` (for backward compatibility). - `watch serve --history-types` must include at least one non-empty type. diff --git a/internal/cmd/gmail_watch_cmds.go b/internal/cmd/gmail_watch_cmds.go index 887fdc8a..4c559775 100644 --- a/internal/cmd/gmail_watch_cmds.go +++ b/internal/cmd/gmail_watch_cmds.go @@ -216,6 +216,7 @@ type GmailWatchServeCmd struct { Bind string `name:"bind" help:"Bind address" default:"127.0.0.1"` Port int `name:"port" help:"Listen port" default:"8788"` Path string `name:"path" help:"Push handler path" default:"/gmail-pubsub"` + FetchDelay string `name:"fetch-delay" help:"Delay before fetching Gmail history (seconds or duration)" default:"3s"` Timezone string `name:"timezone" short:"z" help:"Output timezone (IANA name, e.g. America/New_York, UTC). Default: local"` Local bool `name:"local" help:"Use local timezone (default behavior, useful to override --timezone)"` VerifyOIDC bool `name:"verify-oidc" help:"Verify Pub/Sub OIDC tokens"` @@ -262,6 +263,13 @@ func (c *GmailWatchServeCmd) Run(ctx context.Context, kctx *kong.Context, flags if err != nil { return err } + fetchDelay, err := parseDurationSeconds(c.FetchDelay) + if err != nil { + return err + } + if fetchDelay < 0 { + return usage("--fetch-delay must be >= 0") + } store, err := loadGmailWatchStore(account) if err != nil { @@ -326,6 +334,7 @@ func (c *GmailWatchServeCmd) Run(ctx context.Context, kctx *kong.Context, flags HookTimeout: defaultHookRequestTimeoutSec * time.Second, HistoryMax: defaultHistoryMaxResults, ResyncMax: defaultHistoryResyncMax, + FetchDelay: fetchDelay, HistoryTypes: historyTypes, AllowNoHook: hook == nil, IncludeBody: includeBody, diff --git a/internal/cmd/gmail_watch_serve_test.go b/internal/cmd/gmail_watch_serve_test.go index 63b052d5..475e698f 100644 --- a/internal/cmd/gmail_watch_serve_test.go +++ b/internal/cmd/gmail_watch_serve_test.go @@ -112,11 +112,100 @@ func TestGmailWatchServeCmd_DefaultMaxBytes(t *testing.T) { if got.cfg.MaxBodyBytes != defaultHookMaxBytes { t.Fatalf("expected default max bytes, got %d", got.cfg.MaxBodyBytes) } + if got.cfg.FetchDelay != defaultHistoryFetchDelay { + t.Fatalf("expected default fetch delay %v, got %v", defaultHistoryFetchDelay, got.cfg.FetchDelay) + } if len(got.cfg.ExcludeLabels) != 2 || got.cfg.ExcludeLabels[0] != "SPAM" || got.cfg.ExcludeLabels[1] != "TRASH" { t.Fatalf("unexpected exclude labels: %#v", got.cfg.ExcludeLabels) } } +func TestGmailWatchServeCmd_FetchDelaySeconds(t *testing.T) { + origListen := listenAndServe + t.Cleanup(func() { listenAndServe = origListen }) + + home := t.TempDir() + t.Setenv("HOME", home) + + store, err := newGmailWatchStore("a@b.com") + if err != nil { + t.Fatalf("store: %v", err) + } + updateErr := store.Update(func(s *gmailWatchState) error { + s.Account = "a@b.com" + return nil + }) + if updateErr != nil { + t.Fatalf("seed: %v", updateErr) + } + + flags := &RootFlags{Account: "a@b.com"} + var got *gmailWatchServer + listenAndServe = func(srv *http.Server) error { + if gs, ok := srv.Handler.(*gmailWatchServer); ok { + got = gs + } + return nil + } + + u, err := ui.New(ui.Options{Stdout: io.Discard, Stderr: io.Discard, Color: "never"}) + if err != nil { + t.Fatalf("ui.New: %v", err) + } + if execErr := runKong(t, &GmailWatchServeCmd{}, []string{"--port", "9999", "--path", "/hook", "--fetch-delay", "5"}, ui.WithUI(context.Background(), u), flags); execErr != nil { + t.Fatalf("execute: %v", execErr) + } + if got == nil { + t.Fatalf("expected server") + } + if got.cfg.FetchDelay != 5*time.Second { + t.Fatalf("expected fetch delay 5s, got %v", got.cfg.FetchDelay) + } +} + +func TestGmailWatchServeCmd_FetchDelayDuration(t *testing.T) { + origListen := listenAndServe + t.Cleanup(func() { listenAndServe = origListen }) + + home := t.TempDir() + t.Setenv("HOME", home) + + store, err := newGmailWatchStore("a@b.com") + if err != nil { + t.Fatalf("store: %v", err) + } + updateErr := store.Update(func(s *gmailWatchState) error { + s.Account = "a@b.com" + return nil + }) + if updateErr != nil { + t.Fatalf("seed: %v", updateErr) + } + + flags := &RootFlags{Account: "a@b.com"} + var got *gmailWatchServer + listenAndServe = func(srv *http.Server) error { + if gs, ok := srv.Handler.(*gmailWatchServer); ok { + got = gs + } + return nil + } + + u, err := ui.New(ui.Options{Stdout: io.Discard, Stderr: io.Discard, Color: "never"}) + if err != nil { + t.Fatalf("ui.New: %v", err) + } + if execErr := runKong(t, &GmailWatchServeCmd{}, []string{"--port", "9999", "--path", "/hook", "--fetch-delay", "750ms"}, ui.WithUI(context.Background(), u), flags); execErr != nil { + t.Fatalf("execute: %v", execErr) + } + if got == nil { + t.Fatalf("expected server") + } + if got.cfg.FetchDelay != 750*time.Millisecond { + t.Fatalf("expected fetch delay 750ms, got %v", got.cfg.FetchDelay) + } +} + func TestGmailWatchServeCmd_ExcludeLabels_Disable(t *testing.T) { origListen := listenAndServe t.Cleanup(func() { listenAndServe = origListen }) diff --git a/internal/cmd/gmail_watch_serve_validation_test.go b/internal/cmd/gmail_watch_serve_validation_test.go index 8b05838d..610192e8 100644 --- a/internal/cmd/gmail_watch_serve_validation_test.go +++ b/internal/cmd/gmail_watch_serve_validation_test.go @@ -37,4 +37,16 @@ func TestGmailWatchServeCmd_ValidationErrors(t *testing.T) { t.Fatalf("expected error") } }) + + t.Run("fetch delay must be non-negative", func(t *testing.T) { + if err := runKong(t, &GmailWatchServeCmd{}, []string{"--fetch-delay", "-1", "--port", "9999"}, context.Background(), flags); err == nil { + t.Fatalf("expected error") + } + }) + + t.Run("fetch delay must parse as duration", func(t *testing.T) { + if err := runKong(t, &GmailWatchServeCmd{}, []string{"--fetch-delay", "not-a-duration", "--port", "9999"}, context.Background(), flags); err == nil { + t.Fatalf("expected error") + } + }) } diff --git a/internal/cmd/gmail_watch_server.go b/internal/cmd/gmail_watch_server.go index fe1e6615..6e8eed5d 100644 --- a/internal/cmd/gmail_watch_server.go +++ b/internal/cmd/gmail_watch_server.go @@ -30,6 +30,7 @@ type gmailWatchServer struct { store *gmailWatchStore validator *idtoken.Validator newService func(context.Context, string) (*gmail.Service, error) + sleep func(context.Context, time.Duration) error hookClient *http.Client excludeLabelIDs map[string]struct{} logf func(string, ...any) @@ -172,6 +173,11 @@ func (s *gmailWatchServer) handlePush(ctx context.Context, payload gmailPushPayl return nil, errNoNewMessages } + err = s.sleepForFetch(ctx) + if err != nil { + return nil, err + } + svc, err := s.newService(ctx, s.cfg.Account) if err != nil { return nil, err @@ -181,8 +187,7 @@ func (s *gmailWatchServer) handlePush(ctx context.Context, payload gmailPushPayl if len(s.cfg.HistoryTypes) > 0 { historyCall.HistoryTypes(s.cfg.HistoryTypes...) } - - historyResp, err := historyCall.Do() + historyResp, err := historyCall.Context(ctx).Do() if err != nil { if isStaleHistoryError(err) { return s.resyncHistory(ctx, svc, payload.HistoryID, payload.MessageID) @@ -267,6 +272,23 @@ func (s *gmailWatchServer) resyncHistory(ctx context.Context, svc *gmail.Service }, nil } +func (s *gmailWatchServer) sleepForFetch(ctx context.Context) error { + if s.cfg.FetchDelay <= 0 { + return nil + } + if s.sleep != nil { + return s.sleep(ctx, s.cfg.FetchDelay) + } + timer := time.NewTimer(s.cfg.FetchDelay) + defer timer.Stop() + select { + case <-ctx.Done(): + return ctx.Err() + case <-timer.C: + return nil + } +} + func (s *gmailWatchServer) fetchMessages(ctx context.Context, svc *gmail.Service, ids []string) ([]gmailHookMessage, int, error) { messages := make([]gmailHookMessage, 0, len(ids)) excluded := 0 diff --git a/internal/cmd/gmail_watch_server_more_test.go b/internal/cmd/gmail_watch_server_more_test.go index 7b6f3422..14b29eaf 100644 --- a/internal/cmd/gmail_watch_server_more_test.go +++ b/internal/cmd/gmail_watch_server_more_test.go @@ -362,6 +362,122 @@ func TestGmailWatchHelpers(t *testing.T) { } } +func TestGmailWatchServer_HandlePush_AppliesFetchDelay(t *testing.T) { + home := t.TempDir() + t.Setenv("HOME", home) + + store, err := newGmailWatchStore("a@b.com") + if err != nil { + t.Fatalf("store: %v", err) + } + if updateErr := store.Update(func(s *gmailWatchState) error { + s.Account = "a@b.com" + s.HistoryID = "100" + return nil + }); updateErr != nil { + t.Fatalf("seed: %v", updateErr) + } + + var historyCalls int + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch { + case strings.Contains(r.URL.Path, "/gmail/v1/users/me/history"): + historyCalls++ + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(map[string]any{ + "historyId": "200", + "history": []map[string]any{ + {"messagesAdded": []map[string]any{ + {"message": map[string]any{"id": "m1"}}, + }}, + }, + }) + return + case strings.Contains(r.URL.Path, "/gmail/v1/users/me/messages/m1"): + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(map[string]any{ + "id": "m1", + "threadId": "t1", + "snippet": "hi", + "payload": map[string]any{"headers": []map[string]any{{"name": "Subject", "value": "S"}}}, + }) + return + default: + http.NotFound(w, r) + } + })) + defer srv.Close() + + gsvc, err := gmail.NewService(context.Background(), + option.WithoutAuthentication(), + option.WithHTTPClient(srv.Client()), + option.WithEndpoint(srv.URL+"/"), + ) + if err != nil { + t.Fatalf("NewService: %v", err) + } + + var slept time.Duration + var sleepCalls int + server := &gmailWatchServer{ + cfg: gmailWatchServeConfig{ + Account: "a@b.com", + HistoryMax: 10, + FetchDelay: 5 * time.Second, + }, + store: store, + newService: func(context.Context, string) (*gmail.Service, error) { return gsvc, nil }, + sleep: func(_ context.Context, d time.Duration) error { + sleepCalls++ + slept = d + return nil + }, + logf: func(string, ...any) {}, + warnf: func(string, ...any) {}, + } + + got, err := server.handlePush(context.Background(), gmailPushPayload{EmailAddress: "a@b.com", HistoryID: "200"}) + if err != nil { + t.Fatalf("handlePush: %v", err) + } + if got == nil || len(got.Messages) != 1 { + t.Fatalf("unexpected payload: %#v", got) + } + if sleepCalls != 1 { + t.Fatalf("expected one sleep call, got %d", sleepCalls) + } + if slept != 5*time.Second { + t.Fatalf("expected 5s sleep, got %v", slept) + } + if historyCalls != 1 { + t.Fatalf("expected one history call, got %d", historyCalls) + } +} + +func TestGmailWatchServer_HandlePush_FetchDelayCanceledContext(t *testing.T) { + var serviceCalls int + server := &gmailWatchServer{ + cfg: gmailWatchServeConfig{Account: "a@b.com", FetchDelay: time.Second}, + store: &gmailWatchStore{state: gmailWatchState{HistoryID: "100"}}, + newService: func(context.Context, string) (*gmail.Service, error) { + serviceCalls++ + return nil, errors.New("unexpected newService call") + }, + logf: func(string, ...any) {}, + warnf: func(string, ...any) {}, + } + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + if _, err := server.handlePush(ctx, gmailPushPayload{HistoryID: "200"}); !errors.Is(err, context.Canceled) { + t.Fatalf("expected canceled context, got %v", err) + } + if serviceCalls != 0 { + t.Fatalf("expected no service calls, got %d", serviceCalls) + } +} + func TestGmailWatchServer_OIDCAudience(t *testing.T) { s := &gmailWatchServer{ cfg: gmailWatchServeConfig{OIDCAudience: ""}, diff --git a/internal/cmd/gmail_watch_types.go b/internal/cmd/gmail_watch_types.go index f93471d8..d6c1f7b7 100644 --- a/internal/cmd/gmail_watch_types.go +++ b/internal/cmd/gmail_watch_types.go @@ -12,6 +12,7 @@ const ( defaultHookMaxBytes = 20000 defaultHistoryMaxResults = 100 defaultHistoryResyncMax = 10 + defaultHistoryFetchDelay = 3 * time.Second defaultPushBodyLimitBytes = 1024 * 1024 defaultHookRequestTimeoutSec = 10 ) @@ -55,6 +56,7 @@ type gmailWatchServeConfig struct { ExcludeLabels []string HistoryMax int64 ResyncMax int64 + FetchDelay time.Duration HistoryTypes []string HookTimeout time.Duration DateLocation *time.Location