From 8be703297fe23f9c4245acf00efff552276fc2ec Mon Sep 17 00:00:00 2001 From: ClownXC Date: Tue, 17 Dec 2024 13:28:36 +0800 Subject: [PATCH] random node --- .../seatunnel/connectors/doris/rest/RestService.java | 10 +++++++--- .../connectors/doris/sink/writer/DorisSinkWriter.java | 9 +++++---- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java index 12af5f4d5bd..02be965ffba 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java @@ -55,7 +55,9 @@ import java.net.URL; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; import java.util.Base64; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -287,14 +289,16 @@ public static List findPartitions( stringEntity.setContentEncoding("UTF-8"); stringEntity.setContentType("application/json"); - String[] feNodes = dorisSourceConfig.getFrontends().split(","); - int feNodesNum = feNodes.length; + List feNodes = Arrays.asList(dorisSourceConfig.getFrontends().split(",")); + Collections.shuffle(feNodes); + int feNodesNum = feNodes.size(); String resStr = null; for (int i = 0; i < feNodesNum; i++) { try { HttpPost httpPost = - new HttpPost(getUriStr(feNodes[i], dorisSourceTable, logger) + QUERY_PLAN); + new HttpPost( + getUriStr(feNodes.get(i), dorisSourceTable, logger) + QUERY_PLAN); httpPost.setEntity(stringEntity); resStr = send(dorisSourceConfig, httpPost, logger); break; diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java index a6a88be1773..2d40ea7bdd0 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java @@ -98,14 +98,15 @@ public DorisSinkWriter( private void initializeLoad() { - String[] feNodes = dorisSinkConfig.getFrontends().split(","); - int feNodesNum = feNodes.length; + List feNodes = Arrays.asList(dorisSinkConfig.getFrontends().split(",")); + Collections.shuffle(feNodes); + int feNodesNum = feNodes.size(); for (int i = 0; i < feNodesNum; i++) { try { this.dorisStreamLoad = new DorisStreamLoad( - feNodes[i], + feNodes.get(i), catalogTable.getTablePath(), dorisSinkConfig, labelGenerator, @@ -121,7 +122,7 @@ private void initializeLoad() { } log.error( "stream load error for feNode: {} with exception: {}", - feNodes[i], + feNodes.get(i), e.getMessage()); } }