diff --git a/dinky-admin/src/main/java/org/dinky/controller/ClusterInstanceController.java b/dinky-admin/src/main/java/org/dinky/controller/ClusterInstanceController.java index 71a71a3b38..fbd2f7922c 100644 --- a/dinky-admin/src/main/java/org/dinky/controller/ClusterInstanceController.java +++ b/dinky-admin/src/main/java/org/dinky/controller/ClusterInstanceController.java @@ -195,12 +195,8 @@ public Result> listSessionEnable() { @Log(title = "Cluster Instance Heartbeat", businessType = BusinessType.UPDATE) @ApiOperation("Cluster Instance Heartbeat") @SaCheckPermission(value = {PermissionConstants.REGISTRATION_CLUSTER_INSTANCE_HEARTBEATS}) - public Result heartbeat() { - List clusterInstances = clusterInstanceService.list(); - for (ClusterInstance clusterInstance : clusterInstances) { - clusterInstanceService.registersCluster(clusterInstance); - } - return Result.succeed(Status.CLUSTER_INSTANCE_HEARTBEAT_SUCCESS); + public Result heartbeat() { + return Result.succeed(clusterInstanceService.heartbeat(), Status.CLUSTER_INSTANCE_HEARTBEAT_SUCCESS); } /** diff --git a/dinky-admin/src/main/java/org/dinky/service/ClusterInstanceService.java b/dinky-admin/src/main/java/org/dinky/service/ClusterInstanceService.java index b0654f69f5..df240ae441 100644 --- a/dinky-admin/src/main/java/org/dinky/service/ClusterInstanceService.java +++ b/dinky-admin/src/main/java/org/dinky/service/ClusterInstanceService.java @@ -138,4 +138,10 @@ public interface ClusterInstanceService extends ISuperService { * @return {@link Boolean} true: has relationship, false: no relationship */ boolean hasRelationShip(Integer id); + + /** + * heartbeat + * @return {@link Long} + */ + Long heartbeat(); } diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/ClusterInstanceServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/impl/ClusterInstanceServiceImpl.java index 65df48a011..71b89816f0 100644 --- a/dinky-admin/src/main/java/org/dinky/service/impl/ClusterInstanceServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/impl/ClusterInstanceServiceImpl.java @@ -48,6 +48,9 @@ import java.time.LocalDateTime; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.stream.Collectors; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy; @@ -57,6 +60,7 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.util.StrUtil; import lombok.RequiredArgsConstructor; @@ -275,6 +279,17 @@ public boolean hasRelationShip(Integer id) { .isEmpty(); } + @Override + public Long heartbeat() { + List clusterInstances = this.list(); + ExecutorService executor = ThreadUtil.newExecutor(Math.min(clusterInstances.size(), 10)); + List> futures = clusterInstances.stream() + .map(c -> CompletableFuture.supplyAsync( + () -> this.registersCluster(c).getStatus(), executor)) + .collect(Collectors.toList()); + return futures.stream().map(CompletableFuture::join).filter(x -> x == 1).count(); + } + private boolean checkHealth(ClusterInstance clusterInstance) { FlinkClusterInfo info = checkHeartBeat(clusterInstance.getHosts(), clusterInstance.getJobManagerHost()); if (!info.isEffective()) {