From f3c07daf430cea41564d723ebf4d93a16a3a0e15 Mon Sep 17 00:00:00 2001 From: Philipp Korber Date: Thu, 11 Jan 2024 15:26:30 +0100 Subject: [PATCH] ignore conflicts with delete_by_parent and upsert for freshly_insert_document, also warn on race detection --- web-api-shared/src/elastic.rs | 18 ++++++++++++++++-- web-api/src/storage/elastic.rs | 23 +++++++++++++++++------ 2 files changed, 33 insertions(+), 8 deletions(-) diff --git a/web-api-shared/src/elastic.rs b/web-api-shared/src/elastic.rs index 2a94992bb..db48b1628 100644 --- a/web-api-shared/src/elastic.rs +++ b/web-api-shared/src/elastic.rs @@ -42,7 +42,7 @@ use serde::{ }; use serde_json::{json, Value}; use thiserror::Error; -use tracing::error; +use tracing::{error, warn}; use crate::{ net::{ExponentialJitterRetryPolicy, ExponentialJitterRetryPolicyConfig}, @@ -227,6 +227,8 @@ pub struct BulkItemResponse { pub id: I, pub status: u16, #[serde(default)] + pub result: Option, + #[serde(default)] pub error: Value, } @@ -247,7 +249,12 @@ pub struct BulkResponse { } impl BulkResponse { - pub fn failed_documents(self, operation: &'static str, allow_not_found: bool) -> Vec + pub fn failed_documents( + self, + operation: &'static str, + allow_not_found: bool, + expected_result: &str, + ) -> Vec where I: Display + Debug, { @@ -257,6 +264,7 @@ impl BulkResponse { .into_iter() .filter_map(|mut response| { if let Some(response) = response.remove(operation) { + let result = response.result.as_deref().unwrap_or("none"); if !response.is_success_status(allow_not_found) { error!( document_id=%response.id, @@ -264,6 +272,12 @@ impl BulkResponse { "Elastic failed to {operation} document.", ); return Some(response.id); + } else if result != expected_result { + warn!( + expected=%expected_result, + got=%result, + "Mismatch in expected result for bulk operation" + ) } } else { error!("Bulk {operation} request contains non {operation} responses: {response:?}"); diff --git a/web-api/src/storage/elastic.rs b/web-api/src/storage/elastic.rs index 0cda4bd2f..45f65b7ec 100644 --- a/web-api/src/storage/elastic.rs +++ b/web-api/src/storage/elastic.rs @@ -23,7 +23,7 @@ use itertools::Itertools; use reqwest::Method; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; -use tracing::info; +use tracing::{info, warn}; use xayn_ai_bert::NormalizedEmbedding; pub(crate) use xayn_web_api_shared::elastic::{BulkInstruction, Config}; use xayn_web_api_shared::{ @@ -201,7 +201,7 @@ impl Client { #[allow(clippy::cast_possible_truncation)] let id = SnippetId::new(document.id.clone(), idx as _); let header = - serde_json::to_value(BulkInstruction::Create { id: id.to_es_id() }); + serde_json::to_value(BulkInstruction::Index { id: id.to_es_id() }); let data = serde_json::to_value(Document { snippet, properties: &document.properties, @@ -221,7 +221,7 @@ impl Client { } let response = self.bulk_request(snippets).await?; - Ok(response.failed_documents("index", false).into()) + Ok(response.failed_documents("index", false, "created").into()) } pub(super) async fn delete_by_parents( @@ -229,7 +229,10 @@ impl Client { parents: impl SerializeDocumentIds, ) -> Result<(), Error> { // https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html - let url = self.create_url(["_delete_by_query"], [("refresh", None)]); + let url = self.create_url( + ["_delete_by_query"], + [("refresh", None), ("conflicts", Some("proceed"))], + ); let body = json!({ "query": { "terms": { @@ -237,8 +240,16 @@ impl Client { } } }); - self.query_with_json::<_, SerdeDiscard>(Method::POST, url, Some(body)) - .await?; + + let result: Value = self.query_with_json(Method::POST, url, Some(body)).await?; + + if !result + .get("failures") + .and_then(Value::as_array) + .map_or(true, Vec::is_empty) + { + warn!(response=%result, "ignored deletion failures when deleting by parent"); + } Ok(()) }