Skip to content

Commit

Permalink
Merge pull request #20 from TongchengOpenSource/fa_ha
Browse files Browse the repository at this point in the history
[Feature][Connector-V2]Support StarRocks Fe Node HA
  • Loading branch information
xiaochen-zhou authored May 7, 2024
2 parents 4d9287f + 3f84e3d commit 7c1d37a
Showing 1 changed file with 27 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;

@Slf4j
Expand All @@ -64,8 +63,7 @@ public StarRocksQueryPlanReadClient(

public List<QueryPartition> findPartitions() {
List<String> nodeUrls = sourceConfig.getNodeUrls();
QueryPlan queryPlan =
getQueryPlan(genQuerySql(), nodeUrls.get(new Random().nextInt(nodeUrls.size())));
QueryPlan queryPlan = getQueryPlan(genQuerySql(), nodeUrls);
Map<String, List<Long>> be2Tablets = selectBeForTablet(queryPlan);
return tabletsMapToPartition(
be2Tablets,
Expand Down Expand Up @@ -136,37 +134,38 @@ private Map<String, List<Long>> selectBeForTablet(QueryPlan queryPlan) {
return beXTablets;
}

private QueryPlan getQueryPlan(String querySQL, String httpNode) {
String url =
new StringBuilder("http://")
.append(httpNode)
.append("/api/")
.append(sourceConfig.getDatabase())
.append("/")
.append(sourceConfig.getTable())
.append("/_query_plan")
.toString();
private QueryPlan getQueryPlan(String querySQL, List<String> nodeUrls) {

Map<String, Object> bodyMap = new HashMap<>();
bodyMap.put("sql", querySQL);
String body = JsonUtils.toJsonString(bodyMap);
String respString;
try {
respString =
RetryUtils.retryWithException(
() -> httpHelper.doHttpPost(url, getQueryPlanHttpHeader(), body),
retryMaterial);
} catch (Exception e) {
throw new StarRocksConnectorException(
StarRocksConnectorErrorCode.QUEST_QUERY_PLAN_FAILED, e);
String respString = "";
for (String feNode : nodeUrls) {
String url =
new StringBuilder("http://")
.append(feNode)
.append("/api/")
.append(sourceConfig.getDatabase())
.append("/")
.append(sourceConfig.getTable())
.append("/_query_plan")
.toString();
try {
respString =
RetryUtils.retryWithException(
() -> httpHelper.doHttpPost(url, getQueryPlanHttpHeader(), body),
retryMaterial);
if (StringUtils.isNoneEmpty(respString)) {
return JsonUtils.parseObject(respString, QueryPlan.class);
}
} catch (Exception e) {
log.error("Request query Plan From {} failed: {}", feNode, e.getMessage());
}
}

if (StringUtils.isEmpty(respString)) {
throw new StarRocksConnectorException(
StarRocksConnectorErrorCode.QUEST_QUERY_PLAN_FAILED,
"query failed with empty response");
}
return JsonUtils.parseObject(respString, QueryPlan.class);
throw new StarRocksConnectorException(
StarRocksConnectorErrorCode.QUEST_QUERY_PLAN_FAILED,
"query failed with empty response");
}

private String getBasicAuthHeader(String username, String password) {
Expand Down

0 comments on commit 7c1d37a

Please sign in to comment.