diff --git a/src/org/jgroups/util/RingBufferSeqno.java b/src/org/jgroups/util/RingBufferSeqno.java index 7701cd50e0..28c4407c6a 100644 --- a/src/org/jgroups/util/RingBufferSeqno.java +++ b/src/org/jgroups/util/RingBufferSeqno.java @@ -10,6 +10,7 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.BiConsumer; /** * Ring buffer of fixed capacity. Indices low and high point to the beginning and end of the buffer. Sequence numbers @@ -70,7 +71,7 @@ public RingBufferSeqno(int capacity, long offset) { public int spaceUsed() {return (int)(high - low);} public Lock lock() {return lock;} - public double saturation() { + public double saturation() { int space=spaceUsed(); return space == 0? 0.0 : space / (double)capacity(); } @@ -240,6 +241,27 @@ public int purge(long seqno) { } } + /** + * Iterates throiugh all elements and invokes a consumer function + * @param c The consumer accepting the index and element + */ + public RingBufferSeqno forAll(BiConsumer c) { + if(c == null) + return this; + lock.lock(); + try { + for(long i=low+1; i <= high; i++) { + int index=index(i); + T element=buf[index]; + c.accept(index, element); + } + return this; + } + finally { + lock.unlock(); + } + } + @Override public void close() { lock.lock(); @@ -327,9 +349,8 @@ public boolean hasNext() { } public T next() { - if(!hasNext()){ + if(!hasNext()) throw new NoSuchElementException(); - } if(current <= low) current=low+1; return buffer[index(current++)]; diff --git a/tests/junit-functional/org/jgroups/tests/RingBufferSeqnoTest.java b/tests/junit-functional/org/jgroups/tests/RingBufferSeqnoTest.java index 8a8a199ced..54785b6341 100644 --- a/tests/junit-functional/org/jgroups/tests/RingBufferSeqnoTest.java +++ b/tests/junit-functional/org/jgroups/tests/RingBufferSeqnoTest.java @@ -6,6 +6,7 @@ import org.jgroups.util.Util; import org.testng.annotations.Test; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -91,8 +92,8 @@ public void testSaturation() { saturation=buf.saturation(); System.out.println("size=" + size + ", space used=" + space_used + ", saturation=" + saturation); assert buf.size() == 5; - assert buf.spaceUsed() == 8; - assert buf.saturation() == 0.5; + assert buf.spaceUsed() == 5; + assert buf.saturation() == 5/16.0; long low=buf.low(); buf.purge(3); @@ -105,7 +106,7 @@ public void testSaturation() { System.out.println("size=" + size + ", space used=" + space_used + ", saturation=" + saturation); assert buf.size() == 5; assert buf.spaceUsed() == 5; - assert buf.saturation() == 0.3125; + assert buf.saturation() == 5/16.0; } public void testAddWithWrapAround() { @@ -221,15 +222,19 @@ public void testGetMissing() { public void testGetMissing2() { RingBufferSeqno buf=new RingBufferSeqno<>(10, 0); - buf.add(1,1); SeqnoList missing=buf.getMissing(); System.out.println("missing = " + missing); assert missing == null && buf.missing() == 0; + buf.add(1,1); + missing=buf.getMissing(); + System.out.println("missing = " + missing); + assert missing == null && buf.missing() == 0; buf=new RingBufferSeqno<>(10, 0); buf.add(10,10); missing=buf.getMissing(); System.out.println("missing = " + missing); + assert buf.missing() == 9; assert buf.missing() == missing.size(); buf=new RingBufferSeqno<>(10, 0); @@ -237,6 +242,12 @@ public void testGetMissing2() { missing=buf.getMissing(); System.out.println("missing = " + missing); assert buf.missing() == missing.size(); + buf.add(1,1); + buf.add(10,10); + missing=buf.getMissing(); + System.out.println("missing = " + missing); + assert buf.missing() == 7; + assert buf.missing() == missing.size(); buf=new RingBufferSeqno<>(10, 0); buf.add(5,7); @@ -301,6 +312,22 @@ public void testBlockingAddAndPurge2() throws TimeoutException { assert buf.size() == 15; } + public void testBlockingAddAndRemove() throws TimeoutException { + final RingBufferSeqno buf=new RingBufferSeqno<>(10, 0); + for(int i=1; i <= buf.capacity(); i++) + buf.add(i, i, true); + System.out.println("buf = " + buf); + assert buf.size() == 16; + Remover remover=new Remover(buf); + remover.start(); + for(int i=buf.capacity()+1; i <= buf.capacity()+5; i++) { + boolean rc=buf.add(i,i, true); + assert rc; + } + List removed=remover.removed(); + assert removed.size() == 5; + } + public void testGet() { final RingBufferSeqno buf=new RingBufferSeqno<>(10, 0); for(int i: Arrays.asList(1,2,3,4,5)) @@ -369,19 +396,13 @@ public void testRemovedPastHighestReceived() { RingBufferSeqno buf=new RingBufferSeqno<>(10, 0); int highest=buf.capacity(); for(int i=1; i <= 20; i++) { - if(i > highest) { + if(i > highest) assert !buf.add(i,i); - Integer num=buf.remove(); - assert num == null; - } - else { + else assert buf.add(i,i); - Integer num=buf.remove(); - assert num != null && num == i; - } } System.out.println("buf = " + buf); - assert buf.size() == 0; + assert buf.size() == 16; assert buf.missing() == 0; } @@ -520,6 +541,27 @@ public void run() { assertIndices(buf, 15, 15); } + public void testConcurrentAddAndRemove2() throws InterruptedException { + final int NUM=1000; + final RingBufferSeqno buf=new RingBufferSeqno<>(10,0); + final CountDownLatch latch=new CountDownLatch(1); + Adder[] adders=new Adder[NUM]; + for(int i=0; i < NUM; i++) { + adders[i]=new Adder(latch, i+1, buf); + adders[i].start(); + } + latch.countDown(); + List list=new ArrayList<>(NUM); + while(list.size() != NUM) { + List l=buf.removeMany(256); + if(l != null) + list.addAll(l); + } + assert list.size() == NUM; + for(int i=0; i < NUM; i++) + assert list.get(i) == i+1; + } + public void testPurge() { RingBufferSeqno buf=new RingBufferSeqno<>(10, 0); int purged=buf.purge(0); @@ -659,5 +701,21 @@ public void run() { } } + protected static class Remover extends Thread { + protected final RingBufferSeqno buf; + protected List removed; + + public Remover(RingBufferSeqno buf) { + this.buf=buf; + } + + public List removed() {return removed;} + + public void run() { + Util.sleep(1000); + removed=buf.removeMany(5); + } + } + }