diff --git a/include/amqpcpp/libboostasio.h b/include/amqpcpp/libboostasio.h index 0df9011f..b88aedc1 100644 --- a/include/amqpcpp/libboostasio.h +++ b/include/amqpcpp/libboostasio.h @@ -1,11 +1,11 @@ /** * LibBoostAsio.h * - * Implementation for the AMQP::TcpHandler for boost::asio. You can use this class - * instead of a AMQP::TcpHandler class, just pass the boost asio service to the + * Implementation for the AMQP::TcpHandler for boost::asio. You can use this class + * instead of a AMQP::TcpHandler class, just pass the boost asio service to the * constructor and you're all set. See tests/libboostasio.cpp for example. * - * Watch out: this class was not implemented or reviewed by the original author of + * Watch out: this class was not implemented or reviewed by the original author of * AMQP-CPP. However, we do get a lot of questions and issues from users of this class, * so we cannot guarantee its quality. If you run into such issues too, it might be * better to implement your own handler that interact with boost. @@ -62,12 +62,6 @@ class LibBoostAsioHandler : public virtual TcpHandler { private: - /** - * The boost asio io_context which is responsible for detecting events. - * @var class boost::asio::io_context& - */ - boost::asio::io_context & _iocontext; - using strand_weak_ptr = std::weak_ptr; /** @@ -84,10 +78,16 @@ class LibBoostAsioHandler : public virtual TcpHandler boost::asio::posix::stream_descriptor _socket; /** - * The boost asynchronous deadline timer. + * The boost asynchronous deadline timer for heartbeats. + * @var class boost::asio::deadline_timer + */ + boost::asio::deadline_timer _heartbeat_timer; + + /** + * The boost asynchronous deadline timer for read event timeouts. * @var class boost::asio::deadline_timer */ - boost::asio::deadline_timer _timer; + boost::asio::deadline_timer _expire_timer; /** * A boolean that indicates if the watcher is monitoring for read events. @@ -113,28 +113,38 @@ class LibBoostAsioHandler : public virtual TcpHandler */ bool _write_pending{false}; - using handler_cb = boost::function; - using io_handler = boost::function; - using timer_handler = boost::function; + /** + * The timeout in seconds between each heartbeat to be sent. + * @var _timeout 0 if no timeout is agreed. + */ + uint16_t _heartbeat_interval{0}; + + /** + * The timeout in seconds negotiated with the server. + * @var _timeout 0 if no timeout is agreed. + */ + uint16_t _expire_timeout{0}; + + using handler = boost::function; + using io_handler = boost::function; /** * Builds a io handler callback that executes the io callback in a strand. * @param io_handler The handler callback to dispatch - * @return handler_cb A function wrapping the execution of the handler function in a io_context::strand. + * @return io_handler A function wrapping the execution of the handler function in a io_context::strand. */ - handler_cb get_dispatch_wrapper(io_handler fn) + io_handler get_dispatch_wrapper(const handler& fn) { const strand_weak_ptr wpstrand = _wpstrand; - - return [fn, wpstrand](const boost::system::error_code &ec, const std::size_t bytes_transferred) + return [fn, wpstrand] (const boost::system::error_code &ec, const std::size_t transfered_bytes) { const strand_shared_ptr strand = wpstrand.lock(); if (!strand) { - fn(boost::system::errc::make_error_code(boost::system::errc::operation_canceled), std::size_t{0}); + fn(boost::system::errc::make_error_code(boost::system::errc::operation_canceled)); return; } - boost::asio::dispatch(strand->context().get_executor(), boost::bind(fn, ec, bytes_transferred)); + boost::asio::dispatch(strand->context().get_executor(), boost::bind(fn, ec)); }; } @@ -144,12 +154,11 @@ class LibBoostAsioHandler : public virtual TcpHandler * @param fd The file descripter being watched. * @return handler callback */ - handler_cb get_read_handler(TcpConnection *const connection, const int fd) + io_handler get_read_handler(TcpConnection *const connection, const int fd) { auto fn = boost::bind(&Watcher::read_handler, this, boost::placeholders::_1, - boost::placeholders::_2, PTR_FROM_THIS(Watcher), connection, fd); @@ -162,12 +171,11 @@ class LibBoostAsioHandler : public virtual TcpHandler * @param fd The file descripter being watched. * @return handler callback */ - handler_cb get_write_handler(TcpConnection *const connection, const int fd) + io_handler get_write_handler(TcpConnection *const connection, const int fd) { auto fn = boost::bind(&Watcher::write_handler, this, boost::placeholders::_1, - boost::placeholders::_2, PTR_FROM_THIS(Watcher), connection, fd); @@ -175,45 +183,44 @@ class LibBoostAsioHandler : public virtual TcpHandler } /** - * Binds and returns a lamba function handler for the io operation. + * Binds and returns a lamba function handler for the heartbeat operation. * @param connection The connection being watched. - * @param timeout The file descripter being watched. * @return handler callback */ - timer_handler get_timer_handler(TcpConnection *const connection, const uint16_t timeout) + handler get_heartbeat_handler(TcpConnection *const connection) { - const auto fn = boost::bind(&Watcher::timeout_handler, - this, - boost::placeholders::_1, - PTR_FROM_THIS(Watcher), - connection, - timeout); - - const strand_weak_ptr wpstrand = _wpstrand; + const auto fn = boost::bind(&Watcher::heartbeat_handler, + this, + boost::placeholders::_1, + PTR_FROM_THIS(Watcher), + connection); + return boost::bind(get_dispatch_wrapper(fn), boost::placeholders::_1, 0); + } - return [fn, wpstrand](const boost::system::error_code &ec) - { - const strand_shared_ptr strand = wpstrand.lock(); - if (!strand) - { - fn(boost::system::errc::make_error_code(boost::system::errc::operation_canceled)); - return; - } - boost::asio::dispatch(strand->context().get_executor(), boost::bind(fn, ec)); - }; + /** + * Binds and returns a lamba function handler for the expire operation. + * @param connection The connection being watched. + * @return handler callback + */ + handler get_expire_handler(TcpConnection *const connection) + { + const auto fn = boost::bind(&Watcher::expire_handler, + this, + boost::placeholders::_1, + PTR_FROM_THIS(Watcher), + connection); + return boost::bind(get_dispatch_wrapper(fn), boost::placeholders::_1, 0); } /** * Handler method that is called by boost's io_context when the socket pumps a read event. * @param ec The status of the callback. - * @param bytes_transferred The number of bytes transferred. * @param awpWatcher A weak pointer to this object. * @param connection The connection being watched. * @param fd The file descriptor being watched. * @note The handler will get called if a read is cancelled. */ void read_handler(const boost::system::error_code &ec, - const std::size_t bytes_transferred, const std::weak_ptr awpWatcher, TcpConnection *const connection, const int fd) @@ -227,6 +234,14 @@ class LibBoostAsioHandler : public virtual TcpHandler if ((!ec || ec == boost::asio::error::would_block) && _read) { + // if the server is readable, we have some extra time before it expires, the expire time + // is set to 1.5 * _timeout to close the connection when the third heartbeat is about to be sent + if (_expire_timeout > 0) + { + _expire_timer.expires_from_now(boost::posix_time::seconds(_expire_timeout)); + _expire_timer.async_wait(get_expire_handler(connection)); + } + connection->process(fd, AMQP::readable); _read_pending = true; @@ -240,14 +255,12 @@ class LibBoostAsioHandler : public virtual TcpHandler /** * Handler method that is called by boost's io_context when the socket pumps a write event. * @param ec The status of the callback. - * @param bytes_transferred The number of bytes transferred. * @param awpWatcher A weak pointer to this object. * @param connection The connection being watched. * @param fd The file descriptor being watched. * @note The handler will get called if a write is cancelled. */ void write_handler(const boost::system::error_code ec, - const std::size_t bytes_transferred, const std::weak_ptr awpWatcher, TcpConnection *const connection, const int fd) @@ -274,14 +287,13 @@ class LibBoostAsioHandler : public virtual TcpHandler /** * Callback method that is called by libev when the timer expires * @param ec error code returned from loop - * @param loop The loop in which the event was triggered + * @param awpWatcher A weak pointer to this object. * @param connection * @param timeout */ - void timeout_handler(const boost::system::error_code &ec, - std::weak_ptr awpThis, - TcpConnection *const connection, - const uint16_t timeout) + void heartbeat_handler(const boost::system::error_code &ec, + std::weak_ptr awpThis, + TcpConnection *const connection) { // Resolve any potential problems with dangling pointers // (remember we are using async). @@ -296,11 +308,33 @@ class LibBoostAsioHandler : public virtual TcpHandler connection->heartbeat(); } - // Reschedule the timer for the future: - _timer.expires_at(_timer.expires_at() + boost::posix_time::seconds(timeout)); + // Reschedule the timer and register handler + _heartbeat_timer.expires_from_now(boost::posix_time::seconds(_heartbeat_interval)); + _heartbeat_timer.async_wait(get_heartbeat_handler(connection)); + } + } - // Posts the timer event - _timer.async_wait(get_timer_handler(connection, timeout)); + /** + * Callback method that is called by libev when the timer expires + * @param ec error code returned from loop + * @param awpWatcher A weak pointer to this object. + * @param connection + */ + void expire_handler(const boost::system::error_code &ec, + std::weak_ptr awpThis, + TcpConnection *const connection) + { + // Resolve any potential problems with dangling pointers + // (remember we are using async). + const std::shared_ptr apTimer = awpThis.lock(); + if (!apTimer) { return; } + if (!ec) + { + if (connection) + { + // this is a connection timeout, close the connection from our side too + connection->close(true); + } } } @@ -315,10 +349,10 @@ class LibBoostAsioHandler : public virtual TcpHandler Watcher(boost::asio::io_context &io_context, const strand_weak_ptr wpstrand, const int fd) : - _iocontext(io_context), _wpstrand(wpstrand), _socket(io_context), - _timer(io_context) + _heartbeat_timer(io_context), + _expire_timer(io_context) { _socket.assign(fd); @@ -341,7 +375,7 @@ class LibBoostAsioHandler : public virtual TcpHandler _read = false; _write = false; _socket.release(); - stop_timer(); + stop_timers(); } /** @@ -378,29 +412,41 @@ class LibBoostAsioHandler : public virtual TcpHandler } /** - * Change the expire time + * Change the expire and heartbeat timers * @param connection * @param timeout */ - void set_timer(TcpConnection *connection, uint16_t timeout) + void set_timers(TcpConnection *connection, uint16_t heartbeat_timeout) { // stop timer in case it was already set - stop_timer(); + stop_timers(); + + // timeout not applicable + if (heartbeat_timeout == 0) { return; } - // Reschedule the timer for the future: - _timer.expires_from_now(boost::posix_time::seconds(timeout)); + // save new interval + _heartbeat_interval = heartbeat_timeout * 0.5; - // Posts the timer event - _timer.async_wait(get_timer_handler(connection, timeout)); + // Reschedule the timer and register handler + _heartbeat_timer.expires_from_now(boost::posix_time::seconds(_heartbeat_interval)); + _heartbeat_timer.async_wait(get_heartbeat_handler(connection)); + + // save new timeout - increase the agreed timeout to be a bit permissive + _expire_timeout = heartbeat_timeout * 1.5; + + // Reschedule the timer and register handler + _expire_timer.expires_from_now(boost::posix_time::seconds(_expire_timeout)); + _expire_timer.async_wait(get_expire_handler(connection)); } /** * Stop the timer */ - void stop_timer() + void stop_timers() { // do nothing if it was never set - _timer.cancel(); + _heartbeat_timer.cancel(); + _expire_timer.cancel(); } }; @@ -446,7 +492,6 @@ class LibBoostAsioHandler : public virtual TcpHandler // construct a new pair (watcher/timer), and put it in the map const std::shared_ptr apWatcher = std::make_shared(_iocontext, _strand, fd); - _watchers[fd] = apWatcher; // explicitly set the events to monitor @@ -468,13 +513,13 @@ class LibBoostAsioHandler : public virtual TcpHandler /** * Method that is called when the heartbeat frequency is negotiated between the server and the client. * @param connection The connection that suggested a heartbeat interval - * @param interval The suggested interval from the server - * @return uint16_t The interval to use + * @param timeout The suggested timeout from the server + * @return uint16_t The timeout to use */ - virtual uint16_t onNegotiate(TcpConnection *connection, uint16_t interval) override + virtual uint16_t onNegotiate(TcpConnection *connection, uint16_t timeout) override { // skip if no heartbeats are needed - if (interval == 0) return 0; + if (timeout == 0) return 0; const auto fd = connection->fileno(); @@ -482,10 +527,10 @@ class LibBoostAsioHandler : public virtual TcpHandler if (iter == _watchers.end()) return 0; // set the timer - iter->second->set_timer(connection, interval); + iter->second->set_timers(connection, timeout); - // we agree with the interval - return interval; + // we agree with the timeout + return timeout; } public: @@ -504,7 +549,6 @@ class LibBoostAsioHandler : public virtual TcpHandler explicit LibBoostAsioHandler(boost::asio::io_context &io_context) : _iocontext(io_context), _strand(std::make_shared(_iocontext)) - //_timer(std::make_shared(_iocontext,_strand)) { }