Skip to content

Commit

Permalink
Merge pull request #561 from Nukesor/add-enqueue-task-selection
Browse files Browse the repository at this point in the history
Add enqueue task selection
  • Loading branch information
Nukesor authored Aug 1, 2024
2 parents 3e12a75 + 3d349df commit afcd28d
Show file tree
Hide file tree
Showing 14 changed files with 293 additions and 61 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ TLDR: The new task state representation is more verbose but significantly cleane
### Add

- Add `--all` and `--group` to `pueue log`. [#509](https://github.com/Nukesor/pueue/issues/509)
- Add `--all` and `--group` to `pueue enqueue`. [#558](https://github.com/Nukesor/pueue/issues/558)
- Add `--all` and `--group` to `pueue stash`. [#558](https://github.com/Nukesor/pueue/issues/558)
- Add `pueue reset --groups [group_names]` to allow resetting individual groups. [#482](https://github.com/Nukesor/pueue/issues/482) \
This also refactors the way resets are done internally, resulting in a cleaner code architecture.
- Ability to set the Unix socket permissions through the new `unix_socket_permissions` configuration option. [#544](https://github.com/Nukesor/pueue/pull/544)
Expand Down
4 changes: 4 additions & 0 deletions docs/Pueue State Diagram.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
23 changes: 22 additions & 1 deletion pueue/src/client/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,23 @@ pub enum SubCommand {
/// You have to enqueue them or start them by hand.
Stash {
/// Stash these specific tasks.
#[arg(required = true)]
task_ids: Vec<usize>,

/// Stash all queued tasks in a group
#[arg(short, long, conflicts_with = "all")]
group: Option<String>,

/// Stash all queued tasks across all groups.
#[arg(short, long)]
all: bool,

/// Delay enqueuing these tasks until 'delay' elapses. See DELAY FORMAT below.
#[arg(name = "delay", short, long, value_parser = parse_delay_until)]
delay_until: Option<DateTime<Local>>,
},
/// Enqueue stashed tasks. They'll be handled normally afterwards.
///
/// Enqueues all stashed task in the default group if no arguments are given.
#[command(after_help = "DELAY FORMAT:
The --delay argument must be either a number of seconds or a \"date expression\" similar to GNU \
Expand All @@ -126,6 +139,14 @@ pub enum SubCommand {
/// Enqueue these specific tasks.
task_ids: Vec<usize>,

/// Enqueue all stashed tasks in a group
#[arg(short, long, conflicts_with = "all")]
group: Option<String>,

/// Enqueue all stashed tasks across all groups.
#[arg(short, long)]
all: bool,

/// Delay enqueuing these tasks until 'delay' elapses. See DELAY FORMAT below.
#[arg(name = "delay", short, long, value_parser = parse_delay_until)]
delay_until: Option<DateTime<Local>>,
Expand Down
25 changes: 21 additions & 4 deletions pueue/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,19 @@ impl Client {
}
Message::Remove(task_ids.clone())
}
SubCommand::Stash { task_ids } => Message::Stash(task_ids.clone()),
SubCommand::Stash {
task_ids,
group,
all,
delay_until,
} => {
let selection = selection_from_params(*all, group, task_ids);
StashMessage {
tasks: selection,
enqueue_at: *delay_until,
}
.into()
}
SubCommand::Switch {
task_id_1,
task_id_2,
Expand All @@ -467,10 +479,15 @@ impl Client {
.into(),
SubCommand::Enqueue {
task_ids,
group,
all,
delay_until,
} => EnqueueMessage {
task_ids: task_ids.clone(),
enqueue_at: *delay_until,
} => {
let selection = selection_from_params(*all, group, task_ids);
EnqueueMessage {
tasks: selection,
enqueue_at: *delay_until,
}
}
.into(),
SubCommand::Start {
Expand Down
2 changes: 1 addition & 1 deletion pueue/src/daemon/network/message_handler/add.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ pub fn add_task(settings: &Settings, state: &SharedState, message: AddMessage) -
let mut response = if message.print_task_id {
task_id.to_string()
} else if let Some(enqueue_at) = message.enqueue_at {
let enqueue_at = enqueue_at.format("%Y-%m-%d %H:%M:%S");
let enqueue_at = format_datetime(settings, &enqueue_at);
format!("New task added (id {task_id}). It will be enqueued at {enqueue_at}")
} else {
format!("New task added (id {task_id}).")
Expand Down
117 changes: 95 additions & 22 deletions pueue/src/daemon/network/message_handler/enqueue.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,62 @@
use chrono::Local;
use pueue_lib::network::message::*;
use pueue_lib::state::SharedState;
use pueue_lib::task::TaskStatus;
use pueue_lib::{
network::message::*, settings::Settings, state::SharedState, success_msg, task::TaskStatus,
};

use crate::daemon::network::response_helper::*;

use super::format_datetime;

/// Invoked when calling `pueue enqueue`.
/// Enqueue specific stashed tasks.
pub fn enqueue(state: &SharedState, message: EnqueueMessage) -> Message {
pub fn enqueue(settings: &Settings, state: &SharedState, message: EnqueueMessage) -> Message {
let mut state = state.lock().unwrap();
let filtered_tasks = state.filter_tasks(
|task| {
matches!(
task.status,
TaskStatus::Stashed { .. } | TaskStatus::Locked { .. }
)
},
Some(message.task_ids),
);

for task_id in &filtered_tasks.matching_ids {
// Get the affected task ids, based on the task selection.
let selected_task_ids = match message.tasks {
TaskSelection::TaskIds(ref task_ids) => state
.tasks
.iter()
.filter(|(task_id, task)| {
if !task_ids.contains(task_id) {
return false;
}

matches!(
task.status,
TaskStatus::Stashed { .. } | TaskStatus::Locked { .. }
)
})
.map(|(task_id, _)| *task_id)
.collect::<Vec<usize>>(),
TaskSelection::Group(ref group) => state
.tasks
.iter()
.filter(|(_, task)| {
if task.group != *group {
return false;
}

matches!(
task.status,
TaskStatus::Stashed { .. } | TaskStatus::Locked { .. }
)
})
.map(|(task_id, _)| *task_id)
.collect::<Vec<usize>>(),
TaskSelection::All => state
.tasks
.iter()
.filter(|(_, task)| {
matches!(
task.status,
TaskStatus::Stashed { .. } | TaskStatus::Locked { .. }
)
})
.map(|(task_id, _)| *task_id)
.collect::<Vec<usize>>(),
};

for task_id in &selected_task_ids {
// We just checked that they're there and the state is locked. It's safe to unwrap.
let task = state.tasks.get_mut(task_id).expect("Task should be there.");

Expand All @@ -36,12 +73,48 @@ pub fn enqueue(state: &SharedState, message: EnqueueMessage) -> Message {
}
}

let text = if let Some(enqueue_at) = message.enqueue_at {
let enqueue_at = enqueue_at.format("%Y-%m-%d %H:%M:%S");
format!("Tasks will be enqueued at {enqueue_at}")
} else {
String::from("Tasks are enqueued")
};
// Construct a response depending on the selected tasks.
if let Some(enqueue_at) = &message.enqueue_at {
let enqueue_at = format_datetime(settings, enqueue_at);

compile_task_response(&text, filtered_tasks)
match &message.tasks {
TaskSelection::TaskIds(task_ids) => task_action_response_helper(
&format!("Stashed tasks will be enqueued at {enqueue_at}"),
task_ids.clone(),
|task| {
matches!(
task.status,
TaskStatus::Stashed { .. } | TaskStatus::Locked { .. }
)
},
&state,
),
TaskSelection::Group(group) => {
success_msg!("Enqueue stashed tasks of group {group} at {enqueue_at}.",)
}
TaskSelection::All => {
success_msg!("Enqueue all stashed tasks at {enqueue_at}.",)
}
}
} else {
match &message.tasks {
TaskSelection::TaskIds(task_ids) => task_action_response_helper(
"Stashed tasks have been enqueued",
task_ids.clone(),
|task| {
matches!(
task.status,
TaskStatus::Stashed { .. } | TaskStatus::Locked { .. }
)
},
&state,
),
TaskSelection::Group(group) => {
success_msg!("All stashed tasks of group \"{group}\" have been enqueued.")
}
TaskSelection::All => {
success_msg!("All stashed tasks have been enqueued.")
}
}
}
}
15 changes: 13 additions & 2 deletions pueue/src/daemon/network/message_handler/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::fmt::Display;

use chrono::{DateTime, Local};
use pueue_lib::failure_msg;
use pueue_lib::network::message::*;
use pueue_lib::settings::Settings;
Expand Down Expand Up @@ -31,7 +32,7 @@ pub fn handle_message(message: Message, state: &SharedState, settings: &Settings
Message::Edit(message) => edit::edit(settings, state, message),
Message::EditRequest(task_id) => edit::edit_request(state, task_id),
Message::EditRestore(task_id) => edit::edit_restore(state, task_id),
Message::Enqueue(message) => enqueue::enqueue(state, message),
Message::Enqueue(message) => enqueue::enqueue(settings, state, message),
Message::Group(message) => group::group(settings, state, message),
Message::Kill(message) => kill::kill(settings, state, message),
Message::Log(message) => log::get_log(settings, state, message),
Expand All @@ -42,7 +43,7 @@ pub fn handle_message(message: Message, state: &SharedState, settings: &Settings
Message::Restart(message) => restart::restart_multiple(settings, state, message),
Message::Send(message) => send::send(state, message),
Message::Start(message) => start::start(settings, state, message),
Message::Stash(task_ids) => stash::stash(state, task_ids),
Message::Stash(message) => stash::stash(settings, state, message),
Message::Switch(message) => switch::switch(settings, state, message),
Message::Status => get_status(state),
_ => create_failure_message("Not yet implemented"),
Expand All @@ -56,6 +57,16 @@ fn get_status(state: &SharedState) -> Message {
Message::StatusResponse(Box::new(state))
}

// If the enqueue at time is today, only show the time. Otherwise, include the date.
fn format_datetime(settings: &Settings, enqueue_at: &DateTime<Local>) -> String {
let format_string = if enqueue_at.date_naive() == Local::now().date_naive() {
&settings.client.status_time_format
} else {
&settings.client.status_datetime_format
};
enqueue_at.format(format_string).to_string()
}

fn ok_or_failure_message<T, E: Display>(result: Result<T, E>) -> Result<T, Message> {
match result {
Ok(inner) => Ok(inner),
Expand Down
4 changes: 2 additions & 2 deletions pueue/src/daemon/network/message_handler/pause.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@ pub fn pause(settings: &Settings, state: &SharedState, message: PauseMessage) ->
// Construct a response depending on the selected tasks.
let response = match &message.tasks {
TaskSelection::TaskIds(task_ids) => task_action_response_helper(
"Tasks are being paused",
"Tasks have been paused",
task_ids.clone(),
|task| matches!(task.status, TaskStatus::Running { .. }),
&state,
),
TaskSelection::Group(group) => {
success_msg!("Group \"{group}\" is being paused.")
}
TaskSelection::All => success_msg!("All queues are being paused."),
TaskSelection::All => success_msg!("All groups are being paused."),
};

// Actually execute the command
Expand Down
2 changes: 1 addition & 1 deletion pueue/src/daemon/network/message_handler/restart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub fn restart_multiple(
// We have to compile the response beforehand.
// Otherwise we no longer know which tasks, were actually capable of being being restarted.
let response = task_action_response_helper(
"Tasks restarted",
"Tasks has restarted",
task_ids.clone(),
|task| task.is_done(),
&state,
Expand Down
4 changes: 2 additions & 2 deletions pueue/src/daemon/network/message_handler/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub fn start(settings: &Settings, state: &SharedState, message: StartMessage) ->

let response = match &message.tasks {
TaskSelection::TaskIds(task_ids) => task_action_response_helper(
"Tasks are being started",
"Tasks have been started/resumed",
task_ids.clone(),
|task| {
matches!(
Expand All @@ -35,7 +35,7 @@ pub fn start(settings: &Settings, state: &SharedState, message: StartMessage) ->
TaskSelection::Group(group) => {
success_msg!("Group \"{group}\" is being resumed.")
}
TaskSelection::All => success_msg!("All queues are being resumed."),
TaskSelection::All => success_msg!("All groups are being resumed."),
};

if let Message::Success(_) = response {
Expand Down
Loading

0 comments on commit afcd28d

Please sign in to comment.