Skip to content

Commit

Permalink
Multi-recipient message views
Browse files Browse the repository at this point in the history
This adds support for storing multi-recipient message payloads and recipient views in Redis, and only fanning out on delivery or persistence. Phase 1: confirm storage and retrieval correctness.
  • Loading branch information
eager-signal authored Sep 4, 2024
1 parent d78c837 commit 11601fd
Show file tree
Hide file tree
Showing 50 changed files with 1,546 additions and 330 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -632,7 +632,7 @@ public void run(WhisperServerConfiguration config, Environment environment) thro
keyspaceNotificationDispatchExecutor);
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);
MessagesCache messagesCache = new MessagesCache(messagesCluster, keyspaceNotificationDispatchExecutor,
messageDeliveryScheduler, messageDeletionAsyncExecutor, clock);
messageDeliveryScheduler, messageDeletionAsyncExecutor, clock, dynamicConfigurationManager);
ClientReleaseManager clientReleaseManager = new ClientReleaseManager(clientReleases,
recurringJobExecutor,
config.getClientReleaseConfiguration().refreshInterval(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,4 @@ public FaultTolerantRedisClusterFactory getRedisClusterConfiguration() {
public int getPersistDelayMinutes() {
return persistDelayMinutes;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,9 @@

package org.whispersystems.textsecuregcm.configuration.dynamic;

import java.util.List;

import javax.validation.constraints.NotNull;

public record DynamicMessagesConfiguration(@NotNull List<DynamoKeyScheme> dynamoKeySchemes) {
public enum DynamoKeyScheme {
TRADITIONAL,
LAZY_DELETION;
}
public record DynamicMessagesConfiguration(boolean storeSharedMrmData, boolean mrmViewExperimentEnabled) {

public DynamicMessagesConfiguration() {
this(List.of(DynamoKeyScheme.TRADITIONAL));
}

public DynamoKeyScheme writeKeyScheme() {
return dynamoKeySchemes().getLast();
this(false, false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.responses.ApiResponse;

import java.security.MessageDigest;
import java.time.Clock;
import java.time.Duration;
Expand Down Expand Up @@ -73,8 +72,8 @@
import org.apache.commons.lang3.StringUtils;
import org.glassfish.jersey.server.ManagedAsync;
import org.signal.libsignal.protocol.SealedSenderMultiRecipientMessage;
import org.signal.libsignal.protocol.ServiceId;
import org.signal.libsignal.protocol.SealedSenderMultiRecipientMessage.Recipient;
import org.signal.libsignal.protocol.ServiceId;
import org.signal.libsignal.protocol.util.Pair;
import org.signal.libsignal.zkgroup.ServerSecretParams;
import org.signal.libsignal.zkgroup.VerificationFailedException;
Expand Down Expand Up @@ -261,7 +260,7 @@ public MessageController(
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
@ManagedAsync
@Operation(
@Operation(
summary = "Send a message",
description = """
Deliver a message to a single recipient. May be authenticated or unauthenticated; if unauthenticated,
Expand Down Expand Up @@ -309,9 +308,10 @@ public Response sendMessage(@ReadOnly @Auth Optional<AuthenticatedDevice> source

if (groupSendToken != null) {
if (!source.isEmpty() || !accessKey.isEmpty()) {
throw new BadRequestException("Group send endorsement tokens should not be combined with other authentication");
throw new BadRequestException(
"Group send endorsement tokens should not be combined with other authentication");
} else if (isStory) {
throw new BadRequestException("Group send endorsement tokens should not be sent for story messages");
throw new BadRequestException("Group send endorsement tokens should not be sent for story messages");
}
}

Expand Down Expand Up @@ -346,8 +346,7 @@ public Response sendMessage(@ReadOnly @Auth Optional<AuthenticatedDevice> source
}

final Optional<byte[]> spamReportToken = switch (senderType) {
case SENDER_TYPE_IDENTIFIED ->
reportSpamTokenProvider.makeReportSpamToken(context, source.get(), destination);
case SENDER_TYPE_IDENTIFIED -> reportSpamTokenProvider.makeReportSpamToken(context, source.get(), destination);
default -> Optional.empty();
};

Expand Down Expand Up @@ -470,7 +469,7 @@ public Response sendMessage(@ReadOnly @Auth Optional<AuthenticatedDevice> source
throw new WebApplicationException(Response.status(409)
.type(MediaType.APPLICATION_JSON_TYPE)
.entity(new MismatchedDevices(e.getMissingDevices(),
e.getExtraDevices()))
e.getExtraDevices()))
.build());
} catch (StaleDevicesException e) {
throw new WebApplicationException(Response.status(410)
Expand Down Expand Up @@ -621,27 +620,28 @@ public Response sendMultiRecipientMessage(
Collection<AccountMismatchedDevices> accountMismatchedDevices = new ArrayList<>();
Collection<AccountStaleDevices> accountStaleDevices = new ArrayList<>();
recipients.values().forEach(recipient -> {
final Account account = recipient.account();

try {
DestinationDeviceValidator.validateCompleteDeviceList(account, recipient.deviceIdToRegistrationId().keySet(), Collections.emptySet());

DestinationDeviceValidator.validateRegistrationIds(
account,
recipient.deviceIdToRegistrationId().entrySet(),
Map.Entry<Byte, Short>::getKey,
e -> Integer.valueOf(e.getValue()),
recipient.serviceIdentifier().identityType() == IdentityType.PNI);
} catch (MismatchedDevicesException e) {
accountMismatchedDevices.add(
new AccountMismatchedDevices(
recipient.serviceIdentifier(),
new MismatchedDevices(e.getMissingDevices(), e.getExtraDevices())));
} catch (StaleDevicesException e) {
accountStaleDevices.add(
new AccountStaleDevices(recipient.serviceIdentifier(), new StaleDevices(e.getStaleDevices())));
}
});
final Account account = recipient.account();

try {
DestinationDeviceValidator.validateCompleteDeviceList(account, recipient.deviceIdToRegistrationId().keySet(),
Collections.emptySet());

DestinationDeviceValidator.validateRegistrationIds(
account,
recipient.deviceIdToRegistrationId().entrySet(),
Map.Entry<Byte, Short>::getKey,
e -> Integer.valueOf(e.getValue()),
recipient.serviceIdentifier().identityType() == IdentityType.PNI);
} catch (MismatchedDevicesException e) {
accountMismatchedDevices.add(
new AccountMismatchedDevices(
recipient.serviceIdentifier(),
new MismatchedDevices(e.getMissingDevices(), e.getExtraDevices())));
} catch (StaleDevicesException e) {
accountStaleDevices.add(
new AccountStaleDevices(recipient.serviceIdentifier(), new StaleDevices(e.getStaleDevices())));
}
});
if (!accountMismatchedDevices.isEmpty()) {
return Response
.status(409)
Expand All @@ -667,6 +667,11 @@ public Response sendMultiRecipientMessage(
}

try {
@Nullable final byte[] sharedMrmKey =
dynamicConfigurationManager.getConfiguration().getMessagesConfiguration().storeSharedMrmData()
? messagesManager.insertSharedMultiRecipientMessagePayload(multiRecipientMessage)
: null;

CompletableFuture.allOf(
recipients.values().stream()
.flatMap(recipientData -> {
Expand All @@ -692,8 +697,7 @@ public Response sendMultiRecipientMessage(
sentMessageCounter.increment();
sendCommonPayloadMessage(
destinationAccount, destinationDevice, recipientData.serviceIdentifier(), timestamp,
online,
isStory, isUrgent, payload);
online, isStory, isUrgent, payload, sharedMrmKey);
},
multiRecipientMessageExecutor));
})
Expand Down Expand Up @@ -739,8 +743,8 @@ private void checkAccessKeys(
.filter(Predicate.not(Account::isUnrestrictedUnidentifiedAccess))
.map(account ->
account.getUnidentifiedAccessKey()
.filter(b -> b.length == keyLength)
.orElseThrow(() -> new WebApplicationException(Status.UNAUTHORIZED)))
.filter(b -> b.length == keyLength)
.orElseThrow(() -> new WebApplicationException(Status.UNAUTHORIZED)))
.reduce(new byte[keyLength],
(a, b) -> {
final byte[] xor = new byte[keyLength];
Expand Down Expand Up @@ -828,23 +832,28 @@ public CompletableFuture<Response> removePendingMessage(@ReadOnly @Auth Authenti
auth.getAuthenticatedDevice(),
uuid,
null)
.thenAccept(maybeDeletedMessage -> {
maybeDeletedMessage.ifPresent(deletedMessage -> {
.thenAccept(maybeRemovedMessage -> maybeRemovedMessage.ifPresent(removedMessage -> {

WebSocketConnection.recordMessageDeliveryDuration(deletedMessage.getServerTimestamp(),
auth.getAuthenticatedDevice());
WebSocketConnection.recordMessageDeliveryDuration(removedMessage.serverTimestamp(),
auth.getAuthenticatedDevice());

if (deletedMessage.hasSourceUuid() && deletedMessage.getType() != Type.SERVER_DELIVERY_RECEIPT) {
if (removedMessage.sourceServiceId().isPresent()
&& removedMessage.envelopeType() != Type.SERVER_DELIVERY_RECEIPT) {
if (removedMessage.sourceServiceId().get() instanceof AciServiceIdentifier aciServiceIdentifier) {
try {
receiptSender.sendReceipt(
ServiceIdentifier.valueOf(deletedMessage.getDestinationUuid()), auth.getAuthenticatedDevice().getId(),
AciServiceIdentifier.valueOf(deletedMessage.getSourceUuid()), deletedMessage.getTimestamp());
receiptSender.sendReceipt(removedMessage.destinationServiceId(), auth.getAuthenticatedDevice().getId(),
aciServiceIdentifier, removedMessage.clientTimestamp());
} catch (Exception e) {
logger.warn("Failed to send delivery receipt", e);
}
} else {
// If source service ID is present and the envelope type is not a server delivery receipt, then
// the source service ID *should always* be an ACI -- PNIs are receive-only, so they can only be the
// "source" via server delivery receipts
logger.warn("Source service ID unexpectedly a PNI service ID");
}
});
})
}
}))
.thenApply(Util.ASYNC_EMPTY_RESPONSE);
}

Expand Down Expand Up @@ -943,19 +952,25 @@ private void sendCommonPayloadMessage(Account destinationAccount,
boolean online,
boolean story,
boolean urgent,
byte[] payload) {
byte[] payload,
@Nullable byte[] sharedMrmKey) {

final Envelope.Builder messageBuilder = Envelope.newBuilder();
final long serverTimestamp = System.currentTimeMillis();

messageBuilder
.setType(Type.UNIDENTIFIED_SENDER)
.setTimestamp(timestamp == 0 ? serverTimestamp : timestamp)
.setClientTimestamp(timestamp == 0 ? serverTimestamp : timestamp)
.setServerTimestamp(serverTimestamp)
.setContent(ByteString.copyFrom(payload))
.setStory(story)
.setUrgent(urgent)
.setDestinationUuid(serviceIdentifier.toServiceIdentifierString());
.setDestinationServiceId(serviceIdentifier.toServiceIdentifierString());

if (sharedMrmKey != null) {
messageBuilder.setSharedMrmKey(ByteString.copyFrom(sharedMrmKey));
}
// mrm views phase 1: always set content
messageBuilder.setContent(ByteString.copyFrom(payload));

messageSender.sendMessage(destinationAccount, destinationDevice, messageBuilder.build(), online);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ public MessageProtos.Envelope toEnvelope(final ServiceIdentifier destinationIden
final MessageProtos.Envelope.Builder envelopeBuilder = MessageProtos.Envelope.newBuilder();

envelopeBuilder.setType(envelopeType)
.setTimestamp(timestamp)
.setClientTimestamp(timestamp)
.setServerTimestamp(System.currentTimeMillis())
.setDestinationUuid(destinationIdentifier.toServiceIdentifierString())
.setDestinationServiceId(destinationIdentifier.toServiceIdentifierString())
.setStory(story)
.setUrgent(urgent);

if (sourceAccount != null && sourceDeviceId != null) {
envelopeBuilder
.setSourceUuid(new AciServiceIdentifier(sourceAccount.getUuid()).toServiceIdentifierString())
.setSourceServiceId(new AciServiceIdentifier(sourceAccount.getUuid()).toServiceIdentifierString())
.setSourceDevice(sourceDeviceId.intValue());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,15 @@ public record OutgoingMessageEntity(UUID guid,
public MessageProtos.Envelope toEnvelope() {
final MessageProtos.Envelope.Builder builder = MessageProtos.Envelope.newBuilder()
.setType(MessageProtos.Envelope.Type.forNumber(type()))
.setTimestamp(timestamp())
.setClientTimestamp(timestamp())
.setServerTimestamp(serverTimestamp())
.setDestinationUuid(destinationUuid().toServiceIdentifierString())
.setDestinationServiceId(destinationUuid().toServiceIdentifierString())
.setServerGuid(guid().toString())
.setStory(story)
.setUrgent(urgent);

if (sourceUuid() != null) {
builder.setSourceUuid(sourceUuid().toServiceIdentifierString());
builder.setSourceServiceId(sourceUuid().toServiceIdentifierString());
builder.setSourceDevice(sourceDevice());
}

Expand All @@ -72,10 +72,10 @@ public static OutgoingMessageEntity fromEnvelope(final MessageProtos.Envelope en
return new OutgoingMessageEntity(
UUID.fromString(envelope.getServerGuid()),
envelope.getType().getNumber(),
envelope.getTimestamp(),
envelope.hasSourceUuid() ? ServiceIdentifier.valueOf(envelope.getSourceUuid()) : null,
envelope.getClientTimestamp(),
envelope.hasSourceServiceId() ? ServiceIdentifier.valueOf(envelope.getSourceServiceId()) : null,
envelope.getSourceDevice(),
envelope.hasDestinationUuid() ? ServiceIdentifier.valueOf(envelope.getDestinationUuid()) : null,
envelope.hasDestinationServiceId() ? ServiceIdentifier.valueOf(envelope.getDestinationServiceId()) : null,
envelope.hasUpdatedPni() ? UUID.fromString(envelope.getUpdatedPni()) : null,
envelope.getContent().toByteArray(),
envelope.getServerTimestamp(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@ public void measureAccountOutgoingMessageUuidMismatches(final Account account,

public void measureAccountEnvelopeUuidMismatches(final Account account,
final MessageProtos.Envelope envelope) {
if (envelope.hasDestinationUuid()) {
if (envelope.hasDestinationServiceId()) {
try {
measureAccountDestinationUuidMismatches(account, ServiceIdentifier.valueOf(envelope.getDestinationUuid()));
measureAccountDestinationUuidMismatches(account, ServiceIdentifier.valueOf(envelope.getDestinationServiceId()));
} catch (final IllegalArgumentException ignored) {
logger.warn("Envelope had invalid destination UUID: {}", envelope.getDestinationUuid());
logger.warn("Envelope had invalid destination UUID: {}", envelope.getDestinationServiceId());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public void sendMessage(final Account account, final Device device, final Envelo
CLIENT_ONLINE_TAG_NAME, String.valueOf(clientPresent),
URGENT_TAG_NAME, String.valueOf(message.getUrgent()),
STORY_TAG_NAME, String.valueOf(message.getStory()),
SEALED_SENDER_TAG_NAME, String.valueOf(!message.hasSourceUuid()))
SEALED_SENDER_TAG_NAME, String.valueOf(!message.hasSourceServiceId()))
.increment();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ public void sendReceipt(ServiceIdentifier sourceIdentifier, byte sourceDeviceId,
destinationAccount -> {
final Envelope.Builder message = Envelope.newBuilder()
.setServerTimestamp(System.currentTimeMillis())
.setSourceUuid(sourceIdentifier.toServiceIdentifierString())
.setSourceDevice((int) sourceDeviceId)
.setDestinationUuid(destinationIdentifier.toServiceIdentifierString())
.setTimestamp(messageId)
.setSourceServiceId(sourceIdentifier.toServiceIdentifierString())
.setSourceDevice(sourceDeviceId)
.setDestinationServiceId(destinationIdentifier.toServiceIdentifierString())
.setClientTimestamp(messageId)
.setType(Envelope.Type.SERVER_DELIVERY_RECEIPT)
.setUrgent(false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,13 @@ void sendMessageToSelf(
final long serverTimestamp = System.currentTimeMillis();
final Envelope envelope = Envelope.newBuilder()
.setType(Envelope.Type.forNumber(message.type()))
.setTimestamp(serverTimestamp)
.setClientTimestamp(serverTimestamp)
.setServerTimestamp(serverTimestamp)
.setDestinationUuid(new AciServiceIdentifier(sourceAndDestinationAccount.getUuid()).toServiceIdentifierString())
.setDestinationServiceId(
new AciServiceIdentifier(sourceAndDestinationAccount.getUuid()).toServiceIdentifierString())
.setContent(ByteString.copyFrom(contents.get()))
.setSourceUuid(new AciServiceIdentifier(sourceAndDestinationAccount.getUuid()).toServiceIdentifierString())
.setSourceDevice((int) Device.PRIMARY_ID)
.setSourceServiceId(new AciServiceIdentifier(sourceAndDestinationAccount.getUuid()).toServiceIdentifierString())
.setSourceDevice(Device.PRIMARY_ID)
.setUpdatedPni(sourceAndDestinationAccount.getPhoneNumberIdentifier().toString())
.setUrgent(true)
.build();
Expand Down
Loading

0 comments on commit 11601fd

Please sign in to comment.