diff --git a/CHANGELOG.md b/CHANGELOG.md index 14fe60006..4b95638ca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,19 @@ Recommendation: for ease of reading, use the following order: - `kamu init` command - `kamu add` command - `kamu rename` command + - `kamu ingest` command + - `kamu inspect` command + - `kamu log` command + - `kamu new` command + - `kamu reset` command + - `kamu search` command + - `kamu sql` command + - `kamu system gc` command + - `kamu system info` command + - `kamu system diagnose` command + - `kamu tail` command + - `kamu login` command + - `kamu logout` command ### Changed - `kamu repo alias list`: added JSON output alongside with other formats mentioned in the command's help - Private Datasets, `DatasetEntry` integration that will allow us to build dataset indexing diff --git a/resources/cli-reference.md b/resources/cli-reference.md index c7af457a6..0c172baaf 100644 --- a/resources/cli-reference.md +++ b/resources/cli-reference.md @@ -20,7 +20,7 @@ To regenerate this schema from existing code, use the following command: * `inspect` — Group of commands for exploring dataset metadata * `list [ls]` — List all datasets in the workspace * `log` — Shows dataset metadata history -* `login` — Authentiates with a remote ODF server interactively +* `login` — Authenticates with a remote ODF server interactively * `logout` — Logs out from a remote Kamu server * `new` — Creates a new dataset manifest from a template * `notebook` — Starts the notebook server for exploring the data in the workspace @@ -508,7 +508,7 @@ Using a filter to inspect blocks containing query changes of a derivative datase ## `kamu login` -Authentiates with a remote ODF server interactively +Authenticates with a remote ODF server interactively **Usage:** `kamu login [OPTIONS] [SERVER] [COMMAND]` diff --git a/src/adapter/graphql/src/mutations/datasets_mut.rs b/src/adapter/graphql/src/mutations/datasets_mut.rs index aa023cb81..9357a72d0 100644 --- 
a/src/adapter/graphql/src/mutations/datasets_mut.rs +++ b/src/adapter/graphql/src/mutations/datasets_mut.rs @@ -110,6 +110,8 @@ impl DatasetsMut { } // TODO: Multi-tenancy + // https://github.com/kamu-data/kamu-cli/issues/891 + // TODO: Multi-tenant resolution for derivative dataset inputs (should it only // work by ID?) #[allow(unused_variables)] diff --git a/src/app/cli/src/cli.rs b/src/app/cli/src/cli.rs index ad74510d2..3bb589888 100644 --- a/src/app/cli/src/cli.rs +++ b/src/app/cli/src/cli.rs @@ -634,7 +634,7 @@ pub struct Log { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -/// Authentiates with a remote ODF server interactively +/// Authenticates with a remote ODF server interactively #[derive(Debug, clap::Args)] pub struct Login { #[command(subcommand)] diff --git a/src/app/cli/src/cli_commands.rs b/src/app/cli/src/cli_commands.rs index f7ef6df72..952850cf2 100644 --- a/src/app/cli/src/cli_commands.rs +++ b/src/app/cli/src/cli_commands.rs @@ -127,7 +127,7 @@ pub fn get_command( } } cli::Command::Inspect(c) => match c.subcommand { - cli::InspectSubCommand::Lineage(sc) => Box::new(LineageCommand::new( + cli::InspectSubCommand::Lineage(sc) => Box::new(InspectLineageCommand::new( cli_catalog.get_one()?, cli_catalog.get_one()?, cli_catalog.get_one()?, diff --git a/src/app/cli/src/commands/ingest_command.rs b/src/app/cli/src/commands/ingest_command.rs index bbffd7ca2..67471f9f0 100644 --- a/src/app/cli/src/commands/ingest_command.rs +++ b/src/app/cli/src/commands/ingest_command.rs @@ -145,6 +145,8 @@ impl Command for IngestCommand { _ => Ok(()), }?; + // TODO: `kamu ingest`: implement `--recursive` mode + // https://github.com/kamu-data/kamu-cli/issues/886 if self.recursive { unimplemented!("Sorry, recursive ingest is not yet implemented") } diff --git a/src/app/cli/src/commands/lineage_command.rs b/src/app/cli/src/commands/inspect_lineage_command.rs similarity index 99% rename from 
src/app/cli/src/commands/lineage_command.rs rename to src/app/cli/src/commands/inspect_lineage_command.rs index 1503238f9..c1200342b 100644 --- a/src/app/cli/src/commands/lineage_command.rs +++ b/src/app/cli/src/commands/inspect_lineage_command.rs @@ -31,7 +31,7 @@ pub enum LineageOutputFormat { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -pub struct LineageCommand { +pub struct InspectLineageCommand { dataset_repo: Arc, provenance_svc: Arc, workspace_layout: Arc, @@ -41,7 +41,7 @@ pub struct LineageCommand { output_config: Arc, } -impl LineageCommand { +impl InspectLineageCommand { pub fn new( dataset_repo: Arc, provenance_svc: Arc, @@ -94,7 +94,7 @@ impl LineageCommand { // TODO: Support temporality and evolution #[async_trait::async_trait(?Send)] -impl Command for LineageCommand { +impl Command for InspectLineageCommand { async fn run(&mut self) -> Result<(), CLIError> { use futures::{StreamExt, TryStreamExt}; let mut dataset_handles: Vec<_> = if self.dataset_refs.is_empty() { diff --git a/src/app/cli/src/commands/log_command.rs b/src/app/cli/src/commands/log_command.rs index 87561fe55..ca5852b20 100644 --- a/src/app/cli/src/commands/log_command.rs +++ b/src/app/cli/src/commands/log_command.rs @@ -29,6 +29,8 @@ use crate::output::OutputConfig; pub enum MetadataLogOutputFormat { Shell, Yaml, + // TODO: `kamu log`: support `--output-format json` + // https://github.com/kamu-data/kamu-cli/issues/887 } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/app/cli/src/commands/login_silent_command.rs b/src/app/cli/src/commands/login_silent_command.rs index ba5368eeb..0db094576 100644 --- a/src/app/cli/src/commands/login_silent_command.rs +++ b/src/app/cli/src/commands/login_silent_command.rs @@ -16,16 +16,19 @@ use crate::{odf_server, CLIError, Command}; 
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +#[derive(Debug)] pub enum LoginSilentMode { OAuth(LoginSilentModeOAuth), Password(LoginSilentModePassword), } +#[derive(Debug)] pub struct LoginSilentModeOAuth { pub provider: String, pub access_token: String, } +#[derive(Debug)] pub struct LoginSilentModePassword { pub login: String, pub password: String, diff --git a/src/app/cli/src/commands/mod.rs b/src/app/cli/src/commands/mod.rs index 5525e8f3d..385ad0a10 100644 --- a/src/app/cli/src/commands/mod.rs +++ b/src/app/cli/src/commands/mod.rs @@ -20,9 +20,9 @@ mod delete_command; mod gc_command; mod ingest_command; mod init_command; +mod inspect_lineage_command; mod inspect_query_command; mod inspect_schema_command; -mod lineage_command; mod list_command; mod log_command; mod login_command; @@ -71,9 +71,9 @@ pub use delete_command::*; pub use gc_command::*; pub use ingest_command::*; pub use init_command::*; +pub use inspect_lineage_command::*; pub use inspect_query_command::*; pub use inspect_schema_command::*; -pub use lineage_command::*; pub use list_command::*; pub use log_command::*; pub use login_command::*; diff --git a/src/app/cli/src/commands/reset_command.rs b/src/app/cli/src/commands/reset_command.rs index 8a4ba34d5..32f69a4df 100644 --- a/src/app/cli/src/commands/reset_command.rs +++ b/src/app/cli/src/commands/reset_command.rs @@ -15,6 +15,8 @@ use opendatafabric::*; use super::{CLIError, Command}; use crate::Interact; +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + pub struct ResetCommand { interact: Arc, dataset_repo: Arc, @@ -66,3 +68,5 @@ impl Command for ResetCommand { Ok(()) } } + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/app/cli/src/commands/tail_command.rs b/src/app/cli/src/commands/tail_command.rs index 
e715e71ca..b025ceeb3 100644 --- a/src/app/cli/src/commands/tail_command.rs +++ b/src/app/cli/src/commands/tail_command.rs @@ -17,6 +17,8 @@ use opendatafabric::*; use super::{CLIError, Command}; use crate::output::*; +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + pub struct TailCommand { query_svc: Arc, dataset_ref: DatasetRef, @@ -93,3 +95,5 @@ impl Command for TailCommand { Ok(()) } } + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/app/cli/src/services/gc_service.rs b/src/app/cli/src/services/gc_service.rs index 29d0e4e98..9c91e7638 100644 --- a/src/app/cli/src/services/gc_service.rs +++ b/src/app/cli/src/services/gc_service.rs @@ -9,11 +9,17 @@ use std::sync::Arc; -use chrono::{DateTime, Duration, Utc}; +use chrono::{DateTime, Duration, TimeDelta, Utc}; use internal_error::{InternalError, ResultIntoInternal}; use crate::WorkspaceLayout; +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +const EVICTION_THRESHOLD: TimeDelta = Duration::hours(24); + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + pub struct GcService { workspace_layout: Arc, } @@ -56,9 +62,6 @@ impl GcService { /// Evict stale entries to manage cache size #[tracing::instrument(level = "debug", skip_all)] pub fn evict_cache(&self) -> Result { - // TODO: Make const after https://github.com/chronotope/chrono/issues/309 - // Or make into a config option - let eviction_threshold: Duration = Duration::hours(24); let now = Utc::now(); let mut entries_freed = 0; let mut bytes_freed = 0; @@ -69,7 +72,7 @@ impl GcService { let mtime: DateTime = chrono::DateTime::from(entry.metadata().int_err()?.modified().int_err()?); - if (now - mtime) > eviction_threshold { + if (now - mtime) > 
EVICTION_THRESHOLD { if entry.path().is_dir() { bytes_freed += fs_extra::dir::get_size(entry.path()).int_err()?; std::fs::remove_dir_all(entry.path()).int_err()?; @@ -93,7 +96,11 @@ impl GcService { } } +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + pub struct GcResult { pub entries_freed: usize, pub bytes_freed: u64, } + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/domain/core/src/services/reset_service.rs b/src/domain/core/src/services/reset_service.rs index 17240e704..51b3738d2 100644 --- a/src/domain/core/src/services/reset_service.rs +++ b/src/domain/core/src/services/reset_service.rs @@ -14,6 +14,8 @@ use thiserror::Error; use crate::entities::SetRefError; use crate::*; +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + #[async_trait::async_trait] pub trait ResetService: Send + Sync { async fn reset_dataset( @@ -103,3 +105,5 @@ pub struct OldHeadMismatchError { pub current_head: Multihash, pub old_head: Multihash, } + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/e2e/app/cli/common/src/e2e_harness.rs b/src/e2e/app/cli/common/src/e2e_harness.rs index 539e4a3eb..40d4992d2 100644 --- a/src/e2e/app/cli/common/src/e2e_harness.rs +++ b/src/e2e/app/cli/common/src/e2e_harness.rs @@ -9,7 +9,7 @@ use std::future::Future; -use chrono::{DateTime, NaiveTime, Utc}; +use chrono::{DateTime, NaiveTime, TimeZone, Utc}; use kamu_cli_puppet::extensions::KamuCliPuppetExt; use kamu_cli_puppet::{KamuCliPuppet, NewWorkspaceOptions}; use regex::Regex; @@ -55,12 +55,18 @@ impl KamuCliApiServerHarnessOptions { self } - pub fn with_frozen_system_time(mut self, value: DateTime) -> Self { + pub fn with_custom_frozen_system_time(mut self, value: DateTime) -> 
Self { self.frozen_system_time = Some(value); self } + pub fn with_frozen_system_time(self) -> Self { + let t = Utc.with_ymd_and_hms(2050, 1, 2, 3, 4, 5).unwrap(); + + self.with_custom_frozen_system_time(t) + } + pub fn with_today_as_frozen_system_time(self) -> Self { let today = { let now = Utc::now(); @@ -69,7 +75,7 @@ impl KamuCliApiServerHarnessOptions { .unwrap() }; - self.with_frozen_system_time(today) + self.with_custom_frozen_system_time(today) } pub fn with_kamu_config(mut self, content: &str) -> Self { diff --git a/src/e2e/app/cli/common/src/kamu_api_server_client_ext.rs b/src/e2e/app/cli/common/src/kamu_api_server_client_ext.rs index aafe912e5..a8a53ac4e 100644 --- a/src/e2e/app/cli/common/src/kamu_api_server_client_ext.rs +++ b/src/e2e/app/cli/common/src/kamu_api_server_client_ext.rs @@ -9,12 +9,14 @@ use async_trait::async_trait; use lazy_static::lazy_static; +use opendatafabric::{DatasetAlias, DatasetName}; use reqwest::{Method, StatusCode}; use crate::{KamuApiServerClient, RequestBody}; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +/// pub const DATASET_ROOT_PLAYER_SCORES_SNAPSHOT_STR: &str = indoc::indoc!( r#" kind: DatasetSnapshot @@ -42,6 +44,7 @@ pub const DATASET_ROOT_PLAYER_SCORES_SNAPSHOT_STR: &str = indoc::indoc!( "# ); +/// pub const DATASET_DERIVATIVE_LEADERBOARD_SNAPSHOT_STR: &str = indoc::indoc!( r#" kind: DatasetSnapshot @@ -86,14 +89,14 @@ pub const DATASET_DERIVATIVE_LEADERBOARD_SNAPSHOT_STR: &str = indoc::indoc!( ); lazy_static! 
{ - // https://github.com/kamu-data/kamu-cli/blob/master/examples/leaderboard/player-scores.yaml + /// <https://github.com/kamu-data/kamu-cli/blob/master/examples/leaderboard/player-scores.yaml> pub static ref DATASET_ROOT_PLAYER_SCORES_SNAPSHOT: String = { DATASET_ROOT_PLAYER_SCORES_SNAPSHOT_STR .escape_default() .to_string() }; - // https://github.com/kamu-data/kamu-cli/blob/master/examples/leaderboard/leaderboard.yaml + /// <https://github.com/kamu-data/kamu-cli/blob/master/examples/leaderboard/leaderboard.yaml> pub static ref DATASET_DERIVATIVE_LEADERBOARD_SNAPSHOT: String = { DATASET_DERIVATIVE_LEADERBOARD_SNAPSHOT_STR .escape_default() @@ -101,6 +104,36 @@ lazy_static! { }; } +/// NOTE: 1 millisecond for stable order within tests +/// +/// +pub const DATASET_ROOT_PLAYER_SCORES_INGEST_DATA_NDJSON_CHUNK_1: &str = indoc::indoc!( + r#" + {"match_time": "2000-01-01", "match_id": 1, "player_id": "Alice", "score": 100} + {"match_time": "2000-01-01 00:00:00.001", "match_id": 1, "player_id": "Bob", "score": 80} + "# +); + +/// NOTE: 1 millisecond for stable order within tests +/// +/// +pub const DATASET_ROOT_PLAYER_SCORES_INGEST_DATA_NDJSON_CHUNK_2: &str = indoc::indoc!( + r#" + {"match_time": "2000-01-02", "match_id": 2, "player_id": "Alice", "score": 70} + {"match_time": "2000-01-02 00:00:00.001", "match_id": 2, "player_id": "Charlie", "score": 90} + "# +); + +/// NOTE: 1 millisecond for stable order within tests +/// +/// +pub const DATASET_ROOT_PLAYER_SCORES_INGEST_DATA_NDJSON_CHUNK_3: &str = indoc::indoc!( + r#" + {"match_time": "2000-01-03", "match_id": 3, "player_id": "Bob", "score": 60} + {"match_time": "2000-01-03 00:00:00.001", "match_id": 3, "player_id": "Charlie", "score": 110} + "# +); + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// pub type AccessToken = String; @@ -111,11 +144,26 @@ pub type DatasetId = String; #[async_trait] pub trait KamuApiServerClientExt { async fn login_as_kamu(&self) -> AccessToken; + async fn login_as_e2e_user(&self) -> AccessToken; + + // TODO: also return alias, after solving this bug: + // 
https://github.com/kamu-data/kamu-cli/issues/891 async fn create_dataset(&self, dataset_snapshot_yaml: &str, token: &AccessToken) -> DatasetId; + async fn create_player_scores_dataset(&self, token: &AccessToken) -> DatasetId; + + /// NOTE: only for single-tenant workspaces async fn create_player_scores_dataset_with_data(&self, token: &AccessToken) -> DatasetId; + async fn create_leaderboard(&self, token: &AccessToken) -> DatasetId; + + async fn ingest_data( + &self, + dataset_alias: &DatasetAlias, + data: RequestBody, + token: &AccessToken, + ); } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -204,21 +252,16 @@ impl KamuApiServerClientExt for KamuApiServerClient { async fn create_player_scores_dataset_with_data(&self, token: &AccessToken) -> DatasetId { let dataset_id = self.create_player_scores_dataset(token).await; - self.rest_api_call_assert( - Some(token.clone()), - Method::POST, - "player-scores/ingest", - Some(RequestBody::NdJson( - indoc::indoc!( - r#" - {"match_time": "2000-01-01", "match_id": 1, "player_id": "Alice", "score": 100} - {"match_time": "2000-01-01", "match_id": 1, "player_id": "Bob", "score": 80} - "#, - ) - .into(), - )), - StatusCode::OK, - None, + // TODO: Use the alias from the reply, after fixing the bug: + // https://github.com/kamu-data/kamu-cli/issues/891 + + // At the moment, only single-tenant + let dataset_alias = DatasetAlias::new(None, DatasetName::new_unchecked("player-scores")); + + self.ingest_data( + &dataset_alias, + RequestBody::NdJson(DATASET_ROOT_PLAYER_SCORES_INGEST_DATA_NDJSON_CHUNK_1.into()), + token, ) .await; @@ -229,6 +272,25 @@ impl KamuApiServerClientExt for KamuApiServerClient { self.create_dataset(&DATASET_DERIVATIVE_LEADERBOARD_SNAPSHOT, token) .await } + + async fn ingest_data( + &self, + dataset_alias: &DatasetAlias, + data: RequestBody, + token: &AccessToken, + ) { + let endpoint = format!("{dataset_alias}/ingest"); + + 
self.rest_api_call_assert( + Some(token.clone()), + Method::POST, + endpoint.as_str(), + Some(data), + StatusCode::OK, + None, + ) + .await; + } } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/e2e/app/cli/inmem/tests/tests/commands/mod.rs b/src/e2e/app/cli/inmem/tests/tests/commands/mod.rs index 695c4f9d4..52308e3fa 100644 --- a/src/e2e/app/cli/inmem/tests/tests/commands/mod.rs +++ b/src/e2e/app/cli/inmem/tests/tests/commands/mod.rs @@ -13,8 +13,18 @@ mod test_config_command; mod test_delete_command; mod test_ingest_command; mod test_init_command; +mod test_inspect_command; +mod test_log_command; +mod test_login_command; +mod test_new_command; mod test_rename_command; -mod test_repo_alias_command; +mod test_repo_command; +mod test_reset_command; +mod test_search_command; mod test_sql_command; mod test_system_api_server_gql_query; +mod test_system_diagnose_command; +mod test_system_gc_command; mod test_system_generate_token_command; +mod test_system_info_command; +mod test_tail_command; diff --git a/src/e2e/app/cli/inmem/tests/tests/commands/test_ingest_command.rs b/src/e2e/app/cli/inmem/tests/tests/commands/test_ingest_command.rs index 7974dae1c..3687ca7d5 100644 --- a/src/e2e/app/cli/inmem/tests/tests/commands/test_ingest_command.rs +++ b/src/e2e/app/cli/inmem/tests/tests/commands/test_ingest_command.rs @@ -14,6 +14,7 @@ use kamu_cli_e2e_common::prelude::*; kamu_cli_execute_command_e2e_test!( storage = inmem, fixture = kamu_cli_e2e_repo_tests::test_push_ingest_from_file_ledger, + options = Options::default().with_frozen_system_time(), extra_test_groups = "engine, ingest, datafusion" ); @@ -22,6 +23,34 @@ kamu_cli_execute_command_e2e_test!( kamu_cli_execute_command_e2e_test!( storage = inmem, fixture = kamu_cli_e2e_repo_tests::test_push_ingest_from_file_snapshot_with_event_time, + options = Options::default().with_frozen_system_time(), + extra_test_groups = "engine, ingest, 
datafusion" +); + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +kamu_cli_execute_command_e2e_test!( + storage = inmem, + fixture = kamu_cli_e2e_repo_tests::test_ingest_from_stdin, + options = Options::default().with_frozen_system_time(), + extra_test_groups = "engine, ingest, datafusion" +); + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +kamu_cli_execute_command_e2e_test!( + storage = inmem, + fixture = kamu_cli_e2e_repo_tests::test_ingest_recursive, + options = Options::default().with_frozen_system_time(), + extra_test_groups = "engine, ingest, datafusion" +); + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +kamu_cli_execute_command_e2e_test!( + storage = inmem, + fixture = kamu_cli_e2e_repo_tests::test_ingest_with_source_name, + options = Options::default().with_frozen_system_time(), extra_test_groups = "engine, ingest, datafusion" ); diff --git a/src/e2e/app/cli/inmem/tests/tests/commands/test_inspect_command.rs b/src/e2e/app/cli/inmem/tests/tests/commands/test_inspect_command.rs new file mode 100644 index 000000000..0d22b67d5 --- /dev/null +++ b/src/e2e/app/cli/inmem/tests/tests/commands/test_inspect_command.rs @@ -0,0 +1,35 @@ +// Copyright Kamu Data, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. 
+ +use kamu_cli_e2e_common::prelude::*; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +kamu_cli_execute_command_e2e_test!( + storage = inmem, + fixture = kamu_cli_e2e_repo_tests::test_inspect_lineage, +); + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +kamu_cli_execute_command_e2e_test!( + storage = inmem, + fixture = kamu_cli_e2e_repo_tests::test_inspect_query, + options = Options::default().with_frozen_system_time() +); + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +kamu_cli_execute_command_e2e_test!( + storage = inmem, + fixture = kamu_cli_e2e_repo_tests::test_inspect_schema, + extra_test_groups = "engine, ingest, datafusion" +); + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/e2e/app/cli/inmem/tests/tests/commands/test_log_command.rs b/src/e2e/app/cli/inmem/tests/tests/commands/test_log_command.rs new file mode 100644 index 000000000..d83b3b951 --- /dev/null +++ b/src/e2e/app/cli/inmem/tests/tests/commands/test_log_command.rs @@ -0,0 +1,21 @@ +// Copyright Kamu Data, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. 
+ +use kamu_cli_e2e_common::prelude::*; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +kamu_cli_execute_command_e2e_test!( + storage = inmem, + fixture = kamu_cli_e2e_repo_tests::test_log, + options = Options::default().with_frozen_system_time(), + extra_test_groups = "engine, ingest, datafusion" +); + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/e2e/app/cli/inmem/tests/tests/commands/test_login_command.rs b/src/e2e/app/cli/inmem/tests/tests/commands/test_login_command.rs new file mode 100644 index 000000000..d47541b57 --- /dev/null +++ b/src/e2e/app/cli/inmem/tests/tests/commands/test_login_command.rs @@ -0,0 +1,27 @@ +// Copyright Kamu Data, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. 
+ +use kamu_cli_e2e_common::prelude::*; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +kamu_cli_run_api_server_e2e_test!( + storage = inmem, + fixture = kamu_cli_e2e_repo_tests::test_login_logout_password, +); + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +kamu_cli_run_api_server_e2e_test!( + storage = inmem, + fixture = kamu_cli_e2e_repo_tests::test_login_logout_oauth, + options = Options::default().with_multi_tenant() +); + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/e2e/app/cli/inmem/tests/tests/commands/test_new_command.rs b/src/e2e/app/cli/inmem/tests/tests/commands/test_new_command.rs new file mode 100644 index 000000000..fd38ab3d6 --- /dev/null +++ b/src/e2e/app/cli/inmem/tests/tests/commands/test_new_command.rs @@ -0,0 +1,26 @@ +// Copyright Kamu Data, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. 
+ +use kamu_cli_e2e_common::prelude::*; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +kamu_cli_execute_command_e2e_test!( + storage = inmem, + fixture = kamu_cli_e2e_repo_tests::test_new_root, +); + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +kamu_cli_execute_command_e2e_test!( + storage = inmem, + fixture = kamu_cli_e2e_repo_tests::test_new_derivative, +); + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/e2e/app/cli/inmem/tests/tests/commands/test_repo_alias_command.rs b/src/e2e/app/cli/inmem/tests/tests/commands/test_repo_command.rs similarity index 100% rename from src/e2e/app/cli/inmem/tests/tests/commands/test_repo_alias_command.rs rename to src/e2e/app/cli/inmem/tests/tests/commands/test_repo_command.rs diff --git a/src/e2e/app/cli/inmem/tests/tests/commands/test_reset_command.rs b/src/e2e/app/cli/inmem/tests/tests/commands/test_reset_command.rs new file mode 100644 index 000000000..17c46fee1 --- /dev/null +++ b/src/e2e/app/cli/inmem/tests/tests/commands/test_reset_command.rs @@ -0,0 +1,20 @@ +// Copyright Kamu Data, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. 
+ +use kamu_cli_e2e_common::prelude::*; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +kamu_cli_execute_command_e2e_test!( + storage = inmem, + fixture = kamu_cli_e2e_repo_tests::test_reset, + extra_test_groups = "engine, ingest, datafusion" +); + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/e2e/app/cli/inmem/tests/tests/commands/test_search_command.rs b/src/e2e/app/cli/inmem/tests/tests/commands/test_search_command.rs new file mode 100644 index 000000000..3d9b2c90d --- /dev/null +++ b/src/e2e/app/cli/inmem/tests/tests/commands/test_search_command.rs @@ -0,0 +1,66 @@ +// Copyright Kamu Data, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use kamu_cli_e2e_common::prelude::*; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +kamu_cli_run_api_server_e2e_test!( + storage = inmem, + fixture = kamu_cli_e2e_repo_tests::test_search_multi_user + // We need synthetic time for the tests, but the third-party JWT code + // uses the current time. Assuming that the token lifetime is 24 hours, we will + // use the projected date (the current day) as a workaround. 
+ options = Options::default() + .with_multi_tenant() + .with_today_as_frozen_system_time() + .with_kamu_config( + indoc::indoc!( + r#" + kind: CLIConfig + version: 1 + content: + users: + predefined: + - accountName: kamu + "# + ) + ), + extra_test_groups = "engine, ingest, datafusion" +); + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +kamu_cli_run_api_server_e2e_test!( + storage = inmem, + fixture = kamu_cli_e2e_repo_tests::test_search_by_name + // We need synthetic time for the tests, but the third-party JWT code + // uses the current time. Assuming that the token lifetime is 24 hours, we will + // use the projected date (the current day) as a workaround. + options = Options::default() + .with_multi_tenant() + .with_today_as_frozen_system_time(), + extra_test_groups = "engine, ingest, datafusion" +); + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +kamu_cli_run_api_server_e2e_test!( + storage = inmem, + fixture = kamu_cli_e2e_repo_tests::test_search_by_repo + // We need synthetic time for the tests, but the third-party JWT code + // uses the current time. Assuming that the token lifetime is 24 hours, we will + // use the projected date (the current day) as a workaround. 
+ options = Options::default() + .with_multi_tenant() + .with_today_as_frozen_system_time(), + extra_test_groups = "engine, ingest, datafusion" +); + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/e2e/app/cli/inmem/tests/tests/commands/test_sql_command.rs b/src/e2e/app/cli/inmem/tests/tests/commands/test_sql_command.rs index a48fd8e50..7315367ca 100644 --- a/src/e2e/app/cli/inmem/tests/tests/commands/test_sql_command.rs +++ b/src/e2e/app/cli/inmem/tests/tests/commands/test_sql_command.rs @@ -27,3 +27,12 @@ kamu_cli_execute_command_e2e_test!( ); //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +kamu_cli_execute_command_e2e_test!( + storage = inmem, + fixture = kamu_cli_e2e_repo_tests::test_sql_command, + options = Options::default().with_frozen_system_time(), + extra_test_groups = "engine, datafusion" +); + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/e2e/app/cli/inmem/tests/tests/commands/test_system_diagnose_command.rs b/src/e2e/app/cli/inmem/tests/tests/commands/test_system_diagnose_command.rs new file mode 100644 index 000000000..44d851352 --- /dev/null +++ b/src/e2e/app/cli/inmem/tests/tests/commands/test_system_diagnose_command.rs @@ -0,0 +1,19 @@ +// Copyright Kamu Data, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. 
+ +use kamu_cli_e2e_common::prelude::*; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +kamu_cli_execute_command_e2e_test!( + storage = inmem, + fixture = kamu_cli_e2e_repo_tests::test_system_diagnose +); + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/e2e/app/cli/inmem/tests/tests/commands/test_system_gc_command.rs b/src/e2e/app/cli/inmem/tests/tests/commands/test_system_gc_command.rs new file mode 100644 index 000000000..b36419994 --- /dev/null +++ b/src/e2e/app/cli/inmem/tests/tests/commands/test_system_gc_command.rs @@ -0,0 +1,16 @@ +// Copyright Kamu Data, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use kamu_cli_e2e_common::prelude::*; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +kamu_cli_execute_command_e2e_test!(storage = inmem, fixture = kamu_cli_e2e_repo_tests::test_gc); + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/e2e/app/cli/inmem/tests/tests/commands/test_system_info_command.rs b/src/e2e/app/cli/inmem/tests/tests/commands/test_system_info_command.rs new file mode 100644 index 000000000..00c824968 --- /dev/null +++ b/src/e2e/app/cli/inmem/tests/tests/commands/test_system_info_command.rs @@ -0,0 +1,19 @@ +// Copyright Kamu Data, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. 
+// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use kamu_cli_e2e_common::prelude::*; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +kamu_cli_execute_command_e2e_test!( + storage = inmem, + fixture = kamu_cli_e2e_repo_tests::test_system_info +); + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/e2e/app/cli/inmem/tests/tests/commands/test_tail_command.rs b/src/e2e/app/cli/inmem/tests/tests/commands/test_tail_command.rs new file mode 100644 index 000000000..ad25c37f6 --- /dev/null +++ b/src/e2e/app/cli/inmem/tests/tests/commands/test_tail_command.rs @@ -0,0 +1,21 @@ +// Copyright Kamu Data, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. 
+ +use kamu_cli_e2e_common::prelude::*; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +kamu_cli_execute_command_e2e_test!( + storage = inmem, + fixture = kamu_cli_e2e_repo_tests::test_tail, + options = Options::default().with_frozen_system_time(), + extra_test_groups = "engine, ingest, datafusion" +); + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/e2e/app/cli/repo-tests/src/commands/mod.rs b/src/e2e/app/cli/repo-tests/src/commands/mod.rs index d3763d842..5125a93ce 100644 --- a/src/e2e/app/cli/repo-tests/src/commands/mod.rs +++ b/src/e2e/app/cli/repo-tests/src/commands/mod.rs @@ -13,11 +13,21 @@ mod test_config_command; mod test_delete_command; mod test_ingest_command; mod test_init_command; +mod test_inspect_command; +mod test_log_command; +mod test_login_command; +mod test_new_command; mod test_rename_command; -mod test_repo_alias_command; +mod test_repo_command; +mod test_reset_command; +mod test_search_command; mod test_sql_command; mod test_system_api_server_gql_query; +mod test_system_gc_command; mod test_system_generate_token_command; +mod test_system_info_command; +mod test_system_info_diagnose; +mod test_tail_command; pub use test_add_command::*; pub use test_complete_command::*; @@ -25,8 +35,18 @@ pub use test_config_command::*; pub use test_delete_command::*; pub use test_ingest_command::*; pub use test_init_command::*; +pub use test_inspect_command::*; +pub use test_log_command::*; +pub use test_login_command::*; +pub use test_new_command::*; pub use test_rename_command::*; -pub use test_repo_alias_command::*; +pub use test_repo_command::*; +pub use test_reset_command::*; +pub use test_search_command::*; pub use test_sql_command::*; pub use test_system_api_server_gql_query::*; +pub use test_system_gc_command::*; pub use test_system_generate_token_command::*; +pub use 
test_system_info_command::*; +pub use test_system_info_diagnose::*; +pub use test_tail_command::*; diff --git a/src/e2e/app/cli/repo-tests/src/commands/test_config_command.rs b/src/e2e/app/cli/repo-tests/src/commands/test_config_command.rs index f8d11beff..b1a3fcf19 100644 --- a/src/e2e/app/cli/repo-tests/src/commands/test_config_command.rs +++ b/src/e2e/app/cli/repo-tests/src/commands/test_config_command.rs @@ -34,14 +34,14 @@ pub async fn test_config_set_value(kamu: KamuCliPuppet) { let stdout = std::str::from_utf8(&assert.get_output().stdout).unwrap(); pretty_assertions::assert_eq!( - stdout, indoc::indoc!( r#" engine: runtime: podman "# - ) + ), + stdout ); } { @@ -66,13 +66,13 @@ pub async fn test_config_set_value(kamu: KamuCliPuppet) { let stdout = std::str::from_utf8(&assert.get_output().stdout).unwrap(); pretty_assertions::assert_eq!( - stdout, indoc::indoc!( r#" host "# - ) + ), + stdout ); } { @@ -80,7 +80,6 @@ pub async fn test_config_set_value(kamu: KamuCliPuppet) { let stdout = std::str::from_utf8(&assert.get_output().stdout).unwrap(); pretty_assertions::assert_eq!( - stdout, indoc::indoc!( r#" engine: @@ -88,7 +87,8 @@ pub async fn test_config_set_value(kamu: KamuCliPuppet) { networkNs: host "# - ) + ), + stdout ); } // 2. 
Set flow for the "uploads.maxFileSizeInMb" key @@ -114,13 +114,13 @@ pub async fn test_config_set_value(kamu: KamuCliPuppet) { let stdout = std::str::from_utf8(&assert.get_output().stdout).unwrap(); pretty_assertions::assert_eq!( - stdout, indoc::indoc!( r#" 42 "# - ) + ), + stdout ); } { @@ -128,7 +128,6 @@ pub async fn test_config_set_value(kamu: KamuCliPuppet) { let stdout = std::str::from_utf8(&assert.get_output().stdout).unwrap(); pretty_assertions::assert_eq!( - stdout, indoc::indoc!( r#" engine: @@ -138,7 +137,8 @@ pub async fn test_config_set_value(kamu: KamuCliPuppet) { maxFileSizeInMb: 42 "# - ) + ), + stdout ); } } @@ -212,13 +212,13 @@ pub async fn test_config_get_with_default(kamu: KamuCliPuppet) { let stdout = std::str::from_utf8(&assert.get_output().stdout).unwrap(); pretty_assertions::assert_eq!( - stdout, indoc::indoc!( r#" private "# - ) + ), + stdout ); } } @@ -230,7 +230,6 @@ pub async fn test_config_get_from_config(kamu: KamuCliPuppet) { let stdout = std::str::from_utf8(&assert.get_output().stdout).unwrap(); pretty_assertions::assert_eq!( - stdout, indoc::indoc!( r#" engine: @@ -239,7 +238,8 @@ pub async fn test_config_get_from_config(kamu: KamuCliPuppet) { maxFileSizeInMb: 42 "# - ) + ), + stdout ); } diff --git a/src/e2e/app/cli/repo-tests/src/commands/test_ingest_command.rs b/src/e2e/app/cli/repo-tests/src/commands/test_ingest_command.rs index 1181b36de..04224bcaa 100644 --- a/src/e2e/app/cli/repo-tests/src/commands/test_ingest_command.rs +++ b/src/e2e/app/cli/repo-tests/src/commands/test_ingest_command.rs @@ -9,17 +9,21 @@ use std::path::Path; -use chrono::{TimeZone, Utc}; use indoc::indoc; +use kamu_cli_e2e_common::{ + DATASET_DERIVATIVE_LEADERBOARD_SNAPSHOT_STR, + DATASET_ROOT_PLAYER_SCORES_INGEST_DATA_NDJSON_CHUNK_1, + DATASET_ROOT_PLAYER_SCORES_INGEST_DATA_NDJSON_CHUNK_2, + DATASET_ROOT_PLAYER_SCORES_INGEST_DATA_NDJSON_CHUNK_3, + DATASET_ROOT_PLAYER_SCORES_SNAPSHOT_STR, +}; use kamu_cli_puppet::extensions::KamuCliPuppetExt; use 
kamu_cli_puppet::KamuCliPuppet; use opendatafabric::*; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -pub async fn test_push_ingest_from_file_ledger(mut kamu: KamuCliPuppet) { - kamu.set_system_time(Some(Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0).unwrap())); - +pub async fn test_push_ingest_from_file_ledger(kamu: KamuCliPuppet) { kamu.add_dataset(DatasetSnapshot { name: "population".try_into().unwrap(), kind: DatasetKind::Root, @@ -86,9 +90,9 @@ pub async fn test_push_ingest_from_file_ledger(mut kamu: KamuCliPuppet) { +--------+----+----------------------+----------------------+------+------------+ | offset | op | system_time | event_time | city | population | +--------+----+----------------------+----------------------+------+------------+ - | 0 | 0 | 2000-01-01T00:00:00Z | 2020-01-01T00:00:00Z | A | 1000 | - | 1 | 0 | 2000-01-01T00:00:00Z | 2020-01-01T00:00:00Z | B | 2000 | - | 2 | 0 | 2000-01-01T00:00:00Z | 2020-01-01T00:00:00Z | C | 3000 | + | 0 | 0 | 2050-01-02T03:04:05Z | 2020-01-01T00:00:00Z | A | 1000 | + | 1 | 0 | 2050-01-02T03:04:05Z | 2020-01-01T00:00:00Z | B | 2000 | + | 2 | 0 | 2050-01-02T03:04:05Z | 2020-01-01T00:00:00Z | C | 3000 | +--------+----+----------------------+----------------------+------+------------+ "# ), @@ -96,9 +100,9 @@ pub async fn test_push_ingest_from_file_ledger(mut kamu: KamuCliPuppet) { .await; } -pub async fn test_push_ingest_from_file_snapshot_with_event_time(mut kamu: KamuCliPuppet) { - kamu.set_system_time(Some(Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0).unwrap())); +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +pub async fn test_push_ingest_from_file_snapshot_with_event_time(kamu: KamuCliPuppet) { kamu.add_dataset(DatasetSnapshot { name: "population".try_into().unwrap(), kind: DatasetKind::Root, @@ -168,9 +172,9 @@ pub async fn 
test_push_ingest_from_file_snapshot_with_event_time(mut kamu: KamuC +--------+----+----------------------+----------------------+------+------------+ | offset | op | system_time | event_time | city | population | +--------+----+----------------------+----------------------+------+------------+ - | 0 | 0 | 2000-01-01T00:00:00Z | 2050-01-01T00:00:00Z | A | 1000 | - | 1 | 0 | 2000-01-01T00:00:00Z | 2050-01-01T00:00:00Z | B | 2000 | - | 2 | 0 | 2000-01-01T00:00:00Z | 2050-01-01T00:00:00Z | C | 3000 | + | 0 | 0 | 2050-01-02T03:04:05Z | 2050-01-01T00:00:00Z | A | 1000 | + | 1 | 0 | 2050-01-02T03:04:05Z | 2050-01-01T00:00:00Z | B | 2000 | + | 2 | 0 | 2050-01-02T03:04:05Z | 2050-01-01T00:00:00Z | C | 3000 | +--------+----+----------------------+----------------------+------+------------+ "# ), @@ -180,8 +184,289 @@ pub async fn test_push_ingest_from_file_snapshot_with_event_time(mut kamu: KamuC //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +pub async fn test_ingest_from_stdin(kamu: KamuCliPuppet) { + kamu.execute_with_input(["add", "--stdin"], DATASET_ROOT_PLAYER_SCORES_SNAPSHOT_STR) + .await + .success(); + + assert_ingest_data_to_player_scores_from_stdio( + &kamu, + ["ingest", "player-scores", "--stdin"], + DATASET_ROOT_PLAYER_SCORES_INGEST_DATA_NDJSON_CHUNK_1, + indoc::indoc!( + r#" + ┌────┬──────────────────────┬──────────────────────┬──────────┬───────────┬───────┐ + │ op │ system_time │ match_time │ match_id │ player_id │ score │ + ├────┼──────────────────────┼──────────────────────┼──────────┼───────────┼───────┤ + │ 0 │ 2050-01-02T03:04:05Z │ 2000-01-01T00:00:00Z │ 1 │ Alice │ 100 │ + │ 0 │ 2050-01-02T03:04:05Z │ 2000-01-01T00:00:00Z │ 1 │ Bob │ 80 │ + └────┴──────────────────────┴──────────────────────┴──────────┴───────────┴───────┘ + "# + ), + ) + .await; + + assert_ingest_data_to_player_scores_from_stdio( + &kamu, + ["ingest", "player-scores", "--stdin"], + 
DATASET_ROOT_PLAYER_SCORES_INGEST_DATA_NDJSON_CHUNK_2, + indoc::indoc!( + r#" + ┌────┬──────────────────────┬──────────────────────┬──────────┬───────────┬───────┐ + │ op │ system_time │ match_time │ match_id │ player_id │ score │ + ├────┼──────────────────────┼──────────────────────┼──────────┼───────────┼───────┤ + │ 0 │ 2050-01-02T03:04:05Z │ 2000-01-01T00:00:00Z │ 1 │ Alice │ 100 │ + │ 0 │ 2050-01-02T03:04:05Z │ 2000-01-01T00:00:00Z │ 1 │ Bob │ 80 │ + │ 0 │ 2050-01-02T03:04:05Z │ 2000-01-02T00:00:00Z │ 2 │ Alice │ 70 │ + │ 0 │ 2050-01-02T03:04:05Z │ 2000-01-02T00:00:00Z │ 2 │ Charlie │ 90 │ + └────┴──────────────────────┴──────────────────────┴──────────┴───────────┴───────┘ + "# + ), + ) + .await; + + assert_ingest_data_to_player_scores_from_stdio( + &kamu, + ["ingest", "player-scores", "--stdin"], + DATASET_ROOT_PLAYER_SCORES_INGEST_DATA_NDJSON_CHUNK_3, + indoc::indoc!( + r#" + ┌────┬──────────────────────┬──────────────────────┬──────────┬───────────┬───────┐ + │ op │ system_time │ match_time │ match_id │ player_id │ score │ + ├────┼──────────────────────┼──────────────────────┼──────────┼───────────┼───────┤ + │ 0 │ 2050-01-02T03:04:05Z │ 2000-01-01T00:00:00Z │ 1 │ Alice │ 100 │ + │ 0 │ 2050-01-02T03:04:05Z │ 2000-01-01T00:00:00Z │ 1 │ Bob │ 80 │ + │ 0 │ 2050-01-02T03:04:05Z │ 2000-01-02T00:00:00Z │ 2 │ Alice │ 70 │ + │ 0 │ 2050-01-02T03:04:05Z │ 2000-01-02T00:00:00Z │ 2 │ Charlie │ 90 │ + │ 0 │ 2050-01-02T03:04:05Z │ 2000-01-03T00:00:00Z │ 3 │ Bob │ 60 │ + │ 0 │ 2050-01-02T03:04:05Z │ 2000-01-03T00:00:00Z │ 3 │ Charlie │ 110 │ + └────┴──────────────────────┴──────────────────────┴──────────┴───────────┴───────┘ + "# + ), + ) + .await; +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +pub async fn test_ingest_recursive(kamu: KamuCliPuppet) { + // 0. 
Add datasets: the root dataset and its derived dataset + kamu.execute_with_input(["add", "--stdin"], DATASET_ROOT_PLAYER_SCORES_SNAPSHOT_STR) + .await + .success(); + + kamu.execute_with_input( + ["add", "--stdin"], + DATASET_DERIVATIVE_LEADERBOARD_SNAPSHOT_STR, + ) + .await + .success(); + + { + let assert = kamu + .execute(["tail", "leaderboard", "--output-format", "table"]) + .await + .failure(); + + let stderr = std::str::from_utf8(&assert.get_output().stderr).unwrap(); + + assert!( + stderr.contains("Error: Dataset schema is not yet available: leaderboard"), + "Unexpected output:\n{stderr}", + ); + } + + // TODO: `kamu ingest`: implement `--recursive` mode + // https://github.com/kamu-data/kamu-cli/issues/886 + + // 1. Ingest data: the first chunk + // { + // let assert = kamu + // .execute_with_input( + // ["ingest", "player-scores", "--stdin", "--recursive"], + // DATASET_ROOT_PLAYER_SCORES_INGEST_DATA_NDJSON_CHUNK_1, + // ) + // .await + // .success(); + // + // let stderr = + // std::str::from_utf8(&assert.get_output().stderr).unwrap(); + // + // assert!( + // stderr.contains("Dataset updated"), + // "Unexpected output:\n{stderr}", + // ); + // } + + // TODO: check via the tail command added data in the derived dataset + // (leaderboard) + + // TODO: do the same for 2nd & 3rd chunks +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +pub async fn test_ingest_with_source_name(kamu: KamuCliPuppet) { + kamu.execute_with_input(["add", "--stdin"], DATASET_ROOT_PLAYER_SCORES_SNAPSHOT_STR) + .await + .success(); + + assert_ingest_data_to_player_scores_from_stdio( + &kamu, + [ + "ingest", + "player-scores", + "--stdin", + "--source-name", + "default", + ], + DATASET_ROOT_PLAYER_SCORES_INGEST_DATA_NDJSON_CHUNK_1, + indoc::indoc!( + r#" + ┌────┬──────────────────────┬──────────────────────┬──────────┬───────────┬───────┐ + │ op │ system_time │ match_time │ match_id │ player_id │ score │ + 
├────┼──────────────────────┼──────────────────────┼──────────┼───────────┼───────┤ + │ 0 │ 2050-01-02T03:04:05Z │ 2000-01-01T00:00:00Z │ 1 │ Alice │ 100 │ + │ 0 │ 2050-01-02T03:04:05Z │ 2000-01-01T00:00:00Z │ 1 │ Bob │ 80 │ + └────┴──────────────────────┴──────────────────────┴──────────┴───────────┴───────┘ + "# + ), + ) + .await; + + assert_ingest_data_to_player_scores_from_stdio( + &kamu, + [ + "ingest", + "player-scores", + "--stdin", + "--source-name", + "default", + ], + DATASET_ROOT_PLAYER_SCORES_INGEST_DATA_NDJSON_CHUNK_2, + indoc::indoc!( + r#" + ┌────┬──────────────────────┬──────────────────────┬──────────┬───────────┬───────┐ + │ op │ system_time │ match_time │ match_id │ player_id │ score │ + ├────┼──────────────────────┼──────────────────────┼──────────┼───────────┼───────┤ + │ 0 │ 2050-01-02T03:04:05Z │ 2000-01-01T00:00:00Z │ 1 │ Alice │ 100 │ + │ 0 │ 2050-01-02T03:04:05Z │ 2000-01-01T00:00:00Z │ 1 │ Bob │ 80 │ + │ 0 │ 2050-01-02T03:04:05Z │ 2000-01-02T00:00:00Z │ 2 │ Alice │ 70 │ + │ 0 │ 2050-01-02T03:04:05Z │ 2000-01-02T00:00:00Z │ 2 │ Charlie │ 90 │ + └────┴──────────────────────┴──────────────────────┴──────────┴───────────┴───────┘ + "# + ), + ) + .await; + + assert_ingest_data_to_player_scores_from_stdio( + &kamu, + [ + "ingest", + "player-scores", + "--stdin", + "--source-name", + "default", + ], + DATASET_ROOT_PLAYER_SCORES_INGEST_DATA_NDJSON_CHUNK_3, + indoc::indoc!( + r#" + ┌────┬──────────────────────┬──────────────────────┬──────────┬───────────┬───────┐ + │ op │ system_time │ match_time │ match_id │ player_id │ score │ + ├────┼──────────────────────┼──────────────────────┼──────────┼───────────┼───────┤ + │ 0 │ 2050-01-02T03:04:05Z │ 2000-01-01T00:00:00Z │ 1 │ Alice │ 100 │ + │ 0 │ 2050-01-02T03:04:05Z │ 2000-01-01T00:00:00Z │ 1 │ Bob │ 80 │ + │ 0 │ 2050-01-02T03:04:05Z │ 2000-01-02T00:00:00Z │ 2 │ Alice │ 70 │ + │ 0 │ 2050-01-02T03:04:05Z │ 2000-01-02T00:00:00Z │ 2 │ Charlie │ 90 │ + │ 0 │ 2050-01-02T03:04:05Z │ 2000-01-03T00:00:00Z │ 
3 │ Bob │ 60 │ + │ 0 │ 2050-01-02T03:04:05Z │ 2000-01-03T00:00:00Z │ 3 │ Charlie │ 110 │ + └────┴──────────────────────┴──────────────────────┴──────────┴───────────┴───────┘ + "# + ), + ) + .await; +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// Helpers +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + fn path(p: &Path) -> &str { p.as_os_str().to_str().unwrap() } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +async fn assert_ingest_data_to_player_scores_from_stdio( + kamu: &KamuCliPuppet, + ingest_cmd: I, + ingest_data: T, + expected_tail_table: &str, +) where + I: IntoIterator + Clone, + S: AsRef, + T: Into> + Clone, +{ + // Ingest + { + let assert = kamu + .execute_with_input(ingest_cmd.clone(), ingest_data.clone()) + .await + .success(); + + let stderr = std::str::from_utf8(&assert.get_output().stderr).unwrap(); + + assert!( + stderr.contains("Dataset updated"), + "Unexpected output:\n{stderr}", + ); + } + // Trying to ingest the same data + { + let assert = kamu + .execute_with_input(ingest_cmd, ingest_data) + .await + .success(); + + let stderr = std::str::from_utf8(&assert.get_output().stderr).unwrap(); + + assert!( + stderr.contains("Dataset up-to-date"), + "Unexpected output:\n{stderr}", + ); + } + // Assert ingested data + { + let assert = kamu + .execute([ + "sql", + "--engine", + "datafusion", + "--command", + // Without unstable "offset" column. 
+ // For a beautiful output, cut to seconds + indoc::indoc!( + r#" + SELECT op, + system_time, + DATE_TRUNC('second', match_time) as match_time, + match_id, + player_id, + score + FROM "player-scores" + ORDER BY match_time; + "# + ), + "--output-format", + "table", + ]) + .await + .success(); + + let stdout = std::str::from_utf8(&assert.get_output().stdout).unwrap(); + + pretty_assertions::assert_eq!(expected_tail_table, stdout); + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/e2e/app/cli/repo-tests/src/commands/test_inspect_command.rs b/src/e2e/app/cli/repo-tests/src/commands/test_inspect_command.rs new file mode 100644 index 000000000..c849f4a7c --- /dev/null +++ b/src/e2e/app/cli/repo-tests/src/commands/test_inspect_command.rs @@ -0,0 +1,250 @@ +// Copyright Kamu Data, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. 
+ +use kamu_cli_e2e_common::{ + DATASET_DERIVATIVE_LEADERBOARD_SNAPSHOT_STR, + DATASET_ROOT_PLAYER_SCORES_INGEST_DATA_NDJSON_CHUNK_1, + DATASET_ROOT_PLAYER_SCORES_SNAPSHOT_STR, +}; +use kamu_cli_puppet::extensions::KamuCliPuppetExt; +use kamu_cli_puppet::KamuCliPuppet; +use opendatafabric::{DatasetName, EnumWithVariants, SetTransform}; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +pub async fn test_inspect_lineage(kamu: KamuCliPuppet) { + kamu.execute_with_input(["add", "--stdin"], DATASET_ROOT_PLAYER_SCORES_SNAPSHOT_STR) + .await + .success(); + + kamu.execute_with_input( + ["add", "--stdin"], + DATASET_DERIVATIVE_LEADERBOARD_SNAPSHOT_STR, + ) + .await + .success(); + + { + let assert = kamu + .execute(["inspect", "lineage", "--output-format", "shell"]) + .await + .success(); + + let stdout = std::str::from_utf8(&assert.get_output().stdout).unwrap(); + + pretty_assertions::assert_eq!( + stdout, + indoc::indoc!( + r#" + leaderboard: Derivative + └── player-scores: Root + player-scores: Root + "# + ) + ); + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +pub async fn test_inspect_query(kamu: KamuCliPuppet) { + kamu.execute_with_input(["add", "--stdin"], DATASET_ROOT_PLAYER_SCORES_SNAPSHOT_STR) + .await + .success(); + + let player_scores_dataset_id = kamu + .list_datasets() + .await + .into_iter() + .find_map(|dataset| { + if dataset.name == DatasetName::new_unchecked("player-scores") { + Some(dataset.id) + } else { + None + } + }) + .unwrap(); + + kamu.execute_with_input( + ["add", "--stdin"], + DATASET_DERIVATIVE_LEADERBOARD_SNAPSHOT_STR, + ) + .await + .success(); + + let leaderboard_transform_block_hash = kamu + .list_blocks(&DatasetName::new_unchecked("leaderboard")) + .await + .into_iter() + .find_map(|block| { + if block.block.event.as_variant::().is_some() { + Some(block.block_hash) + } else { + 
None + } + }) + .unwrap(); + + { + let assert = kamu + .execute(["inspect", "query", "player-scores"]) + .await + .success(); + + let stdout = std::str::from_utf8(&assert.get_output().stdout).unwrap(); + + pretty_assertions::assert_eq!(stdout, ""); + } + { + let assert = kamu + .execute(["inspect", "query", "leaderboard"]) + .await + .success(); + + let stdout = std::str::from_utf8(&assert.get_output().stdout).unwrap(); + + pretty_assertions::assert_eq!( + indoc::formatdoc!( + r#" + Transform: {leaderboard_transform_block_hash} + As Of: 2050-01-02T03:04:05Z + Inputs: + player_scores {player_scores_dataset_id} + Engine: risingwave (None) + Query: leaderboard + create materialized view leaderboard as + select + * + from ( + select + row_number() over (partition by 1 order by score desc) as place, + match_time, + match_id, + player_id, + score + from player_scores + ) + where place <= 2 + Query: leaderboard + select * from leaderboard + "# + ), + stdout + ); + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +pub async fn test_inspect_schema(kamu: KamuCliPuppet) { + kamu.execute_with_input(["add", "--stdin"], DATASET_ROOT_PLAYER_SCORES_SNAPSHOT_STR) + .await + .success(); + + { + let assert = kamu + .execute(["inspect", "schema", "player-scores"]) + .await + .success(); + + let stderr = std::str::from_utf8(&assert.get_output().stderr).unwrap(); + + assert!( + stderr.contains("Warning: Dataset schema is not yet available: player-scores"), + "Unexpected output:\n{stderr}", + ); + } + + kamu.execute_with_input( + ["add", "--stdin"], + DATASET_DERIVATIVE_LEADERBOARD_SNAPSHOT_STR, + ) + .await + .success(); + + { + let assert = kamu + .execute([ + "inspect", + "schema", + "leaderboard", + "--output-format", + "parquet", + ]) + .await + .success(); + + let stderr = std::str::from_utf8(&assert.get_output().stderr).unwrap(); + + assert!( + stderr.contains("Warning: Dataset schema is not yet 
available: leaderboard"), + "Unexpected output:\n{stderr}", + ); + } + + kamu.execute_with_input( + ["ingest", "player-scores", "--stdin"], + DATASET_ROOT_PLAYER_SCORES_INGEST_DATA_NDJSON_CHUNK_1, + ) + .await + .success(); + + { + let assert = kamu + .execute([ + "inspect", + "schema", + "player-scores", + "--output-format", + "parquet", + ]) + .await + .success(); + + let stdout = std::str::from_utf8(&assert.get_output().stdout).unwrap(); + + pretty_assertions::assert_eq!( + indoc::indoc!( + r#" + message arrow_schema { + REQUIRED INT64 offset; + REQUIRED INT32 op; + REQUIRED INT64 system_time (TIMESTAMP(MILLIS,true)); + OPTIONAL INT64 match_time (TIMESTAMP(MILLIS,true)); + OPTIONAL INT64 match_id; + OPTIONAL BYTE_ARRAY player_id (STRING); + OPTIONAL INT64 score; + } + "# + ), + stdout + ); + } + { + let assert = kamu + .execute([ + "inspect", + "schema", + "leaderboard", + "--output-format", + "parquet", + ]) + .await + .success(); + + let stderr = std::str::from_utf8(&assert.get_output().stderr).unwrap(); + + assert!( + stderr.contains("Warning: Dataset schema is not yet available: leaderboard"), + "Unexpected output:\n{stderr}", + ); + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/e2e/app/cli/repo-tests/src/commands/test_log_command.rs b/src/e2e/app/cli/repo-tests/src/commands/test_log_command.rs new file mode 100644 index 000000000..d24bc9471 --- /dev/null +++ b/src/e2e/app/cli/repo-tests/src/commands/test_log_command.rs @@ -0,0 +1,316 @@ +// Copyright Kamu Data, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. 
+ +use std::assert_matches::assert_matches; + +use chrono::{TimeZone, Timelike, Utc}; +use kamu_cli_e2e_common::{ + DATASET_DERIVATIVE_LEADERBOARD_SNAPSHOT_STR, + DATASET_ROOT_PLAYER_SCORES_INGEST_DATA_NDJSON_CHUNK_1, + DATASET_ROOT_PLAYER_SCORES_INGEST_DATA_NDJSON_CHUNK_2, + DATASET_ROOT_PLAYER_SCORES_SNAPSHOT_STR, +}; +use kamu_cli_puppet::extensions::KamuCliPuppetExt; +use kamu_cli_puppet::KamuCliPuppet; +use opendatafabric::{ + AddData, + AddPushSource, + DatasetKind, + DatasetName, + EnumWithVariants, + MergeStrategy, + MergeStrategyLedger, + MetadataEvent, + OffsetInterval, + ReadStep, + ReadStepNdJson, + SetDataSchema, + SetTransform, + SetVocab, + SqlQueryStep, + Transform, + TransformSql, +}; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +pub async fn test_log(kamu: KamuCliPuppet) { + kamu.execute_with_input(["add", "--stdin"], DATASET_ROOT_PLAYER_SCORES_SNAPSHOT_STR) + .await + .success(); + + kamu.execute_with_input( + ["ingest", "player-scores", "--stdin"], + DATASET_ROOT_PLAYER_SCORES_INGEST_DATA_NDJSON_CHUNK_1, + ) + .await + .success(); + + kamu.execute_with_input( + ["ingest", "player-scores", "--stdin"], + DATASET_ROOT_PLAYER_SCORES_INGEST_DATA_NDJSON_CHUNK_2, + ) + .await + .success(); + + { + let mut metadata_blocks = kamu + .list_blocks(&DatasetName::new_unchecked("player-scores")) + .await + .into_iter() + .map(|br| br.block) + .collect::>(); + + pretty_assertions::assert_eq!(6, metadata_blocks.len()); + + { + let block = metadata_blocks.pop().unwrap(); + + pretty_assertions::assert_eq!(0, block.sequence_number); + + assert_matches!( + block.event, + MetadataEvent::Seed(event) + if event.dataset_kind == DatasetKind::Root + ); + } + { + let block = metadata_blocks.pop().unwrap(); + + pretty_assertions::assert_eq!(1, block.sequence_number); + + let actual_push_source = block.event.as_variant::().unwrap(); + let expected_push_source = AddPushSource { + source_name: 
"default".to_string(), + read: ReadStep::NdJson(ReadStepNdJson { + schema: Some(vec![ + "match_time TIMESTAMP".into(), + "match_id BIGINT".into(), + "player_id STRING".into(), + "score BIGINT".into(), + ]), + date_format: None, + encoding: None, + timestamp_format: None, + }), + preprocess: None, + merge: MergeStrategy::Ledger(MergeStrategyLedger { + primary_key: vec!["match_id".into(), "player_id".into()], + }), + }; + + pretty_assertions::assert_eq!(&expected_push_source, actual_push_source); + } + { + let block = metadata_blocks.pop().unwrap(); + + pretty_assertions::assert_eq!(2, block.sequence_number); + + let actual_set_vocab = block.event.as_variant::().unwrap(); + let expected_set_vocab = SetVocab { + offset_column: None, + operation_type_column: None, + system_time_column: None, + event_time_column: Some("match_time".into()), + }; + + pretty_assertions::assert_eq!(&expected_set_vocab, actual_set_vocab); + } + { + let block = metadata_blocks.pop().unwrap(); + + pretty_assertions::assert_eq!(3, block.sequence_number); + + let actual_set_data_schema = block.event.as_variant::().unwrap(); + let expected_set_data_schema = SetDataSchema { + schema: vec![ + 12, 0, 0, 0, 8, 0, 8, 0, 0, 0, 4, 0, 8, 0, 0, 0, 4, 0, 0, 0, 7, 0, 0, 0, 124, + 1, 0, 0, 60, 1, 0, 0, 244, 0, 0, 0, 180, 0, 0, 0, 108, 0, 0, 0, 56, 0, 0, 0, 4, + 0, 0, 0, 108, 255, 255, 255, 16, 0, 0, 0, 24, 0, 0, 0, 0, 0, 1, 2, 20, 0, 0, 0, + 160, 254, 255, 255, 64, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 5, 0, 0, 0, 115, 99, + 111, 114, 101, 0, 0, 0, 156, 255, 255, 255, 24, 0, 0, 0, 12, 0, 0, 0, 0, 0, 1, + 5, 16, 0, 0, 0, 0, 0, 0, 0, 4, 0, 4, 0, 4, 0, 0, 0, 9, 0, 0, 0, 112, 108, 97, + 121, 101, 114, 95, 105, 100, 0, 0, 0, 204, 255, 255, 255, 16, 0, 0, 0, 24, 0, + 0, 0, 0, 0, 1, 2, 20, 0, 0, 0, 0, 255, 255, 255, 64, 0, 0, 0, 0, 0, 0, 1, 0, 0, + 0, 0, 8, 0, 0, 0, 109, 97, 116, 99, 104, 95, 105, 100, 0, 0, 0, 0, 16, 0, 20, + 0, 16, 0, 14, 0, 15, 0, 4, 0, 0, 0, 8, 0, 16, 0, 0, 0, 20, 0, 0, 0, 12, 0, 0, + 0, 0, 0, 1, 
10, 28, 0, 0, 0, 0, 0, 0, 0, 196, 255, 255, 255, 8, 0, 0, 0, 0, 0, + 1, 0, 3, 0, 0, 0, 85, 84, 67, 0, 10, 0, 0, 0, 109, 97, 116, 99, 104, 95, 116, + 105, 109, 101, 0, 0, 144, 255, 255, 255, 28, 0, 0, 0, 12, 0, 0, 0, 0, 0, 0, 10, + 36, 0, 0, 0, 0, 0, 0, 0, 8, 0, 12, 0, 10, 0, 4, 0, 8, 0, 0, 0, 8, 0, 0, 0, 0, + 0, 1, 0, 3, 0, 0, 0, 85, 84, 67, 0, 11, 0, 0, 0, 115, 121, 115, 116, 101, 109, + 95, 116, 105, 109, 101, 0, 212, 255, 255, 255, 16, 0, 0, 0, 24, 0, 0, 0, 0, 0, + 0, 2, 20, 0, 0, 0, 196, 255, 255, 255, 32, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 2, + 0, 0, 0, 111, 112, 0, 0, 16, 0, 20, 0, 16, 0, 0, 0, 15, 0, 4, 0, 0, 0, 8, 0, + 16, 0, 0, 0, 24, 0, 0, 0, 32, 0, 0, 0, 0, 0, 0, 2, 28, 0, 0, 0, 8, 0, 12, 0, 4, + 0, 11, 0, 8, 0, 0, 0, 64, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 6, 0, 0, 0, 111, + 102, 102, 115, 101, 116, 0, 0, + ], + }; + + pretty_assertions::assert_eq!(&expected_set_data_schema, actual_set_data_schema); + } + { + let block = metadata_blocks.pop().unwrap(); + + pretty_assertions::assert_eq!(4, block.sequence_number); + + let actual_add_data = block.event.as_variant::().unwrap(); + + pretty_assertions::assert_eq!(None, actual_add_data.prev_checkpoint); + pretty_assertions::assert_eq!(None, actual_add_data.prev_offset); + + let actual_new_data = actual_add_data.new_data.as_ref().unwrap(); + + pretty_assertions::assert_eq!( + OffsetInterval { start: 0, end: 1 }, + actual_new_data.offset_interval + ); + pretty_assertions::assert_eq!(1674, actual_new_data.size); + + pretty_assertions::assert_eq!(None, actual_add_data.new_checkpoint); + pretty_assertions::assert_eq!( + Some( + Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0) + .unwrap() + .with_nanosecond(1_000_000) // 1 ms + .unwrap() + ), + actual_add_data.new_watermark + ); + pretty_assertions::assert_eq!(None, actual_add_data.new_source_state); + } + { + let block = metadata_blocks.pop().unwrap(); + + pretty_assertions::assert_eq!(5, block.sequence_number); + + let actual_add_data = block.event.as_variant::().unwrap(); + 
+ pretty_assertions::assert_eq!(None, actual_add_data.prev_checkpoint); + pretty_assertions::assert_eq!(Some(1), actual_add_data.prev_offset); + + let actual_new_data = actual_add_data.new_data.as_ref().unwrap(); + + pretty_assertions::assert_eq!( + OffsetInterval { start: 2, end: 3 }, + actual_new_data.offset_interval + ); + pretty_assertions::assert_eq!(1690, actual_new_data.size); + + pretty_assertions::assert_eq!(None, actual_add_data.new_checkpoint); + pretty_assertions::assert_eq!( + Some( + Utc.with_ymd_and_hms(2000, 1, 2, 0, 0, 0) + .unwrap() + .with_nanosecond(1_000_000) // 1 ms + .unwrap() + ), + actual_add_data.new_watermark + ); + pretty_assertions::assert_eq!(None, actual_add_data.new_source_state); + } + } + + kamu.execute_with_input( + ["add", "--stdin"], + DATASET_DERIVATIVE_LEADERBOARD_SNAPSHOT_STR, + ) + .await + .success(); + + { + let mut metadata_blocks = kamu + .list_blocks(&DatasetName::new_unchecked("leaderboard")) + .await + .into_iter() + .map(|br| br.block) + .collect::>(); + + pretty_assertions::assert_eq!(3, metadata_blocks.len()); + + { + let block = metadata_blocks.pop().unwrap(); + + pretty_assertions::assert_eq!(0, block.sequence_number); + + assert_matches!( + block.event, + MetadataEvent::Seed(event) + if event.dataset_kind == DatasetKind::Derivative + ); + } + { + let block = metadata_blocks.pop().unwrap(); + + pretty_assertions::assert_eq!(1, block.sequence_number); + + let actual_set_transform = block.event.as_variant::().unwrap(); + + pretty_assertions::assert_eq!(1, actual_set_transform.inputs.len()); + pretty_assertions::assert_eq!( + Some("player_scores".into()), + actual_set_transform.inputs[0].alias + ); + + let expected_transform = Transform::Sql(TransformSql { + engine: "risingwave".into(), + version: None, + query: None, + queries: Some(vec![ + SqlQueryStep { + alias: Some("leaderboard".into()), + query: indoc::indoc!( + r#" + create materialized view leaderboard as + select + * + from ( + select + row_number() over 
(partition by 1 order by score desc) as place, + match_time, + match_id, + player_id, + score + from player_scores + ) + where place <= 2 + "# + ) + .into(), + }, + SqlQueryStep { + alias: None, + query: "select * from leaderboard".into(), + }, + ]), + temporal_tables: None, + }); + + pretty_assertions::assert_eq!(expected_transform, actual_set_transform.transform); + } + { + let block = metadata_blocks.pop().unwrap(); + + pretty_assertions::assert_eq!(2, block.sequence_number); + + let actual_set_vocab = block.event.as_variant::().unwrap(); + let expected_set_vocab = SetVocab { + offset_column: None, + operation_type_column: None, + system_time_column: None, + event_time_column: Some("match_time".into()), + }; + + pretty_assertions::assert_eq!(&expected_set_vocab, actual_set_vocab); + } + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/e2e/app/cli/repo-tests/src/commands/test_login_command.rs b/src/e2e/app/cli/repo-tests/src/commands/test_login_command.rs new file mode 100644 index 000000000..79f3c005a --- /dev/null +++ b/src/e2e/app/cli/repo-tests/src/commands/test_login_command.rs @@ -0,0 +1,185 @@ +// Copyright Kamu Data, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. 
+ +use kamu_cli_e2e_common::{ + KamuApiServerClient, + KamuApiServerClientExt, + DATASET_ROOT_PLAYER_SCORES_SNAPSHOT_STR, +}; +use kamu_cli_puppet::KamuCliPuppet; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +pub async fn test_login_logout_password(kamu_node_api_client: KamuApiServerClient) { + let kamu_node_url = kamu_node_api_client.get_base_url().as_str(); + let kamu = KamuCliPuppet::new_workspace_tmp().await; + + { + let assert = kamu.execute(["logout", kamu_node_url]).await.success(); + + let stderr = std::str::from_utf8(&assert.get_output().stderr).unwrap(); + + assert!( + stderr.contains(format!("Not logged in to {kamu_node_url}").as_str()), + "Unexpected output:\n{stderr}", + ); + } + { + let assert = kamu + .execute(["login", kamu_node_url, "--check"]) + .await + .failure(); + + let stderr = std::str::from_utf8(&assert.get_output().stderr).unwrap(); + + assert!( + stderr.contains(format!("Error: No access token found for: {kamu_node_url}").as_str()), + "Unexpected output:\n{stderr}", + ); + } + + { + let assert = kamu + .execute(["login", "password", "kamu", "kamu", kamu_node_url]) + .await + .success(); + + let stderr = std::str::from_utf8(&assert.get_output().stderr).unwrap(); + + assert!( + stderr.contains(format!("Login successful: {kamu_node_url}").as_str()), + "Unexpected output:\n{stderr}", + ); + } + { + let assert = kamu + .execute(["login", kamu_node_url, "--check"]) + .await + .success(); + + let stderr = std::str::from_utf8(&assert.get_output().stderr).unwrap(); + + assert!( + stderr.contains(format!("Access token valid: {kamu_node_url}").as_str()), + "Unexpected output:\n{stderr}", + ); + } + + kamu.execute_with_input(["add", "--stdin"], DATASET_ROOT_PLAYER_SCORES_SNAPSHOT_STR) + .await + .success(); + + // Token validation, via an API call that requires authorization + { + let assert = kamu + .execute([ + "push", + "player-scores", + "--to", + 
&format!("odf+{kamu_node_url}player-scores"), + ]) + .await + .success(); + + let stderr = std::str::from_utf8(&assert.get_output().stderr).unwrap(); + + assert!( + stderr.contains("1 dataset(s) pushed"), + "Unexpected output:\n{stderr}", + ); + } + { + let assert = kamu.execute(["logout", kamu_node_url]).await.success(); + + let stderr = std::str::from_utf8(&assert.get_output().stderr).unwrap(); + + assert!( + stderr.contains(format!("Logged out of {kamu_node_url}").as_str()), + "Unexpected output:\n{stderr}", + ); + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +pub async fn test_login_logout_oauth(kamu_node_api_client: KamuApiServerClient) { + let kamu_node_url = kamu_node_api_client.get_base_url().as_str(); + let kamu = KamuCliPuppet::new_workspace_tmp().await; + + { + let assert = kamu.execute(["logout", kamu_node_url]).await.success(); + + let stderr = std::str::from_utf8(&assert.get_output().stderr).unwrap(); + + assert!( + stderr.contains(format!("Not logged in to {kamu_node_url}").as_str()), + "Unexpected output:\n{stderr}", + ); + } + { + let assert = kamu + .execute(["login", kamu_node_url, "--check"]) + .await + .failure(); + + let stderr = std::str::from_utf8(&assert.get_output().stderr).unwrap(); + + assert!( + stderr.contains(format!("Error: No access token found for: {kamu_node_url}").as_str()), + "Unexpected output:\n{stderr}", + ); + } + + let oauth_token = kamu_node_api_client.login_as_e2e_user().await; + + { + let assert = kamu + .execute(["login", "oauth", "github", &oauth_token, kamu_node_url]) + .await + .success(); + + let stderr = std::str::from_utf8(&assert.get_output().stderr).unwrap(); + + assert!( + stderr.contains(format!("Login successful: {kamu_node_url}").as_str()), + "Unexpected output:\n{stderr}", + ); + } + { + let assert = kamu + .execute(["login", kamu_node_url, "--check"]) + .await + .success(); + + let stderr = 
std::str::from_utf8(&assert.get_output().stderr).unwrap(); + + assert!( + stderr.contains(format!("Access token valid: {kamu_node_url}").as_str()), + "Unexpected output:\n{stderr}", + ); + } + + // Token validation, via an API call that requires authorization + kamu_node_api_client + .create_player_scores_dataset(&oauth_token) + .await; + + { + let assert = kamu.execute(["logout", kamu_node_url]).await.success(); + + let stderr = std::str::from_utf8(&assert.get_output().stderr).unwrap(); + + assert!( + stderr.contains(format!("Logged out of {kamu_node_url}").as_str()), + "Unexpected output:\n{stderr}", + ); + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/e2e/app/cli/repo-tests/src/commands/test_new_command.rs b/src/e2e/app/cli/repo-tests/src/commands/test_new_command.rs new file mode 100644 index 000000000..7d9db6442 --- /dev/null +++ b/src/e2e/app/cli/repo-tests/src/commands/test_new_command.rs @@ -0,0 +1,68 @@ +// Copyright Kamu Data, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use kamu_cli_puppet::KamuCliPuppet; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +pub async fn test_new_root(kamu: KamuCliPuppet) { + let assert = kamu + .execute(["new", "--root", "test-dataset"]) + .await + .success(); + + let stderr = std::str::from_utf8(&assert.get_output().stderr).unwrap(); + + assert!( + stderr.contains(indoc::indoc!( + r#" + Written new manifest template to: test-dataset.yaml + Follow directions in the file's comments and use `kamu add test-dataset.yaml` when ready. 
+ "# + )), + "Unexpected output:\n{stderr}", + ); + + // TODO: After solving this issue, add `kamu add` calls and populate with + // data + // + // `kamu new`: generate snapshots that will be immediately ready to be + // added/worked on + // https://github.com/kamu-data/kamu-cli/issues/888 +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +pub async fn test_new_derivative(kamu: KamuCliPuppet) { + let assert = kamu + .execute(["new", "--derivative", "test-dataset"]) + .await + .success(); + + let stderr = std::str::from_utf8(&assert.get_output().stderr).unwrap(); + + assert!( + stderr.contains(indoc::indoc!( + r#" + Written new manifest template to: test-dataset.yaml + Follow directions in the file's comments and use `kamu add test-dataset.yaml` when ready. + "# + )), + "Unexpected output:\n{stderr}", + ); + + // TODO: After solving this issue, add `kamu add` calls and populate with + // data + // + // `kamu new`: generate snapshots that will be immediately ready to be + // added/worked on + // https://github.com/kamu-data/kamu-cli/issues/888 +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/e2e/app/cli/repo-tests/src/commands/test_repo_alias_command.rs b/src/e2e/app/cli/repo-tests/src/commands/test_repo_command.rs similarity index 97% rename from src/e2e/app/cli/repo-tests/src/commands/test_repo_alias_command.rs rename to src/e2e/app/cli/repo-tests/src/commands/test_repo_command.rs index 154dc993e..bda9155e3 100644 --- a/src/e2e/app/cli/repo-tests/src/commands/test_repo_alias_command.rs +++ b/src/e2e/app/cli/repo-tests/src/commands/test_repo_command.rs @@ -87,6 +87,8 @@ pub async fn test_repository_pull_aliases_commands(kamu: KamuCliPuppet) { assert!(aliases.is_empty()); } +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + 
pub async fn test_repository_push_aliases_commands(kamu: KamuCliPuppet) { kamu.add_dataset(DatasetSnapshot { name: "foo".try_into().unwrap(), diff --git a/src/e2e/app/cli/repo-tests/src/commands/test_reset_command.rs b/src/e2e/app/cli/repo-tests/src/commands/test_reset_command.rs new file mode 100644 index 000000000..de1f02d0a --- /dev/null +++ b/src/e2e/app/cli/repo-tests/src/commands/test_reset_command.rs @@ -0,0 +1,87 @@ +// Copyright Kamu Data, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::assert_matches::assert_matches; + +use kamu_cli_e2e_common::{ + DATASET_ROOT_PLAYER_SCORES_INGEST_DATA_NDJSON_CHUNK_1, + DATASET_ROOT_PLAYER_SCORES_SNAPSHOT_STR, +}; +use kamu_cli_puppet::extensions::KamuCliPuppetExt; +use kamu_cli_puppet::KamuCliPuppet; +use opendatafabric::{DatasetName, MetadataEvent}; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +pub async fn test_reset(kamu: KamuCliPuppet) { + kamu.execute_with_input(["add", "--stdin"], DATASET_ROOT_PLAYER_SCORES_SNAPSHOT_STR) + .await + .success(); + + let block_records_after_ingesting = kamu + .list_blocks(&DatasetName::new_unchecked("player-scores")) + .await; + + pretty_assertions::assert_eq!(3, block_records_after_ingesting.len()); + + let set_vocab_block_record = &block_records_after_ingesting[0]; + + assert_matches!( + &set_vocab_block_record.block.event, + MetadataEvent::SetVocab(_), + ); + + pretty_assertions::assert_eq!(3, block_records_after_ingesting.len()); + + kamu.execute_with_input( + ["ingest", "player-scores", "--stdin"], + DATASET_ROOT_PLAYER_SCORES_INGEST_DATA_NDJSON_CHUNK_1, + ) + .await + .success(); + + 
pretty_assertions::assert_eq!( + 5, + kamu.list_blocks(&DatasetName::new_unchecked("player-scores")) + .await + .len() + ); + + let set_vocab_block_hash = set_vocab_block_record + .block_hash + .as_multibase() + .to_stack_string(); + + { + let assert = kamu + .execute([ + "--yes", + "reset", + "player-scores", + set_vocab_block_hash.as_str(), + ]) + .await + .success(); + + let stderr = std::str::from_utf8(&assert.get_output().stderr).unwrap(); + + assert!( + stderr.contains("Dataset was reset"), + "Unexpected output:\n{stderr}", + ); + } + + let block_records_after_resetting = kamu + .list_blocks(&DatasetName::new_unchecked("player-scores")) + .await; + + pretty_assertions::assert_eq!(block_records_after_ingesting, block_records_after_resetting); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/e2e/app/cli/repo-tests/src/commands/test_search_command.rs b/src/e2e/app/cli/repo-tests/src/commands/test_search_command.rs new file mode 100644 index 000000000..8be9d1df4 --- /dev/null +++ b/src/e2e/app/cli/repo-tests/src/commands/test_search_command.rs @@ -0,0 +1,382 @@ +// Copyright Kamu Data, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. 
+ +use kamu_cli_e2e_common::{ + KamuApiServerClient, + KamuApiServerClientExt, + RequestBody, + DATASET_ROOT_PLAYER_SCORES_INGEST_DATA_NDJSON_CHUNK_1, +}; +use kamu_cli_puppet::KamuCliPuppet; +use opendatafabric::*; +use reqwest::Url; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +pub async fn test_search_multi_user(kamu_node_api_client: KamuApiServerClient) { + let kamu = KamuCliPuppet::new_workspace_tmp().await; + + add_repo_to_workspace(&kamu_node_api_client, &kamu, "kamu-node").await; + + assert_search( + &kamu, + ["search", "player", "--output-format", "table"], + indoc::indoc!( + r#" + ┌───────┬──────┬─────────────┬────────┬─────────┬──────┐ + │ Alias │ Kind │ Description │ Blocks │ Records │ Size │ + ├───────┼──────┼─────────────┼────────┼─────────┼──────┤ + │ │ │ │ │ │ │ + └───────┴──────┴─────────────┴────────┴─────────┴──────┘ + "# + ), + ) + .await; + + let e2e_user_token = kamu_node_api_client.login_as_e2e_user().await; + + kamu_node_api_client + .create_player_scores_dataset(&e2e_user_token) + .await; + + assert_search( + &kamu, + ["search", "player", "--output-format", "table"], + indoc::indoc!( + r#" + ┌──────────────────────────────────┬──────┬─────────────┬────────┬─────────┬──────┐ + │ Alias │ Kind │ Description │ Blocks │ Records │ Size │ + ├──────────────────────────────────┼──────┼─────────────┼────────┼─────────┼──────┤ + │ kamu-node/e2e-user/player-scores │ Root │ - │ 3 │ - │ - │ + └──────────────────────────────────┴──────┴─────────────┴────────┴─────────┴──────┘ + "# + ), + ) + .await; + + let player_scores_alias = DatasetAlias::new( + Some(AccountName::new_unchecked("e2e-user")), + DatasetName::new_unchecked("player-scores"), + ); + + kamu_node_api_client + .ingest_data( + &player_scores_alias, + RequestBody::NdJson(DATASET_ROOT_PLAYER_SCORES_INGEST_DATA_NDJSON_CHUNK_1.into()), + &e2e_user_token, + ) + .await; + + assert_search( + &kamu, + ["search", "player", 
"--output-format", "table"], + indoc::indoc!( + r#" + ┌──────────────────────────────────┬──────┬─────────────┬────────┬─────────┬──────────┐ + │ Alias │ Kind │ Description │ Blocks │ Records │ Size │ + ├──────────────────────────────────┼──────┼─────────────┼────────┼─────────┼──────────┤ + │ kamu-node/e2e-user/player-scores │ Root │ - │ 5 │ 2 │ 1.63 KiB │ + └──────────────────────────────────┴──────┴─────────────┴────────┴─────────┴──────────┘ + "# + ), + ) + .await; + + // The same as DATASET_DERIVATIVE_LEADERBOARD_SNAPSHOT, but contains the word + // "player" in the name so that it can be found together with "player-scores" + let dataset_derivative_player_leaderboard_snapshot = indoc::indoc!( + r#" + kind: DatasetSnapshot + version: 1 + content: + name: player-leaderboard + kind: Derivative + metadata: + - kind: SetTransform + inputs: + - datasetRef: player-scores + alias: player_scores + transform: + kind: Sql + engine: risingwave + queries: + - alias: leaderboard + query: | + create materialized view leaderboard as + select + * + from ( + select + row_number() over (partition by 1 order by score desc) as place, + match_time, + match_id, + player_id, + score + from player_scores + ) + where place <= 2 + - query: | + select * from leaderboard + - kind: SetVocab + eventTimeColumn: match_time + "# + ) + .escape_default() + .to_string(); + + kamu_node_api_client + .create_dataset( + &dataset_derivative_player_leaderboard_snapshot, + &e2e_user_token, + ) + .await; + + assert_search( + &kamu, + ["search", "player", "--output-format", "table"], + indoc::indoc!( + r#" + ┌───────────────────────────────────────┬────────────┬─────────────┬────────┬─────────┬──────────┐ + │ Alias │ Kind │ Description │ Blocks │ Records │ Size │ + ├───────────────────────────────────────┼────────────┼─────────────┼────────┼─────────┼──────────┤ + │ kamu-node/e2e-user/player-leaderboard │ Derivative │ - │ 3 │ - │ - │ + │ kamu-node/e2e-user/player-scores │ Root │ - │ 5 │ 2 │ 1.63 KiB │ + 
└───────────────────────────────────────┴────────────┴─────────────┴────────┴─────────┴──────────┘ + "# + ), + ) + .await; + + let kamu_token = kamu_node_api_client.login_as_kamu().await; + + kamu_node_api_client + .create_player_scores_dataset(&kamu_token) + .await; + + assert_search( + &kamu, + ["search", "player", "--output-format", "table"], + indoc::indoc!( + r#" + ┌───────────────────────────────────────┬────────────┬─────────────┬────────┬─────────┬──────────┐ + │ Alias │ Kind │ Description │ Blocks │ Records │ Size │ + ├───────────────────────────────────────┼────────────┼─────────────┼────────┼─────────┼──────────┤ + │ kamu-node/e2e-user/player-leaderboard │ Derivative │ - │ 3 │ - │ - │ + │ kamu-node/e2e-user/player-scores │ Root │ - │ 5 │ 2 │ 1.63 KiB │ + │ kamu-node/kamu/player-scores │ Root │ - │ 3 │ - │ - │ + └───────────────────────────────────────┴────────────┴─────────────┴────────┴─────────┴──────────┘ + "# + ), + ) + .await; +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +pub async fn test_search_by_name(kamu_node_api_client: KamuApiServerClient) { + let kamu = KamuCliPuppet::new_workspace_tmp().await; + + add_repo_to_workspace(&kamu_node_api_client, &kamu, "kamu-node").await; + + let e2e_user_token = kamu_node_api_client.login_as_e2e_user().await; + + kamu_node_api_client + .create_player_scores_dataset(&e2e_user_token) + .await; + + kamu_node_api_client + .create_leaderboard(&e2e_user_token) + .await; + + assert_search( + &kamu, + ["search", "player", "--output-format", "table"], + indoc::indoc!( + r#" + ┌──────────────────────────────────┬──────┬─────────────┬────────┬─────────┬──────┐ + │ Alias │ Kind │ Description │ Blocks │ Records │ Size │ + ├──────────────────────────────────┼──────┼─────────────┼────────┼─────────┼──────┤ + │ kamu-node/e2e-user/player-scores │ Root │ - │ 3 │ - │ - │ + 
└──────────────────────────────────┴──────┴─────────────┴────────┴─────────┴──────┘ + "# + ), + ) + .await; + + assert_search( + &kamu, + ["search", "scores", "--output-format", "table"], + indoc::indoc!( + r#" + ┌──────────────────────────────────┬──────┬─────────────┬────────┬─────────┬──────┐ + │ Alias │ Kind │ Description │ Blocks │ Records │ Size │ + ├──────────────────────────────────┼──────┼─────────────┼────────┼─────────┼──────┤ + │ kamu-node/e2e-user/player-scores │ Root │ - │ 3 │ - │ - │ + └──────────────────────────────────┴──────┴─────────────┴────────┴─────────┴──────┘ + "# + ), + ) + .await; + + assert_search( + &kamu, + ["search", "not-relevant-query", "--output-format", "table"], + indoc::indoc!( + r#" + ┌───────┬──────┬─────────────┬────────┬─────────┬──────┐ + │ Alias │ Kind │ Description │ Blocks │ Records │ Size │ + ├───────┼──────┼─────────────┼────────┼─────────┼──────┤ + │ │ │ │ │ │ │ + └───────┴──────┴─────────────┴────────┴─────────┴──────┘ + "# + ), + ) + .await; + + assert_search( + &kamu, + ["search", "lead", "--output-format", "table"], + indoc::indoc!( + r#" + ┌────────────────────────────────┬────────────┬─────────────┬────────┬─────────┬──────┐ + │ Alias │ Kind │ Description │ Blocks │ Records │ Size │ + ├────────────────────────────────┼────────────┼─────────────┼────────┼─────────┼──────┤ + │ kamu-node/e2e-user/leaderboard │ Derivative │ - │ 3 │ - │ - │ + └────────────────────────────────┴────────────┴─────────────┴────────┴─────────┴──────┘ + "# + ), + ) + .await; +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +pub async fn test_search_by_repo(kamu_node_api_client: KamuApiServerClient) { + let kamu = KamuCliPuppet::new_workspace_tmp().await; + + // As a test, add two repos pointing to the same node + add_repo_to_workspace(&kamu_node_api_client, &kamu, "kamu-node").await; + add_repo_to_workspace(&kamu_node_api_client, &kamu, "acme-org-node").await; + 
+ let e2e_user_token = kamu_node_api_client.login_as_e2e_user().await; + + kamu_node_api_client + .create_player_scores_dataset(&e2e_user_token) + .await; + + kamu_node_api_client + .create_leaderboard(&e2e_user_token) + .await; + + assert_search( + &kamu, + ["search", "player", "--output-format", "table"], + indoc::indoc!( + r#" + ┌──────────────────────────────────────┬──────┬─────────────┬────────┬─────────┬──────┐ + │ Alias │ Kind │ Description │ Blocks │ Records │ Size │ + ├──────────────────────────────────────┼──────┼─────────────┼────────┼─────────┼──────┤ + │ acme-org-node/e2e-user/player-scores │ Root │ - │ 3 │ - │ - │ + │ kamu-node/e2e-user/player-scores │ Root │ - │ 3 │ - │ - │ + └──────────────────────────────────────┴──────┴─────────────┴────────┴─────────┴──────┘ + "# + ), + ) + .await; + + assert_search( + &kamu, + [ + "search", + "player", + "--repo", + "acme-org-node", + "--output-format", + "table", + ], + indoc::indoc!( + r#" + ┌──────────────────────────────────────┬──────┬─────────────┬────────┬─────────┬──────┐ + │ Alias │ Kind │ Description │ Blocks │ Records │ Size │ + ├──────────────────────────────────────┼──────┼─────────────┼────────┼─────────┼──────┤ + │ acme-org-node/e2e-user/player-scores │ Root │ - │ 3 │ - │ - │ + └──────────────────────────────────────┴──────┴─────────────┴────────┴─────────┴──────┘ + "# + ), + ) + .await; + + assert_search( + &kamu, + [ + "search", + "player", + "--repo", + "kamu-node", + "--output-format", + "table", + ], + indoc::indoc!( + r#" + ┌──────────────────────────────────┬──────┬─────────────┬────────┬─────────┬──────┐ + │ Alias │ Kind │ Description │ Blocks │ Records │ Size │ + ├──────────────────────────────────┼──────┼─────────────┼────────┼─────────┼──────┤ + │ kamu-node/e2e-user/player-scores │ Root │ - │ 3 │ - │ - │ + └──────────────────────────────────┴──────┴─────────────┴────────┴─────────┴──────┘ + "# + ), + ) + .await; +} + 
+//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// Helpers +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +async fn add_repo_to_workspace( + kamu_node_api_client: &KamuApiServerClient, + kamu: &KamuCliPuppet, + repo_name: &str, +) { + let http_repo = { + let mut url = Url::parse("odf+http://host").unwrap(); + let base_url = kamu_node_api_client.get_base_url(); + url.set_host(base_url.host_str()).unwrap(); + url.set_port(base_url.port()).unwrap(); + url + }; + + let assert = kamu + .execute(["repo", "add", repo_name, http_repo.as_str()]) + .await + .success(); + + let stderr = std::str::from_utf8(&assert.get_output().stderr).unwrap(); + + assert!( + stderr.contains(format!("Added: {repo_name}").as_str()), + "Unexpected output:\n{stderr}", + ); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +async fn assert_search(kamu: &KamuCliPuppet, search_cmd: I, expected_table_output: &str) +where + I: IntoIterator, + S: AsRef, +{ + let assert = kamu.execute(search_cmd).await.success(); + + let stdout = std::str::from_utf8(&assert.get_output().stdout).unwrap(); + + pretty_assertions::assert_eq!(stdout, expected_table_output); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/e2e/app/cli/repo-tests/src/commands/test_sql_command.rs b/src/e2e/app/cli/repo-tests/src/commands/test_sql_command.rs index abe16c63d..24f3576ab 100644 --- a/src/e2e/app/cli/repo-tests/src/commands/test_sql_command.rs +++ b/src/e2e/app/cli/repo-tests/src/commands/test_sql_command.rs @@ -8,6 +8,10 @@ // by the Apache License, Version 2.0. 
use indoc::indoc; +use kamu_cli_e2e_common::{ + DATASET_ROOT_PLAYER_SCORES_INGEST_DATA_NDJSON_CHUNK_1, + DATASET_ROOT_PLAYER_SCORES_SNAPSHOT_STR, +}; use kamu_cli_puppet::KamuCliPuppet; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -45,7 +49,117 @@ pub async fn test_datafusion_cli_not_launched_in_root_ws(kamu: KamuCliPuppet) { // The workspace search functionality checks for parent folders, // so there is no problem that the process working directory is one of the // subdirectories (kamu-cli/src/e2e/app/cli/inmem) - kamu.execute(["list"]).await.failure(); + + { + let assert = kamu.execute(["list"]).await.failure(); + + let stderr = std::str::from_utf8(&assert.get_output().stderr).unwrap(); + + assert!( + stderr.contains("Error: Directory is not a kamu workspace"), + "Unexpected output:\n{stderr}", + ); + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +pub async fn test_sql_command(kamu: KamuCliPuppet) { + { + let assert = kamu + .execute([ + "sql", + "--command", + "SELECT 42 as answer;", + "--output-format", + "table", + ]) + .await + .success(); + + let stdout = std::str::from_utf8(&assert.get_output().stdout).unwrap(); + + pretty_assertions::assert_eq!( + indoc::indoc!( + r#" + ┌────────┐ + │ answer │ + ├────────┤ + │ 42 │ + └────────┘ + "# + ), + stdout + ); + } + + kamu.execute_with_input(["add", "--stdin"], DATASET_ROOT_PLAYER_SCORES_SNAPSHOT_STR) + .await + .success(); + + { + let assert = kamu + .execute([ + "sql", + "--command", + "SELECT * FROM \"player-scores\";", + "--output-format", + "table", + ]) + .await + .success(); + + let stdout = std::str::from_utf8(&assert.get_output().stdout).unwrap(); + + pretty_assertions::assert_eq!( + indoc::indoc!( + r#" + ┌┐ + ││ + ├┤ + ││ + └┘ + "# + ), + stdout + ); + } + + kamu.execute_with_input( + ["ingest", "player-scores", "--stdin"], + 
DATASET_ROOT_PLAYER_SCORES_INGEST_DATA_NDJSON_CHUNK_1, + ) + .await + .success(); + + { + let assert = kamu + .execute([ + "sql", + "--command", + "SELECT * FROM \"player-scores\";", + "--output-format", + "table", + ]) + .await + .success(); + + let stdout = std::str::from_utf8(&assert.get_output().stdout).unwrap(); + + pretty_assertions::assert_eq!( + indoc::indoc!( + r#" + ┌────────┬────┬──────────────────────┬──────────────────────────┬──────────┬───────────┬───────┐ + │ offset │ op │ system_time │ match_time │ match_id │ player_id │ score │ + ├────────┼────┼──────────────────────┼──────────────────────────┼──────────┼───────────┼───────┤ + │ 0 │ 0 │ 2050-01-02T03:04:05Z │ 2000-01-01T00:00:00Z │ 1 │ Alice │ 100 │ + │ 1 │ 0 │ 2050-01-02T03:04:05Z │ 2000-01-01T00:00:00.001Z │ 1 │ Bob │ 80 │ + └────────┴────┴──────────────────────┴──────────────────────────┴──────────┴───────────┴───────┘ + "# + ), + stdout + ); + } } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/e2e/app/cli/repo-tests/src/commands/test_system_gc_command.rs b/src/e2e/app/cli/repo-tests/src/commands/test_system_gc_command.rs new file mode 100644 index 000000000..862f7c92f --- /dev/null +++ b/src/e2e/app/cli/repo-tests/src/commands/test_system_gc_command.rs @@ -0,0 +1,25 @@ +// Copyright Kamu Data, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. 
+ +use kamu_cli_puppet::KamuCliPuppet; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +pub async fn test_gc(kamu: KamuCliPuppet) { + let assert = kamu.execute(["system", "gc"]).await.success(); + + let stderr = std::str::from_utf8(&assert.get_output().stderr).unwrap(); + + assert!( + stderr.contains("Cleaning cache...") && stderr.contains("Workspace is already clean"), + "Unexpected output:\n{stderr}", + ); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/e2e/app/cli/repo-tests/src/commands/test_system_info_command.rs b/src/e2e/app/cli/repo-tests/src/commands/test_system_info_command.rs new file mode 100644 index 000000000..d0f8d206e --- /dev/null +++ b/src/e2e/app/cli/repo-tests/src/commands/test_system_info_command.rs @@ -0,0 +1,18 @@ +// Copyright Kamu Data, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. 
+ +use kamu_cli_puppet::KamuCliPuppet; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +pub async fn test_system_info(kamu: KamuCliPuppet) { + kamu.execute(["system", "info"]).await.success(); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/e2e/app/cli/repo-tests/src/commands/test_system_info_diagnose.rs b/src/e2e/app/cli/repo-tests/src/commands/test_system_info_diagnose.rs new file mode 100644 index 000000000..9ba38606d --- /dev/null +++ b/src/e2e/app/cli/repo-tests/src/commands/test_system_info_diagnose.rs @@ -0,0 +1,18 @@ +// Copyright Kamu Data, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use kamu_cli_puppet::KamuCliPuppet; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +pub async fn test_system_diagnose(kamu: KamuCliPuppet) { + kamu.execute(["system", "diagnose"]).await.success(); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/e2e/app/cli/repo-tests/src/commands/test_tail_command.rs b/src/e2e/app/cli/repo-tests/src/commands/test_tail_command.rs new file mode 100644 index 000000000..54242ad84 --- /dev/null +++ b/src/e2e/app/cli/repo-tests/src/commands/test_tail_command.rs @@ -0,0 +1,68 @@ +// Copyright Kamu Data, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. 
+// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use kamu_cli_e2e_common::{ + DATASET_ROOT_PLAYER_SCORES_INGEST_DATA_NDJSON_CHUNK_1, + DATASET_ROOT_PLAYER_SCORES_SNAPSHOT_STR, +}; +use kamu_cli_puppet::KamuCliPuppet; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +pub async fn test_tail(kamu: KamuCliPuppet) { + kamu.execute_with_input(["add", "--stdin"], DATASET_ROOT_PLAYER_SCORES_SNAPSHOT_STR) + .await + .success(); + + { + let assert = kamu + .execute(["tail", "player-scores", "--output-format", "table"]) + .await + .failure(); + + let stderr = std::str::from_utf8(&assert.get_output().stderr).unwrap(); + + assert!( + stderr.contains("Error: Dataset schema is not yet available: player-scores"), + "Unexpected output:\n{stderr}", + ); + } + + kamu.execute_with_input( + ["ingest", "player-scores", "--stdin"], + DATASET_ROOT_PLAYER_SCORES_INGEST_DATA_NDJSON_CHUNK_1, + ) + .await + .success(); + + { + let assert = kamu + .execute(["tail", "player-scores", "--output-format", "table"]) + .await + .success(); + + let stdout = std::str::from_utf8(&assert.get_output().stdout).unwrap(); + + pretty_assertions::assert_eq!( + indoc::indoc!( + r#" + ┌────────┬────┬──────────────────────┬──────────────────────────┬──────────┬───────────┬───────┐ + │ offset │ op │ system_time │ match_time │ match_id │ player_id │ score │ + ├────────┼────┼──────────────────────┼──────────────────────────┼──────────┼───────────┼───────┤ + │ 0 │ +A │ 2050-01-02T03:04:05Z │ 2000-01-01T00:00:00Z │ 1 │ Alice │ 100 │ + │ 1 │ +A │ 2050-01-02T03:04:05Z │ 2000-01-01T00:00:00.001Z │ 1 │ Bob │ 80 │ + └────────┴────┴──────────────────────┴──────────────────────────┴──────────┴───────────┴───────┘ + "# + ), + stdout + ); + } +} + 
+//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/e2e/app/cli/repo-tests/src/lib.rs b/src/e2e/app/cli/repo-tests/src/lib.rs index 2bd7d97f9..51d5ab6b2 100644 --- a/src/e2e/app/cli/repo-tests/src/lib.rs +++ b/src/e2e/app/cli/repo-tests/src/lib.rs @@ -7,6 +7,8 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +#![feature(assert_matches)] + mod commands; mod test_auth; mod test_flow; diff --git a/src/e2e/app/cli/repo-tests/src/test_flow.rs b/src/e2e/app/cli/repo-tests/src/test_flow.rs index 855724fc5..aae45ade0 100644 --- a/src/e2e/app/cli/repo-tests/src/test_flow.rs +++ b/src/e2e/app/cli/repo-tests/src/test_flow.rs @@ -615,7 +615,7 @@ pub async fn test_dataset_trigger_flow(kamu_api_server_client: KamuApiServerClie "__typename": "FlowDescriptionUpdateResultSuccess", "numBlocks": 2, "numRecords": 2, - "updatedWatermark": "2000-01-01T00:00:00+00:00" + "updatedWatermark": "2000-01-01T00:00:00.001+00:00" } }, "initiator": { @@ -649,7 +649,7 @@ pub async fn test_dataset_trigger_flow(kamu_api_server_client: KamuApiServerClie "__typename": "FlowDescriptionUpdateResultSuccess", "numBlocks": 2, "numRecords": 2, - "updatedWatermark": "2000-01-01T00:00:00+00:00" + "updatedWatermark": "2000-01-01T00:00:00.001+00:00" } }, "initiator": { diff --git a/src/infra/core/src/query_service_impl.rs b/src/infra/core/src/query_service_impl.rs index 352460f98..d38dd1f9f 100644 --- a/src/infra/core/src/query_service_impl.rs +++ b/src/infra/core/src/query_service_impl.rs @@ -56,7 +56,7 @@ impl QueryServiceImpl { .with_information_schema(true) .with_default_catalog_and_schema("kamu", "kamu"); - // Forcing cese-sensitive identifiers in case-insensitive language seems to + // Forcing case-sensitive identifiers in case-insensitive language seems to // be a lesser evil than following DataFusion's default behavior of forcing // identifiers to lowercase 
instead of case-insensitive matching. // diff --git a/src/infra/core/src/remote_repository_registry_impl.rs b/src/infra/core/src/remote_repository_registry_impl.rs index cd48a2063..f80402d1d 100644 --- a/src/infra/core/src/remote_repository_registry_impl.rs +++ b/src/infra/core/src/remote_repository_registry_impl.rs @@ -44,7 +44,7 @@ impl RemoteRepositoryRegistryImpl { let file_path = self.repos_dir.join(repo_name); if !file_path.exists() { - // run full scan to support case-insensetive matches + // run full scan to support case-insensitive matches let all_repositories_stream = self.get_all_repositories(); for repository_name in all_repositories_stream { if &repository_name == repo_name { @@ -157,6 +157,7 @@ impl RemoteRepositoryRegistry for RemoteRepositoryRegistryNull { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Config //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + pub struct RemoteReposDir(PathBuf); impl RemoteReposDir { diff --git a/src/utils/kamu-cli-puppet/src/kamu_cli_puppet.rs b/src/utils/kamu-cli-puppet/src/kamu_cli_puppet.rs index 422b4c1dd..915311fc2 100644 --- a/src/utils/kamu-cli-puppet/src/kamu_cli_puppet.rs +++ b/src/utils/kamu-cli-puppet/src/kamu_cli_puppet.rs @@ -14,6 +14,10 @@ use chrono::{DateTime, Utc}; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +pub type ExecuteCommandResult = assert_cmd::assert::Assert; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + pub struct KamuCliPuppet { workspace_path: PathBuf, system_time: Option>, @@ -88,7 +92,7 @@ impl KamuCliPuppet { temp_dir.join("e2e-output-data.txt") } - pub async fn execute(&self, cmd: I) -> assert_cmd::assert::Assert + pub async fn execute(&self, cmd: I) -> ExecuteCommandResult where I: 
IntoIterator, S: AsRef, @@ -96,7 +100,7 @@ impl KamuCliPuppet { self.execute_impl(cmd, None::>).await } - pub async fn execute_with_input(&self, cmd: I, input: T) -> assert_cmd::assert::Assert + pub async fn execute_with_input(&self, cmd: I, input: T) -> ExecuteCommandResult where I: IntoIterator, S: AsRef, @@ -105,11 +109,7 @@ impl KamuCliPuppet { self.execute_impl(cmd, Some(input)).await } - async fn execute_impl( - &self, - cmd: I, - maybe_input: Option, - ) -> assert_cmd::assert::Assert + async fn execute_impl(&self, cmd: I, maybe_input: Option) -> ExecuteCommandResult where I: IntoIterator, S: AsRef, diff --git a/src/utils/kamu-cli-puppet/src/kamu_cli_puppet_ext.rs b/src/utils/kamu-cli-puppet/src/kamu_cli_puppet_ext.rs index 09ae75317..8c9292bf7 100644 --- a/src/utils/kamu-cli-puppet/src/kamu_cli_puppet_ext.rs +++ b/src/utils/kamu-cli-puppet/src/kamu_cli_puppet_ext.rs @@ -14,9 +14,21 @@ use std::path::PathBuf; use async_trait::async_trait; use chrono::{DateTime, Utc}; use datafusion::prelude::{ParquetReadOptions, SessionContext}; -use opendatafabric::serde::yaml::{DatasetKindDef, YamlDatasetSnapshotSerializer}; -use opendatafabric::serde::DatasetSnapshotSerializer; -use opendatafabric::{DatasetID, DatasetKind, DatasetName, DatasetRef, DatasetSnapshot, Multihash}; +use opendatafabric::serde::yaml::{ + DatasetKindDef, + YamlDatasetSnapshotSerializer, + YamlMetadataBlockDeserializer, +}; +use opendatafabric::serde::{DatasetSnapshotSerializer, MetadataBlockDeserializer}; +use opendatafabric::{ + DatasetID, + DatasetKind, + DatasetName, + DatasetRef, + DatasetSnapshot, + MetadataBlock, + Multihash, +}; use serde::Deserialize; use crate::KamuCliPuppet; @@ -35,6 +47,8 @@ pub trait KamuCliPuppetExt { where T: Into + Send; + async fn list_blocks(&self, dataset_name: &DatasetName) -> Vec; + async fn start_api_server(self, e2e_data_file_path: PathBuf) -> ServerOutput; async fn assert_last_data_slice( @@ -111,6 +125,43 @@ impl KamuCliPuppetExt for KamuCliPuppet { 
stdout.lines().map(ToString::to_string).collect() } + async fn list_blocks(&self, dataset_name: &DatasetName) -> Vec { + let assert = self + .execute(["log", dataset_name.as_str(), "--output-format", "yaml"]) + .await + .success(); + + let stdout = std::str::from_utf8(&assert.get_output().stdout).unwrap(); + + // TODO: Don't parse the output, after implementation: + // `kamu log`: support `--output-format json` + // https://github.com/kamu-data/kamu-cli/issues/887 + + stdout + .split("---") + .skip(1) + .map(str::trim) + .map(|block_data| { + let Some(pos) = block_data.find('\n') else { + unreachable!() + }; + let (first_line_with_block_hash, metadata_block_str) = block_data.split_at(pos); + + let block_hash = first_line_with_block_hash + .strip_prefix("# Block: ") + .unwrap(); + let block = YamlMetadataBlockDeserializer {} + .read_manifest(metadata_block_str.as_ref()) + .unwrap(); + + BlockRecord { + block_hash: Multihash::from_multibase(block_hash).unwrap(), + block, + } + }) + .collect() + } + async fn start_api_server(self, e2e_data_file_path: PathBuf) -> ServerOutput { let host = Ipv4Addr::LOCALHOST.to_string(); @@ -211,4 +262,10 @@ pub struct RepoAlias { pub alias: String, } +#[derive(Debug, PartialEq, Eq)] +pub struct BlockRecord { + pub block_hash: Multihash, + pub block: MetadataBlock, +} + ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////