From 7daff2b8a9bc7dccfd5a2d887a5ce32c1d8580c7 Mon Sep 17 00:00:00 2001 From: DanLi39 <147678474+DanLi39@users.noreply.github.com> Date: Thu, 11 Jan 2024 17:06:17 +0800 Subject: [PATCH] feat: support flux type in dynamic (#372) --- .../common/arex-common/pom.xml | 23 +++ .../io/arex/inst/common/util/FluxUtil.java | 96 ++++++++++ .../arex/inst/common/util/FluxUtilTest.java | 55 ++++++ .../dynamic/arex-dynamic-common/pom.xml | 17 +- .../dynamic/common/DynamicClassExtractor.java | 15 ++ .../dynamic/common/listener/FluxConsumer.java | 57 ++++++ .../dynamic/common/listener/MonoConsumer.java | 14 +- .../common/DynamicClassExtractorTest.java | 71 +++++++- .../common/listener/FluxConsumerTest.java | 165 ++++++++++++++++++ arex-instrumentation/pom.xml | 2 +- 10 files changed, 495 insertions(+), 20 deletions(-) create mode 100644 arex-instrumentation/common/arex-common/pom.xml create mode 100644 arex-instrumentation/common/arex-common/src/main/java/io/arex/inst/common/util/FluxUtil.java create mode 100644 arex-instrumentation/common/arex-common/src/test/java/io/arex/inst/common/util/FluxUtilTest.java create mode 100644 arex-instrumentation/dynamic/arex-dynamic-common/src/main/java/io/arex/inst/dynamic/common/listener/FluxConsumer.java create mode 100644 arex-instrumentation/dynamic/arex-dynamic-common/src/test/java/io/arex/inst/dynamic/common/listener/FluxConsumerTest.java diff --git a/arex-instrumentation/common/arex-common/pom.xml b/arex-instrumentation/common/arex-common/pom.xml new file mode 100644 index 000000000..fc0dbdc2f --- /dev/null +++ b/arex-instrumentation/common/arex-common/pom.xml @@ -0,0 +1,23 @@ + + + + arex-instrumentation-parent + io.arex + ${revision} + ../../pom.xml + + 4.0.0 + + arex-common + + + + io.projectreactor + reactor-core + ${reactor.version} + provided + + + diff --git a/arex-instrumentation/common/arex-common/src/main/java/io/arex/inst/common/util/FluxUtil.java b/arex-instrumentation/common/arex-common/src/main/java/io/arex/inst/common/util/FluxUtil.java new file mode 100644 index 000000000..9d3ed3aa3 --- /dev/null +++ b/arex-instrumentation/common/arex-common/src/main/java/io/arex/inst/common/util/FluxUtil.java @@ -0,0 +1,96 @@ +package io.arex.inst.common.util; + +import io.arex.inst.runtime.serializer.Serializer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import reactor.core.publisher.Flux; +import io.arex.agent.bootstrap.util.CollectionUtil; + +public class FluxUtil { + + static final String FLUX_FROM_ITERATOR = "reactor.core.publisher.FluxIterable-"; + static final String FLUX_FROM_ARRAY = "reactor.core.publisher.FluxArray-"; + static final String FLUX_FROM_STREAM = "reactor.core.publisher.FluxStream-"; + + private FluxUtil() { + } + + public static Flux restore(Object fluxObj) { + if(fluxObj == null){ + return Flux.empty(); + } + FluxResult fluxResult = (FluxResult) fluxObj; + List fluxElementResults = fluxResult.getFluxElementResults(); + if (CollectionUtil.isEmpty(fluxElementResults)) { + return Flux.empty(); + } + List resultList = new ArrayList<>(fluxElementResults.size()); + sortFluxElement(fluxElementResults).forEach( + fluxElement -> resultList.add(Serializer.deserialize(fluxElement.getContent(), fluxElement.getType()))); + String responseType = fluxResult.getResponseType(); + if (responseType != null) { + if (FLUX_FROM_ITERATOR.equals(responseType)) { + return Flux.fromIterable(resultList); + } else if (FLUX_FROM_ARRAY.equals(responseType)) { + return Flux.fromArray(resultList.toArray()); + } else if (FLUX_FROM_STREAM.equals(responseType)) { + return Flux.fromStream(resultList.stream()); + } + } + return Flux.just(resultList); + } + + + private static List sortFluxElement(List list) { + Comparator comparator = Comparator.comparingInt( + FluxElementResult::getIndex); + Collections.sort(list, comparator); + return list; + } + + public static class FluxResult { + + private final String responseType; + private final List fluxElementResults; + + public FluxResult(String responseType, List fluxElementResults) { + this.responseType = responseType; + this.fluxElementResults = fluxElementResults; + } + + public String getResponseType() { + return responseType; + } + + public List getFluxElementResults() { + return fluxElementResults; + } + } + + public static class FluxElementResult { + + private final int index; + private final String content; + private final String type; + + public FluxElementResult(int index, String content, String type) { + this.index = index; + this.content = content; + this.type = type; + } + + public int getIndex() { + return index; + } + + public String getContent() { + return content; + } + + public String getType() { + return type; + } + } +} diff --git a/arex-instrumentation/common/arex-common/src/test/java/io/arex/inst/common/util/FluxUtilTest.java b/arex-instrumentation/common/arex-common/src/test/java/io/arex/inst/common/util/FluxUtilTest.java new file mode 100644 index 000000000..79f912817 --- /dev/null +++ b/arex-instrumentation/common/arex-common/src/test/java/io/arex/inst/common/util/FluxUtilTest.java @@ -0,0 +1,55 @@ +package io.arex.inst.common.util; + +import static io.arex.inst.common.util.FluxUtil.FLUX_FROM_ARRAY; +import static io.arex.inst.common.util.FluxUtil.FLUX_FROM_ITERATOR; +import static io.arex.inst.common.util.FluxUtil.FLUX_FROM_STREAM; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import io.arex.inst.common.util.FluxUtil.FluxElementResult; +import io.arex.inst.common.util.FluxUtil.FluxResult; +import io.arex.inst.runtime.util.TypeUtil; +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.List; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; + +public class FluxUtilTest { + + @Test + void FluxRecory() { + List list = new ArrayList<>(); + FluxResult fluxResult = new FluxResult(null, list); + // flux is empty + assertNotNull(FluxUtil.restore(null)); + Flux result = FluxUtil.restore(fluxResult); + assertNotNull(result); + + // flux is not empty + FluxElementResult fluxElement1 = new FluxElementResult(1, "1", "java.lang.Integer"); + FluxElementResult fluxException1 = new FluxElementResult(2, null, "java.lang.RuntimeException"); + list.add(fluxElement1); + list.add(fluxException1); + + // Flux.just() + fluxResult = new FluxResult(null, list); + result = FluxUtil.restore(fluxResult); + assertEquals(TypeUtil.getName(result),"reactor.core.publisher.FluxJust-java.util.ArrayList-"); + + // Flux.fromIterable() + fluxResult = new FluxResult(FLUX_FROM_ITERATOR, list); + result = FluxUtil.restore(fluxResult); + assertEquals(TypeUtil.getName(result),FLUX_FROM_ITERATOR); + + // Flux.fromArray() + fluxResult = new FluxResult(FLUX_FROM_ARRAY, list); + result = FluxUtil.restore(fluxResult); + assertEquals(TypeUtil.getName(result),FLUX_FROM_ARRAY); + + // Flux.fromStream() + fluxResult = new FluxResult(FLUX_FROM_STREAM, list); + result = FluxUtil.restore(fluxResult); + assertEquals(TypeUtil.getName(result),FLUX_FROM_STREAM); + } +} diff --git a/arex-instrumentation/dynamic/arex-dynamic-common/pom.xml b/arex-instrumentation/dynamic/arex-dynamic-common/pom.xml index 85768a25b..3a81e6b93 100644 --- a/arex-instrumentation/dynamic/arex-dynamic-common/pom.xml +++ b/arex-instrumentation/dynamic/arex-dynamic-common/pom.xml @@ -19,17 +19,22 @@ ${springframework.version} provided - - io.projectreactor - reactor-core - ${reactor.version} - provided - joda-time joda-time 2.9 provided + + io.arex + arex-common + ${project.version} + + + io.projectreactor + reactor-core + ${reactor.version} + provided + diff --git a/arex-instrumentation/dynamic/arex-dynamic-common/src/main/java/io/arex/inst/dynamic/common/DynamicClassExtractor.java b/arex-instrumentation/dynamic/arex-dynamic-common/src/main/java/io/arex/inst/dynamic/common/DynamicClassExtractor.java index 2b72d2542..ccb483f7b 100644 --- a/arex-instrumentation/dynamic/arex-dynamic-common/src/main/java/io/arex/inst/dynamic/common/DynamicClassExtractor.java +++ b/arex-instrumentation/dynamic/arex-dynamic-common/src/main/java/io/arex/inst/dynamic/common/DynamicClassExtractor.java @@ -8,6 +8,8 @@ import io.arex.agent.bootstrap.util.ArrayUtils; import io.arex.agent.bootstrap.util.StringUtil; import io.arex.agent.thirdparty.util.time.DateFormatUtils; +import io.arex.inst.common.util.FluxUtil; +import io.arex.inst.dynamic.common.listener.FluxConsumer; import io.arex.inst.dynamic.common.listener.ListenableFutureAdapter; import io.arex.inst.dynamic.common.listener.MonoConsumer; import io.arex.inst.dynamic.common.listener.ResponseConsumer; @@ -31,6 +33,7 @@ import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public class DynamicClassExtractor { @@ -41,6 +44,7 @@ public class DynamicClassExtractor { private static final String NEED_RECORD_TITLE = "dynamic.needRecord"; private static final String NEED_REPLAY_TITLE = "dynamic.needReplay"; public static final String MONO = "reactor.core.publisher.Mono"; + public static final String FLUX = "reactor.core.publisher.Flux"; private static final String JODA_LOCAL_DATE_TIME = "org.joda.time.LocalDateTime"; private static final String JODA_LOCAL_TIME = "org.joda.time.LocalTime"; public static final String SIMPLE_DATE_FORMAT_MILLIS = "yyyy-MM-dd HH:mm:"; @@ -95,6 +99,11 @@ public Object recordResponse(Object response) { if (MONO.equals(methodReturnType) && response instanceof Mono) { return new MonoConsumer(this).accept((Mono) response); } + + if (FLUX.equals(methodReturnType) && response instanceof Flux) { + return new FluxConsumer(this).accept((Flux) response); + } + this.result = response; if (needRecord()) { this.resultClazz = buildResultClazz(TypeUtil.getName(response)); @@ -308,6 +317,12 @@ Object restoreResponse(Object result) { return Mono.justOrEmpty(result); } + if (FLUX.equals(this.methodReturnType)) { + if (result instanceof Throwable) { + return Flux.error((Throwable) result); + } + return FluxUtil.restore(result); + } return result; } diff --git a/arex-instrumentation/dynamic/arex-dynamic-common/src/main/java/io/arex/inst/dynamic/common/listener/FluxConsumer.java b/arex-instrumentation/dynamic/arex-dynamic-common/src/main/java/io/arex/inst/dynamic/common/listener/FluxConsumer.java new file mode 100644 index 000000000..82703d9de --- /dev/null +++ b/arex-instrumentation/dynamic/arex-dynamic-common/src/main/java/io/arex/inst/dynamic/common/listener/FluxConsumer.java @@ -0,0 +1,57 @@ +package io.arex.inst.dynamic.common.listener; + +import io.arex.agent.bootstrap.ctx.TraceTransmitter; +import io.arex.inst.common.util.FluxUtil; +import io.arex.inst.common.util.FluxUtil.FluxResult; +import io.arex.inst.dynamic.common.DynamicClassExtractor; +import io.arex.inst.runtime.model.ArexConstants; +import io.arex.inst.runtime.serializer.Serializer; +import io.arex.inst.runtime.util.TypeUtil; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import reactor.core.publisher.Flux; + +public class FluxConsumer { + + private final TraceTransmitter traceTransmitter; + private final DynamicClassExtractor extractor; + + public FluxConsumer(DynamicClassExtractor extractor) { + this.traceTransmitter = TraceTransmitter.create(); + this.extractor = extractor; + } + + public Flux accept(Flux responseFlux) { + // use a list to record all elements + List fluxElementMockerResults = new ArrayList<>(); + AtomicInteger index = new AtomicInteger(1); + String responseType = TypeUtil.getName(responseFlux); + return responseFlux + // add element to list + .doOnNext(element -> { + try (TraceTransmitter tm = traceTransmitter.transmit()) { + fluxElementMockerResults.add( + getFluxElementMockerResult(index.getAndIncrement(), element)); + } + }) + // add error to list + .doOnError(error -> { + try (TraceTransmitter tm = traceTransmitter.transmit()) { + fluxElementMockerResults.add( + getFluxElementMockerResult(index.getAndIncrement(), error)); + } + }) + .doFinally(result -> { + try (TraceTransmitter tm = traceTransmitter.transmit()) { + FluxResult fluxResult = new FluxResult(responseType, fluxElementMockerResults); + extractor.recordResponse(fluxResult); + } + }); + } + + private FluxUtil.FluxElementResult getFluxElementMockerResult(int index, Object element) { + String content = Serializer.serialize(element, ArexConstants.GSON_SERIALIZER); + return new FluxUtil.FluxElementResult(index, content, TypeUtil.getName(element)); + } +} diff --git a/arex-instrumentation/dynamic/arex-dynamic-common/src/main/java/io/arex/inst/dynamic/common/listener/MonoConsumer.java b/arex-instrumentation/dynamic/arex-dynamic-common/src/main/java/io/arex/inst/dynamic/common/listener/MonoConsumer.java index 72fd3740d..6ea63154d 100644 --- a/arex-instrumentation/dynamic/arex-dynamic-common/src/main/java/io/arex/inst/dynamic/common/listener/MonoConsumer.java +++ b/arex-instrumentation/dynamic/arex-dynamic-common/src/main/java/io/arex/inst/dynamic/common/listener/MonoConsumer.java @@ -14,17 +14,23 @@ public MonoConsumer(DynamicClassExtractor extractor) { this.extractor = extractor; } + /** + * support for Mono type recording + * @param responseMono + * @return + */ public Mono accept(Mono responseMono) { return responseMono - .doOnSuccess(o -> { + .doOnSuccess(result -> { try (TraceTransmitter tm = traceTransmitter.transmit()) { - extractor.recordResponse(o); + extractor.recordResponse(result); } }) - .doOnError(o -> { + .doOnError(error-> { try (TraceTransmitter tm = traceTransmitter.transmit()) { - extractor.recordResponse(o); + extractor.recordResponse(error); } }); } + } diff --git a/arex-instrumentation/dynamic/arex-dynamic-common/src/test/java/io/arex/inst/dynamic/common/DynamicClassExtractorTest.java b/arex-instrumentation/dynamic/arex-dynamic-common/src/test/java/io/arex/inst/dynamic/common/DynamicClassExtractorTest.java index c8cab9537..8b655757b 100644 --- a/arex-instrumentation/dynamic/arex-dynamic-common/src/test/java/io/arex/inst/dynamic/common/DynamicClassExtractorTest.java +++ b/arex-instrumentation/dynamic/arex-dynamic-common/src/test/java/io/arex/inst/dynamic/common/DynamicClassExtractorTest.java @@ -5,6 +5,7 @@ import io.arex.agent.bootstrap.model.ArexMocker; import io.arex.agent.bootstrap.model.Mocker.Target; import io.arex.agent.thirdparty.util.time.DateFormatUtils; +import io.arex.inst.common.util.FluxUtil; import io.arex.inst.dynamic.common.listener.MonoConsumer; import io.arex.inst.runtime.config.ConfigBuilder; import io.arex.inst.runtime.context.ArexContext; @@ -20,6 +21,7 @@ import java.time.LocalDateTime; import java.time.LocalTime; import java.util.Calendar; +import java.util.Collections; import java.util.Date; import java.util.HashSet; import java.util.Set; @@ -38,12 +40,12 @@ import org.mockito.junit.jupiter.MockitoExtension; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.function.Predicate; import java.util.stream.Stream; import org.mockito.stubbing.Answer; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; @@ -85,19 +87,30 @@ void record(Runnable mocker, Object[] args, Object result, Predicate pre System.out.println("mock MockService.recordMocker"); return null; }); - Method testWithArexMock; - if (result instanceof Mono) { - testWithArexMock = Mono.class.getDeclaredMethod("just", Object.class); - } else { - testWithArexMock = DynamicClassExtractorTest.class.getDeclaredMethod("testWithArexMock", String.class); - } + Method testWithArexMock = getMethod(result); DynamicClassExtractor extractor = new DynamicClassExtractor(testWithArexMock, args); - extractor.recordResponse(result); + assertTrue(predicate.test(result)); } } + private Method getMethod(Object result) { + try { + + if (result instanceof Mono) { + return Mono.class.getDeclaredMethod("just", Object.class); + } else if (result instanceof Flux) { + return Flux.class.getDeclaredMethod("just", Object.class); + + } else { + return DynamicClassExtractorTest.class.getDeclaredMethod("testWithArexMock", String.class); + } + } catch (NoSuchMethodException e) { + return null; + } + } + @Test void resetMonoResponse() { try { @@ -161,7 +174,8 @@ static Stream recordCase() { arguments(resultIsNull, new Object[]{"mock"}, new int[1001], nonNull), arguments(resultIsNull, null, null, isNull), arguments(resultIsNull, null, Mono.just("mono test"), nonNull), - arguments(resultIsNull, null, Futures.immediateFuture("mock-future"), nonNull) + arguments(resultIsNull, null, Futures.immediateFuture("mock-future"), nonNull), + arguments(resultIsNull, null, Flux.just("mock-exception"), nonNull) ); } @@ -274,6 +288,19 @@ void restoreResponseTest() throws NoSuchMethodException, ExecutionException, Int monoTestExtractorActualResult = monoTestExtractor.restoreResponse(new RuntimeException("test-exception")); Object monoTestFinalActualResult = monoTestExtractorActualResult; assertThrows(RuntimeException.class, () -> ((Mono) monoTestFinalActualResult).block()); + + // flux value + Method testReturnFlux = DynamicClassExtractorTest.class.getDeclaredMethod("testReturnFlux", String.class, + Throwable.class); + DynamicClassExtractor fluxTestExtractor = new DynamicClassExtractor(testReturnFlux, new Object[]{"mock"}, + "#val", null); + List list = new ArrayList<>(); + FluxUtil.FluxResult fluxResult = new FluxUtil.FluxResult(null, list); + Object fluxNormalTest = fluxTestExtractor.restoreResponse(fluxResult); + assertNull(((Flux) fluxNormalTest).blockFirst()); + + Object fluxExceptionTest = fluxTestExtractor.restoreResponse(new RuntimeException()); + assertThrows(RuntimeException.class,()-> ((Flux) fluxExceptionTest).blockFirst()); } @Test @@ -484,6 +511,13 @@ public Mono testReturnMono(String val, Throwable t) { return Mono.justOrEmpty(val + "testReturnMono"); } + public Flux testReturnFlux(String val,Throwable t){ + if (t != null) { + return Flux.error(t); + } + return val == null ? Flux.empty() : Flux.just(val + "testReturnFlux"); + } + public static Mono monoTest() { return Mono.justOrEmpty("Mono test") .doOnNext(value -> System.out.println("Mono context:" + value)) @@ -495,4 +529,23 @@ public static Mono monoExceptionTest() { .doOnError(throwable -> System.out.println("Mono error:" + throwable)) .doOnSuccess(object -> System.out.println("Mono success:" + object.getClass())); } + + public static Flux fluxTest() { + return Flux.just("flux","test") + .doOnNext(value -> System.out.println("Flux context:" + value)) + .onErrorResume(t -> Mono.empty()); + } + + public static Flux fluxOnErrorTest() { + return Flux.just("flux", "test") + .doOnNext(value -> { + throw new RuntimeException("error"); + }); + } + + public static Flux fluxExceptionTest() { + return Flux.error(new RuntimeException("e")) + .doOnError(throwable -> System.out.println("Flux error:" + throwable)) + .doOnNext(object -> System.out.println("Flux success:" + object.getClass())); + } } diff --git a/arex-instrumentation/dynamic/arex-dynamic-common/src/test/java/io/arex/inst/dynamic/common/listener/FluxConsumerTest.java b/arex-instrumentation/dynamic/arex-dynamic-common/src/test/java/io/arex/inst/dynamic/common/listener/FluxConsumerTest.java new file mode 100644 index 000000000..bef4fd909 --- /dev/null +++ b/arex-instrumentation/dynamic/arex-dynamic-common/src/test/java/io/arex/inst/dynamic/common/listener/FluxConsumerTest.java @@ -0,0 +1,165 @@ +package io.arex.inst.dynamic.common.listener; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import io.arex.inst.dynamic.common.DynamicClassExtractor; +import io.arex.inst.runtime.config.ConfigBuilder; +import io.arex.inst.runtime.context.ContextManager; +import java.lang.reflect.Method; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import reactor.core.publisher.Flux; + +public class FluxConsumerTest { + + static DynamicClassExtractor extractor; + static FluxConsumer fluxConsumer; + + @BeforeAll + static void setUp() { + Method testWithArexMock; + try { + testWithArexMock = FluxConsumerTest.class.getDeclaredMethod("testWithArexMock", String.class); + } catch (NoSuchMethodException e) { + testWithArexMock = null; + } + final Object[] args = {"errorSerialize"}; + extractor = new DynamicClassExtractor(testWithArexMock, args); + fluxConsumer = new FluxConsumer(extractor); + Mockito.mockStatic(ContextManager.class); + ConfigBuilder.create("test").enableDebug(true).build(); + } + + @AfterAll + static void tearDown() { + Mockito.clearAllCaches(); + } + + + @Test + void record() { + + // Record empty flux + testEmptyFlux(); + + // Record Exception + testFluxError(); + + // Normal conditions without exceptions or errors, all elements are recorded. + testNormalFlux(); + + // Elements before the error occurs and all elements in alternate Flux sequenceare when error occurs are recorded. + testFluxOnErrorResume(); + + // Except for the element when the error occurs, all other elements are recorded + testFluxOnErrorContinue(); + + // Elements before the error occurs and the exception are recorded (Flux terminates when exception is thrown). + testFluxOnError(); + } + + private static void testNormalFlux() { + Flux flux = Flux.just(1, 2, 3, 4, 5) + .doOnNext(val -> System.out.println("val" + ":" + val)) + // doFinally performs some operations that have nothing to do with the value of the element. + // If the doFinally operator is called multiple times, doFinally will be executed once at the end of each sequence. + .doFinally(System.out::println); + Flux subscribe = fluxConsumer.accept(flux); + Flux blockFirst = fluxConsumer.accept(flux); + // record content: 1,2,3,4,5 + subscribe.subscribe(); + // record content: 1 + assertEquals(blockFirst.blockFirst(), 1); + } + + private static void testEmptyFlux() { + Flux flux = Flux.empty(); + Flux subscribe = fluxConsumer.accept(flux); + Flux blockFirst = fluxConsumer.accept(flux); + // record content: 1,2,3,4,5 + subscribe.subscribe(); + // record content: 1 + assertNull(blockFirst.blockFirst()); + } + + + private static void testFluxOnErrorResume() { + Flux flux = Flux.just(1, 2) + .doOnNext(val -> { + if (val.equals(2)) { + throw new RuntimeException("error"); + } + }) + .doOnError(t -> System.out.println("error" + ":" + t)) + // returns an alternate Flux sequence when a Flux error occurs, + .onErrorResume(t -> Flux.just(7, 8, 9)); + + Flux subscribe = fluxConsumer.accept(flux); + Flux blockFirst = fluxConsumer.accept(flux); + + // record content: 1,7,8,9 + subscribe.subscribe(); + // record content: 1 + assertEquals(blockFirst.blockFirst(), 1); + } + + private static void testFluxOnError() { + final Flux flux = Flux.just(1, 2, 3, 4, 5) + .doOnNext(val -> { + if (val.equals(3)) { + throw new RuntimeException("error"); + } + }) + .doOnError(t -> System.out.println("error" + ":" + t)); + + Flux subscribe = fluxConsumer.accept(flux); + Flux blockFirst = fluxConsumer.accept(flux); + Flux blockLast = fluxConsumer.accept(flux); + + // record content: 1,2,RuntimeException + subscribe.subscribe(); + // record content: 1 + assertEquals(blockFirst.blockFirst(), 1); + // record content: RuntimeException + assertThrows(RuntimeException.class, () -> blockLast.blockLast()); + } + + private static void testFluxOnErrorContinue() { + Flux flux = Flux.just(1, 2, 3, 4, 5) + .doOnNext(val -> { + if (val.equals(3)) { + throw new RuntimeException("error"); + } + }) + .onErrorContinue((t, o) -> System.out.println("error" + ":" + t)) + .doOnNext(val -> System.out.println("val" + ":" + val)); + Flux subscribe = fluxConsumer.accept(flux); + Flux blockFirst = fluxConsumer.accept(flux); + Flux blockLast = fluxConsumer.accept(flux); + + // record content: 1,2,4,5 + subscribe.subscribe(); + // record content: 1 + assertEquals(blockFirst.blockFirst(), 1); + // record content: 5 + assertEquals(blockLast.blockLast(), 5); + } + + private static void testFluxError() { + Flux flux = Flux.error(new RuntimeException("error")); + Flux subscribe = fluxConsumer.accept(flux); + Flux blockFirst = fluxConsumer.accept(flux); + // record content: RuntimeException + subscribe.subscribe(); + // record content: RuntimeException + assertThrows(RuntimeException.class, () -> blockFirst.blockFirst()); + } + + public String testWithArexMock(String val) { + return val + "testWithArexMock"; + } + +} diff --git a/arex-instrumentation/pom.xml b/arex-instrumentation/pom.xml index a3a23ac84..3ab821708 100644 --- a/arex-instrumentation/pom.xml +++ b/arex-instrumentation/pom.xml @@ -35,6 +35,7 @@ dynamic/arex-dynamic dynamic/arex-dynamic-common dynamic/arex-cache + common/arex-common time-machine/arex-time-machine httpclient/arex-httpclient-common httpclient/arex-httpclient-okhttp-v3 @@ -64,6 +65,5 @@ com.google.auto.service auto-service -