Merge pull request #19 from F-ca7/feature/v1_3_4

feature/v1_3_4

F-ca7 authored Sep 23, 2023
2 parents d23c3c8 + 3d6085c · commit bc79f9c
Showing 23 changed files with 271 additions and 177 deletions.
2 changes: 1 addition & 1 deletion batch-tool/pom.xml
@@ -6,7 +6,7 @@

<groupId>com.alibaba.polardbx</groupId>
<artifactId>batch-tool</artifactId>
<version>1.3.3</version>
<version>1.3.4</version>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
1 change: 1 addition & 0 deletions batch-tool/src/main/java/BatchTool.java
@@ -52,6 +52,7 @@ public void doBatchOp(BaseOperateCommand command, DataSourceConfig dataSourceCon
logger.info("开始批量操作...");
BaseExecutor commandExecutor = BaseExecutor.getExecutor(command, dataSourceConfig, druid);
commandExecutor.preCheck();
logger.info(command.toString());
try {
long startTime = System.currentTimeMillis();
commandExecutor.execute();
43 changes: 36 additions & 7 deletions batch-tool/src/main/java/cmd/CommandUtil.java
@@ -560,11 +560,17 @@ private static void configureProducerContext(ConfigResult result,
ProducerExecutionContext producerExecutionContext) {
producerExecutionContext.setCharset(getCharset(result));
producerExecutionContext.setSeparator(getSep(result));
producerExecutionContext.setFileLineRecordList(getFileRecordList(result));
producerExecutionContext.setDdlMode(getDdlMode(result));

if (producerExecutionContext.getDdlMode() != DdlMode.DDL_ONLY) {
producerExecutionContext.setDataFileLineRecordList(getDataFileRecordList(result));
}
if (producerExecutionContext.getDdlMode() != DdlMode.NO_DDL) {
producerExecutionContext.setDdlFileLineRecordList(getDdlFileRecordList(result));
}
producerExecutionContext.setParallelism(getProducerParallelism(result));
producerExecutionContext.setReadBlockSizeInMb(getReadBlockSizeInMb(result));
producerExecutionContext.setWithHeader(getWithHeader(result));
producerExecutionContext.setDdlMode(getDdlMode(result));
producerExecutionContext.setCompressMode(getCompressMode(result));
producerExecutionContext.setEncryptionConfig(getEncryptionConfig(result));
producerExecutionContext.setFileFormat(getFileFormat(result));
@@ -681,14 +687,15 @@ private static boolean getForceParallelism(ConfigResult result) {
}

/**
* Parse file paths and line numbers
* Parse data file paths and line numbers
* and check that the files exist
*/
private static List<FileLineRecord> getFileRecordList(ConfigResult result) {
private static List<FileLineRecord> getDataFileRecordList(ConfigResult result) {
if (result.hasOption(ARG_SHORT_FROM_FILE)) {
// exclude files with the DDL suffix
String filePathListStr = result.getOptionValue(ARG_SHORT_FROM_FILE);
return Arrays.stream(StringUtils.split(filePathListStr, ConfigConstant.CMD_SEPARATOR))
.filter(StringUtils::isNotBlank)
.filter(s -> StringUtils.isNotBlank(s) && !StringUtils.endsWith(s, ConfigConstant.DDL_FILE_SUFFIX))
.map(s -> {
String[] strs = StringUtils.split(s, ConfigConstant.CMD_FILE_LINE_SEPARATOR);
if (strs.length == 1) {
@@ -704,13 +711,35 @@ private static List<FileLineRecord> getFileRecordList(ConfigResult result) {
}).collect(Collectors.toList());
} else if (result.hasOption(ARG_SHORT_DIRECTORY)) {
String dirPathStr = result.getOptionValue(ARG_SHORT_DIRECTORY);
List<String> filePaths = FileUtil.getFilesAbsPathInDir(dirPathStr);
List<String> filePaths = FileUtil.getDataFilesAbsPathInDir(dirPathStr);
return FileLineRecord.fromFilePaths(filePaths);
}
if (result.hasOption(ARG_SHORT_BENCHMARK)) {
return null;
}
throw new IllegalStateException("cannot get data file path list");
}

/**
* Parse DDL file paths and line numbers
* and check that the files exist
*/
private static List<FileLineRecord> getDdlFileRecordList(ConfigResult result) {
if (result.hasOption(ARG_SHORT_FROM_FILE)) {
// keep only files with the DDL suffix
String filePathListStr = result.getOptionValue(ARG_SHORT_FROM_FILE);
return Arrays.stream(StringUtils.split(filePathListStr, ConfigConstant.CMD_SEPARATOR))
.filter(s -> StringUtils.isNotBlank(s) && StringUtils.endsWith(s, ConfigConstant.DDL_FILE_SUFFIX))
.map(FileLineRecord::new).collect(Collectors.toList());
} else if (result.hasOption(ARG_SHORT_DIRECTORY)) {
String dirPathStr = result.getOptionValue(ARG_SHORT_DIRECTORY);
List<String> filePaths = FileUtil.getDdlFilesAbsPathInDir(dirPathStr);
return FileLineRecord.fromFilePaths(filePaths);
}
if (result.hasOption(ARG_SHORT_BENCHMARK)) {
return null;
}
throw new IllegalStateException("cannot get file path list");
throw new IllegalStateException("cannot get ddl file path list");
}

private static int getTpsLimit(ConfigResult result) {
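
Note on the hunk above: the old getFileRecordList is split into getDataFileRecordList and getDdlFileRecordList, keyed on the file suffix, so a single file list or directory argument can now carry both data files and DDL files. A minimal pure-JDK sketch of the suffix-based partition (the ".sql" value here is an assumption for illustration; the tool reads the real constant from ConfigConstant.DDL_FILE_SUFFIX):

    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    class FileListSplitSketch {
        // Assumed suffix for illustration; batch-tool uses ConfigConstant.DDL_FILE_SUFFIX.
        static final String DDL_FILE_SUFFIX = ".sql";

        // Partition one mixed path list into {false -> data files, true -> DDL files}.
        static Map<Boolean, List<String>> splitBySuffix(List<String> paths) {
            return paths.stream()
                    .filter(s -> s != null && !s.trim().isEmpty())
                    .collect(Collectors.partitioningBy(s -> s.endsWith(DDL_FILE_SUFFIX)));
        }
    }

For example, splitBySuffix(Arrays.asList("t1.csv", "t1.sql")) yields [t1.csv] as data and [t1.sql] as DDL, mirroring the two filters in the diff.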
3 changes: 1 addition & 2 deletions batch-tool/src/main/java/cmd/WriteDbCommand.java
@@ -51,8 +51,7 @@ public ConsumerExecutionContext getConsumerExecutionContext() {
@Override
public String toString() {
return "WriteDbCommand{" +
"producerExecutionContext=" + producerExecutionContext +
", consumerExecutionContext=" + consumerExecutionContext +
producerExecutionContext + ", " + consumerExecutionContext +
'}';
}
}
15 changes: 10 additions & 5 deletions batch-tool/src/main/java/exec/BaseExecutor.java
@@ -36,6 +36,7 @@
import model.config.ExportConfig;
import model.config.FileLineRecord;
import model.config.GlobalVar;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import util.DbUtil;
@@ -132,7 +133,14 @@ protected void configureCommonContextAndRun(Class<? extends BaseWorkHandler> cla
String tableName,
boolean usingBlockReader) {
List<FileLineRecord> fileLineRecordList =
getFileRecordList(producerExecutionContext.getFileLineRecordList(), tableName);
getFileRecordList(producerExecutionContext.getDataFileLineRecordList(), tableName);
if (CollectionUtils.isEmpty(fileLineRecordList)) {
if (command.isDbOperation()) {
logger.warn("Skip table {} operation since no filename matches", tableName);
return;
}
throw new IllegalArgumentException("No filename with suffix starts with table name: " + tableName);
}
if (!usingBlockReader) {
producerExecutionContext.setParallelism(fileLineRecordList.size());
}
@@ -142,7 +150,7 @@
CountDownLatch countDownLatch = new CountDownLatch(producerExecutionContext.getParallelism());
AtomicInteger emittedDataCounter = new AtomicInteger(0);
List<ConcurrentHashMap<Long, AtomicInteger>> eventCounter = new ArrayList<>();
for (int i = 0; i < producerExecutionContext.getFileLineRecordList().size(); i++) {
for (int i = 0; i < producerExecutionContext.getDataFileLineRecordList().size(); i++) {
eventCounter.add(new ConcurrentHashMap<>(16));
}
producerExecutionContext.setEmittedDataCounter(emittedDataCounter);
@@ -266,9 +274,6 @@ private List<FileLineRecord> getFileRecordList(List<FileLineRecord> allFilePathL
return true;
})
.collect(Collectors.toList());
if (fileRecordList.isEmpty()) {
throw new IllegalArgumentException("No filename with suffix starts with table name: " + tableName);
}
return fileRecordList;
}

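
The empty-list check moved from getFileRecordList into the caller above, which changes the failure semantics: a database-level operation now skips tables with no matching data file and logs a warning, while a table-level operation still fails fast. Per the error message, a data file is matched to a table when its filename starts with the table name; a hypothetical helper, not code from this patch, to make that rule concrete:

    // Hypothetical sketch of the filename-to-table matching implied by the
    // error message; the actual rule lives in BaseExecutor#getFileRecordList.
    static boolean matchesTable(String fileName, String tableName) {
        // e.g. "t_order.csv" and "t_order_1.csv" would both map to table "t_order"
        return fileName.startsWith(tableName + ".") || fileName.startsWith(tableName + "_");
    }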
33 changes: 24 additions & 9 deletions batch-tool/src/main/java/exec/ImportExecutor.java
@@ -26,6 +26,7 @@
import model.config.BenchmarkMode;
import model.config.ConfigConstant;
import model.config.DdlMode;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import util.DbUtil;
@@ -70,6 +71,9 @@ public void preCheck() {
this.tableNames = command.getTableNames();
}
}
if (CollectionUtils.isNotEmpty(tableNames)) {
logger.info("目标导入表:{}", tableNames);
}
}

private void checkDbNotExist(String dbName) {
@@ -111,6 +115,16 @@ public void execute() {
switch (producerExecutionContext.getDdlMode()) {
case WITH_DDL:
handleDDL();
if (command.isDbOperation()) {
// in database-level import mode, refresh the target tables after the DDL has run
try (Connection conn = dataSource.getConnection()) {
this.tableNames = DbUtil.getAllTablesInDb(conn, command.getDbName());
} catch (SQLException | DatabaseException e) {
throw new RuntimeException(e);
}
} else {
throw new IllegalStateException("Do not support importing to table with DDL");
}
break;
case DDL_ONLY:
handleDDL();
@@ -121,12 +135,16 @@
throw new UnsupportedOperationException("DDL mode is not supported: " +
producerExecutionContext.getDdlMode());
}
configureFieldMetaInfo();

logger.debug(producerExecutionContext.toString());
logger.debug(consumerExecutionContext.toString());
if (CollectionUtils.isEmpty(tableNames)) {
logger.warn("目标表未设置");
return;
}

configureFieldMetaInfo();

for (String tableName : tableNames) {
logger.info("开始导入表:{}", tableName);
if (producerExecutionContext.isSingleThread()
&& consumerExecutionContext.isSingleThread()) {
// use line-by-line read and insert mode
@@ -229,13 +247,10 @@ private void handleTpchImport(List<String> tableNames) {
private void handleDDL() {
DdlImportWorker ddlImportWorker;
if (command.isDbOperation()) {
if (producerExecutionContext.getFileLineRecordList().size() != 1) {
throw new UnsupportedOperationException("Import database DDL only support one ddl file now!");
}
ddlImportWorker = new DdlImportWorker(producerExecutionContext.getFileLineRecordList()
.get(0).getFilePath(), dataSource);
ddlImportWorker =
DdlImportWorker.fromFiles(producerExecutionContext.getDdlFileLineRecordList(), dataSource);
} else {
ddlImportWorker = new DdlImportWorker(command.getTableNames(), dataSource);
ddlImportWorker = DdlImportWorker.fromTables(command.getTableNames(), dataSource);
}
ddlImportWorker.doImportSync();
}
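
Two things happen in the WITH_DDL branch above: the DDL files run first, and in database-level mode the target table list is then re-read from the database so that tables created by the DDL become import targets; table-level WITH_DDL import is rejected outright. Worker construction now goes through factory methods instead of constructors, and the one-DDL-file restriction is gone. Condensed from the diff, with the same names and signatures:

    // Build the DDL worker from the parsed DDL file list (database-level)
    // or from explicit table names, then run the DDL synchronously so that
    // no data is loaded before the schema exists.
    DdlImportWorker ddlImportWorker = command.isDbOperation()
        ? DdlImportWorker.fromFiles(producerExecutionContext.getDdlFileLineRecordList(), dataSource)
        : DdlImportWorker.fromTables(command.getTableNames(), dataSource);
    ddlImportWorker.doImportSync();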
21 changes: 11 additions & 10 deletions batch-tool/src/main/java/exec/WriteDbExecutor.java
@@ -35,6 +35,7 @@
import worker.common.BaseWorkHandler;
import worker.common.ReadFileWithBlockProducer;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
@@ -71,8 +72,8 @@ protected void configurePkList() {
Map<String, List<PrimaryKey>> tablePkList = new HashMap<>();
for (String tableName : tableNames) {
List<PrimaryKey> pkList = null;
try {
pkList = DbUtil.getPkList(dataSource.getConnection(), getSchemaName(), tableName);
try (Connection connection = dataSource.getConnection()) {
pkList = DbUtil.getPkList(connection, getSchemaName(), tableName);
tablePkList.put(tableName, pkList);
} catch (DatabaseException | SQLException e) {
logger.error(e.getMessage());
@@ -88,15 +89,15 @@
protected void configureFieldMetaInfo() {
logger.info("正在获取所有表的元信息...");
Map<String, TableFieldMetaInfo> tableFieldMetaInfoMap = null;
try {
try (Connection connection = dataSource.getConnection()) {
if (command.getColumnNames() != null) {
assert tableNames.size() == 1;
tableFieldMetaInfoMap = new HashMap<>();
TableFieldMetaInfo fieldMetaInfo = DbUtil.getTableFieldMetaInfo(dataSource.getConnection(), getSchemaName(),
TableFieldMetaInfo fieldMetaInfo = DbUtil.getTableFieldMetaInfo(connection, getSchemaName(),
tableNames.get(0), command.getColumnNames());
tableFieldMetaInfoMap.put(tableNames.get(0), fieldMetaInfo);
} else {
tableFieldMetaInfoMap = DbUtil.getDbFieldMetaInfo(dataSource.getConnection(),
tableFieldMetaInfoMap = DbUtil.getDbFieldMetaInfo(connection,
getSchemaName(), tableNames);
}
} catch (DatabaseException | SQLException e) {
@@ -115,8 +116,8 @@ protected void configureTopology() {
Map<String, List<TableTopology>> tableTopologyMap = new HashMap<>();
for (String tableName : tableNames) {
List<TableTopology> topologyList = null;
try {
topologyList = DbUtil.getTopology(dataSource.getConnection(), tableName);
try (Connection connection = dataSource.getConnection()) {
topologyList = DbUtil.getTopology(connection, tableName);
tableTopologyMap.put(tableName, topologyList);
} catch (DatabaseException | SQLException e) {
logger.error(e.getMessage());
@@ -129,9 +130,9 @@
protected void configurePartitionKey() {
Map<String, PartitionKey> tablePartitionKey = new HashMap<>();
for (String tableName : tableNames) {
PartitionKey partitionKey = null;
try {
partitionKey = DbUtil.getPartitionKey(dataSource.getConnection(),
PartitionKey partitionKey;
try (Connection connection = dataSource.getConnection()) {
partitionKey = DbUtil.getPartitionKey(connection,
getSchemaName(), tableName);
logger.info("表 {} 使用分片键 {}", tableName, partitionKey);
tablePartitionKey.put(tableName, partitionKey);
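
The recurring fix in this file (and in OrderByExportExecutor below) is JDBC resource handling: the old code passed dataSource.getConnection() inline and never closed the connection, so every metadata query leaked one pooled connection. Since java.sql.Connection is AutoCloseable, try-with-resources releases it even when the query throws. The pattern in isolation, with names as in the diff:

    try (Connection connection = dataSource.getConnection()) {
        // closing a pooled connection returns it to the pool rather than dropping it
        pkList = DbUtil.getPkList(connection, getSchemaName(), tableName);
        tablePkList.put(tableName, pkList);
    } catch (DatabaseException | SQLException e) {
        logger.error(e.getMessage());
    }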
28 changes: 14 additions & 14 deletions batch-tool/src/main/java/exec/export/OrderByExportExecutor.java
@@ -31,7 +31,6 @@
import util.DbUtil;
import util.FileUtil;
import worker.MyThreadPool;
import worker.export.DirectExportWorker;
import worker.export.order.DirectOrderExportWorker;
import worker.export.order.LocalOrderByExportProducer;
import worker.export.order.OrderByExportEvent;
@@ -41,6 +40,7 @@
import worker.export.order.ParallelOrderByExportEvent;
import worker.factory.ExportWorkerFactory;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.LinkedList;
import java.util.List;
@@ -96,11 +96,11 @@ private void doExportWithOrderByLocal() {
for (String tableName : command.getTableNames()) {
String filePathPrefix = FileUtil.getFilePathPrefix(config.getPath(),
config.getFilenamePrefix(), tableName);
try {
topologyList = DbUtil.getTopology(dataSource.getConnection(), tableName);
TableFieldMetaInfo tableFieldMetaInfo = DbUtil.getTableFieldMetaInfo(dataSource.getConnection(),
try (Connection connection = dataSource.getConnection()) {
topologyList = DbUtil.getTopology(connection, tableName);
TableFieldMetaInfo tableFieldMetaInfo = DbUtil.getTableFieldMetaInfo(connection,
getSchemaName(), tableName);
orderByColumnInfoList = DbUtil.getFieldMetaInfoListByColNames(dataSource.getConnection(), getSchemaName(),
orderByColumnInfoList = DbUtil.getFieldMetaInfoListByColNames(connection, getSchemaName(),
tableName, config.getOrderByColumnNameList());
// number of shards
final int shardSize = topologyList.size();
@@ -124,7 +124,7 @@
break;
case FIXED_FILE_NUM:
// fixed file count: first fetch the total row count
double totalRowCount = DbUtil.getTableRowCount(dataSource.getConnection(), tableName);
double totalRowCount = DbUtil.getTableRowCount(connection, tableName);
int fileNum = config.getLimitNum();
int singleLineLimit = (int) Math.ceil(totalRowCount / fileNum);
// then convert it into a per-file line limit
@@ -160,8 +160,8 @@
*/
private void handleExportWithOrderByFromDb() {
for (String tableName : command.getTableNames()) {
try {
TableFieldMetaInfo tableFieldMetaInfo = DbUtil.getTableFieldMetaInfo(dataSource.getConnection(),
try (Connection connection = dataSource.getConnection()) {
TableFieldMetaInfo tableFieldMetaInfo = DbUtil.getTableFieldMetaInfo(connection,
getSchemaName(), tableName);
DirectOrderExportWorker directOrderByExportWorker = ExportWorkerFactory
.buildDirectOrderExportWorker(dataSource, tableFieldMetaInfo, (ExportCommand) command, tableName);
@@ -182,13 +182,13 @@ private void handleExportWithOrderByParallelMerge() {
for (String tableName : command.getTableNames()) {
List<TableTopology> topologyList;
List<FieldMetaInfo> orderByColumnInfoList;
try {
try (Connection connection = dataSource.getConnection()) {
String filePathPrefix = FileUtil.getFilePathPrefix(config.getPath(),
config.getFilenamePrefix(), tableName);
topologyList = DbUtil.getTopology(dataSource.getConnection(), tableName);
TableFieldMetaInfo tableFieldMetaInfo = DbUtil.getTableFieldMetaInfo(dataSource.getConnection(),
getSchemaName(),tableName);
orderByColumnInfoList = DbUtil.getFieldMetaInfoListByColNames(dataSource.getConnection(), getSchemaName(),
topologyList = DbUtil.getTopology(connection, tableName);
TableFieldMetaInfo tableFieldMetaInfo = DbUtil.getTableFieldMetaInfo(connection,
getSchemaName(), tableName);
orderByColumnInfoList = DbUtil.getFieldMetaInfoListByColNames(connection, getSchemaName(),
tableName, config.getOrderByColumnNameList());
// number of shards
final int shardSize = topologyList.size();
@@ -211,7 +211,7 @@
break;
case FIXED_FILE_NUM:
// fixed file count: first fetch the total row count
double totalRowCount = DbUtil.getTableRowCount(dataSource.getConnection(), tableName);
double totalRowCount = DbUtil.getTableRowCount(connection, tableName);
int fileNum = config.getLimitNum();
int singleLineLimit = (int) Math.ceil(totalRowCount / fileNum);
// then convert it into a per-file line limit
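
A worked example of the FIXED_FILE_NUM conversion above: the row count is fetched into a double precisely so that the division below is floating-point rather than integer division, and the ceiling guarantees the requested file count is never exceeded.

    // e.g. exporting 1,000,001 rows into a fixed 4 files:
    double totalRowCount = 1_000_001;   // DbUtil.getTableRowCount(connection, tableName)
    int fileNum = 4;                    // config.getLimitNum()
    int singleLineLimit = (int) Math.ceil(totalRowCount / fileNum); // ceil(250000.25) = 250001
    // three files of 250001 rows each, plus a final file with the remaining 249998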