Skip to content

Commit

Permalink
Merge pull request #15 from F-ca7/feature/v1_3_3
Browse files Browse the repository at this point in the history
BatchTool V1.3.3
  • Loading branch information
F-ca7 authored Aug 28, 2023
2 parents b5cde46 + 31e90af commit d23c3c8
Show file tree
Hide file tree
Showing 11 changed files with 111 additions and 35 deletions.
6 changes: 6 additions & 0 deletions batch-tool/docs/usage-details.md
Original file line number Diff line number Diff line change
Expand Up @@ -192,3 +192,9 @@ mask: >-

**解决**:导入、导出时都加入参数 `-quote force` 参数(开启该参数后,导入时客户端CPU占用会更高一些,效率也会略低一点,
但文本解析的兼容性会更好)

10. 导入时报错,`Incorrect value: '\N' for column`

**原因**:BatchTool 导出NULL值的时候会转义成'\N'字符串

**解决**:导入、导出时都加入参数 `-quote force` 参数
2 changes: 1 addition & 1 deletion batch-tool/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.alibaba.polardbx</groupId>
<artifactId>batch-tool</artifactId>
<version>1.3.2</version>
<version>1.3.3</version>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
Expand Down
6 changes: 6 additions & 0 deletions batch-tool/src/main/java/cmd/CommandUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@
import static cmd.ConfigArgOption.ARG_SHORT_VERSION;
import static cmd.ConfigArgOption.ARG_SHORT_WHERE;
import static cmd.ConfigArgOption.ARG_SHORT_WITH_DDL;
import static cmd.FlagOption.ARG_DROP_TABLE_IF_EXISTS;
import static cmd.FlagOption.ARG_EMPTY_AS_NULL;
import static cmd.FlagOption.ARG_SHORT_ENABLE_SHARDING;
import static cmd.FlagOption.ARG_SHORT_IGNORE_AND_RESUME;
Expand Down Expand Up @@ -406,6 +407,10 @@ private static boolean getEmptyAsNull(ConfigResult result) {
return result.getBooleanFlag(ARG_EMPTY_AS_NULL);
}

private static boolean getDropTableIfExist(ConfigResult result) {
return result.getBooleanFlag(ARG_DROP_TABLE_IF_EXISTS);
}

