Skip to content

Commit

Permalink
chore(kuma-cp): have readiness probe be dependent of server health (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
lahabana authored Mar 20, 2024
1 parent 1a25a16 commit 604610b
Show file tree
Hide file tree
Showing 15 changed files with 233 additions and 193 deletions.
2 changes: 1 addition & 1 deletion app/kuma-cp/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func newRunCmdWithOpts(opts kuma_cmd.RunCmdOpts) *cobra.Command {

if limit, _ := os.CurrentFileLimit(); limit < minOpenFileLimit {
runLog.Info("for better performance, raise the open file limit",
"minimim-open-files", minOpenFileLimit)
"minimum-open-files", minOpenFileLimit)
}

if err := mads_server.SetupServer(rt); err != nil {
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ require (
github.com/sethvargo/go-retry v0.2.4
github.com/shopspring/decimal v1.3.1
github.com/slok/go-http-metrics v0.11.0
github.com/soheilhy/cmux v0.1.5
github.com/spf13/cobra v1.8.0
github.com/spiffe/go-spiffe/v2 v2.1.7
github.com/testcontainers/testcontainers-go v0.29.1
Expand Down
3 changes: 0 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -420,8 +420,6 @@ github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/slok/go-http-metrics v0.11.0 h1:ABJUpekCZSkQT1wQrFvS4kGbhea/w6ndFJaWJeh3zL0=
github.com/slok/go-http-metrics v0.11.0/go.mod h1:ZGKeYG1ET6TEJpQx18BqAJAvxw9jBAZXCHU7bWQqqAc=
github.com/soheilhy/cmux v0.1.5 h1:jjzc5WVemNEDTLwv9tlmemhC73tI08BNOIGwBOo10Js=
github.com/soheilhy/cmux v0.1.5/go.mod h1:T7TcVDs9LWfQgPlPsdngu6I6QIoyIFZDDC6sNE1GqG0=
github.com/spf13/afero v1.3.3/go.mod h1:5KUK8ByomD5Ti5Artl0RtHeI5pTF7MIDuXL3yY520V4=
github.com/spf13/cast v1.3.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
github.com/spf13/cast v1.5.1 h1:R+kOtfhWQE6TVQzY+4D7wJLBgkdVasCEFxSUBYBYIlA=
Expand Down Expand Up @@ -549,7 +547,6 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
Expand Down
15 changes: 6 additions & 9 deletions pkg/api-server/auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,14 @@ var _ = Describe("Auth test", func() {
Expect(http2.ConfigureMTLS(httpsClientWithoutCerts, certPath, "", "")).To(Succeed())

// wait for both http and https server
Eventually(func() bool {
Eventually(func(g Gomega) {
resp, err := httpsClient.Get(fmt.Sprintf("https://localhost:%d/secrets", httpsPort))
if err != nil || resp.StatusCode != 200 {
return false
}
g.Expect(err).ToNot(HaveOccurred())
g.Expect(resp).To(HaveHTTPStatus(200))
resp, err = http.Get(fmt.Sprintf("http://localhost:%d/secrets", httpPort))
if err != nil || resp.StatusCode != 200 {
return false
}
return true
}, "5s", "100ms").Should(BeTrue())
g.Expect(err).ToNot(HaveOccurred())
g.Expect(resp).To(HaveHTTPStatus(200))
}, "5s", "100ms").Should(Succeed())
})

AfterEach(func() {
Expand Down
120 changes: 50 additions & 70 deletions pkg/api-server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"path/filepath"
"strconv"
"strings"
"sync/atomic"
"time"

"github.com/bakito/go-log-logr-adapter/adapter"
Expand Down Expand Up @@ -49,6 +50,7 @@ import (
secrets_k8s "github.com/kumahq/kuma/pkg/plugins/secrets/k8s"
"github.com/kumahq/kuma/pkg/tokens/builtin"
tokens_server "github.com/kumahq/kuma/pkg/tokens/builtin/server"
kuma_srv "github.com/kumahq/kuma/pkg/util/http/server"
util_prometheus "github.com/kumahq/kuma/pkg/util/prometheus"
"github.com/kumahq/kuma/pkg/version"
xds_context "github.com/kumahq/kuma/pkg/xds/context"
Expand All @@ -58,8 +60,10 @@ import (
var log = core.Log.WithName("api-server")

type ApiServer struct {
mux *http.ServeMux
config api_server.ApiServerConfig
mux *http.ServeMux
config api_server.ApiServerConfig
httpReady atomic.Bool
httpsReady atomic.Bool
}

func (a *ApiServer) NeedLeaderElection() bool {
Expand Down Expand Up @@ -326,23 +330,50 @@ func ShouldBeReadOnly(kdsFlag model.KDSFlagType, cfg *kuma_cp.Config) bool {
return false
}

func (a *ApiServer) Ready() bool {
return a.httpReady.Load() && a.httpsReady.Load()
}

func (a *ApiServer) Start(stop <-chan struct{}) error {
errChan := make(chan error)

var httpServer, httpsServer *http.Server
if a.config.HTTP.Enabled {
httpServer = a.startHttpServer(errChan)
httpServer = &http.Server{
ReadHeaderTimeout: time.Second,
Addr: net.JoinHostPort(a.config.HTTP.Interface, strconv.FormatUint(uint64(a.config.HTTP.Port), 10)),
Handler: a.mux,
ErrorLog: adapter.ToStd(log),
}
if err := kuma_srv.StartServer(log, httpServer, &a.httpReady, errChan); err != nil {
return err
}
} else {
a.httpReady.Store(true)
}
if a.config.HTTPS.Enabled {
var err error
httpsServer, err = a.startHttpsServer(errChan)
tlsConfig, err := configureTLS(a.config)
if err != nil {
return err
}
httpsServer = &http.Server{
ReadHeaderTimeout: time.Second,
Addr: net.JoinHostPort(a.config.HTTPS.Interface, strconv.FormatUint(uint64(a.config.HTTPS.Port), 10)),
Handler: a.mux,
TLSConfig: tlsConfig,
ErrorLog: adapter.ToStd(log),
}
if err := kuma_srv.StartServer(log, httpsServer, &a.httpsReady, errChan); err != nil {
return err
}
} else {
a.httpsReady.Store(true)
}
select {
case <-stop:
log.Info("stopping down API Server")
a.httpReady.Store(false)
a.httpsReady.Store(false)
if httpServer != nil {
return httpServer.Shutdown(context.Background())
}
Expand All @@ -355,80 +386,29 @@ func (a *ApiServer) Start(stop <-chan struct{}) error {
return nil
}

func (a *ApiServer) startHttpServer(errChan chan error) *http.Server {
server := &http.Server{
ReadHeaderTimeout: time.Second,
Addr: net.JoinHostPort(a.config.HTTP.Interface, strconv.FormatUint(uint64(a.config.HTTP.Port), 10)),
Handler: a.mux,
ErrorLog: adapter.ToStd(log),
func configureTLS(cfg api_server.ApiServerConfig) (*tls.Config, error) {
cert, err := tls.LoadX509KeyPair(cfg.HTTPS.TlsCertFile, cfg.HTTPS.TlsKeyFile)
if err != nil {
return nil, errors.Wrap(err, "failed to load TLS certificate")
}

go func() {
err := server.ListenAndServe()
if err != nil {
switch err {
case http.ErrServerClosed:
log.Info("shutting down server")
default:
log.Error(err, "could not start an HTTP Server")
errChan <- err
}
}
}()
log.Info("starting", "interface", a.config.HTTP.Interface, "port", a.config.HTTP.Port)
return server
}

func (a *ApiServer) startHttpsServer(errChan chan error) (*http.Server, error) {
tlsConfig := &tls.Config{
MinVersion: tls.VersionTLS12, // to pass gosec (in practice it's always set after.
}
var err error
tlsConfig.MinVersion, err = config_types.TLSVersion(a.config.HTTPS.TlsMinVersion)
if err != nil {
return nil, err
Certificates: []tls.Certificate{cert},
MinVersion: tls.VersionTLS12, // to pass gosec (in practice it's always set after.
}
tlsConfig.CipherSuites, err = config_types.TLSCiphers(a.config.HTTPS.TlsCipherSuites)
tlsConfig.MinVersion, err = config_types.TLSVersion(cfg.HTTPS.TlsMinVersion)
if err != nil {
return nil, err
}

err = configureMTLS(tlsConfig, a.config)
tlsConfig.CipherSuites, err = config_types.TLSCiphers(cfg.HTTPS.TlsCipherSuites)
if err != nil {
return nil, err
}

server := &http.Server{
ReadHeaderTimeout: time.Second,
Addr: net.JoinHostPort(a.config.HTTPS.Interface, strconv.FormatUint(uint64(a.config.HTTPS.Port), 10)),
Handler: a.mux,
TLSConfig: tlsConfig,
ErrorLog: adapter.ToStd(log),
}

go func() {
err := server.ListenAndServeTLS(a.config.HTTPS.TlsCertFile, a.config.HTTPS.TlsKeyFile)
if err != nil {
switch err {
case http.ErrServerClosed:
log.Info("shutting down server")
default:
log.Error(err, "could not start an HTTPS Server")
errChan <- err
}
}
}()
log.Info("starting", "interface", a.config.HTTPS.Interface, "port", a.config.HTTPS.Port, "tls", true)
return server, nil
}

func configureMTLS(tlsConfig *tls.Config, cfg api_server.ApiServerConfig) error {
clientCertPool := x509.NewCertPool()
if cfg.Auth.ClientCertsDir != "" {
log.Info("loading client certificates")
files, err := os.ReadDir(cfg.Auth.ClientCertsDir)
if err != nil {
return err
return nil, err
}
for _, file := range files {
if file.IsDir() {
Expand All @@ -442,20 +422,20 @@ func configureMTLS(tlsConfig *tls.Config, cfg api_server.ApiServerConfig) error
path := filepath.Join(cfg.Auth.ClientCertsDir, file.Name())
caCert, err := os.ReadFile(path)
if err != nil {
return errors.Wrapf(err, "could not read certificate %q", path)
return nil, errors.Wrapf(err, "could not read certificate %q", path)
}
if !clientCertPool.AppendCertsFromPEM(caCert) {
return errors.Errorf("failed to load PEM client certificate from %q", path)
return nil, errors.Errorf("failed to load PEM client certificate from %q", path)
}
}
}
if cfg.HTTPS.TlsCaFile != "" {
file, err := os.ReadFile(cfg.HTTPS.TlsCaFile)
if err != nil {
return err
return nil, err
}
if !clientCertPool.AppendCertsFromPEM(file) {
return errors.Errorf("failed to load PEM client certificate from %q", cfg.HTTPS.TlsCaFile)
return nil, errors.Errorf("failed to load PEM client certificate from %q", cfg.HTTPS.TlsCaFile)
}
}

Expand All @@ -465,7 +445,7 @@ func configureMTLS(tlsConfig *tls.Config, cfg api_server.ApiServerConfig) error
} else if cfg.Authn.Type == certs.PluginName {
tlsConfig.ClientAuth = tls.VerifyClientCertIfGiven // client certs are required only for some endpoints when using admin client cert
}
return nil
return tlsConfig, nil
}

func SetupServer(rt runtime.Runtime) error {
Expand Down
24 changes: 24 additions & 0 deletions pkg/core/runtime/component/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ type Component interface {
NeedLeaderElection() bool
}

type ReadyComponent interface {
Component

// Ready returns true if the component is ready to serve traffic.
// This is useful to read this for a readiness probe
Ready() bool
}

// GracefulComponent is a component that supports waiting until it's finished.
// It's useful if there is cleanup logic that has to be executed before the process exits
// (i.e. sending SIGTERM signals to subprocesses started by this component).
Expand Down Expand Up @@ -66,6 +74,8 @@ type Manager interface {
// Returns an error if there is an error starting any component.
// If there are any GracefulComponent, it waits until all components are done.
Start(<-chan struct{}) error

Ready() bool
}

var _ Manager = &manager{}
Expand All @@ -88,6 +98,20 @@ type manager struct {
errCh chan error
}

func (cm *manager) Ready() bool {
cm.Lock()
defer cm.Unlock()

for _, component := range cm.components {
if ready, ok := component.(ReadyComponent); ok {
if !ready.Ready() {
return false
}
}
}
return true
}

func (cm *manager) Add(c ...Component) error {
cm.Lock()
defer cm.Unlock()
Expand Down
1 change: 1 addition & 0 deletions pkg/diagnostics/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
func SetupServer(rt core_runtime.Runtime) error {
return rt.Add(
&diagnosticsServer{
isReady: rt.Ready,
config: rt.Config().Diagnostics,
metrics: rt.Metrics(),
},
Expand Down
Loading

0 comments on commit 604610b

Please sign in to comment.