Skip to content

Commit

Permalink
chore: change Namespaces to be a global pointer (#4032)
Browse files Browse the repository at this point in the history
* chore: change Namespaces to be a global pointer

Before the namespaces object was defined globally.
However it has non-trivial d'tor that is being called after main exits.
It's quite dangerous to have global non-POD objects being defined globally.
For example, if we used LOG(INFO) inside the Clear function , that would crash dragonfly on exit.

Ths PR changes it to be a global pointer.

---------

Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange authored Nov 10, 2024
1 parent 9366c67 commit be96e6c
Show file tree
Hide file tree
Showing 22 changed files with 76 additions and 71 deletions.
7 changes: 3 additions & 4 deletions src/server/blocking_controller_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ void BlockingControllerTest::SetUp() {
arg_vec_.emplace_back(s);
}

trans_->InitByArgs(&namespaces.GetDefaultNamespace(), 0, {arg_vec_.data(), arg_vec_.size()});
trans_->InitByArgs(&namespaces->GetDefaultNamespace(), 0, {arg_vec_.data(), arg_vec_.size()});
CHECK_EQ(0u, Shard("x", shard_set->size()));
CHECK_EQ(2u, Shard("z", shard_set->size()));

Expand All @@ -71,7 +71,6 @@ void BlockingControllerTest::SetUp() {

void BlockingControllerTest::TearDown() {
shard_set->PreShutdown();
namespaces.Clear();
shard_set->Shutdown();
delete shard_set;

Expand All @@ -81,7 +80,7 @@ void BlockingControllerTest::TearDown() {

TEST_F(BlockingControllerTest, Basic) {
trans_->ScheduleSingleHop([&](Transaction* t, EngineShard* shard) {
BlockingController bc(shard, &namespaces.GetDefaultNamespace());
BlockingController bc(shard, &namespaces->GetDefaultNamespace());
auto keys = t->GetShardArgs(shard->shard_id());
bc.AddWatched(
keys, [](auto...) { return true; }, t);
Expand All @@ -107,7 +106,7 @@ TEST_F(BlockingControllerTest, Timeout) {
unsigned num_watched = shard_set->Await(

0, [&] {
return namespaces.GetDefaultNamespace()
return namespaces->GetDefaultNamespace()
.GetBlockingController(EngineShard::tlocal()->shard_id())
->NumWatched(0);
});
Expand Down
4 changes: 2 additions & 2 deletions src/server/cluster/cluster_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ void DeleteSlots(const SlotRanges& slots_ranges) {
if (shard == nullptr)
return;

namespaces.GetDefaultNamespace().GetDbSlice(shard->shard_id()).FlushSlots(slots_ranges);
namespaces->GetDefaultNamespace().GetDbSlice(shard->shard_id()).FlushSlots(slots_ranges);
};
shard_set->pool()->AwaitFiberOnAll(std::move(cb));
}
Expand Down Expand Up @@ -633,7 +633,7 @@ void ClusterFamily::DflyClusterGetSlotInfo(CmdArgList args, SinkReplyBuilder* bu

util::fb2::LockGuard lk(mu);
for (auto& [slot, data] : slots_stats) {
data += namespaces.GetDefaultNamespace().GetDbSlice(shard->shard_id()).GetSlotStats(slot);
data += namespaces->GetDefaultNamespace().GetDbSlice(shard->shard_id()).GetSlotStats(slot);
}
};

Expand Down
2 changes: 1 addition & 1 deletion src/server/cluster/cluster_utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ uint64_t GetKeyCount(const SlotRanges& slots) {
uint64_t shard_keys = 0;
for (const SlotRange& range : slots) {
for (SlotId slot = range.start; slot <= range.end; slot++) {
shard_keys += namespaces.GetDefaultNamespace()
shard_keys += namespaces->GetDefaultNamespace()
.GetDbSlice(shard->shard_id())
.GetSlotStats(slot)
.key_count;
Expand Down
8 changes: 4 additions & 4 deletions src/server/cluster/outgoing_slot_migration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ OutgoingMigration::OutgoingMigration(MigrationInfo info, ClusterFamily* cf, Serv
server_family_(sf),
cf_(cf),
tx_(new Transaction{sf->service().FindCmd("DFLYCLUSTER")}) {
tx_->InitByArgs(&namespaces.GetDefaultNamespace(), 0, {});
tx_->InitByArgs(&namespaces->GetDefaultNamespace(), 0, {});
}

OutgoingMigration::~OutgoingMigration() {
Expand Down Expand Up @@ -218,7 +218,7 @@ void OutgoingMigration::SyncFb() {
}

OnAllShards([this](auto& migration) {
DbSlice& db_slice = namespaces.GetDefaultNamespace().GetCurrentDbSlice();
DbSlice& db_slice = namespaces->GetDefaultNamespace().GetCurrentDbSlice();
server_family_->journal()->StartInThread();
migration = std::make_unique<SliceSlotMigration>(
&db_slice, server(), migration_info_.slot_ranges, server_family_->journal());
Expand Down Expand Up @@ -291,8 +291,8 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
bool is_block_active = true;
auto is_pause_in_progress = [&is_block_active] { return is_block_active; };
auto pause_fb_opt =
Pause(server_family_->GetNonPriviligedListeners(), &namespaces.GetDefaultNamespace(), nullptr,
ClientPause::WRITE, is_pause_in_progress);
Pause(server_family_->GetNonPriviligedListeners(), &namespaces->GetDefaultNamespace(),
nullptr, ClientPause::WRITE, is_pause_in_progress);

if (!pause_fb_opt) {
LOG(WARNING) << "Cluster migration finalization time out";
Expand Down
1 change: 1 addition & 0 deletions src/server/common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ atomic_uint64_t rss_mem_peak(0);
unsigned kernel_version = 0;
size_t max_memory_limit = 0;
size_t serialization_max_chunk_size = 0;
Namespaces* namespaces = nullptr;

const char* GlobalStateName(GlobalState s) {
switch (s) {
Expand Down
3 changes: 3 additions & 0 deletions src/server/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class Transaction;
class EngineShard;
class ConnectionState;
class Interpreter;
class Namespaces;

struct LockTagOptions {
bool enabled = false;
Expand Down Expand Up @@ -132,6 +133,8 @@ extern std::atomic_uint64_t rss_mem_peak;

extern size_t max_memory_limit;

extern Namespaces* namespaces;

// version 5.11 maps to 511 etc.
// set upon server start.
extern unsigned kernel_version;
Expand Down
2 changes: 1 addition & 1 deletion src/server/detail/save_stages_controller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ void SaveStagesController::CloseCb(unsigned index) {
}

if (auto* es = EngineShard::tlocal(); use_dfs_format_ && es)
namespaces.GetDefaultNamespace().GetDbSlice(es->shard_id()).ResetUpdateEvents();
namespaces->GetDefaultNamespace().GetDbSlice(es->shard_id()).ResetUpdateEvents();
}

void SaveStagesController::RunStage(void (SaveStagesController::*cb)(unsigned)) {
Expand Down
4 changes: 2 additions & 2 deletions src/server/dflycmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ bool WaitReplicaFlowToCatchup(absl::Time end_time, const DflyCmd::ReplicaInfo* r
EngineShard* shard) {
// We don't want any writes to the journal after we send the `PING`,
// and expirations could ruin that.
namespaces.GetDefaultNamespace().GetDbSlice(shard->shard_id()).SetExpireAllowed(false);
namespaces->GetDefaultNamespace().GetDbSlice(shard->shard_id()).SetExpireAllowed(false);
shard->journal()->RecordEntry(0, journal::Op::PING, 0, 0, nullopt, {}, true);

const FlowInfo* flow = &replica->flows[shard->shard_id()];
Expand Down Expand Up @@ -455,7 +455,7 @@ void DflyCmd::TakeOver(CmdArgList args, RedisReplyBuilder* rb, ConnectionContext
absl::Cleanup cleanup([] {
VLOG(2) << "Enabling expiration";
shard_set->RunBriefInParallel([](EngineShard* shard) {
namespaces.GetDefaultNamespace().GetDbSlice(shard->shard_id()).SetExpireAllowed(true);
namespaces->GetDefaultNamespace().GetDbSlice(shard->shard_id()).SetExpireAllowed(true);
});
});

Expand Down
4 changes: 2 additions & 2 deletions src/server/dragonfly_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ TEST_F(DflyEngineTest, MemcacheFlags) {
ASSERT_EQ(Run("resp", {"flushdb"}), "OK");
pp_->AwaitFiberOnAll([](auto*) {
if (auto* shard = EngineShard::tlocal(); shard) {
EXPECT_EQ(namespaces.GetDefaultNamespace()
EXPECT_EQ(namespaces->GetDefaultNamespace()
.GetDbSlice(shard->shard_id())
.GetDBTable(0)
->mcflag.size(),
Expand Down Expand Up @@ -600,7 +600,7 @@ TEST_F(DflyEngineTest, Bug468) {

TEST_F(DflyEngineTest, Bug496) {
shard_set->RunBlockingInParallel([](EngineShard* shard) {
auto& db = namespaces.GetDefaultNamespace().GetDbSlice(shard->shard_id());
auto& db = namespaces->GetDefaultNamespace().GetDbSlice(shard->shard_id());

int cb_hits = 0;
uint32_t cb_id =
Expand Down
20 changes: 10 additions & 10 deletions src/server/engine_shard.cc
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ bool EngineShard::DoDefrag() {
const float threshold = GetFlag(FLAGS_mem_defrag_page_utilization_threshold);

// TODO: enable tiered storage on non-default db slice
DbSlice& slice = namespaces.GetDefaultNamespace().GetDbSlice(shard_->shard_id());
DbSlice& slice = namespaces->GetDefaultNamespace().GetDbSlice(shard_->shard_id());

// If we moved to an invalid db, skip as long as it's not the last one
while (!slice.IsDbValid(defrag_state_.dbid) && defrag_state_.dbid + 1 < slice.db_array_size())
Expand Down Expand Up @@ -339,7 +339,7 @@ bool EngineShard::DoDefrag() {
}
});
traverses_count++;
} while (traverses_count < kMaxTraverses && cur && namespaces.IsInitialized());
} while (traverses_count < kMaxTraverses && cur && namespaces);

defrag_state_.UpdateScanState(cur.value());

Expand Down Expand Up @@ -370,7 +370,7 @@ bool EngineShard::DoDefrag() {
// priority.
// otherwise lower the task priority so that it would not use the CPU when not required
uint32_t EngineShard::DefragTask() {
if (!namespaces.IsInitialized()) {
if (!namespaces) {
return util::ProactorBase::kOnIdleMaxLevel;
}

Expand All @@ -392,7 +392,6 @@ EngineShard::EngineShard(util::ProactorBase* pb, mi_heap_t* heap)
txq_([](const Transaction* t) { return t->txid(); }),
mi_resource_(heap),
shard_id_(pb->GetPoolIndex()) {
defrag_task_ = pb->AddOnIdleTask([this]() { return DefragTask(); });
queue_.Start(absl::StrCat("shard_queue_", shard_id()));
queue2_.Start(absl::StrCat("l2_queue_", shard_id()));
}
Expand Down Expand Up @@ -452,6 +451,7 @@ void EngineShard::StartPeriodicHeartbeatFiber(util::ProactorBase* pb) {
ThisFiber::SetName(absl::StrCat("heartbeat_periodic", index));
RunFPeriodically(heartbeat, period_ms, "heartbeat", &fiber_heartbeat_periodic_done_);
});
defrag_task_ = pb->AddOnIdleTask([this]() { return DefragTask(); });
}

void EngineShard::StartPeriodicShardHandlerFiber(util::ProactorBase* pb,
Expand Down Expand Up @@ -492,7 +492,7 @@ void EngineShard::InitTieredStorage(ProactorBase* pb, size_t max_file_size) {
<< "Only ioring based backing storage is supported. Exiting...";

// TODO: enable tiered storage on non-default namespace
DbSlice& db_slice = namespaces.GetDefaultNamespace().GetDbSlice(shard_id());
DbSlice& db_slice = namespaces->GetDefaultNamespace().GetDbSlice(shard_id());
auto* shard = EngineShard::tlocal();
shard->tiered_storage_ = make_unique<TieredStorage>(max_file_size, &db_slice);
error_code ec = shard->tiered_storage_->Open(backing_prefix);
Expand Down Expand Up @@ -657,7 +657,7 @@ void EngineShard::RemoveContTx(Transaction* tx) {

void EngineShard::Heartbeat() {
DVLOG(2) << " Hearbeat";
DCHECK(namespaces.IsInitialized());
DCHECK(namespaces);

CacheStats();

Expand All @@ -666,7 +666,7 @@ void EngineShard::Heartbeat() {
}

// TODO: iterate over all namespaces
DbSlice& db_slice = namespaces.GetDefaultNamespace().GetDbSlice(shard_id());
DbSlice& db_slice = namespaces->GetDefaultNamespace().GetDbSlice(shard_id());

// Offset CoolMemoryUsage when consider background offloading.
// TODO: Another approach could be is to align the approach similarly to how we do with
Expand All @@ -692,7 +692,7 @@ void EngineShard::Heartbeat() {

void EngineShard::RetireExpiredAndEvict() {
// TODO: iterate over all namespaces
DbSlice& db_slice = namespaces.GetDefaultNamespace().GetDbSlice(shard_id());
DbSlice& db_slice = namespaces->GetDefaultNamespace().GetDbSlice(shard_id());
// Some of the functions below might acquire the same lock again so we need to unlock it
// asap. We won't yield before we relock the mutex again, so the code below is atomic
// in respect to preemptions of big values. An example of that is the call to
Expand Down Expand Up @@ -758,7 +758,7 @@ void EngineShard::CacheStats() {
cache_stats_time_ = now;
// Used memory for this shard.
size_t used_mem = UsedMemory();
DbSlice& db_slice = namespaces.GetDefaultNamespace().GetDbSlice(shard_id());
DbSlice& db_slice = namespaces->GetDefaultNamespace().GetDbSlice(shard_id());

// delta can wrap if used_memory is smaller than last_cached_used_memory_ and it's fine.
size_t delta = used_mem - last_cached_used_memory_;
Expand Down Expand Up @@ -808,7 +808,7 @@ EngineShard::TxQueueInfo EngineShard::AnalyzeTxQueue() const {
info.tx_total = queue->size();
unsigned max_db_id = 0;

auto& db_slice = namespaces.GetDefaultNamespace().GetCurrentDbSlice();
auto& db_slice = namespaces->GetDefaultNamespace().GetCurrentDbSlice();

{
auto value = queue->At(cur);
Expand Down
14 changes: 12 additions & 2 deletions src/server/engine_shard_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,10 @@ EngineShardSet* shard_set = nullptr;

void EngineShardSet::Init(uint32_t sz, std::function<void()> shard_handler) {
CHECK_EQ(0u, size());
CHECK(namespaces == nullptr);

shards_.reset(new EngineShard*[sz]);

size_ = sz;
size_t max_shard_file_size = GetTieredFileLimit(sz);
pp_->AwaitFiberOnAll([this](uint32_t index, ProactorBase* pb) {
Expand All @@ -112,7 +115,8 @@ void EngineShardSet::Init(uint32_t sz, std::function<void()> shard_handler) {
}
});

namespaces.Init();
// The order is important here. We must initialize namespaces after shards_.
namespaces = new Namespaces();

pp_->AwaitFiberOnAll([&](uint32_t index, ProactorBase* pb) {
if (index < size_) {
Expand All @@ -139,7 +143,13 @@ void EngineShardSet::PreShutdown() {
}

void EngineShardSet::Shutdown() {
// Calling Namespaces::Clear before destroying engine shards, because it accesses them
// internally.
namespaces->Clear();
RunBlockingInParallel([](EngineShard*) { EngineShard::DestroyThreadLocal(); });

delete namespaces;
namespaces = nullptr;
}

void EngineShardSet::InitThreadLocal(ProactorBase* pb) {
Expand All @@ -150,7 +160,7 @@ void EngineShardSet::InitThreadLocal(ProactorBase* pb) {

void EngineShardSet::TEST_EnableCacheMode() {
RunBlockingInParallel([](EngineShard* shard) {
namespaces.GetDefaultNamespace().GetCurrentDbSlice().TEST_EnableCacheMode();
namespaces->GetDefaultNamespace().GetCurrentDbSlice().TEST_EnableCacheMode();
});
}

Expand Down
2 changes: 1 addition & 1 deletion src/server/journal/executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ JournalExecutor::JournalExecutor(Service* service)
conn_context_.is_replicating = true;
conn_context_.journal_emulated = true;
conn_context_.skip_acl_validation = true;
conn_context_.ns = &namespaces.GetDefaultNamespace();
conn_context_.ns = &namespaces->GetDefaultNamespace();
}

JournalExecutor::~JournalExecutor() {
Expand Down
4 changes: 2 additions & 2 deletions src/server/list_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class ListFamilyTest : public BaseFamilyTest {
static unsigned NumWatched() {
atomic_uint32_t sum{0};

auto ns = &namespaces.GetDefaultNamespace();
auto ns = &namespaces->GetDefaultNamespace();
shard_set->RunBriefInParallel([&](EngineShard* es) {
auto* bc = ns->GetBlockingController(es->shard_id());
if (bc)
Expand All @@ -45,7 +45,7 @@ class ListFamilyTest : public BaseFamilyTest {

static bool HasAwakened() {
atomic_uint32_t sum{0};
auto ns = &namespaces.GetDefaultNamespace();
auto ns = &namespaces->GetDefaultNamespace();
shard_set->RunBriefInParallel([&](EngineShard* es) {
auto* bc = ns->GetBlockingController(es->shard_id());
if (bc)
Expand Down
9 changes: 4 additions & 5 deletions src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ void Topkeys(const http::QueryArgs& args, HttpContext* send) {

shard_set->RunBriefInParallel([&](EngineShard* shard) {
for (const auto& db :
namespaces.GetDefaultNamespace().GetDbSlice(shard->shard_id()).databases()) {
namespaces->GetDefaultNamespace().GetDbSlice(shard->shard_id()).databases()) {
if (db->top_keys.IsEnabled()) {
is_enabled = true;
for (const auto& [key, count] : db->top_keys.GetTopKeys()) {
Expand Down Expand Up @@ -823,7 +823,7 @@ void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*>
auto* shard = EngineShard::tlocal();
if (shard) {
auto shard_id = shard->shard_id();
auto& db_slice = namespaces.GetDefaultNamespace().GetDbSlice(shard_id);
auto& db_slice = namespaces->GetDefaultNamespace().GetDbSlice(shard_id);
db_slice.SetNotifyKeyspaceEvents(*res);
}
});
Expand Down Expand Up @@ -897,7 +897,6 @@ void Service::Shutdown() {
ChannelStore::Destroy();

shard_set->PreShutdown();
namespaces.Clear();
shard_set->Shutdown();
Transaction::Shutdown();

Expand Down Expand Up @@ -1586,7 +1585,7 @@ facade::ConnectionContext* Service::CreateContext(util::FiberSocketBase* peer,
facade::Connection* owner) {
auto cred = user_registry_.GetCredentials("default");
ConnectionContext* res = new ConnectionContext{peer, owner, std::move(cred)};
res->ns = &namespaces.GetOrInsert("");
res->ns = &namespaces->GetOrInsert("");

if (peer->IsUDS()) {
res->req_auth = false;
Expand Down Expand Up @@ -2449,7 +2448,7 @@ void Service::Command(CmdArgList args, Transaction* tx, SinkReplyBuilder* builde
VarzValue::Map Service::GetVarzStats() {
VarzValue::Map res;

Metrics m = server_family_.GetMetrics(&namespaces.GetDefaultNamespace());
Metrics m = server_family_.GetMetrics(&namespaces->GetDefaultNamespace());
DbStats db_stats;
for (const auto& s : m.db_stats) {
db_stats += s;
Expand Down
Loading

0 comments on commit be96e6c

Please sign in to comment.