Skip to content

Commit

Permalink
DatasetFactoryImpl: use RepoExternalAddressConfig
Browse files Browse the repository at this point in the history
  • Loading branch information
s373r committed Oct 17, 2024
1 parent d52beb5 commit b76be41
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 8 deletions.
2 changes: 1 addition & 1 deletion src/adapter/http/tests/tests/test_routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ async fn setup_client(dataset_url: url::Url, head_expected: Multihash) {
.add::<auth::DummyOdfServerAccessTokenResolver>()
.build();

let dataset = DatasetFactoryImpl::new(IpfsGateway::default(), catalog.get_one().unwrap())
let dataset = DatasetFactoryImpl::new(IpfsGateway::default(), catalog.get_one().unwrap(), None)
.get_dataset(&dataset_url, false)
.await
.unwrap();
Expand Down
38 changes: 31 additions & 7 deletions src/infra/core/src/repos/dataset_factory_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::*;
pub struct DatasetFactoryImpl {
ipfs_gateway: IpfsGateway,
access_token_resolver: Arc<dyn kamu_core::auth::OdfServerAccessTokenResolver>,
maybe_repo_external_address_config: Option<Arc<RepoExternalAddressConfig>>,
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Expand All @@ -47,10 +48,12 @@ impl DatasetFactoryImpl {
pub fn new(
ipfs_gateway: IpfsGateway,
access_token_resolver: Arc<dyn kamu_core::auth::OdfServerAccessTokenResolver>,
maybe_repo_external_address_config: Option<Arc<RepoExternalAddressConfig>>,
) -> Self {
Self {
ipfs_gateway,
access_token_resolver,
maybe_repo_external_address_config,
}
}

Expand Down Expand Up @@ -110,25 +113,44 @@ impl DatasetFactoryImpl {
/// credential resolution from scratch which can be very expensive. If you
/// already have an established [S3Context] use
/// [DatasetFactoryImpl::get_s3_from_context()] function instead.
pub async fn get_s3_from_url(base_url: Url) -> Result<impl Dataset, InternalError> {
pub async fn get_s3_from_url(
base_url: Url,
maybe_repo_external_address_config: &Option<Arc<RepoExternalAddressConfig>>,
) -> Result<impl Dataset, InternalError> {
// TODO: We should ensure optimal credential reuse. Perhaps in future we should
// create a cache of S3Contexts keyed by an endpoint.
let s3_context = S3Context::from_url(&base_url).await;
Self::get_s3_from_context(s3_context)
Self::get_s3_from_context(s3_context, maybe_repo_external_address_config)
}

pub fn get_s3_from_context(s3_context: S3Context) -> Result<impl Dataset, InternalError> {
pub fn get_s3_from_context(
s3_context: S3Context,
maybe_repo_external_address_config: &Option<Arc<RepoExternalAddressConfig>>,
) -> Result<impl Dataset, InternalError> {
let maybe_external_address_override = maybe_repo_external_address_config
.as_ref()
.map(|config| config.external_address.clone());

Ok(DatasetImpl::new(
MetadataChainImpl::new(
MetadataBlockRepositoryCachingInMem::new(MetadataBlockRepositoryImpl::new(
ObjectRepositoryS3Sha3::new(s3_context.sub_context("blocks/"), None),
ObjectRepositoryS3Sha3::new(
s3_context.sub_context("blocks/"),
maybe_external_address_override.clone(),
),
)),
ReferenceRepositoryImpl::new(NamedObjectRepositoryS3::new(
s3_context.sub_context("refs/"),
)),
),
ObjectRepositoryS3Sha3::new(s3_context.sub_context("data/"), None),
ObjectRepositoryS3Sha3::new(s3_context.sub_context("checkpoints/"), None),
ObjectRepositoryS3Sha3::new(
s3_context.sub_context("data/"),
maybe_external_address_override.clone(),
),
ObjectRepositoryS3Sha3::new(
s3_context.sub_context("checkpoints/"),
maybe_external_address_override.clone(),
),
NamedObjectRepositoryS3::new(s3_context.into_sub_context("info/")),
))
}
Expand Down Expand Up @@ -294,7 +316,9 @@ impl DatasetFactory for DatasetFactoryImpl {
Ok(Arc::new(ds))
}
"s3" | "s3+http" | "s3+https" => {
let ds = Self::get_s3_from_url(url.clone()).await?;
let ds =
Self::get_s3_from_url(url.clone(), &self.maybe_repo_external_address_config)
.await?;
Ok(Arc::new(ds))
}
_ => Err(UnsupportedProtocolError {
Expand Down
1 change: 1 addition & 0 deletions src/infra/core/tests/tests/test_pull_service_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ async fn create_graph_remote(
Arc::new(DatasetFactoryImpl::new(
IpfsGateway::default(),
Arc::new(auth::DummyOdfServerAccessTokenResolver::new()),
None,
)),
Arc::new(DummySmartTransferProtocolClient::new()),
Arc::new(kamu::utils::ipfs_wrapper::IpfsClient::default()),
Expand Down

0 comments on commit b76be41

Please sign in to comment.