Skip to content

Commit

Permalink
Bundler thread renaming
Browse files Browse the repository at this point in the history
This makes TP rename all bundler threads (previously only done for TransferQueueBundler+RingBufferBundler) and also makes TP not care about the specific bundler type being used

JIRA: https://issues.redhat.com/browse/JGRP-2808
  • Loading branch information
belaban committed Jul 4, 2024
1 parent b089daf commit c0be647
Show file tree
Hide file tree
Showing 9 changed files with 35 additions and 15 deletions.
2 changes: 2 additions & 0 deletions src/org/jgroups/protocols/Bundler.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,7 @@ default void viewChange(View view) {}
int getMaxSize();
default Bundler setMaxSize(int s) {return this;}

default void renameThread() {}

default void resetStats() {}
}
7 changes: 5 additions & 2 deletions src/org/jgroups/protocols/PerDestinationBundler.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public class PerDestinationBundler implements Bundler {
protected Address local_addr;
protected final Map<Address,SendBuffer> dests=Util.createConcurrentMap();
protected static final Address NULL=new NullAddress();

protected static final String THREAD_NAME="pd-bundler";

public int size() {
return dests.values().stream().map(SendBuffer::size).reduce(0, Integer::sum);
Expand Down Expand Up @@ -200,7 +200,7 @@ protected void sendBundledMessages() {
public SendBuffer start() {
if(running)
stop();
bundler_thread=transport.getThreadFactory().newThread(this, "pd-bundler"); // new Thread(this, "pd-bundler");
bundler_thread=transport.getThreadFactory().newThread(this, THREAD_NAME);
running=true;
bundler_thread.start();
return this;
Expand All @@ -213,6 +213,9 @@ public void stop() {
tmp.interrupt();
}

public void renameThread() {
transport.getThreadFactory().renameThread(THREAD_NAME, bundler_thread);
}

protected void send(Message msg) throws Exception {
queue.put(msg);
Expand Down
8 changes: 6 additions & 2 deletions src/org/jgroups/protocols/RemoveQueueBundler.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import org.jgroups.annotations.ManagedAttribute;
import org.jgroups.annotations.Property;
import org.jgroups.util.AverageMinMax;
import org.jgroups.util.DefaultThreadFactory;
import org.jgroups.util.RingBuffer;
import org.jgroups.util.Runner;

Expand All @@ -32,6 +31,7 @@ public class RemoveQueueBundler extends BaseBundler {
protected Runner runner;
protected Message[] remove_queue;
protected final AverageMinMax avg_batch_size=new AverageMinMax();
protected static final String THREAD_NAME="rq-bundler";

@Property(name="remove_queue_size",description="The capacity of the remove queue",writable=false)
protected int queue_size=1024;
Expand All @@ -51,7 +51,7 @@ public void init(TP transport) {
super.init(transport);
rb=new RingBuffer<>(Message.class, capacity);
remove_queue=new Message[queue_size];
runner=new Runner(new DefaultThreadFactory("aqb", true, true), "runner", this::run, null);
runner=new Runner(transport.getThreadFactory(), THREAD_NAME, this::run, null);
}

public synchronized void start() {
Expand All @@ -64,6 +64,10 @@ public synchronized void stop() {
super.stop();
}

public void renameThread() {
transport.getThreadFactory().renameThread(THREAD_NAME, runner.getThread());
}

public void send(Message msg) throws Exception {
rb.put(msg);
}
Expand Down
5 changes: 4 additions & 1 deletion src/org/jgroups/protocols/RingBufferBundler.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ public RingBufferBundler(int capacity) {
}

public RingBuffer<Message> buf() {return rb;}
public Thread getThread() {return bundler_thread.getThread();}
public int size() {return rb.size();}
public int getQueueSize() {return rb.size();}
public int numSpins() {return num_spins;}
Expand Down Expand Up @@ -85,6 +84,10 @@ public void stop() {
bundler_thread.stop();
}

public void renameThread() {
transport.getThreadFactory().renameThread(THREAD_NAME, bundler_thread.getThread());
}

public void send(Message msg) throws Exception {
rb.put(msg);
}
Expand Down
3 changes: 3 additions & 0 deletions src/org/jgroups/protocols/RingBufferBundlerLockless.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ public void stop() {
bundler_thread.stop();
}

public void renameThread() {
transport.getThreadFactory().renameThread(THREAD_NAME, bundler_thread.getThread());
}

public void send(Message msg) throws Exception {
if(msg == null)
Expand Down
3 changes: 3 additions & 0 deletions src/org/jgroups/protocols/RingBufferBundlerLockless2.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ public void stop() {
bundler_thread.stop();
}

public void renameThread() {
transport.getThreadFactory().renameThread(THREAD_NAME, bundler_thread.getThread());
}

public void send(Message msg) throws Exception {
if(msg == null)
Expand Down
10 changes: 4 additions & 6 deletions src/org/jgroups/protocols/TP.java
Original file line number Diff line number Diff line change
Expand Up @@ -1519,18 +1519,16 @@ protected void fetchLocalAddresses() {
protected void setThreadNames() {
if(diag_handler != null)
diag_handler.setThreadNames();
if(bundler instanceof TransferQueueBundler)
thread_factory.renameThread(TransferQueueBundler.THREAD_NAME, ((TransferQueueBundler)bundler).getThread());
if(bundler != null)
bundler.renameThread();
}


protected void unsetThreadNames() {
if(diag_handler != null)
diag_handler.unsetThreadNames();
Thread thread=bundler instanceof TransferQueueBundler? ((TransferQueueBundler)bundler).getThread() :
bundler instanceof RingBufferBundler? ((RingBufferBundler)bundler).getThread() : null;
if(thread != null)
thread_factory.renameThread(TransferQueueBundler.THREAD_NAME, thread);
if(bundler != null)
bundler.renameThread();
}

protected void setInAllThreadFactories(String cluster_name, Address local_address, String pattern) {
Expand Down
6 changes: 4 additions & 2 deletions src/org/jgroups/protocols/TransferQueueBundler.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ public class TransferQueueBundler extends BaseBundler implements Runnable {
public TransferQueueBundler() {
}

public Thread getThread() {return bundler_thread;}

@ManagedAttribute(description="Size of the queue")
public int getQueueSize() {return queue.size();}

Expand Down Expand Up @@ -89,6 +87,10 @@ public synchronized void stop() {
drain();
}

public void renameThread() {
transport.getThreadFactory().renameThread(THREAD_NAME, bundler_thread);
}

public int size() {
return super.size() + removeQueueSize() + getQueueSize();
}
Expand Down
6 changes: 4 additions & 2 deletions src/org/jgroups/protocols/TransferQueueBundler2.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,6 @@ public TransferQueueBundler2(int capacity) {
this(new ArrayBlockingQueue<>(assertPositive(capacity, "bundler capacity cannot be " + capacity)));
}

public Thread getThread() {return bundler_thread;}

public int getCapacity() {return capacity;}
public Bundler setCapacity(int c) {this.capacity=c; return this;}
public int getMaxSize() {return max_size;}
Expand Down Expand Up @@ -144,6 +142,10 @@ public synchronized void stop() {
drain();
}

public void renameThread() {
transport.getThreadFactory().renameThread(THREAD_NAME, bundler_thread);
}

public int size() {
return removeQueueSize() + getQueueSize();
}
Expand Down

0 comments on commit c0be647

Please sign in to comment.