
Commit

encode with run command
rkdud007 committed Mar 6, 2024
1 parent 1d50043 commit f004c29
Showing 1 changed file with 69 additions and 30 deletions.
99 changes: 69 additions & 30 deletions cli/src/main.rs
@@ -28,12 +28,25 @@ enum Commands {
/// Encode the task and datalake in batched format for test purposes
#[command(arg_required_else_help = true)]
Encode {
/// Decide whether to run the evaluator as a follow-up step (default: false)
#[arg(short, long, action = clap::ArgAction::SetTrue)]
allow_run: bool,

/// The aggregate function id, e.g. "sum", "min", "avg"
aggregate_fn_id: String,
/// Optional context for the aggregate function; its meaning depends on the chosen function
aggregate_fn_ctx: Option<String>,
#[command(subcommand)]
command: DataLakeCommands,

/// The RPC URL to fetch the data
rpc_url: Option<String>,
/// Path to the file to save the output result
#[arg(short, long)]
output_file: Option<String>,
/// Path to the file to save the input.json in cairo format
#[arg(short, long)]
cairo_input: Option<String>,
},
/// Decode batch tasks and datalakes
///
@@ -82,13 +95,55 @@ enum DataLakeCommands {
},
}

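/// Shared driver extracted from `main`: resolves the config, decodes the
/// batched tasks and datalakes, runs the evaluator, and optionally writes
/// the results to an output file and/or a Cairo-format input file.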
async fn handle_run(
tasks: Option<String>,
datalakes: Option<String>,
rpc_url: Option<String>,
output_file: Option<String>,
cairo_input: Option<String>,
) {
let start_run = std::time::Instant::now();
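// Resolve the config from the provided args, then decode the batched inputs.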
let config = Config::init(rpc_url, datalakes, tasks).await;
let abstract_fetcher = AbstractFetcher::new(config.rpc_url.clone());
let tasks = tasks_decoder(config.tasks.clone()).unwrap();
let datalakes = datalakes_decoder(config.datalakes.clone()).unwrap();

println!("tasks: \n{:?}\n", tasks);
println!("datalakes: \n{:?}\n", datalakes);

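// Tasks and datalakes are evaluated pairwise, so the counts must match.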
if tasks.len() != datalakes.len() {
panic!("Tasks and datalakes must have the same length");
}

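// Run the evaluator; the fetcher is shared behind an async-aware lock.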
let res = evaluator(
tasks,
Some(datalakes),
Arc::new(RwLock::new(abstract_fetcher)),
)
.await
.unwrap();

let duration_run = start_run.elapsed();
println!("Time elapsed in run evaluator is: {:?}", duration_run);

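// Optionally persist the results: a plain output file and/or a Cairo-format input.json.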
if let Some(output_file) = output_file {
res.save_to_file(&output_file, false).unwrap();
}
if let Some(cairo_input) = cairo_input {
res.save_to_file(&cairo_input, true).unwrap();
}
}

#[tokio::main]
async fn main() {
let start = std::time::Instant::now();
let cli = Cli::parse();
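// Load environment variables from a .env file, if one is present.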
dotenv::dotenv().ok();
match cli.command {
Commands::Encode {
allow_run,
rpc_url,
output_file,
cairo_input,
aggregate_fn_id,
aggregate_fn_ctx,
command,
@@ -118,6 +173,18 @@ async fn main() {
println!("Original task: \n{:?}\n", tasks);
let encoded_task = tasks_encoder(vec![tasks]).unwrap();
println!("Encoded task: \n{}\n", encoded_task);

// If allow_run is set, immediately run the evaluator on the encoded task and datalake.
if allow_run {
handle_run(
Some(encoded_task),
Some(encoded_datalake),
rpc_url,
output_file,
cairo_input,
)
.await;
}
}
Commands::Decode { tasks, datalakes } => {
let datalakes = datalakes_decoder(datalakes.clone()).unwrap();
@@ -144,35 +211,7 @@ async fn main() {
output_file,
cairo_input,
} => {
let config = Config::init(rpc_url, datalakes, tasks).await;
let abstract_fetcher = AbstractFetcher::new(config.rpc_url.clone());
let tasks = tasks_decoder(config.tasks.clone()).unwrap();
let datalakes = datalakes_decoder(config.datalakes.clone()).unwrap();

println!("tasks: \n{:?}\n", tasks);
println!("datalakes: \n{:?}\n", datalakes);

if tasks.len() != datalakes.len() {
panic!("Tasks and datalakes must have the same length");
}

let res = evaluator(
tasks,
Some(datalakes),
Arc::new(RwLock::new(abstract_fetcher)),
)
.await
.unwrap();

let duration = start.elapsed();
println!("Time elapsed in main() is: {:?}", duration);

if let Some(output_file) = output_file {
res.save_to_file(&output_file, false).unwrap();
}
if let Some(cairo_input) = cairo_input {
res.save_to_file(&cairo_input, true).unwrap();
}
handle_run(tasks, datalakes, rpc_url, output_file, cairo_input).await;
}
}
}
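
For illustration, the new flag could be exercised roughly as follows. This is a hypothetical sketch: the binary entry point, the positional argument order, and the datalake subcommand placeholders are assumptions, not taken from this diff.

cargo run -- encode --allow-run "avg" <DATALAKE_SUBCOMMAND> <DATALAKE_ARGS> --output-file output.json --cairo-input input.json

With --allow-run, encode hands the freshly encoded task and datalake straight to handle_run, the same shared path now called by the command whose inlined evaluator logic this commit replaces.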
