Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

story(grpc): register health check service #31

Merged
merged 3 commits into from
Dec 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 109 additions & 25 deletions grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,28 @@ import (
"fmt"
"log/slog"
"net"
"sync/atomic"
"time"

"github.com/z5labs/app/pkg/health"
"github.com/z5labs/app/pkg/noop"
"github.com/z5labs/app/pkg/slogfield"

"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
grpchealth "google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
)

type service struct {
registerFunc func(*grpc.Server)
opts serviceOptions
}

type runtimeOptions struct {
port uint
logHandler slog.Handler
registerFuncs []func(*grpc.Server)
port uint
logHandler slog.Handler
services []service
}

// RuntimeOption
Expand All @@ -46,25 +54,65 @@ func LogHandler(h slog.Handler) RuntimeOption {
}
}

// Register a gRPC service with the underlying gRPC server.
func Register(f func(*grpc.Server)) RuntimeOption {
type serviceOptions struct {
name string
readiness *health.Readiness
}

// ServiceOption
type ServiceOption func(*serviceOptions)

// ServiceName
func ServiceName(name string) ServiceOption {
return func(so *serviceOptions) {
so.name = name
}
}

// Readiness
func Readiness(readiness *health.Readiness) ServiceOption {
return func(so *serviceOptions) {
so.readiness = readiness
}
}

// Service registers a gRPC service with the underlying gRPC server.
func Service(f func(*grpc.Server), opts ...ServiceOption) RuntimeOption {
return func(ro *runtimeOptions) {
ro.registerFuncs = append(ro.registerFuncs, f)
so := serviceOptions{
readiness: &health.Readiness{},
}
for _, opt := range opts {
opt(&so)
}
ro.services = append(ro.services, service{
registerFunc: f,
opts: so,
})
}
}

type serviceHealthMonitor struct {
name string
readiness *health.Readiness
}

type grpcServer interface {
Serve(net.Listener) error
GracefulStop()
}

// Runtime
type Runtime struct {
port uint
listen func(string, string) (net.Listener, error)

log *slog.Logger

started atomic.Bool
healthy atomic.Bool
serving atomic.Bool
serviceHealthMonitors []serviceHealthMonitor

grpc *grpc.Server
grpc grpcServer
health *grpchealth.Server
}

// NewRuntime
Expand All @@ -77,16 +125,26 @@ func NewRuntime(opts ...RuntimeOption) *Runtime {
opt(ro)
}

var healthMonitors []serviceHealthMonitor
s := grpc.NewServer(grpc.Creds(insecure.NewCredentials()))
for _, f := range ro.registerFuncs {
f(s)
for _, svc := range ro.services {
svc.registerFunc(s)
healthMonitors = append(healthMonitors, serviceHealthMonitor{
name: svc.opts.name,
readiness: svc.opts.readiness,
})
}

healthServer := grpchealth.NewServer()
grpc_health_v1.RegisterHealthServer(s, healthServer)

rt := &Runtime{
port: ro.port,
listen: net.Listen,
log: slog.New(ro.logHandler),
grpc: s,
port: ro.port,
listen: net.Listen,
log: slog.New(ro.logHandler),
serviceHealthMonitors: healthMonitors,
grpc: s,
health: healthServer,
}
return rt
}
Expand All @@ -95,31 +153,57 @@ func NewRuntime(opts ...RuntimeOption) *Runtime {
func (rt *Runtime) Run(ctx context.Context) error {
ls, err := rt.listen("tcp", fmt.Sprintf(":%d", rt.port))
if err != nil {
rt.log.Error("failed to listen for connections", slogfield.Error(err))
rt.log.ErrorContext(ctx, "failed to listen for connections", slogfield.Error(err))
return err
}

g, gctx := errgroup.WithContext(ctx)
for _, monitor := range rt.serviceHealthMonitors {
monitor := monitor
g.Go(func() error {
healthy := true
monitor.readiness.Ready()
rt.health.SetServingStatus(monitor.name, grpc_health_v1.HealthCheckResponse_SERVING)
for {
select {
case <-gctx.Done():
return nil
case <-time.After(200 * time.Millisecond):
}

isHealthy := monitor.readiness.Healthy(gctx)
if healthy && isHealthy {
continue
}
healthy = isHealthy
rt.health.SetServingStatus(monitor.name, getServingStatus(isHealthy))
}
})
}
g.Go(func() error {
<-gctx.Done()

rt.log.Info("shutting down service")
rt.log.InfoContext(gctx, "shutting down service")
rt.grpc.GracefulStop()
rt.log.Info("shut down service")
rt.log.InfoContext(gctx, "shut down service")
return nil
})
g.Go(func() error {
rt.started.Store(true)
rt.healthy.Store(true)
rt.serving.Store(true)
rt.log.Info("started service")
rt.log.InfoContext(gctx, "started service")
return rt.grpc.Serve(ls)
})

err = g.Wait()
if err == grpc.ErrServerStopped {
return nil
}
rt.log.Error("service encountered unexpected error", slogfield.Error(err))
rt.log.ErrorContext(gctx, "service encountered unexpected error", slogfield.Error(err))
return err
}

func getServingStatus(healthy bool) grpc_health_v1.HealthCheckResponse_ServingStatus {
if healthy {
return grpc_health_v1.HealthCheckResponse_SERVING
}
return grpc_health_v1.HealthCheckResponse_NOT_SERVING
}
Loading
Loading