From d04582e8db261c1e83ed37a5e2fd1d06826b72aa Mon Sep 17 00:00:00 2001 From: Mehdi-17 Date: Sat, 13 Jul 2024 01:54:59 +0200 Subject: [PATCH 01/11] Fix busy-waiting loop for server-session and Ball and Twin #2977 --- .../java/com/iluwatar/sessionserver/App.java | 49 +++++++++---------- .../java/com/iluwatar/twin/BallThread.java | 22 ++++----- 2 files changed, 34 insertions(+), 37 deletions(-) diff --git a/server-session/src/main/java/com/iluwatar/sessionserver/App.java b/server-session/src/main/java/com/iluwatar/sessionserver/App.java index a3c66d3ff634..bf41f173b9bc 100644 --- a/server-session/src/main/java/com/iluwatar/sessionserver/App.java +++ b/server-session/src/main/java/com/iluwatar/sessionserver/App.java @@ -28,9 +28,12 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.time.Instant; -import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; /** @@ -54,8 +57,9 @@ public class App { // Map to store session data (simulated using a HashMap) - private static Map sessions = new HashMap<>(); - private static Map sessionCreationTimes = new HashMap<>(); + private static final Map sessions = new ConcurrentHashMap<>(); + private static final Map sessionCreationTimes = new ConcurrentHashMap<>(); + private static final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); private static final long SESSION_EXPIRATION_TIME = 10000; /** @@ -81,31 +85,24 @@ public static void main(String[] args) throws IOException { } private static void sessionExpirationTask() { - new Thread(() -> { - while (true) { - try { - LOGGER.info("Session expiration checker started..."); - Thread.sleep(SESSION_EXPIRATION_TIME); // Sleep for expiration time - Instant currentTime = Instant.now(); - synchronized (sessions) { - synchronized (sessionCreationTimes) { - Iterator> iterator = - sessionCreationTimes.entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry entry = iterator.next(); - if (entry.getValue().plusMillis(SESSION_EXPIRATION_TIME).isBefore(currentTime)) { - sessions.remove(entry.getKey()); - iterator.remove(); - } - } - } + scheduler.scheduleWithFixedDelay(() -> { + try { + LOGGER.info("Session expiration checker started..."); + Iterator> iterator = sessionCreationTimes.entrySet().iterator(); + Instant currentTime = Instant.now(); + + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + if (entry.getValue().plusMillis(SESSION_EXPIRATION_TIME).isBefore(currentTime)) { + sessions.remove(entry.getKey()); + iterator.remove(); } - LOGGER.info("Session expiration checker finished!"); - } catch (InterruptedException e) { - LOGGER.error("An error occurred: ", e); - Thread.currentThread().interrupt(); } + LOGGER.info("Session expiration checker finished!"); + + } catch (Exception e) { + LOGGER.error("An error occured: ", e); } - }).start(); + }, 0, SESSION_EXPIRATION_TIME, TimeUnit.MILLISECONDS); } } \ No newline at end of file diff --git a/twin/src/main/java/com/iluwatar/twin/BallThread.java b/twin/src/main/java/com/iluwatar/twin/BallThread.java index 9d4d9cf71a76..0aff97abdc27 100644 --- a/twin/src/main/java/com/iluwatar/twin/BallThread.java +++ b/twin/src/main/java/com/iluwatar/twin/BallThread.java @@ -26,6 +26,9 @@ import lombok.Setter; import lombok.extern.slf4j.Slf4j; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; /** * This class is a UI thread for drawing the {@link BallItem}, and provide the method for suspend @@ -40,24 +43,19 @@ public class BallThread extends Thread { private volatile boolean isSuspended; - private volatile boolean isRunning = true; + private static final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + /** * Run the thread. */ public void run() { - - while (isRunning) { - if (!isSuspended) { + scheduler.scheduleWithFixedDelay(()->{ + if (!isSuspended){ twin.draw(); twin.move(); } - try { - Thread.sleep(250); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } + }, 0, 250, TimeUnit.MILLISECONDS); } public void suspendMe() { @@ -71,8 +69,10 @@ public void resumeMe() { } public void stopMe() { - this.isRunning = false; this.isSuspended = true; + if (scheduler != null){ + scheduler.shutdown(); + } } } From f339b2f7607118800909237954ab4fb116ac4a3c Mon Sep 17 00:00:00 2001 From: Mehdi-17 Date: Sun, 14 Jul 2024 21:26:38 +0200 Subject: [PATCH 02/11] Update Readme for server-session pattern and Twin pattern #2977 --- server-session/README.md | 65 +++++++++---------- twin/README.md | 21 +++--- .../java/com/iluwatar/twin/BallThread.java | 10 +-- 3 files changed, 46 insertions(+), 50 deletions(-) diff --git a/server-session/README.md b/server-session/README.md index 4ce452b53f7d..0b0158a85bb1 100644 --- a/server-session/README.md +++ b/server-session/README.md @@ -47,46 +47,45 @@ The `main` application starts a server and assigns handlers to manage login and ```java public class App { - private static Map sessions = new HashMap<>(); - private static Map sessionCreationTimes = new HashMap<>(); - private static final long SESSION_EXPIRATION_TIME = 10000; + private static final Map sessions = new ConcurrentHashMap<>(); + private static final Map sessionCreationTimes = new ConcurrentHashMap<>(); + private static final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + private static final long SESSION_EXPIRATION_TIME = 10000; - public static void main(String[] args) throws IOException { - HttpServer server = HttpServer.create(new InetSocketAddress(8080), 0); + public static void main(String[] args) { + HttpServer server = HttpServer.create(new InetSocketAddress(8080), 0); - server.createContext("/login", new LoginHandler(sessions, sessionCreationTimes)); - server.createContext("/logout", new LogoutHandler(sessions, sessionCreationTimes)); + server.createContext("/login", new LoginHandler(sessions, sessionCreationTimes)); + server.createContext("/logout", new LogoutHandler(sessions, sessionCreationTimes)); - server.start(); + server.start(); - sessionExpirationTask(); - } + sessionExpirationTask(); + + LOGGER.info("Server started. Listening on port 8080..."); + } + + private static void sessionExpirationTask() { + scheduler.scheduleWithFixedDelay(() -> { + try { + LOGGER.info("Session expiration checker started..."); + Iterator> iterator = sessionCreationTimes.entrySet().iterator(); + Instant currentTime = Instant.now(); - private static void sessionExpirationTask() { - new Thread(() -> { - while (true) { - try { - Thread.sleep(SESSION_EXPIRATION_TIME); - Instant currentTime = Instant.now(); - synchronized (sessions) { - synchronized (sessionCreationTimes) { - Iterator> iterator = - sessionCreationTimes.entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry entry = iterator.next(); - if (entry.getValue().plusMillis(SESSION_EXPIRATION_TIME).isBefore(currentTime)) { - sessions.remove(entry.getKey()); - iterator.remove(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + if (entry.getValue().plusMillis(SESSION_EXPIRATION_TIME).isBefore(currentTime)) { + sessions.remove(entry.getKey()); + iterator.remove(); + } } - } + LOGGER.info("Session expiration checker finished!"); + + } catch (Exception e) { + LOGGER.error("An error occured: ", e); } - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - }).start(); - } + }, 0, SESSION_EXPIRATION_TIME, TimeUnit.MILLISECONDS); + } } ``` diff --git a/twin/README.md b/twin/README.md index b2913e0a963f..13b06593d4c8 100644 --- a/twin/README.md +++ b/twin/README.md @@ -85,20 +85,15 @@ public class BallThread extends Thread { @Setter private BallItem twin; private volatile boolean isSuspended; - private volatile boolean isRunning = true; + private static final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); public void run() { - while (isRunning) { - if (!isSuspended) { - twin.draw(); - twin.move(); + scheduler.scheduleWithFixedDelay(()->{ + if (!isSuspended){ + twin.draw(); + twin.move(); } - try { - Thread.sleep(250); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } + }, 0, 250, TimeUnit.MILLISECONDS); } public void suspendMe() { @@ -112,8 +107,10 @@ public class BallThread extends Thread { } public void stopMe() { - this.isRunning = false; this.isSuspended = true; + if (scheduler != null){ + scheduler.shutdown(); + } } } ``` diff --git a/twin/src/main/java/com/iluwatar/twin/BallThread.java b/twin/src/main/java/com/iluwatar/twin/BallThread.java index 0aff97abdc27..fcee24f6a6de 100644 --- a/twin/src/main/java/com/iluwatar/twin/BallThread.java +++ b/twin/src/main/java/com/iluwatar/twin/BallThread.java @@ -24,11 +24,11 @@ */ package com.iluwatar.twin; -import lombok.Setter; -import lombok.extern.slf4j.Slf4j; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; /** * This class is a UI thread for drawing the {@link BallItem}, and provide the method for suspend @@ -50,8 +50,8 @@ public class BallThread extends Thread { * Run the thread. */ public void run() { - scheduler.scheduleWithFixedDelay(()->{ - if (!isSuspended){ + scheduler.scheduleWithFixedDelay(() -> { + if (!isSuspended) { twin.draw(); twin.move(); } @@ -70,7 +70,7 @@ public void resumeMe() { public void stopMe() { this.isSuspended = true; - if (scheduler != null){ + if (scheduler != null) { scheduler.shutdown(); } } From 302b6b33726ed30311bc0711c159cf7ff5697cee Mon Sep 17 00:00:00 2001 From: Mehdi-17 Date: Mon, 15 Jul 2024 08:53:38 +0200 Subject: [PATCH 03/11] Refactor: remove busy waiting for LogAggregator #2977 --- .../com/iluwatar/logaggregation/LogAggregator.java | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/microservices-log-aggregation/src/main/java/com/iluwatar/logaggregation/LogAggregator.java b/microservices-log-aggregation/src/main/java/com/iluwatar/logaggregation/LogAggregator.java index 37417e21267d..f16b52944c36 100644 --- a/microservices-log-aggregation/src/main/java/com/iluwatar/logaggregation/LogAggregator.java +++ b/microservices-log-aggregation/src/main/java/com/iluwatar/logaggregation/LogAggregator.java @@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import lombok.extern.slf4j.Slf4j; @@ -46,6 +47,7 @@ public class LogAggregator { private final ConcurrentLinkedQueue buffer = new ConcurrentLinkedQueue<>(); private final LogLevel minLogLevel; private final ExecutorService executorService = Executors.newSingleThreadExecutor(); + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); private final AtomicInteger logCount = new AtomicInteger(0); /** @@ -106,15 +108,7 @@ private void flushBuffer() { } private void startBufferFlusher() { - executorService.execute(() -> { - while (!Thread.currentThread().isInterrupted()) { - try { - Thread.sleep(5000); // Flush every 5 seconds. - flushBuffer(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - }); + //flush every 5 seconds + scheduler.scheduleWithFixedDelay(this::flushBuffer, 0, 5000, TimeUnit.MILLISECONDS); } } From 5a460a0ea7bcaf7bcd400fee42c57c5fcf5ab577 Mon Sep 17 00:00:00 2001 From: Mehdi-17 Date: Thu, 18 Jul 2024 18:28:33 +0200 Subject: [PATCH 04/11] Update README for LogAggregator #2977 --- microservices-log-aggregation/README.md | 61 ++++++++++++++++++++----- 1 file changed, 50 insertions(+), 11 deletions(-) diff --git a/microservices-log-aggregation/README.md b/microservices-log-aggregation/README.md index c23b1fe4319b..56cf035c74c7 100644 --- a/microservices-log-aggregation/README.md +++ b/microservices-log-aggregation/README.md @@ -66,19 +66,58 @@ The `LogAggregator` collects logs from various services and stores them in the ` ```java public class LogAggregator { - private final CentralLogStore centralLogStore; - private final LogLevel minimumLogLevel; - public LogAggregator(CentralLogStore centralLogStore, LogLevel minimumLogLevel) { - this.centralLogStore = centralLogStore; - this.minimumLogLevel = minimumLogLevel; - } + private static final int BUFFER_THRESHOLD = 3; + private final CentralLogStore centralLogStore; + private final ConcurrentLinkedQueue buffer = new ConcurrentLinkedQueue<>(); + private final LogLevel minLogLevel; + private final ExecutorService executorService = Executors.newSingleThreadExecutor(); + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + private final AtomicInteger logCount = new AtomicInteger(0); + + public LogAggregator(CentralLogStore centralLogStore, LogLevel minLogLevel) { + this.centralLogStore = centralLogStore; + this.minLogLevel = minLogLevel; + startBufferFlusher(); + } + + public void collectLog(LogEntry logEntry) { + if (logEntry.getLevel() == null || minLogLevel == null) { + LOGGER.warn("Log level or threshold level is null. Skipping."); + return; + } + + if (logEntry.getLevel().compareTo(minLogLevel) < 0) { + LOGGER.debug("Log level below threshold. Skipping."); + return; + } + + buffer.offer(logEntry); + + if (logCount.incrementAndGet() >= BUFFER_THRESHOLD) { + flushBuffer(); + } + } + + public void stop() { + executorService.shutdownNow(); + if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) { + LOGGER.error("Log aggregator did not terminate."); + } + flushBuffer(); + } - public void collectLog(LogEntry logEntry) { - if (logEntry.getLogLevel().compareTo(minimumLogLevel) >= 0) { - centralLogStore.storeLog(logEntry); + private void flushBuffer() { + LogEntry logEntry; + while ((logEntry = buffer.poll()) != null) { + centralLogStore.storeLog(logEntry); + logCount.decrementAndGet(); + } + } + + private void startBufferFlusher() { + scheduler.scheduleWithFixedDelay(this::flushBuffer, 0, 5000, TimeUnit.MILLISECONDS); } - } } ``` @@ -107,7 +146,7 @@ The `main` application creates services, generates logs, aggregates, and finally ```java public class App { - public static void main(String[] args) throws InterruptedException { + public static void main(String[] args) { final CentralLogStore centralLogStore = new CentralLogStore(); final LogAggregator aggregator = new LogAggregator(centralLogStore, LogLevel.INFO); From c8eb70819d63a01377f76d4e1f3823cefa100644 Mon Sep 17 00:00:00 2001 From: Mehdi-17 Date: Thu, 18 Jul 2024 18:37:00 +0200 Subject: [PATCH 05/11] Refactor: shutdown the scheduler in LogAggregator #2977 --- .../java/com/iluwatar/logaggregation/LogAggregator.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/microservices-log-aggregation/src/main/java/com/iluwatar/logaggregation/LogAggregator.java b/microservices-log-aggregation/src/main/java/com/iluwatar/logaggregation/LogAggregator.java index f16b52944c36..58c9d224ab0e 100644 --- a/microservices-log-aggregation/src/main/java/com/iluwatar/logaggregation/LogAggregator.java +++ b/microservices-log-aggregation/src/main/java/com/iluwatar/logaggregation/LogAggregator.java @@ -25,7 +25,6 @@ package com.iluwatar.logaggregation; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -46,7 +45,6 @@ public class LogAggregator { private final CentralLogStore centralLogStore; private final ConcurrentLinkedQueue buffer = new ConcurrentLinkedQueue<>(); private final LogLevel minLogLevel; - private final ExecutorService executorService = Executors.newSingleThreadExecutor(); private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); private final AtomicInteger logCount = new AtomicInteger(0); @@ -92,8 +90,8 @@ public void collectLog(LogEntry logEntry) { * @throws InterruptedException If any thread has interrupted the current thread. */ public void stop() throws InterruptedException { - executorService.shutdownNow(); - if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) { + scheduler.shutdownNow(); + if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) { LOGGER.error("Log aggregator did not terminate."); } flushBuffer(); From 913668dd07384a09235316ce42b1b995857fdd68 Mon Sep 17 00:00:00 2001 From: Mehdi-17 Date: Thu, 18 Jul 2024 19:09:53 +0200 Subject: [PATCH 06/11] Refactor: Remove busy-waiting for Commander #2977 --- .../java/com/iluwatar/commander/Retry.java | 27 ++++++++++--------- 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/commander/src/main/java/com/iluwatar/commander/Retry.java b/commander/src/main/java/com/iluwatar/commander/Retry.java index 71614668254b..23ccc9802d57 100644 --- a/commander/src/main/java/com/iluwatar/commander/Retry.java +++ b/commander/src/main/java/com/iluwatar/commander/Retry.java @@ -28,6 +28,9 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; @@ -59,6 +62,7 @@ public interface HandleErrorIssue { private static final SecureRandom RANDOM = new SecureRandom(); + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); private final Operation op; private final HandleErrorIssue handleError; private final int maxAttempts; @@ -86,26 +90,25 @@ public interface HandleErrorIssue { */ public void perform(List list, T obj) { - do { + scheduler.schedule(() -> { try { op.operation(list); - return; - } catch (Exception e) { + }catch (Exception e){ this.errors.add(e); if (this.attempts.incrementAndGet() >= this.maxAttempts || !this.test.test(e)) { this.handleError.handleIssue(obj, e); + scheduler.shutdown(); return; //return here... don't go further } - try { - long testDelay = - (long) Math.pow(2, this.attempts.intValue()) * 1000 + RANDOM.nextInt(1000); - long delay = Math.min(testDelay, this.maxDelay); - Thread.sleep(delay); - } catch (InterruptedException f) { - //ignore - } + perform(list, obj); } - } while (true); + }, calculateDelay(), TimeUnit.MILLISECONDS); + } + + private long calculateDelay(){ + long testDelay = + (long) Math.pow(2, this.attempts.intValue()) * 1000 + RANDOM.nextInt(1000); + return Math.min(testDelay, this.maxDelay); } } From 5a9f3359bbb3779d918916a51752ba83d8cd9859 Mon Sep 17 00:00:00 2001 From: Mehdi-17 Date: Thu, 18 Jul 2024 23:27:34 +0200 Subject: [PATCH 07/11] Refactor: Remove busy-waiting for Retry and RetryExponentialBackoff. Update README #2977 --- retry/README.md | 42 +++++++++++------ .../main/java/com/iluwatar/retry/Retry.java | 40 ++++++++++++---- .../retry/RetryExponentialBackoff.java | 46 ++++++++++++++----- 3 files changed, 93 insertions(+), 35 deletions(-) diff --git a/retry/README.md b/retry/README.md index 193866e436b5..04b4ac7182f8 100644 --- a/retry/README.md +++ b/retry/README.md @@ -61,14 +61,14 @@ The `Retry` class is where the Retry pattern is implemented. It takes a `Busines ```java public final class Retry implements BusinessOperation { - private final BusinessOperation op; + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); private final int maxAttempts; private final long delay; private final AtomicInteger attempts; private final Predicate test; private final List errors; - + @SafeVarargs public Retry( BusinessOperation op, @@ -83,34 +83,48 @@ public final class Retry implements BusinessOperation { this.test = Arrays.stream(ignoreTests).reduce(Predicate::or).orElse(e -> false); this.errors = new ArrayList<>(); } - + public List errors() { return Collections.unmodifiableList(this.errors); } - + public int attempts() { return this.attempts.intValue(); } @Override public T perform() throws BusinessException { - do { + final CompletableFuture future = new CompletableFuture<>(); + + performRetry(future); + try { + return future.get(); + } catch (InterruptedException | ExecutionException e) { + if (e.getCause() instanceof BusinessException be){ + throw be; + } + throw new BusinessException("Unexpected exception occurred " + e.getMessage()); + } finally { + scheduler.shutdown(); + } + } + + private void performRetry(CompletableFuture future){ + scheduler.schedule(() -> { try { - return this.op.perform(); - } catch (BusinessException e) { + T result = this.op.perform(); + future.complete(result); + } catch (BusinessException e){ this.errors.add(e); if (this.attempts.incrementAndGet() >= this.maxAttempts || !this.test.test(e)) { - throw e; + future.completeExceptionally(e); + return; } - try { - Thread.sleep(this.delay); - } catch (InterruptedException f) { - //ignore - } + performRetry(future); } - } while (true); + }, this.delay, TimeUnit.MILLISECONDS); } } ``` diff --git a/retry/src/main/java/com/iluwatar/retry/Retry.java b/retry/src/main/java/com/iluwatar/retry/Retry.java index ad9580454993..a73e90c8b1f5 100644 --- a/retry/src/main/java/com/iluwatar/retry/Retry.java +++ b/retry/src/main/java/com/iluwatar/retry/Retry.java @@ -28,6 +28,11 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; @@ -38,6 +43,7 @@ */ public final class Retry implements BusinessOperation { private final BusinessOperation op; + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); private final int maxAttempts; private final long delay; private final AtomicInteger attempts; @@ -88,22 +94,36 @@ public int attempts() { @Override public T perform() throws BusinessException { - do { + final CompletableFuture future = new CompletableFuture<>(); + + performRetry(future); + try { + return future.get(); + } catch (InterruptedException | ExecutionException e) { + if (e.getCause() instanceof BusinessException be){ + throw be; + } + throw new BusinessException("Unexpected exception occurred " + e.getMessage()); + } finally { + scheduler.shutdown(); + } + } + + private void performRetry(CompletableFuture future){ + scheduler.schedule(() -> { try { - return this.op.perform(); - } catch (BusinessException e) { + T result = this.op.perform(); + future.complete(result); + } catch (BusinessException e){ this.errors.add(e); if (this.attempts.incrementAndGet() >= this.maxAttempts || !this.test.test(e)) { - throw e; + future.completeExceptionally(e); + return; } - try { - Thread.sleep(this.delay); - } catch (InterruptedException f) { - //ignore - } + performRetry(future); } - } while (true); + }, this.delay, TimeUnit.MILLISECONDS); } } diff --git a/retry/src/main/java/com/iluwatar/retry/RetryExponentialBackoff.java b/retry/src/main/java/com/iluwatar/retry/RetryExponentialBackoff.java index 1661095b7298..1a59a3f2175e 100644 --- a/retry/src/main/java/com/iluwatar/retry/RetryExponentialBackoff.java +++ b/retry/src/main/java/com/iluwatar/retry/RetryExponentialBackoff.java @@ -22,6 +22,7 @@ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN * THE SOFTWARE. */ + package com.iluwatar.retry; import java.util.ArrayList; @@ -29,6 +30,11 @@ import java.util.Collections; import java.util.List; import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; @@ -40,6 +46,7 @@ public final class RetryExponentialBackoff implements BusinessOperation { private static final Random RANDOM = new Random(); private final BusinessOperation op; + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); private final int maxAttempts; private final long maxDelay; private final AtomicInteger attempts; @@ -89,24 +96,41 @@ public int attempts() { @Override public T perform() throws BusinessException { - do { + final CompletableFuture future = new CompletableFuture<>(); + + performWithRetries(future); + try { + return future.get(); + } catch (InterruptedException | ExecutionException e) { + if (e.getCause() instanceof BusinessException be) { + throw be; + } + throw new BusinessException("Unexpected exception occurred " + e.getMessage()); + } finally { + scheduler.shutdown(); + } + } + + private void performWithRetries(CompletableFuture future) { + scheduler.schedule(() -> { try { - return this.op.perform(); + T result = this.op.perform(); + future.complete(result); } catch (BusinessException e) { this.errors.add(e); if (this.attempts.incrementAndGet() >= this.maxAttempts || !this.test.test(e)) { - throw e; + future.completeExceptionally(e); + return; } - try { - var testDelay = (long) Math.pow(2, this.attempts()) * 1000 + RANDOM.nextInt(1000); - var delay = Math.min(testDelay, this.maxDelay); - Thread.sleep(delay); - } catch (InterruptedException f) { - //ignore - } + performWithRetries(future); } - } while (true); + }, calculateDelay(), TimeUnit.MILLISECONDS); + } + + private long calculateDelay() { + var testDelay = (long) Math.pow(2, this.attempts()) * 1000 + RANDOM.nextInt(1000); + return Math.min(testDelay, this.maxDelay); } } From 302f92dce653b968ce708bcb7facfd427a5412c9 Mon Sep 17 00:00:00 2001 From: Mehdi-17 Date: Sun, 21 Jul 2024 15:21:02 +0200 Subject: [PATCH 08/11] Refactor: Remove busy-waiting for ServiceExecutor. Update README #2977 --- queue-based-load-leveling/README.md | 75 ++++++++++++------- .../com/iluwatar/queue/load/leveling/App.java | 8 +- .../queue/load/leveling/ServiceExecutor.java | 32 +++++--- 3 files changed, 68 insertions(+), 47 deletions(-) diff --git a/queue-based-load-leveling/README.md b/queue-based-load-leveling/README.md index a72d39ea3c43..1b4bc2f4bcd0 100644 --- a/queue-based-load-leveling/README.md +++ b/queue-based-load-leveling/README.md @@ -99,25 +99,36 @@ The `ServiceExecutor` class represents the task consumer. It retrieves tasks fro ```java public class ServiceExecutor implements Runnable { - - private MessageQueue msgQueue; - public ServiceExecutor(MessageQueue msgQueue) { - this.msgQueue = msgQueue; - } + private final MessageQueue msgQueue; + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); - @Override - public void run() { - while (true) { - Message message = msgQueue.getMessage(); // Retrieve a message from the queue - if (message != null) { - // Process the message - } else { - // No more messages to process - break; - } + public ServiceExecutor(MessageQueue msgQueue) { + this.msgQueue = msgQueue; + } + + public void run() { + scheduler.scheduleWithFixedDelay(() -> { + var msg = msgQueue.retrieveMsg(); + + if (null != msg) { + LOGGER.info(msg + " is served."); + } else { + LOGGER.info("Service Executor: Waiting for Messages to serve .. "); + } + }, 0, 1, TimeUnit.SECONDS); + } + + public void shutdown(int shutdownTime) { + try { + if (!scheduler.awaitTermination(shutdownTime, TimeUnit.SECONDS)) { + LOGGER.info("Executor was shut down and Exiting."); + scheduler.shutdownNow(); + } + } catch (InterruptedException e) { + LOGGER.error(e.getMessage()); + } } - } } ``` @@ -125,23 +136,29 @@ Finally, we have the `App` class which sets up the `TaskGenerator` and `ServiceE ```java public class App { - public static void main(String[] args) { - var msgQueue = new MessageQueue(); + public static void main(String[] args) { + ExecutorService executor = null; - final var taskRunnable1 = new TaskGenerator(msgQueue, 5); - final var taskRunnable2 = new TaskGenerator(msgQueue, 1); - final var taskRunnable3 = new TaskGenerator(msgQueue, 2); + var msgQueue = new MessageQueue(); - final var srvRunnable = new ServiceExecutor(msgQueue); + final var taskRunnable1 = new TaskGenerator(msgQueue, 5); + final var taskRunnable2 = new TaskGenerator(msgQueue, 1); + final var taskRunnable3 = new TaskGenerator(msgQueue, 2); - ExecutorService executor = Executors.newFixedThreadPool(2); - executor.submit(taskRunnable1); - executor.submit(taskRunnable2); - executor.submit(taskRunnable3); - executor.submit(srvRunnable); + final var srvRunnable = new ServiceExecutor(msgQueue); - executor.shutdown(); - } + executor = Executors.newFixedThreadPool(2); + executor.submit(taskRunnable1); + executor.submit(taskRunnable2); + executor.submit(taskRunnable3); + + executor.submit(srvRunnable); + + executor.shutdown(); + + srvRunnable.shutdown(SHUTDOWN_TIME); + + } } ``` diff --git a/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/App.java b/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/App.java index 7042ff7b79a2..2ceeb8d79ee5 100644 --- a/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/App.java +++ b/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/App.java @@ -26,7 +26,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; /** @@ -104,12 +103,7 @@ public static void main(String[] args) { + " Executor will shutdown only after all the Threads are completed."); executor.shutdown(); - // Wait for SHUTDOWN_TIME seconds for all the threads to complete - // their tasks and then shut down the executor and then exit. - if (!executor.awaitTermination(SHUTDOWN_TIME, TimeUnit.SECONDS)) { - LOGGER.info("Executor was shut down and Exiting."); - executor.shutdownNow(); - } + srvRunnable.shutdown(SHUTDOWN_TIME); } catch (Exception e) { LOGGER.error(e.getMessage()); } diff --git a/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/ServiceExecutor.java b/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/ServiceExecutor.java index 02530042b370..47a0a9d03c43 100644 --- a/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/ServiceExecutor.java +++ b/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/ServiceExecutor.java @@ -25,6 +25,9 @@ package com.iluwatar.queue.load.leveling; import lombok.extern.slf4j.Slf4j; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; /** * ServiceExecuotr class. This class will pick up Messages one by one from the Blocking Queue and @@ -32,8 +35,8 @@ */ @Slf4j public class ServiceExecutor implements Runnable { - private final MessageQueue msgQueue; + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); public ServiceExecutor(MessageQueue msgQueue) { this.msgQueue = msgQueue; @@ -43,19 +46,26 @@ public ServiceExecutor(MessageQueue msgQueue) { * The ServiceExecutor thread will retrieve each message and process it. */ public void run() { - try { - while (!Thread.currentThread().isInterrupted()) { - var msg = msgQueue.retrieveMsg(); + scheduler.scheduleWithFixedDelay(() -> { + var msg = msgQueue.retrieveMsg(); - if (null != msg) { - LOGGER.info(msg + " is served."); - } else { - LOGGER.info("Service Executor: Waiting for Messages to serve .. "); - } + if (null != msg) { + LOGGER.info(msg + " is served."); + } else { + LOGGER.info("Service Executor: Waiting for Messages to serve .. "); + } + }, 0, 1, TimeUnit.SECONDS); + } - Thread.sleep(1000); + public void shutdown(int shutdownTime) { + // Wait for SHUTDOWN_TIME seconds for all the threads to complete + // their tasks and then shut down the executor and then exit. + try { + if (!scheduler.awaitTermination(shutdownTime, TimeUnit.SECONDS)) { + LOGGER.info("Executor was shut down and Exiting."); + scheduler.shutdownNow(); } - } catch (Exception e) { + } catch (InterruptedException e) { LOGGER.error(e.getMessage()); } } From fa52b29e0ea9f90ad406331d17174165ebf6b95d Mon Sep 17 00:00:00 2001 From: Mehdi-17 Date: Sun, 21 Jul 2024 15:41:36 +0200 Subject: [PATCH 09/11] Refactor: Enhance scheduler session-server #2977 --- .../src/main/java/com/iluwatar/sessionserver/App.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server-session/src/main/java/com/iluwatar/sessionserver/App.java b/server-session/src/main/java/com/iluwatar/sessionserver/App.java index bf41f173b9bc..a2211550c27d 100644 --- a/server-session/src/main/java/com/iluwatar/sessionserver/App.java +++ b/server-session/src/main/java/com/iluwatar/sessionserver/App.java @@ -88,8 +88,9 @@ private static void sessionExpirationTask() { scheduler.scheduleWithFixedDelay(() -> { try { LOGGER.info("Session expiration checker started..."); - Iterator> iterator = sessionCreationTimes.entrySet().iterator(); + Instant currentTime = Instant.now(); + Iterator> iterator = sessionCreationTimes.entrySet().iterator(); while (iterator.hasNext()) { Map.Entry entry = iterator.next(); @@ -103,6 +104,6 @@ private static void sessionExpirationTask() { } catch (Exception e) { LOGGER.error("An error occured: ", e); } - }, 0, SESSION_EXPIRATION_TIME, TimeUnit.MILLISECONDS); + }, SESSION_EXPIRATION_TIME, SESSION_EXPIRATION_TIME, TimeUnit.MILLISECONDS); } } \ No newline at end of file From 643e09f9576c471067203e3a1901850adcb34e7d Mon Sep 17 00:00:00 2001 From: Mehdi-17 Date: Sun, 21 Jul 2024 16:51:35 +0200 Subject: [PATCH 10/11] fix checkstyle violation #2977 --- twin/src/main/java/com/iluwatar/twin/BallThread.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/twin/src/main/java/com/iluwatar/twin/BallThread.java b/twin/src/main/java/com/iluwatar/twin/BallThread.java index fcee24f6a6de..105db2e17469 100644 --- a/twin/src/main/java/com/iluwatar/twin/BallThread.java +++ b/twin/src/main/java/com/iluwatar/twin/BallThread.java @@ -68,6 +68,9 @@ public void resumeMe() { LOGGER.info("Begin to resume BallThread"); } + /** + * Stop the scheduled task. + */ public void stopMe() { this.isSuspended = true; if (scheduler != null) { From 16c3361e80add6908702cb7047d1d55f073e6b85 Mon Sep 17 00:00:00 2001 From: Mehdi-17 Date: Sun, 21 Jul 2024 16:59:16 +0200 Subject: [PATCH 11/11] refactor testInterrupt to testStopped #2977 --- .../com/iluwatar/twin/BallThreadTest.java | 25 +++++++++++-------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/twin/src/test/java/com/iluwatar/twin/BallThreadTest.java b/twin/src/test/java/com/iluwatar/twin/BallThreadTest.java index 26cf78509dcf..dd3462fd7c56 100644 --- a/twin/src/test/java/com/iluwatar/twin/BallThreadTest.java +++ b/twin/src/test/java/com/iluwatar/twin/BallThreadTest.java @@ -24,12 +24,9 @@ */ package com.iluwatar.twin; -import static java.lang.Thread.UncaughtExceptionHandler; import static java.lang.Thread.sleep; import static java.time.Duration.ofMillis; import static org.junit.jupiter.api.Assertions.assertTimeout; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -100,21 +97,27 @@ void testResume() { } /** - * Verify if the {@link BallThread} is interruptible + * Verify if the {@link BallThread} can be stopped */ @Test - void testInterrupt() { + void testStopped() { assertTimeout(ofMillis(5000), () -> { final var ballThread = new BallThread(); - final var exceptionHandler = mock(UncaughtExceptionHandler.class); - ballThread.setUncaughtExceptionHandler(exceptionHandler); - ballThread.setTwin(mock(BallItem.class)); + final var twin = mock(BallItem.class); + ballThread.setTwin(twin); ballThread.start(); - ballThread.interrupt(); + + + sleep(300); + verify(twin, atLeastOnce()).draw(); + verify(twin, atLeastOnce()).move(); + + // Stop the thread + ballThread.stopMe(); ballThread.join(); - verify(exceptionHandler).uncaughtException(eq(ballThread), any(RuntimeException.class)); - verifyNoMoreInteractions(exceptionHandler); + // Ensure that the thread has stopped and no more interactions occur + verifyNoMoreInteractions(twin); }); } } \ No newline at end of file