Skip to content

Commit

Permalink
sr multi source
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaochen-zhou committed May 1, 2024
1 parent 4d9287f commit 4564add
Show file tree
Hide file tree
Showing 18 changed files with 460 additions and 111 deletions.
100 changes: 84 additions & 16 deletions docs/en/connector-v2/source/StarRocks.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ delivers the query plan as a parameter to BE nodes, and then obtains data result
| password | string | yes | - |
| database | string | yes | - |
| table | string | yes | - |
| table_list | array | yes | - |
| scan_filter | string | no | - |
| schema | config | yes | - |
| request_tablet_size | int | no | Integer.MAX_VALUE |
Expand Down Expand Up @@ -57,6 +58,10 @@ The name of StarRocks database

The name of StarRocks table

### table_list [array]

The list of tables to be read, you can use this configuration instead of `table`

### scan_filter [string]

Filter expression of the query, which is transparently transmitted to StarRocks. StarRocks uses this expression to complete source-side data filtering.
Expand All @@ -76,7 +81,7 @@ The schema of the starRocks that you want to generate
e.g.

```
schema {
schema = {
fields {
name = string
age = int
Expand Down Expand Up @@ -153,28 +158,91 @@ source {
table = "e2e_table_source"
scan_batch_rows = 10
max_retries = 3
fields {
BIGINT_COL = BIGINT
LARGEINT_COL = STRING
SMALLINT_COL = SMALLINT
TINYINT_COL = TINYINT
BOOLEAN_COL = BOOLEAN
DECIMAL_COL = "DECIMAL(20, 1)"
DOUBLE_COL = DOUBLE
FLOAT_COL = FLOAT
INT_COL = INT
CHAR_COL = STRING
VARCHAR_11_COL = STRING
STRING_COL = STRING
DATETIME_COL = TIMESTAMP
DATE_COL = DATE
schema = {
fields {
BIGINT_COL = BIGINT
LARGEINT_COL = STRING
SMALLINT_COL = SMALLINT
TINYINT_COL = TINYINT
BOOLEAN_COL = BOOLEAN
DECIMAL_COL = "DECIMAL(20, 1)"
DOUBLE_COL = DOUBLE
FLOAT_COL = FLOAT
INT_COL = INT
CHAR_COL = STRING
VARCHAR_11_COL = STRING
STRING_COL = STRING
DATETIME_COL = TIMESTAMP
DATE_COL = DATE
}
}
scan.params.scanner_thread_pool_thread_num = "3"
}
}
```

## Example 2: Multiple tables

```
source {
StarRocks {
nodeUrls = ["starrocks_e2e:8030"]
username = root
password = ""
database = "test"
table_list = [
{
table = "e2e_table_source"
schema = {
fields {
BIGINT_COL = BIGINT
LARGEINT_COL = STRING
SMALLINT_COL = SMALLINT
TINYINT_COL = TINYINT
BOOLEAN_COL = BOOLEAN
DECIMAL_COL = "DECIMAL(20, 1)"
DOUBLE_COL = DOUBLE
FLOAT_COL = FLOAT
INT_COL = INT
CHAR_COL = STRING
VARCHAR_11_COL = STRING
STRING_COL = STRING
DATETIME_COL = TIMESTAMP
DATE_COL = DATE
}
}
},
{
table = "e2e_table_source_2"
schema = {
fields {
BIGINT_COL_2 = BIGINT
LARGEINT_COL_2 = STRING
SMALLINT_COL_2 = SMALLINT
TINYINT_COL_2 = TINYINT
BOOLEAN_COL_2 = BOOLEAN
DECIMAL_COL_2 = "DECIMAL(20, 1)"
DOUBLE_COL_2 = DOUBLE
FLOAT_COL_2 = FLOAT
INT_COL_2 = INT
CHAR_COL_2 = STRING
VARCHAR_11_COL_2 = STRING
STRING_COL_2 = STRING
DATETIME_COL_2 = TIMESTAMP
DATE_COL_2 = DATE
}
}
}]
scan_batch_rows = 10
max_retries = 3
scan.params.scanner_thread_pool_thread_num = "3"
}
}
```

## Changelog

