Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix busy-waiting loops #2977 #2999

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 27 additions & 18 deletions commander/src/main/java/com/iluwatar/commander/Retry.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

Expand Down Expand Up @@ -66,46 +67,54 @@ public interface HandleErrorIssue<T> {
private final AtomicInteger attempts;
private final Predicate<Exception> test;
private final List<Exception> errors;
private final ScheduledExecutorService scheduler;

Retry(Operation op, HandleErrorIssue<T> handleError, int maxAttempts,
long maxDelay, Predicate<Exception>... ignoreTests) {
long maxDelay, Predicate<Exception>... ignoreTests) {
this.op = op;
this.handleError = handleError;
this.maxAttempts = maxAttempts;
this.maxDelay = maxDelay;
this.attempts = new AtomicInteger();
this.test = Arrays.stream(ignoreTests).reduce(Predicate::or).orElse(e -> false);
this.errors = new ArrayList<>();
this.scheduler = Executors.newScheduledThreadPool(1);
}

/**
* Performing the operation with retries.
*
* @param list is the exception list
* @param obj is the parameter to be passed into handleIsuue method
* @param obj is the parameter to be passed into handleIssue method
*/

public void perform(List<Exception> list, T obj) {
do {
attempts.set(0); // reset attempts before starting
executeWithRetry(list, obj);
}

private void executeWithRetry(List<Exception> list, T obj) {
scheduler.schedule(() -> {
try {
op.operation(list);
return;
} catch (Exception e) {
this.errors.add(e);
if (this.attempts.incrementAndGet() >= this.maxAttempts || !this.test.test(e)) {
this.handleError.handleIssue(obj, e);
return; //return here... don't go further
}
try {
long testDelay =
(long) Math.pow(2, this.attempts.intValue()) * 1000 + RANDOM.nextInt(1000);
long delay = Math.min(testDelay, this.maxDelay);
Thread.sleep(delay);
} catch (InterruptedException f) {
//ignore
errors.add(e);
if (attempts.incrementAndGet() >= maxAttempts || !test.test(e)) {
handleError.handleIssue(obj, e);
scheduler.shutdown();
} else {
long testDelay = (long) Math.pow(2, attempts.intValue()) * 1000 + RANDOM.nextInt(1000);
long delay = Math.min(testDelay, maxDelay);
executeWithRetry(list, obj);
}
}
} while (true);
}, calculateDelay(), TimeUnit.MILLISECONDS);
}

private long calculateDelay() {
if (attempts.get() == 0) {
return 0;
}
long testDelay = (long) Math.pow(2, attempts.intValue()) * 1000 + RANDOM.nextInt(1000);
return Math.min(testDelay, maxDelay);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,18 @@
*/
package com.iluwatar.logaggregation;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Responsible for collecting and buffering logs from different services.
* Once the logs reach a certain threshold or after a certain time interval,
* they are flushed to the central log store. This class ensures logs are collected
* and processed asynchronously and efficiently, providing both an immediate collection
* they are flushed to the central log store. This class ensures logs are
* collected
* and processed asynchronously and efficiently, providing both an immediate
* collection
* and periodic flushing.
*/
@Slf4j
Expand All @@ -45,14 +45,14 @@ public class LogAggregator {
private final CentralLogStore centralLogStore;
private final ConcurrentLinkedQueue<LogEntry> buffer = new ConcurrentLinkedQueue<>();
private final LogLevel minLogLevel;
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private final AtomicInteger logCount = new AtomicInteger(0);

/**
* constructor of LogAggregator.
*
* @param centralLogStore central log store implement
* @param minLogLevel min log level to store log
* @param minLogLevel min log level to store log
*/
public LogAggregator(CentralLogStore centralLogStore, LogLevel minLogLevel) {
this.centralLogStore = centralLogStore;
Expand Down Expand Up @@ -86,15 +86,23 @@ public void collectLog(LogEntry logEntry) {
/**
* Stops the log aggregator service and flushes any remaining logs to
* the central log store.
*
* @throws InterruptedException If any thread has interrupted the current thread.
*/
public void stop() throws InterruptedException {
executorService.shutdownNow();
if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
LOGGER.error("Log aggregator did not terminate.");
public void stop() {
scheduler.shutdown();
try {
if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
scheduler.shutdownNow();
if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
LOGGER.error("Log aggregator did not terminate.");
}
}
} catch (InterruptedException e) {
scheduler.shutdownNow();
Thread.currentThread().interrupt();
LOGGER.error("Log aggregator thread interrupted.", e);
} finally {
flushBuffer();
}
flushBuffer();
}

private void flushBuffer() {
Expand All @@ -106,15 +114,6 @@ private void flushBuffer() {
}

private void startBufferFlusher() {
executorService.execute(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(5000); // Flush every 5 seconds.
flushBuffer();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
scheduler.scheduleAtFixedRate(this::flushBuffer, 5, 5, TimeUnit.SECONDS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,21 @@
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/

package com.iluwatar.queue.load.leveling;

import lombok.extern.slf4j.Slf4j;

/**
* ServiceExecuotr class. This class will pick up Messages one by one from the Blocking Queue and
* ServiceExecutor class. This class will pick up Messages one by one from the
* Blocking Queue and
* process them.
*/
@Slf4j
public class ServiceExecutor implements Runnable {

private final MessageQueue msgQueue;
private volatile boolean isRunning = true;

public ServiceExecutor(MessageQueue msgQueue) {
this.msgQueue = msgQueue;
Expand All @@ -42,21 +45,31 @@ public ServiceExecutor(MessageQueue msgQueue) {
/**
* The ServiceExecutor thread will retrieve each message and process it.
*/
@Override
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
while (isRunning) {
var msg = msgQueue.retrieveMsg();

if (null != msg) {
if (msg != null) {
LOGGER.info(msg + " is served.");
} else {
LOGGER.info("Service Executor: Waiting for Messages to serve .. ");
}

// Simulating processing time
Thread.sleep(1000);
}
} catch (Exception e) {
LOGGER.error(e.getMessage());
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Reset interrupt status
LOGGER.error("ServiceExecutor thread interrupted", e);
}
}

/**
* Stops the execution of the ServiceExecutor thread.
*/
public void stop() {
isRunning = false;
}
}
56 changes: 39 additions & 17 deletions retry/src/main/java/com/iluwatar/retry/Retry.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,16 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

/**
* Decorates {@link BusinessOperation business operation} with "retry" capabilities.
* Decorates {@link BusinessOperation business operation} with "retry"
* capabilities.
*
* @param <T> the remote op's return type
*/
Expand All @@ -43,29 +48,31 @@ public final class Retry<T> implements BusinessOperation<T> {
private final AtomicInteger attempts;
private final Predicate<Exception> test;
private final List<Exception> errors;
private final ScheduledExecutorService scheduler;

/**
* Ctor.
*
* @param op the {@link BusinessOperation} to retry
* @param maxAttempts number of times to retry
* @param delay delay (in milliseconds) between attempts
* @param ignoreTests tests to check whether the remote exception can be ignored. No exceptions
* @param ignoreTests tests to check whether the remote exception can be
* ignored. No exceptions
* will be ignored if no tests are given
*/
@SafeVarargs
public Retry(
BusinessOperation<T> op,
int maxAttempts,
long delay,
Predicate<Exception>... ignoreTests
) {
Predicate<Exception>... ignoreTests) {
this.op = op;
this.maxAttempts = maxAttempts;
this.delay = delay;
this.attempts = new AtomicInteger();
this.test = Arrays.stream(ignoreTests).reduce(Predicate::or).orElse(e -> false);
this.errors = new ArrayList<>();
this.scheduler = Executors.newScheduledThreadPool(1);
}

/**
Expand All @@ -88,22 +95,37 @@ public int attempts() {

@Override
public T perform() throws BusinessException {
do {
try {
return this.op.perform();
} catch (BusinessException e) {
this.errors.add(e);
CompletableFuture<T> future = new CompletableFuture<>();
performWithRetry(future);
try {
return future.get();
} catch (Exception e) {
throw new BusinessException("Operation failed after retries");
} finally {
scheduler.shutdown();
}
}

private void performWithRetry(CompletableFuture<T> future) {
scheduler.schedule(() -> {
try {
future.complete(this.op.perform());
} catch (Exception e) {
this.errors.add((Exception) e);
if (this.attempts.incrementAndGet() >= this.maxAttempts || !this.test.test(e)) {
throw e;
}

try {
Thread.sleep(this.delay);
} catch (InterruptedException f) {
//ignore
future.completeExceptionally(e);
scheduler.shutdown();
} else {
performWithRetry(future);
}
}
} while (true);
}, calculateDelay(), TimeUnit.MILLISECONDS);
}

private long calculateDelay() {
if (attempts.get() == 0) {
return 0;
}
return delay;
}
}
Loading
Loading