diff --git a/src/org/jgroups/util/Buffer.java b/src/org/jgroups/util/Buffer.java index 01f21fa909..a85a83b024 100644 --- a/src/org/jgroups/util/Buffer.java +++ b/src/org/jgroups/util/Buffer.java @@ -92,7 +92,9 @@ public boolean add(long seqno, T element) { public abstract T _get(long seqno); - public abstract T remove(); + public T remove() { + return remove(true); + } public abstract T remove(boolean nullify); @@ -107,15 +109,20 @@ public abstract R removeMany(boolean nullify, int max_results, Predicate Supplier result_creator, BiConsumer accumulator); /** - * Removes all elements less than or equal to seqno from the table. Does this by nulling entire rows in the matrix - * and nulling all elements < index(seqno) of the first row that cannot be removed - * @param seqno + * Removes all elements <= seqno from the buffer. Does this by nulling all elements < index(seqno) */ // used: in stable() [NAKACK3 only] public int purge(long seqno) { return purge(seqno, false); } + /** + * Purges (nulls) all elements <= seqno. + * @param seqno All elements <= seqno will be purged. + * @param force If false, seqno is max(seqno,hd), else max(seqno,high). In the latter case (seqno > hd), we might + * purge elements that have not yet been received + * @return 0. The number of purged elements + */ public abstract int purge(long seqno, boolean force); public void forEach(Visitor visitor, boolean nullify) { @@ -131,8 +138,11 @@ public void forEach(Visitor visitor, boolean nullify) { public abstract Stream stream(long from, long to); + /** Iterates from hd to high and adds up non-null values. Caller must hold the lock. */ @GuardedBy("lock") - public abstract int computeSize(); + public int computeSize() { + return (int)stream().filter(Objects::nonNull).count(); + } /** Returns the number of null elements in the range [hd+1 .. hr-1] excluding hd and hr */ public int numMissing() { @@ -276,6 +286,41 @@ public interface Visitor { boolean visit(long seqno, T element); } + protected class Remover implements Visitor { + protected final int max_results; + protected int num_results; + protected final Predicate filter; + protected R result; + protected Supplier result_creator; + protected BiConsumer result_accumulator; + + public Remover(int max_results, Predicate filter, Supplier creator, BiConsumer accumulator) { + this.max_results=max_results; + this.filter=filter; + this.result_creator=creator; + this.result_accumulator=accumulator; + } + + public R getResult() {return result;} + + @GuardedBy("lock") + public boolean visit(long seqno, T element) { + if(element == null) + return false; + if(filter == null || filter.test(element)) { + if(result == null) + result=result_creator.get(); + result_accumulator.accept(result, element); + num_results++; + } + size=Math.max(size-1, 0); // cannot be < 0 (well that would be a bug, but let's have this 2nd line of defense !) + if(seqno - hd > 0) + hd=seqno; + return max_results == 0 || num_results < max_results; + } + } + + protected class NumDeliverable implements Visitor { protected int num_deliverable=0; diff --git a/src/org/jgroups/util/DynamicBuffer.java b/src/org/jgroups/util/DynamicBuffer.java index 6cc24bf0a5..68031a3985 100644 --- a/src/org/jgroups/util/DynamicBuffer.java +++ b/src/org/jgroups/util/DynamicBuffer.java @@ -132,24 +132,24 @@ public boolean add(long seqno, T element, Predicate remove_filter, Options ig T[] row=getRow(row_index); int index=computeIndex(seqno); T existing_element=row[index]; - if(existing_element == null) { - row[index]=element; - size++; - if(seqno - high > 0) - high=seqno; - if(remove_filter != null && seqno-hd > 0) { - forEach((seq, msg) -> { - if(msg == null || !remove_filter.test(msg)) - return false; - if(seq - hd > 0) - hd=seq; - size=Math.max(size-1, 0); // cannot be < 0 (well that would be a bug, but let's have this 2nd line of defense !) - return true; - }, false); - } - return true; + if(existing_element != null) + return false; + row[index]=element; + size++; + if(seqno - high > 0) + high=seqno; + if(remove_filter != null && seqno-hd > 0) { + Visitor v=(seq,msg) -> { + if(msg == null || !remove_filter.test(msg)) + return false; + if(seq - hd > 0) + hd=seq; + size=Math.max(size-1, 0); // cannot be < 0 (well that would be a bug, but let's have this 2nd line of defense !) + return true; + }; + forEach(v, false); } - return false; + return true; } finally { lock.unlock(); @@ -247,12 +247,6 @@ public T _get(long seqno) { } } - - @Override - public T remove() { - return remove(true); - } - /** Removes the next non-null element and nulls the index if nullify=true */ @Override public T remove(boolean nullify) { @@ -306,7 +300,7 @@ public R removeMany(boolean nullify, int max_results, Predicate filter, Supplier result_creator, BiConsumer accumulator) { lock.lock(); try { - Remover remover=new Remover<>(max_results, filter, result_creator, accumulator); + Remover remover=new Remover(max_results, filter, result_creator, accumulator); forEach(remover, nullify); return remover.getResult(); } @@ -317,11 +311,11 @@ public R removeMany(boolean nullify, int max_results, Predicate filter, /** - * Removes all elements less than or equal to seqno from the table. Does this by nulling entire rows in the matrix + * Removes all elements less than or equal to seqno from the buffer. Does this by nulling entire rows in the matrix * and nulling all elements < index(seqno) of the first row that cannot be removed. * @param seqno All elements <= seqno will be nulled * @param force If true, we only ensure that seqno <= hr, but don't care about hd, and set hd=low=seqno. - * @return 0. This value is never used by {@link DynamicBuffer}, so it is not computed. Don't use it! + * @return 0. The number of purged elements */ @Override public int purge(long seqno, boolean force) { @@ -542,13 +536,6 @@ protected void _compact() { } } - /** Iterates from low to hr and add up non-null values. Caller must hold the lock. */ - @Override - @GuardedBy("lock") - public int computeSize() { - return (int)stream().filter(Objects::nonNull).count(); - } - /** * Returns a row. Creates a new row and inserts it at index if the row at index doesn't exist * @param index @@ -632,7 +619,7 @@ public T next() { } - protected class Remover implements Visitor { + /*protected class Remover implements Visitor { protected final int max_results; protected int num_results; protected final Predicate filter; @@ -651,21 +638,20 @@ public Remover(int max_results, Predicate filter, Supplier creator, BiCons @GuardedBy("lock") public boolean visit(long seqno, T element) { - if(element != null) { - if(filter == null || filter.test(element)) { - if(result == null) - result=result_creator.get(); - result_accumulator.accept(result, element); - num_results++; - } - size=Math.max(size-1, 0); // cannot be < 0 (well that would be a bug, but let's have this 2nd line of defense !) - if(seqno - hd > 0) - hd=seqno; - return max_results == 0 || num_results < max_results; + if(element == null) + return false; + if(filter == null || filter.test(element)) { + if(result == null) + result=result_creator.get(); + result_accumulator.accept(result, element); + num_results++; } - return false; + size=Math.max(size-1, 0); // cannot be < 0 (well that would be a bug, but let's have this 2nd line of defense !) + if(seqno - hd > 0) + hd=seqno; + return max_results == 0 || num_results < max_results; } } - +*/ } diff --git a/src/org/jgroups/util/FixedBuffer.java b/src/org/jgroups/util/FixedBuffer.java index 5375b43af6..40714ab1a9 100644 --- a/src/org/jgroups/util/FixedBuffer.java +++ b/src/org/jgroups/util/FixedBuffer.java @@ -56,7 +56,7 @@ public FixedBuffer(int capacity, long offset) { @Override public int capacity() {return buf.length;} - @Override + /*@Override public int computeSize() { if(high - low <= 0) return 0; @@ -70,7 +70,7 @@ public int computeSize() { retval++; } return retval; - } + }*/ /** @@ -104,15 +104,16 @@ public boolean add(long seqno, T element, Predicate remove_filter, Options op if(seqno - high > 0) high=seqno; - if(remove_filter != null && seqno - low > 0) { + if(remove_filter != null && seqno - hd > 0) { Visitor v=(seq,msg) -> { if(msg == null || !remove_filter.test(msg)) return false; - if(seq - low > 0) - low=seq; + if(seq - hd > 0) + hd=seq; + size=Math.max(size-1, 0); return true; }; - forEach(highestDelivered()+1, high(), v, true, true); + forEach(highestDelivered()+1, high(), v, false, true); } return true; } @@ -153,7 +154,8 @@ public boolean add(MessageBatch batch, Function seqno_getter, boolean re * Removes the next non-null element and advances hd * @return T if there was a non-null element at hd+1, otherwise null */ - public T remove() { + @Override + public T remove(boolean nullify) { lock.lock(); try { long tmp=hd + 1; @@ -163,10 +165,12 @@ public T remove() { T element=buf[index]; if(element != null) { hd=tmp; - buf[index]=null; size=Math.max(size-1, 0); // cannot be < 0 (well that would be a bug, but let's have this 2nd line of defense !) - if(hd - low > 0) - low=hd; + if(nullify) { + buf[index]=null; + if(hd - low > 0) + low=hd; + } buffer_full.signalAll(); } return element; @@ -176,22 +180,17 @@ public T remove() { } } - @Override - public T remove(boolean nullify) { - return remove(); - } - @Override public List removeMany(boolean nullify, int max_results, Predicate filter) { - return removeMany(true, max_results, filter, LinkedList::new, LinkedList::add); + return removeMany(nullify, max_results, filter, LinkedList::new, LinkedList::add); } @Override public R removeMany(boolean nullify, int max_results, Predicate filter, Supplier result_creator, BiConsumer accumulator) { lock.lock(); try { - Remover remover=new Remover<>(max_results, filter, result_creator, accumulator); - forEach(remover, true); + Remover remover=new Remover(max_results, filter, result_creator, accumulator); + forEach(remover, nullify); return remover.getResult(); } finally { @@ -225,10 +224,9 @@ public T _get(long seqno) { } } - /** Nulls elements between low and seqno and forwards low. Returns the number of nulled elements */ @Override public int purge(long seqno, boolean force) { - int count=0; + int purged=0; lock.lock(); try { if(seqno - low <= 0) // ignore if seqno <= low @@ -249,21 +247,17 @@ public int purge(long seqno, boolean force) { int index=index(from); if(buf[index] != null) { buf[index]=null; - size=Math.max(size-1, 0); - count++; + // size=Math.max(size-1, 0); + purged++; } low++; from++; - hd=Math.max(hd, low); + hd=Math.max(hd,low); } - /*if(seqno - low > 0) - low=seqno; - if(force) { - if(seqno - hd > 0) - hd=seqno; - }*/ + if(force) + size=computeSize(); if(low - tmp > 0) buffer_full.signalAll(); - return count; + return purged; } finally { lock.unlock(); @@ -279,7 +273,7 @@ public void forEach(long from, long to, Visitor visitor, boolean nullify, boo if(from - to > 0) // same as if(from > to), but prevents long overflow return; int distance=(int)(to - from +1); - long start=hd; + long start=low; for(int i=0; i < distance; i++) { int index=index(from); T element=buf[index]; @@ -288,9 +282,6 @@ public void forEach(long from, long to, Visitor visitor, boolean nullify, boo break; if(nullify && element != null) { buf[index]=null; - size=Math.max(size-1, 0); - if(from - hd > 0) - hd=from; if(from - low > 0) low=from; } @@ -298,7 +289,7 @@ public void forEach(long from, long to, Visitor visitor, boolean nullify, boo break; from++; } - if(nullify && hd - start > 0) + if(low - start > 0) buffer_full.signalAll(); } @@ -360,44 +351,9 @@ protected boolean block(long seqno) { - protected class Remover implements Visitor { - protected final int max_results; - protected int num_results; - protected final Predicate filter; - protected R result; - protected Supplier result_creator; - protected BiConsumer result_accumulator; - - public Remover(int max_results, Predicate filter, Supplier creator, BiConsumer accumulator) { - this.max_results=max_results; - this.filter=filter; - this.result_creator=creator; - this.result_accumulator=accumulator; - } - - public R getResult() { - return result; - } - - @GuardedBy("lock") - public boolean visit(long seqno, T element) { - if(element == null) - return false; - if(filter == null || filter.test(element)) { - if(result == null) - result=result_creator.get(); - result_accumulator.accept(result, element); - num_results++; - } - if(seqno - hd > 0) - hd=seqno; - return max_results == 0 || num_results < max_results; - } - } - protected class FixedBufferIterator implements Iterator { protected final T[] buffer; - protected long current=low+1; + protected long current=hd+1; public FixedBufferIterator(T[] buffer) { this.buffer=buffer; @@ -412,8 +368,8 @@ public T next() { if(!hasNext()) throw new NoSuchElementException(); // if(current <= low) - if(low - current >= 0) - current=low+1; + if(hd - current >= 0) + current=hd+1; return buffer[index(current++)]; } diff --git a/tests/junit-functional/org/jgroups/tests/BufferTest.java b/tests/junit-functional/org/jgroups/tests/BufferTest.java index 856206a684..0dd3f2f923 100644 --- a/tests/junit-functional/org/jgroups/tests/BufferTest.java +++ b/tests/junit-functional/org/jgroups/tests/BufferTest.java @@ -476,13 +476,8 @@ public void testAddAndRemove(Buffer buf) { buf.add(5, msg(5, true), dont_loopback_filter, Options.DEFAULT()); buf.add(6, msg(6, true), dont_loopback_filter, Options.DEFAULT()); assert buf.highestDelivered() == 6; - if(buf instanceof FixedBuffer) - for(int i=1; i <= 6; i++) - assert buf._get(i) == null; - else { - assert IntStream.rangeClosed(1,2).boxed().allMatch(n -> buf._get(n) == null); - assert IntStream.rangeClosed(3,6).boxed().allMatch(n -> buf._get(n) != null); - } + assert IntStream.rangeClosed(1,2).boxed().allMatch(n -> buf._get(n) == null); + assert IntStream.rangeClosed(3,6).boxed().allMatch(n -> buf._get(n) != null); } public void testAddAndRemove2(Buffer buf) { @@ -530,19 +525,11 @@ public void testIndex(Buffer type) { buf.add(6,6); buf.add(7,7); buf.remove(false); buf.remove(false); long low=buf.low(); - if(buf instanceof DynamicBuffer) - assert low == 5; - else - assert low == 7; - if(buf instanceof FixedBuffer) - for(int p: List.of(4, 5, 6, 7)) - assert buf.purge(p) == 0; - else { - assert buf.purge(4) == 0; - assert buf.purge(5) == 0; - assert buf.purge(6) == 1; - assert buf.purge(7) == 1; - } + assert low == 5; + assert buf.purge(4) == 0; + assert buf.purge(5) == 0; + assert buf.purge(6) == 1; + assert buf.purge(7) == 1; System.out.println("buf = " + buf); for(long i=low; i <= 7; i++) assert buf._get(i) == null : "message with seqno=" + i + " is not null"; @@ -562,7 +549,7 @@ public void testIndexWithRemoveMany(Buffer type) { } public void testComputeSize(Buffer buf) { - IntStream.rangeClosed(1,10).boxed().forEach(n -> buf.add(n,n)); + IntStream.rangeClosed(1,10).forEach(n -> buf.add(n,n)); System.out.println("buf = " + buf); assert buf.computeSize() == 10; buf.removeMany(false, 3); @@ -575,7 +562,7 @@ public void testComputeSize(Buffer buf) { assert buf.computeSize() == 3; } - public static void testComputeSize2(Buffer buf) { + public void testComputeSize2(Buffer buf) { buf.add(1, 1); System.out.println("buf = " + buf); assert buf.computeSize() == buf.size(); @@ -628,7 +615,7 @@ public void testRemove2(Buffer buf) { public void testRemove3(Buffer buf) { IntStream.rangeClosed(1,5).forEach(n -> buf.add(n,n)); System.out.println("buf = " + buf); - assertIndices(buf, 0, 5); + assertIndices(buf, 0, 0, 5); for(int i=1; i <= 5; i++) { Integer el=buf.remove(); @@ -756,10 +743,7 @@ public void testRemoveManyWithWrapping2(Buffer buf) { assert buf.size() == 18 && buf.numMissing() == 2; List list=buf.removeMany(false,0); assert list.size() == 12; - if(buf instanceof DynamicBuffer) - assertIndices(buf, 0, 12, 20); - else - assertIndices(buf, 12, 20); + assertIndices(buf, 0, 12, 20); assert buf.size() == 6 && buf.numMissing() == 2; buf.purge(12); assertIndices(buf, 12, 12, 20); @@ -1285,10 +1269,7 @@ public void testResizeWithPurgeAndGetOfNonExistingElement(Buffer buf) { assert num != null && num == i; } System.out.println("buf after removal of seqno 15: " + buf); - if(buf instanceof DynamicBuffer) - assertIndices(buf, 0, 15, 50); - else - assertIndices(buf, 15, 50); + assertIndices(buf, 0, 15, 50); assert buf.size() == 35 && buf.numMissing() == 0; buf.purge(15); @@ -1521,8 +1502,7 @@ public void testPurge6(Buffer buf) { public void testPurgeForce(Buffer buf) { - for(int i=1; i <= 30; i++) - buf.add(i, i); + IntStream.rangeClosed(1,30).forEach(n -> buf.add(n,n)); System.out.println("buf = " + buf); buf.purge(15, true); System.out.println("buf = " + buf); @@ -1552,6 +1532,30 @@ public void testPurgeForce(Buffer buf) { assertIndices(buf, 40, 40, 40); } + public void testPurgeForceWithGaps(Buffer buf) { + IntStream.rangeClosed(1,30).filter(n -> n % 2 == 0).forEach(n -> buf.add(n,n)); + System.out.println("buf = " + buf); + int purged=buf.purge(15, true); + assert purged == 7; + System.out.println("buf = " + buf); + assertIndices(buf, 15, 15, 30); + for(int i=1; i <= 15; i++) + assert buf._get(i) == null; + for(int i=16; i<= 30; i++) { + if(i % 2 == 0) + assert buf._get(i) != null; + else + assert buf._get(i) == null; + } + assert buf.get(5) == null && buf.get(26) != null; + + purged=buf.purge(30, true); + assert purged == 8; + System.out.println("buf = " + buf); + assertIndices(buf, 30, 30, 30); + assert buf.isEmpty(); + } + // Tests purge(40) followed by purge(20) - the second purge() should be ignored // https://issues.redhat.com/browse/JGRP-1872 public void testPurgeLower(Buffer buf) { @@ -1578,22 +1582,15 @@ public void testPurgeLower(Buffer buf) { public void testCompact(Buffer type) { Buffer buf=type instanceof DynamicBuffer? new DynamicBuffer<>(3,10,0) : new FixedBuffer<>(80, 0); - for(int i=1; i <= 80; i++) - addAndGet(buf, i); + IntStream.rangeClosed(1,80).boxed().forEach(n -> buf.add(n,n)); assert buf.size() == 80; assertIndices(buf, 0, 0, 80); List list=buf.removeMany(false,60); assert list.size() == 60; assert list.get(0) == 1 && list.get(list.size() -1) == 60; - if(type instanceof DynamicBuffer) - assertIndices(buf, 0, 60, 80); - else - assertIndices(buf, 60, 80); + assertIndices(buf, 0, 60, 80); buf.purge(60); - if(type instanceof DynamicBuffer) - assertIndices(buf, 60, 60, 80); - else - assertIndices(buf, 60, 80); + assertIndices(buf, 60, 60, 80); assert buf.size() == 20; if(buf instanceof DynamicBuffer) { ((DynamicBuffer)buf).compact(); @@ -1889,11 +1886,6 @@ protected static void assertIndices(Buffer buf, long low, long hd, long h assert buf.high() == hr : "expected hr=" + hr + " but was " + buf.high(); } - protected static void assertIndices(Buffer buf, long low, long high) { - assert buf.low() == low : String.format("expected low=%,d but was %,d", low, buf.low()); - assert buf.high() == high : String.format("expected hr=%,d but was %,d", high, buf.high()); - } - protected static class Adder extends Thread { protected final CountDownLatch latch; protected final int seqno;