Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Draft] feat: adding copy #4038

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 85 additions & 0 deletions src/server/generic_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -786,6 +786,49 @@ OpStatus OpMove(const OpArgs& op_args, string_view key, DbIndex target_db) {
return OpStatus::OK;
}

// OpCopy touches multiple databases (op_args.db_cntx.db_index, target_db), so it assumes it runs
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OpMove -> OpCopy

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't spend time on docs as this was just a draft

// as a global transaction.
// TODO: Allow running OpCopy without a global transaction.
OpStatus OpCopy(const OpArgs& op_args, string_view key, DbIndex target_db) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy is actually a non-trivial operation that requires very good understanding of Dragonfly architecture.

This code won't work when keys reside on different thread shards. If you wish to learn more about dragonfly architecture and its transactional framework, please read the docs under docs/ folder.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll go through the docs and share a one-pager for this feature. Do you recommend any other resources/docs I can look at in general to understand this better?

Do you have any timeline in mind for this feature? I'm new to this domain, so will take a few weeks to deliver this.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you have any suggested order for reading files in the docs folder?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

df-share-nothing.md and transaction.md are good candidates

// Body of OpCopy: duplicates the entry stored under `key` in the current database
// (op_args.db_cntx) into `target_db`, using a single DbSlice for both lookups.
// Returns KEY_NOTFOUND when the source key is absent, KEY_EXISTS when the target
// database already holds `key`, and OK once the new entry has been added.
auto& db_slice = op_args.GetDbSlice();

// Look up the source entry in the current db.
// NOTE(review): the source is only read below (apart from SetExpire), yet it is
// fetched with FindMutable — confirm whether a read-only lookup would suffice.
auto from_res = db_slice.FindMutable(op_args.db_cntx, key);
if (!IsValid(from_res.it))
return OpStatus::KEY_NOTFOUND;

// Look up the same key in the target db; the context is a copy of the current one
// with only db_index replaced. An existing target key aborts the copy.
DbContext target_cntx = op_args.db_cntx;
target_cntx.db_index = target_db;
auto to_res = db_slice.FindReadOnly(target_cntx, key);
if (IsValid(to_res.it))
return OpStatus::KEY_EXISTS;

// Ensure target database exists.
db_slice.ActivateDb(target_db);

// Capture the source's sticky flag and expiry so they can be applied to the new entry.
bool sticky = from_res.it->first.IsSticky();
uint64_t exp_ts = db_slice.ExpireTime(from_res.exp_it);
// NOTE(review): presumably finalizes the mutable-access bookkeeping started by
// FindMutable — confirm against DbSlice.
from_res.post_updater.Run();
// NOTE(review): AsRef() presumably produces a value that refers to the source's
// data rather than an independent deep copy — confirm that the source and the new
// target entry cannot end up sharing (and double-freeing) the same storage.
PrimeValue from_obj = from_res.it->second.AsRef();

// Re-apply the expire flag on the source according to whether it has a valid expiry entry.
// NOTE(review): this step was needed in the move path after std::move-ing the value out;
// nothing is moved out of the source here, so confirm whether it is still required.
from_res.it->second.SetExpire(IsValid(from_res.exp_it));

// Unlike a move, the source entry is NOT deleted: the key stays in the current db.
auto op_result = db_slice.AddNew(target_cntx, key, std::move(from_obj), exp_ts);
// Presumably returns early with AddNew's error status on failure — see the macro definition.
RETURN_ON_BAD_STATUS(op_result);
auto& add_res = *op_result;
add_res.it->first.SetSticky(sticky);

// If the new entry is a list and this shard has a blocking controller, notify it
// that `key` now exists in target_db.
auto bc = op_args.db_cntx.ns->GetBlockingController(op_args.shard->shard_id());
if (add_res.it->second.ObjType() == OBJ_LIST && bc) {
bc->AwakeWatched(target_db, key);
}

return OpStatus::OK;
}

