feat: client side metrics data model #923
Conversation
# by default, exceptions in the metrics system are logged,
# but enabling this flag causes them to be raised instead
ALLOW_METRIC_EXCEPTIONS = os.getenv("BIGTABLE_METRICS_EXCEPTIONS", False)
Yeah, I don't think we should ever break the client; the exporter should just run in the background and log errors if there are any
completed rpc attempt.
"""

start_time: datetime.datetime
Yes, these are operation-level labels and should be the same across multiple attempts. But attempts are also labeled with these fields, so I want to make sure they're added to the attributes later :)
new_attempt = CompletedAttemptMetric(
    start_time=self.active_attempt.start_time.utc,
    first_response_latency_ms=self.active_attempt.first_response_latency_ms,
    duration_ms=duration_seconds * 1000,
is it possible to measure it in nanoseconds? seconds precision seems too low :(
duration_seconds is actually higher precision than seconds already, because it's a float value.
The docs say this about the precision: "Use monotonic_ns() to avoid the precision loss caused by the float type."
So I think we should already be at sub-millisecond precision, but if that's not good enough we can change everything to monotonic_ns to get full int nanoseconds everywhere
gotcha. Let's use monotonic_ns and convert everything to milliseconds. The bucketing in OTEL is different from server side: OTEL buckets use (start, end] while the server uses [start, end). Recording everything in a float histogram can minimize these off-by-one errors.
Sure, just converted everything to int nanos
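For reference, a minimal sketch of the pattern being agreed on here (variable names are illustrative, not the PR's actual fields): timestamps are captured with time.monotonic_ns() as exact integers, and only converted to float milliseconds at the point where the value is recorded.

```python
import time

# capture an exact integer-nanosecond monotonic timestamp at the start
start_time_ns = time.monotonic_ns()

# ... the attempt runs ...

# elapsed time stays an exact int in nanoseconds
duration_ns = time.monotonic_ns() - start_time_ns

# convert to float milliseconds only at the edge, when handing the value
# to the OTEL histogram, so any rounding happens in one place
duration_ms = duration_ns / 1e6
```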
LGTM after nits
""" | ||
history = [] | ||
subgenerator = exponential_sleep_generator(initial, multiplier, maximum) | ||
while True: |
dumb question: when will it break out of the loop?
This is a Python generator function: it gives up control at each yield line. The idea is that you get an instance like `generator = backoff_generator(...)`, and then you call `next(generator)` or `generator.send(idx)` on it every time you want to retrieve a value. This runs the internal code until it reaches the next yield, then pauses execution again until the next time a value is requested.
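To illustrate the pause/resume behavior described above, here is a toy generator (not the PR's backoff_generator) showing how next() and send() drive execution:

```python
def counting_generator():
    """Toy generator: execution pauses at each yield until a value is requested."""
    n = 0
    while True:
        received = yield n   # pause here; resumes when next()/send() is called
        if received is not None:
            n = received     # send() can push a value back into the generator
        n += 1

gen = counting_generator()
print(next(gen))     # 0  -- runs up to the first yield
print(next(gen))     # 1  -- resumes after the yield, loops, pauses again
print(gen.send(10))  # 11 -- send(10) resumes the paused yield with the value 10
# the while True loop itself never breaks; the generator is simply
# discarded (and garbage collected) when it is no longer needed
```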
This seems a bit too surprising of an API. I think it would be a lot cleaner and easier to read if Attempt.start() took a delay parameter
# by default, exceptions in the metrics system are logged,
# but enabling this flag causes them to be raised instead
ALLOW_METRIC_EXCEPTIONS = os.getenv("BIGTABLE_METRICS_EXCEPTIONS", False)
Hmmm this still seems like an option? 😅 I think we should just remove this option
completed rpc attempt.
"""

start_time: datetime.datetime
gotcha, this makes sense! Can we also add this explanation to the document? Maybe something like "Operation-level fields can be accessed from `ActiveOperationMetric`"
def end_attempt_with_status(self, status: StatusCode | Exception) -> None:
    """
    Called to mark the end of a failed attempt for the operation.
should this comment be "Called to mark the end of an attempt for the operation."? It's also called in end_with_status, where the status could be OK
That's a good point. Usually users of this code won't call end_attempt_with_status after a successful attempt, because a successful attempt also means a successful operation. But it is used that way internally. I'll change this comment to try to make it more clear
preferred for calculations because it is resilient to clock changes, eg DST
"""

utc: datetime.datetime = field(
since we're only measuring latencies, why do we need the utc timestamp?
Hmm good point. I thought that the wall-time timestamp was important to collect, but maybe we don't need it. I pulled it out
)
if isinstance(status, Exception):
    status = self._exc_to_status(status)
new_attempt = CompletedAttemptMetric(
nit: should we rename `new_attempt` to `current_attempt`? `new_attempt` sounds like we're creating an object for the next attempt? 🤔
sure, renamed to completed_attempt
""" | ||
history = [] | ||
subgenerator = exponential_sleep_generator(initial, multiplier, maximum) | ||
while True: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems a bit too surprising of an api. I think it would be a lot cleaner and easier to read if Attempt.start() took a delay parameter
)

# find backoff value
if self.backoff_generator and len(self.completed_attempts) > 0:
nit: `self.backoff_generator and self.completed_attempts`
""" | ||
|
||
op_type: OperationType | ||
backoff_generator: BackoffGenerator | None = None |
Might be good to add a comment explaining when this is None... I'm guessing it's for non-retriable operations?
Separately, I'm not sure this will work for RetryInfo in response trailers (where the server specifies how much to sleep). It might be better to just pass the amount slept as an arg to start_attempt
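For illustration, a rough sketch of the alternative being suggested here (all names are hypothetical, not the PR's API): the retry loop reports the delay it actually slept, which would also cover server-specified RetryInfo delays.

```python
import asyncio

async def retry_loop(operation_metric, sleep_generator, attempt_fn):
    """Hypothetical retry loop: the caller passes the slept backoff to start_attempt,
    instead of the metric object reading from a shared backoff generator."""
    delay = 0.0
    while True:
        # the amount slept before this attempt is reported explicitly,
        # so a RetryInfo-provided delay would be captured the same way
        operation_metric.start_attempt(backoff_before_attempt_ms=delay * 1000)
        try:
            return await attempt_fn()
        except Exception as exc:
            operation_metric.end_attempt_with_status(exc)
            delay = next(sleep_generator)
            await asyncio.sleep(delay)
```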
backoff_generator: BackoffGenerator | None = None
# keep monotonic timestamps for active operations
start_time_ns: int = field(default_factory=time.monotonic_ns)
active_attempt: ActiveAttemptMetric | None = None
When would this be non-None?
cluster_id: str | None = None
zone: str | None = None
completed_attempts: list[CompletedAttemptMetric] = field(default_factory=list)
is_streaming: bool = False  # only True for read_rows operations
That's not entirely true: it would also be true for CDC if we were to support that in this client, and we have a couple of other features that would set this to true. I would remove the comment so it doesn't get stale
""" | ||
Creates a new operation and registers it with the subscribed handlers. | ||
""" | ||
handlers = self.handlers + kwargs.pop("handlers", []) |
what's the use case for adding a handler per operation?
self, inner_predicate: Callable[[Exception], bool]
) -> Callable[[Exception], bool]:
    """
    Wrapps a predicate to include metrics tracking. Any call to the resulting predicate
nit: s/Wrapps/Wrap
I'm having a hard time wrapping my head around this... what's an example of a predicate that will be wrapped?
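For context, the predicates in question are retry checks such as google-api-core's if_exception_type(...), which are Callable[[Exception], bool]. A hedged guess at the shape of the wrapper (simplified; not necessarily what the PR does):

```python
from typing import Callable

def wrapped_predicate(
    operation_metric, inner_predicate: Callable[[Exception], bool]
) -> Callable[[Exception], bool]:
    """Sketch: whenever the retry machinery asks "should this exception be retried?",
    the exception is also recorded against the active attempt."""
    def wrapper(exc: Exception) -> bool:
        operation_metric.end_attempt_with_status(exc)  # record the failed attempt
        return inner_predicate(exc)
    return wrapper
```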
- exc: The exception to extract the status code from.
"""
if isinstance(exc, bt_exceptions._BigtableExceptionGroup):
    exc = exc.exceptions[-1]
Why the last one? please add a note
if (
    exc.__cause__
    and hasattr(exc.__cause__, "grpc_status_code")
    and exc.__cause__.grpc_status_code is not None
):
    return exc.__cause__.grpc_status_code
is a single level enough? should this be recursive?
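A small sketch of the chain-walking version being asked about (the helper name is hypothetical):

```python
def _status_from_cause_chain(exc: Exception):
    """Walk the __cause__ chain until an exception carrying a grpc status code is found."""
    while exc is not None:
        code = getattr(exc, "grpc_status_code", None)
        if code is not None:
            return code
        exc = exc.__cause__
    return None
```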
self,
fn: Callable[..., Any],
*,
extract_call_metadata: bool = True,
why would this be false?
extract_call_metadata: bool = True,
) -> Callable[..., Any]:
    """
    Wraps a function call, tracing metadata along the way
Does this wrap an attempt or an operation?
This PR adds the data model for the client-side metrics system.

Follow-up PRs:

Design

The main architecture looks like this:

Most of the work is done by the `ActiveOperationMetric` class, which is instantiated with each rpc call and updated through the lifecycle of the call. When the rpc is complete, it will call `on_operation_complete` and `on_attempt_complete` on the MetricsHandler, which can then log the completed data into OpenTelemetry (or, theoretically, other locations if needed).

Note that there are separate classes for active vs completed metrics (`ActiveOperationMetric`, `ActiveAttemptMetric`, `CompletedOperationMetric`, `CompletedAttemptMetric`). This is so that we can keep fields mutable and optional while the request is ongoing, but pass down static immutable copies once the attempt is completed and no new data is coming.
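To make that lifecycle concrete, here is a condensed sketch of the design described above (field names and signatures are simplified; the real classes carry many more fields and labels):

```python
import time
from dataclasses import dataclass, field


@dataclass(frozen=True)
class CompletedAttemptMetric:
    # immutable snapshot handed to handlers once an attempt finishes
    duration_ms: float
    end_status: str


@dataclass
class ActiveOperationMetric:
    # mutable, optional fields while the rpc is in flight
    op_type: str
    handlers: list = field(default_factory=list)
    completed_attempts: list = field(default_factory=list)
    attempt_start_ns: int | None = None

    def start_attempt(self) -> None:
        self.attempt_start_ns = time.monotonic_ns()

    def end_attempt_with_status(self, status: str) -> None:
        attempt = CompletedAttemptMetric(
            duration_ms=(time.monotonic_ns() - self.attempt_start_ns) / 1e6,
            end_status=status,
        )
        self.completed_attempts.append(attempt)
        for handler in self.handlers:
            handler.on_attempt_complete(attempt, self)

    def end_with_status(self, status: str) -> None:
        # a finished operation also closes out its final attempt,
        # even when that attempt (and the operation) succeeded
        self.end_attempt_with_status(status)
        for handler in self.handlers:
            handler.on_operation_complete(status, self)
```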