Skip to content

Commit

Permalink
chore: preparation for basic http api (#2764)
Browse files Browse the repository at this point in the history
* chore: preparation for basic http api

The goal is to provide very basic support for simple commands,
fancy stuff like pipelining, blocking commands won't work.

1. Added optional registration for /api handler.
2. Implemented parsing of post body.
3. Added basic formatting routine for the response. It does not cover all the commands but should suffice for
   basic usage.

The API is a POST method and the body of the request should contain command arguments formatted as json array.
For example, `'["set", "foo", "bar", "ex", "100"]'`.
The response is a json object with either `result` field holding the response of the command or
`error` field containing the error message sent by the server.
See `test_http` test in tests/dragonfly/connection_test.py for more details.


* chore: cover iouring with enable_direct_fd

---------

Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange authored Mar 25, 2024
1 parent 30ec81c commit 966d7f5
Show file tree
Hide file tree
Showing 9 changed files with 300 additions and 34 deletions.
2 changes: 1 addition & 1 deletion helio
9 changes: 5 additions & 4 deletions src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ void SendProtocolError(RedisParser::Result pres, SinkReplyBuilder* builder) {
// https://www.w3.org/Protocols/rfc2616/rfc2616-sec5.html
// One place to find a good implementation would be https://github.com/h2o/picohttpparser
bool MatchHttp11Line(string_view line) {
return absl::StartsWith(line, "GET ") && absl::EndsWith(line, "HTTP/1.1");
return (absl::StartsWith(line, "GET ") || absl::StartsWith(line, "POST ")) &&
absl::EndsWith(line, "HTTP/1.1");
}

void UpdateIoBufCapacity(const base::IoBuf& io_buf, ConnectionStats* stats,
Expand Down Expand Up @@ -651,11 +652,13 @@ void Connection::HandleRequests() {
http_res = CheckForHttpProto(peer);

if (http_res) {
cc_.reset(service_->CreateContext(peer, this));
if (*http_res) {
VLOG(1) << "HTTP1.1 identified";
is_http_ = true;
HttpConnection http_conn{http_listener_};
http_conn.SetSocket(peer);
http_conn.set_user_data(cc_.get());
auto ec = http_conn.ParseFromBuffer(io_buf_.InputBuffer());
io_buf_.ConsumeInput(io_buf_.InputLen());
if (!ec) {
Expand All @@ -666,17 +669,15 @@ void Connection::HandleRequests() {
// this connection.
http_conn.ReleaseSocket();
} else {
cc_.reset(service_->CreateContext(peer, this));
if (breaker_cb_) {
socket_->RegisterOnErrorCb([this](int32_t mask) { this->OnBreakCb(mask); });
}

ConnectionFlow(peer);

socket_->CancelOnErrorCb(); // noop if nothing is registered.

cc_.reset();
}
cc_.reset();
}

VLOG(1) << "Closed connection for peer " << remote_ep;
Expand Down
9 changes: 4 additions & 5 deletions src/facade/reply_capture.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,9 @@ class CapturingReplyBuilder : public RedisReplyBuilder {

void StartCollection(unsigned len, CollectionType type) override;

private:
public:
using Error = std::pair<std::string, std::string>; // SendError (msg, type)
using Null = std::nullptr_t; // SendNull or SendNullArray
struct SimpleString : public std::string {}; // SendSimpleString
struct BulkString : public std::string {}; // SendBulkString

struct StrArrPayload {
bool simple;
Expand All @@ -66,7 +64,9 @@ class CapturingReplyBuilder : public RedisReplyBuilder {
bool with_scores;
};

public:
struct SimpleString : public std::string {}; // SendSimpleString
struct BulkString : public std::string {}; // SendBulkString

CapturingReplyBuilder(ReplyMode mode = ReplyMode::FULL)
: RedisReplyBuilder{nullptr}, reply_mode_{mode}, stack_{}, current_{} {
}
Expand All @@ -89,7 +89,6 @@ class CapturingReplyBuilder : public RedisReplyBuilder {
// If an error is stored inside payload, get a reference to it.
static std::optional<ErrorRef> GetError(const Payload& pl);

private:
struct CollectionPayload {
CollectionPayload(unsigned len, CollectionType type);

Expand Down
2 changes: 1 addition & 1 deletion src/server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ SET(SEARCH_FILES search/search_family.cc search/doc_index.cc search/doc_accessor

add_library(dragonfly_lib engine_shard_set.cc channel_store.cc
config_registry.cc conn_context.cc debugcmd.cc dflycmd.cc
generic_family.cc hset_family.cc json_family.cc
generic_family.cc hset_family.cc http_api.cc json_family.cc
${SEARCH_FILES}
list_family.cc main_service.cc memory_cmd.cc rdb_load.cc rdb_save.cc replica.cc
protocol_client.cc
Expand Down
228 changes: 228 additions & 0 deletions src/server/http_api.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
// Copyright 2024, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//

#include "server/http_api.h"

#include "base/logging.h"
#include "core/flatbuffers.h"
#include "facade/conn_context.h"
#include "facade/reply_builder.h"
#include "server/main_service.h"
#include "util/http/http_common.h"

namespace dfly {
using namespace util;
using namespace std;
namespace h2 = boost::beast::http;
using facade::CapturingReplyBuilder;

namespace {

bool IsVectorOfStrings(flexbuffers::Reference req) {
if (!req.IsVector()) {
return false;
}

auto vec = req.AsVector();
if (vec.size() == 0) {
return false;
}

for (size_t i = 0; i < vec.size(); ++i) {
if (!vec[i].IsString()) {
return false;
}
}
return true;
}

// Escape a string so that it is legal to print it in JSON text.
std::string JsonEscape(string_view input) {
auto hex_digit = [](unsigned c) -> char {
DCHECK_LT(c, 0xFu);
return c < 10 ? c + '0' : c - 10 + 'a';
};

string out;
out.reserve(input.size() + 2);
out.push_back('\"');

auto p = input.begin();
auto e = input.end();

while (p < e) {
uint8_t c = *p;
if (c == '\\' || c == '\"') {
out.push_back('\\');
out.push_back(*p++);
} else if (c <= 0x1f) {
switch (c) {
case '\b':
out.append("\\b");
p++;
break;
case '\f':
out.append("\\f");
p++;
break;
case '\n':
out.append("\\n");
p++;
break;
case '\r':
out.append("\\r");
p++;
break;
case '\t':
out.append("\\t");
p++;
break;
default:
// this condition captures non readable chars with value < 32,
// so size = 1 byte (e.g control chars).
out.append("\\u00");
out.push_back(hex_digit((c & 0xf0) >> 4));
out.push_back(hex_digit(c & 0xf));
p++;
}
} else {
out.push_back(*p++);
}
}

out.push_back('\"');
return out;
}

struct CaptureVisitor {
CaptureVisitor() {
str = R"({"result":)";
}

void operator()(monostate) {
}

void operator()(long v) {
absl::StrAppend(&str, v);
}

void operator()(double v) {
absl::StrAppend(&str, v);
}

void operator()(const CapturingReplyBuilder::SimpleString& ss) {
absl::StrAppend(&str, "\"", ss, "\"");
}

void operator()(const CapturingReplyBuilder::BulkString& bs) {
absl::StrAppend(&str, JsonEscape(bs));
}

void operator()(CapturingReplyBuilder::Null) {
absl::StrAppend(&str, "null");
}

void operator()(CapturingReplyBuilder::Error err) {
str = absl::StrCat(R"({"error": ")", err.first);
}

void operator()(facade::OpStatus status) {
absl::StrAppend(&str, "\"", facade::StatusToMsg(status), "\"");
}

void operator()(const CapturingReplyBuilder::StrArrPayload& sa) {
absl::StrAppend(&str, "not_implemented");
}

void operator()(unique_ptr<CapturingReplyBuilder::CollectionPayload> cp) {
if (!cp) {
absl::StrAppend(&str, "null");
return;
}
if (cp->len == 0 && cp->type == facade::RedisReplyBuilder::ARRAY) {
absl::StrAppend(&str, "[]");
return;
}

absl::StrAppend(&str, "[");
for (auto& pl : cp->arr) {
visit(*this, std::move(pl));
}
}

void operator()(facade::SinkReplyBuilder::MGetResponse resp) {
absl::StrAppend(&str, "not_implemented");
}

void operator()(const CapturingReplyBuilder::ScoredArray& sarr) {
absl::StrAppend(&str, "[");
for (const auto& [key, score] : sarr.arr) {
absl::StrAppend(&str, "{", JsonEscape(key), ":", score, "},");
}
if (sarr.arr.size() > 0) {
str.pop_back();
}
absl::StrAppend(&str, "]");
}

string str;
};

} // namespace

void HttpAPI(const http::QueryArgs& args, HttpRequest&& req, Service* service,
HttpContext* http_cntx) {
auto& body = req.body();

flexbuffers::Builder fbb;
flatbuffers::Parser parser;
flexbuffers::Reference doc;
bool success = parser.ParseFlexBuffer(body.c_str(), nullptr, &fbb);
if (success) {
fbb.Finish();
doc = flexbuffers::GetRoot(fbb.GetBuffer());
if (!IsVectorOfStrings(doc)) {
success = false;
}
}

if (!success) {
auto response = http::MakeStringResponse(h2::status::bad_request);
http::SetMime(http::kTextMime, &response);
response.body() = "Failed to parse json\r\n";
http_cntx->Invoke(std::move(response));
return;
}

vector<string> cmd_args;
flexbuffers::Vector vec = doc.AsVector();
for (size_t i = 0; i < vec.size(); ++i) {
cmd_args.push_back(vec[i].AsString().c_str());
}
vector<facade::MutableSlice> cmd_slices(cmd_args.size());
for (size_t i = 0; i < cmd_args.size(); ++i) {
cmd_slices[i] = absl::MakeSpan(cmd_args[i]);
}

facade::ConnectionContext* context = (facade::ConnectionContext*)http_cntx->user_data();
DCHECK(context);

facade::CapturingReplyBuilder reply_builder;
auto* prev = context->Inject(&reply_builder);
// TODO: to finish this.
service->DispatchCommand(absl::MakeSpan(cmd_slices), context);
facade::CapturingReplyBuilder::Payload payload = reply_builder.Take();

context->Inject(prev);
auto response = http::MakeStringResponse();
http::SetMime(http::kJsonMime, &response);

CaptureVisitor visitor;
std::visit(visitor, std::move(payload));
visitor.str.append("}\r\n");
response.body() = visitor.str;
http_cntx->Invoke(std::move(response));
}

} // namespace dfly
26 changes: 26 additions & 0 deletions src/server/http_api.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright 2024, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//

#pragma once

#include "util/http/http_handler.h"

namespace dfly {
class Service;
using HttpRequest = util::HttpListenerBase::RequestType;

/**
* @brief The main handler function for dispatching commands via HTTP.
*
* @param args - query arguments. currently not used.
* @param req - full http request including the body that should consist of a json array
* representing a Dragonfly command. aka `["set", "foo", "bar"]`
* @param service - a pointer to dfly::Service* object.
* @param http_cntxt - a pointer to the http context object which provide dragonfly context
* information via user_data() and allows to reply with HTTP responses.
*/
void HttpAPI(const util::http::QueryArgs& args, HttpRequest&& req, Service* service,
util::HttpContext* http_cntxt);

} // namespace dfly
13 changes: 12 additions & 1 deletion src/server/main_service.cc
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022, DragonflyDB authors. All rights reserved.
// Copyright 2024, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//

Expand Down Expand Up @@ -40,6 +40,7 @@ extern "C" {
#include "server/generic_family.h"
#include "server/hll_family.h"
#include "server/hset_family.h"
#include "server/http_api.h"
#include "server/json_family.h"
#include "server/list_family.h"
#include "server/multi_command_squasher.h"
Expand Down Expand Up @@ -83,6 +84,9 @@ ABSL_FLAG(bool, admin_nopass, false,
"If set, would enable open admin access to console on the assigned port, without "
"authorization needed.");

ABSL_FLAG(bool, expose_http_api, false,
"If set, will expose a POST /api handler for sending redis commands as json array.");

ABSL_FLAG(dfly::MemoryBytesFlag, maxmemory, dfly::MemoryBytesFlag{},
"Limit on maximum-memory that is used by the database. "
"0 - means the program will automatically determine its maximum memory usage. "
Expand Down Expand Up @@ -2441,6 +2445,13 @@ void Service::ConfigureHttpHandlers(util::HttpListenerBase* base, bool is_privil
base->RegisterCb("/clusterz", [this](const http::QueryArgs& args, HttpContext* send) {
return ClusterHtmlPage(args, send, &cluster_family_);
});

if (absl::GetFlag(FLAGS_expose_http_api)) {
base->RegisterCb("/api",
[this](const http::QueryArgs& args, HttpRequest&& req, HttpContext* send) {
HttpAPI(args, std::move(req), this, send);
});
}
}

void Service::OnClose(facade::ConnectionContext* cntx) {
Expand Down
Loading

0 comments on commit 966d7f5

Please sign in to comment.