From 29c287d091716c20c7c67ffa062cdf0fdd456532 Mon Sep 17 00:00:00 2001 From: cfredri4 Date: Thu, 30 May 2024 09:01:35 +0200 Subject: [PATCH] Bundler thread renaming This makes TP rename all bundler threads (previously only done for TransferQueueBundler+RingBufferBundler) and also makes TP not care about the specific bundler type being used --- src/org/jgroups/protocols/Bundler.java | 2 ++ src/org/jgroups/protocols/PerDestinationBundler.java | 7 +++++-- src/org/jgroups/protocols/RemoveQueueBundler.java | 8 ++++++-- src/org/jgroups/protocols/RingBufferBundler.java | 5 ++++- .../jgroups/protocols/RingBufferBundlerLockless.java | 3 +++ .../jgroups/protocols/RingBufferBundlerLockless2.java | 3 +++ src/org/jgroups/protocols/TP.java | 10 ++++------ src/org/jgroups/protocols/TransferQueueBundler.java | 6 ++++-- src/org/jgroups/protocols/TransferQueueBundler2.java | 6 ++++-- 9 files changed, 35 insertions(+), 15 deletions(-) diff --git a/src/org/jgroups/protocols/Bundler.java b/src/org/jgroups/protocols/Bundler.java index 543c7d00872..15b72358c8b 100644 --- a/src/org/jgroups/protocols/Bundler.java +++ b/src/org/jgroups/protocols/Bundler.java @@ -42,5 +42,7 @@ default void viewChange(View view) {} int getMaxSize(); default Bundler setMaxSize(int s) {return this;} + default void renameThread() {} + default void resetStats() {} } diff --git a/src/org/jgroups/protocols/PerDestinationBundler.java b/src/org/jgroups/protocols/PerDestinationBundler.java index 6439b67353b..39b072d4a83 100644 --- a/src/org/jgroups/protocols/PerDestinationBundler.java +++ b/src/org/jgroups/protocols/PerDestinationBundler.java @@ -67,7 +67,7 @@ public class PerDestinationBundler implements Bundler { protected Address local_addr; protected final Map dests=Util.createConcurrentMap(); protected static final Address NULL=new NullAddress(); - + protected static final String THREAD_NAME="pd-bundler"; public int size() { return dests.values().stream().map(SendBuffer::size).reduce(0, Integer::sum); @@ -200,7 +200,7 @@ protected void sendBundledMessages() { public SendBuffer start() { if(running) stop(); - bundler_thread=transport.getThreadFactory().newThread(this, "pd-bundler"); // new Thread(this, "pd-bundler"); + bundler_thread=transport.getThreadFactory().newThread(this, THREAD_NAME); running=true; bundler_thread.start(); return this; @@ -213,6 +213,9 @@ public void stop() { tmp.interrupt(); } + public void renameThread() { + transport.getThreadFactory().renameThread(THREAD_NAME, bundler_thread); + } protected void send(Message msg) throws Exception { queue.put(msg); diff --git a/src/org/jgroups/protocols/RemoveQueueBundler.java b/src/org/jgroups/protocols/RemoveQueueBundler.java index 6bbd6791e87..d571fa377b6 100644 --- a/src/org/jgroups/protocols/RemoveQueueBundler.java +++ b/src/org/jgroups/protocols/RemoveQueueBundler.java @@ -6,7 +6,6 @@ import org.jgroups.annotations.ManagedAttribute; import org.jgroups.annotations.Property; import org.jgroups.util.AverageMinMax; -import org.jgroups.util.DefaultThreadFactory; import org.jgroups.util.RingBuffer; import org.jgroups.util.Runner; @@ -32,6 +31,7 @@ public class RemoveQueueBundler extends BaseBundler { protected Runner runner; protected Message[] remove_queue; protected final AverageMinMax avg_batch_size=new AverageMinMax(); + protected static final String THREAD_NAME="rq-bundler"; @Property(name="remove_queue_size",description="The capacity of the remove queue",writable=false) protected int queue_size=1024; @@ -51,7 +51,7 @@ public void init(TP transport) { super.init(transport); rb=new RingBuffer<>(Message.class, capacity); remove_queue=new Message[queue_size]; - runner=new Runner(new DefaultThreadFactory("aqb", true, true), "runner", this::run, null); + runner=new Runner(transport.getThreadFactory(), THREAD_NAME, this::run, null); } public synchronized void start() { @@ -64,6 +64,10 @@ public synchronized void stop() { super.stop(); } + public void renameThread() { + transport.getThreadFactory().renameThread(THREAD_NAME, runner.getThread()); + } + public void send(Message msg) throws Exception { rb.put(msg); } diff --git a/src/org/jgroups/protocols/RingBufferBundler.java b/src/org/jgroups/protocols/RingBufferBundler.java index 972155c2df7..365ca8520ab 100644 --- a/src/org/jgroups/protocols/RingBufferBundler.java +++ b/src/org/jgroups/protocols/RingBufferBundler.java @@ -56,7 +56,6 @@ public RingBufferBundler(int capacity) { } public RingBuffer buf() {return rb;} - public Thread getThread() {return bundler_thread.getThread();} public int size() {return rb.size();} public int getQueueSize() {return rb.size();} public int numSpins() {return num_spins;} @@ -85,6 +84,10 @@ public void stop() { bundler_thread.stop(); } + public void renameThread() { + transport.getThreadFactory().renameThread(THREAD_NAME, bundler_thread.getThread()); + } + public void send(Message msg) throws Exception { rb.put(msg); } diff --git a/src/org/jgroups/protocols/RingBufferBundlerLockless.java b/src/org/jgroups/protocols/RingBufferBundlerLockless.java index df06a695fc4..2d620e057e2 100644 --- a/src/org/jgroups/protocols/RingBufferBundlerLockless.java +++ b/src/org/jgroups/protocols/RingBufferBundlerLockless.java @@ -70,6 +70,9 @@ public void stop() { bundler_thread.stop(); } + public void renameThread() { + transport.getThreadFactory().renameThread(THREAD_NAME, bundler_thread.getThread()); + } public void send(Message msg) throws Exception { if(msg == null) diff --git a/src/org/jgroups/protocols/RingBufferBundlerLockless2.java b/src/org/jgroups/protocols/RingBufferBundlerLockless2.java index 78f55ab1075..020ccf01c4e 100644 --- a/src/org/jgroups/protocols/RingBufferBundlerLockless2.java +++ b/src/org/jgroups/protocols/RingBufferBundlerLockless2.java @@ -71,6 +71,9 @@ public void stop() { bundler_thread.stop(); } + public void renameThread() { + transport.getThreadFactory().renameThread(THREAD_NAME, bundler_thread.getThread()); + } public void send(Message msg) throws Exception { if(msg == null) diff --git a/src/org/jgroups/protocols/TP.java b/src/org/jgroups/protocols/TP.java index d2839ad837c..97b2d435352 100644 --- a/src/org/jgroups/protocols/TP.java +++ b/src/org/jgroups/protocols/TP.java @@ -1491,18 +1491,16 @@ protected void fetchLocalAddresses() { protected void setThreadNames() { if(diag_handler != null) diag_handler.setThreadNames(); - if(bundler instanceof TransferQueueBundler) - thread_factory.renameThread(TransferQueueBundler.THREAD_NAME, ((TransferQueueBundler)bundler).getThread()); + if(bundler != null) + bundler.renameThread(); } protected void unsetThreadNames() { if(diag_handler != null) diag_handler.unsetThreadNames(); - Thread thread=bundler instanceof TransferQueueBundler? ((TransferQueueBundler)bundler).getThread() : - bundler instanceof RingBufferBundler? ((RingBufferBundler)bundler).getThread() : null; - if(thread != null) - thread_factory.renameThread(TransferQueueBundler.THREAD_NAME, thread); + if(bundler != null) + bundler.renameThread(); } protected void setInAllThreadFactories(String cluster_name, Address local_address, String pattern) { diff --git a/src/org/jgroups/protocols/TransferQueueBundler.java b/src/org/jgroups/protocols/TransferQueueBundler.java index f8b6e17239e..9fe5649b80d 100644 --- a/src/org/jgroups/protocols/TransferQueueBundler.java +++ b/src/org/jgroups/protocols/TransferQueueBundler.java @@ -52,8 +52,6 @@ public TransferQueueBundler(int capacity) { this(new ArrayBlockingQueue<>(Util.assertPositive(capacity, "bundler capacity cannot be " + capacity))); } - public Thread getThread() {return bundler_thread;} - @ManagedAttribute(description="Size of the queue") public int getQueueSize() {return queue.size();} @@ -98,6 +96,10 @@ public synchronized void stop() { drain(); } + public void renameThread() { + transport.getThreadFactory().renameThread(THREAD_NAME, bundler_thread); + } + public int size() { return super.size() + removeQueueSize() + getQueueSize(); } diff --git a/src/org/jgroups/protocols/TransferQueueBundler2.java b/src/org/jgroups/protocols/TransferQueueBundler2.java index 11a1927a159..1b5bc815ea8 100644 --- a/src/org/jgroups/protocols/TransferQueueBundler2.java +++ b/src/org/jgroups/protocols/TransferQueueBundler2.java @@ -83,8 +83,6 @@ public TransferQueueBundler2(int capacity) { this(new ArrayBlockingQueue<>(assertPositive(capacity, "bundler capacity cannot be " + capacity))); } - public Thread getThread() {return bundler_thread;} - public int getCapacity() {return capacity;} public Bundler setCapacity(int c) {this.capacity=c; return this;} public int getMaxSize() {return max_size;} @@ -144,6 +142,10 @@ public synchronized void stop() { drain(); } + public void renameThread() { + transport.getThreadFactory().renameThread(THREAD_NAME, bundler_thread); + } + public int size() { return removeQueueSize() + getQueueSize(); }