Skip to content

Commit

Permalink
Remove ForkJoinPool.managedBlock in favor of async updates
Browse files Browse the repository at this point in the history
  • Loading branch information
eager-signal authored and jon-signal committed Dec 13, 2023
1 parent 28a981f commit 8d4acf0
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,8 @@
import java.time.Clock;
import java.time.Instant;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinPool.ManagedBlocker;
import java.util.function.Function;
import javax.annotation.Nonnull;
import javax.validation.Valid;
Expand All @@ -36,7 +33,6 @@
import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount;
import org.whispersystems.textsecuregcm.configuration.BadgesConfiguration;
import org.whispersystems.textsecuregcm.entities.RedeemReceiptRequest;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountBadge;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.RedeemedReceiptsManager;
Expand Down Expand Up @@ -101,43 +97,24 @@ public CompletionStage<Response> redeemReceipt(
if (badgeId == null) {
return CompletableFuture.completedFuture(Response.serverError().entity("server does not recognize the requested receipt level").type(MediaType.TEXT_PLAIN_TYPE).build());
}
final CompletionStage<Boolean> putStage = redeemedReceiptsManager.put(
receiptSerial, receiptExpiration.getEpochSecond(), receiptLevel, auth.getAccount().getUuid());
return putStage.thenApplyAsync(receiptMatched -> {
return redeemedReceiptsManager.put(
receiptSerial, receiptExpiration.getEpochSecond(), receiptLevel, auth.getAccount().getUuid())
.thenCompose(receiptMatched -> {
if (!receiptMatched) {
return Response.status(Status.BAD_REQUEST).entity("receipt serial is already redeemed").type(MediaType.TEXT_PLAIN_TYPE).build();
return CompletableFuture.completedFuture(
Response.status(Status.BAD_REQUEST).entity("receipt serial is already redeemed")
.type(MediaType.TEXT_PLAIN_TYPE).build());
}

try {
ForkJoinPool.managedBlock(new ManagedBlocker() {
boolean done = false;

@Override
public boolean block() {
final Optional<Account> optionalAccount = accountsManager.getByAccountIdentifier(auth.getAccount().getUuid());
optionalAccount.ifPresent(account -> {
accountsManager.update(account, a -> {
return accountsManager.getByAccountIdentifierAsync(auth.getAccount().getUuid())
.thenCompose(optionalAccount ->
optionalAccount.map(account -> accountsManager.updateAsync(account, a -> {
a.addBadge(clock, new AccountBadge(badgeId, receiptExpiration, request.isVisible()));
if (request.isPrimary()) {
a.makeBadgePrimaryIfExists(clock, badgeId);
}
});
});
done = true;
return true;
}

@Override
public boolean isReleasable() {
return done;
}
});
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return Response.serverError().build();
}

return Response.ok().build();
})).orElse(CompletableFuture.completedFuture(null)))
.thenApply(ignored -> Response.ok().build());
});
}).thenCompose(Function.identity());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ void testRedeemReceipt() {
when(receiptCredentialPresentation.getReceiptExpirationTime()).thenReturn(receiptExpiration);
when(redeemedReceiptsManager.put(same(receiptSerial), eq(receiptExpiration), eq(receiptLevel), eq(AuthHelper.VALID_UUID))).thenReturn(
CompletableFuture.completedFuture(Boolean.TRUE));
when(accountsManager.getByAccountIdentifier(eq(AuthHelper.VALID_UUID))).thenReturn(Optional.of(AuthHelper.VALID_ACCOUNT));
when(accountsManager.getByAccountIdentifierAsync(eq(AuthHelper.VALID_UUID))).thenReturn(
CompletableFuture.completedFuture(Optional.of(AuthHelper.VALID_ACCOUNT)));

RedeemReceiptRequest request = new RedeemReceiptRequest(presentation, true, true);
Response response = resources.getJerseyTest()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.mockito.MockingDetails;
import org.mockito.stubbing.Stubbing;
Expand Down Expand Up @@ -69,6 +70,13 @@ private static void setupMockUpdate(final AccountsManager mockAccountsManager, f
return markStale ? copyAndMarkStale(account) : account;
});

when(mockAccountsManager.updateAsync(any(), any())).thenAnswer(answer -> {
final Account account = answer.getArgument(0, Account.class);
answer.getArgument(1, Consumer.class).accept(account);

return CompletableFuture.completedFuture(markStale ? copyAndMarkStale(account) : account);
});

when(mockAccountsManager.updateDevice(any(), anyByte(), any())).thenAnswer(answer -> {
final Account account = answer.getArgument(0, Account.class);
final byte deviceId = answer.getArgument(1, Byte.class);
Expand All @@ -77,6 +85,14 @@ private static void setupMockUpdate(final AccountsManager mockAccountsManager, f
return markStale ? copyAndMarkStale(account) : account;
});

when(mockAccountsManager.updateDeviceAsync(any(), anyByte(), any())).thenAnswer(answer -> {
final Account account = answer.getArgument(0, Account.class);
final byte deviceId = answer.getArgument(1, Byte.class);
account.getDevice(deviceId).ifPresent(answer.getArgument(2, Consumer.class));

return CompletableFuture.completedFuture(markStale ? copyAndMarkStale(account) : account);
});

when(mockAccountsManager.updateDeviceLastSeen(any(), any(), anyLong())).thenAnswer(answer -> {
answer.getArgument(1, Device.class).setLastSeen(answer.getArgument(2, Long.class));
return mockAccountsManager.update(answer.getArgument(0, Account.class), account -> {});
Expand Down

0 comments on commit 8d4acf0

Please sign in to comment.