Skip to content

Commit

Permalink
enh(query database): improve error handling (#3740)
Browse files Browse the repository at this point in the history
Co-authored-by: Henry Fontanier <henry@dust.tt>
  • Loading branch information
fontanierh and Henry Fontanier authored Feb 15, 2024
1 parent bb24d96 commit da17df4
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 119 deletions.
61 changes: 31 additions & 30 deletions core/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,4 @@ async-recursion = "1.0"
chrono = "0.4.31"
yup-oauth2 = "8.3.0"
datadog-formatting-layer = "1.1"
thiserror = "1.0.57"
18 changes: 11 additions & 7 deletions core/bin/dust_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@ use dust::{
data_source::{self, SearchFilter, Section},
qdrant::QdrantClients,
},
databases::database::{query_database, Row, Table},
databases_store::{store as databases_store, store::DatabasesStore},
dataset,
project::{self},
databases::database::{query_database, QueryDatabaseError, Row, Table},
databases_store::store::{self as databases_store, DatabasesStore},
dataset, project,
providers::provider::{provider, ProviderID},
run,
sqlite_workers::client::{self, HEARTBEAT_INTERVAL_MS},
stores::postgres,
stores::store,
stores::{postgres, store},
utils::{error_response, APIError, APIResponse},
};
use futures::future::try_join_all;
Expand Down Expand Up @@ -2254,11 +2252,17 @@ async fn databases_query_run(
}
Some(tables) => {
match query_database(&tables, state.store.clone(), &payload.query).await {
Err(QueryDatabaseError::TooManyResultRows) => error_response(
StatusCode::BAD_REQUEST,
"too_many_result_rows",
"The query returned too many rows",
None,
),
Err(e) => error_response(
StatusCode::INTERNAL_SERVER_ERROR,
"internal_server_error",
"Failed to run query",
Some(e),
Some(e.into()),
),
Ok((results, schema)) => (
StatusCode::OK,
Expand Down
11 changes: 9 additions & 2 deletions core/src/blocks/database.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::blocks::block::{parse_pair, Block, BlockResult, BlockType, Env};
use crate::databases::database::query_database;
use crate::databases::database::{query_database, QueryDatabaseError};
use crate::Rule;
use anyhow::{anyhow, Result};
use async_trait::async_trait;
Expand Down Expand Up @@ -103,7 +103,14 @@ impl Block for Database {
let query = replace_variables_in_string(&self.query, "query", env)?;
let tables = load_tables_from_identifiers(&table_identifiers, env).await?;

let (results, schema) = query_database(&tables, env.store.clone(), &query).await?;
let (results, schema) = query_database(&tables, env.store.clone(), &query)
.await
.map_err(|e| match &e {
// We don't have a proper way to return a typed error from a block, so we just return a generic error with a string.
// We expect the frontend to match the error code.
QueryDatabaseError::TooManyResultRows => anyhow!("too_many_result_rows"),
_ => e.into(),
})?;

Ok(BlockResult {
value: json!({
Expand Down
25 changes: 23 additions & 2 deletions core/src/databases/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::table_schema::TableSchema;
use crate::{
databases_store::store::DatabasesStore,
project::Project,
sqlite_workers::client::{SqliteWorker, HEARTBEAT_INTERVAL_MS},
sqlite_workers::client::{SqliteWorker, SqliteWorkerError, HEARTBEAT_INTERVAL_MS},
stores::store::Store,
utils,
};
Expand All @@ -11,6 +11,7 @@ use futures::future::try_join_all;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use thiserror::Error;
use tracing::info;

#[derive(Debug, Clone, Copy, Serialize, PartialEq, Deserialize)]
Expand All @@ -29,11 +30,31 @@ impl ToString for DatabaseType {
}
}

#[derive(Debug, Error)]
pub enum QueryDatabaseError {
#[error("{0}")]
GenericError(#[from] anyhow::Error),
#[error("Too many result rows")]
TooManyResultRows,
}

impl From<SqliteWorkerError> for QueryDatabaseError {
fn from(e: SqliteWorkerError) -> Self {
match &e {
SqliteWorkerError::ServerError(code, _) => match code.as_str() {
"too_many_result_rows" => QueryDatabaseError::TooManyResultRows,
_ => e.into(),
},
_ => e.into(),
}
}
}

pub async fn query_database(
tables: &Vec<Table>,
store: Box<dyn Store + Sync + Send>,
query: &str,
) -> Result<(Vec<QueryResult>, TableSchema)> {
) -> Result<(Vec<QueryResult>, TableSchema), QueryDatabaseError> {
let table_ids_hash = tables.iter().map(|t| t.unique_id()).sorted().join("/");
let database = store
.upsert_database(&table_ids_hash, HEARTBEAT_INTERVAL_MS)
Expand Down
Loading

0 comments on commit da17df4

Please sign in to comment.