Data access panel: GraphQL initial API (#570)
* GraphQL: Dataset->Endpoints initial implementation
* OData: respect KAMU_BASE_URL
* Smart Transfer Protocol: respect KAMU_BASE_URL
s373r authored Mar 29, 2024
1 parent c6762b2 commit 4441cc1
Showing 18 changed files with 371 additions and 89 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased
### Added
- Added GraphQL Dataset Endpoints object
### Changed
- REST API: `/ingest` endpoint will return HTTP 400 error when data cannot be read correctly
- Improved API token generation command
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

59 changes: 56 additions & 3 deletions resources/schema.gql
@@ -121,6 +121,11 @@ type Checkpoint {
size: Int!
}

type CliProtocolDesc {
pullCommand: String!
pushCommand: String!
}

interface CommitResult {
message: String!
}
@@ -279,6 +284,10 @@ type Dataset {
Permissions of the current user
"""
permissions: DatasetPermissions!
"""
Various endpoints for interacting with data
"""
endpoints: DatasetEndpoints!
}

scalar DatasetAlias
@@ -332,6 +341,18 @@ type DatasetEdge {
node: Dataset!
}

type DatasetEndpoints {
webLink: LinkProtocolDesc!
cli: CliProtocolDesc!
rest: RestProtocolDesc!
flightsql: FlightSqlDesc!
jdbc: JdbcDesc!
postgresql: PostgreSqlDesc!
kafka: KafkaProtocolDesc!
websocket: WebSocketProtocolDesc!
odata: OdataProtocolDesc!
}

type DatasetFlowConfigs {
"""
Returns defined configuration for a flow of specified type
@@ -557,9 +578,6 @@ type DisablePushSource {
sourceName: String!
}

"""
Describes
"""
type EngineDesc {
"""
A short name of the engine, e.g. "Spark", "Flink".
@@ -640,6 +658,10 @@ type FetchStepUrl {
headers: [RequestHeader!]
}

type FlightSqlDesc {
url: String!
}


type Flow {
"""
@@ -893,6 +915,18 @@ input InitiatorFilterInput @oneOf {
}


type JdbcDesc {
url: String!
}

type KafkaProtocolDesc {
url: String!
}

type LinkProtocolDesc {
url: String!
}

type LoginResponse {
accessToken: String!
account: Account!
@@ -1010,6 +1044,11 @@ type NoChanges implements CommitResult & UpdateReadmeResult {
message: String!
}

type OdataProtocolDesc {
serviceUrl: String!
collectionUrl: String!
}

type OffsetInterval {
start: Int!
end: Int!
@@ -1035,6 +1074,10 @@ type PageBasedInfo {
totalPages: Int
}

type PostgreSqlDesc {
url: String!
}

union PrepStep = PrepStepDecompress | PrepStepPipe

type PrepStepDecompress {
@@ -1169,6 +1212,12 @@ type RequestHeader {
value: String!
}

type RestProtocolDesc {
tailUrl: String!
queryUrl: String!
pushUrl: String!
}

input ScheduleInput @oneOf {
timeDelta: TimeDeltaInput
cron5ComponentExpression: String
@@ -1471,6 +1520,10 @@ interface UpdateReadmeResult {
message: String!
}

type WebSocketProtocolDesc {
url: String!
}

schema {
query: Query
mutation: Mutation
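
For context, the new schema surface can be exercised with a single query. A minimal sketch follows; it assumes the existing `datasets.byOwnerAndName` lookup, and the account and dataset names are placeholders:

query {
  datasets {
    byOwnerAndName(accountName: "kamu", datasetName: "example.dataset") {
      endpoints {
        webLink { url }
        cli { pullCommand pushCommand }
        rest { tailUrl queryUrl pushUrl }
        flightsql { url }
        jdbc { url }
        odata { serviceUrl collectionUrl }
      }
    }
  }
}
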
5 changes: 3 additions & 2 deletions src/adapter/graphql/Cargo.toml
@@ -23,12 +23,12 @@ doctest = false

[dependencies]
internal-error = { workspace = true }
opendatafabric = { workspace = true, features=["arrow"] }
opendatafabric = { workspace = true, features = ["arrow"] }

kamu-data-utils = { workspace = true }
kamu-core = { workspace = true }
kamu-task-system = { workspace = true }
kamu-flow-system = { workspace = true }
kamu-flow-system = { workspace = true }
event-sourcing = { workspace = true }


@@ -45,6 +45,7 @@ tokio = { version = "1", default-features = false, features = [] }
tokio-stream = { version = "0.1", default-features = false }
tracing = "0.1"
thiserror = { version = "1", default-features = false }
url = { version = "2", default-features = false }


[dev-dependencies]
1 change: 0 additions & 1 deletion src/adapter/graphql/src/queries/accounts/accounts.rs
@@ -26,7 +26,6 @@ impl Accounts {
}

/// Returns account by its name
#[allow(unused_variables)]
async fn by_name(&self, ctx: &Context<'_>, name: AccountName) -> Result<Option<Account>> {
let authentication_service =
from_catalog::<dyn kamu_core::auth::AuthenticationService>(ctx).unwrap();
9 changes: 8 additions & 1 deletion src/adapter/graphql/src/queries/datasets/dataset.rs
@@ -9,7 +9,7 @@

use chrono::prelude::*;
use futures::TryStreamExt;
use kamu_core::{self as domain, MetadataChainExt, TryStreamExtExt};
use kamu_core::{self as domain, MetadataChainExt, ServerUrlConfig, TryStreamExtExt};
use opendatafabric as odf;

use crate::prelude::*;
@@ -153,6 +153,13 @@ impl Dataset {
can_schedule: can_write,
})
}

/// Various endpoints for interacting with data
async fn endpoints(&self, ctx: &Context<'_>) -> DatasetEndpoints<'_> {
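// ServerUrlConfig is expected to be registered in the DI catalog, hence the unwrap()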
let config = from_catalog::<ServerUrlConfig>(ctx).unwrap();

DatasetEndpoints::new(&self.owner, self.dataset_handle.clone(), config)
}
}

#[derive(SimpleObject, Debug, Clone, PartialEq, Eq)]
138 changes: 138 additions & 0 deletions src/adapter/graphql/src/queries/datasets/dataset_endpoints.rs
@@ -0,0 +1,138 @@
// Copyright Kamu Data, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::sync::Arc;

use kamu_core::ServerUrlConfig;
use opendatafabric as odf;

use crate::prelude::*;
use crate::queries::*;

pub struct DatasetEndpoints<'a> {
owner: &'a Account,
dataset_handle: odf::DatasetHandle,
config: Arc<ServerUrlConfig>,
}

#[Object]
impl<'a> DatasetEndpoints<'a> {
#[graphql(skip)]
pub fn new(
owner: &'a Account,
dataset_handle: odf::DatasetHandle,
config: Arc<ServerUrlConfig>,
) -> Self {
Self {
owner,
dataset_handle,
config,
}
}

#[allow(clippy::unused_async)]
async fn web_link(&self) -> Result<LinkProtocolDesc> {
let url = format!(
"{}{}",
self.config.protocols.base_url_rest, self.dataset_handle.alias
);

Ok(LinkProtocolDesc { url })
}

#[allow(clippy::unused_async)]
async fn cli(&self) -> Result<CliProtocolDesc> {
let url = format!(
"{}{}",
self.config.protocols.base_url_rest, self.dataset_handle.alias
);

let push_command = format!("kamu push {url}");
let pull_command = format!("kamu pull {url}");

Ok(CliProtocolDesc {
pull_command,
push_command,
})
}

#[allow(clippy::unused_async)]
async fn rest(&self) -> Result<RestProtocolDesc> {
let base_url = format!(
"{}{}",
self.config.protocols.base_url_rest, self.dataset_handle.alias
);

let tail_url = format!("{base_url}/tail?limit=10");
let push_url = format!("{base_url}/push");

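// The query parameter is percent-encoded: %0A and %20 decode to a newline and
// a space, so the link carries a ready-to-run `query { apiVersion }` example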
let query_url = format!(
"{}graphql?query=query {{%0A%20 apiVersion%0A}}%0A",
self.config.protocols.base_url_rest
);

Ok(RestProtocolDesc {
tail_url,
query_url,
push_url,
})
}

#[allow(clippy::unused_async)]
async fn flightsql(&self) -> Result<FlightSqlDesc> {
Ok(FlightSqlDesc {
url: self.config.protocols.base_url_flightsql.to_string(),
})
}

#[allow(clippy::unused_async)]
async fn jdbc(&self) -> Result<JdbcDesc> {
let mut url = self.config.protocols.base_url_flightsql.clone();

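// The Arrow Flight SQL JDBC driver expects URLs of the form
// `jdbc:arrow-flight-sql://host:port`, hence the scheme swap before prefixing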
url.set_scheme("arrow-flight-sql").unwrap();

Ok(JdbcDesc {
url: format!("jdbc:{url}"),
})
}

#[allow(clippy::unused_async)]
async fn postgresql(&self) -> Result<PostgreSqlDesc> {
Ok(PostgreSqlDesc {
url: "- coming soon -".to_string(),
})
}

#[allow(clippy::unused_async)]
async fn kafka(&self) -> Result<KafkaProtocolDesc> {
Ok(KafkaProtocolDesc {
url: "- coming soon -".to_string(),
})
}

#[allow(clippy::unused_async)]
async fn websocket(&self) -> Result<WebSocketProtocolDesc> {
Ok(WebSocketProtocolDesc {
url: "- coming soon -".to_string(),
})
}

#[allow(clippy::unused_async)]
async fn odata(&self) -> Result<OdataProtocolDesc> {
let url = format!("{}odata", self.config.protocols.base_url_rest);

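// The service URL points at the account-level OData service document, while
// the collection URL addresses this dataset as an individual OData collection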
let service_url = format!("{url}/{}", self.owner.account_name_internal().as_str());
let collection_url = format!("{url}/{}", self.dataset_handle.alias);

Ok(OdataProtocolDesc {
service_url,
collection_url,
})
}
}
2 changes: 2 additions & 0 deletions src/adapter/graphql/src/queries/datasets/mod.rs
@@ -9,6 +9,7 @@

mod dataset;
mod dataset_data;
mod dataset_endpoints;
mod dataset_flow_configs;
mod dataset_flow_runs;
mod dataset_flows;
@@ -18,6 +19,7 @@ mod metadata_chain;

pub(crate) use dataset::*;
pub(crate) use dataset_data::*;
pub(crate) use dataset_endpoints::*;
pub(crate) use dataset_flow_configs::*;
pub(crate) use dataset_flow_runs::*;
pub(crate) use dataset_flows::*;