Skip to content

Commit

Permalink
Allow force deleting a task (#1063)
Browse files Browse the repository at this point in the history
  • Loading branch information
inahga authored May 29, 2024
1 parent eff027a commit 201b907
Show file tree
Hide file tree
Showing 9 changed files with 125 additions and 21 deletions.
15 changes: 13 additions & 2 deletions cli/src/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,12 @@ pub enum TaskAction {
Rename { task_id: String, name: String },

/// delete a task
Delete { task_id: String },
Delete {
task_id: String,
/// delete the task even if the aggregators are unreachable
#[arg(long, action)]
force: bool,
},

/// set the expiration date of a task
SetExpiration {
Expand Down Expand Up @@ -171,7 +176,13 @@ impl TaskAction {
TaskAction::CollectorAuthTokens { task_id } => {
output.display(client.task_collector_auth_tokens(&task_id).await?)
}
TaskAction::Delete { task_id } => client.delete_task(&task_id).await?,
TaskAction::Delete { task_id, force } => {
if force {
client.force_delete_task(&task_id).await?
} else {
client.delete_task(&task_id).await?
}
}
TaskAction::SetExpiration {
task_id,
expiration,
Expand Down
5 changes: 5 additions & 0 deletions client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,11 @@ impl DivviupClient {
self.delete(&format!("api/tasks/{task_id}")).await
}

pub async fn force_delete_task(&self, task_id: &str) -> ClientResult<()> {
self.delete(&format!("api/tasks/{task_id}?force=true"))
.await
}

pub async fn api_tokens(&self, account_id: Uuid) -> ClientResult<Vec<ApiToken>> {
self.get(&format!("api/accounts/{account_id}/api_tokens"))
.await
Expand Down
18 changes: 18 additions & 0 deletions client/tests/integration/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,24 @@ async fn delete_task(app: Arc<DivviupApi>, account: Account, client: DivviupClie
Ok(())
}

#[test(harness = with_configured_client)]
async fn force_delete_task(
app: Arc<DivviupApi>,
account: Account,
client: DivviupClient,
) -> TestResult {
let task = fixtures::task(&app, &account).await;

let response_tasks = client.tasks(account.id).await?;
assert!(!response_tasks.is_empty());

client.force_delete_task(&task.id).await?;

let response_tasks = client.tasks(account.id).await?;
assert!(response_tasks.is_empty());
Ok(())
}

#[test(harness = with_configured_client)]
async fn collector_auth_tokens_no_token_hash(
app: Arc<DivviupApi>,
Expand Down
21 changes: 21 additions & 0 deletions documentation/openapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,27 @@ paths:
$ref: "#/components/schemas/Task"
"404":
$ref: "#/components/responses/NotFound"
delete:
tags: [tasks]
operationId: deleteTask
summary: delete a task by id
description: delete a task by id
parameters:
- in: query
name: force
schema:
type: boolean
required: false
description: >-
forces deletion of the task, even if task's aggregators are unreachable. this is a
dangerous operation!
responses:
"204":
description: Successful operation
"403":
description: Forbidden
"404":
description: Not Found

/tasks/{task_id}/collector_auth_tokens:
get:
Expand Down
2 changes: 1 addition & 1 deletion src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub struct DivviupApi {
}

impl DivviupApi {
async fn init(&mut self, info: &mut Info) {
pub async fn init(&mut self, info: &mut Info) {
*info.server_description_mut() = format!("divviup-api {}", env!("CARGO_PKG_VERSION"));
*info.listener_description_mut() = format!(
"api url: {}\n app url: {}\n",
Expand Down
40 changes: 23 additions & 17 deletions src/routes/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ use crate::{
entity::{Account, NewTask, Task, TaskColumn, Tasks, UpdateTask},
Crypter, Db, Error, Permissions, PermissionsActor,
};
use querystrong::QueryStrong;
use sea_orm::{
ActiveModelTrait, ActiveValue, ColumnTrait, EntityTrait, IntoActiveModel, ModelTrait,
QueryFilter,
};
use std::time::Duration;
use time::OffsetDateTime;
use tokio::join;
use tracing::warn;
use trillium::{Conn, Handler, Status};
use trillium_api::{FromConn, Json, State};
Expand Down Expand Up @@ -123,6 +125,12 @@ pub async fn delete(
conn: &mut Conn,
(task, client, db): (Task, State<Client>, Db),
) -> Result<impl Handler, Error> {
let params = QueryStrong::parse(conn.querystring()).unwrap_or_default();
let force = params
.get_str("force")
.and_then(|param| param.parse().ok())
.unwrap_or(false);

if task.deleted_at.is_some() {
return Ok(Status::NoContent);
}
Expand All @@ -136,32 +144,30 @@ pub async fn delete(
if task.expiration.is_none() || task.expiration > Some(now) {
let update = UpdateTask::expiration(Some(now));

// The leader aggregator drives DAP, so we _must_ succeed in setting its expiration.
update
.update_aggregator_expiration(
let (leader_result, helper_result) = join!(
update.update_aggregator_expiration(
task.leader_aggregator(&db).await?,
&task.id,
&client,
crypter,
)
.await?;

// Expiry helper-side is best-effort. It's plausible that the user is deleting tasks to a
// BYOH that no longer exists, so we don't want to irritate them by forcing them to bring
// the BYOH back up.
//
// If we fail to set expiry transiently, e.g. due to temporary server outage, it's no big
// deal, because the leader is what drives the protocol forward. The helper shouldn't incur
// load based on this task because we've ensured that the leader will stop.
let _ = update
.update_aggregator_expiration(
),
update.update_aggregator_expiration(
task.helper_aggregator(&db).await?,
&task.id,
&client,
crypter,
)
.await
.map_err(|err| warn!(?err, "failed to expire helper-side task"));
);

if force {
let _ = leader_result
.map_err(|err| warn!(?err, "failed to expire leader-side task, ignoring"));
let _ = helper_result
.map_err(|err| warn!(?err, "failed to expire helper-side task, ignoring"));
} else {
leader_result?;
helper_result?;
}

am.expiration = ActiveValue::Set(Some(now));
}
Expand Down
3 changes: 3 additions & 0 deletions test-support/src/api_mocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ pub struct ApiMocks {
handler: (ClientLogs, divviup_api::api_mocks::ApiMocks),
client_logs: ClientLogs,
}

impl Default for ApiMocks {
fn default() -> Self {
Self::new()
}
}

impl ApiMocks {
pub fn new() -> Self {
let client_logs = ClientLogs::default();
Expand All @@ -24,6 +26,7 @@ impl ApiMocks {
client_logs,
}
}

pub fn client_logs(&self) -> ClientLogs {
self.client_logs.clone()
}
Expand Down
1 change: 0 additions & 1 deletion tests/integration/aggregator_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use divviup_api::{
clients::AggregatorClient,
};
use test_support::{assert_eq, test, *};
use trillium::Handler;

#[test(harness = with_client_logs)]
async fn get_task_ids(app: DivviupApi, client_logs: ClientLogs) -> TestResult {
Expand Down
41 changes: 41 additions & 0 deletions tests/integration/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -888,7 +888,9 @@ mod update {
}

mod delete {
use divviup_api::DivviupApi;
use janus_messages::Time as JanusTime;
use test_support::tracing::install_test_trace_subscriber;

use super::{assert_eq, test, *};

Expand Down Expand Up @@ -946,6 +948,45 @@ mod delete {
Ok(())
}

#[tokio::test]
async fn force() -> TestResult {
install_test_trace_subscriber();
let client_logs = ClientLogs::default();
let mut app = DivviupApi::new(config((
client_logs.clone(),
// Stub out aggregator API mocks to simulate a FUBAR aggregator.
|conn: Conn| async move { conn.with_status(Status::InternalServerError) },
)))
.await;
set_up_schema(app.db()).await;
let mut info = "testing".into();
app.init(&mut info).await;

let (user, account, ..) = fixtures::member(&app).await;
let task = fixtures::task(&app, &account).await;

let conn = delete(format!("/api/tasks/{}?force=true", task.id))
.with_api_headers()
.with_state(user.clone())
.run_async(&app)
.await;
assert_status!(conn, Status::NoContent);
let task_reload = task.reload(app.db()).await?.unwrap();
assert!(task_reload.expiration.is_some());
assert!(task_reload.expiration <= Some(OffsetDateTime::now_utc()));
assert!(task_reload.deleted_at.is_some());
assert!(task_reload.deleted_at <= Some(OffsetDateTime::now_utc()));

let logs = dbg!(client_logs.logs());
let [leader, helper] = &logs[..] else {
panic!("expected exactly two requests");
};
assert_eq!(leader.response_status, Status::InternalServerError);
assert_eq!(helper.response_status, Status::InternalServerError);

Ok(())
}

#[test(harness = with_client_logs)]
async fn success_if_no_expiry(app: DivviupApi, client_logs: ClientLogs) -> TestResult {
let (user, account, ..) = fixtures::member(&app).await;
Expand Down

0 comments on commit 201b907

Please sign in to comment.