From 6d5ed517d2ebe7f7b432b94c9b2e23dff73e0dc6 Mon Sep 17 00:00:00 2001 From: Bela Ban Date: Thu, 4 Jul 2024 11:17:44 +0200 Subject: [PATCH] - Added SuppressLog for thread-pool-full event in ThreadPool (https://issues.redhat.com/browse/JGRP-2802) --- conf/jg-messages.properties | 6 +- conf/jg-messages_de.properties | 6 +- src/org/jgroups/protocols/TP.java | 5 +- src/org/jgroups/protocols/UDP.java | 2 +- src/org/jgroups/protocols/pbcast/NAKACK2.java | 12 +-- src/org/jgroups/protocols/relay/RELAY.java | 2 +- src/org/jgroups/util/SuppressCache.java | 6 +- src/org/jgroups/util/SuppressLog.java | 11 ++- src/org/jgroups/util/ThreadPool.java | 83 ++++++++----------- src/org/jgroups/util/Util.java | 8 +- 10 files changed, 65 insertions(+), 76 deletions(-) diff --git a/conf/jg-messages.properties b/conf/jg-messages.properties index c70f7c0ee4c..01e8dc73c1e 100644 --- a/conf/jg-messages.properties +++ b/conf/jg-messages.properties @@ -8,11 +8,10 @@ AttrReadFailure = JGRP000007: failed reading value of attribute %s: MissingAttribute = JGRP000008: did not find attribute with name %s AttrWriteFailure = JGRP000009: failed writing to attribute %s: %s VersionMismatch = JGRP000010: packet from %s has different version (%s) than ours (%s); packet is discarded -MsgDroppedNak = JGRP000011: %s: dropped message %s from non-member %s (view=%s) +MsgDroppedNak = JGRP000011: %s: dropped message from non-member %s (view=%s) MsgDroppedDiffCluster = JGRP000012: discarded message from different cluster %s (our cluster is %s). Sender was %s BatchDroppedDiffCluster = JGRP000013: discarded message batch from different cluster %s (our cluster is %s). 
Sender was %s -SuppressMsg = (received %d identical messages from %s in the last %d ms) -SuppressMsgRelay = (%d identical messages for site %s in the last %d ms) +SuppressMsg = (%d suppressed messages in the last %s) Deprecated = JGRP000014: %s has been deprecated: %s IncorrectBufferSize = JGRP000015: the %s buffer of socket %s was set to %s, but the OS only allocated %s CallbackException = JGRP000016: exception in %s callback: %s @@ -46,6 +45,7 @@ BufferSizeFailed = JGRP000049: failed setting %s buffer size of %d in CompressionFailure = JGRP000050: exception on uncompression RSVP_Misconfig = JGRP000051: resend_interval (%d) is >= timeout (%d); setting resend_interval to timeout / 3 RSVP_Timeout = JGRP000052: message ran into a timeout, missing acks: %s +ThreadPoolFull = JGRP000053: %s: thread pool is full (max=%d, active=%d)%s AREYOUDEADHdrFromIsNull = JGRP000101: ARE_YOU_DEAD: hdr.from is null AttemptToMoveFailedAt = JGRP000103: attempt to move failed at: AttemptToWriteDataFailedAt = JGRP000104: attempt to write data failed at diff --git a/conf/jg-messages_de.properties b/conf/jg-messages_de.properties index d25e03f0c5a..e15c33cef07 100644 --- a/conf/jg-messages_de.properties +++ b/conf/jg-messages_de.properties @@ -8,11 +8,10 @@ AttrReadFailure = JGRP000007: Attribut %s konnte nicht gelesen werde MissingAttribute = JGRP000008: Attribut %s wurde nicht gefunden AttrWriteFailure = JGRP000009: Attribut %s konnte nicht geschrieben werden: %s VersionMismatch = JGRP000010: Nachricht von %s hat eine andere Version (%s) als unsere (%s); Nachricht wird verworfen -MsgDroppedNak = JGRP000011: %s: Nachricht %s von Nicht-Mitglied %s wurde verworfen (View=%s) +MsgDroppedNak = JGRP000011: %s: Nachricht von Nicht-Mitglied %s wurde verworfen (View=%s) MsgDroppedDiffCluster = JGRP000012: Nachricht von Cluster %s wurde verworfen (unser Cluster ist %s). Der Sender war %s BatchDroppedDiffCluster = JGRP000013: Nachrichten-Batch von Cluster %s wurde verworfen (unser Cluster ist %s). 
Der Sender war %s -SuppressMsg = (%d identische Nachrichten erhalten von %s in den letzten %d ms) -SuppressMsgRelay = (%d identische Nachrichten für Site %s in den letzten %d ms) +SuppressMsg = (%d unterdrückte Nachrichten in den letzten %s) Deprecated = JGRP000014: %s wird nicht mehr unterstuetzt: %s IncorrectBufferSize = JGRP000015: Der %s Puffer des Sockets %s wurde auf %s gesetzt, aber das Betriebssystem stellt nur %s zur Verfuegung MethodNotFound=JGRP000023: %s: Methode %s.%s() nicht gefunden @@ -23,4 +22,5 @@ OperationInvocationFailure=JGRP000021: Fehler beim Aufruf der Operation %s: %s ReceiverFailure=JGRP000019: Fehler beim Uebergeben der Nachricht an den Empfaenger StackDestroyFailure=JGRP000020: Fehler beim Zerstoeren des Protokoll-Stacks UpHandlerFailure=JGRP000018: Fehler beim Uebergeben der Nachricht an den Up-Handler +ThreadPoolFull=JGRP000053: %s: thread pool ist voll (max=%d, aktiv=%d)%s RelayNoRouteToSite = JGRP000290: %s: keine Route für Nachricht von %s an %s: die Nachricht wird verworfen \ No newline at end of file diff --git a/src/org/jgroups/protocols/TP.java b/src/org/jgroups/protocols/TP.java index a7143f39af0..e5440d26e65 100644 --- a/src/org/jgroups/protocols/TP.java +++ b/src/org/jgroups/protocols/TP.java @@ -731,9 +731,9 @@ public void init() throws Exception { who_has_cache=new ExpiryCache<>(who_has_cache_timeout); if(suppress_time_different_version_warnings > 0) - suppress_log_different_version=new SuppressLog<>(log, "VersionMismatch", "SuppressMsg"); + suppress_log_different_version=new SuppressLog<>(log, "VersionMismatch"); if(suppress_time_different_cluster_warnings > 0) - suppress_log_different_cluster=new SuppressLog<>(log, "MsgDroppedDiffCluster", "SuppressMsg"); + suppress_log_different_cluster=new SuppressLog<>(log, "MsgDroppedDiffCluster"); // ========================================== Timer ============================== if(timer == null) { @@ -970,6 +970,7 @@ public Object down(Event evt) { 
thread_pool.setMaxThreads(new_size); } } + thread_pool.removeExpired(); break; case Event.CONNECT: diff --git a/src/org/jgroups/protocols/UDP.java b/src/org/jgroups/protocols/UDP.java index e41fa536a5e..488fea3f2a3 100644 --- a/src/org/jgroups/protocols/UDP.java +++ b/src/org/jgroups/protocols/UDP.java @@ -328,7 +328,7 @@ public void init() throws Exception { throw new IllegalArgumentException("bundler.max_size (" + bundler.getMaxSize() + ") cannot exceed the max " + "datagram packet size of " + Global.MAX_DATAGRAM_PACKET_SIZE); if(is_mac && suppress_time_out_of_buffer_space > 0) - suppress_log_out_of_buffer_space=new SuppressLog<>(log, "FailureSendingToPhysAddr", "SuppressMsg"); + suppress_log_out_of_buffer_space=new SuppressLog<>(log, "FailureSendingToPhysAddr"); } /** Creates the unicast and multicast sockets and starts the unicast and multicast receiver threads */ diff --git a/src/org/jgroups/protocols/pbcast/NAKACK2.java b/src/org/jgroups/protocols/pbcast/NAKACK2.java index a13f3001313..308b4cfe382 100644 --- a/src/org/jgroups/protocols/pbcast/NAKACK2.java +++ b/src/org/jgroups/protocols/pbcast/NAKACK2.java @@ -494,7 +494,7 @@ public void init() throws Exception { } if(suppress_time_non_member_warnings > 0) - suppress_log_non_member=new SuppressLog<>(log, "MsgDroppedNak", "SuppressMsg"); + suppress_log_non_member=new SuppressLog<>(log, "MsgDroppedNak"); // max bundle size (minus overhead) divided by times bits per long int estimated_max_msgs_in_xmit_req=(transport.getBundler().getMaxSize() -50) * Global.LONG_SIZE; @@ -748,15 +748,15 @@ protected void queueMessage(Message msg, long seqno) { log.trace("%s: message %s#%d was discarded (not yet server, queue full)", local_addr, msg.getSrc(), seqno); } - protected void unknownMember(Address sender, Object message) { + protected void unknownMember(Address sender) { if(leaving) return; if(log_discard_msgs && log.isWarnEnabled()) { if(suppress_log_non_member != null) suppress_log_non_member.log(SuppressLog.Level.warn, 
sender, suppress_time_non_member_warnings, - local_addr, message, sender, view); + local_addr, sender, view); else - log.warn(Util.getMessage("MsgDroppedNak"), local_addr, message, sender, view); + log.warn(Util.getMessage("MsgDroppedNak"), local_addr, sender, view); } } @@ -821,7 +821,7 @@ protected void handleMessage(Message msg, NakAckHeader2 hdr) { Address sender=msg.getSrc(); Table buf=xmit_table.get(sender); if(buf == null) { // discard message if there is no entry for sender - unknownMember(sender, hdr.seqno); + unknownMember(sender); return; } @@ -852,7 +852,7 @@ protected void handleMessageBatch(MessageBatch mb) { Table buf=xmit_table.get(sender); if(buf == null) { // discard message if there is no entry for sender mb.removeIf(HAS_HEADER, true); - unknownMember(sender, "batch"); + unknownMember(sender); return; } int size=mb.size(); diff --git a/src/org/jgroups/protocols/relay/RELAY.java b/src/org/jgroups/protocols/relay/RELAY.java index 794e36cb399..0135dcd3ddc 100644 --- a/src/org/jgroups/protocols/relay/RELAY.java +++ b/src/org/jgroups/protocols/relay/RELAY.java @@ -277,7 +277,7 @@ public void init() throws Exception { if(suppress_time_no_route_errors <= 0) throw new IllegalArgumentException("suppress_time_no_route_errors has to be > 0"); - suppress_log_no_route=new SuppressLog<>(log, "RelayNoRouteToSite", "SuppressMsgRelay"); + suppress_log_no_route=new SuppressLog<>(log, "RelayNoRouteToSite"); } public void stop() { diff --git a/src/org/jgroups/util/SuppressCache.java b/src/org/jgroups/util/SuppressCache.java index ad8356251af..2bda05b6136 100644 --- a/src/org/jgroups/util/SuppressCache.java +++ b/src/org/jgroups/util/SuppressCache.java @@ -19,11 +19,11 @@ public class SuppressCache { /** - * Adds a new key to the hashmap, or updates the Value associated with the existing key if present. If expiry_time - * is greater than the age of the Value, the key will be removed. 
+ * Adds a new key to the hashmap, or updates the value associated with the existing key if present. If expiry_time + * is greater than the age of the value, the key will be removed. * @param key The key * @param expiry_time Expiry time (in ms) - * @return Null if the key was present and not expired, or the Value associated with the existing key + * @return Null if the key was present and not expired, or the value associated with the existing key * (its count incremented) */ public Value putIfAbsent(T key, long expiry_time) { diff --git a/src/org/jgroups/util/SuppressLog.java b/src/org/jgroups/util/SuppressLog.java index 64be39181b0..417f29bcef6 100644 --- a/src/org/jgroups/util/SuppressLog.java +++ b/src/org/jgroups/util/SuppressLog.java @@ -2,6 +2,8 @@ import org.jgroups.logging.Log; +import java.util.concurrent.TimeUnit; + /** * Log (using {@link SuppressCache}) which suppresses (certain) messages from the same member for a given time * @author Bela Ban @@ -15,11 +17,11 @@ public class SuppressLog { public enum Level {error,warn,trace}; - public SuppressLog(Log log, String message_key, String suppress_msg) { + public SuppressLog(Log log, String message_key) { this.log=log; cache=new SuppressCache<>(); message_format=Util.getMessage(message_key); - suppress_format=Util.getMessage(suppress_msg); // "(received %d identical messages from %s in the last %d ms)" + suppress_format=Util.getMessage("SuppressMsg"); // "(%d suppressed messages in the last %s)" } public SuppressCache getCache() {return cache;} @@ -27,7 +29,7 @@ public SuppressLog(Log log, String message_key, String suppress_msg) { /** * Logs a message from a given member if is hasn't been logged for timeout ms * @param level The level, either warn or error - * @param key The key into the SuppressCache + * @param key The key into the SuppressCache, e.g. 
a member address or other topic ("thread_pool_full") * @param timeout The timeout * @param args The arguments to the message key */ @@ -37,7 +39,8 @@ public void log(Level level, T key, long timeout, Object ... args) { return; String message=val.count() == 1? String.format(message_format, args) : - String.format(message_format, args) + " " + String.format(suppress_format, val.count(), key, val.age()); + String.format(message_format, args) + " " + + String.format(suppress_format, val.count(), Util.printTime(val.age(), TimeUnit.MILLISECONDS)); switch(level) { case error: diff --git a/src/org/jgroups/util/ThreadPool.java b/src/org/jgroups/util/ThreadPool.java index 9752ea0f78e..c326bd82af4 100644 --- a/src/org/jgroups/util/ThreadPool.java +++ b/src/org/jgroups/util/ThreadPool.java @@ -4,21 +4,17 @@ import org.jgroups.Global; import org.jgroups.Lifecycle; import org.jgroups.annotations.ManagedAttribute; -import org.jgroups.annotations.ManagedOperation; import org.jgroups.annotations.Property; import org.jgroups.conf.AttributeType; import org.jgroups.logging.Log; import org.jgroups.logging.LogFactory; -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileWriter; -import java.io.IOException; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.LongAdder; import static org.jgroups.conf.AttributeType.SCALAR; +import static org.jgroups.conf.AttributeType.TIME; +import static org.jgroups.util.SuppressLog.Level.warn; /** * Thread pool based on {@link java.util.concurrent.ThreadPoolExecutor} @@ -30,10 +26,7 @@ public class ThreadPool implements Lifecycle { protected Log log; protected ThreadFactory thread_factory; protected Address address; - - // Incremented when a message is rejected due to a full thread pool. 
When this value exceeds thread_dumps_threshold, - // the threads will be dumped at FATAL level, and thread_dumps will be reset to 0 - protected final AtomicInteger thread_dumps=new AtomicInteger(); + protected SuppressLog thread_pool_full_log; @Property(description="Whether or not the thread pool is enabled. If false, tasks will be run on the caller's thread") protected boolean enabled=true; @@ -54,18 +47,28 @@ public class ThreadPool implements Lifecycle { "See Util.parseRejectionPolicy() for details") protected String rejection_policy="abort"; - @Property(description="The number of times a thread pool needs to be full before a thread dump is logged") + @Property(description="Time (in milliseconds) during which thread-pool full messages are suppressed",type=TIME) + protected long thread_pool_full_suppress_time=60_000; + + @Property(description="The number of times a thread pool needs to be full before a thread dump is logged", + deprecatedMessage="ignored") + @Deprecated(since="5.4") protected int thread_dumps_threshold=1; @Property(description="Path to which the thread dump will be written. Ignored if null", - systemProperty="jgroups.threaddump.path") + systemProperty="jgroups.threaddump.path",deprecatedMessage="ignored") + @Deprecated(since="5.4") protected String thread_dump_path; + @Property(description="Dump threads when the thread pool is full") + protected boolean thread_dumps_enabled; + @Property(description="Increases max_threads by the view size + delta if enabled " + "(https://issues.redhat.com/browse/JGRP-2655)") protected boolean increase_max_size_dynamically=true; - @Property(description="Added to the view size when the pool is increased dynamically") + @Property(description="If the view is greater than the max thread pool size, the latter is set to " + + "view size + delta. 
Only enabled if increase_max_size_dynamically is true") protected int delta=10; @ManagedAttribute(description="The number of messages dropped because the thread pool was full",type= SCALAR) @@ -145,15 +148,12 @@ public void setRejectedExecutionHandler(RejectedExecutionHandler handler) { ((ThreadPoolExecutor)thread_pool).setRejectedExecutionHandler(handler); } - public int getThreadDumpsThreshold() { - return thread_dumps_threshold; - } - - public ThreadPool setThreadDumpsThreshold(int t) { - this.thread_dumps_threshold=t; - return this; - } - + public long getThreadPoolFullSuppressTime() {return thread_pool_full_suppress_time;} + public ThreadPool setThreadPoolFullSuppressTime(long t) {this.thread_pool_full_suppress_time=t; return this;} + public boolean getThreadDumpsEnabled() {return thread_dumps_enabled;} + public ThreadPool setThreadDumpsEnabled(boolean b) {thread_dumps_enabled=b; return this;} + @Deprecated public int getThreadDumpsThreshold() {return 0;} + @Deprecated public ThreadPool setThreadDumpsThreshold(int t) {return this;} public Address getAddress() {return address;} public ThreadPool setAddress(Address a) {this.address=a; return this;} public boolean getIncreaseMaxSizeDynamically() {return increase_max_size_dynamically;} @@ -165,11 +165,8 @@ public ThreadPool setThreadDumpsThreshold(int t) { public boolean useVirtualThreads() {return use_virtual_threads;} public ThreadPool useVirtualThreads(boolean b) {use_virtual_threads=b; return this;} - @ManagedAttribute(description="Number of thread dumps",type=SCALAR) - public int getNumberOfThreadDumps() {return thread_dumps.get();} - - @ManagedOperation(description="Resets the thread_dumps counter") - public void resetThreadDumps() {thread_dumps.set(0);} + @Deprecated public int getNumberOfThreadDumps() {return -1;} + @Deprecated public void resetThreadDumps() {} @ManagedAttribute(description="Current number of threads in the thread pool",type=SCALAR) public int getThreadPoolSize() { @@ -178,7 +175,6 @@ 
public int getThreadPoolSize() { return 0; } - @ManagedAttribute(description="Current number of active threads in the thread pool",type=SCALAR) public int getThreadPoolSizeActive() { if(thread_pool instanceof ThreadPoolExecutor) @@ -202,6 +198,7 @@ public void resetStats() { public void init() throws Exception { if(log == null) log=LogFactory.getLog(getClass()); + thread_pool_full_log=new SuppressLog<>(log, "ThreadPoolFull"); if(enabled) { if(thread_factory == null) thread_factory=new DefaultThreadFactory("thread-pool", true, true); @@ -225,6 +222,11 @@ public void destroy() { } } + public ThreadPool removeExpired() { + thread_pool_full_log.removeExpired(thread_pool_full_suppress_time); + return this; + } + public void doExecute(Runnable task) { thread_pool.execute(task); } @@ -238,27 +240,10 @@ public boolean execute(Runnable task) { } catch(RejectedExecutionException ex) { num_rejected_msgs.increment(); - // https://issues.redhat.com/browse/JGRP-2403 - if(thread_dumps.incrementAndGet() == thread_dumps_threshold) { - String thread_dump=Util.dumpThreads(); - if(thread_dump_path != null) { - File file=new File(thread_dump_path, "jgroups_threaddump_" + System.currentTimeMillis() + ".txt"); - try(BufferedWriter writer=new BufferedWriter(new FileWriter(file))) { - writer.write(thread_dump); - log.fatal("%s: thread pool is full (max=%d, active=%d); thread dump (dumped once, until thread_dump is reset): %s", - address, max_threads, getThreadPoolSize(), file.getAbsolutePath()); - } - catch(IOException e) { - log.warn("%s: cannot generate the thread dump to %s: %s", address, file.getAbsolutePath(), e); - log.fatal("%s: thread pool is full (max=%d, active=%d); " + - "thread dump (dumped once, until thread_dump is reset):\n%s", - address, max_threads, getThreadPoolSize(), thread_dump); - } - } - else - log.fatal("%s: thread pool is full (max=%d, active=%d); thread dump (dumped once, until thread_dump is reset):\n%s", - address, max_threads, getThreadPoolSize(), thread_dump); 
- } + //https://issues.redhat.com/browse/JGRP-2802 + String thread_dump=thread_dumps_enabled? String.format(". Threads:\n%s", Util.dumpThreads()) : ""; + thread_pool_full_log.log(warn, "thread-pool-full", thread_pool_full_suppress_time, + address, max_threads, getThreadPoolSize(), thread_dump); return false; } catch(Throwable t) { diff --git a/src/org/jgroups/util/Util.java b/src/org/jgroups/util/Util.java index c0b48a2761b..a419111c973 100644 --- a/src/org/jgroups/util/Util.java +++ b/src/org/jgroups/util/Util.java @@ -2628,10 +2628,10 @@ public static String suffix(TimeUnit u) { case NANOSECONDS: return "ns"; case MICROSECONDS: return "us"; case MILLISECONDS: return "ms"; - case SECONDS: return "s "; - case MINUTES: return "m "; - case HOURS: return "h "; - case DAYS: return "d "; + case SECONDS: return "s"; + case MINUTES: return "m"; + case HOURS: return "h"; + case DAYS: return "d"; default: return u.toString(); } }