diff --git a/README.md b/README.md index 9bca03bc5..e4a1ea341 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# th2 common library (Java) (3.33.0) +# th2 common library (Java) (3.34.0) ## Usage @@ -80,6 +80,8 @@ The `CommonFactory` reads a RabbitMQ configuration from the rabbitMQ.json file. * maxConnectionRecoveryTimeout - this option defines a maximum interval in milliseconds between reconnect attempts, with its default value set to 60000. Common factory increases the reconnect interval values from minConnectionRecoveryTimeout to maxConnectionRecoveryTimeout. * prefetchCount - this option is the maximum number of messages that the server will deliver, with its value set to 0 if unlimited, the default value is set to 10. * messageRecursionLimit - an integer number denotes how deep nested protobuf message might be, default = 100 +* secondsToCheckVirtualQueueLimit - this option defines an interval in seconds between size check attempts, default = 10 +* batchesToCheckVirtualQueueLimit - this option defines the number of batches between size check attempts, default = 10000 ```json { @@ -95,7 +97,9 @@ The `CommonFactory` reads a RabbitMQ configuration from the rabbitMQ.json file. "minConnectionRecoveryTimeout": 10000, "maxConnectionRecoveryTimeout": 60000, "prefetchCount": 10, - "messageRecursionLimit": 100 + "messageRecursionLimit": 100, + "secondsToCheckVirtualQueueLimit": 10, + "batchesToCheckVirtualQueueLimit": 10000 } ``` @@ -117,6 +121,7 @@ The `CommonFactory` reads a message's router configuration from the `mq.json` fi * filters - pin's message's filters * metadata - a metadata filters * message - a message's fields filters + * virtualQueueLimit - MQ router calculates destination queues and compares their current size to this value. The router blocks the current thread to repeat the comparison if the size of any destination queues exceeds the virtual limit Filters format: * fieldName - a field's name @@ -154,7 +159,8 @@ Filters format: "operation": "WILDCARD" } ] - } + }, + "virtualQueueLimit": 10000 } } } @@ -288,6 +294,12 @@ dependencies { ## Release notes +### 3.34.0 + ++ Added backpressure support: lock sending if queue virtual size limit is exceeded ++ Added parameter `virtualQueueLimit` to `mq.json` ++ Added parameters `secondsToCheckVirtualQueueLimit` and `batchesToCheckVirtualQueueLimit` to `mq_router.json` + ### 3.33.0 + Added ability to read dictionaries by aliases and as group of all available aliases diff --git a/build.gradle b/build.gradle index a61424583..037162966 100644 --- a/build.gradle +++ b/build.gradle @@ -171,6 +171,7 @@ dependencies { implementation "io.grpc:grpc-netty" implementation "com.rabbitmq:amqp-client" + implementation 'com.rabbitmq:http-client:4.0.0' implementation "org.jetbrains:annotations" diff --git a/gradle.properties b/gradle.properties index 3d838c7a2..a91acb1d6 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,5 +1,5 @@ # -# Copyright 2020-2021 Exactpro (Exactpro Systems Limited) +# Copyright 2020-2022 Exactpro (Exactpro Systems Limited) # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -13,7 +13,7 @@ # limitations under the License. # -release_version=3.33.0 +release_version=3.34.0 description = 'th2 common library (Java)' diff --git a/src/main/java/com/exactpro/th2/common/schema/factory/AbstractCommonFactory.java b/src/main/java/com/exactpro/th2/common/schema/factory/AbstractCommonFactory.java index b311b4a41..20c40d090 100644 --- a/src/main/java/com/exactpro/th2/common/schema/factory/AbstractCommonFactory.java +++ b/src/main/java/com/exactpro/th2/common/schema/factory/AbstractCommonFactory.java @@ -645,7 +645,12 @@ protected PrometheusConfiguration loadPrometheusConfiguration() { } protected ConnectionManager createRabbitMQConnectionManager() { - return new ConnectionManager(getRabbitMqConfiguration(), getConnectionManagerConfiguration(), livenessMonitor::disable); + return new ConnectionManager( + getRabbitMqConfiguration(), + getConnectionManagerConfiguration(), + getMessageRouterConfiguration(), + livenessMonitor::disable + ); } protected ConnectionManager getRabbitMqConnectionManager() { diff --git a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitSender.java b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitSender.java index 751bc4113..063b0b4e3 100644 --- a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitSender.java +++ b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitSender.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2022 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -55,6 +55,7 @@ public abstract class AbstractRabbitSender implements MessageSender { private final AtomicReference exchangeName = new AtomicReference<>(); private final AtomicReference connectionManager = new AtomicReference<>(); private final String th2Type; + private long sentBeforeQueueSizeCheck; public AbstractRabbitSender( @NotNull ConnectionManager connectionManager, @@ -81,7 +82,7 @@ public void send(T value) throws IOException { requireNonNull(value, "Value for send can not be null"); try { - ConnectionManager connection = this.connectionManager.get(); + ConnectionManager connectionManager = this.connectionManager.get(); byte[] bytes = valueToBytes(value); MESSAGE_SIZE_PUBLISH_BYTES .labels(th2Pin, th2Type, exchangeName.get(), routingKey.get()) @@ -89,7 +90,12 @@ public void send(T value) throws IOException { MESSAGE_PUBLISH_TOTAL .labels(th2Pin, th2Type, exchangeName.get(), routingKey.get()) .inc(); - connection.basicPublish(exchangeName.get(), routingKey.get(), null, bytes); + sentBeforeQueueSizeCheck++; + if (sentBeforeQueueSizeCheck > connectionManager.getConnectionManagerConfiguration().getBatchesToCheckVirtualQueueLimit()) { + connectionManager.lockSendingIfSizeLimitExceeded(routingKey.get()); + sentBeforeQueueSizeCheck = 0; + } + connectionManager.basicPublish(exchangeName.get(), routingKey.get(), null, bytes); if (LOGGER.isTraceEnabled()) { LOGGER.trace("Message sent to exchangeName='{}', routing key='{}': '{}'", diff --git a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java index c073a711f..4cdf72a01 100644 --- a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java +++ b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2022 Exactpro (Exactpro Systems Limited) * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -16,6 +16,8 @@ import com.exactpro.th2.common.metrics.HealthMetrics; import com.exactpro.th2.common.schema.message.SubscriberMonitor; +import com.exactpro.th2.common.schema.message.configuration.MessageRouterConfiguration; +import com.exactpro.th2.common.schema.message.configuration.QueueConfiguration; import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.ConnectionManagerConfiguration; import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.RabbitMQConfiguration; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -32,6 +34,11 @@ import com.rabbitmq.client.RecoveryListener; import com.rabbitmq.client.ShutdownNotifier; import com.rabbitmq.client.TopologyRecoveryException; +import com.rabbitmq.http.client.Client; +import com.rabbitmq.http.client.ClientParameters; +import com.rabbitmq.http.client.domain.BindingInfo; +import com.rabbitmq.http.client.domain.QueueInfo; + import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.builder.EqualsBuilder; @@ -43,12 +50,18 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Objects; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -56,22 +69,35 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiConsumer; +import java.util.function.Function; import java.util.function.Supplier; +import java.util.stream.Collectors; -public class ConnectionManager implements AutoCloseable { +import static com.rabbitmq.http.client.domain.DestinationType.QUEUE; +import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.joining; +import static java.util.stream.Collectors.partitioningBy; +import static java.util.stream.Collectors.toMap; +public class ConnectionManager implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionManager.class); + private static final String RABBITMQ_MANAGEMENT_URL = "http://%s:15672/api/"; private final Connection connection; private final Map channelsByPin = new ConcurrentHashMap<>(); private final AtomicInteger connectionRecoveryAttempts = new AtomicInteger(0); private final AtomicBoolean connectionIsClosed = new AtomicBoolean(false); - private final ConnectionManagerConfiguration configuration; + private final RabbitMQConfiguration rabbitMQConfiguration; + private final ConnectionManagerConfiguration connectionManagerConfiguration; private final String subscriberName; private final AtomicInteger nextSubscriberId = new AtomicInteger(1); private final ExecutorService sharedExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder() .setNameFormat("rabbitmq-shared-pool-%d") .build()); + private final Map queueNameToVirtualQueueLimit; + private final Client client; + private final ScheduledExecutorService sizeCheckExecutor = Executors.newScheduledThreadPool(1); + private final Set knownExchanges = Collections.synchronizedSet(new HashSet<>()); private final HealthMetrics metrics = new HealthMetrics(this); @@ -88,13 +114,27 @@ public void handleRecovery(Recoverable recoverable) { public void handleRecoveryStarted(Recoverable recoverable) {} }; - public ConnectionManagerConfiguration getConfiguration() { - return configuration; + public ConnectionManagerConfiguration getConnectionManagerConfiguration() { + return connectionManagerConfiguration; } - public ConnectionManager(@NotNull RabbitMQConfiguration rabbitMQConfiguration, @NotNull ConnectionManagerConfiguration connectionManagerConfiguration, Runnable onFailedRecoveryConnection) { - Objects.requireNonNull(rabbitMQConfiguration, "RabbitMQ configuration cannot be null"); - this.configuration = Objects.requireNonNull(connectionManagerConfiguration, "Connection manager configuration can not be null"); + public ConnectionManager( + @NotNull RabbitMQConfiguration rabbitMQConfiguration, + @NotNull ConnectionManagerConfiguration connectionManagerConfiguration, + @NotNull MessageRouterConfiguration messageRouterConfiguration, + Runnable onFailedRecoveryConnection + ) { + this.rabbitMQConfiguration = requireNonNull(rabbitMQConfiguration, "RabbitMQ configuration cannot be null"); + this.connectionManagerConfiguration = requireNonNull(connectionManagerConfiguration, "Connection manager configuration can not be null"); + queueNameToVirtualQueueLimit = requireNonNull(messageRouterConfiguration, "Message router configuration can not be null") + .getQueues() + .values() + .stream() + .collect(toMap( + QueueConfiguration::getQueue, + QueueConfiguration::getVirtualQueueLimit, + Math::min // TODO is it valid situation if there are several configurations for one queue? + )); String subscriberNameTmp = ObjectUtils.defaultIfNull(connectionManagerConfiguration.getSubscriberName(), rabbitMQConfiguration.getSubscriberName()); if (StringUtils.isBlank(subscriberNameTmp)) { @@ -215,9 +255,15 @@ private void turnOffReadiness(Throwable exception){ try { this.connection = factory.newConnection(); + client = new Client( + new ClientParameters() + .url(String.format(RABBITMQ_MANAGEMENT_URL, rabbitMQConfiguration.getHost())) + .username(rabbitMQConfiguration.getUsername()) + .password(rabbitMQConfiguration.getPassword()) + ); metrics.getReadinessMonitor().enable(); LOGGER.debug("Set RabbitMQ readiness to true"); - } catch (IOException | TimeoutException e) { + } catch (IOException | TimeoutException | URISyntaxException e) { metrics.getReadinessMonitor().disable(); LOGGER.debug("Set RabbitMQ readiness to false. Can not create connection", e); throw new IllegalStateException("Failed to create RabbitMQ connection using configuration", e); @@ -242,6 +288,13 @@ public void handleUnblocked() throws IOException { } else { throw new IllegalStateException("Connection does not implement Recoverable. Can not add RecoveryListener to it"); } + + sizeCheckExecutor.scheduleAtFixedRate( + this::lockSendingIfSizeLimitExceeded, + connectionManagerConfiguration.getSecondsToCheckVirtualQueueLimit(), // TODO another initial delay? + connectionManagerConfiguration.getSecondsToCheckVirtualQueueLimit(), + TimeUnit.SECONDS + ); } public boolean isOpen() { @@ -254,7 +307,7 @@ public void close() { return; } - int closeTimeout = configuration.getConnectionCloseTimeout(); + int closeTimeout = connectionManagerConfiguration.getConnectionCloseTimeout(); if (connection.isOpen()) { try { // We close the connection and don't close channels @@ -265,14 +318,13 @@ public void close() { } } - shutdownSharedExecutor(closeTimeout); + shutdownExecutor(sharedExecutor, closeTimeout); + shutdownExecutor(sizeCheckExecutor, closeTimeout); } public void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException { - ChannelHolder holder = getChannelFor(PinId.forRoutingKey(routingKey)); - holder.withLock(channel -> { - channel.basicPublish(exchange, routingKey, props, body); - }); + knownExchanges.add(exchange); + getChannelFor(PinId.forRoutingKey(routingKey)).publishWithLocks(exchange, routingKey, props, body); } public SubscriberMonitor basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException { @@ -298,12 +350,12 @@ private void basicCancel(Channel channel, String consumerTag) throws IOException channel.basicCancel(consumerTag); } - private void shutdownSharedExecutor(int closeTimeout) { - sharedExecutor.shutdown(); + private void shutdownExecutor(ExecutorService executor, int closeTimeout) { + executor.shutdown(); try { - if (!sharedExecutor.awaitTermination(closeTimeout, TimeUnit.MILLISECONDS)) { + if (!executor.awaitTermination(closeTimeout, TimeUnit.MILLISECONDS)) { LOGGER.error("Executor is not terminated during {} millis", closeTimeout); - List runnables = sharedExecutor.shutdownNow(); + List runnables = executor.shutdownNow(); LOGGER.error("{} task(s) was(were) not finished", runnables.size()); } } catch (InterruptedException e) { @@ -323,8 +375,8 @@ private Channel createChannel() { try { Channel channel = connection.createChannel(); - Objects.requireNonNull(channel, () -> "No channels are available in the connection. Max channel number: " + connection.getChannelMax()); - channel.basicQos(configuration.getPrefetchCount()); + requireNonNull(channel, () -> "No channels are available in the connection. Max channel number: " + connection.getChannelMax()); + channel.basicQos(connectionManagerConfiguration.getPrefetchCount()); channel.addReturnListener(ret -> LOGGER.warn("Can not router message to exchange '{}', routing key '{}'. Reply code '{}' and text = {}", ret.getExchange(), ret.getRoutingKey(), ret.getReplyCode(), ret.getReplyText())); return channel; @@ -377,6 +429,72 @@ private void basicAck(Channel channel, long deliveryTag) throws IOException { channel.basicAck(deliveryTag, false); } + public void lockSendingIfSizeLimitExceeded(String routingKey) { + lockSendingIfSizeLimitExceeded(List.of(routingKey)); + } + + private void lockSendingIfSizeLimitExceeded() { + lockSendingIfSizeLimitExceeded( + channelsByPin.keySet().stream() + .filter(channelHolder -> channelHolder.routingKey != null) + .map(channelHolder -> channelHolder.routingKey) + .collect(Collectors.toList()) + ); + } + + private void lockSendingIfSizeLimitExceeded(List routingKeys) { + try { + for (var routingKeyToQueues : groupQueuesByRoutingKey(routingKeys).entrySet()) { + String routingKey = routingKeyToQueues.getKey(); + Map> isExceededToQueues = routingKeyToQueues.getValue().stream() + .collect(partitioningBy(QueueInfoWithVirtualLimit::isExceeded)); + ChannelHolder holder = getChannelFor(PinId.forRoutingKey(routingKey)); + List exceededQueues = isExceededToQueues.get(true); + if (exceededQueues.isEmpty()) { + if (holder.sizeLimitLock.isLocked()) { + holder.sizeLimitLock.unlock(); + LOGGER.info( + "Sending via routing key '{}' is resumed. There are {}", + routingKey, + isExceededToQueues.get(false).stream().map(QueueInfoWithVirtualLimit::getSizeDetails).collect(joining(", ")) + ); + } + } else { + if (!holder.sizeLimitLock.isLocked()) { + holder.sizeLimitLock.lock(); + LOGGER.info( + "Sending via routing key '{}' is paused because there are {}", + routingKey, + exceededQueues.stream().map(QueueInfoWithVirtualLimit::getSizeDetails).collect(joining(", ")) + ); + } + } + } + } catch (Throwable t) { + LOGGER.error("Error during check queue sizes", t); + } + } + + private Map> groupQueuesByRoutingKey(List knownRoutingKeys) { + List bindings = new ArrayList<>(); + knownExchanges.forEach(exchange -> bindings.addAll( + client.getBindingsBySource(rabbitMQConfiguration.getVHost(), exchange).stream() + .filter(it -> it.getDestinationType() == QUEUE && knownRoutingKeys.contains(it.getRoutingKey())) + .collect(Collectors.toList()) + )); + Map queueNameToInfo = client.getQueues().stream() + .collect(toMap(QueueInfo::getName, Function.identity())); + Map> routingKeyToQueues = new HashMap<>(); + bindings.forEach(bindingInfo -> routingKeyToQueues + .computeIfAbsent(bindingInfo.getRoutingKey(), s -> new ArrayList<>()) + .add(new QueueInfoWithVirtualLimit( + queueNameToInfo.get(bindingInfo.getDestination()), + queueNameToVirtualQueueLimit.get(bindingInfo.getDestination()) + )) + ); + return routingKeyToQueues; + } + private static class RabbitMqSubscriberMonitor implements SubscriberMonitor { private final ChannelHolder holder; @@ -446,6 +564,7 @@ public String toString() { } private static class ChannelHolder { + private final ReentrantLock sizeLimitLock = new ReentrantLock(); private final Lock lock = new ReentrantLock(); private final Supplier supplier; private final BiConsumer reconnectionChecker; @@ -455,8 +574,17 @@ public ChannelHolder( Supplier supplier, BiConsumer reconnectionChecker ) { - this.supplier = Objects.requireNonNull(supplier, "'Supplier' parameter"); - this.reconnectionChecker = Objects.requireNonNull(reconnectionChecker, "'Reconnection checker' parameter"); + this.supplier = requireNonNull(supplier, "'Supplier' parameter"); + this.reconnectionChecker = requireNonNull(reconnectionChecker, "'Reconnection checker' parameter"); + } + + public void publishWithLocks(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException { + sizeLimitLock.lock(); + try { + withLock(true, channel -> channel.basicPublish(exchange, routingKey, props, body)); + } finally { + sizeLimitLock.unlock(); + } } public void withLock(ChannelConsumer consumer) throws IOException { diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/configuration/MessageRouterConfiguration.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/configuration/MessageRouterConfiguration.kt index ed63c3fc2..8287570d5 100644 --- a/src/main/kotlin/com/exactpro/th2/common/schema/message/configuration/MessageRouterConfiguration.kt +++ b/src/main/kotlin/com/exactpro/th2/common/schema/message/configuration/MessageRouterConfiguration.kt @@ -1,5 +1,5 @@ /* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2022 Exactpro (Exactpro Systems Limited) * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -42,7 +42,8 @@ data class QueueConfiguration( @JsonProperty(required = true) @JsonAlias("labels", "tags") var attributes: List = emptyList(), var filters: List = emptyList(), @JsonProperty(value = "read") var isReadable: Boolean = true, - @JsonProperty(value = "write") var isWritable: Boolean = true + @JsonProperty(value = "write") var isWritable: Boolean = true, + @JsonProperty var virtualQueueLimit: Long = 10000, ) : Configuration() data class MqRouterFilterConfiguration( diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/configuration/RabbitMQConfiguration.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/configuration/RabbitMQConfiguration.kt index d7d60395c..714599a5a 100644 --- a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/configuration/RabbitMQConfiguration.kt +++ b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/configuration/RabbitMQConfiguration.kt @@ -1,5 +1,5 @@ /* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2022 Exactpro (Exactpro Systems Limited) * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -26,7 +26,8 @@ data class RabbitMQConfiguration( @JsonProperty(required = true) var password: String, @Deprecated(message = "Please use subscriber name from ConnectionManagerConfiguration") var subscriberName: String? = null, //FIXME: Remove in future version - var exchangeName: String? = null) : Configuration() + var exchangeName: String? = null, +) : Configuration() data class ConnectionManagerConfiguration( var subscriberName: String? = null, @@ -36,5 +37,7 @@ data class ConnectionManagerConfiguration( var minConnectionRecoveryTimeout: Int = 10000, var maxConnectionRecoveryTimeout: Int = 60000, val prefetchCount: Int = 10, - val messageRecursionLimit: Int = 100 + val messageRecursionLimit: Int = 100, + val secondsToCheckVirtualQueueLimit: Int = 10, + val batchesToCheckVirtualQueueLimit: Int = 10000, ) : Configuration() \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/QueueInfoWithVirtualLimit.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/QueueInfoWithVirtualLimit.kt new file mode 100644 index 000000000..7ac60d44a --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/QueueInfoWithVirtualLimit.kt @@ -0,0 +1,22 @@ +/* + * Copyright 2022 Exactpro (Exactpro Systems Limited) + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.exactpro.th2.common.schema.message.impl.rabbitmq.connection + +import com.rabbitmq.http.client.domain.QueueInfo + +class QueueInfoWithVirtualLimit(queueInfo: QueueInfo, virtualLimit: Long) { + val isExceeded = queueInfo.totalMessages > virtualLimit + val sizeDetails = "${queueInfo.totalMessages} message(s) in '${queueInfo.name}' (limit is $virtualLimit)" +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/RabbitMessageGroupBatchRouter.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/RabbitMessageGroupBatchRouter.kt index d1baf2502..7251866e7 100644 --- a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/RabbitMessageGroupBatchRouter.kt +++ b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/RabbitMessageGroupBatchRouter.kt @@ -1,5 +1,5 @@ /* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2022 Exactpro (Exactpro Systems Limited) * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -85,7 +85,7 @@ class RabbitMessageGroupBatchRouter : AbstractRabbitRouter() FilterFunction { msg: Message, filters: List -> filterMessage(msg, filters) }, pinName, pinConfig.filters, - connectionManager.configuration.messageRecursionLimit + connectionManager.connectionManagerConfiguration.messageRecursionLimit ) } diff --git a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/TestRabbitMessageGroupBatchRouter.kt b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/TestRabbitMessageGroupBatchRouter.kt index 4b48e4510..973bb4f87 100644 --- a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/TestRabbitMessageGroupBatchRouter.kt +++ b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/TestRabbitMessageGroupBatchRouter.kt @@ -1,5 +1,5 @@ /* - * Copyright 2021 Exactpro (Exactpro Systems Limited) + * Copyright 2021-2022 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -47,7 +47,7 @@ class TestRabbitMessageGroupBatchRouter { private val connectionConfiguration = ConnectionManagerConfiguration() private val monitor: SubscriberMonitor = mock { } private val connectionManager: ConnectionManager = mock { - on { configuration }.thenReturn(connectionConfiguration) + on { connectionManagerConfiguration }.thenReturn(connectionConfiguration) on { basicConsume(any(), any(), any()) }.thenReturn(monitor) }