From 974a5367141f8647752c6444c67b377cc5620e21 Mon Sep 17 00:00:00 2001 From: Bela Ban Date: Fri, 28 Jun 2024 11:46:28 +0200 Subject: [PATCH] - Impl of RTT (https://issues.redhat.com/browse/JGRP-2812) - Added percentiles - Moved RTT from protocol to component in TP - Added Message.putHeaderIfAbsent - Added flag,index to TpHeader - On reception of a message/batch, if TpHeader has a flag > 0, the message is passed to the RTT component - Message.writeToNoAddrs() has no excluded headers anymore --- src/org/jgroups/BaseMessage.java | 42 +++-- src/org/jgroups/Message.java | 7 +- src/org/jgroups/protocols/BaseBundler.java | 2 +- src/org/jgroups/protocols/BatchBundler.java | 2 +- .../protocols/PerDestinationBundler.java | 2 +- .../jgroups/protocols/RingBufferBundler.java | 2 +- .../protocols/RingBufferBundlerLockless.java | 2 +- .../protocols/RingBufferBundlerLockless2.java | 2 +- .../SimplifiedTransferQueueBundler.java | 2 +- src/org/jgroups/protocols/TP.java | 23 ++- src/org/jgroups/protocols/TpHeader.java | 36 +++- .../protocols/TransferQueueBundler2.java | 2 +- .../jgroups/protocols/relay/RelayHeader.java | 2 +- src/org/jgroups/util/AverageMinMax.java | 45 +++-- src/org/jgroups/util/Headers.java | 10 +- src/org/jgroups/util/RTT.java | 157 ++++++++++++++++++ src/org/jgroups/util/SubmitToThreadPool.java | 2 +- src/org/jgroups/util/Util.java | 18 +- .../org/jgroups/tests/HeadersResizeTest.java | 8 +- .../org/jgroups/tests/MessageBatchTest.java | 2 +- .../org/jgroups/tests/SizeTest.java | 4 +- 21 files changed, 303 insertions(+), 69 deletions(-) create mode 100644 src/org/jgroups/util/RTT.java diff --git a/src/org/jgroups/BaseMessage.java b/src/org/jgroups/BaseMessage.java index 61ca49edabc..0c6cf68655b 100644 --- a/src/org/jgroups/BaseMessage.java +++ b/src/org/jgroups/BaseMessage.java @@ -123,7 +123,7 @@ public Message clearFlag(Flag... flags) { short tmp=this.flags; for(Flag flag : flags) if(flag != null) - tmp&=~flag.value(); + tmp=(short)(tmp & ~flag.value()); this.flags=tmp; } return this; @@ -134,7 +134,7 @@ public Message clearFlag(TransientFlag... flags) { short tmp=this.transient_flags; for(TransientFlag flag : flags) if(flag != null) - tmp&=~flag.value(); + tmp=(short)(tmp & ~flag.value()); this.transient_flags=(byte)tmp; } return this; @@ -191,18 +191,12 @@ public Message copy(boolean copy_payload, boolean copy_headers) { /** Puts a header given an ID into the hashmap. Overwrites potential existing entry. */ public Message putHeader(short id, Header hdr) { - if(id < 0) - throw new IllegalArgumentException("An ID of " + id + " is invalid"); - if(hdr != null) - hdr.setProtId(id); - synchronized(this) { - if(this.headers == null) - this.headers=createHeaders(Util.DEFAULT_HEADERS); - Header[] resized_array=Headers.putHeader(this.headers, id, hdr, true); - if(resized_array != null) - this.headers=resized_array; - } - return this; + return putHeader(id, hdr, true); + } + + @Override + public Message putHeaderIfAbsent(short id, Header hdr) { + return putHeader(id, hdr, false); } public T getHeader(short id) { @@ -271,13 +265,13 @@ public void writeTo(DataOutput out) throws IOException { Util.writeAddress(sender, out); // write the headers - Headers.writeHeaders(this.headers, out, (short[])null); + Headers.writeHeaders(this.headers, out); // finally write the payload writePayload(out); } - public void writeToNoAddrs(Address src, DataOutput out, short... excluded_headers) throws IOException { + public void writeToNoAddrs(Address src, DataOutput out) throws IOException { byte leading=0; boolean write_src_addr=src == null || sender != null && !sender.equals(src); @@ -296,7 +290,7 @@ public void writeToNoAddrs(Address src, DataOutput out, short... excluded_header Util.writeAddress(sender, out); // write the headers - Headers.writeHeaders(this.headers, out, excluded_headers); + Headers.writeHeaders(this.headers, out); // finally write the payload writePayload(out); @@ -323,6 +317,20 @@ public void readFrom(DataInput in) throws IOException, ClassNotFoundException { readPayload(in); } + protected Message putHeader(short id, Header hdr, boolean replace_if_present) { + if(id < 0) + throw new IllegalArgumentException("An ID of " + id + " is invalid"); + if(hdr != null) + hdr.setProtId(id); + synchronized(this) { + if(this.headers == null) + this.headers=createHeaders(Util.DEFAULT_HEADERS); + Header[] resized_array=Headers.putHeader(this.headers, id, hdr, replace_if_present); + if(resized_array != null) + this.headers=resized_array; + } + return this; + } /** Copies the payload */ protected Message copyPayload(Message copy) { diff --git a/src/org/jgroups/Message.java b/src/org/jgroups/Message.java index 0d042c2762a..30270d0a465 100644 --- a/src/org/jgroups/Message.java +++ b/src/org/jgroups/Message.java @@ -52,6 +52,9 @@ public interface Message extends SizeStreamable, Constructable { /** Adds a header to the message */ Message putHeader(short id, Header hdr); + /** Adds a header to a message if not present */ + Message putHeaderIfAbsent(short id, Header hdr); + /** Gets a header from the message */ T getHeader(short id); @@ -201,8 +204,8 @@ default Message setFlag(short flag, boolean transient_flags) { */ int size(); - /** Writes the message to an output stream excluding the destination (and possibly source) address, plus a number of headers */ - void writeToNoAddrs(Address src, DataOutput out, short... excluded_headers) throws IOException; + /** Writes the message to an output stream excluding the destination (and possibly source) address */ + void writeToNoAddrs(Address src, DataOutput out) throws IOException; void writePayload(DataOutput out) throws IOException; diff --git a/src/org/jgroups/protocols/BaseBundler.java b/src/org/jgroups/protocols/BaseBundler.java index fed61a089e8..62fcfba317f 100644 --- a/src/org/jgroups/protocols/BaseBundler.java +++ b/src/org/jgroups/protocols/BaseBundler.java @@ -135,7 +135,7 @@ protected void sendSingleMessage(final Message msg) { protected void sendMessageList(final Address dest, final Address src, final List list) { try { - Util.writeMessageList(dest, src, transport.cluster_name.chars(), list, output, dest == null, transport.getId()); + Util.writeMessageList(dest, src, transport.cluster_name.chars(), list, output, dest == null); transport.doSend(output.buffer(), 0, output.position(), dest); transport.getMessageStats().incrNumBatchesSent(); } diff --git a/src/org/jgroups/protocols/BatchBundler.java b/src/org/jgroups/protocols/BatchBundler.java index 228c5241912..56c2d6c7d43 100644 --- a/src/org/jgroups/protocols/BatchBundler.java +++ b/src/org/jgroups/protocols/BatchBundler.java @@ -277,7 +277,7 @@ protected void sendMessageList(final Address dest, final Address src, final Mess try { output.position(0); Util.writeMessageList(dest, src, transport.cluster_name.chars(), list, 0, - length, output, dest == null, transport.getId()); + length, output, dest == null); transport.doSend(output.buffer(), 0, output.position(), dest); } catch(Throwable e) { diff --git a/src/org/jgroups/protocols/PerDestinationBundler.java b/src/org/jgroups/protocols/PerDestinationBundler.java index 6439b67353b..23a71072df0 100644 --- a/src/org/jgroups/protocols/PerDestinationBundler.java +++ b/src/org/jgroups/protocols/PerDestinationBundler.java @@ -249,7 +249,7 @@ protected void sendMessageList(final Address dest, final Address src, final Fast output.position(0); try { Util.writeMessageList(dest, src, transport.cluster_name.chars(), list, - output, dest == null, transport.getId()); + output, dest == null); transport.doSend(output.buffer(), 0, output.position(), dest); transport.getMessageStats().incrNumBatchesSent(); num_batches_sent.increment(); diff --git a/src/org/jgroups/protocols/RingBufferBundler.java b/src/org/jgroups/protocols/RingBufferBundler.java index 972155c2df7..2727e28e0d5 100644 --- a/src/org/jgroups/protocols/RingBufferBundler.java +++ b/src/org/jgroups/protocols/RingBufferBundler.java @@ -147,7 +147,7 @@ protected int marshalMessagesToSameDestination(Address dest, Message[] buf, num_msgs++; buf[start_index]=null; output.writeShort(msg.getType()); - msg.writeToNoAddrs(msg.getSrc(), output, transport.getId()); + msg.writeToNoAddrs(msg.getSrc(), output); } if(start_index == end_index) break; diff --git a/src/org/jgroups/protocols/RingBufferBundlerLockless.java b/src/org/jgroups/protocols/RingBufferBundlerLockless.java index df06a695fc4..d0c9a22138d 100644 --- a/src/org/jgroups/protocols/RingBufferBundlerLockless.java +++ b/src/org/jgroups/protocols/RingBufferBundlerLockless.java @@ -225,7 +225,7 @@ protected int marshalMessagesToSameDestination(Address dest, Message[] buf, num_msgs++; buf[start_index]=null; output.writeShort(msg.getType()); - msg.writeToNoAddrs(msg.getSrc(), output, transport.getId()); + msg.writeToNoAddrs(msg.getSrc(), output); } available_msgs--; start_index=increment(start_index); diff --git a/src/org/jgroups/protocols/RingBufferBundlerLockless2.java b/src/org/jgroups/protocols/RingBufferBundlerLockless2.java index 78f55ab1075..34d9512cb51 100644 --- a/src/org/jgroups/protocols/RingBufferBundlerLockless2.java +++ b/src/org/jgroups/protocols/RingBufferBundlerLockless2.java @@ -210,7 +210,7 @@ protected int marshalMessagesToSameDestination(Address dest, Message[] buf, fina num_msgs++; buf[i]=NULL_MSG; output.writeShort(msg.getType()); - msg.writeToNoAddrs(msg.getSrc(), output, transport.getId()); + msg.writeToNoAddrs(msg.getSrc(), output); } } return num_msgs; diff --git a/src/org/jgroups/protocols/SimplifiedTransferQueueBundler.java b/src/org/jgroups/protocols/SimplifiedTransferQueueBundler.java index c5f9b50b4ab..5ecafc9752e 100644 --- a/src/org/jgroups/protocols/SimplifiedTransferQueueBundler.java +++ b/src/org/jgroups/protocols/SimplifiedTransferQueueBundler.java @@ -79,7 +79,7 @@ protected void _sendBundledMessages() { // since we assigned the matching destination we can do plain == if(msg != null && msg.getDest() == dest) { output.writeShort(msg.getType()); - msg.writeToNoAddrs(msg.getSrc(), output, transport.getId()); + msg.writeToNoAddrs(msg.getSrc(), output); msg_queue[i]=null; } } diff --git a/src/org/jgroups/protocols/TP.java b/src/org/jgroups/protocols/TP.java index 169667f574b..a7143f39af0 100644 --- a/src/org/jgroups/protocols/TP.java +++ b/src/org/jgroups/protocols/TP.java @@ -407,6 +407,9 @@ public T enableBlockingTimerTasks(boolean flag) { @Component(name="diag") protected DiagnosticsHandler diag_handler; + @Component + protected RTT rtt=new RTT(); + /** The header including the cluster name, sent with each message */ protected TpHeader header; @@ -452,6 +455,7 @@ protected TP() { } public MsgStats getMessageStats() {return msg_stats;} + public RTT getRTT() {return rtt;} @Override public void enableStats(boolean flag) { @@ -777,6 +781,7 @@ public String toString() { bundler=createBundler(bundler_type, getClass()); bundler.init(this); } + rtt.init(this); } @@ -1035,7 +1040,7 @@ public Object down(Event evt) { /** A message needs to be sent to a single member or all members */ public Object down(Message msg) { if(header != null) - msg.putHeader(this.id, header); // added patch by Roland Kurmann (March 20, 2003) + msg.putHeaderIfAbsent(this.id, header); // added patch by Roland Kurmann (March 20, 2003) setSourceAddress(msg); // very important !! listToBuffer() will fail with a null src address !! @@ -1192,6 +1197,11 @@ public void passMessageUp(Message msg, byte[] cluster_name, boolean perform_clus } return; } + TpHeader hdr=msg.getHeader(id); + if(hdr != null && hdr.flag() > 0) { + rtt.handleMessage(msg, hdr); + return; + } up_prot.up(msg); } @@ -1218,7 +1228,16 @@ public void passBatchUp(MessageBatch batch, boolean perform_cluster_name_matchin if(batch.multicast() && discard_own_mcast && local_addr != null && local_addr.equals(batch.sender())) return; - up_prot.up(batch); + for(Iterator it=batch.iterator(); it.hasNext();) { + Message msg=it.next(); + TpHeader hdr=msg.getHeader(id); + if(hdr != null && hdr.flag() > 0) { + it.remove(); + rtt.handleMessage(msg, hdr); + } + } + if(!batch.isEmpty()) + up_prot.up(batch); } protected boolean sameCluster(String req) { diff --git a/src/org/jgroups/protocols/TpHeader.java b/src/org/jgroups/protocols/TpHeader.java index ab0fa01b14e..3880e434660 100644 --- a/src/org/jgroups/protocols/TpHeader.java +++ b/src/org/jgroups/protocols/TpHeader.java @@ -18,6 +18,11 @@ public class TpHeader extends Header { protected byte[] cluster_name; + // the next 2 fields are used to measure round-trip times + public static final byte REQ=1, RSP=2; + protected byte flag; // default: 0, 1: RTT request, 2: RTT response (https://issues.redhat.com/browse/JGRP-2812) + protected int index; // used when flag > 0 + public TpHeader() { // used for externalization } @@ -28,10 +33,31 @@ public TpHeader(String n) { cluster_name[i]=(byte)n.charAt(i); } + public TpHeader(String n, byte flag, int index) { + int len=n.length(); + cluster_name=new byte[len]; + for(int i=0; i < len; i++) + cluster_name[i]=(byte)n.charAt(i); + this.flag=flag; + this.index=index; + } + public TpHeader(AsciiString n) { cluster_name=n != null? n.chars() : null; } + public TpHeader(AsciiString n, byte flag, int index) { + cluster_name=n != null? n.chars() : null; + this.flag=flag; + this.index=index; + } + + public byte[] getClusterName() {return cluster_name;} + public byte[] clusterName() {return cluster_name;} + public byte flag() {return flag;} + public int index() {return index;} + + public TpHeader(byte[] n) { cluster_name=n; } @@ -44,11 +70,11 @@ public String toString() { return String.format("[cluster=%s]", cluster_name != null ? new String(cluster_name) : "null"); } - public byte[] getClusterName() {return cluster_name;} @Override public int serializedSize() { - return cluster_name != null? Global.SHORT_SIZE + cluster_name.length : Global.SHORT_SIZE; + int retval=cluster_name != null? Global.SHORT_SIZE + cluster_name.length : Global.SHORT_SIZE; + return retval+Byte.BYTES + (flag > 0? Integer.BYTES : 0); } @Override @@ -57,6 +83,9 @@ public void writeTo(DataOutput out) throws IOException { out.writeShort(length); if(cluster_name != null) out.write(cluster_name, 0, cluster_name.length); + out.writeByte(flag); + if(flag > 0) + out.writeInt(index); } @Override @@ -66,5 +95,8 @@ public void readFrom(DataInput in) throws IOException { cluster_name=new byte[len]; in.readFully(cluster_name, 0, cluster_name.length); } + flag=in.readByte(); + if(flag > 0) + index=in.readInt(); } } diff --git a/src/org/jgroups/protocols/TransferQueueBundler2.java b/src/org/jgroups/protocols/TransferQueueBundler2.java index 11a1927a159..1724367aa3c 100644 --- a/src/org/jgroups/protocols/TransferQueueBundler2.java +++ b/src/org/jgroups/protocols/TransferQueueBundler2.java @@ -281,7 +281,7 @@ private Buffer addMessage(Message msg, TP transport) throws IOException { length_index=out.position() - Global.INT_SIZE; } out.writeShort(msg.getType()); - msg.writeToNoAddrs(msg.src(), out, transport_id); // exclude the transport header + msg.writeToNoAddrs(msg.src(), out); count++; return this; } diff --git a/src/org/jgroups/protocols/relay/RelayHeader.java b/src/org/jgroups/protocols/relay/RelayHeader.java index 72f0d3de5e4..ba32779e72b 100644 --- a/src/org/jgroups/protocols/relay/RelayHeader.java +++ b/src/org/jgroups/protocols/relay/RelayHeader.java @@ -152,7 +152,7 @@ public void writeTo(DataOutput out) throws IOException { Bits.writeString(s, out); } assertNonNullSites(); - Headers.writeHeaders(original_hdrs, out, (short[])null); + Headers.writeHeaders(original_hdrs, out); out.writeShort(original_flags); } diff --git a/src/org/jgroups/util/AverageMinMax.java b/src/org/jgroups/util/AverageMinMax.java index d775862f6c3..a11c6772d6d 100644 --- a/src/org/jgroups/util/AverageMinMax.java +++ b/src/org/jgroups/util/AverageMinMax.java @@ -16,20 +16,24 @@ * @since 4.0, 3.6.10 */ public class AverageMinMax extends Average { - protected long min=Long.MAX_VALUE, max=0; - protected List values; + protected long min=Long.MAX_VALUE, max=0; + protected List values; + protected volatile boolean sorted; - public long min() {return min;} - public long max() {return max;} - public boolean usePercentiles() {return values != null;} - public AverageMinMax usePercentiles(int capacity) {values=capacity > 0? new ArrayList<>(capacity) : null; return this;} + public long min() {return min;} + public long max() {return max;} + public boolean usePercentiles() {return values != null;} + public AverageMinMax usePercentiles(int cap) {values=cap > 0? new ArrayList<>(cap) : null; return this;} + public List values() {return values;} public T add(long num) { super.add(num); min=Math.min(min, num); max=Math.max(max, num); - if(values != null) + if(values != null) { values.add(num); + sorted=false; + } return (T)this; } @@ -41,8 +45,10 @@ public T merge(T other) { AverageMinMax o=(AverageMinMax)other; this.min=Math.min(min, o.min()); this.max=Math.max(max, o.max()); - if(this.values != null) + if(this.values != null) { this.values.addAll(o.values); + sorted=false; + } } return (T)this; } @@ -56,7 +62,7 @@ public void clear() { public String percentiles() { if(values == null) return "n/a"; - Collections.sort(values); + sort(); double stddev=stddev(); return String.format("stddev: %.2f, 50: %d, 90: %d, 99: %d, 99.9: %d, 99.99: %d, 99.999: %d, 100: %d\n", stddev, p(50), p(90), p(99), p(99.9), p(99.99), p(99.999), p(100)); @@ -87,22 +93,37 @@ public void readFrom(DataInput in) throws IOException { max=Bits.readLongCompressed(in); } + public long percentile(double percentile) { + return p(percentile); + } - protected long p(double percentile) { + public long p(double percentile) { if(values == null) return -1; + sort(); int size=values.size(); - int index=(int)(size * (percentile/100.0)); + if(size == 0) + return -1; + int index=size == 1? 1 : (int)(size * (percentile/100.0)); return values.get(index-1); } - protected double stddev() { + public double stddev() { if(values == null) return -1.0; + sort(); double av=average(); int size=values.size(); double variance=values.stream().map(v -> (v - av)*(v - av)).reduce(0.0, Double::sum) / size; return Math.sqrt(variance); } + public AverageMinMax sort() { + if(values != null && !sorted) { + Collections.sort(values); + sorted=true; + } + return this; + } + } diff --git a/src/org/jgroups/util/Headers.java b/src/org/jgroups/util/Headers.java index 60b6254ab88..948d3443d35 100644 --- a/src/org/jgroups/util/Headers.java +++ b/src/org/jgroups/util/Headers.java @@ -138,16 +138,14 @@ public static Header[] putHeader(final Header[] headers, short id, Header hdr, b throw new IllegalStateException("unable to add element " + id + ", index=" + i); // we should never come here } - public static void writeHeaders(Header[] hdrs, DataOutput out, short... excluded_headers) throws IOException { - int size=Headers.size(hdrs, excluded_headers); + public static void writeHeaders(Header[] hdrs, DataOutput out) throws IOException { + int size=Headers.size(hdrs); out.writeShort(size); if(size > 0) { for(Header hdr: hdrs) { if(hdr == null) break; short id=hdr.getProtId(); - if(Util.containsId(id, excluded_headers)) - continue; out.writeShort(id); writeHeader(hdr, out); } @@ -212,7 +210,7 @@ public static int size(Header[] hdrs) { return retval; } - public static int size(Header[] hdrs, short... excluded_ids) { + /*public static int size(Header[] hdrs, short... excluded_ids) { int retval=0; if(hdrs == null) return retval; @@ -223,7 +221,7 @@ public static int size(Header[] hdrs, short... excluded_ids) { retval++; } return retval; - } + }*/ private static void writeHeader(Header hdr, DataOutput out) throws IOException { short magic_number=hdr.getMagicId(); diff --git a/src/org/jgroups/util/RTT.java b/src/org/jgroups/util/RTT.java new file mode 100644 index 00000000000..da9f6c2c3ba --- /dev/null +++ b/src/org/jgroups/util/RTT.java @@ -0,0 +1,157 @@ +package org.jgroups.util; + +import org.jgroups.*; +import org.jgroups.annotations.MBean; +import org.jgroups.annotations.ManagedOperation; +import org.jgroups.annotations.Property; +import org.jgroups.protocols.TP; +import org.jgroups.protocols.TpHeader; + +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * Measures round-trip times (RTT) between nodes + * @author Bela Ban + * @since 5.4, 5.3.8 + */ +@MBean(description="Component measuring round-trip times to all other nodes") +public class RTT { + protected TP transport; + protected short tp_id; + + @Property(description="The number of RPCs to send") + protected int num_reqs=10; + + @Property(description="Timeout (ms) for an RPC") + protected long timeout=1000; + + @Property(description="Size (in bytes) of an RPC") + protected int size; + + @Property(description="Send requests and responses as OOB messages") + protected boolean oob; + + protected final Map rtts=Util.createConcurrentMap(); + protected final Map times=Util.createConcurrentMap(); // list of start times (us) + + public int numReqs() {return num_reqs;} + public RTT numReqs(int n) {this.num_reqs=n; return this;} + public long timeout() {return timeout;} + public RTT timeout(long t) {this.timeout=t; return this;} + public int size() {return size;} + public RTT size(int size) {this.size=size; return this;} + public boolean oob() {return oob;} + public RTT oob(boolean b) {oob=b; return this;} + + public void init(TP tp) { + this.transport=Objects.requireNonNull(tp); + this.tp_id=transport.getId(); + } + + @ManagedOperation(description="Sends N RPCs to all other nodes and computes min/avg/max RTT") + public String rtt() { + return rtt(num_reqs, size, true, false); + } + + /** + * Sends N requests to all members and computes RTTs + * @param num_reqs The number of requests to be sent to all members + * @param details Whether to print details (e.g. min/max/percentiles) + */ + @ManagedOperation(description="Sends N RPCs to all other nodes and computes min/avg/max RTT") + public String rtt(int num_reqs, boolean details) { + return rtt(num_reqs, this.size, details, false); + } + + /** + * Sends N requests to all members and computes RTTs + * @param num_reqs The number of requests to be sent to all members + * @param size The number of bytes a request should have + * @param details Whether to print details (e.g. min/max/percentiles) + * @param exclude_self Whether to exclude the local node + */ + @ManagedOperation(description="Sends N RPCs to all other nodes and computes min/avg/max RTT") + public String rtt(int num_reqs, int size, boolean details, boolean exclude_self) { + Map m=_rtt(num_reqs, size, exclude_self); + return m.entrySet().stream() + .map(e -> String.format("%s: %s", e.getKey(), print(e.getValue(), details, TimeUnit.MICROSECONDS, num_reqs))) + .collect(Collectors.joining("\n")); + } + + public Map _rtt(int num_reqs, int size, boolean exclude_self) { + rtts.clear(); + times.clear(); + View view=transport.view(); + byte[] payload=new byte[size]; + final List
targets=new ArrayList<>(view.getMembers()); + if(exclude_self) + targets.remove(transport.addr()); + for(Address addr: targets) { + rtts.put(addr, new AverageMinMax().usePercentiles(128).unit(TimeUnit.MICROSECONDS)); + times.put(addr, new long[num_reqs]); + } + AsciiString cluster=transport.getClusterNameAscii(); + for(int i=0; i < num_reqs; i++) { + TpHeader hdr=new TpHeader(cluster, TpHeader.REQ, i); + for(Address addr: targets) { + Message msg=new BytesMessage(addr, payload).putHeader(tp_id, hdr); + if(oob) + msg.setFlag(Message.Flag.OOB); + long[] t=times.get(addr); + t[i]=Util.micros(); + transport.down(msg); + } + } + Util.waitUntilTrue(timeout, timeout/10, () -> rtts.values().stream().allMatch(a -> a.count() >= num_reqs)); + return new HashMap<>(rtts); + } + + /** Called when a message (request or response) is received */ + public void handleMessage(Message msg, TpHeader hdr) { + if(hdr != null) { + switch(hdr.flag()) { + case TpHeader.REQ: + handleRequest(msg.src(), hdr); + break; + case TpHeader.RSP: + handleResponse(msg.src(), hdr.index()); + } + } + } + + protected void handleRequest(Address sender, TpHeader hdr) { + Message rsp=new EmptyMessage(sender) + .putHeader(tp_id, new TpHeader(transport.getClusterNameAscii(), TpHeader.RSP, hdr.index())); + if(oob) + rsp.setFlag(Message.Flag.OOB); + transport.down(rsp); + } + + protected void handleResponse(Address sender, int index) { + long[] start_times=times.get(sender); + if(start_times != null) { + long time=Util.micros() - start_times[index]; + AverageMinMax avg=rtts.get(sender); + if(avg != null) + avg.add(time); + } + } + + protected static String print(AverageMinMax avg, boolean details, TimeUnit unit, int num_reqs) { + return details? avg.toString(unit) + String.format(" (%s)", percentiles(avg, num_reqs)) + : Util.printTime(avg.average(), unit); + } + + protected static String percentiles(AverageMinMax avg, int num_reqs) { + List values=avg.values(); + int received=values.size(), non_received=num_reqs - received; + double failure_rate=non_received == 0? 0.0 : (double)non_received / received; + String failures=non_received == 0? "" : String.format(" (failure rate: %.2f)", failure_rate); + return String.format("p90=%s p99=%s p99.9=%s%s", Util.printTime(avg.p(90), avg.unit()), + Util.printTime(avg.p(99), avg.unit()), + Util.printTime(avg.p(99.9), avg.unit()), failures); + } + +} diff --git a/src/org/jgroups/util/SubmitToThreadPool.java b/src/org/jgroups/util/SubmitToThreadPool.java index f2bd9161525..c6f3a7c10e4 100644 --- a/src/org/jgroups/util/SubmitToThreadPool.java +++ b/src/org/jgroups/util/SubmitToThreadPool.java @@ -101,7 +101,7 @@ public void run() { protected byte[] getClusterName() { TpHeader hdr=msg.getHeader(tp_id); - return hdr.getClusterName(); + return hdr.clusterName(); } } diff --git a/src/org/jgroups/util/Util.java b/src/org/jgroups/util/Util.java index e49088bd9f8..c0b48a2761b 100644 --- a/src/org/jgroups/util/Util.java +++ b/src/org/jgroups/util/Util.java @@ -570,7 +570,7 @@ public static boolean isFlagSet(byte bits,byte flag) { public static byte clearFlags(byte bits,byte flag) { - return bits&=~flag; + return bits=(byte)(bits & ~flag); } public static String flagsToString(short flags) { @@ -1454,39 +1454,39 @@ public static Message readMessage(DataInput in, MessageFactory mf) throws IOExce * */ public static void writeMessageList(Address dest, Address src, byte[] cluster_name, List msgs, - DataOutput dos, boolean multicast, short transport_id) throws IOException { + DataOutput dos, boolean multicast) throws IOException { writeMessageListHeader(dest, src, cluster_name, msgs != null ? msgs.size() : 0, dos, multicast); if(msgs != null) for(Message msg: msgs) { dos.writeShort(msg.getType()); - msg.writeToNoAddrs(src, dos, transport_id); // exclude the transport header + msg.writeToNoAddrs(src, dos); } } public static void writeMessageList(Address dest, Address src, byte[] cluster_name, FastArray msgs, - DataOutput dos, boolean multicast, short transport_id) throws IOException { + DataOutput dos, boolean multicast) throws IOException { writeMessageListHeader(dest, src, cluster_name, msgs != null ? msgs.size() : 0, dos, multicast); if(msgs != null) for(Message msg: msgs) { dos.writeShort(msg.getType()); - msg.writeToNoAddrs(src, dos, transport_id); // exclude the transport header + msg.writeToNoAddrs(src, dos); } } public static void writeMessageList(Address dest, Address src, byte[] cluster_name, - Message[] msgs, int offset, int length, DataOutput dos, boolean multicast, - short transport_id) throws IOException { + Message[] msgs, int offset, int length, DataOutput dos, boolean multicast) + throws IOException { writeMessageListHeader(dest, src, cluster_name, length, dos, multicast); if(msgs != null) for(int i=0; i < length; i++) { Message msg=msgs[offset+i]; dos.writeShort(msg.getType()); - msg.writeToNoAddrs(src, dos, transport_id); // exclude the transport header + msg.writeToNoAddrs(src, dos); } } @@ -2974,7 +2974,7 @@ public static boolean containsViewId(Collection views,ViewId vid) { public static boolean containsId(short id,short[] ids) { if(ids == null) return false; - for(short tmp : ids) + for(short tmp: ids) if(tmp == id) return true; return false; diff --git a/tests/junit-functional/org/jgroups/tests/HeadersResizeTest.java b/tests/junit-functional/org/jgroups/tests/HeadersResizeTest.java index 31a5e76d3cf..f2723884aab 100644 --- a/tests/junit-functional/org/jgroups/tests/HeadersResizeTest.java +++ b/tests/junit-functional/org/jgroups/tests/HeadersResizeTest.java @@ -52,13 +52,7 @@ public void testResizing() throws Exception { } bundler.release(); // sends all bundled messages as a batch - for(int i=0; i < 10; i++) { - if(receiver.num_msgs >= 5) - break; - Util.sleep(200); - } - System.out.printf("Number of transport headers: %d\n", receiver.num_transport_headers); - assert receiver.num_transport_headers == 0; + Util.waitUntil(5000, 100, () -> receiver.num_msgs >= 5 && receiver.num_transport_headers == 5); } diff --git a/tests/junit-functional/org/jgroups/tests/MessageBatchTest.java b/tests/junit-functional/org/jgroups/tests/MessageBatchTest.java index be7357beed8..040dc3b1f05 100644 --- a/tests/junit-functional/org/jgroups/tests/MessageBatchTest.java +++ b/tests/junit-functional/org/jgroups/tests/MessageBatchTest.java @@ -385,7 +385,7 @@ public void testSize() throws Exception { List msgs=createMessages(); ByteArrayOutputStream output=new ByteArrayOutputStream(); DataOutputStream out=new DataOutputStream(output); - Util.writeMessageList(b, a, "cluster".getBytes(), msgs, out, false, UDP_ID); + Util.writeMessageList(b, a, "cluster".getBytes(), msgs, out, false); out.flush(); byte[] buf=output.toByteArray(); diff --git a/tests/junit-functional/org/jgroups/tests/SizeTest.java b/tests/junit-functional/org/jgroups/tests/SizeTest.java index 52c827a1684..e54d96156ac 100644 --- a/tests/junit-functional/org/jgroups/tests/SizeTest.java +++ b/tests/junit-functional/org/jgroups/tests/SizeTest.java @@ -34,8 +34,10 @@ public class SizeTest { - public static void testTpHeader() throws Exception { + public void testTpHeader() throws Exception { _testSize(new TpHeader("DemoChannel")); + _testSize(new TpHeader("DemoChannel", (byte)1, 4)); + _testSize(new TpHeader("DemoChannel", (byte)2, 4)); } public void testByteArray() throws Exception {