Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 81 additions & 1 deletion db/compaction/compaction_service_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class MyTestCompactionService : public CompactionService {
public:
MyTestCompactionService(
std::string db_path, Options& options,
std::shared_ptr<Statistics>& statistics,
std::shared_ptr<Statistics> statistics,
std::vector<std::shared_ptr<EventListener>> listeners,
std::vector<std::shared_ptr<TablePropertiesCollectorFactory>>
table_properties_collector_factories)
Expand Down Expand Up @@ -2858,6 +2858,86 @@ TEST_F(ResumableCompactionKeyTypeTest, CancelAndResumeWithTimedPut) {

VerifyResumeBytes();
}

TEST_F(CompactionServiceTest, AtomicFlushRemoteCompactionMissingFile) {
Options options = CurrentOptions();
options.disable_auto_compactions = true;
options.atomic_flush = true;
options.create_missing_column_families = true;
options.env = env_;
options.max_manifest_file_size = 1;
options.num_levels = 2;
options.max_background_jobs = 4;

auto my_cs = std::make_shared<MyTestCompactionService>(
dbname_, options, nullptr, remote_listeners,
remote_table_properties_collector_factories);
options.compaction_service = my_cs;

Close();
ASSERT_OK(DestroyDB(dbname_, options));

std::vector<ColumnFamilyDescriptor> cf_descs;
cf_descs.emplace_back(kDefaultColumnFamilyName, options);
cf_descs.emplace_back("cf_1", options);

ASSERT_OK(DB::Open(options, dbname_, cf_descs, &handles_, &db_));

// Atomic flush writes version edits for CF0 and CF1 as one atomic group
ASSERT_OK(Put(0, Key(1), "value1"));
ASSERT_OK(Put(1, Key(1), "value1"));
ASSERT_OK(db_->Flush(FlushOptions(), handles_));
ASSERT_EQ("1", FilesPerLevel(0));
ASSERT_EQ("1", FilesPerLevel(1));

// Add more L0 files so CompactRange has something to compact
ASSERT_OK(Put(0, Key(1), "value2"));
ASSERT_OK(Flush(0));
ASSERT_EQ("2", FilesPerLevel(0));
ASSERT_OK(Put(1, Key(1), "value2"));
ASSERT_OK(Flush(1));
ASSERT_EQ("2", FilesPerLevel(1));

auto pressure_token =
dbfull()->TEST_write_controler().GetCompactionPressureToken();

CompactRangeOptions cro;
cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;

// Remote compaction on CF1 opens a secondary DB that reads the manifest.
// Sync point pauses secondary after it opens the manifest file but before
// reading records. The callback compacts CF0 locally, which deletes CF0's
// atomic-flushed SST and rotates the manifest. When the secondary resumes
// reading the manifest it already opened, CF0's file is gone, so the
// atomic group from the flush is incomplete in the secondary.
SyncPoint::GetInstance()->SetCallBack(
"ReactiveVersionSet::Recover:AfterMaybeSwitchManifest",
[&](void* /*arg*/) {
my_cs->OverrideStartStatus(CompactionServiceJobStatus::kUseLocal);
ASSERT_OK(db_->CompactRange(cro, handles_[0], nullptr, nullptr));
my_cs->ResetOverride();
});
SyncPoint::GetInstance()->EnableProcessing();

// Before fix: fails with "Cannot find matched SST files"
// After fix: compaction succeeds
ASSERT_OK(db_->CompactRange(cro, handles_[1], nullptr, nullptr));

// Verify both compactions actually ran (2 L0 files → 1 L1 file each)
ASSERT_EQ("0,1", FilesPerLevel(0));
ASSERT_EQ("0,1", FilesPerLevel(1));

SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();

for (auto* h : handles_) {
if (h != db_->DefaultColumnFamily()) {
ASSERT_OK(db_->DestroyColumnFamilyHandle(h));
}
}
handles_.clear();
}

} // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
Expand Down
29 changes: 29 additions & 0 deletions db/version_edit_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1047,6 +1047,35 @@ Status ManifestTailer::Initialize() {
return s;
}

Status ManifestTailer::OnAtomicGroupReplayEnd() {
Status s = VersionEditHandlerPointInTime::OnAtomicGroupReplayEnd();
if (!s.ok()) {
return s;
}
// For secondary/follower DB, files referenced in the manifest can become
// permanently unavailable due to manifest rotation combined with concurrent
// primary compaction. If the atomic group is incomplete (some CFs have
// missing files), salvage any valid versions from the staging map into
// versions_. Otherwise, the stale atomic_update_versions_ map traps all
// subsequent version creation and prevents version installation for all CFs.
if (atomic_update_versions_missing_ > 0) {
for (auto& [cfid, version] : atomic_update_versions_) {
if (version != nullptr) {
auto existing = versions_.find(cfid);
if (existing != versions_.end()) {
delete existing->second;
existing->second = version;
} else {
versions_.emplace(cfid, version);
}
}
}
atomic_update_versions_.clear();
atomic_update_versions_missing_ = 0;
}
return Status::OK();
}

Status ManifestTailer::ApplyVersionEdit(VersionEdit& edit,
ColumnFamilyData** cfd) {
Status s = VersionEditHandler::ApplyVersionEdit(edit, cfd);
Expand Down
2 changes: 2 additions & 0 deletions db/version_edit_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,8 @@ class ManifestTailer : public VersionEditHandlerPointInTime {

void CheckIterationResult(const log::Reader& reader, Status* s) override;

Status OnAtomicGroupReplayEnd() override;

enum Mode : uint8_t {
kRecovery = 0,
kCatchUp = 1,
Expand Down
2 changes: 2 additions & 0 deletions db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7903,6 +7903,8 @@ Status ReactiveVersionSet::Recover(
log::Reader* reader = manifest_reader->get();
assert(reader);

TEST_SYNC_POINT("ReactiveVersionSet::Recover:AfterMaybeSwitchManifest");

manifest_tailer_.reset(new ManifestTailer(
column_families, const_cast<ReactiveVersionSet*>(this), io_tracer_,
read_options_, EpochNumberRequirement::kMightMissing));
Expand Down
1 change: 1 addition & 0 deletions db/version_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -1087,6 +1087,7 @@ class Version {
friend class VersionSet;
friend class VersionEditHandler;
friend class VersionEditHandlerPointInTime;
friend class ManifestTailer;

const InternalKeyComparator* internal_comparator() const {
return storage_info_.internal_comparator_;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a bug where secondary/follower DB with `atomic_flush=true` could stop installing new versions, causing remote compaction to fail with "Cannot find matched SST files" errors and secondary DB to serve permanently stale data in release builds and crashing in debug builds.
Loading