diff --git a/src/org/jgroups/protocols/Bundler.java b/src/org/jgroups/protocols/Bundler.java index 543c7d0087..15b72358c8 100644 --- a/src/org/jgroups/protocols/Bundler.java +++ b/src/org/jgroups/protocols/Bundler.java @@ -42,5 +42,7 @@ default void viewChange(View view) {} int getMaxSize(); default Bundler setMaxSize(int s) {return this;} + default void renameThread() {} + default void resetStats() {} } diff --git a/src/org/jgroups/protocols/PerDestinationBundler.java b/src/org/jgroups/protocols/PerDestinationBundler.java index 23a71072df..6e9a89eabf 100644 --- a/src/org/jgroups/protocols/PerDestinationBundler.java +++ b/src/org/jgroups/protocols/PerDestinationBundler.java @@ -67,7 +67,7 @@ public class PerDestinationBundler implements Bundler { protected Address local_addr; protected final Map dests=Util.createConcurrentMap(); protected static final Address NULL=new NullAddress(); - + protected static final String THREAD_NAME="pd-bundler"; public int size() { return dests.values().stream().map(SendBuffer::size).reduce(0, Integer::sum); @@ -200,7 +200,7 @@ protected void sendBundledMessages() { public SendBuffer start() { if(running) stop(); - bundler_thread=transport.getThreadFactory().newThread(this, "pd-bundler"); // new Thread(this, "pd-bundler"); + bundler_thread=transport.getThreadFactory().newThread(this, THREAD_NAME); running=true; bundler_thread.start(); return this; @@ -213,6 +213,9 @@ public void stop() { tmp.interrupt(); } + public void renameThread() { + transport.getThreadFactory().renameThread(THREAD_NAME, bundler_thread); + } protected void send(Message msg) throws Exception { queue.put(msg); diff --git a/src/org/jgroups/protocols/RemoveQueueBundler.java b/src/org/jgroups/protocols/RemoveQueueBundler.java index 6bbd6791e8..d571fa377b 100644 --- a/src/org/jgroups/protocols/RemoveQueueBundler.java +++ b/src/org/jgroups/protocols/RemoveQueueBundler.java @@ -6,7 +6,6 @@ import org.jgroups.annotations.ManagedAttribute; import org.jgroups.annotations.Property; import org.jgroups.util.AverageMinMax; -import org.jgroups.util.DefaultThreadFactory; import org.jgroups.util.RingBuffer; import org.jgroups.util.Runner; @@ -32,6 +31,7 @@ public class RemoveQueueBundler extends BaseBundler { protected Runner runner; protected Message[] remove_queue; protected final AverageMinMax avg_batch_size=new AverageMinMax(); + protected static final String THREAD_NAME="rq-bundler"; @Property(name="remove_queue_size",description="The capacity of the remove queue",writable=false) protected int queue_size=1024; @@ -51,7 +51,7 @@ public void init(TP transport) { super.init(transport); rb=new RingBuffer<>(Message.class, capacity); remove_queue=new Message[queue_size]; - runner=new Runner(new DefaultThreadFactory("aqb", true, true), "runner", this::run, null); + runner=new Runner(transport.getThreadFactory(), THREAD_NAME, this::run, null); } public synchronized void start() { @@ -64,6 +64,10 @@ public synchronized void stop() { super.stop(); } + public void renameThread() { + transport.getThreadFactory().renameThread(THREAD_NAME, runner.getThread()); + } + public void send(Message msg) throws Exception { rb.put(msg); } diff --git a/src/org/jgroups/protocols/RingBufferBundler.java b/src/org/jgroups/protocols/RingBufferBundler.java index 2727e28e0d..c3f4f048f4 100644 --- a/src/org/jgroups/protocols/RingBufferBundler.java +++ b/src/org/jgroups/protocols/RingBufferBundler.java @@ -56,7 +56,6 @@ public RingBufferBundler(int capacity) { } public RingBuffer buf() {return rb;} - public Thread getThread() {return bundler_thread.getThread();} public int size() {return rb.size();} public int getQueueSize() {return rb.size();} public int numSpins() {return num_spins;} @@ -85,6 +84,10 @@ public void stop() { bundler_thread.stop(); } + public void renameThread() { + transport.getThreadFactory().renameThread(THREAD_NAME, bundler_thread.getThread()); + } + public void send(Message msg) throws Exception { rb.put(msg); } diff --git a/src/org/jgroups/protocols/RingBufferBundlerLockless.java b/src/org/jgroups/protocols/RingBufferBundlerLockless.java index d0c9a22138..e7361ae98c 100644 --- a/src/org/jgroups/protocols/RingBufferBundlerLockless.java +++ b/src/org/jgroups/protocols/RingBufferBundlerLockless.java @@ -70,6 +70,9 @@ public void stop() { bundler_thread.stop(); } + public void renameThread() { + transport.getThreadFactory().renameThread(THREAD_NAME, bundler_thread.getThread()); + } public void send(Message msg) throws Exception { if(msg == null) diff --git a/src/org/jgroups/protocols/RingBufferBundlerLockless2.java b/src/org/jgroups/protocols/RingBufferBundlerLockless2.java index 34d9512cb5..7566e4fa3e 100644 --- a/src/org/jgroups/protocols/RingBufferBundlerLockless2.java +++ b/src/org/jgroups/protocols/RingBufferBundlerLockless2.java @@ -71,6 +71,9 @@ public void stop() { bundler_thread.stop(); } + public void renameThread() { + transport.getThreadFactory().renameThread(THREAD_NAME, bundler_thread.getThread()); + } public void send(Message msg) throws Exception { if(msg == null) diff --git a/src/org/jgroups/protocols/TP.java b/src/org/jgroups/protocols/TP.java index e5440d26e6..28ebe9c1e0 100644 --- a/src/org/jgroups/protocols/TP.java +++ b/src/org/jgroups/protocols/TP.java @@ -1515,18 +1515,16 @@ protected void fetchLocalAddresses() { protected void setThreadNames() { if(diag_handler != null) diag_handler.setThreadNames(); - if(bundler instanceof TransferQueueBundler) - thread_factory.renameThread(TransferQueueBundler.THREAD_NAME, ((TransferQueueBundler)bundler).getThread()); + if(bundler != null) + bundler.renameThread(); } protected void unsetThreadNames() { if(diag_handler != null) diag_handler.unsetThreadNames(); - Thread thread=bundler instanceof TransferQueueBundler? ((TransferQueueBundler)bundler).getThread() : - bundler instanceof RingBufferBundler? ((RingBufferBundler)bundler).getThread() : null; - if(thread != null) - thread_factory.renameThread(TransferQueueBundler.THREAD_NAME, thread); + if(bundler != null) + bundler.renameThread(); } protected void setInAllThreadFactories(String cluster_name, Address local_address, String pattern) { diff --git a/src/org/jgroups/protocols/TransferQueueBundler.java b/src/org/jgroups/protocols/TransferQueueBundler.java index 0168d12c72..604acacbc0 100644 --- a/src/org/jgroups/protocols/TransferQueueBundler.java +++ b/src/org/jgroups/protocols/TransferQueueBundler.java @@ -43,8 +43,6 @@ public class TransferQueueBundler extends BaseBundler implements Runnable { public TransferQueueBundler() { } - public Thread getThread() {return bundler_thread;} - @ManagedAttribute(description="Size of the queue") public int getQueueSize() {return queue.size();} @@ -89,6 +87,10 @@ public synchronized void stop() { drain(); } + public void renameThread() { + transport.getThreadFactory().renameThread(THREAD_NAME, bundler_thread); + } + public int size() { return super.size() + removeQueueSize() + getQueueSize(); } diff --git a/src/org/jgroups/protocols/TransferQueueBundler2.java b/src/org/jgroups/protocols/TransferQueueBundler2.java index 1724367aa3..41e05b68b0 100644 --- a/src/org/jgroups/protocols/TransferQueueBundler2.java +++ b/src/org/jgroups/protocols/TransferQueueBundler2.java @@ -83,8 +83,6 @@ public TransferQueueBundler2(int capacity) { this(new ArrayBlockingQueue<>(assertPositive(capacity, "bundler capacity cannot be " + capacity))); } - public Thread getThread() {return bundler_thread;} - public int getCapacity() {return capacity;} public Bundler setCapacity(int c) {this.capacity=c; return this;} public int getMaxSize() {return max_size;} @@ -144,6 +142,10 @@ public synchronized void stop() { drain(); } + public void renameThread() { + transport.getThreadFactory().renameThread(THREAD_NAME, bundler_thread); + } + public int size() { return removeQueueSize() + getQueueSize(); }