Skip to content

Commit

Permalink
Merge branch 'dev' into paimon_source_optimize
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaochen-zhou committed Apr 28, 2024
2 parents 76e2593 + 6d71069 commit eecacd6
Show file tree
Hide file tree
Showing 24 changed files with 398 additions and 210 deletions.
46 changes: 23 additions & 23 deletions docs/en/connector-v2/sink/Doris.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.source.SupportCoordinate;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
Expand Down Expand Up @@ -79,7 +78,7 @@

@NoArgsConstructor
public abstract class IncrementalSource<T, C extends SourceConfig>
implements SeaTunnelSource<T, SourceSplitBase, PendingSplitsState>, SupportCoordinate {
implements SeaTunnelSource<T, SourceSplitBase, PendingSplitsState> {

protected ReadonlyConfig readonlyConfig;
protected SourceConfig.Factory<C> configFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,13 @@ public class DorisSinkWriter
new ArrayList<>(Arrays.asList(LoadStatus.SUCCESS, LoadStatus.PUBLISH_TIMEOUT));
private long lastCheckpointId;
private DorisStreamLoad dorisStreamLoad;
volatile boolean loading;
private final DorisConfig dorisConfig;
private final String labelPrefix;
private final LabelGenerator labelGenerator;
private final int intervalTime;
private final DorisSerializer serializer;
private final CatalogTable catalogTable;
private final ScheduledExecutorService scheduledExecutorService;
private Thread executorThread;
private volatile Exception loadException = null;

public DorisSinkWriter(
Expand Down Expand Up @@ -94,7 +92,6 @@ public DorisSinkWriter(
1, new ThreadFactoryBuilder().setNameFormat("stream-load-check").build());
this.serializer = createSerializer(dorisConfig, catalogTable.getSeaTunnelRowType());
this.intervalTime = dorisConfig.getCheckInterval();
this.loading = false;
this.initializeLoad();
}

Expand Down Expand Up @@ -123,7 +120,7 @@ private void initializeLoad() {

@Override
public void write(SeaTunnelRow element) throws IOException {
checkLoadExceptionAndResetThread();
checkLoadException();
byte[] serialize =
serializer.serialize(
dorisConfig.isNeedsUnsupportedTypeCasting()
Expand Down Expand Up @@ -154,7 +151,6 @@ public Optional<DorisCommitInfo> prepareCommit() throws IOException {

private RespContent flush() throws IOException {
// disable exception checker before stop load.
loading = false;
checkState(dorisStreamLoad != null);
RespContent respContent = dorisStreamLoad.stopLoad();
if (respContent != null && !DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
Expand All @@ -177,7 +173,6 @@ public List<DorisSinkState> snapshotState(long checkpointId) {

private void startLoad(String label) {
this.dorisStreamLoad.startLoad(label);
this.loading = true;
}

@Override
Expand All @@ -194,37 +189,19 @@ public void abortPrepare() {
private void checkDone() {
// the load future is done and checked in prepareCommit().
// this will check error while loading.
String errorMsg;
log.debug("start timer checker, interval {} ms", intervalTime);
if (dorisStreamLoad.getPendingLoadFuture() != null
&& dorisStreamLoad.getPendingLoadFuture().isDone()) {
if (!loading) {
log.debug("not loading, skip timer checker");
return;
}
String errorMsg;
try {
RespContent content =
dorisStreamLoad.handlePreCommitResponse(
dorisStreamLoad.getPendingLoadFuture().get());
errorMsg = content.getMessage();
} catch (Exception e) {
errorMsg = e.getMessage();
}

if ((errorMsg = dorisStreamLoad.getLoadFailedMsg()) != null) {
log.error("stream load finished unexpectedly: {}", errorMsg);
loadException =
new DorisConnectorException(
DorisConnectorErrorCode.STREAM_LOAD_FAILED, errorMsg);
log.error("stream load finished unexpectedly, interrupt worker thread! {}", errorMsg);
// set the executor thread interrupted in case blocking in write data.
executorThread.interrupt();
}
}

private void checkLoadExceptionAndResetThread() {
private void checkLoadException() {
if (loadException != null) {
throw new RuntimeException("error while loading data.", loadException);
} else {
executorThread = Thread.currentThread();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public class DorisStreamLoad implements Serializable {
private final CloseableHttpClient httpClient;
private final ExecutorService executorService;
private volatile boolean loadBatchFirstRecord;
private volatile boolean loading = false;
private String label;
private long recordCount = 0;

Expand Down Expand Up @@ -199,7 +200,25 @@ public long getRecordCount() {
return recordCount;
}

public RespContent handlePreCommitResponse(CloseableHttpResponse response) throws Exception {
/**
 * Reports why the current stream load finished prematurely, if it did.
 *
 * <p>Returns {@code null} when no load is active ({@code loading} is false) or when the pending
 * load future is absent or not yet done. When the future has completed while a load is still
 * active, the message is taken from the pre-commit response, or from the exception raised while
 * obtaining that response. The message is handed to {@code recordStream} before it is returned.
 * A response whose own message is {@code null} still yields {@code null}.
 *
 * @return the failure message, or {@code null} when no failure was detected
 */
public String getLoadFailedMsg() {
    if (!loading) {
        return null;
    }
    if (this.getPendingLoadFuture() != null && this.getPendingLoadFuture().isDone()) {
        String errorMessage;
        try {
            errorMessage = handlePreCommitResponse(pendingLoadFuture.get()).getMessage();
        } catch (Exception e) {
            // Throwable.getMessage() may be null (e.g. a bare NullPointerException). Callers
            // interpret a null return as "no failure", so fall back to toString() to make sure
            // a failed load is never reported as healthy.
            errorMessage = e.getMessage() != null ? e.getMessage() : e.toString();
        }
        recordStream.setErrorMessageByStreamLoad(errorMessage);
        return errorMessage;
    } else {
        return null;
    }
}

private RespContent handlePreCommitResponse(CloseableHttpResponse response) throws Exception {
final int statusCode = response.getStatusLine().getStatusCode();
if (statusCode == HTTP_TEMPORARY_REDIRECT && response.getEntity() != null) {
String loadResult = EntityUtils.toString(response.getEntity());
Expand All @@ -211,6 +230,7 @@ public RespContent handlePreCommitResponse(CloseableHttpResponse response) throw
}

public RespContent stopLoad() throws IOException {
loading = false;
if (pendingLoadFuture != null) {
log.info("stream load stopped.");
recordStream.endInput();
Expand All @@ -230,6 +250,7 @@ public void startLoad(String label) {
loadBatchFirstRecord = true;
recordCount = 0;
this.label = label;
this.loading = true;
}

private void startStreamLoad() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@

package org.apache.seatunnel.connectors.doris.sink.writer;

import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode;
import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;

import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
Expand All @@ -25,18 +29,21 @@
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

import static com.google.common.base.Preconditions.checkState;

/** Channel of record stream and HTTP data stream. */
@Slf4j
public class RecordBuffer {
BlockingQueue<ByteBuffer> writeQueue;
BlockingQueue<ByteBuffer> readQueue;
int bufferCapacity;
int queueSize;
ByteBuffer currentWriteBuffer;
ByteBuffer currentReadBuffer;
private final BlockingQueue<ByteBuffer> writeQueue;
private final BlockingQueue<ByteBuffer> readQueue;
private final int bufferCapacity;
private final int queueSize;
private ByteBuffer currentWriteBuffer;
private ByteBuffer currentReadBuffer;
// used to check stream load error by stream load thread
@Setter private volatile String errorMessageByStreamLoad;

public RecordBuffer(int capacity, int queueSize) {
log.info("init RecordBuffer capacity {}, count {}", capacity, queueSize);
Expand Down Expand Up @@ -76,7 +83,11 @@ public void stopBufferData() throws IOException {
currentWriteBuffer = null;
}
if (!isEmpty) {
ByteBuffer byteBuffer = writeQueue.take();
ByteBuffer byteBuffer = null;
while (byteBuffer == null) {
checkErrorMessageByStreamLoad();
byteBuffer = writeQueue.poll(100, TimeUnit.MILLISECONDS);
}
((Buffer) byteBuffer).flip();
checkState(byteBuffer.limit() == 0);
readQueue.put(byteBuffer);
Expand All @@ -89,8 +100,9 @@ public void stopBufferData() throws IOException {
public void write(byte[] buf) throws InterruptedException {
int wPos = 0;
do {
if (currentWriteBuffer == null) {
currentWriteBuffer = writeQueue.take();
while (currentWriteBuffer == null) {
checkErrorMessageByStreamLoad();
currentWriteBuffer = writeQueue.poll(100, TimeUnit.MILLISECONDS);
}
int available = currentWriteBuffer.remaining();
int nWrite = Math.min(available, buf.length - wPos);
Expand All @@ -105,14 +117,15 @@ public void write(byte[] buf) throws InterruptedException {
}

public int read(byte[] buf) throws InterruptedException {
if (currentReadBuffer == null) {
currentReadBuffer = readQueue.take();
while (currentReadBuffer == null) {
checkErrorMessageByStreamLoad();
currentReadBuffer = readQueue.poll(100, TimeUnit.MILLISECONDS);
}
// add empty buffer as end flag
if (currentReadBuffer.limit() == 0) {
recycleBuffer(currentReadBuffer);
currentReadBuffer = null;
checkState(readQueue.size() == 0);
checkState(readQueue.isEmpty());
return -1;
}
int available = currentReadBuffer.remaining();
Expand All @@ -125,16 +138,17 @@ public int read(byte[] buf) throws InterruptedException {
return nRead;
}

private void recycleBuffer(ByteBuffer buffer) throws InterruptedException {
((Buffer) buffer).clear();
writeQueue.put(buffer);
}

public int getWriteQueueSize() {
return writeQueue.size();
/**
 * Fails fast once a stream load error message has been recorded.
 *
 * @throws DorisConnectorException with code STREAM_LOAD_FAILED when
 *     {@code errorMessageByStreamLoad} is non-null
 */
private void checkErrorMessageByStreamLoad() {
    if (errorMessageByStreamLoad == null) {
        // nothing recorded, keep going
        return;
    }
    throw new DorisConnectorException(
            DorisConnectorErrorCode.STREAM_LOAD_FAILED, errorMessageByStreamLoad);
}

public int getReadQueueSize() {
return readQueue.size();
/**
 * Clears the given buffer and returns it to the write queue.
 *
 * <p>Each offer waits at most 100 ms. After every attempt that times out, the recorded stream
 * load error is checked before trying again.
 *
 * @param buffer the buffer to reset and hand back
 * @throws InterruptedException if interrupted while waiting on the queue
 */
private void recycleBuffer(ByteBuffer buffer) throws InterruptedException {
    ((Buffer) buffer).clear();
    boolean accepted = writeQueue.offer(buffer, 100, TimeUnit.MILLISECONDS);
    while (!accepted) {
        checkErrorMessageByStreamLoad();
        accepted = writeQueue.offer(buffer, 100, TimeUnit.MILLISECONDS);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,8 @@ public void write(byte[] buff) throws IOException {
throw new RuntimeException(e);
}
}

/**
 * Passes a stream load error message on to the underlying record buffer.
 *
 * @param errorMessageByStreamLoad the error message to forward to {@code recordBuffer}
 */
public void setErrorMessageByStreamLoad(String errorMessageByStreamLoad) {
recordBuffer.setErrorMessageByStreamLoad(errorMessageByStreamLoad);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,21 +117,22 @@ public Column convert(BasicTypeDefine typeDefine) {
scale = 127;
}

if (scale == 0) {
if (precision == 1) {
if (scale <= 0) {
int newPrecision = (int) (precision - scale);
if (newPrecision == 1) {
builder.dataType(BasicType.BOOLEAN_TYPE);
} else if (precision <= 9) {
} else if (newPrecision <= 9) {
builder.dataType(BasicType.INT_TYPE);
} else if (precision <= 18) {
} else if (newPrecision <= 18) {
builder.dataType(BasicType.LONG_TYPE);
} else if (precision < 38) {
builder.dataType(new DecimalType(precision.intValue(), 0));
builder.columnLength(precision);
} else if (newPrecision < 38) {
builder.dataType(new DecimalType(newPrecision, 0));
builder.columnLength((long) newPrecision);
} else {
builder.dataType(new DecimalType(DEFAULT_PRECISION, 0));
builder.columnLength((long) DEFAULT_PRECISION);
}
} else if (scale > 0 && scale <= DEFAULT_SCALE) {
} else if (scale <= DEFAULT_SCALE) {
builder.dataType(new DecimalType(precision.intValue(), scale));
builder.columnLength(precision);
builder.scale(scale);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -754,4 +754,59 @@ public void testReconvertDatetime() {
typeDefine.getDataType());
Assertions.assertEquals(column.getScale(), typeDefine.getScale());
}

/**
 * Checks how OracleTypeConverter converts NUMBER columns declared with a negative scale.
 * Each case builds a type definition, converts it, and asserts the column name, the resulting
 * data type and the preserved source type string.
 */
@Test
public void testNumberWithNegativeScale() {
// number(38,-1): expected to map to DecimalType(38, 0).
BasicTypeDefine<Object> typeDefine =
BasicTypeDefine.builder()
.name("test")
.columnType("number(38,-1)")
.dataType("number")
.precision(38L)
.scale(-1)
.build();
Column column = OracleTypeConverter.INSTANCE.convert(typeDefine);
Assertions.assertEquals(typeDefine.getName(), column.getName());
Assertions.assertEquals(new DecimalType(38, 0), column.getDataType());
Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType());

// number(5,-2): precision minus scale is 7; expected to map to INT.
typeDefine =
BasicTypeDefine.builder()
.name("test")
.columnType("number(5,-2)")
.dataType("number")
.precision(5L)
.scale(-2)
.build();
column = OracleTypeConverter.INSTANCE.convert(typeDefine);
Assertions.assertEquals(typeDefine.getName(), column.getName());
Assertions.assertEquals(BasicType.INT_TYPE, column.getDataType());
Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType());

// number(9,-2): precision minus scale is 11; expected to map to LONG rather than INT.
typeDefine =
BasicTypeDefine.builder()
.name("test")
.columnType("number(9,-2)")
.dataType("number")
.precision(9L)
.scale(-2)
.build();
column = OracleTypeConverter.INSTANCE.convert(typeDefine);
Assertions.assertEquals(typeDefine.getName(), column.getName());
Assertions.assertEquals(BasicType.LONG_TYPE, column.getDataType());
Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType());

// number(14,-11): precision minus scale is 25; expected to map to DecimalType(25, 0).
typeDefine =
BasicTypeDefine.builder()
.name("test")
.columnType("number(14,-11)")
.dataType("number")
.precision(14L)
.scale(-11)
.build();
column = OracleTypeConverter.INSTANCE.convert(typeDefine);
Assertions.assertEquals(typeDefine.getName(), column.getName());
Assertions.assertEquals(new DecimalType(25, 0), column.getDataType());
Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -331,22 +331,12 @@ private void setCheckpoint() {
}
}

public void registerResultTable(
Config config, DataStream<Row> dataStream, String name, Boolean isAppend) {
public void registerResultTable(Config config, DataStream<Row> dataStream, String name) {
StreamTableEnvironment tableEnvironment = this.getStreamTableEnvironment();
if (!TableUtil.tableExists(tableEnvironment, name)) {
if (isAppend) {
if (config.hasPath("field_name")) {
String fieldName = config.getString("field_name");
tableEnvironment.registerDataStream(name, dataStream, fieldName);
return;
}
tableEnvironment.registerDataStream(name, dataStream);
return;
}
tableEnvironment.createTemporaryView(
name, tableEnvironment.fromChangelogStream(dataStream));
}
tableEnvironment.createTemporaryView(
name, tableEnvironment.fromChangelogStream(dataStream));
}

public static FlinkRuntimeEnvironment getInstance(Config config) {
Expand Down
Loading

0 comments on commit eecacd6

Please sign in to comment.