diff --git a/commander/src/main/java/com/iluwatar/commander/Retry.java b/commander/src/main/java/com/iluwatar/commander/Retry.java index 71614668254b..6f31cadc7417 100644 --- a/commander/src/main/java/com/iluwatar/commander/Retry.java +++ b/commander/src/main/java/com/iluwatar/commander/Retry.java @@ -28,6 +28,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; @@ -66,9 +67,10 @@ public interface HandleErrorIssue { private final AtomicInteger attempts; private final Predicate test; private final List errors; + private final ScheduledExecutorService scheduler; Retry(Operation op, HandleErrorIssue handleError, int maxAttempts, - long maxDelay, Predicate... ignoreTests) { + long maxDelay, Predicate... ignoreTests) { this.op = op; this.handleError = handleError; this.maxAttempts = maxAttempts; @@ -76,36 +78,43 @@ public interface HandleErrorIssue { this.attempts = new AtomicInteger(); this.test = Arrays.stream(ignoreTests).reduce(Predicate::or).orElse(e -> false); this.errors = new ArrayList<>(); + this.scheduler = Executors.newScheduledThreadPool(1); } /** * Performing the operation with retries. * * @param list is the exception list - * @param obj is the parameter to be passed into handleIsuue method + * @param obj is the parameter to be passed into handleIssue method */ - public void perform(List list, T obj) { - do { + attempts.set(0); // reset attempts before starting + executeWithRetry(list, obj); + } + + private void executeWithRetry(List list, T obj) { + scheduler.schedule(() -> { try { op.operation(list); - return; } catch (Exception e) { - this.errors.add(e); - if (this.attempts.incrementAndGet() >= this.maxAttempts || !this.test.test(e)) { - this.handleError.handleIssue(obj, e); - return; //return here... don't go further - } - try { - long testDelay = - (long) Math.pow(2, this.attempts.intValue()) * 1000 + RANDOM.nextInt(1000); - long delay = Math.min(testDelay, this.maxDelay); - Thread.sleep(delay); - } catch (InterruptedException f) { - //ignore + errors.add(e); + if (attempts.incrementAndGet() >= maxAttempts || !test.test(e)) { + handleError.handleIssue(obj, e); + scheduler.shutdown(); + } else { + long testDelay = (long) Math.pow(2, attempts.intValue()) * 1000 + RANDOM.nextInt(1000); + long delay = Math.min(testDelay, maxDelay); + executeWithRetry(list, obj); } } - } while (true); + }, calculateDelay(), TimeUnit.MILLISECONDS); } + private long calculateDelay() { + if (attempts.get() == 0) { + return 0; + } + long testDelay = (long) Math.pow(2, attempts.intValue()) * 1000 + RANDOM.nextInt(1000); + return Math.min(testDelay, maxDelay); + } } diff --git a/microservices-log-aggregation/src/main/java/com/iluwatar/logaggregation/LogAggregator.java b/microservices-log-aggregation/src/main/java/com/iluwatar/logaggregation/LogAggregator.java index 37417e21267d..3bb74be907ca 100644 --- a/microservices-log-aggregation/src/main/java/com/iluwatar/logaggregation/LogAggregator.java +++ b/microservices-log-aggregation/src/main/java/com/iluwatar/logaggregation/LogAggregator.java @@ -24,18 +24,18 @@ */ package com.iluwatar.logaggregation; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import lombok.extern.slf4j.Slf4j; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + /** * Responsible for collecting and buffering logs from different services. * Once the logs reach a certain threshold or after a certain time interval, - * they are flushed to the central log store. This class ensures logs are collected - * and processed asynchronously and efficiently, providing both an immediate collection + * they are flushed to the central log store. This class ensures logs are + * collected + * and processed asynchronously and efficiently, providing both an immediate + * collection * and periodic flushing. */ @Slf4j @@ -45,14 +45,14 @@ public class LogAggregator { private final CentralLogStore centralLogStore; private final ConcurrentLinkedQueue buffer = new ConcurrentLinkedQueue<>(); private final LogLevel minLogLevel; - private final ExecutorService executorService = Executors.newSingleThreadExecutor(); + private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); private final AtomicInteger logCount = new AtomicInteger(0); /** * constructor of LogAggregator. * * @param centralLogStore central log store implement - * @param minLogLevel min log level to store log + * @param minLogLevel min log level to store log */ public LogAggregator(CentralLogStore centralLogStore, LogLevel minLogLevel) { this.centralLogStore = centralLogStore; @@ -86,15 +86,23 @@ public void collectLog(LogEntry logEntry) { /** * Stops the log aggregator service and flushes any remaining logs to * the central log store. - * - * @throws InterruptedException If any thread has interrupted the current thread. */ - public void stop() throws InterruptedException { - executorService.shutdownNow(); - if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) { - LOGGER.error("Log aggregator did not terminate."); + public void stop() { + scheduler.shutdown(); + try { + if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) { + scheduler.shutdownNow(); + if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) { + LOGGER.error("Log aggregator did not terminate."); + } + } + } catch (InterruptedException e) { + scheduler.shutdownNow(); + Thread.currentThread().interrupt(); + LOGGER.error("Log aggregator thread interrupted.", e); + } finally { + flushBuffer(); } - flushBuffer(); } private void flushBuffer() { @@ -106,15 +114,6 @@ private void flushBuffer() { } private void startBufferFlusher() { - executorService.execute(() -> { - while (!Thread.currentThread().isInterrupted()) { - try { - Thread.sleep(5000); // Flush every 5 seconds. - flushBuffer(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - }); + scheduler.scheduleAtFixedRate(this::flushBuffer, 5, 5, TimeUnit.SECONDS); } } diff --git a/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/ServiceExecutor.java b/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/ServiceExecutor.java index 02530042b370..d78113d302d6 100644 --- a/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/ServiceExecutor.java +++ b/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/ServiceExecutor.java @@ -22,18 +22,21 @@ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN * THE SOFTWARE. */ + package com.iluwatar.queue.load.leveling; import lombok.extern.slf4j.Slf4j; /** - * ServiceExecuotr class. This class will pick up Messages one by one from the Blocking Queue and + * ServiceExecutor class. This class will pick up Messages one by one from the + * Blocking Queue and * process them. */ @Slf4j public class ServiceExecutor implements Runnable { private final MessageQueue msgQueue; + private volatile boolean isRunning = true; public ServiceExecutor(MessageQueue msgQueue) { this.msgQueue = msgQueue; @@ -42,21 +45,31 @@ public ServiceExecutor(MessageQueue msgQueue) { /** * The ServiceExecutor thread will retrieve each message and process it. */ + @Override public void run() { try { - while (!Thread.currentThread().isInterrupted()) { + while (isRunning) { var msg = msgQueue.retrieveMsg(); - if (null != msg) { + if (msg != null) { LOGGER.info(msg + " is served."); } else { LOGGER.info("Service Executor: Waiting for Messages to serve .. "); } + // Simulating processing time Thread.sleep(1000); } - } catch (Exception e) { - LOGGER.error(e.getMessage()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // Reset interrupt status + LOGGER.error("ServiceExecutor thread interrupted", e); } } + + /** + * Stops the execution of the ServiceExecutor thread. + */ + public void stop() { + isRunning = false; + } } diff --git a/retry/src/main/java/com/iluwatar/retry/Retry.java b/retry/src/main/java/com/iluwatar/retry/Retry.java index ad9580454993..0a210b2e917b 100644 --- a/retry/src/main/java/com/iluwatar/retry/Retry.java +++ b/retry/src/main/java/com/iluwatar/retry/Retry.java @@ -28,11 +28,16 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; /** - * Decorates {@link BusinessOperation business operation} with "retry" capabilities. + * Decorates {@link BusinessOperation business operation} with "retry" + * capabilities. * * @param the remote op's return type */ @@ -43,6 +48,7 @@ public final class Retry implements BusinessOperation { private final AtomicInteger attempts; private final Predicate test; private final List errors; + private final ScheduledExecutorService scheduler; /** * Ctor. @@ -50,7 +56,8 @@ public final class Retry implements BusinessOperation { * @param op the {@link BusinessOperation} to retry * @param maxAttempts number of times to retry * @param delay delay (in milliseconds) between attempts - * @param ignoreTests tests to check whether the remote exception can be ignored. No exceptions + * @param ignoreTests tests to check whether the remote exception can be + * ignored. No exceptions * will be ignored if no tests are given */ @SafeVarargs @@ -58,14 +65,14 @@ public Retry( BusinessOperation op, int maxAttempts, long delay, - Predicate... ignoreTests - ) { + Predicate... ignoreTests) { this.op = op; this.maxAttempts = maxAttempts; this.delay = delay; this.attempts = new AtomicInteger(); this.test = Arrays.stream(ignoreTests).reduce(Predicate::or).orElse(e -> false); this.errors = new ArrayList<>(); + this.scheduler = Executors.newScheduledThreadPool(1); } /** @@ -88,22 +95,37 @@ public int attempts() { @Override public T perform() throws BusinessException { - do { - try { - return this.op.perform(); - } catch (BusinessException e) { - this.errors.add(e); + CompletableFuture future = new CompletableFuture<>(); + performWithRetry(future); + try { + return future.get(); + } catch (Exception e) { + throw new BusinessException("Operation failed after retries"); + } finally { + scheduler.shutdown(); + } + } + private void performWithRetry(CompletableFuture future) { + scheduler.schedule(() -> { + try { + future.complete(this.op.perform()); + } catch (Exception e) { + this.errors.add((Exception) e); if (this.attempts.incrementAndGet() >= this.maxAttempts || !this.test.test(e)) { - throw e; - } - - try { - Thread.sleep(this.delay); - } catch (InterruptedException f) { - //ignore + future.completeExceptionally(e); + scheduler.shutdown(); + } else { + performWithRetry(future); } } - } while (true); + }, calculateDelay(), TimeUnit.MILLISECONDS); + } + + private long calculateDelay() { + if (attempts.get() == 0) { + return 0; + } + return delay; } } diff --git a/retry/src/main/java/com/iluwatar/retry/RetryExponentialBackoff.java b/retry/src/main/java/com/iluwatar/retry/RetryExponentialBackoff.java index 1661095b7298..469e67fe943d 100644 --- a/retry/src/main/java/com/iluwatar/retry/RetryExponentialBackoff.java +++ b/retry/src/main/java/com/iluwatar/retry/RetryExponentialBackoff.java @@ -29,11 +29,13 @@ import java.util.Collections; import java.util.List; import java.util.Random; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; /** - * Decorates {@link BusinessOperation business operation} with "retry" capabilities. + * Decorates {@link BusinessOperation business operation} with "retry" + * capabilities. * * @param the remote op's return type */ @@ -45,13 +47,16 @@ public final class RetryExponentialBackoff implements BusinessOperation { private final AtomicInteger attempts; private final Predicate test; private final List errors; + private final ScheduledExecutorService scheduler; /** * Ctor. * * @param op the {@link BusinessOperation} to retry * @param maxAttempts number of times to retry - * @param ignoreTests tests to check whether the remote exception can be ignored. No exceptions + * @param maxDelay maximum delay between retries (in milliseconds) + * @param ignoreTests tests to check whether the remote exception can be + * ignored. No exceptions * will be ignored if no tests are given */ @SafeVarargs @@ -59,14 +64,14 @@ public RetryExponentialBackoff( BusinessOperation op, int maxAttempts, long maxDelay, - Predicate... ignoreTests - ) { + Predicate... ignoreTests) { this.op = op; this.maxAttempts = maxAttempts; this.maxDelay = maxDelay; this.attempts = new AtomicInteger(); this.test = Arrays.stream(ignoreTests).reduce(Predicate::or).orElse(e -> false); this.errors = new ArrayList<>(); + this.scheduler = Executors.newScheduledThreadPool(1); // Create a single-threaded scheduled executor } /** @@ -89,6 +94,23 @@ public int attempts() { @Override public T perform() throws BusinessException { + try { + return executeWithRetry(); + } finally { + scheduler.shutdown(); // Shutdown the scheduler when no longer needed + } + } + + private T executeWithRetry() throws BusinessException { + ScheduledFuture future = scheduler.schedule(this::retryOperation, 0, TimeUnit.MILLISECONDS); + try { + return future.get(); // Wait for the operation to complete + } catch (InterruptedException | ExecutionException e) { + throw new BusinessException("Retry operation failed"); + } + } + + private T retryOperation() throws BusinessException { do { try { return this.op.perform(); @@ -100,11 +122,12 @@ public T perform() throws BusinessException { } try { - var testDelay = (long) Math.pow(2, this.attempts()) * 1000 + RANDOM.nextInt(1000); - var delay = Math.min(testDelay, this.maxDelay); + long testDelay = (long) Math.pow(2, this.attempts()) * 1000 + RANDOM.nextInt(1000); + long delay = Math.min(testDelay, this.maxDelay); Thread.sleep(delay); } catch (InterruptedException f) { - //ignore + Thread.currentThread().interrupt(); // Reset interrupt status + throw new BusinessException("Thread interrupted while retrying operation"); } } } while (true); diff --git a/server-session/src/main/java/com/iluwatar/sessionserver/App.java b/server-session/src/main/java/com/iluwatar/sessionserver/App.java index a3c66d3ff634..4c297edb200a 100644 --- a/server-session/src/main/java/com/iluwatar/sessionserver/App.java +++ b/server-session/src/main/java/com/iluwatar/sessionserver/App.java @@ -31,23 +31,38 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; /** - * The server session pattern is a behavioral design pattern concerned with assigning the responsibility - * of storing session data on the server side. Within the context of stateless protocols like HTTP all - * requests are isolated events independent of previous requests. In order to create sessions during - * user-access for a particular web application various methods can be used, such as cookies. Cookies - * are a small piece of data that can be sent between client and server on every request and response - * so that the server can "remember" the previous requests. In general cookies can either store the session - * data or the cookie can store a session identifier and be used to access appropriate data from a persistent - * storage. In the latter case the session data is stored on the server-side and appropriate data is + * The server session pattern is a behavioral design pattern concerned with + * assigning the responsibility + * of storing session data on the server side. Within the context of stateless + * protocols like HTTP all + * requests are isolated events independent of previous requests. In order to + * create sessions during + * user-access for a particular web application various methods can be used, + * such as cookies. Cookies + * are a small piece of data that can be sent between client and server on every + * request and response + * so that the server can "remember" the previous requests. In general cookies + * can either store the session + * data or the cookie can store a session identifier and be used to access + * appropriate data from a persistent + * storage. In the latter case the session data is stored on the server-side and + * appropriate data is * identified by the cookie sent from a client's request. * This project demonstrates the latter case. - * In the following example the ({@link App}) class starts a server and assigns ({@link LoginHandler}) - * class to handle login request. When a user logs in a session identifier is created and stored for future - * requests in a list. When a user logs out the session identifier is deleted from the list along with - * the appropriate user session data, which is handle by the ({@link LogoutHandler}) class. + * In the following example the ({@link App}) class starts a server and assigns + * ({@link LoginHandler}) + * class to handle login request. When a user logs in a session identifier is + * created and stored for future + * requests in a list. When a user logs out the session identifier is deleted + * from the list along with + * the appropriate user session data, which is handle by the + * ({@link LogoutHandler}) class. */ @Slf4j @@ -60,11 +75,12 @@ public class App { /** * Main entry point. + * * @param args arguments * @throws IOException ex */ public static void main(String[] args) throws IOException { - // Create HTTP server listening on port 8000 + // Create HTTP server listening on port 8080 HttpServer server = HttpServer.create(new InetSocketAddress(8080), 0); // Set up session management endpoints @@ -74,38 +90,33 @@ public static void main(String[] args) throws IOException { // Start the server server.start(); - // Start background task to check for expired sessions + // Start scheduled task to check for expired sessions sessionExpirationTask(); LOGGER.info("Server started. Listening on port 8080..."); } private static void sessionExpirationTask() { - new Thread(() -> { - while (true) { - try { - LOGGER.info("Session expiration checker started..."); - Thread.sleep(SESSION_EXPIRATION_TIME); // Sleep for expiration time - Instant currentTime = Instant.now(); - synchronized (sessions) { - synchronized (sessionCreationTimes) { - Iterator> iterator = - sessionCreationTimes.entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry entry = iterator.next(); - if (entry.getValue().plusMillis(SESSION_EXPIRATION_TIME).isBefore(currentTime)) { - sessions.remove(entry.getKey()); - iterator.remove(); - } - } + ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + + Runnable task = () -> { + LOGGER.info("Session expiration checker started..."); + Instant currentTime = Instant.now(); + synchronized (sessions) { + synchronized (sessionCreationTimes) { + Iterator> iterator = sessionCreationTimes.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + if (entry.getValue().plusMillis(SESSION_EXPIRATION_TIME).isBefore(currentTime)) { + sessions.remove(entry.getKey()); + iterator.remove(); } } - LOGGER.info("Session expiration checker finished!"); - } catch (InterruptedException e) { - LOGGER.error("An error occurred: ", e); - Thread.currentThread().interrupt(); } } - }).start(); + LOGGER.info("Session expiration checker finished!"); + }; + + scheduler.scheduleAtFixedRate(task, 0, SESSION_EXPIRATION_TIME, TimeUnit.MILLISECONDS); } -} \ No newline at end of file +} diff --git a/server-session/src/test/java/com.iluwatar.sessionserver/LoginHandlerTest.java b/server-session/src/test/java/com.iluwatar.sessionserver/LoginHandlerTest.java index db5445f88483..ff6087a38a86 100644 --- a/server-session/src/test/java/com.iluwatar.sessionserver/LoginHandlerTest.java +++ b/server-session/src/test/java/com.iluwatar.sessionserver/LoginHandlerTest.java @@ -43,40 +43,40 @@ */ public class LoginHandlerTest { - private LoginHandler loginHandler; - //private Headers headers; - private Map sessions; - private Map sessionCreationTimes; + private LoginHandler loginHandler; + // private Headers headers; + private Map sessions; + private Map sessionCreationTimes; - @Mock - private HttpExchange exchange; + @Mock + private HttpExchange exchange; - /** - * Setup tests. - */ - @BeforeEach - public void setUp() { - MockitoAnnotations.initMocks(this); - sessions = new HashMap<>(); - sessionCreationTimes = new HashMap<>(); - loginHandler = new LoginHandler(sessions, sessionCreationTimes); - } + /** + * Setup tests. + */ + @BeforeEach + public void setUp() { + MockitoAnnotations.initMocks(this); + sessions = new HashMap<>(); + sessionCreationTimes = new HashMap<>(); + loginHandler = new LoginHandler(sessions, sessionCreationTimes); + } - @Test - public void testHandle() { + @Test + public void testHandle() { - //assemble - ByteArrayOutputStream outputStream = - new ByteArrayOutputStream(); //Exchange object is mocked so OutputStream must be manually created - when(exchange.getResponseHeaders()).thenReturn( - new Headers()); //Exchange object is mocked so Header object must be manually created - when(exchange.getResponseBody()).thenReturn(outputStream); + // assemble + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); // Exchange object is mocked so OutputStream + // must be manually created + when(exchange.getResponseHeaders()).thenReturn(new Headers()); // Exchange object is mocked so Header object + // must be manually created + when(exchange.getResponseBody()).thenReturn(outputStream); - //act - loginHandler.handle(exchange); + // act + loginHandler.handle(exchange); - //assert - String[] response = outputStream.toString().split("Session ID: "); - assertEquals(sessions.entrySet().toArray()[0].toString().split("=1")[0], response[1]); - } + // assert + String[] response = outputStream.toString().split("Session ID: "); + assertEquals(sessions.entrySet().toArray()[0].toString().split("=1")[0], response[1]); + } } \ No newline at end of file diff --git a/twin/src/main/java/com/iluwatar/twin/BallThread.java b/twin/src/main/java/com/iluwatar/twin/BallThread.java index 9d4d9cf71a76..7f347f415f62 100644 --- a/twin/src/main/java/com/iluwatar/twin/BallThread.java +++ b/twin/src/main/java/com/iluwatar/twin/BallThread.java @@ -28,8 +28,10 @@ import lombok.extern.slf4j.Slf4j; /** - * This class is a UI thread for drawing the {@link BallItem}, and provide the method for suspend - * and resume. It holds the reference of {@link BallItem} to delegate the draw task. + * This class is a UI thread for drawing the {@link BallItem}, and provide the + * method for suspend + * and resume. It holds the reference of {@link BallItem} to delegate the draw + * task. */ @Slf4j @@ -46,33 +48,45 @@ public class BallThread extends Thread { * Run the thread. */ public void run() { - - while (isRunning) { - if (!isSuspended) { + synchronized (this) { + while (isRunning) { + while (isSuspended) { + try { + wait(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.error("Thread was interrupted", e); + return; + } + } twin.draw(); twin.move(); - } - try { - Thread.sleep(250); - } catch (InterruptedException e) { - throw new RuntimeException(e); + try { + Thread.sleep(250); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.error("Thread was interrupted during sleep", e); + return; + } } } } - public void suspendMe() { + public synchronized void suspendMe() { isSuspended = true; - LOGGER.info("Begin to suspend BallThread"); + LOGGER.info("Suspending BallThread"); } - public void resumeMe() { + public synchronized void resumeMe() { isSuspended = false; - LOGGER.info("Begin to resume BallThread"); + notify(); + LOGGER.info("Resuming BallThread"); } - public void stopMe() { - this.isRunning = false; - this.isSuspended = true; + public synchronized void stopMe() { + isRunning = false; + isSuspended = false; + notify(); + LOGGER.info("Stopping BallThread"); } } -