From fcbdda667f0f888b184209aee7ab5c43f7edaee5 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Tue, 6 Feb 2024 17:01:47 -0800 Subject: [PATCH] combined wrapped predicate with wrapped exc factory --- .../cloud/bigtable/data/_async/_read_rows.py | 24 ++++------------ .../bigtable/data/_metrics/data_model.py | 28 +++++++++++++++---- 2 files changed, 27 insertions(+), 25 deletions(-) diff --git a/google/cloud/bigtable/data/_async/_read_rows.py b/google/cloud/bigtable/data/_async/_read_rows.py index b768267b5..480bad21d 100644 --- a/google/cloud/bigtable/data/_async/_read_rows.py +++ b/google/cloud/bigtable/data/_async/_read_rows.py @@ -115,31 +115,17 @@ def start_operation(self) -> AsyncGenerator[Row, None]: self._operation_metrics.backoff_generator = sleep_generator # Metrics: - # track attempt failures using build_wrapped_predicate() for raised exceptions - # and _metric_wrapped_exception_factory for operation timeouts + # track attempt failures using build_wrapped_fn_handlers() for raised exceptions + # and operation timeouts + metric_predicate, metric_excs = self._operation_metrics.build_wrapped_fn_handlers(self._predicate) return retries.retry_target_stream_async( self._read_rows_attempt, - self._operation_metrics.build_wrapped_predicate(self._predicate), + metric_predicate, sleep_generator, self.operation_timeout, - exception_factory=self._metric_wrapped_exception_factory, + exception_factory=metric_excs, ) - def _metric_wrapped_exception_factory( - self, - exc_list: list[Exception], - reason: retries.RetryFailureReason, - timeout_val: float | None, - ) -> tuple[Exception, Exception | None]: - """ - Wrap the retry exception builder to alert the metrics class - when we are going to emit an operation timeout. - """ - exc, source = _retry_exception_factory(exc_list, reason, timeout_val) - if reason != retries.RetryFailureReason.NON_RETRYABLE_ERROR: - self._operation_metrics.end_with_status(exc) - return exc, source - def _read_rows_attempt(self) -> AsyncGenerator[Row, None]: """ Attempt a single read_rows rpc call. diff --git a/google/cloud/bigtable/data/_metrics/data_model.py b/google/cloud/bigtable/data/_metrics/data_model.py index 790533ac7..e52d0db55 100644 --- a/google/cloud/bigtable/data/_metrics/data_model.py +++ b/google/cloud/bigtable/data/_metrics/data_model.py @@ -30,8 +30,10 @@ from google.cloud.bigtable.data.exceptions import MutationsExceptionGroup from google.cloud.bigtable.data.exceptions import ShardedReadRowsExceptionGroup from google.cloud.bigtable.data.exceptions import RetryExceptionGroup +from google.cloud.bigtable.data._helpers import _retry_exception_factory from google.cloud.bigtable_v2.types.response_params import ResponseParams from google.protobuf.message import DecodeError +from google.api_core.retry import RetryFailureReason if TYPE_CHECKING: from google.cloud.bigtable.data._metrics.handlers._base import MetricsHandler @@ -377,13 +379,17 @@ def end_with_success(self): """ return self.end_with_status(StatusCode.OK) - def build_wrapped_predicate( - self, inner_predicate: Callable[[Exception], bool] + def build_wrapped_fn_handlers( + self, + inner_predicate: Callable[[Exception], bool], ) -> Callable[[Exception], bool]: """ - Wrapps a predicate to include metrics tracking. Any call to the resulting predicate - is assumed to be an rpc failure, and will either mark the end of the active attempt - or the end of the operation. + One way to track metrics is by wrapping the `predicate` and `exception_factory` + arguments of `api_core.Retry`. This will notify us when an exception occurs so + we can track it. + + This function retruns wrapped versions of the `predicate` and `exception_factory` + to be passed down when building the `Retry` object. Args: - predicate: The predicate to wrap. @@ -397,7 +403,17 @@ def wrapped_predicate(exc: Exception) -> bool: self.end_with_status(exc) return inner_result - return wrapped_predicate + def wrapped_exception_factory( + exc_list: list[Exception], + reason: RetryFailureReason, + timeout_val: float | None, + ) -> tuple[Exception, Exception | None]: + exc, source = _retry_exception_factory(exc_list, reason, timeout_val) + if reason != RetryFailureReason.NON_RETRYABLE_ERROR: + self._operation_metrics.end_with_status(exc) + return exc, source + + return wrapped_predicate, wrapped_exception_factory @staticmethod def _exc_to_status(exc: Exception) -> StatusCode: