Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new target attribute to attempts and operation attributes #2260

Draft
wants to merge 19 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@
import com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants;
import com.google.cloud.bigtable.data.v2.stub.metrics.StatsHeadersServerStreamingCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.StatsHeadersUnaryCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.TargetTracerInterceptor;
import com.google.cloud.bigtable.data.v2.stub.metrics.TracedBatcherUnaryCallable;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.BulkMutateRowsUserFacingCallable;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsAttemptResult;
Expand Down Expand Up @@ -275,7 +276,9 @@ public static ClientContext createClientContext(EnhancedBigtableStubSettings set
}

managedChannelBuilder.intercept(errorCountPerConnectionMetricTracker.getInterceptor());

if(settings.getIsDirectpath()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why only directpath?

managedChannelBuilder.intercept(new TargetTracerInterceptor());
}
if (oldChannelConfigurator != null) {
managedChannelBuilder = oldChannelConfigurator.apply(managedChannelBuilder);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ public class EnhancedBigtableStubSettings extends StubSettings<EnhancedBigtableS
private static final int MAX_MESSAGE_SIZE = 256 * 1024 * 1024;
private static final String SERVER_DEFAULT_APP_PROFILE_ID = "";

private static final String CBT_ENABLE_DIRECTPATH = "CBT_ENABLE_DIRECTPATH";
private static final Set<Code> IDEMPOTENT_RETRY_CODES =
ImmutableSet.of(Code.DEADLINE_EXCEEDED, Code.UNAVAILABLE);

Expand Down Expand Up @@ -233,6 +234,8 @@ public class EnhancedBigtableStubSettings extends StubSettings<EnhancedBigtableS

private final MetricsProvider metricsProvider;

private final Boolean isDirectpath;

private EnhancedBigtableStubSettings(Builder builder) {
super(builder);

Expand Down Expand Up @@ -260,6 +263,7 @@ private EnhancedBigtableStubSettings(Builder builder) {
enableRoutingCookie = builder.enableRoutingCookie;
enableRetryInfo = builder.enableRetryInfo;
metricsProvider = builder.metricsProvider;
isDirectpath = builder.isDirectpath;

// Per method settings.
readRowsSettings = builder.readRowsSettings.build();
Expand All @@ -275,6 +279,7 @@ private EnhancedBigtableStubSettings(Builder builder) {
readChangeStreamSettings = builder.readChangeStreamSettings.build();
pingAndWarmSettings = builder.pingAndWarmSettings.build();
featureFlags = builder.featureFlags.build();

}

/** Create a new builder. */
Expand All @@ -287,6 +292,9 @@ public String getProjectId() {
return projectId;
}

public Boolean getIsDirectpath() {
return isDirectpath;
}
/** Returns the target instance id. */
public String getInstanceId() {
return instanceId;
Expand Down Expand Up @@ -345,7 +353,17 @@ public boolean getEnableRetryInfo() {

/** Returns a builder for the default ChannelProvider for this service. */
public static InstantiatingGrpcChannelProvider.Builder defaultGrpcTransportProviderBuilder() {
return BigtableStubSettings.defaultGrpcTransportProviderBuilder()
String enableDirectpathEnv = System.getenv(CBT_ENABLE_DIRECTPATH);
Boolean isDirectpathEnabled = Boolean.parseBoolean(enableDirectpathEnv);

InstantiatingGrpcChannelProvider.Builder grpcTransportProviderBuilder =
BigtableStubSettings.defaultGrpcTransportProviderBuilder();
if (isDirectpathEnabled) {
// Attempts direct access to CBT service over gRPC to improve throughput,
// whether the attempt is allowed is totally controlled by service owner.
grpcTransportProviderBuilder.setAttemptDirectPathXds().setAttemptDirectPath(true);
}
return grpcTransportProviderBuilder
meeral-k marked this conversation as resolved.
Show resolved Hide resolved
.setChannelPoolSettings(
ChannelPoolSettings.builder()
.setInitialChannelCount(10)
Expand All @@ -356,10 +374,7 @@ public static InstantiatingGrpcChannelProvider.Builder defaultGrpcTransportProvi
.setMaxInboundMessageSize(MAX_MESSAGE_SIZE)
.setKeepAliveTime(Duration.ofSeconds(30)) // sends ping in this interval
.setKeepAliveTimeout(
Duration.ofSeconds(10)) // wait this long before considering the connection dead
// Attempts direct access to CBT service over gRPC to improve throughput,
// whether the attempt is allowed is totally controlled by service owner.
.setAttemptDirectPath(true);
Duration.ofSeconds(10)); // wait this long before considering the connection dead
}

@SuppressWarnings("WeakerAccess")
Expand Down Expand Up @@ -652,6 +667,8 @@ public static class Builder extends StubSettings.Builder<EnhancedBigtableStubSet

private MetricsProvider metricsProvider;

private Boolean isDirectpath;

/**
* Initializes a new Builder with sane defaults for all settings.
*
Expand Down Expand Up @@ -777,6 +794,7 @@ private Builder() {

featureFlags =
FeatureFlags.newBuilder().setReverseScans(true).setLastScannedRowResponses(true);
isDirectpath = Boolean.parseBoolean(System.getenv(CBT_ENABLE_DIRECTPATH));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please dont access env vars from multiple places...there should be one place converts the env var into an instance variables in settings and then everything should reference the instance var/getter

}

private Builder(EnhancedBigtableStubSettings settings) {
Expand All @@ -790,6 +808,7 @@ private Builder(EnhancedBigtableStubSettings settings) {
enableRoutingCookie = settings.enableRoutingCookie;
enableRetryInfo = settings.enableRetryInfo;
metricsProvider = settings.metricsProvider;
isDirectpath = settings.isDirectpath;

// Per method settings.
readRowsSettings = settings.readRowsSettings.toBuilder();
Expand Down Expand Up @@ -1100,6 +1119,7 @@ public EnhancedBigtableStubSettings build() {
.build();
setInternalHeaderProvider(FixedHeaderProvider.create(headers));

isDirectpath = Boolean.parseBoolean(System.getenv(CBT_ENABLE_DIRECTPATH));
return new EnhancedBigtableStubSettings(this);
}
// </editor-fold>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,8 @@ public void setLocations(String zone, String cluster) {
public void grpcChannelQueuedLatencies(long queuedTimeMs) {
// noop
}

public void addTarget(String target) {
// noop
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.opentelemetry.sdk.metrics.InstrumentSelector;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.View;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
Expand All @@ -48,6 +49,7 @@ public class BuiltinMetricsConstants {
public static final AttributeKey<String> CLIENT_NAME_KEY = AttributeKey.stringKey("client_name");
static final AttributeKey<String> METHOD_KEY = AttributeKey.stringKey("method");
static final AttributeKey<String> STATUS_KEY = AttributeKey.stringKey("status");
static AttributeKey<String> TARGET_KEY = AttributeKey.stringKey("target");
static final AttributeKey<String> CLIENT_UID_KEY = AttributeKey.stringKey("client_uid");

// Metric names
Expand Down Expand Up @@ -107,7 +109,8 @@ public class BuiltinMetricsConstants {
CLUSTER_ID_KEY,
ZONE_ID_KEY,
METHOD_KEY,
CLIENT_NAME_KEY);
CLIENT_NAME_KEY,
TARGET_KEY);

static void defineView(
ImmutableMap.Builder<InstrumentSelector, View> viewMap,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.STATUS_KEY;
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.STREAMING_KEY;
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.TABLE_ID_KEY;
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.TARGET_KEY;
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.ZONE_ID_KEY;

import com.google.api.gax.retrying.ServerStreamingAttemptException;
import com.google.api.gax.tracing.SpanName;
import com.google.cloud.bigtable.Version;
import com.google.common.base.Stopwatch;
import com.google.common.base.Strings;
import com.google.common.math.IntMath;
import io.grpc.CallOptions;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.LongCounter;
Expand All @@ -45,7 +48,6 @@
* bigtable.googleapis.com/client namespace
*/
class BuiltinMetricsTracer extends BigtableTracer {

private static final String NAME = "java-bigtable/" + Version.VERSION;
private final OperationType operationType;
private final SpanName spanName;
Expand Down Expand Up @@ -85,6 +87,8 @@ class BuiltinMetricsTracer extends BigtableTracer {

private Long serverLatencies = null;

private String target_endpoint = "unspecified";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this need to be volatile? please check that the thread that sets the target is the same as the thread that reads it


// OpenCensus (and server) histogram buckets use [start, end), however OpenTelemetry uses (start,
// end]. To work around this, we measure all the latencies in nanoseconds and convert them
// to milliseconds and use DoubleHistogram. This should minimize the chance of a data
Expand Down Expand Up @@ -175,6 +179,12 @@ public void attemptSucceeded() {
recordAttemptCompletion(null);
}

public void addTarget(String target) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

setAttemptTarget - there should only be a single target per attempt ... add implies there are multiple targets

if (!Strings.isNullOrEmpty(target)) {
this.target_endpoint = target;
}
}

@Override
public void attemptCancelled() {
recordAttemptCompletion(new CancellationException());
Expand Down Expand Up @@ -338,7 +348,6 @@ private void recordAttemptCompletion(@Nullable Throwable status) {
}

String statusStr = Util.extractStatus(status);

Attributes attributes =
baseAttributes
.toBuilder()
Expand All @@ -352,9 +361,8 @@ private void recordAttemptCompletion(@Nullable Throwable status) {
.build();

clientBlockingLatenciesHistogram.record(convertToMs(totalClientBlockingTime.get()), attributes);

attemptLatenciesHistogram.record(
convertToMs(attemptTimer.elapsed(TimeUnit.NANOSECONDS)), attributes);
convertToMs(attemptTimer.elapsed(TimeUnit.NANOSECONDS)), attributes.toBuilder().put(TARGET_KEY,target_endpoint).build());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please reformat this line to make it easier to see that you are modifying the attribtues. Ideally split it up into 2 statements the first having a comment that attempt latency metrics differ from others to include the destination


if (serverLatencies != null) {
serverLatenciesHistogram.record(serverLatencies, attributes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.google.api.gax.tracing.ApiTracer;
import com.google.common.collect.ImmutableList;
import io.opentelemetry.api.internal.StringUtils;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

did you forget to push? its still present

import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -225,4 +226,12 @@ public void grpcChannelQueuedLatencies(long queuedTimeMs) {
tracer.grpcChannelQueuedLatencies(queuedTimeMs);
}
}

public void addTarget(String target) {
if (StringUtils.isNullOrEmpty(target)) {
for (BigtableTracer tracer : bigtableTracers) {
tracer.addTarget(target);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.stub.metrics;

import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.tracing.ApiTracer;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.ForwardingClientCallListener;
import io.grpc.Grpc;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import java.net.SocketAddress;
import java.util.concurrent.atomic.LongAdder;
import java.util.logging.Level;
import java.util.logging.Logger;

/** An interceptor extracts remote target and appends metric. */
public class TargetTracerInterceptor implements ClientInterceptor {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please mark all public class classes that are not supposed to be used by customers as InternalApi

private static final Logger LOG =
Logger.getLogger(TargetTracerInterceptor.class.toString());

@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> methodDescriptor, CallOptions callOptions, Channel channel) {
ClientCall<ReqT,RespT> clientCall = channel.newCall(methodDescriptor,callOptions);
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(clientCall) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
super.start(
new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(
responseListener) {
@Override
public void onHeaders(Metadata headers) {
// Connection accounting is non-critical, so we log the exception, but let normal
// processing proceed.
try {
SocketAddress remoteAddr =
clientCall.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
ApiTracer bigtableTracer = callOptions.getOption(GrpcCallContext.TRACER_KEY);
if(bigtableTracer instanceof BuiltinMetricsTracer) {
((BuiltinMetricsTracer)bigtableTracer).addTarget(String.valueOf(remoteAddr));
}
} catch (Throwable t) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

java java interrupted exceptions require special handling

LOG.log(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you really want to log for every RPC?

Level.WARNING, "Unexpected error while updating target label", t);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

customers will be the ones to see this message and the will have no idea what it means. PLease udpate the message so someone else can understand what it means, what its implications are

}
super.onHeaders(headers);
}
},
headers);
}
};
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public class Util {
private static final Pattern SERVER_TIMING_HEADER_PATTERN = Pattern.compile(".*dur=(?<dur>\\d+)");
static final Metadata.Key<byte[]> LOCATION_METADATA_KEY =
Metadata.Key.of("x-goog-ext-425905942-bin", Metadata.BINARY_BYTE_MARSHALLER);

/** Convert an exception into a value that can be used to create an OpenCensus tag value. */
static String extractStatus(@Nullable Throwable error) {
final String statusString;
Expand Down Expand Up @@ -209,6 +209,7 @@ static void recordMetricsFromMetadata(
if (responseParams != null && latency == null) {
latency = 0L;
}

// Record gfe metrics
tracer.recordGfeMetadata(latency, throwable);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class FakeServiceBuilder {
private final List<BindableService> services = new ArrayList<>();
private final List<ServerTransportFilter> transportFilters = new ArrayList<>();

private int serverPort = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can already get this from the returned server?

public static FakeServiceBuilder create(BindableService... services) {
return new FakeServiceBuilder(services);
}
Expand All @@ -56,6 +57,9 @@ public FakeServiceBuilder addTransportFilter(ServerTransportFilter transportFilt
return this;
}

public int getServerPort() {
return serverPort;
}
public Server start() throws IOException {
IOException lastError = null;

Expand All @@ -79,6 +83,7 @@ private Server startWithoutRetries() throws IOException {
port = ss.getLocalPort();
}
ServerBuilder<?> builder = ServerBuilder.forPort(port);
serverPort = port;
interceptors.forEach(builder::intercept);
services.forEach(builder::addService);
transportFilters.forEach(builder::addTransportFilter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ public static void verifyAttributes(MetricData metricData, Attributes attributes
metricData.getHistogramData().getPoints().stream()
.filter(pd -> pd.getAttributes().equals(attributes))
.collect(Collectors.toList());

assertThat(hd).isNotEmpty();
break;
case LONG_SUM:
Expand Down
Loading
Loading