private static FileFormat getFileFormat(ConfigResult result) {
if (result.hasOption(ARG_SHORT_FILE_FORMAT)) {
String fileFormat = result.getOptionValue(ARG_SHORT_FILE_FORMAT);
Expand All @@ -427,6 +432,7 @@ private static ExportCommand parseExportCommand(ConfigResult result) {
if (exportConfig.getDdlMode() != DdlMode.NO_DDL) {
setGlobalDdlConfig(result);
}
exportConfig.setDropTableIfExists(getDropTableIfExist(result));
exportConfig.setEncryptionConfig(getEncryptionConfig(result));
exportConfig.setFileFormat(getFileFormat(result));
exportConfig.setCompressMode(getCompressMode(result));
Expand Down
4 changes: 4 additions & 0 deletions batch-tool/src/main/java/cmd/FlagOption.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,8 @@ private static FlagOption of(String argShort, String argLong, String desc, Boole
of("trimRight", "trimRight", "Remove trailing whitespaces in a line for BlockReader (default false).", false);
public static final FlagOption ARG_EMPTY_AS_NULL =
of("emptyAsNull", "emptyAsNull", "Treat an empty value for string-type as NULL (default false).", false);
public static final FlagOption ARG_DROP_TABLE_IF_EXISTS =
of("dropTableIfExists", "dropTableIfExists",
"Add 'drop table if exists xxx' when exporting DDL (default false).",
false);
}
18 changes: 6 additions & 12 deletions batch-tool/src/main/java/exec/ImportExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import util.DbUtil;
import worker.MyThreadPool;
import worker.MyWorkerPool;
import worker.ddl.DdlImporter;
import worker.ddl.DdlImportWorker;
import worker.insert.DirectImportWorker;
import worker.insert.ImportConsumer;
import worker.insert.ProcessOnlyImportConsumer;
Expand All @@ -58,13 +58,7 @@ public ImportExecutor(DataSourceConfig dataSourceConfig,

@Override
public void preCheck() {
if (producerExecutionContext.getDdlMode() != DdlMode.NO_DDL) {
if (command.isDbOperation()) {
checkDbNotExist(command.getDbName());
} else {
checkTableNotExist(command.getTableNames());
}
} else {
if (producerExecutionContext.getDdlMode() == DdlMode.NO_DDL) {
if (command.isDbOperation()) {
try (Connection conn = dataSource.getConnection()) {
this.tableNames = DbUtil.getAllTablesInDb(conn, command.getDbName());
Expand Down Expand Up @@ -233,17 +227,17 @@ private void handleTpchImport(List<String> tableNames) {
* 同步导入建库建表语句
*/
private void handleDDL() {
DdlImporter ddlImporter;
DdlImportWorker ddlImportWorker;
if (command.isDbOperation()) {
if (producerExecutionContext.getFileLineRecordList().size() != 1) {
throw new UnsupportedOperationException("Import database DDL only support one ddl file now!");
}
ddlImporter = new DdlImporter(producerExecutionContext.getFileLineRecordList()
ddlImportWorker = new DdlImportWorker(producerExecutionContext.getFileLineRecordList()
.get(0).getFilePath(), dataSource);
} else {
ddlImporter = new DdlImporter(command.getTableNames(), dataSource);
ddlImportWorker = new DdlImportWorker(command.getTableNames(), dataSource);
}
ddlImporter.doImportSync();
ddlImportWorker.doImportSync();
}

private void doSingleThreadImport(String tableName) {
Expand Down
4 changes: 2 additions & 2 deletions batch-tool/src/main/java/exec/export/BaseExportExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,9 @@ public void execute() {
private Thread exportDDL() {
DdlExportWorker ddlExportWorker;
if (command.isDbOperation()) {
ddlExportWorker = new DdlExportWorker(dataSource, command.getDbName());
ddlExportWorker = new DdlExportWorker(dataSource, command.getDbName(), config);
} else {
ddlExportWorker = new DdlExportWorker(dataSource, command.getDbName(), command.getTableNames());
ddlExportWorker = new DdlExportWorker(dataSource, command.getDbName(), command.getTableNames(), config);
}
Thread ddlThread = new Thread(ddlExportWorker);
ddlThread.start();
Expand Down
10 changes: 10 additions & 0 deletions batch-tool/src/main/java/model/config/BaseConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ int bitCount() {

protected DdlMode ddlMode = DdlMode.NO_DDL;

protected boolean dropTableIfExists = false;

protected CompressMode compressMode = CompressMode.NONE;

protected EncryptionConfig encryptionConfig = EncryptionConfig.NONE;
Expand Down Expand Up @@ -196,6 +198,14 @@ public void validate() {
}
}

public boolean isDropTableIfExists() {
return dropTableIfExists;
}

public void setDropTableIfExists(boolean dropTableIfExists) {
this.dropTableIfExists = dropTableIfExists;
}

@Override
public String toString() {
return "BaseConfig{" +
Expand Down
10 changes: 3 additions & 7 deletions batch-tool/src/main/java/util/FileUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,21 +92,17 @@ private static void writeWithQuoteEscape(ByteArrayOutputStream os, byte[] value)
}

/**
* 如果value中包含`"` `\`
* 则用`""` `\\`进行转义
* 如果value中包含`"`
* 则用`""`进行转义
*/
private static void writeWithQuoteEscapeInQuote(ByteArrayOutputStream os, byte[] value) {
public static void writeWithQuoteEscapeInQuote(ByteArrayOutputStream os, byte[] value) {
// ascii字符为一字节
byte quoteByte = DOUBLE_QUOTE_BYTE[0];
byte backSlashByte = BACK_SLASH_BYTE[0];

for (byte b : value) {
if (b == quoteByte) {
os.write(quoteByte);
os.write(quoteByte);
} else if (b == backSlashByte) {
os.write(backSlashByte);
os.write(backSlashByte);
} else {
os.write(b);
}
Expand Down
30 changes: 25 additions & 5 deletions batch-tool/src/main/java/worker/ddl/DdlExportWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package worker.ddl;

import exception.DatabaseException;
import model.config.ExportConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import util.DbUtil;
Expand Down Expand Up @@ -47,24 +48,27 @@ public class DdlExportWorker implements Runnable {
* 是否导出整个数据库与其中的所有表
*/
private final boolean isExportWholeDb;
private final ExportConfig config;

private static final Pattern DB_MODE_PATTERN = Pattern.compile("/\\* MODE = '(.*)' \\*/$");

public DdlExportWorker(DataSource druid, String dbName) {
public DdlExportWorker(DataSource druid, String dbName, ExportConfig config) {
this.druid = druid;
this.dbName = dbName;
this.config = config;
try (Connection conn = druid.getConnection()) {
this.tableNames = DbUtil.getAllTablesInDb(conn, dbName);;
this.tableNames = DbUtil.getAllTablesInDb(conn, dbName);
} catch (SQLException | DatabaseException e) {
throw new RuntimeException(e);
}
this.isExportWholeDb = true;
this.filename = dbName + DDL_FILE_SUFFIX;
}

public DdlExportWorker(DataSource druid, String dbName, List<String> tableNames) {
public DdlExportWorker(DataSource druid, String dbName, List<String> tableNames, ExportConfig config) {
this.druid = druid;
this.dbName = dbName;
this.config = config;
this.tableNames = tableNames;
this.isExportWholeDb = false;
this.filename = dbName + DDL_FILE_SUFFIX;
Expand Down Expand Up @@ -112,12 +116,17 @@ private void exportDatabaseStructure(Connection conn, String dbName) throws IOEx
private void exportTableStructure(Connection conn, String tableName) throws IOException, DatabaseException {
logger.info("表:{} 开始导出表结构", tableName);

writeCommentForTable(tableName);
beforeCreateTable(tableName);
String tableDdl = DbUtil.getShowCreateTable(conn, tableName);
writeLine(tableDdl + ";");
writeLine("");
}

private void beforeCreateTable(String tableName) throws IOException {
writeCommentForTable(tableName);
writeDropTableIfExists(tableName);
}

private void writeCommentForDatabase(String dbName) throws IOException {
writeLine("--");
writeLine("-- Database structure for database `" + dbName + "`");
Expand All @@ -130,13 +139,24 @@ private void writeCommentForTable(String tableName) throws IOException {
writeLine("--");
}

private void writeDropTableIfExists(String tableName) throws IOException {
if (!config.isDropTableIfExists()) {
return;
}
writeLine(String.format("DROP TABLE IF EXISTS `" + tableName + "`;"));
}

private void writeLine(String line) throws IOException {
bufferedWriter.write(line);
bufferedWriter.newLine();
}

private void beforeRun() throws Exception {
bufferedWriter = new BufferedWriter(new FileWriter(filename));
bufferedWriter = new BufferedWriter(new FileWriter(getFilepath()));
}

private String getFilepath() {
return config.getPath() + filename;
}

private void afterRun() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.alibaba.druid.util.JdbcUtils;
import model.config.ConfigConstant;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import util.FileUtil;
Expand All @@ -43,18 +44,18 @@
/**
* 直接通过读取SQL导入库表
*/
public class DdlImporter {
public class DdlImportWorker {

private static final Logger logger = LoggerFactory.getLogger(DdlExportWorker.class);


private final List<String> filepaths = new ArrayList<>();;
private final List<String> filepaths = new ArrayList<>();
private final DataSource dataSource;
private final ExecutorService ddlThreadPool = MyThreadPool.createUnboundedFixedExecutor("DDL-importer", DDL_PARALLELISM);
private final ExecutorService ddlThreadPool =
MyThreadPool.createUnboundedFixedExecutor("DDL-importer", DDL_PARALLELISM);
private final AtomicInteger taskCount = new AtomicInteger(0);
private volatile String useDbSql = null;

public DdlImporter(String filename, DataSource dataSource) {
public DdlImportWorker(String filename, DataSource dataSource) {
this.dataSource = dataSource;
File file = new File(filename);
if (!file.exists() || !file.isFile()) {
Expand All @@ -63,7 +64,7 @@ public DdlImporter(String filename, DataSource dataSource) {
this.filepaths.add(file.getAbsolutePath());
}

public DdlImporter(List<String> tableNames, DataSource dataSource) {
public DdlImportWorker(List<String> tableNames, DataSource dataSource) {
this.dataSource = dataSource;
for (String name : tableNames) {
String filename = name + ConfigConstant.DDL_FILE_SUFFIX;
Expand Down Expand Up @@ -159,7 +160,8 @@ private void submitDDL(String sql) {
}
stmt.execute(sql);
} catch (SQLException e) {
if (stmt != null) {
String msg = e.getMessage();
if (stmt != null && !StringUtils.containsIgnoreCase(msg, "already exists")) {
int retryCount = 0;
for (; retryCount < DDL_RETRY_COUNT; retryCount++) {
try {
Expand All @@ -175,7 +177,7 @@ private void submitDDL(String sql) {
return;
}
}
logger.error("Failed to import DDL: [{}] due to [{}]", sql, e.getMessage());
logger.error("Failed to import DDL: [{}] due to [{}]", sql, msg);
} finally {
taskCount.decrementAndGet();
JdbcUtils.close(conn);
Expand Down
38 changes: 38 additions & 0 deletions batch-tool/src/test/java/preprocess/QuoteTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright [2013-2021], Alibaba Group Holding Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package preprocess;

import org.junit.Assert;
import org.junit.Test;
import util.FileUtil;

import java.io.ByteArrayOutputStream;

public class QuoteTest {

@Test
public void testWriteInQuote() {
final String input = "{\"id\": 123, \"task\": \"{\\\"id\\\": 999, \\\"job\\\": \\\"测试\\\"}}";
final String expectedResult =
"{\"\"id\"\": 123, \"\"task\"\": \"\"{\\\"\"id\\\"\": 999, \\\"\"job\\\"\": \\\"\"测试\\\"\"}}";
ByteArrayOutputStream os = new ByteArrayOutputStream(128);
FileUtil.writeWithQuoteEscapeInQuote(os, input.getBytes());
String result = os.toString();
Assert.assertEquals(expectedResult, result);
}

}

0 comments on commit d23c3c8

Please sign in to comment.