diff --git a/README.md b/README.md index 781f031..13037b6 100644 --- a/README.md +++ b/README.md @@ -316,6 +316,7 @@ http { data-dir = "/data/lplex" journal { + rotate-duration = PT1H retention { max-age = P90D max-size = 53687091200 diff --git a/cmd/lplex-cloud/config.go b/cmd/lplex-cloud/config.go index cedd9fb..37f0541 100644 --- a/cmd/lplex-cloud/config.go +++ b/cmd/lplex-cloud/config.go @@ -30,6 +30,8 @@ var configToFlag = map[string]string{ "journal.retention.max-size": "journal-retention-max-size", "journal.retention.soft-pct": "journal-retention-soft-pct", "journal.retention.overflow-policy": "journal-retention-overflow-policy", + "journal.rotate-duration": "journal-rotate-duration", + "journal.rotate-size": "journal-rotate-size", "journal.archive.command": "journal-archive-command", "journal.archive.trigger": "journal-archive-trigger", } diff --git a/cmd/lplex-cloud/main.go b/cmd/lplex-cloud/main.go index 1949b0b..d0059ae 100644 --- a/cmd/lplex-cloud/main.go +++ b/cmd/lplex-cloud/main.go @@ -53,6 +53,8 @@ func main() { retentionOverflowPolicy := flag.String("journal-retention-overflow-policy", "delete-unarchived", "Overflow policy: delete-unarchived or pause-recording") archiveCommand := flag.String("journal-archive-command", "", "Path to archive script") archiveTriggerStr := flag.String("journal-archive-trigger", "", "Archive trigger: on-rotate or before-expire") + journalRotateDur := flag.String("journal-rotate-duration", "PT1H", "Rotate live journal files after duration (ISO 8601, e.g. PT1H)") + journalRotateSize := flag.Int64("journal-rotate-size", 0, "Rotate live journal files after this many bytes (0 = disabled)") configFile := flag.String("config", "", "Path to HOCON config file") showVersion := flag.Bool("version", false, "Print version and exit") flag.Parse() @@ -87,6 +89,21 @@ func main() { os.Exit(1) } + // Parse and apply journal rotation for live writers. 
+ { + var rotateDur time.Duration + if *journalRotateDur != "" { + var err error + rotateDur, err = lplex.ParseISO8601Duration(*journalRotateDur) + if err != nil { + logger.Error("invalid journal-rotate-duration", "value", *journalRotateDur, "error", err) + os.Exit(1) + } + } + im.SetJournalRotation(rotateDur, *journalRotateSize) + logger.Info("journal rotation configured", "duration", rotateDur, "size", *journalRotateSize) + } + // Set up journal keeper (retention + archive) if configured. keeperCfg, err := buildKeeperConfig( *dataDir, @@ -156,9 +173,12 @@ func main() { case <-ctx.Done(): } - cancel() + // Order matters: stop servers first (no new connections), then stop + // brokers (fires OnRotate via journal finalize), then stop the keeper + // (drains remaining rotation notifications before exiting). shutdown() im.Shutdown() + cancel() wg.Wait() logger.Info("lplex-cloud stopped") } diff --git a/journal_keeper.go b/journal_keeper.go index 71fe9c6..53abaa6 100644 --- a/journal_keeper.go +++ b/journal_keeper.go @@ -155,6 +155,11 @@ func (k *JournalKeeper) Run(ctx context.Context) { // Startup scan: handle files rotated while we were down. k.scanAll(ctx) + // Archive any .lpj files that were rotated but never archived (e.g. process + // crashed before on-rotate fired, or files accumulated before archiving was + // configured). Only runs when the archive trigger is on-rotate. + k.archiveUnarchived(ctx) + scanTicker := time.NewTicker(keeperScanInterval) defer scanTicker.Stop() @@ -164,7 +169,17 @@ func (k *JournalKeeper) Run(ctx context.Context) { for { select { case <-ctx.Done(): - return + // Drain any rotation notifications that arrived between + // im.Shutdown() and cancel(). Without this, files finalized + // during shutdown would never get archived. 
+ for { + select { + case rf := <-k.rotateCh: + k.handleRotation(context.Background(), rf) + default: + return + } + } case rf := <-k.rotateCh: k.handleRotation(ctx, rf) @@ -212,6 +227,60 @@ func (k *JournalKeeper) handleRotation(ctx context.Context, rf RotatedFile) { k.archiveFiles(ctx, []pendingFile{pf}) } +// archiveUnarchived is a one-shot startup sweep that archives any .lpj files +// missing their .archived sidecar marker. Only runs with on-rotate trigger: +// if the process crashed before on-rotate fired, these files would never be +// archived otherwise. With before-expire, files get archived naturally at +// retention time. This runs at startup before any brokers are started, so +// all files are from previous runs and are complete. +func (k *JournalKeeper) archiveUnarchived(ctx context.Context) { + if k.cfg.ArchiveCommand == "" || k.cfg.ArchiveTrigger != ArchiveOnRotate { + return + } + + dirs := k.cfg.Dirs + if k.cfg.DirFunc != nil { + dirs = k.cfg.DirFunc() + } + + var toArchive []pendingFile + for _, d := range dirs { + if ctx.Err() != nil { + return + } + + entries, err := os.ReadDir(d.Dir) + if err != nil { + continue + } + + for _, e := range entries { + name := e.Name() + if !strings.HasSuffix(name, ".lpj") { + continue + } + info, err := e.Info() + if err != nil { + continue + } + path := filepath.Join(d.Dir, name) + if !isArchived(path) && !k.isPending(path) { + toArchive = append(toArchive, pendingFile{ + path: path, + instanceID: d.InstanceID, + size: info.Size(), + created: parseTimestampFromFilename(name), + }) + } + } + } + + if len(toArchive) > 0 { + k.logger.Info("startup archive sweep", "files", len(toArchive)) + k.archiveFiles(ctx, toArchive) + } +} + // scanAll scans all configured directories and applies retention + archive rules. 
func (k *JournalKeeper) scanAll(ctx context.Context) { dirs := k.cfg.Dirs diff --git a/journal_keeper_test.go b/journal_keeper_test.go index 15440a6..941b1ff 100644 --- a/journal_keeper_test.go +++ b/journal_keeper_test.go @@ -2218,3 +2218,135 @@ func TestEndToEndSoftToHardToPause(t *testing.T) { t.Errorf("expected [true, false], got %v", pauseHistory) } } + +// ----------------------------------------------------------------------- +// Startup archive sweep +// ----------------------------------------------------------------------- + +// TestStartupArchivesUnarchivedFiles verifies that Run() archives all +// non-archived .lpj files on startup when the archive trigger is on-rotate. +func TestStartupArchivesUnarchivedFiles(t *testing.T) { + dir := t.TempDir() + scriptDir := t.TempDir() + now := time.Now().UTC() + + // Three files, none archived, none expired. + p1 := createTestJournal(t, dir, lpjName("nmea2k", now.Add(-3*time.Hour)), 100) + p2 := createTestJournal(t, dir, lpjName("nmea2k", now.Add(-2*time.Hour)), 100) + p3 := createTestJournal(t, dir, lpjName("nmea2k", now.Add(-1*time.Hour)), 100) + + script := writeArchiveScript(t, scriptDir, okScript) + + keeper := NewJournalKeeper(KeeperConfig{ + Dirs: []KeeperDir{{Dir: dir, InstanceID: "test"}}, + ArchiveCommand: script, + ArchiveTrigger: ArchiveOnRotate, + MaxAge: 30 * 24 * time.Hour, // files are nowhere near expiry + }) + + ctx, cancel := context.WithCancel(context.Background()) + go func() { + time.Sleep(2 * time.Second) + cancel() + }() + keeper.Run(ctx) + + // All three should be archived. The startup sweep runs before any + // brokers are started, so all files are from previous runs. 
+ if !isArchived(p1) { + t.Errorf("p1 should be archived: %s", p1) + } + if !isArchived(p2) { + t.Errorf("p2 should be archived: %s", p2) + } + if !isArchived(p3) { + t.Errorf("p3 should be archived: %s", p3) + } +} + +// TestStartupSkipsAlreadyArchivedFiles verifies the startup sweep doesn't +// re-archive files that already have .archived markers. +func TestStartupSkipsAlreadyArchivedFiles(t *testing.T) { + dir := t.TempDir() + scriptDir := t.TempDir() + now := time.Now().UTC() + + p1 := createTestJournal(t, dir, lpjName("nmea2k", now.Add(-3*time.Hour)), 100) + p2 := createTestJournal(t, dir, lpjName("nmea2k", now.Add(-2*time.Hour)), 100) + p3 := createTestJournal(t, dir, lpjName("nmea2k", now.Add(-1*time.Hour)), 100) + + // p1 is already archived. + markArchived(p1) + + trackFile := filepath.Join(scriptDir, "calls.log") + trackScript := fmt.Sprintf(` +for arg in "$@"; do + echo "$arg" >> %s + echo "{\"path\":\"$arg\",\"status\":\"ok\"}" +done +`, trackFile) + script := writeArchiveScript(t, scriptDir, trackScript) + + keeper := NewJournalKeeper(KeeperConfig{ + Dirs: []KeeperDir{{Dir: dir, InstanceID: "test"}}, + ArchiveCommand: script, + ArchiveTrigger: ArchiveOnRotate, + MaxAge: 30 * 24 * time.Hour, + }) + + ctx, cancel := context.WithCancel(context.Background()) + go func() { + time.Sleep(2 * time.Second) + cancel() + }() + keeper.Run(ctx) + + // p1 was already archived, p2 and p3 should now be archived too. + if !isArchived(p2) { + t.Error("p2 should be archived") + } + if !isArchived(p3) { + t.Error("p3 should be archived") + } + + // Check the tracking file: p2 and p3 should appear (p1 was already archived). 
+ data, err := os.ReadFile(trackFile) + if err != nil { + t.Fatalf("tracking file missing: %v", err) + } + lines := strings.Split(strings.TrimSpace(string(data)), "\n") + if len(lines) != 2 { + t.Fatalf("expected 2 archive calls, got %d: %v", len(lines), lines) + } +} + +// TestStartupArchivesSingleFile verifies the startup sweep archives even when +// there's only one file in a directory. This is the critical case: after a +// short-lived deployment, there's one file from finalize and it must be archived. +func TestStartupArchivesSingleFile(t *testing.T) { + dir := t.TempDir() + scriptDir := t.TempDir() + now := time.Now().UTC() + + p1 := createTestJournal(t, dir, lpjName("nmea2k", now.Add(-1*time.Hour)), 100) + + script := writeArchiveScript(t, scriptDir, okScript) + + keeper := NewJournalKeeper(KeeperConfig{ + Dirs: []KeeperDir{{Dir: dir, InstanceID: "test"}}, + ArchiveCommand: script, + ArchiveTrigger: ArchiveOnRotate, + MaxAge: 30 * 24 * time.Hour, + }) + + ctx, cancel := context.WithCancel(context.Background()) + go func() { + time.Sleep(2 * time.Second) + cancel() + }() + keeper.Run(ctx) + + if !isArchived(p1) { + t.Error("single file should be archived on startup") + } +} diff --git a/lplex-cloud.conf.example b/lplex-cloud.conf.example index 0230935..2f073f9 100644 --- a/lplex-cloud.conf.example +++ b/lplex-cloud.conf.example @@ -40,9 +40,15 @@ http { # Data directory for instance state and journals data-dir = "/data/lplex" -# Journal retention and archival (all optional, comment out to disable) +# Journal rotation and retention (all optional, comment out to disable) journal { + # Rotate live journal files after this duration or size (whichever comes first). + # Required for on-rotate archival to trigger. Backfill files rotate + # automatically when each backfill session ends. 
+ # rotate-duration = PT1H + # rotate-size = 0 # bytes, 0 = disabled + # Retention policy (applied per-instance journal directory) retention { # Delete files older than this (ISO 8601, e.g. P30D = 30 days) diff --git a/replication_server.go b/replication_server.go index 87fed27..8c64a6c 100644 --- a/replication_server.go +++ b/replication_server.go @@ -37,14 +37,17 @@ type InstanceState struct { PersistedHoles []SeqRange `json:"holes,omitempty"` // Runtime (not persisted) - events *EventLog - broker *Broker - journalDir string - journalCh chan RxFrame // connects broker to its JournalWriter - journalWriter *JournalWriter // live journal writer (nil when broker not running) - cancelFunc context.CancelFunc // stops the broker's journal writer - onRotate func(RotatedFile) // optional callback for keeper - logger *slog.Logger + events *EventLog + broker *Broker + journalDir string + journalCh chan RxFrame // connects broker to its JournalWriter + journalWriter *JournalWriter // live journal writer (nil when broker not running) + journalDone chan struct{} // closed when journal writer goroutine exits + cancelFunc context.CancelFunc // stops the broker's journal writer + onRotate func(RotatedFile) // optional callback for keeper + rotateDuration time.Duration // journal rotation interval for live writer + rotateSize int64 // journal rotation size cap for live writer + logger *slog.Logger } // instanceStatePersist is the JSON shape written to state.json. 
@@ -129,12 +132,14 @@ func (s *InstanceState) ensureBroker() { s.cancelFunc = cancel jw, err := NewJournalWriter(JournalConfig{ - Dir: journalDir, - Prefix: "nmea2k", - BlockSize: 262144, - Compression: journal.CompressionZstd, - OnRotate: s.onRotate, - Logger: s.logger.With("instance", s.ID, "component", "journal"), + Dir: journalDir, + Prefix: "nmea2k", + BlockSize: 262144, + Compression: journal.CompressionZstd, + RotateDuration: s.rotateDuration, + RotateSize: s.rotateSize, + OnRotate: s.onRotate, + Logger: s.logger.With("instance", s.ID, "component", "journal"), }, b.Devices(), s.journalCh) if err != nil { s.logger.Error("failed to create journal writer", "instance", s.ID, "error", err) @@ -143,9 +148,11 @@ func (s *InstanceState) ensureBroker() { } s.journalWriter = jw + s.journalDone = make(chan struct{}) go b.Run() go func() { + defer close(s.journalDone) if err := jw.Run(ctx); err != nil && ctx.Err() == nil { s.logger.Error("journal writer failed", "instance", s.ID, "error", err) } @@ -155,12 +162,15 @@ func (s *InstanceState) ensureBroker() { s.logger.Info("broker started", "instance", s.ID, "initial_head", initialHead) } -// stopBroker stops the instance's broker and journal writer. Caller must hold s.mu. +// stopBroker stops the instance's broker and journal writer. Blocks until +// the journal writer's finalize completes (including OnRotate callbacks). +// Caller must hold s.mu. func (s *InstanceState) stopBroker() { if s.broker == nil { return } b := s.broker + journalDone := s.journalDone s.broker = nil // Signal the broker to stop, then wait for Run() to exit so it's no @@ -174,19 +184,33 @@ func (s *InstanceState) stopBroker() { if s.cancelFunc != nil { s.cancelFunc() } + + // Wait for the journal writer goroutine to finish. This ensures + // finalize() has run and OnRotate has fired before we return. + // Release the lock while waiting to avoid deadlock if OnRotate + // needs to acquire other locks. 
+ s.mu.Unlock() + if journalDone != nil { + <-journalDone + } + s.mu.Lock() + s.journalCh = nil s.journalWriter = nil + s.journalDone = nil s.cancelFunc = nil s.logger.Info("broker stopped", "instance", s.ID) } // InstanceManager manages per-instance state on the cloud side. type InstanceManager struct { - mu sync.Mutex - instances map[string]*InstanceState - dataDir string - logger *slog.Logger - onRotate func(instanceID string, rf RotatedFile) // optional callback for keeper + mu sync.Mutex + instances map[string]*InstanceState + dataDir string + logger *slog.Logger + onRotate func(instanceID string, rf RotatedFile) // optional callback for keeper + rotateDuration time.Duration // journal rotation interval for live writers + rotateSize int64 // journal rotation size cap for live writers } // NewInstanceManager creates a new instance manager, loading any persisted state. @@ -224,11 +248,11 @@ func NewInstanceManager(dataDir string, logger *slog.Logger) (*InstanceManager, } state := &InstanceState{ - ID: id, - HoleTracker: ht, - events: NewEventLog(), - journalDir: dir, - logger: logger, + ID: id, + HoleTracker: ht, + events: NewEventLog(), + journalDir: dir, + logger: logger, } if persisted != nil { state.Cursor = persisted.Cursor @@ -259,6 +283,22 @@ func (im *InstanceManager) SetOnRotate(fn func(instanceID string, rf RotatedFile } } +// SetJournalRotation configures rotation for live journal writers. +// Must be called before any connections are accepted. Retroactively updates +// all existing instances loaded at startup. +func (im *InstanceManager) SetJournalRotation(duration time.Duration, size int64) { + im.mu.Lock() + defer im.mu.Unlock() + im.rotateDuration = duration + im.rotateSize = size + for _, s := range im.instances { + s.mu.Lock() + s.rotateDuration = duration + s.rotateSize = size + s.mu.Unlock() + } +} + // makeOnRotate returns an instance-scoped OnRotate callback, or nil if no // manager-level callback is set. Caller must hold im.mu. 
func (im *InstanceManager) makeOnRotate(id string) func(RotatedFile) { @@ -301,12 +341,14 @@ func (im *InstanceManager) GetOrCreate(id string) *InstanceState { _ = os.MkdirAll(filepath.Join(dir, "journal"), 0o755) s := &InstanceState{ - ID: id, - HoleTracker: NewHoleTracker(), - events: NewEventLog(), - journalDir: dir, - onRotate: im.makeOnRotate(id), - logger: im.logger, + ID: id, + HoleTracker: NewHoleTracker(), + events: NewEventLog(), + journalDir: dir, + onRotate: im.makeOnRotate(id), + rotateDuration: im.rotateDuration, + rotateSize: im.rotateSize, + logger: im.logger, } im.instances[id] = s im.logger.Info("created instance", "id", id) diff --git a/replication_server_test.go b/replication_server_test.go index 6199859..6aee56e 100644 --- a/replication_server_test.go +++ b/replication_server_test.go @@ -5,6 +5,7 @@ import ( "log/slog" "os" "path/filepath" + "sync/atomic" "testing" "time" ) @@ -261,6 +262,9 @@ func TestInstanceManagerStopBrokerCleanup(t *testing.T) { if inst.journalCh != nil { t.Fatal("journalCh should be nil after stopBroker") } + if inst.journalDone != nil { + t.Fatal("journalDone should be nil after stopBroker") + } if inst.cancelFunc != nil { t.Fatal("cancelFunc should be nil after stopBroker") } @@ -270,6 +274,106 @@ func TestInstanceManagerStopBrokerCleanup(t *testing.T) { inst.mu.Unlock() } +func TestCloudJournalWriterRotation(t *testing.T) { + dir := t.TempDir() + im, err := NewInstanceManager(dir, slog.Default()) + if err != nil { + t.Fatal(err) + } + defer im.Shutdown() + + im.SetJournalRotation(42*time.Second, 1024*1024) + + var rotated atomic.Int32 + im.SetOnRotate(func(instanceID string, rf RotatedFile) { + if instanceID != "rotate-test" { + t.Errorf("unexpected instanceID in OnRotate: %q", instanceID) + } + rotated.Add(1) + }) + + inst := im.GetOrCreate("rotate-test") + + // Verify the duration propagated to the instance. 
+ inst.mu.Lock() + if inst.rotateDuration != 42*time.Second { + t.Fatalf("rotateDuration: got %v, want 42s", inst.rotateDuration) + } + + // Start broker, which creates the JournalWriter with our rotation duration. + inst.ensureBroker() + if inst.journalWriter == nil { + t.Fatal("journalWriter should exist after ensureBroker") + } + if inst.journalWriter.cfg.RotateDuration != 42*time.Second { + t.Fatalf("JournalWriter.RotateDuration: got %v, want 42s", inst.journalWriter.cfg.RotateDuration) + } + if inst.journalWriter.cfg.RotateSize != 1024*1024 { + t.Fatalf("JournalWriter.RotateSize: got %d, want 1MB", inst.journalWriter.cfg.RotateSize) + } + broker := inst.broker + inst.mu.Unlock() + + // Feed a frame so the journal file gets created. + broker.RxFrames() <- RxFrame{ + Timestamp: time.Now(), + Header: ParseCANID(0x09F80100), + Data: make([]byte, 8), + Seq: 1, + } + time.Sleep(50 * time.Millisecond) // let the writer drain + + // Stop the broker. stopBroker now waits for the journal writer goroutine + // to finish, so finalize() and OnRotate complete before this returns. + inst.mu.Lock() + inst.stopBroker() + inst.mu.Unlock() + + if got := rotated.Load(); got != 1 { + t.Fatalf("expected 1 OnRotate callback (from finalize), got %d", got) + } + + // Verify a .lpj file was created. + journalDir := filepath.Join(dir, "instances", "rotate-test", "journal") + entries, err := os.ReadDir(journalDir) + if err != nil { + t.Fatalf("read journal dir: %v", err) + } + var lpjCount int + for _, e := range entries { + if filepath.Ext(e.Name()) == ".lpj" { + lpjCount++ + } + } + if lpjCount == 0 { + t.Fatal("expected at least 1 .lpj file") + } +} + +// Verify SetJournalRotation retroactively updates existing instances. 
+func TestSetJournalRotationRetroactive(t *testing.T) { + dir := t.TempDir() + im, err := NewInstanceManager(dir, slog.Default()) + if err != nil { + t.Fatal(err) + } + defer im.Shutdown() + + inst := im.GetOrCreate("boat-1") + + // Set rotation after instance was created. + im.SetJournalRotation(5*time.Minute, 512*1024*1024) + + inst.mu.Lock() + if inst.rotateDuration != 5*time.Minute { + t.Fatalf("retroactive duration: got %v, want 5m", inst.rotateDuration) + } + if inst.rotateSize != 512*1024*1024 { + t.Fatalf("retroactive size: got %d, want 512MB", inst.rotateSize) + } + inst.mu.Unlock() +} + func TestInstanceManagerShutdownPersistsAll(t *testing.T) { dir := t.TempDir() im, err := NewInstanceManager(dir, slog.Default()) diff --git a/website/docs/cloud/self-hosted.md b/website/docs/cloud/self-hosted.md index f93834e..c58d8f7 100644 --- a/website/docs/cloud/self-hosted.md +++ b/website/docs/cloud/self-hosted.md @@ -147,11 +147,26 @@ RestartSec=5 WantedBy=multi-user.target ``` +## Journal rotation + +Live journal files on the cloud side must be rotated for archival to work. Without rotation, the `on-rotate` trigger never fires and files grow indefinitely. Rotation is configured the same way as on the boat, with duration and/or size thresholds (whichever triggers first): + +```hocon +journal { + rotate-duration = PT1H # default, rotate after 1 hour + # rotate-size = 536870912 # optional, rotate after 512 MB +} +``` + +Duration-based rotation is on by default (`PT1H`), so no action is needed unless you want a different interval or want to add a size cap. Backfill files (from the backfill stream) rotate automatically when each backfill session closes. + ## Retention and archival lplex-cloud uses the same retention and archival system as lplex. A single JournalKeeper goroutine manages all instance directories. -See [Retention & Archival](/user-guide/retention) for configuration details. The same flags and HOCON paths apply. 
+On startup, when the archive trigger is `on-rotate`, the keeper runs a one-time sweep to archive any `.lpj` files that are missing `.archived` markers. This runs before any brokers start, so all files on disk are completed files from previous runs. This handles the case where the process was restarted before `on-rotate` archival could complete. + +See [Retention & Archival](/user-guide/retention) for configuration details. The same retention and archive flags apply. ## Monitoring diff --git a/website/docs/getting-started/configuration.md b/website/docs/getting-started/configuration.md index 23fdf45..459f9f6 100644 --- a/website/docs/getting-started/configuration.md +++ b/website/docs/getting-started/configuration.md @@ -196,6 +196,11 @@ data-dir = "/data/lplex" # Same retention/archive config as lplex journal { + # Rotate live journal files after this duration or size (whichever comes first). + # Required for on-rotate archival to work (files must rotate to trigger archival). + rotate-duration = PT1H + # rotate-size = 0 # bytes, 0 = disabled + retention { max-age = P90D max-size = 107374182400 @@ -223,6 +228,9 @@ journal { | `-tls-client-ca` | `grpc.tls.client-ca` / `tls.client-ca` | (empty) | Client CA cert | | `-data-dir` | `data-dir` | `/data/lplex` | Data directory | + +| `-journal-rotate-duration` | `journal.rotate-duration` | `PT1H` | Rotate live journal files after this duration (ISO 8601) | +| `-journal-rotate-size` | `journal.rotate-size` | `0` | Rotate live journal files after this many bytes (0 = disabled) | + Retention and archive flags are the same as lplex (see table above). ## Systemd diff --git a/website/docs/user-guide/retention.md b/website/docs/user-guide/retention.md index 808edc7..78f2ee6 100644 --- a/website/docs/user-guide/retention.md +++ b/website/docs/user-guide/retention.md @@ -110,6 +110,10 @@ nmea2k-20260306T101500Z.lpj.archived The keeper uses these markers to track archive state across restarts. 
+### Startup archive sweep + +When the archive trigger is `on-rotate`, the keeper runs a one-time sweep on startup to archive any `.lpj` files that are missing their `.archived` marker. This runs before any brokers start, so all files on disk are completed files from previous runs. This catches files that were rotated but never archived, for example if the process was restarted before the `on-rotate` callback fired. + ### Retry behavior Failed archives retry with exponential backoff: 1 minute initial delay, doubling up to a 1 hour cap.