diff --git a/batch-tool/pom.xml b/batch-tool/pom.xml
index 2453df3..e7939ce 100644
--- a/batch-tool/pom.xml
+++ b/batch-tool/pom.xml
@@ -6,7 +6,7 @@
com.alibaba.polardbx
batch-tool
- 1.3.5
+ 1.3.6
UTF-8
diff --git a/batch-tool/src/main/java/BatchTool.java b/batch-tool/src/main/java/BatchTool.java
index 6be4412..90a66cc 100644
--- a/batch-tool/src/main/java/BatchTool.java
+++ b/batch-tool/src/main/java/BatchTool.java
@@ -15,24 +15,36 @@
*/
import cmd.BaseOperateCommand;
+import cmd.ExportCommand;
+import cmd.WriteDbCommand;
import com.alibaba.druid.pool.DruidDataSource;
+import com.lmax.disruptor.RingBuffer;
import datasource.DataSourceConfig;
import datasource.DruidSource;
import exec.BaseExecutor;
+import model.config.GlobalVar;
+import model.stat.DebugInfo;
+import model.stat.FileReaderStat;
+import model.stat.SqlStat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import sun.misc.Signal;
import java.sql.SQLException;
+import java.util.List;
+
+import static model.config.ConfigConstant.DEBUG_SIGNAL;
public class BatchTool {
private static final Logger logger = LoggerFactory.getLogger(BatchTool.class);
private DruidDataSource druid;
+ private BaseOperateCommand command = null;
private static final BatchTool instance = new BatchTool();
private BatchTool() {
- Runtime.getRuntime().addShutdownHook(new Thread(this::destroy));
+ addHooks();
}
public static BatchTool getInstance() {
@@ -50,6 +62,7 @@ public void initDatasource(DataSourceConfig dataSourceConfig) throws SQLExceptio
public void doBatchOp(BaseOperateCommand command, DataSourceConfig dataSourceConfig) {
logger.info("开始批量操作...");
+ this.command = command;
BaseExecutor commandExecutor = BaseExecutor.getExecutor(command, dataSourceConfig, druid);
commandExecutor.preCheck();
logger.info(command.toString());
@@ -63,6 +76,70 @@ public void doBatchOp(BaseOperateCommand command, DataSourceConfig dataSourceCon
}
}
+ private void addHooks() {
+ Runtime.getRuntime().addShutdownHook(new Thread(this::destroy));
+ try {
+ Signal.handle(DEBUG_SIGNAL, sig -> {
+ printDebugInfo();
+ });
+ } catch (Exception e) {
+ logger.info("Failed to register signal handler(this can be ignored): {}", e.getMessage());
+ }
+ }
+
+ private void printDebugInfo() {
+ logger.warn("Collecting debug info...");
+ if (this.druid != null) {
+ logger.warn("[Druid] activeCount:{}, poolingCount:{}.",
+ druid.getActiveCount(), druid.getPoolingCount());
+ }
+ if (this.command == null) {
+ logger.warn("Batch operation is not started yet!");
+ return;
+ }
+ DebugInfo debugInfo = GlobalVar.DEBUG_INFO;
+ RingBuffer ringBuffer = debugInfo.getRingBuffer();
+ if (ringBuffer != null) {
+ logger.warn("[RingBuffer] size: {}, available: {}.",
+ ringBuffer.getBufferSize(), ringBuffer.remainingCapacity());
+ }
+ if (this.command instanceof ExportCommand) {
+ logger.warn("Detailed debug info of export operation is not supported yet!");
+ } else if (this.command instanceof WriteDbCommand) {
+ if (debugInfo.getCountDownLatch() != null && debugInfo.getRemainDataCounter() != null) {
+ logger.warn("[Counter] countDownLatch: {}, remainDataCount: {}.",
+ debugInfo.getCountDownLatch().getCount(), debugInfo.getRemainDataCounter().get());
+ }
+
+ List fileReaderStatList = debugInfo.getFileReaderStatList();
+ if (!fileReaderStatList.isEmpty()) {
+ long totalCount = 0;
+ for (FileReaderStat fileReaderStat : fileReaderStatList) {
+ totalCount += fileReaderStat.getCount();
+ }
+ logger.warn("[Producer] totalCount: {}.", totalCount);
+ }
+
+ List sqlStatList = debugInfo.getSqlStatList();
+ if (!sqlStatList.isEmpty()) {
+ SqlStat firstStat = sqlStatList.get(0);
+ double totalAvg = firstStat.getAvgTimeMillis();
+ double minAvg = totalAvg, maxAvg = totalAvg;
+ for (int i = 1; i < sqlStatList.size(); i++) {
+ double avg = sqlStatList.get(i).getAvgTimeMillis();
+ totalAvg += avg;
+ minAvg = Math.min(minAvg, avg);
+ maxAvg = Math.max(maxAvg, avg);
+ }
+ double avgConsumerAvg = totalAvg / sqlStatList.size();
+ logger.warn("[Consumer] avgSqlRtAvg: {}ms, avgSqlRtMin: {}ms, avgSqlRtMax: {}ms.",
+ avgConsumerAvg, minAvg, maxAvg);
+ }
+
+ }
+ logger.warn("End of debug info.");
+ }
+
private void destroy() {
if (druid != null) {
druid.close();
diff --git a/batch-tool/src/main/java/BatchToolLauncher.java b/batch-tool/src/main/java/BatchToolLauncher.java
index 9680a25..d600509 100644
--- a/batch-tool/src/main/java/BatchToolLauncher.java
+++ b/batch-tool/src/main/java/BatchToolLauncher.java
@@ -44,8 +44,8 @@ public static void main(String[] args) {
try {
handleCmd(commandLine);
} catch (Throwable e) {
- e.printStackTrace();
- logger.error(e.getMessage());
+ // 主线程异常
+ logger.error(e.getMessage(), e);
System.exit(1);
}
}
diff --git a/batch-tool/src/main/java/cmd/CommandUtil.java b/batch-tool/src/main/java/cmd/CommandUtil.java
index f8730cb..0ef7fb3 100644
--- a/batch-tool/src/main/java/cmd/CommandUtil.java
+++ b/batch-tool/src/main/java/cmd/CommandUtil.java
@@ -599,6 +599,7 @@ private static void configureConsumerContext(ConfigResult result,
consumerExecutionContext.setReadProcessFileOnly(getReadAndProcessFileOnly(result));
consumerExecutionContext.setWhereInEnabled(getWhereInEnabled(result));
consumerExecutionContext.setWithLastSep(getWithLastSep(result));
+ consumerExecutionContext.setQuoteEncloseMode(getQuoteEncloseMode(result));
consumerExecutionContext.setTpsLimit(getTpsLimit(result));
consumerExecutionContext.setUseColumns(getUseColumns(result));
consumerExecutionContext.setEmptyStrAsNull(getEmptyAsNull(result));
diff --git a/batch-tool/src/main/java/exec/BaseExecutor.java b/batch-tool/src/main/java/exec/BaseExecutor.java
index 6caef72..feb45b8 100644
--- a/batch-tool/src/main/java/exec/BaseExecutor.java
+++ b/batch-tool/src/main/java/exec/BaseExecutor.java
@@ -40,8 +40,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import util.DbUtil;
+import util.SyncUtil;
import worker.MyThreadPool;
import worker.MyWorkerPool;
+import worker.common.BaseDefaultConsumer;
import worker.common.BaseWorkHandler;
import worker.common.BatchLineEvent;
import worker.common.ReadFileProducer;
@@ -141,14 +143,18 @@ protected void configureCommonContextAndRun(Class extends BaseWorkHandler> cla
}
throw new IllegalArgumentException("No filename with suffix starts with table name: " + tableName);
}
+ String producerType;
if (!usingBlockReader) {
+ producerType = "Line";
producerExecutionContext.setParallelism(fileLineRecordList.size());
+ } else {
+ producerType = "Block";
}
- ThreadPoolExecutor producerThreadPool = MyThreadPool.createExecutorWithEnsure(clazz.getName() + "-producer",
+ ThreadPoolExecutor producerThreadPool = MyThreadPool.createExecutorWithEnsure(producerType + "-producer",
producerExecutionContext.getParallelism());
producerExecutionContext.setProducerExecutor(producerThreadPool);
- CountDownLatch countDownLatch = new CountDownLatch(producerExecutionContext.getParallelism());
- AtomicInteger emittedDataCounter = new AtomicInteger(0);
+ CountDownLatch countDownLatch = SyncUtil.newMainCountDownLatch(producerExecutionContext.getParallelism());
+ AtomicInteger emittedDataCounter = SyncUtil.newRemainDataCounter();
List> eventCounter = new ArrayList<>();
for (int i = 0; i < producerExecutionContext.getDataFileLineRecordList().size(); i++) {
eventCounter.add(new ConcurrentHashMap<>(16));
@@ -168,7 +174,7 @@ protected void configureCommonContextAndRun(Class extends BaseWorkHandler> cla
/ (consumerNum * GlobalVar.EMIT_BATCH_SIZE));
- ThreadPoolExecutor consumerThreadPool = MyThreadPool.createExecutorWithEnsure(clazz.getName() + "-consumer",
+ ThreadPoolExecutor consumerThreadPool = MyThreadPool.createExecutorWithEnsure(clazz.getSimpleName() + "-consumer",
consumerNum);
EventFactory factory = BatchLineEvent::new;
RingBuffer ringBuffer = MyWorkerPool.createRingBuffer(factory);
@@ -189,18 +195,20 @@ protected void configureCommonContextAndRun(Class extends BaseWorkHandler> cla
BaseWorkHandler[] consumers = new BaseWorkHandler[consumerNum];
try {
for (int i = 0; i < consumerNum; i++) {
- consumers[i] = clazz.newInstance();
- consumers[i].setConsumerContext(consumerExecutionContext);
- consumers[i].createTpsLimiter(consumerExecutionContext.getBatchTpsLimitPerConsumer());
- consumers[i].setTableName(tableName);
+ BaseWorkHandler consumer = clazz.newInstance();
+ consumers[i] = consumer;
+ consumer.setConsumerContext(consumerExecutionContext);
+ consumer.createTpsLimiter(consumerExecutionContext.getBatchTpsLimitPerConsumer());
+ consumer.setTableName(tableName);
+ if (consumer instanceof BaseDefaultConsumer) {
+ GlobalVar.DEBUG_INFO.addSqlStat(((BaseDefaultConsumer) consumer).getSqlStat());
+ }
}
} catch (Exception e) {
- e.printStackTrace();
logger.error(e.getMessage());
throw new RuntimeException(e);
}
-
logger.debug("producer config {}", producerExecutionContext);
logger.debug("consumer config {}", consumerExecutionContext);
@@ -210,9 +218,8 @@ protected void configureCommonContextAndRun(Class extends BaseWorkHandler> cla
try {
producer.produce();
} catch (Exception e) {
- e.printStackTrace();
logger.error(e.getMessage());
- System.exit(1);
+ throw new RuntimeException(e);
}
// 开启断点续传和insert ignore,并且不是测试读性能模式,才开始记录断点
if (usingBlockReader
@@ -281,19 +288,13 @@ private List getFileRecordList(List allFilePathL
* 等待生产者、消费者结束
*
* @param countDownLatch 生产者结束标志
- * @param emittedDataCounter 消费者结束标志
+ * @param remainDataCounter 消费者结束标志
*/
- protected void waitForFinish(CountDownLatch countDownLatch, AtomicInteger emittedDataCounter) {
+ protected void waitForFinish(CountDownLatch countDownLatch, AtomicInteger remainDataCounter) {
try {
- // 等待生产者结束
- countDownLatch.await();
- // 等待消费者消费完成
- int remain;
- while ((remain = emittedDataCounter.get()) > 0) {
- Thread.sleep(500);
- }
+ SyncUtil.waitForFinish(countDownLatch, remainDataCounter);
} catch (InterruptedException e) {
- e.printStackTrace();
+ logger.error("Interrupted when waiting for finish", e);
}
onWorkFinished();
}
diff --git a/batch-tool/src/main/java/exec/ImportExecutor.java b/batch-tool/src/main/java/exec/ImportExecutor.java
index 1af5bba..3914232 100644
--- a/batch-tool/src/main/java/exec/ImportExecutor.java
+++ b/batch-tool/src/main/java/exec/ImportExecutor.java
@@ -30,6 +30,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import util.DbUtil;
+import util.SyncUtil;
import worker.MyThreadPool;
import worker.MyWorkerPool;
import worker.ddl.DdlImportWorker;
@@ -86,8 +87,7 @@ private void checkDbNotExist(String dbName) {
dbName));
}
} catch (SQLException | DatabaseException e) {
- e.printStackTrace();
- throw new RuntimeException(e.getMessage());
+ throw new RuntimeException(e);
}
}
@@ -99,8 +99,7 @@ private void checkTableNotExist(List tableNames) {
tableNames));
}
} catch (SQLException | DatabaseException e) {
- e.printStackTrace();
- throw new RuntimeException(e.getMessage());
+ throw new RuntimeException(e);
}
}
}
@@ -184,7 +183,7 @@ private void handleBenchmark(List tableNames) {
private void handleTpchImport(List tableNames) {
int producerParallelism = producerExecutionContext.getParallelism();
- AtomicInteger emittedDataCounter = new AtomicInteger(0);
+ AtomicInteger emittedDataCounter = SyncUtil.newRemainDataCounter();
ThreadPoolExecutor producerThreadPool = MyThreadPool.createExecutorExact(TpchProducer.class.getSimpleName(),
producerParallelism);
@@ -200,9 +199,8 @@ private void handleTpchImport(List tableNames) {
EventFactory factory = BatchInsertSqlEvent::new;
RingBuffer ringBuffer = MyWorkerPool.createRingBuffer(factory);
-
TpchProducer tpchProducer = new TpchProducer(producerExecutionContext, tableNames, ringBuffer);
- CountDownLatch countDownLatch = new CountDownLatch(tpchProducer.getWorkerCount());
+ CountDownLatch countDownLatch = SyncUtil.newMainCountDownLatch(tpchProducer.getWorkerCount());
producerExecutionContext.setCountDownLatch(countDownLatch);
TpchConsumer[] consumers = new TpchConsumer[consumerParallelism];
@@ -211,7 +209,6 @@ private void handleTpchImport(List tableNames) {
consumers[i] = new TpchConsumer(consumerExecutionContext);
}
} catch (Exception e) {
- e.printStackTrace();
logger.error(e.getMessage());
throw new RuntimeException(e);
}
@@ -263,7 +260,7 @@ private void doSingleThreadImport(String tableName) {
try {
importThread.join();
} catch (InterruptedException e) {
- e.printStackTrace();
+ logger.error("Interrupted when waiting for finish", e);
}
}
diff --git a/batch-tool/src/main/java/exec/export/OrderByExportExecutor.java b/batch-tool/src/main/java/exec/export/OrderByExportExecutor.java
index 0f20f27..1418797 100644
--- a/batch-tool/src/main/java/exec/export/OrderByExportExecutor.java
+++ b/batch-tool/src/main/java/exec/export/OrderByExportExecutor.java
@@ -30,6 +30,7 @@
import org.slf4j.LoggerFactory;
import util.DbUtil;
import util.FileUtil;
+import util.SyncUtil;
import worker.MyThreadPool;
import worker.export.order.DirectOrderExportWorker;
import worker.export.order.LocalOrderByExportProducer;
@@ -141,13 +142,12 @@ private void doExportWithOrderByLocal() {
try {
consumer.consume();
} catch (InterruptedException e) {
- e.printStackTrace();
+ logger.error("Interrupted when waiting for finish", e);
}
executor.shutdown();
logger.info("导出 {} 数据完成", tableName);
} catch (DatabaseException | SQLException e) {
- e.printStackTrace();
- logger.error(e.getMessage());
+ logger.error(e.getMessage(), e);
}
}
@@ -169,8 +169,7 @@ private void handleExportWithOrderByFromDb() {
directOrderByExportWorker.exportSerially();
logger.info("导出 {} 数据完成", tableName);
} catch (DatabaseException | SQLException e) {
- e.printStackTrace();
- logger.error(e.getMessage());
+ logger.error(e.getMessage(), e);
}
}
}
@@ -195,7 +194,7 @@ private void handleExportWithOrderByParallelMerge() {
ExecutorService executor = MyThreadPool.createExecutorWithEnsure(APP_NAME, shardSize);
LocalOrderByExportProducer orderByExportProducer;
LinkedList[] orderedLists = new LinkedList[shardSize];
- CountDownLatch countDownLatch = new CountDownLatch(shardSize);
+ CountDownLatch countDownLatch = SyncUtil.newMainCountDownLatch(shardSize);
for (int i = 0; i < shardSize; i++) {
orderedLists[i] = new LinkedList();
orderByExportProducer = new LocalOrderByExportProducer(dataSource, topologyList.get(i),
@@ -230,13 +229,12 @@ private void handleExportWithOrderByParallelMerge() {
countDownLatch.await();
consumer.consume();
} catch (InterruptedException e) {
- e.printStackTrace();
+ logger.error("Interrupted when waiting for finish", e);
}
executor.shutdown();
logger.info("导出 {} 数据完成", tableName);
} catch (DatabaseException | SQLException e) {
- e.printStackTrace();
- logger.error(e.getMessage());
+ logger.error(e.getMessage(), e);
}
}
}
diff --git a/batch-tool/src/main/java/exec/export/ShardingExportExecutor.java b/batch-tool/src/main/java/exec/export/ShardingExportExecutor.java
index 8179956..fef72d8 100644
--- a/batch-tool/src/main/java/exec/export/ShardingExportExecutor.java
+++ b/batch-tool/src/main/java/exec/export/ShardingExportExecutor.java
@@ -32,6 +32,7 @@
import org.slf4j.LoggerFactory;
import util.DbUtil;
import util.FileUtil;
+import util.SyncUtil;
import worker.MyThreadPool;
import worker.MyWorkerPool;
import worker.export.CollectFragmentWorker;
@@ -103,7 +104,7 @@ private void doExportWithSharding(String tableName) {
ExecutorService executor = MyThreadPool.createExecutorWithEnsure(APP_NAME, shardSize);
DirectExportWorker directExportWorker;
- CountDownLatch countDownLatch = new CountDownLatch(shardSize);
+ CountDownLatch countDownLatch = SyncUtil.newMainCountDownLatch(shardSize);
switch (config.getExportWay()) {
case MAX_LINE_NUM_IN_SINGLE_FILE:
case DEFAULT:
@@ -118,7 +119,7 @@ private void doExportWithSharding(String tableName) {
try {
countDownLatch.await();
} catch (InterruptedException e) {
- e.printStackTrace();
+ logger.error("Interrupted when waiting for finish", e);
}
break;
case FIXED_FILE_NUM:
@@ -131,7 +132,7 @@ private void doExportWithSharding(String tableName) {
executor.shutdown();
logger.info("导出 {} 数据完成", tableName);
} catch (DatabaseException | SQLException e) {
- e.printStackTrace();
+ logger.error(e.getMessage(), e);
}
}
@@ -150,7 +151,7 @@ private void shardingExportWithFixedFile(List topologyList,
// 初始化缓冲区等
EventFactory factory = ExportEvent::new;
RingBuffer ringBuffer = MyWorkerPool.createRingBuffer(factory);
- AtomicInteger emittedDataCounter = new AtomicInteger(0);
+ AtomicInteger emittedDataCounter = SyncUtil.newRemainDataCounter();
// 消费者数量与文件数一致 生产者数量和shard数一致
final int consumerCount = config.getLimitNum(), producerCount = shardSize;
@@ -203,7 +204,7 @@ private void shardingExportWithFixedFile(List topologyList,
// 待消费者消费结束
waitForFinish(countDownLatch, emittedDataCounter);
workerPool.drainAndHalt();
- CountDownLatch fragmentCountLatch = new CountDownLatch(consumerCount);
+ CountDownLatch fragmentCountLatch = SyncUtil.newMainCountDownLatch(consumerCount);
// 再将碎片一次分配给每个文件
for (int i = 0; i < consumerCount; i++) {
CollectFragmentWorker collectFragmentWorker = new CollectFragmentWorker(
@@ -213,7 +214,7 @@ private void shardingExportWithFixedFile(List topologyList,
try {
fragmentCountLatch.await();
} catch (InterruptedException e) {
- e.printStackTrace();
+ logger.error("Interrupted when waiting for finish", e);
}
}
for (ExportConsumer consumer : consumers) {
diff --git a/batch-tool/src/main/java/exec/export/SingleThreadExportExecutor.java b/batch-tool/src/main/java/exec/export/SingleThreadExportExecutor.java
index 81691bf..4fddcd2 100644
--- a/batch-tool/src/main/java/exec/export/SingleThreadExportExecutor.java
+++ b/batch-tool/src/main/java/exec/export/SingleThreadExportExecutor.java
@@ -71,8 +71,7 @@ private void doDefaultExport(String tableName, ExecutorService executor) {
executor.submit(directExportWorker);
logger.info("开始导出表 {} 到文件 {}", tableName, fileName);
} catch (DatabaseException | SQLException e) {
- e.printStackTrace();
- logger.error(e.getMessage());
+ logger.error(e.getMessage(), e);
}
}
}
diff --git a/batch-tool/src/main/java/model/CyclicAtomicInteger.java b/batch-tool/src/main/java/model/CyclicAtomicInteger.java
index ef9649f..f703ecb 100644
--- a/batch-tool/src/main/java/model/CyclicAtomicInteger.java
+++ b/batch-tool/src/main/java/model/CyclicAtomicInteger.java
@@ -21,8 +21,8 @@
public class CyclicAtomicInteger {
private final static long PARK_TIME = 1000 * 1000L;
- private AtomicInteger counter;
- private int range;
+ private final AtomicInteger counter;
+ private final int range;
public CyclicAtomicInteger(int range) {
this.counter = new AtomicInteger(0);
diff --git a/batch-tool/src/main/java/model/config/ConfigConstant.java b/batch-tool/src/main/java/model/config/ConfigConstant.java
index 1327a3e..1409f34 100644
--- a/batch-tool/src/main/java/model/config/ConfigConstant.java
+++ b/batch-tool/src/main/java/model/config/ConfigConstant.java
@@ -16,6 +16,8 @@
package model.config;
+import sun.misc.Signal;
+
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@@ -25,6 +27,8 @@ public class ConfigConstant {
public static final String APP_NAME = "BatchTool";
+ public static final Signal DEBUG_SIGNAL = new Signal("USR2");
+
public static final int CPU_NUM = Runtime.getRuntime().availableProcessors();
/**
* 默认分隔符
diff --git a/batch-tool/src/main/java/model/config/GlobalVar.java b/batch-tool/src/main/java/model/config/GlobalVar.java
index 3a039de..4de9c17 100644
--- a/batch-tool/src/main/java/model/config/GlobalVar.java
+++ b/batch-tool/src/main/java/model/config/GlobalVar.java
@@ -16,12 +16,14 @@
package model.config;
+import model.stat.DebugInfo;
+
public class GlobalVar {
/**
* 发送一批数据的元组数
*/
- public static int EMIT_BATCH_SIZE = 5;
+ public static int EMIT_BATCH_SIZE = 200;
/**
* RingBuffer 缓冲区大小
@@ -41,4 +43,6 @@ public class GlobalVar {
public static int DDL_PARALLELISM = 4;
public static final boolean DEBUG_MODE = false;
+
+ public static final DebugInfo DEBUG_INFO = new DebugInfo();
}
diff --git a/batch-tool/src/main/java/model/stat/DebugInfo.java b/batch-tool/src/main/java/model/stat/DebugInfo.java
new file mode 100644
index 0000000..f2f624c
--- /dev/null
+++ b/batch-tool/src/main/java/model/stat/DebugInfo.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright [2013-2021], Alibaba Group Holding Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package model.stat;
+
+import com.lmax.disruptor.RingBuffer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class DebugInfo {
+
+ private final List sqlStatList = new ArrayList<>();
+ private final List fileReaderStatList = new ArrayList<>();
+ private RingBuffer ringBuffer;
+ private CountDownLatch countDownLatch;
+ private AtomicInteger remainDataCounter;
+
+ public List getFileReaderStatList() {
+ return fileReaderStatList;
+ }
+
+ public List getSqlStatList() {
+ return sqlStatList;
+ }
+
+ public void addFileReaderStat(FileReaderStat fileReaderStat) {
+ this.fileReaderStatList.add(fileReaderStat);
+ }
+
+ public void addSqlStat(SqlStat sqlStat) {
+ this.sqlStatList.add(sqlStat);
+ }
+
+ public AtomicInteger getRemainDataCounter() {
+ return remainDataCounter;
+ }
+
+ public void setRemainDataCounter(AtomicInteger remainDataCounter) {
+ this.remainDataCounter = remainDataCounter;
+ }
+
+ public RingBuffer getRingBuffer() {
+ return ringBuffer;
+ }
+
+ public void setRingBuffer(RingBuffer ringBuffer) {
+ this.ringBuffer = ringBuffer;
+ }
+
+ public CountDownLatch getCountDownLatch() {
+ return countDownLatch;
+ }
+
+ public void setCountDownLatch(CountDownLatch countDownLatch) {
+ this.countDownLatch = countDownLatch;
+ }
+}
diff --git a/batch-tool/src/main/java/model/stat/FileReaderStat.java b/batch-tool/src/main/java/model/stat/FileReaderStat.java
new file mode 100644
index 0000000..3f74750
--- /dev/null
+++ b/batch-tool/src/main/java/model/stat/FileReaderStat.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright [2013-2021], Alibaba Group Holding Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package model.stat;
+
+public class FileReaderStat {
+
+ private long startTimeMillis = 0;
+ /**
+ * LineCount or BlockCount
+ */
+ private long count = 0;
+
+ public void start() {
+ this.startTimeMillis = System.currentTimeMillis();
+ this.count = 0;
+ }
+
+ public void increment() {
+ this.count++;
+ }
+
+ public long getStartTimeMillis() {
+ return startTimeMillis;
+ }
+
+ public long getCount() {
+ return count;
+ }
+}
diff --git a/batch-tool/src/main/java/model/stat/SqlStat.java b/batch-tool/src/main/java/model/stat/SqlStat.java
new file mode 100644
index 0000000..1e81597
--- /dev/null
+++ b/batch-tool/src/main/java/model/stat/SqlStat.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright [2013-2021], Alibaba Group Holding Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package model.stat;
+
+import java.util.concurrent.TimeUnit;
+
+public class SqlStat {
+
+ private long totalTimeNanos = 0;
+ private long count = 0;
+
+ public synchronized void addTimeNs(long timeNanos) {
+ totalTimeNanos += timeNanos;
+ count++;
+ }
+
+ public synchronized double getAvgTimeMillis() {
+ return count == 0 ? 0 : (double) TimeUnit.NANOSECONDS.toMillis(totalTimeNanos) / count;
+ }
+
+ public long getCount() {
+ return count;
+ }
+}
diff --git a/batch-tool/src/main/java/util/SyncUtil.java b/batch-tool/src/main/java/util/SyncUtil.java
new file mode 100644
index 0000000..ed218c8
--- /dev/null
+++ b/batch-tool/src/main/java/util/SyncUtil.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright [2013-2021], Alibaba Group Holding Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package util;
+
+import model.config.GlobalVar;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class SyncUtil {
+
+ public static CountDownLatch newMainCountDownLatch(int count) {
+ CountDownLatch countDownLatch = new CountDownLatch(count);
+ GlobalVar.DEBUG_INFO.setCountDownLatch(countDownLatch);
+ return countDownLatch;
+ }
+
+ public static AtomicInteger newRemainDataCounter() {
+ AtomicInteger remainDataCounter = new AtomicInteger(0);
+ GlobalVar.DEBUG_INFO.setRemainDataCounter(remainDataCounter);
+ return remainDataCounter;
+ }
+
+ public static void waitForFinish(CountDownLatch countDownLatch, AtomicInteger remainDataCounter)
+ throws InterruptedException {
+ // 等待生产者结束
+ countDownLatch.await();
+ // 等待消费者消费完成
+ int remain;
+ while ((remain = remainDataCounter.get()) > 0) {
+ Thread.sleep(500);
+ }
+ }
+}
diff --git a/batch-tool/src/main/java/worker/MyWorkerPool.java b/batch-tool/src/main/java/worker/MyWorkerPool.java
index a42e005..38a0abe 100644
--- a/batch-tool/src/main/java/worker/MyWorkerPool.java
+++ b/batch-tool/src/main/java/worker/MyWorkerPool.java
@@ -23,14 +23,17 @@
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.WorkerPool;
import com.lmax.disruptor.dsl.ProducerType;
+import model.config.GlobalVar;
import static model.config.GlobalVar.DEFAULT_RING_BUFFER_SIZE;
public class MyWorkerPool {
public static RingBuffer createRingBuffer(EventFactory factory) {
- return RingBuffer.create(ProducerType.MULTI,
+ RingBuffer ringBuffer = RingBuffer.create(ProducerType.MULTI,
factory, DEFAULT_RING_BUFFER_SIZE, new BlockingWaitStrategy());
+ GlobalVar.DEBUG_INFO.setRingBuffer(ringBuffer);
+ return ringBuffer;
}
public static RingBuffer createSingleProducerRingBuffer(EventFactory factory) {
diff --git a/batch-tool/src/main/java/worker/NamedThreadFactory.java b/batch-tool/src/main/java/worker/NamedThreadFactory.java
index e9b2394..5b3762c 100644
--- a/batch-tool/src/main/java/worker/NamedThreadFactory.java
+++ b/batch-tool/src/main/java/worker/NamedThreadFactory.java
@@ -31,17 +31,9 @@ public class NamedThreadFactory implements ThreadFactory {
private final String namePrefix;
private final boolean isDaemon;
private final Thread.UncaughtExceptionHandler handler = (t, e) -> {
- logger.error("{}", e);
+ logger.error(e.getMessage(), e);
};
- public NamedThreadFactory() {
- this("pool");
- }
-
- public NamedThreadFactory(String prefix) {
- this(prefix, false);
- }
-
public NamedThreadFactory(String prefix, boolean daemon) {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
diff --git a/batch-tool/src/main/java/worker/common/BaseDefaultConsumer.java b/batch-tool/src/main/java/worker/common/BaseDefaultConsumer.java
index 66bd8c6..b3fa4d8 100644
--- a/batch-tool/src/main/java/worker/common/BaseDefaultConsumer.java
+++ b/batch-tool/src/main/java/worker/common/BaseDefaultConsumer.java
@@ -18,6 +18,7 @@
import com.alibaba.druid.util.JdbcUtils;
import com.alibaba.druid.util.StringUtils;
+import model.stat.SqlStat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import util.FileUtil;
@@ -36,6 +37,7 @@ public abstract class BaseDefaultConsumer extends BaseWorkHandler {
private static final Logger logger = LoggerFactory.getLogger(BaseDefaultConsumer.class);
protected int estimateFieldCount = 16;
+ protected final SqlStat sqlStat = new SqlStat();
protected void initLocalVars() {
super.initLocalVars();
@@ -99,7 +101,10 @@ protected void execSql(StringBuilder data) throws SQLException {
conn = consumerContext.getDataSource().getConnection();
stmt = conn.createStatement();
sql = getSql(data);
+ long startTime = System.nanoTime();
stmt.execute(sql);
+ long endTime = System.nanoTime();
+ sqlStat.addTimeNs(endTime - startTime);
} catch (SQLException e) {
// logger.error(sql);
throw e;
@@ -108,4 +113,8 @@ protected void execSql(StringBuilder data) throws SQLException {
JdbcUtils.close(conn);
}
}
+
+ public SqlStat getSqlStat() {
+ return sqlStat;
+ }
}
diff --git a/batch-tool/src/main/java/worker/common/BaseWorkHandler.java b/batch-tool/src/main/java/worker/common/BaseWorkHandler.java
index 85ec703..ceb194a 100644
--- a/batch-tool/src/main/java/worker/common/BaseWorkHandler.java
+++ b/batch-tool/src/main/java/worker/common/BaseWorkHandler.java
@@ -21,6 +21,7 @@
import model.ConsumerExecutionContext;
import model.config.ConfigConstant;
import model.config.GlobalVar;
+import model.config.QuoteEncloseMode;
/**
* 限流代理类
@@ -39,15 +40,20 @@ public abstract class BaseWorkHandler implements WorkHandler {
protected void initLocalVars() {
if (GlobalVar.IN_PERF_MODE) {
this.sep = consumerContext.getSeparator();
- hasEscapedQuote = true;
+ this.hasEscapedQuote = true;
+ return;
+ }
+ if (consumerContext.getQuoteEncloseMode() == QuoteEncloseMode.AUTO) {
+ this.sep = consumerContext.getSeparator();
+ this.hasEscapedQuote = true;
return;
}
if (consumerContext.isUseMagicSeparator()) {
this.sep = ConfigConstant.MAGIC_CSV_SEP1;
- hasEscapedQuote = true;
+ this.hasEscapedQuote = true;
} else {
this.sep = consumerContext.getSeparator();
- hasEscapedQuote = false;
+ this.hasEscapedQuote = false;
}
}
diff --git a/batch-tool/src/main/java/worker/common/reader/FileBufferedBatchReader.java b/batch-tool/src/main/java/worker/common/reader/FileBufferedBatchReader.java
index ac91856..2af5d51 100644
--- a/batch-tool/src/main/java/worker/common/reader/FileBufferedBatchReader.java
+++ b/batch-tool/src/main/java/worker/common/reader/FileBufferedBatchReader.java
@@ -20,6 +20,8 @@
import model.ProducerExecutionContext;
import model.config.CompressMode;
import model.config.ConfigConstant;
+import model.config.GlobalVar;
+import model.stat.FileReaderStat;
import worker.common.BatchLineEvent;
import java.io.File;
@@ -31,6 +33,7 @@ public abstract class FileBufferedBatchReader implements Runnable {
protected final RingBuffer ringBuffer;
protected int bufferedLineCount = 0;
+ protected final FileReaderStat fileReaderStat = new FileReaderStat();
protected String[] lineBuffer;
protected volatile int localProcessingFileIndex;
protected long localProcessingBlockIndex = -1;
@@ -52,6 +55,7 @@ protected FileBufferedBatchReader(ProducerExecutionContext context,
this.fileList = fileList;
this.lineBuffer = new String[EMIT_BATCH_SIZE];
this.compressMode = compressMode;
+ GlobalVar.DEBUG_INFO.addFileReaderStat(fileReaderStat);
}
protected void appendToLineBuffer(String line) {
@@ -61,6 +65,7 @@ protected void appendToLineBuffer(String line) {
lineBuffer = new String[EMIT_BATCH_SIZE];
bufferedLineCount = 0;
}
+ fileReaderStat.increment();
}
protected void emitLineBuffer() {
@@ -84,6 +89,7 @@ protected void emitLineBuffer() {
@Override
public void run() {
try {
+ fileReaderStat.start();
readData();
} catch (Exception e) {
context.setException(e);
@@ -107,4 +113,8 @@ private void afterRun() {
public boolean useMagicSeparator() {
return false;
}
+
+ public FileReaderStat getFileReaderStat() {
+ return fileReaderStat;
+ }
}
diff --git a/batch-tool/src/main/java/worker/export/order/ParallelMergeExportConsumer.java b/batch-tool/src/main/java/worker/export/order/ParallelMergeExportConsumer.java
index 2ff2bd0..a93e405 100644
--- a/batch-tool/src/main/java/worker/export/order/ParallelMergeExportConsumer.java
+++ b/batch-tool/src/main/java/worker/export/order/ParallelMergeExportConsumer.java
@@ -17,19 +17,12 @@
package worker.export.order;
import model.db.FieldMetaInfo;
-import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import worker.MyThreadPool;
import worker.util.ExportUtil;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
import java.io.IOException;
-import java.nio.channels.FileChannel;
-import java.nio.file.Paths;
-import java.nio.file.StandardOpenOption;
-import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
@@ -104,8 +97,7 @@ public void consume() throws InterruptedException {
writeToBuffer(parallelOrderByExportEvent.getData());
}
} catch (IOException e) {
- e.printStackTrace();
- logger.error(e.getMessage());
+ logger.error(e.getMessage(), e);
}
logger.info("写入文件结束");
}