Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-22560 Processing messages to yourself in a thread pool #3976

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ default CompletableFuture<Void> send(ClusterNode recipient, NetworkMessage msg)
* exist.
*
* @param recipient Recipient of the message.
* @param msg Message which should be delivered.
* @param channelType Channel which will be used to message transfer.
* @param msg Message which should be delivered.
* @return Future of the send operation.
*/
CompletableFuture<Void> send(ClusterNode recipient, ChannelType channelType, NetworkMessage msg);
Expand All @@ -97,7 +98,8 @@ default CompletableFuture<Void> send(ClusterNode recipient, NetworkMessage msg)
* exist.
*
* @param recipientConsistentId Consistent ID of the recipient of the message.
* @param msg Message which should be delivered.
* @param channelType Channel which will be used to message transfer.
* @param msg Message which should be delivered.
* @return Future of the send operation.
*/
CompletableFuture<Void> send(String recipientConsistentId, ChannelType channelType, NetworkMessage msg);
Expand All @@ -119,8 +121,9 @@ default CompletableFuture<Void> respond(ClusterNode recipient, NetworkMessage ms
* Sends a response to a {@link #invoke} request.
* Guarantees are the same as for the {@link #send(ClusterNode, NetworkMessage)}.
*
* @param recipient Recipient of the message.
* @param msg Message which should be delivered.
* @param recipient Recipient of the message.
* @param channelType Channel which will be used to message transfer.
* @param msg Message which should be delivered.
* @param correlationId Correlation id when replying to the request.
* @return Future of the send operation.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,9 @@ public CompletableFuture<NetworkMessage> invoke(String recipientConsistentId, Ch
* Sends a message. If the target is the current node, then message will be delivered immediately.
*
* @param recipient Target cluster node.
* @param type Channel which will be used to message transfer.
* @param msg Message.
* @param correlationId Correlation id. Not null iff the message is a response to a {@link #invoke} request.
* @param correlationId Correlation id. Not {@code null} iff the message is a response to a {@link #invoke} request.
* @return Future of the send operation.
*/
private CompletableFuture<Void> send0(ClusterNode recipient, ChannelType type, NetworkMessage msg, @Nullable Long correlationId) {
Expand All @@ -237,9 +238,11 @@ private CompletableFuture<Void> send0(ClusterNode recipient, ChannelType type, N

if (recipientAddress == null) {
if (correlationId != null) {
onInvokeResponse(msg, correlationId);
// Choice of a thread pool was intentionally made to increase the system throughput.
Executor executor = chooseExecutorInInboundPoolToSendSelf(type, msg);
executor.execute(() -> onInvokeResponse(msg, correlationId));
} else {
sendToSelf(msg, null);
sendToSelf(msg, type, null);
}

return nullCompletedFuture();
Expand Down Expand Up @@ -284,7 +287,7 @@ private CompletableFuture<NetworkMessage> invoke0(ClusterNode recipient, Channel
InetSocketAddress recipientAddress = resolveRecipientAddress(recipient);

if (recipientAddress == null) {
sendToSelf(msg, correlationId);
sendToSelf(msg, type, correlationId);

return responseFuture;
}
Expand Down Expand Up @@ -346,17 +349,19 @@ private List<ClassDescriptorMessage> prepareMarshal(NetworkMessage msg) throws E
* Sends a message to the current node.
*
* @param message Message.
* @param correlationId Correlation id.
* @param channelType Channel which will be used to message transfer.
* @param correlationId Correlation ID. Not {@code null} iff the message is a response to a {@link #invoke} request.
*/
private void sendToSelf(NetworkMessage message, @Nullable Long correlationId) {
private void sendToSelf(NetworkMessage message, ChannelType channelType, @Nullable Long correlationId) {
List<HandlerContext> handlerContexts = getHandlerContexts(message.groupType());

// Specially made by a classic loop for optimization.
for (int i = 0; i < handlerContexts.size(); i++) {
HandlerContext handlerContext = handlerContexts.get(i);

// Invoking on the same thread, ignoring the executor chooser registered with the handler.
handlerContext.handler().onReceived(message, topologyService.localMember(), correlationId);
// Choice of a thread pool was intentionally made to increase the system throughput.
Executor executor = chooseExecutorInInboundPoolToSendSelf(channelType, message);
executor.execute(() -> handlerContext.handler().onReceived(message, topologyService.localMember(), correlationId));
}
}

Expand Down Expand Up @@ -705,4 +710,18 @@ protected void onStoppingInitiated() {
private static InetSocketAddress createResolved(NetworkAddress address) {
return new InetSocketAddress(address.host(), address.port());
}

private Executor chooseExecutorInInboundPoolToSendSelf(ChannelType channelType, NetworkMessage message) {
int stripeIndex = safeAbs(message.getClass().hashCode());

return inboundExecutors.executorFor(channelType.id(), stripeIndex);
}

private Executor chooseExecutorForSendSelf(NetworkMessage payload, ChannelType channelType, ExecutorChooser<NetworkMessage> chooser) {
if (wantsInboundPool(chooser)) {
return chooseExecutorInInboundPoolToSendSelf(channelType, payload);
} else {
return chooser.choose(payload);
}
}
}