diff --git a/src/org/jgroups/util/AckTable.java b/src/org/jgroups/util/AckTable.java index 51959823b3..8a5eeec880 100644 --- a/src/org/jgroups/util/AckTable.java +++ b/src/org/jgroups/util/AckTable.java @@ -1,10 +1,14 @@ package org.jgroups.util; import org.jgroups.Address; +import org.jgroups.annotations.GuardedBy; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; /** @@ -13,39 +17,83 @@ * @since 5.4 */ public class AckTable { - protected final Map acks=Util.createConcurrentMap(); - protected long min; // the current minimum, recomputed on ack() and view change + protected final Map acks=new HashMap<>(); + protected final Lock lock=new ReentrantLock(); + protected volatile long min; // the current minimum, recomputed on ack() and view change - public long min() {return min;} + public long min() { + lock.lock(); + try { + return min; + } + finally { + lock.unlock(); + } + } /** Adds an ACK from a sender to the map. Returns the new minimum */ public long ack(Address sender, long seqno) { - Long existing=acks.get(sender); - if(existing != null && existing >= seqno) - return min; - acks.put(sender, seqno); - return min=computeMin(); + // System.out.printf("-- [%d] ACK(%s,%d)\n", Thread.currentThread().getId(), sender, seqno); + lock.lock(); + try { + Long existing=acks.get(sender); + if(existing != null && existing >= seqno) + return min; + acks.put(sender, seqno); + return min=computeMin(); + } + finally { + lock.unlock(); + } } /** Removes left members from and adds new members to the map */ public AckTable adjust(List
mbrs) { if(mbrs == null) return this; - acks.keySet().retainAll(mbrs); - for(Address mbr: mbrs) - acks.putIfAbsent(mbr, 0L); - min=computeMin(); - return this; + lock.lock(); + try { + acks.keySet().retainAll(mbrs); + for(Address mbr: mbrs) + acks.putIfAbsent(mbr, 0L); + min=computeMin(); + return this; + } + finally { + lock.unlock(); + } } - public int size() {return acks.size();} + public AckTable clear() { + lock.lock(); + try { + acks.clear(); + min=computeMin(); + return this; + } + finally { + lock.unlock(); + } + } + + public int size() { + lock.lock(); + try { + return acks.size(); + } + finally { + lock.unlock(); + } + } + // no need for the lock - approximation, may be incorrect, that's ok @Override public String toString() { - String tmp= acks.entrySet().stream().map(e -> String.format("%s: %d", e.getKey(), e.getValue())) + String tmp=acks.entrySet().stream().map(e -> String.format("%s: %d", e.getKey(), e.getValue())) .collect(Collectors.joining("\n")); return tmp.isEmpty()? "min: " + min : tmp + "\nmin: " + min; } + @GuardedBy("lock") protected long computeMin() { Optional m=acks.values().stream().min(Long::compareTo); return m.orElse(min); diff --git a/tests/junit-functional/org/jgroups/tests/AckTableTest.java b/tests/junit-functional/org/jgroups/tests/AckTableTest.java index 1b6d56668f..5064d2608b 100644 --- a/tests/junit-functional/org/jgroups/tests/AckTableTest.java +++ b/tests/junit-functional/org/jgroups/tests/AckTableTest.java @@ -64,5 +64,9 @@ public void testAck() { table.adjust(List.of(a,b,c,d)); assert table.size() == 4; assert table.min() == 0; + + table.clear(); + assert table.size() == 0; + assert table.min() == 0; } }