From bb849d16846ab1072b9b08a0074e2356a739405a Mon Sep 17 00:00:00 2001 From: Nikhil Sinha <131262146+nikhilsinhaparseable@users.noreply.github.com> Date: Fri, 19 Apr 2024 11:40:05 +0530 Subject: [PATCH] fix: cache fix ingestors (#767) Caching for distributed mode can be enabled from the querier UI. The querier calls the PUT /cache API on all ingestors. Each ingestor first checks whether the stream exists; if it is not found in the local map, the ingestor checks S3 and creates the stream. It then checks whether the caching env vars are set and, if so, adds the cache_enabled flag to STREAM_INFO and updates the stream's stream.json in S3. Fixes: #764 --- server/src/handlers/http/cluster/mod.rs | 42 ++++++++- server/src/handlers/http/logstream.rs | 85 +++++++++++++------ .../src/handlers/http/modal/ingest_server.rs | 6 ++ 3 files changed, 106 insertions(+), 27 deletions(-) diff --git a/server/src/handlers/http/cluster/mod.rs b/server/src/handlers/http/cluster/mod.rs index 0e091004b..a93094f05 100644 --- a/server/src/handlers/http/cluster/mod.rs +++ b/server/src/handlers/http/cluster/mod.rs @@ -50,6 +50,46 @@ use super::base_path_without_preceding_slash; use super::modal::IngestorMetadata; +pub async fn sync_cache_with_ingestors( + url: &str, + ingestor: IngestorMetadata, + body: bool, +) -> Result<(), StreamError> { + if !utils::check_liveness(&ingestor.domain_name).await { + return Ok(()); + } + let request_body: Bytes = Bytes::from(body.to_string()); + let client = reqwest::Client::new(); + let resp = client + .put(url) + .header(header::CONTENT_TYPE, "application/json") + .header(header::AUTHORIZATION, ingestor.token) + .body(request_body) + .send() + .await + .map_err(|err| { + // log the error and return a custom error + log::error!( + "Fatal: failed to set cache: {}\n Error: {:?}", + ingestor.domain_name, + err + ); + StreamError::Network(err) + })?; + + // if the response is not successful, log the error; the failure is not propagated to the caller + // this could be a bit too much, but we need to be sure it covers all cases + if 
!resp.status().is_success() { + log::error!( + "failed to set cache: {}\nResponse Returned: {:?}", + ingestor.domain_name, + resp.text().await + ); + } + + Ok(()) +} + // forward the request to all ingestors to keep them in sync #[allow(dead_code)] pub async fn sync_streams_with_ingestors( @@ -220,7 +260,7 @@ pub async fn send_stream_delete_request( log::error!( "failed to delete stream: {}\nResponse Returned: {:?}", ingestor.domain_name, - resp + resp.text().await ); } diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index f857bd455..b8d95d95e 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -17,6 +17,9 @@ */ use self::error::{CreateStreamError, StreamError}; +use super::base_path_without_preceding_slash; +use super::cluster::fetch_stats_from_ingestors; +use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats}; use crate::alerts::Alerts; use crate::handlers::{STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY}; use crate::metadata::STREAM_INFO; @@ -25,15 +28,12 @@ use crate::static_schema::{convert_static_schema_to_arrow_schema, StaticSchema}; use crate::storage::{retention::Retention, LogStream, StorageDir, StreamInfo}; use crate::{catalog, event, stats}; use crate::{metadata, validator}; - -use super::base_path_without_preceding_slash; -use super::cluster::fetch_stats_from_ingestors; -use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats}; use actix_web::http::StatusCode; use actix_web::{web, HttpRequest, Responder}; use arrow_schema::{Field, Schema}; use bytes::Bytes; use chrono::Utc; +use itertools::Itertools; use serde_json::Value; use std::collections::HashMap; use std::fs; @@ -301,33 +301,66 @@ pub async fn put_enable_cache( req: HttpRequest, body: web::Json, ) -> Result { - let enable_cache = body.into_inner(); let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); let storage 
= CONFIG.storage().get_object_store(); - if CONFIG.parseable.local_cache_path.is_none() { - return Err(StreamError::CacheNotEnabled(stream_name)); - } - if CONFIG.parseable.mode == Mode::Ingest { - // here the ingest server has not found the stream - // so it should check if the stream exists in storage - let streams = storage.list_streams().await?; - if !streams.contains(&LogStream { - name: stream_name.clone().to_owned(), - }) { - log::error!("Stream {} not found", stream_name.clone()); - return Err(StreamError::StreamNotFound(stream_name.clone())); + match CONFIG.parseable.mode { + Mode::Query => { + if !metadata::STREAM_INFO.stream_exists(&stream_name) { + return Err(StreamError::StreamNotFound(stream_name)); + } + let ingestor_metadata = super::cluster::get_ingestor_info().await.map_err(|err| { + log::error!("Fatal: failed to get ingestor info: {:?}", err); + StreamError::from(err) + })?; + for ingestor in ingestor_metadata { + let url = format!( + "{}{}/logstream/{}/cache", + ingestor.domain_name, + base_path_without_preceding_slash(), + stream_name + ); + + super::cluster::sync_cache_with_ingestors(&url, ingestor.clone(), *body).await?; + } + } + Mode::Ingest => { + if CONFIG.parseable.local_cache_path.is_none() { + return Err(StreamError::CacheNotEnabled(stream_name)); + } + // here the ingest server has not found the stream + // so it should check if the stream exists in storage + let check = storage + .list_streams() + .await? 
+ .iter() + .map(|stream| stream.name.clone()) + .contains(&stream_name); + + if !check { + log::error!("Stream {} not found", stream_name.clone()); + return Err(StreamError::StreamNotFound(stream_name.clone())); + } + metadata::STREAM_INFO + .upsert_stream_info( + &*storage, + LogStream { + name: stream_name.clone().to_owned(), + }, + ) + .await + .map_err(|_| StreamError::StreamNotFound(stream_name.clone()))?; + } + Mode::All => { + if !metadata::STREAM_INFO.stream_exists(&stream_name) { + return Err(StreamError::StreamNotFound(stream_name)); + } + if CONFIG.parseable.local_cache_path.is_none() { + return Err(StreamError::CacheNotEnabled(stream_name)); + } } - metadata::STREAM_INFO - .upsert_stream_info( - &*storage, - LogStream { - name: stream_name.clone().to_owned(), - }, - ) - .await - .map_err(|_| StreamError::StreamNotFound(stream_name.clone()))?; } + let enable_cache = body.into_inner(); let mut stream_metadata = storage.get_stream_metadata(&stream_name).await?; stream_metadata.cache_enabled = enable_cache; storage diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs index 1499913a7..ba3b7538d 100644 --- a/server/src/handlers/http/modal/ingest_server.rs +++ b/server/src/handlers/http/modal/ingest_server.rs @@ -184,6 +184,12 @@ impl IngestServer { web::put() .to(logstream::put_enable_cache) .authorize_for_stream(Action::PutCacheEnabled), + ) + // GET "/logstream/{logstream}/cache" ==> Get cache-enabled status for given logstream + .route( + web::get() + .to(logstream::get_cache_enabled) + .authorize_for_stream(Action::GetCacheEnabled), ), ), )