Skip to content

Commit

Permalink
Lint cleanup (#42)
Browse files Browse the repository at this point in the history
* fix a billion clippy lints

* allow a few things we can't fix yet

* fix casting issue for platform-depend sizes

* allow needless return that's not so needless

* need &mut self for rhai functions
  • Loading branch information
jeromegn authored Sep 11, 2023
1 parent 8e78f9c commit f78776b
Show file tree
Hide file tree
Showing 23 changed files with 260 additions and 425 deletions.
19 changes: 12 additions & 7 deletions crates/corro-agent/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ use corro_types::{
Agent, AgentConfig, Booked, BookedVersions, Bookie, ChangeError, KnownDbVersion, SplitPool,
},
broadcast::{
BiPayload, BiPayloadV1, BroadcastInput, BroadcastV1, ChangeV1, Changeset, FocaInput,
Timestamp, UniPayload, UniPayloadV1,
BiPayload, BiPayloadV1, BroadcastInput, BroadcastV1, ChangeV1, Changeset, ChangesetParts,
FocaInput, Timestamp, UniPayload, UniPayloadV1,
},
change::{Change, SqliteValue},
config::{AuthzConfig, Config, DEFAULT_GOSSIP_PORT},
Expand Down Expand Up @@ -237,7 +237,7 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age
gossip_addr,
api_addr,
members: RwLock::new(Members::default()),
clock: clock.clone(),
clock,
bookie,
tx_bcast,
tx_apply,
Expand Down Expand Up @@ -1424,8 +1424,7 @@ fn store_empty_changeset(
.execute(params![actor_id, version,])?;
}

tx.commit()?;
return Ok(());
tx.commit()
}

