Skip to content

Commit

Permalink
perf: Avoid going through the entire event loop twice when dequeuing …
Browse files Browse the repository at this point in the history
…remote work in less contented scenarios
  • Loading branch information
Tradias committed Sep 13, 2024
1 parent 7df30c3 commit 0910b89
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 31 deletions.
27 changes: 7 additions & 20 deletions src/agrpc/detail/atomic_intrusive_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,29 +89,16 @@ class AtomicIntrusiveQueue
return old_value == inactive;
}

[[nodiscard]] bool try_mark_inactive() noexcept
{
void* const inactive = producer_inactive_value();
if (void* old_value = head_.load(std::memory_order_relaxed); old_value == nullptr)
{
return head_.compare_exchange_strong(old_value, inactive, std::memory_order_release,
std::memory_order_relaxed);
}
return false;
}

// Atomically either mark the producer as inactive if the queue was empty
// or dequeue pending items from the queue.
//
// Not valid to call if the producer is already marked as inactive.
[[nodiscard]] detail::IntrusiveQueue<Item> try_mark_inactive_or_dequeue_all() noexcept
[[nodiscard]] bool dequeue_all_and_try_mark_inactive(detail::IntrusiveQueue<Item>& output) noexcept
{
if (try_mark_inactive())
{
return {};
}
void* const old_value = head_.exchange(nullptr, std::memory_order_acquire);
return detail::IntrusiveQueue<Item>::make_reversed(static_cast<Item*>(old_value));
void* const inactive = producer_inactive_value();
void* expect_empty = nullptr;
const bool marked_inactive =
head_.compare_exchange_strong(expect_empty, inactive, std::memory_order_release, std::memory_order_relaxed);
output.append(detail::IntrusiveQueue<Item>::make_reversed(static_cast<Item*>(old_value)));
return marked_inactive;
}

private:
Expand Down
2 changes: 1 addition & 1 deletion src/agrpc/detail/grpc_context_implementation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ struct GrpcContextImplementation
static void push_resource(agrpc::GrpcContext& grpc_context, detail::ListablePoolResource& resource);

template <class Function>
static decltype(auto) visit_is_multithreaded(agrpc::GrpcContext& grpc_context, Function function);
static decltype(auto) visit_is_multithreaded(const agrpc::GrpcContext& grpc_context, Function function);
};

void process_grpc_tag(void* tag, detail::OperationResult result, agrpc::GrpcContext& grpc_context);
Expand Down
10 changes: 2 additions & 8 deletions src/agrpc/detail/grpc_context_implementation_definition.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,13 +136,7 @@ inline bool GrpcContextImplementation::move_remote_work_to_local_queue(
detail::GrpcContextThreadContext& context) noexcept
{
agrpc::GrpcContext& grpc_context = context.grpc_context_;
auto remote_work_queue = grpc_context.remote_work_queue_.try_mark_inactive_or_dequeue_all();
if (remote_work_queue.empty())
{
return false;
}
context.local_work_queue_.append(std::move(remote_work_queue));
return true;
return !grpc_context.remote_work_queue_.dequeue_all_and_try_mark_inactive(context.local_work_queue_);
}

inline bool GrpcContextImplementation::distribute_all_local_work_to_other_threads_but_one(
Expand Down Expand Up @@ -349,7 +343,7 @@ inline void GrpcContextImplementation::push_resource(agrpc::GrpcContext& grpc_co
}

template <class Function>
inline decltype(auto) GrpcContextImplementation::visit_is_multithreaded(agrpc::GrpcContext& grpc_context,
inline decltype(auto) GrpcContextImplementation::visit_is_multithreaded(const agrpc::GrpcContext& grpc_context,
Function function)
{
if (grpc_context.multithreaded_)
Expand Down
2 changes: 0 additions & 2 deletions src/agrpc/detail/intrusive_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@ class IntrusiveQueue

[[nodiscard]] bool empty() const noexcept { return head_ == nullptr; }

[[nodiscard]] bool has_exactly_one_element() const noexcept { return !empty() && head_ == tail_; }

[[nodiscard]] Item* pop_front() noexcept
{
Item* item = std::exchange(head_, head_->next_);
Expand Down

0 comments on commit 0910b89

Please sign in to comment.