From d87782ca9b25a36a9a06924fe5cf620a761edea3 Mon Sep 17 00:00:00 2001 From: rmn-boiko Date: Tue, 8 Oct 2024 15:57:23 +0200 Subject: [PATCH] Fix review comments. Iter 1 --- src/adapter/http/src/data/mod.rs | 3 - src/adapter/http/src/data/router.rs | 12 -- .../src/{data => general}/account_handler.rs | 36 +--- .../{data => general}/dataset_info_handler.rs | 4 + src/adapter/http/src/general/mod.rs | 15 ++ .../node_info_handler.rs} | 14 +- src/adapter/http/src/general/router.rs | 37 ++++ src/adapter/http/src/lib.rs | 1 + .../http/tests/harness/client_side_harness.rs | 14 +- .../http/tests/harness/test_api_server.rs | 1 + src/adapter/http/tests/tests/mod.rs | 2 +- .../http/tests/tests/test_account_info.rs | 12 +- ...st_workspace_info.rs => test_node_info.rs} | 16 +- .../scenario_new_dataset_via_repo_ref.rs | 4 +- .../tests_push/test_smart_push_shared.rs | 18 +- .../tests_push/test_smart_push_special.rs | 10 +- src/app/cli/src/cli.rs | 8 +- src/app/cli/src/cli_commands.rs | 2 - src/app/cli/src/cli_value_parser.rs | 14 +- src/app/cli/src/commands/push_command.rs | 106 +++-------- src/app/cli/src/explore/api_server.rs | 1 + src/domain/core/src/services/push_service.rs | 44 ++--- .../src/services/remote_aliases_registry.rs | 35 +++- .../src/identity/dataset_refs.rs | 59 ++++-- .../tests/tests/test_dataset_refs.rs | 38 +++- src/infra/core/src/push_service_impl.rs | 175 ++++++------------ .../core/src/remote_alias_resolver_impl.rs | 129 +++++-------- 27 files changed, 370 insertions(+), 440 deletions(-) rename src/adapter/http/src/{data => general}/account_handler.rs (68%) rename src/adapter/http/src/{data => general}/dataset_info_handler.rs (96%) create mode 100644 src/adapter/http/src/general/mod.rs rename src/adapter/http/src/{data/workspace_info_handler.rs => general/node_info_handler.rs} (81%) create mode 100644 src/adapter/http/src/general/router.rs rename src/adapter/http/tests/tests/{test_workspace_info.rs => test_node_info.rs} (85%) diff --git a/src/adapter/http/src/data/mod.rs b/src/adapter/http/src/data/mod.rs index 1556b5388..3d3ab11b6 100644 --- a/src/adapter/http/src/data/mod.rs +++ b/src/adapter/http/src/data/mod.rs @@ -7,8 +7,6 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -mod account_handler; -mod dataset_info_handler; mod ingest_handler; pub mod metadata_handler; mod query_handler; @@ -16,6 +14,5 @@ pub mod query_types; mod router; mod tail_handler; mod verify_handler; -mod workspace_info_handler; pub use router::*; diff --git a/src/adapter/http/src/data/router.rs b/src/adapter/http/src/data/router.rs index da94ff593..0049ffbf7 100644 --- a/src/adapter/http/src/data/router.rs +++ b/src/adapter/http/src/data/router.rs @@ -29,18 +29,6 @@ pub fn root_router() -> axum::Router { "/verify", axum::routing::post(super::verify_handler::verify_handler), ) - .route( - "/workspace/info", - axum::routing::get(super::workspace_info_handler::workspace_info_handler), - ) - .route( - "/me", - axum::routing::get(super::account_handler::account_handler), - ) - .route( - "/datasets/:id", - axum::routing::get(super::dataset_info_handler::dataset_info_handler), - ) } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/adapter/http/src/data/account_handler.rs b/src/adapter/http/src/general/account_handler.rs similarity index 68% rename from src/adapter/http/src/data/account_handler.rs rename to src/adapter/http/src/general/account_handler.rs index da01d07cb..3251e88dc 100644 --- a/src/adapter/http/src/data/account_handler.rs +++ b/src/adapter/http/src/general/account_handler.rs @@ -18,17 +18,10 @@ use axum::extract::Extension; use axum::response::Json; -use chrono::{DateTime, Utc}; use database_common_macros::transactional_handler; use dill::Catalog; use http_common::*; -use kamu_accounts::{ - Account, - AccountDisplayName, - AccountType, - AuthenticationService, - CurrentAccountSubject, -}; +use kamu_accounts::{Account, AuthenticationService, CurrentAccountSubject}; use opendatafabric::{AccountID, AccountName}; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -38,14 +31,6 @@ use opendatafabric::{AccountID, AccountName}; pub struct AccountResponse { pub id: AccountID, pub account_name: AccountName, - pub email: Option, - pub display_name: AccountDisplayName, - pub account_type: AccountType, - pub avatar_url: Option, - pub registered_at: DateTime, - pub is_admin: bool, - pub provider: String, - pub provider_identity_key: String, } impl From for AccountResponse { @@ -53,14 +38,6 @@ impl From for AccountResponse { Self { id: value.id, account_name: value.account_name, - email: value.email, - display_name: value.display_name, - account_type: value.account_type, - avatar_url: value.avatar_url, - registered_at: value.registered_at, - is_admin: value.is_admin, - provider: value.provider, - provider_identity_key: value.provider_identity_key, } } } @@ -82,12 +59,11 @@ async fn get_account(catalog: &Catalog) -> Result, ApiErro CurrentAccountSubject::Anonymous(_) => Err(ApiError::new_unauthorized()), CurrentAccountSubject::Logged(account) => { let auth_service = catalog.get_one::().unwrap(); - let full_account_info_maybe = auth_service.account_by_id(&account.account_id).await?; - if let Some(full_account_info) = full_account_info_maybe { - return Ok(Json(full_account_info.into())); - } - - Err(ApiError::not_found_without_body()) + let full_account_info = auth_service + .account_by_id(&account.account_id) + .await? + .unwrap(); + Ok(Json(full_account_info.into())) } } } diff --git a/src/adapter/http/src/data/dataset_info_handler.rs b/src/adapter/http/src/general/dataset_info_handler.rs similarity index 96% rename from src/adapter/http/src/data/dataset_info_handler.rs rename to src/adapter/http/src/general/dataset_info_handler.rs index 5f1fc4dcc..1b2054980 100644 --- a/src/adapter/http/src/data/dataset_info_handler.rs +++ b/src/adapter/http/src/general/dataset_info_handler.rs @@ -24,6 +24,8 @@ use http_common::*; use kamu_core::{DatasetRepository, GetDatasetError}; use opendatafabric::{AccountName, DatasetHandle, DatasetID, DatasetName}; +use crate::axum_utils::ensure_authenticated_account; + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// #[derive(Debug, serde::Serialize)] @@ -60,6 +62,8 @@ async fn get_dataset_by_id( catalog: &Catalog, dataset_id: &DatasetID, ) -> Result, ApiError> { + ensure_authenticated_account(catalog).api_err()?; + let dataset_repo = catalog.get_one::().unwrap(); let dataset_handle = dataset_repo .resolve_dataset_ref(&dataset_id.clone().as_local_ref()) diff --git a/src/adapter/http/src/general/mod.rs b/src/adapter/http/src/general/mod.rs new file mode 100644 index 000000000..60f92f82e --- /dev/null +++ b/src/adapter/http/src/general/mod.rs @@ -0,0 +1,15 @@ +// Copyright Kamu Data, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +mod account_handler; +mod dataset_info_handler; +mod node_info_handler; +mod router; + +pub use router::*; diff --git a/src/adapter/http/src/data/workspace_info_handler.rs b/src/adapter/http/src/general/node_info_handler.rs similarity index 81% rename from src/adapter/http/src/data/workspace_info_handler.rs rename to src/adapter/http/src/general/node_info_handler.rs index 6d2bde2f6..f938e2714 100644 --- a/src/adapter/http/src/data/workspace_info_handler.rs +++ b/src/adapter/http/src/general/node_info_handler.rs @@ -26,24 +26,24 @@ use kamu_core::DatasetRepository; #[derive(Debug, serde::Serialize)] #[serde(rename_all = "camelCase")] -pub struct WorkspaceInfoResponse { +pub struct NodeInfoResponse { pub is_multi_tenant: bool, } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -pub async fn workspace_info_handler( +pub async fn node_info_handler( Extension(catalog): Extension, -) -> Result, ApiError> { - let response = get_workspace_info(&catalog); - tracing::debug!(?response, "Get workspace info response"); +) -> Result, ApiError> { + let response = get_node_info(&catalog); + tracing::debug!(?response, "Get node info response"); Ok(response) } -fn get_workspace_info(catalog: &Catalog) -> Json { +fn get_node_info(catalog: &Catalog) -> Json { let dataset_repo = catalog.get_one::().unwrap(); - Json(WorkspaceInfoResponse { + Json(NodeInfoResponse { is_multi_tenant: dataset_repo.is_multi_tenant(), }) } diff --git a/src/adapter/http/src/general/router.rs b/src/adapter/http/src/general/router.rs new file mode 100644 index 000000000..1dac53d9b --- /dev/null +++ b/src/adapter/http/src/general/router.rs @@ -0,0 +1,37 @@ +// Copyright Kamu Data, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +// Copyright Kamu Data, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +pub fn root_router() -> axum::Router { + axum::Router::new() + .route( + "/info", + axum::routing::get(super::node_info_handler::node_info_handler), + ) + .route( + "/accounts/me", + axum::routing::get(super::account_handler::account_handler), + ) + .route( + "/datasets/:id", + axum::routing::get(super::dataset_info_handler::dataset_info_handler), + ) +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/adapter/http/src/lib.rs b/src/adapter/http/src/lib.rs index 5be8164c4..0a138e055 100644 --- a/src/adapter/http/src/lib.rs +++ b/src/adapter/http/src/lib.rs @@ -24,6 +24,7 @@ pub mod smart_protocol; mod upload; mod ws_common; pub use upload::*; +pub mod general; pub type SmartTransferProtocolClientWs = smart_protocol::ws_tungstenite_client::WsSmartTransferProtocolClient; diff --git a/src/adapter/http/tests/harness/client_side_harness.rs b/src/adapter/http/tests/harness/client_side_harness.rs index c0d5ec776..01b44ce41 100644 --- a/src/adapter/http/tests/harness/client_side_harness.rs +++ b/src/adapter/http/tests/harness/client_side_harness.rs @@ -27,10 +27,10 @@ use opendatafabric::{ AccountID, AccountName, DatasetID, + DatasetPushTarget, DatasetRef, DatasetRefAny, RepoName, - TransferDatasetRef, }; use tempfile::TempDir; use time_source::SystemTimeSourceDefault; @@ -243,16 +243,13 @@ impl ClientSideHarness { pub async fn push_dataset( &self, dataset_local_ref: DatasetRef, - dataset_remote_ref: TransferDatasetRef, + dataset_remote_ref: DatasetPushTarget, force: bool, dataset_visibility: DatasetVisibility, ) -> Vec { self.push_service - .push_multi_ext( - vec![PushRequest { - local_ref: Some(dataset_local_ref), - remote_ref: Some(dataset_remote_ref), - }], + .push_multi( + vec![dataset_local_ref], PushMultiOptions { sync_options: SyncOptions { create_if_not_exists: true, @@ -260,6 +257,7 @@ impl ClientSideHarness { dataset_visibility, ..SyncOptions::default() }, + remote_target: Some(dataset_remote_ref), ..PushMultiOptions::default() }, None, @@ -270,7 +268,7 @@ impl ClientSideHarness { pub async fn push_dataset_result( &self, dataset_local_ref: DatasetRef, - dataset_remote_ref: TransferDatasetRef, + dataset_remote_ref: DatasetPushTarget, force: bool, dataset_visibility: DatasetVisibility, ) -> SyncResult { diff --git a/src/adapter/http/tests/harness/test_api_server.rs b/src/adapter/http/tests/harness/test_api_server.rs index 24471fbff..9fd34bb00 100644 --- a/src/adapter/http/tests/harness/test_api_server.rs +++ b/src/adapter/http/tests/harness/test_api_server.rs @@ -38,6 +38,7 @@ impl TestAPIServer { axum::routing::post(kamu_adapter_http::platform_file_upload_post_handler), ) .nest("/", kamu_adapter_http::data::root_router()) + .nest("/", kamu_adapter_http::general::root_router()) .nest( if multi_tenant { "/:account_name/:dataset_name" diff --git a/src/adapter/http/tests/tests/mod.rs b/src/adapter/http/tests/tests/mod.rs index f5266aa93..eabae3bea 100644 --- a/src/adapter/http/tests/tests/mod.rs +++ b/src/adapter/http/tests/tests/mod.rs @@ -13,12 +13,12 @@ mod test_data_ingest; mod test_data_query; mod test_dataset_authorization_layer; mod test_dataset_info; +mod test_node_info; mod test_platform_login_validate; mod test_protocol_dataset_helpers; mod test_routing; mod test_upload_local; mod test_upload_s3; -mod test_workspace_info; mod tests_pull; mod tests_push; diff --git a/src/adapter/http/tests/tests/test_account_info.rs b/src/adapter/http/tests/tests/test_account_info.rs index 68ecd83aa..db3b32b01 100644 --- a/src/adapter/http/tests/tests/test_account_info.rs +++ b/src/adapter/http/tests/tests/test_account_info.rs @@ -23,7 +23,7 @@ async fn test_get_account_info_with_wrong_token() { let cl = reqwest::Client::new(); let res = cl - .get(&format!("{}me", harness.root_url)) + .get(&format!("{}accounts/me", harness.root_url)) .send() .await .unwrap(); @@ -43,7 +43,7 @@ async fn test_get_account_info() { let cl = reqwest::Client::new(); let res = cl - .get(&format!("{}me", harness.root_url)) + .get(&format!("{}accounts/me", harness.root_url)) .header("Authorization", format!("Bearer {DUMMY_ACCESS_TOKEN}")) .send() .await @@ -53,15 +53,7 @@ async fn test_get_account_info() { res.json::().await.unwrap(), json!({ "accountName": expected_account.account_name, - "accountType": expected_account.account_type, "id": expected_account.id, - "avatarUrl": expected_account.avatar_url, - "displayName": expected_account.display_name, - "email": expected_account.email, - "isAdmin": expected_account.is_admin, - "provider": expected_account.provider, - "providerIdentityKey": expected_account.provider_identity_key, - "registeredAt": expected_account.registered_at }) ); }; diff --git a/src/adapter/http/tests/tests/test_workspace_info.rs b/src/adapter/http/tests/tests/test_node_info.rs similarity index 85% rename from src/adapter/http/tests/tests/test_workspace_info.rs rename to src/adapter/http/tests/tests/test_node_info.rs index fd66405de..894bda0b0 100644 --- a/src/adapter/http/tests/tests/test_workspace_info.rs +++ b/src/adapter/http/tests/tests/test_node_info.rs @@ -14,14 +14,14 @@ use crate::harness::*; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// #[test_log::test(tokio::test)] -async fn test_workspace_info_single_tenant() { - let harness = WorkspaceInfoHarness::new(false).await; +async fn test_node_info_single_tenant() { + let harness = NodeInfoHarness::new(false).await; let client = async move { let cl = reqwest::Client::new(); let res = cl - .get(&format!("{}workspace/info", harness.root_url)) + .get(&format!("{}info", harness.root_url)) .send() .await .unwrap() @@ -40,14 +40,14 @@ async fn test_workspace_info_single_tenant() { } #[test_log::test(tokio::test)] -async fn test_workspace_info_multi_tenant() { - let harness = WorkspaceInfoHarness::new(true).await; +async fn test_node_info_multi_tenant() { + let harness = NodeInfoHarness::new(true).await; let client = async move { let cl = reqwest::Client::new(); let res = cl - .get(&format!("{}workspace/info", harness.root_url)) + .get(&format!("{}info", harness.root_url)) .send() .await .unwrap() @@ -67,12 +67,12 @@ async fn test_workspace_info_multi_tenant() { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -struct WorkspaceInfoHarness { +struct NodeInfoHarness { root_url: url::Url, server_harness: ServerSideLocalFsHarness, } -impl WorkspaceInfoHarness { +impl NodeInfoHarness { async fn new(is_multi_tenant: bool) -> Self { let server_harness = ServerSideLocalFsHarness::new(ServerSideHarnessOptions { multi_tenant: is_multi_tenant, diff --git a/src/adapter/http/tests/tests/tests_push/scenarios/scenario_new_dataset_via_repo_ref.rs b/src/adapter/http/tests/tests/tests_push/scenarios/scenario_new_dataset_via_repo_ref.rs index e4063eb32..c09647c7d 100644 --- a/src/adapter/http/tests/tests/tests_push/scenarios/scenario_new_dataset_via_repo_ref.rs +++ b/src/adapter/http/tests/tests/tests_push/scenarios/scenario_new_dataset_via_repo_ref.rs @@ -26,7 +26,7 @@ pub(crate) struct SmartPushNewDatasetViaRepoRefScenario SmartPushNewDatasetViaRepoRefScenario, + #[arg(long, value_name = "REM", value_parser = parsers::dataset_push_target)] + pub to: Option, /// Overwrite remote version with local, even if revisions have diverged #[arg(long, short = 'f')] @@ -924,8 +924,8 @@ pub struct Push { pub visibility: parsers::DatasetVisibility, /// Local or remote dataset reference(s) - #[arg(value_parser = parsers::dataset_ref_pattern_any)] - pub dataset: Option>, + #[arg(value_parser = parsers::dataset_ref_pattern)] + pub dataset: Option>, } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/app/cli/src/cli_commands.rs b/src/app/cli/src/cli_commands.rs index 82b0fa445..09c790b6b 100644 --- a/src/app/cli/src/cli_commands.rs +++ b/src/app/cli/src/cli_commands.rs @@ -279,11 +279,9 @@ pub fn get_command( } } cli::Command::Push(c) => Box::new(PushCommand::new( - cli_catalog.get_one()?, cli_catalog.get_one()?, cli_catalog.get_one()?, c.dataset.unwrap_or_default(), - cli_catalog.get_one()?, c.all, c.recursive, !c.no_alias, diff --git a/src/app/cli/src/cli_value_parser.rs b/src/app/cli/src/cli_value_parser.rs index bb46417f3..0407f9bbe 100644 --- a/src/app/cli/src/cli_value_parser.rs +++ b/src/app/cli/src/cli_value_parser.rs @@ -74,13 +74,15 @@ pub(crate) fn dataset_ref_remote(s: &str) -> Result Result { - match odf::TransferDatasetRef::from_str(s) { +pub(crate) fn dataset_push_target(s: &str) -> Result { + match odf::DatasetPushTarget::from_str(s) { Ok(push_dataset_ref) => Ok(push_dataset_ref), - Err(_) => Err("Remote reference should be in form: `did:odf:...` or \ - `repository/account/dataset-id` or `scheme://some-url` or repository \ - reference can only contain alphanumerics, dashes, and dots" - .to_string()), + Err(_) => Err( + "Remote reference should be in form: `repository/account/dataset-name` or \ + `scheme://some-url` or repository reference can only contain alphanumerics, dashes, \ + and dots" + .to_string(), + ), } } diff --git a/src/app/cli/src/commands/push_command.rs b/src/app/cli/src/commands/push_command.rs index 5aa007f0f..33231f4e4 100644 --- a/src/app/cli/src/commands/push_command.rs +++ b/src/app/cli/src/commands/push_command.rs @@ -13,8 +13,7 @@ use std::time::Duration; use console::style as s; use futures::TryStreamExt; use kamu::domain::*; -use kamu::utils::datasets_filtering::filter_datasets_by_any_pattern; -use kamu_accounts::CurrentAccountSubject; +use kamu::utils::datasets_filtering::filter_datasets_by_local_pattern; use opendatafabric::*; use super::{BatchError, CLIError, Command}; @@ -27,14 +26,12 @@ use crate::output::OutputConfig; pub struct PushCommand { push_svc: Arc, dataset_repo: Arc, - search_svc: Arc, - refs: Vec, - current_account_subject: Arc, + refs: Vec, all: bool, recursive: bool, add_aliases: bool, force: bool, - to: Option, + to: Option, dataset_visibility: DatasetVisibility, output_config: Arc, } @@ -43,26 +40,22 @@ impl PushCommand { pub fn new( push_svc: Arc, dataset_repo: Arc, - search_svc: Arc, refs: I, - current_account_subject: Arc, all: bool, recursive: bool, add_aliases: bool, force: bool, - to: Option, + to: Option, dataset_visibility: DatasetVisibility, output_config: Arc, ) -> Self where - I: IntoIterator, + I: IntoIterator, { Self { push_svc, dataset_repo, - search_svc, refs: refs.into_iter().collect(), - current_account_subject, all, recursive, add_aliases, @@ -77,71 +70,26 @@ impl PushCommand { &self, listener: Option>, ) -> Result, CLIError> { - let current_account_name = match self.current_account_subject.as_ref() { - CurrentAccountSubject::Anonymous(_) => { - return Err(CLIError::usage_error( - "Anonymous account misused, use multi-tenant alias", - )) - } - CurrentAccountSubject::Logged(l) => &l.account_name, - }; - - if let Some(transfer_ref) = &self.to { - let local_ref = match self.refs[0].as_dataset_ref_any() { - Some(dataset_ref_any) => dataset_ref_any - .as_local_ref(|_| !self.dataset_repo.is_multi_tenant()) - .map_err(|_| { - CLIError::usage_error( - "When using --to reference should point to a local dataset", - ) - })?, - None => { - return Err(CLIError::usage_error( - "When using --to reference should not point to wildcard pattern", - )) - } - }; - - Ok(self - .push_svc - .push_multi_ext( - vec![PushRequest { - local_ref: Some(local_ref), - remote_ref: Some(transfer_ref.clone()), - }], - PushMultiOptions { - all: self.all, - recursive: self.recursive, - add_aliases: self.add_aliases, - sync_options: self.sync_options(), - }, - listener, - ) - .await) - } else { - let dataset_refs: Vec<_> = filter_datasets_by_any_pattern( - self.dataset_repo.as_ref(), - self.search_svc.clone(), - self.refs.clone(), - current_account_name, + let dataset_refs: Vec<_> = + filter_datasets_by_local_pattern(self.dataset_repo.as_ref(), self.refs.clone()) + .map_ok(|dataset_handle| dataset_handle.as_local_ref()) + .try_collect() + .await?; + + Ok(self + .push_svc + .push_multi( + dataset_refs, + PushMultiOptions { + all: self.all, + recursive: self.recursive, + add_aliases: self.add_aliases, + sync_options: self.sync_options(), + remote_target: self.to.clone(), + }, + listener, ) - .try_collect() - .await?; - - Ok(self - .push_svc - .push_multi( - dataset_refs, - PushMultiOptions { - all: self.all, - recursive: self.recursive, - add_aliases: self.add_aliases, - sync_options: self.sync_options(), - }, - listener, - ) - .await) - } + .await) } fn sync_options(&self) -> SyncOptions { @@ -232,10 +180,8 @@ impl Command for PushCommand { .into_iter() .filter(|res| res.result.is_err()) .map(|res| { - ( - res.result.err().unwrap(), - format!("Failed to push {}", res.original_request), - ) + let push_err = format!("Failed to push {res}"); + (res.result.err().unwrap(), push_err) }), ) .into()) diff --git a/src/app/cli/src/explore/api_server.rs b/src/app/cli/src/explore/api_server.rs index d045f48a2..3d238c5fc 100644 --- a/src/app/cli/src/explore/api_server.rs +++ b/src/app/cli/src/explore/api_server.rs @@ -114,6 +114,7 @@ impl APIServer { axum::routing::post(kamu_adapter_http::platform_file_upload_post_handler), ) .nest("/", kamu_adapter_http::data::root_router()) + .nest("/", kamu_adapter_http::general::root_router()) .nest( "/odata", if multi_tenant_workspace { diff --git a/src/domain/core/src/services/push_service.rs b/src/domain/core/src/services/push_service.rs index a72b1533e..f87befbf3 100644 --- a/src/domain/core/src/services/push_service.rs +++ b/src/domain/core/src/services/push_service.rs @@ -24,37 +24,33 @@ use crate::{DatasetNotFoundError, GetDatasetError}; pub trait PushService: Send + Sync { async fn push_multi( &self, - dataset_refs: Vec, + dataset_refs: Vec, options: PushMultiOptions, sync_listener: Option>, ) -> Vec; - - async fn push_multi_ext( - &self, - requests: Vec, - options: PushMultiOptions, - sync_listener: Option>, - ) -> Vec; -} - -#[derive(Debug, Clone)] -pub struct PushRequest { - pub local_ref: Option, - pub remote_ref: Option, } #[derive(Debug)] pub struct PushResponse { - /// Parameters passed into the call - pub original_request: PushRequest, /// Local dataset handle, if resolved pub local_handle: Option, /// Destination reference, if resolved - pub remote_ref: Option, + pub target: Option, /// Result of the push operation pub result: Result, } +impl std::fmt::Display for PushResponse { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match (&self.local_handle, &self.target) { + (Some(l), None) => write!(f, "{l}"), + (None, Some(r)) => write!(f, "{r}"), + (Some(l), Some(r)) => write!(f, "{l} to {r}"), + (None, None) => write!(f, "???"), + } + } +} + #[derive(Debug, Clone)] pub struct PushMultiOptions { /// Push all dataset dependencies recursively in depth-first order @@ -65,6 +61,8 @@ pub struct PushMultiOptions { pub add_aliases: bool, /// Sync options pub sync_options: SyncOptions, + /// Destination reference, if resolved + pub remote_target: Option, } impl Default for PushMultiOptions { @@ -74,17 +72,7 @@ impl Default for PushMultiOptions { all: false, add_aliases: true, sync_options: SyncOptions::default(), - } - } -} - -impl std::fmt::Display for PushRequest { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match (&self.local_ref, &self.remote_ref) { - (Some(l), None) => write!(f, "{l}"), - (None, Some(r)) => write!(f, "{r}"), - (Some(l), Some(r)) => write!(f, "{l} to {r}"), - (None, None) => write!(f, "???"), + remote_target: None, } } } diff --git a/src/domain/core/src/services/remote_aliases_registry.rs b/src/domain/core/src/services/remote_aliases_registry.rs index f45cd0d4b..d181d92bd 100644 --- a/src/domain/core/src/services/remote_aliases_registry.rs +++ b/src/domain/core/src/services/remote_aliases_registry.rs @@ -53,18 +53,39 @@ impl From for GetAliasesError { #[async_trait] pub trait RemoteAliasResolver: Send + Sync { + // Resolve remote alias reference. + // Firstly try to resolve from AliasRegistry, if cannot do it + // try to resolve via repository registry async fn resolve_remote_alias( &self, local_dataset_handle: &DatasetHandle, - transfer_dataset_ref_maybe: Option, + dataset_push_target_maybe: Option, remote_alias_kind: RemoteAliasKind, - ) -> Result; + ) -> Result; +} - async fn inverse_lookup_dataset_by_alias( - &self, - remote_ref: &TransferDatasetRef, - remote_alias_kind: RemoteAliasKind, - ) -> Result; +#[derive(Debug, Clone)] +pub struct RemoteAliasRef { + pub url: url::Url, + pub repo_name: Option, + pub dataset_name: Option, + pub account_name: Option, +} + +impl RemoteAliasRef { + pub fn new( + url: url::Url, + repo_name: Option, + dataset_name: Option, + account_name: Option, + ) -> Self { + Self { + url, + repo_name, + dataset_name, + account_name, + } + } } #[derive(Error, Debug)] diff --git a/src/domain/opendatafabric/src/identity/dataset_refs.rs b/src/domain/opendatafabric/src/identity/dataset_refs.rs index 2e1fb4e07..76bffdd93 100644 --- a/src/domain/opendatafabric/src/identity/dataset_refs.rs +++ b/src/domain/opendatafabric/src/identity/dataset_refs.rs @@ -11,6 +11,7 @@ use std::fmt; use std::str::FromStr; use std::sync::Arc; +use thiserror::Error; use url::Url; use super::grammar::Grammar; @@ -931,52 +932,70 @@ super::dataset_identity::impl_parse_error!(DatasetRefAnyPattern); //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// #[derive(Debug, Clone, PartialEq, Eq)] -pub enum TransferDatasetRef { - RemoteRef(DatasetRefRemote), +pub enum DatasetPushTarget { + Url(Url), + Alias(DatasetAliasRemote), Repository(RepoName), } -impl std::str::FromStr for TransferDatasetRef { +impl std::str::FromStr for DatasetPushTarget { type Err = ParseError; fn from_str(s: &str) -> Result { - if let Ok(dataset_ref_remote) = DatasetRefRemote::try_from(s) { - return Ok(Self::RemoteRef(dataset_ref_remote)); + match DatasetAliasRemote::from_str(s) { + Ok(alias) => Ok(Self::Alias(alias)), + Err(_) => match RepoName::from_str(s) { + Ok(repo_name) => Ok(Self::Repository(repo_name)), + Err(_) => match Grammar::match_url(s) { + Some(_) => match Url::from_str(s) { + Ok(url) => Ok(Self::Url(url)), + Err(_) => Err(Self::Err::new(s)), + }, + None => Err(Self::Err::new(s)), + }, + }, } - let repository_ref = RepoName::from_str(s).unwrap(); - Ok(Self::Repository(repository_ref)) } } -impl TransferDatasetRef { +impl DatasetPushTarget { pub fn into_repo_name(self) -> Option { match self { - Self::RemoteRef(dataset_ref_remote) => match dataset_ref_remote { - DatasetRefRemote::Alias(dataset_alias_remote) => { - Some(dataset_alias_remote.repo_name) - } - _ => None, - }, + Self::Alias(dataset_alias_remote) => Some(dataset_alias_remote.repo_name), Self::Repository(repo_name) => Some(repo_name), + Self::Url(_) => None, } } } -impl fmt::Display for TransferDatasetRef { +impl fmt::Display for DatasetPushTarget { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - Self::RemoteRef(v) => write!(f, "{v}"), + Self::Alias(v) => write!(f, "{v}"), + Self::Url(v) => write!(f, "{v}"), Self::Repository(v) => write!(f, "{v}"), } } } -impl From for TransferDatasetRef { - fn from(value: DatasetRefRemote) -> Self { - Self::RemoteRef(value) +impl TryFrom for DatasetPushTarget { + type Error = DatasetPushTargetError; + + fn try_from(value: DatasetRefRemote) -> Result { + match value { + DatasetRefRemote::Alias(remote_alias_ref) => Ok(Self::Alias(remote_alias_ref)), + DatasetRefRemote::Url(url_ref) => Ok(Self::Url(url_ref.as_ref().clone())), + _ => Err(Self::Error::UnsupportedType), + } } } -super::dataset_identity::impl_parse_error!(TransferDatasetRef); +#[derive(Error, Debug)] +pub enum DatasetPushTargetError { + #[error("Unsupported type to cast")] + UnsupportedType, +} + +super::dataset_identity::impl_parse_error!(DatasetPushTarget); //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/domain/opendatafabric/tests/tests/test_dataset_refs.rs b/src/domain/opendatafabric/tests/tests/test_dataset_refs.rs index 4eeef53ac..a3f14a3c4 100644 --- a/src/domain/opendatafabric/tests/tests/test_dataset_refs.rs +++ b/src/domain/opendatafabric/tests/tests/test_dataset_refs.rs @@ -288,22 +288,48 @@ fn test_dataset_ref_any_pattern() { } #[test] -fn test_transfer_dataset_ref() { - // Parse valid remote dataset ref +fn test_dataset_push_target() { + // Parse valid remote url ref let param = "http://net.example.com"; - let res = TransferDatasetRef::from_str(param).unwrap(); + let res = DatasetPushTarget::from_str(param).unwrap(); + + assert_eq!(res, DatasetPushTarget::Url(Url::from_str(param).unwrap())); + + // Parse valid remote single tenant alias ref + let repo_name = RepoName::new_unchecked("net.example.com"); + let dataset_name = DatasetName::new_unchecked("foo"); + + let res = DatasetPushTarget::from_str(&format!("{repo_name}/{dataset_name}")).unwrap(); assert_eq!( res, - TransferDatasetRef::RemoteRef(DatasetRefRemote::from_str(param).unwrap()) + DatasetPushTarget::Alias(DatasetAliasRemote::new( + repo_name.clone(), + None, + dataset_name.clone() + )) + ); + + // Parse valid remote single tenant alias ref + let account_name = AccountName::new_unchecked("bar"); + let res = + DatasetPushTarget::from_str(&format!("{repo_name}/{account_name}/{dataset_name}")).unwrap(); + + assert_eq!( + res, + DatasetPushTarget::Alias(DatasetAliasRemote::new( + repo_name.clone(), + Some(account_name.clone()), + dataset_name.clone() + )) ); // Parse valid repository ref let param = "net.example.com"; - let res = TransferDatasetRef::from_str(param).unwrap(); + let res = DatasetPushTarget::from_str(param).unwrap(); assert_eq!( res, - TransferDatasetRef::Repository(RepoName::new_unchecked(param)) + DatasetPushTarget::Repository(RepoName::new_unchecked(param)) ); } diff --git a/src/infra/core/src/push_service_impl.rs b/src/infra/core/src/push_service_impl.rs index e10167009..0c18b6bb9 100644 --- a/src/infra/core/src/push_service_impl.rs +++ b/src/infra/core/src/push_service_impl.rs @@ -37,12 +37,16 @@ impl PushServiceImpl { } } - async fn collect_plan(&self, items: &Vec) -> (Vec, Vec) { + async fn collect_plan( + &self, + items: &Vec, + push_target: &Option, + ) -> (Vec, Vec) { let mut plan = Vec::new(); let mut errors = Vec::new(); - for request in items { - match self.collect_plan_item(request.clone()).await { + for dataset_ref in items { + match self.collect_plan_item(dataset_ref, push_target).await { Ok(item) => plan.push(item), Err(err) => errors.push(err), } @@ -51,66 +55,38 @@ impl PushServiceImpl { (plan, errors) } - async fn collect_plan_item(&self, request: PushRequest) -> Result { + async fn collect_plan_item( + &self, + dataset_ref: &DatasetRef, + push_target: &Option, + ) -> Result { // Resolve local dataset if we have a local reference - let local_handle = if let Some(local_ref) = &request.local_ref { - match self.dataset_repo.resolve_dataset_ref(local_ref).await { - Ok(h) => h, - Err(e) => { - return Err(PushResponse { - local_handle: None, - remote_ref: request.remote_ref.clone(), - result: Err(e.into()), - original_request: request, - }) - } - } - } else { - // We are sure that here we will have remote ref - let transfer_ref = request.remote_ref.as_ref().unwrap(); - match self - .remote_alias_resolver - .inverse_lookup_dataset_by_alias(transfer_ref, RemoteAliasKind::Push) - .await - { - Ok(local_handle) => local_handle, - Err(e) => { - return Err(PushResponse { - local_handle: None, - remote_ref: Some(transfer_ref.clone()), - result: Err(e.into()), - original_request: request, - }) - } + let local_handle = match self.dataset_repo.resolve_dataset_ref(dataset_ref).await { + Ok(h) => h, + Err(e) => { + return Err(PushResponse { + local_handle: None, + target: push_target.clone(), + result: Err(e.into()), + }) } }; - match &request { - PushRequest { - local_ref: None, - remote_ref: None, - } => panic!("Push request must contain either local or remote reference"), - push_request => match self - .remote_alias_resolver - .resolve_remote_alias( - &local_handle, - push_request.remote_ref.clone(), - RemoteAliasKind::Push, - ) - .await - { - Ok(remote_ref) => Ok(PushItem { - local_handle, - remote_ref, - original_request: request, - }), - Err(e) => Err(PushResponse { - local_handle: Some(local_handle), - remote_ref: request.remote_ref.clone(), - result: Err(e.into()), - original_request: request, - }), - }, + match self + .remote_alias_resolver + .resolve_remote_alias(&local_handle, push_target.clone(), RemoteAliasKind::Push) + .await + { + Ok(remote_alias_ref) => Ok(PushItem { + local_handle, + remote_alias_ref, + push_target: push_target.clone(), + }), + Err(e) => Err(PushResponse { + local_handle: Some(local_handle), + target: push_target.clone(), + result: Err(e.into()), + }), } } } @@ -119,32 +95,7 @@ impl PushServiceImpl { impl PushService for PushServiceImpl { async fn push_multi( &self, - dataset_refs: Vec, - options: PushMultiOptions, - sync_listener: Option>, - ) -> Vec { - let requests = dataset_refs - .into_iter() - .map( - |r| match r.as_local_ref(|_| !self.dataset_repo.is_multi_tenant()) { - Ok(local_ref) => PushRequest { - local_ref: Some(local_ref), - remote_ref: None, - }, - Err(remote_ref) => PushRequest { - local_ref: None, - remote_ref: Some(remote_ref.into()), - }, - }, - ) - .collect(); - - self.push_multi_ext(requests, options, sync_listener).await - } - - async fn push_multi_ext( - &self, - initial_requests: Vec, + dataset_refs: Vec, options: PushMultiOptions, sync_listener: Option>, ) -> Vec { @@ -155,7 +106,9 @@ impl PushService for PushServiceImpl { unimplemented!("Pushing all datasets is not yet supported") } - let (plan, errors) = self.collect_plan(&initial_requests).await; + let (plan, errors) = self + .collect_plan(&dataset_refs, &options.remote_target) + .await; if !errors.is_empty() { return errors; } @@ -166,7 +119,7 @@ impl PushService for PushServiceImpl { plan.iter() .map(|pi| SyncRequest { src: pi.local_handle.as_any_ref(), - dst: pi.remote_ref.as_any_ref(), + dst: (&pi.remote_alias_ref.url).into(), }) .collect(), options.sync_options, @@ -176,34 +129,29 @@ impl PushService for PushServiceImpl { assert_eq!(plan.len(), sync_results.len()); - let results: Vec<_> = std::iter::zip(plan, sync_results) + let results: Vec<_> = std::iter::zip(&plan, sync_results) .map(|(pi, res)| { + let remote_ref: DatasetRefAny = (&pi.remote_alias_ref.url).into(); assert_eq!(pi.local_handle.as_any_ref(), res.src); - assert_eq!(pi.remote_ref.as_any_ref(), res.dst); - pi.into_response(res.result) + assert_eq!(remote_ref, res.dst); + pi.as_response(res.result) }) .collect(); // If no errors - add aliases to initial items if options.add_aliases && results.iter().all(|r| r.result.is_ok()) { - for request in &initial_requests { - if let PushRequest { - local_ref: Some(local_ref), - remote_ref: Some(TransferDatasetRef::RemoteRef(remote_ref)), - } = request - { - if let DatasetRefRemote::Alias(_) = &remote_ref { - continue; - } - // TODO: Improve error handling - self.remote_alias_reg - .get_remote_aliases(local_ref) - .await - .unwrap() - .add(remote_ref, RemoteAliasKind::Push) - .await - .unwrap(); - } + for push_item in &plan { + // TODO: Improve error handling + self.remote_alias_reg + .get_remote_aliases(&(push_item.local_handle.as_local_ref())) + .await + .unwrap() + .add( + &((&push_item.remote_alias_ref.url).into()), + RemoteAliasKind::Push, + ) + .await + .unwrap(); } } @@ -213,17 +161,16 @@ impl PushService for PushServiceImpl { #[derive(Debug)] struct PushItem { - original_request: PushRequest, local_handle: DatasetHandle, - remote_ref: DatasetRefRemote, + remote_alias_ref: RemoteAliasRef, + push_target: Option, } impl PushItem { - fn into_response(self, result: Result) -> PushResponse { + fn as_response(&self, result: Result) -> PushResponse { PushResponse { - original_request: self.original_request, - local_handle: Some(self.local_handle), - remote_ref: Some(self.remote_ref.into()), + local_handle: Some(self.local_handle.clone()), + target: self.push_target.clone(), result: result.map_err(Into::into), } } diff --git a/src/infra/core/src/remote_alias_resolver_impl.rs b/src/infra/core/src/remote_alias_resolver_impl.rs index a8a0ed79e..9e4c8990c 100644 --- a/src/infra/core/src/remote_alias_resolver_impl.rs +++ b/src/infra/core/src/remote_alias_resolver_impl.rs @@ -14,7 +14,7 @@ use auth::OdfServerAccessTokenResolver; use dill::*; use internal_error::{InternalError, ResultIntoInternal}; use kamu_core::*; -use opendatafabric::{self as odf, DatasetRefRemote}; +use opendatafabric as odf; use url::Url; use crate::UrlExt; @@ -24,7 +24,6 @@ use crate::UrlExt; pub struct RemoteAliasResolverImpl { remote_repo_reg: Arc, access_token_resolver: Arc, - dataset_repo: Arc, remote_alias_reg: Arc, } @@ -34,52 +33,55 @@ impl RemoteAliasResolverImpl { pub fn new( remote_repo_reg: Arc, access_token_resolver: Arc, - dataset_repo: Arc, remote_alias_reg: Arc, ) -> Self { Self { remote_repo_reg, access_token_resolver, - dataset_repo, remote_alias_reg, } } - async fn fetch_remote_alias( + async fn fetch_remote_url( &self, local_handle: &odf::DatasetHandle, remote_alias_kind: RemoteAliasKind, - ) -> Result, ResolveAliasError> { + ) -> Result, ResolveAliasError> { let remote_aliases = self .remote_alias_reg .get_remote_aliases(&local_handle.as_local_ref()) .await .int_err()?; - let push_aliases: Vec<_> = remote_aliases.get_by_kind(remote_alias_kind).collect(); + let aliases: Vec<_> = remote_aliases.get_by_kind(remote_alias_kind).collect(); - match push_aliases.len() { + match aliases.len() { 0 => Ok(None), - 1 => Ok(Some(push_aliases[0].clone())), + 1 => { + if let odf::DatasetRefRemote::Url(remote_url) = aliases[0].clone() { + return Ok(Some(remote_url.as_ref().clone())); + } + Ok(None) + } _ => Err(ResolveAliasError::AmbiguousAlias), } } - fn combine_remote_alias( + fn combine_remote_url( &self, repo_url: &Url, - account_name_maybe: Option, + account_name_maybe: Option<&odf::AccountName>, dataset_name: &odf::DatasetName, - ) -> Result { + ) -> Result { let mut res_url = repo_url.clone().as_odf_protocol().int_err()?; { let mut path_segments = res_url.path_segments_mut().unwrap(); if let Some(account_name) = account_name_maybe { - path_segments.push(&account_name); + path_segments.push(account_name); } path_segments.push(dataset_name); } - Ok(res_url.into()) + Ok(res_url) } async fn resolve_remote_dataset_name( @@ -109,35 +111,43 @@ impl RemoteAliasResolver for RemoteAliasResolverImpl { async fn resolve_remote_alias( &self, local_dataset_handle: &odf::DatasetHandle, - transfer_dataset_ref_maybe: Option, + dataset_push_target_maybe: Option, remote_alias_kind: RemoteAliasKind, - ) -> Result { + ) -> Result { let repo_name: odf::RepoName; let mut account_name = None; let mut dataset_name = None; - if let Some(transfer_dataset_ref) = &transfer_dataset_ref_maybe { - match transfer_dataset_ref { - odf::TransferDatasetRef::RemoteRef(DatasetRefRemote::Alias( - dataset_alias_remote, - )) => { + if let Some(dataset_push_target) = &dataset_push_target_maybe { + match dataset_push_target { + odf::DatasetPushTarget::Alias(dataset_alias_remote) => { repo_name = dataset_alias_remote.repo_name.clone(); account_name.clone_from(&dataset_alias_remote.account_name); dataset_name = Some(dataset_alias_remote.dataset_name.clone()); } - odf::TransferDatasetRef::RemoteRef(dataset_ref_remote) => { - return Ok(dataset_ref_remote.clone()); + odf::DatasetPushTarget::Url(url_ref) => { + return Ok(RemoteAliasRef::new( + url_ref.clone(), + None, + dataset_name, + account_name, + )); } - odf::TransferDatasetRef::Repository(repository_name) => { + odf::DatasetPushTarget::Repository(repository_name) => { repo_name = repository_name.clone(); } } } else { - if let Some(remote_alias) = self - .fetch_remote_alias(local_dataset_handle, remote_alias_kind) + if let Some(remote_url) = self + .fetch_remote_url(local_dataset_handle, remote_alias_kind) .await? { - return Ok(remote_alias); + return Ok(RemoteAliasRef::new( + remote_url, + None, + dataset_name, + account_name, + )); } let remote_repo_names: Vec<_> = self.remote_repo_reg.get_all_repositories().collect(); if remote_repo_names.len() > 1 { @@ -162,60 +172,23 @@ impl RemoteAliasResolver for RemoteAliasResolverImpl { .await .int_err()?; } - let push_dataset_name = dataset_name.unwrap_or( + let transfer_dataset_name = dataset_name.clone().unwrap_or( self.resolve_remote_dataset_name(local_dataset_handle, &remote_repo.url) .await?, ); - let remote_alias = - self.combine_remote_alias(&remote_repo.url, account_name, &push_dataset_name)?; - - return Ok(remote_alias); - } + let remote_url = self.combine_remote_url( + &remote_repo.url, + account_name.as_ref(), + &transfer_dataset_name, + )?; - // TODO: avoid traversing all datasets for every alias - async fn inverse_lookup_dataset_by_alias( - &self, - transfer_ref: &odf::TransferDatasetRef, - remote_alias_kind: RemoteAliasKind, - ) -> Result { - // Do a quick check when remote and local names match - if let odf::TransferDatasetRef::RemoteRef(remote_ref) = transfer_ref { - if let Some(remote_name) = remote_ref.dataset_name() - && let Some(local_handle) = self - .dataset_repo - .try_resolve_dataset_ref( - &odf::DatasetAlias::new(None, remote_name.clone()).as_local_ref(), - ) - .await? - && self - .remote_alias_reg - .get_remote_aliases(&local_handle.as_local_ref()) - .await - .int_err()? - .contains(remote_ref, remote_alias_kind) - { - return Ok(local_handle); - } - - // No luck - now have to search through aliases - use tokio_stream::StreamExt; - let mut datasets = self.dataset_repo.get_all_datasets(); - while let Some(dataset_handle) = datasets.next().await { - let dataset_handle = dataset_handle?; - - if self - .remote_alias_reg - .get_remote_aliases(&dataset_handle.as_local_ref()) - .await - .int_err()? - .contains(remote_ref, RemoteAliasKind::Push) - { - return Ok(dataset_handle); - } - } - } - Err(ResolveAliasError::EmptyRepositoryList) + return Ok(RemoteAliasRef::new( + remote_url, + Some(repo_name), + dataset_name, + account_name, + )); } } @@ -239,7 +212,7 @@ impl RemoteAliasResolverApiHelper { }; let workspace_info_response = client - .get(server_backend_url.join("workspace/info").unwrap()) + .get(server_backend_url.join("info").unwrap()) .headers(header_map.clone()) .send() .await @@ -256,7 +229,7 @@ impl RemoteAliasResolverApiHelper { } let account_response = client - .get(server_backend_url.join("me").unwrap()) + .get(server_backend_url.join("accounts/me").unwrap()) .headers(header_map) .send() .await