Skip to content

Commit

Permalink
Wait for MRM experiment mono to complete before returning default mes…
Browse files Browse the repository at this point in the history
…sage
  • Loading branch information
eager-signal committed Sep 5, 2024
1 parent b95a766 commit ad17c6e
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -396,10 +396,13 @@ Flux<MessageProtos.Envelope> getAllMessages(final UUID destinationUuid, final by

final Mono<MessageProtos.Envelope> messageMono;
if (message.hasSharedMrmKey()) {
maybeRunMrmViewExperiment(message, destinationUuid, destinationDevice);
final Mono<?> experimentMono = maybeRunMrmViewExperiment(message, destinationUuid, destinationDevice);

// mrm views phase 1: messageMono for sharedMrmKey is always Mono.just(), because messages always have content
messageMono = Mono.just(message.toBuilder().clearSharedMrmKey().build());
// To avoid races, wait for the experiment to run, but ignore any errors
messageMono = experimentMono
.onErrorComplete()
.then(Mono.just(message.toBuilder().clearSharedMrmKey().build()));
} else {
messageMono = Mono.just(message);
}
Expand All @@ -420,7 +423,7 @@ Flux<MessageProtos.Envelope> getAllMessages(final UUID destinationUuid, final by
*
* @see DynamicMessagesConfiguration#mrmViewExperimentEnabled()
*/
private void maybeRunMrmViewExperiment(final MessageProtos.Envelope mrmMessage, final UUID destinationUuid,
private Mono<?> maybeRunMrmViewExperiment(final MessageProtos.Envelope mrmMessage, final UUID destinationUuid,
final byte destinationDevice) {
if (dynamicConfigurationManager.getConfiguration().getMessagesConfiguration()
.mrmViewExperimentEnabled()) {
Expand Down Expand Up @@ -456,6 +459,10 @@ private void maybeRunMrmViewExperiment(final MessageProtos.Envelope mrmMessage,

experiment.compareFutureResult(mrmMessage.toBuilder().clearSharedMrmKey().build(),
mrmMessageMono.toFuture());

return mrmMessageMono;
} else {
return Mono.empty();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -563,15 +563,22 @@ void testAvailabilityListenerResponses() {
});
}

@Test
void testMultiRecipientMessage() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {true, false})
void testMultiRecipientMessage(final boolean sharedMrmKeyPresent) throws Exception {
final UUID destinationUuid = UUID.randomUUID();
final byte deviceId = 1;

final UUID mrmGuid = UUID.randomUUID();
final SealedSenderMultiRecipientMessage mrm = generateRandomMrmMessage(
new AciServiceIdentifier(destinationUuid), deviceId);
final byte[] sharedMrmDataKey = messagesCache.insertSharedMultiRecipientMessagePayload(mrmGuid, mrm);

final byte[] sharedMrmDataKey;
if (sharedMrmKeyPresent) {
sharedMrmDataKey = messagesCache.insertSharedMultiRecipientMessagePayload(mrmGuid, mrm);
} else {
sharedMrmDataKey = new byte[]{1};
}

final UUID guid = UUID.randomUUID();
final MessageProtos.Envelope message = generateRandomMessage(guid, true)
Expand All @@ -585,7 +592,7 @@ void testMultiRecipientMessage() throws Exception {
.build();
messagesCache.insert(guid, destinationUuid, deviceId, message);

assertEquals(1L, (long) REDIS_CLUSTER_EXTENSION.getRedisCluster()
assertEquals(sharedMrmKeyPresent ? 1 : 0, (long) REDIS_CLUSTER_EXTENSION.getRedisCluster()
.withBinaryCluster(conn -> conn.sync().exists(MessagesCache.getSharedMrmKey(mrmGuid))));

final List<MessageProtos.Envelope> messages = get(destinationUuid, deviceId, 1);
Expand Down

0 comments on commit ad17c6e

Please sign in to comment.