Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -627,13 +627,15 @@ gog gmail delegates remove --email delegate@example.com
gog gmail watch start --topic projects/<p>/topics/<t> --label INBOX
gog gmail watch serve --bind 127.0.0.1 --token <shared> --hook-url http://127.0.0.1:18789/hooks/agent
gog gmail watch serve --bind 0.0.0.0 --verify-oidc --oidc-email <svc@...> --hook-url <url>
gog gmail watch serve --bind 127.0.0.1 --token <shared> --fetch-delay 5 --hook-url http://127.0.0.1:18789/hooks/agent
gog gmail watch serve --bind 127.0.0.1 --token <shared> --exclude-labels SPAM,TRASH --hook-url http://127.0.0.1:18789/hooks/agent
gog gmail history --since <historyId>
```

Gmail watch (Pub/Sub push):
- Create Pub/Sub topic + push subscription (OIDC preferred; shared token ok for dev).
- Full flow + payload details: `docs/watch.md`.
- `watch serve --fetch-delay` defaults to `3s` and helps avoid Gmail History indexing races after push delivery.
- `watch serve --exclude-labels` defaults to `SPAM,TRASH`; IDs are case-sensitive.

### Email Tracking
Expand Down
2 changes: 2 additions & 0 deletions docs/watch.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ gog gmail watch serve \
[--verify-oidc] [--oidc-email <svc@...>] [--oidc-audience <aud>] \
[--token <shared>] \
[--hook-url <url>] [--hook-token <token>] \
[--fetch-delay <sec|duration>] \
[--include-body] [--max-bytes <n>] [--exclude-labels <id,id,...>] \
[--history-types <type>...] [--save-hook]

Expand All @@ -61,6 +62,7 @@ Notes:
- `watch serve` uses stored hook if `--hook-url` not provided.
- `watch serve --exclude-labels` defaults to `SPAM,TRASH`; set to an empty string to disable.
- Exclude label IDs are matched exactly (case-sensitive opaque IDs).
- `watch serve --fetch-delay` delays Gmail history fetch after each push (default `3s`) to avoid indexing races; accepts seconds (`5`) or Go durations (`5s`).
- `watch serve --history-types` accepts `messageAdded`, `messageDeleted`, `labelAdded`, `labelRemoved` (repeatable or comma-separated). Default: `messageAdded` (for backward compatibility).
- `watch serve --history-types` must include at least one non-empty type.

Expand Down
9 changes: 9 additions & 0 deletions internal/cmd/gmail_watch_cmds.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ type GmailWatchServeCmd struct {
Bind string `name:"bind" help:"Bind address" default:"127.0.0.1"`
Port int `name:"port" help:"Listen port" default:"8788"`
Path string `name:"path" help:"Push handler path" default:"/gmail-pubsub"`
FetchDelay string `name:"fetch-delay" help:"Delay before fetching Gmail history (seconds or duration)" default:"3s"`
Timezone string `name:"timezone" short:"z" help:"Output timezone (IANA name, e.g. America/New_York, UTC). Default: local"`
Local bool `name:"local" help:"Use local timezone (default behavior, useful to override --timezone)"`
VerifyOIDC bool `name:"verify-oidc" help:"Verify Pub/Sub OIDC tokens"`
Expand Down Expand Up @@ -262,6 +263,13 @@ func (c *GmailWatchServeCmd) Run(ctx context.Context, kctx *kong.Context, flags
if err != nil {
return err
}
fetchDelay, err := parseDurationSeconds(c.FetchDelay)
if err != nil {
return err
}
if fetchDelay < 0 {
return usage("--fetch-delay must be >= 0")
}

store, err := loadGmailWatchStore(account)
if err != nil {
Expand Down Expand Up @@ -326,6 +334,7 @@ func (c *GmailWatchServeCmd) Run(ctx context.Context, kctx *kong.Context, flags
HookTimeout: defaultHookRequestTimeoutSec * time.Second,
HistoryMax: defaultHistoryMaxResults,
ResyncMax: defaultHistoryResyncMax,
FetchDelay: fetchDelay,
HistoryTypes: historyTypes,
AllowNoHook: hook == nil,
IncludeBody: includeBody,
Expand Down
89 changes: 89 additions & 0 deletions internal/cmd/gmail_watch_serve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,100 @@ func TestGmailWatchServeCmd_DefaultMaxBytes(t *testing.T) {
if got.cfg.MaxBodyBytes != defaultHookMaxBytes {
t.Fatalf("expected default max bytes, got %d", got.cfg.MaxBodyBytes)
}
if got.cfg.FetchDelay != defaultHistoryFetchDelay {
t.Fatalf("expected default fetch delay %v, got %v", defaultHistoryFetchDelay, got.cfg.FetchDelay)
}
if len(got.cfg.ExcludeLabels) != 2 || got.cfg.ExcludeLabels[0] != "SPAM" || got.cfg.ExcludeLabels[1] != "TRASH" {
t.Fatalf("unexpected exclude labels: %#v", got.cfg.ExcludeLabels)
}
}

func TestGmailWatchServeCmd_FetchDelaySeconds(t *testing.T) {
origListen := listenAndServe
t.Cleanup(func() { listenAndServe = origListen })

home := t.TempDir()
t.Setenv("HOME", home)

store, err := newGmailWatchStore("a@b.com")
if err != nil {
t.Fatalf("store: %v", err)
}
updateErr := store.Update(func(s *gmailWatchState) error {
s.Account = "a@b.com"
return nil
})
if updateErr != nil {
t.Fatalf("seed: %v", updateErr)
}

flags := &RootFlags{Account: "a@b.com"}
var got *gmailWatchServer
listenAndServe = func(srv *http.Server) error {
if gs, ok := srv.Handler.(*gmailWatchServer); ok {
got = gs
}
return nil
}

u, err := ui.New(ui.Options{Stdout: io.Discard, Stderr: io.Discard, Color: "never"})
if err != nil {
t.Fatalf("ui.New: %v", err)
}
if execErr := runKong(t, &GmailWatchServeCmd{}, []string{"--port", "9999", "--path", "/hook", "--fetch-delay", "5"}, ui.WithUI(context.Background(), u), flags); execErr != nil {
t.Fatalf("execute: %v", execErr)
}
if got == nil {
t.Fatalf("expected server")
}
if got.cfg.FetchDelay != 5*time.Second {
t.Fatalf("expected fetch delay 5s, got %v", got.cfg.FetchDelay)
}
}

func TestGmailWatchServeCmd_FetchDelayDuration(t *testing.T) {
origListen := listenAndServe
t.Cleanup(func() { listenAndServe = origListen })

home := t.TempDir()
t.Setenv("HOME", home)

store, err := newGmailWatchStore("a@b.com")
if err != nil {
t.Fatalf("store: %v", err)
}
updateErr := store.Update(func(s *gmailWatchState) error {
s.Account = "a@b.com"
return nil
})
if updateErr != nil {
t.Fatalf("seed: %v", updateErr)
}

flags := &RootFlags{Account: "a@b.com"}
var got *gmailWatchServer
listenAndServe = func(srv *http.Server) error {
if gs, ok := srv.Handler.(*gmailWatchServer); ok {
got = gs
}
return nil
}

u, err := ui.New(ui.Options{Stdout: io.Discard, Stderr: io.Discard, Color: "never"})
if err != nil {
t.Fatalf("ui.New: %v", err)
}
if execErr := runKong(t, &GmailWatchServeCmd{}, []string{"--port", "9999", "--path", "/hook", "--fetch-delay", "750ms"}, ui.WithUI(context.Background(), u), flags); execErr != nil {
t.Fatalf("execute: %v", execErr)
}
if got == nil {
t.Fatalf("expected server")
}
if got.cfg.FetchDelay != 750*time.Millisecond {
t.Fatalf("expected fetch delay 750ms, got %v", got.cfg.FetchDelay)
}
}

func TestGmailWatchServeCmd_ExcludeLabels_Disable(t *testing.T) {
origListen := listenAndServe
t.Cleanup(func() { listenAndServe = origListen })
Expand Down
12 changes: 12 additions & 0 deletions internal/cmd/gmail_watch_serve_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,16 @@ func TestGmailWatchServeCmd_ValidationErrors(t *testing.T) {
t.Fatalf("expected error")
}
})

t.Run("fetch delay must be non-negative", func(t *testing.T) {
if err := runKong(t, &GmailWatchServeCmd{}, []string{"--fetch-delay", "-1", "--port", "9999"}, context.Background(), flags); err == nil {
t.Fatalf("expected error")
}
})

t.Run("fetch delay must parse as duration", func(t *testing.T) {
if err := runKong(t, &GmailWatchServeCmd{}, []string{"--fetch-delay", "not-a-duration", "--port", "9999"}, context.Background(), flags); err == nil {
t.Fatalf("expected error")
}
})
}
26 changes: 24 additions & 2 deletions internal/cmd/gmail_watch_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type gmailWatchServer struct {
store *gmailWatchStore
validator *idtoken.Validator
newService func(context.Context, string) (*gmail.Service, error)
sleep func(context.Context, time.Duration) error
hookClient *http.Client
excludeLabelIDs map[string]struct{}
logf func(string, ...any)
Expand Down Expand Up @@ -172,6 +173,11 @@ func (s *gmailWatchServer) handlePush(ctx context.Context, payload gmailPushPayl
return nil, errNoNewMessages
}

err = s.sleepForFetch(ctx)
if err != nil {
return nil, err
}

svc, err := s.newService(ctx, s.cfg.Account)
if err != nil {
return nil, err
Expand All @@ -181,8 +187,7 @@ func (s *gmailWatchServer) handlePush(ctx context.Context, payload gmailPushPayl
if len(s.cfg.HistoryTypes) > 0 {
historyCall.HistoryTypes(s.cfg.HistoryTypes...)
}

historyResp, err := historyCall.Do()
historyResp, err := historyCall.Context(ctx).Do()
if err != nil {
if isStaleHistoryError(err) {
return s.resyncHistory(ctx, svc, payload.HistoryID, payload.MessageID)
Expand Down Expand Up @@ -267,6 +272,23 @@ func (s *gmailWatchServer) resyncHistory(ctx context.Context, svc *gmail.Service
}, nil
}

func (s *gmailWatchServer) sleepForFetch(ctx context.Context) error {
if s.cfg.FetchDelay <= 0 {
return nil
}
if s.sleep != nil {
return s.sleep(ctx, s.cfg.FetchDelay)
}
timer := time.NewTimer(s.cfg.FetchDelay)
defer timer.Stop()
select {
case <-ctx.Done():
return ctx.Err()
case <-timer.C:
return nil
}
}

func (s *gmailWatchServer) fetchMessages(ctx context.Context, svc *gmail.Service, ids []string) ([]gmailHookMessage, int, error) {
messages := make([]gmailHookMessage, 0, len(ids))
excluded := 0
Expand Down
116 changes: 116 additions & 0 deletions internal/cmd/gmail_watch_server_more_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,122 @@ func TestGmailWatchHelpers(t *testing.T) {
}
}

func TestGmailWatchServer_HandlePush_AppliesFetchDelay(t *testing.T) {
home := t.TempDir()
t.Setenv("HOME", home)

store, err := newGmailWatchStore("a@b.com")
if err != nil {
t.Fatalf("store: %v", err)
}
if updateErr := store.Update(func(s *gmailWatchState) error {
s.Account = "a@b.com"
s.HistoryID = "100"
return nil
}); updateErr != nil {
t.Fatalf("seed: %v", updateErr)
}

var historyCalls int
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch {
case strings.Contains(r.URL.Path, "/gmail/v1/users/me/history"):
historyCalls++
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(map[string]any{
"historyId": "200",
"history": []map[string]any{
{"messagesAdded": []map[string]any{
{"message": map[string]any{"id": "m1"}},
}},
},
})
return
case strings.Contains(r.URL.Path, "/gmail/v1/users/me/messages/m1"):
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(map[string]any{
"id": "m1",
"threadId": "t1",
"snippet": "hi",
"payload": map[string]any{"headers": []map[string]any{{"name": "Subject", "value": "S"}}},
})
return
default:
http.NotFound(w, r)
}
}))
defer srv.Close()

gsvc, err := gmail.NewService(context.Background(),
option.WithoutAuthentication(),
option.WithHTTPClient(srv.Client()),
option.WithEndpoint(srv.URL+"/"),
)
if err != nil {
t.Fatalf("NewService: %v", err)
}

var slept time.Duration
var sleepCalls int
server := &gmailWatchServer{
cfg: gmailWatchServeConfig{
Account: "a@b.com",
HistoryMax: 10,
FetchDelay: 5 * time.Second,
},
store: store,
newService: func(context.Context, string) (*gmail.Service, error) { return gsvc, nil },
sleep: func(_ context.Context, d time.Duration) error {
sleepCalls++
slept = d
return nil
},
logf: func(string, ...any) {},
warnf: func(string, ...any) {},
}

got, err := server.handlePush(context.Background(), gmailPushPayload{EmailAddress: "a@b.com", HistoryID: "200"})
if err != nil {
t.Fatalf("handlePush: %v", err)
}
if got == nil || len(got.Messages) != 1 {
t.Fatalf("unexpected payload: %#v", got)
}
if sleepCalls != 1 {
t.Fatalf("expected one sleep call, got %d", sleepCalls)
}
if slept != 5*time.Second {
t.Fatalf("expected 5s sleep, got %v", slept)
}
if historyCalls != 1 {
t.Fatalf("expected one history call, got %d", historyCalls)
}
}

func TestGmailWatchServer_HandlePush_FetchDelayCanceledContext(t *testing.T) {
var serviceCalls int
server := &gmailWatchServer{
cfg: gmailWatchServeConfig{Account: "a@b.com", FetchDelay: time.Second},
store: &gmailWatchStore{state: gmailWatchState{HistoryID: "100"}},
newService: func(context.Context, string) (*gmail.Service, error) {
serviceCalls++
return nil, errors.New("unexpected newService call")
},
logf: func(string, ...any) {},
warnf: func(string, ...any) {},
}

ctx, cancel := context.WithCancel(context.Background())
cancel()

if _, err := server.handlePush(ctx, gmailPushPayload{HistoryID: "200"}); !errors.Is(err, context.Canceled) {
t.Fatalf("expected canceled context, got %v", err)
}
if serviceCalls != 0 {
t.Fatalf("expected no service calls, got %d", serviceCalls)
}
}

func TestGmailWatchServer_OIDCAudience(t *testing.T) {
s := &gmailWatchServer{
cfg: gmailWatchServeConfig{OIDCAudience: ""},
Expand Down
2 changes: 2 additions & 0 deletions internal/cmd/gmail_watch_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const (
defaultHookMaxBytes = 20000
defaultHistoryMaxResults = 100
defaultHistoryResyncMax = 10
defaultHistoryFetchDelay = 3 * time.Second
defaultPushBodyLimitBytes = 1024 * 1024
defaultHookRequestTimeoutSec = 10
)
Expand Down Expand Up @@ -55,6 +56,7 @@ type gmailWatchServeConfig struct {
ExcludeLabels []string
HistoryMax int64
ResyncMax int64
FetchDelay time.Duration
HistoryTypes []string
HookTimeout time.Duration
DateLocation *time.Location
Expand Down
Loading