From f565afebec5e0d5d8fb36094753f36335813d14e Mon Sep 17 00:00:00 2001 From: Bela Ban Date: Tue, 3 Sep 2024 13:22:54 +0200 Subject: [PATCH] - TP.use_vthread is now the only place to configure virtual threads (https://issues.redhat.com/browse/JGRP-2827) - Moved thread pool creation from ThreadCreator -> ThreadPool --- build.xml | 11 ++ src/org/jgroups/protocols/TP.java | 16 +-- .../jgroups/util/DefaultThreadFactory.java | 9 +- src/org/jgroups/util/ThreadCreator.java | 113 ++++-------------- src/org/jgroups/util/ThreadFactory.java | 2 +- src/org/jgroups/util/ThreadPool.java | 65 +++++++--- 6 files changed, 97 insertions(+), 119 deletions(-) diff --git a/build.xml b/build.xml index d4ad2118a12..9b782721da1 100644 --- a/build.xml +++ b/build.xml @@ -531,6 +531,17 @@ + + + + + + + + + + + T setThreadPool(Executor thread_pool) { /** Don't remove! https://issues.redhat.com/browse/JGRP-2814 */ @ManagedAttribute(type=SCALAR) @Deprecated - public long getNumberOfThreadDumps() {return thread_pool.getNumberOfThreadDumps();} + public static long getNumberOfThreadDumps() {return ThreadPool.getNumberOfThreadDumps();} /** Don't remove! https://issues.redhat.com/browse/JGRP-2814 */ @ManagedAttribute(type=SCALAR) @Deprecated @@ -731,12 +729,10 @@ public String defaultHeaders(boolean detailed) { public void init() throws Exception { this.id=ClassConfigurator.getProtocolId(TP.class); - if(use_virtual_threads && !Util.virtualThreadsAvailable()) { + if(use_vthreads && !Util.virtualThreadsAvailable()) { log.debug("use_virtual_threads was set to false, as virtual threads are not available in this Java version"); - use_virtual_threads=false; + use_vthreads=false; } - thread_pool.useVirtualThreads(this.use_virtual_threads); - if(local_transport_class != null) { Class cl=Util.loadClass(local_transport_class, getClass()); local_transport=(LocalTransport)cl.getDeclaredConstructor().newInstance(); @@ -745,7 +741,7 @@ public void init() throws Exception { if(thread_factory == null) setThreadFactory(new LazyThreadFactory("jgroups", false, true) - .useVirtualThreads(use_virtual_threads).log(this.log)); + .useVirtualThreads(use_vthreads).log(this.log)); // local_addr is null when shared transport, channel_name is not used setInAllThreadFactories(cluster_name != null? cluster_name.toString() : null, local_addr, thread_naming_pattern); diff --git a/src/org/jgroups/util/DefaultThreadFactory.java b/src/org/jgroups/util/DefaultThreadFactory.java index 92f2e494b59..69a0aff3859 100644 --- a/src/org/jgroups/util/DefaultThreadFactory.java +++ b/src/org/jgroups/util/DefaultThreadFactory.java @@ -23,7 +23,7 @@ public class DefaultThreadFactory implements ThreadFactory { protected String clusterName; protected boolean includeLocalAddress; protected String address; - protected boolean use_virtual_threads; // use fibers instead of threads (requires Java 15) + protected boolean use_vthreads; // use fibers instead of threads (requires Java 15) protected Log log; @@ -60,8 +60,9 @@ public void setAddress(String address) { public boolean useFibers() {return useVirtualThreads();} @Deprecated(forRemoval=true) public T useFibers(boolean f) {return useVirtualThreads(f);} - public boolean useVirtualThreads() {return use_virtual_threads;} - public T useVirtualThreads(boolean f) {this.use_virtual_threads=f; return (T)this;} + @Override + public boolean useVirtualThreads() {return use_vthreads;} + public T useVirtualThreads(boolean f) {this.use_vthreads=f; return (T)this;} public T log(Log l) {this.log=l; return (T)this;} public Thread newThread(Runnable r, String name) { @@ -74,7 +75,7 @@ public Thread newThread(Runnable r) { protected Thread newThread(Runnable r, String name, String addr, String cluster_name) { String thread_name=getNewThreadName(name, addr, cluster_name); - return ThreadCreator.createThread(r, thread_name, createDaemons, use_virtual_threads); + return ThreadCreator.createThread(r, thread_name, createDaemons, use_vthreads); } public void renameThread(String base_name, Thread thread) { diff --git a/src/org/jgroups/util/ThreadCreator.java b/src/org/jgroups/util/ThreadCreator.java index e62bf5802c4..49605acca89 100644 --- a/src/org/jgroups/util/ThreadCreator.java +++ b/src/org/jgroups/util/ThreadCreator.java @@ -6,7 +6,6 @@ import java.lang.invoke.MethodHandle; import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodType; -import java.util.concurrent.*; /** * @author Bela Ban @@ -15,27 +14,47 @@ */ public class ThreadCreator { private static final Log LOG=LogFactory.getLog(ThreadCreator.class); - private static final MethodHandles.Lookup LOOKUP; + private static final MethodHandles.Lookup LOOKUP=MethodHandles.publicLookup(); private static final String OF_VIRTUAL_NAME="java.lang.Thread$Builder$OfVirtual"; private static final Class OF_VIRTUAL_CLASS; private static final MethodHandle OF_VIRTUAL; private static final MethodHandle CREATE_VTHREAD; - private static final MethodHandle EXECUTORS_NEW_VIRTUAL_THREAD_FACTORY; - static { - LOOKUP=MethodHandles.publicLookup(); OF_VIRTUAL_CLASS=getOfVirtualClass(); OF_VIRTUAL=getOfVirtualHandle(); CREATE_VTHREAD=getCreateVThreadHandle(); - EXECUTORS_NEW_VIRTUAL_THREAD_FACTORY=getNewVirtualThreadFactoryHandle(); } - public static boolean hasVirtualThreads() { return CREATE_VTHREAD != null; } + public static Thread createThread(Runnable r, String name, boolean daemon, boolean virtual) { + if(!virtual || CREATE_VTHREAD == null) { + Thread t=new Thread(r, name); + t.setDaemon(daemon); + return t; + } + + // Thread.ofVirtual().unstarted() + try { + Object of=OF_VIRTUAL.invoke(); + Thread t=(Thread)CREATE_VTHREAD.invokeWithArguments(of, r); + t.setName(name); + return t; + } + catch(Throwable t) { + } + + // Thread.newThread(String name, int characteristics, Runnable task) in JDKs 15 & 16 + try { + return (Thread)CREATE_VTHREAD.invokeExact(name, 1, r); + } + catch(Throwable ex) { + } + return new Thread(r, name); + } protected static MethodHandle getCreateVThreadHandle() { @@ -58,7 +77,6 @@ protected static MethodHandle getCreateVThreadHandle() { return null; } - protected static Class getOfVirtualClass() { try { return Util.loadClass(OF_VIRTUAL_NAME, (Class)null); @@ -79,83 +97,4 @@ protected static MethodHandle getOfVirtualHandle() { } } - - protected static MethodHandle getNewVirtualThreadFactoryHandle() { - MethodType type=MethodType.methodType(ExecutorService.class); - String[] names={ - "newVirtualThreadPerTaskExecutor", // jdk 18-21 - "newVirtualThreadExecutor", // jdk 17 - "newUnboundedVirtualThreadExecutor" // jdk 15 & 16 - }; - - for(int i=0; i < names.length; i++) { - try { - return LOOKUP.findStatic(Executors.class, names[i], type); - } - catch(Exception e) { - String next=(i+1) < names.length? names[i+1] : "regular thread pool"; - LOG.debug("%s not found, falling back to %s", names[i], next); - } - } - - return null; - } - - - public static Thread createThread(Runnable r, String name, boolean daemon, boolean virtual) { - if(!virtual || CREATE_VTHREAD == null) { - Thread t=new Thread(r, name); - t.setDaemon(daemon); - return t; - } - - // Thread.ofVirtual().unstarted() - try { - Object of=OF_VIRTUAL.invoke(); - Thread t=(Thread)CREATE_VTHREAD.invokeWithArguments(of, r); - t.setName(name); - return t; - } - catch(Throwable t) { - } - - // Thread.newThread(String name, int characteristics, Runnable task) in JDKs 15 & 16 - try { - return (Thread)CREATE_VTHREAD.invokeExact(name, 1, r); - } - catch(Throwable ex) { - } - return new Thread(r, name); - } - - - public static ExecutorService createThreadPool(int min_threads, int max_threads, long keep_alive_time, - boolean virtual_threads, Log log) { - return createThreadPool(min_threads, max_threads, keep_alive_time, "abort", new SynchronousQueue<>(), - new DefaultThreadFactory("threads", true, true), - virtual_threads, log); - } - - public static ExecutorService createThreadPool(int min_threads, int max_threads, long keep_alive_time, - String rejection_policy, - BlockingQueue queue, final ThreadFactory factory, - boolean useVirtualThreads, Log log) { - if(!useVirtualThreads || EXECUTORS_NEW_VIRTUAL_THREAD_FACTORY == null) { - ThreadPoolExecutor pool=new ThreadPoolExecutor(min_threads, max_threads, keep_alive_time, - TimeUnit.MILLISECONDS, queue, factory); - RejectedExecutionHandler handler=Util.parseRejectionPolicy(rejection_policy); - pool.setRejectedExecutionHandler(new ShutdownRejectedExecutionHandler(handler)); - if(log != null) - log.debug("thread pool min/max/keep-alive (ms): %d/%d/%d", min_threads, max_threads, keep_alive_time); - return pool; - } - - try { - return (ExecutorService)EXECUTORS_NEW_VIRTUAL_THREAD_FACTORY.invokeExact(); - } - catch(Throwable t) { - throw new IllegalStateException(String.format("failed to create virtual thread pool: %s", t)); - } - } - } diff --git a/src/org/jgroups/util/ThreadFactory.java b/src/org/jgroups/util/ThreadFactory.java index b81fb61fa2e..125e541e013 100644 --- a/src/org/jgroups/util/ThreadFactory.java +++ b/src/org/jgroups/util/ThreadFactory.java @@ -2,7 +2,7 @@ public interface ThreadFactory extends java.util.concurrent.ThreadFactory { Thread newThread(Runnable r, String name); - + boolean useVirtualThreads(); void setPattern(String pattern); void setIncludeClusterName(boolean includeClusterName); void setClusterName(String channelName); diff --git a/src/org/jgroups/util/ThreadPool.java b/src/org/jgroups/util/ThreadPool.java index c326bd82af4..01d97d7b185 100644 --- a/src/org/jgroups/util/ThreadPool.java +++ b/src/org/jgroups/util/ThreadPool.java @@ -9,6 +9,9 @@ import org.jgroups.logging.Log; import org.jgroups.logging.LogFactory; +import java.lang.invoke.MethodHandle; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.MethodType; import java.util.concurrent.*; import java.util.concurrent.atomic.LongAdder; @@ -22,6 +25,7 @@ * @since 5.2 */ public class ThreadPool implements Lifecycle { + private static final MethodHandle EXECUTORS_NEW_VIRTUAL_THREAD_FACTORY=getNewVirtualThreadFactoryHandle(); protected Executor thread_pool; protected Log log; protected ThreadFactory thread_factory; @@ -31,9 +35,6 @@ public class ThreadPool implements Lifecycle { @Property(description="Whether or not the thread pool is enabled. If false, tasks will be run on the caller's thread") protected boolean enabled=true; - @Property(description="If true, create virtual threads, otherwise create native threads") - protected boolean use_virtual_threads; - @Property(description="Minimum thread pool size for the thread pool") protected int min_threads; @@ -152,8 +153,6 @@ public void setRejectedExecutionHandler(RejectedExecutionHandler handler) { public ThreadPool setThreadPoolFullSuppressTime(long t) {this.thread_pool_full_suppress_time=t; return this;} public boolean getThreadDumpsEnabled() {return thread_dumps_enabled;} public ThreadPool setThreadDumpsEnabled(boolean b) {thread_dumps_enabled=b; return this;} - @Deprecated public int getThreadDumpsThreshold() {return 0;} - @Deprecated public ThreadPool setThreadDumpsThreshold(int t) {return this;} public Address getAddress() {return address;} public ThreadPool setAddress(Address a) {this.address=a; return this;} public boolean getIncreaseMaxSizeDynamically() {return increase_max_size_dynamically;} @@ -162,11 +161,11 @@ public void setRejectedExecutionHandler(RejectedExecutionHandler handler) { public ThreadPool setDelta(int d) {delta=d; return this;} public long numberOfRejectedMessages() {return num_rejected_msgs.sum();} public ThreadPool log(Log l) {log=l; return this;} - public boolean useVirtualThreads() {return use_virtual_threads;} - public ThreadPool useVirtualThreads(boolean b) {use_virtual_threads=b; return this;} - @Deprecated public int getNumberOfThreadDumps() {return -1;} - @Deprecated public void resetThreadDumps() {} + @Deprecated public static int getThreadDumpsThreshold() {return 0;} + @Deprecated public ThreadPool setThreadDumpsThreshold(int ignore) {return this;} + @Deprecated public static int getNumberOfThreadDumps() {return -1;} + @Deprecated public void resetThreadDumps() {} @ManagedAttribute(description="Current number of threads in the thread pool",type=SCALAR) public int getThreadPoolSize() { @@ -202,8 +201,8 @@ public void init() throws Exception { if(enabled) { if(thread_factory == null) thread_factory=new DefaultThreadFactory("thread-pool", true, true); - thread_pool=ThreadCreator.createThreadPool(min_threads, max_threads, keep_alive_time, - rejection_policy, new SynchronousQueue<>(), thread_factory, use_virtual_threads, log); + thread_pool=createThreadPool(min_threads, max_threads, keep_alive_time, + rejection_policy, new SynchronousQueue<>(), thread_factory, log); } else // otherwise use the caller's thread to unmarshal the byte buffer into a message thread_pool=new DirectExecutor(); @@ -257,14 +256,46 @@ public String toString() { return thread_pool != null? thread_pool.toString() : "n/a"; } + protected static ExecutorService createThreadPool(int min_threads, int max_threads, long keep_alive_time, String rejection_policy, - BlockingQueue queue, final ThreadFactory factory) { - ThreadPoolExecutor pool=new ThreadPoolExecutor(min_threads, max_threads, keep_alive_time, TimeUnit.MILLISECONDS, - queue, factory); - RejectedExecutionHandler handler=Util.parseRejectionPolicy(rejection_policy); - pool.setRejectedExecutionHandler(new ShutdownRejectedExecutionHandler(handler)); - return pool; + BlockingQueue queue, final ThreadFactory factory, + Log log) { + if(!factory.useVirtualThreads() || EXECUTORS_NEW_VIRTUAL_THREAD_FACTORY == null) { + ThreadPoolExecutor pool=new ThreadPoolExecutor(min_threads, max_threads, keep_alive_time, + TimeUnit.MILLISECONDS, queue, factory); + RejectedExecutionHandler handler=Util.parseRejectionPolicy(rejection_policy); + pool.setRejectedExecutionHandler(new ShutdownRejectedExecutionHandler(handler)); + if(log != null) + log.debug("thread pool min/max/keep-alive (ms): %d/%d/%d", min_threads, max_threads, keep_alive_time); + return pool; + } + + try { + return (ExecutorService)EXECUTORS_NEW_VIRTUAL_THREAD_FACTORY.invokeExact(); + } + catch(Throwable t) { + throw new IllegalStateException(String.format("failed to create virtual thread pool: %s", t)); + } + } + + protected static MethodHandle getNewVirtualThreadFactoryHandle() { + MethodType type=MethodType.methodType(ExecutorService.class); + String[] names={ + "newVirtualThreadPerTaskExecutor", // jdk 18-21 + "newVirtualThreadExecutor", // jdk 17 + "newUnboundedVirtualThreadExecutor" // jdk 15 & 16 + }; + + MethodHandles.Lookup LOOKUP=MethodHandles.publicLookup(); + for(int i=0; i < names.length; i++) { + try { + return LOOKUP.findStatic(Executors.class, names[i], type); + } + catch(Exception e) { + } + } + return null; } }