Skip to content

Commit

Permalink
fix qdrant migrator + move index creation next to collection creation
Browse files Browse the repository at this point in the history
  • Loading branch information
spolu authored Nov 21, 2023
1 parent 7eb736e commit 4b17b68
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 35 deletions.
70 changes: 70 additions & 0 deletions core/bin/qdrant_migrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ enum Commands {
project_id: i64,
data_source_id: String,
},
#[command(arg_required_else_help = true)]
#[command(about = "Ensure indexes on the collection", long_about = None)]
EnsureIndexes {
project_id: i64,
data_source_id: String,
},
}

#[derive(Debug, Parser)]
Expand Down Expand Up @@ -366,6 +372,7 @@ fn main() -> Result<()> {
"Error migrating points (read): retry={} error={:?}",
retry, e
));
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
continue;
} else {
Err(e)?
Expand Down Expand Up @@ -399,6 +406,7 @@ fn main() -> Result<()> {
"Error migrating points (write): retry={} error={:?}",
retry, e
));
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
continue;
} else {
Err(e)?
Expand Down Expand Up @@ -462,6 +470,68 @@ fn main() -> Result<()> {
}
));

Ok::<(), anyhow::Error>(())
}
Commands::EnsureIndexes {
    project_id,
    data_source_id,
} => {
    // Look up the data source; a missing one is reported as an error.
    let project = project::Project::new_from_id(project_id);
    let ds = store
        .load_data_source(&project, &data_source_id)
        .await?
        .ok_or_else(|| anyhow!("Data source not found"))?;

    let qdrant_client = qdrant_clients.main_client(&ds.config().qdrant_config);

    // Payload fields to index on the collection, in the order the requests are issued.
    let payload_indexes = [
        ("document_id_hash", qdrant::FieldType::Keyword),
        ("tags", qdrant::FieldType::Keyword),
        ("parents", qdrant::FieldType::Keyword),
        ("timestamp", qdrant::FieldType::Integer),
    ];

    // Requests are sent one at a time; the first error aborts the command via `?`.
    for (field_name, field_type) in payload_indexes {
        let _ = qdrant_client
            .create_field_index(ds.qdrant_collection(), field_name, field_type, None, None)
            .await?;
    }

    let collection = ds.qdrant_collection();
    let cluster = qdrant_clients
        .main_cluster(&ds.config().qdrant_config)
        .to_string();
    utils::done(&format!(
        "Created indexes for data source: collection={} cluster={}",
        collection, cluster,
    ));

    Ok::<(), anyhow::Error>(())
}
}
Expand Down
71 changes: 36 additions & 35 deletions core/src/data_sources/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,41 +356,6 @@ impl DataSource {
..Default::default()
})
.await?;
Ok(())
}

pub async fn setup(
&self,
credentials: Credentials,
qdrant_clients: QdrantClients,
) -> Result<()> {
let qdrant_client = qdrant_clients.main_client(&self.config.qdrant_config);

// GCP store created data to test GCP.
let bucket = match std::env::var("DUST_DATA_SOURCES_BUCKET") {
Ok(bucket) => bucket,
Err(_) => Err(anyhow!("DUST_DATA_SOURCES_BUCKET is not set"))?,
};

let bucket_path = format!("{}/{}", self.project.project_id(), self.internal_id);
let data_source_created_path = format!("{}/created.txt", bucket_path);

Object::create(
&bucket,
format!("{}", self.created).as_bytes().to_vec(),
&data_source_created_path,
"application/text",
)
.await?;

utils::done(&format!(
"Created GCP bucket for data_source `{}`",
self.data_source_id
));

// Qdrant create collection.
self.create_qdrant_collection(credentials, qdrant_client.clone())
.await?;

let _ = qdrant_client
.create_field_index(
Expand Down Expand Up @@ -432,6 +397,42 @@ impl DataSource {
)
.await?;

Ok(())
}

pub async fn setup(
&self,
credentials: Credentials,
qdrant_clients: QdrantClients,
) -> Result<()> {
let qdrant_client = qdrant_clients.main_client(&self.config.qdrant_config);

// GCP store created data to test GCP.
let bucket = match std::env::var("DUST_DATA_SOURCES_BUCKET") {
Ok(bucket) => bucket,
Err(_) => Err(anyhow!("DUST_DATA_SOURCES_BUCKET is not set"))?,
};

let bucket_path = format!("{}/{}", self.project.project_id(), self.internal_id);
let data_source_created_path = format!("{}/created.txt", bucket_path);

Object::create(
&bucket,
format!("{}", self.created).as_bytes().to_vec(),
&data_source_created_path,
"application/text",
)
.await?;

utils::done(&format!(
"Created GCP bucket for data_source `{}`",
self.data_source_id
));

// Qdrant create collection and indexes.
self.create_qdrant_collection(credentials, qdrant_client.clone())
.await?;

utils::done(&format!(
"Created Qdrant collection and indexes for data_source `{}`",
self.data_source_id
Expand Down

0 comments on commit 4b17b68

Please sign in to comment.