Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Co-authored-by: pfg <[email protected]>
Co-authored-by: Ryan Gonzalez <[email protected]>
Co-authored-by: Ben Grant <[email protected]>
Co-authored-by: Dave Caruso <[email protected]>
  • Loading branch information
5 people authored Nov 2, 2024
1 parent ce2afac commit 6914c5e
Show file tree
Hide file tree
Showing 10 changed files with 148 additions and 21 deletions.
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ $ xcode-select --install
Bun defaults to linking `libatomic` statically, as not all systems have it. If you are building on a distro that does not have a static libatomic available, you can run the following command to enable dynamic linking:

```bash
$ bun setup -DUSE_STATIC_LIBATOMIC=OFF
$ bun run build -DUSE_STATIC_LIBATOMIC=OFF
```

The built version of Bun may not work on other systems if compiled this way.
Expand Down
2 changes: 1 addition & 1 deletion cmake/Options.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ if(CMAKE_HOST_LINUX AND NOT WIN32 AND NOT APPLE)
OUTPUT_STRIP_TRAILING_WHITESPACE
ERROR_QUIET
)
if(LINUX_DISTRO MATCHES "NAME=\"(Arch|Manjaro|Artix) Linux\"|NAME=\"openSUSE Tumbleweed\"")
if(LINUX_DISTRO MATCHES "NAME=\"(Arch|Manjaro|Artix) Linux( ARM)?\"|NAME=\"openSUSE Tumbleweed\"")
set(DEFAULT_STATIC_LIBATOMIC OFF)
endif()
endif()
Expand Down
1 change: 1 addition & 0 deletions src/bun.js/bindings/ErrorCode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ export default [
["ERR_BUFFER_OUT_OF_BOUNDS", RangeError, "RangeError"],
["ERR_UNKNOWN_SIGNAL", TypeError, "TypeError"],
["ERR_SOCKET_BAD_PORT", RangeError, "RangeError"],
["ERR_STREAM_RELEASE_LOCK", Error, "AbortError"],

// Bun-specific
["ERR_FORMDATA_PARSE_ERROR", TypeError, "TypeError"],
Expand Down
5 changes: 5 additions & 0 deletions src/js/builtins/ProcessObjectInternals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,10 @@ export function getStdinStream(fd) {
}
}
} catch (err) {
if (err?.code === "ERR_STREAM_RELEASE_LOCK") {
// Not a bug. Happens in unref().
return;
}
stream.destroy(err);
}
}
Expand All @@ -212,6 +216,7 @@ export function getStdinStream(fd) {
$debug('on("resume");');
ref();
stream._undestroy();
stream_destroyed = false;
});

