Skip to content

Commit

Permalink
Created new MessageProcessingPolicy 'UnbatchOOBBatches' (https://issu…
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed May 14, 2024
1 parent 579fa1e commit 6729964
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 8 deletions.
11 changes: 8 additions & 3 deletions src/org/jgroups/protocols/TP.java
Original file line number Diff line number Diff line change
Expand Up @@ -270,9 +270,14 @@ public void setMessageProcessingPolicy(String policy) {
if(policy == null)
return;

msg_processing_policy=policy.startsWith("submit")? new SubmitToThreadPool() :
policy.startsWith("max")? new MaxOneThreadPerSender() :
policy.startsWith("direct")? new PassRegularMessagesUpDirectly() : null;
if(policy.startsWith("submit"))
msg_processing_policy=new SubmitToThreadPool();
else if(policy.startsWith("max"))
msg_processing_policy=new MaxOneThreadPerSender();
else if(policy.startsWith("direct"))
msg_processing_policy=new PassRegularMessagesUpDirectly();
else if(policy.startsWith("unbatch"))
msg_processing_policy=new UnbatchOOBBatches();
try {
if(msg_processing_policy == null) {
Class<MessageProcessingPolicy> clazz=(Class<MessageProcessingPolicy>)Util.loadClass(policy, getClass());
Expand Down
24 changes: 24 additions & 0 deletions src/org/jgroups/util/UnbatchOOBBatches.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package org.jgroups.util;

import org.jgroups.Message;

/**
* Same as {@link MaxOneThreadPerSender}, but for OOB message batches, every message of the batch is passed to the
* thread pool separately (https://issues.redhat.com/browse/JGRP-2800).
* @author Bela Ban
* @since 5.4, 5.3.7
*/
public class UnbatchOOBBatches extends MaxOneThreadPerSender {

@Override
public boolean process(MessageBatch batch, boolean oob) {
if(!oob)
return super.process(batch, oob);
AsciiString tmp=batch.clusterName();
byte[] cname=tmp != null? tmp.chars() : null;
for(Message msg: batch)
tp.getThreadPool().execute(new SingleMessageHandlerWithClusterName(msg, cname));
batch.clear();
return true;
}
}
51 changes: 46 additions & 5 deletions tests/junit-functional/org/jgroups/protocols/UNBATCH_Test.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import org.jgroups.protocols.pbcast.NAKACK2;
import org.jgroups.protocols.pbcast.STABLE;
import org.jgroups.stack.Protocol;
import org.jgroups.stack.ProtocolStack;
import org.jgroups.util.MessageBatch;
import org.jgroups.util.Util;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -37,9 +38,20 @@ public class UNBATCH_Test {

/** Tests that all unicasts sent by A to B are received as single messages by B */
public void testUnicastSingleMessages() throws Exception {
Address target=b.getAddress();
for(int i=1; i <= 100; i++)
a.send(target, i);
sendMessages(b.getAddress(), false);
Util.waitUntil(5000, 100, () -> rb.numMsgs() == 100, () -> print(b));
System.out.printf("msgs:\n%s\n", print(b));
assert rb.numSingleMsgs() == 100;
assert rb.numBatches() == 0;
}

/**
* Tests that no unicast OOB message batches are received with message processing policy being
* {@link org.jgroups.util.UnbatchOOBBatches}
*/
public void testUnicastBatchesWithUnbatchPolicy() throws Exception {
setUnbatchPolicy(a,b);
sendMessages(b.getAddress(), true);
Util.waitUntil(5000, 100, () -> rb.numMsgs() == 100, () -> print(b));
System.out.printf("msgs:\n%s\n", print(b));
assert rb.numSingleMsgs() == 100;
Expand All @@ -48,14 +60,35 @@ public void testUnicastSingleMessages() throws Exception {

/** Tests that all multicasts sent by A to B are received as single messages by A and B */
public void testMulticastSingleMessages() throws Exception {
for(int i=1; i <= 100; i++)
a.send(null, i);
sendMessages(null, false);
Util.waitUntil(5000, 100, () -> ra.numMsgs() == 100 && rb.numMsgs() == 100, () -> print(a,b));
System.out.printf("msgs:\n%s\n", print(a,b));
assert ra.numSingleMsgs() == 100 && rb.numSingleMsgs() == 100;
assert ra.numBatches() == 0 && rb.numBatches() == 0;
}


public void testMulticastBatchesWithUnbatchPolicy() throws Exception {
setUnbatchPolicy(a,b);
sendMessages(null, true);
Util.waitUntil(500000, 100, () -> ra.numMsgs() == 100 && rb.numMsgs() == 100, () -> print(a,b));
System.out.printf("msgs:\n%s\n", print(a,b));
assert rb.numBatches() == 0 && rb.numSingleMsgs() == 100;
// we *cannot* assert that A doesn't get looped-back batches:
// * NAKACK2.down(Message msg) adds msg to the table, then loops back if dest==sender
// * NAKACK2.handleMessage() delivers an OOB message, then calls removeAndDeliver(): when a message was added
// to the table, but not yet marked as OOB_DELIVERED, an OOB batch may be created
}

protected void sendMessages(Address target, boolean oob) throws Exception {
for(int i=1; i <= 100; i++) {
Message msg=new ObjectMessage(target, i);
if(oob)
msg.setFlag(Message.Flag.OOB);
a.send(msg);
}
}

protected static String print(JChannel... channels) {
return Stream.of(channels).map(ch -> String.format("%s: %s", ch.getAddress(), ch.getReceiver()))
.collect(Collectors.joining("\n"));
Expand All @@ -74,6 +107,14 @@ protected static JChannel create(String name) throws Exception {
return new JChannel(prots).name(name);
}

protected static void setUnbatchPolicy(JChannel ... channels) {
for(JChannel ch: channels) {
ProtocolStack stack=ch.stack();
stack.removeProtocol(UNBATCH.class);
stack.getTransport().setMessageProcessingPolicy("unbatch");
}
}

protected static class MyReceiver implements Receiver {
protected final LongAdder num_batches=new LongAdder();
protected final LongAdder num_single_msgs=new LongAdder();
Expand Down

0 comments on commit 6729964

Please sign in to comment.