Skip to content

Commit

Permalink
combined wrapped predicate with wrapped exc factory
Browse files Browse the repository at this point in the history
  • Loading branch information
daniel-sanche committed Feb 7, 2024
1 parent fcd3aaa commit fcbdda6
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 25 deletions.
24 changes: 5 additions & 19 deletions google/cloud/bigtable/data/_async/_read_rows.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,31 +115,17 @@ def start_operation(self) -> AsyncGenerator[Row, None]:
self._operation_metrics.backoff_generator = sleep_generator

# Metrics:
# track attempt failures using build_wrapped_predicate() for raised exceptions
# and _metric_wrapped_exception_factory for operation timeouts
# track attempt failures using build_wrapped_fn_handlers() for raised exceptions
# and operation timeouts
metric_predicate, metric_excs = self._operation_metrics.build_wrapped_fn_handlers(self._predicate)
return retries.retry_target_stream_async(
self._read_rows_attempt,
self._operation_metrics.build_wrapped_predicate(self._predicate),
metric_predicate,
sleep_generator,
self.operation_timeout,
exception_factory=self._metric_wrapped_exception_factory,
exception_factory=metric_excs,
)

def _metric_wrapped_exception_factory(
self,
exc_list: list[Exception],
reason: retries.RetryFailureReason,
timeout_val: float | None,
) -> tuple[Exception, Exception | None]:
"""
Wrap the retry exception builder to alert the metrics class
when we are going to emit an operation timeout.
"""
exc, source = _retry_exception_factory(exc_list, reason, timeout_val)
if reason != retries.RetryFailureReason.NON_RETRYABLE_ERROR:
self._operation_metrics.end_with_status(exc)
return exc, source

def _read_rows_attempt(self) -> AsyncGenerator[Row, None]:
"""
Attempt a single read_rows rpc call.
Expand Down
28 changes: 22 additions & 6 deletions google/cloud/bigtable/data/_metrics/data_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@
from google.cloud.bigtable.data.exceptions import MutationsExceptionGroup
from google.cloud.bigtable.data.exceptions import ShardedReadRowsExceptionGroup
from google.cloud.bigtable.data.exceptions import RetryExceptionGroup
from google.cloud.bigtable.data._helpers import _retry_exception_factory
from google.cloud.bigtable_v2.types.response_params import ResponseParams
from google.protobuf.message import DecodeError
from google.api_core.retry import RetryFailureReason

if TYPE_CHECKING:
from google.cloud.bigtable.data._metrics.handlers._base import MetricsHandler
Expand Down Expand Up @@ -377,13 +379,17 @@ def end_with_success(self):
"""
return self.end_with_status(StatusCode.OK)

def build_wrapped_predicate(
self, inner_predicate: Callable[[Exception], bool]
def build_wrapped_fn_handlers(
self,
inner_predicate: Callable[[Exception], bool],
) -> Callable[[Exception], bool]:
"""
Wrapps a predicate to include metrics tracking. Any call to the resulting predicate
is assumed to be an rpc failure, and will either mark the end of the active attempt
or the end of the operation.
One way to track metrics is by wrapping the `predicate` and `exception_factory`
arguments of `api_core.Retry`. This will notify us when an exception occurs so
we can track it.
This function retruns wrapped versions of the `predicate` and `exception_factory`
to be passed down when building the `Retry` object.
Args:
- predicate: The predicate to wrap.
Expand All @@ -397,7 +403,17 @@ def wrapped_predicate(exc: Exception) -> bool:
self.end_with_status(exc)
return inner_result

return wrapped_predicate
def wrapped_exception_factory(
exc_list: list[Exception],
reason: RetryFailureReason,
timeout_val: float | None,
) -> tuple[Exception, Exception | None]:
exc, source = _retry_exception_factory(exc_list, reason, timeout_val)
if reason != RetryFailureReason.NON_RETRYABLE_ERROR:
self._operation_metrics.end_with_status(exc)
return exc, source

return wrapped_predicate, wrapped_exception_factory

@staticmethod
def _exc_to_status(exc: Exception) -> StatusCode:
Expand Down

0 comments on commit fcbdda6

Please sign in to comment.