From 29ac44e4ab664f5c228f360a95faafe1afa160bc Mon Sep 17 00:00:00 2001
From: Henry Fontanier
Date: Thu, 30 Nov 2023 17:37:01 +0100
Subject: [PATCH] enh(performance): incrementally cache the DB schema (#2707)

* (squashed) implement incremental schema caching

* avoid cloning the table

* get rid of databaseTableSchema

* error if incompatible types

* improve tests

* use try_join_all

---------

Co-authored-by: Henry Fontanier
---
 core/bin/dust_api.rs                 |  47 ----
 core/src/blocks/database_schema.rs   |   8 +-
 core/src/databases/database.rs       | 182 ++++++++-----
 core/src/databases/table_schema.rs   | 255 +++++++++++++++++-
 core/src/stores/postgres.rs          | 118 +++++++-
 core/src/stores/store.rs             |   9 +
 front/lib/core_api.ts                |  39 +--
 .../[name]/databases/[dId]/query.ts  |   4 +-
 .../[name]/databases/[dId]/schema.ts | 108 --------
 9 files changed, 490 insertions(+), 280 deletions(-)
 delete mode 100644 front/pages/api/v1/w/[wId]/data_sources/[name]/databases/[dId]/schema.ts

diff --git a/core/bin/dust_api.rs b/core/bin/dust_api.rs
index 3c66ec00865c..83ed38d8e164 100644
--- a/core/bin/dust_api.rs
+++ b/core/bin/dust_api.rs
@@ -2098,49 +2098,6 @@ async fn databases_rows_list(
     }
 }
 
-async fn databases_schema_retrieve(
-    extract::Path((project_id, data_source_id, database_id)): extract::Path<(i64, String, String)>,
-    extract::Extension(state): extract::Extension<Arc<APIState>>,
-) -> (StatusCode, Json<APIResponse>) {
-    let project = project::Project::new_from_id(project_id);
-
-    match state
-        .store
-        .load_database(&project, &data_source_id, &database_id)
-        .await
-    {
-        Err(e) => error_response(
-            StatusCode::INTERNAL_SERVER_ERROR,
-            "internal_server_error",
-            "Failed to retrieve database",
-            Some(e),
-        ),
-        Ok(None) => error_response(
-            StatusCode::NOT_FOUND,
-            "database_not_found",
-            &format!("No database found for id `{}`", database_id),
-            None,
-        ),
-        Ok(Some(db)) => match db.get_schema(state.store.clone()).await {
-            Err(e) => error_response(
-                StatusCode::INTERNAL_SERVER_ERROR,
-                "internal_server_error",
-                "Failed to retrieve database schema",
-                Some(e),
-            ),
-            Ok(schema) => (
-                StatusCode::OK,
-                Json(APIResponse {
-                    error: None,
-                    response: Some(json!({
-                        "schema": schema
-                    })),
-                }),
-            ),
-        },
-    }
-}
-
 #[derive(serde::Deserialize)]
 struct DatabaseQueryRunPayload {
     query: String,
@@ -2395,10 +2352,6 @@ fn main() {
                 "/projects/:project_id/data_sources/:data_source_id/databases/:database_id/tables/:table_id/rows",
                 get(databases_rows_list),
             )
-            .route(
-                "/projects/:project_id/data_sources/:data_source_id/databases/:database_id/schema",
-                get(databases_schema_retrieve),
-            )
             .route(
                 "/projects/:project_id/data_sources/:data_source_id/databases/:database_id/query",
                 post(databases_query_run),
diff --git a/core/src/blocks/database_schema.rs b/core/src/blocks/database_schema.rs
index a9534b2be6eb..0a10dbf97318 100644
--- a/core/src/blocks/database_schema.rs
+++ b/core/src/blocks/database_schema.rs
@@ -1,5 +1,5 @@
 use crate::blocks::block::{Block, BlockResult, BlockType, Env};
-use crate::databases::database::DatabaseSchemaTable;
+use crate::databases::database::DatabaseTable;
 use crate::Rule;
 use anyhow::{anyhow, Result};
 use async_trait::async_trait;
@@ -76,7 +76,7 @@ impl Block for DatabaseSchema {
                 .into_iter()
                 .map(|t| {
                     json!({
-                        "table_schema": t,
+                        "table_schema": t.schema(),
                         "dbml": t.render_dbml(),
                     })
                 })
@@ -100,7 +100,7 @@ async fn get_database_schema(
     data_source_id: &String,
     database_id: &String,
     env: &Env,
-) -> Result<Vec<DatabaseSchemaTable>> {
+) -> Result<Vec<DatabaseTable>> {
     let project = get_data_source_project(workspace_id, data_source_id, env).await?;
     let database = match env
         .store
@@ -115,7 +115,7 @@ async fn get_database_schema(
         ))?,
     };
 
-    match database.get_schema(env.store.clone()).await {
+    match database.get_tables(env.store.clone()).await {
         Ok(s) => Ok(s),
         Err(e) => Err(anyhow!(
             "Error getting schema for database `{}` in data source `{}`: {}",
diff --git a/core/src/databases/database.rs b/core/src/databases/database.rs
index 581636eb61cc..199c6274231b 100644
--- a/core/src/databases/database.rs
+++ b/core/src/databases/database.rs
@@ -1,6 +1,7 @@
 use super::table_schema::TableSchema;
 use crate::{project::Project, stores::store::Store, utils};
 use anyhow::{anyhow, Result};
+use futures::future::try_join_all;
 use itertools::Itertools;
 use rayon::prelude::*;
 use rusqlite::{params_from_iter, Connection};
@@ -51,23 +52,30 @@ impl Database {
         }
     }
 
-    pub async fn get_schema(
+    pub async fn get_tables(
         &self,
         store: Box<dyn Store + Sync + Send>,
-    ) -> Result<Vec<DatabaseSchemaTable>> {
+    ) -> Result<Vec<DatabaseTable>> {
         match self.db_type {
             DatabaseType::REMOTE => Err(anyhow!("Remote DB not implemented.")),
             DatabaseType::LOCAL => {
-                let rows = self.get_rows(store).await?;
-
-                rows.par_iter()
-                    .map(|(table, r)| {
-                        Ok(DatabaseSchemaTable::new(
-                            table.clone(),
-                            TableSchema::from_rows(&r)?,
-                        ))
+                let (tables, _) = store
+                    .list_databases_tables(
+                        &self.project,
+                        &self.data_source_id,
+                        &self.database_id,
+                        None,
+                    )
+                    .await?;
+
+                Ok(tables
+                    .into_iter()
+                    // Ignore empty tables.
+                    .filter_map(|t| match t.schema() {
+                        None => None,
+                        Some(_) => Some(t),
                     })
-                    .collect::<Result<Vec<_>>>()
+                    .collect::<Vec<_>>())
             }
         }
     }
@@ -79,20 +87,53 @@ impl Database {
         rows: Vec<DatabaseRow>,
         truncate: bool,
     ) -> Result<()> {
-        // This will be used to update the schema incrementally once we store schemas. For now this
-        // is a way to validate the content of the rows (only primitive types).
-        let _ = TableSchema::from_rows(&rows)?;
+        let table = match store
+            .load_database_table(
+                &self.project,
+                &self.data_source_id,
+                &self.database_id,
+                table_id,
+            )
+            .await?
+        {
+            Some(t) => t,
+            None => Err(anyhow!(
+                "Table {} not found in database {}",
+                table_id,
+                self.database_id
+            ))?,
+        };
+
+        let new_rows_table_schema = TableSchema::from_rows(&rows)?;
+        let table_schema = match table.schema() {
+            // If there is no existing schema cache, simply use the new schema.
+            None => new_rows_table_schema,
+            Some(existing_table_schema) => {
+                // If there is an existing schema cache, merge it with the new schema.
+                existing_table_schema.merge(&new_rows_table_schema)?
+            }
+        };
 
-        store
-            .batch_upsert_database_rows(
+        try_join_all(vec![
+            store.update_database_table_schema(
+                &self.project,
+                &self.data_source_id,
+                &self.database_id,
+                table_id,
+                &table_schema,
+            ),
+            store.batch_upsert_database_rows(
                 &self.project,
                 &self.data_source_id,
                 &self.database_id,
                 table_id,
                 &rows,
                 truncate,
-            )
-            .await
+            ),
+        ])
+        .await?;
+
+        Ok(())
     }
 
     pub async fn create_in_memory_sqlite_conn(
@@ -106,7 +147,7 @@ impl Database {
             DatabaseType::LOCAL => {
                 let time_build_db_start = utils::now();
 
-                let schema = self.get_schema(store.clone()).await?;
+                let tables = self.get_tables(store.clone()).await?;
                 utils::done(&format!(
                     "DSSTRUCTSTAT Finished retrieving schema: duration={}ms",
                     utils::now() - time_build_db_start
@@ -120,10 +161,18 @@ impl Database {
                 ));
 
                 let generate_create_table_sql_start = utils::now();
-                let create_tables_sql: String = schema
-                    .iter()
-                    .filter(|s| !s.is_empty())
-                    .map(|s| s.schema.get_create_table_sql_string(s.table.name()))
+                let create_tables_sql: String = tables
+                    .into_iter()
+                    .filter_map(|t| match t.schema() {
+                        Some(s) => {
+                            if s.is_empty() {
+                                None
+                            } else {
+                                Some(s.get_create_table_sql_string(t.name()))
+                            }
+                        }
+                        None => None,
+                    })
                     .collect::<Vec<_>>()
                     .join("\n");
                 utils::done(&format!(
@@ -144,26 +193,22 @@ impl Database {
                 rows.iter()
                     .filter(|(_, rows)| !rows.is_empty())
                     .map(|(table, rows)| {
-                        let table_schema =
-                            match schema.iter().find(|s| s.table.name() == table.name()) {
-                                Some(s) => Ok(s),
-                                None => Err(anyhow!("No schema found for table {}", table.name())),
-                            }?;
-
-                        let (sql, field_names) = table_schema.schema.get_insert_sql(table.name());
+                        if table.schema().is_none() {
+                            Err(anyhow!("No schema found for table {}", table.name()))?;
+                        }
+                        let table_schema = table.schema().unwrap();
 
+                        let (sql, field_names) = table_schema.get_insert_sql(table.name());
                         let mut stmt = conn.prepare(&sql)?;
 
                         rows.par_iter()
-                            .map(
-                                |r| match table_schema.schema.get_insert_params(&field_names, r) {
-                                    Ok(params) => Ok(params_from_iter(params)),
-                                    Err(e) => Err(anyhow!(
-                                        "Error getting insert params for row {}: {}",
-                                        r.row_id(),
-                                        e
-                                    )),
-                                },
-                            )
+                            .map(|r| match table_schema.get_insert_params(&field_names, r) {
+                                Ok(params) => Ok(params_from_iter(params)),
+                                Err(e) => Err(anyhow!(
+                                    "Error getting insert params for row {}: {}",
+                                    r.row_id(),
+                                    e
+                                )),
+                            })
                             .collect::<Result<Vec<_>>>()?
                             .into_iter()
                             .map(|params| match stmt.execute(params) {
@@ -337,6 +382,7 @@ pub struct DatabaseTable {
     table_id: String,
     name: String,
     description: String,
+    schema: Option<TableSchema>,
 }
 
 impl DatabaseTable {
@@ -346,6 +392,7 @@ impl DatabaseTable {
         table_id: &str,
         name: &str,
         description: &str,
+        schema: &Option<TableSchema>,
     ) -> Self {
         DatabaseTable {
             created: created,
@@ -353,6 +400,7 @@ impl DatabaseTable {
             table_id: table_id.to_string(),
             name: name.to_string(),
             description: description.to_string(),
+            schema: schema.clone(),
         }
     }
 
@@ -371,6 +419,25 @@ impl DatabaseTable {
     pub fn description(&self) -> &str {
         &self.description
     }
+    pub fn schema(&self) -> Option<&TableSchema> {
+        self.schema.as_ref()
+    }
+
+    pub fn render_dbml(&self) -> String {
+        match self.schema {
+            None => format!("Table {} {{\n}}", self.name()),
+            Some(ref schema) => format!(
+                "Table {} {{\n{}\n\n  Note: '{}'\n}}",
+                self.name(),
+                schema
+                    .columns()
+                    .iter()
+                    .map(|c| format!("  {}", c.render_dbml()))
+                    .join("\n"),
+                self.description()
+            ),
+        }
+    }
 }
 
 pub trait HasValue {
@@ -413,35 +480,6 @@ impl HasValue for DatabaseResult {
     }
 }
 
-#[derive(Debug, Serialize)]
-pub struct DatabaseSchemaTable {
-    table: DatabaseTable,
-    schema: TableSchema,
-}
-
-impl DatabaseSchemaTable {
-    pub fn new(table: DatabaseTable, schema: TableSchema) -> Self {
-        DatabaseSchemaTable { table, schema }
-    }
-
-    pub fn is_empty(&self) -> bool {
-        self.schema.is_empty()
-    }
-
-    pub fn render_dbml(&self) -> String {
-        format!(
-            "Table {} {{\n{}\n\n  Note: '{}'\n}}",
-            self.table.name(),
-            self.schema
-                .columns()
-                .iter()
-                .map(|c| format!("  {}", c.render_dbml()))
-                .join("\n"),
-            self.table.description()
-        )
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -449,7 +487,7 @@ mod tests {
     use serde_json::json;
 
     #[test]
-    fn test_database_schema_table_to_dbml() -> Result<()> {
+    fn test_database_table_to_dbml() -> Result<()> {
         let row_1 = json!({
             "user_id": 1,
             "temperature": 1.2,
@@ -475,8 +513,8 @@ mod tests {
             "table_id",
             "test_dbml",
             "Test records for DBML rendering",
+            &Some(schema),
         );
-        let table_schema = DatabaseSchemaTable::new(table, schema);
 
         let expected = r#"Table test_dbml {
   user_id integer [note: 'possible values: 1, 2']
 
   Note: 'Test records for DBML rendering'
 }"#
         .to_string();
-        assert_eq!(table_schema.render_dbml(), expected);
+        assert_eq!(table.render_dbml(), expected);
 
         Ok(())
     }
diff --git a/core/src/databases/table_schema.rs b/core/src/databases/table_schema.rs
index b5e673ef094f..5e3b84933b0b 100644
--- a/core/src/databases/table_schema.rs
+++ b/core/src/databases/table_schema.rs
@@ -1,5 +1,8 @@
+use std::collections::{HashMap, HashSet};
+
 use super::database::{DatabaseRow, HasValue};
 use anyhow::{anyhow, Result};
+use itertools::Itertools;
 use rusqlite::{types::ToSqlOutput, ToSql};
 use serde::{Deserialize, Serialize};
 use serde_json::Value;
@@ -262,6 +265,89 @@ impl TableSchema {
                 .collect::<Result<Vec<_>>>(),
         }
     }
+
+    pub fn merge(&self, other: &Self) -> Result<Self> {
+        // Index our and other columns by name.
+        let our_columns_set = self.0.iter().map(|c| &c.name).collect::<HashSet<_>>();
+        let other_columns_map = other
+            .0
+            .iter()
+            .map(|c| (&c.name, c))
+            .collect::<HashMap<_, _>>();
+
+        let merged_schema = self
+            .0
+            .iter()
+            // Iterate over all of our columns.
+            .map(|our_column| {
+                let mut our_column = our_column.clone();
+
+                // If the other schema has a column with the same name, merge it into our column.
+                if let Some(other_column) = other_columns_map.get(&our_column.name) {
+                    // If types are different, we need to merge them.
+                    if our_column.value_type != other_column.value_type {
+                        use TableSchemaFieldType::*;
+                        our_column.value_type =
+                            match (&our_column.value_type, &other_column.value_type) {
+                                // Ints and Floats can be merged into Floats.
+                                // Other types are incompatible.
+                                (Int, Float) | (Float, Int) => TableSchemaFieldType::Float,
+                                _ => Err(anyhow!(
+                                    "Cannot merge types {:?} and {:?}",
+                                    our_column.value_type,
+                                    other_column.value_type
+                                ))?,
+                            };
+                    }
+
+                    // Concatenate the unique possible values from both columns.
+                    our_column.possible_values = match our_column
+                        .possible_values
+                        .as_ref()
+                        .unwrap_or(&vec![])
+                        .iter()
+                        .chain(other_column.possible_values.as_ref().unwrap_or(&vec![]))
+                        .map(|v| v.to_string())
+                        .unique()
+                        .enumerate()
+                        .map(|(i, v)| {
+                            // If the total number of possible values is too large, or if any of the values are
+                            // too long, then we give up on possible values.
+                            // If there are no possible values, then we set it to None.
+                            if v.len() > POSSIBLE_VALUES_MAX_LEN || i >= POSSIBLE_VALUES_MAX_COUNT {
+                                None
+                            } else {
+                                Some(v)
+                            }
+                        })
+                        .collect::<Option<Vec<_>>>()
+                    {
+                        None => None,
+                        Some(possible_values) => {
+                            if possible_values.is_empty() {
+                                None
+                            } else {
+                                Some(possible_values)
+                            }
+                        }
+                    }
+                }
+
+                Ok(our_column)
+            })
+            // Include all of the other columns that we don't have.
+            .chain(other.0.iter().filter_map(|other_column| {
+                if our_columns_set.contains(&other_column.name) {
+                    // We already have this column.
+                    None
+                } else {
+                    Some(Ok(other_column.clone()))
+                }
+            }))
+            .collect::<Result<Vec<_>>>()?;
+
+        Ok(TableSchema(merged_schema))
+    }
 }
 
 #[cfg(test)]
@@ -439,7 +525,6 @@ mod tests {
         let (sql, field_names) = schema.get_insert_sql("test_table");
         let params_vec = schema.get_insert_params(&field_names, &row)?;
-        println!("{:?}", params_vec);
 
         let mut stmt = conn.prepare(&sql)?;
@@ -530,6 +615,160 @@ mod tests {
         ])
     }
 
+    #[test]
+    fn test_merge() -> Result<()> {
+        let test_cases = vec![
+            // (col_schema1, col_schema2, expected_type, expected_possible_values)
+            // Test float / int merge.
+            (
+                Some(create_test_column_with_values(
+                    "float_int_merge",
+                    TableSchemaFieldType::Int,
+                    vec!["1", "2"],
+                )),
+                Some(create_test_column_with_values(
+                    "float_int_merge",
+                    TableSchemaFieldType::Float,
+                    vec!["3.5", "4.5", "5"],
+                )),
+                TableSchemaFieldType::Float,
+                Some(vec!["1", "2", "3.5", "4.5", "5"]),
+            ),
+            // Test when our_column doesn't exist.
+            (
+                None,
+                Some(create_test_column_with_values(
+                    "other_column",
+                    TableSchemaFieldType::Text,
+                    vec!["The value is 1234", "The value is 5678", "12"],
+                )),
+                TableSchemaFieldType::Text,
+                Some(vec!["The value is 1234", "The value is 5678", "12"]),
+            ),
+            // Test when other_column doesn't exist.
+            (
+                Some(create_test_column_with_values(
+                    "our_column",
+                    TableSchemaFieldType::Float,
+                    vec!["1.2", "1.3"],
+                )),
+                None,
+                TableSchemaFieldType::Float,
+                Some(vec!["1.2", "1.3"]),
+            ),
+            // Test without possible values.
+            (
+                Some(TableSchemaColumn::new(
+                    "no_possible_values",
+                    TableSchemaFieldType::Text,
+                )),
+                Some(TableSchemaColumn::new(
+                    "no_possible_values",
+                    TableSchemaFieldType::Text,
+                )),
+                TableSchemaFieldType::Text,
+                None,
+            ),
+            // Test when only other column has possible values.
+            (
+                None,
+                Some(create_test_column_with_values(
+                    "other_column_possible_values",
+                    TableSchemaFieldType::Text,
+                    vec!["The value is 1234", "The value is 5678", "12"],
+                )),
+                TableSchemaFieldType::Text,
+                Some(vec!["The value is 1234", "The value is 5678", "12"]),
+            ),
+            // Test when only our column has possible values.
+            (
+                Some(create_test_column_with_values(
+                    "our_column_possible_values",
+                    TableSchemaFieldType::Text,
+                    vec!["The value is 1234", "The value is 5678", "12"],
+                )),
+                None,
+                TableSchemaFieldType::Text,
+                Some(vec!["The value is 1234", "The value is 5678", "12"]),
+            ),
+            // Test case for possible values going over max count.
+            (
+                Some(create_test_column_with_values(
+                    "going_over_max_count",
+                    TableSchemaFieldType::Int,
+                    (1..=POSSIBLE_VALUES_MAX_COUNT)
+                        .map(|i| i.to_string())
+                        .collect::<Vec<String>>()
+                        .iter()
+                        .map(AsRef::as_ref)
+                        .collect(),
+                )),
+                Some(create_test_column_with_values(
+                    "going_over_max_count",
+                    TableSchemaFieldType::Int,
+                    vec!["that's one too many"],
+                )),
+                TableSchemaFieldType::Int,
+                None,
+            ),
+            // Test case for possible values going over max length.
+            (
+                Some(create_test_column_with_values(
+                    "going_over_max_length",
+                    TableSchemaFieldType::Text,
+                    vec!["hello"],
+                )),
+                Some(create_test_column_with_values(
+                    "going_over_max_length",
+                    TableSchemaFieldType::Text,
+                    vec![&"a".repeat(POSSIBLE_VALUES_MAX_LEN + 1)],
+                )),
+                TableSchemaFieldType::Text,
+                None,
+            ),
+        ];
+
+        let mut schema1_columns = Vec::new();
+        let mut schema2_columns = Vec::new();
+
+        for (col1, col2, _, _) in test_cases.iter() {
+            if col1.is_some() {
+                schema1_columns.push(col1.clone().unwrap());
+            }
+            if col2.is_some() {
+                schema2_columns.push(col2.clone().unwrap());
+            }
+        }
+
+        let schema1 = TableSchema(schema1_columns);
+        let schema2 = TableSchema(schema2_columns);
+
+        let merged_schema = schema1.merge(&schema2)?;
+
+        for (col_1, col_2, expected_type, expected_values) in test_cases.into_iter() {
+            let field_name = col_1
+                .map(|c| c.name)
+                .or_else(|| col_2.map(|c| c.name))
+                .unwrap();
+
+            let column = merged_schema
+                .columns()
+                .iter()
+                .find(|c| c.name == field_name)
+                .expect(&format!("Column {} not found", field_name));
+
+            assert_eq!(column.value_type, expected_type, "{}", field_name);
+            assert_eq!(
+                column.possible_values,
+                expected_values.map(|vals| vals.into_iter().map(|v| v.to_string()).collect()),
+                "{}",
+                field_name
+            );
+        }
+
+        Ok(())
+    }
+
     // Helper function to set up an in-memory database with a test table
     fn setup_in_memory_db(schema: &TableSchema) -> Result<Connection> {
         let conn = Connection::open_in_memory()?;
         let sql_create_table = schema.get_create_table_sql_string("test_table");
         conn.execute(&sql_create_table, [])?;
         Ok(conn)
     }
+
+    // Helper function to create a test column with possible values
+    fn create_test_column_with_values(
+        name: &str,
+        value_type: TableSchemaFieldType,
+        values: Vec<&str>,
+    ) -> TableSchemaColumn {
+        let possible_values = values.into_iter().map(|v| v.to_string()).collect();
+        TableSchemaColumn {
+            name: name.to_string(),
+            value_type,
+            possible_values: Some(possible_values),
+        }
+    }
 }
diff --git a/core/src/stores/postgres.rs b/core/src/stores/postgres.rs
index 3ad27ac470dd..b02c552bc051 100644
--- a/core/src/stores/postgres.rs
+++ b/core/src/stores/postgres.rs
@@ -4,6 +4,7 @@ use crate::data_sources::data_source::{
     DataSource, DataSourceConfig, Document, DocumentVersion, SearchFilter,
 };
 use crate::databases::database::{Database, DatabaseRow, DatabaseTable};
+use crate::databases::table_schema::TableSchema;
 use crate::dataset::Dataset;
 use crate::http::request::{HttpRequest, HttpResponse};
 use crate::project::Project;
@@ -1943,7 +1944,7 @@ impl Store for PostgresStore {
         let pool = self.pool.clone();
         let c = pool.get().await?;
 
-        // get the data source row id
+        // Get the data source row id.
         let stmt = c
             .prepare(
                 "SELECT id FROM data_sources WHERE project = $1 AND data_source_id = $2 LIMIT 1",
             )
             .await?;
         let r = c.query(&stmt, &[&project_id, &data_source_id]).await?;
         let data_source_row_id: i64 = match r.len() {
             0 => Err(anyhow!("Unknown DataSource: {}", data_source_id))?,
             1 => r[0].get(0),
             _ => unreachable!(),
         };
 
-        // get the database row id
+        // Get the database row id.
         let stmt = c
             .prepare("SELECT id FROM databases WHERE data_source = $1 AND database_id = $2 LIMIT 1")
             .await?;
@@ -2003,9 +2004,66 @@ impl Store for PostgresStore {
             &table_id,
             &table_name,
             &table_description,
+            &None,
         ))
     }
 
+    async fn update_database_table_schema(
+        &self,
+        project: &Project,
+        data_source_id: &str,
+        database_id: &str,
+        table_id: &str,
+        schema: &TableSchema,
+    ) -> Result<()> {
+        let project_id = project.project_id();
+        let data_source_id = data_source_id.to_string();
+        let database_id = database_id.to_string();
+        let table_id = table_id.to_string();
+
+        let pool = self.pool.clone();
+        let c = pool.get().await?;
+
+        // Get the data source row id.
+        let stmt = c
+            .prepare(
+                "SELECT id FROM data_sources WHERE project = $1 AND data_source_id = $2 LIMIT 1",
+            )
+            .await?;
+        let r = c.query(&stmt, &[&project_id, &data_source_id]).await?;
+        let data_source_row_id: i64 = match r.len() {
+            0 => Err(anyhow!("Unknown DataSource: {}", data_source_id))?,
+            1 => r[0].get(0),
+            _ => unreachable!(),
+        };
+
+        // Get the database row id.
+        let stmt = c
+            .prepare("SELECT id FROM databases WHERE data_source = $1 AND database_id = $2 LIMIT 1")
+            .await?;
+        let r = c.query(&stmt, &[&data_source_row_id, &database_id]).await?;
+        let database_row_id: i64 = match r.len() {
+            0 => Err(anyhow!("Unknown Database: {}", database_id))?,
+            1 => r[0].get(0),
+            _ => unreachable!(),
+        };
+
+        // Update the schema.
+        let stmt = c
+            .prepare(
+                "UPDATE databases_tables SET schema = $1 \
+                WHERE database = $2 AND table_id = $3",
+            )
+            .await?;
+        c.query(
+            &stmt,
+            &[&serde_json::to_string(schema)?, &database_row_id, &table_id],
+        )
+        .await?;
+
+        Ok(())
+    }
+
     async fn load_database_table(
         &self,
         project: &Project,
@@ -2023,7 +2081,7 @@ impl Store for PostgresStore {
 
         let stmt = c
             .prepare(
-                "SELECT created, table_id, name, description FROM databases_tables \
+                "SELECT created, table_id, name, description, schema FROM databases_tables \
                 WHERE database IN (
                     SELECT id FROM databases WHERE data_source IN (
                         SELECT id FROM data_sources WHERE project = $1 AND data_source_id = $2
@@ -2039,21 +2097,40 @@ impl Store for PostgresStore {
             )
             .await?;
 
-        let d: Option<(i64, String, String, String)> = match r.len() {
+        let d: Option<(i64, String, String, String, Option<String>)> = match r.len() {
             0 => None,
-            1 => Some((r[0].get(0), r[0].get(1), r[0].get(2), r[0].get(3))),
+            1 => Some((
+                r[0].get(0),
+                r[0].get(1),
+                r[0].get(2),
+                r[0].get(3),
+                r[0].get(4),
+            )),
             _ => unreachable!(),
         };
 
         match d {
             None => Ok(None),
-            Some((created, table_id, name, description)) => Ok(Some(DatabaseTable::new(
-                created as u64,
-                &database_id,
-                &table_id,
-                &name,
-                &description,
-            ))),
+            Some((created, table_id, name, description, schema)) => {
+                let parsed_schema: Option<TableSchema> = match schema {
+                    None => None,
+                    Some(schema) => {
+                        if schema.is_empty() {
+                            None
+                        } else {
+                            Some(serde_json::from_str(&schema)?)
+                        }
+                    }
+                };
+                Ok(Some(DatabaseTable::new(
+                    created as u64,
+                    &database_id,
+                    &table_id,
+                    &name,
+                    &description,
+                    &parsed_schema,
+                )))
+            }
         }
     }
 
@@ -2103,7 +2180,7 @@ impl Store for PostgresStore {
             None => {
                 let stmt = c
                     .prepare(
-                        "SELECT created, table_id, name, description FROM databases_tables \
+                        "SELECT created, table_id, name, description, schema FROM databases_tables \
                         WHERE database = $1",
                     )
                     .await?;
@@ -2112,7 +2189,7 @@ impl Store for PostgresStore {
             Some((limit, offset)) => {
                 let stmt = c
                     .prepare(
-                        "SELECT created, table_id, name, description FROM databases_tables \
+                        "SELECT created, table_id, name, description, schema FROM databases_tables \
                         WHERE database = $1 LIMIT $2 OFFSET $3",
                    )
                    .await?;
@@ -2131,6 +2208,18 @@ impl Store for PostgresStore {
                 let table_id: String = r.get(1);
                 let name: String = r.get(2);
                 let description: String = r.get(3);
+                let schema: Option<String> = r.get(4);
+
+                let parsed_schema: Option<TableSchema> = match schema {
+                    None => None,
+                    Some(schema) => {
+                        if schema.is_empty() {
+                            None
+                        } else {
+                            Some(serde_json::from_str(&schema)?)
+                        }
+                    }
+                };
 
                 Ok(DatabaseTable::new(
                     created as u64,
@@ -2138,6 +2227,7 @@ impl Store for PostgresStore {
                     &table_id,
                     &name,
                     &description,
+                    &parsed_schema,
                 ))
             })
             .collect::<Result<Vec<_>>>()?;
diff --git a/core/src/stores/store.rs b/core/src/stores/store.rs
index a71a067164b3..3865380e37c1 100644
--- a/core/src/stores/store.rs
+++ b/core/src/stores/store.rs
@@ -3,6 +3,7 @@ use crate::data_sources::data_source::{
     DataSource, DataSourceConfig, Document, DocumentVersion, SearchFilter,
 };
 use crate::databases::database::{Database, DatabaseRow, DatabaseTable};
+use crate::databases::table_schema::TableSchema;
 use crate::dataset::Dataset;
 use crate::http::request::{HttpRequest, HttpResponse};
 use crate::project::Project;
@@ -181,6 +182,14 @@ pub trait Store {
         name: &str,
         description: &str,
     ) -> Result<DatabaseTable>;
+    async fn update_database_table_schema(
+        &self,
+        project: &Project,
+        data_source_id: &str,
+        database_id: &str,
+        table_id: &str,
+        schema: &TableSchema,
+    ) -> Result<()>;
     async fn load_database_table(
         &self,
         project: &Project,
diff --git a/front/lib/core_api.ts b/front/lib/core_api.ts
index b9c523a75d8d..282e8148d1d9 100644
--- a/front/lib/core_api.ts
+++ b/front/lib/core_api.ts
@@ -83,12 +83,18 @@ export type CoreAPIDatabase = {
   name: string;
 };
 
+export type CoreAPIDatabaseTableSchema = Record<
+  string,
+  "int" | "float" | "text" | "bool"
+>;
+
 export type CoreAPIDatabaseTable = {
   created: number;
   database_id: string;
   table_id: string;
   name: string;
   description: string;
+  schema: CoreAPIDatabaseTableSchema;
 };
 
 export type CoreAPIDatabaseRow = {
@@ -100,14 +106,6 @@ export type CoreAPIDatabaseResult = {
   value: Record<string, unknown>;
 };
 
-export type CoreAPIDatabaseSchema = Record<
-  string,
-  {
-    table: CoreAPIDatabaseTable;
-    schema: Record<string, "int" | "float" | "text" | "bool">;
-  }
->;
-
 export const CoreAPI = {
   async createProject(): Promise<CoreAPIResponse<{ project: Project }>> {
     const response = await fetch(`${CORE_API}/projects`, {
@@ -1044,29 +1042,6 @@ export const CoreAPI = {
     return _resultFromResponse(response);
   },
 
-  async getDatabaseSchema({
-    projectId,
-    dataSourceName,
-    databaseId,
-  }: {
-    projectId: string;
-    dataSourceName: string;
-    databaseId: string;
-  }): Promise<
-    CoreAPIResponse<{
-      schema: CoreAPIDatabaseSchema;
-    }>
-  > {
-    const response = await fetch(
-      `${CORE_API}/projects/${projectId}/data_sources/${dataSourceName}/databases/${databaseId}/schema`,
-      {
-        method: "GET",
-      }
-    );
-
-    return _resultFromResponse(response);
-  },
-
   async queryDatabase({
     projectId,
     dataSourceName,
     databaseId,
@@ -1079,7 +1054,7 @@ export const CoreAPI = {
     query: string;
   }): Promise<
     CoreAPIResponse<{
-      schema: CoreAPIDatabaseSchema;
+      schema: CoreAPIDatabaseTableSchema;
       results: CoreAPIDatabaseResult[];
     }>
   > {
diff --git a/front/pages/api/v1/w/[wId]/data_sources/[name]/databases/[dId]/query.ts b/front/pages/api/v1/w/[wId]/data_sources/[name]/databases/[dId]/query.ts
index 59d01c1947e0..e7fefc6b158a 100644
--- a/front/pages/api/v1/w/[wId]/data_sources/[name]/databases/[dId]/query.ts
+++ b/front/pages/api/v1/w/[wId]/data_sources/[name]/databases/[dId]/query.ts
@@ -8,7 +8,7 @@ import { Authenticator, getAPIKey } from "@app/lib/auth";
 import {
   CoreAPI,
   CoreAPIDatabaseResult,
-  CoreAPIDatabaseSchema,
+  CoreAPIDatabaseTableSchema,
 } from "@app/lib/core_api";
 import { isDevelopmentOrDustWorkspace } from "@app/lib/development";
 import logger from "@app/logger/logger";
@@ -19,7 +19,7 @@ const GetDatabaseSchemaReqBodySchema = t.type({
 });
 
 type QueryDatabaseSchemaResponseBody = {
-  schema: CoreAPIDatabaseSchema;
+  schema: CoreAPIDatabaseTableSchema;
   results: CoreAPIDatabaseResult[];
 };
 
diff --git a/front/pages/api/v1/w/[wId]/data_sources/[name]/databases/[dId]/schema.ts b/front/pages/api/v1/w/[wId]/data_sources/[name]/databases/[dId]/schema.ts
deleted file mode 100644
index d5a5bd6a18b0..000000000000
--- a/front/pages/api/v1/w/[wId]/data_sources/[name]/databases/[dId]/schema.ts
+++ /dev/null
@@ -1,108 +0,0 @@
-import { NextApiRequest, NextApiResponse } from "next";
-
-import { getDataSource } from "@app/lib/api/data_sources";
-import { Authenticator, getAPIKey } from "@app/lib/auth";
-import { CoreAPI, CoreAPIDatabaseSchema } from "@app/lib/core_api";
-import { isDevelopmentOrDustWorkspace } from "@app/lib/development";
-import logger from "@app/logger/logger";
-import { apiError, withLogging } from "@app/logger/withlogging";
-
-type GetDatabaseSchemaResponseBody = {
-  schema: CoreAPIDatabaseSchema;
-};
-
-async function handler(
-  req: NextApiRequest,
-  res: NextApiResponse<GetDatabaseSchemaResponseBody>
-): Promise<void> {
-  const keyRes = await getAPIKey(req);
-  if (keyRes.isErr()) {
-    return apiError(req, res, keyRes.error);
-  }
-
-  const { auth } = await Authenticator.fromKey(
-    keyRes.value,
-    req.query.wId as string
-  );
-
-  const owner = auth.workspace();
-  const plan = auth.plan();
-  if (!owner || !plan) {
-    return apiError(req, res, {
-      status_code: 404,
-      api_error: {
-        type: "workspace_not_found",
-        message: "The workspace you requested was not found.",
-      },
-    });
-  }
-
-  if (!isDevelopmentOrDustWorkspace(owner)) {
-    res.status(404).end();
-    return;
-  }
-
-  const dataSource = await getDataSource(auth, req.query.name as string);
-  if (!dataSource) {
-    return apiError(req, res, {
-      status_code: 404,
-      api_error: {
-        type: "data_source_not_found",
-        message: "The data source you requested was not found.",
-      },
-    });
-  }
-
-  const databaseId = req.query.dId;
-  if (!databaseId || typeof databaseId !== "string") {
-    return apiError(req, res, {
-      status_code: 400,
-      api_error: {
-        type: "invalid_request_error",
-        message: "Invalid request query: id is required.",
-      },
-    });
-  }
-
-  switch (req.method) {
-    case "GET":
-      const schemaRes = await CoreAPI.getDatabaseSchema({
-        projectId: dataSource.dustAPIProjectId,
-        dataSourceName: dataSource.name,
-        databaseId,
-      });
-      if (schemaRes.isErr()) {
-        logger.error(
-          {
-            dataSourceName: dataSource.name,
-            workspaceId: owner.id,
-            databaseId,
-            error: schemaRes.error,
-          },
-          "Failed to get database schema."
- ); - return apiError(req, res, { - status_code: 500, - api_error: { - type: "internal_server_error", - message: "Failed to get database schema.", - }, - }); - } - - const { schema } = schemaRes.value; - - return res.status(200).json({ schema }); - - default: - return apiError(req, res, { - status_code: 405, - api_error: { - type: "method_not_supported_error", - message: "The method passed is not supported, GET is expected.", - }, - }); - } -} - -export default withLogging(handler);
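---

Note for reviewers (not part of the patch): the sketch below mirrors the column-merge rules that TableSchema::merge introduces in core/src/databases/table_schema.rs, using simplified stand-in types. Everything in it is illustrative: the FieldType enum is a placeholder for TableSchemaFieldType, the two POSSIBLE_VALUES_* caps are made-up values (the real constants live in table_schema.rs), and anyhow is assumed as the error type, as in core.

// merge_sketch.rs -- illustrative sketch only; assumes the `anyhow` crate.
use anyhow::{anyhow, Result};

const POSSIBLE_VALUES_MAX_COUNT: usize = 16; // placeholder, not the crate's real cap
const POSSIBLE_VALUES_MAX_LEN: usize = 32; // placeholder, not the crate's real cap

#[derive(Clone, Debug, PartialEq)]
enum FieldType {
    Int,
    Float,
    Text,
    Bool,
}

// Identical types merge to themselves; Int and Float widen to Float;
// any other pair is an error, matching the patch's behavior.
fn merge_types(ours: &FieldType, theirs: &FieldType) -> Result<FieldType> {
    use FieldType::*;
    match (ours, theirs) {
        (a, b) if a == b => Ok(a.clone()),
        (Int, Float) | (Float, Int) => Ok(Float),
        (a, b) => Err(anyhow!("Cannot merge types {:?} and {:?}", a, b)),
    }
}

// Union of the two value lists, de-duplicated in order; give up (None) when a
// value is too long or the union grows past the cap, as the patch does.
fn merge_possible_values(
    ours: Option<&Vec<String>>,
    theirs: Option<&Vec<String>>,
) -> Option<Vec<String>> {
    let mut merged: Vec<String> = Vec::new();
    for v in ours.into_iter().flatten().chain(theirs.into_iter().flatten()) {
        if v.len() > POSSIBLE_VALUES_MAX_LEN {
            return None;
        }
        if !merged.contains(v) {
            merged.push(v.clone());
        }
        if merged.len() > POSSIBLE_VALUES_MAX_COUNT {
            return None;
        }
    }
    if merged.is_empty() {
        None
    } else {
        Some(merged)
    }
}

fn main() -> Result<()> {
    // Int + Float widens to Float.
    assert_eq!(merge_types(&FieldType::Int, &FieldType::Float)?, FieldType::Float);
    // Text + Bool is incompatible, so the corresponding row upsert would fail.
    assert!(merge_types(&FieldType::Text, &FieldType::Bool).is_err());
    // Possible values are concatenated and de-duplicated.
    let merged = merge_possible_values(
        Some(&vec!["1".to_string(), "2".to_string()]),
        Some(&vec!["2".to_string(), "3.5".to_string()]),
    );
    assert_eq!(
        merged,
        Some(vec!["1".to_string(), "2".to_string(), "3.5".to_string()])
    );
    Ok(())
}

The Int/Float widening keeps previously cached integer columns usable when floats later appear in the same column, while any other type change fails the upsert early, at write time, instead of surfacing as a SQLite error at query time.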