From 45141753dc37ecce33814184ab25f63885595b9e Mon Sep 17 00:00:00 2001 From: Lucas Barrena Date: Sat, 3 Feb 2024 21:51:35 -0300 Subject: [PATCH] Cleanup not working --- index.js | 28 +++++++++++++++------------- test.js | 45 +++++++++++++++++++++------------------------ 2 files changed, 36 insertions(+), 37 deletions(-) diff --git a/index.js b/index.js index 2e54ef1..7719f07 100644 --- a/index.js +++ b/index.js @@ -52,8 +52,8 @@ module.exports = class WebPeer extends ReadyResource { } _close () { - // this.peer.destroy() - // if (this.mux) this.mux.destroy() + console.log('_closed') + this.peer.close() this.stream.destroy() } @@ -67,9 +67,12 @@ module.exports = class WebPeer extends ReadyResource { handshakeHash: this.mux.stream.handshakeHash }) // new SecretStream(false, rawStream) - this.remote.once('connect', () => { - this.emit('continue', this.remote) // TODO: It should be a Duplex to avoid this event + this.remote.on('close', () => { + console.log('remote closed') + this.close().catch(safetyCatch) }) + + this.emit('continue', this.remote) // TODO: It should be a Duplex to avoid this event } if (this.mux.stream.isInitiator) { @@ -88,14 +91,12 @@ module.exports = class WebPeer extends ReadyResource { } } - _onwireice ({ ice }) { - this.peer.addIceCandidate(new RTCIceCandidate(ice)) + async _onwireice ({ ice }) { + await this.peer.addIceCandidate(new RTCIceCandidate(ice)) } async _onwireoffer ({ offer }) { - // TODO: Only once - - this.peer.setRemoteDescription(offer) + await this.peer.setRemoteDescription(offer) const answer = await this.peer.createAnswer() await this.peer.setLocalDescription(answer) @@ -103,8 +104,8 @@ module.exports = class WebPeer extends ReadyResource { this.channel.messages[2].send({ answer: this.peer.localDescription }) } - _onwireanswer ({ answer }) { - this.peer.setRemoteDescription(answer) + async _onwireanswer ({ answer }) { + await this.peer.setRemoteDescription(answer) } _onmuxerror (err) { @@ -114,8 +115,9 @@ module.exports = class WebPeer extends ReadyResource { _onmuxclose (isRemote) { console.log('_onmuxclose', { isRemote }, 'Stream created?', !!this.remote) - // if (!this.remote) this.peer.destroy() - // this.mux.destroy() + if (!this.remote) this.peer.close() + + this.stream.destroy() } } diff --git a/test.js b/test.js index 481d20f..374f8c5 100644 --- a/test.js +++ b/test.js @@ -41,9 +41,14 @@ async function writer (t, { relayAddress }) { swarm.on('connection', function (signal) { const peer = new HyperWebRTC(signal) + // t.teardown(() => peer.close()) + peer.on('continue', function (stream) { console.log('core replicate') - core.replicate(stream) + t.teardown(() => stream.destroy()) + + const s = core.replicate(stream) + stream.on('close', () => s.destroy()) }) }) const discoveryWeb = crypto.discoveryKey(core.discoveryKey) @@ -71,9 +76,14 @@ async function reader (t, key, { relayAddress }) { swarm.on('connection', function (signal) { const peer = new HyperWebRTC(signal) + // t.teardown(() => peer.close()) + peer.on('continue', function (stream) { console.log('clone replicate') - clone.replicate(stream) + t.teardown(() => stream.destroy()) + + const s = clone.replicate(stream) + stream.on('close', () => s.destroy()) }) }) const discoveryWeb = crypto.discoveryKey(clone.discoveryKey) @@ -88,27 +98,29 @@ async function reader (t, key, { relayAddress }) { function createRelayClient (t, relayAddress) { const ws = new WebSocket(relayAddress) const dht = new DHTRelay(new Stream(true, ws)) + // TODO: dht-relay does not have 'close' event - t.teardown(() => dht.destroy(), { order: Infinity }) + t.teardown(() => dht.destroy({ force: true }), { order: Infinity }) return dht } function createRelayServer (t, { bootstrap }) { - // const dht = new DHT({ bootstrap, quickFirewall: false, ephemeral: true }) - const dht = new DHT({ bootstrap }) const server = new WebSocketServer({ port: 0 }) + const connections = new Set() server.on('connection', function (socket) { + connections.add(socket) + socket.on('close', () => connections.delete(socket)) + relay(dht, new Stream(false, socket)) }) - // await waitForServer(server) - t.teardown(async function () { - console.log('teardown') - await new Promise(resolve => server.close(resolve)) + const closing = new Promise(resolve => server.close(resolve)) + for (const socket of connections) socket.terminate() + await closing await dht.destroy() }) @@ -122,18 +134,3 @@ async function createBootstrap (t) { return testnet.bootstrap } - -/* function waitForServer (server) { - return new Promise((resolve, reject) => { - server.on('listening', done) - server.on('error', done) - - function done (err) { - server.off('listening', done) - server.off('error', done) - - if (err) reject(err) - else resolve() - } - }) -} */