Skip to content

Commit

Permalink
feat: support flux type in dynamic (#372)
Browse files Browse the repository at this point in the history
  • Loading branch information
DanLi39 authored Jan 11, 2024
1 parent 28d4fd9 commit 7daff2b
Show file tree
Hide file tree
Showing 10 changed files with 495 additions and 20 deletions.
23 changes: 23 additions & 0 deletions arex-instrumentation/common/arex-common/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>arex-instrumentation-parent</artifactId>
<groupId>io.arex</groupId>
<version>${revision}</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>arex-common</artifactId>

<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>${reactor.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package io.arex.inst.common.util;

import io.arex.inst.runtime.serializer.Serializer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import reactor.core.publisher.Flux;
import io.arex.agent.bootstrap.util.CollectionUtil;

public class FluxUtil {

static final String FLUX_FROM_ITERATOR = "reactor.core.publisher.FluxIterable-";
static final String FLUX_FROM_ARRAY = "reactor.core.publisher.FluxArray-";
static final String FLUX_FROM_STREAM = "reactor.core.publisher.FluxStream-";

private FluxUtil() {
}

public static Flux<?> restore(Object fluxObj) {
if(fluxObj == null){
return Flux.empty();
}
FluxResult fluxResult = (FluxResult) fluxObj;
List<FluxElementResult> fluxElementResults = fluxResult.getFluxElementResults();
if (CollectionUtil.isEmpty(fluxElementResults)) {
return Flux.empty();
}
List<Object> resultList = new ArrayList<>(fluxElementResults.size());
sortFluxElement(fluxElementResults).forEach(
fluxElement -> resultList.add(Serializer.deserialize(fluxElement.getContent(), fluxElement.getType())));
String responseType = fluxResult.getResponseType();
if (responseType != null) {
if (FLUX_FROM_ITERATOR.equals(responseType)) {
return Flux.fromIterable(resultList);
} else if (FLUX_FROM_ARRAY.equals(responseType)) {
return Flux.fromArray(resultList.toArray());
} else if (FLUX_FROM_STREAM.equals(responseType)) {
return Flux.fromStream(resultList.stream());
}
}
return Flux.just(resultList);
}


private static List<FluxElementResult> sortFluxElement(List<FluxElementResult> list) {
Comparator<FluxElementResult> comparator = Comparator.comparingInt(
FluxElementResult::getIndex);
Collections.sort(list, comparator);
return list;
}

public static class FluxResult {

private final String responseType;
private final List<FluxElementResult> fluxElementResults;

public FluxResult(String responseType, List<FluxElementResult> fluxElementResults) {
this.responseType = responseType;
this.fluxElementResults = fluxElementResults;
}

public String getResponseType() {
return responseType;
}

public List<FluxElementResult> getFluxElementResults() {
return fluxElementResults;
}
}

public static class FluxElementResult {

private final int index;
private final String content;
private final String type;

public FluxElementResult(int index, String content, String type) {
this.index = index;
this.content = content;
this.type = type;
}

public int getIndex() {
return index;
}

public String getContent() {
return content;
}

public String getType() {
return type;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package io.arex.inst.common.util;

import static io.arex.inst.common.util.FluxUtil.FLUX_FROM_ARRAY;
import static io.arex.inst.common.util.FluxUtil.FLUX_FROM_ITERATOR;
import static io.arex.inst.common.util.FluxUtil.FLUX_FROM_STREAM;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

import io.arex.inst.common.util.FluxUtil.FluxElementResult;
import io.arex.inst.common.util.FluxUtil.FluxResult;
import io.arex.inst.runtime.util.TypeUtil;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;

public class FluxUtilTest {

@Test
void FluxRecory() {
List<FluxElementResult> list = new ArrayList<>();
FluxResult fluxResult = new FluxResult(null, list);
// flux is empty
assertNotNull(FluxUtil.restore(null));
Flux<?> result = FluxUtil.restore(fluxResult);
assertNotNull(result);

// flux is not empty
FluxElementResult fluxElement1 = new FluxElementResult(1, "1", "java.lang.Integer");
FluxElementResult fluxException1 = new FluxElementResult(2, null, "java.lang.RuntimeException");
list.add(fluxElement1);
list.add(fluxException1);

// Flux.just()
fluxResult = new FluxResult(null, list);
result = FluxUtil.restore(fluxResult);
assertEquals(TypeUtil.getName(result),"reactor.core.publisher.FluxJust-java.util.ArrayList-");

// Flux.fromIterable()
fluxResult = new FluxResult(FLUX_FROM_ITERATOR, list);
result = FluxUtil.restore(fluxResult);
assertEquals(TypeUtil.getName(result),FLUX_FROM_ITERATOR);

// Flux.fromArray()
fluxResult = new FluxResult(FLUX_FROM_ARRAY, list);
result = FluxUtil.restore(fluxResult);
assertEquals(TypeUtil.getName(result),FLUX_FROM_ARRAY);

// Flux.fromStream()
fluxResult = new FluxResult(FLUX_FROM_STREAM, list);
result = FluxUtil.restore(fluxResult);
assertEquals(TypeUtil.getName(result),FLUX_FROM_STREAM);
}
}
17 changes: 11 additions & 6 deletions arex-instrumentation/dynamic/arex-dynamic-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,22 @@
<version>${springframework.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>${reactor.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.9</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.arex</groupId>
<artifactId>arex-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>${reactor.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import io.arex.agent.bootstrap.util.ArrayUtils;
import io.arex.agent.bootstrap.util.StringUtil;
import io.arex.agent.thirdparty.util.time.DateFormatUtils;
import io.arex.inst.common.util.FluxUtil;
import io.arex.inst.dynamic.common.listener.FluxConsumer;
import io.arex.inst.dynamic.common.listener.ListenableFutureAdapter;
import io.arex.inst.dynamic.common.listener.MonoConsumer;
import io.arex.inst.dynamic.common.listener.ResponseConsumer;
Expand All @@ -31,6 +33,7 @@
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class DynamicClassExtractor {
Expand All @@ -41,6 +44,7 @@ public class DynamicClassExtractor {
private static final String NEED_RECORD_TITLE = "dynamic.needRecord";
private static final String NEED_REPLAY_TITLE = "dynamic.needReplay";
public static final String MONO = "reactor.core.publisher.Mono";
public static final String FLUX = "reactor.core.publisher.Flux";
private static final String JODA_LOCAL_DATE_TIME = "org.joda.time.LocalDateTime";
private static final String JODA_LOCAL_TIME = "org.joda.time.LocalTime";
public static final String SIMPLE_DATE_FORMAT_MILLIS = "yyyy-MM-dd HH:mm:";
Expand Down Expand Up @@ -95,6 +99,11 @@ public Object recordResponse(Object response) {
if (MONO.equals(methodReturnType) && response instanceof Mono<?>) {
return new MonoConsumer(this).accept((Mono<?>) response);
}

if (FLUX.equals(methodReturnType) && response instanceof Flux<?>) {
return new FluxConsumer(this).accept((Flux<?>) response);
}

this.result = response;
if (needRecord()) {
this.resultClazz = buildResultClazz(TypeUtil.getName(response));
Expand Down Expand Up @@ -308,6 +317,12 @@ Object restoreResponse(Object result) {
return Mono.justOrEmpty(result);
}

if (FLUX.equals(this.methodReturnType)) {
if (result instanceof Throwable) {
return Flux.error((Throwable) result);
}
return FluxUtil.restore(result);
}
return result;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package io.arex.inst.dynamic.common.listener;

import io.arex.agent.bootstrap.ctx.TraceTransmitter;
import io.arex.inst.common.util.FluxUtil;
import io.arex.inst.common.util.FluxUtil.FluxResult;
import io.arex.inst.dynamic.common.DynamicClassExtractor;
import io.arex.inst.runtime.model.ArexConstants;
import io.arex.inst.runtime.serializer.Serializer;
import io.arex.inst.runtime.util.TypeUtil;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

public class FluxConsumer {

private final TraceTransmitter traceTransmitter;
private final DynamicClassExtractor extractor;

public FluxConsumer(DynamicClassExtractor extractor) {
this.traceTransmitter = TraceTransmitter.create();
this.extractor = extractor;
}

public Flux<?> accept(Flux<?> responseFlux) {
// use a list to record all elements
List<FluxUtil.FluxElementResult> fluxElementMockerResults = new ArrayList<>();
AtomicInteger index = new AtomicInteger(1);
String responseType = TypeUtil.getName(responseFlux);
return responseFlux
// add element to list
.doOnNext(element -> {
try (TraceTransmitter tm = traceTransmitter.transmit()) {
fluxElementMockerResults.add(
getFluxElementMockerResult(index.getAndIncrement(), element));
}
})
// add error to list
.doOnError(error -> {
try (TraceTransmitter tm = traceTransmitter.transmit()) {
fluxElementMockerResults.add(
getFluxElementMockerResult(index.getAndIncrement(), error));
}
})
.doFinally(result -> {
try (TraceTransmitter tm = traceTransmitter.transmit()) {
FluxResult fluxResult = new FluxResult(responseType, fluxElementMockerResults);
extractor.recordResponse(fluxResult);
}
});
}

private FluxUtil.FluxElementResult getFluxElementMockerResult(int index, Object element) {
String content = Serializer.serialize(element, ArexConstants.GSON_SERIALIZER);
return new FluxUtil.FluxElementResult(index, content, TypeUtil.getName(element));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,23 @@ public MonoConsumer(DynamicClassExtractor extractor) {
this.extractor = extractor;
}

/**
* support for Mono type recording
* @param responseMono
* @return
*/
public Mono<?> accept(Mono<?> responseMono) {
return responseMono
.doOnSuccess(o -> {
.doOnSuccess(result -> {
try (TraceTransmitter tm = traceTransmitter.transmit()) {
extractor.recordResponse(o);
extractor.recordResponse(result);
}
})
.doOnError(o -> {
.doOnError(error-> {
try (TraceTransmitter tm = traceTransmitter.transmit()) {
extractor.recordResponse(o);
extractor.recordResponse(error);
}
});
}

}
Loading

0 comments on commit 7daff2b

Please sign in to comment.