diff --git a/src/org/jgroups/protocols/NoBundler.java b/src/org/jgroups/protocols/NoBundler.java index b0701dad81..8f8bd6de79 100644 --- a/src/org/jgroups/protocols/NoBundler.java +++ b/src/org/jgroups/protocols/NoBundler.java @@ -1,16 +1,6 @@ package org.jgroups.protocols; -import org.jgroups.Address; import org.jgroups.Message; -import org.jgroups.PhysicalAddress; -import org.jgroups.logging.Log; -import org.jgroups.stack.MessageProcessingPolicy; -import org.jgroups.util.ByteArrayDataOutputStream; -import org.jgroups.util.Util; - -import java.util.Objects; - -import static org.jgroups.Message.TransientFlag.DONT_LOOPBACK; /** * Bundler which doesn't bundle :-) Can be used to measure the diff between bundling and non-bundling (e.g. at runtime) @@ -18,61 +8,24 @@ * @author Bela Ban * @since 4.0 */ -public class NoBundler implements Bundler { - protected TP transport; - protected Log log; - protected MessageProcessingPolicy msg_processing_policy; - protected MsgStats msg_stats; - - public int size() {return 0;} - public int getMaxSize() {return 64000;} - - public void init(TP transport) { - this.transport=Objects.requireNonNull(transport); - msg_processing_policy=transport.msgProcessingPolicy(); - msg_stats=transport.getMessageStats(); - log=transport.getLog(); - } +public class NoBundler extends BaseBundler { + public int size() {return 0;} public int getQueueSize() { return -1; } - public void start() {} - public void stop() {} - + @Override public void send(Message msg) throws Exception { - Address dst=msg.dest(); - if(dst == null) { // multicast - sendSingleMessage(msg, null); - if(!msg.isFlagSet(DONT_LOOPBACK)) { - msg_stats.received(msg); - msg_processing_policy.loopback(msg, msg.isFlagSet(Message.Flag.OOB)); - } + lock.lock(); + try { + output.position(0); + sendSingle(msg.dest(), msg); } - else { // unicast - boolean loopback=Objects.equals(transport.getAddress(), dst) - || dst instanceof PhysicalAddress && dst.equals(transport.localPhysicalAddress()); - if(loopback) { - if(!msg.isFlagSet(DONT_LOOPBACK)) { - msg_stats.received(msg); - msg_processing_policy.loopback(msg, msg.isFlagSet(Message.Flag.OOB)); - } - } - else { - sendSingleMessage(msg, null); - } + finally { + lock.unlock(); } } - protected void sendSingleMessage(final Message msg, ByteArrayDataOutputStream output) throws Exception { - if(output == null) - output=new ByteArrayDataOutputStream(msg.size() + 10); - Address dest=msg.getDest(); - output.position(0); - Util.writeMessage(msg, output, dest == null); - transport.doSend(output.buffer(), 0, output.position(), dest); - transport.getMessageStats().sent(msg); - } } \ No newline at end of file