diff --git a/core/bin/core_api.rs b/core/bin/core_api.rs index d0c67d0360d1..c0efead2d212 100644 --- a/core/bin/core_api.rs +++ b/core/bin/core_api.rs @@ -48,6 +48,7 @@ use dust::{ providers::provider::{provider, ProviderID}, run, search_filter::{Filterable, SearchFilter}, + search_stores::search_store::{ElasticsearchSearchStore, SearchStore}, sqlite_workers::client::{self, HEARTBEAT_INTERVAL_MS}, stores::{ postgres, @@ -70,7 +71,7 @@ struct APIState { store: Box, databases_store: Box, qdrant_clients: QdrantClients, - + search_store: Box, run_manager: Arc>, } @@ -79,11 +80,13 @@ impl APIState { store: Box, databases_store: Box, qdrant_clients: QdrantClients, + search_store: Box, ) -> Self { APIState { store, qdrant_clients, databases_store, + search_store, run_manager: Arc::new(Mutex::new(RunManager { pending_apps: vec![], pending_runs: vec![], @@ -1730,6 +1733,7 @@ async fn data_sources_documents_upsert( &payload.source_url, payload.section, true, // preserve system tags + state.search_store.clone(), ) .await { @@ -3373,10 +3377,19 @@ fn main() { Err(_) => Err(anyhow!("DATABASES_STORE_DATABASE_URI not set."))?, }; + let url = std::env::var("ELASTICSEARCH_URL").expect("ELASTICSEARCH_URL must be set"); + let username = + std::env::var("ELASTICSEARCH_USERNAME").expect("ELASTICSEARCH_USERNAME must be set"); + let password = + std::env::var("ELASTICSEARCH_PASSWORD").expect("ELASTICSEARCH_PASSWORD must be set"); + + let search_store : Box = Box::new(ElasticsearchSearchStore::new(&url, &username, &password).await?); + let state = Arc::new(APIState::new( store, databases_store, QdrantClients::build().await?, + search_store, )); let router = Router::new() diff --git a/core/bin/elasticsearch/create_index.rs b/core/bin/elasticsearch/create_index.rs index 12e86deacd41..e65e4cf119cd 100644 --- a/core/bin/elasticsearch/create_index.rs +++ b/core/bin/elasticsearch/create_index.rs @@ -1,12 +1,9 @@ use std::collections::HashMap; use clap::{Parser, ValueEnum}; -use elasticsearch::auth::Credentials; -use elasticsearch::http::transport::{SingleNodeConnectionPool, TransportBuilder}; +use dust::search_stores::search_store::ElasticsearchSearchStore; use elasticsearch::indices::{IndicesCreateParts, IndicesExistsParts}; -use elasticsearch::Elasticsearch; use http::StatusCode; -use url::Url; #[derive(Parser, Debug, Clone, ValueEnum)] enum Region { @@ -62,23 +59,14 @@ async fn main() -> Result<(), Box> { let region = std::env::var("DUST_REGION").expect("DUST_REGION must be set"); // create ES client - let credentials = Credentials::Basic(username, password); - let u = Url::parse(&url)?; - let conn_pool = SingleNodeConnectionPool::new(u); - let mut transport_builder = TransportBuilder::new(conn_pool); - transport_builder = transport_builder - .auth(credentials) - .disable_proxy() - .cert_validation(elasticsearch::cert::CertificateValidation::None); - let transport = transport_builder.build()?; - - let client = Elasticsearch::new(transport); + let search_store = ElasticsearchSearchStore::new(&url, &username, &password).await?; let index_fullname = format!("core.{}_{}", index_name, index_version); let index_alias = format!("core.{}", index_name); // do not create index if it already exists - let response = client + let response = search_store + .client .indices() .exists(IndicesExistsParts::Index(&[index_fullname.as_str()])) .send() @@ -144,7 +132,8 @@ async fn main() -> Result<(), Box> { } // create index with settings, mappings and alias - let response = client + let response = search_store + .client .indices() .create(IndicesCreateParts::Index(index_fullname.as_str())) .body(body) diff --git a/core/src/data_sources/data_source.rs b/core/src/data_sources/data_source.rs index d0dbf182ffce..04909ae03867 100644 --- a/core/src/data_sources/data_source.rs +++ b/core/src/data_sources/data_source.rs @@ -10,6 +10,7 @@ use crate::providers::embedder::{EmbedderRequest, EmbedderVector}; use crate::providers::provider::ProviderID; use crate::run::Credentials; use crate::search_filter::{Filterable, SearchFilter}; +use crate::search_stores::search_store::SearchStore; use crate::stores::store::{DocumentCreateParams, Store}; use crate::utils; use anyhow::{anyhow, Result}; @@ -614,6 +615,7 @@ impl DataSource { source_url: &Option, text: Section, preserve_system_tags: bool, + search_store: Box, ) -> Result { let full_text = text.full_text(); // Disallow preserve_system_tags=true if tags contains a string starting with the system @@ -730,7 +732,7 @@ impl DataSource { &qdrant_clients, &document_id, &document_id_hash, - document, + document.clone(), &document_hash, &text, // Cache is not used when writing to the shadow collection. @@ -758,6 +760,9 @@ impl DataSource { .create_data_source_document(&self.project, self.data_source_id.clone(), create_params) .await?; + // Upsert document in search index. + search_store.index_document(&document).await?; + // Clean-up old superseded versions. self.scrub_document_superseded_versions(store, &document_id) .await?; diff --git a/core/src/lib.rs b/core/src/lib.rs index f5d867d6a3ee..4a73f22f76ea 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -9,6 +9,10 @@ pub mod stores { pub mod postgres; pub mod store; } +pub mod search_stores { + pub mod search_store; +} + pub mod app; pub mod dataset; pub mod data_sources { diff --git a/core/src/search_stores/indices/data_sources_nodes_1.mappings.json b/core/src/search_stores/indices/data_sources_nodes_1.mappings.json index b0db2a157bc3..d08cd91fccc7 100644 --- a/core/src/search_stores/indices/data_sources_nodes_1.mappings.json +++ b/core/src/search_stores/indices/data_sources_nodes_1.mappings.json @@ -1,4 +1,5 @@ { + "dynamic": "strict", "properties": { "data_source_id": { "type": "keyword" @@ -6,7 +7,7 @@ "timestamp": { "type": "date" }, - "type": { + "node_type": { "type": "keyword" }, "node_id": { @@ -25,7 +26,10 @@ "parents": { "type": "keyword" }, - "mimeType": { + "parent_id": { + "type": "keyword" + }, + "mime_type": { "type": "keyword" } } diff --git a/core/src/search_stores/search_store.rs b/core/src/search_stores/search_store.rs new file mode 100644 index 000000000000..554a2ad54f7e --- /dev/null +++ b/core/src/search_stores/search_store.rs @@ -0,0 +1,94 @@ +use anyhow::Result; +use async_trait::async_trait; +use elasticsearch::{ + auth::Credentials, + http::transport::{SingleNodeConnectionPool, TransportBuilder}, + Elasticsearch, IndexParts, +}; +use rand::Rng; +use url::Url; + +use crate::data_sources::node::Node; +use crate::{data_sources::data_source::Document, utils}; +use tracing::{error, info}; +#[async_trait] +pub trait SearchStore { + async fn index_document(&self, document: &Document) -> Result<()>; + fn clone_box(&self) -> Box; +} + +impl Clone for Box { + fn clone(&self) -> Self { + self.clone_box() + } +} + +#[derive(Clone)] +pub struct ElasticsearchSearchStore { + pub client: Elasticsearch, +} + +impl ElasticsearchSearchStore { + pub async fn new(es_uri: &str, username: &str, password: &str) -> Result { + let credentials = Credentials::Basic(username.to_string(), password.to_string()); + let u = Url::parse(es_uri)?; + let conn_pool = SingleNodeConnectionPool::new(u); + let mut transport_builder = TransportBuilder::new(conn_pool); + transport_builder = transport_builder + .auth(credentials) + .disable_proxy() + .cert_validation(elasticsearch::cert::CertificateValidation::None); + let transport = transport_builder.build()?; + let client = Elasticsearch::new(transport); + Ok(Self { client }) + } +} + +const NODES_INDEX_NAME: &str = "core.data_sources_nodes"; + +#[async_trait] +impl SearchStore for ElasticsearchSearchStore { + async fn index_document(&self, document: &Document) -> Result<()> { + // elasticsearch needs to index a Node, not a Document + let node = Node::from(document.clone()); + + // safety for rollout: we only index one time on five + // TODO(kw-search): remove this once prod testing is ok + if rand::thread_rng().gen_bool(0.8) { + return Ok(()); + } + + // todo(kw-search): fail on error + let now = utils::now(); + match self + .client + .index(IndexParts::IndexId(NODES_INDEX_NAME, &document.document_id)) + .timeout("200ms") + .body(node) + .send() + .await + { + Ok(_) => { + info!( + duration = utils::now() - now, + document_id = document.document_id, + "[ElasticsearchSearchStore] Indexed document" + ); + Ok(()) + } + Err(e) => { + error!( + error = %e, + duration = utils::now() - now, + document_id = document.document_id, + "[ElasticsearchSearchStore] Failed to index document" + ); + Ok(()) + } + } + } + + fn clone_box(&self) -> Box { + Box::new(self.clone()) + } +}