Skip to content

Commit

Permalink
fix: cache fix ingestors (#767)
Browse files Browse the repository at this point in the history
Caching for distributed mode can be
enabled from the querier UI. The querier calls the PUT
/cache API on all ingestors. An ingestor first checks
whether the stream exists; if it is not found in the local map,
it checks in S3 and creates the stream. It then checks whether the
caching env vars are set. If yes, it adds the cache_enabled flag to
STREAM_INFO and updates the stream's stream.json in S3

Fixes: #764
  • Loading branch information
nikhilsinhaparseable authored Apr 19, 2024
1 parent 8e710f2 commit bb849d1
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 27 deletions.
42 changes: 41 additions & 1 deletion server/src/handlers/http/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,46 @@ use super::base_path_without_preceding_slash;

use super::modal::IngestorMetadata;

/// Propagate a cache-enabled/disabled flag to a single ingestor by
/// PUT-ing the boolean to its `/cache` endpoint.
///
/// Dead ingestors are skipped silently (best-effort sync). A transport
/// failure is logged and surfaced as `StreamError::Network`; a non-2xx
/// response is logged but does not fail the call.
pub async fn sync_cache_with_ingestors(
    url: &str,
    ingestor: IngestorMetadata,
    body: bool,
) -> Result<(), StreamError> {
    // Skip ingestors that do not answer the liveness probe.
    if !utils::check_liveness(&ingestor.domain_name).await {
        return Ok(());
    }

    // `true` / `false` serialized as the JSON request body.
    let payload: Bytes = Bytes::from(body.to_string());

    let send_result = reqwest::Client::new()
        .put(url)
        .header(header::CONTENT_TYPE, "application/json")
        .header(header::AUTHORIZATION, ingestor.token)
        .body(payload)
        .send()
        .await;

    let resp = match send_result {
        Ok(resp) => resp,
        Err(err) => {
            // log the error and return a custom error
            log::error!(
                "Fatal: failed to set cache: {}\n Error: {:?}",
                ingestor.domain_name,
                err
            );
            return Err(StreamError::Network(err));
        }
    };

    // if the response is not successful, log the error and return a custom error
    // this could be a bit too much, but we need to be sure it covers all cases
    if !resp.status().is_success() {
        log::error!(
            "failed to set cache: {}\nResponse Returned: {:?}",
            ingestor.domain_name,
            resp.text().await
        );
    }

    Ok(())
}

// forward the request to all ingestors to keep them in sync
#[allow(dead_code)]
pub async fn sync_streams_with_ingestors(
Expand Down Expand Up @@ -220,7 +260,7 @@ pub async fn send_stream_delete_request(
log::error!(
"failed to delete stream: {}\nResponse Returned: {:?}",
ingestor.domain_name,
resp
resp.text().await
);
}

Expand Down
85 changes: 59 additions & 26 deletions server/src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
*/

use self::error::{CreateStreamError, StreamError};
use super::base_path_without_preceding_slash;
use super::cluster::fetch_stats_from_ingestors;
use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats};
use crate::alerts::Alerts;
use crate::handlers::{STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY};
use crate::metadata::STREAM_INFO;
Expand All @@ -25,15 +28,12 @@ use crate::static_schema::{convert_static_schema_to_arrow_schema, StaticSchema};
use crate::storage::{retention::Retention, LogStream, StorageDir, StreamInfo};
use crate::{catalog, event, stats};
use crate::{metadata, validator};

use super::base_path_without_preceding_slash;
use super::cluster::fetch_stats_from_ingestors;
use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats};
use actix_web::http::StatusCode;
use actix_web::{web, HttpRequest, Responder};
use arrow_schema::{Field, Schema};
use bytes::Bytes;
use chrono::Utc;
use itertools::Itertools;
use serde_json::Value;
use std::collections::HashMap;
use std::fs;
Expand Down Expand Up @@ -301,33 +301,66 @@ pub async fn put_enable_cache(
req: HttpRequest,
body: web::Json<bool>,
) -> Result<impl Responder, StreamError> {
let enable_cache = body.into_inner();
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
let storage = CONFIG.storage().get_object_store();

if CONFIG.parseable.local_cache_path.is_none() {
return Err(StreamError::CacheNotEnabled(stream_name));
}
if CONFIG.parseable.mode == Mode::Ingest {
// here the ingest server has not found the stream
// so it should check if the stream exists in storage
let streams = storage.list_streams().await?;
if !streams.contains(&LogStream {
name: stream_name.clone().to_owned(),
}) {
log::error!("Stream {} not found", stream_name.clone());
return Err(StreamError::StreamNotFound(stream_name.clone()));
match CONFIG.parseable.mode {
Mode::Query => {
if !metadata::STREAM_INFO.stream_exists(&stream_name) {
return Err(StreamError::StreamNotFound(stream_name));
}
let ingestor_metadata = super::cluster::get_ingestor_info().await.map_err(|err| {
log::error!("Fatal: failed to get ingestor info: {:?}", err);
StreamError::from(err)
})?;
for ingestor in ingestor_metadata {
let url = format!(
"{}{}/logstream/{}/cache",
ingestor.domain_name,
base_path_without_preceding_slash(),
stream_name
);

super::cluster::sync_cache_with_ingestors(&url, ingestor.clone(), *body).await?;
}
}
Mode::Ingest => {
if CONFIG.parseable.local_cache_path.is_none() {
return Err(StreamError::CacheNotEnabled(stream_name));
}
// here the ingest server has not found the stream
// so it should check if the stream exists in storage
let check = storage
.list_streams()
.await?
.iter()
.map(|stream| stream.name.clone())
.contains(&stream_name);

if !check {
log::error!("Stream {} not found", stream_name.clone());
return Err(StreamError::StreamNotFound(stream_name.clone()));
}
metadata::STREAM_INFO
.upsert_stream_info(
&*storage,
LogStream {
name: stream_name.clone().to_owned(),
},
)
.await
.map_err(|_| StreamError::StreamNotFound(stream_name.clone()))?;
}
Mode::All => {
if !metadata::STREAM_INFO.stream_exists(&stream_name) {
return Err(StreamError::StreamNotFound(stream_name));
}
if CONFIG.parseable.local_cache_path.is_none() {
return Err(StreamError::CacheNotEnabled(stream_name));
}
}
metadata::STREAM_INFO
.upsert_stream_info(
&*storage,
LogStream {
name: stream_name.clone().to_owned(),
},
)
.await
.map_err(|_| StreamError::StreamNotFound(stream_name.clone()))?;
}
let enable_cache = body.into_inner();
let mut stream_metadata = storage.get_stream_metadata(&stream_name).await?;
stream_metadata.cache_enabled = enable_cache;
storage
Expand Down
6 changes: 6 additions & 0 deletions server/src/handlers/http/modal/ingest_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,12 @@ impl IngestServer {
web::put()
.to(logstream::put_enable_cache)
.authorize_for_stream(Action::PutCacheEnabled),
)
// GET "/logstream/{logstream}/cache" ==> Get cache enabled status for given logstream
.route(
web::get()
.to(logstream::get_cache_enabled)
.authorize_for_stream(Action::GetCacheEnabled),
),
),
)
Expand Down

0 comments on commit bb849d1

Please sign in to comment.