Skip to content

Commit

Permalink
instrument mutation queries with SQL perf counters
Browse files Browse the repository at this point in the history
Summary:
instrument mutation queries with SQL perf counters

this will give us correct scuba logging

increment counters before sql queries

==== this is a sev follow up task ====

Reviewed By: markbt

Differential Revision: D54362984

fbshipit-source-id: 898df4b53faec650385e0b20b045023f087186cd
  • Loading branch information
Liubov Dmitrieva authored and facebook-github-bot committed Mar 1, 2024
1 parent bb37cf9 commit 724c142
Showing 1 changed file with 53 additions and 12 deletions.
65 changes: 53 additions & 12 deletions eden/mononoke/mercurial/mutation/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ impl SqlHgMutationStore {
})
.collect();

ctx.perf_counters()
.add_to_counter(PerfCounterType::SqlWrites, 4);
if let Some(cri) = ctx.metadata().client_request_info() {
let (txn, _) =
AddChangesets::traced_query_with_transaction(txn, cri, db_csets.as_slice()).await?;
Expand Down Expand Up @@ -261,14 +263,20 @@ impl SqlHgMutationStore {
&self,
ctx: &CoreContext,
changeset_ids: &HashSet<HgChangesetId>,
) -> Result<&Connection> {
) -> Result<(&Connection, PerfCounterType)> {
// Check if the replica is up-to-date with respect to all changesets we
// are interested in.
let changeset_ids: Vec<_> = changeset_ids.iter().collect();
if changeset_ids.is_empty() {
// There are no interesting changesets, so just use the replica.
return Ok(&self.connections.read_connection);
return Ok((
&self.connections.read_connection,
PerfCounterType::SqlReadsReplica,
));
}

ctx.perf_counters()
.increment_counter(PerfCounterType::SqlReadsReplica);
let count = match ctx.metadata().client_request_info() {
Some(cri) => {
CountChangesets::traced_query(
Expand All @@ -288,22 +296,30 @@ impl SqlHgMutationStore {
.await
}
}?;

if let Some((count,)) = count.into_iter().next() {
if count as usize == changeset_ids.len() {
// The replica knows all of the changesets, so use it.
return Ok(&self.connections.read_connection);
return Ok((
&self.connections.read_connection,
PerfCounterType::SqlReadsReplica,
));
}
}
// The replica doesn't know all of the changesets, use the connection to
// the master.
Ok(&self.connections.read_master_connection)
Ok((
&self.connections.read_master_connection,
PerfCounterType::SqlReadsMaster,
))
}

/// Collect entries from the database into an entry set.
async fn collect_entries<I>(
&self,
ctx: &CoreContext,
connection: &Connection,
sql_perf_counter: PerfCounterType,
entry_set: &mut HgMutationEntrySet,
rows: I,
) -> Result<()>
Expand Down Expand Up @@ -362,6 +378,7 @@ impl SqlHgMutationStore {
})?;
}
if !to_fetch_split.is_empty() {
ctx.perf_counters().increment_counter(sql_perf_counter);
let rows = match ctx.metadata().client_request_info() {
Some(cri) => {
SelectSplitsBySuccessor::traced_query(
Expand Down Expand Up @@ -401,6 +418,7 @@ impl SqlHgMutationStore {
&self,
ctx: &CoreContext,
connection: &Connection,
sql_perf_counter: PerfCounterType,
entry_set: &mut HgMutationEntrySet,
changesets: &HashSet<HgChangesetId>,
) -> Result<()> {
Expand All @@ -415,6 +433,7 @@ impl SqlHgMutationStore {

let cri = ctx.metadata().client_request_info();
let chunk_rows = stream::iter(chunks.into_iter().map(move |chunk| async move {
ctx.perf_counters().increment_counter(sql_perf_counter);
match cri {
Some(cri) => {
SelectBySuccessor::traced_query(
Expand All @@ -433,8 +452,14 @@ impl SqlHgMutationStore {
.try_collect::<Vec<_>>()
.await?;

self.collect_entries(ctx, connection, entry_set, chunk_rows.into_iter().flatten())
.await?;
self.collect_entries(
ctx,
connection,
sql_perf_counter,
entry_set,
chunk_rows.into_iter().flatten(),
)
.await?;
Ok(())
}

Expand All @@ -443,6 +468,7 @@ impl SqlHgMutationStore {
&self,
ctx: &CoreContext,
connection: &Connection,
sql_perf_counter: PerfCounterType,
entry_set: &mut HgMutationEntrySet,
) -> Result<()> {
if entry_set.entries.is_empty() {
Expand All @@ -458,6 +484,7 @@ impl SqlHgMutationStore {

let cri = ctx.metadata().client_request_info();
let rows = stream::iter(chunks.into_iter().map(|changesets| async move {
ctx.perf_counters().increment_counter(sql_perf_counter);
match cri {
Some(cri) => {
SelectBySuccessorChain::traced_query(
Expand Down Expand Up @@ -485,8 +512,14 @@ impl SqlHgMutationStore {
.try_collect::<Vec<_>>()
.await?;

self.collect_entries(ctx, connection, entry_set, rows.into_iter().flatten())
.await?;
self.collect_entries(
ctx,
connection,
sql_perf_counter,
entry_set,
rows.into_iter().flatten(),
)
.await?;

Ok(())
}
Expand Down Expand Up @@ -537,6 +570,7 @@ impl HgMutationStore for SqlHgMutationStore {
self.fetch_by_successor(
ctx,
&self.connections.read_master_connection,
PerfCounterType::SqlReadsMaster,
&mut entry_set,
&changeset_ids,
)
Expand Down Expand Up @@ -587,6 +621,7 @@ impl HgMutationStore for SqlHgMutationStore {
self.fetch_by_successor(
ctx,
&self.connections.read_master_connection,
PerfCounterType::SqlReadsMaster,
&mut entry_set,
&predecessor_ids,
)
Expand Down Expand Up @@ -620,12 +655,18 @@ impl HgMutationStore for SqlHgMutationStore {
}

let mut entry_set = HgMutationEntrySet::new();
let connection = self
let (connection, sql_perf_counter) = self
.read_connection_for_changesets(ctx, &changeset_ids)
.await?;
self.fetch_by_successor(ctx, connection, &mut entry_set, &changeset_ids)
.await?;
self.fetch_all_predecessors(ctx, connection, &mut entry_set)
self.fetch_by_successor(
ctx,
connection,
sql_perf_counter,
&mut entry_set,
&changeset_ids,
)
.await?;
self.fetch_all_predecessors(ctx, connection, sql_perf_counter, &mut entry_set)
.await?;
let changeset_count = changeset_ids.len();
let entries = entry_set.into_all_predecessors_by_changeset(changeset_ids);
Expand Down

0 comments on commit 724c142

Please sign in to comment.