Skip to content

Commit

Permalink
- TP.use_vthread is now the only place to configure virtual threads (h…
Browse files Browse the repository at this point in the history
…ttps://issues.redhat.com/browse/JGRP-2827)

- Moved thread pool creation from ThreadCreator -> ThreadPool
  • Loading branch information
belaban committed Sep 3, 2024
1 parent b506a1f commit f565afe
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 119 deletions.
11 changes: 11 additions & 0 deletions build.xml
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,17 @@
</sequential>
</target>

<target name="smoke-tests" depends="delete-reports" description="Runs all tests">
<sequential>
<antcall target="functional"/>
<antcall target="udp"/>
<antcall target="tcp"/>
<antcall target="stack-independent"/>
<antcall target="time-sensitive"/>
<antcall target="byteman"/>
</sequential>
</target>

<target name="functional" depends="postcompile,define-testng-task" description="Runs functional tests">
<mkdir dir="${tmp.dir}/test-results/xml/functional"/>
<runtest suitename="functional"
Expand Down
16 changes: 6 additions & 10 deletions src/org/jgroups/protocols/TP.java
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,8 @@ public abstract class TP extends Protocol implements DiagnosticsHandler.ProbeHan
@Property(description="The fully qualified name of a class implementing LocalTransport")
protected String local_transport_class;

@Deprecated
@Property(description="If true, create virtual threads, otherwise create native threads",
deprecatedMessage="use thread_pool.use_virtual_threads instead")
protected boolean use_virtual_threads;
@Property(description="If true, create virtual threads, otherwise create native threads")
protected boolean use_vthreads;

@Property(description="Thread naming pattern for threads in this channel. Valid values are \"pcl\": " +
"\"p\": includes the thread name, e.g. \"Incoming thread-1\", \"UDP ucast receiver\", " +
Expand Down Expand Up @@ -585,7 +583,7 @@ public <T extends TP> T setThreadPool(Executor thread_pool) {

/** Don't remove! https://issues.redhat.com/browse/JGRP-2814 */
@ManagedAttribute(type=SCALAR) @Deprecated
public long getNumberOfThreadDumps() {return thread_pool.getNumberOfThreadDumps();}
public static long getNumberOfThreadDumps() {return ThreadPool.getNumberOfThreadDumps();}

/** Don't remove! https://issues.redhat.com/browse/JGRP-2814 */
@ManagedAttribute(type=SCALAR) @Deprecated
Expand Down Expand Up @@ -731,12 +729,10 @@ public String defaultHeaders(boolean detailed) {
public void init() throws Exception {
this.id=ClassConfigurator.getProtocolId(TP.class);

if(use_virtual_threads && !Util.virtualThreadsAvailable()) {
if(use_vthreads && !Util.virtualThreadsAvailable()) {
log.debug("use_virtual_threads was set to false, as virtual threads are not available in this Java version");
use_virtual_threads=false;
use_vthreads=false;
}
thread_pool.useVirtualThreads(this.use_virtual_threads);

if(local_transport_class != null) {
Class<?> cl=Util.loadClass(local_transport_class, getClass());
local_transport=(LocalTransport)cl.getDeclaredConstructor().newInstance();
Expand All @@ -745,7 +741,7 @@ public void init() throws Exception {

if(thread_factory == null)
setThreadFactory(new LazyThreadFactory("jgroups", false, true)
.useVirtualThreads(use_virtual_threads).log(this.log));
.useVirtualThreads(use_vthreads).log(this.log));

// local_addr is null when shared transport, channel_name is not used
setInAllThreadFactories(cluster_name != null? cluster_name.toString() : null, local_addr, thread_naming_pattern);
Expand Down
9 changes: 5 additions & 4 deletions src/org/jgroups/util/DefaultThreadFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class DefaultThreadFactory implements ThreadFactory {
protected String clusterName;
protected boolean includeLocalAddress;
protected String address;
protected boolean use_virtual_threads; // use fibers instead of threads (requires Java 15)
protected boolean use_vthreads; // use fibers instead of threads (requires Java 15)
protected Log log;


Expand Down Expand Up @@ -60,8 +60,9 @@ public void setAddress(String address) {
public boolean useFibers() {return useVirtualThreads();}
@Deprecated(forRemoval=true)
public <T extends DefaultThreadFactory> T useFibers(boolean f) {return useVirtualThreads(f);}
public boolean useVirtualThreads() {return use_virtual_threads;}
public <T extends DefaultThreadFactory> T useVirtualThreads(boolean f) {this.use_virtual_threads=f; return (T)this;}
@Override
public boolean useVirtualThreads() {return use_vthreads;}
public <T extends DefaultThreadFactory> T useVirtualThreads(boolean f) {this.use_vthreads=f; return (T)this;}
public <T extends DefaultThreadFactory> T log(Log l) {this.log=l; return (T)this;}

public Thread newThread(Runnable r, String name) {
Expand All @@ -74,7 +75,7 @@ public Thread newThread(Runnable r) {

protected Thread newThread(Runnable r, String name, String addr, String cluster_name) {
String thread_name=getNewThreadName(name, addr, cluster_name);
return ThreadCreator.createThread(r, thread_name, createDaemons, use_virtual_threads);
return ThreadCreator.createThread(r, thread_name, createDaemons, use_vthreads);
}

public void renameThread(String base_name, Thread thread) {
Expand Down
113 changes: 26 additions & 87 deletions src/org/jgroups/util/ThreadCreator.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.util.concurrent.*;

/**
* @author Bela Ban
Expand All @@ -15,27 +14,47 @@
*/
public class ThreadCreator {
private static final Log LOG=LogFactory.getLog(ThreadCreator.class);
private static final MethodHandles.Lookup LOOKUP;
private static final MethodHandles.Lookup LOOKUP=MethodHandles.publicLookup();
private static final String OF_VIRTUAL_NAME="java.lang.Thread$Builder$OfVirtual";
private static final Class<?> OF_VIRTUAL_CLASS;
private static final MethodHandle OF_VIRTUAL;
private static final MethodHandle CREATE_VTHREAD;
private static final MethodHandle EXECUTORS_NEW_VIRTUAL_THREAD_FACTORY;


static {
LOOKUP=MethodHandles.publicLookup();
OF_VIRTUAL_CLASS=getOfVirtualClass();
OF_VIRTUAL=getOfVirtualHandle();
CREATE_VTHREAD=getCreateVThreadHandle();
EXECUTORS_NEW_VIRTUAL_THREAD_FACTORY=getNewVirtualThreadFactoryHandle();
}


public static boolean hasVirtualThreads() {
return CREATE_VTHREAD != null;
}

public static Thread createThread(Runnable r, String name, boolean daemon, boolean virtual) {
if(!virtual || CREATE_VTHREAD == null) {
Thread t=new Thread(r, name);
t.setDaemon(daemon);
return t;
}

// Thread.ofVirtual().unstarted()
try {
Object of=OF_VIRTUAL.invoke();
Thread t=(Thread)CREATE_VTHREAD.invokeWithArguments(of, r);
t.setName(name);
return t;
}
catch(Throwable t) {
}

// Thread.newThread(String name, int characteristics, Runnable task) in JDKs 15 & 16
try {
return (Thread)CREATE_VTHREAD.invokeExact(name, 1, r);
}
catch(Throwable ex) {
}
return new Thread(r, name);
}


protected static MethodHandle getCreateVThreadHandle() {
Expand All @@ -58,7 +77,6 @@ protected static MethodHandle getCreateVThreadHandle() {
return null;
}


protected static Class<?> getOfVirtualClass() {
try {
return Util.loadClass(OF_VIRTUAL_NAME, (Class<?>)null);
Expand All @@ -79,83 +97,4 @@ protected static MethodHandle getOfVirtualHandle() {
}
}


protected static MethodHandle getNewVirtualThreadFactoryHandle() {
MethodType type=MethodType.methodType(ExecutorService.class);
String[] names={
"newVirtualThreadPerTaskExecutor", // jdk 18-21
"newVirtualThreadExecutor", // jdk 17
"newUnboundedVirtualThreadExecutor" // jdk 15 & 16
};

for(int i=0; i < names.length; i++) {
try {
return LOOKUP.findStatic(Executors.class, names[i], type);
}
catch(Exception e) {
String next=(i+1) < names.length? names[i+1] : "regular thread pool";
LOG.debug("%s not found, falling back to %s", names[i], next);
}
}

return null;
}


public static Thread createThread(Runnable r, String name, boolean daemon, boolean virtual) {
if(!virtual || CREATE_VTHREAD == null) {
Thread t=new Thread(r, name);
t.setDaemon(daemon);
return t;
}

// Thread.ofVirtual().unstarted()
try {
Object of=OF_VIRTUAL.invoke();
Thread t=(Thread)CREATE_VTHREAD.invokeWithArguments(of, r);
t.setName(name);
return t;
}
catch(Throwable t) {
}

// Thread.newThread(String name, int characteristics, Runnable task) in JDKs 15 & 16
try {
return (Thread)CREATE_VTHREAD.invokeExact(name, 1, r);
}
catch(Throwable ex) {
}
return new Thread(r, name);
}


public static ExecutorService createThreadPool(int min_threads, int max_threads, long keep_alive_time,
boolean virtual_threads, Log log) {
return createThreadPool(min_threads, max_threads, keep_alive_time, "abort", new SynchronousQueue<>(),
new DefaultThreadFactory("threads", true, true),
virtual_threads, log);
}

public static ExecutorService createThreadPool(int min_threads, int max_threads, long keep_alive_time,
String rejection_policy,
BlockingQueue<Runnable> queue, final ThreadFactory factory,
boolean useVirtualThreads, Log log) {
if(!useVirtualThreads || EXECUTORS_NEW_VIRTUAL_THREAD_FACTORY == null) {
ThreadPoolExecutor pool=new ThreadPoolExecutor(min_threads, max_threads, keep_alive_time,
TimeUnit.MILLISECONDS, queue, factory);
RejectedExecutionHandler handler=Util.parseRejectionPolicy(rejection_policy);
pool.setRejectedExecutionHandler(new ShutdownRejectedExecutionHandler(handler));
if(log != null)
log.debug("thread pool min/max/keep-alive (ms): %d/%d/%d", min_threads, max_threads, keep_alive_time);
return pool;
}

try {
return (ExecutorService)EXECUTORS_NEW_VIRTUAL_THREAD_FACTORY.invokeExact();
}
catch(Throwable t) {
throw new IllegalStateException(String.format("failed to create virtual thread pool: %s", t));
}
}

}
2 changes: 1 addition & 1 deletion src/org/jgroups/util/ThreadFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

public interface ThreadFactory extends java.util.concurrent.ThreadFactory {
Thread newThread(Runnable r, String name);

boolean useVirtualThreads();
void setPattern(String pattern);
void setIncludeClusterName(boolean includeClusterName);
void setClusterName(String channelName);
Expand Down
65 changes: 48 additions & 17 deletions src/org/jgroups/util/ThreadPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
import org.jgroups.logging.Log;
import org.jgroups.logging.LogFactory;

import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.util.concurrent.*;
import java.util.concurrent.atomic.LongAdder;

Expand All @@ -22,6 +25,7 @@
* @since 5.2
*/
public class ThreadPool implements Lifecycle {
private static final MethodHandle EXECUTORS_NEW_VIRTUAL_THREAD_FACTORY=getNewVirtualThreadFactoryHandle();
protected Executor thread_pool;
protected Log log;
protected ThreadFactory thread_factory;
Expand All @@ -31,9 +35,6 @@ public class ThreadPool implements Lifecycle {
@Property(description="Whether or not the thread pool is enabled. If false, tasks will be run on the caller's thread")
protected boolean enabled=true;

@Property(description="If true, create virtual threads, otherwise create native threads")
protected boolean use_virtual_threads;

@Property(description="Minimum thread pool size for the thread pool")
protected int min_threads;

Expand Down Expand Up @@ -152,8 +153,6 @@ public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
public ThreadPool setThreadPoolFullSuppressTime(long t) {this.thread_pool_full_suppress_time=t; return this;}
public boolean getThreadDumpsEnabled() {return thread_dumps_enabled;}
public ThreadPool setThreadDumpsEnabled(boolean b) {thread_dumps_enabled=b; return this;}
@Deprecated public int getThreadDumpsThreshold() {return 0;}
@Deprecated public ThreadPool setThreadDumpsThreshold(int t) {return this;}
public Address getAddress() {return address;}
public ThreadPool setAddress(Address a) {this.address=a; return this;}
public boolean getIncreaseMaxSizeDynamically() {return increase_max_size_dynamically;}
Expand All @@ -162,11 +161,11 @@ public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
public ThreadPool setDelta(int d) {delta=d; return this;}
public long numberOfRejectedMessages() {return num_rejected_msgs.sum();}
public ThreadPool log(Log l) {log=l; return this;}
public boolean useVirtualThreads() {return use_virtual_threads;}
public ThreadPool useVirtualThreads(boolean b) {use_virtual_threads=b; return this;}

@Deprecated public int getNumberOfThreadDumps() {return -1;}
@Deprecated public void resetThreadDumps() {}
@Deprecated public static int getThreadDumpsThreshold() {return 0;}
@Deprecated public ThreadPool setThreadDumpsThreshold(int ignore) {return this;}
@Deprecated public static int getNumberOfThreadDumps() {return -1;}
@Deprecated public void resetThreadDumps() {}

@ManagedAttribute(description="Current number of threads in the thread pool",type=SCALAR)
public int getThreadPoolSize() {
Expand Down Expand Up @@ -202,8 +201,8 @@ public void init() throws Exception {
if(enabled) {
if(thread_factory == null)
thread_factory=new DefaultThreadFactory("thread-pool", true, true);
thread_pool=ThreadCreator.createThreadPool(min_threads, max_threads, keep_alive_time,
rejection_policy, new SynchronousQueue<>(), thread_factory, use_virtual_threads, log);
thread_pool=createThreadPool(min_threads, max_threads, keep_alive_time,
rejection_policy, new SynchronousQueue<>(), thread_factory, log);
}
else // otherwise use the caller's thread to unmarshal the byte buffer into a message
thread_pool=new DirectExecutor();
Expand Down Expand Up @@ -257,14 +256,46 @@ public String toString() {
return thread_pool != null? thread_pool.toString() : "n/a";
}


protected static ExecutorService createThreadPool(int min_threads, int max_threads, long keep_alive_time,
String rejection_policy,
BlockingQueue<Runnable> queue, final ThreadFactory factory) {
ThreadPoolExecutor pool=new ThreadPoolExecutor(min_threads, max_threads, keep_alive_time, TimeUnit.MILLISECONDS,
queue, factory);
RejectedExecutionHandler handler=Util.parseRejectionPolicy(rejection_policy);
pool.setRejectedExecutionHandler(new ShutdownRejectedExecutionHandler(handler));
return pool;
BlockingQueue<Runnable> queue, final ThreadFactory factory,
Log log) {
if(!factory.useVirtualThreads() || EXECUTORS_NEW_VIRTUAL_THREAD_FACTORY == null) {
ThreadPoolExecutor pool=new ThreadPoolExecutor(min_threads, max_threads, keep_alive_time,
TimeUnit.MILLISECONDS, queue, factory);
RejectedExecutionHandler handler=Util.parseRejectionPolicy(rejection_policy);
pool.setRejectedExecutionHandler(new ShutdownRejectedExecutionHandler(handler));
if(log != null)
log.debug("thread pool min/max/keep-alive (ms): %d/%d/%d", min_threads, max_threads, keep_alive_time);
return pool;
}

try {
return (ExecutorService)EXECUTORS_NEW_VIRTUAL_THREAD_FACTORY.invokeExact();
}
catch(Throwable t) {
throw new IllegalStateException(String.format("failed to create virtual thread pool: %s", t));
}
}

protected static MethodHandle getNewVirtualThreadFactoryHandle() {
MethodType type=MethodType.methodType(ExecutorService.class);
String[] names={
"newVirtualThreadPerTaskExecutor", // jdk 18-21
"newVirtualThreadExecutor", // jdk 17
"newUnboundedVirtualThreadExecutor" // jdk 15 & 16
};

MethodHandles.Lookup LOOKUP=MethodHandles.publicLookup();
for(int i=0; i < names.length; i++) {
try {
return LOOKUP.findStatic(Executors.class, names[i], type);
}
catch(Exception e) {
}
}
return null;
}

}

0 comments on commit f565afe

Please sign in to comment.