Skip to content

Commit

Permalink
Add pick first metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
zasweq committed Nov 13, 2024
1 parent 0553bc3 commit 980497f
Show file tree
Hide file tree
Showing 5 changed files with 364 additions and 9 deletions.
318 changes: 318 additions & 0 deletions balancer/pickfirst/pickfirstleaf/metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,318 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package pickfirstleaf_test

import (
"context"
"fmt"
"testing"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils/stats"
testgrpc "google.golang.org/grpc/interop/grpc_testing"
testpb "google.golang.org/grpc/interop/grpc_testing"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/stats/opentelemetry"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
)

// TestPickFirstMetrics tests pick first metrics. It configures a pick first
// balancer, causes it to connect and then disconnect, and expects the
// subsequent metrics to emit from that.
func (s) TestPickFirstMetrics(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

ss := &stubserver.StubServer{
EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
return &testpb.Empty{}, nil
},
}
ss.StartServer()
defer ss.Stop()

lbCfgJSON := `{
"loadBalancingConfig": [
{
"pick_first_leaf": {
}
}
]
}`

sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(lbCfgJSON)

r := manual.NewBuilderWithScheme("whatever")
r.InitialState(resolver.State{
ServiceConfig: sc,
Addresses: []resolver.Address{{Addr: ss.Address}}},
)

grpcTarget := r.Scheme() + ":///"
tmr := stats.NewTestMetricsRecorder()
cc, err := grpc.NewClient(grpcTarget, grpc.WithStatsHandler(tmr), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
if err != nil {
t.Fatalf("NewClient() failed with error: %v", err)
}
defer cc.Close()

tsc := testgrpc.NewTestServiceClient(cc)
if _, err := tsc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
t.Fatalf("EmptyCall() failed: %v", err)
}

if got, _ := tmr.Metric("grpc.lb.pick_first.connection_attempts_succeeded"); got != 1 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.connection_attempts_succeeded", got, 1)
}
if got, _ := tmr.Metric("grpc.lb.pick_first.connection_attempts_failed"); got != 0 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.connection_attempts_failed", got, 0)
}
if got, _ := tmr.Metric("grpc.lb.pick_first.disconnections"); got != 0 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.disconnections", got, 0)
}

ss.Stop()
if err = pollForDisconnectedMetrics(ctx, tmr); err != nil {
t.Fatal(err)
}
}

func pollForDisconnectedMetrics(ctx context.Context, tmr *stats.TestMetricsRecorder) error {
for ; ctx.Err() == nil; <-time.After(time.Millisecond) {
if got, _ := tmr.Metric("grpc.lb.pick_first.disconnections"); got == 1 {
return nil
}
}
return fmt.Errorf("error waiting for grpc.lb.pick_first.disconnections metric: %v", ctx.Err())
}

// TestPickFirstMetricsFailure tests the connection attempts failed metric. It
// configures a channel and scenario that causes a pick first connection attempt
// to fail, and then expects that metric to emit.
func (s) TestPickFirstMetricsFailure(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

lbCfgJSON := `{
"loadBalancingConfig": [
{
"pick_first_leaf": {
}
}
]
}`

sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(lbCfgJSON)

r := manual.NewBuilderWithScheme("whatever")
r.InitialState(resolver.State{
ServiceConfig: sc,
Addresses: []resolver.Address{{Addr: "bad address"}}},
)
grpcTarget := r.Scheme() + ":///"
tmr := stats.NewTestMetricsRecorder()
cc, err := grpc.NewClient(grpcTarget, grpc.WithStatsHandler(tmr), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
if err != nil {
t.Fatalf("NewClient() failed with error: %v", err)
}
defer cc.Close()

tsc := testgrpc.NewTestServiceClient(cc)
if _, err := tsc.EmptyCall(ctx, &testpb.Empty{}); err == nil {
t.Fatalf("EmptyCall() passed when expected to fail")
}

if got, _ := tmr.Metric("grpc.lb.pick_first.connection_attempts_succeeded"); got != 0 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.connection_attempts_succeeded", got, 0)
}
if got, _ := tmr.Metric("grpc.lb.pick_first.connection_attempts_failed"); got != 1 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.connection_attempts_failed", got, 1)
}
if got, _ := tmr.Metric("grpc.lb.pick_first.disconnections"); got != 0 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.disconnections", got, 0)
}
}

// TestPickFirstMetricsE2E tests the pick first metrics end to end. It
// configures a channel with an OpenTelemetry plugin, induces all 3 pick first
// metrics to emit, and makes sure the correct OpenTelemetry metrics atoms emit.
func (s) TestPickFirstMetricsE2E(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

ss := &stubserver.StubServer{
EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
return &testpb.Empty{}, nil
},
}
ss.StartServer()
defer ss.Stop()

