Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ http {
data-dir = "/data/lplex"

journal {
rotate-duration = PT1H
retention {
max-age = P90D
max-size = 53687091200
Expand Down
2 changes: 2 additions & 0 deletions cmd/lplex-cloud/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ var configToFlag = map[string]string{
"journal.retention.max-size": "journal-retention-max-size",
"journal.retention.soft-pct": "journal-retention-soft-pct",
"journal.retention.overflow-policy": "journal-retention-overflow-policy",
"journal.rotate-duration": "journal-rotate-duration",
"journal.rotate-size": "journal-rotate-size",
"journal.archive.command": "journal-archive-command",
"journal.archive.trigger": "journal-archive-trigger",
}
Expand Down
22 changes: 21 additions & 1 deletion cmd/lplex-cloud/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ func main() {
retentionOverflowPolicy := flag.String("journal-retention-overflow-policy", "delete-unarchived", "Overflow policy: delete-unarchived or pause-recording")
archiveCommand := flag.String("journal-archive-command", "", "Path to archive script")
archiveTriggerStr := flag.String("journal-archive-trigger", "", "Archive trigger: on-rotate or before-expire")
journalRotateDur := flag.String("journal-rotate-duration", "PT1H", "Rotate live journal files after duration (ISO 8601, e.g. PT1H)")
journalRotateSize := flag.Int64("journal-rotate-size", 0, "Rotate live journal files after this many bytes (0 = disabled)")
configFile := flag.String("config", "", "Path to HOCON config file")
showVersion := flag.Bool("version", false, "Print version and exit")
flag.Parse()
Expand Down Expand Up @@ -87,6 +89,21 @@ func main() {
os.Exit(1)
}

// Parse and apply journal rotation for live writers.
{
var rotateDur time.Duration
if *journalRotateDur != "" {
var err error
rotateDur, err = lplex.ParseISO8601Duration(*journalRotateDur)
if err != nil {
logger.Error("invalid journal-rotate-duration", "value", *journalRotateDur, "error", err)
os.Exit(1)
}
}
im.SetJournalRotation(rotateDur, *journalRotateSize)
logger.Info("journal rotation configured", "duration", rotateDur, "size", *journalRotateSize)
}

// Set up journal keeper (retention + archive) if configured.
keeperCfg, err := buildKeeperConfig(
*dataDir,
Expand Down Expand Up @@ -156,9 +173,12 @@ func main() {
case <-ctx.Done():
}

cancel()
// Order matters: stop servers first (no new connections), then stop
// brokers (fires OnRotate via journal finalize), then stop the keeper
// (drains remaining rotation notifications before exiting).
shutdown()
im.Shutdown()
cancel()
wg.Wait()
logger.Info("lplex-cloud stopped")
}
Expand Down
71 changes: 70 additions & 1 deletion journal_keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,11 @@ func (k *JournalKeeper) Run(ctx context.Context) {
// Startup scan: handle files rotated while we were down.
k.scanAll(ctx)

// Archive any .lpj files that were rotated but never archived (e.g. process
// crashed before on-rotate fired, or files accumulated before archiving was
// configured). Runs once on startup regardless of trigger mode.
k.archiveUnarchived(ctx)

scanTicker := time.NewTicker(keeperScanInterval)
defer scanTicker.Stop()

Expand All @@ -164,7 +169,17 @@ func (k *JournalKeeper) Run(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
// Drain any rotation notifications that arrived between
// im.Shutdown() and cancel(). Without this, files finalized
// during shutdown would never get archived.
for {
select {
case rf := <-k.rotateCh:
k.handleRotation(context.Background(), rf)
default:
return
}
}

case rf := <-k.rotateCh:
k.handleRotation(ctx, rf)
Expand Down Expand Up @@ -212,6 +227,60 @@ func (k *JournalKeeper) handleRotation(ctx context.Context, rf RotatedFile) {
k.archiveFiles(ctx, []pendingFile{pf})
}

// archiveUnarchived is a one-shot startup sweep that archives any .lpj files
// missing their .archived sidecar marker. Only runs with on-rotate trigger:
// if the process crashed before on-rotate fired, these files would never be
// archived otherwise. With before-expire, files get archived naturally at
// retention time. This runs at startup before any brokers are started, so
// all files are from previous runs and are complete.
func (k *JournalKeeper) archiveUnarchived(ctx context.Context) {
if k.cfg.ArchiveCommand == "" || k.cfg.ArchiveTrigger != ArchiveOnRotate {
return
}

dirs := k.cfg.Dirs
if k.cfg.DirFunc != nil {
dirs = k.cfg.DirFunc()
}

var toArchive []pendingFile
for _, d := range dirs {
if ctx.Err() != nil {
return
}

entries, err := os.ReadDir(d.Dir)
if err != nil {
continue
}

for _, e := range entries {
name := e.Name()
if !strings.HasSuffix(name, ".lpj") {
continue
}
info, err := e.Info()
if err != nil {
continue
}
path := filepath.Join(d.Dir, name)
if !isArchived(path) && !k.isPending(path) {
toArchive = append(toArchive, pendingFile{
path: path,
instanceID: d.InstanceID,
size: info.Size(),
created: parseTimestampFromFilename(name),
})
}
}
}

if len(toArchive) > 0 {
k.logger.Info("startup archive sweep", "files", len(toArchive))
k.archiveFiles(ctx, toArchive)
}
}

// scanAll scans all configured directories and applies retention + archive rules.
func (k *JournalKeeper) scanAll(ctx context.Context) {
dirs := k.cfg.Dirs
Expand Down
132 changes: 132 additions & 0 deletions journal_keeper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2218,3 +2218,135 @@ func TestEndToEndSoftToHardToPause(t *testing.T) {
t.Errorf("expected [true, false], got %v", pauseHistory)
}
}

// -----------------------------------------------------------------------
// Startup archive sweep
// -----------------------------------------------------------------------

// TestStartupArchivesUnarchivedFiles verifies that Run() archives all
// non-archived .lpj files on startup, regardless of trigger mode.
func TestStartupArchivesUnarchivedFiles(t *testing.T) {
dir := t.TempDir()
scriptDir := t.TempDir()
now := time.Now().UTC()

// Three files, none archived, none expired.
p1 := createTestJournal(t, dir, lpjName("nmea2k", now.Add(-3*time.Hour)), 100)
p2 := createTestJournal(t, dir, lpjName("nmea2k", now.Add(-2*time.Hour)), 100)
p3 := createTestJournal(t, dir, lpjName("nmea2k", now.Add(-1*time.Hour)), 100)

script := writeArchiveScript(t, scriptDir, okScript)

keeper := NewJournalKeeper(KeeperConfig{
Dirs: []KeeperDir{{Dir: dir, InstanceID: "test"}},
ArchiveCommand: script,
ArchiveTrigger: ArchiveOnRotate,
MaxAge: 30 * 24 * time.Hour, // files are nowhere near expiry
})

ctx, cancel := context.WithCancel(context.Background())
go func() {
time.Sleep(2 * time.Second)
cancel()
}()
keeper.Run(ctx)

// All three should be archived. The startup sweep runs before any
// brokers are started, so all files are from previous runs.
if !isArchived(p1) {
t.Errorf("p1 should be archived: %s", p1)
}
if !isArchived(p2) {
t.Errorf("p2 should be archived: %s", p2)
}
if !isArchived(p3) {
t.Errorf("p3 should be archived: %s", p3)
}
}

// TestStartupSkipsAlreadyArchivedFiles verifies the startup sweep doesn't
// re-archive files that already have .archived markers.
func TestStartupSkipsAlreadyArchivedFiles(t *testing.T) {
dir := t.TempDir()
scriptDir := t.TempDir()
now := time.Now().UTC()

p1 := createTestJournal(t, dir, lpjName("nmea2k", now.Add(-3*time.Hour)), 100)
p2 := createTestJournal(t, dir, lpjName("nmea2k", now.Add(-2*time.Hour)), 100)
p3 := createTestJournal(t, dir, lpjName("nmea2k", now.Add(-1*time.Hour)), 100)

// p1 is already archived.
markArchived(p1)

trackFile := filepath.Join(scriptDir, "calls.log")
trackScript := fmt.Sprintf(`
for arg in "$@"; do
echo "$arg" >> %s
echo "{\"path\":\"$arg\",\"status\":\"ok\"}"
done
`, trackFile)
script := writeArchiveScript(t, scriptDir, trackScript)

keeper := NewJournalKeeper(KeeperConfig{
Dirs: []KeeperDir{{Dir: dir, InstanceID: "test"}},
ArchiveCommand: script,
ArchiveTrigger: ArchiveOnRotate,
MaxAge: 30 * 24 * time.Hour,
})

ctx, cancel := context.WithCancel(context.Background())
go func() {
time.Sleep(2 * time.Second)
cancel()
}()
keeper.Run(ctx)

// p1 was already archived, p2 and p3 should now be archived too.
if !isArchived(p2) {
t.Error("p2 should be archived")
}
if !isArchived(p3) {
t.Error("p3 should be archived")
}

// Check the tracking file: p2 and p3 should appear (p1 was already archived).
data, err := os.ReadFile(trackFile)
if err != nil {
t.Fatalf("tracking file missing: %v", err)
}
lines := strings.Split(strings.TrimSpace(string(data)), "\n")
if len(lines) != 2 {
t.Fatalf("expected 2 archive calls, got %d: %v", len(lines), lines)
}
}

// TestStartupArchivesSingleFile verifies the startup sweep archives even when
// there's only one file in a directory. This is the critical case: after a
// short-lived deployment, there's one file from finalize and it must be archived.
func TestStartupArchivesSingleFile(t *testing.T) {
dir := t.TempDir()
scriptDir := t.TempDir()
now := time.Now().UTC()

p1 := createTestJournal(t, dir, lpjName("nmea2k", now.Add(-1*time.Hour)), 100)

script := writeArchiveScript(t, scriptDir, okScript)

keeper := NewJournalKeeper(KeeperConfig{
Dirs: []KeeperDir{{Dir: dir, InstanceID: "test"}},
ArchiveCommand: script,
ArchiveTrigger: ArchiveOnRotate,
MaxAge: 30 * 24 * time.Hour,
})

ctx, cancel := context.WithCancel(context.Background())
go func() {
time.Sleep(2 * time.Second)
cancel()
}()
keeper.Run(ctx)

if !isArchived(p1) {
t.Error("single file should be archived on startup")
}
}
8 changes: 7 additions & 1 deletion lplex-cloud.conf.example
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,15 @@ http {
# Data directory for instance state and journals
data-dir = "/data/lplex"

# Journal retention and archival (all optional, comment out to disable)
# Journal rotation and retention (all optional, comment out to disable)
journal {

# Rotate live journal files after this duration or size (whichever comes first).
# Required for on-rotate archival to trigger. Backfill files rotate
# automatically when each backfill session ends.
# rotate-duration = PT1H
# rotate-size = 0 # bytes, 0 = disabled

# Retention policy (applied per-instance journal directory)
retention {
# Delete files older than this (ISO 8601, e.g. P30D = 30 days)
Expand Down
Loading
Loading