Skip to content

Commit

Permalink
Add test
Browse files Browse the repository at this point in the history
  • Loading branch information
rmn-boiko committed Feb 8, 2024
1 parent 42e2285 commit 79f23ec
Show file tree
Hide file tree
Showing 3 changed files with 511 additions and 19 deletions.
17 changes: 11 additions & 6 deletions src/app/cli/src/commands/delete_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

use std::sync::Arc;

use futures::{StreamExt, TryStreamExt};
use futures::{future, StreamExt, TryStreamExt};
use kamu::domain::*;
use kamu::utils::datasets_filtering::filter_datasets_by_pattern;
use opendatafabric::*;
Expand Down Expand Up @@ -93,14 +93,19 @@ impl Command for DeleteCommand {
let confirmed = if self.no_confirmation {
true
} else {
let dataset_aliases = future::join_all(dataset_refs.iter().map(|dataset_id| async {
let dataset_hdl = self
.dataset_repo
.resolve_dataset_ref(dataset_id)
.await
.unwrap();
dataset_hdl.alias.to_string()
}))
.await;
common::prompt_yes_no(&format!(
"{}: {}\n{}\nDo you wish to continue? [y/N]: ",
console::style("You are about to delete following dataset(s)").yellow(),
dataset_refs
.iter()
.map(ToString::to_string)
.collect::<Vec<_>>()
.join(", "),
dataset_aliases.join(", "),
console::style("This operation is irreversible!").yellow(),
))
};
Expand Down
42 changes: 30 additions & 12 deletions src/infra/core/src/dependency_graph_service_inmem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use kamu_core::events::{
use kamu_core::*;
use opendatafabric::DatasetID;
use petgraph::stable_graph::{NodeIndex, StableDiGraph};
use petgraph::visit::{depth_first_search, Bfs, DfsEvent, Walker};
use petgraph::visit::{depth_first_search, DfsEvent};
use petgraph::Direction;

/////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -214,21 +214,23 @@ impl DependencyGraphService for DependencyGraphServiceInMemory {
.map_err(GetUpstreamDependenciesError::DatasetNotFound)
})
.collect::<Result<Vec<_>, _>>()?;
let mut node_indexes_result = HashSet::new();

let mut incoming_nodes = Vec::new();
let mut visited_indexes = HashSet::new();

for node_index in &nodes_to_search {
let mut bfs = Bfs::new(&state.datasets_graph, *node_index);
while let Some(ni) = bfs.walk_next(&state.datasets_graph) {
// Insert current node_index into HashSet
// if it return false it means we already have such value
// skip searching
if !node_indexes_result.insert(ni) {
break;
}
}
// Perform recursive breadth-first search starting from the target node
incoming_nodes.push(node_index.clone());

Check failure on line 223 in src/infra/core/src/dependency_graph_service_inmem.rs

View workflow job for this annotation

GitHub Actions / Lint / Code

using `clone` on type `NodeIndex` which implements the `Copy` trait
visited_indexes.insert(*node_index);
recursive_bfs(
&state.datasets_graph,
node_index.clone(),

Check failure on line 227 in src/infra/core/src/dependency_graph_service_inmem.rs

View workflow job for this annotation

GitHub Actions / Lint / Code

using `clone` on type `NodeIndex` which implements the `Copy` trait
&mut visited_indexes,
&mut incoming_nodes,
);
}

let result: Vec<_> = node_indexes_result
let result: Vec<_> = incoming_nodes
.iter()
.map(|node_index| {
state
Expand Down Expand Up @@ -424,3 +426,19 @@ impl AsyncEventHandler<DatasetEventDependenciesUpdated> for DependencyGraphServi
}

/////////////////////////////////////////////////////////////////////////////////////////

fn recursive_bfs(
graph: &StableDiGraph<DatasetID, ()>,
current_node: NodeIndex,
visited: &mut HashSet<NodeIndex>,
incoming_nodes: &mut Vec<NodeIndex>,
) {
// Visit neighbors of the current node
for neighbor in graph.neighbors_directed(current_node, petgraph::Direction::Incoming) {
if !visited.contains(&neighbor) {
incoming_nodes.push(neighbor);
visited.insert(neighbor);
recursive_bfs(graph, neighbor, visited, incoming_nodes);
}
}
}
Loading

0 comments on commit 79f23ec

Please sign in to comment.