Skip to content

Commit

Permalink
- NoBundler now extends BaseBundler
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Sep 24, 2024
1 parent 787f7a5 commit 110b40c
Showing 1 changed file with 9 additions and 56 deletions.
65 changes: 9 additions & 56 deletions src/org/jgroups/protocols/NoBundler.java
Original file line number Diff line number Diff line change
@@ -1,78 +1,31 @@
package org.jgroups.protocols;

import org.jgroups.Address;
import org.jgroups.Message;
import org.jgroups.PhysicalAddress;
import org.jgroups.logging.Log;
import org.jgroups.stack.MessageProcessingPolicy;
import org.jgroups.util.ByteArrayDataOutputStream;
import org.jgroups.util.Util;

import java.util.Objects;

import static org.jgroups.Message.TransientFlag.DONT_LOOPBACK;

/**
* Bundler which doesn't bundle :-) Can be used to measure the diff between bundling and non-bundling (e.g. at runtime)
* This bundler doesn't use a pool of buffers, but creates a new buffer every time a message is sent.
* @author Bela Ban
* @since 4.0
*/
public class NoBundler implements Bundler {
protected TP transport;
protected Log log;
protected MessageProcessingPolicy msg_processing_policy;
protected MsgStats msg_stats;

public int size() {return 0;}
public int getMaxSize() {return 64000;}

public void init(TP transport) {
this.transport=Objects.requireNonNull(transport);
msg_processing_policy=transport.msgProcessingPolicy();
msg_stats=transport.getMessageStats();
log=transport.getLog();
}
public class NoBundler extends BaseBundler {
public int size() {return 0;}

public int getQueueSize() {
return -1;
}

public void start() {}
public void stop() {}

@Override
public void send(Message msg) throws Exception {
Address dst=msg.dest();
if(dst == null) { // multicast
sendSingleMessage(msg, null);
if(!msg.isFlagSet(DONT_LOOPBACK)) {
msg_stats.received(msg);
msg_processing_policy.loopback(msg, msg.isFlagSet(Message.Flag.OOB));
}
lock.lock();
try {
output.position(0);
sendSingle(msg.dest(), msg);
}
else { // unicast
boolean loopback=Objects.equals(transport.getAddress(), dst)
|| dst instanceof PhysicalAddress && dst.equals(transport.localPhysicalAddress());
if(loopback) {
if(!msg.isFlagSet(DONT_LOOPBACK)) {
msg_stats.received(msg);
msg_processing_policy.loopback(msg, msg.isFlagSet(Message.Flag.OOB));
}
}
else {
sendSingleMessage(msg, null);
}
finally {
lock.unlock();
}
}

protected void sendSingleMessage(final Message msg, ByteArrayDataOutputStream output) throws Exception {
if(output == null)
output=new ByteArrayDataOutputStream(msg.size() + 10);
Address dest=msg.getDest();
output.position(0);
Util.writeMessage(msg, output, dest == null);
transport.doSend(output.buffer(), 0, output.position(), dest);
transport.getMessageStats().sent(msg);
}

}

0 comments on commit 110b40c

Please sign in to comment.