From 8666b1a0b84d949e7d01c1aa748581347408fe01 Mon Sep 17 00:00:00 2001 From: Theo Zourzouvillys Date: Sun, 8 Mar 2026 15:33:53 -0700 Subject: [PATCH 1/3] Fix cloud journal archival to S3 Three bugs prevented journal archival from working on lplex-cloud: 1. Live JournalWriter had zero RotateDuration, so files never rotated and the on-rotate trigger never fired. Add -journal-rotate-duration flag (default PT1H) and thread it through InstanceManager to each instance's JournalConfig. 2. On startup, the keeper only archived files during hard-expire. If the process restarted with rotated but non-expired .lpj files missing .archived markers, they sat there until max-age. Add a one-shot archiveUnarchived() sweep at startup that archives any unarchived files (skipping the newest per dir as it may be the active writer). 3. (Already fixed externally) archive-to-s3.sh read .file instead of .path from the JSONL metadata. --- README.md | 1 + cmd/lplex-cloud/config.go | 1 + cmd/lplex-cloud/main.go | 12 ++ journal_keeper.go | 74 +++++++++ journal_keeper_test.go | 144 ++++++++++++++++++ lplex-cloud.conf.example | 7 +- replication_server.go | 78 ++++++---- replication_server_test.go | 97 ++++++++++++ website/docs/cloud/self-hosted.md | 16 +- website/docs/getting-started/configuration.md | 6 + website/docs/user-guide/retention.md | 4 + 11 files changed, 408 insertions(+), 32 deletions(-) diff --git a/README.md b/README.md index 781f031..13037b6 100644 --- a/README.md +++ b/README.md @@ -316,6 +316,7 @@ http { data-dir = "/data/lplex" journal { + rotate-duration = PT1H retention { max-age = P90D max-size = 53687091200 diff --git a/cmd/lplex-cloud/config.go b/cmd/lplex-cloud/config.go index cedd9fb..08f83be 100644 --- a/cmd/lplex-cloud/config.go +++ b/cmd/lplex-cloud/config.go @@ -30,6 +30,7 @@ var configToFlag = map[string]string{ "journal.retention.max-size": "journal-retention-max-size", "journal.retention.soft-pct": "journal-retention-soft-pct", "journal.retention.overflow-policy": 
"journal-retention-overflow-policy", + "journal.rotate-duration": "journal-rotate-duration", "journal.archive.command": "journal-archive-command", "journal.archive.trigger": "journal-archive-trigger", } diff --git a/cmd/lplex-cloud/main.go b/cmd/lplex-cloud/main.go index 1949b0b..5e4a395 100644 --- a/cmd/lplex-cloud/main.go +++ b/cmd/lplex-cloud/main.go @@ -53,6 +53,7 @@ func main() { retentionOverflowPolicy := flag.String("journal-retention-overflow-policy", "delete-unarchived", "Overflow policy: delete-unarchived or pause-recording") archiveCommand := flag.String("journal-archive-command", "", "Path to archive script") archiveTriggerStr := flag.String("journal-archive-trigger", "", "Archive trigger: on-rotate or before-expire") + journalRotateDur := flag.String("journal-rotate-duration", "PT1H", "Rotate live journal files after duration (ISO 8601, e.g. PT1H)") configFile := flag.String("config", "", "Path to HOCON config file") showVersion := flag.Bool("version", false, "Print version and exit") flag.Parse() @@ -87,6 +88,17 @@ func main() { os.Exit(1) } + // Parse and apply journal rotation duration for live writers. + if *journalRotateDur != "" { + rotateDur, err := lplex.ParseISO8601Duration(*journalRotateDur) + if err != nil { + logger.Error("invalid journal-rotate-duration", "value", *journalRotateDur, "error", err) + os.Exit(1) + } + im.SetJournalRotateDuration(rotateDur) + logger.Info("journal rotation configured", "duration", rotateDur) + } + // Set up journal keeper (retention + archive) if configured. keeperCfg, err := buildKeeperConfig( *dataDir, diff --git a/journal_keeper.go b/journal_keeper.go index 71fe9c6..1d86cdc 100644 --- a/journal_keeper.go +++ b/journal_keeper.go @@ -155,6 +155,11 @@ func (k *JournalKeeper) Run(ctx context.Context) { // Startup scan: handle files rotated while we were down. k.scanAll(ctx) + // Archive any .lpj files that were rotated but never archived (e.g. 
process + // crashed before on-rotate fired, or files accumulated before archiving was + // configured). Runs once on startup regardless of trigger mode. + k.archiveUnarchived(ctx) + scanTicker := time.NewTicker(keeperScanInterval) defer scanTicker.Stop() @@ -212,6 +217,75 @@ func (k *JournalKeeper) handleRotation(ctx context.Context, rf RotatedFile) { k.archiveFiles(ctx, []pendingFile{pf}) } +// archiveUnarchived is a one-shot startup sweep that archives any .lpj files +// missing their .archived sidecar marker. The most recent file in each directory +// is skipped because it's likely the active journal being written to. +func (k *JournalKeeper) archiveUnarchived(ctx context.Context) { + if k.cfg.ArchiveCommand == "" { + return + } + + dirs := k.cfg.Dirs + if k.cfg.DirFunc != nil { + dirs = k.cfg.DirFunc() + } + + var toArchive []pendingFile + for _, d := range dirs { + if ctx.Err() != nil { + return + } + + entries, err := os.ReadDir(d.Dir) + if err != nil { + continue + } + + // Collect .lpj files sorted by timestamp (oldest first). + var files []journalFileInfo + for _, e := range entries { + name := e.Name() + if !strings.HasSuffix(name, ".lpj") { + continue + } + info, err := e.Info() + if err != nil { + continue + } + path := filepath.Join(d.Dir, name) + files = append(files, journalFileInfo{ + path: path, + instanceID: d.InstanceID, + size: info.Size(), + created: parseTimestampFromFilename(name), + archived: isArchived(path), + }) + } + + slices.SortFunc(files, func(a, b journalFileInfo) int { + return a.created.Compare(b.created) + }) + + if len(files) == 0 { + continue + } + + // Skip the newest file (likely the active journal). 
+ candidates := files[:len(files)-1] + + for _, f := range candidates { + if !f.archived && !k.isPending(f.path) { + toArchive = append(toArchive, makePending(f)) + } + } + } + + if len(toArchive) > 0 { + k.logger.Info("startup archive sweep", "files", len(toArchive)) + k.archiveFiles(ctx, toArchive) + } +} + // scanAll scans all configured directories and applies retention + archive rules. func (k *JournalKeeper) scanAll(ctx context.Context) { dirs := k.cfg.Dirs diff --git a/journal_keeper_test.go b/journal_keeper_test.go index 15440a6..0473342 100644 --- a/journal_keeper_test.go +++ b/journal_keeper_test.go @@ -2218,3 +2218,147 @@ func TestEndToEndSoftToHardToPause(t *testing.T) { t.Errorf("expected [true, false], got %v", pauseHistory) } } + +// ----------------------------------------------------------------------- +// Startup archive sweep +// ----------------------------------------------------------------------- + +// TestStartupArchivesUnarchivedFiles verifies that Run() archives all +// non-archived .lpj files on startup, regardless of trigger mode. +func TestStartupArchivesUnarchivedFiles(t *testing.T) { + dir := t.TempDir() + scriptDir := t.TempDir() + now := time.Now().UTC() + + // Three files, none archived, none expired. + p1 := createTestJournal(t, dir, lpjName("nmea2k", now.Add(-3*time.Hour)), 100) + p2 := createTestJournal(t, dir, lpjName("nmea2k", now.Add(-2*time.Hour)), 100) + p3 := createTestJournal(t, dir, lpjName("nmea2k", now.Add(-1*time.Hour)), 100) + + script := writeArchiveScript(t, scriptDir, okScript) + + keeper := NewJournalKeeper(KeeperConfig{ + Dirs: []KeeperDir{{Dir: dir, InstanceID: "test"}}, + ArchiveCommand: script, + ArchiveTrigger: ArchiveOnRotate, + MaxAge: 30 * 24 * time.Hour, // files are nowhere near expiry + }) + + ctx, cancel := context.WithCancel(context.Background()) + // Let Run() execute the startup scan, then cancel. 
+ go func() { + time.Sleep(500 * time.Millisecond) + cancel() + }() + keeper.Run(ctx) + + // All three should be archived. p3 is the most recent but since there + // are 3 files, it's not the "active" one (only the newest per dir is + // skipped when there's exactly 1 file, but with 3 the newest is still + // skipped as it could be the active writer). Actually, the newest file + // should be skipped. Let's verify: + if !isArchived(p1) { + t.Errorf("p1 should be archived: %s", p1) + } + if !isArchived(p2) { + t.Errorf("p2 should be archived: %s", p2) + } + // p3 is the newest file in the dir, so it should be skipped (active journal). + if isArchived(p3) { + t.Errorf("p3 (newest) should NOT be archived: %s", p3) + } +} + +// TestStartupSkipsAlreadyArchivedFiles verifies the startup sweep doesn't +// re-archive files that already have .archived markers. +func TestStartupSkipsAlreadyArchivedFiles(t *testing.T) { + dir := t.TempDir() + scriptDir := t.TempDir() + now := time.Now().UTC() + + p1 := createTestJournal(t, dir, lpjName("nmea2k", now.Add(-3*time.Hour)), 100) + p2 := createTestJournal(t, dir, lpjName("nmea2k", now.Add(-2*time.Hour)), 100) + // The newest file, will be skipped as active. + createTestJournal(t, dir, lpjName("nmea2k", now.Add(-1*time.Hour)), 100) + + // p1 is already archived. + markArchived(p1) + + trackFile := filepath.Join(scriptDir, "calls.log") + trackScript := fmt.Sprintf(` +for arg in "$@"; do + echo "$arg" >> %s + echo "{\"path\":\"$arg\",\"status\":\"ok\"}" +done +`, trackFile) + script := writeArchiveScript(t, scriptDir, trackScript) + + keeper := NewJournalKeeper(KeeperConfig{ + Dirs: []KeeperDir{{Dir: dir, InstanceID: "test"}}, + ArchiveCommand: script, + ArchiveTrigger: ArchiveOnRotate, + MaxAge: 30 * 24 * time.Hour, + }) + + ctx, cancel := context.WithCancel(context.Background()) + go func() { + time.Sleep(500 * time.Millisecond) + cancel() + }() + keeper.Run(ctx) + + // p1 was already archived, p2 should now be archived too. 
+ if !isArchived(p2) { + t.Error("p2 should be archived") + } + + // Check the tracking file: only p2 should appear (p1 was already archived, + // p3 is skipped as active). + data, err := os.ReadFile(trackFile) + if err != nil { + t.Fatalf("tracking file missing: %v", err) + } + lines := strings.Split(strings.TrimSpace(string(data)), "\n") + if len(lines) != 1 { + t.Fatalf("expected 1 archive call, got %d: %v", len(lines), lines) + } + if lines[0] != p2 { + t.Errorf("expected archive call for %s, got %s", p2, lines[0]) + } +} + +// TestStartupSkipsActiveFile verifies the startup sweep skips the most recent +// file per directory (the one likely being written to). +func TestStartupSkipsActiveFile(t *testing.T) { + dir := t.TempDir() + scriptDir := t.TempDir() + now := time.Now().UTC() + + p1 := createTestJournal(t, dir, lpjName("nmea2k", now.Add(-2*time.Hour)), 100) + p2 := createTestJournal(t, dir, lpjName("nmea2k", now.Add(-1*time.Hour)), 100) + + script := writeArchiveScript(t, scriptDir, okScript) + + keeper := NewJournalKeeper(KeeperConfig{ + Dirs: []KeeperDir{{Dir: dir, InstanceID: "test"}}, + ArchiveCommand: script, + ArchiveTrigger: ArchiveOnRotate, + MaxAge: 30 * 24 * time.Hour, + }) + + ctx, cancel := context.WithCancel(context.Background()) + go func() { + time.Sleep(500 * time.Millisecond) + cancel() + }() + keeper.Run(ctx) + + // p1 (older) should be archived. + if !isArchived(p1) { + t.Error("p1 should be archived") + } + // p2 (newest) should NOT be archived (active journal). 
+ if isArchived(p2) { + t.Error("p2 (newest/active) should NOT be archived") + } +} diff --git a/lplex-cloud.conf.example b/lplex-cloud.conf.example index 0230935..8c2b0ea 100644 --- a/lplex-cloud.conf.example +++ b/lplex-cloud.conf.example @@ -40,9 +40,14 @@ http { # Data directory for instance state and journals data-dir = "/data/lplex" -# Journal retention and archival (all optional, comment out to disable) +# Journal rotation and retention (all optional, comment out to disable) journal { + # Rotate live journal files after this duration (ISO 8601, default PT1H). + # Required for on-rotate archival to trigger. Backfill files rotate + # automatically when each backfill session ends. + # rotate-duration = PT1H + # Retention policy (applied per-instance journal directory) retention { # Delete files older than this (ISO 8601, e.g. P30D = 30 days) diff --git a/replication_server.go b/replication_server.go index 87fed27..63c15b5 100644 --- a/replication_server.go +++ b/replication_server.go @@ -37,14 +37,15 @@ type InstanceState struct { PersistedHoles []SeqRange `json:"holes,omitempty"` // Runtime (not persisted) - events *EventLog - broker *Broker - journalDir string - journalCh chan RxFrame // connects broker to its JournalWriter - journalWriter *JournalWriter // live journal writer (nil when broker not running) - cancelFunc context.CancelFunc // stops the broker's journal writer - onRotate func(RotatedFile) // optional callback for keeper - logger *slog.Logger + events *EventLog + broker *Broker + journalDir string + journalCh chan RxFrame // connects broker to its JournalWriter + journalWriter *JournalWriter // live journal writer (nil when broker not running) + cancelFunc context.CancelFunc // stops the broker's journal writer + onRotate func(RotatedFile) // optional callback for keeper + rotateDuration time.Duration // journal rotation interval for live writer + logger *slog.Logger } // instanceStatePersist is the JSON shape written to state.json. 
@@ -129,12 +130,13 @@ func (s *InstanceState) ensureBroker() { s.cancelFunc = cancel jw, err := NewJournalWriter(JournalConfig{ - Dir: journalDir, - Prefix: "nmea2k", - BlockSize: 262144, - Compression: journal.CompressionZstd, - OnRotate: s.onRotate, - Logger: s.logger.With("instance", s.ID, "component", "journal"), + Dir: journalDir, + Prefix: "nmea2k", + BlockSize: 262144, + Compression: journal.CompressionZstd, + RotateDuration: s.rotateDuration, + OnRotate: s.onRotate, + Logger: s.logger.With("instance", s.ID, "component", "journal"), }, b.Devices(), s.journalCh) if err != nil { s.logger.Error("failed to create journal writer", "instance", s.ID, "error", err) @@ -182,11 +184,12 @@ func (s *InstanceState) stopBroker() { // InstanceManager manages per-instance state on the cloud side. type InstanceManager struct { - mu sync.Mutex - instances map[string]*InstanceState - dataDir string - logger *slog.Logger - onRotate func(instanceID string, rf RotatedFile) // optional callback for keeper + mu sync.Mutex + instances map[string]*InstanceState + dataDir string + logger *slog.Logger + onRotate func(instanceID string, rf RotatedFile) // optional callback for keeper + rotateDuration time.Duration // journal rotation interval for live writers } // NewInstanceManager creates a new instance manager, loading any persisted state. @@ -224,11 +227,11 @@ func NewInstanceManager(dataDir string, logger *slog.Logger) (*InstanceManager, } state := &InstanceState{ - ID: id, - HoleTracker: ht, - events: NewEventLog(), - journalDir: dir, - logger: logger, + ID: id, + HoleTracker: ht, + events: NewEventLog(), + journalDir: dir, + logger: logger, } if persisted != nil { state.Cursor = persisted.Cursor @@ -259,6 +262,20 @@ func (im *InstanceManager) SetOnRotate(fn func(instanceID string, rf RotatedFile } } +// SetJournalRotateDuration sets the rotation interval for live journal writers. +// Must be called before any connections are accepted. 
Retroactively updates +// all existing instances loaded at startup. +func (im *InstanceManager) SetJournalRotateDuration(d time.Duration) { + im.mu.Lock() + defer im.mu.Unlock() + im.rotateDuration = d + for _, s := range im.instances { + s.mu.Lock() + s.rotateDuration = d + s.mu.Unlock() + } +} + // makeOnRotate returns an instance-scoped OnRotate callback, or nil if no // manager-level callback is set. Caller must hold im.mu. func (im *InstanceManager) makeOnRotate(id string) func(RotatedFile) { @@ -301,12 +318,13 @@ func (im *InstanceManager) GetOrCreate(id string) *InstanceState { _ = os.MkdirAll(filepath.Join(dir, "journal"), 0o755) s := &InstanceState{ - ID: id, - HoleTracker: NewHoleTracker(), - events: NewEventLog(), - journalDir: dir, - onRotate: im.makeOnRotate(id), - logger: im.logger, + ID: id, + HoleTracker: NewHoleTracker(), + events: NewEventLog(), + journalDir: dir, + onRotate: im.makeOnRotate(id), + rotateDuration: im.rotateDuration, + logger: im.logger, } im.instances[id] = s im.logger.Info("created instance", "id", id) diff --git a/replication_server_test.go b/replication_server_test.go index 6199859..cbdfb19 100644 --- a/replication_server_test.go +++ b/replication_server_test.go @@ -5,6 +5,7 @@ import ( "log/slog" "os" "path/filepath" + "sync/atomic" "testing" "time" ) @@ -270,6 +271,102 @@ func TestInstanceManagerStopBrokerCleanup(t *testing.T) { inst.mu.Unlock() } +func TestCloudJournalWriterRotation(t *testing.T) { + dir := t.TempDir() + im, err := NewInstanceManager(dir, slog.Default()) + if err != nil { + t.Fatal(err) + } + defer im.Shutdown() + + im.SetJournalRotateDuration(42 * time.Second) + + var rotated atomic.Int32 + im.SetOnRotate(func(instanceID string, rf RotatedFile) { + if instanceID != "rotate-test" { + t.Errorf("unexpected instanceID in OnRotate: %q", instanceID) + } + rotated.Add(1) + }) + + inst := im.GetOrCreate("rotate-test") + + // Verify the duration propagated to the instance. 
+ inst.mu.Lock() + if inst.rotateDuration != 42*time.Second { + t.Fatalf("rotateDuration: got %v, want 42s", inst.rotateDuration) + } + + // Start broker, which creates the JournalWriter with our rotation duration. + inst.ensureBroker() + if inst.journalWriter == nil { + t.Fatal("journalWriter should exist after ensureBroker") + } + if inst.journalWriter.cfg.RotateDuration != 42*time.Second { + t.Fatalf("JournalWriter.RotateDuration: got %v, want 42s", inst.journalWriter.cfg.RotateDuration) + } + broker := inst.broker + inst.mu.Unlock() + + // Feed a frame so the journal file gets created. + broker.RxFrames() <- RxFrame{ + Timestamp: time.Now(), + Header: ParseCANID(0x09F80100), + Data: make([]byte, 8), + Seq: 1, + } + time.Sleep(50 * time.Millisecond) // let the writer drain + + // Stop the broker. finalize() flushes the journal and fires OnRotate, + // but it runs in the journal writer goroutine which stopBroker doesn't + // join, so give it a moment. + inst.mu.Lock() + inst.stopBroker() + inst.mu.Unlock() + time.Sleep(200 * time.Millisecond) + + if got := rotated.Load(); got != 1 { + t.Fatalf("expected 1 OnRotate callback (from finalize), got %d", got) + } + + // Verify a .lpj file was created. + journalDir := filepath.Join(dir, "instances", "rotate-test", "journal") + entries, err := os.ReadDir(journalDir) + if err != nil { + t.Fatalf("read journal dir: %v", err) + } + var lpjCount int + for _, e := range entries { + if filepath.Ext(e.Name()) == ".lpj" { + lpjCount++ + } + } + if lpjCount == 0 { + t.Fatal("expected at least 1 .lpj file") + } +} + +// Verify SetJournalRotateDuration retroactively updates existing instances. +func TestSetJournalRotateDurationRetroactive(t *testing.T) { + dir := t.TempDir() + im, err := NewInstanceManager(dir, slog.Default()) + if err != nil { + t.Fatal(err) + } + defer im.Shutdown() + + inst := im.GetOrCreate("boat-1") + + // Set duration after instance was created. 
+ im.SetJournalRotateDuration(5 * time.Minute) + + inst.mu.Lock() + if inst.rotateDuration != 5*time.Minute { + t.Fatalf("retroactive update failed: got %v, want 5m", inst.rotateDuration) + } + inst.mu.Unlock() +} + func TestInstanceManagerShutdownPersistsAll(t *testing.T) { dir := t.TempDir() im, err := NewInstanceManager(dir, slog.Default()) diff --git a/website/docs/cloud/self-hosted.md b/website/docs/cloud/self-hosted.md index f93834e..21cfc98 100644 --- a/website/docs/cloud/self-hosted.md +++ b/website/docs/cloud/self-hosted.md @@ -147,11 +147,25 @@ RestartSec=5 WantedBy=multi-user.target ``` +## Journal rotation + +Live journal files on the cloud side must be rotated for archival to work. Without rotation, the `on-rotate` trigger never fires and files grow indefinitely. Configure with `-journal-rotate-duration` (default `PT1H`): + +```hocon +journal { + rotate-duration = PT1H +} +``` + +This is set by default, so no action is needed unless you want a different interval. Backfill files (from the backfill stream) rotate automatically when each backfill session closes. + ## Retention and archival lplex-cloud uses the same retention and archival system as lplex. A single JournalKeeper goroutine manages all instance directories. -See [Retention & Archival](/user-guide/retention) for configuration details. The same flags and HOCON paths apply. +On startup, the keeper also runs a one-time sweep to archive any `.lpj` files that are missing `.archived` markers (skipping the most recent file per directory, which may still be active). This handles the case where the process was restarted before `on-rotate` archival could complete. + +See [Retention & Archival](/user-guide/retention) for configuration details. The same retention and archive flags apply. 
## Monitoring diff --git a/website/docs/getting-started/configuration.md b/website/docs/getting-started/configuration.md index 23fdf45..47406fe 100644 --- a/website/docs/getting-started/configuration.md +++ b/website/docs/getting-started/configuration.md @@ -196,6 +196,10 @@ data-dir = "/data/lplex" # Same retention/archive config as lplex journal { + # Rotate live journal files after this duration (ISO 8601). + # Required for on-rotate archival to work (files must rotate to trigger archival). + rotate-duration = PT1H + retention { max-age = P90D max-size = 107374182400 @@ -223,6 +227,8 @@ journal { | `-tls-client-ca` | `grpc.tls.client-ca` / `tls.client-ca` | (empty) | Client CA cert | | `-data-dir` | `data-dir` | `/data/lplex` | Data directory | +| `-journal-rotate-duration` | `journal.rotate-duration` | `PT1H` | Rotate live journal files after this duration (ISO 8601) | + Retention and archive flags are the same as lplex (see table above). ## Systemd diff --git a/website/docs/user-guide/retention.md b/website/docs/user-guide/retention.md index 808edc7..18907ea 100644 --- a/website/docs/user-guide/retention.md +++ b/website/docs/user-guide/retention.md @@ -110,6 +110,10 @@ nmea2k-20260306T101500Z.lpj.archived The keeper uses these markers to track archive state across restarts. +### Startup archive sweep + +On startup, the keeper scans all directories and archives any `.lpj` files that are missing their `.archived` marker. The most recent file in each directory is skipped (it may be the active journal still being written to). This catches files that were rotated but never archived, for example if the process crashed before the `on-rotate` callback fired, or if archiving was configured after files already existed on disk. + ### Retry behavior Failed archives retry with exponential backoff: 1 minute initial delay, doubling up to a 1 hour cap. 
From 9fa641a0b8943d890ba1ed4ee66d1cea8482cee8 Mon Sep 17 00:00:00 2001 From: Theo Zourzouvillys Date: Sun, 8 Mar 2026 15:40:04 -0700 Subject: [PATCH 2/3] Add -journal-rotate-size to lplex-cloud for parity with boat The previous commit only threaded RotateDuration through to the cloud's JournalWriter. Add RotateSize too so files rotate on whichever threshold hits first (time or size), matching the boat-side behavior. Rename SetJournalRotateDuration to SetJournalRotation(duration, size) since both knobs travel together. --- cmd/lplex-cloud/config.go | 1 + cmd/lplex-cloud/main.go | 21 ++++++++++++------- lplex-cloud.conf.example | 3 ++- replication_server.go | 14 +++++++++---- replication_server_test.go | 18 ++++++++++------ website/docs/cloud/self-hosted.md | 7 ++++--- website/docs/getting-started/configuration.md | 4 +++- 7 files changed, 45 insertions(+), 23 deletions(-) diff --git a/cmd/lplex-cloud/config.go b/cmd/lplex-cloud/config.go index 08f83be..37f0541 100644 --- a/cmd/lplex-cloud/config.go +++ b/cmd/lplex-cloud/config.go @@ -31,6 +31,7 @@ var configToFlag = map[string]string{ "journal.retention.soft-pct": "journal-retention-soft-pct", "journal.retention.overflow-policy": "journal-retention-overflow-policy", "journal.rotate-duration": "journal-rotate-duration", + "journal.rotate-size": "journal-rotate-size", "journal.archive.command": "journal-archive-command", "journal.archive.trigger": "journal-archive-trigger", } diff --git a/cmd/lplex-cloud/main.go b/cmd/lplex-cloud/main.go index 5e4a395..aee1b0a 100644 --- a/cmd/lplex-cloud/main.go +++ b/cmd/lplex-cloud/main.go @@ -54,6 +54,7 @@ func main() { archiveCommand := flag.String("journal-archive-command", "", "Path to archive script") archiveTriggerStr := flag.String("journal-archive-trigger", "", "Archive trigger: on-rotate or before-expire") journalRotateDur := flag.String("journal-rotate-duration", "PT1H", "Rotate live journal files after duration (ISO 8601, e.g. 
PT1H)") + journalRotateSize := flag.Int64("journal-rotate-size", 0, "Rotate live journal files after this many bytes (0 = disabled)") configFile := flag.String("config", "", "Path to HOCON config file") showVersion := flag.Bool("version", false, "Print version and exit") flag.Parse() @@ -88,15 +89,19 @@ func main() { os.Exit(1) } - // Parse and apply journal rotation duration for live writers. - if *journalRotateDur != "" { - rotateDur, err := lplex.ParseISO8601Duration(*journalRotateDur) - if err != nil { - logger.Error("invalid journal-rotate-duration", "value", *journalRotateDur, "error", err) - os.Exit(1) + // Parse and apply journal rotation for live writers. + { + var rotateDur time.Duration + if *journalRotateDur != "" { + var err error + rotateDur, err = lplex.ParseISO8601Duration(*journalRotateDur) + if err != nil { + logger.Error("invalid journal-rotate-duration", "value", *journalRotateDur, "error", err) + os.Exit(1) + } } - im.SetJournalRotateDuration(rotateDur) - logger.Info("journal rotation configured", "duration", rotateDur) + im.SetJournalRotation(rotateDur, *journalRotateSize) + logger.Info("journal rotation configured", "duration", rotateDur, "size", *journalRotateSize) } // Set up journal keeper (retention + archive) if configured. diff --git a/lplex-cloud.conf.example b/lplex-cloud.conf.example index 8c2b0ea..2f073f9 100644 --- a/lplex-cloud.conf.example +++ b/lplex-cloud.conf.example @@ -43,10 +43,11 @@ data-dir = "/data/lplex" # Journal rotation and retention (all optional, comment out to disable) journal { - # Rotate live journal files after this duration (ISO 8601, default PT1H). + # Rotate live journal files after this duration or size (whichever comes first). # Required for on-rotate archival to trigger. Backfill files rotate # automatically when each backfill session ends. 
# rotate-duration = PT1H + # rotate-size = 0 # bytes, 0 = disabled # Retention policy (applied per-instance journal directory) retention { diff --git a/replication_server.go b/replication_server.go index 63c15b5..95becbd 100644 --- a/replication_server.go +++ b/replication_server.go @@ -45,6 +45,7 @@ type InstanceState struct { cancelFunc context.CancelFunc // stops the broker's journal writer onRotate func(RotatedFile) // optional callback for keeper rotateDuration time.Duration // journal rotation interval for live writer + rotateSize int64 // journal rotation size cap for live writer logger *slog.Logger } @@ -135,6 +136,7 @@ func (s *InstanceState) ensureBroker() { BlockSize: 262144, Compression: journal.CompressionZstd, RotateDuration: s.rotateDuration, + RotateSize: s.rotateSize, OnRotate: s.onRotate, Logger: s.logger.With("instance", s.ID, "component", "journal"), }, b.Devices(), s.journalCh) @@ -190,6 +192,7 @@ type InstanceManager struct { logger *slog.Logger onRotate func(instanceID string, rf RotatedFile) // optional callback for keeper rotateDuration time.Duration // journal rotation interval for live writers + rotateSize int64 // journal rotation size cap for live writers } // NewInstanceManager creates a new instance manager, loading any persisted state. @@ -262,16 +265,18 @@ func (im *InstanceManager) SetOnRotate(fn func(instanceID string, rf RotatedFile } } -// SetJournalRotateDuration sets the rotation interval for live journal writers. +// SetJournalRotation configures rotation for live journal writers. // Must be called before any connections are accepted. Retroactively updates // all existing instances loaded at startup. 
-func (im *InstanceManager) SetJournalRotateDuration(d time.Duration) { +func (im *InstanceManager) SetJournalRotation(duration time.Duration, size int64) { im.mu.Lock() defer im.mu.Unlock() - im.rotateDuration = d + im.rotateDuration = duration + im.rotateSize = size for _, s := range im.instances { s.mu.Lock() - s.rotateDuration = d + s.rotateDuration = duration + s.rotateSize = size s.mu.Unlock() } } @@ -324,6 +329,7 @@ func (im *InstanceManager) GetOrCreate(id string) *InstanceState { journalDir: dir, onRotate: im.makeOnRotate(id), rotateDuration: im.rotateDuration, + rotateSize: im.rotateSize, logger: im.logger, } im.instances[id] = s diff --git a/replication_server_test.go b/replication_server_test.go index cbdfb19..3b34c57 100644 --- a/replication_server_test.go +++ b/replication_server_test.go @@ -279,7 +279,7 @@ func TestCloudJournalWriterRotation(t *testing.T) { } defer im.Shutdown() - im.SetJournalRotateDuration(42 * time.Second) + im.SetJournalRotation(42*time.Second, 1024*1024) var rotated atomic.Int32 im.SetOnRotate(func(instanceID string, rf RotatedFile) { @@ -305,6 +305,9 @@ func TestCloudJournalWriterRotation(t *testing.T) { if inst.journalWriter.cfg.RotateDuration != 42*time.Second { t.Fatalf("JournalWriter.RotateDuration: got %v, want 42s", inst.journalWriter.cfg.RotateDuration) } + if inst.journalWriter.cfg.RotateSize != 1024*1024 { + t.Fatalf("JournalWriter.RotateSize: got %d, want 1MB", inst.journalWriter.cfg.RotateSize) + } broker := inst.broker inst.mu.Unlock() @@ -346,8 +349,8 @@ func TestCloudJournalWriterRotation(t *testing.T) { } } -// Verify SetJournalRotateDuration retroactively updates existing instances. -func TestSetJournalRotateDurationRetroactive(t *testing.T) { +// Verify SetJournalRotation retroactively updates existing instances. 
+func TestSetJournalRotationRetroactive(t *testing.T) { dir := t.TempDir() im, err := NewInstanceManager(dir, slog.Default()) if err != nil { @@ -357,12 +360,15 @@ func TestSetJournalRotateDurationRetroactive(t *testing.T) { inst := im.GetOrCreate("boat-1") - // Set duration after instance was created. - im.SetJournalRotateDuration(5 * time.Minute) + // Set rotation after instance was created. + im.SetJournalRotation(5*time.Minute, 512*1024*1024) inst.mu.Lock() if inst.rotateDuration != 5*time.Minute { - t.Fatalf("retroactive update failed: got %v, want 5m", inst.rotateDuration) + t.Fatalf("retroactive duration: got %v, want 5m", inst.rotateDuration) + } + if inst.rotateSize != 512*1024*1024 { + t.Fatalf("retroactive size: got %d, want 512MB", inst.rotateSize) } inst.mu.Unlock() } diff --git a/website/docs/cloud/self-hosted.md b/website/docs/cloud/self-hosted.md index 21cfc98..79bfc1f 100644 --- a/website/docs/cloud/self-hosted.md +++ b/website/docs/cloud/self-hosted.md @@ -149,15 +149,16 @@ WantedBy=multi-user.target ## Journal rotation -Live journal files on the cloud side must be rotated for archival to work. Without rotation, the `on-rotate` trigger never fires and files grow indefinitely. Configure with `-journal-rotate-duration` (default `PT1H`): +Live journal files on the cloud side must be rotated for archival to work. Without rotation, the `on-rotate` trigger never fires and files grow indefinitely. Rotation is configured the same way as on the boat, with duration and/or size thresholds (whichever triggers first): ```hocon journal { - rotate-duration = PT1H + rotate-duration = PT1H # default, rotate after 1 hour + # rotate-size = 536870912 # optional, rotate after 512 MB } ``` -This is set by default, so no action is needed unless you want a different interval. Backfill files (from the backfill stream) rotate automatically when each backfill session closes. 
+Duration-based rotation is on by default (`PT1H`), so no action is needed unless you want a different interval or want to add a size cap. Backfill files (from the backfill stream) rotate automatically when each backfill session closes. ## Retention and archival diff --git a/website/docs/getting-started/configuration.md b/website/docs/getting-started/configuration.md index 47406fe..459f9f6 100644 --- a/website/docs/getting-started/configuration.md +++ b/website/docs/getting-started/configuration.md @@ -196,9 +196,10 @@ data-dir = "/data/lplex" # Same retention/archive config as lplex journal { - # Rotate live journal files after this duration (ISO 8601). + # Rotate live journal files after this duration or size (whichever comes first). # Required for on-rotate archival to work (files must rotate to trigger archival). rotate-duration = PT1H + # rotate-size = 0 # bytes, 0 = disabled retention { max-age = P90D @@ -228,6 +229,7 @@ journal { | `-data-dir` | `data-dir` | `/data/lplex` | Data directory | | `-journal-rotate-duration` | `journal.rotate-duration` | `PT1H` | Rotate live journal files after this duration (ISO 8601) | +| `-journal-rotate-size` | `journal.rotate-size` | `0` | Rotate live journal files after this many bytes (0 = disabled) | Retention and archive flags are the same as lplex (see table above). From 88070bfd914babb4cf08cf63d9749b95369da400 Mon Sep 17 00:00:00 2001 From: Theo Zourzouvillys Date: Sun, 8 Mar 2026 15:54:24 -0700 Subject: [PATCH 3/3] Fix shutdown ordering and startup sweep for cloud journal archival Three bugs prevented journal files from reaching S3: 1. Shutdown ordering: cancel() killed the keeper before im.Shutdown() stopped brokers. Journal finalize -> OnRotate fired into a dead keeper. Fix: shutdown servers, then brokers, then keeper. 2. stopBroker didn't wait for journal writer goroutine. finalize() and OnRotate ran asynchronously after stopBroker returned. Added journalDone channel so stopBroker blocks until finalize completes. 3. 
archiveUnarchived skipped the newest file per directory (assumed active). With only one file (common after short-lived deployments), nothing got archived. Since the sweep runs at startup before any brokers exist, all files are completed. Removed the skip entirely. Also added drain logic to keeper's Run() so it processes remaining rotation notifications after context cancellation. --- cmd/lplex-cloud/main.go | 5 ++- journal_keeper.go | 53 +++++++++++++-------------- journal_keeper_test.go | 54 +++++++++++----------------- replication_server.go | 20 ++++++++++- replication_server_test.go | 9 ++--- website/docs/cloud/self-hosted.md | 2 +- website/docs/user-guide/retention.md | 2 +- 7 files changed, 75 insertions(+), 70 deletions(-) diff --git a/cmd/lplex-cloud/main.go b/cmd/lplex-cloud/main.go index aee1b0a..d0059ae 100644 --- a/cmd/lplex-cloud/main.go +++ b/cmd/lplex-cloud/main.go @@ -173,9 +173,12 @@ func main() { case <-ctx.Done(): } - cancel() + // Order matters: stop servers first (no new connections), then stop + // brokers (fires OnRotate via journal finalize), then stop the keeper + // (drains remaining rotation notifications before exiting). shutdown() im.Shutdown() + cancel() wg.Wait() logger.Info("lplex-cloud stopped") } diff --git a/journal_keeper.go b/journal_keeper.go index 1d86cdc..53abaa6 100644 --- a/journal_keeper.go +++ b/journal_keeper.go @@ -169,7 +169,17 @@ func (k *JournalKeeper) Run(ctx context.Context) { for { select { case <-ctx.Done(): - return + // Drain any rotation notifications that arrived between + // im.Shutdown() and cancel(). Without this, files finalized + // during shutdown would never get archived. 
+ for { + select { + case rf := <-k.rotateCh: + k.handleRotation(context.Background(), rf) + default: + return + } + } case rf := <-k.rotateCh: k.handleRotation(ctx, rf) @@ -218,10 +228,13 @@ func (k *JournalKeeper) handleRotation(ctx context.Context, rf RotatedFile) { } // archiveUnarchived is a one-shot startup sweep that archives any .lpj files -// missing their .archived sidecar marker. The most recent file in each directory -// is skipped because it's likely the active journal being written to. +// missing their .archived sidecar marker. Only runs with on-rotate trigger: +// if the process crashed before on-rotate fired, these files would never be +// archived otherwise. With before-expire, files get archived naturally at +// retention time. This runs at startup before any brokers are started, so +// all files are from previous runs and are complete. func (k *JournalKeeper) archiveUnarchived(ctx context.Context) { - if k.cfg.ArchiveCommand == "" { + if k.cfg.ArchiveCommand == "" || k.cfg.ArchiveTrigger != ArchiveOnRotate { return } @@ -241,8 +254,6 @@ func (k *JournalKeeper) archiveUnarchived(ctx context.Context) { continue } - // Collect .lpj files sorted by timestamp (oldest first). - var files []journalFileInfo for _, e := range entries { name := e.Name() if !strings.HasSuffix(name, ".lpj") { @@ -253,29 +264,13 @@ func (k *JournalKeeper) archiveUnarchived(ctx context.Context) { continue } path := filepath.Join(d.Dir, name) - files = append(files, journalFileInfo{ - path: path, - instanceID: d.InstanceID, - size: info.Size(), - created: parseTimestampFromFilename(name), - archived: isArchived(path), - }) - } - - slices.SortFunc(files, func(a, b journalFileInfo) int { - return a.created.Compare(b.created) - }) - - if len(files) == 0 { - continue - } - - // Skip the newest file (likely the active journal). 
- candidates := files[:len(files)-1] - - for _, f := range candidates { - if !f.archived && !k.isPending(f.path) { - toArchive = append(toArchive, makePending(f)) + if !isArchived(path) && !k.isPending(path) { + toArchive = append(toArchive, pendingFile{ + path: path, + instanceID: d.InstanceID, + size: info.Size(), + created: parseTimestampFromFilename(name), + }) } } } diff --git a/journal_keeper_test.go b/journal_keeper_test.go index 0473342..941b1ff 100644 --- a/journal_keeper_test.go +++ b/journal_keeper_test.go @@ -2245,27 +2245,22 @@ func TestStartupArchivesUnarchivedFiles(t *testing.T) { }) ctx, cancel := context.WithCancel(context.Background()) - // Let Run() execute the startup scan, then cancel. go func() { - time.Sleep(500 * time.Millisecond) + time.Sleep(2 * time.Second) cancel() }() keeper.Run(ctx) - // All three should be archived. p3 is the most recent but since there - // are 3 files, it's not the "active" one (only the newest per dir is - // skipped when there's exactly 1 file, but with 3 the newest is still - // skipped as it could be the active writer). Actually, the newest file - // should be skipped. Let's verify: + // All three should be archived. The startup sweep runs before any + // brokers are started, so all files are from previous runs. if !isArchived(p1) { t.Errorf("p1 should be archived: %s", p1) } if !isArchived(p2) { t.Errorf("p2 should be archived: %s", p2) } - // p3 is the newest file in the dir, so it should be skipped (active journal). - if isArchived(p3) { - t.Errorf("p3 (newest) should NOT be archived: %s", p3) + if !isArchived(p3) { + t.Errorf("p3 should be archived: %s", p3) } } @@ -2278,8 +2273,7 @@ func TestStartupSkipsAlreadyArchivedFiles(t *testing.T) { p1 := createTestJournal(t, dir, lpjName("nmea2k", now.Add(-3*time.Hour)), 100) p2 := createTestJournal(t, dir, lpjName("nmea2k", now.Add(-2*time.Hour)), 100) - // The newest file, will be skipped as active. 
- createTestJournal(t, dir, lpjName("nmea2k", now.Add(-1*time.Hour)), 100) + p3 := createTestJournal(t, dir, lpjName("nmea2k", now.Add(-1*time.Hour)), 100) // p1 is already archived. markArchived(p1) @@ -2302,40 +2296,39 @@ done ctx, cancel := context.WithCancel(context.Background()) go func() { - time.Sleep(500 * time.Millisecond) + time.Sleep(2 * time.Second) cancel() }() keeper.Run(ctx) - // p1 was already archived, p2 should now be archived too. + // p1 was already archived, p2 and p3 should now be archived too. if !isArchived(p2) { t.Error("p2 should be archived") } + if !isArchived(p3) { + t.Error("p3 should be archived") + } - // Check the tracking file: only p2 should appear (p1 was already archived, - // p3 is skipped as active). + // Check the tracking file: p2 and p3 should appear (p1 was already archived). data, err := os.ReadFile(trackFile) if err != nil { t.Fatalf("tracking file missing: %v", err) } lines := strings.Split(strings.TrimSpace(string(data)), "\n") - if len(lines) != 1 { - t.Fatalf("expected 1 archive call, got %d: %v", len(lines), lines) - } - if lines[0] != p2 { - t.Errorf("expected archive call for %s, got %s", p2, lines[0]) + if len(lines) != 2 { + t.Fatalf("expected 2 archive calls, got %d: %v", len(lines), lines) } } -// TestStartupSkipsActiveFile verifies the startup sweep skips the most recent -// file per directory (the one likely being written to). -func TestStartupSkipsActiveFile(t *testing.T) { +// TestStartupArchivesSingleFile verifies the startup sweep archives even when +// there's only one file in a directory. This is the critical case: after a +// short-lived deployment, there's one file from finalize and it must be archived. 
+func TestStartupArchivesSingleFile(t *testing.T) { dir := t.TempDir() scriptDir := t.TempDir() now := time.Now().UTC() - p1 := createTestJournal(t, dir, lpjName("nmea2k", now.Add(-2*time.Hour)), 100) - p2 := createTestJournal(t, dir, lpjName("nmea2k", now.Add(-1*time.Hour)), 100) + p1 := createTestJournal(t, dir, lpjName("nmea2k", now.Add(-1*time.Hour)), 100) script := writeArchiveScript(t, scriptDir, okScript) @@ -2348,17 +2341,12 @@ func TestStartupSkipsActiveFile(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) go func() { - time.Sleep(500 * time.Millisecond) + time.Sleep(2 * time.Second) cancel() }() keeper.Run(ctx) - // p1 (older) should be archived. if !isArchived(p1) { - t.Error("p1 should be archived") - } - // p2 (newest) should NOT be archived (active journal). - if isArchived(p2) { - t.Error("p2 (newest/active) should NOT be archived") + t.Error("single file should be archived on startup") } } diff --git a/replication_server.go b/replication_server.go index 95becbd..8c64a6c 100644 --- a/replication_server.go +++ b/replication_server.go @@ -42,6 +42,7 @@ type InstanceState struct { journalDir string journalCh chan RxFrame // connects broker to its JournalWriter journalWriter *JournalWriter // live journal writer (nil when broker not running) + journalDone chan struct{} // closed when journal writer goroutine exits cancelFunc context.CancelFunc // stops the broker's journal writer onRotate func(RotatedFile) // optional callback for keeper rotateDuration time.Duration // journal rotation interval for live writer @@ -147,9 +148,11 @@ func (s *InstanceState) ensureBroker() { } s.journalWriter = jw + s.journalDone = make(chan struct{}) go b.Run() go func() { + defer close(s.journalDone) if err := jw.Run(ctx); err != nil && ctx.Err() == nil { s.logger.Error("journal writer failed", "instance", s.ID, "error", err) } @@ -159,12 +162,15 @@ func (s *InstanceState) ensureBroker() { s.logger.Info("broker started", "instance", s.ID, 
"initial_head", initialHead) } -// stopBroker stops the instance's broker and journal writer. Caller must hold s.mu. +// stopBroker stops the instance's broker and journal writer. Blocks until +// the journal writer's finalize completes (including OnRotate callbacks). +// Caller must hold s.mu. func (s *InstanceState) stopBroker() { if s.broker == nil { return } b := s.broker + journalDone := s.journalDone s.broker = nil // Signal the broker to stop, then wait for Run() to exit so it's no @@ -178,8 +184,20 @@ func (s *InstanceState) stopBroker() { if s.cancelFunc != nil { s.cancelFunc() } + + // Wait for the journal writer goroutine to finish. This ensures + // finalize() has run and OnRotate has fired before we return. + // Release the lock while waiting to avoid deadlock if OnRotate + // needs to acquire other locks. + s.mu.Unlock() + if journalDone != nil { + <-journalDone + } + s.mu.Lock() + s.journalCh = nil s.journalWriter = nil + s.journalDone = nil s.cancelFunc = nil s.logger.Info("broker stopped", "instance", s.ID) } diff --git a/replication_server_test.go b/replication_server_test.go index 3b34c57..6aee56e 100644 --- a/replication_server_test.go +++ b/replication_server_test.go @@ -262,6 +262,9 @@ func TestInstanceManagerStopBrokerCleanup(t *testing.T) { if inst.journalCh != nil { t.Fatal("journalCh should be nil after stopBroker") } + if inst.journalDone != nil { + t.Fatal("journalDone should be nil after stopBroker") + } if inst.cancelFunc != nil { t.Fatal("cancelFunc should be nil after stopBroker") } @@ -320,13 +323,11 @@ func TestCloudJournalWriterRotation(t *testing.T) { } time.Sleep(50 * time.Millisecond) // let the writer drain - // Stop the broker. finalize() flushes the journal and fires OnRotate, - // but it runs in the journal writer goroutine which stopBroker doesn't - // join, so give it a moment. + // Stop the broker. 
stopBroker now waits for the journal writer goroutine + // to finish, so finalize() and OnRotate complete before this returns. inst.mu.Lock() inst.stopBroker() inst.mu.Unlock() - time.Sleep(200 * time.Millisecond) if got := rotated.Load(); got != 1 { t.Fatalf("expected 1 OnRotate callback (from finalize), got %d", got) diff --git a/website/docs/cloud/self-hosted.md b/website/docs/cloud/self-hosted.md index 79bfc1f..c58d8f7 100644 --- a/website/docs/cloud/self-hosted.md +++ b/website/docs/cloud/self-hosted.md @@ -164,7 +164,7 @@ Duration-based rotation is on by default (`PT1H`), so no action is needed unless lplex-cloud uses the same retention and archival system as lplex. A single JournalKeeper goroutine manages all instance directories. -On startup, the keeper also runs a one-time sweep to archive any `.lpj` files that are missing `.archived` markers (skipping the most recent file per directory, which may still be active). This handles the case where the process was restarted before `on-rotate` archival could complete. +On startup, when the archive trigger is `on-rotate`, the keeper runs a one-time sweep to archive any `.lpj` files that are missing `.archived` markers. This runs before any brokers start, so all files on disk are completed files from previous runs. This handles the case where the process was restarted before `on-rotate` archival could complete. See [Retention & Archival](/user-guide/retention) for configuration details. The same retention and archive flags apply. diff --git a/website/docs/user-guide/retention.md b/website/docs/user-guide/retention.md index 18907ea..78f2ee6 100644 --- a/website/docs/user-guide/retention.md +++ b/website/docs/user-guide/retention.md @@ -112,7 +112,7 @@ The keeper uses these markers to track archive state across restarts. ### Startup archive sweep -On startup, the keeper scans all directories and archives any `.lpj` files that are missing their `.archived` marker.
The most recent file in each directory is skipped (it may be the active journal still being written to). This catches files that were rotated but never archived, for example if the process crashed before the `on-rotate` callback fired, or if archiving was configured after files already existed on disk. +When the archive trigger is `on-rotate`, the keeper runs a one-time sweep on startup to archive any `.lpj` files that are missing their `.archived` marker. This runs before any brokers start, so all files on disk are completed files from previous runs. This catches files that were rotated but never archived, for example if the process was restarted before the `on-rotate` callback fired. ### Retry behavior