Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gRPC support request/response size #11833

Open
wants to merge 28 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,20 @@ public interface RpcAttributesGetter<REQUEST> {

@Nullable
String getMethod(REQUEST request);

default int getClientRequestSize(REQUEST request) {
return 0;
}

default int getClientResponseSize(REQUEST request) {
return 0;
}

default int getServerRequestSize(REQUEST request) {
return 0;
}

default int getServerResponseSize(REQUEST request) {
return 0;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the type for these should be Long, and these should return null by default. We need to consider whether this should be split into separate client/server getters similarly to what we do for http client/server. Would it make sense to introduce a response parameter too? Or perhaps just have getRequestSize and getResponseSize in the getter, we already have separate extractors for client/server these could be used to set the correct attribute.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank for your suggestion, Done.

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.DoubleHistogramBuilder;
import io.opentelemetry.api.metrics.LongHistogram;
import io.opentelemetry.api.metrics.LongHistogramBuilder;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.ContextKey;
Expand All @@ -35,6 +37,8 @@ public final class RpcClientMetrics implements OperationListener {
private static final Logger logger = Logger.getLogger(RpcClientMetrics.class.getName());

private final DoubleHistogram clientDurationHistogram;
private final LongHistogram clientRequestSize;
private final LongHistogram clientResponseSize;

private RpcClientMetrics(Meter meter) {
DoubleHistogramBuilder durationBuilder =
Expand All @@ -44,6 +48,24 @@ private RpcClientMetrics(Meter meter) {
.setUnit("ms");
RpcMetricsAdvice.applyClientDurationAdvice(durationBuilder);
clientDurationHistogram = durationBuilder.build();

LongHistogramBuilder requestSizeBuilder =
meter
.histogramBuilder("rpc.client.request.size")
.setUnit("By")
.setDescription("Measures the size of RPC request messages (uncompressed).")
.ofLongs();
RpcMetricsAdvice.applyClientRequestSizeAdvice(requestSizeBuilder);
clientRequestSize = requestSizeBuilder.build();

LongHistogramBuilder responseSizeBuilder =
meter
.histogramBuilder("rpc.client.response.size")
.setUnit("By")
.setDescription("Measures the size of RPC response messages (uncompressed).")
.ofLongs();
RpcMetricsAdvice.applyClientRequestSizeAdvice(responseSizeBuilder);
clientResponseSize = responseSizeBuilder.build();
}

/**
Expand Down Expand Up @@ -72,10 +94,21 @@ public void onEnd(Context context, Attributes endAttributes, long endNanos) {
context);
return;
}
Attributes attributes = state.startAttributes().toBuilder().putAll(endAttributes).build();
clientDurationHistogram.record(
(endNanos - state.startTimeNanos()) / NANOS_PER_MS,
state.startAttributes().toBuilder().putAll(endAttributes).build(),
context);
(endNanos - state.startTimeNanos()) / NANOS_PER_MS, attributes, context);

Long rpcClientRequestBodySize =
RpcMessageBodySizeUtil.getRpcClientRequestBodySize(endAttributes, state.startAttributes());
if (rpcClientRequestBodySize != null && rpcClientRequestBodySize > 0) {
clientRequestSize.record(rpcClientRequestBodySize, attributes, context);
}

Long rpcClientResponseBodySize =
RpcMessageBodySizeUtil.getRpcClientResponseBodySize(endAttributes, state.startAttributes());
if (rpcClientResponseBodySize != null && rpcClientResponseBodySize > 0) {
clientResponseSize.record(rpcClientResponseBodySize, attributes, context);
}
}

@AutoValue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@ abstract class RpcCommonAttributesExtractor<REQUEST, RESPONSE>
static final AttributeKey<String> RPC_METHOD = AttributeKey.stringKey("rpc.method");
static final AttributeKey<String> RPC_SERVICE = AttributeKey.stringKey("rpc.service");
static final AttributeKey<String> RPC_SYSTEM = AttributeKey.stringKey("rpc.system");
static final AttributeKey<Long> RPC_CLIENT_REQUEST_BODY_SIZE =
AttributeKey.longKey("rpc.client.request.body.size");
static final AttributeKey<Long> RPC_CLIENT_RESPONSE_BODY_SIZE =
AttributeKey.longKey("rpc.client.response.body.size");
static final AttributeKey<Long> RPC_SERVER_REQUEST_BODY_SIZE =
AttributeKey.longKey("rpc.server.request.body.size");
static final AttributeKey<Long> RPC_SERVER_RESPONSE_BODY_SIZE =
AttributeKey.longKey("rpc.server.response.body.size");

private final RpcAttributesGetter<REQUEST> getter;

Expand All @@ -41,6 +49,23 @@ public final void onEnd(
REQUEST request,
@Nullable RESPONSE response,
@Nullable Throwable error) {
// No response attributes
int clientRequestSize = getter.getClientRequestSize(request);
if (clientRequestSize > 0) {
internalSet(attributes, RPC_CLIENT_REQUEST_BODY_SIZE, (long) clientRequestSize);
}
int clientResponseSize = getter.getClientResponseSize(request);
if (clientResponseSize > 0) {
internalSet(attributes, RPC_CLIENT_RESPONSE_BODY_SIZE, (long) clientResponseSize);
}

int serverRequestSize = getter.getServerRequestSize(request);
if (serverRequestSize > 0) {
internalSet(attributes, RPC_SERVER_REQUEST_BODY_SIZE, (long) serverRequestSize);
}

int serverResponseSize = getter.getServerResponseSize(request);
if (serverResponseSize > 0) {
internalSet(attributes, RPC_SERVER_RESPONSE_BODY_SIZE, (long) serverResponseSize);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.api.incubator.semconv.rpc;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import javax.annotation.Nullable;

final class RpcMessageBodySizeUtil {

@Nullable
static Long getRpcClientRequestBodySize(Attributes... attributesList) {
return getAttribute(RpcCommonAttributesExtractor.RPC_CLIENT_REQUEST_BODY_SIZE, attributesList);
}

@Nullable
static Long getRpcClientResponseBodySize(Attributes... attributesList) {
return getAttribute(RpcCommonAttributesExtractor.RPC_CLIENT_RESPONSE_BODY_SIZE, attributesList);
}

@Nullable
static Long getRpcServerRequestBodySize(Attributes... attributesList) {
return getAttribute(RpcCommonAttributesExtractor.RPC_SERVER_REQUEST_BODY_SIZE, attributesList);
}

@Nullable
static Long getRpcServerResponseBodySize(Attributes... attributesList) {
return getAttribute(RpcCommonAttributesExtractor.RPC_SERVER_RESPONSE_BODY_SIZE, attributesList);
}

@Nullable
private static <T> T getAttribute(AttributeKey<T> key, Attributes... attributesList) {
for (Attributes attributes : attributesList) {
T value = attributes.get(key);
if (value != null) {
return value;
}
}
return null;
}

private RpcMessageBodySizeUtil() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.incubator.metrics.ExtendedDoubleHistogramBuilder;
import io.opentelemetry.api.incubator.metrics.ExtendedLongHistogramBuilder;
import io.opentelemetry.api.metrics.DoubleHistogramBuilder;
import io.opentelemetry.api.metrics.LongHistogramBuilder;
import io.opentelemetry.semconv.NetworkAttributes;
import io.opentelemetry.semconv.ServerAttributes;
import java.util.Arrays;
Expand Down Expand Up @@ -56,5 +58,43 @@ static void applyServerDurationAdvice(DoubleHistogramBuilder builder) {
ServerAttributes.SERVER_PORT));
}

static void applyClientRequestSizeAdvice(LongHistogramBuilder builder) {
Copy link
Contributor

@laurit laurit Jul 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as you are always going to use the same set of attributes you could make all these delegate to a common method or extract the attributes list to a variable

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

if (!(builder instanceof ExtendedLongHistogramBuilder)) {
return;
}
// the list of recommended metrics attributes is from
// https://github.com/open-telemetry/semantic-conventions/blob/main/docs/rpc/rpc-metrics.md
((ExtendedLongHistogramBuilder) builder)
.setAttributesAdvice(
Arrays.asList(
crossoverJie marked this conversation as resolved.
Show resolved Hide resolved
RpcCommonAttributesExtractor.RPC_SYSTEM,
RpcCommonAttributesExtractor.RPC_SERVICE,
RpcCommonAttributesExtractor.RPC_METHOD,
RPC_GRPC_STATUS_CODE,
NetworkAttributes.NETWORK_TYPE,
NetworkAttributes.NETWORK_TRANSPORT,
ServerAttributes.SERVER_ADDRESS,
ServerAttributes.SERVER_PORT));
}

static void applyServerRequestSizeAdvice(LongHistogramBuilder builder) {
if (!(builder instanceof ExtendedLongHistogramBuilder)) {
return;
}
// the list of recommended metrics attributes is from
// https://github.com/open-telemetry/semantic-conventions/blob/main/docs/rpc/rpc-metrics.md
((ExtendedLongHistogramBuilder) builder)
.setAttributesAdvice(
Arrays.asList(
crossoverJie marked this conversation as resolved.
Show resolved Hide resolved
RpcCommonAttributesExtractor.RPC_SYSTEM,
RpcCommonAttributesExtractor.RPC_SERVICE,
RpcCommonAttributesExtractor.RPC_METHOD,
RPC_GRPC_STATUS_CODE,
NetworkAttributes.NETWORK_TYPE,
NetworkAttributes.NETWORK_TRANSPORT,
ServerAttributes.SERVER_ADDRESS,
ServerAttributes.SERVER_PORT));
}

private RpcMetricsAdvice() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.DoubleHistogramBuilder;
import io.opentelemetry.api.metrics.LongHistogram;
import io.opentelemetry.api.metrics.LongHistogramBuilder;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.ContextKey;
Expand All @@ -35,6 +37,8 @@ public final class RpcServerMetrics implements OperationListener {
private static final Logger logger = Logger.getLogger(RpcServerMetrics.class.getName());

private final DoubleHistogram serverDurationHistogram;
private final LongHistogram serverRequestSize;
private final LongHistogram serverResponseSize;

private RpcServerMetrics(Meter meter) {
DoubleHistogramBuilder durationBuilder =
Expand All @@ -44,6 +48,24 @@ private RpcServerMetrics(Meter meter) {
.setUnit("ms");
RpcMetricsAdvice.applyServerDurationAdvice(durationBuilder);
serverDurationHistogram = durationBuilder.build();

LongHistogramBuilder requestSizeBuilder =
meter
.histogramBuilder("rpc.server.request.size")
.setUnit("By")
.setDescription("Measures the size of RPC request messages (uncompressed).")
.ofLongs();
RpcMetricsAdvice.applyServerRequestSizeAdvice(requestSizeBuilder);
serverRequestSize = requestSizeBuilder.build();

LongHistogramBuilder responseSizeBuilder =
meter
.histogramBuilder("rpc.server.response.size")
.setUnit("By")
.setDescription("Measures the size of RPC response messages (uncompressed).")
.ofLongs();
RpcMetricsAdvice.applyServerRequestSizeAdvice(responseSizeBuilder);
serverResponseSize = responseSizeBuilder.build();
}

/**
Expand Down Expand Up @@ -72,10 +94,21 @@ public void onEnd(Context context, Attributes endAttributes, long endNanos) {
context);
return;
}
Attributes attributes = state.startAttributes().toBuilder().putAll(endAttributes).build();
serverDurationHistogram.record(
(endNanos - state.startTimeNanos()) / NANOS_PER_MS,
state.startAttributes().toBuilder().putAll(endAttributes).build(),
context);
(endNanos - state.startTimeNanos()) / NANOS_PER_MS, attributes, context);

Long rpcServerRequestBodySize =
RpcMessageBodySizeUtil.getRpcServerRequestBodySize(endAttributes, state.startAttributes());
if (rpcServerRequestBodySize != null && rpcServerRequestBodySize > 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

> 0 condition is weird. I think you can get rid of it by changing the type in getter from int -> Long so you can distinguish when getter does not implement this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

serverRequestSize.record(rpcServerRequestBodySize, attributes, context);
}

Long rpcServerResponseBodySize =
RpcMessageBodySizeUtil.getRpcServerResponseBodySize(endAttributes, state.startAttributes());
if (rpcServerResponseBodySize != null && rpcServerResponseBodySize > 0) {
serverResponseSize.record(rpcServerResponseBodySize, attributes, context);
}
}

@AutoValue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.OperationListener;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions;
import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
import io.opentelemetry.semconv.NetworkAttributes;
import io.opentelemetry.semconv.ServerAttributes;
Expand Down Expand Up @@ -46,6 +47,8 @@ void collectsMetrics() {
.put(ServerAttributes.SERVER_PORT, 8080)
.put(NetworkAttributes.NETWORK_TRANSPORT, "tcp")
.put(NetworkAttributes.NETWORK_TYPE, "ipv4")
.put(RpcCommonAttributesExtractor.RPC_CLIENT_REQUEST_BODY_SIZE, 10)
.put(RpcCommonAttributesExtractor.RPC_CLIENT_RESPONSE_BODY_SIZE, 20)
.build();

Attributes responseAttributes2 =
Expand Down Expand Up @@ -76,6 +79,62 @@ void collectsMetrics() {

assertThat(metricReader.collectAllMetrics())
.satisfiesExactlyInAnyOrder(
metric ->
OpenTelemetryAssertions.assertThat(metric)
crossoverJie marked this conversation as resolved.
Show resolved Hide resolved
.hasName("rpc.client.response.size")
.hasUnit("By")
.hasDescription("Measures the size of RPC response messages (uncompressed).")
.hasHistogramSatisfying(
histogram ->
histogram.hasPointsSatisfying(
point ->
point
.hasSum(20 /* bytes */)
.hasAttributesSatisfying(
equalTo(RpcIncubatingAttributes.RPC_SYSTEM, "grpc"),
equalTo(
RpcIncubatingAttributes.RPC_SERVICE,
"myservice.EchoService"),
equalTo(
RpcIncubatingAttributes.RPC_METHOD,
"exampleMethod"),
equalTo(ServerAttributes.SERVER_ADDRESS, "example.com"),
equalTo(ServerAttributes.SERVER_PORT, 8080),
equalTo(NetworkAttributes.NETWORK_TRANSPORT, "tcp"),
equalTo(NetworkAttributes.NETWORK_TYPE, "ipv4"))
.hasExemplarsSatisfying(
exemplar ->
exemplar
.hasTraceId("ff01020304050600ff0a0b0c0d0e0f00")
.hasSpanId("090a0b0c0d0e0f00")))),
metric ->
OpenTelemetryAssertions.assertThat(metric)
.hasName("rpc.client.request.size")
.hasUnit("By")
.hasDescription("Measures the size of RPC request messages (uncompressed).")
.hasHistogramSatisfying(
histogram ->
histogram.hasPointsSatisfying(
point ->
point
.hasSum(10 /* bytes */)
.hasAttributesSatisfying(
equalTo(RpcIncubatingAttributes.RPC_SYSTEM, "grpc"),
equalTo(
RpcIncubatingAttributes.RPC_SERVICE,
"myservice.EchoService"),
equalTo(
RpcIncubatingAttributes.RPC_METHOD,
"exampleMethod"),
equalTo(ServerAttributes.SERVER_ADDRESS, "example.com"),
equalTo(ServerAttributes.SERVER_PORT, 8080),
equalTo(NetworkAttributes.NETWORK_TRANSPORT, "tcp"),
equalTo(NetworkAttributes.NETWORK_TYPE, "ipv4"))
.hasExemplarsSatisfying(
exemplar ->
exemplar
.hasTraceId("ff01020304050600ff0a0b0c0d0e0f00")
.hasSpanId("090a0b0c0d0e0f00")))),
metric ->
assertThat(metric)
.hasName("rpc.client.duration")
Expand Down
Loading
Loading