lbCfgJSON := `{
"loadBalancingConfig": [
{
"pick_first_leaf": {
}
}
]
}`

sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(lbCfgJSON)
r := manual.NewBuilderWithScheme("whatever")
r.InitialState(resolver.State{
ServiceConfig: sc,
Addresses: []resolver.Address{{Addr: "bad address"}}},
) // Will trigger connection failed.

grpcTarget := r.Scheme() + ":///"
reader := metric.NewManualReader()
provider := metric.NewMeterProvider(metric.WithReader(reader))
mo := opentelemetry.MetricsOptions{
MeterProvider: provider,
Metrics: opentelemetry.DefaultMetrics().Add("grpc.lb.pick_first.disconnections", "grpc.lb.pick_first.connection_attempts_succeeded", "grpc.lb.pick_first.connection_attempts_failed"),
}

cc, err := grpc.NewClient(grpcTarget, opentelemetry.DialOption(opentelemetry.Options{MetricsOptions: mo}), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
if err != nil {
t.Fatalf("NewClient() failed with error: %v", err)
}
defer cc.Close()

tsc := testgrpc.NewTestServiceClient(cc)
if _, err := tsc.EmptyCall(ctx, &testpb.Empty{}); err == nil {
t.Fatalf("EmptyCall() passed when expected to fail")
}

r.UpdateState(resolver.State{
ServiceConfig: sc,
Addresses: []resolver.Address{{Addr: ss.Address}}}) // Will trigger successful connection metric.
if _, err := tsc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
t.Fatalf("EmptyCall() failed: %v", err)
}

// Stop the server, that should send signal to disconnect, which will
// eventually emit disconnection metric.
ss.Stop()
wantMetrics := []metricdata.Metrics{
{
Name: "grpc.lb.pick_first.connection_attempts_succeeded",
Description: "EXPERIMENTAL. Number of successful connection attempts.",
Unit: "attempt",
Data: metricdata.Sum[int64]{
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(attribute.String("grpc.target", grpcTarget)),
Value: 1,
},
},
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
},
},
{
Name: "grpc.lb.pick_first.connection_attempts_failed",
Description: "EXPERIMENTAL. Number of failed connection attempts.",
Unit: "attempt",
Data: metricdata.Sum[int64]{
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(attribute.String("grpc.target", grpcTarget)),
Value: 1,
},
},
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
},
},
}

gotMetrics := metricsDataFromReader(ctx, reader)
for _, metric := range wantMetrics {
val, ok := gotMetrics[metric.Name]
if !ok {
t.Fatalf("Metric %v not present in recorded metrics", metric.Name)
}
if !metricdatatest.AssertEqual(t, metric, val, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) {
t.Fatalf("Metrics data type not equal for metric: %v", metric.Name)
}
}
// Disconnections metric will show up eventually, as asynchronous from
// server stopping.
wantMetrics = []metricdata.Metrics{
{
Name: "grpc.lb.pick_first.disconnections",
Description: "EXPERIMENTAL. Number of times the selected subchannel becomes disconnected.",
Unit: "disconnection",
Data: metricdata.Sum[int64]{
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(attribute.String("grpc.target", grpcTarget)),
Value: 1,
},
},
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
},
},
}
if err := pollForWantMetrics(ctx, t, reader, wantMetrics); err != nil {
t.Fatal(err)
}
}

func metricsDataFromReader(ctx context.Context, reader *metric.ManualReader) map[string]metricdata.Metrics {
rm := &metricdata.ResourceMetrics{}
reader.Collect(ctx, rm)
gotMetrics := map[string]metricdata.Metrics{}
for _, sm := range rm.ScopeMetrics {
for _, m := range sm.Metrics {
gotMetrics[m.Name] = m
}
}
return gotMetrics
}

func pollForWantMetrics(ctx context.Context, t *testing.T, reader *metric.ManualReader, wantMetrics []metricdata.Metrics) error {
for ; ctx.Err() == nil; <-time.After(time.Millisecond) {
gotMetrics := metricsDataFromReader(ctx, reader)
for _, metric := range wantMetrics {
val, ok := gotMetrics[metric.Name]
if !ok {
break
}
if !metricdatatest.AssertEqual(t, metric, val, metricdatatest.IgnoreValue(), metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) {
return fmt.Errorf("metrics data type not equal for metric: %v", metric.Name)
}
return nil
}
time.Sleep(5 * time.Millisecond)
}

return fmt.Errorf("error waiting for metrics %v: %v", wantMetrics, ctx.Err())
}
Loading

0 comments on commit 980497f

Please sign in to comment.