Skip to content

Commit

Permalink
- Added num_blockings to NAKACK4
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Aug 8, 2024
1 parent 43aecc7 commit d727c90
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 7 deletions.
71 changes: 68 additions & 3 deletions src/org/jgroups/protocols/NAKACK4.java
Original file line number Diff line number Diff line change
@@ -1,23 +1,88 @@
package org.jgroups.protocols;

import org.jgroups.Address;
import org.jgroups.Message;
import org.jgroups.annotations.ManagedAttribute;
import org.jgroups.annotations.ManagedOperation;
import org.jgroups.annotations.Property;
import org.jgroups.util.AckTable;
import org.jgroups.util.Buffer;
import org.jgroups.util.Buffer.Options;
import org.jgroups.util.FixedBuffer;

import java.util.List;
import java.util.concurrent.atomic.LongAdder;

import static org.jgroups.conf.AttributeType.SCALAR;

/**
* New multicast protocols based on fixed-size xmit windows and message ACKs<rb/>
* Details: https://issues.redhat.com/browse/JGRP-2780
* @author Bela Ban
* @since 5.4
*/
public class NAKACK4 extends ReliableMulticast {
protected static final Buffer.Options SEND_OPTIONS=new Buffer.Options().block(true);
protected final AckTable ack_table=new AckTable();
protected static final Options SEND_OPTIONS=new Options().block(true);

@Property(description="Size of the send/receive buffers, in messages",writable=false)
protected int capacity=1024;

@ManagedAttribute(description="Number of ACKs received",type=SCALAR)
protected final LongAdder acks_received=new LongAdder();

@ManagedAttribute(description="Number of times sender threads were blocked on a full send window",type=SCALAR)
public long getNumBlockings() {
FixedBuffer<Message> buf=(FixedBuffer<Message>)sendBuf();
return buf != null? buf.numBlockings() : -1;
}

@Override
protected Buffer<Message> createXmitWindow(long initial_seqno) {
return new FixedBuffer<>(initial_seqno);
return new FixedBuffer<>(capacity, initial_seqno);
}

@Override
protected Options sendOptions() {return SEND_OPTIONS;}
protected int capacity() {return capacity;}

@ManagedAttribute(description="The minimum of all ACKs",type=SCALAR)
public long acksMin() {return ack_table.min();}

@Override
public void resetStats() {
super.resetStats();
acks_received.reset();
Buffer<Message> buf=sendBuf();
if(buf != null)
buf.resetStats();
}

@Override
public void destroy() {
super.destroy();
ack_table.clear();
}

@ManagedOperation(description="Prints the ACKs received from members")
public String printAckTable() {return ack_table.toString();}

@Override
protected Buffer.Options sendOptions() {return SEND_OPTIONS;}
protected void adjustReceivers(List<Address> members) {
super.adjustReceivers(members);
ack_table.adjust(members);
}

@Override
protected void handleAck(Address sender, long ack) {
Buffer<Message> buf=sendBuf();
if(buf == null) {
log.warn("%s: local send buffer is null", local_addr);
return;
}
acks_received.increment();
long old_min=ack_table.min(), new_min=ack_table.ack(sender, ack);
if(new_min > old_min)
buf.purge(new_min); // unblocks senders waiting for space to become available
}
}
27 changes: 24 additions & 3 deletions src/org/jgroups/protocols/ReliableMulticast.java
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,11 @@ public String printMessages() {
for(Map.Entry<Address,Buffer<Message>> entry: xmit_table.entrySet()) {
Address addr=entry.getKey();
Buffer<Message> win=entry.getValue();
ret.append(addr).append(": ").append(win.toString()).append('\n');
int cap=win.capacity();
ret.append(addr).append(": ").append(win);
if(cap >0)
ret.append(String.format(" [capacity: %s]", cap));
ret.append('\n');
}
return ret.toString();
}
Expand All @@ -333,7 +337,7 @@ public String printDigestHistory() {
return sb.toString();
}

