From 1865eab634f69985820da0dc30e3823365f96adb Mon Sep 17 00:00:00 2001 From: ikiler Date: Wed, 31 Jan 2024 21:04:34 +0800 Subject: [PATCH 1/3] Optimized the heartbeat of the cluster --- .../controller/ClusterInstanceController.java | 20 ++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/dinky-admin/src/main/java/org/dinky/controller/ClusterInstanceController.java b/dinky-admin/src/main/java/org/dinky/controller/ClusterInstanceController.java index 71a71a3b38..e2d36999b1 100644 --- a/dinky-admin/src/main/java/org/dinky/controller/ClusterInstanceController.java +++ b/dinky-admin/src/main/java/org/dinky/controller/ClusterInstanceController.java @@ -29,6 +29,9 @@ import org.dinky.service.ClusterInstanceService; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.stream.Collectors; import org.springframework.transaction.annotation.Transactional; import org.springframework.web.bind.annotation.DeleteMapping; @@ -44,6 +47,7 @@ import cn.dev33.satoken.annotation.SaCheckPermission; import cn.dev33.satoken.annotation.SaMode; +import cn.hutool.core.thread.ThreadUtil; import io.swagger.annotations.Api; import io.swagger.annotations.ApiImplicitParam; import io.swagger.annotations.ApiOperation; @@ -195,12 +199,18 @@ public Result> listSessionEnable() { @Log(title = "Cluster Instance Heartbeat", businessType = BusinessType.UPDATE) @ApiOperation("Cluster Instance Heartbeat") @SaCheckPermission(value = {PermissionConstants.REGISTRATION_CLUSTER_INSTANCE_HEARTBEATS}) - public Result heartbeat() { + public Result heartbeat() { List clusterInstances = clusterInstanceService.list(); - for (ClusterInstance clusterInstance : clusterInstances) { - clusterInstanceService.registersCluster(clusterInstance); - } - return Result.succeed(Status.CLUSTER_INSTANCE_HEARTBEAT_SUCCESS); + ExecutorService executor = ThreadUtil.newExecutor(Math.min(clusterInstances.size(), 10)); + List> futures = clusterInstances.stream() + .map(c -> CompletableFuture.supplyAsync( + () -> clusterInstanceService.registersCluster(c).getStatus(), executor)) + .collect(Collectors.toList()); + long aliveCount = futures.stream() + .map(CompletableFuture::join) + .filter(x -> x == 1) + .count(); + return Result.succeed(aliveCount, Status.CLUSTER_INSTANCE_HEARTBEAT_SUCCESS); } /** From a0c6f626da3cb7561219052decc8c2ca4b51f0da Mon Sep 17 00:00:00 2001 From: ikiler Date: Wed, 31 Jan 2024 21:08:28 +0800 Subject: [PATCH 2/3] Optimized the heartbeat of the cluster --- .../controller/ClusterInstanceController.java | 12 +----------- .../dinky/service/ClusterInstanceService.java | 6 ++++++ .../impl/ClusterInstanceServiceImpl.java | 18 ++++++++++++++++++ 3 files changed, 25 insertions(+), 11 deletions(-) diff --git a/dinky-admin/src/main/java/org/dinky/controller/ClusterInstanceController.java b/dinky-admin/src/main/java/org/dinky/controller/ClusterInstanceController.java index e2d36999b1..0696f0ee0e 100644 --- a/dinky-admin/src/main/java/org/dinky/controller/ClusterInstanceController.java +++ b/dinky-admin/src/main/java/org/dinky/controller/ClusterInstanceController.java @@ -200,17 +200,7 @@ public Result> listSessionEnable() { @ApiOperation("Cluster Instance Heartbeat") @SaCheckPermission(value = {PermissionConstants.REGISTRATION_CLUSTER_INSTANCE_HEARTBEATS}) public Result heartbeat() { - List clusterInstances = clusterInstanceService.list(); - ExecutorService executor = ThreadUtil.newExecutor(Math.min(clusterInstances.size(), 10)); - List> futures = clusterInstances.stream() - .map(c -> CompletableFuture.supplyAsync( - () -> clusterInstanceService.registersCluster(c).getStatus(), executor)) - .collect(Collectors.toList()); - long aliveCount = futures.stream() - .map(CompletableFuture::join) - .filter(x -> x == 1) - .count(); - return Result.succeed(aliveCount, Status.CLUSTER_INSTANCE_HEARTBEAT_SUCCESS); + return Result.succeed(clusterInstanceService.heartbeat(), Status.CLUSTER_INSTANCE_HEARTBEAT_SUCCESS); } /** diff --git a/dinky-admin/src/main/java/org/dinky/service/ClusterInstanceService.java b/dinky-admin/src/main/java/org/dinky/service/ClusterInstanceService.java index b0654f69f5..df240ae441 100644 --- a/dinky-admin/src/main/java/org/dinky/service/ClusterInstanceService.java +++ b/dinky-admin/src/main/java/org/dinky/service/ClusterInstanceService.java @@ -138,4 +138,10 @@ public interface ClusterInstanceService extends ISuperService { * @return {@link Boolean} true: has relationship, false: no relationship */ boolean hasRelationShip(Integer id); + + /** + * heartbeat + * @return {@link Long} + */ + Long heartbeat(); } diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/ClusterInstanceServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/impl/ClusterInstanceServiceImpl.java index 4c56bae45f..ca08fcda1a 100644 --- a/dinky-admin/src/main/java/org/dinky/service/impl/ClusterInstanceServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/impl/ClusterInstanceServiceImpl.java @@ -19,6 +19,7 @@ package org.dinky.service.impl; +import cn.hutool.core.thread.ThreadUtil; import org.dinky.assertion.Assert; import org.dinky.assertion.Asserts; import org.dinky.cluster.FlinkCluster; @@ -48,6 +49,9 @@ import java.time.LocalDateTime; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.stream.Collectors; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy; @@ -275,6 +279,20 @@ public boolean hasRelationShip(Integer id) { .isEmpty(); } + @Override + public Long heartbeat() { + List clusterInstances = this.list(); + ExecutorService executor = ThreadUtil.newExecutor(Math.min(clusterInstances.size(), 10)); + List> futures = clusterInstances.stream() + .map(c -> CompletableFuture.supplyAsync( + () -> this.registersCluster(c).getStatus(), executor)) + .collect(Collectors.toList()); + return futures.stream() + .map(CompletableFuture::join) + .filter(x -> x == 1) + .count(); + } + private boolean checkHealth(ClusterInstance clusterInstance) { FlinkClusterInfo info = checkHeartBeat(clusterInstance.getHosts(), clusterInstance.getJobManagerHost()); if (!info.isEffective()) { From 307c13fba5fe3780c9876fc1cd8015415da24004 Mon Sep 17 00:00:00 2001 From: ikiler Date: Wed, 31 Jan 2024 21:12:39 +0800 Subject: [PATCH 3/3] Optimized the heartbeat of the cluster --- .../org/dinky/controller/ClusterInstanceController.java | 4 ---- .../dinky/service/impl/ClusterInstanceServiceImpl.java | 8 ++------ 2 files changed, 2 insertions(+), 10 deletions(-) diff --git a/dinky-admin/src/main/java/org/dinky/controller/ClusterInstanceController.java b/dinky-admin/src/main/java/org/dinky/controller/ClusterInstanceController.java index 0696f0ee0e..fbd2f7922c 100644 --- a/dinky-admin/src/main/java/org/dinky/controller/ClusterInstanceController.java +++ b/dinky-admin/src/main/java/org/dinky/controller/ClusterInstanceController.java @@ -29,9 +29,6 @@ import org.dinky.service.ClusterInstanceService; import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.stream.Collectors; import org.springframework.transaction.annotation.Transactional; import org.springframework.web.bind.annotation.DeleteMapping; @@ -47,7 +44,6 @@ import cn.dev33.satoken.annotation.SaCheckPermission; import cn.dev33.satoken.annotation.SaMode; -import cn.hutool.core.thread.ThreadUtil; import io.swagger.annotations.Api; import io.swagger.annotations.ApiImplicitParam; import io.swagger.annotations.ApiOperation; diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/ClusterInstanceServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/impl/ClusterInstanceServiceImpl.java index c7bbf483d8..71b89816f0 100644 --- a/dinky-admin/src/main/java/org/dinky/service/impl/ClusterInstanceServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/impl/ClusterInstanceServiceImpl.java @@ -19,8 +19,6 @@ package org.dinky.service.impl; -import cn.hutool.core.thread.ThreadUtil; -import org.dinky.assertion.Assert; import org.dinky.assertion.Asserts; import org.dinky.assertion.DinkyAssert; import org.dinky.cluster.FlinkCluster; @@ -62,6 +60,7 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.util.StrUtil; import lombok.RequiredArgsConstructor; @@ -288,10 +287,7 @@ public Long heartbeat() { .map(c -> CompletableFuture.supplyAsync( () -> this.registersCluster(c).getStatus(), executor)) .collect(Collectors.toList()); - return futures.stream() - .map(CompletableFuture::join) - .filter(x -> x == 1) - .count(); + return futures.stream().map(CompletableFuture::join).filter(x -> x == 1).count(); } private boolean checkHealth(ClusterInstance clusterInstance) {