Skip to content

Commit

Permalink
fix: lastresourceversion
Browse files Browse the repository at this point in the history
  • Loading branch information
ElderMatt committed Sep 23, 2024
1 parent 3149fa6 commit 033c1f5
Showing 1 changed file with 35 additions and 10 deletions.
45 changes: 35 additions & 10 deletions src/operator/watcherutil.ts
Original file line number Diff line number Diff line change
Expand Up @@ -249,13 +249,22 @@ export default abstract class Operator {
.watch(
uri,
resourceVersion ? { resourceVersion } : {},
(phase, obj) => {
async (phase, obj) => {
switch (plural) {
case 'secrets':
lastResourceVersion = (await this.k8sApi.listNamespacedSecret(namespace!)).body.metadata!
.resourceVersion!
break
case 'configmaps':
lastResourceVersion = (await this.k8sApi.listNamespacedConfigMap(namespace!)).body.metadata!
.resourceVersion!
break
default:
break
}
console.log('PHASE: ', phase)
console.log('OBJECT: ', obj)
if (obj && obj.metadata) {
this.k8sApi.listNamespacedEndpoints(namespace || 'default').then((res) => {
lastResourceVersion = res.body.metadata!.resourceVersion!
})
if (obj && obj.status !== 'Failure') {
// Enqueue the event to process it
this.eventQueue.push({
event: {
Expand All @@ -265,6 +274,22 @@ export default abstract class Operator {
},
onEvent,
})
} else if (obj && obj.status === 'Failure') {
console.log(`watch on resource ${id} failed: ${this.errorToJson(obj)}`)
switch (plural) {
case 'secrets':
lastResourceVersion = (await this.k8sApi.listNamespacedSecret(namespace!)).body.metadata!
.resourceVersion!
break
case 'configmaps':
lastResourceVersion = (await this.k8sApi.listNamespacedConfigMap(namespace!)).body.metadata!
.resourceVersion!
break
default:
break
}
console.log(`restarting watch on resource ${id} using resourceVersion=${lastResourceVersion}`)
setTimeout(() => startWatch(lastResourceVersion), 200)
} else {
console.log('Received undefined or invalid object:', obj)
}
Expand All @@ -276,11 +301,11 @@ export default abstract class Operator {
if (err.code === 410) {
console.log('ResourceVersion expired, falling back to list and start a new watch')
// Perform a list operation to get the current state
this.k8sApi.listNamespacedEndpoints(namespace || 'default').then((res) => {
lastResourceVersion = res.body.metadata!.resourceVersion!
// Restart the watch with the latest resourceVersion
setTimeout(() => startWatch(lastResourceVersion), 200)
})
// this.k8sApi.listNamespacedEndpoints(namespace || 'default').then((res) => {
// lastResourceVersion = res.body.metadata!.resourceVersion!
// // Restart the watch with the latest resourceVersion
// setTimeout(() => startWatch(lastResourceVersion), 200)
// })
} else {
console.log(`watch on resource ${id} failed: ${this.errorToJson(err)}`)
console.log(`restarting watch on resource ${id} using resourceVersion=${lastResourceVersion}`)
Expand Down

0 comments on commit 033c1f5

Please sign in to comment.