diff --git a/lib/src/embedded/dispatcher.dart b/lib/src/embedded/dispatcher.dart index fae22b458..c9d501c00 100644 --- a/lib/src/embedded/dispatcher.dart +++ b/lib/src/embedded/dispatcher.dart @@ -7,10 +7,10 @@ import 'dart:convert'; import 'dart:io'; import 'dart:typed_data'; +import 'package:native_synchronization/mailbox.dart'; import 'package:path/path.dart' as p; import 'package:protobuf/protobuf.dart'; import 'package:sass/sass.dart' as sass; -import 'package:stream_channel/stream_channel.dart'; import 'embedded_sass.pb.dart'; import 'function_registry.dart'; @@ -30,83 +30,65 @@ final _outboundRequestId = 0; /// A class that dispatches messages to and from the host for a single /// compilation. final class Dispatcher { - /// The channel of encoded protocol buffers, connected to the host. - final StreamChannel _channel; + /// The mailbox for receiving messages from the host. + final Mailbox _mailbox; + + /// The sink for sending messages to the host. + final StreamSink _sink; /// The compilation ID for which this dispatcher is running. /// /// This is added to outgoing messages but is _not_ parsed from incoming /// messages, since that's already handled by the [IsolateDispatcher]. - final int _compilationId; + int _compilationId = 0; /// [_compilationId], serialized as a varint. - final Uint8List _compilationIdVarint; - - /// Whether this dispatcher has received its compile request. - var _compiling = false; + Uint8List _compilationIdVarint = Uint8List.fromList([0]); - /// A completer awaiting a response to an outbound request. - /// - /// Since each [Dispatcher] is only running a single-threaded compilation, it - /// can only ever have one request outstanding. - Completer? _outstandingRequest; + /// Whether a fatal error has occured during host request. + var _asyncError = false; /// Creates a [Dispatcher] that sends and receives encoded protocol buffers /// over [channel]. - Dispatcher(this._channel, this._compilationId) - : _compilationIdVarint = serializeVarint(_compilationId); + Dispatcher(this._mailbox, this._sink); /// Listens for incoming `CompileRequests` and runs their compilations. - /// - /// This may only be called once. Returns whether or not the compilation - /// succeeded. - Future listen() async { - var success = false; - await _channel.stream.listen((binaryMessage) async { - // Wait a single microtask tick so that we're running in a separate - // microtask from the initial request dispatch. Otherwise, [waitFor] will - // deadlock the event loop fiber that would otherwise be checking stdin - // for new input. - await Future.value(); + void listen() { + do { + var packet = _mailbox.take(); + if (packet.isEmpty) break; try { - InboundMessage? message; + var (compilationId, messageBuffer) = parsePacket(packet); + + _compilationId = compilationId; + _compilationIdVarint = serializeVarint(compilationId); + + InboundMessage message; try { - message = InboundMessage.fromBuffer(binaryMessage); + message = InboundMessage.fromBuffer(messageBuffer); } on InvalidProtocolBufferException catch (error) { throw parseError(error.message); } switch (message.whichMessage()) { - case InboundMessage_Message.versionRequest: - throw paramsError("VersionRequest must have compilation ID 0."); - case InboundMessage_Message.compileRequest: - if (_compiling) { - throw paramsError( - "A CompileRequest with compilation ID $_compilationId is " - "already active."); + var request = message.compileRequest; + var response = _compile(request); + if (!_asyncError) { + _send(OutboundMessage()..compileResponse = response); } - _compiling = true; + break; - var request = message.compileRequest; - var response = await _compile(request); - _send(OutboundMessage()..compileResponse = response); - success = true; - // Each Dispatcher runs a single compilation and then closes. - _channel.sink.close(); + case InboundMessage_Message.versionRequest: + throw paramsError("VersionRequest must have compilation ID 0."); case InboundMessage_Message.canonicalizeResponse: - _dispatchResponse(message.id, message.canonicalizeResponse); - case InboundMessage_Message.importResponse: - _dispatchResponse(message.id, message.importResponse); - case InboundMessage_Message.fileImportResponse: - _dispatchResponse(message.id, message.fileImportResponse); - case InboundMessage_Message.functionCallResponse: - _dispatchResponse(message.id, message.functionCallResponse); + throw paramsError( + "Response ID ${message.id} doesn't match any outstanding requests in compilation $_compilationId."); case InboundMessage_Message.notSet: throw parseError("InboundMessage.message is not set."); @@ -115,16 +97,14 @@ final class Dispatcher { throw parseError( "Unknown message type: ${message.toDebugString()}"); } - } on ProtocolError catch (error, stackTrace) { - sendError(handleError(error, stackTrace)); - _channel.sink.close(); + } catch (error, stackTrace) { + _handleError(error, stackTrace); } - }).asFuture(); - return success; + } while (!_asyncError); } - Future _compile( - InboundMessage_CompileRequest request) async { + OutboundMessage_CompileResponse _compile( + InboundMessage_CompileRequest request) { var functions = FunctionRegistry(); var style = request.style == OutputStyle.COMPRESSED @@ -241,63 +221,96 @@ final class Dispatcher { void sendLog(OutboundMessage_LogEvent event) => _send(OutboundMessage()..logEvent = event); - /// Sends [error] to the host. - void sendError(ProtocolError error) => - _send(OutboundMessage()..error = error); - - Future sendCanonicalizeRequest( + InboundMessage_CanonicalizeResponse sendCanonicalizeRequest( OutboundMessage_CanonicalizeRequest request) => _sendRequest( OutboundMessage()..canonicalizeRequest = request); - Future sendImportRequest( + InboundMessage_ImportResponse sendImportRequest( OutboundMessage_ImportRequest request) => _sendRequest( OutboundMessage()..importRequest = request); - Future sendFileImportRequest( + InboundMessage_FileImportResponse sendFileImportRequest( OutboundMessage_FileImportRequest request) => _sendRequest( OutboundMessage()..fileImportRequest = request); - Future sendFunctionCallRequest( + InboundMessage_FunctionCallResponse sendFunctionCallRequest( OutboundMessage_FunctionCallRequest request) => _sendRequest( OutboundMessage()..functionCallRequest = request); /// Sends [request] to the host and returns the message sent in response. - Future _sendRequest( - OutboundMessage request) async { - request.id = _outboundRequestId; - _send(request); - - if (_outstandingRequest != null) { - throw StateError( - "Dispatcher.sendRequest() can't be called when another request is " - "active."); - } + T _sendRequest(OutboundMessage message) { + message.id = _outboundRequestId; + _send(message); - return (_outstandingRequest = Completer()).future; - } + var packet = _mailbox.take(); + Uint8List messageBuffer = + Uint8List.sublistView(packet, _compilationIdVarint.length); - /// Dispatches [response] to the appropriate outstanding request. - /// - /// Throws an error if there's no outstanding request with the given [id] or - /// if that request is expecting a different type of response. - void _dispatchResponse(int? id, T response) { - var completer = _outstandingRequest; - _outstandingRequest = null; - if (completer == null || id != _outboundRequestId) { - throw paramsError( - "Response ID $id doesn't match any outstanding requests in " - "compilation $_compilationId."); - } else if (completer is! Completer) { - throw paramsError( - "Request ID $id doesn't match response type ${response.runtimeType} " - "in compilation $_compilationId."); + try { + InboundMessage message; + try { + message = InboundMessage.fromBuffer(messageBuffer); + } on InvalidProtocolBufferException catch (error) { + throw parseError(error.message); + } + + GeneratedMessage response; + switch (message.whichMessage()) { + case InboundMessage_Message.canonicalizeResponse: + response = message.canonicalizeResponse; + break; + + case InboundMessage_Message.importResponse: + response = message.importResponse; + break; + + case InboundMessage_Message.fileImportResponse: + response = message.fileImportResponse; + break; + + case InboundMessage_Message.functionCallResponse: + response = message.functionCallResponse; + break; + + case InboundMessage_Message.compileRequest: + throw paramsError( + "A CompileRequest with compilation ID $_compilationId is already active."); + + case InboundMessage_Message.versionRequest: + throw paramsError("VersionRequest must have compilation ID 0."); + + case InboundMessage_Message.notSet: + throw parseError("InboundMessage.message is not set."); + + default: + throw parseError("Unknown message type: ${message.toDebugString()}"); + } + if (message.id != _outboundRequestId) { + throw paramsError( + "Response ID ${message.id} doesn't match any outstanding requests in compilation $_compilationId."); + } + if (response is! T) { + throw paramsError( + "Request ID $_outboundRequestId doesn't match response type ${response.runtimeType} in compilation $_compilationId."); + } + return response; + } catch (error, stackTrace) { + _handleError(error, stackTrace); + _asyncError = true; + rethrow; } + } - completer.complete(response); + /// Handles an error thrown by the dispatcher or code it dispatches to. + /// + /// The [messageId] indicate the IDs of the message being responded to, if available. + void _handleError(Object error, StackTrace stackTrace, {int? messageId}) { + sendError(handleError(error, stackTrace, messageId: messageId)); + _sink.close(); } /// Sends [message] to the host with the given [wireId]. @@ -316,6 +329,11 @@ final class Dispatcher { : 0; packet.setAll(1, _compilationIdVarint); protobufWriter.writeTo(packet, 1 + _compilationIdVarint.length); - _channel.sink.add(packet); + _sink.add(packet); + } + + /// Sends [error] to the host. + void sendError(ProtocolError error) { + _send(OutboundMessage()..error = error); } } diff --git a/lib/src/embedded/host_callable.dart b/lib/src/embedded/host_callable.dart index bb1770ea4..448cce217 100644 --- a/lib/src/embedded/host_callable.dart +++ b/lib/src/embedded/host_callable.dart @@ -2,9 +2,6 @@ // MIT-style license that can be found in the LICENSE file or at // https://opensource.org/licenses/MIT. -// ignore: deprecated_member_use -import 'dart:cli'; - import '../callable.dart'; import '../exception.dart'; import 'dispatcher.dart'; @@ -37,8 +34,7 @@ Callable hostCallable( request.name = callable.name; } - // ignore: deprecated_member_use - var response = waitFor(dispatcher.sendFunctionCallRequest(request)); + var response = dispatcher.sendFunctionCallRequest(request); try { switch (response.whichResult()) { case InboundMessage_FunctionCallResponse_Result.success: diff --git a/lib/src/embedded/importer/file.dart b/lib/src/embedded/importer/file.dart index b945cba2e..8250515eb 100644 --- a/lib/src/embedded/importer/file.dart +++ b/lib/src/embedded/importer/file.dart @@ -2,9 +2,6 @@ // MIT-style license that can be found in the LICENSE file or at // https://opensource.org/licenses/MIT. -// ignore: deprecated_member_use -import 'dart:cli'; - import '../../importer.dart'; import '../dispatcher.dart'; import '../embedded_sass.pb.dart' hide SourceSpan; @@ -27,30 +24,27 @@ final class FileImporter extends ImporterBase { Uri? canonicalize(Uri url) { if (url.scheme == 'file') return _filesystemImporter.canonicalize(url); - // ignore: deprecated_member_use - return waitFor(() async { - var response = await dispatcher - .sendFileImportRequest(OutboundMessage_FileImportRequest() - ..importerId = _importerId - ..url = url.toString() - ..fromImport = fromImport); - - switch (response.whichResult()) { - case InboundMessage_FileImportResponse_Result.fileUrl: - var url = parseAbsoluteUrl("The file importer", response.fileUrl); - if (url.scheme != 'file') { - throw 'The file importer must return a file: URL, was "$url"'; - } - - return _filesystemImporter.canonicalize(url); - - case InboundMessage_FileImportResponse_Result.error: - throw response.error; - - case InboundMessage_FileImportResponse_Result.notSet: - return null; - } - }()); + var response = + dispatcher.sendFileImportRequest(OutboundMessage_FileImportRequest() + ..importerId = _importerId + ..url = url.toString() + ..fromImport = fromImport); + + switch (response.whichResult()) { + case InboundMessage_FileImportResponse_Result.fileUrl: + var url = parseAbsoluteUrl("The file importer", response.fileUrl); + if (url.scheme != 'file') { + throw 'The file importer must return a file: URL, was "$url"'; + } + + return _filesystemImporter.canonicalize(url); + + case InboundMessage_FileImportResponse_Result.error: + throw response.error; + + case InboundMessage_FileImportResponse_Result.notSet: + return null; + } } ImporterResult? load(Uri url) => _filesystemImporter.load(url); diff --git a/lib/src/embedded/importer/host.dart b/lib/src/embedded/importer/host.dart index e4a952100..a1d5eed31 100644 --- a/lib/src/embedded/importer/host.dart +++ b/lib/src/embedded/importer/host.dart @@ -2,9 +2,6 @@ // MIT-style license that can be found in the LICENSE file or at // https://opensource.org/licenses/MIT. -// ignore: deprecated_member_use -import 'dart:cli'; - import '../../importer.dart'; import '../dispatcher.dart'; import '../embedded_sass.pb.dart' hide SourceSpan; @@ -19,44 +16,35 @@ final class HostImporter extends ImporterBase { HostImporter(Dispatcher dispatcher, this._importerId) : super(dispatcher); Uri? canonicalize(Uri url) { - // ignore: deprecated_member_use - return waitFor(() async { - var response = await dispatcher - .sendCanonicalizeRequest(OutboundMessage_CanonicalizeRequest() - ..importerId = _importerId - ..url = url.toString() - ..fromImport = fromImport); - - return switch (response.whichResult()) { - InboundMessage_CanonicalizeResponse_Result.url => - parseAbsoluteUrl("The importer", response.url), - InboundMessage_CanonicalizeResponse_Result.error => - throw response.error, - InboundMessage_CanonicalizeResponse_Result.notSet => null - }; - }()); + var response = + dispatcher.sendCanonicalizeRequest(OutboundMessage_CanonicalizeRequest() + ..importerId = _importerId + ..url = url.toString() + ..fromImport = fromImport); + + return switch (response.whichResult()) { + InboundMessage_CanonicalizeResponse_Result.url => + parseAbsoluteUrl("The importer", response.url), + InboundMessage_CanonicalizeResponse_Result.error => throw response.error, + InboundMessage_CanonicalizeResponse_Result.notSet => null + }; } ImporterResult? load(Uri url) { - // ignore: deprecated_member_use - return waitFor(() async { - var response = - await dispatcher.sendImportRequest(OutboundMessage_ImportRequest() - ..importerId = _importerId - ..url = url.toString()); - - return switch (response.whichResult()) { - InboundMessage_ImportResponse_Result.success => ImporterResult( - response.success.contents, - sourceMapUrl: response.success.sourceMapUrl.isEmpty - ? null - : parseAbsoluteUrl( - "The importer", response.success.sourceMapUrl), - syntax: syntaxToSyntax(response.success.syntax)), - InboundMessage_ImportResponse_Result.error => throw response.error, - InboundMessage_ImportResponse_Result.notSet => null - }; - }()); + var response = dispatcher.sendImportRequest(OutboundMessage_ImportRequest() + ..importerId = _importerId + ..url = url.toString()); + + return switch (response.whichResult()) { + InboundMessage_ImportResponse_Result.success => ImporterResult( + response.success.contents, + sourceMapUrl: response.success.sourceMapUrl.isEmpty + ? null + : parseAbsoluteUrl("The importer", response.success.sourceMapUrl), + syntax: syntaxToSyntax(response.success.syntax)), + InboundMessage_ImportResponse_Result.error => throw response.error, + InboundMessage_ImportResponse_Result.notSet => null + }; } String toString() => "HostImporter"; diff --git a/lib/src/embedded/isolate_dispatcher.dart b/lib/src/embedded/isolate_dispatcher.dart index 78d340997..cfa3c57c0 100644 --- a/lib/src/embedded/isolate_dispatcher.dart +++ b/lib/src/embedded/isolate_dispatcher.dart @@ -7,6 +7,8 @@ import 'dart:ffi'; import 'dart:isolate'; import 'dart:typed_data'; +import 'package:native_synchronization/mailbox.dart'; +import 'package:native_synchronization/sendable.dart'; import 'package:pool/pool.dart'; import 'package:protobuf/protobuf.dart'; import 'package:stream_channel/isolate_channel.dart'; @@ -18,16 +20,20 @@ import 'util/explicit_close_transformer.dart'; import 'util/proto_extensions.dart'; import 'utils.dart'; -/// The message sent to a previously-inactive isolate to initiate a new -/// compilation session. -/// -/// The [SendPort] is used to establish a dedicated [IsolateChannel] for this -/// compilation and the [int] is the compilation ID to use on the wire. -/// -/// We apply the compilation ID in the isolate for efficiency reasons: it allows -/// us to write the protobuf directly to the same buffer as the wire ID, which -/// saves a copy for each message. -typedef _InitialMessage = (SendPort, int); +/// A persisted mailbox resource lease from the pool that can be transfered over +/// to new owners. +class _Lease { + /// A mailbox. + final Mailbox mailbox; + + /// The compilationId. + int id = 0; + + /// The PoolResource. + PoolResource resource; + + _Lease(this.mailbox, this.id, this.resource); +} /// A class that dispatches messages between the host and various isolates that /// are each running an individual compilation. @@ -35,17 +41,17 @@ class IsolateDispatcher { /// The channel of encoded protocol buffers, connected to the host. final StreamChannel _channel; - /// A map from compilation IDs to the sinks for isolates running those - /// compilations. - final _activeIsolates = >{}; + /// All isolates. + final _isolates = >{}; - /// A set of isolates that are _not_ actively running compilations. - final _inactiveIsolates = >{}; + /// All sinks. Only used with ExplicitCloseTransformer for closing channels. + final _sinks = >{}; - /// The actual isolate objects that have been spawned. - /// - /// Only used for cleaning up the process when the underlying channel closes. - final _allIsolates = >[]; + /// A set of lease for tracking for inactive mailboxes. + final _inactive = <_Lease>{}; + + /// A map of active compilationId to mailbox. + final _mailboxes = {}; /// A pool controlling how many isolates (and thus concurrent compilations) /// may be live at once. @@ -55,10 +61,6 @@ class IsolateDispatcher { /// See https://github.com/sass/dart-sass/pull/2019 final _isolatePool = Pool(sizeOf() <= 4 ? 7 : 15); - /// Whether the underlying channel has closed and the dispatcher is shutting - /// down. - var _closed = false; - IsolateDispatcher(this._channel); void listen() { @@ -70,13 +72,15 @@ class IsolateDispatcher { (compilationId, messageBuffer) = parsePacket(packet); if (compilationId != 0) { - // TODO(nweiz): Consider using the techniques described in - // https://github.com/dart-lang/language/issues/124#issuecomment-988718728 - // or https://github.com/dart-lang/language/issues/3118 for low-cost - // inter-isolate transfers. - (_activeIsolates[compilationId] ?? await _getIsolate(compilationId)) - .add(messageBuffer); - return; + var mailbox = + (_mailboxes[compilationId] ?? await _getMailbox(compilationId)); + try { + mailbox.put(packet); + return; + } on StateError catch (_) { + throw paramsError( + "InboundMessage with compilation ID $compilationId is out of order."); + } } try { @@ -102,84 +106,67 @@ class IsolateDispatcher { }, onError: (Object error, StackTrace stackTrace) { _handleError(error, stackTrace); }, onDone: () async { - _closed = true; - for (var isolate in _allIsolates) { + for (var isolate in _isolates) { (await isolate).kill(); } - // Killing isolates isn't sufficient to make sure the process closes; we - // also have to close all the [ReceivePort]s we've constructed (by closing - // the [IsolateChannel]s). - for (var sink in _activeIsolates.values) { + for (var sink in _sinks) { sink.close(); } - for (var channel in _inactiveIsolates) { - channel.sink.close(); - } }); } - /// Returns an isolate that's ready to run a new compilation. + /// Returns the mailbox for an isolate that's ready to run a new compilation. /// /// This re-uses an existing isolate if possible, and spawns a new one /// otherwise. - Future> _getIsolate(int compilationId) async { + Future _getMailbox(int compilationId) async { var resource = await _isolatePool.request(); - if (_inactiveIsolates.isNotEmpty) { - return _activate(_inactiveIsolates.first, compilationId, resource); + if (_inactive.isNotEmpty) { + var lease = _inactive.first; + _inactive.remove(lease); + lease.id = compilationId; + lease.resource = resource; + _mailboxes[compilationId] = lease.mailbox; + return lease.mailbox; } + var mailbox = Mailbox(); + var lease = _Lease(mailbox, compilationId, resource); + _mailboxes[compilationId] = mailbox; + var receivePort = ReceivePort(); - var future = Isolate.spawn(_isolateMain, receivePort.sendPort); - _allIsolates.add(future); + var future = + Isolate.spawn(_isolateMain, (mailbox.asSendable, receivePort.sendPort)); + _isolates.add(future); await future; - var channel = IsolateChannel<_InitialMessage?>.connectReceive(receivePort) + var channel = IsolateChannel.connectReceive(receivePort) .transform(const ExplicitCloseTransformer()); - channel.stream.listen(null, - onError: (Object error, StackTrace stackTrace) => - _handleError(error, stackTrace), - onDone: _channel.sink.close); - return _activate(channel, compilationId, resource); - } - - /// Activates [isolate] for a new individual compilation. - /// - /// This pipes all the outputs from the given isolate through to [_channel]. - /// The [resource] is released once the isolate is no longer active. - StreamSink _activate(StreamChannel<_InitialMessage> isolate, - int compilationId, PoolResource resource) { - _inactiveIsolates.remove(isolate); - - // Each individual compilation has its own dedicated [IsolateChannel], which - // closes once the compilation is finished. - var receivePort = ReceivePort(); - isolate.sink.add((receivePort.sendPort, compilationId)); - - var channel = IsolateChannel.connectReceive(receivePort); - channel.stream.listen( - (message) { - // The first byte of messages from isolates indicates whether the - // entire compilation is finished. Sending this as part of the message - // buffer rather than a separate message avoids a race condition where - // the host might send a new compilation request with the same ID as - // one that just finished before the [IsolateDispatcher] receives word - // that the isolate with that ID is done. See sass/dart-sass#2004. - if (message[0] == 1) { - channel.sink.close(); - _activeIsolates.remove(compilationId); - _inactiveIsolates.add(isolate); - resource.release(); - } - _channel.sink.add(Uint8List.sublistView(message, 1)); - }, - onError: (Object error, StackTrace stackTrace) => - _handleError(error, stackTrace), - onDone: () { - if (_closed) isolate.sink.close(); - }); - _activeIsolates[compilationId] = channel.sink; - return channel.sink; + _sinks.add(channel.sink); + + channel.stream.listen((message) { + // The first byte of messages from isolates indicates whether the + // entire compilation is finished. Sending this as part of the message + // buffer rather than a separate message avoids a race condition where + // the host might send a new compilation request with the same ID as + // one that just finished before the [IsolateDispatcher] receives word + // that the isolate with that ID is done. See sass/dart-sass#2004. + if (message[0] == 1) { + _mailboxes.remove(lease.id); + _inactive.add(lease); + lease.resource.release(); + } + _channel.sink.add(Uint8List.sublistView(message, 1)); + }, onError: (Object error, StackTrace stackTrace) { + _handleError(error, stackTrace); + }, onDone: () { + try { + mailbox.put(Uint8List(0)); + } on StateError catch (_) {} + _channel.sink.close(); + }); + return mailbox; } /// Creates a [OutboundMessage_VersionResponse] @@ -211,14 +198,11 @@ class IsolateDispatcher { _send(compilationId, OutboundMessage()..error = error); } -void _isolateMain(SendPort sendPort) { - var channel = IsolateChannel<_InitialMessage?>.connectSend(sendPort) - .transform(const ExplicitCloseTransformer()); - channel.stream.listen((initialMessage) async { - var (compilationSendPort, compilationId) = initialMessage; - var compilationChannel = - IsolateChannel.connectSend(compilationSendPort); - var success = await Dispatcher(compilationChannel, compilationId).listen(); - if (!success) channel.sink.close(); - }); +void _isolateMain((Sendable, SendPort) message) { + var (sendableMailbox, sendPort) = message; + var mailbox = sendableMailbox.materialize(); + var sink = IsolateChannel.connectSend(sendPort) + .transform(const ExplicitCloseTransformer()) + .sink; + Dispatcher(mailbox, sink).listen(); } diff --git a/pubspec.yaml b/pubspec.yaml index e571a7d12..c778a70bf 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -19,6 +19,7 @@ dependencies: http: ">=0.13.3 <1.0.0" js: ^0.6.3 meta: ^1.3.0 + native_synchronization: ^0.2.0 node_interop: ^2.1.0 package_config: ^2.0.0 path: ^1.8.0