From 991bc8b64712d47a93508b421dccc4a4153ebb86 Mon Sep 17 00:00:00 2001 From: Sanath Date: Sat, 2 Nov 2024 11:05:52 +0000 Subject: [PATCH] feat: adding copy Signed-off-by: Sanath --- src/server/generic_family.cc | 85 ++++++++++++++++++++++++++++++++++++ src/server/generic_family.h | 1 + 2 files changed, 86 insertions(+) diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index 390d70793a7d..a9b72631bca4 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -786,6 +786,49 @@ OpStatus OpMove(const OpArgs& op_args, string_view key, DbIndex target_db) { return OpStatus::OK; } +// OpMove touches multiple databases (op_args.db_idx, target_db), so it assumes it runs +// as a global transaction. +// TODO: Allow running OpMove without a global transaction. +OpStatus OpCopy(const OpArgs& op_args, string_view key, DbIndex target_db) { + auto& db_slice = op_args.GetDbSlice(); + + // Fetch value at key in current db. + auto from_res = db_slice.FindMutable(op_args.db_cntx, key); + if (!IsValid(from_res.it)) + return OpStatus::KEY_NOTFOUND; + + // Fetch value at key in target db. + DbContext target_cntx = op_args.db_cntx; + target_cntx.db_index = target_db; + auto to_res = db_slice.FindReadOnly(target_cntx, key); + if (IsValid(to_res.it)) + return OpStatus::KEY_EXISTS; + + // Ensure target database exists. + db_slice.ActivateDb(target_db); + + bool sticky = from_res.it->first.IsSticky(); + uint64_t exp_ts = db_slice.ExpireTime(from_res.exp_it); + from_res.post_updater.Run(); + PrimeValue from_obj = from_res.it->second.AsRef(); + + // Restore expire flag after std::move. + from_res.it->second.SetExpire(IsValid(from_res.exp_it)); + + // CHECK(db_slice.Del(op_args.db_cntx, from_res.it)); + auto op_result = db_slice.AddNew(target_cntx, key, std::move(from_obj), exp_ts); + RETURN_ON_BAD_STATUS(op_result); + auto& add_res = *op_result; + add_res.it->first.SetSticky(sticky); + + auto bc = op_args.db_cntx.ns->GetBlockingController(op_args.shard->shard_id()); + if (add_res.it->second.ObjType() == OBJ_LIST && bc) { + bc->AwakeWatched(target_db, key); + } + + return OpStatus::OK; +} + OpResult OpRen(const OpArgs& op_args, string_view from_key, string_view to_key, bool destination_should_not_exist) { auto* es = op_args.shard; @@ -1555,6 +1598,45 @@ void GenericFamily::Move(CmdArgList args, Transaction* tx, SinkReplyBuilder* bui builder->SendLong(res == OpStatus::OK); } +void GenericFamily::Copy(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { + string_view key = ArgS(args, 0); + string_view target_db_sv = ArgS(args, 1); + int32_t target_db; + + if (!absl::SimpleAtoi(target_db_sv, &target_db)) { + return builder->SendError(kInvalidIntErr); + } + + if (target_db < 0 || uint32_t(target_db) >= absl::GetFlag(FLAGS_dbnum)) { + return builder->SendError(kDbIndOutOfRangeErr); + } + + if (target_db == tx->GetDbIndex()) { + return builder->SendError("source and destination objects are the same"); + } + + OpStatus res = OpStatus::SKIPPED; + ShardId target_shard = Shard(key, shard_set->size()); + auto cb = [&](Transaction* t, EngineShard* shard) { + // MOVE runs as a global transaction and is therefore scheduled on every shard. + if (target_shard == shard->shard_id()) { + auto op_args = t->GetOpArgs(shard); + res = OpCopy(op_args, key, target_db); + // MOVE runs as global command but we want to write the + // command to only one journal. + if (op_args.shard->journal()) { + RecordJournal(op_args, "MOVE"sv, ArgSlice{key, target_db_sv}); + } + } + return OpStatus::OK; + }; + + tx->ScheduleSingleHop(std::move(cb)); + // Exactly one shard will call OpMove. + DCHECK(res != OpStatus::SKIPPED); + builder->SendLong(res == OpStatus::OK); +} + void GenericFamily::Rename(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { auto reply = RenameGeneric(args, false, tx); builder->SendError(reply); @@ -1796,6 +1878,7 @@ constexpr uint32_t kUnlink = KEYSPACE | WRITE | FAST; constexpr uint32_t kStick = KEYSPACE | WRITE | FAST; constexpr uint32_t kSort = WRITE | SET | SORTEDSET | LIST | SLOW | DANGEROUS; constexpr uint32_t kMove = KEYSPACE | WRITE | FAST; +constexpr uint32_t kCopy = KEYSPACE | WRITE | FAST; constexpr uint32_t kRestore = KEYSPACE | WRITE | SLOW | DANGEROUS; constexpr uint32_t kExpireTime = KEYSPACE | READ | FAST; constexpr uint32_t kPExpireTime = KEYSPACE | READ | FAST; @@ -1842,6 +1925,8 @@ void GenericFamily::Register(CommandRegistry* registry) { << CI{"SORT", CO::READONLY, -2, 1, 1, acl::kSort}.HFUNC(Sort) << CI{"MOVE", CO::WRITE | CO::GLOBAL_TRANS | CO::NO_AUTOJOURNAL, 3, 1, 1, acl::kMove}.HFUNC( Move) + << CI{"COPY", CO::WRITE | CO::GLOBAL_TRANS | CO::NO_AUTOJOURNAL, 3, 1, 1, acl::kCopy}.HFUNC( + Copy) << CI{"RESTORE", CO::WRITE, -4, 1, 1, acl::kRestore}.HFUNC(Restore) << CI{"RANDOMKEY", CO::READONLY, 1, 0, 0, 0}.HFUNC(RandomKey) << CI{"EXPIRETIME", CO::READONLY | CO::FAST, 2, 1, 1, acl::kExpireTime}.HFUNC(ExpireTime) diff --git a/src/server/generic_family.h b/src/server/generic_family.h index c74e485bddf6..14b6e2f2d7ef 100644 --- a/src/server/generic_family.h +++ b/src/server/generic_family.h @@ -48,6 +48,7 @@ class GenericFamily { static void Stick(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); static void Sort(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); static void Move(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void Copy(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); static void Rename(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); static void RenameNx(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);