Skip to content

Commit

Permalink
object_store/gcp: Migrate from snafu to thiserror
Browse files Browse the repository at this point in the history
  • Loading branch information
Turbo87 committed Nov 10, 2024
1 parent 981648a commit ec946aa
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 87 deletions.
48 changes: 29 additions & 19 deletions object_store/src/gcp/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use crate::gcp::{
};
use crate::{ClientConfigKey, ClientOptions, Result, RetryConfig, StaticCredentialProvider};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, Snafu};
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -37,33 +36,33 @@ use super::credential::{AuthorizedUserSigningCredentials, InstanceSigningCredent

const TOKEN_MIN_TTL: Duration = Duration::from_secs(4 * 60);

#[derive(Debug, Snafu)]
#[derive(Debug, thiserror::Error)]
enum Error {
#[snafu(display("Missing bucket name"))]
#[error("Missing bucket name")]
MissingBucketName {},

#[snafu(display("One of service account path or service account key may be provided."))]
#[error("One of service account path or service account key may be provided.")]
ServiceAccountPathAndKeyProvided,

#[snafu(display("Unable parse source url. Url: {}, Error: {}", url, source))]
#[error("Unable parse source url. Url: {}, Error: {}", url, source)]
UnableToParseUrl {
source: url::ParseError,
url: String,
},

#[snafu(display(
#[error(
"Unknown url scheme cannot be parsed into storage location: {}",
scheme
))]
)]
UnknownUrlScheme { scheme: String },

#[snafu(display("URL did not match any known pattern for scheme: {}", url))]
#[error("URL did not match any known pattern for scheme: {}", url)]
UrlNotRecognised { url: String },

#[snafu(display("Configuration key: '{}' is not known.", key))]
#[error("Configuration key: '{}' is not known.", key)]
UnknownConfigurationKey { key: String },

#[snafu(display("GCP credential error: {}", source))]
#[error("GCP credential error: {}", source)]
Credential { source: credential::Error },
}

