diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java index eb315cf4f..252b52389 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java @@ -422,6 +422,8 @@ public class FunctionInvocationWrapper implements Function, Cons private PostProcessingFunction postProcessor; + private Consumer skipInputConversionCallback; + /* * This is primarily to support Stream's ability to access * un-converted payload (e.g., to evaluate expression on some attribute of a payload) @@ -483,6 +485,9 @@ public boolean isSkipOutputConversion() { return skipOutputConversion; } + public boolean isSkipInputConversion() { + return skipInputConversion; + } public boolean isPrototype() { return !this.isSingleton; @@ -493,6 +498,13 @@ public void setSkipInputConversion(boolean skipInputConversion) { logger.debug("'skipInputConversion' was explicitely set to true. No input conversion will be attempted"); } this.skipInputConversion = skipInputConversion; + if (this.skipInputConversionCallback != null) { + this.skipInputConversionCallback.accept(skipInputConversion); + } + } + + void setSkipInputConversionCallback(Consumer skipInputConversionCallback) { + this.skipInputConversionCallback = skipInputConversionCallback; } public void setSkipOutputConversion(boolean skipOutputConversion) { @@ -684,6 +696,10 @@ else if (this.outputType == null) { String composedName = this.functionDefinition + "|" + afterWrapper.functionDefinition; FunctionInvocationWrapper composedFunction = invocationWrapperInstance(composedName, rawComposedFunction, composedFunctionType); + composedFunction.setSkipInputConversionCallback((skipInputConversion) -> { + this.setSkipInputConversion(skipInputConversion); + afterWrapper.setSkipInputConversion(skipInputConversion); + }); composedFunction.composed = true; if (((FunctionInvocationWrapper) after).target instanceof PostProcessingFunction) { composedFunction.postProcessor = (PostProcessingFunction) ((FunctionInvocationWrapper) after).target; @@ -836,7 +852,7 @@ private Object fluxifyInputIfNecessary(Object input) { if ((!treatPayloadAsPlainText && JsonMapper.isJsonStringRepresentsCollection(payload)) && !FunctionTypeUtils.isTypeCollection(this.inputType) && !FunctionTypeUtils.isTypeArray(this.inputType)) { - MessageHeaders headers = ((Message) input).getHeaders(); + MessageHeaders headers = input instanceof Message ? ((Message) input).getHeaders() : new MessageHeaders(Collections.emptyMap()); Collection collectionPayload = jsonMapper.fromJson(payload, Collection.class); Class inputClass = FunctionTypeUtils.getRawType(this.inputType); if (this.isInputTypeMessage()) { @@ -1103,6 +1119,9 @@ else if (FunctionTypeUtils.isMultipleArgumentType(type)) { convertedInput = Tuples.fromArray(convertedInputs); } else if (this.skipInputConversion) { + if (!(input instanceof Message)) { + input = MessageBuilder.withPayload(input).build(); + } convertedInput = this.isInputTypeMessage() ? input : new OriginalMessageHolder(((Message) input).getPayload(), (Message) input); diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistryTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistryTests.java index 49ab2baa7..edd78aeac 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistryTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistryTests.java @@ -140,6 +140,20 @@ public void testCompositionWithNonExistingFunction() throws Exception { assertThat(registration.getNames().iterator().next()).isEqualTo("echo1"); } + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testMessageWithArrayAsPayload() throws Exception { + FunctionCatalog catalog = this.configureCatalog(MessageWithArrayAsPayload.class); + FunctionInvocationWrapper function = catalog.lookup("myFunction"); + + List payload = List.of("Ricky", "Julien", "Bubbles"); + + Message result = (Message) function.apply(MessageBuilder.withPayload(payload).build()); + + assertThat(((Collection) result.getPayload())).isNotEmpty(); + + } + @SuppressWarnings({ "rawtypes", "unchecked" }) @Test public void testCompositionWithNullReturnInBetween() { @@ -325,7 +339,7 @@ public void testReactiveVoidInputFunctionAsSupplier() { @Test - public void testComposition() { + public void testComposition() throws Exception { FunctionCatalog catalog = this.configureCatalog(); Function, Flux> fluxFunction = catalog.lookup("uppercase|reverseFlux"); @@ -348,8 +362,24 @@ public void testComposition() { assertThat(result.get(0)).isEqualTo("OLLEH"); assertThat(result.get(1)).isEqualTo("EYB"); - Function function = catalog.lookup("uppercase|reverse"); + FunctionInvocationWrapper function = catalog.lookup("uppercase|reverse"); assertThat(function.apply("foo")).isEqualTo("OOF"); + + Object target = function.getTarget(); + Field arg1Field = ReflectionUtils.findField(target.getClass(), "arg$1"); + arg1Field.setAccessible(true); + FunctionInvocationWrapper functionUppercase = (FunctionInvocationWrapper) arg1Field.get(target); + + Field arg2Field = ReflectionUtils.findField(target.getClass(), "arg$2"); + arg2Field.setAccessible(true); + FunctionInvocationWrapper functionReverse = (FunctionInvocationWrapper) arg2Field.get(target); + + assertThat(functionUppercase.isSkipInputConversion()).isFalse(); + assertThat(functionReverse.isSkipInputConversion()).isFalse(); + + function.setSkipInputConversion(true); + assertThat(functionUppercase.isSkipInputConversion()).isTrue(); + assertThat(functionReverse.isSkipInputConversion()).isTrue(); } @Test @@ -1509,4 +1539,13 @@ public Function echo2() { } } + @EnableAutoConfiguration + @Configuration + public static class MessageWithArrayAsPayload { + + @Bean + public Function, Message> myFunction() { + return msg -> msg; + } + } }