From 4441cc1d5189a71494eb7b17d0a960e723c68be2 Mon Sep 17 00:00:00 2001
From: Dima Pristupa
Date: Fri, 29 Mar 2024 12:44:26 +0200
Subject: [PATCH] Data access panel: GraphQL initial API (#570)

* GraphQL: Dataset->Endpoints initial implementation

* OData: respect KAMU_BASE_URL

* Smart Transfer Protocol: respect KAMU_BASE_URL
---
 CHANGELOG.md                                  |   2 +
 Cargo.lock                                    |   1 +
 resources/schema.gql                          |  59 +++++++-
 src/adapter/graphql/Cargo.toml                |   5 +-
 .../graphql/src/queries/accounts/accounts.rs  |   1 -
 .../graphql/src/queries/datasets/dataset.rs   |   9 +-
 .../src/queries/datasets/dataset_endpoints.rs | 138 ++++++++++++++++++
 .../graphql/src/queries/datasets/mod.rs       |   2 +
 .../graphql/src/scalars/dataset_endpoints.rs  |  63 ++++++++
 .../graphql/src/scalars/engine_desc.rs        |   1 -
 src/adapter/graphql/src/scalars/mod.rs        |   2 +
 .../http/src/simple_protocol/handlers.rs      |   8 +-
 src/adapter/odata/src/context.rs              |  30 +---
 src/adapter/odata/src/handler.rs              |  45 ++----
 .../odata/tests/tests/test_handlers.rs        |  26 +---
 src/app/cli/src/app.rs                        |   2 +
 src/domain/core/src/services/mod.rs           |   2 +
 .../core/src/services/server_url_config.rs    |  64 ++++++++
 18 files changed, 371 insertions(+), 89 deletions(-)
 create mode 100644 src/adapter/graphql/src/queries/datasets/dataset_endpoints.rs
 create mode 100644 src/adapter/graphql/src/scalars/dataset_endpoints.rs
 create mode 100644 src/domain/core/src/services/server_url_config.rs

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 86b902d55..77b479b02 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
 ## Unreleased
+### Added
+- Added GraphQL Dataset Endpoints object
 ### Changed
 - REST API: `/ingest` endpoint will return HTTP 400 error when data cannot be read correctly
 - Improved API token generation command
diff --git a/Cargo.lock b/Cargo.lock
index 5d8d04277..8ed940d09 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3469,6 +3469,7 @@ dependencies = [
  "tokio-stream",
  "tracing",
  "tracing-subscriber",
+ "url",
 ]
 
 [[package]]
diff --git a/resources/schema.gql b/resources/schema.gql
index 71db80e23..7b6d85d4d 100644
--- a/resources/schema.gql
+++ b/resources/schema.gql
@@ -121,6 +121,11 @@ type Checkpoint {
   size: Int!
 }
 
+type CliProtocolDesc {
+  pullCommand: String!
+  pushCommand: String!
+}
+
 interface CommitResult {
   message: String!
 }
@@ -279,6 +284,10 @@ type Dataset {
   Permissions of the current user
   """
   permissions: DatasetPermissions!
+  """
+  Various endpoints for interacting with data
+  """
+  endpoints: DatasetEndpoints!
 }
 
 scalar DatasetAlias
@@ -332,6 +341,18 @@ type DatasetEdge {
   node: Dataset!
 }
 
+type DatasetEndpoints {
+  webLink: LinkProtocolDesc!
+  cli: CliProtocolDesc!
+  rest: RestProtocolDesc!
+  flightsql: FlightSqlDesc!
+  jdbc: JdbcDesc!
+  postgresql: PostgreSqlDesl!
+  kafka: KafkaProtocolDesc!
+  websocket: WebSocketProtocolDesc!
+  odata: OdataProtocolDesc!
+}
+
 type DatasetFlowConfigs {
   """
   Returns defined configuration for a flow of specified type
@@ -557,9 +578,6 @@ type DisablePushSource {
   sourceName: String!
 }
 
-"""
-Describes
-"""
 type EngineDesc {
   """
   A short name of the engine, e.g. "Spark", "Flink".
@@ -640,6 +658,10 @@ type FetchStepUrl {
   headers: [RequestHeader!]
 }
 
+type FlightSqlDesc {
+  url: String!
+}
+
 type Flow {
   """
@@ -893,6 +915,18 @@ input InitiatorFilterInput @oneOf {
 }
 
+type JdbcDesc {
+  url: String!
+}
+
+type KafkaProtocolDesc {
+  url: String!
+}
+
+type LinkProtocolDesc {
+  url: String!
+}
+
 type LoginResponse {
   accessToken: String!
   account: Account!
@@ -1010,6 +1044,11 @@ type NoChanges implements CommitResult & UpdateReadmeResult {
   message: String!
 }
 
+type OdataProtocolDesc {
+  serviceUrl: String!
+  collectionUrl: String!
+}
+
 type OffsetInterval {
   start: Int!
   end: Int!
@@ -1035,6 +1074,10 @@ type PageBasedInfo {
   totalPages: Int
 }
 
+type PostgreSqlDesl {
+  url: String!
+}
+
 union PrepStep = PrepStepDecompress | PrepStepPipe
 
 type PrepStepDecompress {
@@ -1169,6 +1212,12 @@ type RequestHeader {
   value: String!
 }
 
+type RestProtocolDesc {
+  tailUrl: String!
+  queryUrl: String!
+  pushUrl: String!
+}
+
 input ScheduleInput @oneOf {
   timeDelta: TimeDeltaInput
   cron5ComponentExpression: String
@@ -1471,6 +1520,10 @@ interface UpdateReadmeResult {
   message: String!
 }
 
+type WebSocketProtocolDesc {
+  url: String!
+}
+
 schema {
   query: Query
   mutation: Mutation
diff --git a/src/adapter/graphql/Cargo.toml b/src/adapter/graphql/Cargo.toml
index 1a5abe082..ee0abb231 100644
--- a/src/adapter/graphql/Cargo.toml
+++ b/src/adapter/graphql/Cargo.toml
@@ -23,12 +23,12 @@ doctest = false
 
 [dependencies]
 internal-error = { workspace = true }
-opendatafabric = { workspace = true, features=["arrow"] }
+opendatafabric = { workspace = true, features = ["arrow"] }
 kamu-data-utils = { workspace = true }
 kamu-core = { workspace = true }
 kamu-task-system = { workspace = true }
-kamu-flow-system = { workspace = true }
+kamu-flow-system = { workspace = true }
 
 event-sourcing = { workspace = true }
@@ -45,6 +45,7 @@ tokio = { version = "1", default-features = false, features = [] }
 tokio-stream = { version = "0.1", default-features = false }
 tracing = "0.1"
 thiserror = { version = "1", default-features = false }
+url = { version = "2", default-features = false }
 
 
 [dev-dependencies]
diff --git a/src/adapter/graphql/src/queries/accounts/accounts.rs b/src/adapter/graphql/src/queries/accounts/accounts.rs
index 88c4e979c..c48ba473b 100644
--- a/src/adapter/graphql/src/queries/accounts/accounts.rs
+++ b/src/adapter/graphql/src/queries/accounts/accounts.rs
@@ -26,7 +26,6 @@ impl Accounts {
     }
 
     /// Returns account by its name
-    #[allow(unused_variables)]
     async fn by_name(&self, ctx: &Context<'_>, name: AccountName) -> Result<Option<Account>> {
         let authentication_service = from_catalog::<dyn AuthenticationService>(ctx).unwrap();
diff --git a/src/adapter/graphql/src/queries/datasets/dataset.rs b/src/adapter/graphql/src/queries/datasets/dataset.rs
index e85605357..e862e59dd 100644
--- a/src/adapter/graphql/src/queries/datasets/dataset.rs
+++ b/src/adapter/graphql/src/queries/datasets/dataset.rs
@@ -9,7 +9,7 @@
 
 use chrono::prelude::*;
 use futures::TryStreamExt;
-use kamu_core::{self as domain, MetadataChainExt, TryStreamExtExt};
+use kamu_core::{self as domain, MetadataChainExt, ServerUrlConfig, TryStreamExtExt};
 use opendatafabric as odf;
 
 use crate::prelude::*;
@@ -153,6 +153,13 @@ impl Dataset {
             can_schedule: can_write,
         })
     }
+
+    /// Various endpoints for interacting with data
+    async fn endpoints(&self, ctx: &Context<'_>) -> DatasetEndpoints<'_> {
+        let config = from_catalog::<ServerUrlConfig>(ctx).unwrap();
+
+        DatasetEndpoints::new(&self.owner, self.dataset_handle.clone(), config)
+    }
 }
 
 #[derive(SimpleObject, Debug, Clone, PartialEq, Eq)]
diff --git a/src/adapter/graphql/src/queries/datasets/dataset_endpoints.rs b/src/adapter/graphql/src/queries/datasets/dataset_endpoints.rs
new file mode 100644
index 000000000..a6e22c8dd
--- /dev/null
+++ b/src/adapter/graphql/src/queries/datasets/dataset_endpoints.rs
@@ -0,0 +1,138 @@
+// Copyright Kamu Data, Inc. and contributors. All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+use std::sync::Arc;
+
+use kamu_core::ServerUrlConfig;
+use opendatafabric as odf;
+
+use crate::prelude::*;
+use crate::queries::*;
+
+pub struct DatasetEndpoints<'a> {
+    owner: &'a Account,
+    dataset_handle: odf::DatasetHandle,
+    config: Arc<ServerUrlConfig>,
+}
+
+#[Object]
+impl<'a> DatasetEndpoints<'a> {
+    #[graphql(skip)]
+    pub fn new(
+        owner: &'a Account,
+        dataset_handle: odf::DatasetHandle,
+        config: Arc<ServerUrlConfig>,
+    ) -> Self {
+        Self {
+            owner,
+            dataset_handle,
+            config,
+        }
+    }
+
+    #[allow(clippy::unused_async)]
+    async fn web_link(&self) -> Result<LinkProtocolDesc> {
+        let url = format!(
+            "{}{}",
+            self.config.protocols.base_url_rest, self.dataset_handle.alias
+        );
+
+        Ok(LinkProtocolDesc { url })
+    }
+
+    #[allow(clippy::unused_async)]
+    async fn cli(&self) -> Result<CliProtocolDesc> {
+        let url = format!(
+            "{}{}",
+            self.config.protocols.base_url_rest, self.dataset_handle.alias
+        );
+
+        let push_command = format!("kamu push {url}");
+        let pull_command = format!("kamu pull {url}");
+
+        Ok(CliProtocolDesc {
+            pull_command,
+            push_command,
+        })
+    }
+
+    #[allow(clippy::unused_async)]
+    async fn rest(&self) -> Result<RestProtocolDesc> {
+        let base_url = format!(
+            "{}{}",
+            self.config.protocols.base_url_rest, self.dataset_handle.alias
+        );
+
+        let tail_url = format!("{base_url}/tail?limit=10");
+        let push_url = format!("{base_url}/push");
+
+        let query_url = format!(
+            "{}graphql?query=query {{%0A%20 apiVersion%0A}}%0A",
+            self.config.protocols.base_url_rest
+        );
+
+        Ok(RestProtocolDesc {
+            tail_url,
+            query_url,
+            push_url,
+        })
+    }
+
+    #[allow(clippy::unused_async)]
+    async fn flightsql(&self) -> Result<FlightSqlDesc> {
+        Ok(FlightSqlDesc {
+            url: self.config.protocols.base_url_flightsql.to_string(),
+        })
+    }
+
+    #[allow(clippy::unused_async)]
+    async fn jdbc(&self) -> Result<JdbcDesc> {
+        let mut url = self.config.protocols.base_url_flightsql.clone();
+
+        url.set_scheme("arrow-flight-sql").unwrap();
+
+        Ok(JdbcDesc {
+            url: format!("jdbc:{url}"),
+        })
+    }
+
+    #[allow(clippy::unused_async)]
+    async fn postgresql(&self) -> Result<PostgreSqlDesl> {
+        Ok(PostgreSqlDesl {
+            url: "- coming soon -".to_string(),
+        })
+    }
+
+    #[allow(clippy::unused_async)]
+    async fn kafka(&self) -> Result<KafkaProtocolDesc> {
+        Ok(KafkaProtocolDesc {
+            url: "- coming soon -".to_string(),
+        })
+    }
+
+    #[allow(clippy::unused_async)]
+    async fn websocket(&self) -> Result<WebSocketProtocolDesc> {
+        Ok(WebSocketProtocolDesc {
+            url: "- coming soon -".to_string(),
+        })
+    }
+
+    #[allow(clippy::unused_async)]
+    async fn odata(&self) -> Result<OdataProtocolDesc> {
+        let url = format!("{}odata", self.config.protocols.base_url_rest);
+
+        let service_url = format!("{url}/{}", self.owner.account_name_internal().as_str());
+        let collection_url = format!("{url}/{}", self.dataset_handle.alias);
+
+        Ok(OdataProtocolDesc {
+            service_url,
+            collection_url,
+        })
+    }
+}
diff --git a/src/adapter/graphql/src/queries/datasets/mod.rs b/src/adapter/graphql/src/queries/datasets/mod.rs
index 784b46645..6fd96390f 100644
--- a/src/adapter/graphql/src/queries/datasets/mod.rs
+++ b/src/adapter/graphql/src/queries/datasets/mod.rs
@@ -9,6 +9,7 @@
 
 mod dataset;
 mod dataset_data;
+mod dataset_endpoints;
 mod dataset_flow_configs;
 mod dataset_flow_runs;
 mod dataset_flows;
@@ -18,6 +19,7 @@ mod metadata_chain;
 
 pub(crate) use dataset::*;
 pub(crate) use dataset_data::*;
+pub(crate) use dataset_endpoints::*;
 pub(crate) use dataset_flow_configs::*;
 pub(crate) use dataset_flow_runs::*;
 pub(crate) use dataset_flows::*;
diff --git a/src/adapter/graphql/src/scalars/dataset_endpoints.rs b/src/adapter/graphql/src/scalars/dataset_endpoints.rs
new file mode 100644
index 000000000..361fad916
--- /dev/null
+++ b/src/adapter/graphql/src/scalars/dataset_endpoints.rs
@@ -0,0 +1,63 @@
+// Copyright Kamu Data, Inc. and contributors. All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+use crate::prelude::*;
+
+/////////////////////////////////////////////////////////////////////////////////////////
+
+#[derive(SimpleObject, Debug)]
+pub struct LinkProtocolDesc {
+    pub url: String,
+}
+
+#[derive(SimpleObject, Debug)]
+pub struct CliProtocolDesc {
+    pub pull_command: String,
+    pub push_command: String,
+}
+
+#[derive(SimpleObject, Debug)]
+pub struct RestProtocolDesc {
+    pub tail_url: String,
+    pub query_url: String,
+    pub push_url: String,
+}
+
+#[derive(SimpleObject, Debug)]
+pub struct FlightSqlDesc {
+    pub url: String,
+}
+
+#[derive(SimpleObject, Debug)]
+pub struct JdbcDesc {
+    pub url: String,
+}
+
+#[derive(SimpleObject, Debug)]
+pub struct PostgreSqlDesl {
+    pub url: String,
+}
+
+#[derive(SimpleObject, Debug)]
+pub struct KafkaProtocolDesc {
+    pub url: String,
+}
+
+#[derive(SimpleObject, Debug)]
+pub struct WebSocketProtocolDesc {
+    pub url: String,
+}
+
+#[derive(SimpleObject, Debug)]
+pub struct OdataProtocolDesc {
+    pub service_url: String,
+    pub collection_url: String,
+}
+
+/////////////////////////////////////////////////////////////////////////////////////////
diff --git a/src/adapter/graphql/src/scalars/engine_desc.rs b/src/adapter/graphql/src/scalars/engine_desc.rs
index 75a3cc1ba..af2cea05a 100644
--- a/src/adapter/graphql/src/scalars/engine_desc.rs
+++ b/src/adapter/graphql/src/scalars/engine_desc.rs
@@ -13,7 +13,6 @@ use crate::prelude::*;
 
 /////////////////////////////////////////////////////////////////////////////////////////
 
-/// Describes
 #[derive(SimpleObject)]
 pub struct EngineDesc {
     /// A short name of the engine, e.g. "Spark", "Flink".
diff --git a/src/adapter/graphql/src/scalars/mod.rs b/src/adapter/graphql/src/scalars/mod.rs
index fe07077a7..b0ee99065 100644
--- a/src/adapter/graphql/src/scalars/mod.rs
+++ b/src/adapter/graphql/src/scalars/mod.rs
@@ -11,6 +11,7 @@ mod account;
 mod data_batch;
 mod data_query;
 mod data_schema;
+mod dataset_endpoints;
 mod dataset_id_name;
 mod engine_desc;
 mod event_id;
@@ -28,6 +29,7 @@ pub(crate) use account::*;
 pub(crate) use data_batch::*;
 pub(crate) use data_query::*;
 pub(crate) use data_schema::*;
+pub(crate) use dataset_endpoints::*;
 pub(crate) use dataset_id_name::*;
 pub(crate) use engine_desc::*;
 pub(crate) use event_id::*;
diff --git a/src/adapter/http/src/simple_protocol/handlers.rs b/src/adapter/http/src/simple_protocol/handlers.rs
index 930748911..39885cacc 100644
--- a/src/adapter/http/src/simple_protocol/handlers.rs
+++ b/src/adapter/http/src/simple_protocol/handlers.rs
@@ -279,8 +279,12 @@ fn get_base_dataset_url(
 /////////////////////////////////////////////////////////////////////////////////
 
 fn get_api_server_url(host: &str) -> Url {
-    let scheme = std::env::var("KAMU_PROTOCOL_SCHEME").unwrap_or_else(|_| String::from("http"));
-    Url::parse(&format!("{scheme}://{host}")).unwrap()
+    // TODO: Use value from config not envvar
+    //       https://github.com/kamu-data/kamu-node/issues/45
+    let raw_base_url =
+        std::env::var("KAMU_BASE_URL_REST").unwrap_or_else(|_| format!("http://{host}"));
+
+    Url::parse(&raw_base_url).unwrap()
 }
 
 /////////////////////////////////////////////////////////////////////////////////
diff --git a/src/adapter/odata/src/context.rs b/src/adapter/odata/src/context.rs
index 9afd268c8..b31acd4f2 100644
--- a/src/adapter/odata/src/context.rs
+++ b/src/adapter/odata/src/context.rs
@@ -43,18 +43,9 @@ pub(crate) struct ODataServiceContext {
 }
 
 impl ODataServiceContext {
-    pub(crate) fn new(
-        host: &axum::headers::Host,
-        uri: &http::Uri,
-        catalog: Catalog,
-        account_name: Option<AccountName>,
-    ) -> Self {
-        // TODO: Find out scheme the API is served through (e.g. if there is an LB)
-        let scheme = "http";
-        let mut service_base_url = format!("{scheme}://{host}{uri}");
-        if service_base_url.ends_with('/') {
-            service_base_url.pop();
-        }
+    pub(crate) fn new(catalog: Catalog, account_name: Option<AccountName>) -> Self {
+        let config = catalog.get_one::<ServerUrlConfig>().unwrap();
+        let service_base_url = config.protocols.odata_base_url();
 
         Self {
             catalog,
@@ -118,23 +109,12 @@ pub(crate) struct ODataCollectionContext {
 }
 
 impl ODataCollectionContext {
     pub(crate) fn new(
-        host: &axum::headers::Host,
-        uri: &http::Uri,
         catalog: Catalog,
         dataset_handle: DatasetHandle,
         dataset: Arc<dyn Dataset>,
     ) -> Self {
-        let scheme = std::env::var("KAMU_PROTOCOL_SCHEME").unwrap_or_else(|_| String::from("http"));
-        let (base_path, _) = uri
-            .path_and_query()
-            .unwrap()
-            .path()
-            .rsplit_once('/')
-            .unwrap();
-        let mut service_base_url = format!("{scheme}://{host}{base_path}");
-        if service_base_url.ends_with('/') {
-            service_base_url.pop();
-        }
+        let config = catalog.get_one::<ServerUrlConfig>().unwrap();
+        let service_base_url = config.protocols.odata_base_url();
 
         Self {
             catalog,
diff --git a/src/adapter/odata/src/handler.rs b/src/adapter/odata/src/handler.rs
index 628b78a7c..49fcf1af3 100644
--- a/src/adapter/odata/src/handler.rs
+++ b/src/adapter/odata/src/handler.rs
@@ -32,57 +32,45 @@ use crate::context::*;
 
 pub async fn odata_service_handler_st(
     catalog: axum::extract::Extension<Catalog>,
-    host: axum::extract::TypedHeader<axum::headers::Host>,
-    uri: axum::extract::OriginalUri,
 ) -> axum::response::Response {
-    odata_service_handler_common(catalog, host, uri, None).await
+    odata_service_handler_common(catalog, None).await
 }
 
 pub async fn odata_service_handler_mt(
     catalog: axum::extract::Extension<Catalog>,
-    host: axum::extract::TypedHeader<axum::headers::Host>,
-    uri: axum::extract::OriginalUri,
     axum::extract::Path(account_name): axum::extract::Path<AccountName>,
 ) -> axum::response::Response {
-    odata_service_handler_common(catalog, host, uri, Some(account_name)).await
+    odata_service_handler_common(catalog, Some(account_name)).await
 }
 
 ///////////////////////////////////////////////////////////////////////////////
 
 pub async fn odata_metadata_handler_st(
     catalog: axum::extract::Extension<Catalog>,
-    host: axum::extract::TypedHeader<axum::headers::Host>,
-    uri: axum::extract::OriginalUri,
 ) -> axum::response::Response {
-    odata_metadata_handler_common(catalog, host, uri, None).await
+    odata_metadata_handler_common(catalog, None).await
 }
 
 pub async fn odata_metadata_handler_mt(
     catalog: axum::extract::Extension<Catalog>,
-    host: axum::extract::TypedHeader<axum::headers::Host>,
-    uri: axum::extract::OriginalUri,
     axum::extract::Path(account_name): axum::extract::Path<AccountName>,
 ) -> axum::response::Response {
-    odata_metadata_handler_common(catalog, host, uri, Some(account_name)).await
+    odata_metadata_handler_common(catalog, Some(account_name)).await
 }
 
 ///////////////////////////////////////////////////////////////////////////////
 
 pub async fn odata_collection_handler_st(
     catalog: axum::extract::Extension<Catalog>,
-    host: axum::extract::TypedHeader<axum::headers::Host>,
-    uri: axum::extract::OriginalUri,
     axum::extract::Path(dataset_name): axum::extract::Path<DatasetName>,
     headers: axum::http::HeaderMap,
     query: axum::extract::Query,
 ) -> axum::response::Response {
-    odata_collection_handler_common(catalog, host, uri, None, dataset_name, headers, query).await
+    odata_collection_handler_common(catalog, None, dataset_name, headers, query).await
 }
 
 pub async fn odata_collection_handler_mt(
     catalog: axum::extract::Extension<Catalog>,
-    host: axum::extract::TypedHeader<axum::headers::Host>,
-    uri: axum::extract::OriginalUri,
     axum::extract::Path((account_name, dataset_name)): axum::extract::Path<(
         AccountName,
         DatasetName,
     )>,
@@ -90,16 +78,7 @@ pub async fn odata_collection_handler_mt(
     headers: axum::http::HeaderMap,
     query: axum::extract::Query,
 ) -> axum::response::Response {
-    odata_collection_handler_common(
-        catalog,
-        host,
-        uri,
-        Some(account_name),
-        dataset_name,
-        headers,
-        query,
-    )
-    .await
+    odata_collection_handler_common(catalog, Some(account_name), dataset_name, headers, query).await
 }
 
 ///////////////////////////////////////////////////////////////////////////////
@@ -108,11 +87,9 @@ pub async fn odata_collection_handler_mt(
 
 pub async fn odata_service_handler_common(
     axum::extract::Extension(catalog): axum::extract::Extension<Catalog>,
-    axum::extract::TypedHeader(host): axum::extract::TypedHeader<axum::headers::Host>,
-    axum::extract::OriginalUri(uri): axum::extract::OriginalUri,
     account_name: Option<AccountName>,
 ) -> axum::response::Response {
-    let ctx = ODataServiceContext::new(&host, &uri, catalog, account_name);
+    let ctx = ODataServiceContext::new(catalog, account_name);
     datafusion_odata::handlers::odata_service_handler(axum::Extension(Arc::new(ctx))).await
 }
 
@@ -120,11 +97,9 @@ pub async fn odata_service_handler_common(
 
 pub async fn odata_metadata_handler_common(
     axum::extract::Extension(catalog): axum::extract::Extension<Catalog>,
-    axum::extract::TypedHeader(host): axum::extract::TypedHeader<axum::headers::Host>,
-    axum::extract::OriginalUri(uri): axum::extract::OriginalUri,
     account_name: Option<AccountName>,
 ) -> axum::response::Response {
-    let ctx = ODataServiceContext::new(&host, &uri, catalog, account_name);
+    let ctx = ODataServiceContext::new(catalog, account_name);
     datafusion_odata::handlers::odata_metadata_handler(axum::Extension(Arc::new(ctx))).await
 }
 
@@ -132,8 +107,6 @@ pub async fn odata_metadata_handler_common(
 
 pub async fn odata_collection_handler_common(
     axum::extract::Extension(catalog): axum::extract::Extension<Catalog>,
-    axum::extract::TypedHeader(host): axum::extract::TypedHeader<axum::headers::Host>,
-    axum::extract::OriginalUri(uri): axum::extract::OriginalUri,
     account_name: Option<AccountName>,
     dataset_name: DatasetName,
     headers: axum::http::HeaderMap,
@@ -161,7 +134,7 @@ pub async fn odata_collection_handler_common(
         .await
         .unwrap();
 
-    let ctx = ODataCollectionContext::new(&host, &uri, catalog, dataset_handle, dataset);
+    let ctx = ODataCollectionContext::new(catalog, dataset_handle, dataset);
     datafusion_odata::handlers::odata_collection_handler(
         axum::Extension(Arc::new(ctx)),
         query,
diff --git a/src/adapter/odata/tests/tests/test_handlers.rs b/src/adapter/odata/tests/tests/test_handlers.rs
index ec0fa2f8d..b0f8063f3 100644
--- a/src/adapter/odata/tests/tests/test_handlers.rs
+++ b/src/adapter/odata/tests/tests/test_handlers.rs
@@ -17,6 +17,7 @@ use kamu::domain::*;
 use kamu::testing::*;
 use kamu::*;
 use opendatafabric::*;
+use url::Url;
 
 use super::test_api_server::TestAPIServer;
 
@@ -45,12 +46,7 @@ async fn test_service_handler() {
 
     let client = async move {
         let cl = reqwest::Client::new();
-        let res = cl
-            .get(&service_url)
-            .header("host", "example.com")
-            .send()
-            .await
-            .unwrap();
+        let res = cl.get(&service_url).send().await.unwrap();
         assert_eq!(res.status(), http::StatusCode::OK);
         assert_eq!(
             res.headers()["content-type"],
@@ -95,12 +91,7 @@ async fn test_metadata_handler() {
 
     let client = async move {
         let cl = reqwest::Client::new();
-        let res = cl
-            .get(&service_url)
-            .header("host", "example.com")
-            .send()
-            .await
-            .unwrap();
+        let res = cl.get(&service_url).send().await.unwrap();
         assert_eq!(res.status(), http::StatusCode::OK);
         assert_eq!(
             res.headers()["content-type"],
@@ -153,12 +144,7 @@ async fn test_collection_handler() {
 
     let client = async move {
         let cl = reqwest::Client::new();
-        let res = cl
-            .get(&collection_url)
-            .header("host", "example.com")
-            .send()
-            .await
-            .unwrap();
+        let res = cl.get(&collection_url).send().await.unwrap();
         assert_eq!(res.status(), http::StatusCode::OK);
         assert_eq!(
             res.headers()["content-type"],
@@ -293,6 +279,10 @@ impl TestHarness {
         )
         .bind::()
         .add::()
+        .add_value(ServerUrlConfig::new(Protocols {
+            base_url_rest: Url::parse("http://example.com").unwrap(),
+            base_url_flightsql: Url::parse("grpc://localhost:50050").unwrap(),
+        }))
         .build();
 
         let dataset_repo = catalog.get_one::<dyn DatasetRepository>().unwrap();
diff --git a/src/app/cli/src/app.rs b/src/app/cli/src/app.rs
index c375c0fe1..57b6df8b3 100644
--- a/src/app/cli/src/app.rs
+++ b/src/app/cli/src/app.rs
@@ -71,6 +71,8 @@ pub async fn run(
     let mut base_catalog_builder =
         configure_base_catalog(&workspace_layout, workspace_svc.is_multi_tenant_workspace());
 
+    base_catalog_builder.add_value(ServerUrlConfig::load()?);
+
     base_catalog_builder
         .add_value(dependencies_graph_repository)
         .bind::();
diff --git a/src/domain/core/src/services/mod.rs b/src/domain/core/src/services/mod.rs
index fd4541643..3e15f070f 100644
--- a/src/domain/core/src/services/mod.rs
+++ b/src/domain/core/src/services/mod.rs
@@ -25,6 +25,7 @@ pub mod remote_repository_registry;
 pub mod reset_service;
 pub mod resource_loader;
 pub mod search_service;
+pub mod server_url_config;
 pub mod sync_service;
 pub mod transform_service;
 pub mod verification_service;
@@ -44,6 +45,7 @@ pub use remote_repository_registry::*;
 pub use reset_service::*;
 pub use resource_loader::*;
 pub use search_service::*;
+pub use server_url_config::*;
 pub use sync_service::*;
 pub use transform_service::*;
 pub use verification_service::*;
diff --git a/src/domain/core/src/services/server_url_config.rs b/src/domain/core/src/services/server_url_config.rs
new file mode 100644
index 000000000..f6c934696
--- /dev/null
+++ b/src/domain/core/src/services/server_url_config.rs
@@ -0,0 +1,64 @@
+// Copyright Kamu Data, Inc. and contributors. All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+use internal_error::{InternalError, ResultIntoInternal};
+use url::Url;
+
+/////////////////////////////////////////////////////////////////////////////////////////
+
+pub struct ServerUrlConfig {
+    pub protocols: Protocols,
+}
+
+impl ServerUrlConfig {
+    pub fn load() -> Result<Self, InternalError> {
+        // TODO: Use value from config not envvar
+        //       https://github.com/kamu-data/kamu-node/issues/45
+        //
+        // Example:
+        // https://github.com/mehcode/config-rs/blob/master/examples/hierarchical-env/settings.rs
+
+        let base_url_rest = {
+            let raw = std::env::var("KAMU_BASE_URL_REST")
+                .unwrap_or_else(|_| "http://127.0.0.1:8080".to_string());
+            Url::parse(&raw).int_err()?
+        };
+        let base_url_flightsql = {
+            let raw = std::env::var("KAMU_BASE_URL_FLIGHTSQL")
+                .unwrap_or_else(|_| "grpc://localhost:50050".to_string());
+            Url::parse(&raw).int_err()?
+        };
+
+        Ok(Self {
+            protocols: Protocols {
+                base_url_rest,
+                base_url_flightsql,
+            },
+        })
+    }
+
+    pub fn new(protocols: Protocols) -> Self {
+        Self { protocols }
+    }
+}
+
+/////////////////////////////////////////////////////////////////////////////////////////
+
+pub struct Protocols {
+    pub base_url_rest: Url,
+    pub base_url_flightsql: Url,
+}
+
+impl Protocols {
+    pub fn odata_base_url(&self) -> String {
+        format!("{}odata", self.base_url_rest)
+    }
+}
+
+/////////////////////////////////////////////////////////////////////////////////////////
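
Reviewer note: to exercise the new field end to end, run a node and issue a GraphQL query against `Dataset.endpoints`. A minimal sketch; the `endpoints` selection set follows `resources/schema.gql` from this patch, while the `datasets.byOwnerAndName` lookup and the `alice/covid19` dataset are assumptions for illustration and are not part of this diff:

    query {
      datasets {
        byOwnerAndName(accountName: "alice", datasetName: "covid19") {
          endpoints {
            webLink { url }
            cli { pullCommand pushCommand }
            rest { tailUrl queryUrl pushUrl }
            flightsql { url }
            jdbc { url }
            odata { serviceUrl collectionUrl }
          }
        }
      }
    }

The returned URLs are derived from `ServerUrlConfig`, so they honor the `KAMU_BASE_URL_REST` and `KAMU_BASE_URL_FLIGHTSQL` environment variables (defaulting to `http://127.0.0.1:8080` and `grpc://localhost:50050`). The `postgresql`, `kafka`, and `websocket` fields still return "- coming soon -" placeholders.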