From 62cb4dacf97db0325c2ca3eb96b84aa2ccae1f98 Mon Sep 17 00:00:00 2001 From: rahul yadav Date: Thu, 14 Nov 2024 11:29:21 +0530 Subject: [PATCH] fix(spanner): support transaction tags in partition DML --- .../clirr-ignored-differences.xml | 6 ++++- ...tractMultiplexedSessionDatabaseClient.java | 7 +++++ .../google/cloud/spanner/DatabaseClient.java | 18 +++++++++++++ .../cloud/spanner/DatabaseClientImpl.java | 14 +++++++++- .../spanner/PartitionedDmlTransaction.java | 21 +++++++++++---- .../com/google/cloud/spanner/SessionImpl.java | 13 ++++++++- .../com/google/cloud/spanner/SessionPool.java | 27 +++++++++++++++++++ .../cloud/spanner/DatabaseClientImplTest.java | 24 +++++++++++++++++ .../PartitionedDmlTransactionTest.java | 8 +++--- 9 files changed, 126 insertions(+), 12 deletions(-) diff --git a/google-cloud-spanner/clirr-ignored-differences.xml b/google-cloud-spanner/clirr-ignored-differences.xml index ec13415790c..7f194d55dae 100644 --- a/google-cloud-spanner/clirr-ignored-differences.xml +++ b/google-cloud-spanner/clirr-ignored-differences.xml @@ -790,5 +790,9 @@ com/google/cloud/spanner/connection/Connection boolean isAutoBatchDmlUpdateCountVerification() - + + 7012 + com/google/cloud/spanner/DatabaseClient + long executePartitionedUpdate(com.google.cloud.spanner.Statement, java.lang.String, com.google.cloud.spanner.Options$UpdateOption[]) + diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractMultiplexedSessionDatabaseClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractMultiplexedSessionDatabaseClient.java index ebfb0e0a774..74fca99f9a3 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractMultiplexedSessionDatabaseClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractMultiplexedSessionDatabaseClient.java @@ -21,6 +21,7 @@ import com.google.cloud.spanner.Options.TransactionOption; import com.google.cloud.spanner.Options.UpdateOption; import com.google.spanner.v1.BatchWriteResponse; +import javax.annotation.Nullable; /** * Base class for the Multiplexed Session {@link DatabaseClient} implementation. Throws {@link @@ -56,4 +57,10 @@ public ServerStream batchWriteAtLeastOnce( public long executePartitionedUpdate(Statement stmt, UpdateOption... options) { throw new UnsupportedOperationException(); } + + @Override + public long executePartitionedUpdate( + Statement stmt, @Nullable String transactionTag, UpdateOption... options) { + throw new UnsupportedOperationException(); + } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java index 06237131458..755b9b1ef5d 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java @@ -22,6 +22,7 @@ import com.google.cloud.spanner.Options.TransactionOption; import com.google.cloud.spanner.Options.UpdateOption; import com.google.spanner.v1.BatchWriteResponse; +import javax.annotation.Nullable; /** * Interface for all the APIs that are used to read/write data into a Cloud Spanner database. An @@ -601,4 +602,21 @@ ServerStream batchWriteAtLeastOnce( * idempotent, such as deleting old rows from a very large table. */ long executePartitionedUpdate(Statement stmt, UpdateOption... options); + + /** + * Executes a Partitioned DML statement with the specified transaction tag. + * + *

This method has the same behavior as {@link #executePartitionedUpdate(Statement, + * UpdateOption...)} but allows specifying a transaction tag that will be applied to all + * partitioned operations. + * + * @param stmt The Partitioned DML statement to execute + * @param transactionTag The transaction tag to apply to all partitioned operations. The tag must + * be a printable string (ASCII 32-126) with maximum length of 50 characters. + * @param options The options to use for the update operation + * @return The total number of rows modified by the statement + * @throws SpannerException if the operation failed + */ + long executePartitionedUpdate( + Statement stmt, @Nullable String transactionTag, UpdateOption... options); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java index d7f16f89524..9200aa23a2b 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java @@ -309,9 +309,21 @@ public AsyncTransactionManager transactionManagerAsync(TransactionOption... opti @Override public long executePartitionedUpdate(final Statement stmt, final UpdateOption... options) { + return executePartitionedUpdateWithOptions(stmt, null, options); + } + + @Override + public long executePartitionedUpdate( + final Statement stmt, @Nullable String transactionTag, final UpdateOption... options) { + return executePartitionedUpdateWithOptions(stmt, transactionTag, options); + } + + private long executePartitionedUpdateWithOptions( + final Statement stmt, @Nullable String transactionTag, final UpdateOption... options) { ISpan span = tracer.spanBuilder(PARTITION_DML_TRANSACTION); try (IScope s = tracer.withSpan(span)) { - return runWithSessionRetry(session -> session.executePartitionedUpdate(stmt, options)); + return runWithSessionRetry( + session -> session.executePartitionedUpdate(stmt, transactionTag, options)); } catch (RuntimeException e) { span.setStatus(e); span.end(); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java index 82b7f06b7d2..dc4d1f54f3c 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java @@ -43,6 +43,7 @@ import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; +import javax.annotation.Nullable; import org.threeten.bp.Duration; import org.threeten.bp.temporal.ChronoUnit; @@ -54,13 +55,16 @@ public class PartitionedDmlTransaction implements SessionImpl.SessionTransaction private final SessionImpl session; private final SpannerRpc rpc; private final Ticker ticker; + private final @Nullable String transactionTag; private final IsRetryableInternalError isRetryableInternalErrorPredicate; private volatile boolean isValid = true; - PartitionedDmlTransaction(SessionImpl session, SpannerRpc rpc, Ticker ticker) { + PartitionedDmlTransaction( + SessionImpl session, SpannerRpc rpc, Ticker ticker, @Nullable String transactionTag) { this.session = session; this.rpc = rpc; this.ticker = ticker; + this.transactionTag = transactionTag; this.isRetryableInternalErrorPredicate = new IsRetryableInternalError(); } @@ -194,22 +198,29 @@ ExecuteSqlRequest newTransactionRequestFrom(final Statement statement, final Opt if (options.hasTag()) { requestOptionsBuilder.setRequestTag(options.tag()); } + if (transactionTag != null) { + requestOptionsBuilder.setTransactionTag(transactionTag); + } builder.setRequestOptions(requestOptionsBuilder.build()); } return builder.build(); } private ByteString initTransaction(final Options options) { - final BeginTransactionRequest request = + BeginTransactionRequest.Builder builder = BeginTransactionRequest.newBuilder() .setSession(session.getName()) .setOptions( TransactionOptions.newBuilder() .setPartitionedDml(TransactionOptions.PartitionedDml.getDefaultInstance()) .setExcludeTxnFromChangeStreams( - options.withExcludeTxnFromChangeStreams() == Boolean.TRUE)) - .build(); - Transaction tx = rpc.beginTransaction(request, session.getOptions(), true); + options.withExcludeTxnFromChangeStreams() == Boolean.TRUE)); + + if (transactionTag != null) { + builder.setRequestOptions( + RequestOptions.newBuilder().setTransactionTag(transactionTag).build()); + } + Transaction tx = rpc.beginTransaction(builder.build(), session.getOptions(), true); if (tx.getId().isEmpty()) { throw SpannerExceptionFactory.newSpannerException( ErrorCode.INTERNAL, diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java index 5bd31603685..f0eca14feed 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java @@ -201,7 +201,18 @@ public DatabaseId getDatabaseId() { public long executePartitionedUpdate(Statement stmt, UpdateOption... options) { setActive(null); PartitionedDmlTransaction txn = - new PartitionedDmlTransaction(this, spanner.getRpc(), Ticker.systemTicker()); + new PartitionedDmlTransaction(this, spanner.getRpc(), Ticker.systemTicker(), null); + return txn.executeStreamingPartitionedUpdate( + stmt, spanner.getOptions().getPartitionedDmlTimeout(), options); + } + + @Override + public long executePartitionedUpdate( + Statement stmt, @Nullable String transactionTag, UpdateOption... options) { + setActive(null); + PartitionedDmlTransaction txn = + new PartitionedDmlTransaction( + this, spanner.getRpc(), Ticker.systemTicker(), transactionTag); return txn.executeStreamingPartitionedUpdate( stmt, spanner.getOptions().getPartitionedDmlTimeout(), options); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index cf50fa44c77..d621f91d630 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -1273,6 +1273,11 @@ default AsyncTransactionManager transactionManagerAsync(TransactionOption... opt default long executePartitionedUpdate(Statement stmt, UpdateOption... options) { return get().executePartitionedUpdate(stmt, options); } + + default long executePartitionedUpdate( + Statement stmt, @Nullable String transactionTag, UpdateOption... options) { + return get().executePartitionedUpdate(stmt, transactionTag, options); + } } class PooledSessionFutureWrapper implements SessionFutureWrapper { @@ -1494,6 +1499,16 @@ public long executePartitionedUpdate(Statement stmt, UpdateOption... options) { } } + @Override + public long executePartitionedUpdate( + Statement stmt, @Nullable String transactionTag, UpdateOption... options) { + try { + return get(true).executePartitionedUpdate(stmt, transactionTag, options); + } finally { + close(); + } + } + @Override public String getName() { return get().getName(); @@ -1709,6 +1724,18 @@ public long executePartitionedUpdate(Statement stmt, UpdateOption... options) } } + @Override + public long executePartitionedUpdate( + Statement stmt, @Nullable String transactionTag, UpdateOption... options) + throws SpannerException { + try { + markUsed(); + return delegate.executePartitionedUpdate(stmt, transactionTag, options); + } catch (SpannerException e) { + throw lastException = e; + } + } + @Override public ReadContext singleUse() { return delegate.singleUse(); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java index 62a10c0adb4..09812f1d598 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java @@ -1944,6 +1944,30 @@ public void testPartitionedDMLWithTag() { assertThat(request.getRequestOptions().getTransactionTag()).isEmpty(); } + @Test + public void testPartitionedDMLWithTransactionTag() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + client.executePartitionedUpdate( + UPDATE_STATEMENT, "testTransactionTag", Options.tag("app=spanner,env=test,action=dml")); + + List beginTransactions = + mockSpanner.getRequestsOfType(BeginTransactionRequest.class); + assertThat(beginTransactions).hasSize(1); + BeginTransactionRequest beginTransaction = beginTransactions.get(0); + assertNotNull(beginTransaction.getOptions()); + assertTrue(beginTransaction.getOptions().hasPartitionedDml()); + assertFalse(beginTransaction.getOptions().getExcludeTxnFromChangeStreams()); + + List requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class); + assertThat(requests).hasSize(1); + ExecuteSqlRequest request = requests.get(0); + assertNotNull(request.getRequestOptions()); + assertThat(request.getRequestOptions().getTransactionTag()).isEqualTo("testTransactionTag"); + assertThat(request.getRequestOptions().getRequestTag()) + .isEqualTo("app=spanner,env=test,action=dml"); + } + @Test public void testCommitWithTag() { DatabaseClient client = diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java index 93e0e3eb3d0..d2b102e3d37 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java @@ -101,7 +101,7 @@ public void setup() { when(rpc.beginTransaction(any(BeginTransactionRequest.class), anyMap(), eq(true))) .thenReturn(Transaction.newBuilder().setId(txId).build()); - tx = new PartitionedDmlTransaction(session, rpc, ticker); + tx = new PartitionedDmlTransaction(session, rpc, ticker, null); } @Test @@ -332,7 +332,7 @@ public void testExecuteStreamingPartitionedUpdateUnexpectedEOS() { Mockito.eq(executeRequestWithResumeToken), anyMap(), any(Duration.class))) .thenReturn(stream2); - PartitionedDmlTransaction tx = new PartitionedDmlTransaction(session, rpc, ticker); + PartitionedDmlTransaction tx = new PartitionedDmlTransaction(session, rpc, ticker, null); long count = tx.executeStreamingPartitionedUpdate(Statement.of(sql), Duration.ofMinutes(10)); assertThat(count).isEqualTo(1000L); @@ -371,7 +371,7 @@ public void testExecuteStreamingPartitionedUpdateRSTstream() { Mockito.eq(executeRequestWithResumeToken), anyMap(), any(Duration.class))) .thenReturn(stream2); - PartitionedDmlTransaction tx = new PartitionedDmlTransaction(session, rpc, ticker); + PartitionedDmlTransaction tx = new PartitionedDmlTransaction(session, rpc, ticker, null); long count = tx.executeStreamingPartitionedUpdate(Statement.of(sql), Duration.ofMinutes(10)); assertThat(count).isEqualTo(1000L); @@ -400,7 +400,7 @@ public void testExecuteStreamingPartitionedUpdateGenericInternalException() { Mockito.eq(executeRequestWithoutResumeToken), anyMap(), any(Duration.class))) .thenReturn(stream1); - PartitionedDmlTransaction tx = new PartitionedDmlTransaction(session, rpc, ticker); + PartitionedDmlTransaction tx = new PartitionedDmlTransaction(session, rpc, ticker, null); SpannerException e = assertThrows( SpannerException.class,