Skip to content

Commit

Permalink
Implement Hedging (Retry Slow Parts) for Upload (#57)
Browse files Browse the repository at this point in the history
  • Loading branch information
waahm7 authored Oct 8, 2024
1 parent d39e1e1 commit 31550a0
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 4 deletions.
1 change: 1 addition & 0 deletions .cargo-deny-config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ allow = [
"MPL-2.0",
"Unicode-DFS-2016",
"Unicode-3.0",
"Zlib"
]
confidence-threshold = 1.0
exceptions = [
Expand Down
4 changes: 2 additions & 2 deletions aws-s3-transfer-manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ futures-util = "0.3.30"
path-clean = "1.0.1"
pin-project-lite = "0.2.14"
tokio = { version = "1.40.0", features = ["rt-multi-thread", "io-util", "sync", "fs", "macros"] }
tower = { version = "0.5.1", features = ["limit", "retry", "util"] }
tower = { version = "0.5.1", features = ["limit", "retry", "util", "hedge", "buffer"] }
tracing = "0.1"

[dev-dependencies]
aws-sdk-s3 = { version = "1.51.0", features = ["behavior-version-latest", "test-util"] }
aws-smithy-mocks-experimental = "0.2.1"
aws-smithy-runtime = { version = "1.7.1", features = ["client", "connector-hyper-0-14-x", "test-util", "wire-mock"] }
clap = { version = "4.5.7", default-features = false, features = ["derive", "std", "help"] }
console-subscriber = "0.3.0"
console-subscriber = "0.4.0"
http-02x = { package = "http", version = "0.2.9" }
http-body-1x = { package = "http-body", version = "1" }
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
Expand Down
1 change: 1 addition & 0 deletions aws-s3-transfer-manager/src/io/part_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ impl ReadPart for PartReader {
}

/// Data for a single part
#[derive(Debug, Clone)]
pub(crate) struct PartData {
// 1-indexed
pub(crate) part_number: u64,
Expand Down
1 change: 1 addition & 0 deletions aws-s3-transfer-manager/src/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@
* SPDX-License-Identifier: Apache-2.0
*/

pub(crate) mod hedge;
pub(crate) mod limit;
pub(crate) mod retry;
84 changes: 84 additions & 0 deletions aws-s3-transfer-manager/src/middleware/hedge.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

use std::time::Duration;
use tower::{
hedge::{Hedge, Policy},
layer::layer_fn,
BoxError, Layer, Service,
};

/// S3 recommends retrying the slowest 5% of the requests.
const LATENCY_PERCENTILE: f32 = 95.0;
/// This value was chosen randomly, but seems to work.
const MIN_DATA_POINTS: u64 = 20;
/// The Hedge layer maintains two rotating histograms, i.e., ReadHistogram and WriteHistogram. They
/// are switched every period. This value was chosen randomly but with consideration given to the fact that most 8-16 MB part
/// requests take on average 0.2 seconds, and we should retry it if it takes more than a second.
const PERIOD: Duration = Duration::from_secs(2);

/*
* During uploads, S3 recommends retrying the slowest 5% of requests for latency-sensitive applications,
* as some requests can experience high time to first byte. If a slow part is hit near the end of the request,
* the application may spend the last few seconds waiting for those final parts to complete, which can reduce overall
* throughput. This layer is used to retry the slowest 5% of requests to improve performance.
* Based on our experiments, this makes a significant difference for multipart upload use-cases and
* does not have a noticeable impact for the Download.
*/
pub(crate) struct Builder<P> {
policy: P,
latency_percentile: f32,
min_data_points: u64,
period: Duration,
}

#[derive(Debug, Clone, Default)]
pub(crate) struct DefaultPolicy;

impl<T: Clone> Policy<T> for DefaultPolicy {
fn clone_request(&self, req: &T) -> Option<T> {
Some(req.clone())
}

fn can_retry(&self, _req: &T) -> bool {
true
}
}

impl Default for Builder<DefaultPolicy> {
fn default() -> Self {
Self {
policy: DefaultPolicy,
latency_percentile: LATENCY_PERCENTILE,
min_data_points: MIN_DATA_POINTS,
period: PERIOD,
}
}
}

impl<P> Builder<P> {
/// Converts the `Hedge` into a `Layer` that can be used in a service stack.
pub(crate) fn into_layer<Request, S>(self) -> impl Layer<S, Service = Hedge<S, P>> + Clone
where
P: Policy<Request> + Clone,
S: Service<Request> + Clone,
S::Error: Into<BoxError>,
{
let policy = self.policy;
let min_data_points = self.min_data_points;
let latency_percentile = self.latency_percentile;
let period = self.period;

layer_fn(move |service: S| {
Hedge::new(
service,
policy.clone(),
min_data_points,
latency_percentile,
period,
)
})
}
}
19 changes: 17 additions & 2 deletions aws-s3-transfer-manager/src/operation/upload/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
part_reader::{Builder as PartReaderBuilder, PartData, ReadPart},
InputStream,
},
middleware::limit::concurrency::ConcurrencyLimitLayer,
middleware::{hedge, limit::concurrency::ConcurrencyLimitLayer},
operation::upload::UploadContext,
};
use aws_sdk_s3::{primitives::ByteStream, types::CompletedPart};
Expand All @@ -18,6 +18,7 @@ use tracing::Instrument;
use super::UploadHandle;

/// Request/input type for our "upload_part" service.
#[derive(Debug, Clone)]
pub(super) struct UploadPartRequest {
pub(super) ctx: UploadContext,
pub(super) part_data: PartData,
Expand Down Expand Up @@ -69,7 +70,21 @@ pub(super) fn upload_part_service(
+ Send {
let svc = service_fn(upload_part_handler);
let concurrency_limit = ConcurrencyLimitLayer::new(ctx.handle.scheduler.clone());
ServiceBuilder::new().layer(concurrency_limit).service(svc)

let svc = ServiceBuilder::new()
.layer(concurrency_limit)
// FIXME - This setting will need to be globalized.
.buffer(ctx.handle.num_workers())
// FIXME - Hedged request should also get a permit. Currently, it can bypass the
// concurrency_limit layer.
.layer(hedge::Builder::default().into_layer())
.service(svc);
svc.map_err(|err| {
let e = err
.downcast::<error::Error>()
.unwrap_or_else(|err| Box::new(error::Error::new(error::ErrorKind::RuntimeError, err)));
*e
})
}

/// Spawn tasks to read the body and upload the remaining parts of object
Expand Down

0 comments on commit 31550a0

Please sign in to comment.