Skip to content

Commit

Permalink
Small refactor to avoid loose json
Browse files Browse the repository at this point in the history
  • Loading branch information
nick-jones committed Jul 16, 2024
1 parent 7e4e8f1 commit 0bd0c5f
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 84 deletions.
12 changes: 6 additions & 6 deletions internal/datastore/badgerdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ type Store struct {
}

type Entry struct {
Resource k8s.Resource `json:"resource"`
Suspended bool `json:"suspended"`
UpdatedBy string `json:"updatedBy"`
UpdatedAt time.Time `json:"updatedAt"`
Resource k8s.ResourceReference `json:"resource"`
Suspended bool `json:"suspended"`
UpdatedBy string `json:"updatedBy"`
UpdatedAt time.Time `json:"updatedAt"`
}

func NewBadgerStore(path string) (*Store, error) {
Expand All @@ -37,7 +37,7 @@ func NewBadgerStore(path string) (*Store, error) {
}, nil
}

func (s *Store) GetEntry(resource k8s.Resource) (Entry, error) {
func (s *Store) GetEntry(resource k8s.ResourceReference) (Entry, error) {
var entry Entry
err := s.db.View(func(txn *badger.Txn) error {
item, err := txn.Get(buildKey(resource))
Expand Down Expand Up @@ -73,6 +73,6 @@ func (s *Store) Close() error {
return s.db.Close()
}

func buildKey(resource k8s.Resource) []byte {
func buildKey(resource k8s.ResourceReference) []byte {
return []byte(fmt.Sprintf("resource:%s:%s:%s:%s", resource.Type.Group, resource.Type.Kind, resource.Namespace, resource.Name))
}
15 changes: 15 additions & 0 deletions internal/fluxcd/resource.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package fluxcd

type Resource struct {
Metadata struct {
Name string `json:"name"`
Namespace string `json:"namespace"`
} `json:"metadata"`
Spec struct {
Suspend bool `json:"suspend"`
} `json:"spec"`
}

type ResourceList struct {
Items []Resource `json:"items"`
}
24 changes: 5 additions & 19 deletions internal/k8s/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package k8s

import (
"context"
"encoding/json"
"fmt"
"log/slog"
"path"

Expand Down Expand Up @@ -50,7 +48,7 @@ func NewClient(configPath string) (*Client, error) {
}, nil
}

func (c *Client) GetRawResource(ctx context.Context, resource Resource) (map[string]any, error) {
func (c *Client) GetRawResource(ctx context.Context, resource ResourceReference) ([]byte, error) {
absPath := path.Join(
"apis",
resource.Type.Group,
Expand All @@ -60,39 +58,27 @@ func (c *Client) GetRawResource(ctx context.Context, resource Resource) (map[str
resource.Type.Kind,
resource.Name,
)

body, err := c.client.RESTClient().Get().AbsPath(absPath).DoRaw(ctx)
if err != nil {
slog.Warn("failed to fetch resource", slog.Any("error", err), slog.Any("path", absPath))
return nil, err
}

var res map[string]any
if err = json.Unmarshal(body, &res); err != nil {
return nil, fmt.Errorf("invalid response: %w", err)
}
return res, nil
return body, nil
}

func (c *Client) GetRawResources(ctx context.Context, group ResourceType) (map[string]any, error) {
func (c *Client) GetRawResources(ctx context.Context, group ResourceType) ([]byte, error) {
absPath := path.Join(
"apis",
group.Group,
group.Version,
group.Kind,
)

body, err := c.client.RESTClient().Get().AbsPath(absPath).DoRaw(ctx)
if err != nil {
slog.Warn("failed to fetch resource", slog.Any("error", err), slog.Any("path", absPath))
slog.Warn("failed to fetch resources", slog.Any("error", err), slog.Any("path", absPath))
return nil, err
}

var res map[string]any
if err = json.Unmarshal(body, &res); err != nil {
return nil, fmt.Errorf("invalid response: %w", err)
}
return res, nil
return body, nil
}

func (c *Client) GetCustomResourceDefinitions(ctx context.Context, listOptions metav1.ListOptions) (*v1.CustomResourceDefinitionList, error) {
Expand Down
8 changes: 4 additions & 4 deletions internal/k8s/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,18 @@ type ResourceType struct {
Kind string `json:"kind"`
}

type Resource struct {
type ResourceReference struct {
Type ResourceType `json:"type"`
Namespace string `json:"namespace"`
Name string `json:"name"`
}

func ResourceFromPath(path string) (Resource, error) {
func ResourceReferenceFromPath(path string) (ResourceReference, error) {
parts := strings.Split(path, "/")
if len(parts) != 6 {
return Resource{}, fmt.Errorf("unexpected path format: %s", path)
return ResourceReference{}, fmt.Errorf("unexpected path format: %s", path)
}
return Resource{
return ResourceReference{
Type: ResourceType{
Group: parts[0],
Version: parts[1],
Expand Down
2 changes: 1 addition & 1 deletion internal/notification/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

type Notification struct {
Resource k8s.Resource
Resource k8s.ResourceReference
Suspended bool
Email string
GoogleCloudProjectID string
Expand Down
96 changes: 42 additions & 54 deletions internal/watch/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package watch

import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
Expand All @@ -14,6 +15,7 @@ import (

"github.com/e-flux-platform/fluxcd-suspend-notifier/internal/auditlog"
"github.com/e-flux-platform/fluxcd-suspend-notifier/internal/datastore"
"github.com/e-flux-platform/fluxcd-suspend-notifier/internal/fluxcd"
"github.com/e-flux-platform/fluxcd-suspend-notifier/internal/k8s"
"github.com/e-flux-platform/fluxcd-suspend-notifier/internal/notification"
)
Expand Down Expand Up @@ -43,13 +45,13 @@ func NewWatcher(
}

type k8sClient interface {
GetRawResource(ctx context.Context, resource k8s.Resource) (map[string]any, error)
GetRawResources(ctx context.Context, group k8s.ResourceType) (map[string]any, error)
GetRawResource(ctx context.Context, resource k8s.ResourceReference) ([]byte, error)
GetRawResources(ctx context.Context, group k8s.ResourceType) ([]byte, error)
GetCustomResourceDefinitions(ctx context.Context, listOptions metav1.ListOptions) (*v1.CustomResourceDefinitionList, error)
}

type store interface {
GetEntry(k8s.Resource) (datastore.Entry, error)
GetEntry(k8s.ResourceReference) (datastore.Entry, error)
SaveEntry(datastore.Entry) error
}

Expand Down Expand Up @@ -90,33 +92,21 @@ func (w *Watcher) Watch(ctx context.Context) error {
func (w *Watcher) init(ctx context.Context, groups []k8s.ResourceType) error {
slog.Info("initializing")
for _, group := range groups {
resources, err := w.k8sClient.GetRawResources(ctx, group)
res, err := w.k8sClient.GetRawResources(ctx, group)
if err != nil {
return err
}
items, ok := resources["items"].([]any)
if !ok {
return errors.New("expected items to be set")
var resourceList fluxcd.ResourceList
if err = json.Unmarshal(res, &resourceList); err != nil {
return fmt.Errorf("failed to unmarshal resource: %w", err)
}
for _, i := range items {
item, ok := i.(map[string]any)
if !ok {
return errors.New("invalid item")
}
spec, ok := item["spec"].(map[string]any)
if !ok {
return errors.New("invalid spec")
}
metadata, ok := item["metadata"].(map[string]any)
if !ok {
return errors.New("invalid metadata")
}
resource := k8s.Resource{
for _, resource := range resourceList.Items {
resourceRef := k8s.ResourceReference{
Type: group,
Namespace: metadata["namespace"].(string),
Name: metadata["name"].(string),
Namespace: resource.Metadata.Namespace,
Name: resource.Metadata.Name,
}
if err = w.processResource(ctx, resource, spec, "<unknown>"); err != nil {
if err = w.processResource(ctx, resourceRef, resource, "<unknown>"); err != nil {
return fmt.Errorf("failed to process resource: %w", err)
}
}
Expand All @@ -140,22 +130,22 @@ func (w *Watcher) watch(ctx context.Context, groups []k8s.ResourceType) error {
resourceName := logEntry.GetResourceName()
email := logEntry.GetAuthenticationInfo().GetPrincipalEmail()

resource, err := k8s.ResourceFromPath(resourceName)
resourceRef, err := k8s.ResourceReferenceFromPath(resourceName)
if err != nil {
return err
}

res, err := w.k8sClient.GetRawResource(ctx, resource)
res, err := w.k8sClient.GetRawResource(ctx, resourceRef)
if err != nil {
return fmt.Errorf("failed to get raw resource: %w", err)
}

spec, ok := res["spec"].(map[string]any)
if !ok {
return errors.New("unexpected response payload")
var resource fluxcd.Resource
if err = json.Unmarshal(res, &resource); err != nil {
return fmt.Errorf("failed to unmarshal resource: %w", err)
}

if err = w.processResource(ctx, resource, spec, email); err != nil {
if err = w.processResource(ctx, resourceRef, resource, email); err != nil {
return fmt.Errorf("failed to re-check suspension status: %w", err)
}

Expand All @@ -165,26 +155,24 @@ func (w *Watcher) watch(ctx context.Context, groups []k8s.ResourceType) error {

func (w *Watcher) processResource(
ctx context.Context,
resource k8s.Resource,
spec map[string]any,
resourceRef k8s.ResourceReference,
resource fluxcd.Resource,
updatedBy string,
) error {
suspended, _ := spec["suspend"].(bool)

entry, err := w.store.GetEntry(resource)
entry, err := w.store.GetEntry(resourceRef)
if err != nil {
if errors.Is(err, datastore.ErrNotFound) {
// First time seeing the resource, so we'll save the state, but not notify - as we don't know what has
// changed
slog.Info(
"new resource discovered",
slog.String("kind", resource.Type.Kind),
slog.String("resource", resource.Name),
slog.Bool("suspended", suspended),
slog.String("kind", resourceRef.Type.Kind),
slog.String("resource", resourceRef.Name),
slog.Bool("suspended", resource.Spec.Suspend),
)
if err = w.store.SaveEntry(datastore.Entry{
Resource: resource,
Suspended: suspended,
Resource: resourceRef,
Suspended: resource.Spec.Suspend,
UpdatedBy: updatedBy,
UpdatedAt: time.Now().UTC(),
}); err != nil {
Expand All @@ -195,31 +183,31 @@ func (w *Watcher) processResource(
return fmt.Errorf("failed to fetch entry: %w", err)
}

if suspended == entry.Suspended {
if resource.Spec.Suspend == entry.Suspended {
return nil // Probably something else about the resource modified
}

entry.Resource = resource
entry.Suspended = suspended
slog.Info(
"suspension status updated",
slog.String("kind", resourceRef.Type.Kind),
slog.String("resourceRef", resourceRef.Name),
slog.String("user", updatedBy),
slog.Bool("suspended", resource.Spec.Suspend),
)

entry.Resource = resourceRef
entry.Suspended = resource.Spec.Suspend
entry.UpdatedBy = updatedBy
entry.UpdatedAt = time.Now().UTC()

if err = w.store.SaveEntry(entry); err != nil {
return err
}

slog.Info(
"suspension status updated",
slog.String("kind", resource.Type.Kind),
slog.String("resource", resource.Name),
slog.String("user", updatedBy),
slog.Bool("suspended", suspended),
)

return w.notifier.Notify(ctx, notification.Notification{
Resource: resource,
Suspended: suspended,
Email: updatedBy,
Resource: entry.Resource,
Suspended: entry.Suspended,
Email: entry.UpdatedBy,
GoogleCloudProjectID: w.googleCloudProjectID,
})
}

0 comments on commit 0bd0c5f

Please sign in to comment.