diff --git a/src/operator/watcherutil.ts b/src/operator/watcherutil.ts index 70062b0..06b6399 100644 --- a/src/operator/watcherutil.ts +++ b/src/operator/watcherutil.ts @@ -239,68 +239,77 @@ export default abstract class Operator { uri += plural const watch = new Watch(this.kubeConfig) -let lastResourceVersion = '' -console.log('Uri to be used: ', uri) - -const startWatch = async (resourceVersion?: string): Promise => { - console.log('Starting watch with resourceVersion: ', resourceVersion) - return watch - .watch( - uri, - resourceVersion ? { resourceVersion, timeoutSeconds: 300 } : { timeoutSeconds: 300 }, - (phase, obj) => { - if (obj && obj.metadata) { - console.log('OBJECT: ', obj.metadata.name) - console.log('Watch event received, setting lastResourceVersion: ', obj.metadata.resourceVersion) - lastResourceVersion = obj.metadata.resourceVersion - - this.eventQueue.push({ - event: { - meta: ResourceMetaImpl.createWithPlural(plural, obj), - object: obj, - type: phase as ResourceEventType, - }, - onEvent, - }) - } else { - console.log('Received undefined or invalid object:', obj) - } - }, - (err) => { - console.log('Watcher error callback hit') - if (err) { - console.log('Error during watch: ', err) - if (err.code === 410) { - console.log('ResourceVersion expired, falling back to list and start a new watch') - this.k8sApi.listNamespacedEndpoints(namespace || 'default').then((res) => { - lastResourceVersion = res.body.metadata!.resourceVersion! - setTimeout(() => startWatch(lastResourceVersion), 200) // Retry with backoff - }) - } else if (err.code === 'ECONNRESET') { - console.log('Connection reset, retrying watch') - setTimeout(() => startWatch(lastResourceVersion), 500) // Retry after a delay + let lastResourceVersion = '' + console.log('Uri to be used: ', uri) + const startWatch = async (resourceVersion?: string): Promise => { + console.log('watch: ', watch) + console.log('Starting watch with resourceVersion: ', resourceVersion) + console.log('Starting watch on uri: ', uri) + return watch + .watch( + uri, + resourceVersion ? { resourceVersion } : {}, + (phase, obj) => { + if (obj && obj.metadata) { + console.log('OBJECT: ', obj.metadata.name) + console.log('Watch event received, setting lastResourceVersion: ', obj.metadata.resourceVersion) + // Store the latest resourceVersion for future reconnection + lastResourceVersion = obj.metadata.resourceVersion + + // Enqueue the event to process it + this.eventQueue.push({ + event: { + meta: ResourceMetaImpl.createWithPlural(plural, obj), + object: obj, + type: phase as ResourceEventType, + }, + onEvent, + }) + } else { + console.log('Received undefined or invalid object:', obj) + } + }, + (err) => { + console.log('Watcher error callback hit') + if (err) { + console.log('Error during watch: ', err) + if (err.code === 410) { + console.log('ResourceVersion expired, falling back to list and start a new watch') + // Perform a list operation to get the current state + this.k8sApi.listNamespacedEndpoints(namespace || 'default').then((res) => { + lastResourceVersion = res.body.metadata!.resourceVersion! + // Restart the watch with the latest resourceVersion + setTimeout(() => startWatch(lastResourceVersion), 200) + }) + } else { + console.log(`watch on resource ${id} failed: ${this.errorToJson(err)}`) + console.log(`restarting watch on resource ${id} using resourceVersion=${lastResourceVersion}`) + setTimeout(() => startWatch(lastResourceVersion), 200) + } + } + }, + ) + .catch((reason) => { + console.log('Caught an error in watch:', reason) + }) + .then((req) => { + if (!req) { + console.log('Watch request did not return a valid request object') } else { - console.log(`Watch on resource ${id} failed: ${this.errorToJson(err)}`) - setTimeout(() => startWatch(lastResourceVersion), 200) // Retry on other errors + console.log('Watch request initiated') + this.watchRequests[id] = req } - } - } - ) - .catch((reason) => { - console.log('Caught an error in watch:', reason) - }) - .then((req) => { - if (!req) { - console.log('Watch request did not return a valid request object') - } else { - console.log('Watch request initiated') - this.watchRequests[id] = req - } - }) -} + }) + } -await startWatch() + try { + await startWatch() + } catch (error) { + console.log('ERROR OCCURED DURING STARTWATCH: ', error) + } + console.log(`watching resource ${id}`) + } /** * Set the status subresource of a custom resource (if it has one defined). @@ -396,7 +405,7 @@ await startWatch() headers?: Record // eslint-disable-next-line @typescript-eslint/no-explicit-any httpsAgent?: any - auth?: { username: string password: string } + auth?: { username: string; password: string } }): Promise { const opts: https.RequestOptions = {} await this.kubeConfig.applytoHTTPSOptions(opts)