Skip to content

Commit

Permalink
- First shot at implementing loopback handling in the bundler (https:…
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Sep 6, 2024
1 parent 43db691 commit 84ab0f2
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 32 deletions.
66 changes: 54 additions & 12 deletions src/org/jgroups/protocols/BaseBundler.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,20 @@
import org.jgroups.annotations.Property;
import org.jgroups.conf.AttributeType;
import org.jgroups.logging.Log;
import org.jgroups.stack.MessageProcessingPolicy;
import org.jgroups.util.AverageMinMax;
import org.jgroups.util.ByteArrayDataOutputStream;
import org.jgroups.util.MessageBatch;
import org.jgroups.util.Util;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

import static org.jgroups.Message.TransientFlag.DONT_LOOPBACK;
import static org.jgroups.protocols.TP.MSG_OVERHEAD;
import static org.jgroups.util.MessageBatch.Mode.OOB;
import static org.jgroups.util.MessageBatch.Mode.REG;

/**
* Implements storing of messages in a hashmap and sending of single messages and message batches. Most bundler
Expand All @@ -31,6 +33,7 @@ public abstract class BaseBundler implements Bundler {
/** Keys are destinations, values are lists of Messages */
protected final Map<Address,List<Message>> msgs=new HashMap<>(24);
protected TP transport;
protected MessageProcessingPolicy msg_processing_policy;
protected final ReentrantLock lock=new ReentrantLock();
protected @GuardedBy("lock") long count; // current number of bytes accumulated
protected ByteArrayDataOutputStream output;
Expand All @@ -48,18 +51,23 @@ public abstract class BaseBundler implements Bundler {
type=AttributeType.SCALAR)
protected int capacity=16384;

@Property(description="Whether loopback messages (dest == src or dest == null) are processed")
protected boolean process_loopbacks=true;

@ManagedAttribute(description="Time (us) to send the bundled messages")
protected final AverageMinMax avg_send_time=new AverageMinMax().unit(TimeUnit.NANOSECONDS);



public int getCapacity() {return capacity;}
public Bundler setCapacity(int c) {this.capacity=c; return this;}
public int getMaxSize() {return max_size;}
public Bundler setMaxSize(int s) {max_size=s; return this;}
public int getCapacity() {return capacity;}
public Bundler setCapacity(int c) {this.capacity=c; return this;}
public int getMaxSize() {return max_size;}
public Bundler setMaxSize(int s) {max_size=s; return this;}
public boolean processLoopbacks() {return process_loopbacks;}
public Bundler processLoopbacks(boolean b) {process_loopbacks=b; return this;}

public void init(TP transport) {
this.transport=transport;
msg_processing_policy=transport.msgProcessingPolicy();
log=transport.getLog();
output=new ByteArrayDataOutputStream(max_size + MSG_OVERHEAD);
}
Expand Down Expand Up @@ -104,12 +112,22 @@ public int getQueueSize() {
List<Message> list=entry.getValue();
if(list.isEmpty())
continue;
Address dst=entry.getKey();
boolean loopback=(dst == null) || Objects.equals(transport.getAddress(), dst);
output.position(0);
if(list.size() == 1)
sendSingleMessage(list.get(0));

// System.out.printf("-- sending %d msgs to %s\n", list.size(), dst);

if(list.size() == 1) {
Message msg=list.get(0);
sendSingleMessage(msg);
if(process_loopbacks && loopback && !msg.isFlagSet(DONT_LOOPBACK) && transport.loopbackSeparateThread())
transport.loopback(msg, msg.isFlagSet(Message.Flag.OOB));
}
else {
Address dst=entry.getKey();
sendMessageList(dst, list.get(0).getSrc(), list);
if(process_loopbacks && loopback && transport.loopbackSeparateThread())
loopback(dst, transport.getAddress(), list);
}
list.clear();
}
Expand All @@ -120,6 +138,30 @@ public int getQueueSize() {
}
}

protected void loopback(Address dest, Address sender, List<Message> list) {



// TODO: reuse message batches, similar to ReliableMulticast.removeAndDeliver()

// TODO: implement loopback in other Bundlers (not extending this one), too


MessageBatch oob=new MessageBatch(dest, sender, transport.getClusterNameAscii(), dest == null, OOB, list.size());
MessageBatch reg=new MessageBatch(dest, sender, transport.getClusterNameAscii(), dest == null, REG, list.size());
for(Message msg: list) {
if(msg.isFlagSet(DONT_LOOPBACK))
continue;
if(msg.isFlagSet(Message.Flag.OOB))
oob.add(msg);
else
reg.add(msg);
}
if(!reg.isEmpty())
msg_processing_policy.loopback(reg, false);
if(!oob.isEmpty())
msg_processing_policy.loopback(oob, true);
}