async fn process_fully_buffered_changes(
Expand Down Expand Up @@ -1580,7 +1579,13 @@ pub async fn process_single_version(
let tx = conn.transaction()?;

let versions = changeset.versions();
let (version, changes, seqs, last_seq, ts) = match changeset.into_parts() {
let ChangesetParts {
version,
changes,
seqs,
last_seq,
ts,
} = match changeset.into_parts() {
None => {
store_empty_changeset(tx, actor_id, versions.clone())?;
booked_write.insert_many(versions.clone(), KnownDbVersion::Cleared);
Expand Down Expand Up @@ -1742,7 +1747,7 @@ pub async fn process_single_version(
impactful_changeset.push(change);
if let Some(c) = impactful_changeset.last() {
if let Some(counter) = changes_per_table.get_mut(&c.table) {
*counter = *counter + 1;
*counter += 1;
} else {
changes_per_table.insert(c.table.clone(), 1);
}
Expand Down
90 changes: 49 additions & 41 deletions crates/corro-agent/src/api/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ where
}
}

impl<'stmt, I> Iterator for ChunkedChanges<I>
impl<I> Iterator for ChunkedChanges<I>
where
I: Iterator<Item = rusqlite::Result<Change>>,
{
Expand Down Expand Up @@ -122,10 +122,10 @@ where
self.done = true;

// return buffered changes
return Some(Ok((
Some(Ok((
self.changes.clone(), // no need to drain here like before
self.last_start_seq..=self.last_seq, // even if empty, this is all we have still applied
)));
)))
}
}

Expand Down Expand Up @@ -222,8 +222,8 @@ where
ORDER BY seq ASC
"#)?;
let rows = prepped.query_map([db_version], row_to_change)?;
let mut chunked = ChunkedChanges::new(rows, 0, last_seq, MAX_CHANGES_PER_MESSAGE);
while let Some(changes_seqs) = chunked.next() {
let chunked = ChunkedChanges::new(rows, 0, last_seq, MAX_CHANGES_PER_MESSAGE);
for changes_seqs in chunked {
match changes_seqs {
Ok((changes, seqs)) => {
for (table_name, count) in
Expand Down Expand Up @@ -274,10 +274,10 @@ where

fn execute_statement(tx: &Transaction, stmt: &Statement) -> rusqlite::Result<usize> {
match stmt {
Statement::Simple(q) => tx.execute(&q, []),
Statement::WithParams(q, params) => tx.execute(&q, params_from_iter(params)),
Statement::Simple(q) => tx.execute(q, []),
Statement::WithParams(q, params) => tx.execute(q, params_from_iter(params)),
Statement::WithNamedParams(q, params) => tx.execute(
&q,
q,
params
.iter()
.map(|(k, v)| (k.as_str(), v as &dyn ToSql))
Expand Down Expand Up @@ -309,11 +309,11 @@ pub async fn api_v1_transactions(

let results = statements
.iter()
.filter_map(|stmt| {
.map(|stmt| {
let start = Instant::now();
let res = execute_statement(&tx, stmt);
let res = execute_statement(tx, stmt);

Some(match res {
match res {
Ok(rows_affected) => {
total_rows_affected += rows_affected;
RqliteResult::Execute {
Expand All @@ -324,7 +324,7 @@ pub async fn api_v1_transactions(
Err(e) => RqliteResult::Error {
error: e.to_string(),
},
})
}
})
.collect::<Vec<RqliteResult>>();

Expand All @@ -334,20 +334,18 @@ pub async fn api_v1_transactions(

let (results, elapsed) = match res {
Ok(res) => res,
Err(e) => match e {
e => {
error!("could not execute statement(s): {e}");
return (
StatusCode::INTERNAL_SERVER_ERROR,
axum::Json(RqliteResponse {
results: vec![RqliteResult::Error {
error: e.to_string(),
}],
time: None,
}),
);
}
},
Err(e) => {
error!("could not execute statement(s): {e}");
return (
StatusCode::INTERNAL_SERVER_ERROR,
axum::Json(RqliteResponse {
results: vec![RqliteResult::Error {
error: e.to_string(),
}],
time: None,
}),
);
}
};

(
Expand Down Expand Up @@ -391,9 +389,9 @@ async fn build_query_rows_response(
};

let prepped_res = block_in_place(|| match &stmt {
Statement::Simple(q) => conn.prepare(q.as_str()),
Statement::WithParams(q, _) => conn.prepare(q.as_str()),
Statement::WithNamedParams(q, _) => conn.prepare(q.as_str()),
Statement::Simple(q) => conn.prepare(q),
Statement::WithParams(q, _) => conn.prepare(q),
Statement::WithNamedParams(q, _) => conn.prepare(q),
});

let mut prepped = match prepped_res {
Expand All @@ -411,6 +409,7 @@ async fn build_query_rows_response(

block_in_place(|| {
let col_count = prepped.column_count();
trace!("inside block in place, col count: {col_count}");

if let Err(e) = data_tx.blocking_send(QueryEvent::Columns(
prepped
Expand Down Expand Up @@ -458,9 +457,12 @@ async fn build_query_rows_response(

let mut rowid = 1;

trace!("about to loop through rows!");

loop {
match rows.next() {
Ok(Some(row)) => {
trace!("got a row: {row:?}");
match (0..col_count)
.map(|i| row.get::<_, SqliteValue>(i))
.collect::<rusqlite::Result<Vec<_>>>()
Expand Down Expand Up @@ -546,14 +548,18 @@ pub async fn api_v1_queries(
debug!("query body channel done");
});

trace!("building query rows response...");

match build_query_rows_response(&agent, data_tx, stmt).await {
Ok(_) => {
#[allow(clippy::needless_return)]
return hyper::Response::builder()
.status(StatusCode::OK)
.body(body)
.expect("could not build query response body");
}
Err((status, res)) => {
#[allow(clippy::needless_return)]
return hyper::Response::builder()
.status(status)
.body(
Expand Down Expand Up @@ -845,6 +851,8 @@ mod tests {

assert!(body.0.results.len() == 2);

println!("transaction body: {body:?}");

let res = api_v1_queries(
Extension(agent.clone()),
axum::Json(Statement::Simple("select * from tests".into())),
Expand Down Expand Up @@ -962,14 +970,14 @@ mod tests {
let id_col = tests.columns.get("id").unwrap();
assert_eq!(id_col.name, "id");
assert_eq!(id_col.sql_type, SqliteType::Integer);
assert_eq!(id_col.nullable, true);
assert_eq!(id_col.primary_key, true);
assert!(id_col.nullable);
assert!(id_col.primary_key);

let foo_col = tests.columns.get("foo").unwrap();
assert_eq!(foo_col.name, "foo");
assert_eq!(foo_col.sql_type, SqliteType::Text);
assert_eq!(foo_col.nullable, true);
assert_eq!(foo_col.primary_key, false);
assert!(foo_col.nullable);
assert!(!foo_col.primary_key);
}

let (status_code, _body) = api_v1_db_schema(
Expand All @@ -992,14 +1000,14 @@ mod tests {
let id_col = tests.columns.get("id").unwrap();
assert_eq!(id_col.name, "id");
assert_eq!(id_col.sql_type, SqliteType::Integer);
assert_eq!(id_col.nullable, true);
assert_eq!(id_col.primary_key, true);
assert!(id_col.nullable);
assert!(id_col.primary_key);

let foo_col = tests.columns.get("foo").unwrap();
assert_eq!(foo_col.name, "foo");
assert_eq!(foo_col.sql_type, SqliteType::Text);
assert_eq!(foo_col.nullable, true);
assert_eq!(foo_col.primary_key, false);
assert!(foo_col.nullable);
assert!(!foo_col.primary_key);

let tests = schema
.tables
Expand All @@ -1009,14 +1017,14 @@ mod tests {
let id_col = tests.columns.get("id").unwrap();
assert_eq!(id_col.name, "id");
assert_eq!(id_col.sql_type, SqliteType::Integer);
assert_eq!(id_col.nullable, true);
assert_eq!(id_col.primary_key, true);
assert!(id_col.nullable);
assert!(id_col.primary_key);

let foo_col = tests.columns.get("foo").unwrap();
assert_eq!(foo_col.name, "foo");
assert_eq!(foo_col.sql_type, SqliteType::Text);
assert_eq!(foo_col.nullable, true);
assert_eq!(foo_col.primary_key, false);
assert!(foo_col.nullable);
assert!(!foo_col.primary_key);

Ok(())
}
Expand Down
35 changes: 19 additions & 16 deletions crates/corro-agent/src/api/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ async fn process_range(
version,
known_version,
vec![],
&sender,
sender,
)
.await?;
}
Expand Down Expand Up @@ -411,23 +411,26 @@ async fn process_version(
row_to_change,
)?;

let mut chunked =
let chunked =
ChunkedChanges::new(rows, *start_seq, *end_seq, MAX_CHANGES_PER_MESSAGE);
while let Some(changes_seqs) = chunked.next() {
for changes_seqs in chunked {
match changes_seqs {
Ok((changes, seqs)) => {
if let Err(_) = sender.blocking_send(SyncMessage::V1(
SyncMessageV1::Changeset(ChangeV1 {
actor_id,
changeset: Changeset::Full {
version,
changes,
seqs,
last_seq: *last_seq,
ts: *ts,
if sender
.blocking_send(SyncMessage::V1(SyncMessageV1::Changeset(
ChangeV1 {
actor_id,
changeset: Changeset::Full {
version,
changes,
seqs,
last_seq: *last_seq,
ts: *ts,
},
},
}),
)) {
)))
.is_err()
{
eyre::bail!("sync message sender channel is closed");
}
}
Expand Down Expand Up @@ -471,13 +474,13 @@ async fn process_version(
row_to_change,
)?;

let mut chunked = ChunkedChanges::new(
let chunked = ChunkedChanges::new(
rows,
*start_seq,
*end_seq,
MAX_CHANGES_PER_MESSAGE,
);
while let Some(changes_seqs) = chunked.next() {
for changes_seqs in chunked {
match changes_seqs {
Ok((changes, seqs)) => {
if let Err(_e) = sender.blocking_send(SyncMessage::V1(
Expand Down
4 changes: 2 additions & 2 deletions crates/corro-agent/src/api/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ async fn sub_by_id(agent: Agent, id: Uuid) -> hyper::Response<hyper::Body> {
init_tx.blocking_send(QueryEvent::Row(rowid, cells))?;
}

_ = init_tx.blocking_send(QueryEvent::EndOfQuery {
init_tx.blocking_send(QueryEvent::EndOfQuery {
time: elapsed.as_secs_f64(),
})?;

Expand Down Expand Up @@ -181,7 +181,7 @@ async fn process_sub_channel(
}
},
_ = check_ready.tick() => {
if let Err(_) = poll_fn(|cx| tx.poll_ready(cx)).await {
if poll_fn(|cx| tx.poll_ready(cx)).await.is_err() {
break;
}
continue;
Expand Down
6 changes: 3 additions & 3 deletions crates/corro-agent/src/broadcast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ impl TimerSpawner {
.unwrap();

std::thread::spawn({
let timer_tx = timer_tx.clone();
move || {
let local = LocalSet::new();

Expand Down Expand Up @@ -93,6 +92,7 @@ impl TimerSpawner {
}
}

#[allow(clippy::too_many_arguments)]
pub fn runtime_loop(
actor: Actor,
agent: Agent,
Expand Down Expand Up @@ -625,9 +625,9 @@ pub fn runtime_loop(
.states
.iter()
.filter_map(|(member_id, state)| {
(*member_id != actor_id).then(|| state.addr)
(*member_id != actor_id).then_some(state.addr)
})
.choose_multiple(&mut rng, member_count as usize)
.choose_multiple(&mut rng, member_count)
};

for addr in broadcast_to {
Expand Down
Loading

0 comments on commit f78776b

Please sign in to comment.