Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Request Cancellation, Timeouts via RequestController #978

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
19 changes: 0 additions & 19 deletions pkgs/cronet_http/cronet_http.iml

This file was deleted.

19 changes: 0 additions & 19 deletions pkgs/cupertino_http/cupertino_http.iml

This file was deleted.

5 changes: 5 additions & 0 deletions pkgs/http/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 1.2.0

* Add a `RequestController` class which can be used to manage the lifecycle of an
HTTP request (e.g., for timeouts and request cancellation).

## 1.1.0

* Add better error messages for `SocketException`s when using `IOClient`.
Expand Down
1 change: 1 addition & 0 deletions pkgs/http/lib/http.dart
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ export 'src/exception.dart';
export 'src/multipart_file.dart';
export 'src/multipart_request.dart';
export 'src/request.dart';
export 'src/request_controller.dart';
export 'src/response.dart';
export 'src/streamed_request.dart';
export 'src/streamed_response.dart';
Expand Down
5 changes: 4 additions & 1 deletion pkgs/http/lib/retry.dart
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ import 'http.dart';
/// resending it. This can cause a lot of memory usage when sending a large
/// [StreamedRequest].
final class RetryClient extends BaseClient {
@override
bool get supportsController => _inner.supportsController;

/// The wrapped client.
final Client _inner;

Expand Down Expand Up @@ -149,7 +152,7 @@ final class RetryClient extends BaseClient {
}

@override
void close() => _inner.close();
void close({bool force = true}) => _inner.close(force: force);
}

bool _defaultWhen(BaseResponse response) => response.statusCode == 503;
Expand Down
147 changes: 147 additions & 0 deletions pkgs/http/lib/src/active_request_tracker.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
import 'dart:async';

import 'package:meta/meta.dart';

import 'base_request.dart';
import 'exception.dart';
import 'request_controller.dart';

/// Used internally to track a request's lifecycle, if a tracker exists for the
/// request (as specified in the [tracker] parameter). Otherwise returns the
/// original [future].
///
/// Used for more concise code in client implementations that track the
/// lifecycle.
///
/// See [RequestController] for the public API.
@internal
Future<T> maybeTrack<T>(
final Future<T> future, {
final ActiveRequestTracker? tracker,
final RequestLifecycleState? state,
void Function(Exception?)? onCancel,
}) {
if (tracker != null) {
return tracker.trackRequestState(
future,
state: state,
onCancel: onCancel,
);
} else {
return future;
}
}

/// Used internally to track a request's lifecycle.
/// See [RequestController] for the public API.
@internal
final class ActiveRequestTracker {
final BaseRequest request;

/// Whether the [ActiveRequestTracker] is tracking a streaming request.
/// The response timeout can only be handled internally for non-streaming
/// requests.
/// This signals to any clients that want to buffer the response that they
/// should track the response timeout themselves.
final bool isStreaming;

final List<Completer<void>> _pendingRequestActions = [];

ActiveRequestTracker(
this.request, {
required this.isStreaming,
Duration? timeout,
}) {
// If an overall timeout is specified, apply it to the completer for the
// request and cancel the request if it times out.
if (timeout != null) {
_inProgressCompleter.future.timeout(timeout, onTimeout: () {
_cancelWith(TimeoutException(null, timeout));
});
}
}

RequestController get controller => request.controller!;

final Completer<void> _inProgressCompleter = Completer<void>();

/// Whether the request is still in progress.
bool get inProgress => !_inProgressCompleter.isCompleted;

Future<T> trackRequestState<T>(
final Future<T> future, {
final RequestLifecycleState? state,
void Function(Exception?)? onCancel,
}) {
// If the request is not being processed, simply ignore any tracking.
if (!inProgress) {
return _inProgressCompleter.future.then((_) => future);
}

// Create a completer to track the request (and allow it to be cancelled).
final pendingRequestAction = Completer<T>();
_pendingRequestActions.add(pendingRequestAction);

// Return a future that tracks both; completing or error-ing with the
// result of the first one to complete. This means if
// [pendingRequestAction] is cancelled first, [future] will be discarded.
// If [future] completes first, [pendingRequestAction] will be discarded.
var cancellable = Future.any([pendingRequestAction.future, future]);

// If a timeout is specified for this state, apply it to the cancellable
// future.
if (state != null && controller.hasTimeoutForLifecycleState(state)) {
cancellable =
cancellable.timeout(controller.timeoutForLifecycleState(state)!);
}

cancellable
// If the cancellable future succeeds, and the state was the receiving
// state, mark the request as no longer in progress.
..then((value) {
if (state == RequestLifecycleState.receiving) {
_inProgressCompleter.complete();
}

return value;
})
// Handle timeouts by simply calling [onCancel] and rethrowing the
// timeout exception.
..onError<TimeoutException>(
(error, stackTrace) {
onCancel?.call(error);
throw error;
},
)
// Similarly, handle cancellations by calling [onCancel] and rethrowing
// the cancellation exception.
..onError<CancelledException>(
(error, stackTrace) {
onCancel?.call(error);
throw error;
},
)
// When the cancellable future completes, remove the pending request from
// the list of pending requests.
..whenComplete(
() => _pendingRequestActions.remove(pendingRequestAction),
);

return cancellable;
}

/// Cancels the request by expiring all pending request actions.
///
/// Does nothing if the request is not in progress.
void cancel([final String message = 'Request cancelled']) =>
_cancelWith(CancelledException(message));

void _cancelWith(Exception exception) {
if (!inProgress) return;
_inProgressCompleter.completeError(exception);

for (final pendingAction in _pendingRequestActions) {
pendingAction.completeError(exception);
}
}
}
62 changes: 43 additions & 19 deletions pkgs/http/lib/src/base_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import 'byte_stream.dart';
import 'client.dart';
import 'exception.dart';
import 'request.dart';
import 'request_controller.dart';
import 'response.dart';
import 'streamed_response.dart';

