Skip to content

Commit

Permalink
multipart signatures
Browse files Browse the repository at this point in the history
  • Loading branch information
avantgardnerio committed Nov 26, 2024
1 parent 7fc0e87 commit 74f0c9c
Show file tree
Hide file tree
Showing 11 changed files with 190 additions and 53 deletions.
46 changes: 39 additions & 7 deletions object_store/src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,9 @@ use crate::client::list::ListClient;
use crate::client::retry::RetryExt;
use crate::client::s3::{
CompleteMultipartUpload, CompleteMultipartUploadResult, CopyPartResult,
InitiateMultipartUploadResult, ListResponse,
InitiateMultipartUploadResult, ListResponse, MultipartPart,
};
use crate::client::GetOptionsExt;
use crate::multipart::PartId;
use crate::path::DELIMITER;
use crate::{
Attribute, Attributes, ClientOptions, GetOptions, ListResult, MultipartId, Path,
Expand Down Expand Up @@ -186,7 +185,7 @@ impl From<DeleteError> for Error {
}
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub(crate) struct S3Config {
pub region: String,
pub endpoint: Option<String>,
Expand Down Expand Up @@ -452,6 +451,25 @@ impl S3Client {
}
}

pub(crate) fn request_with_config<'a>(
&'a self,
method: Method,
path: &'a Path,
config: &'a S3Config,
) -> Request<'a> {
let url = self.config.path_url(path);
Request {
path,
builder: self.client.request(method, url),
payload: None,
payload_sha256: None,
config,
use_session_creds: true,
idempotent: false,
retry_error_body: false,
}
}

