diff --git a/src/org/jgroups/protocols/NAKACK4.java b/src/org/jgroups/protocols/NAKACK4.java
index 436ec43cd4..812027472d 100644
--- a/src/org/jgroups/protocols/NAKACK4.java
+++ b/src/org/jgroups/protocols/NAKACK4.java
@@ -1,9 +1,20 @@
package org.jgroups.protocols;
+import org.jgroups.Address;
import org.jgroups.Message;
+import org.jgroups.annotations.ManagedAttribute;
+import org.jgroups.annotations.ManagedOperation;
+import org.jgroups.annotations.Property;
+import org.jgroups.util.AckTable;
import org.jgroups.util.Buffer;
+import org.jgroups.util.Buffer.Options;
import org.jgroups.util.FixedBuffer;
+import java.util.List;
+import java.util.concurrent.atomic.LongAdder;
+
+import static org.jgroups.conf.AttributeType.SCALAR;
+
/**
* New multicast protocols based on fixed-size xmit windows and message ACKs
* Details: https://issues.redhat.com/browse/JGRP-2780
@@ -11,13 +22,67 @@
* @since 5.4
*/
public class NAKACK4 extends ReliableMulticast {
- protected static final Buffer.Options SEND_OPTIONS=new Buffer.Options().block(true);
+ protected final AckTable ack_table=new AckTable();
+ protected static final Options SEND_OPTIONS=new Options().block(true);
+
+ @Property(description="Size of the send/receive buffers, in messages",writable=false)
+ protected int capacity=1024;
+
+ @ManagedAttribute(description="Number of ACKs received",type=SCALAR)
+ protected final LongAdder acks_received=new LongAdder();
+
+ @ManagedAttribute(description="Number of times sender threads were blocked on a full send window",type=SCALAR)
+ public long getNumBlockings() {
+ FixedBuffer buf=(FixedBuffer)sendBuf();
+ return buf != null? buf.numBlockings() : -1;
+ }
@Override
protected Buffer createXmitWindow(long initial_seqno) {
- return new FixedBuffer<>(initial_seqno);
+ return new FixedBuffer<>(capacity, initial_seqno);
+ }
+
+ @Override
+ protected Options sendOptions() {return SEND_OPTIONS;}
+ protected int capacity() {return capacity;}
+
+ @ManagedAttribute(description="The minimum of all ACKs",type=SCALAR)
+ public long acksMin() {return ack_table.min();}
+
+ @Override
+ public void resetStats() {
+ super.resetStats();
+ acks_received.reset();
+ Buffer buf=sendBuf();
+ if(buf != null)
+ buf.resetStats();
+ }
+
+ @Override
+ public void destroy() {
+ super.destroy();
+ ack_table.clear();
}
+ @ManagedOperation(description="Prints the ACKs received from members")
+ public String printAckTable() {return ack_table.toString();}
+
@Override
- protected Buffer.Options sendOptions() {return SEND_OPTIONS;}
+ protected void adjustReceivers(List members) {
+ super.adjustReceivers(members);
+ ack_table.adjust(members);
+ }
+
+ @Override
+ protected void handleAck(Address sender, long ack) {
+ Buffer buf=sendBuf();
+ if(buf == null) {
+ log.warn("%s: local send buffer is null", local_addr);
+ return;
+ }
+ acks_received.increment();
+ long old_min=ack_table.min(), new_min=ack_table.ack(sender, ack);
+ if(new_min > old_min)
+ buf.purge(new_min); // unblocks senders waiting for space to become available
+ }
}
diff --git a/src/org/jgroups/protocols/ReliableMulticast.java b/src/org/jgroups/protocols/ReliableMulticast.java
index a264f32eba..10bb86183e 100644
--- a/src/org/jgroups/protocols/ReliableMulticast.java
+++ b/src/org/jgroups/protocols/ReliableMulticast.java
@@ -313,7 +313,11 @@ public String printMessages() {
for(Map.Entry> entry: xmit_table.entrySet()) {
Address addr=entry.getKey();
Buffer win=entry.getValue();
- ret.append(addr).append(": ").append(win.toString()).append('\n');
+ int cap=win.capacity();
+ ret.append(addr).append(": ").append(win);
+ if(cap >0)
+ ret.append(String.format(" [capacity: %s]", cap));
+ ret.append('\n');
}
return ret.toString();
}
@@ -333,7 +337,7 @@ public String printDigestHistory() {
return sb.toString();
}
- protected Buffer sendWin() {
+ protected Buffer sendBuf() {
return local_xmit_table != null? local_xmit_table : (local_xmit_table=xmit_table.get(local_addr));
}
@@ -559,6 +563,10 @@ public Object up(Message msg) {
handleHighestSeqno(msg.getSrc(), hdr.seqno);
return null;
+ case NakAckHeader.ACK:
+ handleAck(msg.src(), hdr.seqno);
+ return null;
+
default:
log.error(Util.getMessage("HeaderTypeNotKnown"), local_addr, hdr.type);
return null;
@@ -572,6 +580,7 @@ public void up(MessageBatch mb) {
up_prot.up(mb);
return;
}
+ long highest_ack=0;
for(FastArray.FastIterator it=(FastArray.FastIterator)mb.iterator(); it.hasNext();) {
final Message msg=it.next();
NakAckHeader hdr;
@@ -607,11 +616,18 @@ public void up(MessageBatch mb) {
it.remove();
handleHighestSeqno(mb.sender(), hdr.seqno);
break;
+ case NakAckHeader.ACK:
+ it.remove();
+ highest_ack=Math.max(highest_ack, hdr.seqno);
+ break;
default:
log.error(Util.getMessage("HeaderTypeNotKnown"), local_addr, hdr.type);
}
}
+ if(highest_ack > 0)
+ handleAck(mb.sender(), highest_ack);
+
if(!mb.isEmpty())
handleMessageBatch(mb);
@@ -677,7 +693,7 @@ protected void send(Message msg) {
return;
}
- Buffer win=sendWin();
+ Buffer win=sendBuf();
if(win == null) // discard message if there is no entry for local_addr
return;
@@ -810,6 +826,8 @@ protected void removeAndDeliver(Buffer win, Address sender, boolean loo
if(size > 0) {
if(stats)
avg_batch_size.add(size);
+ long hd=win.highestDelivered();
+ down_prot.down(new EmptyMessage(sender).putHeader(id, NakAckHeader.createAckHeader(hd)).setFlag(OOB));
deliverBatch(batch);
}
}
@@ -975,6 +993,9 @@ protected void handleHighestSeqno(Address sender, long seqno) {
}
}
+ protected void handleAck(Address sender, long ack) {
+ }
+
protected Message msgFromXmitRsp(Message msg, NakAckHeader hdr) {
if(msg == null)
return null;
diff --git a/src/org/jgroups/util/FixedBuffer.java b/src/org/jgroups/util/FixedBuffer.java
index 41ffa42422..f483c7c397 100644
--- a/src/org/jgroups/util/FixedBuffer.java
+++ b/src/org/jgroups/util/FixedBuffer.java
@@ -4,6 +4,7 @@
import java.io.Closeable;
import java.util.*;
+import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Condition;
import java.util.function.BiConsumer;
import java.util.function.Function;
@@ -29,6 +30,8 @@ public class FixedBuffer extends Buffer implements Closeable {
/** Used to unblock blocked senders on close() */
protected boolean open=true;
+ protected final LongAdder num_blockings=new LongAdder();
+
public FixedBuffer() {
this(0);
@@ -54,7 +57,8 @@ public FixedBuffer(int capacity, long offset) {
this.low=this.hd=this.high=this.offset=offset;
}
- @Override public int capacity() {return buf.length;}
+ @Override public int capacity() {return buf.length;}
+ public long numBlockings() {return num_blockings.sum();}
/**
* Adds a new element to the buffer
@@ -275,6 +279,12 @@ public void forEach(long from, long to, Visitor visitor, boolean nullify, boo
buffer_full.signalAll();
}
+ @Override
+ public void resetStats() {
+ super.resetStats();
+ num_blockings.reset();
+ }
+
@Override
public void close() {
lock.lock();
@@ -323,6 +333,7 @@ protected int index(long seqno) {
protected boolean block(long seqno) {
while(open && seqno - low > capacity()) {
try {
+ num_blockings.increment();
buffer_full.await();
}
catch(InterruptedException e) {