Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support checking for object modified during download #82

Merged
merged 7 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion aws-s3-transfer-manager/src/operation/download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ async fn send_discovery(
ctx: DownloadContext,
comp_tx: mpsc::Sender<Result<ChunkOutput, crate::error::Error>>,
object_meta_tx: oneshot::Sender<ObjectMetadata>,
input: DownloadInput,
mut input: DownloadInput,
use_current_span_as_parent_for_tasks: bool,
) {
// create span to serve as parent of spawned child tasks.
Expand Down Expand Up @@ -139,6 +139,11 @@ async fn send_discovery(
}
};

// Add if_match to the rest of the requests using the etag
// we got from discovery to ensure the object stays the same
// during the download process.
input.if_match.clone_from(&discovery.object_meta.e_tag);

if object_meta_tx.send(discovery.object_meta).is_err() {
tracing::debug!(
"Download handle for key({:?}) has been dropped, aborting during the discovery phase",
Expand Down
93 changes: 93 additions & 0 deletions aws-s3-transfer-manager/tests/download_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ fn simple_object_connector(data: &Bytes, part_size: usize) -> StaticReplayClient
"Content-Range",
format!("bytes {start}-{end}/{}", data.len()),
)
.header("ETag", "my-etag")
.body(SdkBody::from(chunk))
.unwrap(),
)
Expand Down Expand Up @@ -420,3 +421,95 @@ async fn test_retry_max_attempts() {
let requests = http_client.actual_requests().collect::<Vec<_>>();
assert_eq!(4, requests.len());
}

/// Test the if_match header was added correctly based on the response from server.
#[tokio::test]
async fn test_download_if_match() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the expected behavior if the tag has changed? Can we add a test for it?

Copy link
Contributor Author

@TingDaoK TingDaoK Dec 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a test for mock the object change, but the error will just be the service error from S3 that complains about if-match precondition didn't work.

In aws-c-s3, we override this error to our customized error code to inform it's because the object changed during download. But, in here, we are not overriding any service error, right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We haven't yet I think this is fine for now though unless you see an ergonomic reason to change it for users.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason we override it for aws-c-s3 mostly because the if-match header was added by the client under the hood, so, it may be confusing to user for the if-match precondition failure.
But, not a big deal for now.

let data = rand_data(12 * MEBIBYTE);
let part_size = 5 * MEBIBYTE;

let (tm, http_client) = simple_test_tm(&data, part_size);

let mut handle = tm
.download()
.bucket("test-bucket")
.key("test-object")
.initiate()
.unwrap();

let _ = drain(&mut handle).await.unwrap();

let requests = http_client.actual_requests().collect::<Vec<_>>();
assert_eq!(3, requests.len());

// The first request is to discover the object meta data and should not have any if-match
assert_eq!(requests[0].headers().get("If-Match"), None);
// All the following requests should have the if-match header
assert_eq!(requests[1].headers().get("If-Match"), Some("my-etag"));
assert_eq!(requests[2].headers().get("If-Match"), Some("my-etag"));
}

const OBJECT_MODIFIED_RESPONSE: &str = r#"<?xml version="1.0" encoding="UTF-8"?>
<Error>
<Code>PreconditionFailed</Code>
<Message>At least one of the pre-conditions you specified did not hold</Message>
<Condition>If-Match</Condition>
</Error>
"#;

/// Test that if the object modified during download.
#[tokio::test]
async fn test_download_object_modified() {
let data = rand_data(12 * MEBIBYTE);
let part_size = 5 * MEBIBYTE;

// Create a static replay client (http connector) to mock the S3 response when object modified during download.
//
// Assumptions:
// 1. First request for discovery, succeed with etag
// 2. Followed requests fail to mock the object changed during download.
let events = data
.chunks(part_size)
.enumerate()
.map(|(idx, chunk)| {
let start = idx * part_size;
let end = std::cmp::min(start + part_size, data.len()) - 1;
let mut response = http_02x::Response::builder()
.status(206)
.header("Content-Length", format!("{}", end - start + 1))
.header(
"Content-Range",
format!("bytes {start}-{end}/{}", data.len()),
)
.header("ETag", "my-etag")
.body(SdkBody::from(chunk))
.unwrap();
if idx > 0 {
response = http_02x::Response::builder()
.status(412)
.header("Date", "Thu, 12 Jan 2023 00:04:21 GMT")
.body(SdkBody::from(OBJECT_MODIFIED_RESPONSE))
.unwrap();
}
ReplayEvent::new(
// NOTE: Rather than try to recreate all the expected requests we just put in placeholders and
// make our own assertions against the captured requests.
dummy_expected_request(),
response,
)
})
.collect();

let http_client = StaticReplayClient::new(events);
let tm = test_tm(http_client.clone(), part_size);

let mut handle = tm
.download()
.bucket("test-bucket")
.key("test-object")
.initiate()
.unwrap();

let error = drain(&mut handle).await.unwrap_err();
assert!(format!("{:?}", error).contains("PreconditionFailed"));
}