diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs index b19e0e2ab7f..ae7d3ff465e 100644 --- a/object_store/src/aws/client.rs +++ b/object_store/src/aws/client.rs @@ -29,10 +29,9 @@ use crate::client::list::ListClient; use crate::client::retry::RetryExt; use crate::client::s3::{ CompleteMultipartUpload, CompleteMultipartUploadResult, CopyPartResult, - InitiateMultipartUploadResult, ListResponse, + InitiateMultipartUploadResult, ListResponse, MultipartPart, }; use crate::client::GetOptionsExt; -use crate::multipart::PartId; use crate::path::DELIMITER; use crate::{ Attribute, Attributes, ClientOptions, GetOptions, ListResult, MultipartId, Path, @@ -186,7 +185,7 @@ impl From for Error { } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub(crate) struct S3Config { pub region: String, pub endpoint: Option, @@ -452,6 +451,25 @@ impl S3Client { } } + pub(crate) fn request_with_config<'a>( + &'a self, + method: Method, + path: &'a Path, + config: &'a S3Config, + ) -> Request<'a> { + let url = self.config.path_url(path); + Request { + path, + builder: self.client.request(method, url), + payload: None, + payload_sha256: None, + config, + use_session_creds: true, + idempotent: false, + retry_error_body: false, + } + } + /// Make an S3 Delete Objects request /// /// Produces a vector of results, one for each path in the input vector. If @@ -609,6 +627,7 @@ impl S3Client { ) -> Result { let response = self .request(Method::POST, location) + .header("x-amz-checksum-algorithm", "SHA256") .query(&[("uploads", "")]) .with_encryption_headers() .with_attributes(opts.attributes) @@ -632,12 +651,16 @@ impl S3Client { upload_id: &MultipartId, part_idx: usize, data: PutPartPayload<'_>, - ) -> Result { + ) -> Result { let is_copy = matches!(data, PutPartPayload::Copy(_)); let part = (part_idx + 1).to_string(); + let config = S3Config { + checksum: Some(Checksum::SHA256), + ..self.config.clone() + }; let mut request = self - .request(Method::PUT, path) + .request_with_config(Method::PUT, path, &config) .query(&[("partNumber", &part), ("uploadId", upload_id)]) .idempotent(true); @@ -659,6 +682,10 @@ impl S3Client { request = request.with_encryption_headers(); } let response = request.send().await?; + let checksum_sha256 = response + .headers() + .get("x-amz-checksum-sha256") + .map(|v| v.to_str().unwrap().to_string()); let content_id = match is_copy { false => get_etag(response.headers()).context(MetadataSnafu)?, @@ -672,7 +699,12 @@ impl S3Client { response.e_tag } }; - Ok(PartId { content_id }) + let part = MultipartPart { + e_tag: content_id, + part_number: part_idx + 1, + checksum_sha256, + }; + Ok(part) } pub(crate) async fn abort_multipart(&self, location: &Path, upload_id: &str) -> Result<()> { @@ -689,7 +721,7 @@ impl S3Client { &self, location: &Path, upload_id: &str, - parts: Vec, + parts: Vec, mode: CompleteMultipartMode, ) -> Result { let parts = if parts.is_empty() { diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index 81511bad7b0..b91d4f0b8a6 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -40,7 +40,7 @@ use crate::aws::client::{CompleteMultipartMode, PutPartPayload, RequestError, S3 use crate::client::get::GetClientExt; use crate::client::list::ListClientExt; use crate::client::CredentialProvider; -use crate::multipart::{MultipartStore, PartId}; +use crate::multipart::MultipartStore; use crate::signer::Signer; use crate::util::STRICT_ENCODE_SET; use crate::{ @@ -74,6 +74,7 @@ const STORE: &str = "S3"; /// [`CredentialProvider`] for [`AmazonS3`] pub type AwsCredentialProvider = Arc>; use crate::client::parts::Parts; +use crate::client::s3::MultipartPart; pub use credential::{AwsAuthorizer, AwsCredential}; /// Interface for [Amazon S3](https://aws.amazon.com/s3/). @@ -305,7 +306,7 @@ impl ObjectStore for AmazonS3 { .await?; let res = async { - let part_id = self + let part = self .client .put_part(to, &upload_id, 0, PutPartPayload::Copy(from)) .await?; @@ -314,7 +315,7 @@ impl ObjectStore for AmazonS3 { .complete_multipart( to, &upload_id, - vec![part_id], + vec![part], CompleteMultipartMode::Create, ) .await @@ -393,7 +394,7 @@ impl MultipartUpload for S3MultiPartUpload { PutPartPayload::Part(data), ) .await?; - state.parts.put(idx, part); + state.parts.put(part); Ok(()) }) } @@ -439,7 +440,7 @@ impl MultipartStore for AmazonS3 { id: &MultipartId, part_idx: usize, data: PutPayload, - ) -> Result { + ) -> Result { self.client .put_part(path, id, part_idx, PutPartPayload::Part(data)) .await @@ -449,7 +450,7 @@ impl MultipartStore for AmazonS3 { &self, path: &Path, id: &MultipartId, - parts: Vec, + parts: Vec, ) -> Result { self.client .complete_multipart(path, id, parts, CompleteMultipartMode::Overwrite) @@ -479,6 +480,66 @@ mod tests { const NON_EXISTENT_NAME: &str = "nonexistentname"; + #[tokio::test] + async fn write_multipart_file_with_signature() { + maybe_skip_integration!(); + + let bucket = "bg-no-object-lock-test"; + let store = AmazonS3Builder::from_env() + .with_region("eu-west-1") + .with_bucket_name(bucket) + .with_checksum_algorithm(Checksum::SHA256) + .build() + .unwrap(); + + let str = "test.bin"; + let path = Path::parse(str).unwrap(); + let opts = PutMultipartOpts::default(); + let mut upload = store.put_multipart_opts(&path, opts).await.unwrap(); + + let payload = PutPayload::from_static(&[0u8; 10485760]); + let part = upload.put_part(payload).await; + if part.is_err() { + part.unwrap() + } + + let part = upload.put_part(PutPayload::from_static(&[0u8; 5514256])); + part.await.unwrap(); + + let res = upload.complete().await.unwrap(); + println!("res={res:?}"); + } + + #[tokio::test] + async fn write_multipart_file_with_signature_object_lock() { + maybe_skip_integration!(); + + let bucket = "bg-object-lock-test"; + let store = AmazonS3Builder::from_env() + .with_region("eu-north-1") + .with_bucket_name(bucket) + .with_checksum_algorithm(Checksum::SHA256) + .build() + .unwrap(); + + let str = "test.bin"; + let path = Path::parse(str).unwrap(); + let opts = PutMultipartOpts::default(); + let mut upload = store.put_multipart_opts(&path, opts).await.unwrap(); + + let payload = PutPayload::from_static(&[0u8; 10485760]); + let part = upload.put_part(payload).await; + if part.is_err() { + part.unwrap() + } + + let part = upload.put_part(PutPayload::from_static(&[0u8; 5514256])); + part.await.unwrap(); + + let res = upload.complete().await.unwrap(); + println!("res={res:?}"); + } + #[tokio::test] async fn s3_test() { maybe_skip_integration!(); diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs index 76dedd71aa5..31ca28d55ac 100644 --- a/object_store/src/azure/client.rs +++ b/object_store/src/azure/client.rs @@ -22,6 +22,7 @@ use crate::client::get::GetClient; use crate::client::header::{get_put_result, HeaderConfig}; use crate::client::list::ListClient; use crate::client::retry::RetryExt; +use crate::client::s3::MultipartPart; use crate::client::GetOptionsExt; use crate::multipart::PartId; use crate::path::DELIMITER; @@ -558,7 +559,7 @@ impl AzureClient { path: &Path, part_idx: usize, payload: PutPayload, - ) -> Result { + ) -> Result { let content_id = format!("{part_idx:20}"); let block_id = BASE64_STANDARD.encode(&content_id); @@ -568,7 +569,12 @@ impl AzureClient { .send() .await?; - Ok(PartId { content_id }) + let part = MultipartPart { + e_tag: content_id, + part_number: part_idx + 1, + checksum_sha256: None, + }; + Ok(part) } /// PUT a block list diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs index 177bffb653a..fd42f1ce75d 100644 --- a/object_store/src/azure/mod.rs +++ b/object_store/src/azure/mod.rs @@ -23,11 +23,9 @@ //! //! Unused blocks will automatically be dropped after 7 days. use crate::{ - multipart::{MultipartStore, PartId}, - path::Path, - signer::Signer, - GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, ObjectMeta, ObjectStore, - PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, UploadPart, + multipart::MultipartStore, path::Path, signer::Signer, GetOptions, GetResult, ListResult, + MultipartId, MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, + PutPayload, PutResult, Result, UploadPart, }; use async_trait::async_trait; use futures::stream::{BoxStream, StreamExt, TryStreamExt}; @@ -50,6 +48,7 @@ mod credential; pub type AzureCredentialProvider = Arc>; use crate::azure::client::AzureClient; use crate::client::parts::Parts; +use crate::client::s3::MultipartPart; pub use builder::{AzureConfigKey, MicrosoftAzureBuilder}; pub use credential::AzureCredential; @@ -239,13 +238,14 @@ impl MultipartUpload for AzureMultiPartUpload { let state = Arc::clone(&self.state); Box::pin(async move { let part = state.client.put_block(&state.location, idx, data).await?; - state.parts.put(idx, part); + state.parts.put(part); Ok(()) }) } async fn complete(&mut self) -> Result { let parts = self.state.parts.finish(self.part_idx)?; + let parts = parts.into_iter().map(|part| part.into()).collect(); self.state .client @@ -271,7 +271,7 @@ impl MultipartStore for MicrosoftAzure { _: &MultipartId, part_idx: usize, data: PutPayload, - ) -> Result { + ) -> Result { self.client.put_block(path, part_idx, data).await } @@ -279,8 +279,9 @@ impl MultipartStore for MicrosoftAzure { &self, path: &Path, _: &MultipartId, - parts: Vec, + parts: Vec, ) -> Result { + let parts = parts.into_iter().map(|p| p.into()).collect(); self.client .put_block_list(path, parts, Default::default()) .await diff --git a/object_store/src/client/parts.rs b/object_store/src/client/parts.rs index 9fc301edcf8..d41db959cbf 100644 --- a/object_store/src/client/parts.rs +++ b/object_store/src/client/parts.rs @@ -15,26 +15,26 @@ // specific language governing permissions and limitations // under the License. -use crate::multipart::PartId; +use crate::client::s3::MultipartPart; use parking_lot::Mutex; /// An interior mutable collection of upload parts and their corresponding part index #[derive(Debug, Default)] -pub(crate) struct Parts(Mutex>); +pub(crate) struct Parts(Mutex>); impl Parts { /// Record the [`PartId`] for a given index /// /// Note: calling this method multiple times with the same `part_idx` /// will result in multiple [`PartId`] in the final output - pub(crate) fn put(&self, part_idx: usize, id: PartId) { - self.0.lock().push((part_idx, id)) + pub(crate) fn put(&self, part: MultipartPart) { + self.0.lock().push(part) } /// Produce the final list of [`PartId`] ordered by `part_idx` /// /// `expected` is the number of parts expected in the final result - pub(crate) fn finish(&self, expected: usize) -> crate::Result> { + pub(crate) fn finish(&self, expected: usize) -> crate::Result> { let mut parts = self.0.lock(); if parts.len() != expected { return Err(crate::Error::Generic { @@ -42,7 +42,7 @@ impl Parts { source: "Missing part".to_string().into(), }); } - parts.sort_unstable_by_key(|(idx, _)| *idx); - Ok(parts.drain(..).map(|(_, v)| v).collect()) + parts.sort_unstable_by_key(|part| part.part_number); + Ok(parts.drain(..).collect()) } } diff --git a/object_store/src/client/s3.rs b/object_store/src/client/s3.rs index dba752cb125..c69f048fc53 100644 --- a/object_store/src/client/s3.rs +++ b/object_store/src/client/s3.rs @@ -114,6 +114,22 @@ impl From> for CompleteMultipartUpload { .map(|(part_number, part)| MultipartPart { e_tag: part.content_id, part_number: part_number + 1, + checksum_sha256: None, + }) + .collect(); + Self { part } + } +} + +impl From> for CompleteMultipartUpload { + fn from(value: Vec) -> Self { + let part = value + .into_iter() + .enumerate() + .map(|(_, part)| MultipartPart { + e_tag: part.e_tag, + part_number: part.part_number, + checksum_sha256: part.checksum_sha256, }) .collect(); Self { part } @@ -121,11 +137,13 @@ impl From> for CompleteMultipartUpload { } #[derive(Debug, Serialize)] -pub(crate) struct MultipartPart { +pub struct MultipartPart { #[serde(rename = "ETag")] pub e_tag: String, #[serde(rename = "PartNumber")] pub part_number: usize, + #[serde(rename = "ChecksumSHA256")] + pub checksum_sha256: Option, } #[derive(Debug, Deserialize)] diff --git a/object_store/src/gcp/client.rs b/object_store/src/gcp/client.rs index ccc9c341f2f..58a04ddaab4 100644 --- a/object_store/src/gcp/client.rs +++ b/object_store/src/gcp/client.rs @@ -21,7 +21,7 @@ use crate::client::list::ListClient; use crate::client::retry::RetryExt; use crate::client::s3::{ CompleteMultipartUpload, CompleteMultipartUploadResult, InitiateMultipartUploadResult, - ListResponse, + ListResponse, MultipartPart, }; use crate::client::GetOptionsExt; use crate::gcp::{GcpCredential, GcpCredentialProvider, GcpSigningCredentialProvider, STORE}; @@ -411,7 +411,7 @@ impl GoogleCloudStorageClient { upload_id: &MultipartId, part_idx: usize, data: PutPayload, - ) -> Result { + ) -> Result { let query = &[ ("partNumber", &format!("{}", part_idx + 1)), ("uploadId", upload_id), @@ -424,9 +424,12 @@ impl GoogleCloudStorageClient { .do_put() .await?; - Ok(PartId { - content_id: result.e_tag.unwrap(), - }) + let part = MultipartPart { + e_tag: result.e_tag.unwrap(), + part_number: part_idx + 1, + checksum_sha256: None, + }; + Ok(part) } /// Initiate a multipart upload diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs index 039ec46b68c..f75f99d6200 100644 --- a/object_store/src/gcp/mod.rs +++ b/object_store/src/gcp/mod.rs @@ -41,9 +41,8 @@ use crate::client::CredentialProvider; use crate::gcp::credential::GCSAuthorizer; use crate::signer::Signer; use crate::{ - multipart::PartId, path::Path, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, - ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, - UploadPart, + path::Path, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, ObjectMeta, + ObjectStore, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, UploadPart, }; use async_trait::async_trait; use client::GoogleCloudStorageClient; @@ -54,6 +53,7 @@ use url::Url; use crate::client::get::GetClientExt; use crate::client::list::ListClientExt; use crate::client::parts::Parts; +use crate::client::s3::MultipartPart; use crate::multipart::MultipartStore; pub use builder::{GoogleCloudStorageBuilder, GoogleConfigKey}; pub use credential::{GcpCredential, GcpSigningCredential, ServiceAccountKey}; @@ -124,13 +124,14 @@ impl MultipartUpload for GCSMultipartUpload { .client .put_part(&state.path, &state.multipart_id, idx, payload) .await?; - state.parts.put(idx, part); + state.parts.put(part); Ok(()) }) } async fn complete(&mut self) -> Result { let parts = self.state.parts.finish(self.part_idx)?; + let parts = parts.into_iter().map(|p| p.into()).collect(); self.state .client @@ -222,7 +223,7 @@ impl MultipartStore for GoogleCloudStorage { id: &MultipartId, part_idx: usize, payload: PutPayload, - ) -> Result { + ) -> Result { self.client.put_part(path, id, part_idx, payload).await } @@ -230,8 +231,9 @@ impl MultipartStore for GoogleCloudStorage { &self, path: &Path, id: &MultipartId, - parts: Vec, + parts: Vec, ) -> Result { + let parts = parts.into_iter().map(|p| p.into()).collect(); self.client.multipart_complete(path, id, parts).await } diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs index a467e3b88a2..eb995c4df20 100644 --- a/object_store/src/memory.rs +++ b/object_store/src/memory.rs @@ -27,7 +27,8 @@ use futures::{stream::BoxStream, StreamExt}; use parking_lot::RwLock; use snafu::{OptionExt, ResultExt, Snafu}; -use crate::multipart::{MultipartStore, PartId}; +use crate::client::s3::MultipartPart; +use crate::multipart::MultipartStore; use crate::util::InvalidGetRange; use crate::{ path::Path, Attributes, GetRange, GetResult, GetResultPayload, ListResult, MultipartId, @@ -412,23 +413,26 @@ impl MultipartStore for InMemory { id: &MultipartId, part_idx: usize, payload: PutPayload, - ) -> Result { + ) -> Result { let mut storage = self.storage.write(); let upload = storage.upload_mut(id)?; if part_idx <= upload.parts.len() { upload.parts.resize(part_idx + 1, None); } upload.parts[part_idx] = Some(payload.into()); - Ok(PartId { - content_id: Default::default(), - }) + let part = MultipartPart { + e_tag: "".to_string(), + part_number: 0, + checksum_sha256: None, + }; + Ok(part) } async fn complete_multipart( &self, path: &Path, id: &MultipartId, - _parts: Vec, + _parts: Vec, ) -> Result { let mut storage = self.storage.write(); let upload = storage.remove_upload(id)?; diff --git a/object_store/src/multipart.rs b/object_store/src/multipart.rs index d94e7f15051..e52daf2c7b9 100644 --- a/object_store/src/multipart.rs +++ b/object_store/src/multipart.rs @@ -23,6 +23,7 @@ use async_trait::async_trait; +use crate::client::s3::MultipartPart; use crate::path::Path; use crate::{MultipartId, PutPayload, PutResult, Result}; @@ -33,6 +34,14 @@ pub struct PartId { pub content_id: String, } +impl From for PartId { + fn from(value: MultipartPart) -> Self { + PartId { + content_id: value.e_tag.clone(), + } + } +} + /// A low-level interface for interacting with multipart upload APIs /// /// Most use-cases should prefer [`ObjectStore::put_multipart`] as this is supported by more @@ -64,7 +73,7 @@ pub trait MultipartStore: Send + Sync + 'static { id: &MultipartId, part_idx: usize, data: PutPayload, - ) -> Result; + ) -> Result; /// Completes a multipart upload /// @@ -76,7 +85,7 @@ pub trait MultipartStore: Send + Sync + 'static { &self, path: &Path, id: &MultipartId, - parts: Vec, + parts: Vec, ) -> Result; /// Aborts a multipart upload diff --git a/object_store/src/throttle.rs b/object_store/src/throttle.rs index d07276c3dca..5b2b218ce8d 100644 --- a/object_store/src/throttle.rs +++ b/object_store/src/throttle.rs @@ -20,7 +20,8 @@ use parking_lot::Mutex; use std::ops::Range; use std::{convert::TryInto, sync::Arc}; -use crate::multipart::{MultipartStore, PartId}; +use crate::client::s3::MultipartPart; +use crate::multipart::MultipartStore; use crate::{ path::Path, GetResult, GetResultPayload, ListResult, MultipartId, MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, @@ -350,7 +351,7 @@ impl MultipartStore for ThrottledStore { id: &MultipartId, part_idx: usize, data: PutPayload, - ) -> Result { + ) -> Result { sleep(self.config().wait_put_per_call).await; self.inner.put_part(path, id, part_idx, data).await } @@ -359,7 +360,7 @@ impl MultipartStore for ThrottledStore { &self, path: &Path, id: &MultipartId, - parts: Vec, + parts: Vec, ) -> Result { self.inner.complete_multipart(path, id, parts).await }