Skip to content

Commit

Permalink
Merge branch 'apache:main' into feature/replica_broker_internal
Browse files Browse the repository at this point in the history
  • Loading branch information
NikitaShupletsov authored Jun 26, 2023
2 parents 02d505a + 4d81ef5 commit c9c863d
Show file tree
Hide file tree
Showing 38 changed files with 1,170 additions and 2,246 deletions.
3 changes: 1 addition & 2 deletions SECURITY.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@
| ------- | ------------------ |
| 5.18.x | :white_check_mark: |
| 5.17.x | :white_check_mark: |
| 5.16.x | :white_check_mark: |
| <= 5.15.x | :x: |
| <= 5.16.x | :x: |

## Reporting a Vulnerability

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -959,6 +959,11 @@ private void checkUsage(ConnectionContext context,ProducerBrokerExchange produce
}

private void expireMessages() {
if(isDispatchPaused()) {
LOG.debug("{} dispatchPaused, skipping expire messages check", getActiveMQDestination().getQualifiedName());
return;
}

LOG.debug("{} expiring messages ..", getActiveMQDestination().getQualifiedName());

// just track the insertion count
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
public class QueueDispatchSelector extends SimpleDispatchSelector {
private static final Logger LOG = LoggerFactory.getLogger(QueueDispatchSelector.class);
private Subscription exclusiveConsumer;
private boolean paused;
private volatile boolean paused;

/**
* @param destination
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -136,7 +137,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
protected ActiveMQDestination[] durableDestinations;
protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByLocalId = new ConcurrentHashMap<>();
protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByRemoteId = new ConcurrentHashMap<>();
protected final Set<ConsumerId> forcedDurableRemoteId = Collections.newSetFromMap(new ConcurrentHashMap<ConsumerId, Boolean>());
protected final Set<ConsumerId> forcedDurableRemoteId = Collections.newSetFromMap(new ConcurrentHashMap<>());
protected final ConcurrentMap<ConsumerId, Set<ConsumerId>> compositeConsumerIds = new ConcurrentHashMap<>();
protected final ConcurrentMap<SubscriptionInfo, Set<SubscriptionInfo>> compositeSubscriptions = new ConcurrentHashMap<>();
protected final BrokerId localBrokerPath[] = new BrokerId[]{null};
protected final CountDownLatch startedLatch = new CountDownLatch(2);
protected final CountDownLatch localStartedLatch = new CountDownLatch(1);
Expand Down Expand Up @@ -1015,6 +1018,18 @@ public void run() {

} else if (data.getClass() == RemoveInfo.class) {
ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId();

// If we have an entry in compositeConsumerIds then this consumer was a
// composite consumer and we need to remove the entries in the set and
// not the consumer id we received here
final Set<ConsumerId> compositeIds = compositeConsumerIds.remove(id);
if (compositeIds != null) {
for (ConsumerId compositeId : compositeIds) {
serviceRemoteConsumerAdvisory(new RemoveInfo(compositeId));
}
return;
}

removeDemandSubscription(id);

if (forcedDurableRemoteId.remove(id)) {
Expand All @@ -1030,6 +1045,23 @@ public void run() {
} else if (data.getClass() == RemoveSubscriptionInfo.class) {
final RemoveSubscriptionInfo info = ((RemoveSubscriptionInfo) data);
final SubscriptionInfo subscriptionInfo = new SubscriptionInfo(info.getClientId(), info.getSubscriptionName());

// If we have an entry in compositeSubscriptions then this consumer was a
// composite consumer and we need to remove the entries in the set and not
// the subscription that we received here
final Set<SubscriptionInfo> compositeSubs =
this.compositeSubscriptions.remove(subscriptionInfo);
if (compositeSubs != null) {
for (SubscriptionInfo compositeSub : compositeSubs) {
RemoveSubscriptionInfo remove = new RemoveSubscriptionInfo();
remove.setClientId(compositeSub.getClientId());
remove.setSubscriptionName(compositeSub.getSubscriptionName());
remove.setConnectionId(this.localConnectionInfo.getConnectionId());
serviceRemoteConsumerAdvisory(remove);
}
return;
}

final boolean proxyBridgeSub = isProxyBridgeSubscription(subscriptionInfo.getClientId(),
subscriptionInfo.getSubscriptionName());
for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
Expand Down Expand Up @@ -1415,6 +1447,12 @@ protected void setupStaticDestinations() {
}

protected void addConsumerInfo(final ConsumerInfo consumerInfo) throws IOException {
// Check if this was processed and split into new consumers for composite dests
if (splitCompositeConsumer(consumerInfo)) {
// If true we don't want to continue processing the original consumer info
return;
}

ConsumerInfo info = consumerInfo.copy();
addRemoteBrokerToBrokerPath(info);
DemandSubscription sub = createDemandSubscription(info);
Expand Down Expand Up @@ -1443,6 +1481,65 @@ protected void addConsumerInfo(final ConsumerInfo consumerInfo) throws IOExcepti
}
}

// Generate new consumers for each destination that part of a composite destination list for a consumer
private boolean splitCompositeConsumer(final ConsumerInfo consumerInfo) throws IOException {
// If not a composite destination or if an advisory topic then return false
// So we process normally and don't split
if (!consumerInfo.getDestination().isComposite() ||
AdvisorySupport.isAdvisoryTopic(consumerInfo.getDestination())) {
return false;
}

// At this point this is a composite destination and not an advisory topic. The destination
// will be split into individual destinations to create demand so that conduit subscriptions
// and durable subscriptions work correctly

// Handle duplicates, don't need to create again if we already have an entry
// Just return true so we stop processing
if (!isDuplicateSuppressionOff(consumerInfo) && compositeConsumerIds.containsKey(
consumerInfo.getConsumerId())) {
return true;
}

// Get a set to store mapped consumer Ids for each individual destination in the composite list
// and (if applicable) a set for subscriptions for durables
final Set<ConsumerId> consumerIds = compositeConsumerIds.computeIfAbsent(
consumerInfo.getConsumerId(),
k -> Collections.newSetFromMap(new ConcurrentHashMap<>()));
final Set<SubscriptionInfo> subscriptions = Optional.ofNullable(
consumerInfo.getSubscriptionName()).map(
subName -> compositeSubscriptions.computeIfAbsent(
new SubscriptionInfo(consumerInfo.getClientId(),
consumerInfo.getSubscriptionName()),
k -> Collections.newSetFromMap(new ConcurrentHashMap<>()))).orElse(null);

// Split and go through each destination that is part of the composite list and process
for (ActiveMQDestination individualDest : consumerInfo.getDestination()
.getCompositeDestinations()) {
// Create a new consumer info with the individual destinations and
// generate new consumer Ids for each and add to the consumerIds set
final ConsumerInfo info = consumerInfo.copy();
info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),
consumerIdGenerator.getNextSequenceId()));
info.setDestination(individualDest);
consumerIds.add(info.getConsumerId());

// If there is a subscription name (durable) then generate a new one for the dest
// and add to the subscriptions set
Optional.ofNullable(subscriptions).ifPresent(
subs -> {
info.setSubscriptionName(
consumerInfo.getSubscriptionName() + individualDest.getPhysicalName());
subs.add(
new SubscriptionInfo(info.getClientId(), info.getSubscriptionName()));
});

// Continue on and process the new consumer Info
addConsumerInfo(info);
}
return true;
}

private void undoMapRegistration(DemandSubscription sub) {
subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws
if (singleSelectorPerDestination) {
String destinationName = info.getDestination().getQualifiedName();
Set<String> selectors = subSelectorCache.get(destinationName);
if (info.getSelector() == null && selectors.size() > 1) {
if (info.getSelector() == null && (selectors != null && selectors.size() > 1)) {
boolean removed = selectors.remove(MATCH_EVERYTHING);
LOG.debug("A non-selector consumer has dropped. Removing the catchall matching pattern 'TRUE'. Successful? " + removed);
}
Expand Down
5 changes: 5 additions & 0 deletions activemq-kahadb-store/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@
<artifactId>jmock-legacy</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
package org.apache.activemq.store.kahadb.disk.journal;

import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Map;

import java.util.Objects;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.RecoverableRandomAccessFile;
import org.slf4j.Logger;
Expand Down Expand Up @@ -76,17 +76,14 @@ public ByteSequence readRecord(Location location) throws IOException {
}

try {

if (location.getSize() == Location.NOT_SET) {
file.seek(location.getOffset());
location.setSize(file.readInt());
location.setType(file.readByte());
} else {
file.seek(location.getOffset() + Journal.RECORD_HEAD_SPACE);
}
if ((long)location.getOffset() + location.getSize() > dataFile.length) {
throw new IOException("Invalid location size: " + location + ", size: " + location.getSize());
}
validateFileLength(location);
byte[] data = new byte[location.getSize() - Journal.RECORD_HEAD_SPACE];
file.readFully(data);
return new ByteSequence(data, 0, data.length);
Expand Down Expand Up @@ -118,7 +115,6 @@ public void readLocationDetails(Location location) throws IOException {
}
}


public void updateRecord(Location location, ByteSequence data, boolean sync) throws IOException {

file.seek(location.getOffset() + Journal.RECORD_HEAD_SPACE);
Expand All @@ -132,4 +128,24 @@ public void updateRecord(Location location, ByteSequence data, boolean sync) thr
public RecoverableRandomAccessFile getRaf() {
return file;
}

void validateFileLength(final Location location) throws IOException {
final long recordEnd = location.getOffset() + location.getSize();

//Check if the end of the record will go past the file length
if (recordEnd > dataFile.length) {
/*
* AMQ-9254 if the read request is outside expected dataFile length,
* perform expensive OS file length lookup operation to allow read
* operation if it will succeed
*/
final long osFileLength = dataFile.getFile().length();
if(recordEnd > osFileLength) {
throw new IOException("Invalid location size: " + location + ", size: " + location.getSize());
} else {
LOG.warn("DataFile:{} actual length:{} larger than expected:{} for readRecord location:{} size:{}",
dataFile.file.getName(), osFileLength, dataFile.length, location, location.getSize());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -239,9 +240,20 @@ public void testRecoveryAfterCorruptionMetadataLocation() throws Exception {
final var appender = new AbstractAppender("testAppender", new AbstractFilter() {}, new MessageLayout(), false, new Property[0]) {
@Override
public void append(LogEvent event) {
if (event.getLevel() == Level.WARN
/**
* NOTE: As of JDK v11.0.19 RandomAccessFile throws a messageless EOFException when read fails
*
* throw new EOFException();
*/
if (event != null
&& event.getLevel() == Level.WARN
&& event.getMessage() != null
&& event.getMessage().getFormattedMessage() != null
&& event.getMessage().getFormattedMessage().contains("Cannot recover message audit")
&& event.getThrown().getLocalizedMessage().contains("Invalid location size")) {
&& event.getThrown() != null
&& event.getThrown() instanceof EOFException
&& event.getThrown().getMessage() == null) {

trappedExpectedLogMessage.set(true);
}
}
Expand All @@ -258,6 +270,8 @@ public void append(LogEvent event) {
}

assertEquals("no missing message", 50, broker.getAdminView().getTotalMessageCount());
assertEquals("Drain", 50, drainQueue(50));
assertEquals("no problem draining messages", 0, broker.getAdminView().getTotalMessageCount());
assertTrue("Did replay records on invalid location size", trappedExpectedLogMessage.get());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.kahadb.disk.journal;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;


public class DataFileAccessorTest {

@Rule
public TemporaryFolder dataDir = new TemporaryFolder();

@Test
public void testValidLocation() throws IOException {
//Create file of size 1024
final DataFileAccessor accessor = new DataFileAccessor(mock(Journal.class),
newTestDataFile(1024));

//The read check will add the offset and location size and will be 612
//so should be fine as it's less than the set file size of 1024
final Location location = new Location(0, 100);
location.setSize(512);

accessor.validateFileLength(location);
}

@Test(expected = IOException.class)
public void testInvalidLocationSize() throws IOException {
//Create file of size 1024
final DataFileAccessor accessor = new DataFileAccessor(mock(Journal.class),
newTestDataFile(1024));

//Set a size that is too large so this should fail
final Location location = new Location(0, 100);
location.setSize(2048);

accessor.validateFileLength(location);
}

@Test
public void testValidateUsingRealFileLength() throws IOException {
//Create file of size 1024
final DataFile dataFile = newTestDataFile(1024);

final DataFileAccessor accessor = new DataFileAccessor(mock(Journal.class), dataFile);

//Set a bad length value on the dataFile so that the initial check fails
//because the location is greater than dataFile.length
//We should read the real file size (1024) which is greater than the
//location size + offset so this should work
dataFile.setLength(512);
final Location location = new Location(0, 100);
location.setSize(512);

accessor.validateFileLength(location);
}

private DataFile newTestDataFile(int size) throws IOException {
final DataFile dataFile = new DataFile(writeTestFile(1024), 0);
assertEquals(1024, dataFile.length);
return dataFile;
}

private File writeTestFile(int size) throws IOException {
final File file = dataDir.newFile();
final byte[] data = new byte[size];
Arrays.fill(data, (byte)0);
Files.write(Path.of(file.toURI()), data);
assertEquals(1024, file.length());
return file;
}
}
Loading

0 comments on commit c9c863d

Please sign in to comment.