Skip to content

Commit

Permalink
story(grpc): register health check service (#31)
Browse files Browse the repository at this point in the history
* feat(issue-8): support readiness metric for each registered service

* test(issue-8): improve test coverage
  • Loading branch information
Zaba505 authored Dec 15, 2023
1 parent 5849c1e commit 88b913e
Show file tree
Hide file tree
Showing 2 changed files with 444 additions and 25 deletions.
134 changes: 109 additions & 25 deletions grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,28 @@ import (
"fmt"
"log/slog"
"net"
"sync/atomic"
"time"

"github.com/z5labs/app/pkg/health"
"github.com/z5labs/app/pkg/noop"
"github.com/z5labs/app/pkg/slogfield"

"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
grpchealth "google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
)

// service pairs a gRPC registration callback with the options supplied for it.
type service struct {
// registerFunc is invoked with the gRPC server while the runtime is built.
registerFunc func(*grpc.Server)
// opts carries the service's name and readiness.
opts serviceOptions
}

// runtimeOptions collects the settings that RuntimeOption values apply before
// a Runtime is constructed.
type runtimeOptions struct {
	// port is the TCP port the runtime listens on.
	port uint
	// logHandler backs the runtime's slog.Logger.
	logHandler slog.Handler
	// registerFuncs is a list of bare registration callbacks.
	// NOTE(review): appears to be superseded by services — confirm and remove.
	registerFuncs []func(*grpc.Server)
	// services lists every service added through Service, together with its
	// per-service options.
	services []service
}

// RuntimeOption
Expand All @@ -46,25 +54,65 @@ func LogHandler(h slog.Handler) RuntimeOption {
}
}

