Skip to content

Commit

Permalink
attempt at fixing compaction issues
Browse files Browse the repository at this point in the history
  • Loading branch information
jeromegn committed Aug 22, 2023
1 parent ed76472 commit 8251797
Showing 1 changed file with 22 additions and 14 deletions.
36 changes: 22 additions & 14 deletions crates/corro-agent/src/agent.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::{
collections::{BTreeMap, BTreeSet, HashMap, HashSet},
collections::{BTreeMap, HashMap, HashSet},
convert::Infallible,
net::SocketAddr,
ops::RangeInclusive,
Expand Down Expand Up @@ -548,7 +548,6 @@ pub async fn run(agent: Agent, opts: AgentOptions) -> eyre::Result<()> {
loop {
sleep(COMPACT_BOOKED_INTERVAL).await;

let tables = agent.schema().read().tables.keys().cloned().collect();
let to_check: Vec<ActorId> = { bookie.read().keys().copied().collect() };

for actor_id in to_check {
Expand All @@ -563,10 +562,19 @@ pub async fn run(agent: Agent, opts: AgentOptions) -> eyre::Result<()> {
continue;
}

let res = block_in_place(|| {
let mut conn = match pool.write_low().await {
Ok(conn) => conn,
Err(e) => {
error!("could not acquire low priority write connection for compaction: {e}");
continue;
}
};

let res = block_in_place(move || {
let mut bookedw = booked.write();

let to_clear = {
let conn = pool.read_blocking()?;
match compact_booked_for_actor(&conn, &tables, &versions) {
match compact_booked_for_actor(&conn, &versions) {
Ok(to_clear) => {
if to_clear.is_empty() {
return Ok(());
Expand All @@ -580,7 +588,6 @@ pub async fn run(agent: Agent, opts: AgentOptions) -> eyre::Result<()> {
}
};

let mut conn = pool.blocking_write_low()?;
let tx = conn.transaction()?;

let deleted = tx
Expand Down Expand Up @@ -621,7 +628,6 @@ pub async fn run(agent: Agent, opts: AgentOptions) -> eyre::Result<()> {

info!("compacted in-db version state for actor {actor_id}, deleted: {deleted}, inserted: {inserted}");

let mut bookedw = booked.write();
**bookedw.inner_mut() = new_copy;
info!("compacted in-memory cache by clearing {cleared_len} db versions for actor {actor_id}, new total: {}", bookedw.inner().len());

Expand Down Expand Up @@ -917,7 +923,6 @@ pub async fn handle_change(

fn compact_booked_for_actor(
conn: &Connection,
tables: &BTreeSet<String>,
versions: &BTreeMap<i64, i64>,
) -> eyre::Result<HashSet<i64>> {
// TODO: optimize that in a single query once cr-sqlite supports aggregation
Expand All @@ -930,10 +935,17 @@ fn compact_booked_for_actor(
_ => return Ok(HashSet::new()),
};

let tables = conn
.prepare_cached(
"SELECT name FROM sqlite_schema WHERE type = 'table' AND name LIKE '%__crsql_clock'",
)?
.query_map([], |row| row.get(0))?
.collect::<Result<Vec<String>, _>>()?;

let still_live: HashSet<i64> = conn
.prepare(&format!(
"SELECT db_version FROM ({});",
tables.iter().map(|table| format!("SELECT DISTINCT(__crsql_db_version) AS db_version FROM {table}__crsql_clock WHERE db_version >= {first} AND db_version <= {last}")).collect::<Vec<_>>().join(" UNION ")
tables.iter().map(|table| format!("SELECT DISTINCT(__crsql_db_version) AS db_version FROM {table} WHERE db_version >= {first} AND db_version <= {last}")).collect::<Vec<_>>().join(" UNION ")
))?
.query_map([],
|row| row.get(0),
Expand Down Expand Up @@ -2433,11 +2445,7 @@ pub mod tests {

assert_eq!(db_version, 2);

let diff = compact_booked_for_actor(
&conn,
&vec!["foo".to_string()].into_iter().collect(),
&vec![(1, 1)].into_iter().collect(),
)?;
let diff = compact_booked_for_actor(&conn, &vec![(1, 1)].into_iter().collect())?;

assert!(diff.contains(&1));
assert!(!diff.contains(&2));
Expand Down

0 comments on commit 8251797

Please sign in to comment.