Skip to content

Commit

Permalink
chore: remove old io (#3953)
Browse files Browse the repository at this point in the history
* chore: Remove old IO

* fix: fix last error accounting
* chore: use unique_ptr<char> in MGetResponse storage

---------

Signed-off-by: Vladislav Oleshko <[email protected]>
  • Loading branch information
dranikpg authored Nov 10, 2024
1 parent 2d49a28 commit eadce55
Show file tree
Hide file tree
Showing 22 changed files with 229 additions and 1,241 deletions.
13 changes: 5 additions & 8 deletions src/facade/conn_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,10 @@

#include "absl/flags/internal/flag.h"
#include "base/flags.h"
#include "base/logging.h"
#include "facade/dragonfly_connection.h"
#include "facade/reply_builder.h"

ABSL_FLAG(bool, experimental_new_io, true,
"Use new replying code - should "
"reduce latencies for pipelining");

namespace facade {

ConnectionContext::ConnectionContext(::io::Sink* stream, Connection* owner) : owner_(owner) {
Expand All @@ -22,11 +19,11 @@ ConnectionContext::ConnectionContext(::io::Sink* stream, Connection* owner) : ow

if (stream) {
switch (protocol_) {
case Protocol::NONE:
LOG(DFATAL) << "Invalid protocol";
break;
case Protocol::REDIS: {
RedisReplyBuilder* rb = absl::GetFlag(FLAGS_experimental_new_io)
? new RedisReplyBuilder2(stream)
: new RedisReplyBuilder(stream);
rbuilder_.reset(rb);
rbuilder_.reset(new RedisReplyBuilder(stream));
break;
}
case Protocol::MEMCACHE:
Expand Down
15 changes: 9 additions & 6 deletions src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -479,8 +479,8 @@ void Connection::DispatchOperations::operator()(const PubMessage& pub_msg) {
}
arr[i++] = pub_msg.channel;
arr[i++] = pub_msg.message;
rbuilder->SendStringArr(absl::Span<string_view>{arr.data(), i},
RedisReplyBuilder::CollectionType::PUSH);
rbuilder->SendBulkStrArr(absl::Span<string_view>{arr.data(), i},
RedisReplyBuilder::CollectionType::PUSH);
}

void Connection::DispatchOperations::operator()(Connection::PipelineMessage& msg) {
Expand Down Expand Up @@ -518,7 +518,7 @@ void Connection::DispatchOperations::operator()(const InvalidationMessage& msg)
rbuilder->SendNull();
} else {
std::string_view keys[] = {msg.key};
rbuilder->SendStringArr(keys);
rbuilder->SendBulkStrArr(keys);
}
}

Expand Down Expand Up @@ -550,6 +550,9 @@ Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener,
static_assert(kReqSz <= 256 && kReqSz >= 200);

switch (protocol) {
case Protocol::NONE:
LOG(DFATAL) << "Invalid protocol";
break;
case Protocol::REDIS:
redis_parser_.reset(new RedisParser(GetFlag(FLAGS_max_multi_bulk_len)));
break;
Expand Down Expand Up @@ -1358,7 +1361,7 @@ bool Connection::ShouldEndDispatchFiber(const MessageHandle& msg) {

void Connection::SquashPipeline() {
DCHECK_EQ(dispatch_q_.size(), pending_pipeline_cmd_cnt_);
DCHECK_EQ(reply_builder_->type(), SinkReplyBuilder::REDIS); // Only Redis is supported.
DCHECK_EQ(reply_builder_->GetProtocol(), Protocol::REDIS); // Only Redis is supported.

vector<ArgSlice> squash_cmds;
squash_cmds.reserve(dispatch_q_.size());
Expand All @@ -1377,7 +1380,7 @@ void Connection::SquashPipeline() {
service_->DispatchManyCommands(absl::MakeSpan(squash_cmds), reply_builder_, cc_.get());

if (pending_pipeline_cmd_cnt_ == squash_cmds.size()) { // Flush if no new commands appeared
reply_builder_->FlushBatch();
reply_builder_->Flush();
reply_builder_->SetBatchMode(false); // in case the next dispatch is sync
}

Expand Down Expand Up @@ -1498,7 +1501,7 @@ void Connection::ExecutionFiber() {
// last command to reply and flush. If it doesn't reply (i.e. is a control message like
// migrate), we have to flush manually.
if (dispatch_q_.empty() && !msg.IsReplying()) {
reply_builder_->FlushBatch();
reply_builder_->Flush();
}

if (ShouldEndDispatchFiber(msg)) {
Expand Down
4 changes: 4 additions & 0 deletions src/facade/facade.cc
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ ostream& operator<<(ostream& os, facade::CmdArgList ras) {
return os;
}

ostream& operator<<(ostream& os, facade::Protocol p) {
return os << int(p);
}

ostream& operator<<(ostream& os, const facade::RespExpr& e) {
using facade::RespExpr;
using facade::ToSV;
Expand Down
3 changes: 2 additions & 1 deletion src/facade/facade_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ constexpr size_t kSanitizerOverhead = 0u;
#endif
#endif

enum class Protocol : uint8_t { MEMCACHE = 1, REDIS = 2 };
enum class Protocol : uint8_t { NONE = 0, MEMCACHE = 1, REDIS = 2 };

using MutableSlice = std::string_view;
using CmdArgList = absl::Span<const std::string_view>;
Expand Down Expand Up @@ -189,5 +189,6 @@ void ResetStats();

namespace std {
ostream& operator<<(ostream& os, facade::CmdArgList args);
ostream& operator<<(ostream& os, facade::Protocol protocol);

} // namespace std
Loading

0 comments on commit eadce55

Please sign in to comment.