Merge pull request #21 from F-ca7/feature/v1_3_6
BatchTool V1.3.6
F-ca7 authored Dec 28, 2023
2 parents bfb6f29 + fbcbc0c commit b84938f
Showing 22 changed files with 373 additions and 77 deletions.
batch-tool/pom.xml (2 changes: 1 addition & 1 deletion)

@@ -6,7 +6,7 @@

<groupId>com.alibaba.polardbx</groupId>
<artifactId>batch-tool</artifactId>
- <version>1.3.5</version>
+ <version>1.3.6</version>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
batch-tool/src/main/java/BatchTool.java (79 changes: 78 additions & 1 deletion)

@@ -15,24 +15,36 @@
*/

import cmd.BaseOperateCommand;
+ import cmd.ExportCommand;
+ import cmd.WriteDbCommand;
import com.alibaba.druid.pool.DruidDataSource;
+ import com.lmax.disruptor.RingBuffer;
import datasource.DataSourceConfig;
import datasource.DruidSource;
import exec.BaseExecutor;
+ import model.config.GlobalVar;
+ import model.stat.DebugInfo;
+ import model.stat.FileReaderStat;
+ import model.stat.SqlStat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+ import sun.misc.Signal;

import java.sql.SQLException;
+ import java.util.List;

+ import static model.config.ConfigConstant.DEBUG_SIGNAL;

public class BatchTool {
private static final Logger logger = LoggerFactory.getLogger(BatchTool.class);

private DruidDataSource druid;
+ private BaseOperateCommand command = null;

private static final BatchTool instance = new BatchTool();

private BatchTool() {
- Runtime.getRuntime().addShutdownHook(new Thread(this::destroy));
+ addHooks();
}

public static BatchTool getInstance() {
@@ -50,6 +62,7 @@ public void initDatasource(DataSourceConfig dataSourceConfig) throws SQLException

public void doBatchOp(BaseOperateCommand command, DataSourceConfig dataSourceConfig) {
logger.info("开始批量操作...");
+ this.command = command;
BaseExecutor commandExecutor = BaseExecutor.getExecutor(command, dataSourceConfig, druid);
commandExecutor.preCheck();
logger.info(command.toString());
@@ -63,6 +76,70 @@ public void doBatchOp(BaseOperateCommand command, DataSourceConfig dataSourceConfig) {
}
}

+ private void addHooks() {
+ Runtime.getRuntime().addShutdownHook(new Thread(this::destroy));
+ try {
+ Signal.handle(DEBUG_SIGNAL, sig -> {
+ printDebugInfo();
+ });
+ } catch (Exception e) {
+ logger.info("Failed to register signal handler(this can be ignored): {}", e.getMessage());
+ }
+ }
+
+ private void printDebugInfo() {
+ logger.warn("Collecting debug info...");
+ if (this.druid != null) {
+ logger.warn("[Druid] activeCount:{}, poolingCount:{}.",
+ druid.getActiveCount(), druid.getPoolingCount());
+ }
+ if (this.command == null) {
+ logger.warn("Batch operation is not started yet!");
+ return;
+ }
+ DebugInfo debugInfo = GlobalVar.DEBUG_INFO;
+ RingBuffer ringBuffer = debugInfo.getRingBuffer();
+ if (ringBuffer != null) {
+ logger.warn("[RingBuffer] size: {}, available: {}.",
+ ringBuffer.getBufferSize(), ringBuffer.remainingCapacity());
+ }
+ if (this.command instanceof ExportCommand) {
+ logger.warn("Detailed debug info of export operation is not supported yet!");
+ } else if (this.command instanceof WriteDbCommand) {
+ if (debugInfo.getCountDownLatch() != null && debugInfo.getRemainDataCounter() != null) {
+ logger.warn("[Counter] countDownLatch: {}, remainDataCount: {}.",
+ debugInfo.getCountDownLatch().getCount(), debugInfo.getRemainDataCounter().get());
+ }
+
+ List<FileReaderStat> fileReaderStatList = debugInfo.getFileReaderStatList();
+ if (!fileReaderStatList.isEmpty()) {
+ long totalCount = 0;
+ for (FileReaderStat fileReaderStat : fileReaderStatList) {
+ totalCount += fileReaderStat.getCount();
+ }
+ logger.warn("[Producer] totalCount: {}.", totalCount);
+ }
+
+ List<SqlStat> sqlStatList = debugInfo.getSqlStatList();
+ if (!sqlStatList.isEmpty()) {
+ SqlStat firstStat = sqlStatList.get(0);
+ double totalAvg = firstStat.getAvgTimeMillis();
+ double minAvg = totalAvg, maxAvg = totalAvg;
+ for (int i = 1; i < sqlStatList.size(); i++) {
+ double avg = sqlStatList.get(i).getAvgTimeMillis();
+ totalAvg += avg;
+ minAvg = Math.min(minAvg, avg);
+ maxAvg = Math.max(maxAvg, avg);
+ }
+ double avgConsumerAvg = totalAvg / sqlStatList.size();
+ logger.warn("[Consumer] avgSqlRtAvg: {}ms, avgSqlRtMin: {}ms, avgSqlRtMax: {}ms.",
+ avgConsumerAvg, minAvg, maxAvg);
+ }
+
+ }
+ logger.warn("End of debug info.");
+ }
+
private void destroy() {
if (druid != null) {
druid.close();
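
The new debug hook registers a handler for DEBUG_SIGNAL, a constant defined in ConfigConstant that is not among the loaded hunks, so a running BatchTool process can be asked to dump Druid pool, ring buffer, producer, and consumer statistics without being stopped. A minimal, self-contained sketch of the same pattern, assuming the constant maps to a user signal such as USR2 (the signal name here is an assumption):

import sun.misc.Signal;

public class SignalDemo {
    public static void main(String[] args) throws InterruptedException {
        // Hypothetical stand-in for ConfigConstant.DEBUG_SIGNAL;
        // the real constant lives elsewhere in the repository.
        Signal debugSignal = new Signal("USR2");
        Signal.handle(debugSignal, sig ->
            System.out.println("Received " + sig + ", dumping debug info..."));
        System.out.println("Waiting; run `kill -USR2 <pid>` from another shell.");
        Thread.sleep(60_000); // keep the process alive so the signal can arrive
    }
}
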
batch-tool/src/main/java/BatchToolLauncher.java (4 changes: 2 additions & 2 deletions)

@@ -44,8 +44,8 @@ public static void main(String[] args) {
try {
handleCmd(commandLine);
} catch (Throwable e) {
- e.printStackTrace();
- logger.error(e.getMessage());
+ // exception in the main thread
+ logger.error(e.getMessage(), e);
System.exit(1);
}
}
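
The launcher now passes the Throwable itself as the last argument to logger.error instead of calling e.printStackTrace(), so SLF4J writes the full stack trace to the configured log output rather than only the message (with the trace going to stderr). A small illustration of the difference:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingDemo {
    private static final Logger logger = LoggerFactory.getLogger(LoggingDemo.class);

    public static void main(String[] args) {
        try {
            throw new IllegalStateException("boom");
        } catch (Throwable e) {
            logger.error(e.getMessage());    // message only: the stack trace is lost
            logger.error(e.getMessage(), e); // message plus throwable: full stack trace is logged
        }
    }
}
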
batch-tool/src/main/java/cmd/CommandUtil.java (1 change: 1 addition & 0 deletions)

@@ -599,6 +599,7 @@ private static void configureConsumerContext(ConfigResult result,
consumerExecutionContext.setReadProcessFileOnly(getReadAndProcessFileOnly(result));
consumerExecutionContext.setWhereInEnabled(getWhereInEnabled(result));
consumerExecutionContext.setWithLastSep(getWithLastSep(result));
+ consumerExecutionContext.setQuoteEncloseMode(getQuoteEncloseMode(result));
consumerExecutionContext.setTpsLimit(getTpsLimit(result));
consumerExecutionContext.setUseColumns(getUseColumns(result));
consumerExecutionContext.setEmptyStrAsNull(getEmptyAsNull(result));
batch-tool/src/main/java/exec/BaseExecutor.java (45 changes: 23 additions & 22 deletions)

@@ -40,8 +40,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import util.DbUtil;
+ import util.SyncUtil;
import worker.MyThreadPool;
import worker.MyWorkerPool;
+ import worker.common.BaseDefaultConsumer;
import worker.common.BaseWorkHandler;
import worker.common.BatchLineEvent;
import worker.common.ReadFileProducer;
@@ -141,14 +143,18 @@ protected void configureCommonContextAndRun(Class<? extends BaseWorkHandler> clazz,
}
throw new IllegalArgumentException("No filename with suffix starts with table name: " + tableName);
}
+ String producerType;
if (!usingBlockReader) {
+ producerType = "Line";
producerExecutionContext.setParallelism(fileLineRecordList.size());
+ } else {
+ producerType = "Block";
}
- ThreadPoolExecutor producerThreadPool = MyThreadPool.createExecutorWithEnsure(clazz.getName() + "-producer",
+ ThreadPoolExecutor producerThreadPool = MyThreadPool.createExecutorWithEnsure(producerType + "-producer",
producerExecutionContext.getParallelism());
producerExecutionContext.setProducerExecutor(producerThreadPool);
- CountDownLatch countDownLatch = new CountDownLatch(producerExecutionContext.getParallelism());
- AtomicInteger emittedDataCounter = new AtomicInteger(0);
+ CountDownLatch countDownLatch = SyncUtil.newMainCountDownLatch(producerExecutionContext.getParallelism());
+ AtomicInteger emittedDataCounter = SyncUtil.newRemainDataCounter();
List<ConcurrentHashMap<Long, AtomicInteger>> eventCounter = new ArrayList<>();
for (int i = 0; i < producerExecutionContext.getDataFileLineRecordList().size(); i++) {
eventCounter.add(new ConcurrentHashMap<>(16));
@@ -168,7 +174,7 @@ protected void configureCommonContextAndRun(Class<? extends BaseWorkHandler> clazz,
/ (consumerNum * GlobalVar.EMIT_BATCH_SIZE));


- ThreadPoolExecutor consumerThreadPool = MyThreadPool.createExecutorWithEnsure(clazz.getName() + "-consumer",
+ ThreadPoolExecutor consumerThreadPool = MyThreadPool.createExecutorWithEnsure(clazz.getSimpleName() + "-consumer",
consumerNum);
EventFactory<BatchLineEvent> factory = BatchLineEvent::new;
RingBuffer<BatchLineEvent> ringBuffer = MyWorkerPool.createRingBuffer(factory);
@@ -189,18 +195,20 @@ protected void configureCommonContextAndRun(Class<? extends BaseWorkHandler> clazz,
BaseWorkHandler[] consumers = new BaseWorkHandler[consumerNum];
try {
for (int i = 0; i < consumerNum; i++) {
- consumers[i] = clazz.newInstance();
- consumers[i].setConsumerContext(consumerExecutionContext);
- consumers[i].createTpsLimiter(consumerExecutionContext.getBatchTpsLimitPerConsumer());
- consumers[i].setTableName(tableName);
+ BaseWorkHandler consumer = clazz.newInstance();
+ consumers[i] = consumer;
+ consumer.setConsumerContext(consumerExecutionContext);
+ consumer.createTpsLimiter(consumerExecutionContext.getBatchTpsLimitPerConsumer());
+ consumer.setTableName(tableName);
+ if (consumer instanceof BaseDefaultConsumer) {
+ GlobalVar.DEBUG_INFO.addSqlStat(((BaseDefaultConsumer) consumer).getSqlStat());
+ }
}
} catch (Exception e) {
- e.printStackTrace();
logger.error(e.getMessage());
throw new RuntimeException(e);
}


logger.debug("producer config {}", producerExecutionContext);
logger.debug("consumer config {}", consumerExecutionContext);

@@ -210,9 +218,8 @@ protected void configureCommonContextAndRun(Class<? extends BaseWorkHandler> clazz,
try {
producer.produce();
} catch (Exception e) {
- e.printStackTrace();
logger.error(e.getMessage());
- System.exit(1);
+ throw new RuntimeException(e);
}
// only start recording breakpoints when resume mode and insert ignore are enabled, and this is not the read-performance test mode
if (usingBlockReader
@@ -281,19 +288,13 @@ private List<FileLineRecord> getFileRecordList(List<FileLineRecord> allFilePathList,
* Wait for the producers and consumers to finish
*
* @param countDownLatch producer completion flag
- * @param emittedDataCounter consumer completion flag
+ * @param remainDataCounter consumer completion flag
*/
- protected void waitForFinish(CountDownLatch countDownLatch, AtomicInteger emittedDataCounter) {
+ protected void waitForFinish(CountDownLatch countDownLatch, AtomicInteger remainDataCounter) {
try {
- // wait for the producers to finish
- countDownLatch.await();
- // wait for the consumers to drain the remaining data
- int remain;
- while ((remain = emittedDataCounter.get()) > 0) {
- Thread.sleep(500);
- }
+ SyncUtil.waitForFinish(countDownLatch, remainDataCounter);
} catch (InterruptedException e) {
- e.printStackTrace();
+ logger.error("Interrupted when waiting for finish", e);
}
}
onWorkFinished();
}
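
SyncUtil is introduced by this PR, but its source is not among the loaded files. Judging from the call sites above and the DebugInfo accessors used in BatchTool.printDebugInfo, it centralizes creation of the producer latch and the remaining-data counter (registering both for debug reporting) plus the wait loop previously inlined in waitForFinish. A plausible sketch only, not the repository's actual implementation; the DebugInfo setter names are assumptions:

package util;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

import model.config.GlobalVar;

public final class SyncUtil {
    private SyncUtil() {
    }

    // Creates the producer-side latch and registers it for debug reporting.
    public static CountDownLatch newMainCountDownLatch(int count) {
        CountDownLatch latch = new CountDownLatch(count);
        GlobalVar.DEBUG_INFO.setCountDownLatch(latch); // assumed setter
        return latch;
    }

    // Creates the remaining-data counter and registers it for debug reporting.
    public static AtomicInteger newRemainDataCounter() {
        AtomicInteger counter = new AtomicInteger(0);
        GlobalVar.DEBUG_INFO.setRemainDataCounter(counter); // assumed setter
        return counter;
    }

    // Same logic this PR removes from BaseExecutor.waitForFinish: wait for all
    // producers, then poll until the consumers have drained the emitted data.
    public static void waitForFinish(CountDownLatch countDownLatch,
                                     AtomicInteger remainDataCounter) throws InterruptedException {
        countDownLatch.await();
        while (remainDataCounter.get() > 0) {
            Thread.sleep(500);
        }
    }
}
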
batch-tool/src/main/java/exec/ImportExecutor.java (15 changes: 6 additions & 9 deletions)

@@ -30,6 +30,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import util.DbUtil;
+ import util.SyncUtil;
import worker.MyThreadPool;
import worker.MyWorkerPool;
import worker.ddl.DdlImportWorker;
@@ -86,8 +87,7 @@ private void checkDbNotExist(String dbName) {
dbName));
}
} catch (SQLException | DatabaseException e) {
- e.printStackTrace();
- throw new RuntimeException(e.getMessage());
+ throw new RuntimeException(e);
}
}

@@ -99,8 +99,7 @@ private void checkTableNotExist(List<String> tableNames) {
tableNames));
}
} catch (SQLException | DatabaseException e) {
- e.printStackTrace();
- throw new RuntimeException(e.getMessage());
+ throw new RuntimeException(e);
}
}
}
@@ -184,7 +183,7 @@ private void handleBenchmark(List<String> tableNames) {

private void handleTpchImport(List<String> tableNames) {
int producerParallelism = producerExecutionContext.getParallelism();
- AtomicInteger emittedDataCounter = new AtomicInteger(0);
+ AtomicInteger emittedDataCounter = SyncUtil.newRemainDataCounter();

ThreadPoolExecutor producerThreadPool = MyThreadPool.createExecutorExact(TpchProducer.class.getSimpleName(),
producerParallelism);
@@ -200,9 +199,8 @@ private void handleTpchImport(List<String> tableNames) {

EventFactory<BatchInsertSqlEvent> factory = BatchInsertSqlEvent::new;
RingBuffer<BatchInsertSqlEvent> ringBuffer = MyWorkerPool.createRingBuffer(factory);

TpchProducer tpchProducer = new TpchProducer(producerExecutionContext, tableNames, ringBuffer);
- CountDownLatch countDownLatch = new CountDownLatch(tpchProducer.getWorkerCount());
+ CountDownLatch countDownLatch = SyncUtil.newMainCountDownLatch(tpchProducer.getWorkerCount());
producerExecutionContext.setCountDownLatch(countDownLatch);

TpchConsumer[] consumers = new TpchConsumer[consumerParallelism];
@@ -211,7 +209,6 @@ private void handleTpchImport(List<String> tableNames) {
consumers[i] = new TpchConsumer(consumerExecutionContext);
}
} catch (Exception e) {
- e.printStackTrace();
logger.error(e.getMessage());
throw new RuntimeException(e);
}
@@ -263,7 +260,7 @@ private void doSingleThreadImport(String tableName) {
try {
importThread.join();
} catch (InterruptedException e) {
- e.printStackTrace();
+ logger.error("Interrupted when waiting for finish", e);
}
}

batch-tool/src/main/java/exec/export/OrderByExportExecutor.java (16 changes: 7 additions & 9 deletions)

@@ -30,6 +30,7 @@
import org.slf4j.LoggerFactory;
import util.DbUtil;
import util.FileUtil;
+ import util.SyncUtil;
import worker.MyThreadPool;
import worker.export.order.DirectOrderExportWorker;
import worker.export.order.LocalOrderByExportProducer;
@@ -141,13 +142,12 @@ private void doExportWithOrderByLocal() {
try {
consumer.consume();
} catch (InterruptedException e) {
- e.printStackTrace();
+ logger.error("Interrupted when waiting for finish", e);
}
executor.shutdown();
logger.info("导出 {} 数据完成", tableName);
} catch (DatabaseException | SQLException e) {
- e.printStackTrace();
- logger.error(e.getMessage());
+ logger.error(e.getMessage(), e);
}
}

@@ -169,8 +169,7 @@ private void handleExportWithOrderByFromDb() {
directOrderByExportWorker.exportSerially();
logger.info("导出 {} 数据完成", tableName);
} catch (DatabaseException | SQLException e) {
- e.printStackTrace();
- logger.error(e.getMessage());
+ logger.error(e.getMessage(), e);
}
}
}
@@ -195,7 +194,7 @@ private void handleExportWithOrderByParallelMerge() {
ExecutorService executor = MyThreadPool.createExecutorWithEnsure(APP_NAME, shardSize);
LocalOrderByExportProducer orderByExportProducer;
LinkedList[] orderedLists = new LinkedList[shardSize];
- CountDownLatch countDownLatch = new CountDownLatch(shardSize);
+ CountDownLatch countDownLatch = SyncUtil.newMainCountDownLatch(shardSize);
for (int i = 0; i < shardSize; i++) {
orderedLists[i] = new LinkedList<ParallelOrderByExportEvent>();
orderByExportProducer = new LocalOrderByExportProducer(dataSource, topologyList.get(i),
@@ -230,13 +229,12 @@ private void handleExportWithOrderByParallelMerge() {
countDownLatch.await();
consumer.consume();
} catch (InterruptedException e) {
- e.printStackTrace();
+ logger.error("Interrupted when waiting for finish", e);
}
executor.shutdown();
logger.info("导出 {} 数据完成", tableName);
} catch (DatabaseException | SQLException e) {
- e.printStackTrace();
- logger.error(e.getMessage());
+ logger.error(e.getMessage(), e);
}
}
}