Skip to content

Commit

Permalink
Crawler to backfill PNI records of alternate forms of existing phone …
Browse files Browse the repository at this point in the history
…numbers
  • Loading branch information
ameya-signal authored and jon-signal committed Nov 26, 2024
1 parent 1db9258 commit 0e04cac
Show file tree
Hide file tree
Showing 7 changed files with 216 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@
import org.whispersystems.textsecuregcm.websocket.AuthenticatedConnectListener;
import org.whispersystems.textsecuregcm.websocket.ProvisioningConnectListener;
import org.whispersystems.textsecuregcm.websocket.WebSocketAccountAuthenticator;
import org.whispersystems.textsecuregcm.workers.BackfillBeninPhoneNumberFormsCommand;
import org.whispersystems.textsecuregcm.workers.BackupMetricsCommand;
import org.whispersystems.textsecuregcm.workers.CertificateCommand;
import org.whispersystems.textsecuregcm.workers.CheckDynamicConfigurationCommand;
Expand Down Expand Up @@ -334,6 +335,7 @@ public void initialize(final Bootstrap<WhisperServerConfiguration> bootstrap) {

bootstrap.addCommand(new MigrateDeletedAccountsCommand());
bootstrap.addCommand(new DeleteE164RegistrationRecoveryPasswordsCommand());
bootstrap.addCommand(new BackfillBeninPhoneNumberFormsCommand());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.google.common.annotations.VisibleForTesting;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
Expand All @@ -21,11 +22,17 @@
import org.whispersystems.textsecuregcm.util.AttributeValues;
import org.whispersystems.textsecuregcm.util.ExceptionUtils;
import org.whispersystems.textsecuregcm.util.Util;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.BatchGetItemRequest;
import software.amazon.awssdk.services.dynamodb.model.CancellationReason;
import software.amazon.awssdk.services.dynamodb.model.KeysAndAttributes;
import software.amazon.awssdk.services.dynamodb.model.ReturnValuesOnConditionCheckFailure;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem;
import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsRequest;
import software.amazon.awssdk.services.dynamodb.model.TransactionCanceledException;
Expand Down Expand Up @@ -217,4 +224,45 @@ CompletableFuture<Map<String, UUID>> fetchPhoneNumbers(List<String> phoneNumbers
item -> AttributeValues.getUUID(item, ATTR_PHONE_NUMBER_IDENTIFIER, null))))
.whenComplete((ignored, throwable) -> sample.stop(GET_PNI_TIMER));
}

public Flux<Tuple2<String, UUID>> getPhoneNumberIdentifiers(final String prefix, final int segments,
final Scheduler scheduler) {
return Flux.range(0, segments)
.parallel()
.runOn(scheduler)
.flatMap(segment -> dynamoDbClient.scanPaginator(ScanRequest.builder()
.tableName(tableName)
.segment(segment)
.totalSegments(segments)
.filterExpression("begins_with(#key, :e164Prefix)")
.expressionAttributeNames(Map.of("#key", KEY_E164))
.expressionAttributeValues(Map.of(":e164Prefix", AttributeValue.fromS(prefix)))
.build())
.items()
.map(item -> Tuples.of(item.get(KEY_E164).s(),
AttributeValues.getUUID(item, ATTR_PHONE_NUMBER_IDENTIFIER, null))))
.sequential();
}

