From d727c905948e4ee442a183884c9a08549df14a45 Mon Sep 17 00:00:00 2001 From: Bela Ban Date: Thu, 8 Aug 2024 09:54:17 +0200 Subject: [PATCH] - Added num_blockings to NAKACK4 --- src/org/jgroups/protocols/NAKACK4.java | 71 ++++++++++++++++++- .../jgroups/protocols/ReliableMulticast.java | 27 ++++++- src/org/jgroups/util/FixedBuffer.java | 13 +++- 3 files changed, 104 insertions(+), 7 deletions(-) diff --git a/src/org/jgroups/protocols/NAKACK4.java b/src/org/jgroups/protocols/NAKACK4.java index 436ec43cd4..812027472d 100644 --- a/src/org/jgroups/protocols/NAKACK4.java +++ b/src/org/jgroups/protocols/NAKACK4.java @@ -1,9 +1,20 @@ package org.jgroups.protocols; +import org.jgroups.Address; import org.jgroups.Message; +import org.jgroups.annotations.ManagedAttribute; +import org.jgroups.annotations.ManagedOperation; +import org.jgroups.annotations.Property; +import org.jgroups.util.AckTable; import org.jgroups.util.Buffer; +import org.jgroups.util.Buffer.Options; import org.jgroups.util.FixedBuffer; +import java.util.List; +import java.util.concurrent.atomic.LongAdder; + +import static org.jgroups.conf.AttributeType.SCALAR; + /** * New multicast protocols based on fixed-size xmit windows and message ACKs * Details: https://issues.redhat.com/browse/JGRP-2780 @@ -11,13 +22,67 @@ * @since 5.4 */ public class NAKACK4 extends ReliableMulticast { - protected static final Buffer.Options SEND_OPTIONS=new Buffer.Options().block(true); + protected final AckTable ack_table=new AckTable(); + protected static final Options SEND_OPTIONS=new Options().block(true); + + @Property(description="Size of the send/receive buffers, in messages",writable=false) + protected int capacity=1024; + + @ManagedAttribute(description="Number of ACKs received",type=SCALAR) + protected final LongAdder acks_received=new LongAdder(); + + @ManagedAttribute(description="Number of times sender threads were blocked on a full send window",type=SCALAR) + public long getNumBlockings() { + FixedBuffer buf=(FixedBuffer)sendBuf(); + return buf != null? buf.numBlockings() : -1; + } @Override protected Buffer createXmitWindow(long initial_seqno) { - return new FixedBuffer<>(initial_seqno); + return new FixedBuffer<>(capacity, initial_seqno); + } + + @Override + protected Options sendOptions() {return SEND_OPTIONS;} + protected int capacity() {return capacity;} + + @ManagedAttribute(description="The minimum of all ACKs",type=SCALAR) + public long acksMin() {return ack_table.min();} + + @Override + public void resetStats() { + super.resetStats(); + acks_received.reset(); + Buffer buf=sendBuf(); + if(buf != null) + buf.resetStats(); + } + + @Override + public void destroy() { + super.destroy(); + ack_table.clear(); } + @ManagedOperation(description="Prints the ACKs received from members") + public String printAckTable() {return ack_table.toString();} + @Override - protected Buffer.Options sendOptions() {return SEND_OPTIONS;} + protected void adjustReceivers(List
members) { + super.adjustReceivers(members); + ack_table.adjust(members); + } + + @Override + protected void handleAck(Address sender, long ack) { + Buffer buf=sendBuf(); + if(buf == null) { + log.warn("%s: local send buffer is null", local_addr); + return; + } + acks_received.increment(); + long old_min=ack_table.min(), new_min=ack_table.ack(sender, ack); + if(new_min > old_min) + buf.purge(new_min); // unblocks senders waiting for space to become available + } } diff --git a/src/org/jgroups/protocols/ReliableMulticast.java b/src/org/jgroups/protocols/ReliableMulticast.java index a264f32eba..10bb86183e 100644 --- a/src/org/jgroups/protocols/ReliableMulticast.java +++ b/src/org/jgroups/protocols/ReliableMulticast.java @@ -313,7 +313,11 @@ public String printMessages() { for(Map.Entry> entry: xmit_table.entrySet()) { Address addr=entry.getKey(); Buffer win=entry.getValue(); - ret.append(addr).append(": ").append(win.toString()).append('\n'); + int cap=win.capacity(); + ret.append(addr).append(": ").append(win); + if(cap >0) + ret.append(String.format(" [capacity: %s]", cap)); + ret.append('\n'); } return ret.toString(); } @@ -333,7 +337,7 @@ public String printDigestHistory() { return sb.toString(); } - protected Buffer sendWin() { + protected Buffer sendBuf() { return local_xmit_table != null? local_xmit_table : (local_xmit_table=xmit_table.get(local_addr)); } @@ -559,6 +563,10 @@ public Object up(Message msg) { handleHighestSeqno(msg.getSrc(), hdr.seqno); return null; + case NakAckHeader.ACK: + handleAck(msg.src(), hdr.seqno); + return null; + default: log.error(Util.getMessage("HeaderTypeNotKnown"), local_addr, hdr.type); return null; @@ -572,6 +580,7 @@ public void up(MessageBatch mb) { up_prot.up(mb); return; } + long highest_ack=0; for(FastArray.FastIterator it=(FastArray.FastIterator)mb.iterator(); it.hasNext();) { final Message msg=it.next(); NakAckHeader hdr; @@ -607,11 +616,18 @@ public void up(MessageBatch mb) { it.remove(); handleHighestSeqno(mb.sender(), hdr.seqno); break; + case NakAckHeader.ACK: + it.remove(); + highest_ack=Math.max(highest_ack, hdr.seqno); + break; default: log.error(Util.getMessage("HeaderTypeNotKnown"), local_addr, hdr.type); } } + if(highest_ack > 0) + handleAck(mb.sender(), highest_ack); + if(!mb.isEmpty()) handleMessageBatch(mb); @@ -677,7 +693,7 @@ protected void send(Message msg) { return; } - Buffer win=sendWin(); + Buffer win=sendBuf(); if(win == null) // discard message if there is no entry for local_addr return; @@ -810,6 +826,8 @@ protected void removeAndDeliver(Buffer win, Address sender, boolean loo if(size > 0) { if(stats) avg_batch_size.add(size); + long hd=win.highestDelivered(); + down_prot.down(new EmptyMessage(sender).putHeader(id, NakAckHeader.createAckHeader(hd)).setFlag(OOB)); deliverBatch(batch); } } @@ -975,6 +993,9 @@ protected void handleHighestSeqno(Address sender, long seqno) { } } + protected void handleAck(Address sender, long ack) { + } + protected Message msgFromXmitRsp(Message msg, NakAckHeader hdr) { if(msg == null) return null; diff --git a/src/org/jgroups/util/FixedBuffer.java b/src/org/jgroups/util/FixedBuffer.java index 41ffa42422..f483c7c397 100644 --- a/src/org/jgroups/util/FixedBuffer.java +++ b/src/org/jgroups/util/FixedBuffer.java @@ -4,6 +4,7 @@ import java.io.Closeable; import java.util.*; +import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.Condition; import java.util.function.BiConsumer; import java.util.function.Function; @@ -29,6 +30,8 @@ public class FixedBuffer extends Buffer implements Closeable { /** Used to unblock blocked senders on close() */ protected boolean open=true; + protected final LongAdder num_blockings=new LongAdder(); + public FixedBuffer() { this(0); @@ -54,7 +57,8 @@ public FixedBuffer(int capacity, long offset) { this.low=this.hd=this.high=this.offset=offset; } - @Override public int capacity() {return buf.length;} + @Override public int capacity() {return buf.length;} + public long numBlockings() {return num_blockings.sum();} /** * Adds a new element to the buffer @@ -275,6 +279,12 @@ public void forEach(long from, long to, Visitor visitor, boolean nullify, boo buffer_full.signalAll(); } + @Override + public void resetStats() { + super.resetStats(); + num_blockings.reset(); + } + @Override public void close() { lock.lock(); @@ -323,6 +333,7 @@ protected int index(long seqno) { protected boolean block(long seqno) { while(open && seqno - low > capacity()) { try { + num_blockings.increment(); buffer_full.await(); } catch(InterruptedException e) {