Skip to content

Commit

Permalink
fix: check obj
Browse files Browse the repository at this point in the history
  • Loading branch information
ElderMatt committed Sep 20, 2024
1 parent 38f9893 commit 3eaf9c7
Showing 1 changed file with 68 additions and 59 deletions.
127 changes: 68 additions & 59 deletions src/operator/watcherutil.ts
Original file line number Diff line number Diff line change
Expand Up @@ -239,68 +239,77 @@ export default abstract class Operator {
uri += plural

const watch = new Watch(this.kubeConfig)
let lastResourceVersion = ''
console.log('Uri to be used: ', uri)

const startWatch = async (resourceVersion?: string): Promise<void> => {
console.log('Starting watch with resourceVersion: ', resourceVersion)
return watch
.watch(
uri,
resourceVersion ? { resourceVersion, timeoutSeconds: 300 } : { timeoutSeconds: 300 },
(phase, obj) => {
if (obj && obj.metadata) {
console.log('OBJECT: ', obj.metadata.name)
console.log('Watch event received, setting lastResourceVersion: ', obj.metadata.resourceVersion)
lastResourceVersion = obj.metadata.resourceVersion

this.eventQueue.push({
event: {
meta: ResourceMetaImpl.createWithPlural(plural, obj),
object: obj,
type: phase as ResourceEventType,
},
onEvent,
})
} else {
console.log('Received undefined or invalid object:', obj)
}
},
(err) => {
console.log('Watcher error callback hit')
if (err) {
console.log('Error during watch: ', err)
if (err.code === 410) {
console.log('ResourceVersion expired, falling back to list and start a new watch')
this.k8sApi.listNamespacedEndpoints(namespace || 'default').then((res) => {
lastResourceVersion = res.body.metadata!.resourceVersion!
setTimeout(() => startWatch(lastResourceVersion), 200) // Retry with backoff
})
} else if (err.code === 'ECONNRESET') {
console.log('Connection reset, retrying watch')
setTimeout(() => startWatch(lastResourceVersion), 500) // Retry after a delay
let lastResourceVersion = ''
console.log('Uri to be used: ', uri)
const startWatch = async (resourceVersion?: string): Promise<void> => {
console.log('watch: ', watch)
console.log('Starting watch with resourceVersion: ', resourceVersion)
console.log('Starting watch on uri: ', uri)
return watch
.watch(
uri,
resourceVersion ? { resourceVersion } : {},
(phase, obj) => {
if (obj && obj.metadata) {
console.log('OBJECT: ', obj.metadata.name)
console.log('Watch event received, setting lastResourceVersion: ', obj.metadata.resourceVersion)
// Store the latest resourceVersion for future reconnection
lastResourceVersion = obj.metadata.resourceVersion

// Enqueue the event to process it
this.eventQueue.push({
event: {
meta: ResourceMetaImpl.createWithPlural(plural, obj),
object: obj,
type: phase as ResourceEventType,
},
onEvent,
})
} else {
console.log('Received undefined or invalid object:', obj)
}
},
(err) => {
console.log('Watcher error callback hit')
if (err) {
console.log('Error during watch: ', err)
if (err.code === 410) {
console.log('ResourceVersion expired, falling back to list and start a new watch')
// Perform a list operation to get the current state
this.k8sApi.listNamespacedEndpoints(namespace || 'default').then((res) => {
lastResourceVersion = res.body.metadata!.resourceVersion!
// Restart the watch with the latest resourceVersion
setTimeout(() => startWatch(lastResourceVersion), 200)
})
} else {
console.log(`watch on resource ${id} failed: ${this.errorToJson(err)}`)
console.log(`restarting watch on resource ${id} using resourceVersion=${lastResourceVersion}`)
setTimeout(() => startWatch(lastResourceVersion), 200)
}
}
},
)
.catch((reason) => {
console.log('Caught an error in watch:', reason)
})
.then((req) => {
if (!req) {
console.log('Watch request did not return a valid request object')
} else {
console.log(`Watch on resource ${id} failed: ${this.errorToJson(err)}`)
setTimeout(() => startWatch(lastResourceVersion), 200) // Retry on other errors
console.log('Watch request initiated')
this.watchRequests[id] = req
}
}
}
)
.catch((reason) => {
console.log('Caught an error in watch:', reason)
})
.then((req) => {
if (!req) {
console.log('Watch request did not return a valid request object')
} else {
console.log('Watch request initiated')
this.watchRequests[id] = req
}
})
}
})
}

await startWatch()
try {
await startWatch()
} catch (error) {
console.log('ERROR OCCURED DURING STARTWATCH: ', error)
}

console.log(`watching resource ${id}`)
}

/**
* Set the status subresource of a custom resource (if it has one defined).
Expand Down Expand Up @@ -396,7 +405,7 @@ await startWatch()
headers?: Record<string, string | number | boolean>
// eslint-disable-next-line @typescript-eslint/no-explicit-any
httpsAgent?: any
auth?: { username: string password: string }
auth?: { username: string; password: string }
}): Promise<void> {
const opts: https.RequestOptions = {}
await this.kubeConfig.applytoHTTPSOptions(opts)
Expand Down

0 comments on commit 3eaf9c7

Please sign in to comment.