Skip to content

Commit

Permalink
revert changes and add transactionTag option
Browse files Browse the repository at this point in the history
  • Loading branch information
rahul2393 committed Nov 14, 2024
1 parent 4fa9b28 commit 3da6837
Show file tree
Hide file tree
Showing 11 changed files with 55 additions and 101 deletions.
6 changes: 1 addition & 5 deletions google-cloud-spanner/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -790,9 +790,5 @@
<className>com/google/cloud/spanner/connection/Connection</className>
<method>boolean isAutoBatchDmlUpdateCountVerification()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/DatabaseClient</className>
<method>long executePartitionedUpdate(com.google.cloud.spanner.Statement, java.lang.String, com.google.cloud.spanner.Options$UpdateOption[])</method>
</difference>

</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.Options.UpdateOption;
import com.google.spanner.v1.BatchWriteResponse;
import javax.annotation.Nullable;

/**
* Base class for the Multiplexed Session {@link DatabaseClient} implementation. Throws {@link
Expand Down Expand Up @@ -57,10 +56,4 @@ public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
public long executePartitionedUpdate(Statement stmt, UpdateOption... options) {
throw new UnsupportedOperationException();
}

@Override
public long executePartitionedUpdate(
Statement stmt, @Nullable String transactionTag, UpdateOption... options) {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.Options.UpdateOption;
import com.google.spanner.v1.BatchWriteResponse;
import javax.annotation.Nullable;

/**
* Interface for all the APIs that are used to read/write data into a Cloud Spanner database. An
Expand Down Expand Up @@ -602,21 +601,4 @@ ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
* idempotent, such as deleting old rows from a very large table.
*/
long executePartitionedUpdate(Statement stmt, UpdateOption... options);

