Skip to content

Commit

Permalink
- Added SuppressLog for thread-pool-full event in ThreadPool (https:/…
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Jul 4, 2024
1 parent b3ad28c commit 6d5ed51
Show file tree
Hide file tree
Showing 10 changed files with 65 additions and 76 deletions.
6 changes: 3 additions & 3 deletions conf/jg-messages.properties
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@ AttrReadFailure = JGRP000007: failed reading value of attribute %s:
MissingAttribute = JGRP000008: did not find attribute with name %s
AttrWriteFailure = JGRP000009: failed writing to attribute %s: %s
VersionMismatch = JGRP000010: packet from %s has different version (%s) than ours (%s); packet is discarded
MsgDroppedNak = JGRP000011: %s: dropped message %s from non-member %s (view=%s)
MsgDroppedNak = JGRP000011: %s: dropped message from non-member %s (view=%s)
MsgDroppedDiffCluster = JGRP000012: discarded message from different cluster %s (our cluster is %s). Sender was %s
BatchDroppedDiffCluster = JGRP000013: discarded message batch from different cluster %s (our cluster is %s). Sender was %s
SuppressMsg = (received %d identical messages from %s in the last %d ms)
SuppressMsgRelay = (%d identical messages for site %s in the last %d ms)
SuppressMsg = (%d suppressed messages in the last %s)
Deprecated = JGRP000014: %s has been deprecated: %s
IncorrectBufferSize = JGRP000015: the %s buffer of socket %s was set to %s, but the OS only allocated %s
CallbackException = JGRP000016: exception in %s callback: %s
Expand Down Expand Up @@ -46,6 +45,7 @@ BufferSizeFailed = JGRP000049: failed setting %s buffer size of %d in
CompressionFailure = JGRP000050: exception on uncompression
RSVP_Misconfig = JGRP000051: resend_interval (%d) is >= timeout (%d); setting resend_interval to timeout / 3
RSVP_Timeout = JGRP000052: message ran into a timeout, missing acks: %s
ThreadPoolFull = JGRP000053: %s: thread pool is full (max=%d, active=%d)%s
AREYOUDEADHdrFromIsNull = JGRP000101: ARE_YOU_DEAD: hdr.from is null
AttemptToMoveFailedAt = JGRP000103: attempt to move failed at:
AttemptToWriteDataFailedAt = JGRP000104: attempt to write data failed at
Expand Down
6 changes: 3 additions & 3 deletions conf/jg-messages_de.properties
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@ AttrReadFailure = JGRP000007: Attribut %s konnte nicht gelesen werde
MissingAttribute = JGRP000008: Attribut %s wurde nicht gefunden
AttrWriteFailure = JGRP000009: Attribut %s konnte nicht geschrieben werden: %s
VersionMismatch = JGRP000010: Nachricht von %s hat eine andere Version (%s) als unsere (%s); Nachricht wird verworfen
MsgDroppedNak = JGRP000011: %s: Nachricht %s von Nicht-Mitglied %s wurde verworfen (View=%s)
MsgDroppedNak = JGRP000011: %s: Nachricht von Nicht-Mitglied %s wurde verworfen (View=%s)
MsgDroppedDiffCluster = JGRP000012: Nachricht von Cluster %s wurde verworfen (unser Cluster ist %s). Der Sender war %s
BatchDroppedDiffCluster = JGRP000013: Nachrichten-Batch von Cluster %s wurde verworfen (unser Cluster ist %s). Der Sender war %s
SuppressMsg = (%d identische Nachrichten erhalten von %s in den letzten %d ms)
SuppressMsgRelay = (%d identische Nachrichten für Site %s in den letzten %d ms)
SuppressMsg = (%d unterdrückte Nachrichten in den letzten %s)
Deprecated = JGRP000014: %s wird nicht mehr unterstuetzt: %s
IncorrectBufferSize = JGRP000015: Der %s Puffer des Sockets %s wurde auf %s gesetzt, aber das Betriebssystem stellt nur %s zur Verfuegung
MethodNotFound=JGRP000023: %s: Methode %s.%s() nicht gefunden
Expand All @@ -23,4 +22,5 @@ OperationInvocationFailure=JGRP000021: Fehler beim Aufruf der Operation %s: %s
ReceiverFailure=JGRP000019: Fehler beim Uebergeben der Nachricht an den Empfaenger
StackDestroyFailure=JGRP000020: Fehler beim Zerstoeren des Protokoll-Stacks
UpHandlerFailure=JGRP000018: Fehler beim Uebergeben der Nachricht an den Up-Handler
ThreadPoolFull=JGRP000053: %s: thread pool ist voll (max=%d, activ=%d)%s
RelayNoRouteToSite = JGRP000290: %s: keine Route für Nachricht von %s an %s: die Nachricht wird verworfen
5 changes: 3 additions & 2 deletions src/org/jgroups/protocols/TP.java
Original file line number Diff line number Diff line change
Expand Up @@ -731,9 +731,9 @@ public void init() throws Exception {
who_has_cache=new ExpiryCache<>(who_has_cache_timeout);

if(suppress_time_different_version_warnings > 0)
suppress_log_different_version=new SuppressLog<>(log, "VersionMismatch", "SuppressMsg");
suppress_log_different_version=new SuppressLog<>(log, "VersionMismatch");
if(suppress_time_different_cluster_warnings > 0)
suppress_log_different_cluster=new SuppressLog<>(log, "MsgDroppedDiffCluster", "SuppressMsg");
suppress_log_different_cluster=new SuppressLog<>(log, "MsgDroppedDiffCluster");

// ========================================== Timer ==============================
if(timer == null) {
Expand Down Expand Up @@ -970,6 +970,7 @@ public Object down(Event evt) {
thread_pool.setMaxThreads(new_size);
}
}
thread_pool.removeExpired();
break;

case Event.CONNECT:
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/UDP.java
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ public void init() throws Exception {
throw new IllegalArgumentException("bundler.max_size (" + bundler.getMaxSize() + ") cannot exceed the max " +
"datagram packet size of " + Global.MAX_DATAGRAM_PACKET_SIZE);
if(is_mac && suppress_time_out_of_buffer_space > 0)
suppress_log_out_of_buffer_space=new SuppressLog<>(log, "FailureSendingToPhysAddr", "SuppressMsg");
suppress_log_out_of_buffer_space=new SuppressLog<>(log, "FailureSendingToPhysAddr");
}

/** Creates the unicast and multicast sockets and starts the unicast and multicast receiver threads */
Expand Down
12 changes: 6 additions & 6 deletions src/org/jgroups/protocols/pbcast/NAKACK2.java
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ public void init() throws Exception {
}

if(suppress_time_non_member_warnings > 0)
suppress_log_non_member=new SuppressLog<>(log, "MsgDroppedNak", "SuppressMsg");
suppress_log_non_member=new SuppressLog<>(log, "MsgDroppedNak");

// max bundle size (minus overhead) divided by <long size> times bits per long
int estimated_max_msgs_in_xmit_req=(transport.getBundler().getMaxSize() -50) * Global.LONG_SIZE;
Expand Down Expand Up @@ -748,15 +748,15 @@ protected void queueMessage(Message msg, long seqno) {
log.trace("%s: message %s#%d was discarded (not yet server, queue full)", local_addr, msg.getSrc(), seqno);
}

protected void unknownMember(Address sender, Object message) {
protected void unknownMember(Address sender) {
if(leaving)
return;
if(log_discard_msgs && log.isWarnEnabled()) {
if(suppress_log_non_member != null)
suppress_log_non_member.log(SuppressLog.Level.warn, sender, suppress_time_non_member_warnings,
local_addr, message, sender, view);
local_addr, sender, view);
else
log.warn(Util.getMessage("MsgDroppedNak"), local_addr, message, sender, view);
log.warn(Util.getMessage("MsgDroppedNak"), local_addr, sender, view);
}
}

Expand Down Expand Up @@ -821,7 +821,7 @@ protected void handleMessage(Message msg, NakAckHeader2 hdr) {
Address sender=msg.getSrc();
Table<Message> buf=xmit_table.get(sender);
if(buf == null) { // discard message if there is no entry for sender
unknownMember(sender, hdr.seqno);
unknownMember(sender);
return;
}

Expand Down Expand Up @@ -852,7 +852,7 @@ protected void handleMessageBatch(MessageBatch mb) {
Table<Message> buf=xmit_table.get(sender);
if(buf == null) { // discard message if there is no entry for sender
mb.removeIf(HAS_HEADER, true);
unknownMember(sender, "batch");
unknownMember(sender);
return;
}
int size=mb.size();
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/relay/RELAY.java
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ public void init() throws Exception {

if(suppress_time_no_route_errors <= 0)
throw new IllegalArgumentException("suppress_time_no_route_errors has to be > 0");
suppress_log_no_route=new SuppressLog<>(log, "RelayNoRouteToSite", "SuppressMsgRelay");
suppress_log_no_route=new SuppressLog<>(log, "RelayNoRouteToSite");
}

public void stop() {
Expand Down
6 changes: 3 additions & 3 deletions src/org/jgroups/util/SuppressCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ public class SuppressCache<T> {


/**
* Adds a new key to the hashmap, or updates the Value associated with the existing key if present. If expiry_time
* is greater than the age of the Value, the key will be removed.
* Adds a new key to the hashmap, or updates the value associated with the existing key if present. If expiry_time
* is greater than the age of the value, the key will be removed.
* @param key The key
* @param expiry_time Expiry time (in ms)
* @return Null if the key was present and not expired, or the Value associated with the existing key
* @return Null if the key was present and not expired, or the value associated with the existing key
* (its count incremented)
*/
public Value putIfAbsent(T key, long expiry_time) {
Expand Down
11 changes: 7 additions & 4 deletions src/org/jgroups/util/SuppressLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import org.jgroups.logging.Log;

import java.util.concurrent.TimeUnit;

/**
* Log (using {@link SuppressCache}) which suppresses (certain) messages from the same member for a given time
* @author Bela Ban
Expand All @@ -15,19 +17,19 @@ public class SuppressLog<T> {

public enum Level {error,warn,trace};

public SuppressLog(Log log, String message_key, String suppress_msg) {
public SuppressLog(Log log, String message_key) {
this.log=log;
cache=new SuppressCache<>();
message_format=Util.getMessage(message_key);
suppress_format=Util.getMessage(suppress_msg); // "(received %d identical messages from %s in the last %d ms)"
suppress_format=Util.getMessage("SuppressMsg"); // "(%d identical messages in the last %s)"
}

public SuppressCache<T> getCache() {return cache;}

/**
* Logs a message from a given member if is hasn't been logged for timeout ms
* @param level The level, either warn or error
* @param key The key into the SuppressCache
* @param key The key into the SuppressCache, e.g. a member address or other topic ("thread_pool_full")
* @param timeout The timeout
* @param args The arguments to the message key
*/
Expand All @@ -37,7 +39,8 @@ public void log(Level level, T key, long timeout, Object ... args) {
return;

String message=val.count() == 1? String.format(message_format, args) :
String.format(message_format, args) + " " + String.format(suppress_format, val.count(), key, val.age());
String.format(message_format, args) + " "
+ String.format(suppress_format, val.count(), Util.printTime(val.age(), TimeUnit.MILLISECONDS));

switch(level) {
case error:
Expand Down
83 changes: 34 additions & 49 deletions src/org/jgroups/util/ThreadPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,17 @@
import org.jgroups.Global;
import org.jgroups.Lifecycle;
import org.jgroups.annotations.ManagedAttribute;
import org.jgroups.annotations.ManagedOperation;
import org.jgroups.annotations.Property;
import org.jgroups.conf.AttributeType;
import org.jgroups.logging.Log;
import org.jgroups.logging.LogFactory;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;

import static org.jgroups.conf.AttributeType.SCALAR;
import static org.jgroups.conf.AttributeType.TIME;
import static org.jgroups.util.SuppressLog.Level.warn;

/**
* Thread pool based on {@link java.util.concurrent.ThreadPoolExecutor}
Expand All @@ -30,10 +26,7 @@ public class ThreadPool implements Lifecycle {
protected Log log;
protected ThreadFactory thread_factory;
protected Address address;

// Incremented when a message is rejected due to a full thread pool. When this value exceeds thread_dumps_threshold,
// the threads will be dumped at FATAL level, and thread_dumps will be reset to 0
protected final AtomicInteger thread_dumps=new AtomicInteger();
protected SuppressLog<String> thread_pool_full_log;

@Property(description="Whether or not the thread pool is enabled. If false, tasks will be run on the caller's thread")
protected boolean enabled=true;
Expand All @@ -54,18 +47,28 @@ public class ThreadPool implements Lifecycle {
"See Util.parseRejectionPolicy() for details")
protected String rejection_policy="abort";

@Property(description="The number of times a thread pool needs to be full before a thread dump is logged")
@Property(description="Time (in milliseconds) during which thread-pool full messages are suppressed",type=TIME)
protected long thread_pool_full_suppress_time=60_000;

@Property(description="The number of times a thread pool needs to be full before a thread dump is logged",
deprecatedMessage="ignored")
@Deprecated(since="5.4")
protected int thread_dumps_threshold=1;

@Property(description="Path to which the thread dump will be written. Ignored if null",
systemProperty="jgroups.threaddump.path")
systemProperty="jgroups.threaddump.path",deprecatedMessage="ignored")
@Deprecated(since="5.4")
protected String thread_dump_path;

@Property(description="Dump threads when the thread pool is full")
protected boolean thread_dumps_enabled;

@Property(description="Increases max_threads by the view size + delta if enabled " +
"(https://issues.redhat.com/browse/JGRP-2655)")
protected boolean increase_max_size_dynamically=true;

@Property(description="Added to the view size when the pool is increased dynamically")
@Property(description="If the view is greater than the max thread pool size, the latter is set to " +
"view size + delta. Only enabled if increase_max_size_dynamically is true")
protected int delta=10;

@ManagedAttribute(description="The number of messages dropped because the thread pool was full",type= SCALAR)
Expand Down Expand Up @@ -145,15 +148,12 @@ public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
((ThreadPoolExecutor)thread_pool).setRejectedExecutionHandler(handler);
}

public int getThreadDumpsThreshold() {
return thread_dumps_threshold;
}

public ThreadPool setThreadDumpsThreshold(int t) {
this.thread_dumps_threshold=t;
return this;
}

public long getThreadPoolFullSuppressTime() {return thread_pool_full_suppress_time;}
public ThreadPool setThreadPoolFullSuppressTime(long t) {this.thread_pool_full_suppress_time=t; return this;}
public boolean getThreadDumpsEnabled() {return thread_dumps_enabled;}
public ThreadPool setThreadDumpsEnabled(boolean b) {thread_dumps_enabled=b; return this;}
@Deprecated public int getThreadDumpsThreshold() {return 0;}
@Deprecated public ThreadPool setThreadDumpsThreshold(int t) {return this;}
public Address getAddress() {return address;}
public ThreadPool setAddress(Address a) {this.address=a; return this;}
public boolean getIncreaseMaxSizeDynamically() {return increase_max_size_dynamically;}
Expand All @@ -165,11 +165,8 @@ public ThreadPool setThreadDumpsThreshold(int t) {
public boolean useVirtualThreads() {return use_virtual_threads;}
public ThreadPool useVirtualThreads(boolean b) {use_virtual_threads=b; return this;}

@ManagedAttribute(description="Number of thread dumps",type=SCALAR)
public int getNumberOfThreadDumps() {return thread_dumps.get();}

@ManagedOperation(description="Resets the thread_dumps counter")
public void resetThreadDumps() {thread_dumps.set(0);}
@Deprecated public int getNumberOfThreadDumps() {return -1;}
@Deprecated public void resetThreadDumps() {}

@ManagedAttribute(description="Current number of threads in the thread pool",type=SCALAR)
public int getThreadPoolSize() {
Expand All @@ -178,7 +175,6 @@ public int getThreadPoolSize() {
return 0;
}


@ManagedAttribute(description="Current number of active threads in the thread pool",type=SCALAR)
public int getThreadPoolSizeActive() {
if(thread_pool instanceof ThreadPoolExecutor)
Expand All @@ -202,6 +198,7 @@ public void resetStats() {
public void init() throws Exception {
if(log == null)
log=LogFactory.getLog(getClass());
thread_pool_full_log=new SuppressLog<>(log, "ThreadPoolFull");
if(enabled) {
if(thread_factory == null)
thread_factory=new DefaultThreadFactory("thread-pool", true, true);
Expand All @@ -225,6 +222,11 @@ public void destroy() {
}
}

public ThreadPool removeExpired() {
thread_pool_full_log.removeExpired(thread_pool_full_suppress_time);
return this;
}

public void doExecute(Runnable task) {
thread_pool.execute(task);
}
Expand All @@ -238,27 +240,10 @@ public boolean execute(Runnable task) {
}
catch(RejectedExecutionException ex) {
num_rejected_msgs.increment();
// https://issues.redhat.com/browse/JGRP-2403
if(thread_dumps.incrementAndGet() == thread_dumps_threshold) {
String thread_dump=Util.dumpThreads();
if(thread_dump_path != null) {
File file=new File(thread_dump_path, "jgroups_threaddump_" + System.currentTimeMillis() + ".txt");
try(BufferedWriter writer=new BufferedWriter(new FileWriter(file))) {
writer.write(thread_dump);
log.fatal("%s: thread pool is full (max=%d, active=%d); thread dump (dumped once, until thread_dump is reset): %s",
address, max_threads, getThreadPoolSize(), file.getAbsolutePath());
}
catch(IOException e) {
log.warn("%s: cannot generate the thread dump to %s: %s", address, file.getAbsolutePath(), e);
log.fatal("%s: thread pool is full (max=%d, active=%d); " +
"thread dump (dumped once, until thread_dump is reset):\n%s",
address, max_threads, getThreadPoolSize(), thread_dump);
}
}
else
log.fatal("%s: thread pool is full (max=%d, active=%d); thread dump (dumped once, until thread_dump is reset):\n%s",
address, max_threads, getThreadPoolSize(), thread_dump);
}
//https://issues.redhat.com/browse/JGRP-2802
String thread_dump=thread_dumps_enabled? String.format(". Threads:\n%s", Util.dumpThreads()) : "";
thread_pool_full_log.log(warn, "thread-pool-full", thread_pool_full_suppress_time,
address, max_threads, getThreadPoolSize(), thread_dump);
return false;
}
catch(Throwable t) {
Expand Down
8 changes: 4 additions & 4 deletions src/org/jgroups/util/Util.java
Original file line number Diff line number Diff line change
Expand Up @@ -2628,10 +2628,10 @@ public static String suffix(TimeUnit u) {
case NANOSECONDS: return "ns";
case MICROSECONDS: return "us";
case MILLISECONDS: return "ms";
case SECONDS: return "s ";
case MINUTES: return "m ";
case HOURS: return "h ";
case DAYS: return "d ";
case SECONDS: return "s";
case MINUTES: return "m";
case HOURS: return "h";
case DAYS: return "d";
default: return u.toString();
}
}
Expand Down

0 comments on commit 6d5ed51

Please sign in to comment.