From ffed19d198efdfc46a253c693eb69735862f15f1 Mon Sep 17 00:00:00 2001 From: Jonathan Klabunde Tomer <125505367+jkt-signal@users.noreply.github.com> Date: Mon, 25 Nov 2024 12:42:16 -0800 Subject: [PATCH] Create deleted-accounts records keyed by both e164 and PNI --- .../textsecuregcm/WhisperServerService.java | 2 + .../textsecuregcm/storage/Accounts.java | 98 +++++++++++- .../storage/AccountsManager.java | 14 ++ .../MigrateDeletedAccountsCommand.java | 109 +++++++++++++ .../textsecuregcm/storage/AccountsTest.java | 144 +++++++++++++++++- 5 files changed, 362 insertions(+), 5 deletions(-) create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/workers/MigrateDeletedAccountsCommand.java diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 30a1f0ddd..b3879207f 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -262,6 +262,7 @@ import org.whispersystems.textsecuregcm.workers.DeleteUserCommand; import org.whispersystems.textsecuregcm.workers.IdleDeviceNotificationSchedulerFactory; import org.whispersystems.textsecuregcm.workers.MessagePersisterServiceCommand; +import org.whispersystems.textsecuregcm.workers.MigrateDeletedAccountsCommand; import org.whispersystems.textsecuregcm.workers.MigrateRegistrationRecoveryPasswordsCommand; import org.whispersystems.textsecuregcm.workers.NotifyIdleDevicesCommand; import org.whispersystems.textsecuregcm.workers.ProcessScheduledJobsServiceCommand; @@ -331,6 +332,7 @@ public void initialize(final Bootstrap bootstrap) { "Processes scheduled jobs to send notifications to idle devices", new IdleDeviceNotificationSchedulerFactory())); + bootstrap.addCommand(new MigrateDeletedAccountsCommand()); bootstrap.addCommand(new MigrateRegistrationRecoveryPasswordsCommand()); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/Accounts.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/Accounts.java index 00f49bf23..8fe146bbe 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/Accounts.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/Accounts.java @@ -47,6 +47,8 @@ import org.whispersystems.textsecuregcm.util.Util; import reactor.core.publisher.Flux; import reactor.core.scheduler.Scheduler; +import reactor.util.function.Tuple3; +import reactor.util.function.Tuples; import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbClient; @@ -57,6 +59,7 @@ import software.amazon.awssdk.services.dynamodb.model.GetItemRequest; import software.amazon.awssdk.services.dynamodb.model.GetItemResponse; import software.amazon.awssdk.services.dynamodb.model.Put; +import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; import software.amazon.awssdk.services.dynamodb.model.QueryRequest; import software.amazon.awssdk.services.dynamodb.model.QueryResponse; import software.amazon.awssdk.services.dynamodb.model.ReturnValuesOnConditionCheckFailure; @@ -439,8 +442,10 @@ public void changeNumber(final Account account, writeItems.add(buildConstraintTablePut(phoneNumberIdentifierConstraintTableName, uuidAttr, ATTR_PNI_UUID, pniAttr)); writeItems.add(buildRemoveDeletedAccount(number)); writeItems.add(buildRemoveDeletedAccount(phoneNumberIdentifier)); - maybeDisplacedAccountIdentifier.ifPresent(displacedAccountIdentifier -> - writeItems.add(buildPutDeletedAccount(displacedAccountIdentifier, originalNumber))); + maybeDisplacedAccountIdentifier.ifPresent(displacedAccountIdentifier -> { + writeItems.add(buildPutDeletedAccount(displacedAccountIdentifier, originalNumber)); + writeItems.add(buildPutDeletedAccount(displacedAccountIdentifier, originalPni)); + }); // The `catch (TransactionCanceledException) block needs to check whether the cancellation reason is the account // update write item @@ -1163,7 +1168,19 @@ private TransactWriteItem buildPutDeletedAccount(final UUID uuid, final String e .item(Map.of( DELETED_ACCOUNTS_KEY_ACCOUNT_E164, AttributeValues.fromString(e164), DELETED_ACCOUNTS_ATTR_ACCOUNT_UUID, AttributeValues.fromUUID(uuid), - DELETED_ACCOUNTS_ATTR_EXPIRES, AttributeValues.fromLong(Instant.now().plus(DELETED_ACCOUNTS_TIME_TO_LIVE).getEpochSecond()))) + DELETED_ACCOUNTS_ATTR_EXPIRES, AttributeValues.fromLong(clock.instant().plus(DELETED_ACCOUNTS_TIME_TO_LIVE).getEpochSecond()))) + .build()) + .build(); + } + + private TransactWriteItem buildPutDeletedAccount(final UUID aci, final UUID pni) { + return TransactWriteItem.builder() + .put(Put.builder() + .tableName(deletedAccountsTableName) + .item(Map.of( + DELETED_ACCOUNTS_KEY_ACCOUNT_E164, AttributeValues.fromString(pni.toString()), + DELETED_ACCOUNTS_ATTR_ACCOUNT_UUID, AttributeValues.fromUUID(aci), + DELETED_ACCOUNTS_ATTR_EXPIRES, AttributeValues.fromLong(clock.instant().plus(DELETED_ACCOUNTS_TIME_TO_LIVE).getEpochSecond()))) .build()) .build(); } @@ -1203,6 +1220,16 @@ public Optional findRecentlyDeletedAccountIdentifier(final String e164) { return Optional.ofNullable(AttributeValues.getUUID(response.item(), DELETED_ACCOUNTS_ATTR_ACCOUNT_UUID, null)); } + public Optional findRecentlyDeletedAccountIdentifier(final UUID phoneNumberIdentifier) { + final GetItemResponse response = db().getItem(GetItemRequest.builder() + .tableName(deletedAccountsTableName) + .consistentRead(true) + .key(Map.of(DELETED_ACCOUNTS_KEY_ACCOUNT_E164, AttributeValues.fromString(phoneNumberIdentifier.toString()))) + .build()); + + return Optional.ofNullable(AttributeValues.getUUID(response.item(), DELETED_ACCOUNTS_ATTR_ACCOUNT_UUID, null)); + } + public Optional findRecentlyDeletedE164(final UUID uuid) { final QueryResponse response = db().query(QueryRequest.builder() .tableName(deletedAccountsTableName) @@ -1232,7 +1259,8 @@ public CompletableFuture delete(final UUID uuid, final List transactWriteItems.add( @@ -1268,6 +1296,68 @@ Flux getAll(final int segments, final Scheduler scheduler) { .sequential(); } + public Flux> getE164KeyedDeletedAccounts(final int segments, final Scheduler scheduler) { + if (segments < 1) { + throw new IllegalArgumentException("Total number of segments must be positive"); + } + + return Flux.range(0, segments) + .parallel() + .runOn(scheduler) + .flatMap(segment -> asyncClient.scanPaginator(ScanRequest.builder() + .tableName(deletedAccountsTableName) + .consistentRead(true) + .segment(segment) + .totalSegments(segments) + .build()) + .items()) + .map(item -> + Tuples.of( + item.get(DELETED_ACCOUNTS_KEY_ACCOUNT_E164).s(), + AttributeValues.getUUID(item, DELETED_ACCOUNTS_ATTR_ACCOUNT_UUID, null), + AttributeValues.getLong(item, DELETED_ACCOUNTS_ATTR_EXPIRES, 0))) + .filter(item -> item.getT1().startsWith("+")) + .sequential(); + } + + public CompletableFuture insertPniDeletedAccount(final String e164, final UUID pni, final UUID aci, final long expiration) { + // This happens under a pessimistic lock, but that wasn't taken before we found the record we want to migrate, + // so make sure the e164 record is unchanged before updating the PNI record + return asyncClient.getItem(GetItemRequest.builder() + .tableName(deletedAccountsTableName) + .consistentRead(true) + .key(Map.of(DELETED_ACCOUNTS_KEY_ACCOUNT_E164, AttributeValues.fromString(e164.toString()))) + .build()) + .thenComposeAsync(getItemResponse -> + getItemResponse.hasItem() + && AttributeValues.getString( + getItemResponse.item(), DELETED_ACCOUNTS_KEY_ACCOUNT_E164, "").equals(e164) + && AttributeValues.getUUID( + getItemResponse.item(), DELETED_ACCOUNTS_ATTR_ACCOUNT_UUID, UUID.randomUUID()).equals(aci) + && AttributeValues.getLong( + getItemResponse.item(), DELETED_ACCOUNTS_ATTR_EXPIRES, 0) == expiration + ? asyncClient.putItem( + PutItemRequest.builder() + .tableName(deletedAccountsTableName) + .item( + Map.of( + DELETED_ACCOUNTS_KEY_ACCOUNT_E164, AttributeValues.fromString(pni.toString()), + DELETED_ACCOUNTS_ATTR_ACCOUNT_UUID, AttributeValues.fromUUID(aci), + DELETED_ACCOUNTS_ATTR_EXPIRES, AttributeValues.fromLong(expiration))) + .conditionExpression("attribute_not_exists(#key)") + .expressionAttributeNames(Map.of("#key", DELETED_ACCOUNTS_KEY_ACCOUNT_E164)) + .build()) + .thenApply(ignored -> true) + .exceptionally(throwable -> { + if (ExceptionUtils.unwrap(throwable) instanceof ConditionalCheckFailedException) { + // there was already a PNI record; no problem, do nothing + return false; + } + throw ExceptionUtils.wrap(throwable); + }) + : CompletableFuture.completedFuture(false)); + } + @Nonnull private Optional getByIndirectLookup( final Timer timer, diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java index fe7be280d..707c26074 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java @@ -90,6 +90,7 @@ import org.whispersystems.textsecuregcm.util.Util; import reactor.core.publisher.Flux; import reactor.core.scheduler.Scheduler; +import reactor.util.function.Tuple3; import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem; import software.amazon.awssdk.services.dynamodb.model.TransactionCanceledException; @@ -1216,6 +1217,19 @@ public Flux streamAllFromDynamo(final int segments, final Scheduler sch return accounts.getAll(segments, scheduler); } + public Flux> getE164KeyedDeletedAccounts(final int segments, final Scheduler scheduler) { + return accounts.getE164KeyedDeletedAccounts(segments, scheduler); + } + + public CompletableFuture migrateDeletedAccount(final String e164, final UUID aci, final long expiration) { + return phoneNumberIdentifiers.getPhoneNumberIdentifier(e164) + .thenCompose( + pni -> accountLockManager.withLockAsync( + List.of(pni), + () -> accounts.insertPniDeletedAccount(e164, pni, aci, expiration), + accountLockExecutor)); + } + public CompletableFuture delete(final Account account, final DeletionReason deletionReason) { final Timer.Sample sample = Timer.start(); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/MigrateDeletedAccountsCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/MigrateDeletedAccountsCommand.java new file mode 100644 index 000000000..0f12e4657 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/MigrateDeletedAccountsCommand.java @@ -0,0 +1,109 @@ +/* + * Copyright 2024 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.workers; + +import io.dropwizard.core.Application; +import io.dropwizard.core.setup.Environment; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Metrics; +import net.sourceforge.argparse4j.inf.Namespace; +import net.sourceforge.argparse4j.inf.Subparser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.whispersystems.textsecuregcm.WhisperServerConfiguration; +import org.whispersystems.textsecuregcm.metrics.MetricsUtil; +import org.whispersystems.textsecuregcm.storage.AccountsManager; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + +public class MigrateDeletedAccountsCommand extends AbstractCommandWithDependencies { + + private static final String RECORDS_INSPECTED_COUNTER_NAME = + MetricsUtil.name(MigrateDeletedAccountsCommand.class, "recordsInspected"); + + private static final String RECORDS_MIGRATED_COUNTER_NAME = + MetricsUtil.name(MigrateDeletedAccountsCommand.class, "recordsMigrated"); + + private static final String DRY_RUN_TAG = "dryRun"; + + private final Logger logger = LoggerFactory.getLogger(getClass()); + + private static final String SEGMENT_COUNT_ARGUMENT = "segments"; + private static final String DRY_RUN_ARGUMENT = "dry-run"; + private static final String MAX_CONCURRENCY_ARGUMENT = "max-concurrency"; + + private static final int DEFAULT_SEGMENT_COUNT = 1; + private static final int DEFAULT_CONCURRENCY = 16; + + public MigrateDeletedAccountsCommand() { + super(new Application<>() { + @Override + public void run(final WhisperServerConfiguration configuration, final Environment environment) { + } + }, "migrate-deleted-accounts", "Migrates recently-deleted account records from E164 to PNI-keyed schema"); + } + + @Override + public void configure(final Subparser subparser) { + super.configure(subparser); + + subparser.addArgument("--segments") + .type(Integer.class) + .dest(SEGMENT_COUNT_ARGUMENT) + .required(false) + .setDefault(DEFAULT_SEGMENT_COUNT) + .help("The total number of segments for a DynamoDB scan"); + + subparser.addArgument("--max-concurrency") + .type(Integer.class) + .dest(MAX_CONCURRENCY_ARGUMENT) + .required(false) + .setDefault(DEFAULT_CONCURRENCY) + .help("Max concurrency for migrations."); + + subparser.addArgument("--dry-run") + .type(Boolean.class) + .dest(DRY_RUN_ARGUMENT) + .required(false) + .setDefault(true) + .help("If true, don’t actually migrate any deleted accounts records"); + } + + @Override + protected void run(final Environment environment, final Namespace namespace, + final WhisperServerConfiguration configuration, final CommandDependencies commandDependencies) throws Exception { + final int segments = namespace.getInt(SEGMENT_COUNT_ARGUMENT); + final int concurrency = namespace.getInt(MAX_CONCURRENCY_ARGUMENT); + final boolean dryRun = namespace.getBoolean(DRY_RUN_ARGUMENT); + + final String deletedAccountsTableName = configuration.getDynamoDbTables().getDeletedAccounts().getTableName(); + logger.info("Crawling deleted accounts with {} segments and {} processors", + segments, + Runtime.getRuntime().availableProcessors()); + + final Counter recordsInspectedCounter = + Metrics.counter(RECORDS_INSPECTED_COUNTER_NAME, DRY_RUN_TAG, String.valueOf(dryRun)); + + final Counter recordsMigratedCounter = + Metrics.counter(RECORDS_MIGRATED_COUNTER_NAME, DRY_RUN_TAG, String.valueOf(dryRun)); + + final AccountsManager accounts = commandDependencies.accountsManager(); + + accounts.getE164KeyedDeletedAccounts(segments, Schedulers.parallel()) + .doOnNext(tuple -> recordsInspectedCounter.increment()) + .flatMap( + tuple -> dryRun + ? Mono.just(false) + : Mono.fromFuture( + accounts.migrateDeletedAccount( + tuple.getT1(), tuple.getT2(), tuple.getT3())), + concurrency) + .filter(migrated -> migrated) + .doOnNext(ignored -> recordsMigratedCounter.increment()) + .then() + .block(); + } +} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsTest.java index 4b407cd7f..547e5dc8c 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsTest.java @@ -65,6 +65,7 @@ import org.whispersystems.textsecuregcm.util.TestClock; import org.whispersystems.textsecuregcm.util.TestRandomUtil; import reactor.core.scheduler.Schedulers; +import reactor.util.function.Tuples; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbClient; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; @@ -74,6 +75,7 @@ import software.amazon.awssdk.services.dynamodb.model.GetItemResponse; import software.amazon.awssdk.services.dynamodb.model.Put; import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; +import software.amazon.awssdk.services.dynamodb.model.ScanRequest; import software.amazon.awssdk.services.dynamodb.model.ReturnValuesOnConditionCheckFailure; import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem; import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsRequest; @@ -211,6 +213,7 @@ void testStoreRecentlyDeleted() { accounts.delete(originalUuid, Collections.emptyList()).join(); assertThat(accounts.findRecentlyDeletedAccountIdentifier(account.getNumber())).hasValue(originalUuid); + assertThat(accounts.findRecentlyDeletedAccountIdentifier(account.getPhoneNumberIdentifier())).hasValue(originalUuid); freshUser = createAccount(account); assertThat(freshUser).isTrue(); @@ -220,6 +223,7 @@ void testStoreRecentlyDeleted() { assertPhoneNumberIdentifierConstraintExists(account.getPhoneNumberIdentifier(), account.getUuid()); assertThat(accounts.findRecentlyDeletedAccountIdentifier(account.getNumber())).isEmpty(); + assertThat(accounts.findRecentlyDeletedAccountIdentifier(account.getPhoneNumberIdentifier())).isEmpty(); } @Test @@ -742,6 +746,7 @@ void testDelete() { createAccount(retainedAccount); assertThat(accounts.findRecentlyDeletedAccountIdentifier(deletedAccount.getNumber())).isEmpty(); + assertThat(accounts.findRecentlyDeletedAccountIdentifier(deletedAccount.getPhoneNumberIdentifier())).isEmpty(); assertPhoneNumberConstraintExists("+14151112222", deletedAccount.getUuid()); assertPhoneNumberIdentifierConstraintExists(deletedAccount.getPhoneNumberIdentifier(), deletedAccount.getUuid()); @@ -755,6 +760,7 @@ void testDelete() { assertThat(accounts.getByAccountIdentifier(deletedAccount.getUuid())).isNotPresent(); assertThat(accounts.findRecentlyDeletedAccountIdentifier(deletedAccount.getNumber())).hasValue(deletedAccount.getUuid()); + assertThat(accounts.findRecentlyDeletedAccountIdentifier(deletedAccount.getPhoneNumberIdentifier())).hasValue(deletedAccount.getUuid()); assertPhoneNumberConstraintDoesNotExist(deletedAccount.getNumber()); assertPhoneNumberIdentifierConstraintDoesNotExist(deletedAccount.getPhoneNumberIdentifier()); @@ -764,7 +770,7 @@ void testDelete() { { final Account recreatedAccount = generateAccount(deletedAccount.getNumber(), UUID.randomUUID(), - UUID.randomUUID(), List.of(generateDevice(DEVICE_ID_1))); + deletedAccount.getPhoneNumberIdentifier(), List.of(generateDevice(DEVICE_ID_1))); final boolean freshUser = createAccount(recreatedAccount); @@ -893,6 +899,7 @@ public void testChangeNumber(final Optional maybeDisplacedAccountIdentifie } assertThat(accounts.findRecentlyDeletedAccountIdentifier(originalNumber)).isEqualTo(maybeDisplacedAccountIdentifier); + assertThat(accounts.findRecentlyDeletedAccountIdentifier(originalPni)).isEqualTo(maybeDisplacedAccountIdentifier); } private static Stream testChangeNumber() { @@ -1697,7 +1704,142 @@ public void testInvalidDeviceIdDeserialization() throws Exception { assertInstanceOf(DeviceIdDeserializer.DeviceIdDeserializationException.class, cause); } + @Test + public void getE164KeyedDeletedAccounts() { + final Account deletedAccount = generateAccount("+18005551234", UUID.randomUUID(), UUID.randomUUID()); + createAccount(deletedAccount); + accounts.delete(deletedAccount.getUuid(), List.of()).join(); + assertEquals( + List.of(Tuples.of("+18005551234", deletedAccount.getUuid(), clock.instant().plus(Accounts.DELETED_ACCOUNTS_TIME_TO_LIVE).toEpochMilli() / 1000)), + accounts.getE164KeyedDeletedAccounts(1, Schedulers.immediate()).collectList().block()); + } + + @Test + public void insertPniDeletedAccount() throws Exception { + final String e164 = "+18005551234"; + final UUID aci = UUID.randomUUID(); + final UUID pni = UUID.randomUUID(); + final Long expires = 1234567890L; + + final ScanRequest scanRequest = ScanRequest.builder() + .tableName(Tables.DELETED_ACCOUNTS.tableName()) + .build(); + assertThat( + DYNAMO_DB_EXTENSION.getDynamoDbClient() + .scan(scanRequest) + .items()) + .isEmpty(); + + + DYNAMO_DB_EXTENSION.getDynamoDbClient().putItem( + PutItemRequest.builder() + .tableName(Tables.DELETED_ACCOUNTS.tableName()) + .item(Map.of( + Accounts.DELETED_ACCOUNTS_KEY_ACCOUNT_E164, AttributeValues.fromString(e164), + Accounts.DELETED_ACCOUNTS_ATTR_ACCOUNT_UUID, AttributeValues.fromUUID(aci), + Accounts.DELETED_ACCOUNTS_ATTR_EXPIRES, AttributeValues.fromLong(expires))) + .build()); + + assertThat(accounts.insertPniDeletedAccount(e164, pni, aci, expires).get()).isTrue(); + + List> items = + DYNAMO_DB_EXTENSION.getDynamoDbClient() + .scan(scanRequest) + .items(); + assertThat(items).hasSize(2); + final Map item = items.stream().filter(i -> !i.get(Accounts.DELETED_ACCOUNTS_KEY_ACCOUNT_E164).s().equals(e164)).findFirst().get(); + assertThat(item.get(Accounts.DELETED_ACCOUNTS_KEY_ACCOUNT_E164).s()) + .isEqualTo(pni.toString()); + assertThat(AttributeValues.getUUID(item, Accounts.DELETED_ACCOUNTS_ATTR_ACCOUNT_UUID, null)) + .isEqualTo(aci); + assertThat(AttributeValues.getLong(item, Accounts.DELETED_ACCOUNTS_ATTR_EXPIRES, 0)) + .isEqualTo(expires); + } + + @Test + public void insertPniDeletedAccount_concurrentChange() throws Exception { + final String e164 = "+18005551234"; + final UUID aci = UUID.randomUUID(); + final UUID pni = UUID.randomUUID(); + final Long expires = 1234567890L; + + final ScanRequest scanRequest = ScanRequest.builder() + .tableName(Tables.DELETED_ACCOUNTS.tableName()) + .build(); + assertThat( + DYNAMO_DB_EXTENSION.getDynamoDbClient() + .scan(scanRequest) + .items()) + .isEmpty(); + + + DYNAMO_DB_EXTENSION.getDynamoDbClient().putItem( + PutItemRequest.builder() + .tableName(Tables.DELETED_ACCOUNTS.tableName()) + .item(Map.of( + Accounts.DELETED_ACCOUNTS_KEY_ACCOUNT_E164, AttributeValues.fromString(e164), + Accounts.DELETED_ACCOUNTS_ATTR_ACCOUNT_UUID, AttributeValues.fromUUID(aci), + Accounts.DELETED_ACCOUNTS_ATTR_EXPIRES, AttributeValues.fromLong(expires + 1))) + .build()); + assertThat(accounts.insertPniDeletedAccount(e164, pni, aci, expires).get()).isFalse(); + + List> items = + DYNAMO_DB_EXTENSION.getDynamoDbClient() + .scan(scanRequest) + .items(); + assertThat(items).hasSize(1); + } + + @Test + public void insertPniDeletedAccount_concurrentDeletion() throws Exception { + final String e164 = "+18005551234"; + final UUID aci = UUID.randomUUID(); + final UUID pni = UUID.randomUUID(); + final Long expires = 1234567890L; + + final ScanRequest scanRequest = ScanRequest.builder() + .tableName(Tables.DELETED_ACCOUNTS.tableName()) + .build(); + assertThat( + DYNAMO_DB_EXTENSION.getDynamoDbClient() + .scan(scanRequest) + .items()) + .isEmpty(); + + assertThat(accounts.insertPniDeletedAccount(e164, pni, aci, expires).get()).isFalse(); + + List> items = + DYNAMO_DB_EXTENSION.getDynamoDbClient() + .scan(scanRequest) + .items(); + assertThat(items).isEmpty(); + } + + @Test + public void insertPniDeletedAccount_alreadyMigrated() throws Exception { + final Account deletedAccount = generateAccount("+18005551234", UUID.randomUUID(), UUID.randomUUID()); + + createAccount(deletedAccount); + accounts.delete(deletedAccount.getUuid(), List.of()).join(); + + final ScanRequest scanRequest = ScanRequest.builder() + .tableName(Tables.DELETED_ACCOUNTS.tableName()) + .build(); + assertThat( + DYNAMO_DB_EXTENSION.getDynamoDbClient() + .scan(scanRequest) + .items()) + .hasSize(2); + + assertThat(accounts.insertPniDeletedAccount(deletedAccount.getNumber(), deletedAccount.getPhoneNumberIdentifier(), deletedAccount.getUuid(), clock.instant().plus(Accounts.DELETED_ACCOUNTS_TIME_TO_LIVE).toEpochMilli() / 1000).get()).isFalse(); + + List> items = + DYNAMO_DB_EXTENSION.getDynamoDbClient() + .scan(scanRequest) + .items(); + assertThat(items).hasSize(2); + } private static Device generateDevice(byte id) { return DevicesHelper.createDevice(id);