/**
* Executes a Partitioned DML statement with the specified transaction tag.
*
* <p>This method has the same behavior as {@link #executePartitionedUpdate(Statement,
* UpdateOption...)} but allows specifying a transaction tag that will be applied to all
* partitioned operations.
*
* @param stmt The Partitioned DML statement to execute
* @param transactionTag The transaction tag to apply to all partitioned operations. The tag must
* be a printable string (ASCII 32-126) with maximum length of 50 characters.
* @param options The options to use for the update operation
* @return The total number of rows modified by the statement
* @throws SpannerException if the operation failed
*/
long executePartitionedUpdate(
Statement stmt, @Nullable String transactionTag, UpdateOption... options);
}
Original file line number Diff line number Diff line change
Expand Up @@ -309,21 +309,9 @@ public AsyncTransactionManager transactionManagerAsync(TransactionOption... opti

@Override
public long executePartitionedUpdate(final Statement stmt, final UpdateOption... options) {
return executePartitionedUpdateWithOptions(stmt, null, options);
}

@Override
public long executePartitionedUpdate(
final Statement stmt, @Nullable String transactionTag, final UpdateOption... options) {
return executePartitionedUpdateWithOptions(stmt, transactionTag, options);
}

private long executePartitionedUpdateWithOptions(
final Statement stmt, @Nullable String transactionTag, final UpdateOption... options) {
ISpan span = tracer.spanBuilder(PARTITION_DML_TRANSACTION);
try (IScope s = tracer.withSpan(span)) {
return runWithSessionRetry(
session -> session.executePartitionedUpdate(stmt, transactionTag, options));
return runWithSessionRetry(session -> session.executePartitionedUpdate(stmt, options));
} catch (RuntimeException e) {
span.setStatus(e);
span.end();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,10 @@ public static ReadQueryUpdateTransactionOption tag(String name) {
return new TagOption(name);
}

public static ReadQueryUpdateTransactionOption transactionTag(String name) {
return new TransactionTagOption(name);
}

/**
* Specifying this will cause the list operations to fetch at most this many records in a page.
*/
Expand Down Expand Up @@ -394,6 +398,24 @@ void appendToOptions(Options options) {
}
}

static final class TransactionTagOption extends InternalOption
implements ReadQueryUpdateTransactionOption {
private final String transactionTag;

TransactionTagOption(String transactionTag) {
this.transactionTag = transactionTag;
}

String getTransactionTag() {
return transactionTag;
}

@Override
void appendToOptions(Options options) {
options.transactionTag = transactionTag;
}
}

static final class EtagOption extends InternalOption implements DeleteAdminApiOption {
private final String etag;

Expand Down Expand Up @@ -462,6 +484,7 @@ void appendToOptions(Options options) {
private RpcPriority priority;
private String tag;
private String etag;
private String transactionTag;
private Boolean validateOnly;
private Boolean withOptimisticLock;
private Boolean withExcludeTxnFromChangeStreams;
Expand Down Expand Up @@ -545,6 +568,14 @@ boolean hasTag() {
return tag != null;
}

boolean hasTransactionTag() {
return transactionTag != null;
}

String transactionTag() {
return transactionTag;
}

String tag() {
return tag;
}
Expand Down Expand Up @@ -661,6 +692,9 @@ public String toString() {
if (orderBy != null) {
b.append("orderBy: ").append(orderBy).append(' ');
}
if (transactionTag != null) {
b.append("transactionTag: ").append(transactionTag).append(' ');
}
return b.toString();
}

Expand Down Expand Up @@ -694,6 +728,7 @@ public boolean equals(Object o) {
&& Objects.equals(filter(), that.filter())
&& Objects.equals(priority(), that.priority())
&& Objects.equals(tag(), that.tag())
&& Objects.equals(transactionTag, that.transactionTag)
&& Objects.equals(etag(), that.etag())
&& Objects.equals(validateOnly(), that.validateOnly())
&& Objects.equals(withOptimisticLock(), that.withOptimisticLock())
Expand Down Expand Up @@ -760,6 +795,9 @@ public int hashCode() {
if (orderBy != null) {
result = 31 * result + orderBy.hashCode();
}
if (transactionTag != null) {
result = 31 * result + transactionTag.hashCode();
}
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;
import org.threeten.bp.temporal.ChronoUnit;

Expand All @@ -55,16 +54,13 @@ public class PartitionedDmlTransaction implements SessionImpl.SessionTransaction
private final SessionImpl session;
private final SpannerRpc rpc;
private final Ticker ticker;
private final @Nullable String transactionTag;
private final IsRetryableInternalError isRetryableInternalErrorPredicate;
private volatile boolean isValid = true;

PartitionedDmlTransaction(
SessionImpl session, SpannerRpc rpc, Ticker ticker, @Nullable String transactionTag) {
PartitionedDmlTransaction(SessionImpl session, SpannerRpc rpc, Ticker ticker) {
this.session = session;
this.rpc = rpc;
this.ticker = ticker;
this.transactionTag = transactionTag;
this.isRetryableInternalErrorPredicate = new IsRetryableInternalError();
}

Expand Down Expand Up @@ -198,8 +194,8 @@ ExecuteSqlRequest newTransactionRequestFrom(final Statement statement, final Opt
if (options.hasTag()) {
requestOptionsBuilder.setRequestTag(options.tag());
}
if (transactionTag != null) {
requestOptionsBuilder.setTransactionTag(transactionTag);
if (options.hasTransactionTag()) {
requestOptionsBuilder.setTransactionTag(options.transactionTag());
}
builder.setRequestOptions(requestOptionsBuilder.build());
}
Expand All @@ -216,9 +212,9 @@ private ByteString initTransaction(final Options options) {
.setExcludeTxnFromChangeStreams(
options.withExcludeTxnFromChangeStreams() == Boolean.TRUE));

if (transactionTag != null) {
if (options.hasTransactionTag()) {
builder.setRequestOptions(
RequestOptions.newBuilder().setTransactionTag(transactionTag).build());
RequestOptions.newBuilder().setTransactionTag(options.transactionTag()).build());
}
Transaction tx = rpc.beginTransaction(builder.build(), session.getOptions(), true);
if (tx.getId().isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,18 +201,7 @@ public DatabaseId getDatabaseId() {
public long executePartitionedUpdate(Statement stmt, UpdateOption... options) {
setActive(null);
PartitionedDmlTransaction txn =
new PartitionedDmlTransaction(this, spanner.getRpc(), Ticker.systemTicker(), null);
return txn.executeStreamingPartitionedUpdate(
stmt, spanner.getOptions().getPartitionedDmlTimeout(), options);
}

@Override
public long executePartitionedUpdate(
Statement stmt, @Nullable String transactionTag, UpdateOption... options) {
setActive(null);
PartitionedDmlTransaction txn =
new PartitionedDmlTransaction(
this, spanner.getRpc(), Ticker.systemTicker(), transactionTag);
new PartitionedDmlTransaction(this, spanner.getRpc(), Ticker.systemTicker());
return txn.executeStreamingPartitionedUpdate(
stmt, spanner.getOptions().getPartitionedDmlTimeout(), options);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1273,11 +1273,6 @@ default AsyncTransactionManager transactionManagerAsync(TransactionOption... opt
default long executePartitionedUpdate(Statement stmt, UpdateOption... options) {
return get().executePartitionedUpdate(stmt, options);
}

default long executePartitionedUpdate(
Statement stmt, @Nullable String transactionTag, UpdateOption... options) {
return get().executePartitionedUpdate(stmt, transactionTag, options);
}
}

class PooledSessionFutureWrapper implements SessionFutureWrapper<PooledSessionFuture> {
Expand Down Expand Up @@ -1499,16 +1494,6 @@ public long executePartitionedUpdate(Statement stmt, UpdateOption... options) {
}
}

@Override
public long executePartitionedUpdate(
Statement stmt, @Nullable String transactionTag, UpdateOption... options) {
try {
return get(true).executePartitionedUpdate(stmt, transactionTag, options);
} finally {
close();
}
}

@Override
public String getName() {
return get().getName();
Expand Down Expand Up @@ -1724,18 +1709,6 @@ public long executePartitionedUpdate(Statement stmt, UpdateOption... options)
}
}

@Override
public long executePartitionedUpdate(
Statement stmt, @Nullable String transactionTag, UpdateOption... options)
throws SpannerException {
try {
markUsed();
return delegate.executePartitionedUpdate(stmt, transactionTag, options);
} catch (SpannerException e) {
throw lastException = e;
}
}

@Override
public ReadContext singleUse() {
return delegate.singleUse();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1949,7 +1949,9 @@ public void testPartitionedDMLWithTransactionTag() {
DatabaseClient client =
spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
client.executePartitionedUpdate(
UPDATE_STATEMENT, "testTransactionTag", Options.tag("app=spanner,env=test,action=dml"));
UPDATE_STATEMENT,
Options.transactionTag("testTransactionTag"),
Options.tag("app=spanner,env=test,action=dml"));

List<BeginTransactionRequest> beginTransactions =
mockSpanner.getRequestsOfType(BeginTransactionRequest.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void setup() {
when(rpc.beginTransaction(any(BeginTransactionRequest.class), anyMap(), eq(true)))
.thenReturn(Transaction.newBuilder().setId(txId).build());

tx = new PartitionedDmlTransaction(session, rpc, ticker, null);
tx = new PartitionedDmlTransaction(session, rpc, ticker);
}

@Test
Expand Down Expand Up @@ -332,7 +332,7 @@ public void testExecuteStreamingPartitionedUpdateUnexpectedEOS() {
Mockito.eq(executeRequestWithResumeToken), anyMap(), any(Duration.class)))
.thenReturn(stream2);

PartitionedDmlTransaction tx = new PartitionedDmlTransaction(session, rpc, ticker, null);
PartitionedDmlTransaction tx = new PartitionedDmlTransaction(session, rpc, ticker);
long count = tx.executeStreamingPartitionedUpdate(Statement.of(sql), Duration.ofMinutes(10));

assertThat(count).isEqualTo(1000L);
Expand Down Expand Up @@ -371,7 +371,7 @@ public void testExecuteStreamingPartitionedUpdateRSTstream() {
Mockito.eq(executeRequestWithResumeToken), anyMap(), any(Duration.class)))
.thenReturn(stream2);

PartitionedDmlTransaction tx = new PartitionedDmlTransaction(session, rpc, ticker, null);
PartitionedDmlTransaction tx = new PartitionedDmlTransaction(session, rpc, ticker);
long count = tx.executeStreamingPartitionedUpdate(Statement.of(sql), Duration.ofMinutes(10));

assertThat(count).isEqualTo(1000L);
Expand Down Expand Up @@ -400,7 +400,7 @@ public void testExecuteStreamingPartitionedUpdateGenericInternalException() {
Mockito.eq(executeRequestWithoutResumeToken), anyMap(), any(Duration.class)))
.thenReturn(stream1);

PartitionedDmlTransaction tx = new PartitionedDmlTransaction(session, rpc, ticker, null);
PartitionedDmlTransaction tx = new PartitionedDmlTransaction(session, rpc, ticker);
SpannerException e =
assertThrows(
SpannerException.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1729,13 +1729,10 @@ public void testSessionNotFoundPartitionedUpdate() {
SpannerExceptionFactoryTest.newSessionNotFoundException(sessionName);
Statement statement = Statement.of("UPDATE FOO SET BAR=1 WHERE 1=1");
final SessionImpl closedSession = mockSession();
when(closedSession.executePartitionedUpdate(any(Statement.class))).thenThrow(sessionNotFound);
when(closedSession.executePartitionedUpdate(any(Statement.class), any(), any()))
.thenThrow(sessionNotFound);
when(closedSession.executePartitionedUpdate(statement)).thenThrow(sessionNotFound);

final SessionImpl openSession = mockSession();
when(openSession.executePartitionedUpdate(any(Statement.class))).thenReturn(1L);
when(openSession.executePartitionedUpdate(any(Statement.class), any(), any())).thenReturn(1L);
when(openSession.executePartitionedUpdate(statement)).thenReturn(1L);
doAnswer(
invocation -> {
executor.submit(
Expand Down

0 comments on commit 3da6837

Please sign in to comment.