Skip to content

Commit

Permalink
perf: Turn DoOneResult into one uint8_t. Pass grpc_context.multithrea…
Browse files Browse the repository at this point in the history
…ded as extra parameter within run()
  • Loading branch information
Tradias committed Sep 4, 2024
1 parent d51dacd commit 63dcd30
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 31 deletions.
5 changes: 4 additions & 1 deletion src/agrpc/detail/allocate.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ class AllocationGuard
using Pointer = typename Traits::pointer;

public:
AllocationGuard(Pointer&& ptr, const Allocator& allocator) noexcept : ptr_(ptr), allocator_(allocator) {}
AllocationGuard(Pointer&& ptr, const Allocator& allocator) noexcept
: ptr_(static_cast<Pointer&&>(ptr)), allocator_(allocator)
{
}

AllocationGuard(ValueType& value, const Allocator& allocator) noexcept
: ptr_(std::pointer_traits<Pointer>::pointer_to(value)), allocator_(allocator)
Expand Down
2 changes: 1 addition & 1 deletion src/agrpc/detail/grpc_context_definition.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ struct GrpcContextCompletionQueueLoopFunction

[[nodiscard]] bool has_processed(detail::DoOneResult result) const noexcept
{
return result.handled_completion_queue_event();
return result.handled_event() && !result.check_remote_work();
}
};

