From 5fbf0505525e1e9624a903aae19a984e95edfec4 Mon Sep 17 00:00:00 2001 From: Hui Xiao Date: Fri, 20 Feb 2026 12:41:41 -0800 Subject: [PATCH] Fix stale atomic_update_versions_ trapping version installation in secondary DB MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Summary: When `atomic_flush=true`, flush writes version edits for all CFs as an atomic group in the manifest. `VersionEditHandlerPointInTime` replays these groups using a staging map (`atomic_update_versions_`): versions are held there until ALL CFs in the group have valid versions, then moved to `versions_` for installation. If any CF has missing files, the map is never cleared. For secondary/follower DB (`ManifestTailer`), files can become permanently unavailable when manifest rotation races with primary compaction. Example: secondary opens MANIFEST-4, then the primary compacts CF0 (deleting CF0's atomic-flushed SST) and rotates to MANIFEST-5. When the secondary reads the manifest it already opened, CF0's file is gone — the atomic group is permanently incomplete: `atomic_update_versions_` stays {CF0: nullptr, CF1: valid_version}. Once stuck, all subsequent version creation for CF0 and CF1 is routed into this stale map, so nothing reaches `versions_` and no versions are ever installed. Symptoms: remote compaction "Cannot find matched SST files", secondary DB serving stale data. TryCatchUpWithPrimary cannot be used as a fix because the primary keeps writing new atomic groups, each of which can also become incomplete due to the same manifest rotation race. The secondary would be endlessly chasing a moving target — each catch-up attempt encounters new incomplete atomic groups that re-pollute the staging map. We can relax the all-or-nothing guarantee for ManifestTailer because it exists for primary best-effort recovery where missing files mean data loss on a frozen filesystem. 
For secondary DB, files are missing because the primary compacted them away — not data loss. Cross-CF read consistency in secondary is provided at a different layer (NewIterators/MultiGet use shared sequence numbers), so installing valid CF versions individually is safe. Fix: override OnAtomicGroupReplayEnd() in ManifestTailer to salvage valid versions from incomplete atomic groups and clear the staging map. The all-or-nothing invariant for primary best-effort recovery is preserved. Test Plan: New test `AtomicFlushRemoteCompactionMissingFile` reproduces the race where secondary DB gets an incomplete atomic group due to concurrent primary compaction + manifest rotation. - Before fix: "Cannot find matched SST files for the following file numbers: 9 14" - After fix: compaction succeeds, both CFs compacted to L1 --- db/compaction/compaction_service_test.cc | 82 ++++++++++++++++++- db/version_edit_handler.cc | 29 +++++++ db/version_edit_handler.h | 2 + db/version_set.cc | 2 + db/version_set.h | 1 + .../fix_atomic_flush_secondary_db.md | 1 + 6 files changed, 116 insertions(+), 1 deletion(-) create mode 100644 unreleased_history/bug_fixes/fix_atomic_flush_secondary_db.md diff --git a/db/compaction/compaction_service_test.cc b/db/compaction/compaction_service_test.cc index f76a25092974..f20ecde1cdd8 100644 --- a/db/compaction/compaction_service_test.cc +++ b/db/compaction/compaction_service_test.cc @@ -16,7 +16,7 @@ class MyTestCompactionService : public CompactionService { public: MyTestCompactionService( std::string db_path, Options& options, - std::shared_ptr<Statistics>& statistics, + std::shared_ptr<Statistics> statistics, std::vector<std::shared_ptr<EventListener>> listeners, std::vector<std::shared_ptr<TablePropertiesCollectorFactory>> table_properties_collector_factories) @@ -2858,6 +2858,86 @@ TEST_F(ResumableCompactionKeyTypeTest, CancelAndResumeWithTimedPut) { VerifyResumeBytes(); } + +TEST_F(CompactionServiceTest, AtomicFlushRemoteCompactionMissingFile) { + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + options.atomic_flush =
true; + options.create_missing_column_families = true; + options.env = env_; + options.max_manifest_file_size = 1; + options.num_levels = 2; + options.max_background_jobs = 4; + + auto my_cs = std::make_shared<MyTestCompactionService>( + dbname_, options, nullptr, remote_listeners, + remote_table_properties_collector_factories); + options.compaction_service = my_cs; + + Close(); + ASSERT_OK(DestroyDB(dbname_, options)); + + std::vector<ColumnFamilyDescriptor> cf_descs; + cf_descs.emplace_back(kDefaultColumnFamilyName, options); + cf_descs.emplace_back("cf_1", options); + + ASSERT_OK(DB::Open(options, dbname_, cf_descs, &handles_, &db_)); + + // Atomic flush writes version edits for CF0 and CF1 as one atomic group + ASSERT_OK(Put(0, Key(1), "value1")); + ASSERT_OK(Put(1, Key(1), "value1")); + ASSERT_OK(db_->Flush(FlushOptions(), handles_)); + ASSERT_EQ("1", FilesPerLevel(0)); + ASSERT_EQ("1", FilesPerLevel(1)); + + // Add more L0 files so CompactRange has something to compact + ASSERT_OK(Put(0, Key(1), "value2")); + ASSERT_OK(Flush(0)); + ASSERT_EQ("2", FilesPerLevel(0)); + ASSERT_OK(Put(1, Key(1), "value2")); + ASSERT_OK(Flush(1)); + ASSERT_EQ("2", FilesPerLevel(1)); + + auto pressure_token = + dbfull()->TEST_write_controler().GetCompactionPressureToken(); + + CompactRangeOptions cro; + cro.bottommost_level_compaction = BottommostLevelCompaction::kForce; + + // Remote compaction on CF1 opens a secondary DB that reads the manifest. + // Sync point pauses secondary after it opens the manifest file but before + // reading records. The callback compacts CF0 locally, which deletes CF0's + // atomic-flushed SST and rotates the manifest. When the secondary resumes + // reading the manifest it already opened, CF0's file is gone, so the + // atomic group from the flush is incomplete in the secondary. 
+ SyncPoint::GetInstance()->SetCallBack( + "ReactiveVersionSet::Recover:AfterMaybeSwitchManifest", + [&](void* /*arg*/) { + my_cs->OverrideStartStatus(CompactionServiceJobStatus::kUseLocal); + ASSERT_OK(db_->CompactRange(cro, handles_[0], nullptr, nullptr)); + my_cs->ResetOverride(); + }); + SyncPoint::GetInstance()->EnableProcessing(); + + // Before fix: fails with "Cannot find matched SST files" + // After fix: compaction succeeds + ASSERT_OK(db_->CompactRange(cro, handles_[1], nullptr, nullptr)); + + // Verify both compactions actually ran (2 L0 files → 1 L1 file each) + ASSERT_EQ("0,1", FilesPerLevel(0)); + ASSERT_EQ("0,1", FilesPerLevel(1)); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + + for (auto* h : handles_) { + if (h != db_->DefaultColumnFamily()) { + ASSERT_OK(db_->DestroyColumnFamilyHandle(h)); + } + } + handles_.clear(); +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/db/version_edit_handler.cc b/db/version_edit_handler.cc index 42d83b84d627..d4697e77ef76 100644 --- a/db/version_edit_handler.cc +++ b/db/version_edit_handler.cc @@ -1047,6 +1047,35 @@ Status ManifestTailer::Initialize() { return s; } +Status ManifestTailer::OnAtomicGroupReplayEnd() { + Status s = VersionEditHandlerPointInTime::OnAtomicGroupReplayEnd(); + if (!s.ok()) { + return s; + } + // For secondary/follower DB, files referenced in the manifest can become + // permanently unavailable due to manifest rotation combined with concurrent + // primary compaction. If the atomic group is incomplete (some CFs have + // missing files), salvage any valid versions from the staging map into + // versions_. Otherwise, the stale atomic_update_versions_ map traps all + // subsequent version creation and prevents version installation for all CFs. 
+ if (atomic_update_versions_missing_ > 0) { + for (auto& [cfid, version] : atomic_update_versions_) { + if (version != nullptr) { + auto existing = versions_.find(cfid); + if (existing != versions_.end()) { + delete existing->second; + existing->second = version; + } else { + versions_.emplace(cfid, version); + } + } + } + atomic_update_versions_.clear(); + atomic_update_versions_missing_ = 0; + } + return Status::OK(); +} + Status ManifestTailer::ApplyVersionEdit(VersionEdit& edit, ColumnFamilyData** cfd) { Status s = VersionEditHandler::ApplyVersionEdit(edit, cfd); diff --git a/db/version_edit_handler.h b/db/version_edit_handler.h index 1d4b22e3c13e..babdb462b584 100644 --- a/db/version_edit_handler.h +++ b/db/version_edit_handler.h @@ -390,6 +390,8 @@ class ManifestTailer : public VersionEditHandlerPointInTime { void CheckIterationResult(const log::Reader& reader, Status* s) override; + Status OnAtomicGroupReplayEnd() override; + enum Mode : uint8_t { kRecovery = 0, kCatchUp = 1, diff --git a/db/version_set.cc b/db/version_set.cc index 6c9cbc82a17c..b7dab06e6374 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -7903,6 +7903,8 @@ Status ReactiveVersionSet::Recover( log::Reader* reader = manifest_reader->get(); assert(reader); + TEST_SYNC_POINT("ReactiveVersionSet::Recover:AfterMaybeSwitchManifest"); + manifest_tailer_.reset(new ManifestTailer( column_families, const_cast<ReactiveVersionSet*>(this), io_tracer_, read_options_, EpochNumberRequirement::kMightMissing)); diff --git a/db/version_set.h b/db/version_set.h index 47a677cf59e6..7000fa2ef3ce 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -1087,6 +1087,7 @@ class Version { friend class VersionSet; friend class VersionEditHandler; friend class VersionEditHandlerPointInTime; + friend class ManifestTailer; const InternalKeyComparator* internal_comparator() const { return storage_info_.internal_comparator_; diff --git a/unreleased_history/bug_fixes/fix_atomic_flush_secondary_db.md 
b/unreleased_history/bug_fixes/fix_atomic_flush_secondary_db.md new file mode 100644 index 000000000000..730e97f1d63e --- /dev/null +++ b/unreleased_history/bug_fixes/fix_atomic_flush_secondary_db.md @@ -0,0 +1 @@ +Fix a bug where a secondary/follower DB with `atomic_flush=true` could stop installing new versions, causing remote compaction to fail with "Cannot find matched SST files" errors and the secondary DB to serve permanently stale data in release builds and to crash in debug builds.