enh: refactor store.rs (more consistent, fewer clones) (#9035)
* enh: refactor store.rs (more consistent, fewer clones)

* r

* reb

---------

Co-authored-by: Henry Fontanier <henry@dust.tt>
fontanierh and Henry Fontanier authored Dec 2, 2024
1 parent c8ff0a8 commit 02463be
Showing 8 changed files with 378 additions and 277 deletions.
83 changes: 46 additions & 37 deletions core/bin/core_api.rs
@@ -35,7 +35,6 @@ use dust::{
     blocks::block::BlockType,
     data_sources::{
         data_source::{self, Section},
-        folder::Folder,
         qdrant::QdrantClients,
     },
     databases::{
@@ -50,7 +49,10 @@ use dust::{
     run,
     search_filter::{Filterable, SearchFilter},
     sqlite_workers::client::{self, HEARTBEAT_INTERVAL_MS},
-    stores::{postgres, store},
+    stores::{
+        postgres,
+        store::{self, UpsertFolder, UpsertTable},
+    },
     utils::{self, error_response, APIError, APIResponse, CoreRequestMakeSpan},
 };

@@ -2064,22 +2066,21 @@ async fn tables_upsert(
 
     match state
         .store
-        .upsert_table(
-            &project,
-            &data_source_id,
-            &payload.table_id,
-            &payload.name,
-            &payload.description,
-            match payload.timestamp {
-                Some(timestamp) => timestamp,
-                None => utils::now(),
-            },
-            &payload.tags,
-            &payload.parents,
-            payload.remote_database_table_id,
-            payload.remote_database_secret_id,
-            payload.title,
-            payload.mime_type,
+        .upsert_data_source_table(
+            project,
+            data_source_id,
+            UpsertTable {
+                table_id: payload.table_id,
+                name: payload.name,
+                description: payload.description,
+                timestamp: payload.timestamp.unwrap_or(utils::now()),
+                tags: payload.tags,
+                parents: payload.parents,
+                remote_database_table_id: payload.remote_database_table_id,
+                remote_database_secret_id: payload.remote_database_secret_id,
+                title: payload.title,
+                mime_type: payload.mime_type,
+            },
         )
         .await
     {
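Note: store.rs itself is among the changed files not rendered on this page, so the definition of UpsertTable is not visible here. A minimal sketch of its plausible shape, with every field type inferred from the call site above rather than copied from the real source:

// Hypothetical reconstruction of the UpsertTable params struct from its call site.
// Field types are assumptions (e.g. Option<String> for the remote_database_* ids),
// not taken from core/src/stores/store.rs.
pub struct UpsertTable {
    pub table_id: String,
    pub name: String,
    pub description: String,
    pub timestamp: u64,
    pub tags: Vec<String>,
    pub parents: Vec<String>,
    pub remote_database_table_id: Option<String>,
    pub remote_database_secret_id: Option<String>,
    pub title: String,
    pub mime_type: String,
}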
@@ -2133,7 +2134,7 @@ async fn tables_retrieve(
 
     match state
         .store
-        .load_table(&project, &data_source_id, &table_id)
+        .load_data_source_table(&project, &data_source_id, &table_id)
         .await
     {
         Err(e) => error_response(
@@ -2215,7 +2216,7 @@ async fn tables_list(
 
     match state
         .store
-        .list_tables(
+        .list_data_source_tables(
             &project,
             &data_source_id,
             &view_filter,
@@ -2253,7 +2254,7 @@ async fn tables_delete(
 
     match state
         .store
-        .load_table(&project, &data_source_id, &table_id)
+        .load_data_source_table(&project, &data_source_id, &table_id)
         .await
     {
         Err(e) => error_response(
@@ -2302,7 +2303,7 @@ async fn tables_update_parents(
 
     match state
         .store
-        .load_table(&project, &data_source_id, &table_id)
+        .load_data_source_table(&project, &data_source_id, &table_id)
         .await
     {
         Err(e) => error_response(
@@ -2355,7 +2356,7 @@ async fn tables_rows_upsert(
 
     match state
         .store
-        .load_table(&project, &data_source_id, &table_id)
+        .load_data_source_table(&project, &data_source_id, &table_id)
         .await
     {
         Err(e) => {
@@ -2444,7 +2445,7 @@ async fn tables_rows_retrieve(
 
     match state
         .store
-        .load_table(&project, &data_source_id, &table_id)
+        .load_data_source_table(&project, &data_source_id, &table_id)
         .await
     {
         Err(e) => {
@@ -2520,7 +2521,7 @@ async fn tables_rows_delete(
 
     match state
         .store
-        .load_table(&project, &data_source_id, &table_id)
+        .load_data_source_table(&project, &data_source_id, &table_id)
         .await
     {
         Err(e) => {
@@ -2612,7 +2613,7 @@ async fn tables_rows_list(
 
     match state
         .store
-        .load_table(&project, &data_source_id, &table_id)
+        .load_data_source_table(&project, &data_source_id, &table_id)
         .await
     {
         Err(e) => {
@@ -2685,16 +2686,20 @@ async fn folders_upsert(
 ) -> (StatusCode, Json<APIResponse>) {
     let project = project::Project::new_from_id(project_id);
 
-    let folder = Folder::new(
-        &project,
-        &data_source_id,
-        &payload.folder_id.clone(),
-        payload.timestamp.unwrap_or(utils::now()),
-        &payload.title,
-        payload.parents,
-    );
-
-    match state.store.upsert_data_source_folder(&folder).await {
+    match state
+        .store
+        .upsert_data_source_folder(
+            project,
+            data_source_id,
+            UpsertFolder {
+                folder_id: payload.folder_id,
+                timestamp: payload.timestamp.unwrap_or(utils::now()),
+                parents: payload.parents,
+                title: payload.title,
+            },
+        )
+        .await
+    {
         Err(e) => {
             return error_response(
                 StatusCode::INTERNAL_SERVER_ERROR,
@@ -2703,7 +2708,7 @@ async fn folders_upsert(
                 Some(e),
             )
         }
-        Ok(()) => (
+        Ok(folder) => (
             StatusCode::OK,
             Json(APIResponse {
                 error: None,
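Likewise, UpsertFolder is defined in the unrendered store.rs diff; judging from the call above, a plausible sketch is:

// Hypothetical shape of UpsertFolder, inferred from folders_upsert's payload usage,
// not copied from the actual source.
pub struct UpsertFolder {
    pub folder_id: String,
    pub timestamp: u64,
    pub parents: Vec<String>,
    pub title: String,
}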
@@ -2898,7 +2903,11 @@ async fn databases_query_run(
             .map(|(project_id, data_source_id, table_id)| {
                 let project = project::Project::new_from_id(project_id);
                 let store = state.store.clone();
-                async move { store.load_table(&project, &data_source_id, &table_id).await }
+                async move {
+                    store
+                        .load_data_source_table(&project, &data_source_id, &table_id)
+                        .await
+                }
             }),
         )
         .await
2 changes: 1 addition & 1 deletion core/src/blocks/database_schema.rs
@@ -163,7 +163,7 @@ pub async fn load_tables_from_identifiers(
             let (project, data_source_name) = project_and_data_source_by_data_source_view
                 .get(&(*workspace_id, *data_source_or_view_id))
                 .expect("Unreachable: missing project.");
-            store.load_table(&project, &data_source_name, &table_id)
+            store.load_data_source_table(&project, &data_source_name, &table_id)
         },
     ))
     .await?)
36 changes: 27 additions & 9 deletions core/src/data_sources/data_source.rs
@@ -9,7 +9,7 @@ use crate::providers::embedder::{EmbedderRequest, EmbedderVector};
 use crate::providers::provider::ProviderID;
 use crate::run::Credentials;
 use crate::search_filter::{Filterable, SearchFilter};
-use crate::stores::store::Store;
+use crate::stores::store::{Store, UpsertDocument};
 use crate::utils;
 use anyhow::{anyhow, Result};
 use futures::future::try_join_all;
@@ -725,20 +725,38 @@ impl DataSource {
             .await?;
         }
 
+        // Store upsert does not save the text and token count.
+        // These fields don't actually exist in the SQL table.
+        // Because of this, we have to manually construct the UpsertDocument, and save
+        // owned values for text and token count so we can return them.
+        // TODO(@fontanierh): use a different type for "DocumentWithTextAndTokenCount"
+        let doc_text = main_collection_document.text;
+        let doc_token_count = main_collection_document.token_count;
+        let params = UpsertDocument {
+            document_id: main_collection_document.document_id,
+            timestamp: main_collection_document.timestamp,
+            tags: main_collection_document.tags,
+            parents: main_collection_document.parents,
+            source_url: main_collection_document.source_url,
+            hash: main_collection_document.hash,
+            text_size: main_collection_document.text_size,
+            chunk_count: main_collection_document.chunk_count,
+            chunks: main_collection_document.chunks,
+        };
+
         // Upsert document (SQL).
-        store
-            .upsert_data_source_document(
-                &self.project,
-                &self.data_source_id,
-                &main_collection_document,
-            )
+        let mut doc = store
+            .upsert_data_source_document(&self.project, self.data_source_id.clone(), params)
             .await?;
 
+        doc.text = doc_text;
+        doc.token_count = doc_token_count;
+
         // Clean-up old superseded versions.
         self.scrub_document_superseded_versions(store, &document_id)
             .await?;
 
-        Ok(main_collection_document)
+        Ok(doc)
     }
 
     async fn upsert_for_embedder(
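As with the other params structs, UpsertDocument lives in the unrendered store.rs. A sketch inferred from the construction above; the field types are guesses (Chunk in particular is assumed to come from the surrounding data_source module), and text/token_count are deliberately absent because, per the comment in the diff, those columns don't exist in the SQL table:

// Hypothetical reconstruction of UpsertDocument; types inferred from the call site only.
pub struct UpsertDocument {
    pub document_id: String,
    pub timestamp: u64,
    pub tags: Vec<String>,
    pub parents: Vec<String>,
    pub source_url: Option<String>,
    pub hash: String,
    pub text_size: u64,
    pub chunk_count: usize,
    pub chunks: Vec<Chunk>,
}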
@@ -1954,7 +1972,7 @@ impl DataSource {
 
         // Delete tables (concurrently).
         let (tables, total) = store
-            .list_tables(&self.project, &self.data_source_id, &None, &None, None)
+            .list_data_source_tables(&self.project, &self.data_source_id, &None, &None, None)
             .await?;
         try_join_all(
             tables
57 changes: 8 additions & 49 deletions core/src/data_sources/folder.rs
@@ -2,8 +2,6 @@ use serde::{Deserialize, Serialize};
 
 use crate::project::Project;
 
-use super::node::{Node, NodeType};
-
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct Folder {
     project: Project,
@@ -19,34 +17,23 @@ pub const FOLDER_MIMETYPE: &str = "application/vnd.dust.folder";
 
 impl Folder {
     pub fn new(
-        project: &Project,
-        data_source_id: &str,
-        folder_id: &str,
+        project: Project,
+        data_source_id: String,
+        folder_id: String,
         timestamp: u64,
-        title: &str,
+        title: String,
         parents: Vec<String>,
     ) -> Self {
         Folder {
-            project: project.clone(),
-            data_source_id: data_source_id.to_string(),
-            folder_id: folder_id.to_string(),
+            project: project,
+            data_source_id: data_source_id,
+            folder_id: folder_id,
             timestamp,
-            title: title.to_string(),
+            title: title,
             parents,
         }
     }
 
-    pub fn from_node(node: &Node) -> Self {
-        Folder::new(
-            node.project(),
-            node.data_source_id(),
-            node.node_id(),
-            node.timestamp(),
-            node.title(),
-            node.parents().clone(),
-        )
-    }
-
     pub fn project(&self) -> &Project {
         &self.project
     }
@@ -66,31 +53,3 @@ impl Folder {
         &self.parents
     }
 }
-
-impl From<Node> for Folder {
-    fn from(node: Node) -> Self {
-        Folder::new(
-            node.project(),
-            node.data_source_id(),
-            node.node_id(),
-            node.timestamp(),
-            node.title(),
-            node.parents().clone(),
-        )
-    }
-}
-
-impl From<Folder> for Node {
-    fn from(folder: Folder) -> Self {
-        Node::new(
-            &folder.project,
-            &folder.data_source_id,
-            &folder.folder_id,
-            NodeType::Folder,
-            folder.timestamp,
-            &folder.title,
-            FOLDER_MIMETYPE,
-            folder.parents.clone(),
-        )
-    }
-}
14 changes: 14 additions & 0 deletions core/src/data_sources/node.rs
@@ -2,6 +2,8 @@ use serde::{Deserialize, Serialize};
 
 use crate::project::Project;
 
+use super::folder::Folder;
+
 #[derive(Debug, Clone, Serialize, PartialEq, Deserialize, Copy)]
 pub enum NodeType {
     Document,
@@ -68,4 +70,16 @@ impl Node {
     pub fn parents(&self) -> &Vec<String> {
         &self.parents
    }
+
+    // Consumes self into a Folder.
+    pub fn into_folder(self) -> Folder {
+        Folder::new(
+            self.project,
+            self.data_source_id,
+            self.node_id,
+            self.timestamp,
+            self.title,
+            self.parents,
+        )
+    }
 }
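The owning into_folder replaces both the removed from_node helper and the From<Node> for Folder impl: the Node is consumed and its owned fields are moved, where the old conversions borrowed the node and cloned every field. An illustrative sketch (the to_folder wrapper is hypothetical, not part of the diff):

// Illustrative only: shows the move semantics of Node::into_folder.
fn to_folder(node: Node) -> Folder {
    // Consumes `node`; its owned fields (Strings, Vec<String>) are moved
    // rather than cloned, which is the "fewer clones" part of this commit.
    node.into_folder()
}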
