Skip to content

Commit

Permalink
[gui] read everything from grpc response streams
Browse files Browse the repository at this point in the history
we have bidirectional streaming rpcs, but we usually send only one
response back on that stream for most operations. therefore, the gui
would only read data from the stream until it got the first reply. but
this introduced errors in which the stream was disposed too early. now
we read everything from the stream, so that issue shouldn't happen.
  • Loading branch information
andrei-toterman committed Dec 10, 2024
1 parent 5b6f110 commit ca75b7a
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 15 deletions.
38 changes: 26 additions & 12 deletions src/client/gui/lib/grpc_client.dart
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import 'dart:async';
import 'dart:io';

import 'package:async/async.dart';
import 'package:fpdart/fpdart.dart';
import 'package:grpc/grpc.dart';
import 'package:protobuf/protobuf.dart' hide RpcClient;
Expand Down Expand Up @@ -100,7 +100,7 @@ class GrpcClient {
.start(Stream.value(request))
.doOnData(checkForUpdate)
.doOnEach(logGrpc(request))
.firstOrNull;
.lastOrNull;
}

Future<StopReply?> stop(Iterable<String> names) {
Expand All @@ -111,7 +111,7 @@ class GrpcClient {
return _client
.stop(Stream.value(request))
.doOnEach(logGrpc(request))
.firstOrNull;
.lastOrNull;
}

Future<SuspendReply?> suspend(Iterable<String> names) {
Expand All @@ -122,7 +122,7 @@ class GrpcClient {
return _client
.suspend(Stream.value(request))
.doOnEach(logGrpc(request))
.firstOrNull;
.lastOrNull;
}

Future<RestartReply?> restart(Iterable<String> names) {
Expand All @@ -134,7 +134,7 @@ class GrpcClient {
.restart(Stream.value(request))
.doOnData(checkForUpdate)
.doOnEach(logGrpc(request))
.firstOrNull;
.lastOrNull;
}

Future<DeleteReply?> delete(Iterable<String> names) {
Expand All @@ -147,7 +147,7 @@ class GrpcClient {
return _client
.delet(Stream.value(request))
.doOnEach(logGrpc(request))
.firstOrNull;
.lastOrNull;
}

Future<RecoverReply?> recover(Iterable<String> names) {
Expand All @@ -158,7 +158,7 @@ class GrpcClient {
return _client
.recover(Stream.value(request))
.doOnEach(logGrpc(request))
.firstOrNull;
.lastOrNull;
}

Future<DeleteReply?> purge(Iterable<String> names) {
Expand All @@ -172,7 +172,7 @@ class GrpcClient {
return _client
.delet(Stream.value(request))
.doOnEach(logGrpc(request))
.firstOrNull;
.lastOrNull;
}

Future<List<VmInfo>> info([Iterable<String> names = const []]) {
Expand All @@ -193,7 +193,7 @@ class GrpcClient {
return _client
.mount(Stream.value(request))
.doOnEach(logGrpc(request))
.firstOrNull;
.lastOrNull;
}

Future<void> umount(String name, [String? path]) {
Expand All @@ -204,7 +204,7 @@ class GrpcClient {
return _client
.umount(Stream.value(request))
.doOnEach(logGrpc(request))
.firstOrNull;
.lastOrNull;
}

Future<FindReply> find({bool images = true, bool blueprints = true}) {
Expand Down Expand Up @@ -254,7 +254,7 @@ class GrpcClient {
return _client
.set(Stream.value(request))
.doOnEach(logGrpc(request))
.firstOrNull;
.lastOrNull;
}

Future<SSHInfo?> sshInfo(String name) {
Expand All @@ -263,7 +263,7 @@ class GrpcClient {
return _client
.ssh_info(Stream.value(request))
.doOnEach(logGrpc(request))
.first
.last
.then((reply) => reply.sshInfo[name]);
}

Expand Down Expand Up @@ -299,3 +299,17 @@ class CustomChannelCredentials extends ChannelCredentials {
return ctx;
}
}

extension<T> on Stream<T> {
Future<T?> get lastOrNull {
final completer = Completer<T?>.sync();
T? result;
listen(
(event) => result = event,
onError: completer.completeError,
onDone: () => completer.complete(result),
cancelOnError: true,
);
return completer.future;
}
}
3 changes: 0 additions & 3 deletions src/client/gui/lib/providers.dart
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,6 @@ final daemonAvailableProvider = Provider((ref) {
if (message.contains('failed to obtain exit status for remote process')) {
return true;
}
if (message.contains('Connection is being forcefully terminated')) {
return true;
}
}
return false;
});
Expand Down

0 comments on commit ca75b7a

Please sign in to comment.