Skip to content

Commit

Permalink
WIP qdrant_migrator
Browse files Browse the repository at this point in the history
  • Loading branch information
spolu committed Nov 16, 2023
1 parent cdadd53 commit 5b007d5
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 9 deletions.
107 changes: 107 additions & 0 deletions core/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ edition = "2021"
name = "dust-api"
path = "bin/dust_api.rs"

[[bin]]
name = "qdrant_migrator"
path = "bin/qdrant_migrator.rs"

[dependencies]
anyhow = "1.0"
serde = { version = "1.0", features = ["rc", "derive"] }
Expand Down Expand Up @@ -50,4 +54,5 @@ tower-http = {version = "0.4", features = ["full"]}
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
deno_core = "0.200"
rayon = "1.8.0"
rayon = "1.8.0"
clap = { version = "4.4", features = ["derive"] }
14 changes: 13 additions & 1 deletion core/src/data_sources/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,10 +296,22 @@ impl DataSource {
&self.config
}

fn qdrant_collection(&self) -> String {
pub fn qdrant_collection(&self) -> String {
format!("ds_{}", self.internal_id)
}

pub async fn update_config(
&mut self,
store: Box<dyn Store + Sync + Send>,
config: &DataSourceConfig,
) -> Result<()> {
self.config = config.clone();
store
.update_data_source_config(&self.project, &self.data_source_id, &self.config)
.await?;
Ok(())
}

pub async fn setup(
&self,
credentials: Credentials,
Expand Down
36 changes: 30 additions & 6 deletions core/src/data_sources/qdrant.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::utils::ParseError;
use anyhow::{anyhow, Result};
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;

use parking_lot::Mutex;
Expand All @@ -16,6 +18,24 @@ pub enum QdrantCluster {

static QDRANT_CLUSTER_VARIANTS: &[QdrantCluster] = &[QdrantCluster::Main0];

impl ToString for QdrantCluster {
fn to_string(&self) -> String {
match self {
QdrantCluster::Main0 => String::from("main-0"),
}
}
}

impl FromStr for QdrantCluster {
type Err = ParseError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"main-0" => Ok(QdrantCluster::Main0),
_ => Err(ParseError::with_message("Unknown QdrantCluster"))?,
}
}
}

pub fn env_var_prefix_for_cluster(cluster: QdrantCluster) -> &'static str {
match cluster {
QdrantCluster::Main0 => "QDRANT_MAIN_0",
Expand All @@ -30,8 +50,8 @@ pub struct QdrantClients {

#[derive(Serialize, Deserialize, PartialEq, Clone, Debug)]
pub struct QdrantDataSourceConfig {
cluster: QdrantCluster,
shadow_write_cluster: Option<QdrantCluster>,
pub cluster: QdrantCluster,
pub shadow_write_cluster: Option<QdrantCluster>,
}

impl QdrantClients {
Expand Down Expand Up @@ -78,13 +98,17 @@ impl QdrantClients {
}
}

pub fn main_cluster(&self, config: &Option<QdrantDataSourceConfig>) -> QdrantCluster {
match config {
Some(config) => config.cluster,
None => QdrantCluster::Main0,
}
}

// Returns the client for the cluster specified in the config or the main-0 cluster if no config
// is provided.
pub fn main_client(&self, config: &Option<QdrantDataSourceConfig>) -> Arc<QdrantClient> {
match config {
Some(config) => self.client(config.cluster),
None => self.client(QdrantCluster::Main0),
}
self.client(self.main_cluster(config))
}

pub fn shadow_write_cluster(
Expand Down
26 changes: 26 additions & 0 deletions core/src/stores/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1033,6 +1033,32 @@ impl Store for PostgresStore {
}
}

async fn update_data_source_config(
&self,
project: &Project,
data_source_id: &str,
config: &DataSourceConfig,
) -> Result<()> {
let project_id = project.project_id();
let data_source_id = data_source_id.to_string();
let data_source_config = config.clone();

let pool = self.pool.clone();
let c = pool.get().await?;

let config_data = serde_json::to_string(&data_source_config)?;
let stmt = c
.prepare(
"UPDATE data_sources SET config_json = $1 \
WHERE project = $2 AND data_source_id = $3",
)
.await?;
c.execute(&stmt, &[&config_data, &project_id, &data_source_id])
.await?;

Ok(())
}

async fn load_data_source_document(
&self,
project: &Project,
Expand Down
10 changes: 9 additions & 1 deletion core/src/stores/store.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::blocks::block::BlockType;
use crate::data_sources::data_source::{DataSource, Document, DocumentVersion, SearchFilter};
use crate::data_sources::data_source::{
DataSource, DataSourceConfig, Document, DocumentVersion, SearchFilter,
};
use crate::databases::database::{Database, DatabaseRow, DatabaseTable};
use crate::dataset::Dataset;
use crate::http::request::{HttpRequest, HttpResponse};
Expand Down Expand Up @@ -88,6 +90,12 @@ pub trait Store {
project: &Project,
data_source_id: &str,
) -> Result<Option<DataSource>>;
async fn update_data_source_config(
&self,
project: &Project,
data_source_id: &str,
config: &DataSourceConfig,
) -> Result<()>;
async fn load_data_source_document(
&self,
project: &Project,
Expand Down

0 comments on commit 5b007d5

Please sign in to comment.