From 0e04cac800bf952f115abcae17d5cb3fda0f65b5 Mon Sep 17 00:00:00 2001 From: Ameya Lokare Date: Mon, 25 Nov 2024 18:29:00 -0800 Subject: [PATCH] Crawler to backfill PNI records of alternate forms of existing phone numbers --- .../textsecuregcm/WhisperServerService.java | 2 + .../storage/PhoneNumberIdentifiers.java | 48 ++++++ .../BackfillBeninPhoneNumberFormsCommand.java | 159 ++++++++++++++++++ .../workers/CommandDependencies.java | 6 +- ...PushNotificationExperimentCommandTest.java | 1 + .../workers/NotifyIdleDevicesCommandTest.java | 1 + ...PushNotificationExperimentCommandTest.java | 1 + 7 files changed, 216 insertions(+), 2 deletions(-) create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/workers/BackfillBeninPhoneNumberFormsCommand.java diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index c2e30cbce..e7630acbb 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -256,6 +256,7 @@ import org.whispersystems.textsecuregcm.websocket.AuthenticatedConnectListener; import org.whispersystems.textsecuregcm.websocket.ProvisioningConnectListener; import org.whispersystems.textsecuregcm.websocket.WebSocketAccountAuthenticator; +import org.whispersystems.textsecuregcm.workers.BackfillBeninPhoneNumberFormsCommand; import org.whispersystems.textsecuregcm.workers.BackupMetricsCommand; import org.whispersystems.textsecuregcm.workers.CertificateCommand; import org.whispersystems.textsecuregcm.workers.CheckDynamicConfigurationCommand; @@ -334,6 +335,7 @@ public void initialize(final Bootstrap bootstrap) { bootstrap.addCommand(new MigrateDeletedAccountsCommand()); bootstrap.addCommand(new DeleteE164RegistrationRecoveryPasswordsCommand()); + bootstrap.addCommand(new BackfillBeninPhoneNumberFormsCommand()); } @Override diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/PhoneNumberIdentifiers.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/PhoneNumberIdentifiers.java index 7e5cdc3bc..16ce8f100 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/PhoneNumberIdentifiers.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/PhoneNumberIdentifiers.java @@ -10,6 +10,7 @@ import com.google.common.annotations.VisibleForTesting; import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Timer; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.UUID; @@ -21,11 +22,17 @@ import org.whispersystems.textsecuregcm.util.AttributeValues; import org.whispersystems.textsecuregcm.util.ExceptionUtils; import org.whispersystems.textsecuregcm.util.Util; +import reactor.core.publisher.Flux; +import reactor.core.scheduler.Scheduler; +import reactor.util.function.Tuple2; +import reactor.util.function.Tuples; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; import software.amazon.awssdk.services.dynamodb.model.BatchGetItemRequest; import software.amazon.awssdk.services.dynamodb.model.CancellationReason; import software.amazon.awssdk.services.dynamodb.model.KeysAndAttributes; import software.amazon.awssdk.services.dynamodb.model.ReturnValuesOnConditionCheckFailure; +import software.amazon.awssdk.services.dynamodb.model.ScanRequest; import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem; import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsRequest; import software.amazon.awssdk.services.dynamodb.model.TransactionCanceledException; @@ -217,4 +224,45 @@ CompletableFuture> fetchPhoneNumbers(List phoneNumbers item -> AttributeValues.getUUID(item, ATTR_PHONE_NUMBER_IDENTIFIER, null)))) .whenComplete((ignored, throwable) -> sample.stop(GET_PNI_TIMER)); } + + public Flux> getPhoneNumberIdentifiers(final String prefix, final int segments, + final Scheduler scheduler) { + return Flux.range(0, segments) + .parallel() + .runOn(scheduler) + .flatMap(segment -> dynamoDbClient.scanPaginator(ScanRequest.builder() + .tableName(tableName) + .segment(segment) + .totalSegments(segments) + .filterExpression("begins_with(#key, :e164Prefix)") + .expressionAttributeNames(Map.of("#key", KEY_E164)) + .expressionAttributeValues(Map.of(":e164Prefix", AttributeValue.fromS(prefix))) + .build()) + .items() + .map(item -> Tuples.of(item.get(KEY_E164).s(), + AttributeValues.getUUID(item, ATTR_PHONE_NUMBER_IDENTIFIER, null)))) + .sequential(); + } + + public CompletableFuture backfillAlternatePhoneNumbers(final String e164, final UUID pni) { + final List alternateForms = new ArrayList<>(Util.getAlternateForms(e164)); + if (alternateForms.size() == 1) { + return CompletableFuture.completedFuture(null); + } + return retry(MAX_RETRIES, TransactionConflictException.class, + () -> setPniIfRequired(e164, alternateForms, Map.of(e164, pni))) + .exceptionally(ExceptionUtils.exceptionallyHandler(TransactionCanceledException.class, e -> { + logger.error("Tried to backfill {} with pni: {}, but there were existing numbers with a different pni", e164, + pni); + throw e; + })) + .thenAccept(newPni -> { + if (!newPni.equals(pni)) { + logger.error("Tried to backfill {} with pni: {}, but there were existing numbers with pni: {}", pni, + newPni); + throw new IllegalArgumentException("Wrong PNI in backfill"); + } + }); + } + } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/BackfillBeninPhoneNumberFormsCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/BackfillBeninPhoneNumberFormsCommand.java new file mode 100644 index 000000000..57c12a26b --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/BackfillBeninPhoneNumberFormsCommand.java @@ -0,0 +1,159 @@ +package org.whispersystems.textsecuregcm.workers; + +import com.google.i18n.phonenumbers.NumberParseException; +import com.google.i18n.phonenumbers.PhoneNumberUtil; +import com.google.i18n.phonenumbers.Phonenumber; +import io.dropwizard.core.Application; +import io.dropwizard.core.setup.Environment; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Metrics; +import net.sourceforge.argparse4j.inf.Namespace; +import net.sourceforge.argparse4j.inf.Subparser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.whispersystems.textsecuregcm.WhisperServerConfiguration; +import org.whispersystems.textsecuregcm.metrics.MetricsUtil; +import org.whispersystems.textsecuregcm.storage.PhoneNumberIdentifiers; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; +import reactor.util.function.Tuple2; +import java.util.ArrayList; +import java.util.Collections; +import java.util.UUID; +import java.util.function.Function; + +public class BackfillBeninPhoneNumberFormsCommand extends AbstractCommandWithDependencies { + private static final String BENIN_PREFIX = "+229"; + private static final PhoneNumberUtil PHONE_NUMBER_UTIL = PhoneNumberUtil.getInstance(); + + private static final String SEGMENT_COUNT_ARGUMENT = "segments"; + private static final String PREFIX_ARGUMENT = "prefix"; + private static final String DRY_RUN_ARGUMENT = "dry-run"; + private static final String MAX_CONCURRENCY_ARGUMENT = "max-concurrency"; + + private static final String BUFFER_ARGUMENT = "buffer"; + + private static final String PHONE_NUMBERS_INSPECTED = MetricsUtil.name(BackfillBeninPhoneNumberFormsCommand.class, "phoneNumbersInspected"); + private static final String PHONE_NUMBERS_BACKFILLED = MetricsUtil.name(BackfillBeninPhoneNumberFormsCommand.class, "phoneNumbersBackfilled"); + private static final String DRY_RUN_TAG = "dryRun"; + private static final String IS_OLD_FORMAT_TAG = "oldFormat"; + + private static final Logger logger = LoggerFactory.getLogger(BackfillBeninPhoneNumberFormsCommand.class); + + public BackfillBeninPhoneNumberFormsCommand() { + super(new Application<>() { + @Override + public void run(final WhisperServerConfiguration whisperServerConfiguration, final Environment environment) + throws Exception { + + } + }, "backfill-alternate-phone-number-forms", "Inserts alternate forms of existing phone numbers"); + } + + @Override + public void configure(final Subparser subparser) { + super.configure(subparser); + + subparser.addArgument("--segments") + .type(Integer.class) + .dest(SEGMENT_COUNT_ARGUMENT) + .required(false) + .setDefault(1) + .help("The total number of segments for a DynamoDB scan"); + + subparser.addArgument("--prefix") + .type(String.class) + .dest(PREFIX_ARGUMENT) + .required(true) + .help("The phone number prefix (including +) to filter by"); + + subparser.addArgument("--max-concurrency") + .type(Integer.class) + .dest(MAX_CONCURRENCY_ARGUMENT) + .required(false) + .setDefault(16) + .help("Max concurrency for backfilling PNI for alternate phone number forms"); + + subparser.addArgument("--buffer") + .type(Integer.class) + .dest(BUFFER_ARGUMENT) + .setDefault(16_384) + .help("Records to buffer"); + + subparser.addArgument("--dry-run") + .type(Boolean.class) + .dest(DRY_RUN_ARGUMENT) + .required(false) + .setDefault(true) + .help("If true, don’t actually insert any new PNI records"); + } + + @Override + protected void run(final Environment environment, final Namespace namespace, + final WhisperServerConfiguration configuration, final CommandDependencies commandDependencies) throws Exception { + final int segments = namespace.getInt(SEGMENT_COUNT_ARGUMENT); + final int concurrency = namespace.getInt(MAX_CONCURRENCY_ARGUMENT); + final int bufferSize = namespace.getInt(BUFFER_ARGUMENT); + final boolean dryRun = namespace.getBoolean(DRY_RUN_ARGUMENT); + + final Counter phoneNumbersInspectedCounter = + Metrics.counter(PHONE_NUMBERS_INSPECTED, DRY_RUN_TAG, String.valueOf(dryRun)); + + final Counter phoneNumbersBackfilledCounter = + Metrics.counter(PHONE_NUMBERS_BACKFILLED, DRY_RUN_TAG, String.valueOf(dryRun)); + + final PhoneNumberIdentifiers phoneNumberIdentifiers = commandDependencies.phoneNumberIdentifiers(); + + phoneNumberIdentifiers.getPhoneNumberIdentifiers(BENIN_PREFIX, segments, Schedulers.parallel()) + .doOnNext(ignored -> phoneNumbersInspectedCounter.increment()) + .buffer(bufferSize) + .map(source -> { + final ArrayList> shuffled = new ArrayList<>(source); + Collections.shuffle(shuffled); + return shuffled; + }) + .limitRate(2) + .flatMapIterable(Function.identity()) + .flatMap(tuple -> { + final String e164 = tuple.getT1(); + final UUID pni = tuple.getT2(); + + final boolean isNew = isNewFormatBeninNumber(e164); + Metrics.counter(PHONE_NUMBERS_INSPECTED, + DRY_RUN_TAG, String.valueOf(dryRun), + IS_OLD_FORMAT_TAG, String.valueOf(!isNew)); + + if (isNew) { + // only old format numbers need to be backfilled + return Mono.just(false); + } + + return dryRun + ? Mono.just(true) + : Mono.fromFuture( () -> + phoneNumberIdentifiers.backfillAlternatePhoneNumbers(e164, pni).thenApply(ignored -> true)) + .onErrorResume(t -> { + logger.warn("Failed to insert PNI for alternate forms of number {}", e164, t); + return Mono.just(false); + }); + }, concurrency) + .filter(succeeded -> succeeded) + .doOnNext(ignored -> phoneNumbersBackfilledCounter.increment()) + .then() + .block(); + } + + private static boolean isNewFormatBeninNumber(final String number) { + final Phonenumber.PhoneNumber phoneNumber; + try { + phoneNumber = PHONE_NUMBER_UTIL.parse(number, null); + if ("BJ".equals(PHONE_NUMBER_UTIL.getRegionCodeForNumber(phoneNumber))) { + final String nationalSignificantNumber = PHONE_NUMBER_UTIL.getNationalSignificantNumber(phoneNumber); + return nationalSignificantNumber.length() == 10 && nationalSignificantNumber.startsWith("01"); + } + } catch (NumberParseException e) { + logger.error("Failed to parse benin phone number {}", number); + } + return false; + } +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java index 0aff5f94b..10dcc0388 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java @@ -88,7 +88,8 @@ record CommandDependencies( ClientResources.Builder redisClusterClientResourcesBuilder, BackupManager backupManager, DynamicConfigurationManager dynamicConfigurationManager, - DynamoDbAsyncClient dynamoDbAsyncClient) { + DynamoDbAsyncClient dynamoDbAsyncClient, + PhoneNumberIdentifiers phoneNumberIdentifiers) { static CommandDependencies build( final String name, @@ -287,7 +288,8 @@ static CommandDependencies build( redisClientResourcesBuilder, backupManager, dynamicConfigurationManager, - dynamoDbAsyncClient + dynamoDbAsyncClient, + phoneNumberIdentifiers ); } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/workers/FinishPushNotificationExperimentCommandTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/workers/FinishPushNotificationExperimentCommandTest.java index 65f32351d..3eb0ea90e 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/workers/FinishPushNotificationExperimentCommandTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/workers/FinishPushNotificationExperimentCommandTest.java @@ -80,6 +80,7 @@ void setUp() { null, null, null, + null, null); //noinspection unchecked diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/workers/NotifyIdleDevicesCommandTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/workers/NotifyIdleDevicesCommandTest.java index d0c4afc39..0ff5669e3 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/workers/NotifyIdleDevicesCommandTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/workers/NotifyIdleDevicesCommandTest.java @@ -67,6 +67,7 @@ private TestNotifyIdleDevicesCommand(final MessagesManager messagesManager, null, null, null, + null, null); this.idleDeviceNotificationScheduler = idleDeviceNotificationScheduler; diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/workers/StartPushNotificationExperimentCommandTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/workers/StartPushNotificationExperimentCommandTest.java index 3b54fc52b..82538ad00 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/workers/StartPushNotificationExperimentCommandTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/workers/StartPushNotificationExperimentCommandTest.java @@ -69,6 +69,7 @@ public TestStartPushNotificationExperimentCommand( null, null, null, + null, null); }