// Register a gRPC service with the underlying gRPC server.
func Register(f func(*grpc.Server)) RuntimeOption {
// serviceOptions holds the per-service settings applied by ServiceOption values.
type serviceOptions struct {
// name is the service name its serving status is published under.
name string
// readiness is polled by the runtime to derive the service's serving status.
readiness *health.Readiness
}

// ServiceOption customises a single service registered through Service.
type ServiceOption func(*serviceOptions)

// ServiceName sets the name under which the service's serving status is
// published by the gRPC health service.
func ServiceName(name string) ServiceOption {
	var opt ServiceOption = func(target *serviceOptions) {
		target.name = name
	}
	return opt
}

// Readiness sets the readiness value the runtime polls to decide whether the
// service is reported as serving.
func Readiness(readiness *health.Readiness) ServiceOption {
	var opt ServiceOption = func(target *serviceOptions) {
		target.readiness = readiness
	}
	return opt
}

// Service registers a gRPC service with the underlying gRPC server.
//
// f is called with the gRPC server when the runtime is built and is expected
// to register the service implementation on it. opts customise how the
// service is tracked: ServiceName sets the name its serving status is
// published under and Readiness supplies the value that is polled for health.
//
// Without a Readiness option (or when a nil one is supplied) a fresh
// health.Readiness is used.
// NOTE(review): without ServiceName the name is empty; in the gRPC health
// protocol the empty name conventionally denotes the server as a whole —
// confirm that this is intended when several services are registered.
func Service(f func(*grpc.Server), opts ...ServiceOption) RuntimeOption {
	return func(ro *runtimeOptions) {
		so := serviceOptions{
			readiness: &health.Readiness{},
		}
		for _, opt := range opts {
			opt(&so)
		}
		// Readiness(nil) would otherwise leave a nil pointer behind that is
		// dereferenced once the runtime starts polling.
		if so.readiness == nil {
			so.readiness = &health.Readiness{}
		}
		// Record the callback exactly once; queuing it in a second list as
		// well would register the same service twice.
		ro.services = append(ro.services, service{
			registerFunc: f,
			opts:         so,
		})
	}
}

// serviceHealthMonitor ties a service name to the readiness value that is
// polled to publish that service's serving status.
type serviceHealthMonitor struct {
// name is passed to the health server's SetServingStatus.
name string
// readiness is marked ready and then checked with Healthy by Run.
readiness *health.Readiness
}

// grpcServer is the subset of server behaviour the Runtime relies on: serving
// on a listener and stopping gracefully. NewRuntime stores a *grpc.Server in it.
// NOTE(review): presumably an interface so tests can substitute a fake — confirm.
type grpcServer interface {
Serve(net.Listener) error
GracefulStop()
}

// Runtime serves the registered gRPC services and publishes their serving
// status through the gRPC health service.
type Runtime struct {
	// port is the TCP port Run binds to.
	port uint
	// listen opens the listener used by Run; NewRuntime sets it to net.Listen.
	listen func(string, string) (net.Listener, error)

	log *slog.Logger

	// started, healthy and serving are set to true by Run just before it
	// begins serving.
	// NOTE(review): nothing visible here reads them — confirm they are still needed.
	started atomic.Bool
	healthy atomic.Bool
	serving atomic.Bool

	// serviceHealthMonitors holds one entry per registered service; Run
	// starts a polling goroutine for each of them.
	serviceHealthMonitors []serviceHealthMonitor

	// grpc is the server that Run serves on and stops gracefully. It is
	// declared once, through the grpcServer interface; NewRuntime stores a
	// *grpc.Server in it.
	grpc grpcServer
	// health is the gRPC health server whose per-service status Run updates.
	health *grpchealth.Server
}

// NewRuntime
Expand All @@ -77,16 +125,26 @@ func NewRuntime(opts ...RuntimeOption) *Runtime {
opt(ro)
}

var healthMonitors []serviceHealthMonitor
s := grpc.NewServer(grpc.Creds(insecure.NewCredentials()))
for _, f := range ro.registerFuncs {
f(s)
for _, svc := range ro.services {
svc.registerFunc(s)
healthMonitors = append(healthMonitors, serviceHealthMonitor{
name: svc.opts.name,
readiness: svc.opts.readiness,
})
}

healthServer := grpchealth.NewServer()
grpc_health_v1.RegisterHealthServer(s, healthServer)

rt := &Runtime{
port: ro.port,
listen: net.Listen,
log: slog.New(ro.logHandler),
grpc: s,
port: ro.port,
listen: net.Listen,
log: slog.New(ro.logHandler),
serviceHealthMonitors: healthMonitors,
grpc: s,
health: healthServer,
}
return rt
}
Expand All @@ -95,31 +153,57 @@ func NewRuntime(opts ...RuntimeOption) *Runtime {
// Run listens on the configured TCP port and serves gRPC requests until ctx
// is cancelled or the server fails.
//
// While serving, one goroutine per registered service polls that service's
// readiness every 200ms and mirrors the result into the gRPC health server
// under the service's name. Cancelling ctx triggers a graceful stop of the
// gRPC server.
//
// Run returns nil after a clean shutdown (a nil result or
// grpc.ErrServerStopped) and the underlying error otherwise. A failure to
// open the listener is logged and returned as is.
func (rt *Runtime) Run(ctx context.Context) error {
	ls, err := rt.listen("tcp", fmt.Sprintf(":%d", rt.port))
	if err != nil {
		rt.log.ErrorContext(ctx, "failed to listen for connections", slogfield.Error(err))
		return err
	}

	g, gctx := errgroup.WithContext(ctx)
	for _, monitor := range rt.serviceHealthMonitors {
		monitor := monitor // per-iteration copy for the closure (pre-Go 1.22 semantics)
		g.Go(func() error {
			rt.monitorHealth(gctx, monitor)
			return nil
		})
	}
	g.Go(func() error {
		<-gctx.Done()

		rt.log.InfoContext(gctx, "shutting down service")
		rt.grpc.GracefulStop()
		rt.log.InfoContext(gctx, "shut down service")
		return nil
	})
	g.Go(func() error {
		rt.started.Store(true)
		rt.healthy.Store(true)
		rt.serving.Store(true)
		rt.log.InfoContext(gctx, "started service")
		return rt.grpc.Serve(ls)
	})

	err = g.Wait()
	// A nil result is the normal outcome of a graceful stop (grpc-go documents
	// Serve as returning nil once Stop/GracefulStop has been called), so it
	// must not be reported as an unexpected error.
	if err == nil || err == grpc.ErrServerStopped {
		return nil
	}
	rt.log.ErrorContext(gctx, "service encountered unexpected error", slogfield.Error(err))
	return err
}

// monitorHealth marks the service ready, publishes it as SERVING and then
// re-checks its readiness every 200ms until ctx is done, publishing a new
// serving status only when the observed health actually changes.
func (rt *Runtime) monitorHealth(ctx context.Context, m serviceHealthMonitor) {
	healthy := true
	m.readiness.Ready()
	rt.health.SetServingStatus(m.name, grpc_health_v1.HealthCheckResponse_SERVING)

	// One reusable ticker instead of a fresh timer per iteration.
	ticker := time.NewTicker(200 * time.Millisecond)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}

		isHealthy := m.readiness.Healthy(ctx)
		if isHealthy == healthy {
			// No transition: skip the redundant status update, including
			// while the service stays unhealthy.
			continue
		}
		healthy = isHealthy
		rt.health.SetServingStatus(m.name, getServingStatus(isHealthy))
	}
}

// getServingStatus maps a health flag onto the gRPC health-check status:
// SERVING when healthy is true, NOT_SERVING otherwise.
func getServingStatus(healthy bool) grpc_health_v1.HealthCheckResponse_ServingStatus {
	status := grpc_health_v1.HealthCheckResponse_NOT_SERVING
	if healthy {
		status = grpc_health_v1.HealthCheckResponse_SERVING
	}
	return status
}
Loading

0 comments on commit 88b913e

Please sign in to comment.