### next version
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,13 @@ public enum CommonErrorCode implements SeaTunnelErrorCode {
FILE_OPERATION_FAILED("COMMON-01", "<identifier> <operation> file '<fileName>' failed."),
JSON_OPERATION_FAILED(
"COMMON-02", "<identifier> JSON convert/parse '<payload>' operation failed."),
UNSUPPORTED_OPERATION("COMMON-05", "Unsupported operation"),
ILLEGAL_ARGUMENT("COMMON-06", "Illegal argument"),
UNSUPPORTED_DATA_TYPE(
"COMMON-07", "'<identifier>' unsupported data type '<dataType>' of '<field>'"),
UNSUPPORTED_ENCODING("COMMON-08", "unsupported encoding '<encoding>'"),
WRITER_OPERATION_FAILED(
"COMMON-11", "Sink writer operation failed, such as (open, close) etc..."),
CONVERT_TO_SEATUNNEL_TYPE_ERROR(
"COMMON-16",
"'<connector>' <type> unsupported convert type '<dataType>' of '<field>' to SeaTunnel data type."),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.seatunnel.connectors.seatunnel.starrocks.client;

import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig;
import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorErrorCode;
Expand Down Expand Up @@ -68,7 +68,7 @@ public Boolean doStreamLoad(StarRocksFlushTuple flushData) throws IOException {
String host = getAvailableHost();
if (null == host) {
throw new StarRocksConnectorException(
CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
CommonErrorCode.ILLEGAL_ARGUMENT,
"None of the host in `load_url` could be connected.");
}
String loadUrl =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,17 @@ public StarRocksBeReadClient(String beNodeInfo, SourceConfig sourceConfig) {
}

public void openScanner(QueryPartition partition, SeaTunnelRowType seaTunnelRowType) {
eos.set(false);
this.readerOffset = 0;
this.rowBatch = null;
this.seaTunnelRowType = seaTunnelRowType;
Set<Long> tabletIds = partition.getTabletIds();
TScanOpenParams params = new TScanOpenParams();
params.setTablet_ids(new ArrayList<>(tabletIds));
params.setOpaqued_query_plan(partition.getQueryPlan());
params.setCluster(DEFAULT_CLUSTER_NAME);
params.setDatabase(sourceConfig.getDatabase());
params.setTable(sourceConfig.getTable());
params.setDatabase(partition.getDatabase());
params.setTable(partition.getTable());
params.setUser(sourceConfig.getUsername());
params.setPasswd(sourceConfig.getPassword());
params.setBatch_size(sourceConfig.getBatchRows());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.seatunnel.connectors.seatunnel.starrocks.client.source.model.QueryPartition;
import org.apache.seatunnel.connectors.seatunnel.starrocks.client.source.model.QueryPlan;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SourceConfig;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSourceTableConfig;
import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;

Expand All @@ -40,45 +41,47 @@
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

@Slf4j
public class StarRocksQueryPlanReadClient {
private RetryUtils.RetryMaterial retryMaterial;
private SourceConfig sourceConfig;
private SeaTunnelRowType seaTunnelRowType;
private final HttpHelper httpHelper = new HttpHelper();
private final Map<String, StarRocksSourceTableConfig> tables;

private static final long DEFAULT_SLEEP_TIME_MS = 1000L;

public StarRocksQueryPlanReadClient(
SourceConfig sourceConfig, SeaTunnelRowType seaTunnelRowType) {
public StarRocksQueryPlanReadClient(SourceConfig sourceConfig) {
this.sourceConfig = sourceConfig;
this.seaTunnelRowType = seaTunnelRowType;
this.retryMaterial =
new RetryUtils.RetryMaterial(
sourceConfig.getMaxRetries(),
true,
exception -> true,
DEFAULT_SLEEP_TIME_MS);
this.tables =
sourceConfig.getTableConfigList().stream()
.collect(
Collectors.toMap(
StarRocksSourceTableConfig::getTable, Function.identity()));
}

public List<QueryPartition> findPartitions() {
public List<QueryPartition> findPartitions(String table) {
List<String> nodeUrls = sourceConfig.getNodeUrls();
QueryPlan queryPlan =
getQueryPlan(genQuerySql(), nodeUrls.get(new Random().nextInt(nodeUrls.size())));
getQueryPlan(
genQuerySql(table),
nodeUrls.get(new Random().nextInt(nodeUrls.size())),
table);
Map<String, List<Long>> be2Tablets = selectBeForTablet(queryPlan);
return tabletsMapToPartition(
be2Tablets,
queryPlan.getQueryPlan(),
sourceConfig.getDatabase(),
sourceConfig.getTable());
return tabletsMapToPartition(be2Tablets, queryPlan.getQueryPlan(), table);
}

private List<QueryPartition> tabletsMapToPartition(
Map<String, List<Long>> be2Tablets,
String opaquedQueryPlan,
String database,
String table)
Map<String, List<Long>> be2Tablets, String opaquedQueryPlan, String table)
throws IllegalArgumentException {
int tabletsSize = sourceConfig.getRequestTabletSize();
List<QueryPartition> partitions = new ArrayList<>();
Expand All @@ -100,7 +103,7 @@ private List<QueryPartition> tabletsMapToPartition(
first = first + tabletsSize;
QueryPartition partitionDefinition =
new QueryPartition(
database,
sourceConfig.getDatabase(),
table,
beInfo.getKey(),
partitionTablets,
Expand Down Expand Up @@ -136,14 +139,14 @@ private Map<String, List<Long>> selectBeForTablet(QueryPlan queryPlan) {
return beXTablets;
}

private QueryPlan getQueryPlan(String querySQL, String httpNode) {
private QueryPlan getQueryPlan(String querySQL, String httpNode, String table) {
String url =
new StringBuilder("http://")
.append(httpNode)
.append("/api/")
.append(sourceConfig.getDatabase())
.append("/")
.append(sourceConfig.getTable())
.append(table)
.append("/_query_plan")
.toString();

Expand Down Expand Up @@ -184,15 +187,17 @@ private Map<String, String> getQueryPlanHttpHeader() {
return headerMap;
}

private String genQuerySql() {
private String genQuerySql(String table) {

StarRocksSourceTableConfig starRocksSourceTableConfig = tables.get(table);
SeaTunnelRowType seaTunnelRowType =
starRocksSourceTableConfig.getCatalogTable().getSeaTunnelRowType();
String columns =
seaTunnelRowType.getFieldNames().length != 0
? String.join(",", seaTunnelRowType.getFieldNames())
: "*";
String filter =
sourceConfig.getScanFilter().isEmpty()
? ""
: " where " + sourceConfig.getScanFilter();
String scanFilter = starRocksSourceTableConfig.getScanFilter();
String filter = scanFilter.isEmpty() ? "" : " where " + scanFilter;

String sql =
"select "
Expand All @@ -203,7 +208,7 @@ private String genQuerySql() {
+ "`"
+ "."
+ "`"
+ sourceConfig.getTable()
+ table
+ "`"
+ filter;
log.debug("Generate query sql '{}'.", sql);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public int compareTo(QueryPartition o) {
similar.retainAll(o.tabletIds);
diffSelf.removeAll(similar);
diffOther.removeAll(similar);
if (diffSelf.size() == 0) {
if (diffSelf.isEmpty()) {
return 0;
}
long diff = Collections.min(diffSelf) - Collections.min(diffOther);
Expand Down Expand Up @@ -103,7 +103,7 @@ public int hashCode() {
@Override
public String toString() {
return "QueryPartition{"
+ ", database='"
+ "database='"
+ database
+ '\''
+ ", table='"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,11 @@ public class CommonConfig implements Serializable {
private String username;
private String password;
private String database;
private String table;

public CommonConfig(ReadonlyConfig config) {
this.nodeUrls = config.get(NODE_URLS);
this.username = config.get(USERNAME);
this.password = config.get(PASSWORD);
this.database = config.get(DATABASE);
this.table = config.get(TABLE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,18 @@

package org.apache.seatunnel.connectors.seatunnel.starrocks.config;

import org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;

import lombok.Getter;
import lombok.Setter;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Setter
Expand Down Expand Up @@ -53,6 +57,7 @@ public SourceConfig(ReadonlyConfig config) {
key.substring(prefix.length()).toLowerCase(), value);
}
});
tableConfigList = StarRocksSourceTableConfig.of(config);
}

public static final Option<Integer> MAX_RETRIES =
Expand Down Expand Up @@ -106,6 +111,12 @@ public SourceConfig(ReadonlyConfig config) {
.noDefaultValue()
.withDescription("The parameter of the scan data from be");

public static final Option<List<Map<String, Object>>> TABLE_LIST =
Options.key("table_list")
.type(new TypeReference<List<Map<String, Object>>>() {})
.noDefaultValue()
.withDescription("table list config");

private int maxRetries = MAX_RETRIES.defaultValue();
private int requestTabletSize = QUERY_TABLET_SIZE.defaultValue();
private String scanFilter = SCAN_FILTER.defaultValue();
Expand All @@ -115,4 +126,5 @@ public SourceConfig(ReadonlyConfig config) {
private int batchRows = SCAN_BATCH_ROWS.defaultValue();
private int connectTimeoutMs = SCAN_CONNECT_TIMEOUT.defaultValue();
private Map<String, String> sourceOptionProps = new HashMap<>();
private List<StarRocksSourceTableConfig> tableConfigList = new ArrayList<>();
}
Loading

0 comments on commit 4564add

Please sign in to comment.