Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Graceful shutdown #109

Draft
wants to merge 11 commits into
base: master
Choose a base branch
from
76 changes: 36 additions & 40 deletions cmd/internal/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package core

import (
"log/slog"
"time"

v1 "github.com/metal-stack/metal-api/pkg/api/v1"
"github.com/metal-stack/metal-core/cmd/internal/metrics"
Expand All @@ -14,17 +13,16 @@ type Core struct {
log *slog.Logger
logLevel string

cidr string
loopbackIP string
asn string
partitionID string
rackID string
enableReconfigureSwitch bool
reconfigureSwitchInterval time.Duration
managementGateway string
additionalBridgePorts []string
additionalBridgeVIDs []string
spineUplinks []string
cidr string
loopbackIP string
asn string
partitionID string
rackID string
enableReconfigureSwitch bool
managementGateway string
additionalBridgePorts []string
additionalBridgeVIDs []string
spineUplinks []string

nos switcher.NOS

Expand All @@ -38,17 +36,16 @@ type Config struct {
Log *slog.Logger
LogLevel string

CIDR string
LoopbackIP string
ASN string
PartitionID string
RackID string
ReconfigureSwitch bool
ReconfigureSwitchInterval time.Duration
ManagementGateway string
AdditionalBridgePorts []string
AdditionalBridgeVIDs []string
SpineUplinks []string
CIDR string
LoopbackIP string
ASN string
PartitionID string
RackID string
ReconfigureSwitch bool
ManagementGateway string
AdditionalBridgePorts []string
AdditionalBridgeVIDs []string
SpineUplinks []string

NOS switcher.NOS

Expand All @@ -60,22 +57,21 @@ type Config struct {

func New(c Config) *Core {
return &Core{
log: c.Log,
logLevel: c.LogLevel,
cidr: c.CIDR,
loopbackIP: c.LoopbackIP,
asn: c.ASN,
partitionID: c.PartitionID,
rackID: c.RackID,
enableReconfigureSwitch: c.ReconfigureSwitch,
reconfigureSwitchInterval: c.ReconfigureSwitchInterval,
managementGateway: c.ManagementGateway,
additionalBridgePorts: c.AdditionalBridgePorts,
additionalBridgeVIDs: c.AdditionalBridgeVIDs,
spineUplinks: c.SpineUplinks,
nos: c.NOS,
driver: c.Driver,
eventServiceClient: c.EventServiceClient,
metrics: c.Metrics,
log: c.Log,
logLevel: c.LogLevel,
cidr: c.CIDR,
loopbackIP: c.LoopbackIP,
asn: c.ASN,
partitionID: c.PartitionID,
rackID: c.RackID,
enableReconfigureSwitch: c.ReconfigureSwitch,
managementGateway: c.ManagementGateway,
additionalBridgePorts: c.AdditionalBridgePorts,
additionalBridgeVIDs: c.AdditionalBridgeVIDs,
spineUplinks: c.SpineUplinks,
nos: c.NOS,
driver: c.Driver,
eventServiceClient: c.EventServiceClient,
metrics: c.Metrics,
}
}
45 changes: 21 additions & 24 deletions cmd/internal/core/phone-home.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,14 @@ import (
)

const (
phonedHomeInterval = time.Minute // lldpd sends messages every two seconds
provisioningEventPhonedHome = "Phoned Home"
)

// ConstantlyPhoneHome sends every minute a single phone-home
// provisioning event to metal-api for each machine that sent at least one
// phone-home LLDP package to any interface of the host machine
// during this interval.
func (c *Core) ConstantlyPhoneHome() {
func (c *Core) ConstantlyPhoneHome(ctx context.Context, interval time.Duration) {
// FIXME this list of interfaces is only read on startup
// if additional interfaces are configured, no new lldpd client is started and therefore no
// phoned home events are sent for these interfaces.
Expand All @@ -41,9 +40,6 @@ func (c *Core) ConstantlyPhoneHome() {
discoveryResultChan := make(chan lldp.DiscoveryResult)
discoveryResultChanWG := sync.WaitGroup{}

// FIXME context should come from caller and canceled on shutdown
ctx := context.Background()

var phoneHomeMessages sync.Map
for _, iface := range ifs {
lldpcli := lldp.NewClient(ctx, *iface)
Expand Down Expand Up @@ -81,27 +77,28 @@ func (c *Core) ConstantlyPhoneHome() {
}()

// send arrived messages on a ticker basis
ticker := time.NewTicker(phonedHomeInterval)
go func() {
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
msgs := []phoneHomeMessage{}
phoneHomeMessages.Range(func(key, value interface{}) bool {
msg, ok := value.(phoneHomeMessage)
if !ok {
return true
}
phoneHomeMessages.Delete(key)
msgs = append(msgs, msg)
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
msgs := []phoneHomeMessage{}
phoneHomeMessages.Range(func(key, value interface{}) bool {
msg, ok := value.(phoneHomeMessage)
if !ok {
return true
})
c.phoneHome(ctx, msgs)
}
}
phoneHomeMessages.Delete(key)
msgs = append(msgs, msg)
return true
})
c.phoneHome(ctx, msgs)
case <-ctx.Done():
// wait until all lldp routines to finish
discoveryResultChanWG.Wait()
return
}
}()
}
}

func (c *Core) send(ctx context.Context, event *v1.EventServiceSendRequest) (*v1.EventServiceSendResponse, error) {
Expand Down
66 changes: 37 additions & 29 deletions cmd/internal/core/reconfigure-switch.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package core

import (
"context"
"fmt"
"os"
"slices"
Expand All @@ -16,37 +17,44 @@ import (
"github.com/metal-stack/metal-go/api/models"
)

// ReconfigureSwitch reconfigures the switch.
func (c *Core) ReconfigureSwitch() {
t := time.NewTicker(c.reconfigureSwitchInterval)
// ConstantlyReconfigureSwitch reconfigures the switch.
func (c *Core) ConstantlyReconfigureSwitch(ctx context.Context, interval time.Duration) {
host, _ := os.Hostname()
for range t.C {
c.log.Info("trigger reconfiguration")
start := time.Now()
err := c.reconfigureSwitch(host)
elapsed := time.Since(start)
c.log.Info("reconfiguration took", "elapsed", elapsed)

params := sw.NewNotifySwitchParams()
params.ID = host
ns := elapsed.Nanoseconds()
nr := &models.V1SwitchNotifyRequest{
SyncDuration: &ns,
}
if err != nil {
errStr := err.Error()
nr.Error = &errStr
c.log.Error("reconfiguration failed", "error", err)
c.metrics.CountError("switch-reconfiguration")
} else {
c.log.Info("reconfiguration succeeded")
}

params.Body = nr
_, err = c.driver.SwitchOperations().NotifySwitch(params, nil)
if err != nil {
c.log.Error("notification about switch reconfiguration failed", "error", err)
c.metrics.CountError("reconfiguration-notification")
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
c.log.Info("trigger reconfiguration")
start := time.Now()
err := c.reconfigureSwitch(host)
elapsed := time.Since(start)
c.log.Info("reconfiguration took", "elapsed", elapsed)

params := sw.NewNotifySwitchParams()
params.ID = host
ns := elapsed.Nanoseconds()
nr := &models.V1SwitchNotifyRequest{
SyncDuration: &ns,
}
if err != nil {
errStr := err.Error()
nr.Error = &errStr
c.log.Error("reconfiguration failed", "error", err)
c.metrics.CountError("switch-reconfiguration")
} else {
c.log.Info("reconfiguration succeeded")
}

params.Body = nr
_, err = c.driver.SwitchOperations().NotifySwitch(params, nil)
if err != nil {
c.log.Error("notification about switch reconfiguration failed", "error", err)
c.metrics.CountError("reconfiguration-notification")
}
case <-ctx.Done():
return
}
}
}
Expand Down
82 changes: 58 additions & 24 deletions cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,17 @@
package cmd

import (
"context"
"errors"
"fmt"
"log/slog"
"net/http"
httppprof "net/http/pprof"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"

"github.com/kelseyhightower/envconfig"
Expand All @@ -22,6 +27,8 @@ import (
"github.com/metal-stack/v"
)

const phonedHomeInterval = time.Minute // lldpd sends messages every two seconds

func Run() {
cfg := &Config{}
if err := envconfig.Process("METAL_CORE", cfg); err != nil {
Expand Down Expand Up @@ -84,23 +91,22 @@ func Run() {
metrics := metrics.New()

c := core.New(core.Config{
Log: log,
LogLevel: cfg.LogLevel,
CIDR: cfg.CIDR,
LoopbackIP: cfg.LoopbackIP,
ASN: cfg.ASN,
PartitionID: cfg.PartitionID,
RackID: cfg.RackID,
ReconfigureSwitch: cfg.ReconfigureSwitch,
ReconfigureSwitchInterval: cfg.ReconfigureSwitchInterval,
ManagementGateway: cfg.ManagementGateway,
AdditionalBridgePorts: cfg.AdditionalBridgePorts,
AdditionalBridgeVIDs: cfg.AdditionalBridgeVIDs,
SpineUplinks: cfg.SpineUplinks,
NOS: nos,
Driver: driver,
EventServiceClient: grpcClient.NewEventClient(),
Metrics: metrics,
Log: log,
LogLevel: cfg.LogLevel,
CIDR: cfg.CIDR,
LoopbackIP: cfg.LoopbackIP,
ASN: cfg.ASN,
PartitionID: cfg.PartitionID,
RackID: cfg.RackID,
ReconfigureSwitch: cfg.ReconfigureSwitch,
ManagementGateway: cfg.ManagementGateway,
AdditionalBridgePorts: cfg.AdditionalBridgePorts,
AdditionalBridgeVIDs: cfg.AdditionalBridgeVIDs,
SpineUplinks: cfg.SpineUplinks,
NOS: nos,
Driver: driver,
EventServiceClient: grpcClient.NewEventClient(),
Metrics: metrics,
})

err = c.RegisterSwitch()
Expand All @@ -109,8 +115,21 @@ func Run() {
os.Exit(1)
}

go c.ReconfigureSwitch()
c.ConstantlyPhoneHome()
var wg sync.WaitGroup

ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()

wg.Add(1)
go func() {
defer wg.Done()
c.ConstantlyReconfigureSwitch(ctx, cfg.ReconfigureSwitchInterval)
}()
wg.Add(1)
go func() {
defer wg.Done()
c.ConstantlyPhoneHome(ctx, phonedHomeInterval)
}()

// Start metrics
metricsAddr := fmt.Sprintf("%v:%d", cfg.MetricsServerBindAddress, cfg.MetricsServerPort)
Expand All @@ -132,9 +151,24 @@ func Run() {
ReadHeaderTimeout: 3 * time.Second,
}

err = srv.ListenAndServe()
if err != nil {
log.Error("unable to start metrics listener", "error", err)
os.Exit(1)
}
wg.Add(1)
go func() {
defer wg.Done()
if err = srv.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
log.Error("unable to start metrics listener", "error", err)
os.Exit(1)
}
}()

<-ctx.Done()

wg.Add(1)
go func() {
defer wg.Done()
if err = srv.Shutdown(context.Background()); err != nil {
log.Error("unable to shutdown metrics listener", "error", err)
}
}()

wg.Wait()
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/go-redis/redismock/v9 v9.0.3
github.com/google/go-cmp v0.5.9
github.com/kelseyhightower/envconfig v1.4.0
github.com/metal-stack/go-lldpd v0.4.5
github.com/metal-stack/go-lldpd v0.4.6-0.20230926115634-6145201c345e
github.com/metal-stack/metal-api v0.23.2
github.com/metal-stack/metal-go v0.23.2
github.com/metal-stack/v v1.0.3
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,8 @@ github.com/mdlayher/packet v1.0.0/go.mod h1:eE7/ctqDhoiRhQ44ko5JZU2zxB88g+JH/6jm
github.com/mdlayher/socket v0.2.1/go.mod h1:QLlNPkFR88mRUNQIzRBMfXxwKal8H7u1h3bL1CV+f0E=
github.com/metal-stack/go-lldpd v0.4.5 h1:VeWsPN6wSWMslnuFtF1ebvS4fCk6VPZmqracsoXqic8=
github.com/metal-stack/go-lldpd v0.4.5/go.mod h1:aCVI7lCka0uRo6dkrh2ET97WBpN7M5yZqK8NX0vGpho=
github.com/metal-stack/go-lldpd v0.4.6-0.20230926115634-6145201c345e h1:NCrPNPLbj8yihfaGiro41NgJgUsmmXlYT7ehk8DalGg=
github.com/metal-stack/go-lldpd v0.4.6-0.20230926115634-6145201c345e/go.mod h1:aCVI7lCka0uRo6dkrh2ET97WBpN7M5yZqK8NX0vGpho=
github.com/metal-stack/metal-api v0.23.2 h1:5l5m4jszrLURotjgmGoJywMaHZ4oDIyaK8ks5p45+/w=
github.com/metal-stack/metal-api v0.23.2/go.mod h1:1404/Y2/x/98I6DTGKYS34J8awFmmU/zPdCRuezLEdE=
github.com/metal-stack/metal-go v0.23.2 h1:/1wdBWXedVG2oCPzsjaRl7aqEIWep7MMmogYegoFyjs=
Expand Down
Loading