Skip to content

Commit

Permalink
Persist isolates and use mailbox
Browse files Browse the repository at this point in the history
  • Loading branch information
ntkme committed Aug 10, 2023
1 parent bb24476 commit 795b870
Show file tree
Hide file tree
Showing 6 changed files with 245 additions and 263 deletions.
205 changes: 112 additions & 93 deletions lib/src/embedded/dispatcher.dart
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import 'dart:convert';
import 'dart:io';
import 'dart:typed_data';

import 'package:native_synchronization/mailbox.dart';
import 'package:path/path.dart' as p;
import 'package:protobuf/protobuf.dart';
import 'package:sass/sass.dart' as sass;
import 'package:stream_channel/stream_channel.dart';

import 'embedded_sass.pb.dart';
import 'function_registry.dart';
Expand All @@ -30,83 +30,65 @@ final _outboundRequestId = 0;
/// A class that dispatches messages to and from the host for a single
/// compilation.
final class Dispatcher {
/// The channel of encoded protocol buffers, connected to the host.
final StreamChannel<Uint8List> _channel;
/// The mailbox for receiving messages from the host.
final Mailbox _mailbox;

/// The sink for sending messages to the host.
final StreamSink<Uint8List> _sink;

/// The compilation ID for which this dispatcher is running.
///
/// This is added to outgoing messages but is _not_ parsed from incoming
/// messages, since that's already handled by the [IsolateDispatcher].
final int _compilationId;
int _compilationId = 0;

/// [_compilationId], serialized as a varint.
final Uint8List _compilationIdVarint;

/// Whether this dispatcher has received its compile request.
var _compiling = false;
Uint8List _compilationIdVarint = Uint8List.fromList([0]);

/// A completer awaiting a response to an outbound request.
///
/// Since each [Dispatcher] is only running a single-threaded compilation, it
/// can only ever have one request outstanding.
Completer<GeneratedMessage>? _outstandingRequest;
/// Whether a fatal error has occured during host request.
var _asyncError = false;

/// Creates a [Dispatcher] that sends and receives encoded protocol buffers
/// over [channel].
Dispatcher(this._channel, this._compilationId)
: _compilationIdVarint = serializeVarint(_compilationId);
Dispatcher(this._mailbox, this._sink);

/// Listens for incoming `CompileRequests` and runs their compilations.
///
/// This may only be called once. Returns whether or not the compilation
/// succeeded.
Future<bool> listen() async {
var success = false;
await _channel.stream.listen((binaryMessage) async {
// Wait a single microtask tick so that we're running in a separate
// microtask from the initial request dispatch. Otherwise, [waitFor] will
// deadlock the event loop fiber that would otherwise be checking stdin
// for new input.
await Future<void>.value();
void listen() {
do {
var packet = _mailbox.take();
if (packet.isEmpty) break;

try {
InboundMessage? message;
var (compilationId, messageBuffer) = parsePacket(packet);

_compilationId = compilationId;
_compilationIdVarint = serializeVarint(compilationId);

InboundMessage message;
try {
message = InboundMessage.fromBuffer(binaryMessage);
message = InboundMessage.fromBuffer(messageBuffer);
} on InvalidProtocolBufferException catch (error) {
throw parseError(error.message);
}

switch (message.whichMessage()) {
case InboundMessage_Message.versionRequest:
throw paramsError("VersionRequest must have compilation ID 0.");

case InboundMessage_Message.compileRequest:
if (_compiling) {
throw paramsError(
"A CompileRequest with compilation ID $_compilationId is "
"already active.");
var request = message.compileRequest;
var response = _compile(request);
if (!_asyncError) {
_send(OutboundMessage()..compileResponse = response);
}
_compiling = true;
break;

var request = message.compileRequest;
var response = await _compile(request);
_send(OutboundMessage()..compileResponse = response);
success = true;
// Each Dispatcher runs a single compilation and then closes.
_channel.sink.close();
case InboundMessage_Message.versionRequest:
throw paramsError("VersionRequest must have compilation ID 0.");

case InboundMessage_Message.canonicalizeResponse:
_dispatchResponse(message.id, message.canonicalizeResponse);

case InboundMessage_Message.importResponse:
_dispatchResponse(message.id, message.importResponse);

case InboundMessage_Message.fileImportResponse:
_dispatchResponse(message.id, message.fileImportResponse);

case InboundMessage_Message.functionCallResponse:
_dispatchResponse(message.id, message.functionCallResponse);
throw paramsError(
"Response ID ${message.id} doesn't match any outstanding requests in compilation $_compilationId.");

case InboundMessage_Message.notSet:
throw parseError("InboundMessage.message is not set.");
Expand All @@ -115,16 +97,14 @@ final class Dispatcher {
throw parseError(
"Unknown message type: ${message.toDebugString()}");
}
} on ProtocolError catch (error, stackTrace) {
sendError(handleError(error, stackTrace));
_channel.sink.close();
} catch (error, stackTrace) {
_handleError(error, stackTrace);
}
}).asFuture<void>();
return success;
} while (!_asyncError);
}

Future<OutboundMessage_CompileResponse> _compile(
InboundMessage_CompileRequest request) async {
OutboundMessage_CompileResponse _compile(
InboundMessage_CompileRequest request) {
var functions = FunctionRegistry();

var style = request.style == OutputStyle.COMPRESSED
Expand Down Expand Up @@ -241,63 +221,97 @@ final class Dispatcher {
void sendLog(OutboundMessage_LogEvent event) =>
_send(OutboundMessage()..logEvent = event);

/// Sends [error] to the host.
void sendError(ProtocolError error) =>
_send(OutboundMessage()..error = error);

Future<InboundMessage_CanonicalizeResponse> sendCanonicalizeRequest(
InboundMessage_CanonicalizeResponse sendCanonicalizeRequest(
OutboundMessage_CanonicalizeRequest request) =>
_sendRequest<InboundMessage_CanonicalizeResponse>(
OutboundMessage()..canonicalizeRequest = request);

Future<InboundMessage_ImportResponse> sendImportRequest(
InboundMessage_ImportResponse sendImportRequest(
OutboundMessage_ImportRequest request) =>
_sendRequest<InboundMessage_ImportResponse>(
OutboundMessage()..importRequest = request);

Future<InboundMessage_FileImportResponse> sendFileImportRequest(
InboundMessage_FileImportResponse sendFileImportRequest(
OutboundMessage_FileImportRequest request) =>
_sendRequest<InboundMessage_FileImportResponse>(
OutboundMessage()..fileImportRequest = request);

Future<InboundMessage_FunctionCallResponse> sendFunctionCallRequest(
InboundMessage_FunctionCallResponse sendFunctionCallRequest(
OutboundMessage_FunctionCallRequest request) =>
_sendRequest<InboundMessage_FunctionCallResponse>(
OutboundMessage()..functionCallRequest = request);

/// Sends [request] to the host and returns the message sent in response.
Future<T> _sendRequest<T extends GeneratedMessage>(
OutboundMessage request) async {
request.id = _outboundRequestId;
_send(request);

if (_outstandingRequest != null) {
throw StateError(
"Dispatcher.sendRequest() can't be called when another request is "
"active.");
}
T _sendRequest<T extends GeneratedMessage>(OutboundMessage message) {
message.id = _outboundRequestId;
_send(message);

return (_outstandingRequest = Completer<T>()).future;
}
var packet = _mailbox.take();

/// Dispatches [response] to the appropriate outstanding request.
///
/// Throws an error if there's no outstanding request with the given [id] or
/// if that request is expecting a different type of response.
void _dispatchResponse<T extends GeneratedMessage>(int? id, T response) {
var completer = _outstandingRequest;
_outstandingRequest = null;
if (completer == null || id != _outboundRequestId) {
throw paramsError(
"Response ID $id doesn't match any outstanding requests in "
"compilation $_compilationId.");
} else if (completer is! Completer<T>) {
throw paramsError(
"Request ID $id doesn't match response type ${response.runtimeType} "
"in compilation $_compilationId.");
try {
var messageBuffer =
Uint8List.sublistView(packet, _compilationIdVarint.length);

InboundMessage message;
try {
message = InboundMessage.fromBuffer(messageBuffer);
} on InvalidProtocolBufferException catch (error) {
throw parseError(error.message);
}

GeneratedMessage response;
switch (message.whichMessage()) {
case InboundMessage_Message.canonicalizeResponse:
response = message.canonicalizeResponse;
break;

case InboundMessage_Message.importResponse:
response = message.importResponse;
break;

case InboundMessage_Message.fileImportResponse:
response = message.fileImportResponse;
break;

case InboundMessage_Message.functionCallResponse:
response = message.functionCallResponse;
break;

case InboundMessage_Message.compileRequest:
throw paramsError(
"A CompileRequest with compilation ID $_compilationId is already active.");

case InboundMessage_Message.versionRequest:
throw paramsError("VersionRequest must have compilation ID 0.");

case InboundMessage_Message.notSet:
throw parseError("InboundMessage.message is not set.");

default:
throw parseError("Unknown message type: ${message.toDebugString()}");
}
if (message.id != _outboundRequestId) {
throw paramsError(
"Response ID ${message.id} doesn't match any outstanding requests in compilation $_compilationId.");
}
if (response is! T) {
throw paramsError(
"Request ID $_outboundRequestId doesn't match response type ${response.runtimeType} in compilation $_compilationId.");
}
return response;
} catch (error, stackTrace) {
_handleError(error, stackTrace);
_asyncError = true;
rethrow;
}
}

completer.complete(response);
/// Handles an error thrown by the dispatcher or code it dispatches to.
///
/// The [messageId] indicate the IDs of the message being responded to, if available.
void _handleError(Object error, StackTrace stackTrace, {int? messageId}) {
sendError(handleError(error, stackTrace, messageId: messageId));
_sink.close();
}

/// Sends [message] to the host with the given [wireId].
Expand All @@ -316,6 +330,11 @@ final class Dispatcher {
: 0;
packet.setAll(1, _compilationIdVarint);
protobufWriter.writeTo(packet, 1 + _compilationIdVarint.length);
_channel.sink.add(packet);
_sink.add(packet);
}

/// Sends [error] to the host.
void sendError(ProtocolError error) {
_send(OutboundMessage()..error = error);
}
}
6 changes: 1 addition & 5 deletions lib/src/embedded/host_callable.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@
// MIT-style license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.

// ignore: deprecated_member_use
import 'dart:cli';

import '../callable.dart';
import '../exception.dart';
import 'dispatcher.dart';
Expand Down Expand Up @@ -37,8 +34,7 @@ Callable hostCallable(
request.name = callable.name;
}

// ignore: deprecated_member_use
var response = waitFor(dispatcher.sendFunctionCallRequest(request));
var response = dispatcher.sendFunctionCallRequest(request);
try {
switch (response.whichResult()) {
case InboundMessage_FunctionCallResponse_Result.success:
Expand Down
48 changes: 21 additions & 27 deletions lib/src/embedded/importer/file.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@
// MIT-style license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.

// ignore: deprecated_member_use
import 'dart:cli';

import '../../importer.dart';
import '../dispatcher.dart';
import '../embedded_sass.pb.dart' hide SourceSpan;
Expand All @@ -27,30 +24,27 @@ final class FileImporter extends ImporterBase {
Uri? canonicalize(Uri url) {
if (url.scheme == 'file') return _filesystemImporter.canonicalize(url);

// ignore: deprecated_member_use
return waitFor(() async {
var response = await dispatcher
.sendFileImportRequest(OutboundMessage_FileImportRequest()
..importerId = _importerId
..url = url.toString()
..fromImport = fromImport);

switch (response.whichResult()) {
case InboundMessage_FileImportResponse_Result.fileUrl:
var url = parseAbsoluteUrl("The file importer", response.fileUrl);
if (url.scheme != 'file') {
throw 'The file importer must return a file: URL, was "$url"';
}

return _filesystemImporter.canonicalize(url);

case InboundMessage_FileImportResponse_Result.error:
throw response.error;

case InboundMessage_FileImportResponse_Result.notSet:
return null;
}
}());
var response =
dispatcher.sendFileImportRequest(OutboundMessage_FileImportRequest()
..importerId = _importerId
..url = url.toString()
..fromImport = fromImport);

switch (response.whichResult()) {
case InboundMessage_FileImportResponse_Result.fileUrl:
var url = parseAbsoluteUrl("The file importer", response.fileUrl);
if (url.scheme != 'file') {
throw 'The file importer must return a file: URL, was "$url"';
}

return _filesystemImporter.canonicalize(url);

case InboundMessage_FileImportResponse_Result.error:
throw response.error;

case InboundMessage_FileImportResponse_Result.notSet:
return null;
}
}

ImporterResult? load(Uri url) => _filesystemImporter.load(url);
Expand Down
Loading

0 comments on commit 795b870

Please sign in to comment.