Skip to content

Commit

Permalink
fix: retry watch when not all clusters abnormal
Browse files Browse the repository at this point in the history
  • Loading branch information
Poor12 committed Nov 22, 2024
1 parent ab06029 commit fb3499a
Showing 1 changed file with 36 additions and 3 deletions.
39 changes: 36 additions & 3 deletions pkg/registry/hpaaggregator/aggregation/forward/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,9 @@ func (p *PodREST) Watch(ctx context.Context, options *metainternalversion.ListOp
var lock sync.Mutex
watchClusters := sets.Set[string]{}
proxyCh := make(chan watch.Event)
recoveryCh := make(chan string)

Check warning on line 154 in pkg/registry/hpaaggregator/aggregation/forward/pod.go

View check run for this annotation

Codecov / codecov/patch

pkg/registry/hpaaggregator/aggregation/forward/pod.go#L154

Added line #L154 was not covered by tests
proxyWatcher := watch.NewProxyWatcher(proxyCh)
clusterWatchers := make(map[string]watch.Interface, len(clusters))

Check warning on line 156 in pkg/registry/hpaaggregator/aggregation/forward/pod.go

View check run for this annotation

Codecov / codecov/patch

pkg/registry/hpaaggregator/aggregation/forward/pod.go#L156

Added line #L156 was not covered by tests
for i := range clusters {
client, exist := p.federatedInformerManager.GetClusterKubeClient(clusters[i].Name)
if !exist {
Expand All @@ -164,25 +166,33 @@ func (p *PodREST) Watch(ctx context.Context, options *metainternalversion.ListOp
if err != nil {
continue
}
clusterWatchers[clusters[i].Name] = watcher

Check warning on line 169 in pkg/registry/hpaaggregator/aggregation/forward/pod.go

View check run for this annotation

Codecov / codecov/patch

pkg/registry/hpaaggregator/aggregation/forward/pod.go#L169

Added line #L169 was not covered by tests
watchClusters.Insert(clusters[i].Name)
go func(cluster string) {
defer watcher.Stop()
for {
clusterWatcher := clusterWatchers[cluster]

Check warning on line 173 in pkg/registry/hpaaggregator/aggregation/forward/pod.go

View check run for this annotation

Codecov / codecov/patch

pkg/registry/hpaaggregator/aggregation/forward/pod.go#L173

Added line #L173 was not covered by tests
select {
case <-proxyWatcher.StopChan():
clusterWatcher.Stop()

Check warning on line 176 in pkg/registry/hpaaggregator/aggregation/forward/pod.go

View check run for this annotation

Codecov / codecov/patch

pkg/registry/hpaaggregator/aggregation/forward/pod.go#L176

Added line #L176 was not covered by tests
return
case event, ok := <-watcher.ResultChan():
case event, ok := <-clusterWatcher.ResultChan():

Check warning on line 178 in pkg/registry/hpaaggregator/aggregation/forward/pod.go

View check run for this annotation

Codecov / codecov/patch

pkg/registry/hpaaggregator/aggregation/forward/pod.go#L178

Added line #L178 was not covered by tests
if !ok {
lock.Lock()
defer lock.Unlock()

klog.Infof("closed %s channel, selector %s", cluster, label.String())
clusterWatcher.Stop()

Check warning on line 184 in pkg/registry/hpaaggregator/aggregation/forward/pod.go

View check run for this annotation

Codecov / codecov/patch

pkg/registry/hpaaggregator/aggregation/forward/pod.go#L183-L184

Added lines #L183 - L184 were not covered by tests
watchClusters.Delete(cluster)
if watchClusters.Len() == 0 {
close(proxyCh)
close(recoveryCh)
return

Check warning on line 189 in pkg/registry/hpaaggregator/aggregation/forward/pod.go

View check run for this annotation

Codecov / codecov/patch

pkg/registry/hpaaggregator/aggregation/forward/pod.go#L188-L189

Added lines #L188 - L189 were not covered by tests
}
return
recoveryCh <- cluster
continue

Check warning on line 192 in pkg/registry/hpaaggregator/aggregation/forward/pod.go

View check run for this annotation

Codecov / codecov/patch

pkg/registry/hpaaggregator/aggregation/forward/pod.go#L191-L192

Added lines #L191 - L192 were not covered by tests
}
if pod, ok := event.Object.(*corev1.Pod); ok {
klog.Infof("pod event name: %v %v", pod.Name, event.Type)

Check warning on line 195 in pkg/registry/hpaaggregator/aggregation/forward/pod.go

View check run for this annotation

Codecov / codecov/patch

pkg/registry/hpaaggregator/aggregation/forward/pod.go#L195

Added line #L195 was not covered by tests
clusterobject.MakePodUnique(pod, cluster)
event.Object = pod
}
Expand All @@ -191,6 +201,29 @@ func (p *PodREST) Watch(ctx context.Context, options *metainternalversion.ListOp
}
}(clusters[i].Name)
}

go func() {
for cluster := range recoveryCh {
client, exist := p.federatedInformerManager.GetClusterKubeClient(cluster)
if !exist {
continue

Check warning on line 209 in pkg/registry/hpaaggregator/aggregation/forward/pod.go

View check run for this annotation

Codecov / codecov/patch

pkg/registry/hpaaggregator/aggregation/forward/pod.go#L205-L209

Added lines #L205 - L209 were not covered by tests
}
watcher, err := client.CoreV1().Pods(namespace).Watch(ctx, metav1.ListOptions{
LabelSelector: label.String(),
FieldSelector: options.FieldSelector.String(),
})
if err != nil {
continue

Check warning on line 216 in pkg/registry/hpaaggregator/aggregation/forward/pod.go

View check run for this annotation

Codecov / codecov/patch

pkg/registry/hpaaggregator/aggregation/forward/pod.go#L211-L216

Added lines #L211 - L216 were not covered by tests
}

lock.Lock()
defer lock.Unlock()

Check failure on line 220 in pkg/registry/hpaaggregator/aggregation/forward/pod.go

View workflow job for this annotation

GitHub Actions / lint

SA9001: defers in this range loop won't run unless the channel gets closed (staticcheck)

clusterWatchers[cluster] = watcher
watchClusters.Insert(cluster)

Check warning on line 223 in pkg/registry/hpaaggregator/aggregation/forward/pod.go

View check run for this annotation

Codecov / codecov/patch

pkg/registry/hpaaggregator/aggregation/forward/pod.go#L219-L223

Added lines #L219 - L223 were not covered by tests
}
}()

return proxyWatcher, nil
}

Expand Down

0 comments on commit fb3499a

Please sign in to comment.