Skip to content

Commit

Permalink
Fix message reader dispose
Browse files Browse the repository at this point in the history
  • Loading branch information
dbaeumer committed Oct 10, 2024
1 parent 0671381 commit a5a067b
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 7 deletions.
37 changes: 37 additions & 0 deletions jsonrpc/src/common/disposable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,40 @@ export namespace Disposable {
};
}
}

export class DisposableStore implements Disposable {

private isDisposed: boolean;
private readonly disposables: Set<Disposable>;

constructor() {
this.isDisposed = false;
this.disposables = new Set<Disposable>();
}

/**
* Dispose of all registered disposables and mark this object as disposed.
*
* Any future disposables added to this object will be disposed of on `add`.
*/
public dispose(): void {
if (this.isDisposed || this.disposables.size === 0) {
return;
}
try {
this.disposables.forEach(item => item.dispose());
} finally {
this.isDisposed = true;
this.disposables.clear();
}
}

public add<T extends Disposable>(t: T): T {
if (this.isDisposed) {
t.dispose();
} else {
this.disposables.add(t);
}
return t;
}
}
19 changes: 12 additions & 7 deletions jsonrpc/src/common/messageReader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { Message } from './messages';
import { ContentDecoder, ContentTypeDecoder } from './encoding';
import { Disposable } from './api';
import { Semaphore } from './semaphore';
import { DisposableStore } from './disposable';

/**
* A callback that receives each incoming JSON-RPC message.
Expand Down Expand Up @@ -170,7 +171,7 @@ export class ReadableStreamMessageReader extends AbstractMessageReader {

private readable: RAL.ReadableStream;
private options: ResolvedMessageReaderOptions;
private callback!: DataCallback;
private callback: DataCallback | undefined;

private nextMessageLength: number;
private messageToken: number;
Expand Down Expand Up @@ -199,16 +200,20 @@ export class ReadableStreamMessageReader extends AbstractMessageReader {
}

public listen(callback: DataCallback): Disposable {
if (this.callback !== undefined) {
throw new Error('Reader can only listen once.');
}
this.nextMessageLength = -1;
this.messageToken = 0;
this.partialMessageTimer = undefined;
this.callback = callback;
const result = this.readable.onData((data: Uint8Array) => {
const disposables = new DisposableStore();
disposables.add(this.readable.onData((data: Uint8Array) => {
this.onData(data);
});
this.readable.onError((error: any) => this.fireError(error));
this.readable.onClose(() => this.fireClose());
return result;
}));
disposables.add(this.readable.onError((error: any) => this.fireError(error)));
disposables.add(this.readable.onClose(() => this.fireClose()));
return disposables;
}

private onData(data: Uint8Array): void {
Expand Down Expand Up @@ -250,7 +255,7 @@ export class ReadableStreamMessageReader extends AbstractMessageReader {
? await this.options.contentDecoder.decode(body)
: body;
const message = await this.options.contentTypeDecoder.decode(bytes, this.options);
this.callback(message);
this.callback!(message);
}).catch((error) => {
this.fireError(error);
});
Expand Down

0 comments on commit a5a067b

Please sign in to comment.