Skip to content

Commit

Permalink
refactor DefaultReactiveMybatisExecutor and ReadableResultWrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
GangCheng committed Jan 20, 2024
1 parent 0bb852e commit ece57a0
Show file tree
Hide file tree
Showing 9 changed files with 99 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.springframework.transaction.ReactiveTransactionManager;
import org.springframework.transaction.reactive.TransactionalOperator;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.containers.MariaDBContainer;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand All @@ -38,9 +38,9 @@
public class MybatisR2dbcApplicationTests extends MybatisR2dbcBaseTests {

@DynamicPropertySource
static void postgresqlProperties(DynamicPropertyRegistry registry) {
static void configureProperties(DynamicPropertyRegistry registry) {
String envDatabaseType = System.getProperty("databaseType",
MySQLContainer.class.getSimpleName()
MariaDBContainer.class.getSimpleName()
);
databaseInitializationContainer.keySet()
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ void validateTestcontainers() {
return;
}
String envDatabaseType = System.getProperty("databaseType",
MySQLContainer.class.getSimpleName()
MariaDBContainer.class.getSimpleName()
);
for (Class<?> aClass : MybatisR2dbcBaseTests.databaseInitializationContainer.keySet()) {
if (!aClass.getSimpleName().equalsIgnoreCase(envDatabaseType)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
public class MybatisR2dbcRoutingApplicationTests extends MybatisR2dbcBaseTests {

@DynamicPropertySource
static void postgresqlProperties(DynamicPropertyRegistry registry) {
static void configureProperties(DynamicPropertyRegistry registry) {
R2dbcProtocol mysqlR2dbcProtocol = setUp(MySQLContainer.class, false);
registry.add("spring.r2dbc.mybatis.routing.definitions[0].name", MySQLContainer.class::getSimpleName);
registry.add("spring.r2dbc.mybatis.routing.definitions[0].as-default", () -> Boolean.TRUE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
public class MybatisR2dbcXmlConfigApplicationTests extends MybatisR2dbcBaseTests {

@DynamicPropertySource
static void postgresqlProperties(DynamicPropertyRegistry registry) {
static void configureProperties(DynamicPropertyRegistry registry) {
String envDatabaseType = System.getProperty("databaseType",
MySQLContainer.class.getSimpleName()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,21 +53,13 @@
import reactor.core.publisher.Mono;

import java.sql.SQLException;
import java.util.Collection;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static pro.chenggang.project.reactive.mybatis.support.r2dbc.executor.key.KeyGeneratorType.SELECT_KEY_AFTER;
import static pro.chenggang.project.reactive.mybatis.support.r2dbc.executor.key.KeyGeneratorType.SELECT_KEY_BEFORE;
import static pro.chenggang.project.reactive.mybatis.support.r2dbc.executor.key.KeyGeneratorType.SIMPLE_RETURN;
import static pro.chenggang.project.reactive.mybatis.support.r2dbc.executor.result.ReadableResultWrapper.Functions.OUT_PARAMETERS_METADATA_EXTRACTOR;
import static pro.chenggang.project.reactive.mybatis.support.r2dbc.executor.result.ReadableResultWrapper.Functions.OUT_PARAMETERS_METADATA_EXTRACTOR_BY_INDEX;
import static pro.chenggang.project.reactive.mybatis.support.r2dbc.executor.result.ReadableResultWrapper.Functions.OUT_PARAMETERS_METADATA_EXTRACTOR_BY_NAME;
import static pro.chenggang.project.reactive.mybatis.support.r2dbc.executor.result.ReadableResultWrapper.Functions.ROW_METADATA_EXTRACTOR;
import static pro.chenggang.project.reactive.mybatis.support.r2dbc.executor.result.ReadableResultWrapper.Functions.ROW_METADATA_EXTRACTOR_BY_INDEX;
import static pro.chenggang.project.reactive.mybatis.support.r2dbc.executor.result.ReadableResultWrapper.Functions.ROW_METADATA_EXTRACTOR_BY_NAME;
import static pro.chenggang.project.reactive.mybatis.support.r2dbc.executor.result.handler.ReactiveResultHandler.DEFERRED;

/**
* The type Default reactive mybatis executor.
Expand Down Expand Up @@ -126,17 +118,11 @@ protected Mono<Integer> doUpdateWithConnection(Connection connection, MappedStat
)
.checkpoint("[DefaultReactiveExecutor] SQL: \"" + boundSqlStatement + "\" ")
.concatMap(result -> result.map((row, rowMetadata) -> {
ReadableResultWrapper<Row> readableResultWrapper = new ReadableResultWrapper<>(
row,
ROW_METADATA_EXTRACTOR,
ROW_METADATA_EXTRACTOR_BY_INDEX,
ROW_METADATA_EXTRACTOR_BY_NAME,
configuration
);
ReadableResultWrapper<Row> readableResultWrapper = ReadableResultWrapper.ofRow(row,configuration);
return r2dbcKeyGenerator.processGeneratedKeyResult(readableResultWrapper, parameter);
}));
}
ReactiveResultHandler reactiveResultHandler = new DefaultReactiveResultHandler(configuration, mappedStatement, boundSql, parameterHandler);
final ReactiveResultHandler reactiveResultHandler = new DefaultReactiveResultHandler(configuration, mappedStatement, boundSql, parameterHandler);
boolean anyOutParameterExist = boundSql.getParameterMappings()
.stream()
.anyMatch(parameterMapping ->
Expand Down Expand Up @@ -165,15 +151,12 @@ protected Mono<Integer> doUpdateWithConnection(Connection connection, MappedStat
}
// output parameters
if (segment instanceof Result.OutSegment) {
ReadableResultWrapper<OutParameters> readableResultWrapper = new ReadableResultWrapper<>(
ReadableResultWrapper<OutParameters> readableResultWrapper = ReadableResultWrapper.ofOutParameters(
((Result.OutSegment) segment).outParameters(),
OUT_PARAMETERS_METADATA_EXTRACTOR,
OUT_PARAMETERS_METADATA_EXTRACTOR_BY_INDEX,
OUT_PARAMETERS_METADATA_EXTRACTOR_BY_NAME,
configuration
);
reactiveResultHandler.handleOutputParameters(readableResultWrapper);
return Mono.just(1);
return reactiveResultHandler.handleOutputParameters(readableResultWrapper)
.thenReturn(1);
}
log.trace("[DoUpdate]Ignore process result's segment : " + segment.getClass());
return Mono.empty();
Expand Down Expand Up @@ -208,7 +191,7 @@ protected <E> Flux<E> doQueryWithConnection(Connection connection, MappedStateme
StatementHandler handler = configuration.newStatementHandler(null, mappedStatement, parameter, rowBounds, null, null);
ParameterHandler parameterHandler = handler.getParameterHandler();
Statement statement = this.createStatementInternal(connection, boundSql, mappedStatement, parameterHandler, rowBounds, false, attribute, r2dbcStatementLog);
ReactiveResultHandler reactiveResultHandler = new DefaultReactiveResultHandler(configuration, mappedStatement, boundSql, parameterHandler);
final ReactiveResultHandler reactiveResultHandler = new DefaultReactiveResultHandler(configuration, mappedStatement, boundSql, parameterHandler);
boolean anyOutParameterExist = boundSql.getParameterMappings()
.stream()
.anyMatch(parameterMapping ->
Expand All @@ -231,28 +214,22 @@ protected <E> Flux<E> doQueryWithConnection(Connection connection, MappedStateme
}
// row data
if (segment instanceof Result.RowSegment) {
ReadableResultWrapper<Row> readableResultWrapper = new ReadableResultWrapper<>(
ReadableResultWrapper<Row> readableResultWrapper = ReadableResultWrapper.ofRow(
((Result.RowSegment) segment).row(),
ROW_METADATA_EXTRACTOR,
ROW_METADATA_EXTRACTOR_BY_INDEX,
ROW_METADATA_EXTRACTOR_BY_NAME,
configuration
);
return Mono.just((E) reactiveResultHandler.handleResult(readableResultWrapper));
return reactiveResultHandler.handleResult(readableResultWrapper);
}
// output parameters
if (segment instanceof Result.OutSegment) {
ReadableResultWrapper<OutParameters> readableResultWrapper = new ReadableResultWrapper<>(
ReadableResultWrapper<OutParameters> readableResultWrapper = ReadableResultWrapper.ofOutParameters(
((Result.OutSegment) segment).outParameters(),
OUT_PARAMETERS_METADATA_EXTRACTOR,
OUT_PARAMETERS_METADATA_EXTRACTOR_BY_INDEX,
OUT_PARAMETERS_METADATA_EXTRACTOR_BY_NAME,
configuration
);
return Mono.just(reactiveResultHandler.handleOutputParameters(readableResultWrapper));
return reactiveResultHandler.handleOutputParameters(readableResultWrapper);
}
log.trace("[DoQuery]Ignore process result's segment : " + segment.getClass());
return Mono.empty();
return Mono.<E>empty();
});
}
return result.filter(segment -> segment instanceof Result.Message
Expand All @@ -262,21 +239,14 @@ protected <E> Flux<E> doQueryWithConnection(Connection connection, MappedStateme
if (segment instanceof Result.Message) {
return Mono.error(((Result.Message) segment).exception());
}
ReadableResultWrapper<Row> readableResultWrapper = new ReadableResultWrapper<>(
ReadableResultWrapper<Row> readableResultWrapper = ReadableResultWrapper.ofRow(
((Result.RowSegment) segment).row(),
ROW_METADATA_EXTRACTOR,
ROW_METADATA_EXTRACTOR_BY_INDEX,
ROW_METADATA_EXTRACTOR_BY_NAME,
configuration
);
return Mono.just(reactiveResultHandler.handleResult(readableResultWrapper));
return reactiveResultHandler.handleResult(readableResultWrapper);
});
})
.concatWith(Flux.defer(() -> Flux
.fromIterable((Collection<E>)reactiveResultHandler.getRemainedResults())
.filter(Objects::nonNull))
)
.filter(data -> !Objects.equals(data, DEFERRED))
.concatWith(Flux.defer(reactiveResultHandler::getRemainedResults))
.doOnCancel(() -> {
//clean up reactiveResultHandler
reactiveResultHandler.cleanup();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@
import java.util.function.BiFunction;
import java.util.function.Function;

import static pro.chenggang.project.reactive.mybatis.support.r2dbc.executor.result.ReadableResultWrapper.Functions.OUT_PARAMETERS_METADATA_EXTRACTOR;
import static pro.chenggang.project.reactive.mybatis.support.r2dbc.executor.result.ReadableResultWrapper.Functions.OUT_PARAMETERS_METADATA_EXTRACTOR_BY_INDEX;
import static pro.chenggang.project.reactive.mybatis.support.r2dbc.executor.result.ReadableResultWrapper.Functions.OUT_PARAMETERS_METADATA_EXTRACTOR_BY_NAME;
import static pro.chenggang.project.reactive.mybatis.support.r2dbc.executor.result.ReadableResultWrapper.Functions.ROW_METADATA_EXTRACTOR;
import static pro.chenggang.project.reactive.mybatis.support.r2dbc.executor.result.ReadableResultWrapper.Functions.ROW_METADATA_EXTRACTOR_BY_INDEX;
import static pro.chenggang.project.reactive.mybatis.support.r2dbc.executor.result.ReadableResultWrapper.Functions.ROW_METADATA_EXTRACTOR_BY_NAME;

/**
* The type Row result wrapper.
* <p>
Expand Down Expand Up @@ -94,6 +101,40 @@ public abstract static class Functions {
= (outParameters, name) -> outParameters.getMetadata().getParameterMetadata(name);
}

/**
* New result wrapper of row readable.
*
* @param row the row
* @param r2dbcMybatisConfiguration the r2dbc mybatis configuration
* @return the readable result wrapper
*/
public static ReadableResultWrapper<Row> ofRow(Row row, R2dbcMybatisConfiguration r2dbcMybatisConfiguration) {
return new ReadableResultWrapper<>(
row,
ROW_METADATA_EXTRACTOR,
ROW_METADATA_EXTRACTOR_BY_INDEX,
ROW_METADATA_EXTRACTOR_BY_NAME,
r2dbcMybatisConfiguration
);
}

/**
* New result wrapper of out parameters.
*
* @param outParameters the out parameters
* @param r2dbcMybatisConfiguration the r2dbc mybatis configuration
* @return the readable result wrapper
*/
public static ReadableResultWrapper<OutParameters> ofOutParameters(OutParameters outParameters,
R2dbcMybatisConfiguration r2dbcMybatisConfiguration) {
return new ReadableResultWrapper<>(
outParameters,
OUT_PARAMETERS_METADATA_EXTRACTOR,
OUT_PARAMETERS_METADATA_EXTRACTOR_BY_INDEX,
OUT_PARAMETERS_METADATA_EXTRACTOR_BY_NAME,
r2dbcMybatisConfiguration
);
}

private final T readable;
private final BiFunction<T, Integer, ReadableMetadata> metadataExtractorByIndex;
Expand Down
Loading

0 comments on commit ece57a0

Please sign in to comment.