OpResult<void> OpRen(const OpArgs& op_args, string_view from_key, string_view to_key,
bool destination_should_not_exist) {
auto* es = op_args.shard;
Expand Down Expand Up @@ -1555,6 +1598,45 @@ void GenericFamily::Move(CmdArgList args, Transaction* tx, SinkReplyBuilder* bui
builder->SendLong(res == OpStatus::OK);
}

// Handles `COPY <key> <target-db-index>`: duplicates `key` from the connection's
// current database into the database with index `target-db-index`, leaving the
// source key in place.
//
// Replies:
//   - kInvalidIntErr       if the index argument is not an integer,
//   - kDbIndOutOfRangeErr  if the index is negative or >= FLAGS_dbnum,
//   - an error             if the target is the current database,
//   - integer 1            if OpCopy returned OK, integer 0 otherwise.
//
// NOTE(review): Redis' COPY takes `source destination [DB db] [REPLACE]`; this draft
// accepts the MOVE-style `key db` form only — confirm the intended syntax.
void GenericFamily::Copy(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
  string_view key = ArgS(args, 0);
  string_view target_db_sv = ArgS(args, 1);
  int32_t target_db = 0;

  if (!absl::SimpleAtoi(target_db_sv, &target_db)) {
    return builder->SendError(kInvalidIntErr);
  }

  if (target_db < 0 || uint32_t(target_db) >= absl::GetFlag(FLAGS_dbnum)) {
    return builder->SendError(kDbIndOutOfRangeErr);
  }

  if (target_db == tx->GetDbIndex()) {
    return builder->SendError("source and destination objects are the same");
  }

  // Stays SKIPPED unless the shard that owns `key` runs OpCopy.
  OpStatus res = OpStatus::SKIPPED;
  ShardId target_shard = Shard(key, shard_set->size());
  auto cb = [&](Transaction* t, EngineShard* shard) {
    // COPY runs as a global transaction and is therefore scheduled on every shard;
    // only the shard that owns `key` performs the copy.
    if (target_shard == shard->shard_id()) {
      auto op_args = t->GetOpArgs(shard);
      res = OpCopy(op_args, key, target_db);
      // COPY runs as a global command but we want to write the command to only one
      // journal. It must be journaled as COPY: journaling it as MOVE would make a
      // replayer remove the source key.
      if (op_args.shard->journal()) {
        RecordJournal(op_args, "COPY"sv, ArgSlice{key, target_db_sv});
      }
    }
    return OpStatus::OK;
  };

  tx->ScheduleSingleHop(std::move(cb));
  // Exactly one shard will call OpCopy.
  DCHECK(res != OpStatus::SKIPPED);
  builder->SendLong(res == OpStatus::OK);
}

void GenericFamily::Rename(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
auto reply = RenameGeneric(args, false, tx);
builder->SendError(reply);
Expand Down Expand Up @@ -1796,6 +1878,7 @@ constexpr uint32_t kUnlink = KEYSPACE | WRITE | FAST;
// ACL category bitmasks for the generic-family commands registered below.
constexpr uint32_t kStick = KEYSPACE | WRITE | FAST;
constexpr uint32_t kSort = WRITE | SET | SORTEDSET | LIST | SLOW | DANGEROUS;
constexpr uint32_t kMove = KEYSPACE | WRITE | FAST;
// Redis classifies COPY as @keyspace @write @slow (it is O(N) for collection values),
// unlike MOVE, which is @fast.
constexpr uint32_t kCopy = KEYSPACE | WRITE | SLOW;
constexpr uint32_t kRestore = KEYSPACE | WRITE | SLOW | DANGEROUS;
constexpr uint32_t kExpireTime = KEYSPACE | READ | FAST;
constexpr uint32_t kPExpireTime = KEYSPACE | READ | FAST;
Expand Down Expand Up @@ -1842,6 +1925,8 @@ void GenericFamily::Register(CommandRegistry* registry) {
<< CI{"SORT", CO::READONLY, -2, 1, 1, acl::kSort}.HFUNC(Sort)
<< CI{"MOVE", CO::WRITE | CO::GLOBAL_TRANS | CO::NO_AUTOJOURNAL, 3, 1, 1, acl::kMove}.HFUNC(
Move)
<< CI{"COPY", CO::WRITE | CO::GLOBAL_TRANS | CO::NO_AUTOJOURNAL, 3, 1, 1, acl::kCopy}.HFUNC(
Copy)
<< CI{"RESTORE", CO::WRITE, -4, 1, 1, acl::kRestore}.HFUNC(Restore)
<< CI{"RANDOMKEY", CO::READONLY, 1, 0, 0, 0}.HFUNC(RandomKey)
<< CI{"EXPIRETIME", CO::READONLY | CO::FAST, 2, 1, 1, acl::kExpireTime}.HFUNC(ExpireTime)
Expand Down
1 change: 1 addition & 0 deletions src/server/generic_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class GenericFamily {
static void Stick(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void Sort(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void Move(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
// COPY <key> <db-index>: copies `key` into another logical database, keeping the source key.
static void Copy(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);

static void Rename(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void RenameNx(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
Expand Down