Expand Down
29 changes: 18 additions & 11 deletions src/agrpc/detail/grpc_context_implementation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,19 @@ struct GrpcContextThreadContext
{
explicit GrpcContextThreadContext(agrpc::GrpcContext& grpc_context);

GrpcContextThreadContext(agrpc::GrpcContext& grpc_context, bool multithreaded);

~GrpcContextThreadContext() noexcept;

GrpcContextThreadContext(const GrpcContextThreadContext& other) = delete;
GrpcContextThreadContext(GrpcContextThreadContext&& other) = delete;
GrpcContextThreadContext& operator=(const GrpcContextThreadContext& other) = delete;
GrpcContextThreadContext& operator=(GrpcContextThreadContext&& other) = delete;

const bool multithreaded_;
bool check_remote_work_;
agrpc::GrpcContext& grpc_context_;
detail::IntrusiveQueue<detail::QueueableOperationBase> local_work_queue_;
bool check_remote_work_;
GrpcContextThreadContext* old_context_;
detail::ListablePoolResource& resource_;

Expand All @@ -90,25 +93,29 @@ enum class InvokeHandler

struct CompletionQueueEventResult
{
bool handled_completion_queue_event_{};
bool check_remote_work_{};
static constexpr uint8_t CHECK_REMOTE_WORK = 1 << 0;
static constexpr uint8_t HANDLED_EVENT = 1 << 1;

explicit operator bool() const noexcept { return handled_completion_queue_event_; }
uint8_t flags_{};

[[nodiscard]] bool handled_completion_queue_event() const noexcept
{
return handled_completion_queue_event_ && !check_remote_work_;
}
[[nodiscard]] bool handled_event() const noexcept { return (flags_ & HANDLED_EVENT) != 0; }

[[nodiscard]] bool check_remote_work() const noexcept { return (flags_ & CHECK_REMOTE_WORK) != 0; }
};

struct DoOneResult : CompletionQueueEventResult
{
bool processed_local_work_{};
static constexpr uint8_t PROCESSED_LOCAL_WORK = HANDLED_EVENT << 1;

explicit operator bool() const noexcept
static DoOneResult create(CompletionQueueEventResult handled_event, bool processed_local_work) noexcept
{
return processed_local_work_ || CompletionQueueEventResult::operator bool();
return {static_cast<uint8_t>(handled_event.flags_ |

Check failure on line 112 in src/agrpc/detail/grpc_context_implementation.hpp

View workflow job for this annotation

GitHub Actions / Ubuntu/20.04/Clang

suggest braces around initialization of subobject [-Werror,-Wmissing-braces]

Check failure on line 112 in src/agrpc/detail/grpc_context_implementation.hpp

View workflow job for this annotation

GitHub Actions / Ubuntu/20.04/Clang

suggest braces around initialization of subobject [-Werror,-Wmissing-braces]

Check failure on line 112 in src/agrpc/detail/grpc_context_implementation.hpp

View workflow job for this annotation

GitHub Actions / Ubuntu/20.04/Clang

suggest braces around initialization of subobject [-Werror,-Wmissing-braces]

Check failure on line 112 in src/agrpc/detail/grpc_context_implementation.hpp

View workflow job for this annotation

GitHub Actions / Ubuntu/20.04/Clang

suggest braces around initialization of subobject [-Werror,-Wmissing-braces]

Check failure on line 112 in src/agrpc/detail/grpc_context_implementation.hpp

View workflow job for this annotation

GitHub Actions / Ubuntu/20.04/Clang

suggest braces around initialization of subobject [-Werror,-Wmissing-braces]
(processed_local_work ? DoOneResult::PROCESSED_LOCAL_WORK : uint8_t{}))};
}

explicit operator bool() const noexcept { return processed_local_work() || handled_event(); }

[[nodiscard]] bool processed_local_work() const noexcept { return (flags_ & PROCESSED_LOCAL_WORK) != 0; }
};

struct GrpcContextImplementation
Expand Down
37 changes: 21 additions & 16 deletions src/agrpc/detail/grpc_context_implementation_definition.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,15 @@ namespace detail
inline thread_local detail::GrpcContextThreadContext* thread_local_grpc_context{};

inline GrpcContextThreadContext::GrpcContextThreadContext(agrpc::GrpcContext& grpc_context)
: grpc_context_(grpc_context),
local_work_queue_{grpc_context.multithreaded_ ? decltype(local_work_queue_){}
: std::move(grpc_context.local_work_queue_)},
check_remote_work_{grpc_context.multithreaded_ ? false : grpc_context.local_check_remote_work_},
: GrpcContextThreadContext(grpc_context, grpc_context.multithreaded_)
{
}

inline GrpcContextThreadContext::GrpcContextThreadContext(agrpc::GrpcContext& grpc_context, bool multithreaded)
: multithreaded_{multithreaded},
check_remote_work_{multithreaded_ ? false : grpc_context.local_check_remote_work_},
grpc_context_(grpc_context),
local_work_queue_{multithreaded_ ? decltype(local_work_queue_){} : std::move(grpc_context.local_work_queue_)},
old_context_{std::exchange(detail::thread_local_grpc_context, this)},
resource_{(old_context_ && &old_context_->grpc_context_ == &grpc_context)
? old_context_->resource_
Expand All @@ -46,7 +51,7 @@ inline GrpcContextThreadContext::GrpcContextThreadContext(agrpc::GrpcContext& gr

inline GrpcContextThreadContext::~GrpcContextThreadContext() noexcept
{
if (grpc_context_.multithreaded_)
if (multithreaded_)
{
const bool moved_work = GrpcContextImplementation::move_local_queue_to_remote_work(*this);
if (moved_work || check_remote_work_ ||
Expand Down Expand Up @@ -201,16 +206,16 @@ inline CompletionQueueEventResult GrpcContextImplementation::handle_next_complet
if (GrpcContextImplementation::CHECK_REMOTE_WORK_TAG == event.tag_)
{
context.check_remote_work_ = true;
return CompletionQueueEventResult{true, true};
return {CompletionQueueEventResult::CHECK_REMOTE_WORK | CompletionQueueEventResult::HANDLED_EVENT};
}
const auto result =
detail::InvokeHandler::NO_ == invoke
? (event.ok_ ? detail::OperationResult::SHUTDOWN_OK : detail::OperationResult::SHUTDOWN_NOT_OK)
: (event.ok_ ? detail::OperationResult::OK_ : detail::OperationResult::NOT_OK);
detail::process_grpc_tag(event.tag_, result, grpc_context);
return CompletionQueueEventResult{true};
return {CompletionQueueEventResult::HANDLED_EVENT};
}
return CompletionQueueEventResult{false};
return {};
}

template <class Queue>
Expand All @@ -235,7 +240,7 @@ inline DoOneResult GrpcContextImplementation::do_one(detail::GrpcContextThreadCo
bool check_remote_work = context.check_remote_work_;
if (check_remote_work)
{
if (grpc_context.multithreaded_)
if (context.multithreaded_)
{
auto local_queue{std::move(context.local_work_queue_)};
check_remote_work = GrpcContextImplementation::move_remote_work_to_local_queue(context);
Expand All @@ -261,19 +266,19 @@ inline DoOneResult GrpcContextImplementation::do_one(detail::GrpcContextThreadCo
const bool is_more_completed_work_pending = check_remote_work || !context.local_work_queue_.empty();
if (!is_more_completed_work_pending && grpc_context.is_stopped())
{
return {{}, processed_local_work};
return {DoOneResult::PROCESSED_LOCAL_WORK};

Check failure on line 269 in src/agrpc/detail/grpc_context_implementation_definition.hpp

View workflow job for this annotation

GitHub Actions / Ubuntu/20.04/Clang

suggest braces around initialization of subobject [-Werror,-Wmissing-braces]

Check failure on line 269 in src/agrpc/detail/grpc_context_implementation_definition.hpp

View workflow job for this annotation

GitHub Actions / Ubuntu/20.04/Clang

suggest braces around initialization of subobject [-Werror,-Wmissing-braces]

Check failure on line 269 in src/agrpc/detail/grpc_context_implementation_definition.hpp

View workflow job for this annotation

GitHub Actions / Ubuntu/20.04/Clang

suggest braces around initialization of subobject [-Werror,-Wmissing-braces]

Check failure on line 269 in src/agrpc/detail/grpc_context_implementation_definition.hpp

View workflow job for this annotation

GitHub Actions / Ubuntu/20.04/Clang

suggest braces around initialization of subobject [-Werror,-Wmissing-braces]

Check failure on line 269 in src/agrpc/detail/grpc_context_implementation_definition.hpp

View workflow job for this annotation

GitHub Actions / Ubuntu/20.04/Clang

suggest braces around initialization of subobject [-Werror,-Wmissing-braces]
}
const auto handled_event = GrpcContextImplementation::handle_next_completion_queue_event(
context, is_more_completed_work_pending ? GrpcContextImplementation::TIME_ZERO : deadline, invoke);
return {handled_event, processed_local_work};
return DoOneResult::create(handled_event, processed_local_work);
}

inline DoOneResult GrpcContextImplementation::do_one_if_not_stopped(detail::GrpcContextThreadContext& context,
::gpr_timespec deadline)
{
if (context.grpc_context_.is_stopped())
{
return {{}, false};
return {};
}
return {GrpcContextImplementation::do_one(context, deadline, detail::InvokeHandler::YES_)};
}
Expand All @@ -290,7 +295,7 @@ inline DoOneResult GrpcContextImplementation::do_one_completion_queue_if_not_sto
{
if (context.grpc_context_.is_stopped())
{
return {{}, false};
return {};
}
return {
GrpcContextImplementation::handle_next_completion_queue_event(context, deadline, detail::InvokeHandler::YES_)};
Expand Down Expand Up @@ -325,13 +330,13 @@ inline bool GrpcContextImplementation::process_work(agrpc::GrpcContext& grpc_con

inline void GrpcContextImplementation::drain_completion_queue(agrpc::GrpcContext& grpc_context) noexcept
{
grpc_context.multithreaded_ = false;
detail::GrpcContextThreadContext thread_context{grpc_context};
detail::GrpcContextThreadContext thread_context{grpc_context, false};
(void)grpc_context.remote_work_queue_.try_mark_active();
GrpcContextImplementation::move_remote_work_to_local_queue(thread_context);
GrpcContextImplementation::process_local_queue(thread_context, detail::InvokeHandler::NO_);
while (GrpcContextImplementation::handle_next_completion_queue_event(
thread_context, detail::GrpcContextImplementation::INFINITE_FUTURE, detail::InvokeHandler::NO_))
thread_context, detail::GrpcContextImplementation::INFINITE_FUTURE, detail::InvokeHandler::NO_)
.handled_event())
{
//
}
Expand Down
2 changes: 1 addition & 1 deletion src/agrpc/detail/stop_callback_lifetime.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ struct CancellationSlotToken
{
if (slot.is_connected())
{
slot.template emplace<StopFunction>(static_cast<Args&&>(args)...);
static_cast<CancellationSlot&&>(slot).template emplace<StopFunction>(static_cast<Args&&>(args)...);
}
}
};
Expand Down
2 changes: 1 addition & 1 deletion src/agrpc/grpc_context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ class GrpcContext
std::atomic_bool stopped_{false};
std::atomic_bool shutdown_{false};
bool local_check_remote_work_{false};
bool multithreaded_{false};
const bool multithreaded_{false};
LocalWorkQueue local_work_queue_{};
std::unique_ptr<grpc::CompletionQueue> completion_queue_;
RemoteWorkQueue remote_work_queue_{false};
Expand Down

0 comments on commit 63dcd30

Please sign in to comment.