
Commit

Merge branch 'main' into upload-initiate
waahm7 committed Dec 19, 2024
2 parents 66268e5 + 6e64664 commit 6225e54
Showing 15 changed files with 498 additions and 92 deletions.
5 changes: 3 additions & 2 deletions aws-s3-transfer-manager/Cargo.toml
@@ -15,7 +15,8 @@ aws-config = { version = "1.5.6", features = ["behavior-version-latest"] }
aws-sdk-s3 = { version = "1.51.0", features = ["behavior-version-latest"] }
aws-smithy-async = "1.2.1"
aws-smithy-experimental = { version = "0.1.3", features = ["crypto-aws-lc"] }
aws-smithy-runtime-api = "1.7.1"
aws-smithy-runtime-api = "1.7.3"
aws-runtime = "1.4.4"
aws-smithy-types = "1.2.6"
aws-types = "1.3.3"
blocking = "1.6.0"
@@ -32,7 +33,7 @@ walkdir = "2"
[dev-dependencies]
aws-sdk-s3 = { version = "1.51.0", features = ["behavior-version-latest", "test-util"] }
aws-smithy-mocks-experimental = "0.2.1"
aws-smithy-runtime = { version = "1.7.1", features = ["client", "connector-hyper-0-14-x", "test-util", "wire-mock"] }
aws-smithy-runtime = { version = "1.7.4", features = ["client", "connector-hyper-0-14-x", "test-util", "wire-mock"] }
clap = { version = "4.5.7", default-features = false, features = ["derive", "std", "help"] }
console-subscriber = "0.4.0"
http-02x = { package = "http", version = "0.2.9" }
2 changes: 1 addition & 1 deletion aws-s3-transfer-manager/examples/cp.rs
@@ -10,7 +10,7 @@ use std::time;
use aws_s3_transfer_manager::io::InputStream;
use aws_s3_transfer_manager::metrics::unit::ByteUnit;
use aws_s3_transfer_manager::metrics::Throughput;
use aws_s3_transfer_manager::operation::download::body::Body;
use aws_s3_transfer_manager::operation::download::Body;
use aws_s3_transfer_manager::types::{ConcurrencySetting, PartSize};
use aws_sdk_s3::error::DisplayErrorContext;
use bytes::Buf;
2 changes: 1 addition & 1 deletion aws-s3-transfer-manager/external-types.toml
@@ -8,5 +8,5 @@ allowed_external_types = [
"bytes::bytes::Bytes",
"bytes::buf::buf_impl::Buf",
"aws_types::request_id::RequestId",
"aws_types::request_id::RequestIdExt"
"aws_types::request_id::RequestIdExt",
]
26 changes: 25 additions & 1 deletion aws-s3-transfer-manager/src/config.rs
@@ -3,21 +3,24 @@
* SPDX-License-Identifier: Apache-2.0
*/

use aws_runtime::user_agent::FrameworkMetadata;

use crate::metrics::unit::ByteUnit;
use crate::types::{ConcurrencySetting, PartSize};
use std::cmp;

pub(crate) mod loader;

/// Minimum upload part size in bytes
const MIN_MULTIPART_PART_SIZE_BYTES: u64 = 5 * ByteUnit::Mebibyte.as_bytes_u64();
pub(crate) const MIN_MULTIPART_PART_SIZE_BYTES: u64 = 5 * ByteUnit::Mebibyte.as_bytes_u64();

/// Configuration for a [`Client`](crate::client::Client)
#[derive(Debug, Clone)]
pub struct Config {
multipart_threshold: PartSize,
target_part_size: PartSize,
concurrency: ConcurrencySetting,
framework_metadata: Option<FrameworkMetadata>,
client: aws_sdk_s3::client::Client,
}

@@ -43,6 +46,12 @@ impl Config {
&self.concurrency
}

/// Returns the framework metadata set on the transfer manager, if any.
#[doc(hidden)]
pub fn framework_metadata(&self) -> Option<&FrameworkMetadata> {
self.framework_metadata.as_ref()
}

/// The Amazon S3 client instance that will be used to send requests to S3.
pub fn client(&self) -> &aws_sdk_s3::Client {
&self.client
@@ -55,6 +64,7 @@ pub struct Builder {
multipart_threshold_part_size: PartSize,
target_part_size: PartSize,
concurrency: ConcurrencySetting,
framework_metadata: Option<FrameworkMetadata>,
client: Option<aws_sdk_s3::Client>,
}

@@ -122,8 +132,21 @@ impl Builder {
self
}

/// Sets the framework metadata for the transfer manager.
///
/// This _optional_ name identifies the framework using the transfer manager in the user agent
/// sent along with requests.
#[doc(hidden)]
pub fn framework_metadata(mut self, framework_metadata: Option<FrameworkMetadata>) -> Self {
self.framework_metadata = framework_metadata;
self
}

/// Set an explicit S3 client to use.
pub fn client(mut self, client: aws_sdk_s3::Client) -> Self {
// TODO - decide the approach here:
// - Convert the client into a builder so it can be adjusted based on other transfer manager config settings?
// - Instead of taking the client, take an sdk-config/s3-config/builder?
self.client = Some(client);
self
}
@@ -134,6 +157,7 @@ impl Builder {
multipart_threshold: self.multipart_threshold_part_size,
target_part_size: self.target_part_size,
concurrency: self.concurrency,
framework_metadata: self.framework_metadata,
client: self.client.expect("client set"),
}
}
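
For orientation, a minimal sketch of the direct builder path with the new hook (the framework name/version and sizing values are illustrative). Note that on this path the metadata is only stored on the Config and exposed via framework_metadata(); the loader in the next file is what actually forwards it into the request user agent:

use std::borrow::Cow;

use aws_runtime::user_agent::FrameworkMetadata;
use aws_s3_transfer_manager::types::{ConcurrencySetting, PartSize};

fn transfer_config(client: aws_sdk_s3::Client) -> aws_s3_transfer_manager::Config {
    // FrameworkMetadata::new validates the name/version pair.
    let metadata = FrameworkMetadata::new("my-framework", Some(Cow::Borrowed("0.1")))
        .expect("valid framework metadata");

    aws_s3_transfer_manager::Config::builder()
        .client(client)
        .concurrency(ConcurrencySetting::Explicit(8))
        .part_size(PartSize::Target(8 * 1024 * 1024))
        // New doc(hidden) hook: stored on the Config for later retrieval.
        .framework_metadata(Some(metadata))
        .build()
}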
131 changes: 128 additions & 3 deletions aws-s3-transfer-manager/src/config/loader.rs
@@ -3,13 +3,52 @@
* SPDX-License-Identifier: Apache-2.0
*/

use aws_config::BehaviorVersion;
use aws_runtime::sdk_feature::AwsSdkFeature;
use aws_runtime::user_agent::{ApiMetadata, AwsUserAgent, FrameworkMetadata};
use aws_sdk_s3::config::{Intercept, IntoShared};
use aws_types::os_shim_internal::Env;

use crate::config::Builder;
use crate::{
http,
types::{ConcurrencySetting, PartSize},
Config,
};

#[derive(Debug)]
struct S3TransferManagerInterceptor {
frame_work_meta_data: Option<FrameworkMetadata>,
}

impl Intercept for S3TransferManagerInterceptor {
fn name(&self) -> &'static str {
"S3TransferManager"
}

fn read_before_execution(
&self,
_ctx: &aws_sdk_s3::config::interceptors::BeforeSerializationInterceptorContextRef<'_>,
cfg: &mut aws_sdk_s3::config::ConfigBag,
) -> Result<(), aws_sdk_s3::error::BoxError> {
// Assume the interceptor is only added to clients constructed by the loader.
// In that case, no user agent has been set before this interceptor runs.
// Create our own user agent with the S3Transfer feature and the user-provided framework metadata, if any.
cfg.interceptor_state()
.store_append(AwsSdkFeature::S3Transfer);
let api_metadata = cfg.load::<ApiMetadata>().unwrap();
// TODO: maybe APP Name someday
let mut ua = AwsUserAgent::new_from_environment(Env::real(), api_metadata.clone());
if let Some(framework_metadata) = self.frame_work_meta_data.clone() {
ua = ua.with_framework_metadata(framework_metadata);
}

cfg.interceptor_state().store_put(ua);

Ok(())
}
}

/// Load transfer manager [`Config`] from the environment.
#[derive(Default, Debug)]
pub struct ConfigLoader {
@@ -52,17 +91,103 @@ impl ConfigLoader {
self
}

/// Sets the framework metadata for the transfer manager.
///
/// This _optional_ name identifies the framework using the transfer manager in the user agent
/// sent along with requests.
#[doc(hidden)]
pub fn framework_metadata(mut self, framework_metadata: Option<FrameworkMetadata>) -> Self {
self.builder = self.builder.framework_metadata(framework_metadata);
self
}

/// Load the default configuration
///
/// If fields have been overridden during builder construction, the override values will be
/// used. Otherwise, the default values for each field will be provided.
pub async fn load(self) -> Config {
let shared_config = aws_config::from_env()
let shared_config = aws_config::defaults(BehaviorVersion::latest())
.http_client(http::default_client())
.load()
.await;
let s3_client = aws_sdk_s3::Client::new(&shared_config);
let builder = self.builder.client(s3_client);

let mut sdk_client_builder = aws_sdk_s3::config::Builder::from(&shared_config);

let interceptor = S3TransferManagerInterceptor {
frame_work_meta_data: self.builder.framework_metadata.clone(),
};
sdk_client_builder.push_interceptor(S3TransferManagerInterceptor::into_shared(interceptor));
let builder = self
.builder
.client(aws_sdk_s3::Client::from_conf(sdk_client_builder.build()));
builder.build()
}
}

#[cfg(test)]
mod tests {
use std::borrow::Cow;

use crate::types::{ConcurrencySetting, PartSize};
use aws_config::Region;
use aws_runtime::user_agent::FrameworkMetadata;
use aws_sdk_s3::config::Intercept;
use aws_smithy_runtime::client::http::test_util::capture_request;

#[tokio::test]
async fn load_with_interceptor() {
let config = crate::from_env()
.concurrency(ConcurrencySetting::Explicit(123))
.part_size(PartSize::Target(8))
.load()
.await;
let sdk_s3_config = config.client().config();
let tm_interceptor_exists = sdk_s3_config
.interceptors()
.any(|item| item.name() == "S3TransferManager");
assert!(tm_interceptor_exists);
}

#[tokio::test]
async fn load_with_interceptor_and_framework_metadata() {
let (http_client, captured_request) = capture_request(None);
let config = crate::from_env()
.concurrency(ConcurrencySetting::Explicit(123))
.part_size(PartSize::Target(8))
.framework_metadata(Some(
FrameworkMetadata::new("some-framework", Some(Cow::Borrowed("1.3"))).unwrap(),
))
.load()
.await;
// Swap the capturing HTTP client into the S3 config so we can inspect the request made by the transfer manager.
let sdk_s3_config = config
.client()
.config()
.to_builder()
.http_client(http_client)
.region(Region::from_static("us-west-2"))
.with_test_defaults()
.build();

let capture_request_config = crate::Config::builder()
.client(aws_sdk_s3::Client::from_conf(sdk_s3_config))
.concurrency(ConcurrencySetting::Explicit(123))
.part_size(PartSize::Target(8))
.build();

let transfer_manager = crate::Client::new(capture_request_config);

let mut handle = transfer_manager
.download()
.bucket("foo")
.key("bar")
.initiate()
.unwrap();
// Expect to fail
let _ = handle.body_mut().next().await;
// Check that the captured request contains the expected framework metadata in its user agent.
let expected_req = captured_request.expect_request();
let user_agent = expected_req.headers().get("x-amz-user-agent").unwrap();
assert!(user_agent.contains("lib/some-framework/1.3"));
}
}
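
For reference, the loader path exercised by the tests above is what installs S3TransferManagerInterceptor and stamps the metadata into the `x-amz-user-agent` header. A sketch, with an illustrative framework name and version:

use std::borrow::Cow;

use aws_runtime::user_agent::FrameworkMetadata;

async fn transfer_client() -> aws_s3_transfer_manager::Client {
    let config = aws_s3_transfer_manager::from_env()
        // Forwarded to S3TransferManagerInterceptor and emitted as `lib/my-framework/0.1`
        // alongside the S3Transfer SDK feature in the user agent.
        .framework_metadata(Some(
            FrameworkMetadata::new("my-framework", Some(Cow::Borrowed("0.1"))).unwrap(),
        ))
        .load()
        .await;

    aws_s3_transfer_manager::Client::new(config)
}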
60 changes: 47 additions & 13 deletions aws-s3-transfer-manager/src/operation/download.rs
@@ -9,11 +9,13 @@ use aws_sdk_s3::error::DisplayErrorContext;
/// Request type for downloading a single object from Amazon S3
pub use input::{DownloadInput, DownloadInputBuilder};

/// Abstractions for response bodies and consuming data streams.
pub mod body;
/// Operation builders
pub mod builders;

/// Abstractions for responses and consuming data streams.
mod body;
pub use body::{Body, ChunkOutput};

mod discovery;

mod handle;
@@ -33,15 +35,14 @@ use crate::error;
use crate::io::AggregatedBytes;
use crate::runtime::scheduler::OwnedWorkPermit;
use aws_smithy_types::byte_stream::ByteStream;
use body::{Body, ChunkOutput};
use discovery::discover_obj;
use service::distribute_work;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot, Mutex, OnceCell};
use tokio::sync::{mpsc, oneshot, watch, Mutex, OnceCell};
use tokio::task::{self, JoinSet};

use super::TransferContext;
use super::{CancelNotificationReceiver, CancelNotificationSender, TransferContext};

/// Operation struct for single object download
#[derive(Clone, Default, Debug)]
@@ -99,9 +100,9 @@ async fn send_discovery(
ctx: DownloadContext,
comp_tx: mpsc::Sender<Result<ChunkOutput, crate::error::Error>>,
object_meta_tx: oneshot::Sender<ObjectMetadata>,
input: DownloadInput,
mut input: DownloadInput,
use_current_span_as_parent_for_tasks: bool,
) -> Result<(), crate::error::Error> {
) {
// create span to serve as parent of spawned child tasks.
let parent_span_for_tasks = tracing::debug_span!(
parent: if use_current_span_as_parent_for_tasks { tracing::Span::current().id() } else { None } ,
@@ -115,13 +116,42 @@
}

// acquire a permit for discovery
let permit = ctx.handle.scheduler.acquire_permit().await?;
let permit = ctx.handle.scheduler.acquire_permit().await;
let permit = match permit {
Ok(permit) => permit,
Err(err) => {
if comp_tx.send(Err(err)).await.is_err() {
tracing::debug!("Download handle for key({:?}) has been dropped, aborting during the discovery phase", input.key);
}
return;
}
};

// make initial discovery about the object size, metadata, possibly first chunk
let mut discovery = discover_obj(&ctx, &input).await?;
// FIXME - This will fail if the handle is dropped at this point. We should handle
// the cancellation gracefully here.
let _ = object_meta_tx.send(discovery.object_meta);
let discovery = discover_obj(&ctx, &input).await;
let mut discovery = match discovery {
Ok(discovery) => discovery,
Err(err) => {
if comp_tx.send(Err(err)).await.is_err() {
tracing::debug!("Download handle for key({:?}) has been dropped, aborting during the discovery phase", input.key);
}
return;
}
};

// Add if_match to the rest of the requests using the etag
// we got from discovery to ensure the object stays the same
// during the download process.
input.if_match.clone_from(&discovery.object_meta.e_tag);

if object_meta_tx.send(discovery.object_meta).is_err() {
tracing::debug!(
"Download handle for key({:?}) has been dropped, aborting during the discovery phase",
input.key
);
return;
}

let initial_chunk = discovery.initial_chunk.take();

let mut tasks = tasks.lock().await;
@@ -148,7 +178,6 @@
parent_span_for_tasks,
);
}
Ok(())
}

/// Handle possibly sending the first chunk of data received through discovery. Returns
@@ -200,14 +229,19 @@ fn handle_discovery_chunk(
#[derive(Debug)]
pub(crate) struct DownloadState {
current_seq: AtomicU64,
cancel_tx: CancelNotificationSender,
cancel_rx: CancelNotificationReceiver,
}

type DownloadContext = TransferContext<DownloadState>;

impl DownloadContext {
fn new(handle: Arc<crate::client::Handle>) -> Self {
let (cancel_tx, cancel_rx) = watch::channel(false);
let state = Arc::new(DownloadState {
current_seq: AtomicU64::new(0),
cancel_tx,
cancel_rx,
});
TransferContext { handle, state }
}
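
The new cancel_tx/cancel_rx fields on DownloadState are the two halves of a tokio::sync::watch channel (see DownloadContext::new above). A minimal, self-contained sketch of that cancellation pattern, assuming tokio's `full` feature set; this is a generic illustration, not the transfer manager's actual task wiring:

use std::time::Duration;

use tokio::sync::watch;

#[tokio::main]
async fn main() {
    // `false` means "not cancelled". The state holds both ends so every
    // spawned task can keep its own clone of the receiver.
    let (cancel_tx, mut cancel_rx) = watch::channel(false);

    let worker = tokio::spawn(async move {
        loop {
            tokio::select! {
                // Resolves whenever the sender publishes a new value.
                _ = cancel_rx.changed() => {
                    if *cancel_rx.borrow() {
                        println!("cancellation requested, stopping");
                        break;
                    }
                }
                _ = tokio::time::sleep(Duration::from_millis(50)) => {
                    // ... fetch/send one chunk here ...
                }
            }
        }
    });

    // Flipping the flag notifies every receiver at once (e.g. on abort).
    cancel_tx.send(true).expect("a receiver is still alive");
    worker.await.unwrap();
}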
