Skip to content

Commit

Permalink
core: remove delete data source tx and batch documents deletion (#8104)
Browse files Browse the repository at this point in the history
  • Loading branch information
spolu authored Oct 18, 2024
1 parent b21288f commit 958059c
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 12 deletions.
3 changes: 2 additions & 1 deletion core/src/data_sources/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1765,11 +1765,12 @@ impl DataSource {
);

// Delete data source and documents (SQL).
store
let deleted_rows = store
.delete_data_source(&self.project, &self.data_source_id)
.await?;

info!(
deleted_rows = deleted_rows,
data_source_internal_id = self.internal_id(),
"Deleted data source records"
);
Expand Down
36 changes: 26 additions & 10 deletions core/src/stores/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1927,12 +1927,12 @@ impl Store for PostgresStore {
Ok(())
}

async fn delete_data_source(&self, project: &Project, data_source_id: &str) -> Result<()> {
async fn delete_data_source(&self, project: &Project, data_source_id: &str) -> Result<u64> {
let project_id = project.project_id();
let data_source_id = data_source_id.to_string();

let pool = self.pool.clone();
let mut c = pool.get().await?;
let c = pool.get().await?;

let r = c
.query(
Expand All @@ -1947,19 +1947,35 @@ impl Store for PostgresStore {
_ => unreachable!(),
};

let tx = c.transaction().await?;
// Data source documents can be numerous, so we avoid wrapping the deletion in a single
// transaction that could hurt the performance of the database. We also delete documents in
// small batches to avoid long-running operations.
let deletion_batch_size: u64 = 512;
let mut total_deleted_rows: u64 = 0;

let stmt = tx
.prepare("DELETE FROM data_sources_documents WHERE data_source = $1")
let stmt = c
.prepare(
"DELETE FROM data_sources_documents WHERE id IN (
SELECT id FROM data_sources_documents WHERE data_source = $1 LIMIT $2
)",
)
.await?;
let _ = tx.query(&stmt, &[&data_source_row_id]).await?;

let stmt = tx.prepare("DELETE FROM data_sources WHERE id = $1").await?;
let _ = tx.query(&stmt, &[&data_source_row_id]).await?;
loop {
let deleted_rows = c
.execute(&stmt, &[&data_source_row_id, &(deletion_batch_size as i64)])
.await?;
total_deleted_rows += deleted_rows;

tx.commit().await?;
if deleted_rows < deletion_batch_size {
break;
}
}

Ok(())
let stmt = c.prepare("DELETE FROM data_sources WHERE id = $1").await?;
let _ = c.query(&stmt, &[&data_source_row_id]).await?;

Ok(total_deleted_rows)
}

async fn upsert_database(
Expand Down
2 changes: 1 addition & 1 deletion core/src/stores/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ pub trait Store {
data_source_id: &str,
document_id: &str,
) -> Result<()>;
async fn delete_data_source(&self, project: &Project, data_source_id: &str) -> Result<()>;
async fn delete_data_source(&self, project: &Project, data_source_id: &str) -> Result<u64>;
// Databases
async fn upsert_database(
&self,
Expand Down

0 comments on commit 958059c

Please sign in to comment.