Skip to content

Commit

Permalink
ignore conflicts with delete_by_parent and upsert for freshly_insert_…
Browse files Browse the repository at this point in the history
…document, also warn on race detection
  • Loading branch information
rustonaut committed Jan 11, 2024
1 parent 5f06c46 commit f3c07da
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 8 deletions.
18 changes: 16 additions & 2 deletions web-api-shared/src/elastic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use serde::{
};
use serde_json::{json, Value};
use thiserror::Error;
use tracing::error;
use tracing::{error, warn};

use crate::{
net::{ExponentialJitterRetryPolicy, ExponentialJitterRetryPolicyConfig},
Expand Down Expand Up @@ -227,6 +227,8 @@ pub struct BulkItemResponse<I> {
pub id: I,
pub status: u16,
#[serde(default)]
pub result: Option<String>,
#[serde(default)]
pub error: Value,
}

Expand All @@ -247,7 +249,12 @@ pub struct BulkResponse<I> {
}

impl<I> BulkResponse<I> {
pub fn failed_documents(self, operation: &'static str, allow_not_found: bool) -> Vec<I>
pub fn failed_documents(
self,
operation: &'static str,
allow_not_found: bool,
expected_result: &str,
) -> Vec<I>
where
I: Display + Debug,
{
Expand All @@ -257,13 +264,20 @@ impl<I> BulkResponse<I> {
.into_iter()
.filter_map(|mut response| {
if let Some(response) = response.remove(operation) {
let result = response.result.as_deref().unwrap_or("none");
if !response.is_success_status(allow_not_found) {
error!(
document_id=%response.id,
error=%response.error,
"Elastic failed to {operation} document.",
);
return Some(response.id);
} else if result != expected_result {
warn!(
expected=%expected_result,
got=%result,
"Mismatch in expected result for bulk operation"
)
}
} else {
error!("Bulk {operation} request contains non {operation} responses: {response:?}");
Expand Down
23 changes: 17 additions & 6 deletions web-api/src/storage/elastic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use itertools::Itertools;
use reqwest::Method;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use tracing::info;
use tracing::{info, warn};
use xayn_ai_bert::NormalizedEmbedding;
pub(crate) use xayn_web_api_shared::elastic::{BulkInstruction, Config};
use xayn_web_api_shared::{
Expand Down Expand Up @@ -201,7 +201,7 @@ impl Client {
#[allow(clippy::cast_possible_truncation)]
let id = SnippetId::new(document.id.clone(), idx as _);
let header =
serde_json::to_value(BulkInstruction::Create { id: id.to_es_id() });
serde_json::to_value(BulkInstruction::Index { id: id.to_es_id() });
let data = serde_json::to_value(Document {
snippet,
properties: &document.properties,
Expand All @@ -221,24 +221,35 @@ impl Client {
}

let response = self.bulk_request(snippets).await?;
Ok(response.failed_documents("index", false).into())
Ok(response.failed_documents("index", false, "created").into())
}

pub(super) async fn delete_by_parents(
&self,
parents: impl SerializeDocumentIds,
) -> Result<(), Error> {
// https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html
let url = self.create_url(["_delete_by_query"], [("refresh", None)]);
let url = self.create_url(
["_delete_by_query"],
[("refresh", None), ("conflicts", Some("proceed"))],
);
let body = json!({
"query": {
"terms": {
"parent": parents,
}
}
});
self.query_with_json::<_, SerdeDiscard>(Method::POST, url, Some(body))
.await?;

let result: Value = self.query_with_json(Method::POST, url, Some(body)).await?;

if !result
.get("failures")
.and_then(Value::as_array)
.map_or(true, Vec::is_empty)
{
warn!(response=%result, "ignored deletion failures when deleting by parent");
}
Ok(())
}

Expand Down

0 comments on commit f3c07da

Please sign in to comment.