Skip to content

Commit

Permalink
enh: qdrant_migrator retry capabilities (#2606)
Browse files Browse the repository at this point in the history
  • Loading branch information
spolu authored Nov 20, 2023
1 parent ac13488 commit 1e9d8be
Showing 1 changed file with 37 additions and 5 deletions.
42 changes: 37 additions & 5 deletions core/bin/qdrant_migrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,18 +344,34 @@ fn main() -> Result<()> {

let mut page_offset: Option<PointId> = None;
let mut total: usize = 0;
let mut retry: usize = 0;
loop {
let now = utils::now();
let scroll_results = qdrant_client
let scroll_results = match qdrant_client
.scroll(&ScrollPoints {
collection_name: ds.qdrant_collection(),
with_vectors: Some(true.into()),
with_payload: Some(true.into()),
limit: Some(points_per_request as u32),
offset: page_offset,
offset: page_offset.clone(),
..Default::default()
})
.await?;
.await
{
Ok(r) => r,
Err(e) => {
if retry < 3 {
retry += 1;
utils::error(&format!(
"Error migrating points (read): retry={} error={:?}",
retry, e
));
continue;
} else {
Err(e)?
}
}
};

let count = scroll_results.result.len();

Expand All @@ -371,9 +387,24 @@ fn main() -> Result<()> {
})
.collect::<Vec<_>>();

shadow_write_qdrant_client
match shadow_write_qdrant_client
.upsert_points(ds.qdrant_collection(), points, None)
.await?;
.await
{
Ok(_) => (),
Err(e) => {
if retry < 3 {
retry += 1;
utils::error(&format!(
"Error migrating points (write): retry={} error={:?}",
retry, e
));
continue;
} else {
Err(e)?
}
}
}

total += count;
utils::info(&format!(
Expand All @@ -387,6 +418,7 @@ fn main() -> Result<()> {
if page_offset.is_none() {
break;
}
retry = 0;
}

utils::info(&format!("Done migrating: total={}", total));
Expand Down

0 comments on commit 1e9d8be

Please sign in to comment.