diff --git a/server/src/handlers/http/cluster/mod.rs b/server/src/handlers/http/cluster/mod.rs
index 4e7d3219f..e438ab8f4 100644
--- a/server/src/handlers/http/cluster/mod.rs
+++ b/server/src/handlers/http/cluster/mod.rs
@@ -30,6 +30,7 @@ use crate::metrics::prom_utils::Metrics;
 use crate::rbac::role::model::DefaultPrivilege;
 use crate::rbac::user::User;
 use crate::stats::Stats;
+use crate::storage::get_staging_metadata;
 use crate::storage::object_storage::ingestor_metadata_path;
 use crate::storage::{ObjectStorageError, STREAM_ROOT_DIRECTORY};
 use crate::storage::{ObjectStoreFormat, PARSEABLE_ROOT_DIRECTORY};
@@ -64,6 +65,7 @@ pub async fn sync_streams_with_ingestors(
     headers: HeaderMap,
     body: Bytes,
     stream_name: &str,
+    skip_ingestor: Option<String>,
 ) -> Result<(), StreamError> {
     let mut reqwest_headers = http_header::HeaderMap::new();

@@ -76,7 +78,16 @@ pub async fn sync_streams_with_ingestors(
     })?;
     let client = reqwest::Client::new();
-    for ingestor in ingestor_infos.iter() {
+
+    let final_ingestor_infos = match skip_ingestor {
+        None => ingestor_infos,
+        Some(skip_ingestor) => ingestor_infos
+            .into_iter()
+            .filter(|ingestor| ingestor.domain_name != to_url_string(skip_ingestor.clone()))
+            .collect::<Vec<_>>(),
+    };
+
+    for ingestor in final_ingestor_infos {
         if !utils::check_liveness(&ingestor.domain_name).await {
             log::warn!("Ingestor {} is not live", ingestor.domain_name);
             continue;
         }
@@ -841,3 +852,62 @@ pub fn init_cluster_metrics_schedular() -> Result<(), PostError> {

     Ok(())
 }
+
+pub async fn forward_create_stream_request(stream_name: &str) -> Result<(), StreamError> {
+    let client = reqwest::Client::new();
+
+    let staging_metadata = get_staging_metadata().unwrap().ok_or_else(|| {
+        StreamError::Anyhow(anyhow::anyhow!("Failed to retrieve staging metadata"))
+    })?;
+    let querier_endpoint = to_url_string(staging_metadata.querier_endpoint.unwrap());
+    let token = staging_metadata.querier_auth_token.unwrap();
+
+    if !check_liveness(&querier_endpoint).await {
+        log::warn!("Querier {} is not live", querier_endpoint);
+        return Err(StreamError::Anyhow(anyhow::anyhow!("Querier is not live")));
+    }
+
+    let url = format!(
+        "{}{}/logstream/{}?skip_ingestors={}",
+        querier_endpoint,
+        base_path_without_preceding_slash(),
+        stream_name,
+        CONFIG.parseable.ingestor_endpoint,
+    );
+
+    let response = client
+        .put(&url)
+        .header(header::AUTHORIZATION, &token)
+        .send()
+        .await
+        .map_err(|err| {
+            log::error!(
+                "Fatal: failed to forward create stream request to querier: {}\n Error: {:?}",
+                &url,
+                err
+            );
+            StreamError::Network(err)
+        })?;
+
+    let status = response.status();
+
+    if !status.is_success() {
+        let response_text = response.text().await.map_err(|err| {
+            log::error!("Failed to read response text from querier: {}", &url);
+            StreamError::Network(err)
+        })?;
+
+        log::error!(
+            "Failed to forward create stream request to querier: {}\nResponse Returned: {:?}",
+            &url,
+            response_text
+        );
+
+        return Err(StreamError::Anyhow(anyhow::anyhow!(
+            "Request failed with status: {}",
+            status,
+        )));
+    }
+
+    Ok(())
+}
diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs
index 7e36a4d8a..48ec1d9dc 100644
--- a/server/src/handlers/http/ingest.rs
+++ b/server/src/handlers/http/ingest.rs
@@ -26,6 +26,7 @@ use crate::event::{
     error::EventError,
     format::{self, EventFormat},
 };
+use crate::handlers::http::cluster::forward_create_stream_request;
 use crate::handlers::{LOG_SOURCE_KEY, LOG_SOURCE_OTEL, STREAM_NAME_HEADER_KEY};
 use crate::localcache::CacheError;
 use crate::metadata::error::stream_info::MetadataError;
@@ -210,11 +211,16 @@ pub async fn create_stream_if_not_exists(
     if !streams.contains(&LogStream {
         name: stream_name.to_owned(),
     }) {
-        log::error!("Stream {} not found", stream_name);
-        return Err(PostError::Invalid(anyhow::anyhow!(
-            "Stream `{}` not found. Please create it using the Query server.",
-            stream_name
-        )));
+        match forward_create_stream_request(stream_name).await {
+            Ok(()) => log::info!("Stream {} created", stream_name),
+            Err(e) => {
+                return Err(PostError::Invalid(anyhow::anyhow!(
+                    "Unable to create stream: {} using query server. Error: {}",
+                    stream_name,
+                    e.to_string(),
+                )))
+            }
+        };
     }
     metadata::STREAM_INFO
         .upsert_stream_info(
diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs
index 2dd34d4dc..00ddff3f3 100644
--- a/server/src/handlers/http/logstream.rs
+++ b/server/src/handlers/http/logstream.rs
@@ -654,7 +654,7 @@ pub async fn create_internal_stream_if_not_exists() -> Result<(), StreamError> {
             header::CONTENT_TYPE,
             HeaderValue::from_static("application/json"),
         );
-        sync_streams_with_ingestors(header_map, Bytes::new(), INTERNAL_STREAM_NAME).await?;
+        sync_streams_with_ingestors(header_map, Bytes::new(), INTERNAL_STREAM_NAME, None).await?;
     }
     Ok(())
 }
diff --git a/server/src/handlers/http/modal/query/querier_logstream.rs b/server/src/handlers/http/modal/query/querier_logstream.rs
index 4fb060850..c67d173ae 100644
--- a/server/src/handlers/http/modal/query/querier_logstream.rs
+++ b/server/src/handlers/http/modal/query/querier_logstream.rs
@@ -1,9 +1,14 @@
+use core::str;
 use std::fs;

 use actix_web::{web, HttpRequest, Responder};
 use bytes::Bytes;
 use chrono::Utc;
 use http::StatusCode;
+use serde::Deserialize;
+use tokio::sync::Mutex;
+
+static CREATE_STREAM_LOCK: Mutex<()> = Mutex::const_new(());

 use crate::{
     event,
@@ -74,11 +79,22 @@ pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
     Ok((format!("log stream {stream_name} deleted"), StatusCode::OK))
 }

-pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result<impl Responder, StreamError> {
+#[derive(Deserialize)]
+pub struct PutStreamQuery {
+    skip_ingestors: Option<String>,
+}
+
+pub async fn put_stream(
+    req: HttpRequest,
+    body: Bytes,
+    info: web::Query<PutStreamQuery>,
+) -> Result<impl Responder, StreamError> {
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
+    let _ = CREATE_STREAM_LOCK.lock().await;

     let headers = create_update_stream(&req, &body, &stream_name).await?;
-    sync_streams_with_ingestors(headers, body, &stream_name).await?;
+
+    sync_streams_with_ingestors(headers, body, &stream_name, info.skip_ingestors.clone()).await?;

     Ok(("Log stream created", StatusCode::OK))
 }
diff --git a/server/src/migration.rs b/server/src/migration.rs
index c0c483b2b..2b1d8a5e5 100644
--- a/server/src/migration.rs
+++ b/server/src/migration.rs
@@ -78,6 +78,10 @@ pub async fn run_metadata_migration(
             let metadata = metadata_migration::v3_v4(storage_metadata);
             put_remote_metadata(&*object_store, &metadata).await?;
         }
+        Some("v4") => {
+            let metadata = metadata_migration::v4_v5(storage_metadata);
+            put_remote_metadata(&*object_store, &metadata).await?;
+        }
         _ => (),
     }
 }
diff --git a/server/src/migration/metadata_migration.rs b/server/src/migration/metadata_migration.rs
index 10bfa940c..3c32ff241 100644
--- a/server/src/migration/metadata_migration.rs
+++ b/server/src/migration/metadata_migration.rs
@@ -16,6 +16,7 @@
  *
  */

+use base64::Engine;
 use rand::distributions::DistString;
 use serde_json::{Map, Value as JsonValue};

@@ -148,6 +149,55 @@ pub fn v3_v4(mut storage_metadata: JsonValue) -> JsonValue {
     storage_metadata
 }

+// maybe rename
+pub fn v4_v5(mut storage_metadata: JsonValue) -> JsonValue {
+    let metadata = storage_metadata.as_object_mut().unwrap();
+    metadata.remove_entry("version");
+    metadata.insert("version".to_string(), JsonValue::String("v5".to_string()));
+
+    match metadata.get("server_mode") {
+        None => {
+            metadata.insert(
+                "server_mode".to_string(),
+                JsonValue::String(CONFIG.parseable.mode.to_string()),
+            );
+        }
+        Some(JsonValue::String(mode)) => match mode.as_str() {
+            "Query" => {
+                metadata.insert(
+                    "querier_endpoint".to_string(),
+                    JsonValue::String(CONFIG.parseable.address.clone()),
+                );
+            }
+            "All" => {
+                metadata.insert(
+                    "server_mode".to_string(),
+                    JsonValue::String(CONFIG.parseable.mode.to_string()),
+                );
+                metadata.insert(
+                    "querier_endpoint".to_string(),
+                    JsonValue::String(CONFIG.parseable.address.clone()),
+                );
+            }
+            _ => (),
+        },
+        _ => (),
+    }
+
+    metadata.insert(
+        "querier_auth_token".to_string(),
+        JsonValue::String(format!(
+            "Basic {}",
+            base64::prelude::BASE64_STANDARD.encode(format!(
+                "{}:{}",
+                CONFIG.parseable.username, CONFIG.parseable.password
+            ))
+        )),
+    );
+
+    storage_metadata
+}
+
 pub async fn migrate_ingester_metadata() -> anyhow::Result<Option<IngestorMetadata>> {
     let imp = ingestor_metadata_path(None);
     let bytes = match CONFIG.storage().get_object_store().get_object(&imp).await {
diff --git a/server/src/storage.rs b/server/src/storage.rs
index a018c2b1c..4dc108534 100644
--- a/server/src/storage.rs
+++ b/server/src/storage.rs
@@ -40,7 +40,8 @@ pub use localfs::FSConfig;
 pub use object_storage::{ObjectStorage, ObjectStorageProvider};
 pub use s3::S3Config;
 pub use store_metadata::{
-    put_remote_metadata, put_staging_metadata, resolve_parseable_metadata, StorageMetadata,
+    get_staging_metadata, put_remote_metadata, put_staging_metadata, resolve_parseable_metadata,
+    StorageMetadata,
 };

 // metadata file names in a Stream prefix
diff --git a/server/src/storage/store_metadata.rs b/server/src/storage/store_metadata.rs
index acdbeb8e4..f19765f02 100644
--- a/server/src/storage/store_metadata.rs
+++ b/server/src/storage/store_metadata.rs
@@ -22,6 +22,7 @@ use std::{
     path::PathBuf,
 };

+use base64::Engine;
 use bytes::Bytes;
 use once_cell::sync::OnceCell;
 use relative_path::RelativePathBuf;
@@ -63,10 +64,29 @@ pub struct StorageMetadata {
     pub roles: HashMap<String, Vec<DefaultPrivilege>>,
     #[serde(default)]
     pub default_role: Option<String>,
+    pub querier_endpoint: Option<String>,
+    pub querier_auth_token: Option<String>,
 }

 impl StorageMetadata {
     pub fn new() -> Self {
+        let (querier_endpoint, querier_auth_token) = match CONFIG.parseable.mode {
+            Mode::All | Mode::Query => {
+                let querier_auth_token = format!(
+                    "Basic {}",
+                    base64::prelude::BASE64_STANDARD.encode(format!(
+                        "{}:{}",
+                        CONFIG.parseable.username, CONFIG.parseable.password
+                    ))
+                );
+                (
+                    Some(CONFIG.parseable.address.clone()),
+                    Some(querier_auth_token),
+                )
+            }
+            Mode::Ingest => (None, None),
+        };
+
         Self {
             version: CURRENT_STORAGE_METADATA_VERSION.to_string(),
             mode: CONFIG.storage_name.to_owned(),
@@ -78,9 +98,10 @@ impl StorageMetadata {
             streams: Vec::new(),
             roles: HashMap::default(),
             default_role: None,
+            querier_endpoint,
+            querier_auth_token,
         }
     }
-
     pub fn global() -> &'static StaticStorageMetadata {
         STORAGE_METADATA
             .get()
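
How the new skip_ingestors flow fits together: an ingestor that receives events for an unknown stream calls forward_create_stream_request, which PUTs /logstream/{stream}?skip_ingestors={its own endpoint} to the querier; the querier's put_stream hands that value to sync_streams_with_ingestors, which now filters the originating ingestor out before syncing. Below is a minimal, self-contained sketch of that filter, with placeholder domain names standing in for IngestorMetadata::domain_name and to_url_string(skip_ingestor); it is an illustration, not code from the patch.

// Sketch only: the skip_ingestor filter from sync_streams_with_ingestors, reduced to
// plain strings. Domain names are placeholders, not values from the patch.
fn main() {
    let ingestor_infos = vec![
        "http://ingestor-0.example.com:8000/".to_string(),
        "http://ingestor-1.example.com:8000/".to_string(),
    ];
    // What the querier receives via ?skip_ingestors=... from the originating ingestor.
    let skip_ingestor = Some("http://ingestor-0.example.com:8000/".to_string());

    let final_ingestor_infos = match skip_ingestor {
        None => ingestor_infos,
        Some(skip) => ingestor_infos
            .into_iter()
            .filter(|domain_name| *domain_name != skip)
            .collect::<Vec<_>>(),
    };

    // Only ingestor-1 is asked to create the stream; ingestor-0 already initiated it.
    assert_eq!(
        final_ingestor_infos,
        vec!["http://ingestor-1.example.com:8000/".to_string()]
    );
}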
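Both metadata_migration::v4_v5 and StorageMetadata::new derive querier_auth_token the same way. A minimal sketch of that encoding follows, assuming the default admin/admin credentials in place of CONFIG.parseable.username and CONFIG.parseable.password; it only demonstrates the base64 logic already present in the patch.

// Sketch only: how querier_auth_token is built from the server's basic-auth
// credentials. "admin"/"admin" are assumed defaults, not values from the patch.
// Requires the base64 crate, which the patch already depends on.
use base64::Engine;

fn main() {
    let (username, password) = ("admin", "admin");
    let querier_auth_token = format!(
        "Basic {}",
        base64::prelude::BASE64_STANDARD.encode(format!("{}:{}", username, password))
    );
    // This value is stored in the staging metadata and sent verbatim as the
    // Authorization header when an ingestor forwards a create stream request.
    assert_eq!(querier_auth_token, "Basic YWRtaW46YWRtaW4=");
}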