core: delete non last superseded older than 24h (#8249)
* core: delete non last superseded older than 24h

* migration

* migration work

* Fix

* start small

* log

* retry

* clean Cargo.toml
spolu authored Oct 27, 2024
1 parent 5a67e24 commit 9b0a7d5
Showing 6 changed files with 301 additions and 17 deletions.
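The behavior added here is implemented in `scrub_document_superseded_versions` (`core/src/data_sources/data_source.rs` below): for a given document, keep the most recent superseded version plus any superseded version created within the last 24 hours, and delete the rest from both file storage and Postgres. A minimal standalone sketch of that selection rule, with an illustrative `Version` struct and `Status` enum standing in for the crate's actual types:

#[derive(Debug, Clone, PartialEq)]
enum Status {
    Latest,
    Superseded,
    Deleted,
}

#[derive(Debug, Clone)]
struct Version {
    created: u64, // epoch milliseconds
    status: Status,
}

// Versions that would be scrubbed: every superseded version except the most
// recent one, and only if it is older than 24 hours.
fn superseded_versions_to_scrub(mut versions: Vec<Version>, now: u64) -> Vec<Version> {
    versions.sort_by(|a, b| b.created.cmp(&a.created)); // newest first
    versions
        .into_iter()
        .filter(|v| v.status == Status::Superseded)
        .skip(1) // always keep the most recent superseded version
        .filter(|v| now.saturating_sub(v.created) > 24 * 60 * 60 * 1000)
        .collect()
}

fn main() {
    let day: u64 = 24 * 60 * 60 * 1000;
    let now: u64 = 100 * day;
    let versions = vec![
        Version { created: now, status: Status::Latest },               // untouched
        Version { created: now - day / 2, status: Status::Superseded }, // kept: most recent superseded
        Version { created: now - 2 * day, status: Status::Superseded }, // scrubbed
        Version { created: now - 3 * day, status: Status::Superseded }, // scrubbed
        Version { created: now - 4 * day, status: Status::Deleted },    // handled by the deleted-version scrub, not this one
    ];
    for v in superseded_versions_to_scrub(versions, now) {
        println!("would scrub version created at {} ({:?})", v.created, v.status);
    }
}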
4 changes: 0 additions & 4 deletions core/Cargo.toml
@@ -35,10 +35,6 @@ path = "bin/oauth.rs"
name = "oauth_generate_key"
path = "bin/oauth_generate_key.rs"

[[bin]]
name = "migration_clean_legacy_gcs"
path = "bin/migrations/20241024_clean_legacy_gcs.rs"

[[test]]
name = "oauth_connections_test"
path = "src/oauth/tests/functional_connections.rs"
206 changes: 206 additions & 0 deletions core/bin/migrations/20241025_scrub_old_superseded_verions.rs
@@ -0,0 +1,206 @@
use anyhow::{anyhow, Context, Error, Result};
use dust::data_sources::data_source::{DataSource, DocumentVersion};
use dust::stores::{postgres, store};
use tokio_postgres::Row;

use bb8::Pool;
use bb8_postgres::PostgresConnectionManager;
use futures::prelude::*;
use futures::{StreamExt, TryStreamExt};
use std::time::Duration;
use tokio_postgres::NoTls;
use tokio_stream::{self as stream};

pub async fn with_retryable_back_off<F, O>(
mut f: impl FnMut() -> F,
log_retry: impl Fn(&Error, &Duration, usize) -> (),
log_error: impl Fn(&Error) -> (),
) -> Result<O>
where
F: Future<Output = Result<O, anyhow::Error>>,
{
let factor = 2;
let retries = 3;
let mut sleep = Duration::from_millis(500);
let mut attempts = 0_usize;
let out = loop {
match f().await {
Err(err) => {
log_error(&err);
attempts += 1;
log_retry(&err, &sleep, attempts);
tokio::time::sleep(sleep).await;
sleep *= factor;
if attempts > retries {
break Err(anyhow!("Too many retries ({}): {}", retries, err));
}
}
Ok(out) => break Ok(out),
}
};
out
}

async fn fetch_data_sources_documents_batch(
pool: &Pool<PostgresConnectionManager<NoTls>>,
data_source_id: i64,
last_id: u64,
limit: usize,
) -> Result<Vec<Row>, anyhow::Error> {
let c = pool.get().await?;

c.query(
"SELECT id, document_id FROM data_sources_documents WHERE data_source = $1 AND status='latest' AND id > $2 ORDER BY id ASC LIMIT $3",
&[&data_source_id, &(last_id as i64), &(limit as i64)],
)
.await
.context("fetch_data_sources_documents")
}

async fn scrub_wrapper(
store: Box<dyn store::Store + Sync + Send>,
data_source: &DataSource,
document_id: &str,
) -> Result<Vec<DocumentVersion>> {
data_source
.scrub_document_superseded_versions(store, document_id)
.await
}

async fn scrub_superseded_versions_for_data_source(
store: Box<dyn store::Store + Sync + Send>,
data_source_internal_id: &str,
data_source_id: i64,
) -> Result<()> {
let data_source = match store
.load_data_source_by_internal_id(&data_source_internal_id)
.await?
{
Some(ds) => ds,
None => Err(anyhow!("Data source not found"))?,
};

let pool = store.raw_pool();

let limit: usize = 1024;
let mut last_data_source_document_id = 0;
let mut iteration = 0;

loop {
let rows = fetch_data_sources_documents_batch(
pool,
data_source_id,
last_data_source_document_id,
limit,
)
.await?;

stream::iter(
rows.iter()
.map(|row| {
let document_id: String = row.get(1);
(store.clone(), document_id, data_source.clone())
})
.map(|(store, document_id, data_source)| async move {
let v = with_retryable_back_off(
|| scrub_wrapper(store.clone(), &data_source, &document_id),
|err, sleep, attempts| {
println!(
"Retrying scrub: err_msg={}, sleep={:?}, attempts={}",
err, sleep, attempts
);
},
|err| {
println!(
"Error scrubbing: data_source_id={}, err_msg={}",
data_source_id, err
);
},
)
.await?;
if v.len() > 0 {
println!(
"Scrubbed document: data_source_id={} document_id={} scrubbed={}",
data_source_id,
document_id,
v.len()
);
}
Ok::<(), anyhow::Error>(())
}),
)
.buffer_unordered(8)
.try_collect::<Vec<_>>()
.await?;

if rows.len() < limit {
println!("Scrub loop done: data_source_id={}", data_source_id);
break;
}

last_data_source_document_id = match rows.last() {
Some(r) => {
let id: i64 = r.get(0);
println!(
"Scrub loop: data_source_id={} iteration={}, last_data_source_document_id={}",
data_source_id, iteration, id
);

id as u64
}
None => {
println!(
"Scrub loop done: data_source_id={} iteration={}",
data_source_id, iteration
);
break;
}
};

iteration += 1;
}

Ok(())
}

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
let store: Box<dyn store::Store + Sync + Send> = match std::env::var("CORE_DATABASE_URI") {
Ok(db_uri) => {
let store = postgres::PostgresStore::new(&db_uri).await?;
store.init().await?;
Box::new(store)
}
Err(_) => Err(anyhow!("CORE_DATABASE_URI is required (postgres)"))?,
};

let pool = store.raw_pool();

let c = pool.get().await?;

let rows = c
.query("SELECT id, internal_id FROM data_sources ORDER BY id", &[])
.await
.context("fetch_data_sources")?;

stream::iter(rows.iter().map(|row| {
let store = store.clone();

async move {
let data_source_id: i64 = row.get(0);
let data_source_internal_id: String = row.get(1);

scrub_superseded_versions_for_data_source(
store,
&data_source_internal_id,
data_source_id,
)
.await
}
}))
.buffer_unordered(8)
.try_collect::<Vec<_>>()
.await?;

Ok(())
}
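The `with_retryable_back_off` helper above retries a failing future up to 3 times, doubling the sleep from 500ms on each attempt and logging every retry. A hedged usage sketch, assuming the helper is in scope as defined in this migration; the `flaky_fetch` operation and its attempt counter are hypothetical and only stand in for `scrub_wrapper`:

use anyhow::{anyhow, Result};
use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical operation that fails twice before succeeding.
async fn flaky_fetch(attempts: &AtomicUsize) -> Result<String> {
    if attempts.fetch_add(1, Ordering::SeqCst) < 2 {
        Err(anyhow!("transient failure"))
    } else {
        Ok("ok".to_string())
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    let attempts = AtomicUsize::new(0);
    let out = with_retryable_back_off(
        || flaky_fetch(&attempts),
        |err, sleep, attempt| println!("retrying: err={} sleep={:?} attempt={}", err, sleep, attempt),
        |err| println!("attempt failed: {}", err),
    )
    .await?;
    println!("final result: {}", out);
    Ok(())
}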
91 changes: 86 additions & 5 deletions core/src/data_sources/data_source.rs
@@ -227,6 +227,16 @@ impl FromStr for DocumentStatus {
}
}

impl ToString for DocumentStatus {
fn to_string(&self) -> String {
match self {
DocumentStatus::Latest => "latest".to_string(),
DocumentStatus::Superseded => "superseded".to_string(),
DocumentStatus::Deleted => "deleted".to_string(),
}
}
}

#[derive(Debug, Serialize, Clone)]
pub struct DocumentVersion {
pub created: u64,
@@ -699,7 +709,7 @@ impl DataSource {
.await?;
}

// Upsert document (SQL)
// Upsert document (SQL).
store
.upsert_data_source_document(
&self.project,
@@ -708,6 +718,10 @@
)
.await?;

// Clean-up old superseded versions.
self.scrub_document_superseded_versions(store, &document_id)
.await?;

Ok(main_collection_document)
}

@@ -1769,7 +1783,7 @@ impl DataSource {
.filter(|v| v.status == DocumentStatus::Deleted)
.collect::<Vec<_>>();

let mut scrubbed_hashes: Vec<DocumentVersion> = vec![];
let mut scrubbed_versions: Vec<DocumentVersion> = vec![];
for v in versions {
let document_id_hash = make_document_id_hash(document_id);

@@ -1782,7 +1796,7 @@
.await?;

store
.scrub_data_source_document_version(
.delete_data_source_document_version(
&self.project,
&self.data_source_id,
document_id,
@@ -1798,10 +1812,77 @@
"Scrubbed deleted document version"
);

scrubbed_hashes.push(v);
scrubbed_versions.push(v);
}

Ok(scrubbed_hashes)
Ok(scrubbed_versions)
}

pub async fn scrub_document_superseded_versions(
&self,
store: Box<dyn Store + Sync + Send>,
document_id: &str,
) -> Result<Vec<DocumentVersion>> {
let (versions, _) = store
.list_data_source_document_versions(
&self.project,
&self.data_source_id,
document_id,
None,
&None,
&None,
)
.await?;

// We scrub only superseded versions, always keeping the most recent one as well as any
// created within the past 24h. Document versions are ordered by creation date
// (descending) but we re-sort here just to be safe in case the store API changes.
let now = utils::now();
let scrubbed_versions = versions
.into_iter()
.sorted_by(|a, b| Ord::cmp(&b.created, &a.created))
.filter(|v| v.status == DocumentStatus::Superseded)
.skip(1)
.filter(|v| now - v.created > 24 * 60 * 60 * 1000)
.collect::<Vec<_>>();

for v in scrubbed_versions.iter() {
let document_id_hash = make_document_id_hash(document_id);

FileStorageDocument::scrub_document_version_from_file_storage(
&self,
document_id,
&document_id_hash,
v,
)
.await?;

store
.delete_data_source_document_version(
&self.project,
&self.data_source_id,
document_id,
v,
)
.await?;

info!(
data_source_internal_id = self.internal_id,
document_id = document_id,
version_created = v.created,
version_hash = v.hash,
"Scrubbed superseded document version"
);
}

info!(
data_source_internal_id = self.internal_id,
document_id = document_id,
scrubbed_version_count = scrubbed_versions.len(),
"Scrubbed superseded document versions"
);

Ok(scrubbed_versions)
}

pub async fn delete(
5 changes: 1 addition & 4 deletions core/src/data_sources/file_storage_document.rs
@@ -61,10 +61,7 @@ impl FileStorageDocument {
let bucket = FileStorageDocument::get_bucket().await?;

match Object::delete(&bucket, &path).await {
Ok(_) => {
// println!("Deleted: path={}", path);
Ok(true)
}
Ok(_) => Ok(true),
Err(e) => match e {
cloud_storage::Error::Google(GoogleErrorResponse {
error: ErrorList { code: 404, .. },
10 changes: 7 additions & 3 deletions core/src/stores/postgres.rs
@@ -1938,7 +1938,7 @@ impl Store for PostgresStore {
Ok(())
}

async fn scrub_data_source_document_version(
async fn delete_data_source_document_version(
&self,
project: &Project,
data_source_id: &str,
@@ -1950,6 +1950,7 @@
let document_id = document_id.to_string();
let created = version.created as i64;
let hash = version.hash.clone();
let status = version.status.to_string();

let pool = self.pool.clone();
let c = pool.get().await?;
@@ -1971,11 +1972,14 @@
.prepare(
"DELETE FROM data_sources_documents \
WHERE data_source = $1 AND document_id = $2 \
AND created = $3 AND hash = $4 AND status='deleted'",
AND created = $3 AND hash = $4 AND status=$5",
)
.await?;
let _ = c
.query(&stmt, &[&data_source_row_id, &document_id, &created, &hash])
.query(
&stmt,
&[&data_source_row_id, &document_id, &created, &hash, &status],
)
.await?;

Ok(())
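The rename from `scrub_data_source_document_version` to `delete_data_source_document_version` also generalizes the row filter: `status` is now bound as `$5` from the version being deleted instead of being hard-coded to `'deleted'`, which is the string the new `ToString` impl on `DocumentStatus` supplies. A small illustrative sketch of that mapping, using a simplified local enum rather than the crate's type:

#[derive(Debug, Clone, Copy)]
enum DocumentStatus {
    Latest,
    Superseded,
    Deleted,
}

// Simplified stand-in for the ToString impl added in this commit.
impl std::fmt::Display for DocumentStatus {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let s = match self {
            DocumentStatus::Latest => "latest",
            DocumentStatus::Superseded => "superseded",
            DocumentStatus::Deleted => "deleted",
        };
        write!(f, "{}", s)
    }
}

fn main() {
    // The deleted-version scrub passes Deleted versions and the new
    // superseded-version scrub passes Superseded versions, so each DELETE
    // only matches rows in that exact state.
    for status in [
        DocumentStatus::Deleted,
        DocumentStatus::Superseded,
        DocumentStatus::Latest, // never passed by the scrub paths; shown for completeness
    ] {
        let bound: String = status.to_string(); // value bound as $5
        println!("... AND status = $5  -- $5 = {:?}", bound);
    }
}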