feat: added client-side instrumentation to all rpcs #925
base: client_side_metrics_handlers
Conversation
        del active_request_indices[result.index]
finally:
    # send trailing metadata to metrics
    result_generator.cancel()
Why do we need to call cancel at the end?
This is calling cancel on the gRPC stream. I think this is in case we encounter a client error: we need to stop the stream so it will give us the trailing metadata.
What client error would happen? I'm just wondering if this will introduce bugs (like not consuming all the results, or cancelling the gRPC stream twice).
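For context on the exit paths: a cancel() inside finally runs whether the stream finishes normally, a client-side error is raised mid-consumption, or the consumer abandons the loop. A minimal sketch, with a hypothetical FakeStream standing in for the gRPC call object (the real cancel semantics belong to grpc.aio, not this toy class):

```python
import asyncio

class FakeStream:
    """Hypothetical stand-in for a gRPC stream call, for illustration only."""
    def __init__(self, items):
        self._items = iter(items)
        self.cancelled = False

    def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            return next(self._items)
        except StopIteration:
            raise StopAsyncIteration

    def cancel(self):
        # in real gRPC, cancelling the call lets trailing metadata be collected
        self.cancelled = True

async def consume(stream, fail_at=None):
    results = []
    try:
        async for item in stream:
            if fail_at is not None and item == fail_at:
                raise ValueError("client-side error mid-stream")
            results.append(item)
    finally:
        # runs on success, error, or early exit, so metadata is always flushed
        stream.cancel()
    return results
```

Whether calling cancel() after a fully consumed stream is a no-op or a double-cancel depends on the grpc.aio call's state machine, which is exactly the reviewer's concern.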
# the value directly to avoid extra overhead
operation.active_attempt.application_blocking_time_ms += (  # type: ignore
    time.monotonic() - block_time
) * 1000
Can we record everything in nanoseconds and convert them to milliseconds instead?
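The suggestion above could look like the following sketch; time.monotonic_ns() keeps the measurement in integer nanoseconds so float conversion happens only once at record time (measure_blocking_ms is a hypothetical helper, not part of the client):

```python
import time

def measure_blocking_ms(fn):
    """Time a blocking call in integer nanoseconds; convert to ms at the end."""
    start_ns = time.monotonic_ns()
    fn()
    elapsed_ns = time.monotonic_ns() - start_ns
    # single division at record time avoids accumulating float rounding error
    return elapsed_ns / 1_000_000
```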
@@ -593,11 +595,20 @@ async def read_rows_stream(
    )
    retryable_excs = _get_retryable_errors(retryable_errors, self)

    # extract metric operation if passed down through kwargs
    # used so that read_row can disable is_streaming flag
    metric_operation = kwargs.pop("metric_operation", None)
Is this the API customers will interact with? This feels a bit weird 🤔 I think if someone calls read_rows we can set is_streaming=True and the OperationType to ReadRows.
This is a hidden argument; users aren't supposed to interact with it.
The problem is that read_row is a thin wrapper on top of read_rows, so read_rows can't assume the operation is streaming. This pattern lets read_row pass down its own operation when it calls read_rows, so it can make sure streaming is False.
We could also solve this with an entirely separate helper if this is too ugly, though.
users aren't supposed to interact with it
Is it possible to make sure users can't interact with it? The fact that someone could look at the source code and pass in a random string (which would increase the cardinality) is a little concerning.
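The hidden-kwarg pattern described in this thread could be sketched roughly as follows; all names here (MetricOperation, metric_operation, op_type) are illustrative stand-ins, not necessarily the PR's exact API:

```python
from enum import Enum

class OperationType(Enum):
    READ_ROWS = "ReadRows"

class MetricOperation:
    """Hypothetical stand-in for the client's per-operation metric object."""
    def __init__(self, op_type, is_streaming):
        self.op_type = op_type
        self.is_streaming = is_streaming

def read_rows(query, **kwargs):
    # hidden kwarg: a wrapper can pass its own operation to override defaults
    operation = kwargs.pop("metric_operation", None)
    if operation is None:
        operation = MetricOperation(OperationType.READ_ROWS, is_streaming=True)
    return operation  # the real method would drive the row stream with this

def read_row(row_key):
    # point read: same RPC, but recorded with is_streaming=False
    op = MetricOperation(OperationType.READ_ROWS, is_streaming=False)
    return read_rows(row_key, metric_operation=op)
```

Since the kwarg accepts whatever callers pass, the cardinality concern above would apply unless the value is type-checked or the wrapper is replaced by a private helper.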
@@ -328,17 +331,26 @@ async def _flush_internal(self, new_entries: list[RowMutationEntry]):
    """
    # flush new entries
    in_process_requests: list[asyncio.Future[list[FailedMutationEntryError]]] = []
    metric = self._table._metrics.create_operation(OperationType.BULK_MUTATE_ROWS)
We don't have this OperationType in Java 😓 Let's just use MUTATE_ROWS for this.
BULK_MUTATE_ROWS is just the name we've been using for mutate_rows, since it's easier to parse. The string value is still "MutateRows".
Gotcha!
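The naming convention discussed above is easy to express with a Python Enum, where the member name is the readable alias and the value is the metric label shared with the Java client. A sketch, not the client's actual enum definition:

```python
from enum import Enum

class OperationType(Enum):
    # the member name is the Python client's readable alias; the string value
    # is the metric label shared with the Java client, so cross-client
    # dashboards aggregate under the same name
    BULK_MUTATE_ROWS = "MutateRows"
    READ_ROWS = "ReadRows"
```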
    timeout_val: float | None,
) -> tuple[Exception, Exception | None]:
    exc, source = _retry_exception_factory(exc_list, reason, timeout_val)
    if reason != RetryFailureReason.NON_RETRYABLE_ERROR:
Should this be `if reason == RetryFailureReason.NON_RETRYABLE_ERROR`?
No. The wrapped_predicate will report all exceptions encountered, but the predicate isn't called when the operation ends due to a timeout, so this extra wrapper is needed for that case.
This part is a bit more complicated than I hoped, so I was considering refactoring some of the read_rows instrumentation when I have a chance. Let me know what you think.
I see. It makes sense, but it's a little hard to follow. Maybe add a comment for now? And also, in _read_rows start_operation, comment on what metric_fns[0] and metric_fns[1] are.
Force-pushed from fcbdda6 to ca0963a.
# For each row
while True:
    try:
        c = await it.__anext__()
    except StopAsyncIteration:
        # stream complete
        operation.end_with_success()
Does this happen when the customer cancels the read in the middle of a stream?
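On the question above: in Python async generators, StopAsyncIteration fires only when the source stream is exhausted by the server. If the customer abandons the read mid-stream (breaks out of the loop and the generator is closed), the generator instead receives GeneratorExit at the yield point, so the two outcomes can be recorded separately. A minimal sketch with a hypothetical Operation class; the PR's actual handling may differ:

```python
import asyncio

class Operation:
    """Hypothetical metric operation with explicit terminal states."""
    def __init__(self):
        self.status = None
    def end_with_success(self):
        self.status = "OK"
    def end_with_cancel(self):
        self.status = "CANCELLED"

async def row_stream(source, operation):
    it = source.__aiter__()
    try:
        while True:
            try:
                c = await it.__anext__()
            except StopAsyncIteration:
                # only reached when the server exhausts the stream
                operation.end_with_success()
                return
            yield c
    except GeneratorExit:
        # reached instead when the consumer abandons the generator mid-stream
        operation.end_with_cancel()
        raise
```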
This PR builds off of #924 to add instrumentation to each RPC in the Bigtable v3 data client.
It required some changes to the gapic-generated client, which will need to be upstreamed to the gapic generator at some point: googleapis/gapic-generator-python#1856
TODO:
- add end-to-end system tests