From c5e87be755ed0fe735b1ce1ef2e7dff943906482 Mon Sep 17 00:00:00 2001 From: ClownXC Date: Sun, 8 Dec 2024 12:06:10 +0800 Subject: [PATCH 01/10] avoid oom --- config/seatunnel.yaml | 3 ++ .../engine/common/config/EngineConfig.java | 4 +++ .../server/CoordinatorServiceConfig.java | 30 +++++++++++++++++++ .../config/server/ServerConfigOptions.java | 18 +++++++++++ .../engine/server/CoordinatorService.java | 4 +-- 5 files changed, 57 insertions(+), 2 deletions(-) create mode 100644 seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CoordinatorServiceConfig.java diff --git a/config/seatunnel.yaml b/config/seatunnel.yaml index 79a713a71e0..f1bd9136a95 100644 --- a/config/seatunnel.yaml +++ b/config/seatunnel.yaml @@ -25,6 +25,9 @@ seatunnel: print-job-metrics-info-interval: 60 slot-service: dynamic-slot: true + coordinator-service: + core-thread-num: 10 + max-thread-num: 1000 checkpoint: interval: 10000 timeout: 60000 diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java index 99a721109bd..dd043aba55f 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java @@ -19,6 +19,7 @@ import org.apache.seatunnel.engine.common.config.server.CheckpointConfig; import org.apache.seatunnel.engine.common.config.server.ConnectorJarStorageConfig; +import org.apache.seatunnel.engine.common.config.server.CoordinatorServiceConfig; import org.apache.seatunnel.engine.common.config.server.HttpConfig; import org.apache.seatunnel.engine.common.config.server.QueueType; import org.apache.seatunnel.engine.common.config.server.ScheduleStrategy; @@ -57,6 +58,9 @@ public class EngineConfig { private CheckpointConfig checkpointConfig = ServerConfigOptions.CHECKPOINT.defaultValue(); + private CoordinatorServiceConfig coordinatorServiceConfig = + ServerConfigOptions.COORDINATOR_SERVICE.defaultValue(); + private ConnectorJarStorageConfig connectorJarStorageConfig = ServerConfigOptions.CONNECTOR_JAR_STORAGE_CONFIG.defaultValue(); diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CoordinatorServiceConfig.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CoordinatorServiceConfig.java new file mode 100644 index 00000000000..7079cf16c31 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CoordinatorServiceConfig.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.common.config.server; + +import lombok.Data; + +import java.io.Serializable; + +@Data +public class CoordinatorServiceConfig implements Serializable { + + private int coreThreadNum = ServerConfigOptions.CORE_THREAD_NUM.defaultValue(); + + private int maxThreadNum = ServerConfigOptions.MAX_THREAD_NUM.defaultValue(); +} diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java index 7153cccd0f5..183db3332c8 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java @@ -127,6 +127,12 @@ public class ServerConfigOptions { .defaultValue(new CheckpointConfig()) .withDescription("The checkpoint configuration."); + public static final Option COORDINATOR_SERVICE = + Options.key("coordinator-service") + .type(new TypeReference() {}) + .defaultValue(new CoordinatorServiceConfig()) + .withDescription("The coordinator service configuration."); + public static final Option> CHECKPOINT_STORAGE_PLUGIN_CONFIG = Options.key("plugin-config") .type(new TypeReference>() {}) @@ -281,6 +287,18 @@ public class ServerConfigOptions { .defaultValue(new HttpConfig()) .withDescription("The http configuration."); + public static final Option CORE_THREAD_NUM = + Options.key("core-thread-num") + .intType() + .defaultValue(10) + .withDescription("The interval (in seconds) of job metrics backups"); + + public static final Option MAX_THREAD_NUM = + Options.key("max-thread-num") + .intType() + .defaultValue(Integer.MAX_VALUE) + .withDescription("The interval (in seconds) of job metrics backups"); + public static final String EVENT_REPORT_HTTP = "event-report-http"; public static final String EVENT_REPORT_HTTP_URL = "url"; public static final String EVENT_REPORT_HTTP_HEADERS = "headers"; diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java index bfa4379ad50..380aa08b3b9 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java @@ -202,8 +202,8 @@ public CoordinatorService( this.logger = nodeEngine.getLogger(getClass()); this.executorService = new ThreadPoolExecutor( - 0, - Integer.MAX_VALUE, + engineConfig.getCoordinatorServiceConfig().getCoreThreadNum(), + engineConfig.getCoordinatorServiceConfig().getMaxThreadNum(), 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), From c13d7b6249a1a4375cbf80d1e48b9b121198f0ee Mon Sep 17 00:00:00 2001 From: ClownXC Date: Sun, 8 Dec 2024 12:21:45 +0800 Subject: [PATCH 02/10] avoid oom --- config/seatunnel.yaml | 2 +- .../engine/common/config/server/ServerConfigOptions.java | 4 ++-- .../seatunnel/engine/server/CoordinatorServiceTest.java | 2 ++ 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/config/seatunnel.yaml b/config/seatunnel.yaml index f1bd9136a95..d2d9369292f 100644 --- a/config/seatunnel.yaml +++ b/config/seatunnel.yaml @@ -26,7 +26,7 @@ seatunnel: slot-service: dynamic-slot: true coordinator-service: - core-thread-num: 10 + core-thread-num: 30 max-thread-num: 1000 checkpoint: interval: 10000 diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java index 183db3332c8..620e1c3c218 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java @@ -291,13 +291,13 @@ public class ServerConfigOptions { Options.key("core-thread-num") .intType() .defaultValue(10) - .withDescription("The interval (in seconds) of job metrics backups"); + .withDescription("The core thread num of coordinator service"); public static final Option MAX_THREAD_NUM = Options.key("max-thread-num") .intType() .defaultValue(Integer.MAX_VALUE) - .withDescription("The interval (in seconds) of job metrics backups"); + .withDescription("The max thread num of coordinator service"); public static final String EVENT_REPORT_HTTP = "event-report-http"; public static final String EVENT_REPORT_HTTP_URL = "url"; diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java index 2053ca01a8f..c6fd0fa22fe 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java @@ -138,6 +138,8 @@ public void testClearCoordinatorService() { // because runningJobMasterMap is empty and we have no JobHistoryServer, so return // UNKNOWABLE. Assertions.assertTrue(JobStatus.UNKNOWABLE.equals(coordinatorService.getJobStatus(jobId))); + Assertions.assertEquals(30, coordinatorService.getThreadPoolStatusMetrics().getCorePoolSize()); + Assertions.assertEquals(1000, coordinatorService.getThreadPoolStatusMetrics().getMaximumPoolSize()); coordinatorServiceTest.shutdown(); } From 637477ad2eed4027ed4a1f992292e490c7746f0f Mon Sep 17 00:00:00 2001 From: ClownXC Date: Sun, 8 Dec 2024 12:47:33 +0800 Subject: [PATCH 03/10] fix style --- .../seatunnel/engine/server/CoordinatorServiceTest.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java index c6fd0fa22fe..850a2fcf614 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java @@ -138,8 +138,10 @@ public void testClearCoordinatorService() { // because runningJobMasterMap is empty and we have no JobHistoryServer, so return // UNKNOWABLE. Assertions.assertTrue(JobStatus.UNKNOWABLE.equals(coordinatorService.getJobStatus(jobId))); - Assertions.assertEquals(30, coordinatorService.getThreadPoolStatusMetrics().getCorePoolSize()); - Assertions.assertEquals(1000, coordinatorService.getThreadPoolStatusMetrics().getMaximumPoolSize()); + Assertions.assertEquals( + 30, coordinatorService.getThreadPoolStatusMetrics().getCorePoolSize()); + Assertions.assertEquals( + 1000, coordinatorService.getThreadPoolStatusMetrics().getMaximumPoolSize()); coordinatorServiceTest.shutdown(); } From d225de3c6c3431a3048b59574067163e75c49ef2 Mon Sep 17 00:00:00 2001 From: ClownXC Date: Sun, 8 Dec 2024 18:33:43 +0800 Subject: [PATCH 04/10] fix config init npe --- .../YamlSeaTunnelDomConfigProcessor.java | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java index c0f6b6ad6ce..310485a5245 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java @@ -22,6 +22,7 @@ import org.apache.seatunnel.engine.common.config.server.ConnectorJarHAStorageConfig; import org.apache.seatunnel.engine.common.config.server.ConnectorJarStorageConfig; import org.apache.seatunnel.engine.common.config.server.ConnectorJarStorageMode; +import org.apache.seatunnel.engine.common.config.server.CoordinatorServiceConfig; import org.apache.seatunnel.engine.common.config.server.HttpConfig; import org.apache.seatunnel.engine.common.config.server.QueueType; import org.apache.seatunnel.engine.common.config.server.ScheduleStrategy; @@ -106,6 +107,25 @@ private SlotServiceConfig parseSlotServiceConfig(Node slotServiceNode) { return slotServiceConfig; } + private CoordinatorServiceConfig parseCoordinatorServiceConfig(Node coordinatorServiceNode) { + CoordinatorServiceConfig coordinatorServiceConfig = new CoordinatorServiceConfig(); + for (Node node : childElements(coordinatorServiceNode)) { + String name = cleanNodeName(node); + if (ServerConfigOptions.MAX_THREAD_NUM.key().equals(name)) { + coordinatorServiceConfig.setMaxThreadNum( + getIntegerValue( + ServerConfigOptions.MAX_THREAD_NUM.key(), getTextContent(node))); + } else if (ServerConfigOptions.CORE_THREAD_NUM.key().equals(name)) { + coordinatorServiceConfig.setCoreThreadNum( + getIntegerValue( + ServerConfigOptions.CORE_THREAD_NUM.key(), getTextContent(node))); + } else { + LOGGER.warning("Unrecognized element: " + name); + } + } + return coordinatorServiceConfig; + } + private void parseEngineConfig(Node engineNode, SeaTunnelConfig config) { final EngineConfig engineConfig = config.getEngineConfig(); for (Node node : childElements(engineNode)) { @@ -177,6 +197,8 @@ private void parseEngineConfig(Node engineNode, SeaTunnelConfig config) { ScheduleStrategy.valueOf(getTextContent(node).toUpperCase(Locale.ROOT))); } else if (ServerConfigOptions.HTTP.key().equals(name)) { engineConfig.setHttpConfig(parseHttpConfig(node)); + } else if (ServerConfigOptions.COORDINATOR_SERVICE.key().equals(name)) { + engineConfig.setCoordinatorServiceConfig(parseCoordinatorServiceConfig(node)); } else { LOGGER.warning("Unrecognized element: " + name); } From f733d72061898ddcf205d54472581ac4ce50dd55 Mon Sep 17 00:00:00 2001 From: ClownXC Date: Sun, 8 Dec 2024 19:13:04 +0800 Subject: [PATCH 05/10] fix config init npe --- .../server/CoordinatorServiceConfig.java | 12 +++++++ .../config/server/ServerConfigOptions.java | 35 ++++++++++--------- .../engine/server/CoordinatorService.java | 2 +- 3 files changed, 31 insertions(+), 18 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CoordinatorServiceConfig.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CoordinatorServiceConfig.java index 7079cf16c31..3ed455e3738 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CoordinatorServiceConfig.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CoordinatorServiceConfig.java @@ -21,10 +21,22 @@ import java.io.Serializable; +import static com.hazelcast.internal.util.Preconditions.checkPositive; + @Data public class CoordinatorServiceConfig implements Serializable { private int coreThreadNum = ServerConfigOptions.CORE_THREAD_NUM.defaultValue(); private int maxThreadNum = ServerConfigOptions.MAX_THREAD_NUM.defaultValue(); + + public void setCoreThreadNum(int coreThreadNum) { + checkPositive(coreThreadNum, ServerConfigOptions.CORE_THREAD_NUM + " must be >= 0"); + this.coreThreadNum = coreThreadNum; + } + + public void setMaxThreadNum(int maxThreadNum) { + checkPositive(maxThreadNum, ServerConfigOptions.MAX_THREAD_NUM + " must be > 0"); + this.maxThreadNum = maxThreadNum; + } } diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java index 620e1c3c218..e0de7617429 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java @@ -127,17 +127,30 @@ public class ServerConfigOptions { .defaultValue(new CheckpointConfig()) .withDescription("The checkpoint configuration."); + public static final Option> CHECKPOINT_STORAGE_PLUGIN_CONFIG = + Options.key("plugin-config") + .type(new TypeReference>() {}) + .noDefaultValue() + .withDescription("The checkpoint storage instance configuration."); + + public static final Option CORE_THREAD_NUM = + Options.key("core-thread-num") + .intType() + .defaultValue(10) + .withDescription("The core thread num of coordinator service"); + + public static final Option MAX_THREAD_NUM = + Options.key("max-thread-num") + .intType() + .defaultValue(Integer.MAX_VALUE) + .withDescription("The max thread num of coordinator service"); + public static final Option COORDINATOR_SERVICE = Options.key("coordinator-service") .type(new TypeReference() {}) .defaultValue(new CoordinatorServiceConfig()) .withDescription("The coordinator service configuration."); - public static final Option> CHECKPOINT_STORAGE_PLUGIN_CONFIG = - Options.key("plugin-config") - .type(new TypeReference>() {}) - .noDefaultValue() - .withDescription("The checkpoint storage instance configuration."); public static final Option HISTORY_JOB_EXPIRE_MINUTES = Options.key("history-job-expire-minutes") .intType() @@ -287,18 +300,6 @@ public class ServerConfigOptions { .defaultValue(new HttpConfig()) .withDescription("The http configuration."); - public static final Option CORE_THREAD_NUM = - Options.key("core-thread-num") - .intType() - .defaultValue(10) - .withDescription("The core thread num of coordinator service"); - - public static final Option MAX_THREAD_NUM = - Options.key("max-thread-num") - .intType() - .defaultValue(Integer.MAX_VALUE) - .withDescription("The max thread num of coordinator service"); - public static final String EVENT_REPORT_HTTP = "event-report-http"; public static final String EVENT_REPORT_HTTP_URL = "url"; public static final String EVENT_REPORT_HTTP_HEADERS = "headers"; diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java index 380aa08b3b9..5b74a1f1810 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java @@ -199,6 +199,7 @@ public CoordinatorService( @NonNull SeaTunnelServer seaTunnelServer, EngineConfig engineConfig) { this.nodeEngine = nodeEngine; + this.engineConfig = engineConfig; this.logger = nodeEngine.getLogger(getClass()); this.executorService = new ThreadPoolExecutor( @@ -212,7 +213,6 @@ public CoordinatorService( .build(), new ThreadPoolStatus.RejectionCountingHandler()); this.seaTunnelServer = seaTunnelServer; - this.engineConfig = engineConfig; masterActiveListener = Executors.newSingleThreadScheduledExecutor(); masterActiveListener.scheduleAtFixedRate( this::checkNewActiveMaster, 0, 100, TimeUnit.MILLISECONDS); From be3f10ae8e4967211416e4821961869e605fefe3 Mon Sep 17 00:00:00 2001 From: ClownXC Date: Sun, 8 Dec 2024 22:50:13 +0800 Subject: [PATCH 06/10] disable HDFS --- .../java/org/apache/seatunnel/e2e/connector/hive/HiveIT.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveIT.java index bfa83dfb3b9..9cc75d1fbf0 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveIT.java @@ -228,6 +228,8 @@ private void executeJob(TestContainer container, String job1, String job2) } @TestTemplate + @Disabled( + "[HDFS/COS/OSS/S3] is not available in CI, if you want to run this test, please set up your own environment in the test case file, hadoop_hive_conf_path_local and ip below}") public void testFakeSinkHiveOnHDFS(TestContainer container) throws Exception { executeJob(container, "/fake_to_hive_on_hdfs.conf", "/hive_on_hdfs_to_assert.conf"); } From f501c5a607a92cb869e54dfc19395729781730a2 Mon Sep 17 00:00:00 2001 From: ClownXC Date: Mon, 9 Dec 2024 00:57:34 +0800 Subject: [PATCH 07/10] add new tests --- .../seatunnel-engine-common/src/main/resources/seatunnel.yaml | 3 +++ .../engine/common/config/YamlSeaTunnelConfigParserTest.java | 4 ++++ .../seatunnel-engine-common/src/test/resources/seatunnel.yaml | 3 +++ .../seatunnel/engine/server/CoordinatorServiceTest.java | 4 ---- 4 files changed, 10 insertions(+), 4 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/resources/seatunnel.yaml b/seatunnel-engine/seatunnel-engine-common/src/main/resources/seatunnel.yaml index 9177a9afca8..af5308fbfa6 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/resources/seatunnel.yaml +++ b/seatunnel-engine/seatunnel-engine-common/src/main/resources/seatunnel.yaml @@ -23,6 +23,9 @@ seatunnel: print-job-metrics-info-interval: 60 slot-service: dynamic-slot: true + coordinator-service: + core-thread-num: 30 + max-thread-num: 1000 checkpoint: interval: 300000 timeout: 10000 diff --git a/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigParserTest.java b/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigParserTest.java index 42e6aa681b6..bae761237d6 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigParserTest.java +++ b/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigParserTest.java @@ -76,6 +76,10 @@ public void testSeaTunnelConfig() { Assertions.assertTrue(config.getEngineConfig().getHttpConfig().isEnableDynamicPort()); Assertions.assertEquals(8080, config.getEngineConfig().getHttpConfig().getPort()); Assertions.assertEquals(200, config.getEngineConfig().getHttpConfig().getPortRange()); + Assertions.assertEquals( + 30, config.getEngineConfig().getCoordinatorServiceConfig().getCoreThreadNum()); + Assertions.assertEquals( + 1000, config.getEngineConfig().getCoordinatorServiceConfig().getMaxThreadNum()); } @Test diff --git a/seatunnel-engine/seatunnel-engine-common/src/test/resources/seatunnel.yaml b/seatunnel-engine/seatunnel-engine-common/src/test/resources/seatunnel.yaml index 88f8c3f9bbc..da6b831f386 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/test/resources/seatunnel.yaml +++ b/seatunnel-engine/seatunnel-engine-common/src/test/resources/seatunnel.yaml @@ -22,6 +22,9 @@ seatunnel: slot-service: dynamic-slot: false slot-num: 5 + coordinator-service: + core-thread-num: 30 + max-thread-num: 1000 checkpoint: interval: 6000 timeout: 7000 diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java index 850a2fcf614..2053ca01a8f 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java @@ -138,10 +138,6 @@ public void testClearCoordinatorService() { // because runningJobMasterMap is empty and we have no JobHistoryServer, so return // UNKNOWABLE. Assertions.assertTrue(JobStatus.UNKNOWABLE.equals(coordinatorService.getJobStatus(jobId))); - Assertions.assertEquals( - 30, coordinatorService.getThreadPoolStatusMetrics().getCorePoolSize()); - Assertions.assertEquals( - 1000, coordinatorService.getThreadPoolStatusMetrics().getMaximumPoolSize()); coordinatorServiceTest.shutdown(); } From 317e07e1da8a56dd028f3d68cea12c9272fd70c6 Mon Sep 17 00:00:00 2001 From: ClownXC Date: Thu, 12 Dec 2024 09:04:13 +0800 Subject: [PATCH 08/10] add doc --- config/seatunnel.yaml | 3 --- .../hybrid-cluster-deployment.md | 21 ++++++++++++++++++ .../hybrid-cluster-deployment.md | 22 +++++++++++++++++++ .../src/main/resources/seatunnel.yaml | 3 --- 4 files changed, 43 insertions(+), 6 deletions(-) diff --git a/config/seatunnel.yaml b/config/seatunnel.yaml index d2d9369292f..79a713a71e0 100644 --- a/config/seatunnel.yaml +++ b/config/seatunnel.yaml @@ -25,9 +25,6 @@ seatunnel: print-job-metrics-info-interval: 60 slot-service: dynamic-slot: true - coordinator-service: - core-thread-num: 30 - max-thread-num: 1000 checkpoint: interval: 10000 timeout: 60000 diff --git a/docs/en/seatunnel-engine/hybrid-cluster-deployment.md b/docs/en/seatunnel-engine/hybrid-cluster-deployment.md index ac072c494df..358d2808593 100644 --- a/docs/en/seatunnel-engine/hybrid-cluster-deployment.md +++ b/docs/en/seatunnel-engine/hybrid-cluster-deployment.md @@ -154,6 +154,27 @@ seatunnel: When `dynamic-slot: true` is used, the `job-schedule-strategy: WAIT` configuration will become invalid and will be forcibly changed to `job-schedule-strategy: REJECT`, because this parameter is meaningless in dynamic slots. +### 4.7 Coordinator Service + +CoordinatorService responsible for the process of generating each job from a LogicalDag to an ExecutionDag, +and then to a PhysicalDag. It ultimately creates the JobMaster for the job to handle scheduling, execution, and state monitoring. + +**core-thread-num** + +The corePoolSize of seatunnel coordinator job's executor cached thread pool + +**max-thread-num** + +The maximumPoolSize of seatunnel coordinator job's executor cached thread pool + +Example + +```yaml +coordinator-service: + core-thread-num: 30 + max-thread-num: 1000 +``` + ## 5. Configure The SeaTunnel Engine Network Service All SeaTunnel Engine network-related configurations are in the `hazelcast.yaml` file. diff --git a/docs/zh/seatunnel-engine/hybrid-cluster-deployment.md b/docs/zh/seatunnel-engine/hybrid-cluster-deployment.md index 77805273452..cad793f6e53 100644 --- a/docs/zh/seatunnel-engine/hybrid-cluster-deployment.md +++ b/docs/zh/seatunnel-engine/hybrid-cluster-deployment.md @@ -153,6 +153,28 @@ seatunnel: 当`dynamic-slot: ture`时,`job-schedule-strategy: WAIT` 配置会失效,将被强制修改为`job-schedule-strategy: REJECT`,因为动态Slot时该参数没有意义,可以直接提交。 +### 4.7 Coordinator Service + +CoordinatorService 提供了每个作业从 LogicalDag 到 ExecutionDag,再到 PhysicalDag 的生成流程, 并最终创建作业的 JobMaster 进行作业的调度执行和状态监控 + +**core-thread-num** + +配置 CoordinatorService 线程池核心线程数量 + +**max-thread-num** + +配置 CoordinatorService 线程池最大线程数量 + +Example + +```yaml +coordinator-service: + core-thread-num: 30 + max-thread-num: 1000 +``` + + + ## 5. 配置 SeaTunnel Engine 网络服务 所有 SeaTunnel Engine 网络相关的配置都在 `hazelcast.yaml` 文件中. diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/resources/seatunnel.yaml b/seatunnel-engine/seatunnel-engine-common/src/main/resources/seatunnel.yaml index af5308fbfa6..9177a9afca8 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/resources/seatunnel.yaml +++ b/seatunnel-engine/seatunnel-engine-common/src/main/resources/seatunnel.yaml @@ -23,9 +23,6 @@ seatunnel: print-job-metrics-info-interval: 60 slot-service: dynamic-slot: true - coordinator-service: - core-thread-num: 30 - max-thread-num: 1000 checkpoint: interval: 300000 timeout: 10000 From e9b8e0b1ac99cfd6a69f90e45cab1ce69d982c2f Mon Sep 17 00:00:00 2001 From: ClownXC Date: Thu, 12 Dec 2024 21:13:45 +0800 Subject: [PATCH 09/10] update doc --- docs/en/seatunnel-engine/hybrid-cluster-deployment.md | 2 +- docs/zh/seatunnel-engine/hybrid-cluster-deployment.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/en/seatunnel-engine/hybrid-cluster-deployment.md b/docs/en/seatunnel-engine/hybrid-cluster-deployment.md index 358d2808593..6aad26dccbb 100644 --- a/docs/en/seatunnel-engine/hybrid-cluster-deployment.md +++ b/docs/en/seatunnel-engine/hybrid-cluster-deployment.md @@ -165,7 +165,7 @@ The corePoolSize of seatunnel coordinator job's executor cached thread pool **max-thread-num** -The maximumPoolSize of seatunnel coordinator job's executor cached thread pool +The max job count can be executed at same time Example diff --git a/docs/zh/seatunnel-engine/hybrid-cluster-deployment.md b/docs/zh/seatunnel-engine/hybrid-cluster-deployment.md index cad793f6e53..084bf980f77 100644 --- a/docs/zh/seatunnel-engine/hybrid-cluster-deployment.md +++ b/docs/zh/seatunnel-engine/hybrid-cluster-deployment.md @@ -163,7 +163,7 @@ CoordinatorService 提供了每个作业从 LogicalDag 到 ExecutionDag,再到 **max-thread-num** -配置 CoordinatorService 线程池最大线程数量 +同时可执行的最大作业数量 Example From fa69efdee706089782172a7d347e8d590fac0a5a Mon Sep 17 00:00:00 2001 From: ClownXC Date: Thu, 19 Dec 2024 09:01:08 +0800 Subject: [PATCH 10/10] add doc --- .../separated-cluster-deployment.md | 23 +++++++++++++++++++ .../separated-cluster-deployment.md | 20 ++++++++++++++++ 2 files changed, 43 insertions(+) diff --git a/docs/en/seatunnel-engine/separated-cluster-deployment.md b/docs/en/seatunnel-engine/separated-cluster-deployment.md index 91215eb459a..1244042dbb9 100644 --- a/docs/en/seatunnel-engine/separated-cluster-deployment.md +++ b/docs/en/seatunnel-engine/separated-cluster-deployment.md @@ -297,6 +297,29 @@ seatunnel: ``` When `dynamic-slot: true` is used, the `job-schedule-strategy: WAIT` configuration will become invalid and will be forcibly changed to `job-schedule-strategy: REJECT`, because this parameter is meaningless in dynamic slots. + +### 4.8 Coordinator Service + +CoordinatorService responsible for the process of generating each job from a LogicalDag to an ExecutionDag, +and then to a PhysicalDag. It ultimately creates the JobMaster for the job to handle scheduling, execution, and state monitoring. + +**core-thread-num** + +The corePoolSize of seatunnel coordinator job's executor cached thread pool + +**max-thread-num** + +The max job count can be executed at same time + +Example + +```yaml +coordinator-service: + core-thread-num: 30 + max-thread-num: 1000 +``` + + ## 5. Configuring SeaTunnel Engine Network Services All network-related configurations of the SeaTunnel Engine are in the `hazelcast-master.yaml` and `hazelcast-worker.yaml` files. diff --git a/docs/zh/seatunnel-engine/separated-cluster-deployment.md b/docs/zh/seatunnel-engine/separated-cluster-deployment.md index bdc369ff8c0..b0dd827789e 100644 --- a/docs/zh/seatunnel-engine/separated-cluster-deployment.md +++ b/docs/zh/seatunnel-engine/separated-cluster-deployment.md @@ -301,6 +301,26 @@ seatunnel: 当`dynamic-slot: ture`时,`job-schedule-strategy: WAIT` 配置会失效,将被强制修改为`job-schedule-strategy: REJECT`,因为动态Slot时该参数没有意义,可以直接提交。 +### 4.8 Coordinator Service + +CoordinatorService 提供了每个作业从 LogicalDag 到 ExecutionDag,再到 PhysicalDag 的生成流程, 并最终创建作业的 JobMaster 进行作业的调度执行和状态监控 + +**core-thread-num** + +配置 CoordinatorService 线程池核心线程数量 + +**max-thread-num** + +同时可执行的最大作业数量 + +Example + +```yaml +coordinator-service: + core-thread-num: 30 + max-thread-num: 1000 +``` + ## 5. 配置 SeaTunnel Engine 网络服务 所有 SeaTunnel Engine 网络相关的配置都在 `hazelcast-master.yaml`和`hazelcast-worker.yaml` 文件中.