diff --git a/core/src/data_sources/data_source.rs b/core/src/data_sources/data_source.rs index edc2e7600f7c..602c75dc7458 100644 --- a/core/src/data_sources/data_source.rs +++ b/core/src/data_sources/data_source.rs @@ -1765,11 +1765,12 @@ impl DataSource { ); // Delete data source and documents (SQL). - store + let deleted_rows = store .delete_data_source(&self.project, &self.data_source_id) .await?; info!( + deleted_rows = deleted_rows, data_source_internal_id = self.internal_id(), "Deleted data source records" ); diff --git a/core/src/stores/postgres.rs b/core/src/stores/postgres.rs index ed94f3e523d1..5d2358f1a1e5 100644 --- a/core/src/stores/postgres.rs +++ b/core/src/stores/postgres.rs @@ -1927,12 +1927,12 @@ impl Store for PostgresStore { Ok(()) } - async fn delete_data_source(&self, project: &Project, data_source_id: &str) -> Result<()> { + async fn delete_data_source(&self, project: &Project, data_source_id: &str) -> Result { let project_id = project.project_id(); let data_source_id = data_source_id.to_string(); let pool = self.pool.clone(); - let mut c = pool.get().await?; + let c = pool.get().await?; let r = c .query( @@ -1947,19 +1947,35 @@ impl Store for PostgresStore { _ => unreachable!(), }; - let tx = c.transaction().await?; + // Data source documents can be numerous so we want to avoid any transaction that could + // potentially hurt the performance of the database. Also we delete documents in small + // batches to avoid long running operations. + let deletion_batch_size: u64 = 512; + let mut total_deleted_rows: u64 = 0; - let stmt = tx - .prepare("DELETE FROM data_sources_documents WHERE data_source = $1") + let stmt = c + .prepare( + "DELETE FROM data_sources_documents WHERE id IN ( + SELECT id FROM data_sources_documents WHERE data_source = $1 LIMIT $2 + )", + ) .await?; - let _ = tx.query(&stmt, &[&data_source_row_id]).await?; - let stmt = tx.prepare("DELETE FROM data_sources WHERE id = $1").await?; - let _ = tx.query(&stmt, &[&data_source_row_id]).await?; + loop { + let deleted_rows = c + .execute(&stmt, &[&data_source_row_id, &(deletion_batch_size as i64)]) + .await?; + total_deleted_rows += deleted_rows; - tx.commit().await?; + if deleted_rows < deletion_batch_size { + break; + } + } - Ok(()) + let stmt = c.prepare("DELETE FROM data_sources WHERE id = $1").await?; + let _ = c.query(&stmt, &[&data_source_row_id]).await?; + + Ok(total_deleted_rows) } async fn upsert_database( diff --git a/core/src/stores/store.rs b/core/src/stores/store.rs index 36e7820e02d1..0fe8edcea4b4 100644 --- a/core/src/stores/store.rs +++ b/core/src/stores/store.rs @@ -175,7 +175,7 @@ pub trait Store { data_source_id: &str, document_id: &str, ) -> Result<()>; - async fn delete_data_source(&self, project: &Project, data_source_id: &str) -> Result<()>; + async fn delete_data_source(&self, project: &Project, data_source_id: &str) -> Result; // Databases async fn upsert_database( &self,