protected void sendSingleMessage(final Message msg) {
Address dest=msg.getDest();
Expand Down
24 changes: 16 additions & 8 deletions src/org/jgroups/protocols/TP.java
Original file line number Diff line number Diff line change
Expand Up @@ -456,8 +456,9 @@ public <T extends TP> T enableBlockingTimerTasks(boolean flag) {
protected TP() {
}

public MsgStats getMessageStats() {return msg_stats;}
public RTT getRTT() {return rtt;}
public MsgStats getMessageStats() {return msg_stats;}
public MessageProcessingPolicy msgProcessingPolicy() {return msg_processing_policy;}
public RTT getRTT() {return rtt;}

@Override
public void enableStats(boolean flag) {
Expand Down Expand Up @@ -1075,15 +1076,21 @@ public Object down(Message msg) {

// Don't send if dest is local address. Instead, send it up the stack. If multicast message, loop back directly
// to us (but still multicast). Once we receive this, we discard our own multicast message
boolean multicast=dest == null, do_send=multicast || !dest.equals(sender),
boolean multicast=dest == null,
loop_back=(multicast || dest.equals(sender)) && !msg.isFlagSet(Message.TransientFlag.DONT_LOOPBACK);

if(dest instanceof PhysicalAddress && dest.equals(local_physical_addr)) {
if(dest instanceof PhysicalAddress && dest.equals(local_physical_addr))
loop_back=true;
do_send=false;

if(loop_back && !loopback_separate_thread) {
final Message copy=loopback_copy? msg.copy(true, true) : msg;
if(is_trace)
log.trace("%s: looping back message %s, headers are %s", local_addr, copy, copy.printHeaders());
msg_stats.received(msg);
passMessageUp(copy, null, false, multicast, false);
}

if(loopback_separate_thread) {
/* if(loopback_separate_thread) {
if(loop_back)
loopback(msg, multicast);
if(do_send)
Expand All @@ -1094,7 +1101,8 @@ public Object down(Message msg) {
_send(msg, dest);
if(loop_back)
loopback(msg, multicast);
}
}*/
_send(msg, dest);
return null;
}

Expand Down Expand Up @@ -1164,7 +1172,7 @@ protected void loopback(Message msg, final boolean multicast) {
return;
}
// changed to fix https://issues.redhat.com/browse/JGRP-506
msg_processing_policy.loopback(msg, msg.isFlagSet(Message.Flag.OOB));
msg_processing_policy.loopback(copy, msg.isFlagSet(Message.Flag.OOB));
}

protected void _send(Message msg, Address dest) {
Expand Down
2 changes: 2 additions & 0 deletions src/org/jgroups/stack/MessageProcessingPolicy.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ default void destroy() {}
*/
boolean loopback(Message msg, boolean oob);

boolean loopback(MessageBatch batch, boolean oob);

/**
* Process a message received from the transport
* @param msg the message
Expand Down
11 changes: 3 additions & 8 deletions src/org/jgroups/util/MaxOneThreadPerSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -267,12 +267,11 @@ public String toString() {
}


public class BatchHandlerLoop extends BatchHandler {
protected class BatchHandlerLoop extends BatchHandler {
protected final Entry entry;
protected final boolean loopback;

public BatchHandlerLoop(MessageBatch batch, Entry entry, boolean loopback) {
super(batch);
protected BatchHandlerLoop(MessageBatch batch, Entry entry, boolean loopback) {
super(batch, loopback);
this.entry=entry;
this.loopback=loopback;
}
Expand All @@ -289,9 +288,5 @@ public void run() {
while(entry.workAvailable(this.batch)); // transfers msgs from entry.batch --> this.batch
// worker termination: workAvailable() already set running=false
}

@Override protected void passBatchUp() {
tp.passBatchUp(batch, !loopback, !loopback);
}
}
}
2 changes: 1 addition & 1 deletion src/org/jgroups/util/PassRegularMessagesUpDirectly.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public boolean process(Message msg, boolean oob) {
public boolean process(MessageBatch batch, boolean oob) {
if(oob)
return super.process(batch, oob);
BatchHandler bh=new BatchHandler(batch);
BatchHandler bh=new BatchHandler(batch, false);
bh.run();
return true;
}
Expand Down
17 changes: 14 additions & 3 deletions src/org/jgroups/util/SubmitToThreadPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,15 @@ public boolean loopback(Message msg, boolean oob) {
return tp.getThreadPool().execute(new SingleLoopbackHandler(msg));
}

public boolean loopback(MessageBatch batch, boolean oob) {
if(oob) {
boolean removed=removeAndDispatchNonBundledMessages(batch);
if(removed && batch.isEmpty())
return true;
}
return tp.getThreadPool().execute(new BatchHandler(batch, true));
}

public boolean process(Message msg, boolean oob) {
return tp.getThreadPool().execute(new SingleMessageHandler(msg));
}
Expand All @@ -41,7 +50,7 @@ public boolean process(MessageBatch batch, boolean oob) {
if(removed && batch.isEmpty())
return true;
}
return tp.getThreadPool().execute(new BatchHandler(batch));
return tp.getThreadPool().execute(new BatchHandler(batch, false));
}


Expand Down Expand Up @@ -120,9 +129,11 @@ protected SingleMessageHandlerWithClusterName(Message msg, byte[] cluster_name)

public class BatchHandler implements Runnable {
protected MessageBatch batch;
protected boolean loopback;

public BatchHandler(final MessageBatch batch) {
public BatchHandler(final MessageBatch batch, boolean loopback) {
this.batch=batch;
this.loopback=loopback;
}

public MessageBatch getBatch() {return batch;}
Expand All @@ -134,7 +145,7 @@ public void run() {
}

protected void passBatchUp() {
tp.passBatchUp(batch, true, true);
tp.passBatchUp(batch, !loopback, !loopback);
}
}

Expand Down

0 comments on commit 84ab0f2

Please sign in to comment.