/// Make an S3 Delete Objects request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html>
///
/// Produces a vector of results, one for each path in the input vector. If
Expand Down Expand Up @@ -609,6 +627,7 @@ impl S3Client {
) -> Result<MultipartId> {
let response = self
.request(Method::POST, location)
.header("x-amz-checksum-algorithm", "SHA256")
.query(&[("uploads", "")])
.with_encryption_headers()
.with_attributes(opts.attributes)
Expand All @@ -632,12 +651,16 @@ impl S3Client {
upload_id: &MultipartId,
part_idx: usize,
data: PutPartPayload<'_>,
) -> Result<PartId> {
) -> Result<MultipartPart> {
let is_copy = matches!(data, PutPartPayload::Copy(_));
let part = (part_idx + 1).to_string();
let config = S3Config {
checksum: Some(Checksum::SHA256),
..self.config.clone()
};

let mut request = self
.request(Method::PUT, path)
.request_with_config(Method::PUT, path, &config)
.query(&[("partNumber", &part), ("uploadId", upload_id)])
.idempotent(true);

Expand All @@ -659,6 +682,10 @@ impl S3Client {
request = request.with_encryption_headers();
}
let response = request.send().await?;
let checksum_sha256 = response
.headers()
.get("x-amz-checksum-sha256")
.map(|v| v.to_str().unwrap().to_string());

let content_id = match is_copy {
false => get_etag(response.headers()).context(MetadataSnafu)?,
Expand All @@ -672,7 +699,12 @@ impl S3Client {
response.e_tag
}
};
Ok(PartId { content_id })
let part = MultipartPart {
e_tag: content_id,
part_number: part_idx + 1,
checksum_sha256,
};
Ok(part)
}

pub(crate) async fn abort_multipart(&self, location: &Path, upload_id: &str) -> Result<()> {
Expand All @@ -689,7 +721,7 @@ impl S3Client {
&self,
location: &Path,
upload_id: &str,
parts: Vec<PartId>,
parts: Vec<MultipartPart>,
mode: CompleteMultipartMode,
) -> Result<PutResult> {
let parts = if parts.is_empty() {
Expand Down
73 changes: 67 additions & 6 deletions object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use crate::aws::client::{CompleteMultipartMode, PutPartPayload, RequestError, S3
use crate::client::get::GetClientExt;
use crate::client::list::ListClientExt;
use crate::client::CredentialProvider;
use crate::multipart::{MultipartStore, PartId};
use crate::multipart::MultipartStore;
use crate::signer::Signer;
use crate::util::STRICT_ENCODE_SET;
use crate::{
Expand Down Expand Up @@ -74,6 +74,7 @@ const STORE: &str = "S3";
/// [`CredentialProvider`] for [`AmazonS3`]
pub type AwsCredentialProvider = Arc<dyn CredentialProvider<Credential = AwsCredential>>;
use crate::client::parts::Parts;
use crate::client::s3::MultipartPart;
pub use credential::{AwsAuthorizer, AwsCredential};

/// Interface for [Amazon S3](https://aws.amazon.com/s3/).
Expand Down Expand Up @@ -305,7 +306,7 @@ impl ObjectStore for AmazonS3 {
.await?;

let res = async {
let part_id = self
let part = self
.client
.put_part(to, &upload_id, 0, PutPartPayload::Copy(from))
.await?;
Expand All @@ -314,7 +315,7 @@ impl ObjectStore for AmazonS3 {
.complete_multipart(
to,
&upload_id,
vec![part_id],
vec![part],
CompleteMultipartMode::Create,
)
.await
Expand Down Expand Up @@ -393,7 +394,7 @@ impl MultipartUpload for S3MultiPartUpload {
PutPartPayload::Part(data),
)
.await?;
state.parts.put(idx, part);
state.parts.put(part);
Ok(())
})
}
Expand Down Expand Up @@ -439,7 +440,7 @@ impl MultipartStore for AmazonS3 {
id: &MultipartId,
part_idx: usize,
data: PutPayload,
) -> Result<PartId> {
) -> Result<MultipartPart> {
self.client
.put_part(path, id, part_idx, PutPartPayload::Part(data))
.await
Expand All @@ -449,7 +450,7 @@ impl MultipartStore for AmazonS3 {
&self,
path: &Path,
id: &MultipartId,
parts: Vec<PartId>,
parts: Vec<MultipartPart>,
) -> Result<PutResult> {
self.client
.complete_multipart(path, id, parts, CompleteMultipartMode::Overwrite)
Expand Down Expand Up @@ -479,6 +480,66 @@ mod tests {

const NON_EXISTENT_NAME: &str = "nonexistentname";

#[tokio::test]
async fn write_multipart_file_with_signature() {
maybe_skip_integration!();

let bucket = "bg-no-object-lock-test";
let store = AmazonS3Builder::from_env()
.with_region("eu-west-1")
.with_bucket_name(bucket)
.with_checksum_algorithm(Checksum::SHA256)
.build()
.unwrap();

let str = "test.bin";
let path = Path::parse(str).unwrap();
let opts = PutMultipartOpts::default();
let mut upload = store.put_multipart_opts(&path, opts).await.unwrap();

let payload = PutPayload::from_static(&[0u8; 10485760]);
let part = upload.put_part(payload).await;
if part.is_err() {
part.unwrap()
}

let part = upload.put_part(PutPayload::from_static(&[0u8; 5514256]));
part.await.unwrap();

let res = upload.complete().await.unwrap();
println!("res={res:?}");
}

#[tokio::test]
async fn write_multipart_file_with_signature_object_lock() {
maybe_skip_integration!();

let bucket = "bg-object-lock-test";
let store = AmazonS3Builder::from_env()
.with_region("eu-north-1")
.with_bucket_name(bucket)
.with_checksum_algorithm(Checksum::SHA256)
.build()
.unwrap();

let str = "test.bin";
let path = Path::parse(str).unwrap();
let opts = PutMultipartOpts::default();
let mut upload = store.put_multipart_opts(&path, opts).await.unwrap();

let payload = PutPayload::from_static(&[0u8; 10485760]);
let part = upload.put_part(payload).await;
if part.is_err() {
part.unwrap()
}

let part = upload.put_part(PutPayload::from_static(&[0u8; 5514256]));
part.await.unwrap();

let res = upload.complete().await.unwrap();
println!("res={res:?}");
}

#[tokio::test]
async fn s3_test() {
maybe_skip_integration!();
Expand Down
10 changes: 8 additions & 2 deletions object_store/src/azure/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::client::get::GetClient;
use crate::client::header::{get_put_result, HeaderConfig};
use crate::client::list::ListClient;
use crate::client::retry::RetryExt;
use crate::client::s3::MultipartPart;
use crate::client::GetOptionsExt;
use crate::multipart::PartId;
use crate::path::DELIMITER;
Expand Down Expand Up @@ -558,7 +559,7 @@ impl AzureClient {
path: &Path,
part_idx: usize,
payload: PutPayload,
) -> Result<PartId> {
) -> Result<MultipartPart> {
let content_id = format!("{part_idx:20}");
let block_id = BASE64_STANDARD.encode(&content_id);

Expand All @@ -568,7 +569,12 @@ impl AzureClient {
.send()
.await?;

Ok(PartId { content_id })
let part = MultipartPart {
e_tag: content_id,
part_number: part_idx + 1,
checksum_sha256: None,
};
Ok(part)
}

/// PUT a block list <https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list>
Expand Down
17 changes: 9 additions & 8 deletions object_store/src/azure/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,9 @@
//!
//! Unused blocks will automatically be dropped after 7 days.
use crate::{
multipart::{MultipartStore, PartId},
path::Path,
signer::Signer,
GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, ObjectMeta, ObjectStore,
PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, UploadPart,
multipart::MultipartStore, path::Path, signer::Signer, GetOptions, GetResult, ListResult,
MultipartId, MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions,
PutPayload, PutResult, Result, UploadPart,
};
use async_trait::async_trait;
use futures::stream::{BoxStream, StreamExt, TryStreamExt};
Expand All @@ -50,6 +48,7 @@ mod credential;
pub type AzureCredentialProvider = Arc<dyn CredentialProvider<Credential = AzureCredential>>;
use crate::azure::client::AzureClient;
use crate::client::parts::Parts;
use crate::client::s3::MultipartPart;
pub use builder::{AzureConfigKey, MicrosoftAzureBuilder};
pub use credential::AzureCredential;

Expand Down Expand Up @@ -239,13 +238,14 @@ impl MultipartUpload for AzureMultiPartUpload {
let state = Arc::clone(&self.state);
Box::pin(async move {
let part = state.client.put_block(&state.location, idx, data).await?;
state.parts.put(idx, part);
state.parts.put(part);
Ok(())
})
}

async fn complete(&mut self) -> Result<PutResult> {
let parts = self.state.parts.finish(self.part_idx)?;
let parts = parts.into_iter().map(|part| part.into()).collect();

self.state
.client
Expand All @@ -271,16 +271,17 @@ impl MultipartStore for MicrosoftAzure {
_: &MultipartId,
part_idx: usize,
data: PutPayload,
) -> Result<PartId> {
) -> Result<MultipartPart> {
self.client.put_block(path, part_idx, data).await
}

async fn complete_multipart(
&self,
path: &Path,
_: &MultipartId,
parts: Vec<PartId>,
parts: Vec<MultipartPart>,
) -> Result<PutResult> {
let parts = parts.into_iter().map(|p| p.into()).collect();
self.client
.put_block_list(path, parts, Default::default())
.await
Expand Down
14 changes: 7 additions & 7 deletions object_store/src/client/parts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,34 +15,34 @@
// specific language governing permissions and limitations
// under the License.

use crate::multipart::PartId;
use crate::client::s3::MultipartPart;
use parking_lot::Mutex;

/// An interior mutable collection of upload parts and their corresponding part index
#[derive(Debug, Default)]
pub(crate) struct Parts(Mutex<Vec<(usize, PartId)>>);
pub(crate) struct Parts(Mutex<Vec<MultipartPart>>);

impl Parts {
/// Record the [`PartId`] for a given index
///
/// Note: calling this method multiple times with the same `part_idx`
/// will result in multiple [`PartId`] in the final output
pub(crate) fn put(&self, part_idx: usize, id: PartId) {
self.0.lock().push((part_idx, id))
pub(crate) fn put(&self, part: MultipartPart) {
self.0.lock().push(part)
}

/// Produce the final list of [`PartId`] ordered by `part_idx`
///
/// `expected` is the number of parts expected in the final result
pub(crate) fn finish(&self, expected: usize) -> crate::Result<Vec<PartId>> {
pub(crate) fn finish(&self, expected: usize) -> crate::Result<Vec<MultipartPart>> {
let mut parts = self.0.lock();
if parts.len() != expected {
return Err(crate::Error::Generic {
store: "Parts",
source: "Missing part".to_string().into(),
});
}
parts.sort_unstable_by_key(|(idx, _)| *idx);
Ok(parts.drain(..).map(|(_, v)| v).collect())
parts.sort_unstable_by_key(|part| part.part_number);
Ok(parts.drain(..).collect())
}
}
Loading

0 comments on commit 74f0c9c

Please sign in to comment.