Skip to content

Commit

Permalink
Merge branch 'main' into ysaito/follow-up-on-pr75
Browse files Browse the repository at this point in the history
  • Loading branch information
ysaito1001 committed Dec 3, 2024
2 parents 63f24e0 + a1235c0 commit 0cd2d44
Show file tree
Hide file tree
Showing 16 changed files with 537 additions and 167 deletions.
7 changes: 7 additions & 0 deletions aws-s3-transfer-manager/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,3 +160,10 @@ where
Error::new(kind, value)
}
}

static CANCELLATION_ERROR: &str =
"at least one operation has been aborted, cancelling all ongoing requests";

pub(crate) fn operation_cancelled() -> Error {
Error::new(ErrorKind::OperationCancelled, CANCELLATION_ERROR)
}
4 changes: 4 additions & 0 deletions aws-s3-transfer-manager/src/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ pub mod upload_objects;
// The default delimiter of the S3 object key
pub(crate) const DEFAULT_DELIMITER: &str = "/";

// Type aliases to channel ends to send/receive cancel notification
pub(crate) type CancelNotificationSender = tokio::sync::watch::Sender<bool>;
pub(crate) type CancelNotificationReceiver = tokio::sync::watch::Receiver<bool>;

/// Container for maintaining context required to carry out a single operation/transfer.
///
/// `State` is whatever additional operation specific state is required for the operation.
Expand Down
11 changes: 6 additions & 5 deletions aws-s3-transfer-manager/src/operation/download/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,10 @@ mod tests {
use aws_sdk_s3::operation::get_object::GetObjectOutput;
use aws_sdk_s3::operation::head_object::HeadObjectOutput;
use aws_sdk_s3::Client;
use aws_smithy_mocks_experimental::{mock, mock_client};
use aws_smithy_mocks_experimental::mock;
use aws_smithy_types::byte_stream::ByteStream;
use bytes::Buf;
use test_common::mock_client_with_stubbed_http_client;

use super::ObjectDiscovery;

Expand Down Expand Up @@ -247,7 +248,7 @@ mod tests {
async fn get_discovery_from_head(range: Option<ByteRange>) -> ObjectDiscovery {
let head_obj_rule = mock!(Client::head_object)
.then_output(|| HeadObjectOutput::builder().content_length(500).build());
let client = mock_client!(aws_sdk_s3, &[&head_obj_rule]);
let client = mock_client_with_stubbed_http_client!(aws_sdk_s3, &[&head_obj_rule]);

let ctx = DownloadContext::new(test_handle(client, 5 * ByteUnit::Mebibyte.as_bytes_u64()));

Expand Down Expand Up @@ -296,7 +297,7 @@ mod tests {
.body(ByteStream::from_static(bytes))
.build()
});
let client = mock_client!(aws_sdk_s3, &[&get_obj_rule]);
let client = mock_client_with_stubbed_http_client!(aws_sdk_s3, &[&get_obj_rule]);

let ctx = DownloadContext::new(test_handle(client, target_part_size));

Expand Down Expand Up @@ -332,7 +333,7 @@ mod tests {
.body(ByteStream::from_static(bytes))
.build()
});
let client = mock_client!(aws_sdk_s3, &[&get_obj_rule]);
let client = mock_client_with_stubbed_http_client!(aws_sdk_s3, &[&get_obj_rule]);

let ctx = DownloadContext::new(test_handle(client, target_part_size));

Expand Down Expand Up @@ -367,7 +368,7 @@ mod tests {
.body(ByteStream::from_static(bytes))
.build()
});
let client = mock_client!(aws_sdk_s3, &[&get_obj_rule]);
let client = mock_client_with_stubbed_http_client!(aws_sdk_s3, &[&get_obj_rule]);

let ctx = DownloadContext::new(test_handle(client, target_part_size));

Expand Down
11 changes: 9 additions & 2 deletions aws-s3-transfer-manager/src/operation/download_objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub use output::{DownloadObjectsOutput, DownloadObjectsOutputBuilder};

mod handle;
pub use handle::DownloadObjectsHandle;
use tokio::fs;
use tokio::{fs, sync::watch};

mod list_objects;
mod worker;
Expand All @@ -27,7 +27,9 @@ use tracing::Instrument;

use crate::types::FailedDownload;

use super::{validate_target_is_dir, TransferContext};
use super::{
validate_target_is_dir, CancelNotificationReceiver, CancelNotificationSender, TransferContext,
};

