Skip to content

Commit

Permalink
1144 Fix propagation of skipInputConversion
Browse files Browse the repository at this point in the history
Resolves #1144
  • Loading branch information
olegz committed May 27, 2024
1 parent 953217f commit 67180ac
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,8 @@ public class FunctionInvocationWrapper implements Function<Object, Object>, Cons

private PostProcessingFunction postProcessor;

private Consumer<Boolean> skipInputConversionCallback;

/*
* This is primarily to support Stream's ability to access
* un-converted payload (e.g., to evaluate expression on some attribute of a payload)
Expand Down Expand Up @@ -483,6 +485,9 @@ public boolean isSkipOutputConversion() {
return skipOutputConversion;
}

public boolean isSkipInputConversion() {
return skipInputConversion;
}

public boolean isPrototype() {
return !this.isSingleton;
Expand All @@ -493,6 +498,13 @@ public void setSkipInputConversion(boolean skipInputConversion) {
logger.debug("'skipInputConversion' was explicitely set to true. No input conversion will be attempted");
}
this.skipInputConversion = skipInputConversion;
if (this.skipInputConversionCallback != null) {
this.skipInputConversionCallback.accept(skipInputConversion);
}
}

void setSkipInputConversionCallback(Consumer<Boolean> skipInputConversionCallback) {
this.skipInputConversionCallback = skipInputConversionCallback;
}

public void setSkipOutputConversion(boolean skipOutputConversion) {
Expand Down Expand Up @@ -684,6 +696,10 @@ else if (this.outputType == null) {

String composedName = this.functionDefinition + "|" + afterWrapper.functionDefinition;
FunctionInvocationWrapper composedFunction = invocationWrapperInstance(composedName, rawComposedFunction, composedFunctionType);
composedFunction.setSkipInputConversionCallback((skipInputConversion) -> {
this.setSkipInputConversion(skipInputConversion);
afterWrapper.setSkipInputConversion(skipInputConversion);
});
composedFunction.composed = true;
if (((FunctionInvocationWrapper) after).target instanceof PostProcessingFunction) {
composedFunction.postProcessor = (PostProcessingFunction) ((FunctionInvocationWrapper) after).target;
Expand Down Expand Up @@ -836,7 +852,7 @@ private Object fluxifyInputIfNecessary(Object input) {
if ((!treatPayloadAsPlainText && JsonMapper.isJsonStringRepresentsCollection(payload))
&& !FunctionTypeUtils.isTypeCollection(this.inputType)
&& !FunctionTypeUtils.isTypeArray(this.inputType)) {
MessageHeaders headers = ((Message) input).getHeaders();
MessageHeaders headers = input instanceof Message ? ((Message) input).getHeaders() : new MessageHeaders(Collections.emptyMap());
Collection collectionPayload = jsonMapper.fromJson(payload, Collection.class);
Class inputClass = FunctionTypeUtils.getRawType(this.inputType);
if (this.isInputTypeMessage()) {
Expand Down Expand Up @@ -1103,6 +1119,9 @@ else if (FunctionTypeUtils.isMultipleArgumentType(type)) {
convertedInput = Tuples.fromArray(convertedInputs);
}
else if (this.skipInputConversion) {
if (!(input instanceof Message)) {
input = MessageBuilder.withPayload(input).build();
}
convertedInput = this.isInputTypeMessage()
? input
: new OriginalMessageHolder(((Message) input).getPayload(), (Message<?>) input);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,20 @@ public void testCompositionWithNonExistingFunction() throws Exception {
assertThat(registration.getNames().iterator().next()).isEqualTo("echo1");
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testMessageWithArrayAsPayload() throws Exception {
FunctionCatalog catalog = this.configureCatalog(MessageWithArrayAsPayload.class);
FunctionInvocationWrapper function = catalog.lookup("myFunction");

List payload = List.of("Ricky", "Julien", "Bubbles");

Message result = (Message) function.apply(MessageBuilder.withPayload(payload).build());

assertThat(((Collection) result.getPayload())).isNotEmpty();

}

@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testCompositionWithNullReturnInBetween() {
Expand Down Expand Up @@ -325,7 +339,7 @@ public void testReactiveVoidInputFunctionAsSupplier() {


@Test
public void testComposition() {
public void testComposition() throws Exception {
FunctionCatalog catalog = this.configureCatalog();
Function<Flux<String>, Flux<String>> fluxFunction = catalog.lookup("uppercase|reverseFlux");

Expand All @@ -348,8 +362,24 @@ public void testComposition() {
assertThat(result.get(0)).isEqualTo("OLLEH");
assertThat(result.get(1)).isEqualTo("EYB");

Function<String, String> function = catalog.lookup("uppercase|reverse");
FunctionInvocationWrapper function = catalog.lookup("uppercase|reverse");
assertThat(function.apply("foo")).isEqualTo("OOF");

Object target = function.getTarget();
Field arg1Field = ReflectionUtils.findField(target.getClass(), "arg$1");
arg1Field.setAccessible(true);
FunctionInvocationWrapper functionUppercase = (FunctionInvocationWrapper) arg1Field.get(target);

Field arg2Field = ReflectionUtils.findField(target.getClass(), "arg$2");
arg2Field.setAccessible(true);
FunctionInvocationWrapper functionReverse = (FunctionInvocationWrapper) arg2Field.get(target);

assertThat(functionUppercase.isSkipInputConversion()).isFalse();
assertThat(functionReverse.isSkipInputConversion()).isFalse();

function.setSkipInputConversion(true);
assertThat(functionUppercase.isSkipInputConversion()).isTrue();
assertThat(functionReverse.isSkipInputConversion()).isTrue();
}

@Test
Expand Down Expand Up @@ -1509,4 +1539,13 @@ public Function<String, String> echo2() {
}
}

@EnableAutoConfiguration
@Configuration
public static class MessageWithArrayAsPayload {

@Bean
public Function<Message<?>, Message<?>> myFunction() {
return msg -> msg;
}
}
}

0 comments on commit 67180ac

Please sign in to comment.