From 8fab6a6fa26e326d72c6dcb6a882e502c1a25a6d Mon Sep 17 00:00:00 2001 From: Andrei Toterman Date: Tue, 10 Dec 2024 15:47:04 +0100 Subject: [PATCH] [gui] read everything from grpc response streams we have bidirectional streaming rpcs, but we usually send only one response back on that stream for most operations. therefore, the gui would only read data from the stream until it got the first reply. but this introduced errors in which the stream was disposed too early. now we read everything from the stream, so that issue shouldn't happen. --- src/client/gui/lib/grpc_client.dart | 38 ++++++++++++++++++++--------- src/client/gui/lib/providers.dart | 3 --- 2 files changed, 26 insertions(+), 15 deletions(-) diff --git a/src/client/gui/lib/grpc_client.dart b/src/client/gui/lib/grpc_client.dart index 603ac751ed3..9f5e96d9be0 100644 --- a/src/client/gui/lib/grpc_client.dart +++ b/src/client/gui/lib/grpc_client.dart @@ -1,6 +1,6 @@ +import 'dart:async'; import 'dart:io'; -import 'package:async/async.dart'; import 'package:fpdart/fpdart.dart'; import 'package:grpc/grpc.dart'; import 'package:protobuf/protobuf.dart' hide RpcClient; @@ -100,7 +100,7 @@ class GrpcClient { .start(Stream.value(request)) .doOnData(checkForUpdate) .doOnEach(logGrpc(request)) - .firstOrNull; + .lastOrNull; } Future stop(Iterable names) { @@ -111,7 +111,7 @@ class GrpcClient { return _client .stop(Stream.value(request)) .doOnEach(logGrpc(request)) - .firstOrNull; + .lastOrNull; } Future suspend(Iterable names) { @@ -122,7 +122,7 @@ class GrpcClient { return _client .suspend(Stream.value(request)) .doOnEach(logGrpc(request)) - .firstOrNull; + .lastOrNull; } Future restart(Iterable names) { @@ -134,7 +134,7 @@ class GrpcClient { .restart(Stream.value(request)) .doOnData(checkForUpdate) .doOnEach(logGrpc(request)) - .firstOrNull; + .lastOrNull; } Future delete(Iterable names) { @@ -147,7 +147,7 @@ class GrpcClient { return _client .delet(Stream.value(request)) .doOnEach(logGrpc(request)) - .firstOrNull; + .lastOrNull; } Future recover(Iterable names) { @@ -158,7 +158,7 @@ class GrpcClient { return _client .recover(Stream.value(request)) .doOnEach(logGrpc(request)) - .firstOrNull; + .lastOrNull; } Future purge(Iterable names) { @@ -172,7 +172,7 @@ class GrpcClient { return _client .delet(Stream.value(request)) .doOnEach(logGrpc(request)) - .firstOrNull; + .lastOrNull; } Future> info([Iterable names = const []]) { @@ -193,7 +193,7 @@ class GrpcClient { return _client .mount(Stream.value(request)) .doOnEach(logGrpc(request)) - .firstOrNull; + .lastOrNull; } Future umount(String name, [String? path]) { @@ -204,7 +204,7 @@ class GrpcClient { return _client .umount(Stream.value(request)) .doOnEach(logGrpc(request)) - .firstOrNull; + .lastOrNull; } Future find({bool images = true, bool blueprints = true}) { @@ -254,7 +254,7 @@ class GrpcClient { return _client .set(Stream.value(request)) .doOnEach(logGrpc(request)) - .firstOrNull; + .lastOrNull; } Future sshInfo(String name) { @@ -263,7 +263,7 @@ class GrpcClient { return _client .ssh_info(Stream.value(request)) .doOnEach(logGrpc(request)) - .first + .last .then((reply) => reply.sshInfo[name]); } @@ -299,3 +299,17 @@ class CustomChannelCredentials extends ChannelCredentials { return ctx; } } + +extension on Stream { + Future get lastOrNull { + final completer = Completer.sync(); + T? result; + listen( + (event) => result = event, + onError: completer.completeError, + onDone: () => completer.complete(result), + cancelOnError: true, + ); + return completer.future; + } +} diff --git a/src/client/gui/lib/providers.dart b/src/client/gui/lib/providers.dart index dccc9c75366..f3d0e898609 100644 --- a/src/client/gui/lib/providers.dart +++ b/src/client/gui/lib/providers.dart @@ -68,9 +68,6 @@ final daemonAvailableProvider = Provider((ref) { if (message.contains('failed to obtain exit status for remote process')) { return true; } - if (message.contains('Connection is being forcefully terminated')) { - return true; - } } return false; });