Skip to content

Commit

Permalink
refactor ReactiveSqlSessionOperator
Browse files Browse the repository at this point in the history
  • Loading branch information
chenggangpro authored and GangCheng committed Jan 23, 2024
1 parent f8dbbe0 commit afe5d30
Show file tree
Hide file tree
Showing 7 changed files with 241 additions and 241 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
* @author Gang Cheng
* @version 2.0.0
*/
public interface ReactiveSqlSession {
public interface ReactiveSqlSession extends MybatisReactiveContextManager {

/**
* The constant DEFAULT_PROFILE of ReactiveSqlSessionProfile.
Expand Down Expand Up @@ -195,6 +195,13 @@ default Mono<Void> rollback() {
*/
Mono<Void> close();

/**
* Gets reactive sql session profile.
*
* @return the ReactiveSqlSessionProfile
*/
ReactiveSqlSessionProfile getProfile();

/**
* Retrieves current configuration.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
*/
package pro.chenggang.project.reactive.mybatis.support.r2dbc;

import org.reactivestreams.Publisher;
import pro.chenggang.project.reactive.mybatis.support.r2dbc.defaults.ReactiveSqlSessionProfile;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.function.Function;
import java.util.function.BiFunction;

import static pro.chenggang.project.reactive.mybatis.support.r2dbc.ReactiveSqlSession.DEFAULT_PROFILE;

Expand All @@ -32,135 +33,153 @@
public interface ReactiveSqlSessionOperator {

/**
* execute with Mono
* Execute then close.
* Configure {@code reactiveSqlSessionProfile.forceToRollback()} to require a rollback operation in BiFunction named as execution.
*
* @param <T> the type parameter
* @param reactiveSqlSessionProfile the reactive sql session profile
* @param monoExecution the mono execution
* @return mono
* @param execution the execution
* @return the flux
*/
<T> Mono<T> execute(ReactiveSqlSessionProfile reactiveSqlSessionProfile,
Function<ReactiveSqlSession, Mono<T>> monoExecution);
<T> Flux<T> executeThenClose(final ReactiveSqlSessionProfile reactiveSqlSessionProfile,
BiFunction<ReactiveSqlSession, ReactiveSqlSessionProfile, Publisher<T>> execution);

/**
* execute with Mono
*
* @param <T> the type parameter
* @param monoExecution the mono execution
* @return mono
*/
default <T> Mono<T> execute(Function<ReactiveSqlSession, Mono<T>> monoExecution) {
return execute(DEFAULT_PROFILE, monoExecution);
}

/**
* execute with Mono then commit
* Execute mono then close.
* Configure {@code reactiveSqlSessionProfile.forceToRollback()} to require a rollback operation in BiFunction named as execution.
*
* @param <T> the type parameter
* @param reactiveSqlSessionProfile the reactive sql session profile
* @param monoExecution the mono execution
* @return mono
*/
<T> Mono<T> executeAndCommit(ReactiveSqlSessionProfile reactiveSqlSessionProfile,
Function<ReactiveSqlSession, Mono<T>> monoExecution);

/**
* execute with Mono then commit
*
* @param <T> the type parameter
* @param monoExecution the mono execution
* @return mono
* @param execution the execution
* @return the mono
*/
default <T> Mono<T> executeAndCommit(Function<ReactiveSqlSession, Mono<T>> monoExecution) {
return executeAndCommit(DEFAULT_PROFILE, monoExecution);
default <T> Mono<T> executeMonoThenClose(final ReactiveSqlSessionProfile reactiveSqlSessionProfile,
BiFunction<ReactiveSqlSession, ReactiveSqlSessionProfile, Mono<T>> execution) {
return executeThenClose(reactiveSqlSessionProfile, execution::apply).singleOrEmpty();
}

/**
* execute with Mono then rollback
* Execute flux then close.
* Configure {@code reactiveSqlSessionProfile.forceToRollback()} to require a rollback operation in BiFunction named as execution.
*
* @param <T> the type parameter
* @param reactiveSqlSessionProfile the reactive sql session profile
* @param monoExecution the mono execution
* @return mono
*/
<T> Mono<T> executeAndRollback(ReactiveSqlSessionProfile reactiveSqlSessionProfile,
Function<ReactiveSqlSession, Mono<T>> monoExecution);

/**
* execute with Mono then rollback
*
* @param <T> the type parameter
* @param monoExecution the mono execution
* @return the mono
* @param execution the execution
* @return the flux
*/
default <T> Mono<T> executeAndRollback(Function<ReactiveSqlSession, Mono<T>> monoExecution) {
return executeAndRollback(DEFAULT_PROFILE, monoExecution);
default <T> Flux<T> executeFluxThenClose(final ReactiveSqlSessionProfile reactiveSqlSessionProfile,
BiFunction<ReactiveSqlSession, ReactiveSqlSessionProfile, Flux<T>> execution) {
return executeThenClose(reactiveSqlSessionProfile, execution::apply);
}

/**
* execute with Mono then commit
* Execute then close with default reactive sql session profile.
* Configure {@code reactiveSqlSessionProfile.forceToRollback()} to require a rollback operation in BiFunction named as execution.
*
* @param <T> the type parameter
* @param reactiveSqlSessionProfile the reactive sql session profile
* @param fluxExecution the flux execution
* @return flux
* @param <T> the type parameter
* @param execution the execution
* @return the flux
*/
<T> Flux<T> executeMany(ReactiveSqlSessionProfile reactiveSqlSessionProfile,
Function<ReactiveSqlSession, Flux<T>> fluxExecution);
default <T> Flux<T> executeThenClose(BiFunction<ReactiveSqlSession, ReactiveSqlSessionProfile, Publisher<T>> execution) {
return executeThenClose(DEFAULT_PROFILE, execution);
}

/**
* execute with Mono then commit
* Execute mono then close with default reactive sql session profile.
* Configure {@code reactiveSqlSessionProfile.forceToRollback()} to require a rollback operation in BiFunction named as execution.
*
* @param <T> the type parameter
* @param fluxExecution the flux execution
* @return flux
* @param <T> the type parameter
* @param execution the execution
* @return the mono
*/
default <T> Flux<T> executeMany(Function<ReactiveSqlSession, Flux<T>> fluxExecution) {
return executeMany(DEFAULT_PROFILE, fluxExecution);
default <T> Mono<T> executeMonoThenClose(BiFunction<ReactiveSqlSession, ReactiveSqlSessionProfile, Mono<T>> execution) {
return executeThenClose(DEFAULT_PROFILE, execution::apply).singleOrEmpty();
}

/**
* execute with Flux
* Execute flux then close with default reactive sql session profile.
* Configure {@code reactiveSqlSessionProfile.forceToRollback()} to require a rollback operation in BiFunction named as execution.
*
* @param <T> the type parameter
* @param reactiveSqlSessionProfile the reactive sql session profile
* @param fluxExecution the flux execution
* @return flux
* @param <T> the type parameter
* @param execution the execution
* @return the flux
*/
<T> Flux<T> executeManyAndCommit(ReactiveSqlSessionProfile reactiveSqlSessionProfile,
Function<ReactiveSqlSession, Flux<T>> fluxExecution);
default <T> Flux<T> executeFluxThenClose(BiFunction<ReactiveSqlSession, ReactiveSqlSessionProfile, Flux<T>> execution) {
return executeThenClose(DEFAULT_PROFILE, execution::apply);
}

/**
* execute with Flux
* Execute then close with given reactive sql session.
* Configure {@code reactiveSqlSessionProfile.forceToRollback()} to require a rollback operation in BiFunction named as execution.
*
* @param <T> the type parameter
* @param fluxExecution the flux execution
* @return flux
* @param <T> the type parameter
* @param reactiveSqlSession the reactive sql session
* @param execution the execution
* @return the flux
*/
default <T> Flux<T> executeManyAndCommit(Function<ReactiveSqlSession, Flux<T>> fluxExecution) {
return executeManyAndCommit(DEFAULT_PROFILE, fluxExecution);
static <T> Flux<T> executeThenClose(final ReactiveSqlSession reactiveSqlSession,
BiFunction<ReactiveSqlSession, ReactiveSqlSessionProfile, Publisher<T>> execution) {
return MybatisReactiveContextManager.currentContext()
.flatMapMany(reactiveExecutorContext -> Flux
.usingWhen(
Mono.just(reactiveSqlSession),
currentReactiveSqlSession -> execution.apply(currentReactiveSqlSession,
currentReactiveSqlSession.getProfile()
),
currentReactiveSqlSession -> Mono.defer(
() -> {
if (currentReactiveSqlSession.getProfile().isForceToRollback()) {
return currentReactiveSqlSession.rollback(true);
} else {
return currentReactiveSqlSession.commit(true);
}
})
.then(Mono.defer(currentReactiveSqlSession::close)),
(currentReactiveSqlSession, err) -> currentReactiveSqlSession.rollback(true)
.then(Mono.defer(currentReactiveSqlSession::close)),
currentReactiveSqlSession -> currentReactiveSqlSession.rollback(true)
.then(Mono.defer(currentReactiveSqlSession::close))
.onErrorMap(throwable -> {
if (throwable instanceof RuntimeException && throwable.getCause() != null) {
String msg = throwable.getMessage();
if (msg != null && msg.startsWith("Async resource cleanup failed")) {
return throwable.getCause();
}
}
return throwable;
})
)
)
.contextWrite(reactiveSqlSession::initReactiveExecutorContext)
.contextWrite(MybatisReactiveContextManager::initReactiveExecutorContextAttribute);
}

/**
* execute with Flux then rollback
* Execute mono then close with given reactive sql session.
* Configure {@code reactiveSqlSessionProfile.forceToRollback()} to require a rollback operation in BiFunction named as execution.
*
* @param <T> the type parameter
* @param reactiveSqlSessionProfile the reactive sql session profile
* @param fluxExecution the flux execution
* @return flux
* @param <T> the type parameter
* @param reactiveSqlSession the reactive sql session
* @param execution the execution
* @return the mono
*/
<T> Flux<T> executeManyAndRollback(ReactiveSqlSessionProfile reactiveSqlSessionProfile,
Function<ReactiveSqlSession, Flux<T>> fluxExecution);
static <T> Mono<T> executeMonoThenClose(final ReactiveSqlSession reactiveSqlSession,
BiFunction<ReactiveSqlSession, ReactiveSqlSessionProfile, Mono<T>> execution) {
return executeThenClose(reactiveSqlSession, execution::apply).singleOrEmpty();
}

/**
* execute with Flux then rollback
* Execute flux then close with given reactive sql session.
* Configure {@code reactiveSqlSessionProfile.forceToRollback()} to require a rollback operation in BiFunction named as execution.
*
* @param <T> the type parameter
* @param fluxExecution the flux execution
* @return flux
* @param <T> the type parameter
* @param reactiveSqlSession the reactive sql session
* @param execution the execution
* @return the flux
*/
default <T> Flux<T> executeManyAndRollback(Function<ReactiveSqlSession, Flux<T>> fluxExecution) {
return executeManyAndRollback(DEFAULT_PROFILE, fluxExecution);
static <T> Flux<T> executeFluxThenClose(final ReactiveSqlSession reactiveSqlSession,
BiFunction<ReactiveSqlSession, ReactiveSqlSessionProfile, Flux<T>> execution) {
return executeThenClose(reactiveSqlSession, execution::apply);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
* @author Gang Cheng
* @version 2.0.0
*/
public class DefaultReactiveSqlSession implements ReactiveSqlSession, MybatisReactiveContextManager {
public class DefaultReactiveSqlSession implements ReactiveSqlSession {

private static final Log log = LogFactory.getLog(DefaultReactiveSqlSession.class);

Expand Down Expand Up @@ -125,6 +125,11 @@ public Mono<Void> close() {
.contextWrite(MybatisReactiveContextManager::initReactiveExecutorContextAttribute);
}

@Override
public ReactiveSqlSessionProfile getProfile() {
return this.reactiveSqlSessionProfile;
}

private Object wrapCollection(final Object object) {
return ParamNameResolver.wrapToMapIfCollection(object, null);
}
Expand Down
Loading

0 comments on commit afe5d30

Please sign in to comment.