diff --git a/db/compaction/compaction_service_test.cc b/db/compaction/compaction_service_test.cc index f76a25092974..f20ecde1cdd8 100644 --- a/db/compaction/compaction_service_test.cc +++ b/db/compaction/compaction_service_test.cc @@ -16,7 +16,7 @@ class MyTestCompactionService : public CompactionService { public: MyTestCompactionService( std::string db_path, Options& options, - std::shared_ptr& statistics, + std::shared_ptr statistics, std::vector> listeners, std::vector> table_properties_collector_factories) @@ -2858,6 +2858,86 @@ TEST_F(ResumableCompactionKeyTypeTest, CancelAndResumeWithTimedPut) { VerifyResumeBytes(); } + +TEST_F(CompactionServiceTest, AtomicFlushRemoteCompactionMissingFile) { + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + options.atomic_flush = true; + options.create_missing_column_families = true; + options.env = env_; + options.max_manifest_file_size = 1; + options.num_levels = 2; + options.max_background_jobs = 4; + + auto my_cs = std::make_shared( + dbname_, options, nullptr, remote_listeners, + remote_table_properties_collector_factories); + options.compaction_service = my_cs; + + Close(); + ASSERT_OK(DestroyDB(dbname_, options)); + + std::vector cf_descs; + cf_descs.emplace_back(kDefaultColumnFamilyName, options); + cf_descs.emplace_back("cf_1", options); + + ASSERT_OK(DB::Open(options, dbname_, cf_descs, &handles_, &db_)); + + // Atomic flush writes version edits for CF0 and CF1 as one atomic group + ASSERT_OK(Put(0, Key(1), "value1")); + ASSERT_OK(Put(1, Key(1), "value1")); + ASSERT_OK(db_->Flush(FlushOptions(), handles_)); + ASSERT_EQ("1", FilesPerLevel(0)); + ASSERT_EQ("1", FilesPerLevel(1)); + + // Add more L0 files so CompactRange has something to compact + ASSERT_OK(Put(0, Key(1), "value2")); + ASSERT_OK(Flush(0)); + ASSERT_EQ("2", FilesPerLevel(0)); + ASSERT_OK(Put(1, Key(1), "value2")); + ASSERT_OK(Flush(1)); + ASSERT_EQ("2", FilesPerLevel(1)); + + auto pressure_token = + dbfull()->TEST_write_controler().GetCompactionPressureToken(); + + CompactRangeOptions cro; + cro.bottommost_level_compaction = BottommostLevelCompaction::kForce; + + // Remote compaction on CF1 opens a secondary DB that reads the manifest. + // Sync point pauses secondary after it opens the manifest file but before + // reading records. The callback compacts CF0 locally, which deletes CF0's + // atomic-flushed SST and rotates the manifest. When the secondary resumes + // reading the manifest it already opened, CF0's file is gone, so the + // atomic group from the flush is incomplete in the secondary. + SyncPoint::GetInstance()->SetCallBack( + "ReactiveVersionSet::Recover:AfterMaybeSwitchManifest", + [&](void* /*arg*/) { + my_cs->OverrideStartStatus(CompactionServiceJobStatus::kUseLocal); + ASSERT_OK(db_->CompactRange(cro, handles_[0], nullptr, nullptr)); + my_cs->ResetOverride(); + }); + SyncPoint::GetInstance()->EnableProcessing(); + + // Before fix: fails with "Cannot find matched SST files" + // After fix: compaction succeeds + ASSERT_OK(db_->CompactRange(cro, handles_[1], nullptr, nullptr)); + + // Verify both compactions actually ran (2 L0 files → 1 L1 file each) + ASSERT_EQ("0,1", FilesPerLevel(0)); + ASSERT_EQ("0,1", FilesPerLevel(1)); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + + for (auto* h : handles_) { + if (h != db_->DefaultColumnFamily()) { + ASSERT_OK(db_->DestroyColumnFamilyHandle(h)); + } + } + handles_.clear(); +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/db/version_edit_handler.cc b/db/version_edit_handler.cc index 42d83b84d627..d4697e77ef76 100644 --- a/db/version_edit_handler.cc +++ b/db/version_edit_handler.cc @@ -1047,6 +1047,35 @@ Status ManifestTailer::Initialize() { return s; } +Status ManifestTailer::OnAtomicGroupReplayEnd() { + Status s = VersionEditHandlerPointInTime::OnAtomicGroupReplayEnd(); + if (!s.ok()) { + return s; + } + // For secondary/follower DB, files referenced in the manifest can become + // permanently unavailable due to manifest rotation combined with concurrent + // primary compaction. If the atomic group is incomplete (some CFs have + // missing files), salvage any valid versions from the staging map into + // versions_. Otherwise, the stale atomic_update_versions_ map traps all + // subsequent version creation and prevents version installation for all CFs. + if (atomic_update_versions_missing_ > 0) { + for (auto& [cfid, version] : atomic_update_versions_) { + if (version != nullptr) { + auto existing = versions_.find(cfid); + if (existing != versions_.end()) { + delete existing->second; + existing->second = version; + } else { + versions_.emplace(cfid, version); + } + } + } + atomic_update_versions_.clear(); + atomic_update_versions_missing_ = 0; + } + return Status::OK(); +} + Status ManifestTailer::ApplyVersionEdit(VersionEdit& edit, ColumnFamilyData** cfd) { Status s = VersionEditHandler::ApplyVersionEdit(edit, cfd); diff --git a/db/version_edit_handler.h b/db/version_edit_handler.h index 1d4b22e3c13e..babdb462b584 100644 --- a/db/version_edit_handler.h +++ b/db/version_edit_handler.h @@ -390,6 +390,8 @@ class ManifestTailer : public VersionEditHandlerPointInTime { void CheckIterationResult(const log::Reader& reader, Status* s) override; + Status OnAtomicGroupReplayEnd() override; + enum Mode : uint8_t { kRecovery = 0, kCatchUp = 1, diff --git a/db/version_set.cc b/db/version_set.cc index 6c9cbc82a17c..b7dab06e6374 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -7903,6 +7903,8 @@ Status ReactiveVersionSet::Recover( log::Reader* reader = manifest_reader->get(); assert(reader); + TEST_SYNC_POINT("ReactiveVersionSet::Recover:AfterMaybeSwitchManifest"); + manifest_tailer_.reset(new ManifestTailer( column_families, const_cast(this), io_tracer_, read_options_, EpochNumberRequirement::kMightMissing)); diff --git a/db/version_set.h b/db/version_set.h index 47a677cf59e6..7000fa2ef3ce 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -1087,6 +1087,7 @@ class Version { friend class VersionSet; friend class VersionEditHandler; friend class VersionEditHandlerPointInTime; + friend class ManifestTailer; const InternalKeyComparator* internal_comparator() const { return storage_info_.internal_comparator_; diff --git a/unreleased_history/bug_fixes/fix_atomic_flush_secondary_db.md b/unreleased_history/bug_fixes/fix_atomic_flush_secondary_db.md new file mode 100644 index 000000000000..730e97f1d63e --- /dev/null +++ b/unreleased_history/bug_fixes/fix_atomic_flush_secondary_db.md @@ -0,0 +1 @@ +Fix a bug where secondary/follower DB with `atomic_flush=true` could stop installing new versions, causing remote compaction to fail with "Cannot find matched SST files" errors and secondary DB to serve permanently stale data in release builds and crashing in debug builds.