
enh(performance): incrementally cache the DB schema (#2707)
* (squashed) implement incremental schema caching

* avoid cloning the table

* get rid of DatabaseSchemaTable

* error if incompatible types

* improve tests

* use try_join_all

---------

Co-authored-by: Henry Fontanier <henry@dust.tt>
fontanierh and Henry Fontanier authored Nov 30, 2023
1 parent a025a23 commit 29ac44e
Showing 9 changed files with 490 additions and 280 deletions.
47 changes: 0 additions & 47 deletions core/bin/dust_api.rs
@@ -2098,49 +2098,6 @@ async fn databases_rows_list(
}
}

async fn databases_schema_retrieve(
extract::Path((project_id, data_source_id, database_id)): extract::Path<(i64, String, String)>,
extract::Extension(state): extract::Extension<Arc<APIState>>,
) -> (StatusCode, Json<APIResponse>) {
let project = project::Project::new_from_id(project_id);

match state
.store
.load_database(&project, &data_source_id, &database_id)
.await
{
Err(e) => error_response(
StatusCode::INTERNAL_SERVER_ERROR,
"internal_server_error",
"Failed to retrieve database",
Some(e),
),
Ok(None) => error_response(
StatusCode::NOT_FOUND,
"database_not_found",
&format!("No database found for id `{}`", database_id),
None,
),
Ok(Some(db)) => match db.get_schema(state.store.clone()).await {
Err(e) => error_response(
StatusCode::INTERNAL_SERVER_ERROR,
"internal_server_error",
"Failed to retrieve database schema",
Some(e),
),
Ok(schema) => (
StatusCode::OK,
Json(APIResponse {
error: None,
response: Some(json!({
"schema": schema
})),
}),
),
},
}
}

#[derive(serde::Deserialize)]
struct DatabaseQueryRunPayload {
query: String,
@@ -2395,10 +2352,6 @@ fn main() {
"/projects/:project_id/data_sources/:data_source_id/databases/:database_id/tables/:table_id/rows",
get(databases_rows_list),
)
.route(
"/projects/:project_id/data_sources/:data_source_id/databases/:database_id/schema",
get(databases_schema_retrieve),
)
.route(
"/projects/:project_id/data_sources/:data_source_id/databases/:database_id/query",
post(databases_query_run),
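Not part of this commit: the dedicated schema route goes away because schemas are now cached on each DatabaseTable (see core/src/databases/database.rs below), so there is no separate get_schema call left to expose. A minimal sketch of how a caller can recover the same information from the tables themselves, assuming Database, TableSchema, Store and anyhow's Result are in scope; collect_schemas is a hypothetical helper name.

async fn collect_schemas(
    db: &Database,
    store: Box<dyn Store + Sync + Send>,
) -> Result<Vec<(String, TableSchema)>> {
    // get_tables only returns tables that already have a cached schema.
    let tables = db.get_tables(store).await?;
    Ok(tables
        .into_iter()
        // Pair each table name with a clone of its cached schema.
        .filter_map(|t| t.schema().cloned().map(|s| (t.name().to_string(), s)))
        .collect())
}
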
8 changes: 4 additions & 4 deletions core/src/blocks/database_schema.rs
@@ -1,5 +1,5 @@
use crate::blocks::block::{Block, BlockResult, BlockType, Env};
use crate::databases::database::DatabaseSchemaTable;
use crate::databases::database::DatabaseTable;
use crate::Rule;
use anyhow::{anyhow, Result};
use async_trait::async_trait;
@@ -76,7 +76,7 @@ impl Block for DatabaseSchema {
.into_iter()
.map(|t| {
json!({
"table_schema": t,
"table_schema": t.schema(),
"dbml": t.render_dbml(),
})
})
@@ -100,7 +100,7 @@ async fn get_database_schema(
data_source_id: &String,
database_id: &String,
env: &Env,
) -> Result<Vec<DatabaseSchemaTable>> {
) -> Result<Vec<DatabaseTable>> {
let project = get_data_source_project(workspace_id, data_source_id, env).await?;
let database = match env
.store
Expand All @@ -115,7 +115,7 @@ async fn get_database_schema(
))?,
};

match database.get_schema(env.store.clone()).await {
match database.get_tables(env.store.clone()).await {
Ok(s) => Ok(s),
Err(e) => Err(anyhow!(
"Error getting schema for database `{}` in data source `{}`: {}",
182 changes: 110 additions & 72 deletions core/src/databases/database.rs
@@ -1,6 +1,7 @@
use super::table_schema::TableSchema;
use crate::{project::Project, stores::store::Store, utils};
use anyhow::{anyhow, Result};
use futures::future::try_join_all;
use itertools::Itertools;
use rayon::prelude::*;
use rusqlite::{params_from_iter, Connection};
@@ -51,23 +52,30 @@ impl Database {
}
}

pub async fn get_schema(
pub async fn get_tables(
&self,
store: Box<dyn Store + Sync + Send>,
) -> Result<Vec<DatabaseSchemaTable>> {
) -> Result<Vec<DatabaseTable>> {
match self.db_type {
DatabaseType::REMOTE => Err(anyhow!("Remote DB not implemented.")),
DatabaseType::LOCAL => {
let rows = self.get_rows(store).await?;

rows.par_iter()
.map(|(table, r)| {
Ok(DatabaseSchemaTable::new(
table.clone(),
TableSchema::from_rows(&r)?,
))
let (tables, _) = store
.list_databases_tables(
&self.project,
&self.data_source_id,
&self.database_id,
None,
)
.await?;

Ok(tables
.into_iter()
// Ignore empty tables.
.filter_map(|t| match t.schema() {
None => None,
Some(_) => Some(t),
})
.collect::<Result<Vec<_>>>()
.collect::<Vec<_>>())
}
}
}
@@ -79,20 +87,53 @@ impl Database {
rows: Vec<DatabaseRow>,
truncate: bool,
) -> Result<()> {
// This will be used to update the schema incrementally once we store schemas. For now this
// is a way to validate the content of the rows (only primitive types).
let _ = TableSchema::from_rows(&rows)?;
let table = match store
.load_database_table(
&self.project,
&self.data_source_id,
&self.database_id,
table_id,
)
.await?
{
Some(t) => t,
None => Err(anyhow!(
"Table {} not found in database {}",
table_id,
self.database_id
))?,
};

let new_rows_table_schema = TableSchema::from_rows(&rows)?;
let table_schema = match table.schema() {
// If there is no existing schema cache, simply use the new schema.
None => new_rows_table_schema,
Some(existing_table_schema) => {
// If there is an existing schema cache, merge it with the new schema.
existing_table_schema.merge(&new_rows_table_schema)?
}
};

store
.batch_upsert_database_rows(
try_join_all(vec![
store.update_database_table_schema(
&self.project,
&self.data_source_id,
&self.database_id,
table_id,
&table_schema,
),
store.batch_upsert_database_rows(
&self.project,
&self.data_source_id,
&self.database_id,
table_id,
&rows,
truncate,
)
.await
),
])
.await?;

Ok(())
}
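The schema-cache update and the row upsert above are issued through try_join_all, so the two store calls run concurrently and upsert_rows fails if either of them fails. TableSchema::merge itself lives in core/src/databases/table_schema.rs, whose diff is not loaded on this page; the sketch below only illustrates plausible merge semantics on simplified, hypothetical stand-in types (SimpleSchema, ColumnType), assuming a column-wise union that rejects conflicting value types, in line with the "error if incompatible types" bullet in the commit message.

use std::collections::BTreeMap;

use anyhow::{anyhow, Result};

#[derive(Clone, Debug, PartialEq)]
enum ColumnType {
    Int,
    Float,
    Text,
    Bool,
}

#[derive(Clone, Debug, Default)]
struct SimpleSchema {
    // Column name -> value type.
    columns: BTreeMap<String, ColumnType>,
}

impl SimpleSchema {
    // Column-wise union: columns not seen so far are added, columns already in
    // the cached schema must keep the same type, otherwise the merge errors.
    fn merge(&self, other: &SimpleSchema) -> Result<SimpleSchema> {
        let mut merged = self.clone();
        for (name, ty) in &other.columns {
            match merged.columns.get(name) {
                None => {
                    merged.columns.insert(name.clone(), ty.clone());
                }
                Some(existing) if existing == ty => (),
                Some(existing) => Err(anyhow!(
                    "Column `{}` has incompatible types: {:?} vs {:?}",
                    name,
                    existing,
                    ty
                ))?,
            }
        }
        Ok(merged)
    }
}
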

pub async fn create_in_memory_sqlite_conn(
@@ -106,7 +147,7 @@ impl Database {
DatabaseType::LOCAL => {
let time_build_db_start = utils::now();

let schema = self.get_schema(store.clone()).await?;
let tables = self.get_tables(store.clone()).await?;
utils::done(&format!(
"DSSTRUCTSTAT Finished retrieving schema: duration={}ms",
utils::now() - time_build_db_start
@@ -120,10 +161,18 @@
));

let generate_create_table_sql_start = utils::now();
let create_tables_sql: String = schema
.iter()
.filter(|s| !s.is_empty())
.map(|s| s.schema.get_create_table_sql_string(s.table.name()))
let create_tables_sql: String = tables
.into_iter()
.filter_map(|t| match t.schema() {
Some(s) => {
if s.is_empty() {
None
} else {
Some(s.get_create_table_sql_string(t.name()))
}
}
None => None,
})
.collect::<Vec<_>>()
.join("\n");
utils::done(&format!(
@@ -144,26 +193,22 @@
rows.iter()
.filter(|(_, rows)| !rows.is_empty())
.map(|(table, rows)| {
let table_schema =
match schema.iter().find(|s| s.table.name() == table.name()) {
Some(s) => Ok(s),
None => Err(anyhow!("No schema found for table {}", table.name())),
}?;

let (sql, field_names) = table_schema.schema.get_insert_sql(table.name());
if table.schema().is_none() {
Err(anyhow!("No schema found for table {}", table.name()))?;
}
let table_schema = table.schema().unwrap();
let (sql, field_names) = table_schema.get_insert_sql(table.name());
let mut stmt = conn.prepare(&sql)?;

rows.par_iter()
.map(
|r| match table_schema.schema.get_insert_params(&field_names, r) {
Ok(params) => Ok(params_from_iter(params)),
Err(e) => Err(anyhow!(
"Error getting insert params for row {}: {}",
r.row_id(),
e
)),
},
)
.map(|r| match table_schema.get_insert_params(&field_names, r) {
Ok(params) => Ok(params_from_iter(params)),
Err(e) => Err(anyhow!(
"Error getting insert params for row {}: {}",
r.row_id(),
e
)),
})
.collect::<Result<Vec<_>>>()?
.into_iter()
.map(|params| match stmt.execute(params) {
@@ -337,6 +382,7 @@ pub struct DatabaseTable {
table_id: String,
name: String,
description: String,
schema: Option<TableSchema>,
}

impl DatabaseTable {
@@ -346,13 +392,15 @@ impl DatabaseTable {
table_id: &str,
name: &str,
description: &str,
schema: &Option<TableSchema>,
) -> Self {
DatabaseTable {
created: created,
database_id: database_id.to_string(),
table_id: table_id.to_string(),
name: name.to_string(),
description: description.to_string(),
schema: schema.clone(),
}
}

@@ -371,6 +419,25 @@ impl DatabaseTable {
pub fn description(&self) -> &str {
&self.description
}
pub fn schema(&self) -> Option<&TableSchema> {
self.schema.as_ref()
}

pub fn render_dbml(&self) -> String {
match self.schema {
None => format!("Table {} {{\n}}", self.name()),
Some(ref schema) => format!(
"Table {} {{\n{}\n\n Note: '{}'\n}}",
self.name(),
schema
.columns()
.iter()
.map(|c| format!(" {}", c.render_dbml()))
.join("\n"),
self.description()
),
}
}
}

pub trait HasValue {
@@ -413,43 +480,14 @@ impl HasValue for DatabaseResult {
}
}

#[derive(Debug, Serialize)]
pub struct DatabaseSchemaTable {
table: DatabaseTable,
schema: TableSchema,
}

impl DatabaseSchemaTable {
pub fn new(table: DatabaseTable, schema: TableSchema) -> Self {
DatabaseSchemaTable { table, schema }
}

pub fn is_empty(&self) -> bool {
self.schema.is_empty()
}

pub fn render_dbml(&self) -> String {
format!(
"Table {} {{\n{}\n\n Note: '{}'\n}}",
self.table.name(),
self.schema
.columns()
.iter()
.map(|c| format!(" {}", c.render_dbml()))
.join("\n"),
self.table.description()
)
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::utils;
use serde_json::json;

#[test]
fn test_database_schema_table_to_dbml() -> Result<()> {
fn test_database_table_to_dbml() -> Result<()> {
let row_1 = json!({
"user_id": 1,
"temperature": 1.2,
Expand All @@ -475,8 +513,8 @@ mod tests {
"table_id",
"test_dbml",
"Test records for DBML rendering",
&Some(schema),
);
let table_schema = DatabaseSchemaTable::new(table, schema);

let expected = r#"Table test_dbml {
user_id integer [note: 'possible values: 1, 2']
Expand All @@ -488,7 +526,7 @@ mod tests {
Note: 'Test records for DBML rendering'
}"#
.to_string();
assert_eq!(table_schema.render_dbml(), expected);
assert_eq!(table.render_dbml(), expected);

Ok(())
}
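The remaining changed files in this commit — including core/src/stores/store.rs and core/src/databases/table_schema.rs — are not loaded in this view. From the call sites in Database::get_tables and Database::upsert_rows above, the Store trait presumably gains an update_database_table_schema method roughly shaped as below. This is inferred rather than taken from the actual diff: the hypothetical trait name SchemaCacheStore stands in for the existing Store trait, Project, TableSchema and anyhow's Result are assumed in scope, and the exact parameter and return types may differ.

use async_trait::async_trait;

// Inferred from the call sites above; in the repo this would be a method on
// the existing Store trait, not a separate trait.
#[async_trait]
pub trait SchemaCacheStore {
    async fn update_database_table_schema(
        &self,
        project: &Project,
        data_source_id: &str,
        database_id: &str,
        table_id: &str,
        schema: &TableSchema,
    ) -> Result<()>;
}
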