Expand Down Expand Up @@ -319,12 +318,21 @@ impl GoogleCloudStorageBuilder {
/// This is a separate member function to allow fallible computation to
/// be deferred until [`Self::build`] which in turn allows deriving [`Clone`]
fn parse_url(&mut self, url: &str) -> Result<()> {
let parsed = Url::parse(url).context(UnableToParseUrlSnafu { url })?;
let host = parsed.host_str().context(UrlNotRecognisedSnafu { url })?;
let parsed = Url::parse(url).map_err(|source| Error::UnableToParseUrl {
source,
url: url.to_string(),
})?;

let host = parsed.host_str().ok_or_else(|| Error::UrlNotRecognised {
url: url.to_string(),
})?;

match parsed.scheme() {
"gs" => self.bucket_name = Some(host.to_string()),
scheme => return Err(UnknownUrlSchemeSnafu { scheme }.build().into()),
scheme => {
let scheme = scheme.to_string();
return Err(Error::UnknownUrlScheme { scheme }.into());
}
}
Ok(())
}
Expand Down Expand Up @@ -428,12 +436,14 @@ impl GoogleCloudStorageBuilder {
// First try to initialize from the service account information.
let service_account_credentials =
match (self.service_account_path, self.service_account_key) {
(Some(path), None) => {
Some(ServiceAccountCredentials::from_file(path).context(CredentialSnafu)?)
}
(None, Some(key)) => {
Some(ServiceAccountCredentials::from_key(&key).context(CredentialSnafu)?)
}
(Some(path), None) => Some(
ServiceAccountCredentials::from_file(path)
.map_err(|source| Error::Credential { source })?,
),
(None, Some(key)) => Some(
ServiceAccountCredentials::from_key(&key)
.map_err(|source| Error::Credential { source })?,
),
(None, None) => None,
(Some(_), Some(_)) => return Err(Error::ServiceAccountPathAndKeyProvided.into()),
};
Expand Down
91 changes: 50 additions & 41 deletions object_store/src/gcp/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ use percent_encoding::{percent_encode, utf8_percent_encode, NON_ALPHANUMERIC};
use reqwest::header::HeaderName;
use reqwest::{Client, Method, RequestBuilder, Response, StatusCode};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, Snafu};
use std::sync::Arc;

const VERSION_HEADER: &str = "x-goog-generation";
Expand All @@ -53,62 +52,62 @@ const USER_DEFINED_METADATA_HEADER_PREFIX: &str = "x-goog-meta-";

static VERSION_MATCH: HeaderName = HeaderName::from_static("x-goog-if-generation-match");

#[derive(Debug, Snafu)]
#[derive(Debug, thiserror::Error)]
enum Error {
#[snafu(display("Error performing list request: {}", source))]
#[error("Error performing list request: {}", source)]
ListRequest { source: crate::client::retry::Error },

#[snafu(display("Error getting list response body: {}", source))]
#[error("Error getting list response body: {}", source)]
ListResponseBody { source: reqwest::Error },

#[snafu(display("Got invalid list response: {}", source))]
#[error("Got invalid list response: {}", source)]
InvalidListResponse { source: quick_xml::de::DeError },

#[snafu(display("Error performing get request {}: {}", path, source))]
#[error("Error performing get request {}: {}", path, source)]
GetRequest {
source: crate::client::retry::Error,
path: String,
},

#[snafu(display("Error performing request {}: {}", path, source))]
#[error("Error performing request {}: {}", path, source)]
Request {
source: crate::client::retry::Error,
path: String,
},

#[snafu(display("Error getting put response body: {}", source))]
#[error("Error getting put response body: {}", source)]
PutResponseBody { source: reqwest::Error },

#[snafu(display("Got invalid put request: {}", source))]
#[error("Got invalid put request: {}", source)]
InvalidPutRequest { source: quick_xml::se::SeError },

#[snafu(display("Got invalid put response: {}", source))]
#[error("Got invalid put response: {}", source)]
InvalidPutResponse { source: quick_xml::de::DeError },

#[snafu(display("Unable to extract metadata from headers: {}", source))]
#[error("Unable to extract metadata from headers: {}", source)]
Metadata {
source: crate::client::header::Error,
},

#[snafu(display("Version required for conditional update"))]
#[error("Version required for conditional update")]
MissingVersion,

#[snafu(display("Error performing complete multipart request: {}", source))]
#[error("Error performing complete multipart request: {}", source)]
CompleteMultipartRequest { source: crate::client::retry::Error },

#[snafu(display("Error getting complete multipart response body: {}", source))]
#[error("Error getting complete multipart response body: {}", source)]
CompleteMultipartResponseBody { source: reqwest::Error },

#[snafu(display("Got invalid multipart response: {}", source))]
#[error("Got invalid multipart response: {}", source)]
InvalidMultipartResponse { source: quick_xml::de::DeError },

#[snafu(display("Error signing blob: {}", source))]
#[error("Error signing blob: {}", source)]
SignBlobRequest { source: crate::client::retry::Error },

#[snafu(display("Got invalid signing blob response: {}", source))]
#[error("Got invalid signing blob response: {}", source)]
InvalidSignBlobResponse { source: reqwest::Error },

#[snafu(display("Got invalid signing blob signature: {}", source))]
#[error("Got invalid signing blob signature: {}", source)]
InvalidSignBlobSignature { source: base64::DecodeError },
}

Expand Down Expand Up @@ -236,15 +235,17 @@ impl<'a> Request<'a> {
.payload(self.payload)
.send()
.await
.context(RequestSnafu {
path: self.path.as_ref(),
.map_err(|source| {
let path = self.path.as_ref().into();
Error::Request { source, path }
})?;
Ok(resp)
}

async fn do_put(self) -> Result<PutResult> {
let response = self.send().await?;
Ok(get_put_result(response.headers(), VERSION_HEADER).context(MetadataSnafu)?)
Ok(get_put_result(response.headers(), VERSION_HEADER)
.map_err(|source| Error::Metadata { source })?)
}
}

Expand Down Expand Up @@ -336,17 +337,17 @@ impl GoogleCloudStorageClient {
.idempotent(true)
.send()
.await
.context(SignBlobRequestSnafu)?;
.map_err(|source| Error::SignBlobRequest { source })?;

//If successful, the signature is returned in the signedBlob field in the response.
let response = response
.json::<SignBlobResponse>()
.await
.context(InvalidSignBlobResponseSnafu)?;
.map_err(|source| Error::InvalidSignBlobResponse { source })?;

let signed_blob = BASE64_STANDARD
.decode(response.signed_blob)
.context(InvalidSignBlobSignatureSnafu)?;
.map_err(|source| Error::InvalidSignBlobSignature { source })?;

Ok(hex_encode(&signed_blob))
}
Expand Down Expand Up @@ -389,7 +390,7 @@ impl GoogleCloudStorageClient {
PutMode::Overwrite => builder.idempotent(true),
PutMode::Create => builder.header(&VERSION_MATCH, "0"),
PutMode::Update(v) => {
let etag = v.version.as_ref().context(MissingVersionSnafu)?;
let etag = v.version.as_ref().ok_or(Error::MissingVersion)?;
builder.header(&VERSION_MATCH, etag)
}
};
Expand Down Expand Up @@ -443,9 +444,14 @@ impl GoogleCloudStorageClient {
.send()
.await?;

let data = response.bytes().await.context(PutResponseBodySnafu)?;
let data = response
.bytes()
.await
.map_err(|source| Error::PutResponseBody { source })?;

let result: InitiateMultipartUploadResult =
quick_xml::de::from_reader(data.as_ref().reader()).context(InvalidPutResponseSnafu)?;
quick_xml::de::from_reader(data.as_ref().reader())
.map_err(|source| Error::InvalidPutResponse { source })?;

Ok(result.upload_id)
}
Expand All @@ -467,8 +473,9 @@ impl GoogleCloudStorageClient {
.query(&[("uploadId", multipart_id)])
.send_retry(&self.config.retry_config)
.await
.context(RequestSnafu {
path: path.as_ref(),
.map_err(|source| {
let path = path.as_ref().into();
Error::Request { source, path }
})?;

Ok(())
Expand Down Expand Up @@ -498,7 +505,7 @@ impl GoogleCloudStorageClient {
let credential = self.get_credential().await?;

let data = quick_xml::se::to_string(&upload_info)
.context(InvalidPutRequestSnafu)?
.map_err(|source| Error::InvalidPutRequest { source })?
// We cannot disable the escaping that transforms "/" to "&quote;" :(
// https://github.com/tafia/quick-xml/issues/362
// https://github.com/tafia/quick-xml/issues/350
Expand All @@ -514,17 +521,18 @@ impl GoogleCloudStorageClient {
.idempotent(true)
.send()
.await
.context(CompleteMultipartRequestSnafu)?;
.map_err(|source| Error::CompleteMultipartRequest { source })?;

let version = get_version(response.headers(), VERSION_HEADER).context(MetadataSnafu)?;
let version = get_version(response.headers(), VERSION_HEADER)
.map_err(|source| Error::Metadata { source })?;

let data = response
.bytes()
.await
.context(CompleteMultipartResponseBodySnafu)?;
.map_err(|source| Error::CompleteMultipartResponseBody { source })?;

let response: CompleteMultipartUploadResult =
quick_xml::de::from_reader(data.reader()).context(InvalidMultipartResponseSnafu)?;
let response: CompleteMultipartUploadResult = quick_xml::de::from_reader(data.reader())
.map_err(|source| Error::InvalidMultipartResponse { source })?;

Ok(PutResult {
e_tag: Some(response.e_tag),
Expand Down Expand Up @@ -615,8 +623,9 @@ impl GetClient for GoogleCloudStorageClient {
.with_get_options(options)
.send_retry(&self.config.retry_config)
.await
.context(GetRequestSnafu {
path: path.as_ref(),
.map_err(|source| {
let path = path.as_ref().into();
Error::GetRequest { source, path }
})?;

Ok(response)
Expand Down Expand Up @@ -665,13 +674,13 @@ impl ListClient for GoogleCloudStorageClient {
.bearer_auth(&credential.bearer)
.send_retry(&self.config.retry_config)
.await
.context(ListRequestSnafu)?
.map_err(|source| Error::ListRequest { source })?
.bytes()
.await
.context(ListResponseBodySnafu)?;
.map_err(|source| Error::ListResponseBody { source })?;

let mut response: ListResponse =
quick_xml::de::from_reader(response.reader()).context(InvalidListResponseSnafu)?;
let mut response: ListResponse = quick_xml::de::from_reader(response.reader())
.map_err(|source| Error::InvalidListResponse { source })?;

let token = response.next_continuation_token.take();
Ok((response.try_into()?, token))
Expand Down
Loading

0 comments on commit ec946aa

Please sign in to comment.