From c9589a88564fae7da1ef05e10c7342abdbda3364 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Mon, 19 Oct 2020 15:36:10 +0800 Subject: [PATCH 01/29] remove mutex in all write method Signed-off-by: Little-Wallace --- db/db_impl/db_impl.cc | 54 +++++++++------- db/db_impl/db_impl.h | 17 +++-- db/db_impl/db_impl_compaction_flush.cc | 6 +- db/db_impl/db_impl_files.cc | 13 +--- db/db_impl/db_impl_write.cc | 87 +++++++++++++------------- db/error_handler.cc | 11 ++++ db/error_handler.h | 20 +++--- db/perf_context_test.cc | 2 +- 8 files changed, 114 insertions(+), 96 deletions(-) diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 50e8d711a859..dce27851ed8b 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -173,7 +173,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, log_dir_synced_(false), log_empty_(true), persist_stats_cf_handle_(nullptr), - log_sync_cv_(&mutex_), + log_sync_cv_(&log_write_mutex_), total_log_size_(0), is_snapshot_supported_(true), write_buffer_manager_(immutable_db_options_.write_buffer_manager.get()), @@ -267,6 +267,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, // we won't drop any deletion markers until SetPreserveDeletesSequenceNumber() // is called by client and this seqnum is advanced. 
preserve_deletes_seqnum_.store(0); + max_total_wal_size_.store(mutable_db_options_.max_total_wal_size, + std::memory_order_relaxed); } Status DBImpl::Resume() { @@ -573,25 +575,28 @@ Status DBImpl::CloseHelper() { mutex_.Lock(); } - for (auto l : logs_to_free_) { - delete l; - } - for (auto& log : logs_) { - uint64_t log_number = log.writer->get_log_number(); - Status s = log.ClearWriter(); - if (!s.ok()) { - ROCKS_LOG_WARN( - immutable_db_options_.info_log, - "Unable to Sync WAL file %s with error -- %s", - LogFileName(immutable_db_options_.wal_dir, log_number).c_str(), - s.ToString().c_str()); - // Retain the first error - if (ret.ok()) { - ret = s; + { + InstrumentedMutexLock lock(&log_write_mutex_); + for (auto l : logs_to_free_) { + delete l; + } + for (auto& log : logs_) { + uint64_t log_number = log.writer->get_log_number(); + Status s = log.ClearWriter(); + if (!s.ok()) { + ROCKS_LOG_WARN( + immutable_db_options_.info_log, + "Unable to Sync WAL file %s with error -- %s", + LogFileName(immutable_db_options_.wal_dir, log_number).c_str(), + s.ToString().c_str()); + // Retain the first error + if (ret.ok()) { + ret = s; + } } } + logs_.clear(); } - logs_.clear(); // Table cache may have table handles holding blocks from the block cache. // We need to release them before the block cache is destroyed. 
The block @@ -1105,6 +1110,11 @@ Status DBImpl::SetDBOptions( new_options.stats_persist_period_sec); mutex_.Lock(); } + if (new_options.max_total_wal_size != + mutable_db_options_.max_total_wal_size) { + max_total_wal_size_.store(new_options.max_total_wal_size, + std::memory_order_release); + } write_controller_.set_max_delayed_write_rate( new_options.delayed_write_rate); table_cache_.get()->SetCapacity(new_options.max_open_files == -1 @@ -1224,7 +1234,7 @@ Status DBImpl::SyncWAL() { uint64_t current_log_number; { - InstrumentedMutexLock l(&mutex_); + InstrumentedMutexLock l(&log_write_mutex_); assert(!logs_.empty()); // This SyncWAL() call only cares about logs up to this number. @@ -1281,7 +1291,7 @@ Status DBImpl::SyncWAL() { TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:1"); { - InstrumentedMutexLock l(&mutex_); + InstrumentedMutexLock l(&log_write_mutex_); MarkLogsSynced(current_log_number, need_log_dir_sync, status); } TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:2"); @@ -1310,7 +1320,7 @@ Status DBImpl::UnlockWAL() { void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir, const Status& status) { - mutex_.AssertHeld(); + log_write_mutex_.AssertHeld(); if (synced_dir && logfile_number_ == up_to && status.ok()) { log_dir_synced_ = true; } @@ -1319,8 +1329,6 @@ void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir, assert(log.getting_synced); if (status.ok() && logs_.size() > 1) { logs_to_free_.push_back(log.ReleaseWriter()); - // To modify logs_ both mutex_ and log_write_mutex_ must be held - InstrumentedMutexLock l(&log_write_mutex_); it = logs_.erase(it); } else { log.getting_synced = false; @@ -2582,7 +2590,6 @@ Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options, s = cfd->AddDirectories(&dummy_created_dirs); } if (s.ok()) { - single_column_family_mode_ = false; auto* cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name); assert(cfd != nullptr); @@ -2599,6 +2606,7 @@ Status 
DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options, ROCKS_LOG_INFO(immutable_db_options_.info_log, "Created column family [%s] (ID %u)", column_family_name.c_str(), (unsigned)cfd->GetID()); + single_column_family_mode_.store(false, std::memory_order_release); } else { ROCKS_LOG_ERROR(immutable_db_options_.info_log, "Creating column family [%s] FAILED -- %s", diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index b843093db829..ef768df73802 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1050,10 +1050,10 @@ class DBImpl : public DB { // only used for dynamically adjusting max_total_wal_size. it is a sum of // [write_buffer_size * max_write_buffer_number] over all column families - uint64_t max_total_in_memory_state_; + std::atomic max_total_in_memory_state_; // If true, we have only one (default) column family. We use this to optimize // some code-paths - bool single_column_family_mode_; + std::atomic single_column_family_mode_; // The options to access storage files const FileOptions file_options_; @@ -1257,7 +1257,13 @@ class DBImpl : public DB { } } }; - + struct LogContext { + explicit LogContext(bool need_sync = false) + : need_log_sync(need_sync), need_log_dir_sync(need_sync) {} + bool need_log_sync; + bool need_log_dir_sync; + log::Writer* writer; + }; struct LogFileNumberSize { explicit LogFileNumberSize(uint64_t _number) : number(_number) {} void AddSize(uint64_t new_size) { size += new_size; } @@ -1551,8 +1557,8 @@ class DBImpl : public DB { Status HandleWriteBufferFull(WriteContext* write_context); // REQUIRES: mutex locked - Status PreprocessWrite(const WriteOptions& write_options, bool* need_log_sync, - WriteContext* write_context); + Status PreprocessWrite(const WriteOptions& write_options, + LogContext* log_context, WriteContext* write_context); WriteBatch* MergeBatch(const WriteThread::WriteGroup& write_group, WriteBatch* tmp_batch, size_t* write_with_wal, @@ -2168,6 +2174,7 @@ class DBImpl : public DB { 
InstrumentedCondVar atomic_flush_install_cv_; bool wal_in_db_path_; + std::atomic max_total_wal_size_; }; extern Options SanitizeOptions(const std::string& db, const Options& src); diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 0ea4d7d0dd7d..dd6216d1f7c5 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -80,7 +80,7 @@ bool DBImpl::RequestCompactionToken(ColumnFamilyData* cfd, bool force, IOStatus DBImpl::SyncClosedLogs(JobContext* job_context) { TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Start"); - mutex_.AssertHeld(); + InstrumentedMutexLock l(&log_write_mutex_); autovector logs_to_sync; uint64_t current_log_number = logfile_number_; while (logs_.front().number < current_log_number && @@ -97,7 +97,7 @@ IOStatus DBImpl::SyncClosedLogs(JobContext* job_context) { IOStatus io_s; if (!logs_to_sync.empty()) { - mutex_.Unlock(); + log_write_mutex_.Unlock(); for (log::Writer* log : logs_to_sync) { ROCKS_LOG_INFO(immutable_db_options_.info_log, @@ -119,7 +119,7 @@ IOStatus DBImpl::SyncClosedLogs(JobContext* job_context) { io_s = directories_.GetWalDir()->Fsync(IOOptions(), nullptr); } - mutex_.Lock(); + log_write_mutex_.Lock(); // "number <= current_log_number - 1" is equivalent to // "number < current_log_number". 
diff --git a/db/db_impl/db_impl_files.cc b/db/db_impl/db_impl_files.cc index 2d30f5857b95..25ee7b00e9d6 100644 --- a/db/db_impl/db_impl_files.cc +++ b/db/db_impl/db_impl_files.cc @@ -238,6 +238,7 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, // logs_ is empty when called during recovery, in which case there can't yet // be any tracked obsolete logs + InstrumentedMutexLock l(&log_write_mutex_); if (!alive_log_files_.empty() && !logs_.empty()) { uint64_t min_log_number = job_context->log_number; size_t num_alive_log_files = alive_log_files_.size(); @@ -259,13 +260,8 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, } job_context->size_log_to_delete += earliest.size; total_log_size_ -= earliest.size; - if (two_write_queues_) { - log_write_mutex_.Lock(); - } alive_log_files_.pop_front(); - if (two_write_queues_) { - log_write_mutex_.Unlock(); - } + // Current log should always stay alive since it can't have // number < MinLogNumber(). assert(alive_log_files_.size()); @@ -278,10 +274,7 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, continue; } logs_to_free_.push_back(log.ReleaseWriter()); - { - InstrumentedMutexLock wl(&log_write_mutex_); - logs_.pop_front(); - } + logs_.pop_front(); } // Current log cannot be obsolete. assert(!logs_.empty()); diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 5d4f88f0b884..80523509f652 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -216,14 +216,12 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, // when it finds suitable, and finish them in the same write batch. // This is how a write job could be done by the other writer. 
WriteContext write_context; + LogContext log_context(write_options.sync); WriteThread::WriteGroup write_group; bool in_parallel_group = false; uint64_t last_sequence = kMaxSequenceNumber; - mutex_.Lock(); - - bool need_log_sync = write_options.sync; - bool need_log_dir_sync = need_log_sync && !log_dir_synced_; + // The writer will only be used when two_write_queues_ is false. if (!two_write_queues_ || !disable_memtable) { // With concurrent writes we do preprocess only in the write thread that // also does write to memtable to avoid sync issue on shared data structure @@ -232,7 +230,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, // PreprocessWrite does its own perf timing. PERF_TIMER_STOP(write_pre_and_post_process_time); - status = PreprocessWrite(write_options, &need_log_sync, &write_context); + status = PreprocessWrite(write_options, &log_context, &write_context); if (!two_write_queues_) { // Assign it after ::PreprocessWrite since the sequence might advance // inside it by WriteRecoverableState @@ -241,9 +239,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, PERF_TIMER_START(write_pre_and_post_process_time); } - log::Writer* log_writer = logs_.back().writer; - mutex_.Unlock(); // Add to log and apply to memtable. 
We can release the lock // during this phase since &w is currently responsible for logging @@ -327,8 +323,9 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, if (!two_write_queues_) { if (status.ok() && !write_options.disableWAL) { PERF_TIMER_GUARD(write_wal_time); - io_s = WriteToWAL(write_group, log_writer, log_used, need_log_sync, - need_log_dir_sync, last_sequence + 1); + io_s = WriteToWAL(write_group, log_context.writer, log_used, + log_context.need_log_sync, + log_context.need_log_dir_sync, last_sequence + 1); } } else { if (status.ok() && !write_options.disableWAL) { @@ -424,10 +421,10 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, } } - if (need_log_sync) { - mutex_.Lock(); - MarkLogsSynced(logfile_number_, need_log_dir_sync, status); - mutex_.Unlock(); + if (log_context.need_log_sync) { + log_write_mutex_.Lock(); + MarkLogsSynced(logfile_number_, log_context.need_log_dir_sync, status); + log_write_mutex_.Unlock(); // Requesting sync with two_write_queues_ is expected to be very rare. We // hence provide a simple implementation that is not necessarily efficient. if (two_write_queues_) { @@ -479,15 +476,11 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, if (w.callback && !w.callback->AllowWriteBatching()) { write_thread_.WaitForMemTableWriters(); } - mutex_.Lock(); - bool need_log_sync = !write_options.disableWAL && write_options.sync; - bool need_log_dir_sync = need_log_sync && !log_dir_synced_; + LogContext log_context(!write_options.disableWAL && write_options.sync); // PreprocessWrite does its own perf timing. PERF_TIMER_STOP(write_pre_and_post_process_time); - w.status = PreprocessWrite(write_options, &need_log_sync, &write_context); + w.status = PreprocessWrite(write_options, &log_context, &write_context); PERF_TIMER_START(write_pre_and_post_process_time); - log::Writer* log_writer = logs_.back().writer; - mutex_.Unlock(); // This can set non-OK status if callback fail. 
last_batch_group_size_ = @@ -536,8 +529,9 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, wal_write_group.size - 1); RecordTick(stats_, WRITE_DONE_BY_OTHER, wal_write_group.size - 1); } - io_s = WriteToWAL(wal_write_group, log_writer, log_used, need_log_sync, - need_log_dir_sync, current_sequence); + io_s = WriteToWAL(wal_write_group, log_context.writer, log_used, + log_context.need_log_sync, + log_context.need_log_dir_sync, current_sequence); w.status = io_s; } @@ -549,10 +543,9 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, } } - if (need_log_sync) { - mutex_.Lock(); - MarkLogsSynced(logfile_number_, need_log_dir_sync, w.status); - mutex_.Unlock(); + if (log_context.need_log_sync) { + InstrumentedMutexLock l(&log_write_mutex_); + MarkLogsSynced(logfile_number_, log_context.need_log_dir_sync, w.status); } write_thread_.ExitAsBatchGroupLeader(wal_write_group, w.status); @@ -689,9 +682,8 @@ Status DBImpl::WriteImplWALOnly( // TODO(myabandeh): Make preliminary checks thread-safe so we could do them // without paying the cost of obtaining the mutex. if (status.ok()) { - InstrumentedMutexLock l(&mutex_); - bool need_log_sync = false; - status = PreprocessWrite(write_options, &need_log_sync, &write_context); + LogContext log_context; + status = PreprocessWrite(write_options, &log_context, &write_context); WriteStatusCheckOnLocked(status); } if (!status.ok()) { @@ -836,9 +828,8 @@ Status DBImpl::WriteImplWALOnly( void DBImpl::WriteStatusCheckOnLocked(const Status& status) { // Is setting bg_error_ enough here? This will at least stop // compaction and fail any further writes. - // Caller must hold mutex_. + InstrumentedMutexLock l(&mutex_); assert(!status.IsIOFenced() || !error_handler_.GetBGError().ok()); - mutex_.AssertHeld(); if (immutable_db_options_.paranoid_checks && !status.ok() && !status.IsBusy() && !status.IsIncomplete()) { // Maybe change the return status to void? 
@@ -892,22 +883,22 @@ void DBImpl::MemTableInsertStatusCheck(const Status& status) { } Status DBImpl::PreprocessWrite(const WriteOptions& write_options, - bool* need_log_sync, + LogContext* log_context, WriteContext* write_context) { - mutex_.AssertHeld(); - assert(write_context != nullptr && need_log_sync != nullptr); + assert(write_context != nullptr && log_context != nullptr); Status status; if (error_handler_.IsDBStopped()) { + InstrumentedMutexLock l(&mutex_); status = error_handler_.GetBGError(); } PERF_TIMER_GUARD(write_scheduling_flushes_compactions_time); - assert(!single_column_family_mode_ || - versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1); - if (UNLIKELY(status.ok() && !single_column_family_mode_ && + if (UNLIKELY(status.ok() && + !single_column_family_mode_.load(std::memory_order_acquire) && total_log_size_ > GetMaxTotalWalSize())) { + InstrumentedMutexLock l(&mutex_); WaitForPendingWrites(); status = SwitchWAL(write_context); } @@ -918,15 +909,18 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options, // thread is writing to another DB with the same write buffer, they may also // be flushed. We may end up with flushing much more DBs than needed. It's // suboptimal but still correct. + InstrumentedMutexLock l(&mutex_); WaitForPendingWrites(); status = HandleWriteBufferFull(write_context); } if (UNLIKELY(status.ok() && !trim_history_scheduler_.Empty())) { + InstrumentedMutexLock l(&mutex_); status = TrimMemtableHistory(write_context); } if (UNLIKELY(status.ok() && !flush_scheduler_.Empty())) { + InstrumentedMutexLock l(&mutex_); WaitForPendingWrites(); status = ScheduleFlushes(write_context); } @@ -942,11 +936,13 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options, // for previous one. It might create a fairness issue that expiration // might happen for smaller writes but larger writes can go through. // Can optimize it if it is an issue. 
+ InstrumentedMutexLock l(&mutex_); status = DelayWrite(last_batch_group_size_, write_options); PERF_TIMER_START(write_pre_and_post_process_time); } - if (status.ok() && *need_log_sync) { + InstrumentedMutexLock l(&log_write_mutex_); + if (status.ok() && log_context->need_log_sync) { // Wait until the parallel syncs are finished. Any sync process has to sync // the front log too so it is enough to check the status of front() // We do a while loop since log_sync_cv_ is signalled when any sync is @@ -967,8 +963,11 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options, log.getting_synced = true; } } else { - *need_log_sync = false; + log_context->need_log_sync = false; } + log_context->writer = logs_.back().writer; + log_context->need_log_dir_sync = + log_context->need_log_dir_sync && !log_dir_synced_; return status; } @@ -1416,10 +1415,11 @@ Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) { } uint64_t DBImpl::GetMaxTotalWalSize() const { - mutex_.AssertHeld(); - return mutable_db_options_.max_total_wal_size == 0 - ? 
4 * max_total_in_memory_state_ - : mutable_db_options_.max_total_wal_size; + auto max_total_wal_size = max_total_wal_size_.load(std::memory_order_acquire); + if (max_total_wal_size > 0) { + return max_total_wal_size; + } + return 4 * max_total_in_memory_state_; } // REQUIRES: mutex_ is held @@ -1739,7 +1739,7 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { log_recycle_files_.pop_front(); } if (s.ok() && creating_new_log) { - log_write_mutex_.Lock(); + InstrumentedMutexLock l(&log_write_mutex_); assert(new_log != nullptr); if (!logs_.empty()) { // Alway flush the buffer of the last log before switching to a new one @@ -1763,7 +1763,6 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { logs_.emplace_back(logfile_number_, new_log); alive_log_files_.push_back(LogFileNumberSize(logfile_number_)); } - log_write_mutex_.Unlock(); } if (!s.ok()) { diff --git a/db/error_handler.cc b/db/error_handler.cc index 7aa4aa82689c..c5120534a156 100644 --- a/db/error_handler.cc +++ b/db/error_handler.cc @@ -321,6 +321,9 @@ Status ErrorHandler::SetBGError(const Status& bg_err, BackgroundErrorReason reas RecoverFromNoSpace(); } } + if (bg_error_.severity() >= Status::Severity::kHardError) { + stop_state_.store(true, std::memory_order_release); + } return bg_error_; } @@ -350,6 +353,10 @@ Status ErrorHandler::SetBGError(const IOStatus& bg_io_err, bool auto_recovery = false; Status bg_err(new_bg_io_err, Status::Severity::kUnrecoverableError); bg_error_ = bg_err; + if (bg_error_.severity() >= Status::Severity::kHardError) { + stop_state_.store(true, std::memory_order_release); + } + if (recovery_in_prog_ && recovery_error_.ok()) { recovery_error_ = bg_err; } @@ -401,6 +408,9 @@ Status ErrorHandler::SetBGError(const IOStatus& bg_io_err, if (bg_err.severity() > bg_error_.severity()) { bg_error_ = bg_err; } + if (bg_error_.severity() >= Status::Severity::kHardError) { + stop_state_.store(true, std::memory_order_release); + } 
recover_context_ = context; return StartRecoverFromRetryableBGIOError(bg_io_err); } @@ -618,6 +628,7 @@ void ErrorHandler::RecoverFromRetryableBGIOError() { // the bg_error and notify user. TEST_SYNC_POINT("RecoverFromRetryableBGIOError:RecoverSuccess"); Status old_bg_error = bg_error_; + stop_state_.store(false, std::memory_order_release); bg_error_ = Status::OK(); EventHelpers::NotifyOnErrorRecoveryCompleted(db_options_.listeners, old_bg_error, db_mutex_); diff --git a/db/error_handler.h b/db/error_handler.h index 084434101aaa..0b061007288b 100644 --- a/db/error_handler.h +++ b/db/error_handler.h @@ -40,7 +40,8 @@ class ErrorHandler { db_mutex_(db_mutex), auto_recovery_(false), recovery_in_prog_(false), - soft_error_no_bg_work_(false) {} + soft_error_no_bg_work_(false), + stop_state_(false) {} ~ErrorHandler() { bg_error_.PermitUncheckedError(); recovery_error_.PermitUncheckedError(); @@ -63,16 +64,14 @@ class ErrorHandler { Status ClearBGError(); - bool IsDBStopped() { - return !bg_error_.ok() && - bg_error_.severity() >= Status::Severity::kHardError; - } + bool IsDBStopped() { return stop_state_.load(std::memory_order_acquire); } - bool IsBGWorkStopped() { - return !bg_error_.ok() && - (bg_error_.severity() >= Status::Severity::kHardError || - !auto_recovery_ || soft_error_no_bg_work_); - } + // this method must be protect in mutex. + bool IsBGWorkStopped() { + return !bg_error_.ok() && + (bg_error_.severity() >= Status::Severity::kHardError || + !auto_recovery_ || soft_error_no_bg_work_); + } bool IsSoftErrorNoBGWork() { return soft_error_no_bg_work_; } @@ -109,6 +108,7 @@ class ErrorHandler { // Used to store the context for recover, such as flush reason. 
DBRecoverContext recover_context_; + std::atomic stop_state_; Status OverrideNoSpaceError(Status bg_error, bool* auto_recovery); void RecoverFromNoSpace(); diff --git a/db/perf_context_test.cc b/db/perf_context_test.cc index 5a714b9b85a4..5eb600c3bf99 100644 --- a/db/perf_context_test.cc +++ b/db/perf_context_test.cc @@ -385,7 +385,7 @@ void ProfileQueries(bool enabled_time = false) { EXPECT_GT(hist_write_scheduling_time.Average(), 0); #ifndef NDEBUG - ASSERT_GT(total_db_mutex_nanos, 2000U); + ASSERT_LT(total_db_mutex_nanos, 100U); #endif } From d399620e2fa53da22fc4601f58008ca3ee2b7655 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Fri, 11 Feb 2022 15:45:07 +0800 Subject: [PATCH 02/29] fix conflict Signed-off-by: Little-Wallace --- db/db_impl/db_impl.cc | 32 +++++--------------------------- db/db_impl/db_impl_open.cc | 15 ++++++++------- db/db_impl/db_impl_write.cc | 5 ++--- db/error_handler.cc | 13 ++++--------- 4 files changed, 19 insertions(+), 46 deletions(-) diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index bc1d0a16b630..32660057e195 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -271,15 +271,11 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, mutable_db_options_.Dump(immutable_db_options_.info_log.get()); DumpSupportInfo(immutable_db_options_.info_log.get()); - // always open the DB with 0 here, which means if preserve_deletes_==true - // we won't drop any deletion markers until SetPreserveDeletesSequenceNumber() - // is called by client and this seqnum is advanced. 
- preserve_deletes_seqnum_.store(0); max_total_wal_size_.store(mutable_db_options_.max_total_wal_size, std::memory_order_relaxed); - /*if (write_buffer_manager_) { + if (write_buffer_manager_) { wbm_stall_.reset(new WBMStallInterface()); - }*/ + } } Status DBImpl::Resume() { @@ -633,25 +629,7 @@ Status DBImpl::CloseHelper() { job_context.Clean(); mutex_.Lock(); } - { - InstrumentedMutexLock lock(&log_write_mutex_); - for (auto l : logs_to_free_) { - delete l; - } - for (auto& log : logs_) { - uint64_t log_number = log.writer->get_log_number(); - Status s = log.ClearWriter(); - if (!s.ok()) { - ROCKS_LOG_WARN( - immutable_db_options_.info_log, - "Unable to Sync WAL file %s with error -- %s", - LogFileName(immutable_db_options_.wal_dir, log_number).c_str(), - s.ToString().c_str()); - // Retain the first error - if (ret.ok()) { - ret = s; - } - /* + log_write_mutex_.Lock(); for (auto l : logs_to_free_) { delete l; } @@ -667,11 +645,11 @@ Status DBImpl::CloseHelper() { // Retain the first error if (ret.ok()) { ret = s; - */ } } - logs_.clear(); } + logs_.clear(); + log_write_mutex_.Unlock(); // Table cache may have table handles holding blocks from the block cache. // We need to release them before the block cache is destroyed. 
The block diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index 4d14e0300685..3a1c1ac07bf6 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -1363,7 +1363,12 @@ Status DBImpl::RestoreAliveLogFiles(const std::vector& wal_numbers) { break; } total_log_size_ += log.size; - alive_log_files_.push_back(log); + if (two_write_queues_) { + alive_log_files_.push_back(log); + } else { + InstrumentedMutexLock l(&log_write_mutex_); + alive_log_files_.push_back(log); + } } if (two_write_queues_) { log_write_mutex_.Unlock(); @@ -1697,14 +1702,10 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, cfd, &sv_context, *cfd->GetLatestMutableCFOptions()); } sv_context.Clean(); - if (impl->two_write_queues_) { - impl->log_write_mutex_.Lock(); - } + impl->log_write_mutex_.Lock(); impl->alive_log_files_.push_back( DBImpl::LogFileNumberSize(impl->logfile_number_)); - if (impl->two_write_queues_) { - impl->log_write_mutex_.Unlock(); - } + impl->log_write_mutex_.Unlock(); } if (s.ok()) { // In WritePrepared there could be gap in sequence numbers. This breaks diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index b54c3ee70eaa..e96c7c6bd1a7 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -275,8 +275,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, bool in_parallel_group = false; uint64_t last_sequence = kMaxSequenceNumber; - // The writer will only be used when two_write_queues_ is false. - if (!two_write_queues_ || !disable_memtable) { + assert(!two_write_queues_ || !disable_memtable); + { // With concurrent writes we do preprocess only in the write thread that // also does write to memtable to avoid sync issue on shared data structure // with the other thread @@ -294,7 +294,6 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, PERF_TIMER_START(write_pre_and_post_process_time); } - // Add to log and apply to memtable. 
We can release the lock // during this phase since &w is currently responsible for logging // and protects against concurrent loggers and concurrent writes diff --git a/db/error_handler.cc b/db/error_handler.cc index da932e199a9a..c3b898fb39c8 100644 --- a/db/error_handler.cc +++ b/db/error_handler.cc @@ -412,15 +412,7 @@ const Status& ErrorHandler::SetBGError(const IOStatus& bg_io_err, // it can directly overwrite any existing bg_error_. bool auto_recovery = false; Status bg_err(new_bg_io_err, Status::Severity::kUnrecoverableError); - bg_error_ = bg_err; - if (bg_error_.severity() >= Status::Severity::kHardError) { - stop_state_.store(true, std::memory_order_release); - } - - if (recovery_in_prog_ && recovery_error_.ok()) { - recovery_error_ = bg_err; - } - //CheckAndSetRecoveryAndBGError(bg_err); + CheckAndSetRecoveryAndBGError(bg_err); if (bg_error_stats_ != nullptr) { RecordTick(bg_error_stats_.get(), ERROR_HANDLER_BG_ERROR_COUNT); RecordTick(bg_error_stats_.get(), ERROR_HANDLER_BG_IO_ERROR_COUNT); @@ -806,6 +798,9 @@ void ErrorHandler::CheckAndSetRecoveryAndBGError(const Status& bg_err) { if (bg_err.severity() > bg_error_.severity()) { bg_error_ = bg_err; } + if (bg_error_.severity() >= Status::Severity::kHardError) { + stop_state_.store(true, std::memory_order_release); + } return; } From ab6edb49730107e2f1d70b887d586f010de2348f Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Fri, 11 Feb 2022 15:51:58 +0800 Subject: [PATCH 03/29] fix format Signed-off-by: Little-Wallace --- db/db_impl/db_impl_write.cc | 3 ++- db/error_handler.cc | 11 +---------- 2 files changed, 3 insertions(+), 11 deletions(-) diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index e96c7c6bd1a7..61c9d471edb8 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -634,7 +634,8 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, if (log_context.need_log_sync) { InstrumentedMutexLock l(&log_write_mutex_); if (w.status.ok()) { - 
w.status = MarkLogsSynced(logfile_number_, log_context.need_log_dir_sync); + w.status = + MarkLogsSynced(logfile_number_, log_context.need_log_dir_sync); } else { MarkLogsNotSynced(logfile_number_); } diff --git a/db/error_handler.cc b/db/error_handler.cc index c3b898fb39c8..49aef9774046 100644 --- a/db/error_handler.cc +++ b/db/error_handler.cc @@ -478,16 +478,7 @@ const Status& ErrorHandler::SetBGError(const IOStatus& bg_io_err, return StartRecoverFromRetryableBGIOError(bg_io_err); } else { Status bg_err(new_bg_io_err, Status::Severity::kHardError); - if (recovery_in_prog_ && recovery_error_.ok()) { - recovery_error_ = bg_err; - } - if (bg_err.severity() > bg_error_.severity()) { - bg_error_ = bg_err; - } - if (bg_error_.severity() >= Status::Severity::kHardError) { - stop_state_.store(true, std::memory_order_release); - } - //CheckAndSetRecoveryAndBGError(bg_err); + CheckAndSetRecoveryAndBGError(bg_err); recover_context_ = context; return StartRecoverFromRetryableBGIOError(bg_io_err); } From 5fe832a2f1874ef5e9a8e2fc73fbc243642a1b22 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Fri, 11 Feb 2022 23:40:08 +0800 Subject: [PATCH 04/29] do not hold mutex for SyncClosedLogs Signed-off-by: Little-Wallace --- db/db_impl/db_impl.cc | 41 +++++++++++++++----------- db/db_impl/db_impl.h | 5 ++-- db/db_impl/db_impl_compaction_flush.cc | 25 ++++++++++++---- db/db_impl/db_impl_write.cc | 20 ++++++++++--- 4 files changed, 63 insertions(+), 28 deletions(-) diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 32660057e195..3abd21b356b1 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1398,19 +1398,37 @@ Status DBImpl::SyncWAL() { TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:2"); TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:1"); + VersionEdit synced_wals; { InstrumentedMutexLock l(&log_write_mutex_); if (status.ok()) { - status = MarkLogsSynced(current_log_number, need_log_dir_sync); + MarkLogsSynced(current_log_number, 
need_log_dir_sync, &synced_wals); } else { MarkLogsNotSynced(current_log_number); } } + if (status.ok() && synced_wals.IsWalAddition()) { + InstrumentedMutexLock l(&mutex_); + status = ApplyWALToManifest(&synced_wals); + } + TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:2"); return status; } +Status DBImpl::ApplyWALToManifest(VersionEdit* synced_wals) { + // not empty, write to MANIFEST. + mutex_.AssertHeld(); + Status status = + versions_->LogAndApplyToDefaultColumnFamily(synced_wals, &mutex_); + if (!status.ok() && versions_->io_status().IsIOError()) { + status = error_handler_.SetBGError(versions_->io_status(), + BackgroundErrorReason::kManifestWrite); + } + return status; +} + Status DBImpl::LockWAL() { log_write_mutex_.Lock(); auto cur_log_writer = logs_.back().writer; @@ -1430,20 +1448,20 @@ Status DBImpl::UnlockWAL() { return Status::OK(); } -Status DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir) { +void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir, + VersionEdit* synced_wals) { log_write_mutex_.AssertHeld(); if (synced_dir && logfile_number_ == up_to) { log_dir_synced_ = true; } - VersionEdit synced_wals; for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;) { auto& wal = *it; assert(wal.getting_synced); if (logs_.size() > 1) { if (immutable_db_options_.track_and_verify_wals_in_manifest && wal.writer->file()->GetFileSize() > 0) { - synced_wals.AddWal(wal.number, - WalMetadata(wal.writer->file()->GetFileSize())); + synced_wals->AddWal(wal.number, + WalMetadata(wal.writer->file()->GetFileSize())); } logs_to_free_.push_back(wal.ReleaseWriter()); it = logs_.erase(it); @@ -1454,22 +1472,11 @@ Status DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir) { } assert(logs_.empty() || logs_[0].number > up_to || (logs_.size() == 1 && !logs_[0].getting_synced)); - - Status s; - if (synced_wals.IsWalAddition()) { - // not empty, write to MANIFEST. 
- s = versions_->LogAndApplyToDefaultColumnFamily(&synced_wals, &mutex_); - if (!s.ok() && versions_->io_status().IsIOError()) { - s = error_handler_.SetBGError(versions_->io_status(), - BackgroundErrorReason::kManifestWrite); - } - } log_sync_cv_.SignalAll(); - return s; } void DBImpl::MarkLogsNotSynced(uint64_t up_to) { - mutex_.AssertHeld(); + log_write_mutex_.AssertHeld(); for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to; ++it) { auto& wal = *it; diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index aaeb47dec89b..472812a900ac 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1606,7 +1606,7 @@ class DBImpl : public DB { void ReleaseFileNumberFromPendingOutputs( std::unique_ptr::iterator>& v); - IOStatus SyncClosedLogs(JobContext* job_context); + IOStatus SyncClosedLogs(JobContext* job_context, VersionEdit* synced_wals); // Flush the in-memory write buffer to storage. Switches to a new // log-file/memtable and writes a new descriptor iff successful. Then @@ -1877,7 +1877,8 @@ class DBImpl : public DB { std::unique_ptr* token, LogBuffer* log_buffer); // helper function to call after some of the logs_ were synced - Status MarkLogsSynced(uint64_t up_to, bool synced_dir); + void MarkLogsSynced(uint64_t up_to, bool synced_dir, VersionEdit* edit); + Status ApplyWALToManifest(VersionEdit* edit); // WALs with log number up to up_to are not synced successfully. 
void MarkLogsNotSynced(uint64_t up_to); diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index fcd237f6840d..0c1bd183f9a1 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -82,7 +82,8 @@ bool DBImpl::RequestCompactionToken(ColumnFamilyData* cfd, bool force, return false; } -IOStatus DBImpl::SyncClosedLogs(JobContext* job_context) { +IOStatus DBImpl::SyncClosedLogs(JobContext* job_context, + VersionEdit* synced_wals) { TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Start"); InstrumentedMutexLock l(&log_write_mutex_); autovector logs_to_sync; @@ -134,7 +135,7 @@ IOStatus DBImpl::SyncClosedLogs(JobContext* job_context) { // "number <= current_log_number - 1" is equivalent to // "number < current_log_number". if (io_s.ok()) { - io_s = status_to_io_status(MarkLogsSynced(current_log_number - 1, true)); + MarkLogsSynced(current_log_number - 1, true, synced_wals); } else { MarkLogsNotSynced(current_log_number - 1); } @@ -201,8 +202,15 @@ Status DBImpl::FlushMemTableToOutputFile( bool need_cancel = false; IOStatus log_io_s = IOStatus::OK(); if (needs_to_sync_closed_wals) { - // SyncClosedLogs() may unlock and re-lock the db_mutex. - log_io_s = SyncClosedLogs(job_context); + // SyncClosedLogs() may unlock and re-lock the db_log_write_mutex. + VersionEdit synced_wals; + mutex_.Unlock(); + log_io_s = SyncClosedLogs(job_context, &synced_wals); + mutex_.Lock(); + if (log_io_s.ok() && synced_wals.IsWalAddition()) { + log_io_s = status_to_io_status(ApplyWALToManifest(&synced_wals)); + } + if (!log_io_s.ok() && !log_io_s.IsShutdownInProgress() && !log_io_s.IsColumnFamilyDropped()) { error_handler_.SetBGError(log_io_s, BackgroundErrorReason::kFlush); @@ -459,7 +467,14 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( if (logfile_number_ > 0) { // TODO (yanqin) investigate whether we should sync the closed logs for // single column family case. 
- log_io_s = SyncClosedLogs(job_context); + VersionEdit synced_wals; + mutex_.Unlock(); + log_io_s = SyncClosedLogs(job_context, &synced_wals); + mutex_.Lock(); + if (log_io_s.ok() && synced_wals.IsWalAddition()) { + log_io_s = status_to_io_status(ApplyWALToManifest(&synced_wals)); + } + if (!log_io_s.ok() && !log_io_s.IsShutdownInProgress() && !log_io_s.IsColumnFamilyDropped()) { if (total_log_size_ > 0) { diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 61c9d471edb8..38ab5512fac3 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -491,13 +491,20 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, } if (log_context.need_log_sync) { + VersionEdit synced_wals; log_write_mutex_.Lock(); if (status.ok()) { - status = MarkLogsSynced(logfile_number_, log_context.need_log_dir_sync); + MarkLogsSynced(logfile_number_, log_context.need_log_dir_sync, + &synced_wals); } else { MarkLogsNotSynced(logfile_number_); } log_write_mutex_.Unlock(); + if (status.ok() && synced_wals.IsWalAddition()) { + InstrumentedMutexLock l(&mutex_); + status = ApplyWALToManifest(&synced_wals); + } + // Requesting sync with two_write_queues_ is expected to be very rare. We // hence provide a simple implementation that is not necessarily efficient. 
if (two_write_queues_) { @@ -631,16 +638,20 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, } } + VersionEdit synced_wals; if (log_context.need_log_sync) { InstrumentedMutexLock l(&log_write_mutex_); if (w.status.ok()) { - w.status = - MarkLogsSynced(logfile_number_, log_context.need_log_dir_sync); + MarkLogsSynced(logfile_number_, log_context.need_log_dir_sync, + &synced_wals); } else { MarkLogsNotSynced(logfile_number_); } } - + if (w.status.ok() && synced_wals.IsWalAddition()) { + InstrumentedMutexLock l(&mutex_); + w.status = ApplyWALToManifest(&synced_wals); + } write_thread_.ExitAsBatchGroupLeader(wal_write_group, w.status); } @@ -1064,6 +1075,7 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options, if (write_options.no_slowdown) { status = Status::Incomplete("Write stall"); } else { + InstrumentedMutexLock l(&mutex_); WriteBufferManagerStallWrites(); } } From 9f3dbe5005f7d0504e91ebe6e6a921e1f3333bf3 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Wed, 2 Mar 2022 13:13:41 +0800 Subject: [PATCH 05/29] release mutex before wait sync Signed-off-by: Little-Wallace --- db/db_impl/db_impl.h | 2 +- db/db_impl/db_impl_files.cc | 11 ++++++++--- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 472812a900ac..d52a3f22360f 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -2230,7 +2230,7 @@ class DBImpl : public DB { // Number of times FindObsoleteFiles has found deletable files and the // corresponding call to PurgeObsoleteFiles has not yet finished. - int pending_purge_obsolete_files_; + std::atomic pending_purge_obsolete_files_; // last time when DeleteObsoleteFiles with full scan was executed. Originally // initialized with startup time. 
diff --git a/db/db_impl/db_impl_files.cc b/db/db_impl/db_impl_files.cc index 653b22117f53..257f9543d536 100644 --- a/db/db_impl/db_impl_files.cc +++ b/db/db_impl/db_impl_files.cc @@ -250,7 +250,7 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, // logs_ is empty when called during recovery, in which case there can't yet // be any tracked obsolete logs - InstrumentedMutexLock l(&log_write_mutex_); + log_write_mutex_.Lock(); if (!alive_log_files_.empty() && !logs_.empty()) { uint64_t min_log_number = job_context->log_number; size_t num_alive_log_files = alive_log_files_.size(); @@ -278,6 +278,9 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, // number < MinLogNumber(). assert(alive_log_files_.size()); } + log_write_mutex_.Unlock(); + mutex_.Unlock(); + log_write_mutex_.Lock(); while (!logs_.empty() && logs_.front().number < min_log_number) { auto& log = logs_.front(); if (log.getting_synced) { @@ -295,12 +298,14 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, // We're just cleaning up for DB::Write(). 
assert(job_context->logs_to_free.empty()); job_context->logs_to_free = logs_to_free_; - job_context->log_recycle_files.assign(log_recycle_files_.begin(), - log_recycle_files_.end()); if (job_context->HaveSomethingToDelete()) { ++pending_purge_obsolete_files_; } logs_to_free_.clear(); + log_write_mutex_.Unlock(); + mutex_.Lock(); + job_context->log_recycle_files.assign(log_recycle_files_.begin(), + log_recycle_files_.end()); } namespace { From e07daeba6857db999e548701a53a043f06db9348 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Fri, 3 Jun 2022 10:14:12 -0700 Subject: [PATCH 06/29] Add log_file_number_size to LogContext --- db/db_impl/db_impl.h | 17 ++++++++++------- db/db_impl/db_impl_write.cc | 7 +++++++ 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index a41cb14bcaae..857bb3a22ec7 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1518,13 +1518,7 @@ class DBImpl : public DB { } } }; - struct LogContext { - explicit LogContext(bool need_sync = false) - : need_log_sync(need_sync), need_log_dir_sync(need_sync) {} - bool need_log_sync; - bool need_log_dir_sync; - log::Writer* writer; - }; + struct LogFileNumberSize { explicit LogFileNumberSize(uint64_t _number) : number(_number) {} LogFileNumberSize() {} @@ -1559,6 +1553,15 @@ class DBImpl : public DB { bool getting_synced = false; }; + struct LogContext { + explicit LogContext(bool need_sync = false) + : need_log_sync(need_sync), need_log_dir_sync(need_sync) {} + bool need_log_sync = false; + bool need_log_dir_sync = false; + log::Writer* writer = nullptr; + LogFileNumberSize* log_file_number_size = nullptr; + }; + // PurgeFileInfo is a structure to hold information of files to be deleted in // purge_files_ struct PurgeFileInfo { diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 7aec2738e038..514edc0082b1 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -411,6 +411,9 @@ Status 
DBImpl::WriteImpl(const WriteOptions& write_options, if (!two_write_queues_) { if (status.ok() && !write_options.disableWAL) { + assert(log_context.log_file_number_size); + LogFileNumberSize& log_file_number_size = + *(log_context.log_file_number_size); PERF_TIMER_GUARD(write_wal_time); io_s = WriteToWAL(write_group, log_context.writer, log_used, log_context.need_log_sync, @@ -647,6 +650,9 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, wal_write_group.size - 1); RecordTick(stats_, WRITE_DONE_BY_OTHER, wal_write_group.size - 1); } + assert(log_context.log_file_number_size); + LogFileNumberSize& log_file_number_size = + *(log_context.log_file_number_size); io_s = WriteToWAL(wal_write_group, log_context.writer, log_used, log_context.need_log_sync, log_context.need_log_dir_sync, current_sequence, log_file_number_size); @@ -1135,6 +1141,7 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options, log_context->writer = logs_.back().writer; log_context->need_log_dir_sync = log_context->need_log_dir_sync && !log_dir_synced_; + log_context->log_file_number_size = std::addressof(alive_log_files_.back()); return status; } From db71ee4ebc38a65b28e70da75f2f92ec0fa8bff4 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Fri, 3 Jun 2022 13:10:02 -0700 Subject: [PATCH 07/29] Fix double mutex acquisition for secondary instance in FindObsoleteFiles() --- db/db_impl/db_impl_files.cc | 11 +++++++++++ db/db_impl/db_impl_write.cc | 14 ++++++++------ 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/db/db_impl/db_impl_files.cc b/db/db_impl/db_impl_files.cc index 80f4ce6b900e..598c4dbb0961 100644 --- a/db/db_impl/db_impl_files.cc +++ b/db/db_impl/db_impl_files.cc @@ -255,6 +255,17 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, // logs_ is empty when called during recovery, in which case there can't yet // be any tracked obsolete logs log_write_mutex_.Lock(); + + if (alive_log_files_.empty() || logs_.empty()) { + 
mutex_.AssertHeld(); + // We may reach here if the db is DBImplSecondary + if (job_context->HaveSomethingToDelete()) { + ++pending_purge_obsolete_files_; + } + log_write_mutex_.Unlock(); + return; + } + if (!alive_log_files_.empty() && !logs_.empty()) { uint64_t min_log_number = job_context->log_number; size_t num_alive_log_files = alive_log_files_.size(); diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 514edc0082b1..2bfda5dc72af 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -415,9 +415,10 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, LogFileNumberSize& log_file_number_size = *(log_context.log_file_number_size); PERF_TIMER_GUARD(write_wal_time); - io_s = WriteToWAL(write_group, log_context.writer, log_used, - log_context.need_log_sync, - log_context.need_log_dir_sync, last_sequence + 1, log_file_number_size); + io_s = + WriteToWAL(write_group, log_context.writer, log_used, + log_context.need_log_sync, log_context.need_log_dir_sync, + last_sequence + 1, log_file_number_size); } } else { if (status.ok() && !write_options.disableWAL) { @@ -653,9 +654,10 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, assert(log_context.log_file_number_size); LogFileNumberSize& log_file_number_size = *(log_context.log_file_number_size); - io_s = WriteToWAL(wal_write_group, log_context.writer, log_used, - log_context.need_log_sync, - log_context.need_log_dir_sync, current_sequence, log_file_number_size); + io_s = + WriteToWAL(wal_write_group, log_context.writer, log_used, + log_context.need_log_sync, log_context.need_log_dir_sync, + current_sequence, log_file_number_size); w.status = io_s; } From 91090c7435225255b5b41399ba826f16bd24a672 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Fri, 3 Jun 2022 14:51:46 -0700 Subject: [PATCH 08/29] Fix data race in DBTest.TestLogCleanup, reported by TSAN --- db/db_impl/db_impl_debug.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/db/db_impl/db_impl_debug.cc b/db/db_impl/db_impl_debug.cc index 5647aa26f272..ed4149b85b21 100644 --- a/db/db_impl/db_impl_debug.cc +++ b/db/db_impl/db_impl_debug.cc @@ -223,7 +223,7 @@ void DBImpl::TEST_EndWrite(void* w) { } size_t DBImpl::TEST_LogsToFreeSize() { - InstrumentedMutexLock l(&mutex_); + InstrumentedMutexLock l(&log_write_mutex_); return logs_to_free_.size(); } From 3370fe460e7ac0f3b74a4f226f39383b6119c072 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Fri, 3 Jun 2022 18:26:03 -0700 Subject: [PATCH 09/29] Fix compilation after merge One test becomes flaky: ~/gtest-parallel/gtest-parallel -r 100 ./db_write_test --gtest_filter=DBWriteTestInstance/DBWriteTest.ConcurrentlyDisabledWAL/2 --- db/db_impl/db_impl.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 832a9efa8ed1..92e7da33a547 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1474,7 +1474,7 @@ void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir, assert(wal.getting_synced); if (immutable_db_options_.track_and_verify_wals_in_manifest && wal.writer->file()->GetFileSize() > 0) { - synced_wals.AddWal(wal.number, + synced_wals->AddWal(wal.number, WalMetadata(wal.writer->file()->GetFileSize())); } From d8cd64c9cf1d7a7e98c496452e96b55fd5484884 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Fri, 3 Jun 2022 18:30:01 -0700 Subject: [PATCH 10/29] fix format --- db/db_impl/db_impl.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 92e7da33a547..53a2dc7c5bcd 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1475,7 +1475,7 @@ void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir, if (immutable_db_options_.track_and_verify_wals_in_manifest && wal.writer->file()->GetFileSize() > 0) { synced_wals->AddWal(wal.number, - WalMetadata(wal.writer->file()->GetFileSize())); + WalMetadata(wal.writer->file()->GetFileSize())); } if 
(logs_.size() > 1) { From d26cb103aca4a3475761bcb7db2596df8a61d1a8 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Fri, 3 Jun 2022 22:56:18 -0700 Subject: [PATCH 11/29] Wal syncing VersionEdits out-of-order Summary: With changes in SyncWAL(), VersionEdits to sync WALs are computed in log_write_mutex which is later released. Next, the calling thread acquires mutex_ and calls LogAndApply to actually write the VersionEdits to MANIFEST. Consequently, it is possible that two threads syncing the same WAL with different sizes write to the MANIFEST out-of-order. It's possible that thread 1 syncing the first 10 bytes of 1.log arrives later than thread 2 syncing the first 20 bytes of 1.log. Therefore, update WalSet::AddWal() so that it will not report corruption in this case. Test Plan: make check ./version_set_test --gtest_filter=VersionSetTest.AddWalWithSmallerSize ./db_write_test --gtest_filter=DBWriteTest.ConcurrentlyDisabledWAL --- db/version_set_test.cc | 14 ++++++++------ db/wal_edit.cc | 37 ++++++++++++++++++++++--------------- db/wal_edit.h | 10 ++++++++++ 3 files changed, 40 insertions(+), 21 deletions(-) diff --git a/db/version_set_test.cc b/db/version_set_test.cc index 09170dcf00b8..28163c94aa19 100644 --- a/db/version_set_test.cc +++ b/db/version_set_test.cc @@ -1970,6 +1970,7 @@ TEST_F(VersionSetTest, WalCreateAfterClose) { TEST_F(VersionSetTest, AddWalWithSmallerSize) { NewDB(); + assert(versions_); constexpr WalNumber kLogNumber = 10; constexpr uint64_t kSizeInBytes = 111; @@ -1982,6 +1983,9 @@ TEST_F(VersionSetTest, AddWalWithSmallerSize) { ASSERT_OK(LogAndApplyToDefaultCF(edit)); } + // Copy for future comparison. + const std::map wals1 = + versions_->GetWalSet().GetWals(); { // Add the same WAL with smaller synced size.
@@ -1990,13 +1994,11 @@ TEST_F(VersionSetTest, AddWalWithSmallerSize) { edit.AddWal(kLogNumber, wal); Status s = LogAndApplyToDefaultCF(edit); - ASSERT_TRUE(s.IsCorruption()); - ASSERT_TRUE( - s.ToString().find( - "WAL 10 must not have smaller synced size than previous one") != - std::string::npos) - << s.ToString(); + ASSERT_OK(s); } + const std::map wals2 = + versions_->GetWalSet().GetWals(); + ASSERT_EQ(wals1, wals2); } TEST_F(VersionSetTest, DeleteWalsBeforeNonExistingWalNumber) { diff --git a/db/wal_edit.cc b/db/wal_edit.cc index 786d68b5c830..2525be610b47 100644 --- a/db/wal_edit.cc +++ b/db/wal_edit.cc @@ -112,26 +112,33 @@ Status WalSet::AddWal(const WalAddition& wal) { auto it = wals_.lower_bound(wal.GetLogNumber()); bool existing = it != wals_.end() && it->first == wal.GetLogNumber(); - if (existing && !wal.GetMetadata().HasSyncedSize()) { - std::stringstream ss; - ss << "WAL " << wal.GetLogNumber() << " is created more than once"; - return Status::Corruption("WalSet::AddWal", ss.str()); + + if (!existing) { + wals_.insert(it, {wal.GetLogNumber(), wal.GetMetadata()}); + return Status::OK(); } - // If the WAL has synced size, it must >= the previous size. 
- if (wal.GetMetadata().HasSyncedSize() && existing && - it->second.HasSyncedSize() && - wal.GetMetadata().GetSyncedSizeInBytes() < - it->second.GetSyncedSizeInBytes()) { + + assert(existing); + if (!wal.GetMetadata().HasSyncedSize()) { std::stringstream ss; - ss << "WAL " << wal.GetLogNumber() - << " must not have smaller synced size than previous one"; + ss << "WAL " << wal.GetLogNumber() << " is created more than once"; return Status::Corruption("WalSet::AddWal", ss.str()); } - if (existing) { - it->second.SetSyncedSizeInBytes(wal.GetMetadata().GetSyncedSizeInBytes()); - } else { - wals_.insert(it, {wal.GetLogNumber(), wal.GetMetadata()}); + + assert(wal.GetMetadata().HasSyncedSize()); + if (it->second.HasSyncedSize() && wal.GetMetadata().GetSyncedSizeInBytes() <= + it->second.GetSyncedSizeInBytes()) { + // This is possible because version edits with different synced WAL sizes + // for the same WAL can be committed out-of-order. For example, thread + // 1 syncs the first 10 bytes of 1.log, while thread 2 syncs the first 20 + // bytes of 1.log. It's possible that thread 1 calls LogAndApply() after + // thread 2. + // In this case, just return ok. + return Status::OK(); } + + // Update synced size for the given WAL. + it->second.SetSyncedSizeInBytes(wal.GetMetadata().GetSyncedSizeInBytes()); return Status::OK(); } diff --git a/db/wal_edit.h b/db/wal_edit.h index 23dc589051b9..bb5c5e292ee1 100644 --- a/db/wal_edit.h +++ b/db/wal_edit.h @@ -42,6 +42,8 @@ class WalMetadata { uint64_t GetSyncedSizeInBytes() const { return synced_size_bytes_; } private: + friend bool operator==(const WalMetadata& lhs, const WalMetadata& rhs); + friend bool operator!=(const WalMetadata& lhs, const WalMetadata& rhs); // The size of WAL is unknown, used when the WAL is not synced yet or is // empty.
constexpr static uint64_t kUnknownWalSize = @@ -51,6 +53,14 @@ class WalMetadata { uint64_t synced_size_bytes_ = kUnknownWalSize; }; +inline bool operator==(const WalMetadata& lhs, const WalMetadata& rhs) { + return lhs.synced_size_bytes_ == rhs.synced_size_bytes_; +} + +inline bool operator!=(const WalMetadata& lhs, const WalMetadata& rhs) { + return !(lhs == rhs); +} + // These tags are persisted to MANIFEST, so it's part of the user API. enum class WalAdditionTag : uint32_t { // Indicates that there are no more tags. From 9ca6f527726281503ffdc83c9ff8e8bc5e0de74e Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Fri, 3 Jun 2022 23:29:29 -0700 Subject: [PATCH 12/29] Fix a test --- db/wal_edit_test.cc | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/db/wal_edit_test.cc b/db/wal_edit_test.cc index 5895e180d1c3..0c18fb125cc7 100644 --- a/db/wal_edit_test.cc +++ b/db/wal_edit_test.cc @@ -54,12 +54,11 @@ TEST(WalSet, SmallerSyncedSize) { constexpr uint64_t kBytes = 100; WalSet wals; ASSERT_OK(wals.AddWal(WalAddition(kNumber, WalMetadata(kBytes)))); + const auto wals1 = wals.GetWals(); Status s = wals.AddWal(WalAddition(kNumber, WalMetadata(0))); - ASSERT_TRUE(s.IsCorruption()); - ASSERT_TRUE( - s.ToString().find( - "WAL 100 must not have smaller synced size than previous one") != - std::string::npos); + const auto wals2 = wals.GetWals(); + ASSERT_OK(s); + ASSERT_EQ(wals1, wals2); } TEST(WalSet, CreateTwice) { From 84ebc8e572d369ab87bf11d529bb92100f106cc3 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Sun, 5 Jun 2022 10:15:31 +0800 Subject: [PATCH 13/29] remove useless test Signed-off-by: Little-Wallace --- db/db_impl/db_impl.cc | 37 +++++++++++++++++++------------------ db/wal_edit_test.cc | 13 ------------- 2 files changed, 19 insertions(+), 31 deletions(-) diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 5485a5a28369..d18a2b8d53dd 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -642,27 +642,28 @@ Status
DBImpl::CloseHelper() { job_context.Clean(); mutex_.Lock(); } - log_write_mutex_.Lock(); - for (auto l : logs_to_free_) { - delete l; - } - for (auto& log : logs_) { - uint64_t log_number = log.writer->get_log_number(); - Status s = log.ClearWriter(); - if (!s.ok()) { - ROCKS_LOG_WARN( - immutable_db_options_.info_log, - "Unable to Sync WAL file %s with error -- %s", - LogFileName(immutable_db_options_.GetWalDir(), log_number).c_str(), - s.ToString().c_str()); - // Retain the first error - if (ret.ok()) { - ret = s; + { + InstrumentedMutexLock lock(&log_write_mutex_); + for (auto l : logs_to_free_) { + delete l; + } + for (auto& log : logs_) { + uint64_t log_number = log.writer->get_log_number(); + Status s = log.ClearWriter(); + if (!s.ok()) { + ROCKS_LOG_WARN( + immutable_db_options_.info_log, + "Unable to Sync WAL file %s with error -- %s", + LogFileName(immutable_db_options_.GetWalDir(), log_number).c_str(), + s.ToString().c_str()); + // Retain the first error + if (ret.ok()) { + ret = s; + } } } + logs_.clear(); } - logs_.clear(); - log_write_mutex_.Unlock(); // Table cache may have table handles holding blocks from the block cache. // We need to release them before the block cache is destroyed. 
The block diff --git a/db/wal_edit_test.cc b/db/wal_edit_test.cc index 5895e180d1c3..7c0ca0681d9a 100644 --- a/db/wal_edit_test.cc +++ b/db/wal_edit_test.cc @@ -49,19 +49,6 @@ TEST(WalSet, Overwrite) { ASSERT_EQ(wals.GetWals().at(kNumber).GetSyncedSizeInBytes(), kBytes); } -TEST(WalSet, SmallerSyncedSize) { - constexpr WalNumber kNumber = 100; - constexpr uint64_t kBytes = 100; - WalSet wals; - ASSERT_OK(wals.AddWal(WalAddition(kNumber, WalMetadata(kBytes)))); - Status s = wals.AddWal(WalAddition(kNumber, WalMetadata(0))); - ASSERT_TRUE(s.IsCorruption()); - ASSERT_TRUE( - s.ToString().find( - "WAL 100 must not have smaller synced size than previous one") != - std::string::npos); -} - TEST(WalSet, CreateTwice) { constexpr WalNumber kNumber = 100; WalSet wals; From a109c81b5310e0fde938fc4e61f414e9b0b960d6 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Sun, 5 Jun 2022 20:49:04 +0800 Subject: [PATCH 14/29] add test print info Signed-off-by: Little-Wallace --- utilities/backup/backup_engine.cc | 2 ++ utilities/backup/backup_engine_test.cc | 3 ++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/utilities/backup/backup_engine.cc b/utilities/backup/backup_engine.cc index a14dbc8800b8..874fe001d708 100644 --- a/utilities/backup/backup_engine.cc +++ b/utilities/backup/backup_engine.cc @@ -1508,6 +1508,7 @@ IOStatus BackupEngineImpl::CreateNewBackupWithMetadata( } } ROCKS_LOG_INFO(options_.info_log, "add files for backup done, wait finish."); + printf("add files for backup done [%d], wait finish.\n", io_s.ok()); IOStatus item_io_status; for (auto& item : backup_items_to_finish) { item.result.wait(); @@ -1533,6 +1534,7 @@ IOStatus BackupEngineImpl::CreateNewBackupWithMetadata( } } + printf("finished all item with status: %d.\n", io_s.code()); // we copied all the files, enable file deletions if (disabled.ok()) { // If we successfully disabled file deletions db->EnableFileDeletions(false).PermitUncheckedError(); diff --git 
a/utilities/backup/backup_engine_test.cc b/utilities/backup/backup_engine_test.cc index 6e47af89f4b2..92be4db4e1f8 100644 --- a/utilities/backup/backup_engine_test.cc +++ b/utilities/backup/backup_engine_test.cc @@ -1115,7 +1115,8 @@ TEST_P(BackupEngineTestWithParam, OfflineIntegrationTest) { destroy_data = false; // kAutoFlushOnly to preserve legacy test behavior (consider updating) FillDB(db_.get(), keys_iteration * i, fill_up_to, kAutoFlushOnly); - ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), iter == 0)); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), iter == 0)) + << "iter: " << iter << ", idx: " << i; CloseDBAndBackupEngine(); DestroyDB(dbname_, options_); From 4a115ef59d7a959b384bc441b05c38154d8f4f91 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Sun, 5 Jun 2022 14:17:55 -0700 Subject: [PATCH 15/29] Deflake DBCompactionTestWithParam.FixFileIngestionCompactionDeadlock --- db/db_compaction_test.cc | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index d307cadbbc24..939f677f5da6 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -5517,18 +5517,10 @@ TEST_P(DBCompactionTestWithParam, FixFileIngestionCompactionDeadlock) { for (int j = 0; j != kNumKeysPerFile; ++j) { ASSERT_OK(Put(Key(j), rnd.RandomString(990))); } - if (0 == i) { - // When we reach here, the memtables have kNumKeysPerFile keys. Note that - // flush is not yet triggered. We need to write an extra key so that the - // write path will call PreprocessWrite and flush the previous key-value - // pairs to e flushed. After that, there will be the newest key in the - // memtable, and a bunch of L0 files. Since there is already one key in - // the memtable, then for i = 1, 2, ..., we do not have to write this - // extra key to trigger flush. 
- ASSERT_OK(Put("", "")); + if (i > 0) { + ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable()); + ASSERT_EQ(NumTableFilesAtLevel(0 /*level*/, 0 /*cf*/), i); } - ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable()); - ASSERT_EQ(NumTableFilesAtLevel(0 /*level*/, 0 /*cf*/), i + 1); } // When we reach this point, there will be level0_stop_writes_trigger L0 // files and one extra key (99) in memory, which overlaps with the external From 9c16276cfa0d7339bd4b8e36673c9c6dd481bf4c Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Mon, 6 Jun 2022 10:15:29 +0800 Subject: [PATCH 16/29] add more print test Signed-off-by: Little-Wallace --- utilities/backup/backup_engine.cc | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/utilities/backup/backup_engine.cc b/utilities/backup/backup_engine.cc index 874fe001d708..911b5f3a939c 100644 --- a/utilities/backup/backup_engine.cc +++ b/utilities/backup/backup_engine.cc @@ -1300,6 +1300,9 @@ IOStatus BackupEngineImpl::Initialize() { result.db_session_id = work_item.db_session_id; result.expected_src_temperature = work_item.src_temperature; result.current_src_temperature = temp; + if (!result.io_status.ok()) { + printf("failed to copy or create file\n"); + } if (result.io_status.ok() && !work_item.src_checksum_hex.empty()) { // unknown checksum function name implies no db table file checksum in // db manifest; work_item.src_checksum_hex not empty means @@ -1445,6 +1448,7 @@ IOStatus BackupEngineImpl::CreateNewBackupWithMetadata( return IOStatus::OK(); } Log(options_.info_log, "add file for backup %s", fname.c_str()); + printf("copy file for backup %s.\n", fname.c_str()); uint64_t size_bytes = 0; IOStatus io_st; if (type == kTableFile || type == kBlobFile) { @@ -1493,6 +1497,7 @@ IOStatus BackupEngineImpl::CreateNewBackupWithMetadata( [&](const std::string& fname, const std::string& contents, FileType type) { Log(options_.info_log, "add file for backup %s", fname.c_str()); + printf("add file for backup %s.\n", 
fname.c_str()); return AddBackupFileWorkItem( live_dst_paths, backup_items_to_finish, new_backup_id, false /* shared */, "" /* src_dir */, fname, @@ -1532,9 +1537,11 @@ IOStatus BackupEngineImpl::CreateNewBackupWithMetadata( if (!item_io_status.ok()) { io_s = item_io_status; } + if (!io_s.ok()) { + printf("finished item failed: %s.\n", io_s.ToString().c_str()); + } } - printf("finished all item with status: %d.\n", io_s.code()); // we copied all the files, enable file deletions if (disabled.ok()) { // If we successfully disabled file deletions db->EnableFileDeletions(false).PermitUncheckedError(); @@ -2300,6 +2307,7 @@ IOStatus BackupEngineImpl::AddBackupFileWorkItem( } else if (exist.IsNotFound()) { file_exists = false; } else { + printf("final_dest_path exist failed"); return exist; } } @@ -2355,6 +2363,7 @@ IOStatus BackupEngineImpl::AddBackupFileWorkItem( src_path, db_fs_, src_env_options, size_limit, &checksum_hex, src_temperature); if (!io_s.ok()) { + printf("ReadFileAndComputeChecksum failed"); return io_s; } } From 929955e7aba98de8b385f990130dfb7b06b28c57 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Mon, 6 Jun 2022 11:21:10 +0800 Subject: [PATCH 17/29] remove test print Signed-off-by: Little-Wallace --- utilities/backup/backup_engine.cc | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/utilities/backup/backup_engine.cc b/utilities/backup/backup_engine.cc index 911b5f3a939c..a14dbc8800b8 100644 --- a/utilities/backup/backup_engine.cc +++ b/utilities/backup/backup_engine.cc @@ -1300,9 +1300,6 @@ IOStatus BackupEngineImpl::Initialize() { result.db_session_id = work_item.db_session_id; result.expected_src_temperature = work_item.src_temperature; result.current_src_temperature = temp; - if (!result.io_status.ok()) { - printf("failed to copy or create file\n"); - } if (result.io_status.ok() && !work_item.src_checksum_hex.empty()) { // unknown checksum function name implies no db table file checksum in // db manifest; work_item.src_checksum_hex 
not empty means @@ -1448,7 +1445,6 @@ IOStatus BackupEngineImpl::CreateNewBackupWithMetadata( return IOStatus::OK(); } Log(options_.info_log, "add file for backup %s", fname.c_str()); - printf("copy file for backup %s.\n", fname.c_str()); uint64_t size_bytes = 0; IOStatus io_st; if (type == kTableFile || type == kBlobFile) { @@ -1497,7 +1493,6 @@ IOStatus BackupEngineImpl::CreateNewBackupWithMetadata( [&](const std::string& fname, const std::string& contents, FileType type) { Log(options_.info_log, "add file for backup %s", fname.c_str()); - printf("add file for backup %s.\n", fname.c_str()); return AddBackupFileWorkItem( live_dst_paths, backup_items_to_finish, new_backup_id, false /* shared */, "" /* src_dir */, fname, @@ -1513,7 +1508,6 @@ IOStatus BackupEngineImpl::CreateNewBackupWithMetadata( } } ROCKS_LOG_INFO(options_.info_log, "add files for backup done, wait finish."); - printf("add files for backup done [%d], wait finish.\n", io_s.ok()); IOStatus item_io_status; for (auto& item : backup_items_to_finish) { item.result.wait(); @@ -1537,9 +1531,6 @@ IOStatus BackupEngineImpl::CreateNewBackupWithMetadata( if (!item_io_status.ok()) { io_s = item_io_status; } - if (!io_s.ok()) { - printf("finished item failed: %s.\n", io_s.ToString().c_str()); - } } // we copied all the files, enable file deletions @@ -2307,7 +2298,6 @@ IOStatus BackupEngineImpl::AddBackupFileWorkItem( } else if (exist.IsNotFound()) { file_exists = false; } else { - printf("final_dest_path exist failed"); return exist; } } @@ -2363,7 +2353,6 @@ IOStatus BackupEngineImpl::AddBackupFileWorkItem( src_path, db_fs_, src_env_options, size_limit, &checksum_hex, src_temperature); if (!io_s.ok()) { - printf("ReadFileAndComputeChecksum failed"); return io_s; } } From d9e17239fdc5ce0eac219b1c4b11a3f6b5e71c6d Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Wed, 8 Jun 2022 17:44:18 +0800 Subject: [PATCH 18/29] fix delete log files Signed-off-by: Little-Wallace --- db/db_impl/db_impl_files.cc | 9 
+++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/db/db_impl/db_impl_files.cc b/db/db_impl/db_impl_files.cc index 598c4dbb0961..777e347ad80c 100644 --- a/db/db_impl/db_impl_files.cc +++ b/db/db_impl/db_impl_files.cc @@ -256,11 +256,12 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, // be any tracked obsolete logs log_write_mutex_.Lock(); + ++pending_purge_obsolete_files_; if (alive_log_files_.empty() || logs_.empty()) { mutex_.AssertHeld(); // We may reach here if the db is DBImplSecondary - if (job_context->HaveSomethingToDelete()) { - ++pending_purge_obsolete_files_; + if (!job_context->HaveSomethingToDelete()) { + --pending_purge_obsolete_files_; } log_write_mutex_.Unlock(); return; @@ -313,8 +314,8 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, // We're just cleaning up for DB::Write(). assert(job_context->logs_to_free.empty()); job_context->logs_to_free = logs_to_free_; - if (job_context->HaveSomethingToDelete()) { - ++pending_purge_obsolete_files_; + if (!job_context->HaveSomethingToDelete()) { + --pending_purge_obsolete_files_; } logs_to_free_.clear(); log_write_mutex_.Unlock(); From 2e6f238bd4ce7e098952f59ae6ba7187dc26c446 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Fri, 10 Jun 2022 13:19:13 +0800 Subject: [PATCH 19/29] address comment Signed-off-by: Little-Wallace --- HISTORY.md | 1 + db/db_impl/db_impl.cc | 1 - db/db_impl/db_impl.h | 3 --- db/db_impl/db_impl_write.cc | 2 +- 4 files changed, 2 insertions(+), 5 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 8caf7ce034c3..37041bdd80e6 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -30,6 +30,7 @@ ### Behavior changes * DB::Open(), DB::OpenAsSecondary() will fail if a Logger cannot be created (#9984) +* DB::Write does not hold global `mutex_` if this db instance does not need to switch wal and mem-table. 
## 7.3.0 (05/20/2022) ### Bug Fixes diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 3694bb47b44a..a02762852324 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -2826,7 +2826,6 @@ Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options, ROCKS_LOG_INFO(immutable_db_options_.info_log, "Created column family [%s] (ID %u)", column_family_name.c_str(), (unsigned)cfd->GetID()); - single_column_family_mode_.store(false, std::memory_order_release); } else { ROCKS_LOG_ERROR(immutable_db_options_.info_log, "Creating column family [%s] FAILED -- %s", diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index f00b3165907d..075e95c9b45b 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1272,9 +1272,6 @@ class DBImpl : public DB { // only used for dynamically adjusting max_total_wal_size. it is a sum of // [write_buffer_size * max_write_buffer_number] over all column families std::atomic max_total_in_memory_state_; - // If true, we have only one (default) column family. 
We use this to optimize - // some code-paths - std::atomic single_column_family_mode_; // The options to access storage files const FileOptions file_options_; diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 380c1050ba20..940ac1d5f323 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -1639,7 +1639,7 @@ uint64_t DBImpl::GetMaxTotalWalSize() const { if (max_total_wal_size > 0) { return max_total_wal_size; } - return 4 * max_total_in_memory_state_; + return 4 * max_total_in_memory_state_.load(std::memory_order_acquire); } // REQUIRES: mutex_ is held From 1f7f6c8e30fe0414953b554e530557e9d25d18b1 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Thu, 23 Jun 2022 15:34:31 -0700 Subject: [PATCH 20/29] Protect pending_purge_obsolete_files_ with mutex_ --- db/db_impl/db_impl_files.cc | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/db/db_impl/db_impl_files.cc b/db/db_impl/db_impl_files.cc index 1abab0fbfc72..0358df45ff68 100644 --- a/db/db_impl/db_impl_files.cc +++ b/db/db_impl/db_impl_files.cc @@ -314,14 +314,15 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, // We're just cleaning up for DB::Write(). 
assert(job_context->logs_to_free.empty()); job_context->logs_to_free = logs_to_free_; - if (!job_context->HaveSomethingToDelete()) { - --pending_purge_obsolete_files_; - } + logs_to_free_.clear(); log_write_mutex_.Unlock(); mutex_.Lock(); job_context->log_recycle_files.assign(log_recycle_files_.begin(), log_recycle_files_.end()); + if (!job_context->HaveSomethingToDelete()) { + --pending_purge_obsolete_files_; + } } namespace { From 4f1e10443c72b49c2c6c2ea06aef23e80678f683 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Thu, 23 Jun 2022 20:25:58 -0700 Subject: [PATCH 21/29] Replace a compare func with lambda --- db/db_impl/db_impl_files.cc | 24 ++++++++++-------------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/db/db_impl/db_impl_files.cc b/db/db_impl/db_impl_files.cc index ba78b76392d9..da87aa5cd010 100644 --- a/db/db_impl/db_impl_files.cc +++ b/db/db_impl/db_impl_files.cc @@ -335,19 +335,6 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, log_recycle_files_.end()); } -namespace { -bool CompareCandidateFile(const JobContext::CandidateFileInfo& first, - const JobContext::CandidateFileInfo& second) { - if (first.file_name > second.file_name) { - return true; - } else if (first.file_name < second.file_name) { - return false; - } else { - return (first.file_path > second.file_path); - } -} -} // namespace - // Delete obsolete files and log status and information of file deletion void DBImpl::DeleteObsoleteFileImpl(int job_id, const std::string& fname, const std::string& path_to_sync, @@ -452,7 +439,16 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) { // dedup state.candidate_files so we don't try to delete the same // file twice std::sort(candidate_files.begin(), candidate_files.end(), - CompareCandidateFile); + [](const JobContext::CandidateFileInfo& lhs, + const JobContext::CandidateFileInfo& rhs) { + if (lhs.file_name > rhs.file_name) { + return true; + } else if (lhs.file_name < rhs.file_name) { + 
return false; + } else { + return (lhs.file_path > rhs.file_path); + } + }); candidate_files.erase( std::unique(candidate_files.begin(), candidate_files.end()), candidate_files.end()); From 56d8e153a99993abb8ab30009f58296311edffe1 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Fri, 1 Jul 2022 14:46:05 -0700 Subject: [PATCH 22/29] DBImpl::pending_obsolete_files_ does not have to be atomic --- db/db_impl/db_impl.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index c7ab4094714c..371ee0110ed9 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -2478,7 +2478,7 @@ class DBImpl : public DB { // Number of times FindObsoleteFiles has found deletable files and the // corresponding call to PurgeObsoleteFiles has not yet finished. - std::atomic pending_purge_obsolete_files_; + int pending_purge_obsolete_files_; // last time when DeleteObsoleteFiles with full scan was executed. Originally // initialized with startup time. From 644e0da575eec829978e0e44e4258db9b828117b Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Fri, 1 Jul 2022 17:17:12 -0700 Subject: [PATCH 23/29] Rename a variable --- db/error_handler.cc | 6 +++--- db/error_handler.h | 9 +++++---- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/db/error_handler.cc b/db/error_handler.cc index fa2cfe2ea9b9..1df01267faad 100644 --- a/db/error_handler.cc +++ b/db/error_handler.cc @@ -359,7 +359,7 @@ const Status& ErrorHandler::HandleKnownErrors(const Status& bg_err, } } if (bg_error_.severity() >= Status::Severity::kHardError) { - stop_state_.store(true, std::memory_order_release); + is_db_stopped_.store(true, std::memory_order_release); } return bg_error_; } @@ -739,7 +739,7 @@ void ErrorHandler::RecoverFromRetryableBGIOError() { // the bg_error and notify user. 
TEST_SYNC_POINT("RecoverFromRetryableBGIOError:RecoverSuccess"); Status old_bg_error = bg_error_; - stop_state_.store(false, std::memory_order_release); + is_db_stopped_.store(false, std::memory_order_release); bg_error_ = Status::OK(); bg_error_.PermitUncheckedError(); EventHelpers::NotifyOnErrorRecoveryEnd( @@ -797,7 +797,7 @@ void ErrorHandler::CheckAndSetRecoveryAndBGError(const Status& bg_err) { bg_error_ = bg_err; } if (bg_error_.severity() >= Status::Severity::kHardError) { - stop_state_.store(true, std::memory_order_release); + is_db_stopped_.store(true, std::memory_order_release); } return; } diff --git a/db/error_handler.h b/db/error_handler.h index 488cb1b5161b..e7c47b76370d 100644 --- a/db/error_handler.h +++ b/db/error_handler.h @@ -38,7 +38,7 @@ class ErrorHandler { auto_recovery_(false), recovery_in_prog_(false), soft_error_no_bg_work_(false), - stop_state_(false), + is_db_stopped_(false), bg_error_stats_(db_options.statistics) { // Clear the checked flag for uninitialized errors bg_error_.PermitUncheckedError(); @@ -60,10 +60,11 @@ class ErrorHandler { Status ClearBGError(); - bool IsDBStopped() { return stop_state_.load(std::memory_order_acquire); } + bool IsDBStopped() { return is_db_stopped_.load(std::memory_order_acquire); } - // this method must be protect in mutex. bool IsBGWorkStopped() { + assert(db_mutex_); + db_mutex_->AssertHeld(); return !bg_error_.ok() && (bg_error_.severity() >= Status::Severity::kHardError || !auto_recovery_ || soft_error_no_bg_work_); @@ -104,7 +105,7 @@ class ErrorHandler { // Used to store the context for recover, such as flush reason. DBRecoverContext recover_context_; - std::atomic stop_state_; + std::atomic is_db_stopped_; // The pointer of DB statistics. 
std::shared_ptr bg_error_stats_; From baeb309a342e94e40ca5c3abff0d9229963c856e Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Tue, 5 Jul 2022 15:14:13 -0700 Subject: [PATCH 24/29] Some minor changes that do not change actual behavior --- db/db_impl/db_impl_open.cc | 3 +-- db/db_impl/db_impl_write.cc | 3 ++- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index 51b263d427c8..d2de0e99bbd4 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -1861,10 +1861,9 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, } if (s.ok()) { - impl->log_write_mutex_.Lock(); + InstrumentedMutexLock wl(&impl->log_write_mutex_); impl->alive_log_files_.push_back( DBImpl::LogFileNumberSize(impl->logfile_number_)); - impl->log_write_mutex_.Unlock(); } if (s.ok()) { // In WritePrepared there could be gap in sequence numbers. This breaks diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index a4cdf7eefa12..0f539e2fa3a6 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -1722,7 +1722,8 @@ Status DBImpl::HandleWriteBufferManagerFlush(WriteContext* write_context) { } uint64_t DBImpl::GetMaxTotalWalSize() const { - auto max_total_wal_size = max_total_wal_size_.load(std::memory_order_acquire); + uint64_t max_total_wal_size = + max_total_wal_size_.load(std::memory_order_acquire); if (max_total_wal_size > 0) { return max_total_wal_size; } From 86788254a6a0e0126cd79b5cc84df4d9a6152a71 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Tue, 5 Jul 2022 19:10:28 -0700 Subject: [PATCH 25/29] alive_log_files_ actually relies on db mutex --- db/db_impl/db_impl_open.cc | 16 +--------------- 1 file changed, 1 insertion(+), 15 deletions(-) diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index d2de0e99bbd4..b70ba32cf10c 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -1446,9 +1446,6 @@ Status
DBImpl::RestoreAliveLogFiles(const std::vector& wal_numbers) { Status s; mutex_.AssertHeld(); assert(immutable_db_options_.avoid_flush_during_recovery); - if (two_write_queues_) { - log_write_mutex_.Lock(); - } // Mark these as alive so they'll be considered for deletion later by // FindObsoleteFiles() total_log_size_ = 0; @@ -1471,15 +1468,7 @@ Status DBImpl::RestoreAliveLogFiles(const std::vector& wal_numbers) { break; } total_log_size_ += log.size; - if (two_write_queues_) { - alive_log_files_.push_back(log); - } else { - InstrumentedMutexLock l(&log_write_mutex_); - alive_log_files_.push_back(log); - } - } - if (two_write_queues_) { - log_write_mutex_.Unlock(); + alive_log_files_.push_back(log); } return s; } @@ -1861,11 +1850,8 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, } if (s.ok()) { - InstrumentedMutexLock wl(&impl->log_write_mutex_); impl->alive_log_files_.push_back( DBImpl::LogFileNumberSize(impl->logfile_number_)); - } - if (s.ok()) { // In WritePrepared there could be gap in sequence numbers. This breaks // the trick we use in kPointInTimeRecovery which assumes the first seq in // the log right after the corrupted log is one larger than the last seq From d8c8a9520827a955210e9cafaaa240d4ee9b41b9 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Wed, 6 Jul 2022 09:59:40 -0700 Subject: [PATCH 26/29] Fix a typo caused by search/replace --- db/db_impl/db_impl_compaction_flush.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 96bd1a5269b4..23492581c96e 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -221,7 +221,8 @@ Status DBImpl::FlushMemTableToOutputFile( bool need_cancel = false; IOStatus log_io_s = IOStatus::OK(); if (needs_to_sync_closed_wals) { - // SyncClosedLogs() may unlock and re-lock the db_log_write_mutex. 
+ // SyncClosedLogs() may unlock and re-lock the log_write_mutex multiple + // times. VersionEdit synced_wals; mutex_.Unlock(); log_io_s = SyncClosedLogs(job_context, &synced_wals); From 033500ccf4667f23d3eebe7ab88bb58a343c43f3 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Wed, 6 Jul 2022 11:02:57 -0700 Subject: [PATCH 27/29] Update comment for alive_log_files_ --- db/db_impl/db_impl.h | 34 ++++++++++++++++++++++++++++++---- 1 file changed, 30 insertions(+), 4 deletions(-) diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 371ee0110ed9..eb31116be2fb 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -2327,10 +2327,36 @@ class DBImpl : public DB { bool persistent_stats_cfd_exists_ = true; - // Without two_write_queues, read and writes to alive_log_files_ are - // protected by mutex_. With two_write_queues_, writes - // are protected by locking both mutex_ and log_write_mutex_, and reads must - // be under either mutex_ or log_write_mutex_. + // alive_log_files_ is protected by mutex_ and log_write_mutex_ with details + // as follows: + // 1. read by FindObsoleteFiles() which can be called in either application + // thread or RocksDB bg threads, both mutex_ and log_write_mutex_ are + // held. + // 2. pop_front() by FindObsoleteFiles(), both mutex_ and log_write_mutex_ + // are held. + // 3. push_back() by DBImpl::Open() and DBImpl::RestoreAliveLogFiles() + // (actually called by Open()), only mutex_ is held because at this point, + // the DB::Open() call has not returned success to application, and the + // only other thread(s) that can conflict are bg threads calling + // FindObsoleteFiles() which ensure that both mutex_ and log_write_mutex_ + // are held when accessing alive_log_files_. + // 4. read by DBImpl::Open() is protected by mutex_. + // 5. push_back() by SwitchMemtable(). Both mutex_ and log_write_mutex_ are + // held. This is done by the write group leader. 
Note that in the case of + // two-write-queues, another WAL-only write thread can be writing to the + // WAL concurrently. See 9. + // 6. read by SwitchWAL() with both mutex_ and log_write_mutex_ held. This is + // done by write group leader. + // 7. read by ConcurrentWriteToWAL() by the write group leader in the case of + // two-write-queues. Only log_write_mutex_ is held to protect concurrent + // pop_front() by FindObsoleteFiles(). + // 8. read by PreprocessWrite() by the write group leader. log_write_mutex_ + // is held to protect the data structure from concurrent pop_front() by + // FindObsoleteFiles(). + // 9. read by ConcurrentWriteToWAL() by a WAL-only write thread in the case + // of two-write-queues. Only log_write_mutex_ is held. This suffices to + // protect the data structure from concurrent push_back() by current + // write group leader as well as pop_front() by FindObsoleteFiles(). std::deque alive_log_files_; // Log files that aren't fully synced, and the current log file. From 9bd453363e54828771773bf659a7ba7b60973209 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Wed, 6 Jul 2022 13:35:07 -0700 Subject: [PATCH 28/29] Update comment for DBImpl::logs_ --- db/db_impl/db_impl.h | 53 +++++++++++++++++++++++++++++++++++++++----- 1 file changed, 47 insertions(+), 6 deletions(-) diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index eb31116be2fb..b5fb6f064764 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -2361,18 +2361,59 @@ class DBImpl : public DB { // Log files that aren't fully synced, and the current log file. // Synchronization: - // - push_back() is done from write_thread_ with locked mutex_ and - // log_write_mutex_ - // - pop_front() is done from any thread with locked mutex_ and - // log_write_mutex_ - // - reads are done with either locked mutex_ or log_write_mutex_ + // 1. read by FindObsoleteFiles() which can be called either in application + // thread or RocksDB bg threads. 
log_write_mutex_ is always held, while + // some reads are performed without mutex_. + // 2. pop_front() by FindObsoleteFiles() with only log_write_mutex_ held. + // 3. read by DBImpl::Open() with both mutex_ and log_write_mutex_. + // 4. emplace_back() by DBImpl::Open() with both mutex_ and log_write_mutex. + // Note that at this point, DB::Open() has not returned success to + // application, thus the only other thread(s) that can conflict are bg + // threads calling FindObsoleteFiles(). See 1. + // 5. iteration and clear() from CloseHelper() always hold log_write_mutex + // and mutex_. + // 6. back() called by APIs FlushWAL() and LockWAL() are protected by only + // log_write_mutex_. These two can be called by application threads after + // DB::Open() returns success to applications. + // 7. read by SyncWAL(), another API, protected by only log_write_mutex_. + // 8. read by MarkLogsNotSynced() and MarkLogsSynced() are protected by + // log_write_mutex_. + // 9. erase() by MarkLogsSynced() protected by log_write_mutex_. + // 10. read by SyncClosedLogs() protected by only log_write_mutex_. This can + // happen in bg flush threads after DB::Open() returns success to + // applications. + // 11. reads, e.g. front(), iteration, and back() called by PreprocessWrite() + // holds only the log_write_mutex_. This is done by the write group + // leader. A bg thread calling FindObsoleteFiles() or MarkLogsSynced() + // can happen concurrently. This is fine because log_write_mutex_ is used + // by all parties. See 2, 5, 9. + // 12. reads, empty(), back() called by SwitchMemtable() hold both mutex_ and + // log_write_mutex_. This happens in the write group leader. + // 13. emplace_back() by SwitchMemtable() hold both mutex_ and + // log_write_mutex_. This happens in the write group leader. Can conflict + // with bg threads calling FindObsoleteFiles(), MarkLogsSynced(), + // SyncClosedLogs(), etc. as well as application threads calling + // FlushWAL(), SyncWAL(), LockWAL(). 
This is fine because all parties + // require at least log_write_mutex_. + // 14. iteration called in WriteToWAL(write_group) protected by + // log_write_mutex_. This is done by write group leader when + // two-write-queues is disabled and write needs to sync logs. + // 15. back() called in ConcurrentWriteToWAL() protected by log_write_mutex_. + // This can be done by the write group leader if two-write-queues is + // enabled. It can also be done by another WAL-only write thread. + // + // Other observations: // - back() and items with getting_synced=true are not popped, // - The same thread that sets getting_synced=true will reset it. // - it follows that the object referred by back() can be safely read from - // the write_thread_ without using mutex + // the write_thread_ without using mutex. Note that calling back() without + // mutex may be unsafe because different implementations of deque::back() may + // access other member variables of deque, causing undefined behaviors. + // Generally, do not access stl containers without proper synchronization. // - it follows that the items with getting_synced=true can be safely read // from the same thread that has set getting_synced=true std::deque logs_; + // Signaled when getting_synced becomes false for some of the logs_. 
InstrumentedCondVar log_sync_cv_; // This is the app-level state that is written to the WAL but will be used From eb5b4d44ab58cdc7a09b3a40c385fb620d5f2885 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Thu, 7 Jul 2022 13:00:20 -0700 Subject: [PATCH 29/29] Update comment and improve readability --- db/db_impl/db_impl.cc | 1 + db/db_impl/db_impl.h | 13 +++++++++---- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 07fb16b265ac..ca740d7e7780 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1053,6 +1053,7 @@ Status DBImpl::TablesRangeTombstoneSummary(ColumnFamilyHandle* column_family, } void DBImpl::ScheduleBgLogWriterClose(JobContext* job_context) { + mutex_.AssertHeld(); if (!job_context->logs_to_free.empty()) { for (auto l : job_context->logs_to_free) { AddToLogsToFreeQueue(l); diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index b5fb6f064764..ec1aa9fe249d 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -997,6 +997,7 @@ class DBImpl : public DB { } void AddToLogsToFreeQueue(log::Writer* log_writer) { + mutex_.AssertHeld(); logs_to_free_queue_.push_back(log_writer); } @@ -2312,8 +2313,9 @@ class DBImpl : public DB { // logfile_number_ is currently updated only in write_thread_, it can be read // from the same write_thread_ without any locks. uint64_t logfile_number_; - std::deque - log_recycle_files_; // a list of log files that we can recycle + // Log files that we can recycle. Must be protected by db mutex_. + std::deque log_recycle_files_; + // Protected by log_write_mutex_. bool log_dir_synced_; // Without two_write_queues, read and writes to log_empty_ are protected by // mutex_. Since it is currently updated/read only in write_thread_, it can be @@ -2428,7 +2430,7 @@ class DBImpl : public DB { std::atomic total_log_size_; // If this is non-empty, we need to delete these log files in background - // threads. Protected by db mutex. + // threads. 
Protected by log_write_mutex_. autovector logs_to_free_; bool is_snapshot_supported_; @@ -2508,10 +2510,13 @@ class DBImpl : public DB { // JobContext. Current implementation tracks table and blob files only. std::unordered_set files_grabbed_for_purge_; - // A queue to store log writers to close + // A queue to store log writers to close. Protected by db mutex_. std::deque logs_to_free_queue_; + std::deque superversions_to_free_queue_; + int unscheduled_flushes_; + int unscheduled_compactions_; // count how many background compactions are running or have been scheduled in