diff --git a/crates/corro-agent/src/agent.rs b/crates/corro-agent/src/agent.rs index 5585cad8..bfab7bbb 100644 --- a/crates/corro-agent/src/agent.rs +++ b/crates/corro-agent/src/agent.rs @@ -1,5 +1,5 @@ use std::{ - collections::{BTreeMap, BTreeSet, HashMap, HashSet}, + collections::{BTreeMap, HashMap, HashSet}, convert::Infallible, net::SocketAddr, ops::RangeInclusive, @@ -548,7 +548,6 @@ pub async fn run(agent: Agent, opts: AgentOptions) -> eyre::Result<()> { loop { sleep(COMPACT_BOOKED_INTERVAL).await; - let tables = agent.schema().read().tables.keys().cloned().collect(); let to_check: Vec = { bookie.read().keys().copied().collect() }; for actor_id in to_check { @@ -563,10 +562,19 @@ pub async fn run(agent: Agent, opts: AgentOptions) -> eyre::Result<()> { continue; } - let res = block_in_place(|| { + let mut conn = match pool.write_low().await { + Ok(conn) => conn, + Err(e) => { + error!("could not acquire low priority write connection for compaction: {e}"); + continue; + } + }; + + let res = block_in_place(move || { + let mut bookedw = booked.write(); + let to_clear = { - let conn = pool.read_blocking()?; - match compact_booked_for_actor(&conn, &tables, &versions) { + match compact_booked_for_actor(&conn, &versions) { Ok(to_clear) => { if to_clear.is_empty() { return Ok(()); @@ -580,7 +588,6 @@ pub async fn run(agent: Agent, opts: AgentOptions) -> eyre::Result<()> { } }; - let mut conn = pool.blocking_write_low()?; let tx = conn.transaction()?; let deleted = tx @@ -621,7 +628,6 @@ pub async fn run(agent: Agent, opts: AgentOptions) -> eyre::Result<()> { info!("compacted in-db version state for actor {actor_id}, deleted: {deleted}, inserted: {inserted}"); - let mut bookedw = booked.write(); **bookedw.inner_mut() = new_copy; info!("compacted in-memory cache by clearing {cleared_len} db versions for actor {actor_id}, new total: {}", bookedw.inner().len()); @@ -917,7 +923,6 @@ pub async fn handle_change( fn compact_booked_for_actor( conn: &Connection, - tables: &BTreeSet, versions: &BTreeMap, ) -> eyre::Result> { // TODO: optimize that in a single query once cr-sqlite supports aggregation @@ -930,10 +935,17 @@ fn compact_booked_for_actor( _ => return Ok(HashSet::new()), }; + let tables = conn + .prepare_cached( + "SELECT name FROM sqlite_schema WHERE type = 'table' AND name LIKE '%__crsql_clock'", + )? + .query_map([], |row| row.get(0))? + .collect::, _>>()?; + let still_live: HashSet = conn .prepare(&format!( "SELECT db_version FROM ({});", - tables.iter().map(|table| format!("SELECT DISTINCT(__crsql_db_version) AS db_version FROM {table}__crsql_clock WHERE db_version >= {first} AND db_version <= {last}")).collect::>().join(" UNION ") + tables.iter().map(|table| format!("SELECT DISTINCT(__crsql_db_version) AS db_version FROM {table} WHERE db_version >= {first} AND db_version <= {last}")).collect::>().join(" UNION ") ))? .query_map([], |row| row.get(0), @@ -2433,11 +2445,7 @@ pub mod tests { assert_eq!(db_version, 2); - let diff = compact_booked_for_actor( - &conn, - &vec!["foo".to_string()].into_iter().collect(), - &vec![(1, 1)].into_iter().collect(), - )?; + let diff = compact_booked_for_actor(&conn, &vec![(1, 1)].into_iter().collect())?; assert!(diff.contains(&1)); assert!(!diff.contains(&2));