diff --git a/src/org/jgroups/protocols/BaseBundler.java b/src/org/jgroups/protocols/BaseBundler.java
index 345f899935..004e74cae7 100644
--- a/src/org/jgroups/protocols/BaseBundler.java
+++ b/src/org/jgroups/protocols/BaseBundler.java
@@ -8,18 +8,20 @@
 import org.jgroups.annotations.Property;
 import org.jgroups.conf.AttributeType;
 import org.jgroups.logging.Log;
+import org.jgroups.stack.MessageProcessingPolicy;
 import org.jgroups.util.AverageMinMax;
 import org.jgroups.util.ByteArrayDataOutputStream;
+import org.jgroups.util.MessageBatch;
 import org.jgroups.util.Util;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
 
+import static org.jgroups.Message.TransientFlag.DONT_LOOPBACK;
 import static org.jgroups.protocols.TP.MSG_OVERHEAD;
+import static org.jgroups.util.MessageBatch.Mode.OOB;
+import static org.jgroups.util.MessageBatch.Mode.REG;
 
 /**
  * Implements storing of messages in a hashmap and sending of single messages and message batches. Most bundler
@@ -31,6 +33,7 @@ public abstract class BaseBundler implements Bundler {
     /** Keys are destinations, values are lists of Messages */
     protected final Map<Address,List<Message>> msgs=new HashMap<>(24);
     protected TP                               transport;
+    protected MessageProcessingPolicy          msg_processing_policy;
     protected final ReentrantLock              lock=new ReentrantLock();
     protected @GuardedBy("lock") long          count;    // current number of bytes accumulated
     protected ByteArrayDataOutputStream        output;
@@ -48,18 +51,23 @@ public abstract class BaseBundler implements Bundler {
       type=AttributeType.SCALAR)
     protected int capacity=16384;
 
+    @Property(description="Whether loopback messages (dest == src or dest == null) are processed")
+    protected boolean process_loopbacks=true;
+
     @ManagedAttribute(description="Time (us) to send the bundled messages")
     protected final AverageMinMax avg_send_time=new AverageMinMax().unit(TimeUnit.NANOSECONDS);
 
-
-    public int getCapacity() {return capacity;}
-    public Bundler setCapacity(int c) {this.capacity=c; return this;}
-    public int getMaxSize() {return max_size;}
-    public Bundler setMaxSize(int s) {max_size=s; return this;}
+    public int     getCapacity()               {return capacity;}
+    public Bundler setCapacity(int c)          {this.capacity=c; return this;}
+    public int     getMaxSize()                {return max_size;}
+    public Bundler setMaxSize(int s)           {max_size=s; return this;}
+    public boolean processLoopbacks()          {return process_loopbacks;}
+    public Bundler processLoopbacks(boolean b) {process_loopbacks=b; return this;}
 
     public void init(TP transport) {
         this.transport=transport;
+        msg_processing_policy=transport.msgProcessingPolicy();
         log=transport.getLog();
         output=new ByteArrayDataOutputStream(max_size + MSG_OVERHEAD);
     }
@@ -104,12 +112,22 @@ public int getQueueSize() {
             List<Message> list=entry.getValue();
             if(list.isEmpty())
                 continue;
+            Address dst=entry.getKey();
+            boolean loopback=(dst == null) || Objects.equals(transport.getAddress(), dst);
             output.position(0);
-            if(list.size() == 1)
-                sendSingleMessage(list.get(0));
+
+            // System.out.printf("-- sending %d msgs to %s\n", list.size(), dst);
+
+            if(list.size() == 1) {
+                Message msg=list.get(0);
+                sendSingleMessage(msg);
+                if(process_loopbacks && loopback && !msg.isFlagSet(DONT_LOOPBACK) && transport.loopbackSeparateThread())
+                    transport.loopback(msg, msg.isFlagSet(Message.Flag.OOB));
+            }
             else {
-                Address dst=entry.getKey();
                 sendMessageList(dst, list.get(0).getSrc(), list);
+                if(process_loopbacks && loopback && transport.loopbackSeparateThread())
+                    loopback(dst, transport.getAddress(), list);
             }
             list.clear();
         }
@@ -120,6 +138,30 @@ public int getQueueSize() {
         }
     }
 
+    protected void loopback(Address dest, Address sender, List<Message> list) {
+
+
+
+        // TODO: reuse message batches, similar to ReliableMulticast.removeAndDeliver()
+
+        // TODO: implement loopback in other Bundlers (not extending this one), too
+
+
+        MessageBatch oob=new MessageBatch(dest, sender, transport.getClusterNameAscii(), dest == null, OOB, list.size());
+        MessageBatch reg=new MessageBatch(dest, sender, transport.getClusterNameAscii(), dest == null, REG, list.size());
+        for(Message msg: list) {
+            if(msg.isFlagSet(DONT_LOOPBACK))
+                continue;
+            if(msg.isFlagSet(Message.Flag.OOB))
+                oob.add(msg);
+            else
+                reg.add(msg);
+        }
+        if(!reg.isEmpty())
+            msg_processing_policy.loopback(reg, false);
+        if(!oob.isEmpty())
+            msg_processing_policy.loopback(oob, true);
+    }
 
     protected void sendSingleMessage(final Message msg) {
         Address dest=msg.getDest();
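The BaseBundler change above moves loopback handling down into the bundler: for every destination it determines whether the batch is a loopback (dst == null, i.e. multicast, or dst equals the local address) and, if so, hands the messages to the message-processing policy, split into OOB and regular MessageBatches. Below is a minimal sketch of toggling the new process_loopbacks property at runtime; the config file and cluster names are illustrative, while getProtocolStack(), getTransport() and getBundler() are existing JGroups API:

    import org.jgroups.JChannel;
    import org.jgroups.protocols.BaseBundler;
    import org.jgroups.protocols.Bundler;
    import org.jgroups.protocols.TP;

    public class BundlerLoopbackConfig {
        public static void main(String[] args) throws Exception {
            try(JChannel ch=new JChannel("udp.xml")) {    // stack config: illustrative name
                ch.connect("demo");                       // the bundler is created when the stack starts
                TP transport=ch.getProtocolStack().getTransport();
                Bundler bundler=transport.getBundler();
                if(bundler instanceof BaseBundler)        // only bundlers extending BaseBundler have the property
                    ((BaseBundler)bundler).processLoopbacks(false); // disable the bundler-side loopback path
            }
        }
    }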
diff --git a/src/org/jgroups/protocols/TP.java b/src/org/jgroups/protocols/TP.java
index f4c6d4c19c..b7cfc7689f 100644
--- a/src/org/jgroups/protocols/TP.java
+++ b/src/org/jgroups/protocols/TP.java
@@ -456,8 +456,9 @@ public <T extends TP> T enableBlockingTimerTasks(boolean flag) {
     protected TP() {
     }
 
-    public MsgStats getMessageStats() {return msg_stats;}
-    public RTT getRTT() {return rtt;}
+    public MsgStats                getMessageStats()     {return msg_stats;}
+    public MessageProcessingPolicy msgProcessingPolicy() {return msg_processing_policy;}
+    public RTT                     getRTT()              {return rtt;}
 
     @Override
     public void enableStats(boolean flag) {
@@ -1075,15 +1076,21 @@ public Object down(Message msg) {
 
         // Don't send if dest is local address. Instead, send it up the stack. If multicast message, loop back directly
         // to us (but still multicast). Once we receive this, we discard our own multicast message
-        boolean multicast=dest == null, do_send=multicast || !dest.equals(sender),
+        boolean multicast=dest == null,
           loop_back=(multicast || dest.equals(sender)) && !msg.isFlagSet(Message.TransientFlag.DONT_LOOPBACK);
 
-        if(dest instanceof PhysicalAddress && dest.equals(local_physical_addr)) {
+        if(dest instanceof PhysicalAddress && dest.equals(local_physical_addr))
             loop_back=true;
-            do_send=false;
+
+        if(loop_back && !loopback_separate_thread) {
+            final Message copy=loopback_copy? msg.copy(true, true) : msg;
+            if(is_trace)
+                log.trace("%s: looping back message %s, headers are %s", local_addr, copy, copy.printHeaders());
+            msg_stats.received(msg);
+            passMessageUp(copy, null, false, multicast, false);
         }
 
-        if(loopback_separate_thread) {
+        /* if(loopback_separate_thread) {
             if(loop_back)
                 loopback(msg, multicast);
             if(do_send)
@@ -1094,7 +1101,8 @@ public Object down(Message msg) {
             _send(msg, dest);
             if(loop_back)
                 loopback(msg, multicast);
-        }
+        }*/
+        _send(msg, dest);
 
         return null;
     }
@@ -1164,7 +1172,7 @@ protected void loopback(Message msg, final boolean multicast) {
             return;
         }
         // changed to fix https://issues.redhat.com/browse/JGRP-506
-        msg_processing_policy.loopback(msg, msg.isFlagSet(Message.Flag.OOB));
+        msg_processing_policy.loopback(copy, msg.isFlagSet(Message.Flag.OOB));
     }
 
     protected void _send(Message msg, Address dest) {
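In TP.down() above, sending and looping back are now decoupled: _send() is always called, the do_send flag is gone, and when loopback_separate_thread is false the copy is passed up inline on the caller's thread (the old dispatch block is kept commented out for reference). Messages carrying the DONT_LOOPBACK transient flag still skip self-delivery on every path, in the bundler as well as here. A short sketch of that flag in use; ObjectMessage, setFlag() and send() are existing JGroups 5 API, the channel setup is illustrative:

    import org.jgroups.JChannel;
    import org.jgroups.Message;
    import org.jgroups.ObjectMessage;

    public class DontLoopbackExample {
        public static void main(String[] args) throws Exception {
            try(JChannel ch=new JChannel("udp.xml").name("A")) {  // illustrative config
                ch.connect("demo");
                // dest == null means multicast; DONT_LOOPBACK keeps the sender from
                // delivering its own message back to itself
                Message msg=new ObjectMessage(null, "hello")
                  .setFlag(Message.TransientFlag.DONT_LOOPBACK);
                ch.send(msg);
            }
        }
    }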
diff --git a/src/org/jgroups/stack/MessageProcessingPolicy.java b/src/org/jgroups/stack/MessageProcessingPolicy.java
index f2de20045d..65ce850d31 100644
--- a/src/org/jgroups/stack/MessageProcessingPolicy.java
+++ b/src/org/jgroups/stack/MessageProcessingPolicy.java
@@ -33,6 +33,8 @@ default void destroy() {}
      */
     boolean loopback(Message msg, boolean oob);
 
+    boolean loopback(MessageBatch batch, boolean oob);
+
     /**
      * Process a message received from the transport
     * @param msg the message
diff --git a/src/org/jgroups/util/MaxOneThreadPerSender.java b/src/org/jgroups/util/MaxOneThreadPerSender.java
index dd1c7707eb..a401587cbf 100644
--- a/src/org/jgroups/util/MaxOneThreadPerSender.java
+++ b/src/org/jgroups/util/MaxOneThreadPerSender.java
@@ -267,12 +267,11 @@ public String toString() {
     }
 
-    public class BatchHandlerLoop extends BatchHandler {
+    protected class BatchHandlerLoop extends BatchHandler {
         protected final Entry entry;
-        protected final boolean loopback;
 
-        public BatchHandlerLoop(MessageBatch batch, Entry entry, boolean loopback) {
-            super(batch);
+        protected BatchHandlerLoop(MessageBatch batch, Entry entry, boolean loopback) {
+            super(batch, loopback);
             this.entry=entry;
             this.loopback=loopback;
         }
 
@@ -289,9 +288,5 @@ public void run() {
             while(entry.workAvailable(this.batch)); // transfers msgs from entry.batch --> this.batch
             // worker termination: workAvailable() already set running=false
         }
-
-        @Override protected void passBatchUp() {
-            tp.passBatchUp(batch, !loopback, !loopback);
-        }
     }
 }
diff --git a/src/org/jgroups/util/PassRegularMessagesUpDirectly.java b/src/org/jgroups/util/PassRegularMessagesUpDirectly.java
index 9927c6f904..783f2e149b 100644
--- a/src/org/jgroups/util/PassRegularMessagesUpDirectly.java
+++ b/src/org/jgroups/util/PassRegularMessagesUpDirectly.java
@@ -31,7 +31,7 @@ public boolean process(Message msg, boolean oob) {
     public boolean process(MessageBatch batch, boolean oob) {
         if(oob)
             return super.process(batch, oob);
-        BatchHandler bh=new BatchHandler(batch);
+        BatchHandler bh=new BatchHandler(batch, false);
         bh.run();
         return true;
     }
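The interface change above is breaking for third-party policies: MessageProcessingPolicy now requires a batch variant of loopback(), and the handler classes gain a loopback flag that BatchHandler.passBatchUp() negates into the two passBatchUp() arguments, so loopback batches skip the checks applied to batches arriving from the wire. A hypothetical custom policy that delivers loopback batches on the caller's thread instead of the thread pool; the class name is made up, the protected tp field and passBatchUp() signature are taken from the code in this patch, and the meaning of the two boolean flags (cluster-name matching, discarding our own multicast) is an assumption:

    import org.jgroups.util.MessageBatch;
    import org.jgroups.util.SubmitToThreadPool;

    // Hypothetical policy: loopback batches bypass the thread pool entirely
    public class SyncLoopbackPolicy extends SubmitToThreadPool {
        @Override
        public boolean loopback(MessageBatch batch, boolean oob) {
            // same flags as BatchHandler with loopback=true: !loopback -> false, false
            tp.passBatchUp(batch, false, false);
            return true;
        }
    }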
diff --git a/src/org/jgroups/util/SubmitToThreadPool.java b/src/org/jgroups/util/SubmitToThreadPool.java
index c6f3a7c10e..09aef08198 100644
--- a/src/org/jgroups/util/SubmitToThreadPool.java
+++ b/src/org/jgroups/util/SubmitToThreadPool.java
@@ -31,6 +31,15 @@ public boolean loopback(Message msg, boolean oob) {
         return tp.getThreadPool().execute(new SingleLoopbackHandler(msg));
     }
 
+    public boolean loopback(MessageBatch batch, boolean oob) {
+        if(oob) {
+            boolean removed=removeAndDispatchNonBundledMessages(batch);
+            if(removed && batch.isEmpty())
+                return true;
+        }
+        return tp.getThreadPool().execute(new BatchHandler(batch, true));
+    }
+
     public boolean process(Message msg, boolean oob) {
         return tp.getThreadPool().execute(new SingleMessageHandler(msg));
     }
@@ -41,7 +50,7 @@ public boolean process(MessageBatch batch, boolean oob) {
             if(removed && batch.isEmpty())
                 return true;
         }
-        return tp.getThreadPool().execute(new BatchHandler(batch));
+        return tp.getThreadPool().execute(new BatchHandler(batch, false));
     }
 
@@ -120,9 +129,11 @@ protected SingleMessageHandlerWithClusterName(Message msg, byte[] cluster_name)
 
     public class BatchHandler implements Runnable {
         protected MessageBatch batch;
+        protected boolean      loopback;
 
-        public BatchHandler(final MessageBatch batch) {
+        public BatchHandler(final MessageBatch batch, boolean loopback) {
             this.batch=batch;
+            this.loopback=loopback;
         }
 
         public MessageBatch getBatch() {return batch;}
@@ -134,7 +145,7 @@ public void run() {
         }
 
         protected void passBatchUp() {
-            tp.passBatchUp(batch, true, true);
+            tp.passBatchUp(batch, !loopback, !loopback);
        }
    }
}
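Taken together: with loopback_separate_thread=true (the default) a message whose destination is null or the local address is now queued in the bundler like any other message and then handed to msg_processing_policy.loopback(), batched by OOB/regular mode; with loopback_separate_thread=false it is delivered inline in TP.down(). A quick end-to-end sketch, with the config file and cluster name again illustrative:

    import org.jgroups.JChannel;
    import org.jgroups.Message;
    import org.jgroups.ObjectMessage;
    import org.jgroups.Receiver;

    public class LoopbackDemo {
        public static void main(String[] args) throws Exception {
            try(JChannel ch=new JChannel("udp.xml").name("A")) {
                ch.setReceiver(new Receiver() {
                    @Override public void receive(Message msg) {
                        System.out.println("<- " + msg.getObject());
                    }
                });
                ch.connect("loopback-demo");
                ch.send(new ObjectMessage(ch.getAddress(), "to-self")); // dest == src: loopback path
                ch.send(new ObjectMessage(null, "to-all"));             // multicast also loops back
                Thread.sleep(500); // crude wait: loopback delivery is asynchronous here
            }
        }
    }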