Skip to content

Commit

Permalink
GH-9709: Fix IntegrationFlow for input channel resolution
Browse files Browse the repository at this point in the history
Fixes: #9709
Issue link: #9709

The Java DSL loses an `inputChannel` when existing channel is referenced by its name.
Then the target IntegrationFlow does not contain a bean reference for this channel and when we call its `getInputChannel()` we got something from middle of the flow.
However, that `inputChannel` is really expected to be an input for the flow.

* Fix `IntegrationFlowBeanPostProcessor` to get a bean by `MessageChannelReference` and populate it into `targetIntegrationComponents`
as we do for newly created `DirectChannel` if there is no bean for provided `MessageChannelReference`

**Auto-cherry-pick to `6.3.x`**
  • Loading branch information
artembilan committed Dec 10, 2024
1 parent b1032a7 commit 4d84220
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.springframework.integration.gateway.AnnotationGatewayProxyFactoryBean;
import org.springframework.integration.support.context.NamedComponent;
import org.springframework.lang.Nullable;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
Expand Down Expand Up @@ -192,12 +193,16 @@ else if (useFlowIdAsPrefix) {
}
else if (component instanceof MessageChannelReference messageChannelReference) {
String channelBeanName = messageChannelReference.name();
MessageChannel channelByName;
if (!this.beanFactory.containsBean(channelBeanName)) {
DirectChannel directChannel = new DirectChannel();
registerComponent(registerBeanDefinitions, directChannel, channelBeanName,
channelByName = new DirectChannel();
registerComponent(registerBeanDefinitions, channelByName, channelBeanName,
beanSource, beanDescription, flowBeanName);
targetIntegrationComponents.put(directChannel, channelBeanName);
}
else {
channelByName = this.beanFactory.getBean(channelBeanName, MessageChannel.class);
}
targetIntegrationComponents.put(channelByName, channelBeanName);
}
else if (component instanceof SourcePollingChannelAdapterSpec spec) {
Map<Object, String> componentsToRegister = spec.getComponentsToRegister();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2023 the original author or authors.
* Copyright 2021-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -202,11 +202,17 @@ IntegrationFlow middleFlow(IntegrationFlow firstFlow, IntegrationFlow lastFlow)
.to(lastFlow);
}

@Bean
DirectChannel lastFlowInput() {
return new DirectChannel();
}

@Bean
IntegrationFlow lastFlow() {
return f -> f
return IntegrationFlow.from("lastFlowInput")
.<String, String>transform(p -> p + ", and last flow")
.channel(c -> c.queue("lastFlowResult"));
.channel(c -> c.queue("lastFlowResult"))
.get();
}

}
Expand Down

0 comments on commit 4d84220

Please sign in to comment.