Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AMQ-9484] Support exporting kahadb messages from a queue with an offset #1208

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ public interface MessageStore extends Service {

void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception;

void recoverNextMessages(int offset, int maxReturned, MessageRecoveryListener listener, boolean keepCurrentNextMessageId) throws Exception;

void dispose(ConnectionContext context);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ public void recoverNextMessages(int maxReturned, MessageRecoveryListener listene
delegate.recoverNextMessages(maxReturned, listener);
}

@Override
public void recoverNextMessages(int offset, int maxReturned, MessageRecoveryListener listener, boolean keepCurrentNextMessageId) throws Exception {
delegate.recoverNextMessages(offset, maxReturned, listener, keepCurrentNextMessageId);
}

@Override
public void resetBatching() {
delegate.resetBatching();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,32 @@ public void recoverNextMessages(int maxReturned, MessageRecoveryListener listene
}
}

@Override
public void recoverNextMessages(int offset, int maxReturned, MessageRecoveryListener listener, boolean keepCurrentNextMessageId) throws Exception {
synchronized (messageTable) {
boolean pastLackBatch = lastBatchId == null;
int position = 0;
for (Map.Entry<MessageId, Message> entry : messageTable.entrySet()) {
if(offset > 0 && offset > position) {
position++;
continue;
}
if (pastLackBatch) {
Object msg = entry.getValue();
lastBatchId = entry.getKey();
if (msg.getClass() == MessageId.class) {
listener.recoverMessageReference((MessageId) msg);
} else {
listener.recoverMessage((Message) msg);
}
} else {
pastLackBatch = entry.getKey().equals(lastBatchId);
}
position++;
}
}
}

@Override
public void resetBatching() {
lastBatchId = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,19 @@ public boolean recoverMessageReference(String reference) throws Exception {

}

/**
* @param offset
* @param maxReturned
* @param listener
* @throws Exception
* @see org.apache.activemq.store.MessageStore#recoverNextMessages(int,
* org.apache.activemq.store.MessageRecoveryListener)
*/
@Override
public void recoverNextMessages(int offset, int maxReturned, final MessageRecoveryListener listener, final boolean keepCurrentNextMessageId) throws Exception {
throw new UnsupportedOperationException("recoverNextMesage(offset,maxReturned,listener) is not supported.");
}

public void trackRollbackAck(Message message) {
synchronized (rolledBackAcks) {
rolledBackAcks.put((Long)message.getMessageId().getEntryLocator(), message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,52 @@ public void execute(Transaction tx) throws Exception {
}
}

@Override
public void recoverNextMessages(final int offset, final int maxReturned, final MessageRecoveryListener listener, final boolean keepCurrentNextMessageId) throws Exception {
indexLock.writeLock().lock();
try {
pageFile.tx().execute(new Transaction.Closure<Exception>() {
@Override
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
Entry<Long, MessageKeys> entry = null;
int counter = recoverRolledBackAcks(destination.getPhysicalName(), sd, tx, maxReturned, listener);
Set<String> ackedAndPrepared = ackedAndPreparedMap.get(destination.getPhysicalName());
long currentDefaultCursorPosition = sd.orderIndex.cursor.defaultCursorPosition;

try {
if(offset > 0) {
// setBatch does a +1 internally
sd.orderIndex.setBatch(tx, currentDefaultCursorPosition - 1 + offset);
}
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext(); ) {
entry = iterator.next();

if (ackedAndPrepared != null && ackedAndPrepared.contains(entry.getValue().messageId)) {
continue;
}

Message msg = loadMessage(entry.getValue().location);
msg.getMessageId().setFutureOrSequenceLong(entry.getKey());
listener.recoverMessage(msg);
counter++;
if (counter >= maxReturned || !listener.canRecoveryNextMessage()) {
break;
}
}
sd.orderIndex.stoppedIterating();
} finally {
if(keepCurrentNextMessageId) {
sd.orderIndex.setBatch(tx, currentDefaultCursorPosition - 1);
}
}
}
});
} finally {
indexLock.writeLock().unlock();
}
}

protected int recoverRolledBackAcks(String recoveredTxStateMapKey, StoredDestination sd, Transaction tx, int maxReturned, MessageRecoveryListener listener) throws Exception {
int counter = 0;
String id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,43 @@ public void execute(Transaction tx) throws Exception {
}
}

@Override
public void recoverNextMessages(final int offset, final int maxReturned, final MessageRecoveryListener listener, final boolean keepCurrentNextMessageId) throws Exception {
synchronized(indexMutex) {
pageFile.tx().execute(new Transaction.Closure<Exception>(){
@Override
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
Entry<Long, MessageRecord> entry=null;
int counter = 0;
long currentCursorPosition = cursorPos;
try {
long recoverCursPos = cursorPos;
if(offset > 0) {
recoverCursPos = cursorPos + offset;
}
for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, recoverCursPos); iterator.hasNext();) {
entry = iterator.next();

listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data ) );
counter++;
if( counter >= maxReturned ) {
break;
}
}
if( entry!=null ) {
cursorPos = entry.getKey()+1;
}
} finally {
if(offset > 0 && keepCurrentNextMessageId) {
cursorPos = currentCursorPosition;
}
}
}
});
}
}

@Override
public void resetBatching() {
cursorPos=0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,11 @@ public void recoverNextMessages(int maxReturned, MessageRecoveryListener listene
}
}

@Override
public void recoverNextMessages(int offset, int maxReturned, MessageRecoveryListener listener, boolean keepCurrentNextMessageId) throws Exception {

}

@Override
public void setBatch(MessageId message) {
batch.set((Long)message.getFutureOrSequenceLong());
Expand Down
Loading