Skip to content

Commit

Permalink
- Added hd to FixedBuffer
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Jul 16, 2024
1 parent cf9569e commit 4f243be
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 173 deletions.
55 changes: 50 additions & 5 deletions src/org/jgroups/util/Buffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,9 @@ public boolean add(long seqno, T element) {

public abstract T _get(long seqno);

public abstract T remove();
public T remove() {
return remove(true);
}

public abstract T remove(boolean nullify);

Expand All @@ -107,15 +109,20 @@ public abstract <R> R removeMany(boolean nullify, int max_results, Predicate<T>
Supplier<R> result_creator, BiConsumer<R,T> accumulator);

/**
* Removes all elements less than or equal to seqno from the table. Does this by nulling entire rows in the matrix
* and nulling all elements < index(seqno) of the first row that cannot be removed
* @param seqno
* Removes all elements <= seqno from the buffer. Does this by nulling all elements < index(seqno)
*/
// used: in stable() [NAKACK3 only]
public int purge(long seqno) {
return purge(seqno, false);
}

/**
* Purges (nulls) all elements <= seqno.
* @param seqno All elements <= seqno will be purged.
* @param force If false, seqno is max(seqno,hd), else max(seqno,high). In the latter case (seqno > hd), we might
* purge elements that have not yet been received
* @return 0. The number of purged elements
*/
public abstract int purge(long seqno, boolean force);

public void forEach(Visitor<T> visitor, boolean nullify) {
Expand All @@ -131,8 +138,11 @@ public void forEach(Visitor<T> visitor, boolean nullify) {

public abstract Stream<T> stream(long from, long to);

/** Iterates from hd to high and adds up non-null values. Caller must hold the lock. */
@GuardedBy("lock")
public abstract int computeSize();
public int computeSize() {
return (int)stream().filter(Objects::nonNull).count();
}

/** Returns the number of null elements in the range [hd+1 .. hr-1] excluding hd and hr */
public int numMissing() {
Expand Down Expand Up @@ -276,6 +286,41 @@ public interface Visitor<T> {
boolean visit(long seqno, T element);
}

protected class Remover<R> implements Visitor<T> {
protected final int max_results;
protected int num_results;
protected final Predicate<T> filter;
protected R result;
protected Supplier<R> result_creator;
protected BiConsumer<R,T> result_accumulator;

public Remover(int max_results, Predicate<T> filter, Supplier<R> creator, BiConsumer<R,T> accumulator) {
this.max_results=max_results;
this.filter=filter;
this.result_creator=creator;
this.result_accumulator=accumulator;
}

public R getResult() {return result;}

@GuardedBy("lock")
public boolean visit(long seqno, T element) {
if(element == null)
return false;
if(filter == null || filter.test(element)) {
if(result == null)
result=result_creator.get();
result_accumulator.accept(result, element);
num_results++;
}
size=Math.max(size-1, 0); // cannot be < 0 (well that would be a bug, but let's have this 2nd line of defense !)
if(seqno - hd > 0)
hd=seqno;
return max_results == 0 || num_results < max_results;
}
}


protected class NumDeliverable implements Visitor<T> {
protected int num_deliverable=0;

Expand Down
80 changes: 33 additions & 47 deletions src/org/jgroups/util/DynamicBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -132,24 +132,24 @@ public boolean add(long seqno, T element, Predicate<T> remove_filter, Options ig
T[] row=getRow(row_index);
int index=computeIndex(seqno);
T existing_element=row[index];
if(existing_element == null) {
row[index]=element;
size++;
if(seqno - high > 0)
high=seqno;
if(remove_filter != null && seqno-hd > 0) {
forEach((seq, msg) -> {
if(msg == null || !remove_filter.test(msg))
return false;
if(seq - hd > 0)
hd=seq;
size=Math.max(size-1, 0); // cannot be < 0 (well that would be a bug, but let's have this 2nd line of defense !)
return true;
}, false);
}
return true;
if(existing_element != null)
return false;
row[index]=element;
size++;
if(seqno - high > 0)
high=seqno;
if(remove_filter != null && seqno-hd > 0) {
Visitor<T> v=(seq,msg) -> {
if(msg == null || !remove_filter.test(msg))
return false;
if(seq - hd > 0)
hd=seq;
size=Math.max(size-1, 0); // cannot be < 0 (well that would be a bug, but let's have this 2nd line of defense !)
return true;
};
forEach(v, false);
}
return false;
return true;
}
finally {
lock.unlock();
Expand Down Expand Up @@ -247,12 +247,6 @@ public T _get(long seqno) {
}
}


@Override
public T remove() {
return remove(true);
}

/** Removes the next non-null element and nulls the index if nullify=true */
@Override
public T remove(boolean nullify) {
Expand Down Expand Up @@ -306,7 +300,7 @@ public <R> R removeMany(boolean nullify, int max_results, Predicate<T> filter,
Supplier<R> result_creator, BiConsumer<R,T> accumulator) {
lock.lock();
try {
Remover<R> remover=new Remover<>(max_results, filter, result_creator, accumulator);
Remover<R> remover=new Remover<R>(max_results, filter, result_creator, accumulator);
forEach(remover, nullify);
return remover.getResult();
}
Expand All @@ -317,11 +311,11 @@ public <R> R removeMany(boolean nullify, int max_results, Predicate<T> filter,


/**
* Removes all elements less than or equal to seqno from the table. Does this by nulling entire rows in the matrix
* Removes all elements less than or equal to seqno from the buffer. Does this by nulling entire rows in the matrix
* and nulling all elements < index(seqno) of the first row that cannot be removed.
* @param seqno All elements <= seqno will be nulled
* @param force If true, we only ensure that seqno <= hr, but don't care about hd, and set hd=low=seqno.
* @return 0. This value is never used by {@link DynamicBuffer}, so it is <em>not</em> computed. Don't use it!
* @return 0. The number of purged elements
*/
@Override
public int purge(long seqno, boolean force) {
Expand Down Expand Up @@ -542,13 +536,6 @@ protected void _compact() {
}
}

/** Iterates from low to hr and add up non-null values. Caller must hold the lock. */
@Override
@GuardedBy("lock")
public int computeSize() {
return (int)stream().filter(Objects::nonNull).count();
}

/**
* Returns a row. Creates a new row and inserts it at index if the row at index doesn't exist
* @param index
Expand Down Expand Up @@ -632,7 +619,7 @@ public T next() {
}


protected class Remover<R> implements Visitor<T> {
/*protected class Remover<R> implements Visitor<T> {
protected final int max_results;
protected int num_results;
protected final Predicate<T> filter;
Expand All @@ -651,21 +638,20 @@ public Remover(int max_results, Predicate<T> filter, Supplier<R> creator, BiCons
@GuardedBy("lock")
public boolean visit(long seqno, T element) {
if(element != null) {
if(filter == null || filter.test(element)) {
if(result == null)
result=result_creator.get();
result_accumulator.accept(result, element);
num_results++;
}
size=Math.max(size-1, 0); // cannot be < 0 (well that would be a bug, but let's have this 2nd line of defense !)
if(seqno - hd > 0)
hd=seqno;
return max_results == 0 || num_results < max_results;
if(element == null)
return false;
if(filter == null || filter.test(element)) {
if(result == null)
result=result_creator.get();
result_accumulator.accept(result, element);
num_results++;
}
return false;
size=Math.max(size-1, 0); // cannot be < 0 (well that would be a bug, but let's have this 2nd line of defense !)
if(seqno - hd > 0)
hd=seqno;
return max_results == 0 || num_results < max_results;
}
}

*/
}

Loading

0 comments on commit 4f243be

Please sign in to comment.