Skip to content

Commit

Permalink
improved push_tuple supporting nested tuples, chunking and error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
rustonaut committed Aug 7, 2023
1 parent d908013 commit edbd19a
Show file tree
Hide file tree
Showing 2 changed files with 263 additions and 39 deletions.
60 changes: 31 additions & 29 deletions web-api/src/storage/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use super::{
IndexedPropertyDefinition,
IndexedPropertyType,
},
utils::IterAsTuple,
InteractionUpdateContext,
TagWeights,
};
Expand Down Expand Up @@ -165,12 +166,12 @@ impl Database {
let mut builder = QueryBuilder::new("DELETE FROM document WHERE document_id IN ");
let all = ids.into_iter();
let mut deleted = Vec::with_capacity(all.len());
let mut ids = all.clone().peekable();
while ids.peek().is_some() {
let mut ids = all.clone();
while let Ok(ids) = IterAsTuple::new(ids.by_ref().take(Self::BIND_LIMIT)) {
deleted.extend(
builder
.reset()
.push_tuple(ids.by_ref().take(Self::BIND_LIMIT))
.push_tuple(ids)
.push(" RETURNING document_id, is_candidate;")
.build()
.persistent(false)
Expand Down Expand Up @@ -225,13 +226,13 @@ impl Database {
FROM document
WHERE document_id IN ",
);
let mut ids = ids.into_iter().peekable();
let mut documents = Vec::with_capacity(ids.len());
while ids.peek().is_some() {
let mut chunks = IterAsTuple::chunks(Self::BIND_LIMIT, ids);
let mut documents = Vec::with_capacity(chunks.len());
while let Some(ids) = chunks.next() {
documents.extend(
builder
.reset()
.push_tuple(ids.by_ref().take(Self::BIND_LIMIT))
.push_tuple(ids)
.build()
.try_map(|row| {
QueriedInteractedDocument::from_row(&row).map(|document| {
Expand Down Expand Up @@ -266,12 +267,12 @@ impl Database {
include_snippet.then_some(", snippet").unwrap_or_default(),
));
let mut documents = Vec::with_capacity(scores.len());
let mut ids = scores.keys().map(SnippetId::document_id).peekable();
while ids.peek().is_some() {
let mut ids = scores.keys().map(SnippetId::document_id);
while let Ok(ids) = IterAsTuple::new(ids.by_ref().take(Self::BIND_LIMIT)) {
documents.extend(
builder
.reset()
.push_tuple(ids.by_ref().take(Self::BIND_LIMIT))
.push_tuple(ids)
.build()
.try_map(|row: PgRow| {
// TODO[pmk/now] for this PR we blindly assume snippet_idx==0, this function
Expand Down Expand Up @@ -319,13 +320,13 @@ impl Database {
FROM document
WHERE document_id IN ",
);
let mut ids = ids.into_iter().peekable();
let mut ids = ids.into_iter();
let mut documents = Vec::with_capacity(ids.len());
while ids.peek().is_some() {
while let Ok(ids) = IterAsTuple::new(ids.by_ref().take(Self::BIND_LIMIT)) {
documents.extend(
builder
.reset()
.push_tuple(ids.by_ref().take(Self::BIND_LIMIT))
.push_tuple(ids)
.build()
.try_map(|row| {
let (id, snippet, preprocessing_step, Json(properties), tags, is_candidate) =
Expand Down Expand Up @@ -381,7 +382,8 @@ impl Database {
SET is_candidate = FALSE
WHERE document_id IN ",
);
for ids in removed.chunks(Self::BIND_LIMIT) {
let mut chunks = IterAsTuple::chunks(Self::BIND_LIMIT, removed);
while let Some(ids) = chunks.next() {
builder
.reset()
.push_tuple(ids)
Expand All @@ -396,12 +398,12 @@ impl Database {
WHERE document_id IN ",
);
let mut ingested = Vec::with_capacity(ingestable.len());
let mut ids = ingestable.iter().peekable();
while ids.peek().is_some() {
let mut chunks = IterAsTuple::chunks(Self::BIND_LIMIT, ingestable.iter());
while let Some(ids) = chunks.next() {
ingested.extend(
builder
.reset()
.push_tuple(ids.by_ref().take(Self::BIND_LIMIT))
.push_tuple(ids)
.push(" RETURNING document_id, snippet, preprocessing_step, properties, tags, embedding;")
.build()
.try_map(|row| {
Expand Down Expand Up @@ -449,12 +451,12 @@ impl Database {
WHERE is_candidate AND document_id IN ",
);
let mut unchanged = Vec::new();
let mut ids = ingestable.iter().peekable();
while ids.peek().is_some() {
let mut chunks = IterAsTuple::chunks(Self::BIND_LIMIT, ingestable.iter());
while let Some(ids) = chunks.next() {
unchanged.extend(
builder
.reset()
.push_tuple(ids.by_ref().take(Self::BIND_LIMIT))
.push_tuple(ids)
.build()
.try_map(|row| DocumentId::from_row(&row))
.fetch_all(&mut tx)
Expand All @@ -471,12 +473,12 @@ impl Database {
WHERE document_id IN ",
);
let mut ingested = Vec::with_capacity(ingestable.len());
let mut ids = ingestable.iter().peekable();
while ids.peek().is_some() {
let mut chunks = IterAsTuple::chunks(Self::BIND_LIMIT, ingestable.iter());
while let Some(ids) = chunks.next() {
ingested.extend(
builder
.reset()
.push_tuple(ids.by_ref().take(Self::BIND_LIMIT))
.push_tuple(ids)
.push(" RETURNING document_id, snippet, preprocessing_step, properties, tags, embedding;")
.build()
.try_map(|row| {
Expand Down Expand Up @@ -524,12 +526,12 @@ impl Database {
WHERE NOT is_candidate AND document_id IN ",
);
let mut unchanged = Vec::new();
let mut ids = removable.iter().peekable();
while ids.peek().is_some() {
let mut chunks = IterAsTuple::chunks(Self::BIND_LIMIT, removable.iter());
while let Some(ids) = chunks.next() {
unchanged.extend(
builder
.reset()
.push_tuple(ids.by_ref().take(Self::BIND_LIMIT))
.push_tuple(ids)
.build()
.try_map(|row| DocumentId::from_row(&row))
.fetch_all(&mut tx)
Expand All @@ -546,12 +548,12 @@ impl Database {
WHERE document_id IN ",
);
let mut removed = Vec::with_capacity(removable.len());
let mut ids = removable.iter().peekable();
while ids.peek().is_some() {
let mut chunks = IterAsTuple::chunks(Self::BIND_LIMIT, removable.iter());
while let Some(ids) = chunks.next() {
removed.extend(
builder
.reset()
.push_tuple(ids.by_ref().take(Self::BIND_LIMIT))
.push_tuple(ids)
.push(" RETURNING document_id;")
.build()
.try_map(|row| DocumentId::from_row(&row))
Expand Down
Loading

0 comments on commit edbd19a

Please sign in to comment.