Skip to content

Commit

Permalink
fix: clean up handles in worker_threads environments to prevent aborting
Browse files Browse the repository at this point in the history
  • Loading branch information
oyyd committed Oct 16, 2024
1 parent bd3d460 commit 27b8c36
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 5 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# CHANGELOG

## 0.2.6
fix: clean up handles in worker_threads environments to prevent aborting

## 0.2.5
fix: close the socket if connecting failed

Expand Down
1 change: 1 addition & 0 deletions js/addon.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

export function socketNewSoReuseportFd(domain: string, port: number, ip: string): number
export function socketClose(fd: number): void
export function addHook(cb: (...args: any[]) => any): void
export class SeqpacketSocketWrap {
constructor(ee: object, fd?: number | undefined | null)
init(thisObj: object): void
Expand Down
4 changes: 4 additions & 0 deletions js/dgram.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { EventEmitter } from 'events';
import {
DgramSocketWrap
} from './addon';
import { addSocket, deleteSocket } from './uv_socket_util'

type FnRecv = (err: undefined | Error, buf: Buffer) => void;
export type SendCb = (err: undefined | Error) => void;
Expand Down Expand Up @@ -39,6 +40,8 @@ export class DgramSocket extends EventEmitter {
this.wrap.startRecv();
this.on('_data', this.onData);
this.on('_error', this.onError);

addSocket(this)
}

private onData = (buf: Buffer, filepath: string) => {
Expand Down Expand Up @@ -134,6 +137,7 @@ export class DgramSocket extends EventEmitter {
if (this.closed) {
return;
}
deleteSocket(this);
this.closed = true;
this.wrap.close();
}
Expand Down
11 changes: 11 additions & 0 deletions js/seqpacket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { EventEmitter } from 'events';
import {
SeqpacketSocketWrap
} from './addon';
import { addSocket, deleteSocket } from './uv_socket_util'

export type NotifyCb = () => void;

Expand Down Expand Up @@ -154,6 +155,8 @@ export class SeqpacketSocket extends EventEmitter {
this.on('_connect', this.onConnect);
this.on('_error', this.onError);
this.on('_shutdown', this.onShutdown);

addSocket(this);
}

private onEnd = () => {
Expand Down Expand Up @@ -289,6 +292,14 @@ export class SeqpacketSocket extends EventEmitter {
}
this.destroyed = true;
this.wrap.close();
deleteSocket(this);
}

/**
* Alias of "destory".
*/
close() {
return this.destroy();
}

/**
Expand Down
35 changes: 35 additions & 0 deletions js/uv_socket_util.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { addHook } from './addon'
import * as workerThreads from 'worker_threads'

interface ClosableSocket {
close(): void
}

const socketSet = new Set<ClosableSocket>()

export function addSocket(socket: ClosableSocket) {
socketSet.add(socket)
}

export function deleteSocket(socket: ClosableSocket) {
socketSet.delete(socket)
}

// Cleanup for worker_threads environment. Otherwise Node.js will abort because there are
// open uv sockets while exiting.
function cleanup() {
const sockets = socketSet.values()

let next = sockets.next();
while (!next.done) {
const socket = next.value
socket.close()
next = sockets.next();
}

socketSet.clear()
}

if (!workerThreads.isMainThread) {
addHook(cleanup)
}
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "node-unix-socket",
"version": "0.2.5",
"version": "0.2.6",
"main": "js/index.js",
"types": "js/index.d.ts",
"author": {
Expand Down
16 changes: 15 additions & 1 deletion src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ use std::str::FromStr;

use crate::util::{error, get_err, resolve_libc_err, resolve_uv_err};
use libc::{c_void, sockaddr_storage, sockaddr_un};
use napi::{Env, JsFunction, JsNumber, JsObject, JsString, JsUnknown, Ref, Result, bindgen_prelude::FromNapiValue};
use napi::{
bindgen_prelude::FromNapiValue, Env, JsFunction, JsNumber, JsObject, JsString, JsUnknown, Ref,
Result,
};
use uv_sys::sys;

pub(crate) fn get_loop(env: &Env) -> Result<*mut sys::uv_loop_t> {
Expand Down Expand Up @@ -275,3 +278,14 @@ fn socket_close(fd: JsNumber) -> Result<()> {

close(fd)
}

#[napi]
#[allow(dead_code)]
pub fn add_hook(mut env: Env, cb: JsFunction) -> Result<()> {
env.add_env_cleanup_hook((), move |_| {
let args: Vec<JsUnknown> = vec![];
// no chance to handle the error
let _ = cb.call(Option::None, &args);
})?;
Ok(())
}
7 changes: 4 additions & 3 deletions src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use std::ffi::CStr;
use std::intrinsics::transmute;
use std::mem;

use libc::{sockaddr, sockaddr_un, c_char};
use napi::{self, Error, JsBuffer, Result, JsFunction, JsObject};
use libc::{c_char, sockaddr, sockaddr_un};
use napi::{self, Error, JsBuffer, JsFunction, JsObject, Result};
use nix::errno::Errno;
use nix::fcntl::{fcntl, FcntlArg, OFlag};
use uv_sys::sys;
Expand Down Expand Up @@ -51,7 +51,8 @@ pub(crate) fn uv_err_msg(errno: i32) -> String {
let ret = CStr::from_ptr(ret);
ret
.to_str()
.map_err(|_| error("parsing cstr failed".to_string())).unwrap()
.map_err(|_| error("parsing cstr failed".to_string()))
.unwrap()
.to_string()
};

Expand Down

0 comments on commit 27b8c36

Please sign in to comment.