Skip to content

Commit

Permalink
[Fix] Fix duplicate exception alert (#3109)
Browse files Browse the repository at this point in the history
  • Loading branch information
gaoyan1998 authored Jan 31, 2024
1 parent 46101d4 commit 541abf7
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public static JobAlertData buildData(JobInfoDetail jobInfoDetail) {

if (jobDataDto.isError()) {
builder.errorMsg(jobDataDto.getErrorMsg());
} else if (exceptions != null && ExceptionRule.isException(id, exceptions)) {
} else if (exceptions != null && ExceptionRule.isException(exceptions)) {
// The error message is too long to send an alarm,
// and only the first line of abnormal information is used
String err = Optional.ofNullable(exceptions.getRootException())
Expand All @@ -198,8 +198,8 @@ public static JobAlertData buildData(JobInfoDetail jobInfoDetail) {
}

if (checkpoints != null) {
builder.checkpointCostTime(CheckpointsRule.checkpointTime(id, checkpoints))
.isCheckpointFailed(CheckpointsRule.checkFailed(id, checkpoints));
builder.checkpointCostTime(CheckpointsRule.checkpointTime(checkpoints))
.isCheckpointFailed(CheckpointsRule.checkFailed(checkpoints));
if (checkpoints.getCounts() != null) {
builder.checkpointFailedCount(checkpoints.getCounts().getNumberFailedCheckpoints())
.checkpointCompleteCount(checkpoints.getCounts().getNumberCompletedCheckpoints());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,94 +20,24 @@
package org.dinky.alert.rules;

import org.dinky.data.flink.checkpoint.CheckPointOverView;
import org.dinky.utils.TimeUtil;

import java.util.Objects;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import java.time.Duration;
import java.time.LocalDateTime;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class CheckpointsRule {

private static final Logger logger = LoggerFactory.getLogger(CheckpointsRule.class);

private static final LoadingCache<String, Object> checkpointsCache =
CacheBuilder.newBuilder().expireAfterAccess(60, TimeUnit.SECONDS).build(CacheLoader.from(key -> null));

/**
* Checks if a checkpoint has expired.
*
* @param latest The latest checkpoint node.
* @param jobInstanceID The key used to identify the checkpoint.
* @param ckKey The checkpoint key to check for expiration.
* @return True if the checkpoint has expired, false otherwise.
*/
private static boolean isExpire(CheckPointOverView latest, String jobInstanceID, String ckKey) {
logger.debug("checkpointTime key: {} ,checkpoints: {}, key: {}", jobInstanceID, latest, ckKey);

CheckPointOverView his = (CheckPointOverView) checkpointsCache.getIfPresent(jobInstanceID);

switch (ckKey) {
case "completed":
if (his != null) {
CheckPointOverView.CompletedCheckpointStatistics completedCheckpointStatistics =
his.getLatestCheckpoints().getCompletedCheckpointStatistics();
if (completedCheckpointStatistics != null) {
return Objects.equals(completedCheckpointStatistics.getStatus(), "completed");
}
}
return false;
case "failed":
CheckPointOverView.FailedCheckpointStatistics failedCheckpointStatistics = null;
if (his != null) {
failedCheckpointStatistics = his.getLatestCheckpoints().getFailedCheckpointStatistics();
}
long failureTimestamp = 0;
CheckPointOverView.LatestCheckpoints latestLatestCheckpoints = latest.getLatestCheckpoints();
if (latestLatestCheckpoints != null
&& latestLatestCheckpoints.getFailedCheckpointStatistics() != null) {
failureTimestamp = latestLatestCheckpoints
.getFailedCheckpointStatistics()
.getTriggerTimestamp();
}
if (null == latestLatestCheckpoints || 0 == failureTimestamp) {
return true;
}
long latestTime =
latestLatestCheckpoints.getFailedCheckpointStatistics().getTriggerTimestamp();
checkpointsCache.put(jobInstanceID, latest);
if (his != null) {
long hisTime = 0;
if (failedCheckpointStatistics != null) {
hisTime = failedCheckpointStatistics.getTriggerTimestamp();
}
return hisTime == latestTime || System.currentTimeMillis() - latestTime > 60000;
}
return false;

default:
return false;
}
}

/**
* Retrieves the checkpoint time for a specific key.
*
* @param key The key used to identify the checkpoint.
* @param checkpoints The checkpoints object containing relevant data.
* @return The checkpoint time, or null if the checkpoint has expired.
*/
public static Long checkpointTime(String key, CheckPointOverView checkpoints) {
if (isExpire(checkpoints, key, "completed")) {
return -1L;
}
public static Long checkpointTime(CheckPointOverView checkpoints) {

CheckPointOverView.LatestCheckpoints checkpointsLatestCheckpoints = checkpoints.getLatestCheckpoints();
if (null == checkpointsLatestCheckpoints
|| null == checkpointsLatestCheckpoints.getCompletedCheckpointStatistics()) {
Expand All @@ -122,11 +52,19 @@ public static Long checkpointTime(String key, CheckPointOverView checkpoints) {
/**
* Checks if a specific checkpoint has failed.
*
* @param key The key used to identify the checkpoint.
* @param checkpoints The checkpoints object containing relevant data.
* @return True if the checkpoint has failed, null if it has expired.
*/
public static Boolean checkFailed(String key, CheckPointOverView checkpoints) {
return !isExpire(checkpoints, key, "failed");
public static Boolean checkFailed(CheckPointOverView checkpoints) {
CheckPointOverView.LatestCheckpoints latestLatestCheckpoints = checkpoints.getLatestCheckpoints();
if (latestLatestCheckpoints != null && latestLatestCheckpoints.getFailedCheckpointStatistics() != null) {
long failureTimestamp =
latestLatestCheckpoints.getFailedCheckpointStatistics().getTriggerTimestamp();
LocalDateTime localDateTime = TimeUtil.toLocalDateTime(failureTimestamp);
LocalDateTime now = LocalDateTime.now();
long diff = Duration.between(localDateTime, now).toMinutes();
return diff <= 2;
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,37 +20,33 @@
package org.dinky.alert.rules;

import org.dinky.data.flink.exceptions.FlinkJobExceptionsDetail;
import org.dinky.utils.TimeUtil;

import java.util.concurrent.TimeUnit;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import java.time.Duration;
import java.time.LocalDateTime;

public class ExceptionRule {

private static final LoadingCache<String, Long> hisTime =
CacheBuilder.newBuilder().expireAfterAccess(60, TimeUnit.SECONDS).build(CacheLoader.from(key -> null));

/**
* Executes a certain operation based on the provided key and exceptions object.
* This method is stored within the database, is called through SPEL, and is not an executable method
* @param jobinstanceId The key used to identify the operation.
* @param exceptions The exceptions object containing relevant data.
* @return True if the operation should be executed, false otherwise.
*/
public static Boolean isException(String jobinstanceId, FlinkJobExceptionsDetail exceptions) {
public static Boolean isException(FlinkJobExceptionsDetail exceptions) {

// If the exception is the same as the previous one, it will not be reported again
if (exceptions.getTimestamp() == null) {
return false;
}
long timestamp = exceptions.getTimestamp();
Long hisTimeIfPresent = hisTime.getIfPresent(jobinstanceId);
if (hisTimeIfPresent != null && hisTimeIfPresent == timestamp) {
LocalDateTime localDateTime = TimeUtil.toLocalDateTime(timestamp);
LocalDateTime now = LocalDateTime.now();
long diff = Duration.between(localDateTime, now).toMinutes();

// If the exception is older than 2 minutes, we don't care about it anymore.
if (diff >= 2) {
return false;
}
hisTime.put(jobinstanceId, timestamp);
if (exceptions.getRootException() != null) {
return !exceptions.getRootException().isEmpty();
} else {
Expand Down

0 comments on commit 541abf7

Please sign in to comment.