Prune outdated (#4)

* Prune outdated clusters even when not receiving any PubSub messages

Previously the only trigger for pruning was receiving new PubSub messages.

* Prune only when we receive PubSub messages, to avoid pruning while PubSub is down (see the sketch after this list)

* Workaround for missing "bitbucket.org/ww/goautoneg" module
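
A minimal, self-contained Go sketch of the behaviour the first two points converge on, under simplified and hypothetical names (selfCluster, peerSeen, pruneAge, onMessage are stand-ins, not identifiers from this repository): the operator re-requests a broadcast at most once per hour, and it prunes stale peer clusters only when it receives its own PubSub message back, since that echo proves PubSub delivery is working.

package main

import (
	"fmt"
	"time"
)

const (
	selfCluster = "cluster-a"   // hypothetical name of this cluster
	pruneAge    = 2 * time.Hour // how long a peer may stay silent before being pruned
)

var (
	lastRequestedBroadcast time.Time // zero value, so the first reconcile always broadcasts
	peerSeen               = map[string]time.Time{}
)

// reconcile mirrors the Reconcile change: request a broadcast at most once per hour
// instead of only once per process lifetime.
func reconcile(publish func(payload string)) {
	if time.Since(lastRequestedBroadcast) > time.Hour {
		lastRequestedBroadcast = time.Now()
		publish("broadcast-request")
	}
}

// onMessage mirrors the callback change: only our own echo triggers pruning, because
// receiving it proves PubSub is up; messages from peers just refresh their entry.
func onMessage(from string, now time.Time) {
	if from == selfCluster {
		for peer, seen := range peerSeen {
			if now.Sub(seen) > pruneAge {
				delete(peerSeen, peer) // peer expired while PubSub was demonstrably working
			}
		}
		return
	}
	peerSeen[from] = now
}

func main() {
	reconcile(func(p string) { fmt.Println("published:", p) })

	peerSeen["cluster-b"] = time.Now().Add(-3 * time.Hour) // stale peer
	onMessage(selfCluster, time.Now())                     // own echo arrives, stale peer is pruned
	fmt.Println("remaining peers:", peerSeen)
}
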
hermanbanken authored Jan 29, 2022
1 parent 734b1ec · commit ce53629
Showing 1 changed file with 43 additions and 7 deletions.
pkg/controller/servicesync/servicesync_controller.go: 50 changes (43 additions & 7 deletions)
@@ -196,7 +196,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
return nil
}

var hasRequestedBroadcastOnce = false
var lastRequestedBroadcast = time.Time{}

// blank assignment to verify that ReconcileServiceSync implements reconcile.Reconciler
var _ reconcile.Reconciler = &ReconcileServiceSync{}
@@ -234,8 +234,8 @@ func (r *ReconcileServiceSync) Reconcile(request reconcile.Request) (reconcile.R
}

// Broadcast once after coming online
if !hasRequestedBroadcastOnce {
hasRequestedBroadcastOnce = true
if time.Since(lastRequestedBroadcast) > time.Hour {
lastRequestedBroadcast = time.Now()
r.es.Publish(instance.Spec.TopicURL, []byte(broadcastRequestPayload), clusterName)
}

@@ -323,16 +323,26 @@ func (r *ReconcileServiceSync) enqueueAllServiceSyncs(a handler.MapObject) []rec
// This callback parses and then writes the data of remote PeerServices to our cluster, using ensureRemoteStatus
func (r *ReconcileServiceSync) callbackFor(name types.NamespacedName) func([]byte, string) {
return func(dataJson []byte, from string) {
logger := log.WithValues("sender", from)
sync := &mcv1.ServiceSync{}

if from == r.getClusterName() {
// If this is our own message, only run the cleanup (if we receive our own message we know for sure that PubSub is working so we can prune other clusters)
err := r.client.Get(context.Background(), name, sync)
if err == nil {
err = r.ensurePruned(name, sync)
logOnError(err, "Failed to prune ServiceSync")
}
return
}
logger := log.WithValues("sender", from)

// Handle broadcast requests
if string(dataJson) == broadcastRequestPayload {
sync := &mcv1.ServiceSync{}
err := r.client.Get(context.Background(), name, sync)
_, err = r.publish(sync)
if err == nil {
_, err = r.publish(sync)
}
logOnError(err, "Failed to broadcast ServiceSync")
return
}
@@ -360,6 +370,12 @@ func (r *ReconcileServiceSync) callbackFor(name types.NamespacedName) func([]byt
// Save
err = r.ensureRemoteStatus(name, cluster)
logOnError(err, "ensureRemoteStatus")

// Also cleanup expired entries, after writing new data
if err == nil {
err = r.ensurePruned(name, sync)
logOnError(err, "Failed to prune ServiceSync")
}
}
}

@@ -410,7 +426,6 @@ func (r *ReconcileServiceSync) ensureLocalStatus(instance *mcv1.ServiceSync) (cu
// Writing the remote status to the local ServiceSync.Status object
func (r *ReconcileServiceSync) ensureRemoteStatus(name types.NamespacedName, cluster mcv1.Cluster) error {
ctx := context.Background()
logger := log.WithValues("servicesync", name.Name)

// Load latest state
instance := &mcv1.ServiceSync{}
@@ -427,17 +442,38 @@ func (r *ReconcileServiceSync) ensureRemoteStatus(name types.NamespacedName, clu
instance.Status.Clusters[cluster.Name] = &cluster
instance.Status.Peers = filterOut(keys(instance.Status.Clusters), r.getClusterName())

return r.save(name, originalInstance, instance)
}

// Pruning expired peer clusters from the local ServiceSync.Status object
func (r *ReconcileServiceSync) ensurePruned(name types.NamespacedName, instance *mcv1.ServiceSync) error {
logger := log.WithValues("servicesync", name.Name)

// Save original state
originalInstance := instance.DeepCopy()
if instance.Status.Clusters == nil {
instance.Status.Clusters = make(map[string]*mcv1.Cluster, 0)
}

// Prune old/expired clusters
pruned := PruneExpired(&instance.Status.Clusters, instance.Spec.PrunePeerAtAge)
if len(pruned) > 0 {
logger.Info("Pruned remote clusters", "clusters", pruned)
r.recorder.Eventf(instance, eventTypeNormal, "PrunedClusters", "Pruned remote clusters %s", pruned)
}

return r.save(name, originalInstance, instance)
}

// Patching the ServiceSync.Status object when it differs from the original
func (r *ReconcileServiceSync) save(name types.NamespacedName, originalInstance *mcv1.ServiceSync, instance *mcv1.ServiceSync) error {
ctx := context.Background()
logger := log.WithValues("servicesync", name.Name)

// Patch if necessary
if !operatorStatusesEqual(originalInstance.Status, instance.Status) {
// update the Status of the resource with the special client.Status()-client (nothing happens when you don't use the sub-client):
err = r.client.Status().Patch(ctx, instance, client.MergeFrom(originalInstance))
err := r.client.Status().Patch(ctx, instance, client.MergeFrom(originalInstance))
if err == nil {
logger.Info("Patched ServiceSync status")
r.recorder.Eventf(instance, eventTypeNormal, "EnsuringRemoteStatus", "Remote status patched")
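
The added ensurePruned calls a PruneExpired helper that is not part of this diff. As a rough, self-contained approximation of what such a helper could look like (the stand-in cluster type, field names, and signature below are assumptions, not the repository's actual mcv1.Cluster or PruneExpired), it would drop every peer whose last heartbeat is older than the configured age and return the pruned names so the caller can log them and emit an event, as the diff does:

package main

import (
	"fmt"
	"time"
)

// cluster is a stand-in for the operator's per-peer status entry.
type cluster struct {
	Name          string
	LastHeartbeat time.Time
}

// pruneExpired removes entries whose last heartbeat is older than maxAge and
// returns the names of the removed clusters for logging and event recording.
func pruneExpired(clusters *map[string]*cluster, maxAge time.Duration) []string {
	pruned := []string{}
	for name, c := range *clusters {
		if time.Since(c.LastHeartbeat) > maxAge {
			delete(*clusters, name)
			pruned = append(pruned, name)
		}
	}
	return pruned
}

func main() {
	clusters := map[string]*cluster{
		"cluster-b": {Name: "cluster-b", LastHeartbeat: time.Now().Add(-3 * time.Hour)},
		"cluster-c": {Name: "cluster-c", LastHeartbeat: time.Now()},
	}
	if pruned := pruneExpired(&clusters, 2*time.Hour); len(pruned) > 0 {
		fmt.Println("pruned remote clusters:", pruned)
	}
	fmt.Println("remaining peers:", len(clusters)) // only cluster-c is left
}

The pointer-to-map parameter only mirrors the call shape in the diff (PruneExpired(&instance.Status.Clusters, ...)); since Go maps are reference-like, a plain map parameter would behave the same for deletions.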
