Skip to content

Commit

Permalink
refactor: extract label-selector as a flag for node-funder and operat…
Browse files Browse the repository at this point in the history
…or (#411)

* fix(node-funder): extract label-selector as a flag

* chore(config): set default label for ingress in config files

* fix(operator): add label selector and improve watching running nodes
  • Loading branch information
gacevicljubisa authored Oct 2, 2024
1 parent c98eac9 commit 8d0a18f
Show file tree
Hide file tree
Showing 11 changed files with 74 additions and 67 deletions.
18 changes: 10 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,7 @@ It has following flags:
--min-native float Minimum amount of chain native coins (xDAI) nodes should have.
--min-swarm float Minimum amount of swarm tokens (xBZZ) nodes should have.
--namespace string Kubernetes namespace. Overrides cluster name if set.
--label-selector string Kubernetes label selector for filtering resources within the specified namespace. An empty string disables filtering, allowing all resources to be selected.
--timeout duration Timeout. (default 5m0s)
--wallet-key string Hex-encoded private key for the Bee node wallet. Required.
```
Expand All @@ -385,13 +386,14 @@ Command **node-operator** uses <https://github.com/ethersphere/node-funder> tool
It has following flags:

```console
--geth-url string Endpoint to chain node. Required.
--help help for node-operator
--min-native float Minimum amount of chain native coins (xDAI) nodes should have.
--min-swarm float Minimum amount of swarm tokens (xBZZ) nodes should have.
--namespace string Kubernetes namespace to scan for scheduled pods.
--timeout duration Timeout. Default is infinite.
--wallet-key string Hex-encoded private key for the Bee node wallet. Required.
--geth-url string Endpoint to chain node. Required.
--help help for node-operator
--min-native float Minimum amount of chain native coins (xDAI) nodes should have.
--min-swarm float Minimum amount of swarm tokens (xBZZ) nodes should have.
--namespace string Kubernetes namespace to scan for scheduled pods.
--label-selector string Kubernetes label selector for filtering resources within the specified namespace. An empty string disables filtering, allowing all resources to be selected.
--timeout duration Timeout. Default is infinite.
--wallet-key string Hex-encoded private key for the Bee node wallet. Required.
```

example:
Expand All @@ -408,7 +410,7 @@ example:

```console
--config string config file (default is $HOME/.beekeeper.yaml)
--config-dir string config directory (default is $HOME/.beekeeper/) (default "C:\\Users\\ljubi\\.beekeeper")
--config-dir string config directory (default is $HOME/.beekeeper/)
--config-git-branch string Git branch (default "main")
--config-git-password string Git password or personal access tokens (needed for private repos)
--config-git-repo string Git repository with configurations (uses config directory when Git repo is not specified) (default "")
Expand Down
18 changes: 13 additions & 5 deletions cmd/beekeeper/cmd/node_funder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"strings"
"time"

"github.com/ethersphere/beekeeper/pkg/config"
Expand All @@ -13,6 +14,8 @@ import (
"github.com/spf13/cobra"
)

const nodeFunderLabelSelector string = "beekeeper.ethswarm.org/node-funder=true"

func (c *command) initNodeFunderCmd() (err error) {
const (
optionNameAddresses = "addresses"
Expand All @@ -23,6 +26,7 @@ func (c *command) initNodeFunderCmd() (err error) {
optionNameMinNative = "min-native"
optionNameMinSwarm = "min-swarm"
optionNameTimeout = "timeout"
optionNameLabelSelector = "label-selector"
)

cmd := &cobra.Command{
Expand Down Expand Up @@ -81,7 +85,8 @@ func (c *command) initNodeFunderCmd() (err error) {
var nodeLister funder.NodeLister
// if addresses are provided, use them, not k8s client to list nodes
if cfg.Namespace != "" {
nodeLister = newNodeLister(c.k8sClient, c.log)
label := c.globalConfig.GetString(optionNameLabelSelector)
nodeLister = newNodeLister(c.k8sClient, label, c.log)
}

return funder.Fund(ctx, funder.Config{
Expand All @@ -105,6 +110,7 @@ func (c *command) initNodeFunderCmd() (err error) {
cmd.Flags().String(optionNameWalletKey, "", "Hex-encoded private key for the Bee node wallet. Required.")
cmd.Flags().Float64(optionNameMinNative, 0, "Minimum amount of chain native coins (xDAI) nodes should have.")
cmd.Flags().Float64(optionNameMinSwarm, 0, "Minimum amount of swarm tokens (xBZZ) nodes should have.")
cmd.Flags().String(optionNameLabelSelector, nodeFunderLabelSelector, "Kubernetes label selector for filtering resources within the specified namespace. An empty string disables filtering, allowing all resources to be selected.")
cmd.Flags().Duration(optionNameTimeout, 5*time.Minute, "Timeout.")

c.root.AddCommand(cmd)
Expand All @@ -114,12 +120,14 @@ func (c *command) initNodeFunderCmd() (err error) {

type nodeLister struct {
k8sClient *k8s.Client
label string
log logging.Logger
}

func newNodeLister(k8sClient *k8s.Client, l logging.Logger) *nodeLister {
func newNodeLister(k8sClient *k8s.Client, label string, l logging.Logger) *nodeLister {
return &nodeLister{
k8sClient: k8sClient,
label: label,
log: l,
}
}
Expand All @@ -133,12 +141,12 @@ func (nf *nodeLister) List(ctx context.Context, namespace string) (nodes []funde
return nil, fmt.Errorf("namespace not provided")
}

ingressHosts, err := nf.k8sClient.Ingress.ListAPINodesHosts(ctx, namespace)
ingressHosts, err := nf.k8sClient.Ingress.GetIngressHosts(ctx, namespace, nf.label)
if err != nil {
return nil, fmt.Errorf("list ingress api nodes hosts: %s", err.Error())
}

ingressRouteHosts, err := nf.k8sClient.IngressRoute.ListAPINodesHosts(ctx, namespace)
ingressRouteHosts, err := nf.k8sClient.IngressRoute.GetIngressHosts(ctx, namespace, nf.label)
if err != nil {
return nil, fmt.Errorf("list ingress route api nodes hosts: %s", err.Error())
}
Expand All @@ -147,7 +155,7 @@ func (nf *nodeLister) List(ctx context.Context, namespace string) (nodes []funde

for _, node := range ingressHosts {
nodes = append(nodes, funder.NodeInfo{
Name: node.Name,
Name: strings.TrimSuffix(node.Name, "-api"),
Address: fmt.Sprintf("http://%s", node.Host),
})
}
Expand Down
9 changes: 8 additions & 1 deletion cmd/beekeeper/cmd/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ func (c *command) initOperatorCmd() (err error) {
optionNameMinNative = "min-native"
optionNameMinSwarm = "min-swarm"
optionNameTimeout = "timeout"
optionNameLabelSelector = "label-selector"
)

cmd := &cobra.Command{
Expand All @@ -26,7 +27,11 @@ func (c *command) initOperatorCmd() (err error) {
Long: `Node operator scans for scheduled pods and funds them using node-funder. beekeeper node-operator`,
RunE: func(cmd *cobra.Command, args []string) (err error) {
cfg := config.NodeFunder{}
namespace := c.globalConfig.GetString(optionNameNamespace)

var namespace string
if namespace = c.globalConfig.GetString(optionNameNamespace); namespace == "" {
return errors.New("namespace not provided")
}

// chain node endpoint check
if cfg.ChainNodeEndpoint = c.globalConfig.GetString(optionNameChainNodeEndpoint); cfg.ChainNodeEndpoint == "" {
Expand Down Expand Up @@ -62,6 +67,7 @@ func (c *command) initOperatorCmd() (err error) {
ChainNodeEndpoint: cfg.ChainNodeEndpoint,
MinAmounts: cfg.MinAmounts,
K8sClient: c.k8sClient,
LabelSelector: c.globalConfig.GetString(optionNameLabelSelector),
}).Run(ctxNew)
},
PreRunE: c.preRunE,
Expand All @@ -72,6 +78,7 @@ func (c *command) initOperatorCmd() (err error) {
cmd.Flags().String(optionNameWalletKey, "", "Hex-encoded private key for the Bee node wallet. Required.")
cmd.Flags().Float64(optionNameMinNative, 0, "Minimum amount of chain native coins (xDAI) nodes should have.")
cmd.Flags().Float64(optionNameMinSwarm, 0, "Minimum amount of swarm tokens (xBZZ) nodes should have.")
cmd.Flags().String(optionNameLabelSelector, nodeFunderLabelSelector, "Kubernetes label selector for filtering resources within the specified namespace. An empty string disables filtering, allowing all resources to be selected.")
cmd.Flags().Duration(optionNameTimeout, 0*time.Minute, "Timeout. Default is infinite.")

c.root.AddCommand(cmd)
Expand Down
1 change: 1 addition & 0 deletions config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ node-groups:
app.kubernetes.io/name: "bee"
app.kubernetes.io/part-of: "bee"
app.kubernetes.io/version: "latest"
beekeeper.ethswarm.org/node-funder: "true"
node-selector:
node-group: "private"
persistence-enabled: false
Expand Down
1 change: 1 addition & 0 deletions config/local.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ node-groups:
app.kubernetes.io/name: "bee"
app.kubernetes.io/part-of: "bee"
app.kubernetes.io/version: "latest"
beekeeper.ethswarm.org/node-funder: "true"
node-selector:
node-group: "local"
persistence-enabled: false
Expand Down
1 change: 1 addition & 0 deletions config/testnet-giant.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ node-groups:
app.kubernetes.io/name: "bee"
app.kubernetes.io/part-of: "bee"
app.kubernetes.io/version: "latest"
beekeeper.ethswarm.org/node-funder: "true"
node-selector:
node-group: "testnet-giant"
persistence-enabled: false
Expand Down
1 change: 1 addition & 0 deletions config/testnet.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ node-groups:
app.kubernetes.io/name: "bee"
app.kubernetes.io/part-of: "bee"
app.kubernetes.io/version: "latest"
beekeeper.ethswarm.org/node-funder: "true"
node-selector:
node-group: "private"
persistence-enabled: true
Expand Down
23 changes: 10 additions & 13 deletions pkg/k8s/customresource/ingressroute/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package ingressroute
import (
"context"
"fmt"
"strings"

"github.com/ethersphere/beekeeper/pkg/k8s/ingress"
"k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -82,10 +81,10 @@ func (c *Client) Delete(ctx context.Context, name, namespace string) (err error)
return
}

// ListAPINodesHosts list Ingresses that are nodes
func (c *Client) ListAPINodesHosts(ctx context.Context, namespace string) (nodes []ingress.NodeInfo, err error) {
// GetIngressHosts list Ingress Routes hosts using label as selector, for the given namespace. If label is empty, all Ingresses are listed.
func (c *Client) GetIngressHosts(ctx context.Context, namespace, label string) (nodes []ingress.NodeInfo, err error) {
ingressRoutes, err := c.clientset.IngressRoutes(namespace).List(ctx, metav1.ListOptions{
LabelSelector: "app.kubernetes.io/name=bee",
LabelSelector: label,
})
if err != nil {
if errors.IsNotFound(err) {
Expand All @@ -96,15 +95,13 @@ func (c *Client) ListAPINodesHosts(ctx context.Context, namespace string) (nodes

if ingressRoutes != nil {
for _, ingressRoute := range ingressRoutes.Items {
if strings.HasSuffix(ingressRoute.Name, "-api") {
for _, route := range ingressRoute.Spec.Routes {
host := route.GetHost()
if host != "" {
nodes = append(nodes, ingress.NodeInfo{
Name: strings.TrimSuffix(ingressRoute.Name, "-api"),
Host: host,
})
}
for _, route := range ingressRoute.Spec.Routes {
host := route.GetHost()
if host != "" {
nodes = append(nodes, ingress.NodeInfo{
Name: ingressRoute.Name,
Host: host,
})
}
}
}
Expand Down
21 changes: 9 additions & 12 deletions pkg/k8s/ingress/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package ingress
import (
"context"
"fmt"
"strings"

v1 "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -76,10 +75,10 @@ func (c *Client) Delete(ctx context.Context, name, namespace string) (err error)
return
}

// ListAPINodesHosts list Ingresses that are nodes
func (c *Client) ListAPINodesHosts(ctx context.Context, namespace string) (nodes []NodeInfo, err error) {
// GetIngressHosts list Ingresses hosts using label as selector, for the given namespace. If label is empty, all Ingresses are listed.
func (c *Client) GetIngressHosts(ctx context.Context, namespace, label string) (nodes []NodeInfo, err error) {
ingreses, err := c.clientset.NetworkingV1().Ingresses(namespace).List(ctx, metav1.ListOptions{
LabelSelector: "app.kubernetes.io/name=bee",
LabelSelector: label,
})
if err != nil {
if errors.IsNotFound(err) {
Expand All @@ -89,14 +88,12 @@ func (c *Client) ListAPINodesHosts(ctx context.Context, namespace string) (nodes
}

for _, ingress := range ingreses.Items {
if strings.HasSuffix(ingress.Name, "-api") {
for _, rule := range ingress.Spec.Rules {
if rule.Host != "" {
nodes = append(nodes, NodeInfo{
Name: strings.TrimSuffix(ingress.Name, "-api"),
Host: rule.Host,
})
}
for _, rule := range ingress.Spec.Rules {
if rule.Host != "" {
nodes = append(nodes, NodeInfo{
Name: ingress.Name,
Host: rule.Host,
})
}
}
}
Expand Down
19 changes: 8 additions & 11 deletions pkg/k8s/pod/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,22 +73,20 @@ func (c *Client) Delete(ctx context.Context, name, namespace string) (err error)
return
}

// EventsWatch watches for events.
func (c *Client) EventsWatch(ctx context.Context, namespace string, operatorChan chan string) (err error) {
c.log.Infof("starting events watch")
// WatchNewRunning detects new running Pods in the namespace and sends their IPs to the channel.
func (c *Client) WatchNewRunning(ctx context.Context, namespace, labelSelector string, newPodIps chan string) (err error) {
c.log.Infof("starting events watch in namespace %s, label selector %s", namespace, labelSelector)
defer c.log.Infof("events watch done")
defer close(operatorChan)
defer close(newPodIps)

watcher, err := c.clientset.CoreV1().Pods(namespace).Watch(ctx, metav1.ListOptions{
// TODO: add this label to beekeeper and filter on it => app.kubernetes.io/name=bee
LabelSelector: "app.kubernetes.io/name=bee",
LabelSelector: labelSelector,
})
if err != nil {
return fmt.Errorf("getting pod events in namespace %s: %w", namespace, err)
}
defer watcher.Stop()

// listen for either events from the watcher or a context cancellation
for {
select {
case <-ctx.Done():
Expand All @@ -98,13 +96,12 @@ func (c *Client) EventsWatch(ctx context.Context, namespace string, operatorChan
return fmt.Errorf("watch channel closed")
}
switch event.Type {
// case watch.Added: // if we want to do something with already running pods
// case watch.Added: // already running pods
case watch.Modified:
pod, ok := event.Object.(*v1.Pod)
if ok {
if pod.Status.PodIP != "" && pod.ObjectMeta.DeletionTimestamp == nil {
c.log.Tracef("new pod event: {%s}, {%s}, {%s}, {%s}, {%v}", event.Type, pod.Name, pod.Status.Phase, pod.Status.PodIP, pod.ObjectMeta.DeletionTimestamp)
operatorChan <- pod.Status.PodIP
if pod.Status.PodIP != "" && pod.ObjectMeta.DeletionTimestamp == nil && pod.Status.Phase == v1.PodRunning {
newPodIps <- pod.Status.PodIP
}
}
}
Expand Down
Loading

0 comments on commit 8d0a18f

Please sign in to comment.