diff --git a/core/Cargo.lock b/core/Cargo.lock index ac3b76e2f37d..9d57916f96b6 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -245,7 +245,7 @@ checksum = "5fd55a5ba1179988837d24ab4c7cc8ed6efdeff578ede0416b4225a5fca35bd0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.48", ] [[package]] @@ -293,7 +293,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.48", ] [[package]] @@ -310,7 +310,7 @@ checksum = "a66537f1bb974b254c98ed142ff995236e81b9d0fe4db0575f46612cb15eb0f9" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.48", ] [[package]] @@ -674,7 +674,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.48", ] [[package]] @@ -864,7 +864,7 @@ checksum = "3c65c2ffdafc1564565200967edc4851c7b55422d3913466688907efd05ea26f" dependencies = [ "deno-proc-macro-rules-macros", "proc-macro2", - "syn 2.0.38", + "syn 2.0.48", ] [[package]] @@ -876,7 +876,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.48", ] [[package]] @@ -922,7 +922,7 @@ dependencies = [ "strum", "strum_macros", "syn 1.0.109", - "syn 2.0.38", + "syn 2.0.48", "thiserror", ] @@ -1055,6 +1055,7 @@ dependencies = [ "serde_json", "shellexpand", "tera", + "thiserror", "tokio", "tokio-postgres", "tokio-stream", @@ -1284,7 +1285,7 @@ checksum = "53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.48", ] [[package]] @@ -2051,7 +2052,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.48", ] [[package]] @@ -2218,7 +2219,7 @@ dependencies = [ "pest_meta", "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.48", ] [[package]] @@ -2287,7 +2288,7 @@ checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.48", ] [[package]] @@ -2327,7 +2328,7 @@ checksum = "52a40bc70c2c58040d2d8b167ba9a5ff59fc9dab7ad44771cfde3dcfde7a09c6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.48", ] [[package]] @@ -2399,9 +2400,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.69" +version = "1.0.78" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "134c189feb4956b20f6f547d2cf727d4c0fe06722b20a0eec87ed445a97f92da" +checksum = "e2422ad645d89c99f8f3e6b88a9fdeca7fabeac836b1002371c4367c8f984aae" dependencies = [ "unicode-ident", ] @@ -2456,9 +2457,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.33" +version = "1.0.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae" +checksum = "291ec9ab5efd934aaf503a6466c5d5251535d108ee747472c3977cc5acc868ef" dependencies = [ "proc-macro2", ] @@ -2931,7 +2932,7 @@ checksum = "67c5609f394e5c2bd7fc51efda478004ea80ef42fee983d5c67a65e34f32c0e3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.48", ] [[package]] @@ -3137,7 +3138,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.38", + "syn 2.0.48", ] [[package]] @@ -3159,9 +3160,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.38" +version = "2.0.48" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e96b79aaa137db8f61e26363a0c9b47d8b4ec75da28b7d1d614c2303e232408b" +checksum = "0f3531638e407dfc0814761abb7c00a5b54992b849452a0646b7f65c9f770f3f" dependencies = [ "proc-macro2", "quote", @@ -3232,22 +3233,22 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.50" +version = "1.0.57" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9a7210f5c9a7156bb50aa36aed4c95afb51df0df00713949448cf9e97d382d2" +checksum = "1e45bcbe8ed29775f228095caf2cd67af7a4ccf756ebff23a306bf3e8b47b24b" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.50" +version = "1.0.57" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "266b2e40bc00e5a6c09c3584011e08b06f123c00362c92b975ba9843aaaa14b8" +checksum = "a953cb265bef375dae3de6663da4d3804eee9682ea80d8e2542529b73c531c81" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.48", ] [[package]] @@ -3342,7 +3343,7 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.48", ] [[package]] @@ -3577,7 +3578,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.48", ] [[package]] @@ -3883,7 +3884,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.48", "wasm-bindgen-shared", ] @@ -3917,7 +3918,7 @@ checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.48", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -4158,7 +4159,7 @@ checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.48", ] [[package]] diff --git a/core/Cargo.toml b/core/Cargo.toml index 2223c9ade53d..d7732842b1eb 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -64,3 +64,4 @@ async-recursion = "1.0" chrono = "0.4.31" yup-oauth2 = "8.3.0" datadog-formatting-layer = "1.1" +thiserror = "1.0.57" diff --git a/core/bin/dust_api.rs b/core/bin/dust_api.rs index 84ada6baf705..749db91fd396 100644 --- a/core/bin/dust_api.rs +++ b/core/bin/dust_api.rs @@ -18,15 +18,13 @@ use dust::{ data_source::{self, SearchFilter, Section}, qdrant::QdrantClients, }, - databases::database::{query_database, Row, Table}, - databases_store::{store as databases_store, store::DatabasesStore}, - dataset, - project::{self}, + databases::database::{query_database, QueryDatabaseError, Row, Table}, + databases_store::store::{self as databases_store, DatabasesStore}, + dataset, project, providers::provider::{provider, ProviderID}, run, sqlite_workers::client::{self, HEARTBEAT_INTERVAL_MS}, - stores::postgres, - stores::store, + stores::{postgres, store}, utils::{error_response, APIError, APIResponse}, }; use futures::future::try_join_all; @@ -2254,11 +2252,17 @@ async fn databases_query_run( } Some(tables) => { match query_database(&tables, state.store.clone(), &payload.query).await { + Err(QueryDatabaseError::TooManyResultRows) => error_response( + StatusCode::BAD_REQUEST, + "too_many_result_rows", + "The query returned too many rows", + None, + ), Err(e) => error_response( StatusCode::INTERNAL_SERVER_ERROR, "internal_server_error", "Failed to run query", - Some(e), + Some(e.into()), ), Ok((results, schema)) => ( StatusCode::OK, diff --git a/core/src/blocks/database.rs b/core/src/blocks/database.rs index 7daf2b27329c..fe8c8095b0c8 100644 --- a/core/src/blocks/database.rs +++ b/core/src/blocks/database.rs @@ -1,5 +1,5 @@ use crate::blocks::block::{parse_pair, Block, BlockResult, BlockType, Env}; -use crate::databases::database::query_database; +use crate::databases::database::{query_database, QueryDatabaseError}; use crate::Rule; use anyhow::{anyhow, Result}; use async_trait::async_trait; @@ -103,7 +103,14 @@ impl Block for Database { let query = replace_variables_in_string(&self.query, "query", env)?; let tables = load_tables_from_identifiers(&table_identifiers, env).await?; - let (results, schema) = query_database(&tables, env.store.clone(), &query).await?; + let (results, schema) = query_database(&tables, env.store.clone(), &query) + .await + .map_err(|e| match &e { + // We don't have a proper way to return a typed error from a block, so we just return a generic error with a string. + // We expect the frontend to match the error code. + QueryDatabaseError::TooManyResultRows => anyhow!("too_many_result_rows"), + _ => e.into(), + })?; Ok(BlockResult { value: json!({ diff --git a/core/src/databases/database.rs b/core/src/databases/database.rs index beee2a617bd1..f8ecb55089dd 100644 --- a/core/src/databases/database.rs +++ b/core/src/databases/database.rs @@ -2,7 +2,7 @@ use super::table_schema::TableSchema; use crate::{ databases_store::store::DatabasesStore, project::Project, - sqlite_workers::client::{SqliteWorker, HEARTBEAT_INTERVAL_MS}, + sqlite_workers::client::{SqliteWorker, SqliteWorkerError, HEARTBEAT_INTERVAL_MS}, stores::store::Store, utils, }; @@ -11,6 +11,7 @@ use futures::future::try_join_all; use itertools::Itertools; use serde::{Deserialize, Serialize}; use serde_json::Value; +use thiserror::Error; use tracing::info; #[derive(Debug, Clone, Copy, Serialize, PartialEq, Deserialize)] @@ -29,11 +30,31 @@ impl ToString for DatabaseType { } } +#[derive(Debug, Error)] +pub enum QueryDatabaseError { + #[error("{0}")] + GenericError(#[from] anyhow::Error), + #[error("Too many result rows")] + TooManyResultRows, +} + +impl From for QueryDatabaseError { + fn from(e: SqliteWorkerError) -> Self { + match &e { + SqliteWorkerError::ServerError(code, _) => match code.as_str() { + "too_many_result_rows" => QueryDatabaseError::TooManyResultRows, + _ => e.into(), + }, + _ => e.into(), + } + } +} + pub async fn query_database( tables: &Vec, store: Box, query: &str, -) -> Result<(Vec, TableSchema)> { +) -> Result<(Vec, TableSchema), QueryDatabaseError> { let table_ids_hash = tables.iter().map(|t| t.unique_id()).sorted().join("/"); let database = store .upsert_database(&table_ids_hash, HEARTBEAT_INTERVAL_MS) diff --git a/core/src/sqlite_workers/client.rs b/core/src/sqlite_workers/client.rs index 2f5a998eb123..65166f18aff5 100644 --- a/core/src/sqlite_workers/client.rs +++ b/core/src/sqlite_workers/client.rs @@ -2,6 +2,7 @@ use anyhow::{anyhow, Result}; use hyper::{body::Bytes, Body, Client, Request}; use serde::{Deserialize, Serialize}; use serde_json::json; +use thiserror::Error; use urlencoding::encode; use crate::{ @@ -17,30 +18,31 @@ pub struct SqliteWorker { url: String, } -#[derive(Debug)] +#[derive(Debug, Error)] pub enum SqliteWorkerError { - ClientError(anyhow::Error), - ServerError(anyhow::Error, Option, u16), + #[error("SqliteWorkerError Server error (code={0}, status={1})")] + ServerError(String, u16), + #[error("SqliteWorkerError Unexpected error: {0}")] + UnexpectedError(anyhow::Error), } -impl std::fmt::Display for SqliteWorkerError { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - match self { - Self::ClientError(e) => write!(f, "SqliteWorkerError: Client error: {}", e), - Self::ServerError(e, code, status) => { - write!( - f, - "SqliteWorkerError (code={}, status={}): Server error: {}", - code.clone().unwrap_or_default(), - status, - e - ) - } - } +impl From for SqliteWorkerError { + fn from(e: hyper::http::Error) -> Self { + SqliteWorkerError::UnexpectedError(anyhow!(e)) + } +} + +impl From for SqliteWorkerError { + fn from(e: hyper::Error) -> Self { + SqliteWorkerError::UnexpectedError(anyhow!(e)) } } -impl std::error::Error for SqliteWorkerError {} +impl From for SqliteWorkerError { + fn from(e: serde_json::Error) -> Self { + SqliteWorkerError::UnexpectedError(anyhow!(e)) + } +} impl SqliteWorker { pub fn new(url: String, last_heartbeat: u64) -> Self { @@ -83,14 +85,9 @@ impl SqliteWorker { "query": query, }) .to_string(), - )) - .map_err(|e| { - SqliteWorkerError::ClientError(anyhow!("Failed to build request: {}", e)) - })?; + ))?; - let res = Client::new().request(req).await.map_err(|e| { - SqliteWorkerError::ClientError(anyhow!("Failed to execute request: {}", e)) - })?; + let res = Client::new().request(req).await?; let body_bytes = get_response_body(res).await?; @@ -100,19 +97,15 @@ impl SqliteWorker { response: Option>, } - let body: ExecuteQueryResponseBody = serde_json::from_slice(&body_bytes).map_err(|e| { - SqliteWorkerError::ClientError(anyhow!("Failed to parse response: {}", e)) - })?; + let body: ExecuteQueryResponseBody = serde_json::from_slice(&body_bytes)?; match body.error { - Some(e) => Err(SqliteWorkerError::ServerError(anyhow!(e), None, 200))?, + Some(e) => Err(SqliteWorkerError::UnexpectedError(anyhow!(e)))?, None => match body.response { Some(r) => Ok(r), - None => Err(SqliteWorkerError::ServerError( - anyhow!("No response in body"), - None, - 200, - ))?, + None => Err(SqliteWorkerError::UnexpectedError(anyhow!( + "No response in body" + )))?, }, } } @@ -126,14 +119,9 @@ impl SqliteWorker { let req = Request::builder() .method("DELETE") .uri(format!("{}/databases/{}", worker_url, database_unique_id)) - .body(Body::from("")) - .map_err(|e| { - SqliteWorkerError::ClientError(anyhow!("Failed to build request: {}", e)) - })?; + .body(Body::from(""))?; - let res = Client::new().request(req).await.map_err(|e| { - SqliteWorkerError::ClientError(anyhow!("Failed to execute request: {}", e)) - })?; + let res = Client::new().request(req).await?; let _ = get_response_body(res).await?; @@ -146,14 +134,9 @@ impl SqliteWorker { let req = Request::builder() .method("DELETE") .uri(format!("{}/databases", worker_url)) - .body(Body::from("")) - .map_err(|e| { - SqliteWorkerError::ClientError(anyhow!("Failed to build request: {}", e)) - })?; - - let res = Client::new().request(req).await.map_err(|e| { - SqliteWorkerError::ClientError(anyhow!("Failed to execute request: {}", e)) - })?; + .body(Body::from(""))?; + + let res = Client::new().request(req).await?; let _ = get_response_body(res).await?; Ok(()) @@ -162,16 +145,12 @@ impl SqliteWorker { async fn get_response_body(res: hyper::Response) -> Result { let status = res.status().as_u16(); - let body = hyper::body::to_bytes(res.into_body()) - .await - .map_err(|e| SqliteWorkerError::ClientError(anyhow!("Failed to read response: {}", e)))?; + let body = hyper::body::to_bytes(res.into_body()).await?; match status { 200 => Ok(body), s => { - let body_json: serde_json::Value = serde_json::from_slice(&body).map_err(|e| { - SqliteWorkerError::ClientError(anyhow!("Failed to parse response: {}", e)) - })?; + let body_json: serde_json::Value = serde_json::from_slice(&body)?; let error = body_json.get("error"); let error_code = match error { Some(e) => e @@ -181,11 +160,13 @@ async fn get_response_body(res: hyper::Response) -> Result None, }; - Err(SqliteWorkerError::ServerError( - anyhow!("Received error response from SQLite worker",), - error_code, - s, - ))? + match error_code { + Some(code) => Err(SqliteWorkerError::ServerError(code.to_string(), s))?, + None => Err(SqliteWorkerError::UnexpectedError(anyhow!( + "Received unexpected error response with status {} from SQLite worker", + s + )))?, + } } } } diff --git a/core/src/sqlite_workers/sqlite_database.rs b/core/src/sqlite_workers/sqlite_database.rs index c800512956fc..501672a46c5a 100644 --- a/core/src/sqlite_workers/sqlite_database.rs +++ b/core/src/sqlite_workers/sqlite_database.rs @@ -10,6 +10,7 @@ use futures::future::try_join_all; use parking_lot::Mutex; use rayon::prelude::*; use rusqlite::{params_from_iter, Connection, InterruptHandle}; +use thiserror::Error; use tokio::{task, time::timeout}; use tracing::info; @@ -19,24 +20,25 @@ pub struct SqliteDatabase { interrupt_handle: Option>>, } -#[derive(Debug)] +#[derive(Debug, Error)] pub enum QueryError { + #[error("Query returned more than {0} rows")] ExceededMaxRows(usize), + #[error("Query execution error: {0}")] QueryExecutionError(anyhow::Error), } -impl std::fmt::Display for QueryError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - QueryError::ExceededMaxRows(limit) => { - write!(f, "Query returned more than {} rows", limit) - } - QueryError::QueryExecutionError(e) => write!(f, "Query execution error: {}", e), - } +impl From for QueryError { + fn from(e: rusqlite::Error) -> Self { + QueryError::QueryExecutionError(anyhow!(e)) } } -impl std::error::Error for QueryError {} +impl From for QueryError { + fn from(e: anyhow::Error) -> Self { + QueryError::QueryExecutionError(e) + } +} const MAX_ROWS: usize = 128; @@ -136,15 +138,9 @@ impl SqliteDatabase { )) }) .collect::>() - }) - // At this point we have a result (from the query_and_then fn itself) of results (for each - // individual row parsing). We wrap the potential top-level error in a QueryError and bubble it up. - .map_err(|e| QueryError::QueryExecutionError(anyhow::Error::new(e)))? + })? .take(MAX_ROWS + 1) - .collect::, _>>() - // Thanks to the collect above, we now have a single top-level result. - // We wrap the potential error in a QueryError and bubble up if needed. - .map_err(QueryError::QueryExecutionError)? + .collect::, _>>()? .into_par_iter() .map(|value| QueryResult { value }) .collect::>();