protected Buffer<Message> sendWin() {
protected Buffer<Message> sendBuf() {
return local_xmit_table != null? local_xmit_table : (local_xmit_table=xmit_table.get(local_addr));
}

Expand Down Expand Up @@ -559,6 +563,10 @@ public Object up(Message msg) {
handleHighestSeqno(msg.getSrc(), hdr.seqno);
return null;

case NakAckHeader.ACK:
handleAck(msg.src(), hdr.seqno);
return null;

default:
log.error(Util.getMessage("HeaderTypeNotKnown"), local_addr, hdr.type);
return null;
Expand All @@ -572,6 +580,7 @@ public void up(MessageBatch mb) {
up_prot.up(mb);
return;
}
long highest_ack=0;
for(FastArray<Message>.FastIterator it=(FastArray<Message>.FastIterator)mb.iterator(); it.hasNext();) {
final Message msg=it.next();
NakAckHeader hdr;
Expand Down Expand Up @@ -607,11 +616,18 @@ public void up(MessageBatch mb) {
it.remove();
handleHighestSeqno(mb.sender(), hdr.seqno);
break;
case NakAckHeader.ACK:
it.remove();
highest_ack=Math.max(highest_ack, hdr.seqno);
break;
default:
log.error(Util.getMessage("HeaderTypeNotKnown"), local_addr, hdr.type);
}
}

if(highest_ack > 0)
handleAck(mb.sender(), highest_ack);

if(!mb.isEmpty())
handleMessageBatch(mb);

Expand Down Expand Up @@ -677,7 +693,7 @@ protected void send(Message msg) {
return;
}

Buffer<Message> win=sendWin();
Buffer<Message> win=sendBuf();
if(win == null) // discard message if there is no entry for local_addr
return;

Expand Down Expand Up @@ -810,6 +826,8 @@ protected void removeAndDeliver(Buffer<Message> win, Address sender, boolean loo
if(size > 0) {
if(stats)
avg_batch_size.add(size);
long hd=win.highestDelivered();
down_prot.down(new EmptyMessage(sender).putHeader(id, NakAckHeader.createAckHeader(hd)).setFlag(OOB));
deliverBatch(batch);
}
}
Expand Down Expand Up @@ -975,6 +993,9 @@ protected void handleHighestSeqno(Address sender, long seqno) {
}
}

protected void handleAck(Address sender, long ack) {
}

protected Message msgFromXmitRsp(Message msg, NakAckHeader hdr) {
if(msg == null)
return null;
Expand Down
13 changes: 12 additions & 1 deletion src/org/jgroups/util/FixedBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import java.io.Closeable;
import java.util.*;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Condition;
import java.util.function.BiConsumer;
import java.util.function.Function;
Expand All @@ -29,6 +30,8 @@ public class FixedBuffer<T> extends Buffer<T> implements Closeable {
/** Used to unblock blocked senders on close() */
protected boolean open=true;

protected final LongAdder num_blockings=new LongAdder();


public FixedBuffer() {
this(0);
Expand All @@ -54,7 +57,8 @@ public FixedBuffer(int capacity, long offset) {
this.low=this.hd=this.high=this.offset=offset;
}

@Override public int capacity() {return buf.length;}
@Override public int capacity() {return buf.length;}
public long numBlockings() {return num_blockings.sum();}

/**
* Adds a new element to the buffer
Expand Down Expand Up @@ -275,6 +279,12 @@ public void forEach(long from, long to, Visitor<T> visitor, boolean nullify, boo
buffer_full.signalAll();
}

@Override
public void resetStats() {
super.resetStats();
num_blockings.reset();
}

@Override
public void close() {
lock.lock();
Expand Down Expand Up @@ -323,6 +333,7 @@ protected int index(long seqno) {
protected boolean block(long seqno) {
while(open && seqno - low > capacity()) {
try {
num_blockings.increment();
buffer_full.await();
}
catch(InterruptedException e) {
Expand Down

0 comments on commit d727c90

Please sign in to comment.