Skip to content

Commit

Permalink
[Keyword search] Index documents in elasticsearch (1/3) (#9384)
Browse files Browse the repository at this point in the history
* new mapping

* search store

* index mapping fix

* wip datasource

* update create_index with store

* impl clone in store

* add the indexation call

* add 200ms timeout

* use Node::from

* only index 1 on 5

* log indexation
  • Loading branch information
philipperolet authored Dec 17, 2024
1 parent 6bb3f7b commit ec60d8a
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 21 deletions.
15 changes: 14 additions & 1 deletion core/bin/core_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ use dust::{
providers::provider::{provider, ProviderID},
run,
search_filter::{Filterable, SearchFilter},
search_stores::search_store::{ElasticsearchSearchStore, SearchStore},
sqlite_workers::client::{self, HEARTBEAT_INTERVAL_MS},
stores::{
postgres,
Expand All @@ -70,7 +71,7 @@ struct APIState {
store: Box<dyn store::Store + Sync + Send>,
databases_store: Box<dyn databases_store::DatabasesStore + Sync + Send>,
qdrant_clients: QdrantClients,

search_store: Box<dyn SearchStore + Sync + Send>,
run_manager: Arc<Mutex<RunManager>>,
}

Expand All @@ -79,11 +80,13 @@ impl APIState {
store: Box<dyn store::Store + Sync + Send>,
databases_store: Box<dyn databases_store::DatabasesStore + Sync + Send>,
qdrant_clients: QdrantClients,
search_store: Box<dyn SearchStore + Sync + Send>,
) -> Self {
APIState {
store,
qdrant_clients,
databases_store,
search_store,
run_manager: Arc::new(Mutex::new(RunManager {
pending_apps: vec![],
pending_runs: vec![],
Expand Down Expand Up @@ -1730,6 +1733,7 @@ async fn data_sources_documents_upsert(
&payload.source_url,
payload.section,
true, // preserve system tags
state.search_store.clone(),
)
.await
{
Expand Down Expand Up @@ -3373,10 +3377,19 @@ fn main() {
Err(_) => Err(anyhow!("DATABASES_STORE_DATABASE_URI not set."))?,
};

let url = std::env::var("ELASTICSEARCH_URL").expect("ELASTICSEARCH_URL must be set");
let username =
std::env::var("ELASTICSEARCH_USERNAME").expect("ELASTICSEARCH_USERNAME must be set");
let password =
std::env::var("ELASTICSEARCH_PASSWORD").expect("ELASTICSEARCH_PASSWORD must be set");

let search_store : Box<dyn SearchStore + Sync + Send> = Box::new(ElasticsearchSearchStore::new(&url, &username, &password).await?);

let state = Arc::new(APIState::new(
store,
databases_store,
QdrantClients::build().await?,
search_store,
));

let router = Router::new()
Expand Down
23 changes: 6 additions & 17 deletions core/bin/elasticsearch/create_index.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
use std::collections::HashMap;

use clap::{Parser, ValueEnum};
use elasticsearch::auth::Credentials;
use elasticsearch::http::transport::{SingleNodeConnectionPool, TransportBuilder};
use dust::search_stores::search_store::ElasticsearchSearchStore;
use elasticsearch::indices::{IndicesCreateParts, IndicesExistsParts};
use elasticsearch::Elasticsearch;
use http::StatusCode;
use url::Url;

#[derive(Parser, Debug, Clone, ValueEnum)]
enum Region {
Expand Down Expand Up @@ -62,23 +59,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let region = std::env::var("DUST_REGION").expect("DUST_REGION must be set");

// create ES client
let credentials = Credentials::Basic(username, password);
let u = Url::parse(&url)?;
let conn_pool = SingleNodeConnectionPool::new(u);
let mut transport_builder = TransportBuilder::new(conn_pool);
transport_builder = transport_builder
.auth(credentials)
.disable_proxy()
.cert_validation(elasticsearch::cert::CertificateValidation::None);
let transport = transport_builder.build()?;

let client = Elasticsearch::new(transport);
let search_store = ElasticsearchSearchStore::new(&url, &username, &password).await?;

let index_fullname = format!("core.{}_{}", index_name, index_version);
let index_alias = format!("core.{}", index_name);

// do not create index if it already exists
let response = client
let response = search_store
.client
.indices()
.exists(IndicesExistsParts::Index(&[index_fullname.as_str()]))
.send()
Expand Down Expand Up @@ -144,7 +132,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}

// create index with settings, mappings and alias
let response = client
let response = search_store
.client
.indices()
.create(IndicesCreateParts::Index(index_fullname.as_str()))
.body(body)
Expand Down
7 changes: 6 additions & 1 deletion core/src/data_sources/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::providers::embedder::{EmbedderRequest, EmbedderVector};
use crate::providers::provider::ProviderID;
use crate::run::Credentials;
use crate::search_filter::{Filterable, SearchFilter};
use crate::search_stores::search_store::SearchStore;
use crate::stores::store::{DocumentCreateParams, Store};
use crate::utils;
use anyhow::{anyhow, Result};
Expand Down Expand Up @@ -614,6 +615,7 @@ impl DataSource {
source_url: &Option<String>,
text: Section,
preserve_system_tags: bool,
search_store: Box<dyn SearchStore + Sync + Send>,
) -> Result<Document> {
let full_text = text.full_text();
// Disallow preserve_system_tags=true if tags contains a string starting with the system
Expand Down Expand Up @@ -730,7 +732,7 @@ impl DataSource {
&qdrant_clients,
&document_id,
&document_id_hash,
document,
document.clone(),
&document_hash,
&text,
// Cache is not used when writing to the shadow collection.
Expand Down Expand Up @@ -758,6 +760,9 @@ impl DataSource {
.create_data_source_document(&self.project, self.data_source_id.clone(), create_params)
.await?;

// Upsert document in search index.
search_store.index_document(&document).await?;

// Clean-up old superseded versions.
self.scrub_document_superseded_versions(store, &document_id)
.await?;
Expand Down
4 changes: 4 additions & 0 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ pub mod stores {
pub mod postgres;
pub mod store;
}
// Search-backend stores (Elasticsearch) used for keyword-search indexing.
pub mod search_stores {
    pub mod search_store;
}

pub mod app;
pub mod dataset;
pub mod data_sources {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
{
"dynamic": "strict",
"properties": {
"data_source_id": {
"type": "keyword"
},
"timestamp": {
"type": "date"
},
"type": {
"node_type": {
"type": "keyword"
},
"node_id": {
Expand All @@ -25,7 +26,10 @@
"parents": {
"type": "keyword"
},
"mimeType": {
"parent_id": {
"type": "keyword"
},
"mime_type": {
"type": "keyword"
}
}
Expand Down
94 changes: 94 additions & 0 deletions core/src/search_stores/search_store.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
use anyhow::Result;
use async_trait::async_trait;
use elasticsearch::{
auth::Credentials,
http::transport::{SingleNodeConnectionPool, TransportBuilder},
Elasticsearch, IndexParts,
};
use rand::Rng;
use url::Url;

use crate::data_sources::node::Node;
use crate::{data_sources::data_source::Document, utils};
use tracing::{error, info};
/// Abstraction over the search backend used to index data-source documents.
///
/// Implementors must provide `clone_box` so the boxed trait object can be
/// cloned and stored in shared application state (e.g. `APIState`).
#[async_trait]
pub trait SearchStore {
    /// Indexes `document` in the search backend. Implementations may treat
    /// indexing failures as non-fatal (see `ElasticsearchSearchStore`).
    async fn index_document(&self, document: &Document) -> Result<()>;
    /// Clones `self` into a fresh boxed trait object; backs the `Clone`
    /// impl for `Box<dyn SearchStore + Sync + Send>`.
    fn clone_box(&self) -> Box<dyn SearchStore + Sync + Send>;
}

// Manual `Clone` for the boxed trait object: delegates to `clone_box`,
// the standard pattern for cloning object-safe trait objects.
impl Clone for Box<dyn SearchStore + Sync + Send> {
    fn clone(&self) -> Self {
        self.clone_box()
    }
}

/// Elasticsearch-backed implementation of `SearchStore`.
#[derive(Clone)]
pub struct ElasticsearchSearchStore {
    // Public so binaries (e.g. `create_index.rs`) can issue index-management
    // calls (`indices().exists`, `indices().create`) directly on the client.
    pub client: Elasticsearch,
}

impl ElasticsearchSearchStore {
    /// Builds a store connected to a single Elasticsearch node at `es_uri`,
    /// authenticating with HTTP basic auth.
    ///
    /// # Errors
    /// Returns an error if `es_uri` is not a valid URL or if the transport
    /// cannot be built.
    pub async fn new(es_uri: &str, username: &str, password: &str) -> Result<Self> {
        let node_url = Url::parse(es_uri)?;
        let basic_auth = Credentials::Basic(username.to_string(), password.to_string());

        // NOTE(review): certificate validation is disabled; acceptable on a
        // trusted internal network, but confirm this is intended for prod TLS.
        let transport = TransportBuilder::new(SingleNodeConnectionPool::new(node_url))
            .auth(basic_auth)
            .disable_proxy()
            .cert_validation(elasticsearch::cert::CertificateValidation::None)
            .build()?;

        Ok(Self {
            client: Elasticsearch::new(transport),
        })
    }
}

const NODES_INDEX_NAME: &str = "core.data_sources_nodes";

#[async_trait]
impl SearchStore for ElasticsearchSearchStore {
    /// Indexes `document` as a `Node` in the `NODES_INDEX_NAME` index, keyed
    /// by `document_id`, with a 200ms request timeout.
    ///
    /// Indexing failures are logged but deliberately swallowed (always
    /// returns `Ok(())`) so that search-index writes cannot fail a document
    /// upsert during the rollout phase.
    async fn index_document(&self, document: &Document) -> Result<()> {
        // Safety for rollout: we only index one document out of five
        // (gen_bool(0.8) skips ~80% of calls).
        // Checked first so skipped calls do not pay for the document clone
        // and Node conversion below.
        // TODO(kw-search): remove this once prod testing is ok
        if rand::thread_rng().gen_bool(0.8) {
            return Ok(());
        }

        // Elasticsearch needs to index a Node, not a Document.
        let node = Node::from(document.clone());

        // todo(kw-search): fail on error
        let now = utils::now();
        match self
            .client
            .index(IndexParts::IndexId(NODES_INDEX_NAME, &document.document_id))
            .timeout("200ms")
            .body(node)
            .send()
            .await
        {
            // NOTE(review): `send()` returning Ok only means the HTTP round
            // trip succeeded; a non-2xx ES response still lands here —
            // consider checking `response.status_code()`.
            Ok(_) => {
                info!(
                    duration = utils::now() - now,
                    document_id = document.document_id,
                    "[ElasticsearchSearchStore] Indexed document"
                );
                Ok(())
            }
            Err(e) => {
                // Transport errors are non-fatal for now (see todo above).
                error!(
                    error = %e,
                    duration = utils::now() - now,
                    document_id = document.document_id,
                    "[ElasticsearchSearchStore] Failed to index document"
                );
                Ok(())
            }
        }
    }

    /// Boxed clone of this store; backs `Clone` for the trait object.
    fn clone_box(&self) -> Box<dyn SearchStore + Sync + Send> {
        Box::new(self.clone())
    }
}

0 comments on commit ec60d8a

Please sign in to comment.