Skip to content

Commit

Permalink
Rename all bundler threads (https://issues.redhat.com/browse/JGRP-2808)
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Jul 4, 2024
2 parents 6d5ed51 + bca2541 commit 2aa7a19
Show file tree
Hide file tree
Showing 9 changed files with 35 additions and 15 deletions.
2 changes: 2 additions & 0 deletions src/org/jgroups/protocols/Bundler.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,7 @@ default void viewChange(View view) {}
int getMaxSize();
default Bundler setMaxSize(int s) {return this;}

default void renameThread() {}

default void resetStats() {}
}
7 changes: 5 additions & 2 deletions src/org/jgroups/protocols/PerDestinationBundler.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public class PerDestinationBundler implements Bundler {
protected Address local_addr;
protected final Map<Address,SendBuffer> dests=Util.createConcurrentMap();
protected static final Address NULL=new NullAddress();

protected static final String THREAD_NAME="pd-bundler";

public int size() {
return dests.values().stream().map(SendBuffer::size).reduce(0, Integer::sum);
Expand Down Expand Up @@ -200,7 +200,7 @@ protected void sendBundledMessages() {
public SendBuffer start() {
if(running)
stop();
bundler_thread=transport.getThreadFactory().newThread(this, "pd-bundler"); // new Thread(this, "pd-bundler");
bundler_thread=transport.getThreadFactory().newThread(this, THREAD_NAME);
running=true;
bundler_thread.start();
return this;
Expand All @@ -213,6 +213,9 @@ public void stop() {
tmp.interrupt();
}

public void renameThread() {
transport.getThreadFactory().renameThread(THREAD_NAME, bundler_thread);
}

protected void send(Message msg) throws Exception {
queue.put(msg);
Expand Down
8 changes: 6 additions & 2 deletions src/org/jgroups/protocols/RemoveQueueBundler.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import org.jgroups.annotations.ManagedAttribute;
import org.jgroups.annotations.Property;
import org.jgroups.util.AverageMinMax;
import org.jgroups.util.DefaultThreadFactory;
import org.jgroups.util.RingBuffer;
import org.jgroups.util.Runner;

Expand All @@ -32,6 +31,7 @@ public class RemoveQueueBundler extends BaseBundler {
protected Runner runner;
protected Message[] remove_queue;
protected final AverageMinMax avg_batch_size=new AverageMinMax();
protected static final String THREAD_NAME="rq-bundler";

@Property(name="remove_queue_size",description="The capacity of the remove queue",writable=false)
protected int queue_size=1024;
Expand All @@ -51,7 +51,7 @@ public void init(TP transport) {
super.init(transport);
rb=new RingBuffer<>(Message.class, capacity);
remove_queue=new Message[queue_size];
runner=new Runner(new DefaultThreadFactory("aqb", true, true), "runner", this::run, null);
runner=new Runner(transport.getThreadFactory(), THREAD_NAME, this::run, null);
}

public synchronized void start() {
Expand All @@ -64,6 +64,10 @@ public synchronized void stop() {
super.stop();
}

public void renameThread() {
transport.getThreadFactory().renameThread(THREAD_NAME, runner.getThread());
}

public void send(Message msg) throws Exception {
rb.put(msg);
}
Expand Down
5 changes: 4 additions & 1 deletion src/org/jgroups/protocols/RingBufferBundler.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ public RingBufferBundler(int capacity) {
}

public RingBuffer<Message> buf() {return rb;}
public Thread getThread() {return bundler_thread.getThread();}
public int size() {return rb.size();}
public int getQueueSize() {return rb.size();}
public int numSpins() {return num_spins;}
Expand Down Expand Up @@ -85,6 +84,10 @@ public void stop() {
bundler_thread.stop();
}

public void renameThread() {
transport.getThreadFactory().renameThread(THREAD_NAME, bundler_thread.getThread());
}

public void send(Message msg) throws Exception {
rb.put(msg);
}
Expand Down
3 changes: 3 additions & 0 deletions src/org/jgroups/protocols/RingBufferBundlerLockless.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ public void stop() {
bundler_thread.stop();
}

public void renameThread() {
transport.getThreadFactory().renameThread(THREAD_NAME, bundler_thread.getThread());
}

public void send(Message msg) throws Exception {
if(msg == null)
Expand Down
3 changes: 3 additions & 0 deletions src/org/jgroups/protocols/RingBufferBundlerLockless2.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ public void stop() {
bundler_thread.stop();
}

public void renameThread() {
transport.getThreadFactory().renameThread(THREAD_NAME, bundler_thread.getThread());
}

public void send(Message msg) throws Exception {
if(msg == null)
Expand Down
10 changes: 4 additions & 6 deletions src/org/jgroups/protocols/TP.java
Original file line number Diff line number Diff line change
Expand Up @@ -1515,18 +1515,16 @@ protected void fetchLocalAddresses() {
protected void setThreadNames() {
if(diag_handler != null)
diag_handler.setThreadNames();
if(bundler instanceof TransferQueueBundler)
thread_factory.renameThread(TransferQueueBundler.THREAD_NAME, ((TransferQueueBundler)bundler).getThread());
if(bundler != null)
bundler.renameThread();
}


protected void unsetThreadNames() {
if(diag_handler != null)
diag_handler.unsetThreadNames();
Thread thread=bundler instanceof TransferQueueBundler? ((TransferQueueBundler)bundler).getThread() :
bundler instanceof RingBufferBundler? ((RingBufferBundler)bundler).getThread() : null;
if(thread != null)
thread_factory.renameThread(TransferQueueBundler.THREAD_NAME, thread);
if(bundler != null)
bundler.renameThread();
}

protected void setInAllThreadFactories(String cluster_name, Address local_address, String pattern) {
Expand Down
6 changes: 4 additions & 2 deletions src/org/jgroups/protocols/TransferQueueBundler.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ public class TransferQueueBundler extends BaseBundler implements Runnable {
public TransferQueueBundler() {
}

public Thread getThread() {return bundler_thread;}

@ManagedAttribute(description="Size of the queue")
public int getQueueSize() {return queue.size();}

Expand Down Expand Up @@ -89,6 +87,10 @@ public synchronized void stop() {
drain();
}

public void renameThread() {
transport.getThreadFactory().renameThread(THREAD_NAME, bundler_thread);
}

public int size() {
return super.size() + removeQueueSize() + getQueueSize();
}
Expand Down
6 changes: 4 additions & 2 deletions src/org/jgroups/protocols/TransferQueueBundler2.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,6 @@ public TransferQueueBundler2(int capacity) {
this(new ArrayBlockingQueue<>(assertPositive(capacity, "bundler capacity cannot be " + capacity)));
}

public Thread getThread() {return bundler_thread;}

public int getCapacity() {return capacity;}
public Bundler setCapacity(int c) {this.capacity=c; return this;}
public int getMaxSize() {return max_size;}
Expand Down Expand Up @@ -144,6 +142,10 @@ public synchronized void stop() {
drain();
}

public void renameThread() {
transport.getThreadFactory().renameThread(THREAD_NAME, bundler_thread);
}

public int size() {
return removeQueueSize() + getQueueSize();
}
Expand Down

0 comments on commit 2aa7a19

Please sign in to comment.