From c5204896be46403eb577e50ced951198807e2b27 Mon Sep 17 00:00:00 2001
From: salmonumbrella <182032677+salmonumbrella@users.noreply.github.com>
Date: Mon, 2 Mar 2026 19:48:05 -0800
Subject: [PATCH] fix(gmail-watch): delay history fetch in watch serve
---
README.md | 2 +
docs/watch.md | 2 +
internal/cmd/gmail_watch_cmds.go | 9 ++
internal/cmd/gmail_watch_serve_test.go | 89 ++++++++++++++
.../cmd/gmail_watch_serve_validation_test.go | 12 ++
internal/cmd/gmail_watch_server.go | 26 +++-
internal/cmd/gmail_watch_server_more_test.go | 116 ++++++++++++++++++
internal/cmd/gmail_watch_types.go | 2 +
8 files changed, 256 insertions(+), 2 deletions(-)
diff --git a/README.md b/README.md
index 80abeadf..65174668 100644
--- a/README.md
+++ b/README.md
@@ -627,6 +627,7 @@ gog gmail delegates remove --email delegate@example.com
gog gmail watch start --topic projects/
/topics/ --label INBOX
gog gmail watch serve --bind 127.0.0.1 --token --hook-url http://127.0.0.1:18789/hooks/agent
gog gmail watch serve --bind 0.0.0.0 --verify-oidc --oidc-email --hook-url
+gog gmail watch serve --bind 127.0.0.1 --token --fetch-delay 5 --hook-url http://127.0.0.1:18789/hooks/agent
gog gmail watch serve --bind 127.0.0.1 --token --exclude-labels SPAM,TRASH --hook-url http://127.0.0.1:18789/hooks/agent
gog gmail history --since
```
@@ -634,6 +635,7 @@ gog gmail history --since
Gmail watch (Pub/Sub push):
- Create Pub/Sub topic + push subscription (OIDC preferred; shared token ok for dev).
- Full flow + payload details: `docs/watch.md`.
+- `watch serve --fetch-delay` defaults to `3s` and helps avoid Gmail History indexing races after push delivery.
- `watch serve --exclude-labels` defaults to `SPAM,TRASH`; IDs are case-sensitive.
### Email Tracking
diff --git a/docs/watch.md b/docs/watch.md
index 81b00eca..b2163f65 100644
--- a/docs/watch.md
+++ b/docs/watch.md
@@ -48,6 +48,7 @@ gog gmail watch serve \
[--verify-oidc] [--oidc-email ] [--oidc-audience ] \
[--token ] \
[--hook-url ] [--hook-token ] \
+ [--fetch-delay ] \
[--include-body] [--max-bytes ] [--exclude-labels ] \
[--history-types ...] [--save-hook]
@@ -61,6 +62,7 @@ Notes:
- `watch serve` uses stored hook if `--hook-url` not provided.
- `watch serve --exclude-labels` defaults to `SPAM,TRASH`; set to an empty string to disable.
- Exclude label IDs are matched exactly (case-sensitive opaque IDs).
+- `watch serve --fetch-delay` delays Gmail history fetch after each push (default `3s`) to avoid indexing races; accepts seconds (`5`) or Go durations (`5s`).
- `watch serve --history-types` accepts `messageAdded`, `messageDeleted`, `labelAdded`, `labelRemoved` (repeatable or comma-separated). Default: `messageAdded` (for backward compatibility).
- `watch serve --history-types` must include at least one non-empty type.
diff --git a/internal/cmd/gmail_watch_cmds.go b/internal/cmd/gmail_watch_cmds.go
index 887fdc8a..4c559775 100644
--- a/internal/cmd/gmail_watch_cmds.go
+++ b/internal/cmd/gmail_watch_cmds.go
@@ -216,6 +216,7 @@ type GmailWatchServeCmd struct {
Bind string `name:"bind" help:"Bind address" default:"127.0.0.1"`
Port int `name:"port" help:"Listen port" default:"8788"`
Path string `name:"path" help:"Push handler path" default:"/gmail-pubsub"`
+ FetchDelay string `name:"fetch-delay" help:"Delay before fetching Gmail history (seconds or duration)" default:"3s"`
Timezone string `name:"timezone" short:"z" help:"Output timezone (IANA name, e.g. America/New_York, UTC). Default: local"`
Local bool `name:"local" help:"Use local timezone (default behavior, useful to override --timezone)"`
VerifyOIDC bool `name:"verify-oidc" help:"Verify Pub/Sub OIDC tokens"`
@@ -262,6 +263,13 @@ func (c *GmailWatchServeCmd) Run(ctx context.Context, kctx *kong.Context, flags
if err != nil {
return err
}
+ fetchDelay, err := parseDurationSeconds(c.FetchDelay)
+ if err != nil {
+ return err
+ }
+ if fetchDelay < 0 {
+ return usage("--fetch-delay must be >= 0")
+ }
store, err := loadGmailWatchStore(account)
if err != nil {
@@ -326,6 +334,7 @@ func (c *GmailWatchServeCmd) Run(ctx context.Context, kctx *kong.Context, flags
HookTimeout: defaultHookRequestTimeoutSec * time.Second,
HistoryMax: defaultHistoryMaxResults,
ResyncMax: defaultHistoryResyncMax,
+ FetchDelay: fetchDelay,
HistoryTypes: historyTypes,
AllowNoHook: hook == nil,
IncludeBody: includeBody,
diff --git a/internal/cmd/gmail_watch_serve_test.go b/internal/cmd/gmail_watch_serve_test.go
index 63b052d5..475e698f 100644
--- a/internal/cmd/gmail_watch_serve_test.go
+++ b/internal/cmd/gmail_watch_serve_test.go
@@ -112,11 +112,100 @@ func TestGmailWatchServeCmd_DefaultMaxBytes(t *testing.T) {
if got.cfg.MaxBodyBytes != defaultHookMaxBytes {
t.Fatalf("expected default max bytes, got %d", got.cfg.MaxBodyBytes)
}
+ if got.cfg.FetchDelay != defaultHistoryFetchDelay {
+ t.Fatalf("expected default fetch delay %v, got %v", defaultHistoryFetchDelay, got.cfg.FetchDelay)
+ }
if len(got.cfg.ExcludeLabels) != 2 || got.cfg.ExcludeLabels[0] != "SPAM" || got.cfg.ExcludeLabels[1] != "TRASH" {
t.Fatalf("unexpected exclude labels: %#v", got.cfg.ExcludeLabels)
}
}
+func TestGmailWatchServeCmd_FetchDelaySeconds(t *testing.T) {
+ origListen := listenAndServe
+ t.Cleanup(func() { listenAndServe = origListen })
+
+ home := t.TempDir()
+ t.Setenv("HOME", home)
+
+ store, err := newGmailWatchStore("a@b.com")
+ if err != nil {
+ t.Fatalf("store: %v", err)
+ }
+ updateErr := store.Update(func(s *gmailWatchState) error {
+ s.Account = "a@b.com"
+ return nil
+ })
+ if updateErr != nil {
+ t.Fatalf("seed: %v", updateErr)
+ }
+
+ flags := &RootFlags{Account: "a@b.com"}
+ var got *gmailWatchServer
+ listenAndServe = func(srv *http.Server) error {
+ if gs, ok := srv.Handler.(*gmailWatchServer); ok {
+ got = gs
+ }
+ return nil
+ }
+
+ u, err := ui.New(ui.Options{Stdout: io.Discard, Stderr: io.Discard, Color: "never"})
+ if err != nil {
+ t.Fatalf("ui.New: %v", err)
+ }
+ if execErr := runKong(t, &GmailWatchServeCmd{}, []string{"--port", "9999", "--path", "/hook", "--fetch-delay", "5"}, ui.WithUI(context.Background(), u), flags); execErr != nil {
+ t.Fatalf("execute: %v", execErr)
+ }
+ if got == nil {
+ t.Fatalf("expected server")
+ }
+ if got.cfg.FetchDelay != 5*time.Second {
+ t.Fatalf("expected fetch delay 5s, got %v", got.cfg.FetchDelay)
+ }
+}
+
+func TestGmailWatchServeCmd_FetchDelayDuration(t *testing.T) {
+ origListen := listenAndServe
+ t.Cleanup(func() { listenAndServe = origListen })
+
+ home := t.TempDir()
+ t.Setenv("HOME", home)
+
+ store, err := newGmailWatchStore("a@b.com")
+ if err != nil {
+ t.Fatalf("store: %v", err)
+ }
+ updateErr := store.Update(func(s *gmailWatchState) error {
+ s.Account = "a@b.com"
+ return nil
+ })
+ if updateErr != nil {
+ t.Fatalf("seed: %v", updateErr)
+ }
+
+ flags := &RootFlags{Account: "a@b.com"}
+ var got *gmailWatchServer
+ listenAndServe = func(srv *http.Server) error {
+ if gs, ok := srv.Handler.(*gmailWatchServer); ok {
+ got = gs
+ }
+ return nil
+ }
+
+ u, err := ui.New(ui.Options{Stdout: io.Discard, Stderr: io.Discard, Color: "never"})
+ if err != nil {
+ t.Fatalf("ui.New: %v", err)
+ }
+ if execErr := runKong(t, &GmailWatchServeCmd{}, []string{"--port", "9999", "--path", "/hook", "--fetch-delay", "750ms"}, ui.WithUI(context.Background(), u), flags); execErr != nil {
+ t.Fatalf("execute: %v", execErr)
+ }
+ if got == nil {
+ t.Fatalf("expected server")
+ }
+ if got.cfg.FetchDelay != 750*time.Millisecond {
+ t.Fatalf("expected fetch delay 750ms, got %v", got.cfg.FetchDelay)
+ }
+}
+
func TestGmailWatchServeCmd_ExcludeLabels_Disable(t *testing.T) {
origListen := listenAndServe
t.Cleanup(func() { listenAndServe = origListen })
diff --git a/internal/cmd/gmail_watch_serve_validation_test.go b/internal/cmd/gmail_watch_serve_validation_test.go
index 8b05838d..610192e8 100644
--- a/internal/cmd/gmail_watch_serve_validation_test.go
+++ b/internal/cmd/gmail_watch_serve_validation_test.go
@@ -37,4 +37,16 @@ func TestGmailWatchServeCmd_ValidationErrors(t *testing.T) {
t.Fatalf("expected error")
}
})
+
+ t.Run("fetch delay must be non-negative", func(t *testing.T) {
+ if err := runKong(t, &GmailWatchServeCmd{}, []string{"--fetch-delay", "-1", "--port", "9999"}, context.Background(), flags); err == nil {
+ t.Fatalf("expected error")
+ }
+ })
+
+ t.Run("fetch delay must parse as duration", func(t *testing.T) {
+ if err := runKong(t, &GmailWatchServeCmd{}, []string{"--fetch-delay", "not-a-duration", "--port", "9999"}, context.Background(), flags); err == nil {
+ t.Fatalf("expected error")
+ }
+ })
}
diff --git a/internal/cmd/gmail_watch_server.go b/internal/cmd/gmail_watch_server.go
index fe1e6615..6e8eed5d 100644
--- a/internal/cmd/gmail_watch_server.go
+++ b/internal/cmd/gmail_watch_server.go
@@ -30,6 +30,7 @@ type gmailWatchServer struct {
store *gmailWatchStore
validator *idtoken.Validator
newService func(context.Context, string) (*gmail.Service, error)
+ sleep func(context.Context, time.Duration) error
hookClient *http.Client
excludeLabelIDs map[string]struct{}
logf func(string, ...any)
@@ -172,6 +173,11 @@ func (s *gmailWatchServer) handlePush(ctx context.Context, payload gmailPushPayl
return nil, errNoNewMessages
}
+ err = s.sleepForFetch(ctx)
+ if err != nil {
+ return nil, err
+ }
+
svc, err := s.newService(ctx, s.cfg.Account)
if err != nil {
return nil, err
@@ -181,8 +187,7 @@ func (s *gmailWatchServer) handlePush(ctx context.Context, payload gmailPushPayl
if len(s.cfg.HistoryTypes) > 0 {
historyCall.HistoryTypes(s.cfg.HistoryTypes...)
}
-
- historyResp, err := historyCall.Do()
+ historyResp, err := historyCall.Context(ctx).Do()
if err != nil {
if isStaleHistoryError(err) {
return s.resyncHistory(ctx, svc, payload.HistoryID, payload.MessageID)
@@ -267,6 +272,23 @@ func (s *gmailWatchServer) resyncHistory(ctx context.Context, svc *gmail.Service
}, nil
}
+func (s *gmailWatchServer) sleepForFetch(ctx context.Context) error {
+ if s.cfg.FetchDelay <= 0 {
+ return nil
+ }
+ if s.sleep != nil {
+ return s.sleep(ctx, s.cfg.FetchDelay)
+ }
+ timer := time.NewTimer(s.cfg.FetchDelay)
+ defer timer.Stop()
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-timer.C:
+ return nil
+ }
+}
+
func (s *gmailWatchServer) fetchMessages(ctx context.Context, svc *gmail.Service, ids []string) ([]gmailHookMessage, int, error) {
messages := make([]gmailHookMessage, 0, len(ids))
excluded := 0
diff --git a/internal/cmd/gmail_watch_server_more_test.go b/internal/cmd/gmail_watch_server_more_test.go
index 7b6f3422..14b29eaf 100644
--- a/internal/cmd/gmail_watch_server_more_test.go
+++ b/internal/cmd/gmail_watch_server_more_test.go
@@ -362,6 +362,122 @@ func TestGmailWatchHelpers(t *testing.T) {
}
}
+func TestGmailWatchServer_HandlePush_AppliesFetchDelay(t *testing.T) {
+ home := t.TempDir()
+ t.Setenv("HOME", home)
+
+ store, err := newGmailWatchStore("a@b.com")
+ if err != nil {
+ t.Fatalf("store: %v", err)
+ }
+ if updateErr := store.Update(func(s *gmailWatchState) error {
+ s.Account = "a@b.com"
+ s.HistoryID = "100"
+ return nil
+ }); updateErr != nil {
+ t.Fatalf("seed: %v", updateErr)
+ }
+
+ var historyCalls int
+ srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ switch {
+ case strings.Contains(r.URL.Path, "/gmail/v1/users/me/history"):
+ historyCalls++
+ w.Header().Set("Content-Type", "application/json")
+ _ = json.NewEncoder(w).Encode(map[string]any{
+ "historyId": "200",
+ "history": []map[string]any{
+ {"messagesAdded": []map[string]any{
+ {"message": map[string]any{"id": "m1"}},
+ }},
+ },
+ })
+ return
+ case strings.Contains(r.URL.Path, "/gmail/v1/users/me/messages/m1"):
+ w.Header().Set("Content-Type", "application/json")
+ _ = json.NewEncoder(w).Encode(map[string]any{
+ "id": "m1",
+ "threadId": "t1",
+ "snippet": "hi",
+ "payload": map[string]any{"headers": []map[string]any{{"name": "Subject", "value": "S"}}},
+ })
+ return
+ default:
+ http.NotFound(w, r)
+ }
+ }))
+ defer srv.Close()
+
+ gsvc, err := gmail.NewService(context.Background(),
+ option.WithoutAuthentication(),
+ option.WithHTTPClient(srv.Client()),
+ option.WithEndpoint(srv.URL+"/"),
+ )
+ if err != nil {
+ t.Fatalf("NewService: %v", err)
+ }
+
+ var slept time.Duration
+ var sleepCalls int
+ server := &gmailWatchServer{
+ cfg: gmailWatchServeConfig{
+ Account: "a@b.com",
+ HistoryMax: 10,
+ FetchDelay: 5 * time.Second,
+ },
+ store: store,
+ newService: func(context.Context, string) (*gmail.Service, error) { return gsvc, nil },
+ sleep: func(_ context.Context, d time.Duration) error {
+ sleepCalls++
+ slept = d
+ return nil
+ },
+ logf: func(string, ...any) {},
+ warnf: func(string, ...any) {},
+ }
+
+ got, err := server.handlePush(context.Background(), gmailPushPayload{EmailAddress: "a@b.com", HistoryID: "200"})
+ if err != nil {
+ t.Fatalf("handlePush: %v", err)
+ }
+ if got == nil || len(got.Messages) != 1 {
+ t.Fatalf("unexpected payload: %#v", got)
+ }
+ if sleepCalls != 1 {
+ t.Fatalf("expected one sleep call, got %d", sleepCalls)
+ }
+ if slept != 5*time.Second {
+ t.Fatalf("expected 5s sleep, got %v", slept)
+ }
+ if historyCalls != 1 {
+ t.Fatalf("expected one history call, got %d", historyCalls)
+ }
+}
+
+func TestGmailWatchServer_HandlePush_FetchDelayCanceledContext(t *testing.T) {
+ var serviceCalls int
+ server := &gmailWatchServer{
+ cfg: gmailWatchServeConfig{Account: "a@b.com", FetchDelay: time.Second},
+ store: &gmailWatchStore{state: gmailWatchState{HistoryID: "100"}},
+ newService: func(context.Context, string) (*gmail.Service, error) {
+ serviceCalls++
+ return nil, errors.New("unexpected newService call")
+ },
+ logf: func(string, ...any) {},
+ warnf: func(string, ...any) {},
+ }
+
+ ctx, cancel := context.WithCancel(context.Background())
+ cancel()
+
+ if _, err := server.handlePush(ctx, gmailPushPayload{HistoryID: "200"}); !errors.Is(err, context.Canceled) {
+ t.Fatalf("expected canceled context, got %v", err)
+ }
+ if serviceCalls != 0 {
+ t.Fatalf("expected no service calls, got %d", serviceCalls)
+ }
+}
+
func TestGmailWatchServer_OIDCAudience(t *testing.T) {
s := &gmailWatchServer{
cfg: gmailWatchServeConfig{OIDCAudience: ""},
diff --git a/internal/cmd/gmail_watch_types.go b/internal/cmd/gmail_watch_types.go
index f93471d8..d6c1f7b7 100644
--- a/internal/cmd/gmail_watch_types.go
+++ b/internal/cmd/gmail_watch_types.go
@@ -12,6 +12,7 @@ const (
defaultHookMaxBytes = 20000
defaultHistoryMaxResults = 100
defaultHistoryResyncMax = 10
+ defaultHistoryFetchDelay = 3 * time.Second
defaultPushBodyLimitBytes = 1024 * 1024
defaultHookRequestTimeoutSec = 10
)
@@ -55,6 +56,7 @@ type gmailWatchServeConfig struct {
ExcludeLabels []string
HistoryMax int64
ResyncMax int64
+ FetchDelay time.Duration
HistoryTypes []string
HookTimeout time.Duration
DateLocation *time.Location