Skip to content

Commit

Permalink
5.4.15 stable release
Browse files Browse the repository at this point in the history
  • Loading branch information
lulu2panpan committed Nov 3, 2022
1 parent d80d7a9 commit bbb6ae1
Show file tree
Hide file tree
Showing 20 changed files with 462 additions and 404 deletions.
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.aliyun.polardbx.binlog.io.OSSFileSystem;
import com.aliyun.polardbx.binlog.monitor.MonitorManager;
import com.aliyun.polardbx.binlog.monitor.MonitorType;
import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
Expand All @@ -38,6 +39,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -66,9 +68,12 @@ public void start(String binlogFileDir, IFileCursorProvider provider, String tas
getThreadFactory("binlog-download" + "-%d", false));
executorService.setKeepAliveTime(60, TimeUnit.SECONDS);
executorService.allowCoreThreadTimeOut(true);

// 需要执行完prepare,然后再启动ActionManager
prepareLocalLastBinlog(binlogFileDir);

actionManager = new ActionManager(binlogFileDir, provider, taskName);
actionManager.start();
prepareLocalLastBinlog(binlogFileDir);
}

private List<BinlogOssRecord> prepareDownloadList() {
Expand All @@ -81,6 +86,8 @@ private List<BinlogOssRecord> prepareDownloadList() {
BinlogOssRecordMapper mapper = SpringContextHolder.getObject(BinlogOssRecordMapper.class);
List<BinlogOssRecord> recordList = mapper.select(s -> s.where(BinlogOssRecordDynamicSqlSupport.purgeStatus,
SqlBuilder.isNotEqualTo(BinlogPurgeStatusEnum.COMPLETE.getValue()))
.and(BinlogOssRecordDynamicSqlSupport.uploadStatus,
SqlBuilder.isNotEqualTo(BinlogUploadStatusEnum.IGNORE.getValue()))
.orderBy(BinlogOssRecordDynamicSqlSupport.binlogFile.descending()));

int timeoutSecond = DynamicApplicationConfig.getInt(BINLOG_BACKUP_DOWNLOAD_WAIT_UPLOAD_TIMEOUT);
Expand All @@ -94,7 +101,6 @@ private List<BinlogOssRecord> prepareDownloadList() {
mapper.select(s -> s.where(BinlogOssRecordDynamicSqlSupport.id, SqlBuilder.isEqualTo(id))).get(0);
if (i != 0 && record.getUploadStatus() != BinlogUploadStatusEnum.SUCCESS.getValue()) {
notUploadFiles.add(record.getBinlogFile());
break;
}
}
if (!notUploadFiles.isEmpty()) {
Expand Down Expand Up @@ -154,34 +160,66 @@ public void prepareLocalLastBinlog(String dir) {
}

private void downloadList(List<String> downloadLogList, boolean localHasBinlog) {
int firstIdx = 0;
int fileIdx = 0;
if (!localHasBinlog) {
String lastFileName = downloadLogList.get(firstIdx++);
// 先下载最后一个
doDownloadFile(lastFileName);
while (fileIdx < downloadLogList.size()) {
// 先下载最后一个
String lastFileName = downloadLogList.get(fileIdx++);
try {
downloadWithRetry(lastFileName);
break;
} catch (Throwable t) {
processDownLoadError(lastFileName, t, true);
}
}
}

// 遍历列表,异步下载文件
for (int i = firstIdx; i < downloadLogList.size(); i++) {
for (int i = fileIdx; i < downloadLogList.size(); i++) {
String fileName = downloadLogList.get(i);
executorService.execute(() -> {
try {
Retryer<Boolean> retryer = RetryerBuilder.<Boolean>newBuilder().retryIfException()
.withWaitStrategy(WaitStrategies.fixedWait(500,
TimeUnit.MILLISECONDS)).withStopStrategy(StopStrategies.stopAfterAttempt(5))
.build();
retryer.call(() -> {
doDownloadFile(fileName);
return true;
});
} catch (Exception e) {
logger.error("download " + fileName + " failed!");
MonitorManager.getInstance().triggerAlarm(MonitorType.BINLOG_DOWNLOAD_FAILED_WARNING, fileName);
downloadWithRetry(fileName);
} catch (Throwable e) {
processDownLoadError(fileName, e, false);
}
});
}
}

private void downloadWithRetry(String fileName) throws ExecutionException, RetryException {
Retryer<Boolean> retryer = RetryerBuilder.<Boolean>newBuilder().retryIfException()
.withWaitStrategy(WaitStrategies.fixedWait(500,
TimeUnit.MILLISECONDS)).withStopStrategy(StopStrategies.stopAfterAttempt(5))
.build();
retryer.call(() -> {
doDownloadFile(fileName);
return true;
});
}

private void processDownLoadError(String fileName, Throwable t, boolean throwError) {
BinlogOssRecordMapper mapper = SpringContextHolder.getObject(BinlogOssRecordMapper.class);
List<BinlogOssRecord> recordList = mapper.select(
s -> s.where(BinlogOssRecordDynamicSqlSupport.binlogFile, SqlBuilder.isEqualTo(fileName)));
if (!recordList.isEmpty()) {
int status = recordList.get(0).getUploadStatus();
if (status == BinlogUploadStatusEnum.CREATE.getValue() ||
status == BinlogUploadStatusEnum.UPLOADING.getValue()) {
logger.warn("download " + fileName + " failed, record status is " + status
+ ", will skipped the error !", t);
DownloadLogManager.getInstance().successDownload(fileName);
return;
}
}

logger.error("download " + fileName + " failed!", t);
MonitorManager.getInstance().triggerAlarm(MonitorType.BINLOG_DOWNLOAD_FAILED_WARNING, fileName);
if (throwError) {
throw new PolardbxException("download file " + fileName + " failed!", t);
}
}

private void cleanLocalBadFileBefore(List<String> downloadLogList, Map<String, OSSFile> localFileMap) {
downloadLogList.stream().forEach(e -> {
OSSFile ossFile = localFileMap.get(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,15 +205,15 @@ private void tryDeleteBackupRecords() {
public void scan() {
try {
purgeRemote();
} catch (Exception e) {
} catch (Throwable e) {
logger.error("purge oss binlog occur exception!", e);
}
try {
boolean enable = DynamicApplicationConfig.getBoolean(BINLOG_CLEANER_CLEAN_ENABLE);
if (enable) {
purgeLocal();
}
} catch (Exception e) {
} catch (Throwable e) {
logger.error("purge binlog occur exception!", e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package com.aliyun.polardbx.binlog.canal;

import com.aliyun.polardbx.binlog.canal.binlog.LogEvent;
import com.aliyun.polardbx.binlog.canal.binlog.event.GcnLogEvent;
import com.aliyun.polardbx.binlog.canal.binlog.event.QueryLogEvent;
import com.aliyun.polardbx.binlog.canal.binlog.event.RowsQueryLogEvent;
import com.aliyun.polardbx.binlog.canal.binlog.event.SequenceLogEvent;
Expand Down Expand Up @@ -184,10 +185,20 @@ public static boolean isSequenceEvent(LogEvent event) {
return event.getHeader().getType() == LogEvent.SEQUENCE_EVENT;
}

public static boolean isGCNEvent(LogEvent event) {
public static boolean isGcnEvent(LogEvent event) {
return event.getHeader().getType() == LogEvent.GCN_EVENT;
}

public static boolean isHaveCommitSequence(GcnLogEvent gcnLogEvent) {
// 第一个bit位,目前恒为1
// 第二个bit位,如果为1,代表外部传入了snapshot tso;但如果为0,并不意味着外部没有传入snapshot tso;并不是一个充要条件
// 第三个bit位,如果位1,代表外部传入了commit tso; 如果为0,代表外部没有传入snapshot tso;是一个重要条件
// 当第二个bit位为1或者第三个bit位为1时,认为该事务是一个TSO事务
// 当第三个bit位为1时,认为该GCN中包含外部传入的commit sequence
int flagSeed = 0x00000004;
return ((flagSeed & gcnLogEvent.getFlag()) == flagSeed);
}

public static boolean isRowsQueryEvent(LogEvent event) {
return event.getHeader().getType() == LogEvent.ROWS_QUERY_LOG_EVENT;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class GcnLogEvent extends LogEvent {
static final int POST_HEADER_LENGTH = ENCODED_FLAG_LEN + ENCODED_GCN_LEN;

private final long gcn;
private final int flag;

public GcnLogEvent(LogHeader header, LogBuffer buffer,
FormatDescriptionLogEvent descriptionEvent) throws IOException {
Expand All @@ -46,10 +47,14 @@ public GcnLogEvent(LogHeader header, LogBuffer buffer,
throw new IOException("gcn event length is too short.");
}
buffer.position(commonHeaderLen);
buffer.getInt8();
flag = buffer.getInt8();
gcn = buffer.getLong64();
}

public int getFlag() {
return flag;
}

public long getGcn() {
return gcn;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ private void search(LogEvent event, LogPosition logPosition) {
onTransactionEndEvent();
} else if (LogEventUtil.isSequenceEvent(event)) {
onSequence(event);
} else if (LogEventUtil.isGCNEvent(event)) {
} else if (LogEventUtil.isGcnEvent(event)) {
onGcn(event);
} else {
if (currentTransaction != null) {
Expand Down Expand Up @@ -356,7 +356,9 @@ private void onSequence(LogEvent event) {

private void onGcn(LogEvent event) {
GcnLogEvent gcnLogEvent = (GcnLogEvent) event;
lastGcn = gcnLogEvent.getGcn();
if (LogEventUtil.isHaveCommitSequence(gcnLogEvent)) {
lastGcn = gcnLogEvent.getGcn();
}
}

private void onDdl(LogEvent event, LogPosition logPosition) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ private void initProcessor() {
new XARollbackEventProcessor()));
this.processorMap.put(LogEvent.XID_EVENT, new XACommitEventProcessor(searchTSO, startCmdTSO));
this.processorMap.put(LogEvent.XA_PREPARE_LOG_EVENT, new XAPrepareEventProcessor());
this.processorMap.put(LogEvent.WRITE_ROWS_EVENT, new WriteRowEventProcessor());
this.processorMap.put(LogEvent.WRITE_ROWS_EVENT_V1, new WriteRowEventProcessor());
this.processorMap.put(LogEvent.WRITE_ROWS_EVENT, new WriteRowEventProcessor(searchTSO == -1));
this.processorMap.put(LogEvent.WRITE_ROWS_EVENT_V1, new WriteRowEventProcessor(searchTSO == -1));
this.processorMap.put(LogEvent.SEQUENCE_EVENT, new SequenceEventProcessor());
this.processorMap.put(LogEvent.GCN_EVENT, new GcnEventProcessor());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/
package com.aliyun.polardbx.binlog.canal.core.handle.processor;

import com.aliyun.polardbx.binlog.canal.LogEventUtil;
import com.aliyun.polardbx.binlog.canal.binlog.LogEvent;
import com.aliyun.polardbx.binlog.canal.binlog.event.GcnLogEvent;
import com.aliyun.polardbx.binlog.canal.core.handle.ILogEventProcessor;
Expand All @@ -24,7 +25,8 @@ public class GcnEventProcessor implements ILogEventProcessor {
@Override
public void handle(LogEvent event, ProcessorContext context) {
GcnLogEvent gcnLogEvent = (GcnLogEvent) event;
context.setLastTSO(gcnLogEvent.getGcn());

if (LogEventUtil.isHaveCommitSequence(gcnLogEvent)) {
context.setLastTSO(gcnLogEvent.getGcn());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ public class WriteRowEventProcessor implements ILogEventProcessor<WriteRowsLogEv

private static final Logger logger = LoggerFactory.getLogger("searchLogger");

private boolean searchCdcStart;

public WriteRowEventProcessor(boolean searchCdcStart) {
this.searchCdcStart = searchCdcStart;
}

@Override
public void handle(WriteRowsLogEvent event, ProcessorContext context) {
TableMapLogEvent tm = event.getTable();
Expand All @@ -35,6 +41,9 @@ public void handle(WriteRowsLogEvent event, ProcessorContext context) {
return;
}
InstructionCommand command = SystemDB.getInstance().parseInstructionCommand(event);
if (searchCdcStart && !command.isCdcStart()) {
return;
}
context.getCurrentTran().setCommand(command);
context.setCommandTran(context.getCurrentTran());
logger.warn("find cdc instruction type : " + command);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,10 @@ public abstract class ConfigKeys {
public static final String TASK_EXTRACTOR_ROWIMAGE_TYPE_REBUILD_SUPPORT =
"task.extractor.rowImage.type.rebuild.support";

/**
* 私有DDL开关
*/
public static final String TASK_DDL_PRIVATEDDL_SUPPORT = "task.ddl.privateDdl.support";
//******************************************************************************************************************
//***********************************************Daemon和调度相关参数*************************************************
//******************************************************************************************************************
Expand Down
Loading

0 comments on commit bbb6ae1

Please sign in to comment.