Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
- Added percentiles
- Moved RTT from protocol to component in TP

- Added Message.putHeaderIfAbsent

- Added flag,index to TpHeader
- On reception of a message/batch, if TpHeader has a flag > 0, the message is passed to the RTT component

- Message.writeToNoAddrs() has no excluded headers anymore
  • Loading branch information
belaban committed Jul 2, 2024
1 parent 523ce0f commit 974a536
Show file tree
Hide file tree
Showing 21 changed files with 303 additions and 69 deletions.
42 changes: 25 additions & 17 deletions src/org/jgroups/BaseMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public Message clearFlag(Flag... flags) {
short tmp=this.flags;
for(Flag flag : flags)
if(flag != null)
tmp&=~flag.value();
tmp=(short)(tmp & ~flag.value());
this.flags=tmp;
}
return this;
Expand All @@ -134,7 +134,7 @@ public Message clearFlag(TransientFlag... flags) {
short tmp=this.transient_flags;
for(TransientFlag flag : flags)
if(flag != null)
tmp&=~flag.value();
tmp=(short)(tmp & ~flag.value());
this.transient_flags=(byte)tmp;
}
return this;
Expand Down Expand Up @@ -191,18 +191,12 @@ public Message copy(boolean copy_payload, boolean copy_headers) {

/** Puts a header given an ID into the hashmap. Overwrites potential existing entry. */
public Message putHeader(short id, Header hdr) {
if(id < 0)
throw new IllegalArgumentException("An ID of " + id + " is invalid");
if(hdr != null)
hdr.setProtId(id);
synchronized(this) {
if(this.headers == null)
this.headers=createHeaders(Util.DEFAULT_HEADERS);
Header[] resized_array=Headers.putHeader(this.headers, id, hdr, true);
if(resized_array != null)
this.headers=resized_array;
}
return this;
return putHeader(id, hdr, true);
}

@Override
public Message putHeaderIfAbsent(short id, Header hdr) {
return putHeader(id, hdr, false);
}

public <T extends Header> T getHeader(short id) {
Expand Down Expand Up @@ -271,13 +265,13 @@ public void writeTo(DataOutput out) throws IOException {
Util.writeAddress(sender, out);

// write the headers
Headers.writeHeaders(this.headers, out, (short[])null);
Headers.writeHeaders(this.headers, out);

// finally write the payload
writePayload(out);
}

public void writeToNoAddrs(Address src, DataOutput out, short... excluded_headers) throws IOException {
public void writeToNoAddrs(Address src, DataOutput out) throws IOException {
byte leading=0;

boolean write_src_addr=src == null || sender != null && !sender.equals(src);
Expand All @@ -296,7 +290,7 @@ public void writeToNoAddrs(Address src, DataOutput out, short... excluded_header
Util.writeAddress(sender, out);

// write the headers
Headers.writeHeaders(this.headers, out, excluded_headers);
Headers.writeHeaders(this.headers, out);

// finally write the payload
writePayload(out);
Expand All @@ -323,6 +317,20 @@ public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
readPayload(in);
}

protected Message putHeader(short id, Header hdr, boolean replace_if_present) {
if(id < 0)
throw new IllegalArgumentException("An ID of " + id + " is invalid");
if(hdr != null)
hdr.setProtId(id);
synchronized(this) {
if(this.headers == null)
this.headers=createHeaders(Util.DEFAULT_HEADERS);
Header[] resized_array=Headers.putHeader(this.headers, id, hdr, replace_if_present);
if(resized_array != null)
this.headers=resized_array;
}
return this;
}

/** Copies the payload */
protected Message copyPayload(Message copy) {
Expand Down
7 changes: 5 additions & 2 deletions src/org/jgroups/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ public interface Message extends SizeStreamable, Constructable<Message> {
/** Adds a header to the message */
Message putHeader(short id, Header hdr);

/** Adds a header to a message if not present */
Message putHeaderIfAbsent(short id, Header hdr);

/** Gets a header from the message */
<T extends Header> T getHeader(short id);

Expand Down Expand Up @@ -201,8 +204,8 @@ default Message setFlag(short flag, boolean transient_flags) {
*/
int size();

/** Writes the message to an output stream excluding the destination (and possibly source) address, plus a number of headers */
void writeToNoAddrs(Address src, DataOutput out, short... excluded_headers) throws IOException;
/** Writes the message to an output stream excluding the destination (and possibly source) address */
void writeToNoAddrs(Address src, DataOutput out) throws IOException;

void writePayload(DataOutput out) throws IOException;

Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/BaseBundler.java
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ protected void sendSingleMessage(final Message msg) {

protected void sendMessageList(final Address dest, final Address src, final List<Message> list) {
try {
Util.writeMessageList(dest, src, transport.cluster_name.chars(), list, output, dest == null, transport.getId());
Util.writeMessageList(dest, src, transport.cluster_name.chars(), list, output, dest == null);
transport.doSend(output.buffer(), 0, output.position(), dest);
transport.getMessageStats().incrNumBatchesSent();
}
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/BatchBundler.java
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ protected void sendMessageList(final Address dest, final Address src, final Mess
try {
output.position(0);
Util.writeMessageList(dest, src, transport.cluster_name.chars(), list, 0,
length, output, dest == null, transport.getId());
length, output, dest == null);
transport.doSend(output.buffer(), 0, output.position(), dest);
}
catch(Throwable e) {
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/PerDestinationBundler.java
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ protected void sendMessageList(final Address dest, final Address src, final Fast
output.position(0);
try {
Util.writeMessageList(dest, src, transport.cluster_name.chars(), list,
output, dest == null, transport.getId());
output, dest == null);
transport.doSend(output.buffer(), 0, output.position(), dest);
transport.getMessageStats().incrNumBatchesSent();
num_batches_sent.increment();
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/RingBufferBundler.java
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ protected int marshalMessagesToSameDestination(Address dest, Message[] buf,
num_msgs++;
buf[start_index]=null;
output.writeShort(msg.getType());
msg.writeToNoAddrs(msg.getSrc(), output, transport.getId());
msg.writeToNoAddrs(msg.getSrc(), output);
}
if(start_index == end_index)
break;
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/RingBufferBundlerLockless.java
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ protected int marshalMessagesToSameDestination(Address dest, Message[] buf,
num_msgs++;
buf[start_index]=null;
output.writeShort(msg.getType());
msg.writeToNoAddrs(msg.getSrc(), output, transport.getId());
msg.writeToNoAddrs(msg.getSrc(), output);
}
available_msgs--;
start_index=increment(start_index);
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/RingBufferBundlerLockless2.java
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ protected int marshalMessagesToSameDestination(Address dest, Message[] buf, fina
num_msgs++;
buf[i]=NULL_MSG;
output.writeShort(msg.getType());
msg.writeToNoAddrs(msg.getSrc(), output, transport.getId());
msg.writeToNoAddrs(msg.getSrc(), output);
}
}
return num_msgs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ protected void _sendBundledMessages() {
// since we assigned the matching destination we can do plain ==
if(msg != null && msg.getDest() == dest) {
output.writeShort(msg.getType());
msg.writeToNoAddrs(msg.getSrc(), output, transport.getId());
msg.writeToNoAddrs(msg.getSrc(), output);
msg_queue[i]=null;
}
}
Expand Down
23 changes: 21 additions & 2 deletions src/org/jgroups/protocols/TP.java
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,9 @@ public <T extends TP> T enableBlockingTimerTasks(boolean flag) {
@Component(name="diag")
protected DiagnosticsHandler diag_handler;

@Component
protected RTT rtt=new RTT();

/** The header including the cluster name, sent with each message */
protected TpHeader header;

Expand Down Expand Up @@ -452,6 +455,7 @@ protected TP() {
}

public MsgStats getMessageStats() {return msg_stats;}
public RTT getRTT() {return rtt;}

@Override
public void enableStats(boolean flag) {
Expand Down Expand Up @@ -777,6 +781,7 @@ public String toString() {
bundler=createBundler(bundler_type, getClass());
bundler.init(this);
}
rtt.init(this);
}


Expand Down Expand Up @@ -1035,7 +1040,7 @@ public Object down(Event evt) {
/** A message needs to be sent to a single member or all members */
public Object down(Message msg) {
if(header != null)
msg.putHeader(this.id, header); // added patch by Roland Kurmann (March 20, 2003)
msg.putHeaderIfAbsent(this.id, header); // added patch by Roland Kurmann (March 20, 2003)

setSourceAddress(msg); // very important !! listToBuffer() will fail with a null src address !!

Expand Down Expand Up @@ -1192,6 +1197,11 @@ public void passMessageUp(Message msg, byte[] cluster_name, boolean perform_clus
}
return;
}
TpHeader hdr=msg.getHeader(id);
if(hdr != null && hdr.flag() > 0) {
rtt.handleMessage(msg, hdr);
return;
}
up_prot.up(msg);
}

Expand All @@ -1218,7 +1228,16 @@ public void passBatchUp(MessageBatch batch, boolean perform_cluster_name_matchin

if(batch.multicast() && discard_own_mcast && local_addr != null && local_addr.equals(batch.sender()))
return;
up_prot.up(batch);
for(Iterator<Message> it=batch.iterator(); it.hasNext();) {
Message msg=it.next();
TpHeader hdr=msg.getHeader(id);
if(hdr != null && hdr.flag() > 0) {
it.remove();
rtt.handleMessage(msg, hdr);
}
}
if(!batch.isEmpty())
up_prot.up(batch);
}

protected boolean sameCluster(String req) {
Expand Down
36 changes: 34 additions & 2 deletions src/org/jgroups/protocols/TpHeader.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@
public class TpHeader extends Header {
protected byte[] cluster_name;

// the next 2 fields are used to measure round-trip times
public static final byte REQ=1, RSP=2;
protected byte flag; // default: 0, 1: RTT request, 2: RTT response (https://issues.redhat.com/browse/JGRP-2812)
protected int index; // used when flag > 0

public TpHeader() { // used for externalization
}

Expand All @@ -28,10 +33,31 @@ public TpHeader(String n) {
cluster_name[i]=(byte)n.charAt(i);
}

public TpHeader(String n, byte flag, int index) {
int len=n.length();
cluster_name=new byte[len];
for(int i=0; i < len; i++)
cluster_name[i]=(byte)n.charAt(i);
this.flag=flag;
this.index=index;
}

public TpHeader(AsciiString n) {
cluster_name=n != null? n.chars() : null;
}

public TpHeader(AsciiString n, byte flag, int index) {
cluster_name=n != null? n.chars() : null;
this.flag=flag;
this.index=index;
}

public byte[] getClusterName() {return cluster_name;}
public byte[] clusterName() {return cluster_name;}
public byte flag() {return flag;}
public int index() {return index;}


public TpHeader(byte[] n) {
cluster_name=n;
}
Expand All @@ -44,11 +70,11 @@ public String toString() {
return String.format("[cluster=%s]", cluster_name != null ? new String(cluster_name) : "null");
}

public byte[] getClusterName() {return cluster_name;}

@Override
public int serializedSize() {
return cluster_name != null? Global.SHORT_SIZE + cluster_name.length : Global.SHORT_SIZE;
int retval=cluster_name != null? Global.SHORT_SIZE + cluster_name.length : Global.SHORT_SIZE;
return retval+Byte.BYTES + (flag > 0? Integer.BYTES : 0);
}

@Override
Expand All @@ -57,6 +83,9 @@ public void writeTo(DataOutput out) throws IOException {
out.writeShort(length);
if(cluster_name != null)
out.write(cluster_name, 0, cluster_name.length);
out.writeByte(flag);
if(flag > 0)
out.writeInt(index);
}

@Override
Expand All @@ -66,5 +95,8 @@ public void readFrom(DataInput in) throws IOException {
cluster_name=new byte[len];
in.readFully(cluster_name, 0, cluster_name.length);
}
flag=in.readByte();
if(flag > 0)
index=in.readInt();
}
}
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/TransferQueueBundler2.java
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ private Buffer addMessage(Message msg, TP transport) throws IOException {
length_index=out.position() - Global.INT_SIZE;
}
out.writeShort(msg.getType());
msg.writeToNoAddrs(msg.src(), out, transport_id); // exclude the transport header
msg.writeToNoAddrs(msg.src(), out);
count++;
return this;
}
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/relay/RelayHeader.java
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public void writeTo(DataOutput out) throws IOException {
Bits.writeString(s, out);
}
assertNonNullSites();
Headers.writeHeaders(original_hdrs, out, (short[])null);
Headers.writeHeaders(original_hdrs, out);
out.writeShort(original_flags);
}

Expand Down
Loading

0 comments on commit 974a536

Please sign in to comment.