From 8a81d848998b7c6971df8212f5b42bf9c47913a5 Mon Sep 17 00:00:00 2001 From: Zaba505 Date: Fri, 15 Dec 2023 01:50:16 -0500 Subject: [PATCH 1/2] feat(issue-8): support readiness metric for each registered service --- grpc/grpc.go | 134 +++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 109 insertions(+), 25 deletions(-) diff --git a/grpc/grpc.go b/grpc/grpc.go index d765864..bf81ec9 100644 --- a/grpc/grpc.go +++ b/grpc/grpc.go @@ -11,20 +11,28 @@ import ( "fmt" "log/slog" "net" - "sync/atomic" + "time" + "github.com/z5labs/app/pkg/health" "github.com/z5labs/app/pkg/noop" "github.com/z5labs/app/pkg/slogfield" "golang.org/x/sync/errgroup" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + grpchealth "google.golang.org/grpc/health" + "google.golang.org/grpc/health/grpc_health_v1" ) +type service struct { + registerFunc func(*grpc.Server) + opts serviceOptions +} + type runtimeOptions struct { - port uint - logHandler slog.Handler - registerFuncs []func(*grpc.Server) + port uint + logHandler slog.Handler + services []service } // RuntimeOption @@ -46,13 +54,54 @@ func LogHandler(h slog.Handler) RuntimeOption { } } -// Register a gRPC service with the underlying gRPC server. -func Register(f func(*grpc.Server)) RuntimeOption { +type serviceOptions struct { + name string + readiness *health.Readiness +} + +// ServiceOption +type ServiceOption func(*serviceOptions) + +// ServiceName +func ServiceName(name string) ServiceOption { + return func(so *serviceOptions) { + so.name = name + } +} + +// Readiness +func Readiness(readiness *health.Readiness) ServiceOption { + return func(so *serviceOptions) { + so.readiness = readiness + } +} + +// Service registers a gRPC service with the underlying gRPC server. +func Service(f func(*grpc.Server), opts ...ServiceOption) RuntimeOption { return func(ro *runtimeOptions) { - ro.registerFuncs = append(ro.registerFuncs, f) + so := serviceOptions{ + readiness: &health.Readiness{}, + } + for _, opt := range opts { + opt(&so) + } + ro.services = append(ro.services, service{ + registerFunc: f, + opts: so, + }) } } +type serviceHealthMonitor struct { + name string + readiness *health.Readiness +} + +type grpcServer interface { + Serve(net.Listener) error + GracefulStop() +} + // Runtime type Runtime struct { port uint @@ -60,11 +109,10 @@ type Runtime struct { log *slog.Logger - started atomic.Bool - healthy atomic.Bool - serving atomic.Bool + serviceHealthMonitors []serviceHealthMonitor - grpc *grpc.Server + grpc grpcServer + health *grpchealth.Server } // NewRuntime @@ -77,16 +125,26 @@ func NewRuntime(opts ...RuntimeOption) *Runtime { opt(ro) } + var healthMonitors []serviceHealthMonitor s := grpc.NewServer(grpc.Creds(insecure.NewCredentials())) - for _, f := range ro.registerFuncs { - f(s) + for _, svc := range ro.services { + svc.registerFunc(s) + healthMonitors = append(healthMonitors, serviceHealthMonitor{ + name: svc.opts.name, + readiness: svc.opts.readiness, + }) } + healthServer := grpchealth.NewServer() + grpc_health_v1.RegisterHealthServer(s, healthServer) + rt := &Runtime{ - port: ro.port, - listen: net.Listen, - log: slog.New(ro.logHandler), - grpc: s, + port: ro.port, + listen: net.Listen, + log: slog.New(ro.logHandler), + serviceHealthMonitors: healthMonitors, + grpc: s, + health: healthServer, } return rt } @@ -95,24 +153,43 @@ func NewRuntime(opts ...RuntimeOption) *Runtime { func (rt *Runtime) Run(ctx context.Context) error { ls, err := rt.listen("tcp", fmt.Sprintf(":%d", rt.port)) if err != nil { - rt.log.Error("failed to listen for connections", slogfield.Error(err)) + rt.log.ErrorContext(ctx, "failed to listen for connections", slogfield.Error(err)) return err } g, gctx := errgroup.WithContext(ctx) + for _, monitor := range rt.serviceHealthMonitors { + monitor := monitor + g.Go(func() error { + healthy := true + monitor.readiness.Ready() + rt.health.SetServingStatus(monitor.name, grpc_health_v1.HealthCheckResponse_SERVING) + for { + select { + case <-gctx.Done(): + return nil + case <-time.After(200 * time.Millisecond): + } + + isHealthy := monitor.readiness.Healthy(gctx) + if healthy && isHealthy { + continue + } + healthy = isHealthy + rt.health.SetServingStatus(monitor.name, getServingStatus(isHealthy)) + } + }) + } g.Go(func() error { <-gctx.Done() - rt.log.Info("shutting down service") + rt.log.InfoContext(gctx, "shutting down service") rt.grpc.GracefulStop() - rt.log.Info("shut down service") + rt.log.InfoContext(gctx, "shut down service") return nil }) g.Go(func() error { - rt.started.Store(true) - rt.healthy.Store(true) - rt.serving.Store(true) - rt.log.Info("started service") + rt.log.InfoContext(gctx, "started service") return rt.grpc.Serve(ls) }) @@ -120,6 +197,13 @@ func (rt *Runtime) Run(ctx context.Context) error { if err == grpc.ErrServerStopped { return nil } - rt.log.Error("service encountered unexpected error", slogfield.Error(err)) + rt.log.ErrorContext(gctx, "service encountered unexpected error", slogfield.Error(err)) return err } + +func getServingStatus(healthy bool) grpc_health_v1.HealthCheckResponse_ServingStatus { + if healthy { + return grpc_health_v1.HealthCheckResponse_SERVING + } + return grpc_health_v1.HealthCheckResponse_NOT_SERVING +} From 868d6d2432acdeb4f0a2323b83b6e28e95565b5d Mon Sep 17 00:00:00 2001 From: Zaba505 Date: Fri, 15 Dec 2023 01:51:08 -0500 Subject: [PATCH 2/2] test(issue-8): improve test coverage --- grpc/grpc_test.go | 335 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 335 insertions(+) create mode 100644 grpc/grpc_test.go diff --git a/grpc/grpc_test.go b/grpc/grpc_test.go new file mode 100644 index 0000000..4fc2924 --- /dev/null +++ b/grpc/grpc_test.go @@ -0,0 +1,335 @@ +// Copyright (c) 2023 Z5Labs and Contributors +// +// This software is released under the MIT License. +// https://opensource.org/licenses/MIT + +package grpc + +import ( + "context" + "errors" + "log/slog" + "net" + "testing" + "time" + + "github.com/z5labs/app/pkg/health" + "github.com/z5labs/app/pkg/noop" + + "github.com/stretchr/testify/assert" + "golang.org/x/sync/errgroup" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/health/grpc_health_v1" +) + +type grpcServerFunc func(net.Listener) error + +func (f grpcServerFunc) Serve(ls net.Listener) error { + return f(ls) +} +func (f grpcServerFunc) GracefulStop() {} + +func TestRuntime_Run(t *testing.T) { + t.Run("will return an error", func(t *testing.T) { + t.Run("if it fails to construct a listener", func(t *testing.T) { + listenErr := errors.New("failed to listen") + rt := &Runtime{ + log: slog.New(noop.LogHandler{}), + listen: func(s1, s2 string) (net.Listener, error) { + return nil, listenErr + }, + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + err := rt.Run(ctx) + if !assert.Equal(t, listenErr, err) { + return + } + }) + + t.Run("if the grpc server fails to serve", func(t *testing.T) { + rt := NewRuntime(ListenOnPort(0), LogHandler(noop.LogHandler{})) + + serveErr := errors.New("failed to serve") + rt.grpc = grpcServerFunc(func(l net.Listener) error { + return serveErr + }) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + err := rt.Run(ctx) + if !assert.Equal(t, serveErr, err) { + return + } + }) + }) + + t.Run("will not return an error", func(t *testing.T) { + t.Run("if the grpc server is gracefully shutdown", func(t *testing.T) { + rt := NewRuntime( + ListenOnPort(0), + LogHandler(noop.LogHandler{}), + ) + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + err := rt.Run(ctx) + if !assert.Nil(t, err) { + return + } + }) + }) +} + +func TestReadiness(t *testing.T) { + t.Run("will return serving", func(t *testing.T) { + t.Run("if the server has just been started", func(t *testing.T) { + rt := NewRuntime( + LogHandler(noop.LogHandler{}), + Service( + func(s *grpc.Server) {}, + // No ServiceName is set so this corresponds to + // overall server health + Readiness(&health.Readiness{}), + ), + ) + addrCh := make(chan net.Addr) + rt.listen = func(s1, s2 string) (net.Listener, error) { + defer close(addrCh) + ls, err := net.Listen(s1, s2) + if err != nil { + return nil, err + } + addrCh <- ls.Addr() + return ls, nil + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + g, gctx := errgroup.WithContext(ctx) + g.Go(func() error { + return rt.Run(gctx) + }) + + statusCh := make(chan grpc_health_v1.HealthCheckResponse_ServingStatus) + g.Go(func() error { + defer close(statusCh) + + addr := <-addrCh + conn, err := grpc.Dial( + addr.String(), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + if err != nil { + return err + } + client := grpc_health_v1.NewHealthClient(conn) + resp, err := client.Check(gctx, &grpc_health_v1.HealthCheckRequest{ + Service: "", + }) + if err != nil { + return err + } + cancel() + statusCh <- resp.Status + return nil + }) + + status := <-statusCh + if !assert.Equal(t, grpc_health_v1.HealthCheckResponse_SERVING, status) { + return + } + }) + + t.Run("if the health metric is toggled from unhealthy to healthy", func(t *testing.T) { + var readiness health.Readiness + rt := NewRuntime( + LogHandler(noop.LogHandler{}), + Service( + func(s *grpc.Server) {}, + // No ServiceName is set so this corresponds to + // overall server health + Readiness(&readiness), + ), + ) + addrCh := make(chan net.Addr) + rt.listen = func(s1, s2 string) (net.Listener, error) { + defer close(addrCh) + ls, err := net.Listen(s1, s2) + if err != nil { + return nil, err + } + addrCh <- ls.Addr() + return ls, nil + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + g, gctx := errgroup.WithContext(ctx) + g.Go(func() error { + return rt.Run(gctx) + }) + + statusCh := make(chan grpc_health_v1.HealthCheckResponse_ServingStatus) + g.Go(func() error { + defer close(statusCh) + + addr := <-addrCh + conn, err := grpc.Dial( + addr.String(), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + if err != nil { + return err + } + readiness.NotReady() + readiness.Ready() + + client := grpc_health_v1.NewHealthClient(conn) + resp, err := client.Check(gctx, &grpc_health_v1.HealthCheckRequest{ + Service: "", + }) + if err != nil { + return err + } + cancel() + statusCh <- resp.Status + return nil + }) + + status := <-statusCh + if !assert.Equal(t, grpc_health_v1.HealthCheckResponse_SERVING, status) { + return + } + }) + + t.Run("if a specific service name is requested", func(t *testing.T) { + rt := NewRuntime( + LogHandler(noop.LogHandler{}), + Service( + func(s *grpc.Server) {}, + ServiceName("test"), + Readiness(&health.Readiness{}), + ), + ) + addrCh := make(chan net.Addr) + rt.listen = func(s1, s2 string) (net.Listener, error) { + defer close(addrCh) + ls, err := net.Listen(s1, s2) + if err != nil { + return nil, err + } + addrCh <- ls.Addr() + return ls, nil + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + g, gctx := errgroup.WithContext(ctx) + g.Go(func() error { + return rt.Run(gctx) + }) + + statusCh := make(chan grpc_health_v1.HealthCheckResponse_ServingStatus) + g.Go(func() error { + defer close(statusCh) + + addr := <-addrCh + conn, err := grpc.Dial( + addr.String(), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + if err != nil { + return err + } + client := grpc_health_v1.NewHealthClient(conn) + resp, err := client.Check(gctx, &grpc_health_v1.HealthCheckRequest{ + Service: "test", + }) + if err != nil { + return err + } + cancel() + statusCh <- resp.Status + return nil + }) + + status := <-statusCh + if !assert.Equal(t, grpc_health_v1.HealthCheckResponse_SERVING, status) { + return + } + }) + }) + + t.Run("will return not serving", func(t *testing.T) { + t.Run("if the health metric returns not healthy", func(t *testing.T) { + var readiness health.Readiness + rt := NewRuntime( + LogHandler(noop.LogHandler{}), + Service( + func(s *grpc.Server) {}, + ServiceName("test"), + Readiness(&readiness), + ), + ) + addrCh := make(chan net.Addr) + rt.listen = func(s1, s2 string) (net.Listener, error) { + defer close(addrCh) + ls, err := net.Listen(s1, s2) + if err != nil { + return nil, err + } + addrCh <- ls.Addr() + return ls, nil + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + g, gctx := errgroup.WithContext(ctx) + g.Go(func() error { + return rt.Run(gctx) + }) + + statusCh := make(chan grpc_health_v1.HealthCheckResponse_ServingStatus) + g.Go(func() error { + defer close(statusCh) + + addr := <-addrCh + conn, err := grpc.Dial( + addr.String(), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + if err != nil { + return err + } + readiness.NotReady() + <-time.After(200 * time.Millisecond) + + client := grpc_health_v1.NewHealthClient(conn) + resp, err := client.Check(gctx, &grpc_health_v1.HealthCheckRequest{ + Service: "test", + }) + if err != nil { + return err + } + cancel() + statusCh <- resp.Status + return nil + }) + + status := <-statusCh + if !assert.Equal(t, grpc_health_v1.HealthCheckResponse_NOT_SERVING, status) { + return + } + }) + }) +}