Skip to content

Commit

Permalink
GH-9695: Replace AMQP tx-size with batch-size
Browse files Browse the repository at this point in the history
Fixes: #9695
Issue link: #9695

The `txSize` in the `SimpleMessageListenerContainer` has been replaced with more reasonable `batchSize`.
Spring Integration XML support for AMQP module has missed to fix this change: we didn't have a respective test.

* Deprecate `tx-size` (will be removed in `6.5`) XML attribute for the `<amqp:inbound-channel-adapter>`
and introduce `batch-size`
* Cover with the tests
* Fix docs from `tx-size` to `batch-size`

(cherry picked from commit 2cf83b5)
  • Loading branch information
artembilan authored and spring-builds committed Dec 5, 2024
1 parent 857f703 commit f56f5cf
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ abstract class AbstractAmqpInboundAdapterParser extends AbstractSingleBeanDefini
"receive-timeout",
"shutdown-timeout",
"tx-size",
"batch-size",
"missing-queues-fatal"
};

Expand Down Expand Up @@ -154,7 +155,13 @@ private BeanDefinition buildListenerContainer(Element element, ParserContext par
}
builder.addConstructorArgReference(connectionFactoryRef);
for (String attributeName : CONTAINER_VALUE_ATTRIBUTES) {
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, attributeName);
// TODO remove 'tx-size' in 6.5
if ("tx-size".equals(attributeName)) {
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, attributeName, "batchSize");
}
else {
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, attributeName);
}
}
for (String attributeName : CONTAINER_REFERENCE_ATTRIBUTES) {
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, attributeName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -969,7 +969,7 @@ standard headers to also be mapped. To map all non-standard headers the 'NON_STA
<xsd:documentation>
Acknowledge Mode for the MessageListenerContainer; default 'AUTO'
meaning the adapter automatically acknowledges the message(s)
according to the tx-size.
according to the batch-size.
</xsd:documentation>
</xsd:annotation>
<xsd:simpleType>
Expand Down Expand Up @@ -1039,7 +1039,7 @@ standard headers to also be mapped. To map all non-standard headers the 'NON_STA
<xsd:documentation>
Specifies how many messages to send to each consumer in a single request. Often this can be set
quite high
to improve throughput. It should be greater than or equal to the tx-size value.
to improve throughput. It should be greater than or equal to the batch-size value.
</xsd:documentation>
</xsd:appinfo>
</xsd:annotation>
Expand Down Expand Up @@ -1117,10 +1117,23 @@ standard headers to also be mapped. To map all non-standard headers the 'NON_STA
<xsd:annotation>
<xsd:appinfo>
<xsd:documentation>
[DEPRECATED]
How many messages to process in a single transaction (if the channel is transactional). For best
results it should be
less than or equal to the prefetch count.
Not allowed when 'consumers-per-queue' is set.
Deprecated in favor of 'batch-size'.
</xsd:documentation>
</xsd:appinfo>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="batch-size" type="xsd:string">
<xsd:annotation>
<xsd:appinfo>
<xsd:documentation>
How many messages to process in a single request.
For best results it should be less than or equal to the prefetch count.
Not allowed when 'consumers-per-queue' is set.
</xsd:documentation>
</xsd:appinfo>
</xsd:annotation>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:amqp="http://www.springframework.org/schema/integration/amqp"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="http://www.springframework.org/schema/integration/amqp https://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:amqp="http://www.springframework.org/schema/integration/amqp"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="http://www.springframework.org/schema/integration/amqp https://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd
http://www.springframework.org/schema/integration https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/rabbit https://www.springframework.org/schema/rabbit/spring-rabbit.xsd
http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/util https://www.springframework.org/schema/util/spring-util.xsd
http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd">
Expand All @@ -20,32 +18,34 @@
</util:properties>

<amqp:inbound-channel-adapter id="rabbitInbound" queue-names="inboundchanneladapter.test.1"
batch-mode="EXTRACT_PAYLOADS"/>
batch-mode="EXTRACT_PAYLOADS" tx-size="2"/>

<amqp:inbound-channel-adapter id="autoStartFalse" queue-names="inboundchanneladapter.test.2"
auto-startup="false" phase="123" acknowledge-mode="${ackMode}" missing-queues-fatal="false" />
auto-startup="false" phase="123" acknowledge-mode="${ackMode}"
missing-queues-fatal="false"
batch-size="3"/>

<amqp:inbound-channel-adapter id="withHeaderMapperStandardAndCustomHeaders"
channel="requestChannel" queue-names="inboundchanneladapter.test.2"
auto-startup="false" phase="123"
mapped-request-headers="foo*, STANDARD_REQUEST_HEADERS"/>
channel="requestChannel" queue-names="inboundchanneladapter.test.2"
auto-startup="false" phase="123"
mapped-request-headers="foo*, STANDARD_REQUEST_HEADERS"/>

<amqp:inbound-channel-adapter id="withHeaderMapperOnlyCustomHeaders"
channel="requestChannel" queue-names="inboundchanneladapter.test.2"
auto-startup="false" phase="123"
mapped-request-headers="foo*"/>
channel="requestChannel" queue-names="inboundchanneladapter.test.2"
auto-startup="false" phase="123"
mapped-request-headers="foo*"/>

<amqp:inbound-channel-adapter id="withHeaderMapperNothingToMap"
channel="requestChannel" queue-names="inboundchanneladapter.test.2"
auto-startup="false" phase="123"
mapped-request-headers=""/>
channel="requestChannel" queue-names="inboundchanneladapter.test.2"
auto-startup="false" phase="123"
mapped-request-headers=""/>

<amqp:inbound-channel-adapter id="withHeaderMapperDefaultMapping"
channel="requestChannel" queue-names="inboundchanneladapter.test.2"
auto-startup="false" phase="123"/>
channel="requestChannel" queue-names="inboundchanneladapter.test.2"
auto-startup="false" phase="123"/>

<amqp:inbound-channel-adapter id="dmlc" queue-names="inboundchanneladapter.test.2" consumers-per-queue="2"
auto-startup="false" />
auto-startup="false"/>

<int:channel id="requestChannel">
<int:queue/>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,8 +16,7 @@

package org.springframework.integration.amqp.config;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.jupiter.api.Test;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
Expand All @@ -37,10 +36,10 @@
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.test.util.TestUtils;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;

/**
* @author Mark Fisher
Expand All @@ -49,8 +48,7 @@
*
* @since 2.1
*/
@ContextConfiguration
@RunWith(SpringJUnit4ClassRunner.class)
@SpringJUnitConfig
@DirtiesContext
public class AmqpInboundChannelAdapterParserTests {

Expand All @@ -71,6 +69,8 @@ public void verifyIdAsChannel() {
.isInstanceOf(SimpleMessageListenerContainer.class);
assertThat(TestUtils.getPropertyValue(adapter, "batchMode", BatchMode.class))
.isEqualTo(BatchMode.EXTRACT_PAYLOADS);
assertThat(TestUtils.getPropertyValue(adapter, "messageListenerContainer.batchSize", Integer.class))
.isEqualTo(2);
}

@Test
Expand All @@ -95,6 +95,8 @@ public void verifyLifeCycle() {
.isEqualTo(AcknowledgeMode.NONE);
assertThat(TestUtils.getPropertyValue(adapter, "messageListenerContainer.missingQueuesFatal", Boolean.class))
.isFalse();
assertThat(TestUtils.getPropertyValue(adapter, "messageListenerContainer.batchSize", Integer.class))
.isEqualTo(3);
}

@Test
Expand Down Expand Up @@ -216,14 +218,13 @@ public void withHeaderMapperDefaultMapping() throws Exception {

@Test
public void testInt2971HeaderMapperAndMappedHeadersExclusivity() {
try {
new ClassPathXmlApplicationContext("AmqpInboundChannelAdapterParserTests-headerMapper-fail-context.xml",
this.getClass()).close();
}
catch (BeanDefinitionParsingException e) {
assertThat(e.getMessage().startsWith("Configuration problem: The 'header-mapper' attribute " +
"is mutually exclusive with 'mapped-request-headers' or 'mapped-reply-headers'")).isTrue();
}
assertThatExceptionOfType(BeanDefinitionParsingException.class)
.isThrownBy(() ->
new ClassPathXmlApplicationContext(
"AmqpInboundChannelAdapterParserTests-headerMapper-fail-context.xml",
getClass()))
.withMessageStartingWith("Configuration problem: The 'header-mapper' attribute " +
"is mutually exclusive with 'mapped-request-headers' or 'mapped-reply-headers'");
}

}
2 changes: 1 addition & 1 deletion src/reference/antora/modules/ROOT/pages/amqp.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ The following adapters are available:
* xref:amqp/inbound-gateway.adoc[Inbound Gateway]
* xref:amqp/outbound-channel-adapter.adoc[Outbound Channel Adapter]
* xref:amqp/outbound-gateway.adoc[Outbound Gateway]
* xref:amqp-async-outbound-gateway[Async Outbound Gateway]
* xref:amqp/async-outbound-gateway.adoc[Async Outbound Gateway]
* xref:amqp/rmq-streams.adoc#rmq-stream-inbound-channel-adapter[RabbitMQ Stream Queue Inbound Channel Adapter]
* xref:amqp/rmq-streams.adoc#rmq-stream-outbound-channel-adapter[RabbitMQ Stream Queue Outbound Channel Adapter]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ XML::
task-executor="" <22>
transaction-attribute="" <23>
transaction-manager="" <24>
tx-size="" <25>
batch-size="" <25>
consumers-per-queue <26>
batch-mode="MESSAGES"/> <27>
Expand Down Expand Up @@ -146,7 +146,7 @@ By default, this value is `Integer.MAX_VALUE`, meaning that this container start
Optional.
<17> Tells the AMQP broker how many messages to send to each consumer in a single request.
Often, you can set this value high to improve throughput.
It should be greater than or equal to the transaction size (see the `tx-size` attribute, later in this list).
It should be greater than or equal to the transaction size (see the `batch-size` attribute, later in this list).
Optional (defaults to `1`).
<18> Receive timeout in milliseconds.
Optional (defaults to `1000`).
Expand All @@ -173,7 +173,7 @@ If the `channelTransacted` flag is `false`, no transaction semantics apply to th
For further information, see
https://docs.spring.io/spring-amqp/reference/html/%255Freference.html#%5Ftransactions[Transactions with Spring AMQP].
Optional.
<25> Tells the `SimpleMessageListenerContainer` how many messages to process in a single transaction (if the channel is transactional).
<25> Tells the `SimpleMessageListenerContainer` how many messages to process in a single request.
For best results, it should be less than or equal to the value set in `prefetch-count`.
Not allowed when 'consumers-per-queue' is set.
Optional (defaults to `1`).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ Java DSL::
+
[source, java, role="primary"]
----
@Bean // return the upper cased payload
@Bean // return the upper-cased payload
public IntegrationFlow amqpInboundGateway(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(Amqp.inboundGateway(connectionFactory, "foo"))
.transform(String.class, String::toUpperCase)
Expand Down

0 comments on commit f56f5cf

Please sign in to comment.