Expand All @@ -19,43 +20,66 @@ import 'streamed_response.dart';
/// maybe [close], and then they get various convenience methods for free.
abstract mixin class BaseClient implements Client {
@override
Future<Response> head(Uri url, {Map<String, String>? headers}) =>
_sendUnstreamed('HEAD', url, headers);
bool get supportsController => false;

@override
Future<Response> get(Uri url, {Map<String, String>? headers}) =>
_sendUnstreamed('GET', url, headers);
Future<Response> head(Uri url,
{Map<String, String>? headers, RequestController? controller}) =>
_sendUnstreamed('HEAD', url, headers, controller: controller);

@override
Future<Response> get(Uri url,
{Map<String, String>? headers, RequestController? controller}) =>
_sendUnstreamed('GET', url, headers, controller: controller);

@override
Future<Response> post(Uri url,
{Map<String, String>? headers, Object? body, Encoding? encoding}) =>
_sendUnstreamed('POST', url, headers, body, encoding);
{Map<String, String>? headers,
Object? body,
Encoding? encoding,
RequestController? controller}) =>
_sendUnstreamed('POST', url, headers,
body: body, encoding: encoding, controller: controller);

@override
Future<Response> put(Uri url,
{Map<String, String>? headers, Object? body, Encoding? encoding}) =>
_sendUnstreamed('PUT', url, headers, body, encoding);
{Map<String, String>? headers,
Object? body,
Encoding? encoding,
RequestController? controller}) =>
_sendUnstreamed('PUT', url, headers,
body: body, encoding: encoding, controller: controller);

@override
Future<Response> patch(Uri url,
{Map<String, String>? headers, Object? body, Encoding? encoding}) =>
_sendUnstreamed('PATCH', url, headers, body, encoding);
{Map<String, String>? headers,
Object? body,
Encoding? encoding,
RequestController? controller}) =>
_sendUnstreamed('PATCH', url, headers,
body: body, encoding: encoding, controller: controller);

@override
Future<Response> delete(Uri url,
{Map<String, String>? headers, Object? body, Encoding? encoding}) =>
_sendUnstreamed('DELETE', url, headers, body, encoding);
{Map<String, String>? headers,
Object? body,
Encoding? encoding,
RequestController? controller}) =>
_sendUnstreamed('DELETE', url, headers,
body: body, encoding: encoding, controller: controller);

@override
Future<String> read(Uri url, {Map<String, String>? headers}) async {
final response = await get(url, headers: headers);
Future<String> read(Uri url,
{Map<String, String>? headers, RequestController? controller}) async {
final response = await get(url, headers: headers, controller: controller);
_checkResponseSuccess(url, response);
return response.body;
}

@override
Future<Uint8List> readBytes(Uri url, {Map<String, String>? headers}) async {
final response = await get(url, headers: headers);
Future<Uint8List> readBytes(Uri url,
{Map<String, String>? headers, RequestController? controller}) async {
final response = await get(url, headers: headers, controller: controller);
_checkResponseSuccess(url, response);
return response.bodyBytes;
}
Expand All @@ -73,8 +97,8 @@ abstract mixin class BaseClient implements Client {
/// Sends a non-streaming [Request] and returns a non-streaming [Response].
Future<Response> _sendUnstreamed(
String method, Uri url, Map<String, String>? headers,
[Object? body, Encoding? encoding]) async {
var request = Request(method, url);
{Object? body, Encoding? encoding, RequestController? controller}) async {
var request = Request(method, url, controller: controller);

if (headers != null) request.headers.addAll(headers);
if (encoding != null) request.encoding = encoding;
Expand Down Expand Up @@ -104,5 +128,5 @@ abstract mixin class BaseClient implements Client {
}

@override
void close() {}
void close({bool force = true}) {}
}
9 changes: 8 additions & 1 deletion pkgs/http/lib/src/base_request.dart
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import 'base_client.dart';
import 'base_response.dart';
import 'byte_stream.dart';
import 'client.dart';
import 'request_controller.dart';
import 'streamed_response.dart';
import 'utils.dart';

Expand All @@ -21,6 +22,12 @@ import 'utils.dart';
/// over the request properties. However, usually it's easier to use convenience
/// methods like [get] or [BaseClient.get].
abstract class BaseRequest {
/// The [RequestController] of the request.
///
/// Used to manage the lifecycle of the request. If this is `null`, the
/// request is assumed to be unmanaged and it cannot be cancelled.
final RequestController? controller;

/// The HTTP method of the request.
///
/// Most commonly "GET" or "POST", less commonly "HEAD", "PUT", or "DELETE".
Expand Down Expand Up @@ -96,7 +103,7 @@ abstract class BaseRequest {
return method;
}

BaseRequest(String method, this.url)
BaseRequest(String method, this.url, {this.controller})
: method = _validateMethod(method),
headers = LinkedHashMap(
equals: (key1, key2) => key1.toLowerCase() == key2.toLowerCase(),
Expand Down
Loading