Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add setConnectionCallback #2204

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion examples/async_stream/main.cc
Original file line number Diff line number Diff line change
@@ -1,15 +1,33 @@
#include <drogon/drogon.h>
#include <chrono>
#include <functional>
#include <mutex>
#include <unordered_map>
#include <trantor/utils/Logger.h>
#include <trantor/net/callbacks.h>
#include <trantor/net/TcpConnection.h>

using namespace drogon;
using namespace std::chrono_literals;

std::mutex mutex;
std::unordered_map<trantor::TcpConnectionPtr, std::function<void()>>
connMapping;

int main()
{
app().registerHandler(
"/stream",
[](const HttpRequestPtr &,
[](const HttpRequestPtr &req,
std::function<void(const HttpResponsePtr &)> &&callback) {
const auto &weakConnPtr = req->getConnectionPtr();
if (auto connPtr = weakConnPtr.lock())
{
std::lock_guard lk(mutex);
connMapping.emplace(std::move(connPtr), [] {
LOG_INFO << "call stop or other options!!!!";
});
}
auto resp = drogon::HttpResponse::newAsyncStreamResponse(
[](drogon::ResponseStreamPtr stream) {
std::thread([stream =
Expand Down Expand Up @@ -79,5 +97,17 @@ int main()

LOG_INFO << "Server running on 127.0.0.1:8848";
app().enableRequestStream(); // This is for request stream.
app().setConnectionCallback([](const trantor::TcpConnectionPtr &conn) {
if (conn->disconnected())
{
std::lock_guard lk(mutex);
if (auto it = connMapping.find(conn); it != connMapping.end())
{
LOG_INFO << "disconnect";
connMapping[conn]();
connMapping.erase(conn);
}
}
});
app().addListener("127.0.0.1", 8848).run();
}
9 changes: 9 additions & 0 deletions lib/inc/drogon/HttpAppFramework.h
Original file line number Diff line number Diff line change
Expand Up @@ -1615,6 +1615,15 @@ class DROGON_EXPORT HttpAppFramework : public trantor::NonCopyable
virtual HttpAppFramework &setAfterAcceptSockOptCallback(
std::function<void(int)> cb) = 0;

/**
* @brief Set the client disconnect or connect callback.
*
* @param cb This callback will be called, when the client disconnect or
* connect
*/
virtual HttpAppFramework &setConnectionCallback(
std::function<void(const trantor::TcpConnectionPtr &)> cb) = 0;

virtual HttpAppFramework &enableRequestStream(bool enable = true) = 0;
virtual bool isRequestStreamEnabled() const = 0;

Expand Down
4 changes: 4 additions & 0 deletions lib/inc/drogon/HttpRequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <unordered_map>
#include <optional>
#include <string_view>
#include <trantor/net/TcpConnection.h>

namespace drogon
{
Expand Down Expand Up @@ -506,6 +507,9 @@ class DROGON_EXPORT HttpRequest

virtual bool connected() const noexcept = 0;

virtual const std::weak_ptr<trantor::TcpConnection> &getConnectionPtr()
const noexcept = 0;

virtual ~HttpRequest()
{
}
Expand Down
7 changes: 7 additions & 0 deletions lib/src/HttpAppFrameworkImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1367,3 +1367,10 @@ HttpAppFramework &HttpAppFrameworkImpl::setAfterAcceptSockOptCallback(
listenerManagerPtr_->setAfterAcceptSockOptCallback(std::move(cb));
return *this;
}

HttpAppFramework &HttpAppFrameworkImpl::setConnectionCallback(
std::function<void(const trantor::TcpConnectionPtr &)> cb)
{
listenerManagerPtr_->setConnectionCallback(std::move(cb));
return *this;
}
2 changes: 2 additions & 0 deletions lib/src/HttpAppFrameworkImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,8 @@ class HttpAppFrameworkImpl final : public HttpAppFramework
std::function<void(int)> cb) override;
HttpAppFramework &setAfterAcceptSockOptCallback(
std::function<void(int)> cb) override;
HttpAppFramework &setConnectionCallback(
std::function<void(const trantor::TcpConnectionPtr &)> cb) override;

HttpAppFramework &enableRequestStream(bool enable) override;
bool isRequestStreamEnabled() const override;
Expand Down
7 changes: 7 additions & 0 deletions lib/src/HttpRequestImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include "HttpUtils.h"
#include "CacheFile.h"
#include "impl_forwards.h"
#include <drogon/utils/Utilities.h>
#include <drogon/HttpRequest.h>
#include <drogon/RequestStream.h>
Expand Down Expand Up @@ -572,6 +573,12 @@ class HttpRequestImpl : public HttpRequest
return false;
}

const std::weak_ptr<trantor::TcpConnection> &getConnectionPtr()
const noexcept override
{
return connPtr_;
}

bool isOnSecureConnection() const noexcept override
{
return isOnSecureConnection_;
Expand Down
8 changes: 7 additions & 1 deletion lib/src/HttpServer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "HttpControllersRouter.h"
#include "StaticFileRouter.h"
#include "WebSocketConnectionImpl.h"
#include "impl_forwards.h"

#if COZ_PROFILING
#include <coz.h>
Expand Down Expand Up @@ -75,7 +76,12 @@ HttpServer::HttpServer(EventLoop *loop,
: server_(loop, listenAddr, std::move(name), true, app().reusePort())
#endif
{
server_.setConnectionCallback(onConnection);
server_.setConnectionCallback(
[this](const trantor::TcpConnectionPtr &conn) {
onConnection(conn);
if (connectionCallback_)
connectionCallback_(conn);
});
server_.setRecvMessageCallback(onMessage);
server_.kickoffIdleConnections(
HttpAppFrameworkImpl::instance().getIdleConnectionTimeout());
Expand Down
7 changes: 7 additions & 0 deletions lib/src/HttpServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ class HttpServer : trantor::NonCopyable
afterAcceptSetSockOptCallback_ = std::move(cb);
}

void setConnectionCallback(
std::function<void(const trantor::TcpConnectionPtr &)> cb)
{
connectionCallback_ = std::move(cb);
}

private:
friend class HttpInternalForwardHelper;

Expand Down Expand Up @@ -144,6 +150,7 @@ class HttpServer : trantor::NonCopyable

std::function<void(int)> beforeListenSetSockOptCallback_;
std::function<void(int)> afterAcceptSetSockOptCallback_;
std::function<void(const trantor::TcpConnectionPtr &)> connectionCallback_;
};

class HttpInternalForwardHelper
Expand Down
4 changes: 4 additions & 0 deletions lib/src/ListenerManager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ void ListenerManager::createListeners(
serverPtr->setAfterAcceptSockOptCallback(
afterAcceptSetSockOptCallback_);
}
if (connectionCallback_)
{
serverPtr->setConnectionCallback(connectionCallback_);
}

if (listener.useSSL_ && utils::supportsTls())
{
Expand Down
7 changes: 7 additions & 0 deletions lib/src/ListenerManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ class ListenerManager : public trantor::NonCopyable
afterAcceptSetSockOptCallback_ = std::move(cb);
}

void setConnectionCallback(
std::function<void(const trantor::TcpConnectionPtr &)> cb)
{
connectionCallback_ = std::move(cb);
}

void reloadSSLFiles();

private:
Expand Down Expand Up @@ -101,6 +107,7 @@ class ListenerManager : public trantor::NonCopyable
std::unique_ptr<trantor::EventLoopThread> listeningThread_;
std::function<void(int)> beforeListenSetSockOptCallback_;
std::function<void(int)> afterAcceptSetSockOptCallback_;
std::function<void(const trantor::TcpConnectionPtr &)> connectionCallback_;
};

} // namespace drogon
Loading