From c7a9115fff6300da73591d2b7b6c072b153a1b49 Mon Sep 17 00:00:00 2001 From: Toktarev Alexander Date: Tue, 30 Apr 2019 17:29:11 +0300 Subject: [PATCH] Additional logging --- db/column_family.cc | 6 ++++ db/compacted_db_impl.cc | 2 ++ db/compaction_job.cc | 3 ++ db/db_filesnapshot.cc | 8 +++++ db/db_impl.cc | 54 ++++++++++++++++++++++++++++++++++ db/db_impl_compaction_flush.cc | 24 +++++++++++++++ db/db_impl_debug.cc | 2 ++ db/db_impl_files.cc | 4 +++ db/db_impl_open.cc | 10 +++++++ db/db_impl_readonly.cc | 2 ++ db/db_impl_write.cc | 32 ++++++++++++++++++++ db/error_handler.cc | 2 ++ db/flush_job.cc | 2 ++ db/forward_iterator.cc | 2 ++ db/repair.cc | 5 ++++ db/write_thread.cc | 5 +++- db/write_thread.h | 5 +++- 17 files changed, 166 insertions(+), 2 deletions(-) diff --git a/db/column_family.cc b/db/column_family.cc index 8236efc6b..292824927 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -67,6 +67,8 @@ ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() { delete cfd_; } db_->FindObsoleteFiles(&job_context, false, true); + ROCKS_LOG_INFO(cfd_->ioptions()->info_log, + "Unlock3"); mutex_->Unlock(); if (job_context.HaveSomethingToDelete()) { db_->PurgeObsoleteFiles(job_context); @@ -510,6 +512,8 @@ ColumnFamilyData::~ColumnFamilyData() { if (super_version_ != nullptr) { // Release SuperVersion reference kept in ThreadLocalPtr. // This must be done outside of mutex_ since unref handler can lock mutex. + ROCKS_LOG_INFO(ioptions_.info_log, + "Unlock2"); super_version_->db_mutex->Unlock(); local_sv_.reset(); super_version_->db_mutex->Lock(); @@ -1051,6 +1055,8 @@ SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion( db_mutex->Lock(); } sv = super_version_->Ref(); + ROCKS_LOG_INFO(ioptions_.info_log, + "Unlock1"); db_mutex->Unlock(); delete sv_to_delete; diff --git a/db/compacted_db_impl.cc b/db/compacted_db_impl.cc index acdaad4ec..c3cba934f 100644 --- a/db/compacted_db_impl.cc +++ b/db/compacted_db_impl.cc @@ -94,6 +94,8 @@ Status CompactedDBImpl::Init(const Options& options) { DefaultColumnFamily())->cfd(); cfd_->InstallSuperVersion(&sv_context, &mutex_); } + ROCKS_LOG_INFO(options.info_log, + "Unlock4"); mutex_.Unlock(); sv_context.Clean(); if (!s.ok()) { diff --git a/db/compaction_job.cc b/db/compaction_job.cc index 10aaef098..cfcd171ec 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -529,6 +529,9 @@ void CompactionJob::GenSubcompactionBoundaries() { // ApproximateSize could potentially create table reader iterator to seek // to the index block and may incur I/O cost in the process. Unlock db // mutex to reduce contention + ROCKS_LOG_INFO(db_options_.info_log, + "Unlock5"); + db_mutex_->Unlock(); uint64_t size = versions_->ApproximateSize(v, a, b, start_lvl, out_lvl + 1); db_mutex_->Lock(); diff --git a/db/db_filesnapshot.cc b/db/db_filesnapshot.cc index ace0befb6..2eeec8ee8 100644 --- a/db/db_filesnapshot.cc +++ b/db/db_filesnapshot.cc @@ -89,6 +89,8 @@ Status DBImpl::GetLiveFiles(std::vector& ret, if (immutable_db_options_.atomic_flush) { autovector cfds; SelectColumnFamiliesForAtomicFlush(&cfds); + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock6"); mutex_.Unlock(); status = AtomicFlushMemTables(cfds, FlushOptions(), FlushReason::kGetLiveFiles); @@ -99,6 +101,8 @@ Status DBImpl::GetLiveFiles(std::vector& ret, continue; } cfd->Ref(); + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock7"); mutex_.Unlock(); status = FlushMemTable(cfd, FlushOptions(), FlushReason::kGetLiveFiles); TEST_SYNC_POINT("DBImpl::GetLiveFiles:1"); @@ -113,6 +117,8 @@ Status DBImpl::GetLiveFiles(std::vector& ret, versions_->GetColumnFamilySet()->FreeDeadColumnFamilies(); if (!status.ok()) { + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock8"); mutex_.Unlock(); ROCKS_LOG_ERROR(immutable_db_options_.info_log, "Cannot Flush data %s\n", status.ToString().c_str()); @@ -145,6 +151,8 @@ Status DBImpl::GetLiveFiles(std::vector& ret, // find length of manifest file while holding the mutex lock *manifest_file_size = versions_->manifest_file_size(); + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock9"); mutex_.Unlock(); return Status::OK(); } diff --git a/db/db_impl.cc b/db/db_impl.cc index 3f6e44676..fad795327 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -267,6 +267,8 @@ Status DBImpl::Resume() { return Status::Busy(); } + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock32"); mutex_.Unlock(); Status s = error_handler_.RecoverFromBGError(true); mutex_.Lock(); @@ -311,6 +313,8 @@ Status DBImpl::ResumeImpl() { if (immutable_db_options_.atomic_flush) { autovector cfds; SelectColumnFamiliesForAtomicFlush(&cfds); + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock33"); mutex_.Unlock(); s = AtomicFlushMemTables(cfds, flush_opts, FlushReason::kErrorRecovery); mutex_.Lock(); @@ -320,6 +324,8 @@ Status DBImpl::ResumeImpl() { continue; } cfd->Ref(); + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock34"); mutex_.Unlock(); s = FlushMemTable(cfd, flush_opts, FlushReason::kErrorRecovery); mutex_.Lock(); @@ -341,6 +347,8 @@ Status DBImpl::ResumeImpl() { if (s.ok()) { s = error_handler_.ClearBGError(); } + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock35"); mutex_.Unlock(); job_context.manifest_file_number = 1; @@ -392,6 +400,8 @@ void DBImpl::CancelAllBackgroundWork(bool wait) { // before grabbing db mutex because the actual worker function // `DBImpl::DumpStats()` also holds db mutex if (thread_dump_stats_ != nullptr) { + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock14"); mutex_.Unlock(); thread_dump_stats_->cancel(); mutex_.Lock(); @@ -403,6 +413,8 @@ void DBImpl::CancelAllBackgroundWork(bool wait) { if (immutable_db_options_.atomic_flush) { autovector cfds; SelectColumnFamiliesForAtomicFlush(&cfds); + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock15"); mutex_.Unlock(); AtomicFlushMemTables(cfds, FlushOptions(), FlushReason::kShutDown); mutex_.Lock(); @@ -410,6 +422,8 @@ void DBImpl::CancelAllBackgroundWork(bool wait) { for (auto cfd : *versions_->GetColumnFamilySet()) { if (!cfd->IsDropped() && cfd->initialized() && !cfd->mem()->IsEmpty()) { cfd->Ref(); + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock16"); mutex_.Unlock(); FlushMemTable(cfd, FlushOptions(), FlushReason::kShutDown); mutex_.Lock(); @@ -437,6 +451,8 @@ Status DBImpl::CloseHelper() { while (error_handler_.IsRecoveryInProgress()) { bg_cv_.Wait(); } + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock19"); mutex_.Unlock(); // CancelAllBackgroundWork called with false means we just set the shutdown @@ -484,6 +500,8 @@ Status DBImpl::CloseHelper() { if (default_cf_handle_ != nullptr) { // we need to delete handle outside of lock because it does its own locking + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock20"); mutex_.Unlock(); delete default_cf_handle_; mutex_.Lock(); @@ -502,6 +520,8 @@ Status DBImpl::CloseHelper() { JobContext job_context(next_job_id_.fetch_add(1)); FindObsoleteFiles(&job_context, true); + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock21"); mutex_.Unlock(); // manifest number starting from 2 job_context.manifest_file_number = 1; @@ -554,6 +574,8 @@ Status DBImpl::CloseHelper() { // versions need to be destroyed before table_cache since it can hold // references to table_cache. versions_.reset(); + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock22"); mutex_.Unlock(); if (db_lock_ != nullptr) { env_->UnlockFile(db_lock_); @@ -806,6 +828,8 @@ Status DBImpl::SetDBOptions( if (new_options.stats_dump_period_sec != mutable_db_options_.stats_dump_period_sec) { if (thread_dump_stats_) { + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock36"); mutex_.Unlock(); thread_dump_stats_->cancel(); mutex_.Lock(); @@ -1044,6 +1068,8 @@ InternalIterator* DBImpl::NewInternalIterator( mutex_.Lock(); SuperVersion* super_version = cfd->GetSuperVersion()->Ref(); + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock31"); mutex_.Unlock(); ReadOptions roptions; return NewInternalIterator(roptions, cfd, super_version, arena, range_del_agg, @@ -1075,6 +1101,8 @@ void DBImpl::BackgroundCallPurge() { auto job_id = purge_file->job_id; purge_queue_.pop_front(); + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock10"); mutex_.Unlock(); DeleteObsoleteFileImpl(job_id, fname, dir_to_sync, type, number); mutex_.Lock(); @@ -1082,6 +1110,8 @@ void DBImpl::BackgroundCallPurge() { assert(!logs_to_free_queue_.empty()); log::Writer* log_writer = *(logs_to_free_queue_.begin()); logs_to_free_queue_.pop_front(); + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock12"); mutex_.Unlock(); delete log_writer; mutex_.Lock(); @@ -1094,6 +1124,8 @@ void DBImpl::BackgroundCallPurge() { // signal the DB destructor that it's OK to proceed with destruction. In // that case, all DB variables will be dealloacated and referencing them // will cause trouble. + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock13"); mutex_.Unlock(); } @@ -1126,6 +1158,8 @@ static void CleanupIteratorState(void* arg1, void* /*arg2*/) { if (state->background_purge) { state->db->ScheduleBgLogWriterClose(&job_context); } + ROCKS_LOG_INFO(state->db->immutable_db_options().info_log, + "Unlock17"); state->mu->Unlock(); delete state->super_version; @@ -1137,6 +1171,8 @@ static void CleanupIteratorState(void* arg1, void* /*arg2*/) { state->db->PurgeObsoleteFiles(job_context, true /* schedule only */); state->mu->Lock(); state->db->SchedulePurge(); + ROCKS_LOG_INFO(state->db->immutable_db_options().info_log, + "Unlock18"); state->mu->Unlock(); } else { state->db->PurgeObsoleteFiles(job_context); @@ -1364,6 +1400,8 @@ std::vector DBImpl::MultiGet( mgd_iter.second->super_version = mgd_iter.second->cfd->GetSuperVersion()->Ref(); } + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock29"); mutex_.Unlock(); // Contain a list of merge operations if merge occurs. @@ -1439,6 +1477,8 @@ std::vector DBImpl::MultiGet( superversions_to_delete.push_back(mgd->super_version); } } + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock30"); mutex_.Unlock(); for (auto td : superversions_to_delete) { @@ -1959,6 +1999,8 @@ Status DBImpl::GetPropertiesOfAllTables(ColumnFamilyHandle* column_family, mutex_.Lock(); auto version = cfd->current(); version->Ref(); + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock23"); mutex_.Unlock(); auto s = version->GetPropertiesOfAllTables(props); @@ -1966,6 +2008,8 @@ Status DBImpl::GetPropertiesOfAllTables(ColumnFamilyHandle* column_family, // Decrement the ref count mutex_.Lock(); version->Unref(); + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock24"); mutex_.Unlock(); return s; @@ -1981,6 +2025,8 @@ Status DBImpl::GetPropertiesOfTablesInRange(ColumnFamilyHandle* column_family, mutex_.Lock(); auto version = cfd->current(); version->Ref(); + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock25"); mutex_.Unlock(); auto s = version->GetPropertiesOfTablesInRange(range, n, props); @@ -1988,6 +2034,8 @@ Status DBImpl::GetPropertiesOfTablesInRange(ColumnFamilyHandle* column_family, // Decrement the ref count mutex_.Lock(); version->Unref(); + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock26"); mutex_.Unlock(); return s; @@ -2774,6 +2822,8 @@ Status DBImpl::WriteOptionsFile(bool need_mutex_lock, // because the single write thread ensures all new writes get queued. DBOptions db_options = BuildDBOptions(immutable_db_options_, mutable_db_options_); + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock37"); mutex_.Unlock(); TEST_SYNC_POINT("DBImpl::WriteOptionsFile:1"); @@ -3137,11 +3187,15 @@ Status DBImpl::IngestExternalFile( if (immutable_db_options_.atomic_flush) { autovector cfds; SelectColumnFamiliesForAtomicFlush(&cfds); + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock27"); mutex_.Unlock(); status = AtomicFlushMemTables(cfds, flush_opts, FlushReason::kExternalFileIngestion, true /* writes_stopped */); } else { + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock28"); mutex_.Unlock(); status = FlushMemTable(cfd, flush_opts, FlushReason::kExternalFileIngestion, diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index bb5280019..1d0e122ba 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -78,6 +78,8 @@ Status DBImpl::SyncClosedLogs(JobContext* job_context) { Status s; if (!logs_to_sync.empty()) { + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock49"); mutex_.Unlock(); for (log::Writer* log : logs_to_sync) { @@ -527,6 +529,8 @@ void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta, (cfd->current()->storage_info()->NumLevelFiles(0) >= mutable_cf_options.level0_stop_writes_trigger); // release lock while notifying events + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock47"); mutex_.Unlock(); { FlushJobInfo info; @@ -578,6 +582,8 @@ void DBImpl::NotifyOnFlushCompleted(ColumnFamilyData* cfd, (cfd->current()->storage_info()->NumLevelFiles(0) >= mutable_cf_options.level0_stop_writes_trigger); // release lock while notifying events + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock48"); mutex_.Unlock(); { FlushJobInfo info; @@ -641,6 +647,8 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options, autovector cfds; mutex_.Lock(); SelectColumnFamiliesForAtomicFlush(&cfds); + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock44"); mutex_.Unlock(); s = AtomicFlushMemTables(cfds, fo, FlushReason::kManualCompaction, false /* writes_stopped */); @@ -951,6 +959,8 @@ Status DBImpl::CompactFilesImpl( compaction_job.Prepare(); + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock43"); mutex_.Unlock(); TEST_SYNC_POINT("CompactFilesImpl:0"); TEST_SYNC_POINT("CompactFilesImpl:1"); @@ -1053,6 +1063,8 @@ void DBImpl::NotifyOnCompactionBegin(ColumnFamilyData* cfd, Version* current = cfd->current(); current->Ref(); // release lock while notifying events + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock45"); mutex_.Unlock(); TEST_SYNC_POINT("DBImpl::NotifyOnCompactionBegin::UnlockMutex"); { @@ -1115,6 +1127,8 @@ void DBImpl::NotifyOnCompactionCompleted( Version* current = cfd->current(); current->Ref(); // release lock while notifying events + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock46"); mutex_.Unlock(); TEST_SYNC_POINT("DBImpl::NotifyOnCompactionCompleted::UnlockMutex"); { @@ -2096,6 +2110,8 @@ void DBImpl::BackgroundCallFlush() { uint64_t error_cnt = default_cf_internal_stats_->BumpAndGetBackgroundErrorCount(); bg_cv_.SignalAll(); // In case a waiter can proceed despite the error + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock40"); mutex_.Unlock(); ROCKS_LOG_ERROR(immutable_db_options_.info_log, "Waiting after background flush error: %s" @@ -2116,6 +2132,8 @@ void DBImpl::BackgroundCallFlush() { // delete unnecessary files if any, this is done outside the mutex if (job_context.HaveSomethingToClean() || job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) { + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock41"); mutex_.Unlock(); TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:FilesFound"); // Have to flush the info logs before bg_flush_scheduled_-- @@ -2179,6 +2197,8 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction, uint64_t error_cnt = default_cf_internal_stats_->BumpAndGetBackgroundErrorCount(); bg_cv_.SignalAll(); // In case a waiter can proceed despite the error + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock38"); mutex_.Unlock(); log_buffer.FlushBufferToLog(); ROCKS_LOG_ERROR(immutable_db_options_.info_log, @@ -2201,6 +2221,8 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction, // delete unnecessary files if any, this is done outside the mutex if (job_context.HaveSomethingToClean() || job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) { + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock39"); mutex_.Unlock(); // Have to flush the info logs before bg_compaction_scheduled_-- // because if bg_flush_scheduled_ becomes 0 and the lock is @@ -2576,6 +2598,8 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, NotifyOnCompactionBegin(c->column_family_data(), c.get(), status, compaction_job_stats, job_context->job_id); + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock42"); mutex_.Unlock(); compaction_job.Run(); TEST_SYNC_POINT("DBImpl::BackgroundCompaction:NonTrivial:AfterRun"); diff --git a/db/db_impl_debug.cc b/db/db_impl_debug.cc index bbdd5df37..ca0c7c850 100644 --- a/db/db_impl_debug.cc +++ b/db/db_impl_debug.cc @@ -148,6 +148,8 @@ void DBImpl::TEST_LockMutex() { } void DBImpl::TEST_UnlockMutex() { + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock50"); mutex_.Unlock(); } diff --git a/db/db_impl_files.cc b/db/db_impl_files.cc index 523192f00..603094c01 100644 --- a/db/db_impl_files.cc +++ b/db/db_impl_files.cc @@ -208,6 +208,8 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, } alive_log_files_.pop_front(); if (two_write_queues_) { + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock52"); log_write_mutex_.Unlock(); } // Current log should always stay alive since it can't have @@ -556,6 +558,8 @@ void DBImpl::DeleteObsoleteFiles() { JobContext job_context(next_job_id_.fetch_add(1)); FindObsoleteFiles(&job_context, true); + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock51"); mutex_.Unlock(); if (job_context.HaveSomethingToDelete()) { PurgeObsoleteFiles(job_context); diff --git a/db/db_impl_open.cc b/db/db_impl_open.cc index 5196be7ba..37be1de1e 100644 --- a/db/db_impl_open.cc +++ b/db/db_impl_open.cc @@ -964,6 +964,8 @@ Status DBImpl::RestoreAliveLogFiles(const std::vector& log_numbers) { } } if (two_write_queues_) { + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock56"); log_write_mutex_.Unlock(); } return s; @@ -1001,6 +1003,8 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, { auto write_hint = cfd->CalculateSSTWriteHint(0); + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock57"); mutex_.Unlock(); SequenceNumber earliest_write_conflict_snapshot; @@ -1192,6 +1196,8 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, if (db_options.create_missing_column_families) { // missing column family, create it ColumnFamilyHandle* handle; + ROCKS_LOG_INFO(db_options.info_log, + "Unlock53"); impl->mutex_.Unlock(); s = impl->CreateColumnFamily(cf.options, cf.name, &handle); impl->mutex_.Lock(); @@ -1220,6 +1226,8 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, impl->alive_log_files_.push_back( DBImpl::LogFileNumberSize(impl->logfile_number_)); if (impl->two_write_queues_) { + ROCKS_LOG_INFO(db_options.info_log, + "Unlock54"); impl->log_write_mutex_.Unlock(); } impl->DeleteObsoleteFiles(); @@ -1268,6 +1276,8 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, impl->opened_successfully_ = true; impl->MaybeScheduleFlushOrCompaction(); } + ROCKS_LOG_INFO(db_options.info_log, + "Unlock55"); impl->mutex_.Unlock(); #ifndef ROCKSDB_LITE diff --git a/db/db_impl_readonly.cc b/db/db_impl_readonly.cc index bd7099f00..f6614b11a 100644 --- a/db/db_impl_readonly.cc +++ b/db/db_impl_readonly.cc @@ -182,6 +182,8 @@ Status DB::OpenForReadOnly( cfd->InstallSuperVersion(&sv_context, &impl->mutex_); } } + ROCKS_LOG_INFO(db_options.info_log, + "Unlock58"); impl->mutex_.Unlock(); sv_context.Clean(); if (s.ok()) { diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc index 6bfc98025..cd240b888 100644 --- a/db/db_impl_write.cc +++ b/db/db_impl_write.cc @@ -211,6 +211,9 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, } log::Writer* log_writer = logs_.back().writer; + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock68"); + mutex_.Unlock(); // Add to log and apply to memtable. We can release the lock @@ -368,6 +371,9 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, if (need_log_sync) { mutex_.Lock(); MarkLogsSynced(logfile_number_, need_log_dir_sync, status); + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock69"); + mutex_.Unlock(); // Requesting sync with two_write_queues_ is expected to be very rare. We // hence provide a simple implementation that is not necessarily efficient. @@ -436,6 +442,8 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, w.status = PreprocessWrite(write_options, &need_log_sync, &write_context); PERF_TIMER_START(write_pre_and_post_process_time); log::Writer* log_writer = logs_.back().writer; + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock62"); mutex_.Unlock(); // This can set non-OK status if callback fail. @@ -495,6 +503,8 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, if (need_log_sync) { mutex_.Lock(); MarkLogsSynced(logfile_number_, need_log_dir_sync, w.status); + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock63"); mutex_.Unlock(); } @@ -683,6 +693,8 @@ void DBImpl::WriteStatusCheck(const Status& status) { !status.IsBusy() && !status.IsIncomplete()) { mutex_.Lock(); error_handler_.SetBGError(status, BackgroundErrorReason::kWriteCallback); + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock71"); mutex_.Unlock(); } } @@ -697,6 +709,8 @@ void DBImpl::MemTableInsertStatusCheck(const Status& status) { mutex_.Lock(); assert(!error_handler_.IsBGWorkStopped()); error_handler_.SetBGError(status, BackgroundErrorReason::kMemTable); + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock61"); mutex_.Unlock(); } } @@ -837,6 +851,8 @@ Status DBImpl::WriteToWAL(const WriteBatch& merged_batch, } Status status = log_writer->AddRecord(log_entry); if (UNLIKELY(needs_locking)) { + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock72"); log_write_mutex_.Unlock(); } if (log_used != nullptr) { @@ -954,6 +970,10 @@ Status DBImpl::ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group, cached_recoverable_state_ = *to_be_cached_state; cached_recoverable_state_empty_ = false; } + + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock59"); + log_write_mutex_.Unlock(); if (status.ok()) { @@ -995,6 +1015,8 @@ Status DBImpl::WriteRecoverableState() { } versions_->SetLastSequence(last_seq); if (two_write_queues_) { + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock70"); log_write_mutex_.Unlock(); } if (status.ok() && recoverable_state_pre_release_callback_) { @@ -1216,6 +1238,8 @@ Status DBImpl::DelayWrite(uint64_t num_bytes, // fail any pending writers with no_slowdown write_thread_.BeginWriteStall(); TEST_SYNC_POINT("DBImpl::DelayWrite:BeginWriteStallDone"); + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock60"); mutex_.Unlock(); // We will delay the write until we have slept for delay ms or // we don't need a delay anymore @@ -1387,6 +1411,8 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { if (immutable_db_options_.enable_pipelined_write) { // Memtable writers may call DB::Get in case max_successive_merges > 0, // which may lock mutex. Unlocking mutex here to avoid deadlock. + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock64"); mutex_.Unlock(); write_thread_.WaitForMemTableWriters(); mutex_.Lock(); @@ -1400,6 +1426,8 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { } bool creating_new_log = !log_empty_; if (two_write_queues_) { + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock65"); log_write_mutex_.Unlock(); } uint64_t recycle_log_number = 0; @@ -1429,6 +1457,8 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { const auto preallocate_block_size = GetWalPreallocateBlockSize(mutable_cf_options.write_buffer_size); auto write_hint = CalculateWALWriteHint(); + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock66"); mutex_.Unlock(); { std::string log_fname = @@ -1502,6 +1532,8 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { } logs_.emplace_back(logfile_number_, new_log); alive_log_files_.push_back(LogFileNumberSize(logfile_number_)); + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock67"); log_write_mutex_.Unlock(); } diff --git a/db/error_handler.cc b/db/error_handler.cc index afec14edc..94e6897a0 100644 --- a/db/error_handler.cc +++ b/db/error_handler.cc @@ -131,6 +131,8 @@ void ErrorHandler::CancelErrorRecovery() { db_options_.sst_file_manager.get()); if (sfm) { // This may or may not cancel a pending recovery + ROCKS_LOG_INFO(db_options_.info_log, + "Unlock73"); db_mutex_->Unlock(); bool cancelled = sfm->CancelErrorRecovery(this); db_mutex_->Lock(); diff --git a/db/flush_job.cc b/db/flush_job.cc index 8769c849e..80f3e3fdc 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -288,6 +288,8 @@ Status FlushJob::WriteLevel0Table() { Status s; { auto write_hint = cfd_->CalculateSSTWriteHint(0); + ROCKS_LOG_INFO(db_options_.info_log, + "Unlock74"); db_mutex_->Unlock(); if (log_buffer_) { log_buffer_->FlushBufferToLog(); diff --git a/db/forward_iterator.cc b/db/forward_iterator.cc index f44a09756..a5e53c0d7 100644 --- a/db/forward_iterator.cc +++ b/db/forward_iterator.cc @@ -236,6 +236,8 @@ void ForwardIterator::SVCleanup(DBImpl* db, SuperVersion* sv, if (background_purge_on_iterator_cleanup) { db->ScheduleBgLogWriterClose(&job_context); } + ROCKS_LOG_INFO(db->immutable_db_options_.info_log, + "Unlock75"); db->mutex_.Unlock(); delete sv; if (job_context.HaveSomethingToDelete()) { diff --git a/db/repair.cc b/db/repair.cc index 4e93a161c..2e7ced12d 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -159,6 +159,9 @@ class Repairer { Status status = vset_.LogAndApply(cfd, mut_cf_opts, &edit, &mutex_, nullptr /* db_directory */, false /* new_descriptor_log */, cf_opts); + + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock76"); mutex_.Unlock(); return status; } @@ -595,6 +598,8 @@ class Repairer { Status status = vset_.LogAndApply( cfd, *cfd->GetLatestMutableCFOptions(), &edit, &mutex_, nullptr /* db_directory */, false /* new_descriptor_log */); + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Unlock77"); mutex_.Unlock(); if (!status.ok()) { return status; diff --git a/db/write_thread.cc b/db/write_thread.cc index 835992c8f..c8a090a83 100644 --- a/db/write_thread.cc +++ b/db/write_thread.cc @@ -27,7 +27,8 @@ WriteThread::WriteThread(const ImmutableDBOptions& db_options) last_sequence_(0), write_stall_dummy_(), stall_mu_(), - stall_cv_(&stall_mu_) {} + stall_cv_(&stall_mu_), + m_db_options(db_options) {} uint8_t WriteThread::BlockingAwaitState(Writer* w, uint8_t goal_mask) { // We're going to block. Lazily create the mutex. We guarantee @@ -725,6 +726,8 @@ void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group, static WriteThread::AdaptationContext eu_ctx("EnterUnbatched"); void WriteThread::EnterUnbatched(Writer* w, InstrumentedMutex* mu) { assert(w != nullptr && w->batch == nullptr); + ROCKS_LOG_INFO(m_db_options.info_log, + "Unlock78"); mu->Unlock(); bool linked_as_leader = LinkOne(w, &newest_writer_); if (!linked_as_leader) { diff --git a/db/write_thread.h b/db/write_thread.h index a3802c996..e0ca6240c 100644 --- a/db/write_thread.h +++ b/db/write_thread.h @@ -13,6 +13,7 @@ #include #include #include +#include #include "db/dbformat.h" #include "db/pre_release_callback.h" @@ -383,7 +384,9 @@ class WriteThread { port::Mutex stall_mu_; port::CondVar stall_cv_; - // Waits for w->state & goal_mask using w->StateMutex(). Returns + ImmutableDBOptions m_db_options; + + // Waits for w->state & goal_mask using w->StateMutex(). Returns // the state that satisfies goal_mask. uint8_t BlockingAwaitState(Writer* w, uint8_t goal_mask);