stream._readableState.reading = false;
Expand Down
17 changes: 16 additions & 1 deletion src/js/builtins/ReadableStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ export function initializeReadableStream(

$linkTimeConstant;
export function readableStreamToArray(stream: ReadableStream): Promise<unknown[]> {
if (!$isReadableStream(stream)) throw $ERR_INVALID_ARG_TYPE("stream", "ReadableStream", typeof stream);
// this is a direct stream
var underlyingSource = $getByIdDirectPrivate(stream, "underlyingSource");
if (underlyingSource !== undefined) {
Expand All @@ -119,6 +120,7 @@ export function readableStreamToArray(stream: ReadableStream): Promise<unknown[]

$linkTimeConstant;
export function readableStreamToText(stream: ReadableStream): Promise<string> {
if (!$isReadableStream(stream)) throw $ERR_INVALID_ARG_TYPE("stream", "ReadableStream", typeof stream);
// this is a direct stream
var underlyingSource = $getByIdDirectPrivate(stream, "underlyingSource");
if (underlyingSource !== undefined) {
Expand All @@ -137,6 +139,7 @@ export function readableStreamToText(stream: ReadableStream): Promise<string> {

$linkTimeConstant;
export function readableStreamToArrayBuffer(stream: ReadableStream<ArrayBuffer>): Promise<ArrayBuffer> | ArrayBuffer {
if (!$isReadableStream(stream)) throw $ERR_INVALID_ARG_TYPE("stream", "ReadableStream", typeof stream);
// this is a direct stream
var underlyingSource = $getByIdDirectPrivate(stream, "underlyingSource");
if (underlyingSource !== undefined) {
Expand Down Expand Up @@ -216,6 +219,7 @@ export function readableStreamToArrayBuffer(stream: ReadableStream<ArrayBuffer>)

$linkTimeConstant;
export function readableStreamToBytes(stream: ReadableStream<ArrayBuffer>): Promise<Uint8Array> | Uint8Array {
if (!$isReadableStream(stream)) throw $ERR_INVALID_ARG_TYPE("stream", "ReadableStream", typeof stream);
// this is a direct stream
var underlyingSource = $getByIdDirectPrivate(stream, "underlyingSource");

Expand Down Expand Up @@ -297,6 +301,7 @@ export function readableStreamToFormData(
stream: ReadableStream<ArrayBuffer>,
contentType: string | ArrayBuffer | ArrayBufferView,
): Promise<FormData> {
if (!$isReadableStream(stream)) throw $ERR_INVALID_ARG_TYPE("stream", "ReadableStream", typeof stream);
if ($isReadableStreamLocked(stream)) return Promise.$reject($makeTypeError("ReadableStream is locked"));
return Bun.readableStreamToBlob(stream).then(blob => {
return FormData.from(blob, contentType);
Expand All @@ -305,6 +310,7 @@ export function readableStreamToFormData(

$linkTimeConstant;
export function readableStreamToJSON(stream: ReadableStream): unknown {
if (!$isReadableStream(stream)) throw $ERR_INVALID_ARG_TYPE("stream", "ReadableStream", typeof stream);
if ($isReadableStreamLocked(stream)) return Promise.$reject($makeTypeError("ReadableStream is locked"));
let result = $tryUseReadableStreamBufferedFastPath(stream, "json");
if (result) {
Expand All @@ -326,6 +332,7 @@ export function readableStreamToJSON(stream: ReadableStream): unknown {

$linkTimeConstant;
export function readableStreamToBlob(stream: ReadableStream): Promise<Blob> {
if (!$isReadableStream(stream)) throw $ERR_INVALID_ARG_TYPE("stream", "ReadableStream", typeof stream);
if ($isReadableStreamLocked(stream)) return Promise.$reject($makeTypeError("ReadableStream is locked"));

return (
Expand Down Expand Up @@ -422,7 +429,15 @@ export function pipeThrough(this, streams, options) {

if ($isWritableStreamLocked(internalWritable)) throw $makeTypeError("WritableStream is locked");

$readableStreamPipeToWritableStream(this, internalWritable, preventClose, preventAbort, preventCancel, signal);
const promise = $readableStreamPipeToWritableStream(
this,
internalWritable,
preventClose,
preventAbort,
preventCancel,
signal,
);
$markPromiseAsHandled(promise);

return readable;
}
Expand Down
5 changes: 3 additions & 2 deletions src/js/builtins/ReadableStreamDefaultController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ export function initializeReadableStreamDefaultController(this, stream, underlyi
export function enqueue(this, chunk) {
if (!$isReadableStreamDefaultController(this)) throw $makeThisTypeError("ReadableStreamDefaultController", "enqueue");

if (!$readableStreamDefaultControllerCanCloseOrEnqueue(this))
throw new TypeError("ReadableStreamDefaultController is not in a state where chunk can be enqueued");
if (!$readableStreamDefaultControllerCanCloseOrEnqueue(this)) {
throw $ERR_INVALID_STATE("ReadableStreamDefaultController is not in a state where chunk can be enqueued");
}

return $readableStreamDefaultControllerEnqueue(this, chunk);
}
Expand Down
5 changes: 1 addition & 4 deletions src/js/builtins/ReadableStreamDefaultReader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,7 @@ export function releaseLock(this) {

if (!$getByIdDirectPrivate(this, "ownerReadableStream")) return;

if ($getByIdDirectPrivate(this, "readRequests")?.isNotEmpty())
throw new TypeError("There are still pending read requests, cannot release the lock");

$readableStreamReaderGenericRelease(this);
$readableStreamDefaultReaderRelease(this);
}

$getter;
Expand Down
38 changes: 27 additions & 11 deletions src/js/builtins/ReadableStreamInternals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,10 @@ export function pipeToDoReadWrite(pipeState) {
pipeState.pendingReadPromiseCapability.resolve.$call(undefined, canWrite);
if (!canWrite) return;

pipeState.pendingWritePromise = $writableStreamDefaultWriterWrite(pipeState.writer, result.value);
pipeState.pendingWritePromise = $writableStreamDefaultWriterWrite(pipeState.writer, result.value).$then(
undefined,
() => {},
);
},
e => {
pipeState.pendingReadPromiseCapability.resolve.$call(undefined, false);
Expand Down Expand Up @@ -396,7 +399,7 @@ export function pipeToClosingMustBePropagatedForward(pipeState) {
action();
return;
}
$getByIdDirectPrivate(pipeState.reader, "closedPromiseCapability").promise.$then(action, undefined);
$getByIdDirectPrivate(pipeState.reader, "closedPromiseCapability").promise.$then(action, () => {});
}

export function pipeToClosingMustBePropagatedBackward(pipeState) {
Expand Down Expand Up @@ -1367,20 +1370,18 @@ export function readableStreamError(stream, error) {

if (!reader) return;

$getByIdDirectPrivate(reader, "closedPromiseCapability").reject.$call(undefined, error);
const promise = $getByIdDirectPrivate(reader, "closedPromiseCapability").promise;
$markPromiseAsHandled(promise);

if ($isReadableStreamDefaultReader(reader)) {
const requests = $getByIdDirectPrivate(reader, "readRequests");
$putByIdDirectPrivate(reader, "readRequests", $createFIFO());
for (var request = requests.shift(); request; request = requests.shift()) $rejectPromise(request, error);
$readableStreamDefaultReaderErrorReadRequests(reader, error);
} else {
$assert($isReadableStreamBYOBReader(reader));
const requests = $getByIdDirectPrivate(reader, "readIntoRequests");
$putByIdDirectPrivate(reader, "readIntoRequests", $createFIFO());
for (var request = requests.shift(); request; request = requests.shift()) $rejectPromise(request, error);
}

$getByIdDirectPrivate(reader, "closedPromiseCapability").reject.$call(undefined, error);
const promise = $getByIdDirectPrivate(reader, "closedPromiseCapability").promise;
$markPromiseAsHandled(promise);
}

export function readableStreamDefaultControllerShouldCallPull(controller) {
Expand Down Expand Up @@ -1608,6 +1609,15 @@ export function isReadableStreamDisturbed(stream) {
return stream.$disturbed;
}

$visibility = "Private";
export function readableStreamDefaultReaderRelease(reader) {
$readableStreamReaderGenericRelease(reader);
$readableStreamDefaultReaderErrorReadRequests(
reader,
$ERR_STREAM_RELEASE_LOCK("Stream reader cancelled via releaseLock()"),
);
}

$visibility = "Private";
export function readableStreamReaderGenericRelease(reader) {
$assert(!!$getByIdDirectPrivate(reader, "ownerReadableStream"));
Expand All @@ -1616,11 +1626,11 @@ export function readableStreamReaderGenericRelease(reader) {
if ($getByIdDirectPrivate($getByIdDirectPrivate(reader, "ownerReadableStream"), "state") === $streamReadable)
$getByIdDirectPrivate(reader, "closedPromiseCapability").reject.$call(
undefined,
$makeTypeError("releasing lock of reader whose stream is still in readable state"),
$ERR_STREAM_RELEASE_LOCK("Stream reader cancelled via releaseLock()"),
);
else
$putByIdDirectPrivate(reader, "closedPromiseCapability", {
promise: $newHandledRejectedPromise($makeTypeError("reader released lock")),
promise: $newHandledRejectedPromise($ERR_STREAM_RELEASE_LOCK("Stream reader cancelled via releaseLock()")),
});

const promise = $getByIdDirectPrivate(reader, "closedPromiseCapability").promise;
Expand All @@ -1636,6 +1646,12 @@ export function readableStreamReaderGenericRelease(reader) {
$putByIdDirectPrivate(reader, "ownerReadableStream", undefined);
}

export function readableStreamDefaultReaderErrorReadRequests(reader, error) {
const requests = $getByIdDirectPrivate(reader, "readRequests");
$putByIdDirectPrivate(reader, "readRequests", $createFIFO());
for (var request = requests.shift(); request; request = requests.shift()) $rejectPromise(request, error);
}

export function readableStreamDefaultControllerCanCloseOrEnqueue(controller) {
if ($getByIdDirectPrivate(controller, "closeRequested")) {
return false;
Expand Down
6 changes: 5 additions & 1 deletion test/js/third_party/prompts/prompts.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@ test("works with prompts", async () => {
stdin: "pipe",
});

await Bun.sleep(100);
const reader = child.stdout.getReader();

await reader.read();
reader.releaseLock();

child.stdin.write("dylan\n");
await Bun.sleep(100);
child.stdin.write("999\n");
Expand Down
88 changes: 88 additions & 0 deletions test/js/web/streams/streams.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -756,6 +756,55 @@ it("ReadableStream for empty file closes immediately", async () => {
expect(chunks.length).toBe(0);
});

it("ReadableStream errors the stream on pull rejection", async () => {
let stream = new ReadableStream({
pull(controller) {
return Promise.reject("pull rejected");
},
});

let reader = stream.getReader();
let closed = reader.closed.catch(err => `closed: ${err}`);
let read = reader.read().catch(err => `read: ${err}`);
expect(await Promise.race([closed, read])).toBe("closed: pull rejected");
expect(await read).toBe("read: pull rejected");
});

it("ReadableStream rejects pending reads when the lock is released", async () => {
let { resolve, promise } = Promise.withResolvers();
let stream = new ReadableStream({
async pull(controller) {
controller.enqueue("123");
await promise;
controller.enqueue("456");
controller.close();
},
});

let reader = stream.getReader();
expect((await reader.read()).value).toBe("123");

let read = reader.read();
reader.releaseLock();
expect(read).rejects.toThrow(
expect.objectContaining({
name: "AbortError",
code: "ERR_STREAM_RELEASE_LOCK",
}),
);
expect(reader.closed).rejects.toThrow(
expect.objectContaining({
name: "AbortError",
code: "ERR_STREAM_RELEASE_LOCK",
}),
);

resolve();

reader = stream.getReader();
expect((await reader.read()).value).toBe("456");
});

it("new Response(stream).arrayBuffer() (bytes)", async () => {
var queue = [Buffer.from("abdefgh")];
var stream = new ReadableStream({
Expand Down Expand Up @@ -1053,3 +1102,42 @@ it("fs.createReadStream(filename) should be able to break inside async loop", as
expect(true).toBe(true);
}
});

it("pipeTo doesn't cause unhandled rejections on readable errors", async () => {
// https://github.com/WebKit/WebKit/blob/3a75b5d2de94aa396a99b454ac47f3be9e0dc726/LayoutTests/streams/pipeTo-unhandled-promise.html
let unhandledRejectionCaught = false;

const catchUnhandledRejection = () => {
unhandledRejectionCaught = true;
};
process.on("unhandledRejection", catchUnhandledRejection);

const writable = new WritableStream();
const readable = new ReadableStream({ start: c => c.error("error") });
readable.pipeTo(writable).catch(() => {});

await Bun.sleep(15);

process.off("unhandledRejection", catchUnhandledRejection);

expect(unhandledRejectionCaught).toBe(false);
});

it("pipeThrough doesn't cause unhandled rejections on readable errors", async () => {
let unhandledRejectionCaught = false;

const catchUnhandledRejection = () => {
unhandledRejectionCaught = true;
};
process.on("unhandledRejection", catchUnhandledRejection);

const readable = new ReadableStream({ start: c => c.error("error") });
const ts = new TransformStream();
readable.pipeThrough(ts);

await Bun.sleep(15);

process.off("unhandledRejection", catchUnhandledRejection);

expect(unhandledRejectionCaught).toBe(false);
});

0 comments on commit 6914c5e

Please sign in to comment.