From 62cb4dacf97db0325c2ca3eb96b84aa2ccae1f98 Mon Sep 17 00:00:00 2001 From: rahul yadav Date: Thu, 14 Nov 2024 11:29:21 +0530 Subject: [PATCH 1/4] fix(spanner): support transaction tags in partition DML --- .../clirr-ignored-differences.xml | 6 ++++- ...tractMultiplexedSessionDatabaseClient.java | 7 +++++ .../google/cloud/spanner/DatabaseClient.java | 18 +++++++++++++ .../cloud/spanner/DatabaseClientImpl.java | 14 +++++++++- .../spanner/PartitionedDmlTransaction.java | 21 +++++++++++---- .../com/google/cloud/spanner/SessionImpl.java | 13 ++++++++- .../com/google/cloud/spanner/SessionPool.java | 27 +++++++++++++++++++ .../cloud/spanner/DatabaseClientImplTest.java | 24 +++++++++++++++++ .../PartitionedDmlTransactionTest.java | 8 +++--- 9 files changed, 126 insertions(+), 12 deletions(-) diff --git a/google-cloud-spanner/clirr-ignored-differences.xml b/google-cloud-spanner/clirr-ignored-differences.xml index ec13415790c..7f194d55dae 100644 --- a/google-cloud-spanner/clirr-ignored-differences.xml +++ b/google-cloud-spanner/clirr-ignored-differences.xml @@ -790,5 +790,9 @@ com/google/cloud/spanner/connection/Connection boolean isAutoBatchDmlUpdateCountVerification() - + + 7012 + com/google/cloud/spanner/DatabaseClient + long executePartitionedUpdate(com.google.cloud.spanner.Statement, java.lang.String, com.google.cloud.spanner.Options$UpdateOption[]) + diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractMultiplexedSessionDatabaseClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractMultiplexedSessionDatabaseClient.java index ebfb0e0a774..74fca99f9a3 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractMultiplexedSessionDatabaseClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractMultiplexedSessionDatabaseClient.java @@ -21,6 +21,7 @@ import com.google.cloud.spanner.Options.TransactionOption; import com.google.cloud.spanner.Options.UpdateOption; import com.google.spanner.v1.BatchWriteResponse; +import javax.annotation.Nullable; /** * Base class for the Multiplexed Session {@link DatabaseClient} implementation. Throws {@link @@ -56,4 +57,10 @@ public ServerStream batchWriteAtLeastOnce( public long executePartitionedUpdate(Statement stmt, UpdateOption... options) { throw new UnsupportedOperationException(); } + + @Override + public long executePartitionedUpdate( + Statement stmt, @Nullable String transactionTag, UpdateOption... options) { + throw new UnsupportedOperationException(); + } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java index 06237131458..755b9b1ef5d 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java @@ -22,6 +22,7 @@ import com.google.cloud.spanner.Options.TransactionOption; import com.google.cloud.spanner.Options.UpdateOption; import com.google.spanner.v1.BatchWriteResponse; +import javax.annotation.Nullable; /** * Interface for all the APIs that are used to read/write data into a Cloud Spanner database. An @@ -601,4 +602,21 @@ ServerStream batchWriteAtLeastOnce( * idempotent, such as deleting old rows from a very large table. */ long executePartitionedUpdate(Statement stmt, UpdateOption... options); + + /** + * Executes a Partitioned DML statement with the specified transaction tag. + * + *

This method has the same behavior as {@link #executePartitionedUpdate(Statement, + * UpdateOption...)} but allows specifying a transaction tag that will be applied to all + * partitioned operations. + * + * @param stmt The Partitioned DML statement to execute + * @param transactionTag The transaction tag to apply to all partitioned operations. The tag must + * be a printable string (ASCII 32-126) with maximum length of 50 characters. + * @param options The options to use for the update operation + * @return The total number of rows modified by the statement + * @throws SpannerException if the operation failed + */ + long executePartitionedUpdate( + Statement stmt, @Nullable String transactionTag, UpdateOption... options); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java index d7f16f89524..9200aa23a2b 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java @@ -309,9 +309,21 @@ public AsyncTransactionManager transactionManagerAsync(TransactionOption... opti @Override public long executePartitionedUpdate(final Statement stmt, final UpdateOption... options) { + return executePartitionedUpdateWithOptions(stmt, null, options); + } + + @Override + public long executePartitionedUpdate( + final Statement stmt, @Nullable String transactionTag, final UpdateOption... options) { + return executePartitionedUpdateWithOptions(stmt, transactionTag, options); + } + + private long executePartitionedUpdateWithOptions( + final Statement stmt, @Nullable String transactionTag, final UpdateOption... options) { ISpan span = tracer.spanBuilder(PARTITION_DML_TRANSACTION); try (IScope s = tracer.withSpan(span)) { - return runWithSessionRetry(session -> session.executePartitionedUpdate(stmt, options)); + return runWithSessionRetry( + session -> session.executePartitionedUpdate(stmt, transactionTag, options)); } catch (RuntimeException e) { span.setStatus(e); span.end(); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java index 82b7f06b7d2..dc4d1f54f3c 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java @@ -43,6 +43,7 @@ import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; +import javax.annotation.Nullable; import org.threeten.bp.Duration; import org.threeten.bp.temporal.ChronoUnit; @@ -54,13 +55,16 @@ public class PartitionedDmlTransaction implements SessionImpl.SessionTransaction private final SessionImpl session; private final SpannerRpc rpc; private final Ticker ticker; + private final @Nullable String transactionTag; private final IsRetryableInternalError isRetryableInternalErrorPredicate; private volatile boolean isValid = true; - PartitionedDmlTransaction(SessionImpl session, SpannerRpc rpc, Ticker ticker) { + PartitionedDmlTransaction( + SessionImpl session, SpannerRpc rpc, Ticker ticker, @Nullable String transactionTag) { this.session = session; this.rpc = rpc; this.ticker = ticker; + this.transactionTag = transactionTag; this.isRetryableInternalErrorPredicate = new IsRetryableInternalError(); } @@ -194,22 +198,29 @@ ExecuteSqlRequest newTransactionRequestFrom(final Statement statement, final Opt if (options.hasTag()) { requestOptionsBuilder.setRequestTag(options.tag()); } + if (transactionTag != null) { + requestOptionsBuilder.setTransactionTag(transactionTag); + } builder.setRequestOptions(requestOptionsBuilder.build()); } return builder.build(); } private ByteString initTransaction(final Options options) { - final BeginTransactionRequest request = + BeginTransactionRequest.Builder builder = BeginTransactionRequest.newBuilder() .setSession(session.getName()) .setOptions( TransactionOptions.newBuilder() .setPartitionedDml(TransactionOptions.PartitionedDml.getDefaultInstance()) .setExcludeTxnFromChangeStreams( - options.withExcludeTxnFromChangeStreams() == Boolean.TRUE)) - .build(); - Transaction tx = rpc.beginTransaction(request, session.getOptions(), true); + options.withExcludeTxnFromChangeStreams() == Boolean.TRUE)); + + if (transactionTag != null) { + builder.setRequestOptions( + RequestOptions.newBuilder().setTransactionTag(transactionTag).build()); + } + Transaction tx = rpc.beginTransaction(builder.build(), session.getOptions(), true); if (tx.getId().isEmpty()) { throw SpannerExceptionFactory.newSpannerException( ErrorCode.INTERNAL, diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java index 5bd31603685..f0eca14feed 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java @@ -201,7 +201,18 @@ public DatabaseId getDatabaseId() { public long executePartitionedUpdate(Statement stmt, UpdateOption... options) { setActive(null); PartitionedDmlTransaction txn = - new PartitionedDmlTransaction(this, spanner.getRpc(), Ticker.systemTicker()); + new PartitionedDmlTransaction(this, spanner.getRpc(), Ticker.systemTicker(), null); + return txn.executeStreamingPartitionedUpdate( + stmt, spanner.getOptions().getPartitionedDmlTimeout(), options); + } + + @Override + public long executePartitionedUpdate( + Statement stmt, @Nullable String transactionTag, UpdateOption... options) { + setActive(null); + PartitionedDmlTransaction txn = + new PartitionedDmlTransaction( + this, spanner.getRpc(), Ticker.systemTicker(), transactionTag); return txn.executeStreamingPartitionedUpdate( stmt, spanner.getOptions().getPartitionedDmlTimeout(), options); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index cf50fa44c77..d621f91d630 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -1273,6 +1273,11 @@ default AsyncTransactionManager transactionManagerAsync(TransactionOption... opt default long executePartitionedUpdate(Statement stmt, UpdateOption... options) { return get().executePartitionedUpdate(stmt, options); } + + default long executePartitionedUpdate( + Statement stmt, @Nullable String transactionTag, UpdateOption... options) { + return get().executePartitionedUpdate(stmt, transactionTag, options); + } } class PooledSessionFutureWrapper implements SessionFutureWrapper { @@ -1494,6 +1499,16 @@ public long executePartitionedUpdate(Statement stmt, UpdateOption... options) { } } + @Override + public long executePartitionedUpdate( + Statement stmt, @Nullable String transactionTag, UpdateOption... options) { + try { + return get(true).executePartitionedUpdate(stmt, transactionTag, options); + } finally { + close(); + } + } + @Override public String getName() { return get().getName(); @@ -1709,6 +1724,18 @@ public long executePartitionedUpdate(Statement stmt, UpdateOption... options) } } + @Override + public long executePartitionedUpdate( + Statement stmt, @Nullable String transactionTag, UpdateOption... options) + throws SpannerException { + try { + markUsed(); + return delegate.executePartitionedUpdate(stmt, transactionTag, options); + } catch (SpannerException e) { + throw lastException = e; + } + } + @Override public ReadContext singleUse() { return delegate.singleUse(); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java index 62a10c0adb4..09812f1d598 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java @@ -1944,6 +1944,30 @@ public void testPartitionedDMLWithTag() { assertThat(request.getRequestOptions().getTransactionTag()).isEmpty(); } + @Test + public void testPartitionedDMLWithTransactionTag() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + client.executePartitionedUpdate( + UPDATE_STATEMENT, "testTransactionTag", Options.tag("app=spanner,env=test,action=dml")); + + List beginTransactions = + mockSpanner.getRequestsOfType(BeginTransactionRequest.class); + assertThat(beginTransactions).hasSize(1); + BeginTransactionRequest beginTransaction = beginTransactions.get(0); + assertNotNull(beginTransaction.getOptions()); + assertTrue(beginTransaction.getOptions().hasPartitionedDml()); + assertFalse(beginTransaction.getOptions().getExcludeTxnFromChangeStreams()); + + List requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class); + assertThat(requests).hasSize(1); + ExecuteSqlRequest request = requests.get(0); + assertNotNull(request.getRequestOptions()); + assertThat(request.getRequestOptions().getTransactionTag()).isEqualTo("testTransactionTag"); + assertThat(request.getRequestOptions().getRequestTag()) + .isEqualTo("app=spanner,env=test,action=dml"); + } + @Test public void testCommitWithTag() { DatabaseClient client = diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java index 93e0e3eb3d0..d2b102e3d37 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java @@ -101,7 +101,7 @@ public void setup() { when(rpc.beginTransaction(any(BeginTransactionRequest.class), anyMap(), eq(true))) .thenReturn(Transaction.newBuilder().setId(txId).build()); - tx = new PartitionedDmlTransaction(session, rpc, ticker); + tx = new PartitionedDmlTransaction(session, rpc, ticker, null); } @Test @@ -332,7 +332,7 @@ public void testExecuteStreamingPartitionedUpdateUnexpectedEOS() { Mockito.eq(executeRequestWithResumeToken), anyMap(), any(Duration.class))) .thenReturn(stream2); - PartitionedDmlTransaction tx = new PartitionedDmlTransaction(session, rpc, ticker); + PartitionedDmlTransaction tx = new PartitionedDmlTransaction(session, rpc, ticker, null); long count = tx.executeStreamingPartitionedUpdate(Statement.of(sql), Duration.ofMinutes(10)); assertThat(count).isEqualTo(1000L); @@ -371,7 +371,7 @@ public void testExecuteStreamingPartitionedUpdateRSTstream() { Mockito.eq(executeRequestWithResumeToken), anyMap(), any(Duration.class))) .thenReturn(stream2); - PartitionedDmlTransaction tx = new PartitionedDmlTransaction(session, rpc, ticker); + PartitionedDmlTransaction tx = new PartitionedDmlTransaction(session, rpc, ticker, null); long count = tx.executeStreamingPartitionedUpdate(Statement.of(sql), Duration.ofMinutes(10)); assertThat(count).isEqualTo(1000L); @@ -400,7 +400,7 @@ public void testExecuteStreamingPartitionedUpdateGenericInternalException() { Mockito.eq(executeRequestWithoutResumeToken), anyMap(), any(Duration.class))) .thenReturn(stream1); - PartitionedDmlTransaction tx = new PartitionedDmlTransaction(session, rpc, ticker); + PartitionedDmlTransaction tx = new PartitionedDmlTransaction(session, rpc, ticker, null); SpannerException e = assertThrows( SpannerException.class, From 131966c777e9866ce0581b2f629857bc5c692206 Mon Sep 17 00:00:00 2001 From: cloud-java-bot Date: Thu, 14 Nov 2024 06:15:33 +0000 Subject: [PATCH 2/4] chore: generate libraries at Thu Nov 14 06:12:19 UTC 2024 --- README.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 4e1fbea168f..294ac252fd3 100644 --- a/README.md +++ b/README.md @@ -41,7 +41,7 @@ If you are using Maven without the BOM, add this to your dependencies: com.google.cloud google-cloud-spanner - 6.81.0 + 6.81.1 ``` @@ -516,6 +516,7 @@ Samples are in the [`samples/`](https://github.com/googleapis/java-spanner/tree/ | Create Instance Partition Sample | [source code](https://github.com/googleapis/java-spanner/blob/main/samples/snippets/src/main/java/com/example/spanner/CreateInstancePartitionSample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-spanner&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/spanner/CreateInstancePartitionSample.java) | | Create Instance With Autoscaling Config Example | [source code](https://github.com/googleapis/java-spanner/blob/main/samples/snippets/src/main/java/com/example/spanner/CreateInstanceWithAutoscalingConfigExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-spanner&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/spanner/CreateInstanceWithAutoscalingConfigExample.java) | | Create Instance With Processing Units Example | [source code](https://github.com/googleapis/java-spanner/blob/main/samples/snippets/src/main/java/com/example/spanner/CreateInstanceWithProcessingUnitsExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-spanner&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/spanner/CreateInstanceWithProcessingUnitsExample.java) | +| Create Instance Without Default Backup Schedules Example | [source code](https://github.com/googleapis/java-spanner/blob/main/samples/snippets/src/main/java/com/example/spanner/CreateInstanceWithoutDefaultBackupSchedulesExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-spanner&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/spanner/CreateInstanceWithoutDefaultBackupSchedulesExample.java) | | Create Sequence Sample | [source code](https://github.com/googleapis/java-spanner/blob/main/samples/snippets/src/main/java/com/example/spanner/CreateSequenceSample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-spanner&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/spanner/CreateSequenceSample.java) | | Create Table With Foreign Key Delete Cascade Sample | [source code](https://github.com/googleapis/java-spanner/blob/main/samples/snippets/src/main/java/com/example/spanner/CreateTableWithForeignKeyDeleteCascadeSample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-spanner&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/spanner/CreateTableWithForeignKeyDeleteCascadeSample.java) | | Custom Timeout And Retry Settings Example | [source code](https://github.com/googleapis/java-spanner/blob/main/samples/snippets/src/main/java/com/example/spanner/CustomTimeoutAndRetrySettingsExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-spanner&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/spanner/CustomTimeoutAndRetrySettingsExample.java) | @@ -571,6 +572,7 @@ Samples are in the [`samples/`](https://github.com/googleapis/java-spanner/tree/ | Update Database Sample | [source code](https://github.com/googleapis/java-spanner/blob/main/samples/snippets/src/main/java/com/example/spanner/UpdateDatabaseSample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-spanner&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/spanner/UpdateDatabaseSample.java) | | Update Database With Default Leader Sample | [source code](https://github.com/googleapis/java-spanner/blob/main/samples/snippets/src/main/java/com/example/spanner/UpdateDatabaseWithDefaultLeaderSample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-spanner&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/spanner/UpdateDatabaseWithDefaultLeaderSample.java) | | Update Instance Config Sample | [source code](https://github.com/googleapis/java-spanner/blob/main/samples/snippets/src/main/java/com/example/spanner/UpdateInstanceConfigSample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-spanner&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/spanner/UpdateInstanceConfigSample.java) | +| Update Instance Default Backup Schedule Type Example | [source code](https://github.com/googleapis/java-spanner/blob/main/samples/snippets/src/main/java/com/example/spanner/UpdateInstanceDefaultBackupScheduleTypeExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-spanner&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/spanner/UpdateInstanceDefaultBackupScheduleTypeExample.java) | | Update Instance Example | [source code](https://github.com/googleapis/java-spanner/blob/main/samples/snippets/src/main/java/com/example/spanner/UpdateInstanceExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-spanner&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/spanner/UpdateInstanceExample.java) | | Update Json Data Sample | [source code](https://github.com/googleapis/java-spanner/blob/main/samples/snippets/src/main/java/com/example/spanner/UpdateJsonDataSample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-spanner&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/spanner/UpdateJsonDataSample.java) | | Update Jsonb Data Sample | [source code](https://github.com/googleapis/java-spanner/blob/main/samples/snippets/src/main/java/com/example/spanner/UpdateJsonbDataSample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-spanner&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/spanner/UpdateJsonbDataSample.java) | From 4fa9b288d903d8a49afd52034ac09bc340eab0ff Mon Sep 17 00:00:00 2001 From: rahul yadav Date: Thu, 14 Nov 2024 11:53:39 +0530 Subject: [PATCH 3/4] fix tests --- .../java/com/google/cloud/spanner/SessionPoolTest.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java index 00339fd2946..cada6007d13 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java @@ -1729,10 +1729,13 @@ public void testSessionNotFoundPartitionedUpdate() { SpannerExceptionFactoryTest.newSessionNotFoundException(sessionName); Statement statement = Statement.of("UPDATE FOO SET BAR=1 WHERE 1=1"); final SessionImpl closedSession = mockSession(); - when(closedSession.executePartitionedUpdate(statement)).thenThrow(sessionNotFound); + when(closedSession.executePartitionedUpdate(any(Statement.class))).thenThrow(sessionNotFound); + when(closedSession.executePartitionedUpdate(any(Statement.class), any(), any())) + .thenThrow(sessionNotFound); final SessionImpl openSession = mockSession(); - when(openSession.executePartitionedUpdate(statement)).thenReturn(1L); + when(openSession.executePartitionedUpdate(any(Statement.class))).thenReturn(1L); + when(openSession.executePartitionedUpdate(any(Statement.class), any(), any())).thenReturn(1L); doAnswer( invocation -> { executor.submit( From 3da6837eac3c4120b99c32be1b57e51248514b0c Mon Sep 17 00:00:00 2001 From: rahul yadav Date: Thu, 14 Nov 2024 12:41:31 +0530 Subject: [PATCH 4/4] revert changes and add transactionTag option --- .../clirr-ignored-differences.xml | 6 +-- ...tractMultiplexedSessionDatabaseClient.java | 7 ---- .../google/cloud/spanner/DatabaseClient.java | 18 --------- .../cloud/spanner/DatabaseClientImpl.java | 14 +------ .../com/google/cloud/spanner/Options.java | 38 +++++++++++++++++++ .../spanner/PartitionedDmlTransaction.java | 14 +++---- .../com/google/cloud/spanner/SessionImpl.java | 13 +------ .../com/google/cloud/spanner/SessionPool.java | 27 ------------- .../cloud/spanner/DatabaseClientImplTest.java | 4 +- .../PartitionedDmlTransactionTest.java | 8 ++-- .../google/cloud/spanner/SessionPoolTest.java | 7 +--- 11 files changed, 55 insertions(+), 101 deletions(-) diff --git a/google-cloud-spanner/clirr-ignored-differences.xml b/google-cloud-spanner/clirr-ignored-differences.xml index 7f194d55dae..b2bcddee010 100644 --- a/google-cloud-spanner/clirr-ignored-differences.xml +++ b/google-cloud-spanner/clirr-ignored-differences.xml @@ -790,9 +790,5 @@ com/google/cloud/spanner/connection/Connection boolean isAutoBatchDmlUpdateCountVerification() - - 7012 - com/google/cloud/spanner/DatabaseClient - long executePartitionedUpdate(com.google.cloud.spanner.Statement, java.lang.String, com.google.cloud.spanner.Options$UpdateOption[]) - + diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractMultiplexedSessionDatabaseClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractMultiplexedSessionDatabaseClient.java index 74fca99f9a3..ebfb0e0a774 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractMultiplexedSessionDatabaseClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractMultiplexedSessionDatabaseClient.java @@ -21,7 +21,6 @@ import com.google.cloud.spanner.Options.TransactionOption; import com.google.cloud.spanner.Options.UpdateOption; import com.google.spanner.v1.BatchWriteResponse; -import javax.annotation.Nullable; /** * Base class for the Multiplexed Session {@link DatabaseClient} implementation. Throws {@link @@ -57,10 +56,4 @@ public ServerStream batchWriteAtLeastOnce( public long executePartitionedUpdate(Statement stmt, UpdateOption... options) { throw new UnsupportedOperationException(); } - - @Override - public long executePartitionedUpdate( - Statement stmt, @Nullable String transactionTag, UpdateOption... options) { - throw new UnsupportedOperationException(); - } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java index 755b9b1ef5d..06237131458 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java @@ -22,7 +22,6 @@ import com.google.cloud.spanner.Options.TransactionOption; import com.google.cloud.spanner.Options.UpdateOption; import com.google.spanner.v1.BatchWriteResponse; -import javax.annotation.Nullable; /** * Interface for all the APIs that are used to read/write data into a Cloud Spanner database. An @@ -602,21 +601,4 @@ ServerStream batchWriteAtLeastOnce( * idempotent, such as deleting old rows from a very large table. */ long executePartitionedUpdate(Statement stmt, UpdateOption... options); - - /** - * Executes a Partitioned DML statement with the specified transaction tag. - * - *

This method has the same behavior as {@link #executePartitionedUpdate(Statement, - * UpdateOption...)} but allows specifying a transaction tag that will be applied to all - * partitioned operations. - * - * @param stmt The Partitioned DML statement to execute - * @param transactionTag The transaction tag to apply to all partitioned operations. The tag must - * be a printable string (ASCII 32-126) with maximum length of 50 characters. - * @param options The options to use for the update operation - * @return The total number of rows modified by the statement - * @throws SpannerException if the operation failed - */ - long executePartitionedUpdate( - Statement stmt, @Nullable String transactionTag, UpdateOption... options); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java index 9200aa23a2b..d7f16f89524 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java @@ -309,21 +309,9 @@ public AsyncTransactionManager transactionManagerAsync(TransactionOption... opti @Override public long executePartitionedUpdate(final Statement stmt, final UpdateOption... options) { - return executePartitionedUpdateWithOptions(stmt, null, options); - } - - @Override - public long executePartitionedUpdate( - final Statement stmt, @Nullable String transactionTag, final UpdateOption... options) { - return executePartitionedUpdateWithOptions(stmt, transactionTag, options); - } - - private long executePartitionedUpdateWithOptions( - final Statement stmt, @Nullable String transactionTag, final UpdateOption... options) { ISpan span = tracer.spanBuilder(PARTITION_DML_TRANSACTION); try (IScope s = tracer.withSpan(span)) { - return runWithSessionRetry( - session -> session.executePartitionedUpdate(stmt, transactionTag, options)); + return runWithSessionRetry(session -> session.executePartitionedUpdate(stmt, options)); } catch (RuntimeException e) { span.setStatus(e); span.end(); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java index 9c3257586fb..73c47a32ac3 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java @@ -197,6 +197,10 @@ public static ReadQueryUpdateTransactionOption tag(String name) { return new TagOption(name); } + public static ReadQueryUpdateTransactionOption transactionTag(String name) { + return new TransactionTagOption(name); + } + /** * Specifying this will cause the list operations to fetch at most this many records in a page. */ @@ -394,6 +398,24 @@ void appendToOptions(Options options) { } } + static final class TransactionTagOption extends InternalOption + implements ReadQueryUpdateTransactionOption { + private final String transactionTag; + + TransactionTagOption(String transactionTag) { + this.transactionTag = transactionTag; + } + + String getTransactionTag() { + return transactionTag; + } + + @Override + void appendToOptions(Options options) { + options.transactionTag = transactionTag; + } + } + static final class EtagOption extends InternalOption implements DeleteAdminApiOption { private final String etag; @@ -462,6 +484,7 @@ void appendToOptions(Options options) { private RpcPriority priority; private String tag; private String etag; + private String transactionTag; private Boolean validateOnly; private Boolean withOptimisticLock; private Boolean withExcludeTxnFromChangeStreams; @@ -545,6 +568,14 @@ boolean hasTag() { return tag != null; } + boolean hasTransactionTag() { + return transactionTag != null; + } + + String transactionTag() { + return transactionTag; + } + String tag() { return tag; } @@ -661,6 +692,9 @@ public String toString() { if (orderBy != null) { b.append("orderBy: ").append(orderBy).append(' '); } + if (transactionTag != null) { + b.append("transactionTag: ").append(transactionTag).append(' '); + } return b.toString(); } @@ -694,6 +728,7 @@ public boolean equals(Object o) { && Objects.equals(filter(), that.filter()) && Objects.equals(priority(), that.priority()) && Objects.equals(tag(), that.tag()) + && Objects.equals(transactionTag, that.transactionTag) && Objects.equals(etag(), that.etag()) && Objects.equals(validateOnly(), that.validateOnly()) && Objects.equals(withOptimisticLock(), that.withOptimisticLock()) @@ -760,6 +795,9 @@ public int hashCode() { if (orderBy != null) { result = 31 * result + orderBy.hashCode(); } + if (transactionTag != null) { + result = 31 * result + transactionTag.hashCode(); + } return result; } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java index dc4d1f54f3c..cabfd2c476e 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java @@ -43,7 +43,6 @@ import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; -import javax.annotation.Nullable; import org.threeten.bp.Duration; import org.threeten.bp.temporal.ChronoUnit; @@ -55,16 +54,13 @@ public class PartitionedDmlTransaction implements SessionImpl.SessionTransaction private final SessionImpl session; private final SpannerRpc rpc; private final Ticker ticker; - private final @Nullable String transactionTag; private final IsRetryableInternalError isRetryableInternalErrorPredicate; private volatile boolean isValid = true; - PartitionedDmlTransaction( - SessionImpl session, SpannerRpc rpc, Ticker ticker, @Nullable String transactionTag) { + PartitionedDmlTransaction(SessionImpl session, SpannerRpc rpc, Ticker ticker) { this.session = session; this.rpc = rpc; this.ticker = ticker; - this.transactionTag = transactionTag; this.isRetryableInternalErrorPredicate = new IsRetryableInternalError(); } @@ -198,8 +194,8 @@ ExecuteSqlRequest newTransactionRequestFrom(final Statement statement, final Opt if (options.hasTag()) { requestOptionsBuilder.setRequestTag(options.tag()); } - if (transactionTag != null) { - requestOptionsBuilder.setTransactionTag(transactionTag); + if (options.hasTransactionTag()) { + requestOptionsBuilder.setTransactionTag(options.transactionTag()); } builder.setRequestOptions(requestOptionsBuilder.build()); } @@ -216,9 +212,9 @@ private ByteString initTransaction(final Options options) { .setExcludeTxnFromChangeStreams( options.withExcludeTxnFromChangeStreams() == Boolean.TRUE)); - if (transactionTag != null) { + if (options.hasTransactionTag()) { builder.setRequestOptions( - RequestOptions.newBuilder().setTransactionTag(transactionTag).build()); + RequestOptions.newBuilder().setTransactionTag(options.transactionTag()).build()); } Transaction tx = rpc.beginTransaction(builder.build(), session.getOptions(), true); if (tx.getId().isEmpty()) { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java index f0eca14feed..5bd31603685 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java @@ -201,18 +201,7 @@ public DatabaseId getDatabaseId() { public long executePartitionedUpdate(Statement stmt, UpdateOption... options) { setActive(null); PartitionedDmlTransaction txn = - new PartitionedDmlTransaction(this, spanner.getRpc(), Ticker.systemTicker(), null); - return txn.executeStreamingPartitionedUpdate( - stmt, spanner.getOptions().getPartitionedDmlTimeout(), options); - } - - @Override - public long executePartitionedUpdate( - Statement stmt, @Nullable String transactionTag, UpdateOption... options) { - setActive(null); - PartitionedDmlTransaction txn = - new PartitionedDmlTransaction( - this, spanner.getRpc(), Ticker.systemTicker(), transactionTag); + new PartitionedDmlTransaction(this, spanner.getRpc(), Ticker.systemTicker()); return txn.executeStreamingPartitionedUpdate( stmt, spanner.getOptions().getPartitionedDmlTimeout(), options); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index d621f91d630..cf50fa44c77 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -1273,11 +1273,6 @@ default AsyncTransactionManager transactionManagerAsync(TransactionOption... opt default long executePartitionedUpdate(Statement stmt, UpdateOption... options) { return get().executePartitionedUpdate(stmt, options); } - - default long executePartitionedUpdate( - Statement stmt, @Nullable String transactionTag, UpdateOption... options) { - return get().executePartitionedUpdate(stmt, transactionTag, options); - } } class PooledSessionFutureWrapper implements SessionFutureWrapper { @@ -1499,16 +1494,6 @@ public long executePartitionedUpdate(Statement stmt, UpdateOption... options) { } } - @Override - public long executePartitionedUpdate( - Statement stmt, @Nullable String transactionTag, UpdateOption... options) { - try { - return get(true).executePartitionedUpdate(stmt, transactionTag, options); - } finally { - close(); - } - } - @Override public String getName() { return get().getName(); @@ -1724,18 +1709,6 @@ public long executePartitionedUpdate(Statement stmt, UpdateOption... options) } } - @Override - public long executePartitionedUpdate( - Statement stmt, @Nullable String transactionTag, UpdateOption... options) - throws SpannerException { - try { - markUsed(); - return delegate.executePartitionedUpdate(stmt, transactionTag, options); - } catch (SpannerException e) { - throw lastException = e; - } - } - @Override public ReadContext singleUse() { return delegate.singleUse(); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java index 09812f1d598..f01f9342331 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java @@ -1949,7 +1949,9 @@ public void testPartitionedDMLWithTransactionTag() { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); client.executePartitionedUpdate( - UPDATE_STATEMENT, "testTransactionTag", Options.tag("app=spanner,env=test,action=dml")); + UPDATE_STATEMENT, + Options.transactionTag("testTransactionTag"), + Options.tag("app=spanner,env=test,action=dml")); List beginTransactions = mockSpanner.getRequestsOfType(BeginTransactionRequest.class); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java index d2b102e3d37..93e0e3eb3d0 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java @@ -101,7 +101,7 @@ public void setup() { when(rpc.beginTransaction(any(BeginTransactionRequest.class), anyMap(), eq(true))) .thenReturn(Transaction.newBuilder().setId(txId).build()); - tx = new PartitionedDmlTransaction(session, rpc, ticker, null); + tx = new PartitionedDmlTransaction(session, rpc, ticker); } @Test @@ -332,7 +332,7 @@ public void testExecuteStreamingPartitionedUpdateUnexpectedEOS() { Mockito.eq(executeRequestWithResumeToken), anyMap(), any(Duration.class))) .thenReturn(stream2); - PartitionedDmlTransaction tx = new PartitionedDmlTransaction(session, rpc, ticker, null); + PartitionedDmlTransaction tx = new PartitionedDmlTransaction(session, rpc, ticker); long count = tx.executeStreamingPartitionedUpdate(Statement.of(sql), Duration.ofMinutes(10)); assertThat(count).isEqualTo(1000L); @@ -371,7 +371,7 @@ public void testExecuteStreamingPartitionedUpdateRSTstream() { Mockito.eq(executeRequestWithResumeToken), anyMap(), any(Duration.class))) .thenReturn(stream2); - PartitionedDmlTransaction tx = new PartitionedDmlTransaction(session, rpc, ticker, null); + PartitionedDmlTransaction tx = new PartitionedDmlTransaction(session, rpc, ticker); long count = tx.executeStreamingPartitionedUpdate(Statement.of(sql), Duration.ofMinutes(10)); assertThat(count).isEqualTo(1000L); @@ -400,7 +400,7 @@ public void testExecuteStreamingPartitionedUpdateGenericInternalException() { Mockito.eq(executeRequestWithoutResumeToken), anyMap(), any(Duration.class))) .thenReturn(stream1); - PartitionedDmlTransaction tx = new PartitionedDmlTransaction(session, rpc, ticker, null); + PartitionedDmlTransaction tx = new PartitionedDmlTransaction(session, rpc, ticker); SpannerException e = assertThrows( SpannerException.class, diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java index cada6007d13..00339fd2946 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java @@ -1729,13 +1729,10 @@ public void testSessionNotFoundPartitionedUpdate() { SpannerExceptionFactoryTest.newSessionNotFoundException(sessionName); Statement statement = Statement.of("UPDATE FOO SET BAR=1 WHERE 1=1"); final SessionImpl closedSession = mockSession(); - when(closedSession.executePartitionedUpdate(any(Statement.class))).thenThrow(sessionNotFound); - when(closedSession.executePartitionedUpdate(any(Statement.class), any(), any())) - .thenThrow(sessionNotFound); + when(closedSession.executePartitionedUpdate(statement)).thenThrow(sessionNotFound); final SessionImpl openSession = mockSession(); - when(openSession.executePartitionedUpdate(any(Statement.class))).thenReturn(1L); - when(openSession.executePartitionedUpdate(any(Statement.class), any(), any())).thenReturn(1L); + when(openSession.executePartitionedUpdate(statement)).thenReturn(1L); doAnswer( invocation -> { executor.submit(