Skip to content

Commit

Permalink
Add GetObject support for object metadata (#1065)
Browse files Browse the repository at this point in the history
## Description of change

<!-- Please describe your contribution here. What and why? -->

Adds support for fetching user defined object metadata in GetObject
calls.

Relevant issues: N/A

## Does this change impact existing behavior?

<!-- Please confirm there's no breaking change, or call our any behavior
changes you think are necessary. -->

No

## Does this change need a changelog entry in any of the crates?

Yes, for mountpoint-s3-client.

<!--
    Please confirm yes or no.
    If no, add justification. If unsure, ask a reviewer.

    You can find the changelog for each crate here:
-
https://github.com/awslabs/mountpoint-s3/blob/main/mountpoint-s3/CHANGELOG.md
-
https://github.com/awslabs/mountpoint-s3/blob/main/mountpoint-s3-client/CHANGELOG.md
-
https://github.com/awslabs/mountpoint-s3/blob/main/mountpoint-s3-crt/CHANGELOG.md
-
https://github.com/awslabs/mountpoint-s3/blob/main/mountpoint-s3-crt-sys/CHANGELOG.md
-->

---

By submitting this pull request, I confirm that my contribution is made
under the terms of the Apache 2.0 license and I agree to the terms of
the [Developer Certificate of Origin
(DCO)](https://developercertificate.org/).

---------

Signed-off-by: Simon Beal <[email protected]>
  • Loading branch information
muddyfish authored Nov 7, 2024
1 parent 89e13a1 commit 9d48a72
Show file tree
Hide file tree
Showing 8 changed files with 253 additions and 19 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions mountpoint-s3-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ description = "High-performance Amazon S3 client for Mountpoint for Amazon S3."
mountpoint-s3-crt = { path = "../mountpoint-s3-crt", version = "0.10.0" }
mountpoint-s3-crt-sys = { path = "../mountpoint-s3-crt-sys", version = "0.10.0" }

async_cell = "0.2.2"
async-trait = "0.1.83"
auto_impl = "1.2.0"
base64ct = { version = "1.6.0", features = ["std"] }
Expand Down
13 changes: 10 additions & 3 deletions mountpoint-s3-client/src/failure_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use crate::object_client::{
CopyObjectError, CopyObjectParams, CopyObjectResult, DeleteObjectError, DeleteObjectResult, ETag, GetBodyPart,
GetObjectAttributesError, GetObjectAttributesResult, GetObjectError, GetObjectRequest, HeadObjectError,
HeadObjectParams, HeadObjectResult, ListObjectsError, ListObjectsResult, ObjectAttribute, ObjectClient,
ObjectClientError, ObjectClientResult, PutObjectError, PutObjectParams, PutObjectRequest, PutObjectResult,
PutObjectSingleParams, UploadReview,
ObjectClientError, ObjectClientResult, ObjectMetadata, PutObjectError, PutObjectParams, PutObjectRequest,
PutObjectResult, PutObjectSingleParams, UploadReview,
};

// Wrapper for injecting failures into a get stream or a put request
Expand Down Expand Up @@ -222,9 +222,16 @@ pub struct FailureGetRequest<Client: ObjectClient, GetWrapperState> {
request: Client::GetObjectRequest,
}

impl<Client: ObjectClient, FailState: Send> GetObjectRequest for FailureGetRequest<Client, FailState> {
#[cfg_attr(not(docsrs), async_trait)]
impl<Client: ObjectClient + Send + Sync, FailState: Send + Sync> GetObjectRequest
for FailureGetRequest<Client, FailState>
{
type ClientError = Client::ClientError;

async fn get_object_metadata(&self) -> ObjectClientResult<ObjectMetadata, GetObjectError, Self::ClientError> {
self.request.get_object_metadata().await
}

fn increment_read_window(self: Pin<&mut Self>, len: usize) {
let this = self.project();
this.request.increment_read_window(len);
Expand Down
36 changes: 29 additions & 7 deletions mountpoint-s3-client/src/mock_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ use crate::object_client::{
DeleteObjectResult, ETag, GetBodyPart, GetObjectAttributesError, GetObjectAttributesParts,
GetObjectAttributesResult, GetObjectError, GetObjectRequest, HeadObjectError, HeadObjectParams, HeadObjectResult,
ListObjectsError, ListObjectsResult, ObjectAttribute, ObjectClient, ObjectClientError, ObjectClientResult,
ObjectInfo, ObjectPart, PutObjectError, PutObjectParams, PutObjectRequest, PutObjectResult, PutObjectSingleParams,
PutObjectTrailingChecksums, RestoreStatus, UploadChecksum, UploadReview, UploadReviewPart,
ObjectInfo, ObjectMetadata, ObjectPart, PutObjectError, PutObjectParams, PutObjectRequest, PutObjectResult,
PutObjectSingleParams, PutObjectTrailingChecksums, RestoreStatus, UploadChecksum, UploadReview, UploadReviewPart,
};

mod leaky_bucket;
Expand Down Expand Up @@ -530,9 +530,14 @@ impl MockGetObjectRequest {
}
}

#[cfg_attr(not(docsrs), async_trait)]
impl GetObjectRequest for MockGetObjectRequest {
type ClientError = MockClientError;

async fn get_object_metadata(&self) -> ObjectClientResult<ObjectMetadata, GetObjectError, Self::ClientError> {
Ok(self.object.object_metadata.clone())
}

fn increment_read_window(mut self: Pin<&mut Self>, len: usize) {
self.read_window_end_offset += len as u64;
}
Expand Down Expand Up @@ -1061,7 +1066,12 @@ mod tests {
};
}

async fn test_get_object(key: &str, size: usize, range: Option<Range<u64>>) {
async fn test_get_object(
key: &str,
size: usize,
range: Option<Range<u64>>,
object_metadata: HashMap<String, String>,
) {
let mut rng = ChaChaRng::seed_from_u64(0x12345678);

let client = MockClient::new(MockClientConfig {
Expand All @@ -1073,7 +1083,10 @@ mod tests {

let mut body = vec![0u8; size];
rng.fill_bytes(&mut body);
client.add_object(key, MockObject::from_bytes(&body, ETag::for_tests()));

let mut object = MockObject::from_bytes(&body, ETag::for_tests());
object.set_object_metadata(object_metadata.clone());
client.add_object(key, object);

let mut get_request = client
.get_object("test_bucket", key, range.clone(), None)
Expand All @@ -1091,13 +1104,22 @@ mod tests {
let expected_range = range.unwrap_or(0..size as u64);
let expected_range = expected_range.start as usize..expected_range.end as usize;
assert_eq!(&accum[..], &body[expected_range], "body does not match");

assert_eq!(get_request.get_object_metadata().await, Ok(object_metadata));
}

#[tokio::test]
async fn get_object() {
test_get_object("key1", 2000, None).await;
test_get_object("key1", 9000, Some(50..2000)).await;
test_get_object("key1", 10, Some(0..10)).await;
test_get_object("key1", 2000, None, Default::default()).await;
test_get_object("key1", 9000, Some(50..2000), Default::default()).await;
test_get_object("key1", 10, Some(0..10), Default::default()).await;
test_get_object(
"key1",
10,
None,
HashMap::from([("foo".to_string(), "bar".to_string())]),
)
.await;
}

async fn test_get_object_backpressure(
Expand Down
7 changes: 6 additions & 1 deletion mountpoint-s3-client/src/mock_client/throughput_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::object_client::{
CopyObjectError, CopyObjectParams, CopyObjectResult, DeleteObjectError, DeleteObjectResult, ETag, GetBodyPart,
GetObjectAttributesError, GetObjectAttributesResult, GetObjectError, GetObjectRequest, HeadObjectError,
HeadObjectParams, HeadObjectResult, ListObjectsError, ListObjectsResult, ObjectAttribute, ObjectClient,
ObjectClientResult, PutObjectError, PutObjectParams, PutObjectResult, PutObjectSingleParams,
ObjectClientResult, ObjectMetadata, PutObjectError, PutObjectParams, PutObjectResult, PutObjectSingleParams,
};

/// A [MockClient] that rate limits overall download throughput to simulate a target network
Expand Down Expand Up @@ -66,9 +66,14 @@ pub struct ThroughputGetObjectRequest {
rate_limiter: LeakyBucket,
}

#[cfg_attr(not(docsrs), async_trait)]
impl GetObjectRequest for ThroughputGetObjectRequest {
type ClientError = MockClientError;

async fn get_object_metadata(&self) -> ObjectClientResult<ObjectMetadata, GetObjectError, Self::ClientError> {
Ok(self.request.object.object_metadata.clone())
}

fn increment_read_window(self: Pin<&mut Self>, len: usize) {
let this = self.project();
this.request.increment_read_window(len);
Expand Down
19 changes: 13 additions & 6 deletions mountpoint-s3-client/src/object_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ pub trait ObjectClient {
///
/// [`ServiceError`]: ObjectClientError::ServiceError
/// [`ClientError`]: ObjectClientError::ClientError
#[derive(Debug, Error)]
#[derive(Debug, Error, PartialEq)]
pub enum ObjectClientError<S, C> {
/// An error returned by the service itself
#[error("Service error")]
Expand Down Expand Up @@ -357,6 +357,8 @@ pub enum GetObjectAttributesError {
NoSuchKey,
}

pub type ObjectMetadata = HashMap<String, String>;

/// Parameters to a [`put_object`](ObjectClient::put_object) request
#[derive(Debug, Default, Clone)]
#[non_exhaustive]
Expand All @@ -373,7 +375,7 @@ pub struct PutObjectParams {
/// Custom headers to add to the request
pub custom_headers: Vec<(String, String)>,
/// User-defined object metadata
pub object_metadata: HashMap<String, String>,
pub object_metadata: ObjectMetadata,
}

impl PutObjectParams {
Expand Down Expand Up @@ -413,7 +415,7 @@ impl PutObjectParams {
}

/// Set user defined object metadata.
pub fn object_metadata(mut self, value: HashMap<String, String>) -> Self {
pub fn object_metadata(mut self, value: ObjectMetadata) -> Self {
self.object_metadata = value;
self
}
Expand Down Expand Up @@ -456,7 +458,7 @@ pub struct PutObjectSingleParams {
/// Custom headers to add to the request
pub custom_headers: Vec<(String, String)>,
/// User-defined object metadata
pub object_metadata: HashMap<String, String>,
pub object_metadata: ObjectMetadata,
}

impl PutObjectSingleParams {
Expand Down Expand Up @@ -496,7 +498,7 @@ impl PutObjectSingleParams {
}

/// Set user defined object metadata.
pub fn object_metadata(mut self, value: HashMap<String, String>) -> Self {
pub fn object_metadata(mut self, value: ObjectMetadata) -> Self {
self.object_metadata = value;
self
}
Expand Down Expand Up @@ -525,10 +527,15 @@ impl UploadChecksum {
/// object.
#[cfg_attr(not(docsrs), async_trait)]
pub trait GetObjectRequest:
Stream<Item = ObjectClientResult<GetBodyPart, GetObjectError, Self::ClientError>> + Send
Stream<Item = ObjectClientResult<GetBodyPart, GetObjectError, Self::ClientError>> + Send + Sync
{
type ClientError: std::error::Error + Send + Sync + 'static;

/// Get the object's user defined metadata.
/// If the metadata has already been read, return immediately. Otherwise, resolve the future
/// when they're read.
async fn get_object_metadata(&self) -> ObjectClientResult<ObjectMetadata, GetObjectError, Self::ClientError>;

/// Increment the flow-control window, so that response data continues downloading.
///
/// If the client was created with `enable_read_backpressure` set true,
Expand Down
62 changes: 60 additions & 2 deletions mountpoint-s3-client/src/s3_crt_client/get_object.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use async_cell::sync::AsyncCell;
use async_trait::async_trait;
use std::future::Future;
use std::ops::Deref;
use std::ops::Range;
use std::os::unix::prelude::OsStrExt;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use futures::channel::mpsc::UnboundedReceiver;
Expand All @@ -11,12 +14,20 @@ use mountpoint_s3_crt::common::error::Error;
use mountpoint_s3_crt::http::request_response::Header;
use mountpoint_s3_crt::s3::client::MetaRequestResult;
use pin_project::pin_project;
use thiserror::Error;

use crate::object_client::{ETag, GetBodyPart, GetObjectError, ObjectClientError, ObjectClientResult};
use crate::object_client::{ETag, GetBodyPart, GetObjectError, ObjectClientError, ObjectClientResult, ObjectMetadata};
use crate::s3_crt_client::{
GetObjectRequest, S3CrtClient, S3CrtClientInner, S3HttpRequest, S3Operation, S3RequestError,
};

/// Failures to return object metadata
#[derive(Clone, Error, Debug)]
pub enum ObjectMetadataError {
#[error("error occurred fetching object metadata")]
ObjectMetadataError,
}

impl S3CrtClient {
/// Create and begin a new GetObject request. The returned [GetObjectRequest] is a [Stream] of
/// body parts of the object, which will be delivered in order.
Expand Down Expand Up @@ -67,15 +78,44 @@ impl S3CrtClient {

let mut options = S3CrtClientInner::new_meta_request_options(message, S3Operation::GetObject);
options.part_size(self.inner.read_part_size as u64);

let object_metadata = AsyncCell::shared();

let object_metadata_setter_on_headers = object_metadata.clone();
let object_metadata_setter_on_finish = object_metadata.clone();

let request = self.inner.make_meta_request_from_options(
options,
span,
|_| (),
|_, _| (),
move |headers, status| {
// Headers can be returned multiple times, but the object metadata doesn't change.
// Explicitly ignore the case where we've already set object metadata.

// Only set metadata if we have a 2xx status code. If we only get other status
// codes, then on_finish cancels.
if (200..300).contains(&status) {
// This isn't to do with safety, only minor performance gains.
if !object_metadata_setter_on_headers.is_set() {
let object_metadata = headers
.iter()
.filter_map(|(key, value)| {
let metadata_header = key.to_str()?.strip_prefix("x-amz-meta-")?;
let value = value.to_str()?;
Some((metadata_header.to_string(), value.to_string()))
})
.collect();
// Don't overwrite if already set.
object_metadata_setter_on_headers.or_set(Ok(object_metadata));
}
}
},
move |offset, data| {
let _ = sender.unbounded_send(Ok((offset, data.into())));
},
move |result| {
// FIXME - Ideally we'd include a reason why we failed here.
object_metadata_setter_on_finish.or_set(Err(ObjectMetadataError::ObjectMetadataError));
if result.is_err() {
Err(parse_get_object_error(result).map(ObjectClientError::ServiceError))
} else {
Expand All @@ -89,6 +129,8 @@ impl S3CrtClient {
finish_receiver: receiver,
finished: false,
enable_backpressure: self.inner.enable_backpressure,
object_metadata,
initial_read_window_empty: self.inner.initial_read_window_size == 0,
next_offset,
read_window_end_offset,
})
Expand All @@ -109,16 +151,32 @@ pub struct S3GetObjectRequest {
finish_receiver: UnboundedReceiver<Result<GetBodyPart, Error>>,
finished: bool,
enable_backpressure: bool,
object_metadata: Arc<AsyncCell<Result<ObjectMetadata, ObjectMetadataError>>>,
initial_read_window_empty: bool,
/// Next offset of the data to be polled from [poll_next]
next_offset: u64,
/// Upper bound of the current read window. When backpressure is enabled, [S3GetObjectRequest]
/// can return data up to this offset *exclusively*.
read_window_end_offset: u64,
}

#[cfg_attr(not(docsrs), async_trait)]
impl GetObjectRequest for S3GetObjectRequest {
type ClientError = S3RequestError;

async fn get_object_metadata(&self) -> ObjectClientResult<ObjectMetadata, GetObjectError, Self::ClientError> {
match self.object_metadata.try_get() {
Some(result) => result,
None => {
if self.enable_backpressure && self.initial_read_window_empty {
return Err(ObjectClientError::ClientError(S3RequestError::EmptyReadWindow));
}
self.object_metadata.get().await
}
}
.map_err(|_| ObjectClientError::ClientError(S3RequestError::RequestCanceled))
}

fn increment_read_window(mut self: Pin<&mut Self>, len: usize) {
self.read_window_end_offset += len as u64;
self.request.meta_request.increment_read_window(len as u64);
Expand Down
Loading

0 comments on commit 9d48a72

Please sign in to comment.