/// Operation struct for downloading multiple objects from Amazon S3
#[derive(Clone, Default, Debug)]
Expand Down Expand Up @@ -83,6 +85,8 @@ pub(crate) struct DownloadObjectsState {
// TODO - Determine if `input` should be separated from this struct
// https://github.com/awslabs/aws-s3-transfer-manager-rs/pull/67#discussion_r1821661603
input: DownloadObjectsInput,
cancel_tx: CancelNotificationSender,
cancel_rx: CancelNotificationReceiver,
failed_downloads: Mutex<Vec<FailedDownload>>,
successful_downloads: AtomicU64,
total_bytes_transferred: AtomicU64,
Expand All @@ -92,8 +96,11 @@ type DownloadObjectsContext = TransferContext<DownloadObjectsState>;

impl DownloadObjectsContext {
fn new(handle: Arc<crate::client::Handle>, input: DownloadObjectsInput) -> Self {
let (cancel_tx, cancel_rx) = watch::channel(false);
let state = Arc::new(DownloadObjectsState {
input,
cancel_tx,
cancel_rx,
failed_downloads: Mutex::new(Vec::new()),
successful_downloads: AtomicU64::default(),
total_bytes_transferred: AtomicU64::default(),
Expand Down
51 changes: 48 additions & 3 deletions aws-s3-transfer-manager/src/operation/download_objects/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

use tokio::task;

use crate::{error::ErrorKind, types::FailedTransferPolicy};

use super::{DownloadObjectsContext, DownloadObjectsOutput};

/// Handle for `DownloadObjects` transfer operation
Expand All @@ -19,14 +21,57 @@ pub struct DownloadObjectsHandle {

impl DownloadObjectsHandle {
/// Consume the handle and wait for download transfer to complete
///
/// When the `FailedTransferPolicy` is set to [`FailedTransferPolicy::Abort`], this method
/// will return the first error if any of the spawned tasks encounter one. The other tasks
/// will be canceled, but their cancellations will not be reported as errors by this method;
/// they will be logged as errors, instead.
///
/// If the `FailedTransferPolicy` is set to [`FailedTransferPolicy::Continue`], the
/// [`DownloadObjectsOutput`] will include a detailed breakdown, including the number of
/// successful downloads and the number of failed ones.
///
// TODO(aws-sdk-rust#1159) - Consider if we want to return other all errors encountered during cancellation.
#[tracing::instrument(skip_all, level = "debug", name = "join-download-objects")]
pub async fn join(mut self) -> Result<DownloadObjectsOutput, crate::error::Error> {
// TODO - Consider implementing more sophisticated error handling such as canceling in-progress transfers
let mut first_error_to_report = None;
// join all tasks
while let Some(join_result) = self.tasks.join_next().await {
join_result??;
let result = join_result.expect("task completed");
if let Err(e) = result {
match self.ctx.state.input.failure_policy() {
FailedTransferPolicy::Abort
if first_error_to_report.is_none()
&& e.kind() != &ErrorKind::OperationCancelled =>
{
first_error_to_report = Some(e);
}
FailedTransferPolicy::Continue => {
tracing::warn!("encountered but dismissed error when the failure policy is `Continue`: {e}")
}
_ => {}
}
}
}

if let Some(e) = first_error_to_report {
Err(e)
} else {
Ok(DownloadObjectsOutput::from(self.ctx.state.as_ref()))
}
}

/// Aborts all tasks owned by the handle.
pub async fn abort(&mut self) -> Result<(), crate::error::Error> {
if self.ctx.state.input.failure_policy() == &FailedTransferPolicy::Abort {
if self.ctx.state.cancel_tx.send(true).is_err() {
tracing::debug!(
"all receiver ends have been dropped, unable to send a cancellation signal"
);
}
while (self.tasks.join_next().await).is_some() {}
}

Ok(DownloadObjectsOutput::from(self.ctx.state.as_ref()))
Ok(())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,8 @@ mod tests {
operation::list_objects_v2::ListObjectsV2Output,
types::{CommonPrefix, Object},
};
use aws_smithy_mocks_experimental::{mock, mock_client};
use aws_smithy_mocks_experimental::mock;
use test_common::mock_client_with_stubbed_http_client;

use crate::operation::download_objects::{DownloadObjectsContext, DownloadObjectsInput};

Expand Down Expand Up @@ -332,7 +333,10 @@ mod tests {
.then_output(|| list_resp(None, "pre1", None, vec!["pre1/k7", "pre1/k8"]));
let resp5 = mock!(aws_sdk_s3::Client::list_objects_v2)
.then_output(|| list_resp(None, "pre2", None, vec!["pre2/k9", "pre2/k10"]));
let client = mock_client!(aws_sdk_s3, &[&resp1, &resp2, &resp3, &resp4, &resp5]);
let client = mock_client_with_stubbed_http_client!(
aws_sdk_s3,
&[&resp1, &resp2, &resp3, &resp4, &resp5]
);

let config = crate::Config::builder().client(client).build();
let client = crate::Client::new(config);
Expand Down
Loading

0 comments on commit 0cd2d44

Please sign in to comment.