diff --git a/bin/src/main.rs b/bin/src/main.rs index 8f7f9fa6..151b47dc 100644 --- a/bin/src/main.rs +++ b/bin/src/main.rs @@ -21,8 +21,8 @@ use clap::Subcommand; use env_logger::Env; use log::{error, info}; -use gt::manager::Signal; use gt::*; +use gt::manager::Signal; use crate::cs::{ClientArgs, ServerArgs}; use crate::manager::ManagerArgs; @@ -33,9 +33,12 @@ struct Cli { #[command(subcommand)] command: Option, - /// Path to the config file or the directory contains the config files + /// Path to the config file or the directory containing the config files #[arg(short, long)] config: Option, + /// The maximum allowed depth of the subdirectory to be traversed to search config files + #[arg(long)] + depth: Option, /// Send signal to the running GT processes #[arg(short, long, value_enum)] signal: Option, @@ -69,6 +72,7 @@ fn main() { } let mut manager_args = ManagerArgs { config: cli.config, + depth: cli.depth, server_args: None, client_args: None, }; diff --git a/bin/src/manager.rs b/bin/src/manager.rs index 29832b75..af429084 100644 --- a/bin/src/manager.rs +++ b/bin/src/manager.rs @@ -14,6 +14,7 @@ * limitations under the License. */ +use std::{env, fs, future, io, process}; use std::collections::{BTreeMap, HashMap}; use std::ffi::OsStr; use std::future::Future; @@ -24,21 +25,20 @@ use std::os::unix::process::CommandExt; use std::os::windows::process::CommandExt; use std::path::PathBuf; use std::process::Stdio; -use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::{Duration, Instant}; -use std::{env, fs, future, io, process}; use anyhow::{anyhow, Context, Error, Result}; use clap::ValueEnum; use futures::future::{BoxFuture, FutureExt}; use log::{error, info, warn}; use notify::{ErrorKind, Event, PollWatcher, RecommendedWatcher, RecursiveMode, Watcher}; -use serde::{de, ser, Deserialize, Serialize}; +use serde::{de, Deserialize, ser, Serialize}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::process::{Child, ChildStdin, ChildStdout, Command}; +use tokio::sync::{mpsc, Mutex, oneshot}; use tokio::sync::oneshot::{Receiver, Sender}; -use tokio::sync::{mpsc, oneshot, Mutex}; use tokio::time::timeout; use crate::cs::{ClientArgs, ServerArgs}; @@ -46,6 +46,7 @@ use crate::cs::{ClientArgs, ServerArgs}; #[derive(Debug)] pub struct ManagerArgs { pub config: Option, + pub depth: Option, pub server_args: Option, pub client_args: Option, } @@ -87,6 +88,45 @@ impl Manager { } } + fn collect_files(&self, path: PathBuf, depth: u8) -> io::Result> { + let mut files = vec![]; + if path.is_dir() { + for entry in fs::read_dir(path)? { + let entry = entry?; + let path = entry.path(); + if path.is_dir() { + if depth + 1 > self.args.depth.unwrap_or(3) { + info!("{} is too deep", path.display()); + continue; + } + files.append(self.collect_files(path, depth + 1)?.as_mut()); + } else { + match path.extension().and_then(OsStr::to_str) { + Some("yaml") | Some("yml") => {} + None | Some(_) => { + info!( + "ignored file {} is not end with yml or yaml", + path.display() + ); + continue; + } + } + let fm = entry.metadata()?.len(); + if fm > 10 * 1024 * 1024 { + info!("ignored file {} is too large", path.display()); + continue; + } + info!("collected file {}", path.display()); + files.push(ProcessConfigEnum::Config(path)); + } + } + } else { + info!("collected file {}", path.display()); + files.push(ProcessConfigEnum::Config(path)); + } + Ok(files) + } + async fn collect_configs( &self, ) -> Result<(Vec, Option>)> { @@ -100,7 +140,7 @@ impl Manager { None => env::current_dir()?, Some(path) => path.into(), }; - configs = collect_files(config.clone())?; + configs = self.collect_files(config.clone(), 1)?; } if configs.is_empty() { return Err(anyhow!("no target found")); @@ -324,9 +364,7 @@ impl Manager { let reconnect = async { let cmds = cmd_map.clone(); let config = config.clone(); - if let Err(e) = - Self::sync_run(cmds, vec![config.clone()], sub_cmd).await - { + if let Err(e) = Self::sync_run(cmds, vec![config.clone()], sub_cmd).await { error!("{sub_cmd} ({config:?}) reconnect sync_run failed: {:?}", e); } }; @@ -427,23 +465,15 @@ impl Manager { } } if !server_config.is_empty() { - Self::run( - self.cmds.clone(), - server_config, - "sub-server", - ) - .await - .context("run_server failed")?; + Self::run(self.cmds.clone(), server_config, "sub-server") + .await + .context("run_server failed")?; } if !client_config.is_empty() { - Self::run( - self.cmds.clone(), - client_config, - "sub-client", - ) - .await - .context("run_client failed")?; + Self::run(self.cmds.clone(), client_config, "sub-client") + .await + .context("run_client failed")?; } Ok(()) } @@ -759,41 +789,6 @@ pub fn send_signal(signal: Signal) -> Result<()> { Ok(()) } -fn collect_files(path: PathBuf) -> io::Result> { - let mut files = vec![]; - if path.is_dir() { - for entry in fs::read_dir(path)? { - let entry = entry?; - let path = entry.path(); - if path.is_dir() { - collect_files(path)?; - } else { - let fm = entry.metadata()?.len(); - if fm > 10 * 1024 * 1024 { - info!("ignored file {} is too large", path.display()); - continue; - } - match path.extension().and_then(OsStr::to_str) { - Some("yaml") | Some("yml") => {} - None | Some(_) => { - info!( - "ignored file {} is not end with yml or yaml", - path.display() - ); - continue; - } - } - info!("collected file {}", path.display()); - files.push(ProcessConfigEnum::Config(path)); - } - } - } else { - info!("collected file {}", path.display()); - files.push(ProcessConfigEnum::Config(path)); - } - Ok(files) -} - #[derive(Serialize, Deserialize, Debug)] struct Config { #[serde(rename = "type")]