Skip to content

Commit

Permalink
watcher - introduce and use onDidWatchFail event (#207320)
Browse files Browse the repository at this point in the history
  • Loading branch information
bpasero authored Mar 12, 2024
1 parent 6c1e897 commit 6c8ae49
Show file tree
Hide file tree
Showing 7 changed files with 204 additions and 174 deletions.
126 changes: 56 additions & 70 deletions src/vs/platform/files/node/watcher/baseWatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
*--------------------------------------------------------------------------------------------*/

import { watchFile, unwatchFile, Stats } from 'fs';
import { Disposable, DisposableMap, IDisposable, toDisposable } from 'vs/base/common/lifecycle';
import { Disposable, DisposableMap, DisposableStore, toDisposable } from 'vs/base/common/lifecycle';
import { ILogMessage, IUniversalWatchRequest, IWatcher } from 'vs/platform/files/common/watcher';
import { Emitter, Event } from 'vs/base/common/event';
import { FileChangeType, IFileChange } from 'vs/platform/files/common/files';
Expand All @@ -18,47 +18,45 @@ export abstract class BaseWatcher extends Disposable implements IWatcher {
protected readonly _onDidLogMessage = this._register(new Emitter<ILogMessage>());
readonly onDidLogMessage = this._onDidLogMessage.event;

private mapWatchMissingRequestPathToCorrelationId = this._register(new DisposableMap<number>());
protected readonly _onDidWatchFail = this._register(new Emitter<IUniversalWatchRequest>());
private readonly onDidWatchFail = this._onDidWatchFail.event;

private allWatchRequests = new Set<IUniversalWatchRequest>();
private suspendedWatchRequests = new Set<IUniversalWatchRequest>();
private readonly suspendedWatchRequests = this._register(new DisposableMap<IUniversalWatchRequest>());

protected readonly missingRequestPathPollingInterval: number | undefined;
protected readonly suspendedWatchRequestPollingInterval: number | undefined;

async watch(requests: IUniversalWatchRequest[]): Promise<void> {
this.allWatchRequests = new Set([...requests]);
constructor() {
super();

const correlationIds = new Set<number>();
for (const request of requests) {
this._register(this.onDidWatchFail(request => this.handleDidWatchFail(request)));
}

// Request with correlation: watch request path to support
// watching paths that do not exist yet or are potentially
// being deleted and recreated.
//
// We are not doing this for all watch requests yet to see
// how it goes, thus its limitd to correlated requests.
private handleDidWatchFail(request: IUniversalWatchRequest): void {
if (!this.supportsRequestSuspendResume(request)) {
return;
}

if (typeof request.correlationId === 'number') {
correlationIds.add(request.correlationId);
this.suspendWatchRequest(request);
}

if (!this.mapWatchMissingRequestPathToCorrelationId.has(request.correlationId)) {
this.mapWatchMissingRequestPathToCorrelationId.set(request.correlationId, this.watchMissingRequestPath(request));
}
}
}
protected supportsRequestSuspendResume(request: IUniversalWatchRequest): boolean {

// Remove all watched correlated paths that are no longer
// needed because the request is no longer there
for (const [correlationId] of this.mapWatchMissingRequestPathToCorrelationId) {
if (!correlationIds.has(correlationId)) {
this.mapWatchMissingRequestPathToCorrelationId.deleteAndDispose(correlationId);
}
}
// For now, limit failed watch monitoring to requests with a correlationId
// to experiment with this feature in a controlled way. Monitoring requests
// requires us to install polling watchers (via `fs.watchFile()`) and thus
// should be used sparingly.

return typeof request.correlationId === 'number';
}

async watch(requests: IUniversalWatchRequest[]): Promise<void> {
this.allWatchRequests = new Set([...requests]);

// Remove all suspended requests that are no longer needed
for (const request of this.suspendedWatchRequests) {
// Remove all suspended watch requests that are no longer watched
for (const [request] of this.suspendedWatchRequests) {
if (!this.allWatchRequests.has(request)) {
this.suspendedWatchRequests.delete(request);
this.suspendedWatchRequests.deleteAndDispose(request);
}
}

Expand All @@ -69,19 +67,33 @@ export abstract class BaseWatcher extends Disposable implements IWatcher {
return this.doWatch(Array.from(this.allWatchRequests).filter(request => !this.suspendedWatchRequests.has(request)));
}

private watchMissingRequestPath(request: IUniversalWatchRequest): IDisposable {
if (typeof request.correlationId !== 'number') {
return Disposable.None; // for now limit this to correlated watch requests only (reduces surface)
private suspendWatchRequest(request: IUniversalWatchRequest): void {
if (this.suspendedWatchRequests.has(request)) {
return; // already suspended
}

const that = this;
const disposables = new DisposableStore();
this.suspendedWatchRequests.set(request, disposables);

this.monitorSuspendedWatchRequest(request, disposables);

this.updateWatchers();
}

private resumeWatchRequest(request: IUniversalWatchRequest): void {
this.suspendedWatchRequests.deleteAndDispose(request);

this.updateWatchers();
}

private monitorSuspendedWatchRequest(request: IUniversalWatchRequest, disposables: DisposableStore) {
const resource = URI.file(request.path);
const that = this;

let disposed = false;
let pathNotFound = false;

const watchFileCallback: (curr: Stats, prev: Stats) => void = (curr, prev) => {
if (disposed) {
if (disposables.isDisposed) {
return; // return early if already disposed
}

Expand All @@ -91,8 +103,7 @@ export abstract class BaseWatcher extends Disposable implements IWatcher {
pathNotFound = currentPathNotFound;

// Watch path created: resume watching request
if (
(previousPathNotFound && !currentPathNotFound) || // file was created
if ((previousPathNotFound && !currentPathNotFound) || // file was created
(oldPathNotFound && !currentPathNotFound && !previousPathNotFound) // file was created from a rename
) {
this.trace(`fs.watchFile() detected ${request.path} exists again, resuming watcher (correlationId: ${request.correlationId})`);
Expand All @@ -102,60 +113,35 @@ export abstract class BaseWatcher extends Disposable implements IWatcher {
that._onDidChangeFile.fire([event]);
this.traceEvent(event, request);

this.suspendedWatchRequests.delete(request);
this.updateWatchers();
}

// Watch path deleted or never existed: suspend watching request
else if (currentPathNotFound) {
this.trace(`fs.watchFile() detected ${request.path} not found, suspending watcher (correlationId: ${request.correlationId})`);

if (!previousPathNotFound) {
const event: IFileChange = { resource, type: FileChangeType.DELETED, cId: request.correlationId };
that._onDidChangeFile.fire([event]);
this.traceEvent(event, request);
}

this.suspendedWatchRequests.add(request);
this.updateWatchers();
// Resume watching
this.resumeWatchRequest(request);
}
};

this.trace(`starting fs.watchFile() on ${request.path} (correlationId: ${request.correlationId})`);
try {
watchFile(request.path, { persistent: false, interval: this.missingRequestPathPollingInterval }, watchFileCallback);
watchFile(request.path, { persistent: false, interval: this.suspendedWatchRequestPollingInterval }, watchFileCallback);
} catch (error) {
this.warn(`fs.watchFile() failed with error ${error} on path ${request.path} (correlationId: ${request.correlationId})`);

return Disposable.None;
}

return toDisposable(() => {
disposables.add(toDisposable(() => {
this.trace(`stopping fs.watchFile() on ${request.path} (correlationId: ${request.correlationId})`);

disposed = true;

this.suspendedWatchRequests.delete(request);

try {
unwatchFile(request.path, watchFileCallback);
} catch (error) {
this.warn(`fs.unwatchFile() failed with error ${error} on path ${request.path} (correlationId: ${request.correlationId})`);
}
});
}));
}

private isPathNotFound(stats: Stats): boolean {
return stats.ctimeMs === 0 && stats.ino === 0;
}

async stop(): Promise<void> {
this.mapWatchMissingRequestPathToCorrelationId.clearAndDisposeAll();
this.suspendedWatchRequests.clear();
}

protected shouldRestartWatching(request: IUniversalWatchRequest): boolean {
return typeof request.correlationId !== 'number';
this.suspendedWatchRequests.clearAndDisposeAll();
}

protected traceEvent(event: IFileChange, request: IUniversalWatchRequest): void {
Expand Down
2 changes: 1 addition & 1 deletion src/vs/platform/files/node/watcher/nodejs/nodejsWatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ export class NodeJSWatcher extends BaseWatcher implements INonRecursiveWatcher {
private startWatching(request: INonRecursiveWatchRequest): void {

// Start via node.js lib
const instance = new NodeJSFileWatcherLibrary(request, changes => this._onDidChangeFile.fire(changes), msg => this._onDidLogMessage.fire(msg), this.verboseLogging);
const instance = new NodeJSFileWatcherLibrary(request, changes => this._onDidChangeFile.fire(changes), () => this._onDidWatchFail.fire(request), msg => this._onDidLogMessage.fire(msg), this.verboseLogging);

// Remember as watcher instance
const watcher: INodeJSWatcherInstance = { request, instance };
Expand Down
26 changes: 14 additions & 12 deletions src/vs/platform/files/node/watcher/nodejs/nodejsWatcherLib.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,10 @@ export class NodeJSFileWatcherLibrary extends Disposable {
readonly ready = this.watch();

constructor(
private request: INonRecursiveWatchRequest,
private onDidFilesChange: (changes: IFileChange[]) => void,
private onLogMessage?: (msg: ILogMessage) => void,
private readonly request: INonRecursiveWatchRequest,
private readonly onDidFilesChange: (changes: IFileChange[]) => void,
private readonly onDidWatchFail?: () => void,
private readonly onLogMessage?: (msg: ILogMessage) => void,
private verboseLogging?: boolean
) {
super();
Expand All @@ -76,13 +77,14 @@ export class NodeJSFileWatcherLibrary extends Disposable {
// Watch via node.js
const stat = await Promises.stat(realPath);
this._register(await this.doWatch(realPath, stat.isDirectory()));

} catch (error) {
if (error.code !== 'ENOENT') {
this.error(error);
} else {
this.trace(error);
}

this.onDidWatchFail?.();
}
}

Expand Down Expand Up @@ -164,9 +166,7 @@ export class NodeJSFileWatcherLibrary extends Disposable {
watcher.on('error', (code: number, signal: string) => {
this.error(`Failed to watch ${path} for changes using fs.watch() (${code}, ${signal})`);

// The watcher is no longer functional reliably
// so we go ahead and dispose it
this.dispose();
this.onDidWatchFail?.();
});

watcher.on('change', (type, raw) => {
Expand Down Expand Up @@ -230,9 +230,7 @@ export class NodeJSFileWatcherLibrary extends Disposable {
if (changedFileName === pathBasename && !await Promises.exists(path)) {
this.warn('Watcher shutdown because watched path got deleted');

// The watcher is no longer functional reliably
// so we go ahead and dispose it
this.dispose();
this.onDidWatchFail?.();

return;
}
Expand Down Expand Up @@ -335,7 +333,7 @@ export class NodeJSFileWatcherLibrary extends Disposable {
// we will loose this event.
this.fileChangesAggregator.flush();

this.dispose();
this.onDidWatchFail?.();
}
}, NodeJSFileWatcherLibrary.FILE_DELETE_HANDLER_DELAY);

Expand All @@ -352,9 +350,13 @@ export class NodeJSFileWatcherLibrary extends Disposable {
}
});
} catch (error) {
if (await Promises.exists(path) && !cts.token.isCancellationRequested) {
if (!cts.token.isCancellationRequested && await Promises.exists(path)) {
this.error(`Failed to watch ${path} for changes using fs.watch() (${error.toString()})`);
}

this.onDidWatchFail?.();

return Disposable.None;
}

return toDisposable(() => {
Expand Down
31 changes: 24 additions & 7 deletions src/vs/platform/files/node/watcher/parcel/parcelWatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { DeferredPromise, RunOnceScheduler, RunOnceWorker, ThrottledWorker } fro
import { CancellationToken, CancellationTokenSource } from 'vs/base/common/cancellation';
import { toErrorMessage } from 'vs/base/common/errorMessage';
import { Emitter } from 'vs/base/common/event';
import { randomPath } from 'vs/base/common/extpath';
import { randomPath, isEqual } from 'vs/base/common/extpath';
import { GLOBSTAR, ParsedPattern, patternsEquals } from 'vs/base/common/glob';
import { BaseWatcher } from 'vs/platform/files/node/watcher/baseWatcher';
import { TernarySearchTree } from 'vs/base/common/ternarySearchTree';
Expand Down Expand Up @@ -133,7 +133,7 @@ export class ParcelWatcher extends BaseWatcher implements IRecursiveWatcher {
// Gather paths that we should stop watching
const pathsToStopWatching = Array.from(this.watchers.values()).filter(({ request }) => {
return !normalizedRequests.find(normalizedRequest => {
return normalizedRequest.path === request.path &&
return isEqual(normalizedRequest.path, request.path, !isLinux) &&
patternsEquals(normalizedRequest.excludes, request.excludes) &&
patternsEquals(normalizedRequest.includes, request.includes) &&
normalizedRequest.pollingInterval === request.pollingInterval;
Expand Down Expand Up @@ -295,6 +295,8 @@ export class ParcelWatcher extends BaseWatcher implements IRecursiveWatcher {
this.onUnexpectedError(error, watcher);

instance.complete(undefined);

this._onDidWatchFail.fire(request);
});
}

Expand Down Expand Up @@ -439,7 +441,7 @@ export class ParcelWatcher extends BaseWatcher implements IRecursiveWatcher {
let rootDeleted = false;

for (const event of events) {
if (event.type === FileChangeType.DELETED && event.resource.fsPath === watcher.request.path) {
if (event.type === FileChangeType.DELETED && isEqual(event.resource.fsPath, watcher.request.path, !isLinux)) {

// Explicitly exclude changes to root if we have any
// to avoid VS Code closing all opened editors which
Expand All @@ -459,10 +461,21 @@ export class ParcelWatcher extends BaseWatcher implements IRecursiveWatcher {
private onWatchedPathDeleted(watcher: IParcelWatcherInstance): void {
this.warn('Watcher shutdown because watched path got deleted', watcher);

if (!this.shouldRestartWatching(watcher.request)) {
return; // return if this deletion is handled outside
this._onDidWatchFail.fire(watcher.request);

// Do monitoring of the request path parent unless this request
// can be handled via suspend/resume in the super class
//
// TODO@bpasero we should remove this logic in favor of the
// support in the super class so that we have 1 consistent
// solution for handling this.

if (!this.supportsRequestSuspendResume(watcher.request)) {
this.legacyMonitorRequest(watcher);
}
}

private legacyMonitorRequest(watcher: IParcelWatcherInstance): void {
const parentPath = dirname(watcher.request.path);
if (existsSync(parentPath)) {
this.trace('Trying to watch on the parent path to restart the watcher...', watcher);
Expand All @@ -474,7 +487,7 @@ export class ParcelWatcher extends BaseWatcher implements IRecursiveWatcher {

// Watcher path came back! Restart watching...
for (const { resource, type } of changes) {
if (resource.fsPath === watcher.request.path && (type === FileChangeType.ADDED || type === FileChangeType.UPDATED)) {
if (isEqual(resource.fsPath, watcher.request.path, !isLinux) && (type === FileChangeType.ADDED || type === FileChangeType.UPDATED)) {
if (this.isPathValid(watcher.request.path)) {
this.warn('Watcher restarts because watched path got created again', watcher);

Expand All @@ -488,7 +501,7 @@ export class ParcelWatcher extends BaseWatcher implements IRecursiveWatcher {
}
}
}
}, msg => this._onDidLogMessage.fire(msg), this.verboseLogging);
}, undefined, msg => this._onDidLogMessage.fire(msg), this.verboseLogging);

// Make sure to stop watching when the watcher is disposed
watcher.token.onCancellationRequested(() => nodeWatcher.dispose());
Expand Down Expand Up @@ -626,12 +639,16 @@ export class ParcelWatcher extends BaseWatcher implements IRecursiveWatcher {
} catch (error) {
this.trace(`ignoring a path for watching who's realpath failed to resolve: ${request.path} (error: ${error})`);

this._onDidWatchFail.fire(request);

continue;
}
}

// Check for invalid paths
if (validatePaths && !this.isPathValid(request.path)) {
this._onDidWatchFail.fire(request);

continue;
}

Expand Down
Loading

0 comments on commit 6c8ae49

Please sign in to comment.