Skip to content

Commit

Permalink
Follow up on PR#75 (#78)
Browse files Browse the repository at this point in the history
* Incorporate post-merge review feedback from PR#75

This commit addresses
#75 (comment)
#75 (comment)

* Add test for canceling upload object via MPU

This commit addresses #75 (comment)

* Fix memory leak in test detected by `LeakSanitizer`

* Simulate the flow of CreateMPU -> upload cancellation -> AbortMPU

This commit addresses #78 (comment)

* Avoid a single letter variable name
  • Loading branch information
ysaito1001 authored Dec 5, 2024
1 parent a1235c0 commit 061f974
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 15 deletions.
2 changes: 1 addition & 1 deletion aws-s3-transfer-manager/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::cmp;
pub(crate) mod loader;

/// Minimum upload part size in bytes
const MIN_MULTIPART_PART_SIZE_BYTES: u64 = 5 * ByteUnit::Mebibyte.as_bytes_u64();
pub(crate) const MIN_MULTIPART_PART_SIZE_BYTES: u64 = 5 * ByteUnit::Mebibyte.as_bytes_u64();

/// Configuration for a [`Client`](crate::client::Client)
#[derive(Debug, Clone)]
Expand Down
3 changes: 3 additions & 0 deletions aws-s3-transfer-manager/src/operation/upload/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ pub(crate) enum UploadType {
/// It first calls `.abort_all` on the tasks it owns, and then invokes `AbortMultipartUpload`
/// to abort any in-progress multipart uploads. Errors encountered during `AbortMultipartUpload`
/// are logged, but do not affect the overall cancellation flow.
///
/// In either case, if the upload operation has already been completed before the handle is dropped
/// or aborted, the uploaded object will not be deleted from S3.
#[derive(Debug)]
#[non_exhaustive]
pub struct UploadHandle {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ impl UploadObjectsHandle {
/// Consume the handle and wait for the upload to complete
///
/// When the `FailedTransferPolicy` is set to [`FailedTransferPolicy::Abort`], this method
/// will return an error if any of the spawned tasks encounter one. The other tasks will
/// be canceled, but their cancellations will not be reported as errors by this method;
/// will return the first error if any of the spawned tasks encounter one. The other tasks
/// will be canceled, but their cancellations will not be reported as errors by this method;
/// they will be logged as errors, instead.
///
/// If the `FailedTransferPolicy` is set to [`FailedTransferPolicy::Continue`], the
Expand Down
98 changes: 96 additions & 2 deletions aws-s3-transfer-manager/src/operation/upload_objects/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,19 +323,27 @@ fn handle_failed_upload(

#[cfg(test)]
mod tests {
use aws_sdk_s3::operation::put_object::PutObjectOutput;
use std::sync::{Arc, Barrier};

use aws_sdk_s3::operation::{
abort_multipart_upload::AbortMultipartUploadOutput,
create_multipart_upload::CreateMultipartUploadOutput, put_object::PutObjectOutput,
upload_part::UploadPartOutput,
};
use aws_smithy_mocks_experimental::{mock, RuleMode};
use bytes::Bytes;
use test_common::mock_client_with_stubbed_http_client;

use crate::{
client::Handle,
config::MIN_MULTIPART_PART_SIZE_BYTES,
io::InputStream,
operation::upload_objects::{
worker::{upload_single_obj, UploadObjectJob},
UploadObjectsContext, UploadObjectsInputBuilder,
},
runtime::scheduler::Scheduler,
types::PartSize,
DEFAULT_CONCURRENCY,
};

Expand Down Expand Up @@ -700,7 +708,7 @@ mod tests {
}

#[tokio::test]
async fn test_cancel_single_upload() {
async fn test_cancel_single_upload_via_put_object() {
let bucket = "doesnotmatter";
let put_object = mock!(aws_sdk_s3::Client::put_object)
.match_requests(move |input| input.bucket() == Some(bucket))
Expand Down Expand Up @@ -730,4 +738,90 @@ mod tests {

assert_eq!(&crate::error::ErrorKind::OperationCancelled, err.kind());
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_cancel_single_upload_via_multipart_upload() {
let bucket = "test-bucket";
let key = "test-key";
let upload_id: String = "test-upload-id".to_owned();

let wait_till_create_mpu = Arc::new(Barrier::new(2));
let (resume_upload_single_obj_tx, resume_upload_single_obj_rx) =
tokio::sync::watch::channel(());
let resume_upload_single_obj_tx = Arc::new(resume_upload_single_obj_tx);

let create_mpu = mock!(aws_sdk_s3::Client::create_multipart_upload).then_output({
let wait_till_create_mpu = wait_till_create_mpu.clone();
let upload_id = upload_id.clone();
move || {
// This ensures that a cancellation signal won't be sent until `create_multipart_upload`.
wait_till_create_mpu.wait();

// This increases the reliability of the test, ensuring that the cancellation signal has been sent
// and that `upload_single_obj` can now resume.
while !resume_upload_single_obj_rx.has_changed().unwrap() {
std::thread::sleep(std::time::Duration::from_millis(100));
}

CreateMultipartUploadOutput::builder()
.upload_id(upload_id.clone())
.build()
}
});
let upload_part = mock!(aws_sdk_s3::Client::upload_part)
.then_output(|| UploadPartOutput::builder().build());
let abort_mpu = mock!(aws_sdk_s3::Client::abort_multipart_upload)
.match_requests({
let upload_id = upload_id.clone();
move |input| {
input.upload_id.as_ref() == Some(&upload_id)
&& input.bucket() == Some(bucket)
&& input.key() == Some(key)
}
})
.then_output(|| AbortMultipartUploadOutput::builder().build());
let s3_client = mock_client_with_stubbed_http_client!(
aws_sdk_s3,
RuleMode::MatchAny,
&[create_mpu, upload_part, abort_mpu]
);
let config = crate::Config::builder()
.set_multipart_threshold(PartSize::Target(MIN_MULTIPART_PART_SIZE_BYTES))
.client(s3_client)
.build();

let scheduler = Scheduler::new(DEFAULT_CONCURRENCY);

let handle = std::sync::Arc::new(Handle { config, scheduler });
let input = UploadObjectsInputBuilder::default()
.source("doesnotmatter")
.bucket(bucket)
.build()
.unwrap();

// specify the size of the contents so it triggers multipart upload
let contents = vec![0; MIN_MULTIPART_PART_SIZE_BYTES as usize];
let ctx = UploadObjectsContext::new(handle, input);
let job = UploadObjectJob {
object: InputStream::from(Bytes::copy_from_slice(contents.as_slice())),
key: key.to_owned(),
};

tokio::task::spawn({
let ctx = ctx.clone();
let resume_upload_single_obj_tx = resume_upload_single_obj_tx.clone();
async move {
wait_till_create_mpu.wait();
// The upload operation has reached a point where a `CreateMultipartUploadOutput` is being prepared,
// which means that cancellation can now be triggered.
ctx.state.cancel_tx.send(true).unwrap();
// Tell `upload_single_obj` that it can now proceed.
resume_upload_single_obj_tx.send(()).unwrap();
}
});

let err = upload_single_obj(&ctx, job).await.unwrap_err();

assert_eq!(&crate::error::ErrorKind::OperationCancelled, err.kind());
}
}
10 changes: 0 additions & 10 deletions aws-s3-transfer-manager/tests/upload_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use aws_sdk_s3::operation::complete_multipart_upload::CompleteMultipartUploadOut
use aws_sdk_s3::operation::create_multipart_upload::CreateMultipartUploadOutput;
use aws_sdk_s3::operation::upload_part::UploadPartOutput;
use aws_smithy_mocks_experimental::{mock, RuleMode};
use aws_smithy_runtime::client::http::test_util::infallible_client_fn;
use aws_smithy_runtime::test_util::capture_test_logs::capture_test_logs;
use bytes::Bytes;
use pin_project_lite::pin_project;
Expand Down Expand Up @@ -116,15 +115,6 @@ fn mock_s3_client_for_multipart_upload() -> aws_sdk_s3::Client {
async fn test_many_uploads_no_deadlock() {
let (_guard, _rx) = capture_test_logs();
let client = mock_s3_client_for_multipart_upload();
let client = aws_sdk_s3::Client::from_conf(
client
.config()
.to_builder()
.http_client(infallible_client_fn(|_req| {
http_02x::Response::builder().status(200).body("").unwrap()
}))
.build(),
);
let config = aws_s3_transfer_manager::Config::builder()
.client(client)
.build();
Expand Down

0 comments on commit 061f974

Please sign in to comment.