public CompletableFuture<Void> backfillAlternatePhoneNumbers(final String e164, final UUID pni) {
final List<String> alternateForms = new ArrayList<>(Util.getAlternateForms(e164));
if (alternateForms.size() == 1) {
return CompletableFuture.completedFuture(null);
}
return retry(MAX_RETRIES, TransactionConflictException.class,
() -> setPniIfRequired(e164, alternateForms, Map.of(e164, pni)))
.exceptionally(ExceptionUtils.exceptionallyHandler(TransactionCanceledException.class, e -> {
logger.error("Tried to backfill {} with pni: {}, but there were existing numbers with a different pni", e164,
pni);
throw e;
}))
.thenAccept(newPni -> {
if (!newPni.equals(pni)) {
logger.error("Tried to backfill {} with pni: {}, but there were existing numbers with pni: {}", pni,
newPni);
throw new IllegalArgumentException("Wrong PNI in backfill");
}
});
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package org.whispersystems.textsecuregcm.workers;

import com.google.i18n.phonenumbers.NumberParseException;
import com.google.i18n.phonenumbers.PhoneNumberUtil;
import com.google.i18n.phonenumbers.Phonenumber;
import io.dropwizard.core.Application;
import io.dropwizard.core.setup.Environment;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.storage.PhoneNumberIdentifiers;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2;
import java.util.ArrayList;
import java.util.Collections;
import java.util.UUID;
import java.util.function.Function;

public class BackfillBeninPhoneNumberFormsCommand extends AbstractCommandWithDependencies {
private static final String BENIN_PREFIX = "+229";
private static final PhoneNumberUtil PHONE_NUMBER_UTIL = PhoneNumberUtil.getInstance();

private static final String SEGMENT_COUNT_ARGUMENT = "segments";
private static final String PREFIX_ARGUMENT = "prefix";
private static final String DRY_RUN_ARGUMENT = "dry-run";
private static final String MAX_CONCURRENCY_ARGUMENT = "max-concurrency";

private static final String BUFFER_ARGUMENT = "buffer";

private static final String PHONE_NUMBERS_INSPECTED = MetricsUtil.name(BackfillBeninPhoneNumberFormsCommand.class, "phoneNumbersInspected");
private static final String PHONE_NUMBERS_BACKFILLED = MetricsUtil.name(BackfillBeninPhoneNumberFormsCommand.class, "phoneNumbersBackfilled");
private static final String DRY_RUN_TAG = "dryRun";
private static final String IS_OLD_FORMAT_TAG = "oldFormat";

private static final Logger logger = LoggerFactory.getLogger(BackfillBeninPhoneNumberFormsCommand.class);

public BackfillBeninPhoneNumberFormsCommand() {
super(new Application<>() {
@Override
public void run(final WhisperServerConfiguration whisperServerConfiguration, final Environment environment)
throws Exception {

}
}, "backfill-alternate-phone-number-forms", "Inserts alternate forms of existing phone numbers");
}

@Override
public void configure(final Subparser subparser) {
super.configure(subparser);

subparser.addArgument("--segments")
.type(Integer.class)
.dest(SEGMENT_COUNT_ARGUMENT)
.required(false)
.setDefault(1)
.help("The total number of segments for a DynamoDB scan");

subparser.addArgument("--prefix")
.type(String.class)
.dest(PREFIX_ARGUMENT)
.required(true)
.help("The phone number prefix (including +) to filter by");

subparser.addArgument("--max-concurrency")
.type(Integer.class)
.dest(MAX_CONCURRENCY_ARGUMENT)
.required(false)
.setDefault(16)
.help("Max concurrency for backfilling PNI for alternate phone number forms");

subparser.addArgument("--buffer")
.type(Integer.class)
.dest(BUFFER_ARGUMENT)
.setDefault(16_384)
.help("Records to buffer");

subparser.addArgument("--dry-run")
.type(Boolean.class)
.dest(DRY_RUN_ARGUMENT)
.required(false)
.setDefault(true)
.help("If true, don’t actually insert any new PNI records");
}

@Override
protected void run(final Environment environment, final Namespace namespace,
final WhisperServerConfiguration configuration, final CommandDependencies commandDependencies) throws Exception {
final int segments = namespace.getInt(SEGMENT_COUNT_ARGUMENT);
final int concurrency = namespace.getInt(MAX_CONCURRENCY_ARGUMENT);
final int bufferSize = namespace.getInt(BUFFER_ARGUMENT);
final boolean dryRun = namespace.getBoolean(DRY_RUN_ARGUMENT);

final Counter phoneNumbersInspectedCounter =
Metrics.counter(PHONE_NUMBERS_INSPECTED, DRY_RUN_TAG, String.valueOf(dryRun));

final Counter phoneNumbersBackfilledCounter =
Metrics.counter(PHONE_NUMBERS_BACKFILLED, DRY_RUN_TAG, String.valueOf(dryRun));

final PhoneNumberIdentifiers phoneNumberIdentifiers = commandDependencies.phoneNumberIdentifiers();

phoneNumberIdentifiers.getPhoneNumberIdentifiers(BENIN_PREFIX, segments, Schedulers.parallel())
.doOnNext(ignored -> phoneNumbersInspectedCounter.increment())
.buffer(bufferSize)
.map(source -> {
final ArrayList<Tuple2<String, UUID>> shuffled = new ArrayList<>(source);
Collections.shuffle(shuffled);
return shuffled;
})
.limitRate(2)
.flatMapIterable(Function.identity())
.flatMap(tuple -> {
final String e164 = tuple.getT1();
final UUID pni = tuple.getT2();

final boolean isNew = isNewFormatBeninNumber(e164);
Metrics.counter(PHONE_NUMBERS_INSPECTED,
DRY_RUN_TAG, String.valueOf(dryRun),
IS_OLD_FORMAT_TAG, String.valueOf(!isNew));

if (isNew) {
// only old format numbers need to be backfilled
return Mono.just(false);
}

return dryRun
? Mono.just(true)
: Mono.fromFuture( () ->
phoneNumberIdentifiers.backfillAlternatePhoneNumbers(e164, pni).thenApply(ignored -> true))
.onErrorResume(t -> {
logger.warn("Failed to insert PNI for alternate forms of number {}", e164, t);
return Mono.just(false);
});
}, concurrency)
.filter(succeeded -> succeeded)
.doOnNext(ignored -> phoneNumbersBackfilledCounter.increment())
.then()
.block();
}

private static boolean isNewFormatBeninNumber(final String number) {
final Phonenumber.PhoneNumber phoneNumber;
try {
phoneNumber = PHONE_NUMBER_UTIL.parse(number, null);
if ("BJ".equals(PHONE_NUMBER_UTIL.getRegionCodeForNumber(phoneNumber))) {
final String nationalSignificantNumber = PHONE_NUMBER_UTIL.getNationalSignificantNumber(phoneNumber);
return nationalSignificantNumber.length() == 10 && nationalSignificantNumber.startsWith("01");
}
} catch (NumberParseException e) {
logger.error("Failed to parse benin phone number {}", number);
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ record CommandDependencies(
ClientResources.Builder redisClusterClientResourcesBuilder,
BackupManager backupManager,
DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager,
DynamoDbAsyncClient dynamoDbAsyncClient) {
DynamoDbAsyncClient dynamoDbAsyncClient,
PhoneNumberIdentifiers phoneNumberIdentifiers) {

static CommandDependencies build(
final String name,
Expand Down Expand Up @@ -287,7 +288,8 @@ static CommandDependencies build(
redisClientResourcesBuilder,
backupManager,
dynamicConfigurationManager,
dynamoDbAsyncClient
dynamoDbAsyncClient,
phoneNumberIdentifiers
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ void setUp() {
null,
null,
null,
null,
null);

//noinspection unchecked
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ private TestNotifyIdleDevicesCommand(final MessagesManager messagesManager,
null,
null,
null,
null,
null);

this.idleDeviceNotificationScheduler = idleDeviceNotificationScheduler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public TestStartPushNotificationExperimentCommand(
null,
null,
null,
null,
null);
}

Expand Down

0 comments on commit 0e04cac

Please sign in to comment.