diff --git a/src/app/cli/src/commands/verify_command.rs b/src/app/cli/src/commands/verify_command.rs index 433dcc395..8aab660b2 100644 --- a/src/app/cli/src/commands/verify_command.rs +++ b/src/app/cli/src/commands/verify_command.rs @@ -152,7 +152,6 @@ impl VerifyCommand { Ok(self .verification_svc - .clone() .verify_multi(filtered_requests, options, listener) .await) } diff --git a/src/e2e/app/cli/inmem/tests/tests/commands/mod.rs b/src/e2e/app/cli/inmem/tests/tests/commands/mod.rs index 695c4f9d4..b25d7e1c4 100644 --- a/src/e2e/app/cli/inmem/tests/tests/commands/mod.rs +++ b/src/e2e/app/cli/inmem/tests/tests/commands/mod.rs @@ -8,6 +8,7 @@ // by the Apache License, Version 2.0. mod test_add_command; +mod test_compact_command; mod test_complete_command; mod test_config_command; mod test_delete_command; @@ -18,3 +19,4 @@ mod test_repo_alias_command; mod test_sql_command; mod test_system_api_server_gql_query; mod test_system_generate_token_command; +mod test_verify_command; diff --git a/src/e2e/app/cli/inmem/tests/tests/commands/test_compact_command.rs b/src/e2e/app/cli/inmem/tests/tests/commands/test_compact_command.rs new file mode 100644 index 000000000..6ff90ee7b --- /dev/null +++ b/src/e2e/app/cli/inmem/tests/tests/commands/test_compact_command.rs @@ -0,0 +1,36 @@ +// Copyright Kamu Data, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. 
+ +use kamu_cli_e2e_common::prelude::*; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +kamu_cli_execute_command_e2e_test!( + storage = inmem, + fixture = kamu_cli_e2e_repo_tests::test_compact_hard + extra_test_groups = "engine, ingest, datafusion" +); + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +kamu_cli_execute_command_e2e_test!( + storage = inmem, + fixture = kamu_cli_e2e_repo_tests::test_compact_keep_metadata_only + extra_test_groups = "engine, ingest, datafusion" +); + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +kamu_cli_execute_command_e2e_test!( + storage = inmem, + fixture = kamu_cli_e2e_repo_tests::test_compact_verify + extra_test_groups = "engine, ingest, datafusion" +); + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/e2e/app/cli/inmem/tests/tests/commands/test_verify_command.rs b/src/e2e/app/cli/inmem/tests/tests/commands/test_verify_command.rs new file mode 100644 index 000000000..0a4b5598a --- /dev/null +++ b/src/e2e/app/cli/inmem/tests/tests/commands/test_verify_command.rs @@ -0,0 +1,36 @@ +// Copyright Kamu Data, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. 
+ +use kamu_cli_e2e_common::prelude::*; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +kamu_cli_execute_command_e2e_test!( + storage = inmem, + fixture = kamu_cli_e2e_repo_tests::test_verify_regular_dataset + extra_test_groups = "engine, ingest, datafusion" +); + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +kamu_cli_execute_command_e2e_test!( + storage = inmem, + fixture = kamu_cli_e2e_repo_tests::test_verify_recursive + extra_test_groups = "containerized, engine, ingest, datafusion" +); + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +kamu_cli_execute_command_e2e_test!( + storage = inmem, + fixture = kamu_cli_e2e_repo_tests::test_verify_integrity + extra_test_groups = "engine, ingest, datafusion" +); + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/e2e/app/cli/repo-tests/src/commands/mod.rs b/src/e2e/app/cli/repo-tests/src/commands/mod.rs index d3763d842..674540491 100644 --- a/src/e2e/app/cli/repo-tests/src/commands/mod.rs +++ b/src/e2e/app/cli/repo-tests/src/commands/mod.rs @@ -8,6 +8,7 @@ // by the Apache License, Version 2.0. 
mod test_add_command; +mod test_compact_command; mod test_complete_command; mod test_config_command; mod test_delete_command; @@ -18,8 +19,10 @@ mod test_repo_alias_command; mod test_sql_command; mod test_system_api_server_gql_query; mod test_system_generate_token_command; +mod test_verify_command; pub use test_add_command::*; +pub use test_compact_command::*; pub use test_complete_command::*; pub use test_config_command::*; pub use test_delete_command::*; @@ -30,3 +33,4 @@ pub use test_repo_alias_command::*; pub use test_sql_command::*; pub use test_system_api_server_gql_query::*; pub use test_system_generate_token_command::*; +pub use test_verify_command::*; diff --git a/src/e2e/app/cli/repo-tests/src/commands/test_compact_command.rs b/src/e2e/app/cli/repo-tests/src/commands/test_compact_command.rs new file mode 100644 index 000000000..d665c14fd --- /dev/null +++ b/src/e2e/app/cli/repo-tests/src/commands/test_compact_command.rs @@ -0,0 +1,208 @@ +// Copyright Kamu Data, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. 
+ +use std::assert_matches::assert_matches; + +use kamu_cli_e2e_common::DATASET_ROOT_PLAYER_SCORES_SNAPSHOT_STR; +use kamu_cli_puppet::extensions::KamuCliPuppetExt; +use kamu_cli_puppet::KamuCliPuppet; +use opendatafabric::{DatasetName, MetadataEvent}; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +pub async fn test_compact_hard(kamu: KamuCliPuppet) { + let dataset_name = DatasetName::new_unchecked("player-scores"); + + kamu.execute_with_input(["add", "--stdin"], DATASET_ROOT_PLAYER_SCORES_SNAPSHOT_STR) + .await + .success(); + + let data = indoc::indoc!( + r#" + {"match_time": "2000-01-01", "match_id": 2, "player_id": "Bob", "score": 90} + "#, + ); + + kamu.ingest_data(&dataset_name, data).await; + let data = indoc::indoc!( + r#" + {"match_time": "2000-01-01", "match_id": 1, "player_id": "Alice", "score": 90} + "#, + ); + + kamu.ingest_data(&dataset_name, data).await; + + let blocks_before_compacting = kamu.list_blocks(&dataset_name).await; + + let assert = kamu + .execute([ + "--yes", + "system", + "compact", + dataset_name.as_str(), + "--hard", + ]) + .await + .success(); + + let stderr = std::str::from_utf8(&assert.get_output().stderr).unwrap(); + + assert!( + stderr.contains(indoc::indoc!( + r#" + 1 dataset(s) were compacted + "# + )), + "Unexpected output:\n{stderr}", + ); + + let blocks_after_compacting = kamu.list_blocks(&dataset_name).await; + assert_eq!( + blocks_before_compacting.len() - 1, + blocks_after_compacting.len() + ); + assert_matches!( + blocks_after_compacting.first().unwrap().block.event, + MetadataEvent::AddData(_) + ); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +pub async fn test_compact_keep_metadata_only(kamu: KamuCliPuppet) { + let dataset_name = DatasetName::new_unchecked("player-scores"); + + kamu.execute_with_input(["add", "--stdin"], 
DATASET_ROOT_PLAYER_SCORES_SNAPSHOT_STR) + .await + .success(); + + let data = indoc::indoc!( + r#" + {"match_time": "2000-01-01", "match_id": 2, "player_id": "Bob", "score": 90} + "#, + ); + + kamu.ingest_data(&dataset_name, data).await; + let data = indoc::indoc!( + r#" + {"match_time": "2000-01-01", "match_id": 1, "player_id": "Alice", "score": 90} + "#, + ); + + kamu.ingest_data(&dataset_name, data).await; + + let blocks_before_compacting = kamu.list_blocks(&dataset_name).await; + + let assert = kamu + .execute([ + "--yes", + "system", + "compact", + dataset_name.as_str(), + "--hard", + "--keep-metadata-only", + ]) + .await + .success(); + + let stderr = std::str::from_utf8(&assert.get_output().stderr).unwrap(); + + assert!( + stderr.contains(indoc::indoc!( + r#" + 1 dataset(s) were compacted + "# + )), + "Unexpected output:\n{stderr}", + ); + + let blocks_after_compacting = kamu.list_blocks(&dataset_name).await; + assert_eq!( + blocks_before_compacting.len() - 2, + blocks_after_compacting.len() + ); + assert_matches!( + blocks_after_compacting.first().unwrap().block.event, + MetadataEvent::SetDataSchema(_) + ); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +pub async fn test_compact_verify(kamu: KamuCliPuppet) { + let dataset_name = DatasetName::new_unchecked("player-scores"); + + kamu.execute_with_input(["add", "--stdin"], DATASET_ROOT_PLAYER_SCORES_SNAPSHOT_STR) + .await + .success(); + + let data = indoc::indoc!( + r#" + {"match_time": "2000-01-01", "match_id": 2, "player_id": "Bob", "score": 90} + "#, + ); + + kamu.ingest_data(&dataset_name, data).await; + let data = indoc::indoc!( + r#" + {"match_time": "2000-01-01", "match_id": 1, "player_id": "Alice", "score": 90} + "#, + ); + + kamu.ingest_data(&dataset_name, data).await; + + let blocks_before_compacting = kamu.list_blocks(&dataset_name).await; + + let assert = kamu + .execute([ + "--yes", + "system", + "compact", + 
dataset_name.as_str(), + "--hard", + "--verify", + ]) + .await + .success(); + + let stderr = std::str::from_utf8(&assert.get_output().stderr).unwrap(); + + assert!( + stderr.contains( + indoc::indoc!( + r#" + verify with dataset_ref: player-scores + "# + ) + .trim() + ), + "Unexpected output:\n{stderr}", + ); + + assert!( + stderr.contains(indoc::indoc!( + r#" + 1 dataset(s) were compacted + "# + )), + "Unexpected output:\n{stderr}", + ); + + let blocks_after_compacting = kamu.list_blocks(&dataset_name).await; + assert_eq!( + blocks_before_compacting.len() - 1, + blocks_after_compacting.len() + ); + assert_matches!( + blocks_after_compacting.first().unwrap().block.event, + MetadataEvent::AddData(_) + ); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/e2e/app/cli/repo-tests/src/commands/test_verify_command.rs b/src/e2e/app/cli/repo-tests/src/commands/test_verify_command.rs new file mode 100644 index 000000000..088c6e1b2 --- /dev/null +++ b/src/e2e/app/cli/repo-tests/src/commands/test_verify_command.rs @@ -0,0 +1,150 @@ +// Copyright Kamu Data, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. 
+ +use kamu_cli_e2e_common::{ + DATASET_DERIVATIVE_LEADERBOARD_SNAPSHOT_STR, + DATASET_ROOT_PLAYER_SCORES_SNAPSHOT_STR, +}; +use kamu_cli_puppet::extensions::KamuCliPuppetExt; +use kamu_cli_puppet::KamuCliPuppet; +use opendatafabric::DatasetName; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +pub async fn test_verify_regular_dataset(kamu: KamuCliPuppet) { + let dataset_name = DatasetName::new_unchecked("player-scores"); + + kamu.execute_with_input(["add", "--stdin"], DATASET_ROOT_PLAYER_SCORES_SNAPSHOT_STR) + .await + .success(); + + let data = indoc::indoc!( + r#" + {"match_time": "2000-01-01", "match_id": 2, "player_id": "Bob", "score": 90} + "#, + ); + + kamu.ingest_data(&dataset_name, data).await; + + let assert = kamu + .execute(["verify", dataset_name.as_str()]) + .await + .success(); + + let stderr = std::str::from_utf8(&assert.get_output().stderr).unwrap(); + + assert!( + stderr.contains(indoc::indoc!( + r#" + 1 dataset(s) are valid + "# + )), + "Unexpected output:\n{stderr}", + ); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +pub async fn test_verify_recursive(kamu: KamuCliPuppet) { + let dataset_name = DatasetName::new_unchecked("player-scores"); + let dataset_derivative_name = DatasetName::new_unchecked("leaderboard"); + + kamu.execute_with_input(["add", "--stdin"], DATASET_ROOT_PLAYER_SCORES_SNAPSHOT_STR) + .await + .success(); + + kamu.execute_with_input( + ["add", "--stdin"], + DATASET_DERIVATIVE_LEADERBOARD_SNAPSHOT_STR, + ) + .await + .success(); + + let data = indoc::indoc!( + r#" + {"match_time": "2000-01-01", "match_id": 2, "player_id": "Bob", "score": 90} + "#, + ); + + kamu.ingest_data(&dataset_name, data).await; + + kamu.execute(["pull", dataset_derivative_name.as_str()]) + .await + .success(); + + // Call verify without recursive flag + let assert = kamu + .execute(["verify", 
dataset_derivative_name.as_str()]) + .await + .success(); + + let stderr = std::str::from_utf8(&assert.get_output().stderr).unwrap(); + + assert!( + stderr.contains(indoc::indoc!( + r#" + 1 dataset(s) are valid + "# + )), + "Unexpected output:\n{stderr}", + ); + + // Call verify with recursive flag + let assert = kamu + .execute(["verify", dataset_derivative_name.as_str(), "--recursive"]) + .await + .success(); + + let stderr = std::str::from_utf8(&assert.get_output().stderr).unwrap(); + + assert!( + stderr.contains(indoc::indoc!( + r#" + 2 dataset(s) are valid + "# + )), + "Unexpected output:\n{stderr}", + ); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +pub async fn test_verify_integrity(kamu: KamuCliPuppet) { + let dataset_name = DatasetName::new_unchecked("player-scores"); + + kamu.execute_with_input(["add", "--stdin"], DATASET_ROOT_PLAYER_SCORES_SNAPSHOT_STR) + .await + .success(); + + let data = indoc::indoc!( + r#" + {"match_time": "2000-01-01", "match_id": 2, "player_id": "Bob", "score": 90} + "#, + ); + + kamu.ingest_data(&dataset_name, data).await; + + let assert = kamu + .execute(["verify", dataset_name.as_str(), "--integrity"]) + .await + .success(); + + let stderr = std::str::from_utf8(&assert.get_output().stderr).unwrap(); + + assert!( + stderr.contains(indoc::indoc!( + r#" + 1 dataset(s) are valid + "# + )), + "Unexpected output:\n{stderr}", + ); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/e2e/app/cli/repo-tests/src/lib.rs b/src/e2e/app/cli/repo-tests/src/lib.rs index 2bd7d97f9..51d5ab6b2 100644 --- a/src/e2e/app/cli/repo-tests/src/lib.rs +++ b/src/e2e/app/cli/repo-tests/src/lib.rs @@ -7,6 +7,8 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. 
+#![feature(assert_matches)] + mod commands; mod test_auth; mod test_flow; diff --git a/src/e2e/app/cli/repo-tests/src/test_smart_transfer_protocol.rs b/src/e2e/app/cli/repo-tests/src/test_smart_transfer_protocol.rs index 0500eee17..a6ca2dbdc 100644 --- a/src/e2e/app/cli/repo-tests/src/test_smart_transfer_protocol.rs +++ b/src/e2e/app/cli/repo-tests/src/test_smart_transfer_protocol.rs @@ -932,21 +932,22 @@ pub async fn test_smart_pull_set_watermark(kamu: KamuCliPuppet) { .await .success(); - kamu.execute([ - "pull", - dataset_name.as_str(), - "--set-watermark", - "2000-01-01T00:00:00Z", - ]) - .await - .success(); + let assert = kamu + .execute([ + "pull", + dataset_name.as_str(), + "--set-watermark", + "2000-01-01T00:00:00Z", + ]) + .await + .success(); - // let stdout = std::str::from_utf8(&assert.get_output().stdout).unwrap(); + let stderr = std::str::from_utf8(&assert.get_output().stderr).unwrap(); - // assert!( - // stdout.contains(indoc::indoc!(r#"Committed new block"#).trim()), - // "Unexpected output:\n{stdout}", - // ); + assert!( + stderr.contains(indoc::indoc!(r#"Committed new block"#).trim()), + "Unexpected output:\n{stderr}", + ); } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/utils/kamu-cli-puppet/src/kamu_cli_puppet_ext.rs b/src/utils/kamu-cli-puppet/src/kamu_cli_puppet_ext.rs index 71bb960b7..40a23d74d 100644 --- a/src/utils/kamu-cli-puppet/src/kamu_cli_puppet_ext.rs +++ b/src/utils/kamu-cli-puppet/src/kamu_cli_puppet_ext.rs @@ -14,9 +14,21 @@ use std::path::PathBuf; use async_trait::async_trait; use chrono::{DateTime, Utc}; use datafusion::prelude::{ParquetReadOptions, SessionContext}; -use opendatafabric::serde::yaml::{DatasetKindDef, YamlDatasetSnapshotSerializer}; -use opendatafabric::serde::DatasetSnapshotSerializer; -use opendatafabric::{DatasetID, DatasetKind, DatasetName, DatasetRef, DatasetSnapshot, Multihash}; +use opendatafabric::serde::yaml::{ + 
DatasetKindDef, + YamlDatasetSnapshotSerializer, + YamlMetadataBlockDeserializer, +}; +use opendatafabric::serde::{DatasetSnapshotSerializer, MetadataBlockDeserializer}; +use opendatafabric::{ + DatasetID, + DatasetKind, + DatasetName, + DatasetRef, + DatasetSnapshot, + MetadataBlock, + Multihash, +}; use serde::Deserialize; use crate::KamuCliPuppet; @@ -29,6 +41,8 @@ pub trait KamuCliPuppetExt { async fn add_dataset(&self, dataset_snapshot: DatasetSnapshot); + async fn list_blocks(&self, dataset_name: &DatasetName) -> Vec<BlockRecord>; + async fn ingest_data(&self, dataset_name: &DatasetName, data: &str); async fn get_list_of_repo_aliases(&self, dataset_ref: &DatasetRef) -> Vec<RepoAlias>; @@ -190,6 +204,43 @@ impl KamuCliPuppetExt for KamuCliPuppet { .await .success(); } + + async fn list_blocks(&self, dataset_name: &DatasetName) -> Vec<BlockRecord> { + let assert = self + .execute(["log", dataset_name.as_str(), "--output-format", "yaml"]) + .await + .success(); + + let stdout = std::str::from_utf8(&assert.get_output().stdout).unwrap(); + + // TODO: Don't parse the output, after implementation: + // `kamu log`: support `--output-format json` + // https://github.com/kamu-data/kamu-cli/issues/887 + + stdout + .split("---") + .skip(1) + .map(str::trim) + .map(|block_data| { + let Some(pos) = block_data.find('\n') else { + unreachable!() + }; + let (first_line_with_block_hash, metadata_block_str) = block_data.split_at(pos); + + let block_hash = first_line_with_block_hash + .strip_prefix("# Block: ") + .unwrap(); + let block = YamlMetadataBlockDeserializer {} + .read_manifest(metadata_block_str.as_ref()) + .unwrap(); + + BlockRecord { + block_hash: Multihash::from_multibase(block_hash).unwrap(), + block, + } + }) + .collect() + } } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -225,4 +276,10 @@ pub struct RepoAlias { pub alias: String, } +#[derive(Debug, PartialEq, Eq)] +pub struct BlockRecord { + pub block_hash: Multihash, + 
pub block: MetadataBlock, +} + ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////