Skip to content

Commit

Permalink
Additional logging
Browse files Browse the repository at this point in the history
  • Loading branch information
toktarev committed Apr 30, 2019
1 parent cd93aa4 commit c7a9115
Show file tree
Hide file tree
Showing 17 changed files with 166 additions and 2 deletions.
6 changes: 6 additions & 0 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
delete cfd_;
}
db_->FindObsoleteFiles(&job_context, false, true);
ROCKS_LOG_INFO(cfd_->ioptions()->info_log,
"Unlock3");
mutex_->Unlock();
if (job_context.HaveSomethingToDelete()) {
db_->PurgeObsoleteFiles(job_context);
Expand Down Expand Up @@ -510,6 +512,8 @@ ColumnFamilyData::~ColumnFamilyData() {
if (super_version_ != nullptr) {
// Release SuperVersion reference kept in ThreadLocalPtr.
// This must be done outside of mutex_ since unref handler can lock mutex.
ROCKS_LOG_INFO(ioptions_.info_log,
"Unlock2");
super_version_->db_mutex->Unlock();
local_sv_.reset();
super_version_->db_mutex->Lock();
Expand Down Expand Up @@ -1051,6 +1055,8 @@ SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(
db_mutex->Lock();
}
sv = super_version_->Ref();
ROCKS_LOG_INFO(ioptions_.info_log,
"Unlock1");
db_mutex->Unlock();

delete sv_to_delete;
Expand Down
2 changes: 2 additions & 0 deletions db/compacted_db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ Status CompactedDBImpl::Init(const Options& options) {
DefaultColumnFamily())->cfd();
cfd_->InstallSuperVersion(&sv_context, &mutex_);
}
ROCKS_LOG_INFO(options.info_log,
"Unlock4");
mutex_.Unlock();
sv_context.Clean();
if (!s.ok()) {
Expand Down
3 changes: 3 additions & 0 deletions db/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,9 @@ void CompactionJob::GenSubcompactionBoundaries() {
// ApproximateSize could potentially create table reader iterator to seek
// to the index block and may incur I/O cost in the process. Unlock db
// mutex to reduce contention
ROCKS_LOG_INFO(db_options_.info_log,
"Unlock5");

db_mutex_->Unlock();
uint64_t size = versions_->ApproximateSize(v, a, b, start_lvl, out_lvl + 1);
db_mutex_->Lock();
Expand Down
8 changes: 8 additions & 0 deletions db/db_filesnapshot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ Status DBImpl::GetLiveFiles(std::vector<std::string>& ret,
if (immutable_db_options_.atomic_flush) {
autovector<ColumnFamilyData*> cfds;
SelectColumnFamiliesForAtomicFlush(&cfds);
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Unlock6");
mutex_.Unlock();
status = AtomicFlushMemTables(cfds, FlushOptions(),
FlushReason::kGetLiveFiles);
Expand All @@ -99,6 +101,8 @@ Status DBImpl::GetLiveFiles(std::vector<std::string>& ret,
continue;
}
cfd->Ref();
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Unlock7");
mutex_.Unlock();
status = FlushMemTable(cfd, FlushOptions(), FlushReason::kGetLiveFiles);
TEST_SYNC_POINT("DBImpl::GetLiveFiles:1");
Expand All @@ -113,6 +117,8 @@ Status DBImpl::GetLiveFiles(std::vector<std::string>& ret,
versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();

if (!status.ok()) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Unlock8");
mutex_.Unlock();
ROCKS_LOG_ERROR(immutable_db_options_.info_log, "Cannot Flush data %s\n",
status.ToString().c_str());
Expand Down Expand Up @@ -145,6 +151,8 @@ Status DBImpl::GetLiveFiles(std::vector<std::string>& ret,
// find length of manifest file while holding the mutex lock
*manifest_file_size = versions_->manifest_file_size();

ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Unlock9");
mutex_.Unlock();
return Status::OK();
}
Expand Down
54 changes: 54 additions & 0 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,8 @@ Status DBImpl::Resume() {
return Status::Busy();
}

ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Unlock32");
mutex_.Unlock();
Status s = error_handler_.RecoverFromBGError(true);
mutex_.Lock();
Expand Down Expand Up @@ -311,6 +313,8 @@ Status DBImpl::ResumeImpl() {
if (immutable_db_options_.atomic_flush) {
autovector<ColumnFamilyData*> cfds;
SelectColumnFamiliesForAtomicFlush(&cfds);
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Unlock33");
mutex_.Unlock();
s = AtomicFlushMemTables(cfds, flush_opts, FlushReason::kErrorRecovery);
mutex_.Lock();
Expand All @@ -320,6 +324,8 @@ Status DBImpl::ResumeImpl() {
continue;
}
cfd->Ref();
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Unlock34");
mutex_.Unlock();
s = FlushMemTable(cfd, flush_opts, FlushReason::kErrorRecovery);
mutex_.Lock();
Expand All @@ -341,6 +347,8 @@ Status DBImpl::ResumeImpl() {
if (s.ok()) {
s = error_handler_.ClearBGError();
}
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Unlock35");
mutex_.Unlock();

job_context.manifest_file_number = 1;
Expand Down Expand Up @@ -392,6 +400,8 @@ void DBImpl::CancelAllBackgroundWork(bool wait) {
// before grabbing db mutex because the actual worker function
// `DBImpl::DumpStats()` also holds db mutex
if (thread_dump_stats_ != nullptr) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Unlock14");
mutex_.Unlock();
thread_dump_stats_->cancel();
mutex_.Lock();
Expand All @@ -403,13 +413,17 @@ void DBImpl::CancelAllBackgroundWork(bool wait) {
if (immutable_db_options_.atomic_flush) {
autovector<ColumnFamilyData*> cfds;
SelectColumnFamiliesForAtomicFlush(&cfds);
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Unlock15");
mutex_.Unlock();
AtomicFlushMemTables(cfds, FlushOptions(), FlushReason::kShutDown);
mutex_.Lock();
} else {
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (!cfd->IsDropped() && cfd->initialized() && !cfd->mem()->IsEmpty()) {
cfd->Ref();
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Unlock16");
mutex_.Unlock();
FlushMemTable(cfd, FlushOptions(), FlushReason::kShutDown);
mutex_.Lock();
Expand Down Expand Up @@ -437,6 +451,8 @@ Status DBImpl::CloseHelper() {
while (error_handler_.IsRecoveryInProgress()) {
bg_cv_.Wait();
}
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Unlock19");
mutex_.Unlock();

// CancelAllBackgroundWork called with false means we just set the shutdown
Expand Down Expand Up @@ -484,6 +500,8 @@ Status DBImpl::CloseHelper() {

if (default_cf_handle_ != nullptr) {
// we need to delete handle outside of lock because it does its own locking
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Unlock20");
mutex_.Unlock();
delete default_cf_handle_;
mutex_.Lock();
Expand All @@ -502,6 +520,8 @@ Status DBImpl::CloseHelper() {
JobContext job_context(next_job_id_.fetch_add(1));
FindObsoleteFiles(&job_context, true);

ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Unlock21");
mutex_.Unlock();
// manifest number starting from 2
job_context.manifest_file_number = 1;
Expand Down Expand Up @@ -554,6 +574,8 @@ Status DBImpl::CloseHelper() {
// versions need to be destroyed before table_cache since it can hold
// references to table_cache.
versions_.reset();
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Unlock22");
mutex_.Unlock();
if (db_lock_ != nullptr) {
env_->UnlockFile(db_lock_);
Expand Down Expand Up @@ -806,6 +828,8 @@ Status DBImpl::SetDBOptions(
if (new_options.stats_dump_period_sec !=
mutable_db_options_.stats_dump_period_sec) {
if (thread_dump_stats_) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Unlock36");
mutex_.Unlock();
thread_dump_stats_->cancel();
mutex_.Lock();
Expand Down Expand Up @@ -1044,6 +1068,8 @@ InternalIterator* DBImpl::NewInternalIterator(

mutex_.Lock();
SuperVersion* super_version = cfd->GetSuperVersion()->Ref();
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Unlock31");
mutex_.Unlock();
ReadOptions roptions;
return NewInternalIterator(roptions, cfd, super_version, arena, range_del_agg,
Expand Down Expand Up @@ -1075,13 +1101,17 @@ void DBImpl::BackgroundCallPurge() {
auto job_id = purge_file->job_id;
purge_queue_.pop_front();

ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Unlock10");
mutex_.Unlock();
DeleteObsoleteFileImpl(job_id, fname, dir_to_sync, type, number);
mutex_.Lock();
} else {
assert(!logs_to_free_queue_.empty());
log::Writer* log_writer = *(logs_to_free_queue_.begin());
logs_to_free_queue_.pop_front();
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Unlock12");
mutex_.Unlock();
delete log_writer;
mutex_.Lock();
Expand All @@ -1094,6 +1124,8 @@ void DBImpl::BackgroundCallPurge() {
// signal the DB destructor that it's OK to proceed with destruction. In
// that case, all DB variables will be dealloacated and referencing them
// will cause trouble.
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Unlock13");
mutex_.Unlock();
}

Expand Down Expand Up @@ -1126,6 +1158,8 @@ static void CleanupIteratorState(void* arg1, void* /*arg2*/) {
if (state->background_purge) {
state->db->ScheduleBgLogWriterClose(&job_context);
}
ROCKS_LOG_INFO(state->db->immutable_db_options().info_log,
"Unlock17");
state->mu->Unlock();

delete state->super_version;
Expand All @@ -1137,6 +1171,8 @@ static void CleanupIteratorState(void* arg1, void* /*arg2*/) {
state->db->PurgeObsoleteFiles(job_context, true /* schedule only */);
state->mu->Lock();
state->db->SchedulePurge();
ROCKS_LOG_INFO(state->db->immutable_db_options().info_log,
"Unlock18");
state->mu->Unlock();
} else {
state->db->PurgeObsoleteFiles(job_context);
Expand Down Expand Up @@ -1364,6 +1400,8 @@ std::vector<Status> DBImpl::MultiGet(
mgd_iter.second->super_version =
mgd_iter.second->cfd->GetSuperVersion()->Ref();
}
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Unlock29");
mutex_.Unlock();

// Contain a list of merge operations if merge occurs.
Expand Down Expand Up @@ -1439,6 +1477,8 @@ std::vector<Status> DBImpl::MultiGet(
superversions_to_delete.push_back(mgd->super_version);
}
}
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Unlock30");
mutex_.Unlock();

for (auto td : superversions_to_delete) {
Expand Down Expand Up @@ -1959,13 +1999,17 @@ Status DBImpl::GetPropertiesOfAllTables(ColumnFamilyHandle* column_family,
mutex_.Lock();
auto version = cfd->current();
version->Ref();
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Unlock23");
mutex_.Unlock();

auto s = version->GetPropertiesOfAllTables(props);

// Decrement the ref count
mutex_.Lock();
version->Unref();
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Unlock24");
mutex_.Unlock();

return s;
Expand All @@ -1981,13 +2025,17 @@ Status DBImpl::GetPropertiesOfTablesInRange(ColumnFamilyHandle* column_family,
mutex_.Lock();
auto version = cfd->current();
version->Ref();
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Unlock25");
mutex_.Unlock();

auto s = version->GetPropertiesOfTablesInRange(range, n, props);

// Decrement the ref count
mutex_.Lock();
version->Unref();
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Unlock26");
mutex_.Unlock();

return s;
Expand Down Expand Up @@ -2774,6 +2822,8 @@ Status DBImpl::WriteOptionsFile(bool need_mutex_lock,
// because the single write thread ensures all new writes get queued.
DBOptions db_options =
BuildDBOptions(immutable_db_options_, mutable_db_options_);
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Unlock37");
mutex_.Unlock();

TEST_SYNC_POINT("DBImpl::WriteOptionsFile:1");
Expand Down Expand Up @@ -3137,11 +3187,15 @@ Status DBImpl::IngestExternalFile(
if (immutable_db_options_.atomic_flush) {
autovector<ColumnFamilyData*> cfds;
SelectColumnFamiliesForAtomicFlush(&cfds);
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Unlock27");
mutex_.Unlock();
status = AtomicFlushMemTables(cfds, flush_opts,
FlushReason::kExternalFileIngestion,
true /* writes_stopped */);
} else {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Unlock28");
mutex_.Unlock();
status = FlushMemTable(cfd, flush_opts,
FlushReason::kExternalFileIngestion,
Expand Down
Loading

0 comments on commit c7a9115

Please sign in to comment.