diff --git a/.gitignore b/.gitignore index 5bbc1b194..57ea8e65e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,6 @@ target data* -staging* +staging/ limitcache examples cert.pem @@ -14,4 +14,3 @@ parseable parseable_* parseable-env-secret cache - diff --git a/Cargo.lock b/Cargo.lock index 91b670830..90aa480ef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -672,7 +672,7 @@ dependencies = [ "memchr", "num", "regex", - "regex-syntax 0.8.5", + "regex-syntax", ] [[package]] @@ -1644,7 +1644,7 @@ dependencies = [ "itertools 0.13.0", "log", "paste", - "regex-syntax 0.8.5", + "regex-syntax", ] [[package]] @@ -1829,6 +1829,29 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "env_filter" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f2c92ceda6ceec50f43169f9ee8424fe2db276791afde7b2cd8bc084cb376ab" +dependencies = [ + "log", + "regex", +] + +[[package]] +name = "env_logger" +version = "0.11.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13fa619b91fb2381732789fc5de83b45675e882f66623b7d8cb4f643017018d" +dependencies = [ + "anstream", + "anstyle", + "env_filter", + "humantime", + "log", +] + [[package]] name = "equivalent" version = "1.0.1" @@ -2697,15 +2720,6 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d" -[[package]] -name = "matchers" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" -dependencies = [ - "regex-automata 0.1.10", -] - [[package]] name = "matchit" version = "0.7.3" @@ -2818,16 +2832,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "nu-ansi-term" -version = "0.46.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" -dependencies = [ - "overload", - "winapi", -] - [[package]] name = "num" version = "0.4.2" @@ -3008,12 +3012,6 @@ dependencies = [ "num-traits", ] -[[package]] -name = "overload" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" - [[package]] name = "parking" version = "2.2.0" @@ -3099,6 +3097,7 @@ dependencies = [ "actix-web-static-files", "anyhow", "argon2", + "arrow", "arrow-array", "arrow-flight", "arrow-ipc", @@ -3119,6 +3118,7 @@ dependencies = [ "crossterm", "datafusion", "derive_more", + "env_logger", "fs_extra", "futures", "futures-util", @@ -3168,7 +3168,6 @@ dependencies = [ "tonic", "tonic-web", "tower-http 0.6.1", - "tracing-subscriber", "ulid", "uptime_lib", "ureq", @@ -3633,17 +3632,8 @@ checksum = "38200e5ee88914975b69f657f0801b6f6dccafd44fd9326302a4aaeecfacb1d8" dependencies = [ "aho-corasick", "memchr", - "regex-automata 0.4.8", - "regex-syntax 0.8.5", -] - -[[package]] -name = "regex-automata" -version = "0.1.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" -dependencies = [ - "regex-syntax 0.6.29", + "regex-automata", + "regex-syntax", ] [[package]] @@ -3654,7 +3644,7 @@ checksum = "368758f23274712b504848e9d5a6f010445cc8b87a7cdb4d7cbee666c1288da3" dependencies = [ "aho-corasick", "memchr", - "regex-syntax 0.8.5", + "regex-syntax", ] [[package]] @@ -3663,12 +3653,6 @@ version = "0.1.6" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "53a49587ad06b26609c52e423de037e7f57f20d53535d66e08c695f347df952a" -[[package]] -name = "regex-syntax" -version = "0.6.29" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" - [[package]] name = "regex-syntax" version = "0.8.5" @@ -4140,15 +4124,6 @@ dependencies = [ "digest", ] -[[package]] -name = "sharded-slab" -version = "0.1.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" -dependencies = [ - "lazy_static", -] - [[package]] name = "shlex" version = "1.3.0" @@ -4444,16 +4419,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "thread_local" -version = "1.1.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c" -dependencies = [ - "cfg-if", - "once_cell", -] - [[package]] name = "thrift" version = "0.17.0" @@ -4789,36 +4754,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" dependencies = [ "once_cell", - "valuable", -] - -[[package]] -name = "tracing-log" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" -dependencies = [ - "log", - "once_cell", - "tracing-core", -] - -[[package]] -name = "tracing-subscriber" -version = "0.3.18" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" -dependencies = [ - "matchers", - "nu-ansi-term", - "once_cell", - "regex", - "sharded-slab", - "smallvec", - "thread_local", - "tracing", - "tracing-core", - "tracing-log", ] [[package]] @@ -4988,12 +4923,6 @@ dependencies = [ "syn 2.0.79", ] -[[package]] -name = "valuable" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" - [[package]] name = "vergen" version = "8.3.1" diff --git a/Cargo.toml b/Cargo.toml index 156820ec5..684c560fd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,3 +1,129 @@ -[workspace] -members = ["server"] -resolver = "2" +[package] +name = "parseable" +version = "1.6.2" +authors = ["Parseable Team "] +edition = "2021" +rust-version = "1.77.1" +categories = ["logging", "observability", "log analytics"] +build = "build.rs" + +[dependencies] +### apache arrow/datafusion dependencies +arrow-schema = { version = "53.0.0", features = ["serde"] } +arrow-array = { version = "53.0.0" } +arrow-json = "53.0.0" +arrow-ipc = { version = "53.0.0", features = ["zstd"] } +arrow-select = "53.0.0" +datafusion = "42.0.0" +object_store = { version = "0.11.1", features = ["cloud", "aws", "azure"] } +parquet = "53.0.0" +arrow-flight = { version = "53.0.0", features = ["tls"] } +tonic = { version = "0.12.3", features = ["tls", "transport", "gzip", "zstd"] } +tonic-web = "0.12.3" +tower-http = { version = "0.6.1", features = ["cors"] } + +### actix dependencies +actix-web-httpauth = "0.8" +actix-web = { version = "4.9.0", features = ["rustls-0_22"] } +actix-cors = "0.7.0" +actix-web-prometheus = { version = "0.1" } +actix-web-static-files = "4.0" +mime = "0.3.17" + +### other dependencies +anyhow = { version = "1.0", features = ["backtrace"] } +argon2 = 
"0.5.0" +async-trait = "0.1.82" +base64 = "0.22.0" +lazy_static = "1.4" +bytes = "1.4" +byteorder = "1.4.3" +bzip2 = { version = "*", features = ["static"] } +cookie = "0.18.1" +chrono = "0.4" +chrono-humanize = "0.2" +clap = { version = "4.1", default-features = false, features = [ + "std", + "color", + "help", + "derive", + "env", + "cargo", + "error-context", +] } +clokwerk = "0.4" +crossterm = "0.28.1" +derive_more = "0.99.18" +env_logger = "0.11.3" +fs_extra = "1.3" +futures = "0.3" +futures-util = "0.3.28" +hex = "0.4" +hostname = "0.4.0" +http = "0.2.7" +humantime-serde = "1.1" +itertools = "0.13.0" +log = "0.4" +num_cpus = "1.15" +once_cell = "1.17.1" +prometheus = { version = "0.13", features = ["process"] } +rand = "0.8.5" +regex = "1.7.3" +relative-path = { version = "1.7", features = ["serde"] } +reqwest = { version = "0.11.27", default-features = false, features = [ + "rustls-tls", + "json", +] } # cannot update cause rustls is not latest `see rustls` +rustls = "0.22.4" # cannot update to 0.23 actix has not caught up yet +rustls-pemfile = "2.1.2" +semver = "1.0" +serde = { version = "1.0", features = ["rc", "derive"] } +serde_json = "1.0" +static-files = "0.2" +sysinfo = "0.31.4" +thiserror = "1.0.64" +thread-priority = "1.0.0" +tokio = { version = "1.28", default-features = false, features = [ + "sync", + "macros", + "fs", +] } +tokio-stream = { version = "0.1", features = ["fs"] } +ulid = { version = "1.0", features = ["serde"] } +uptime_lib = "0.3.0" +xxhash-rust = { version = "0.8", features = ["xxh3"] } +xz2 = { version = "*", features = ["static"] } +nom = "7.1.3" +humantime = "2.1.0" +human-size = "0.4" +openid = { version = "0.15.0", default-features = false, features = ["rustls"] } +url = "2.4.0" +http-auth-basic = "0.3.3" +serde_repr = "0.1.17" +hashlru = { version = "0.11.0", features = ["serde"] } +path-clean = "1.0.1" +prost = "0.13.3" +prometheus-parse = "0.2.5" +sha2 = "0.10.8" + +[build-dependencies] +cargo_toml = "0.20.1" +sha1_smol = { version = "1.0", features = ["std"] } +static-files = "0.2" +ureq = "2.6" +vergen = { version = "8.1", features = ["build", "git", "cargo", "gitcl"] } +zip = { version = "2.2.0", default-features = false, features = ["deflate"] } +url = "2.4.0" +prost-build = "0.13.3" + +[dev-dependencies] +maplit = "1.0" +rstest = "0.23.0" +arrow = "53.0.0" + +[package.metadata.parseable_ui] +assets-url = "https://github.com/parseablehq/console/releases/download/v0.9.11/build.zip" +assets-sha1 = "3f0c0f0e9fe23c6a01f0eb45115da4bfe29f9c3f" + +[features] +debug = [] diff --git a/server/build.rs b/build.rs similarity index 100% rename from server/build.rs rename to build.rs diff --git a/server/Cargo.toml b/server/Cargo.toml deleted file mode 100644 index 50f357452..000000000 --- a/server/Cargo.toml +++ /dev/null @@ -1,129 +0,0 @@ -[package] -name = "parseable" -version = "1.6.2" -authors = ["Parseable Team "] -edition = "2021" -rust-version = "1.77.1" -categories = ["logging", "observability", "log analytics"] -build = "build.rs" - -[dependencies] -### apache arrow/datafusion dependencies -# arrow = "51.0.0" -arrow-schema = { version = "53.0.0", features = ["serde"] } -arrow-array = { version = "53.0.0" } -arrow-json = "53.0.0" -arrow-ipc = { version = "53.0.0", features = ["zstd"] } -arrow-select = "53.0.0" -datafusion = "42.0.0" -object_store = { version = "0.11.1", features = ["cloud", "aws", "azure"] } -parquet = "53.0.0" -arrow-flight = { version = "53.0.0", features = [ "tls" ] } -tonic = {version = "0.12.3", features = ["tls", "transport", 
"gzip", "zstd"] } -tonic-web = "0.12.3" -tower-http = { version = "0.6.1", features = ["cors"] } - -### actix dependencies -actix-web-httpauth = "0.8" -actix-web = { version = "4.9.0", features = ["rustls-0_22"] } -actix-cors = "0.7.0" -actix-web-prometheus = { version = "0.1" } -actix-web-static-files = "4.0" -mime = "0.3.17" - -### other dependencies -anyhow = { version = "1.0", features = ["backtrace"] } -argon2 = "0.5.0" -async-trait = "0.1.82" -base64 = "0.22.0" -lazy_static = "1.4" -bytes = "1.4" -byteorder = "1.4.3" -bzip2 = { version = "*", features = ["static"] } -cookie = "0.18.1" -chrono = "0.4" -chrono-humanize = "0.2" -clap = { version = "4.1", default-features = false, features = [ - "std", - "color", - "help", - "derive", - "env", - "cargo", - "error-context", -] } -clokwerk = "0.4" -crossterm = "0.28.1" -derive_more = "0.99.18" -fs_extra = "1.3" -futures = "0.3" -futures-util = "0.3.28" -hex = "0.4" -hostname = "0.4.0" -http = "0.2.7" -humantime-serde = "1.1" -itertools = "0.13.0" -log = "0.4" -num_cpus = "1.15" -once_cell = "1.17.1" -prometheus = { version = "0.13", features = ["process"] } -rand = "0.8.5" -regex = "1.7.3" -relative-path = { version = "1.7", features = ["serde"] } -reqwest = { version = "0.11.27", default-features = false, features = [ - "rustls-tls", - "json", -] } # cannot update cause rustls is not latest `see rustls` -rustls = "0.22.4" # cannot update to 0.23 actix has not caught up yet -rustls-pemfile = "2.1.2" -semver = "1.0" -serde = { version = "1.0", features = ["rc", "derive"] } -serde_json = "1.0" -static-files = "0.2" -sysinfo = "0.31.4" -thiserror = "1.0.64" -thread-priority = "1.0.0" -tokio = { version = "1.28", default-features = false, features = [ - "sync", - "macros", - "fs", -] } -tokio-stream = { version = "0.1", features = ["fs"] } -tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } -ulid = { version = "1.0", features = ["serde"] } -uptime_lib = "0.3.0" -xxhash-rust = { version = "0.8", features = ["xxh3"] } -xz2 = { version = "*", features = ["static"] } -nom = "7.1.3" -humantime = "2.1.0" -human-size = "0.4" -openid = { version = "0.15.0", default-features = false, features = ["rustls"] } -url = "2.4.0" -http-auth-basic = "0.3.3" -serde_repr = "0.1.17" -hashlru = { version = "0.11.0", features = ["serde"] } -path-clean = "1.0.1" -prost = "0.13.3" -prometheus-parse = "0.2.5" -sha2 = "0.10.8" - -[build-dependencies] -cargo_toml = "0.20.1" -sha1_smol = { version = "1.0", features = ["std"] } -static-files = "0.2" -ureq = "2.6" -vergen = { version = "8.1", features = ["build", "git", "cargo", "gitcl"] } -zip = { version = "2.2.0", default-features = false, features = ["deflate"] } -url = "2.4.0" -prost-build = "0.13.3" - -[dev-dependencies] -maplit = "1.0" -rstest = "0.23.0" - -[package.metadata.parseable_ui] -assets-url = "https://github.com/parseablehq/console/releases/download/v0.9.11/build.zip" -assets-sha1 = "3f0c0f0e9fe23c6a01f0eb45115da4bfe29f9c3f" - -[features] -debug = [] diff --git a/server/src/handlers/http/modal/mod.rs b/server/src/handlers/http/modal/mod.rs deleted file mode 100644 index 6f6d2bfd7..000000000 --- a/server/src/handlers/http/modal/mod.rs +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Parseable Server (C) 2022 - 2024 Parseable, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. 
- * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <https://www.gnu.org/licenses/>. - * - */ - -pub mod ingest; -pub mod ingest_server; -pub mod query; -pub mod query_server; -pub mod server; -pub mod ssl_acceptor; -pub mod utils; - -use std::sync::Arc; - -use actix_web_prometheus::PrometheusMetrics; -use async_trait::async_trait; -use openid::Discovered; - -use crate::oidc; -use base64::Engine; -use serde::Deserialize; -use serde::Serialize; -pub type OpenIdClient = Arc<openid::Client<Discovered, oidc::Claims>>; - -// to be decided on what the Default version should be -pub const DEFAULT_VERSION: &str = "v3"; - -include!(concat!(env!("OUT_DIR"), "/generated.rs")); - -#[async_trait(?Send)] -pub trait ParseableServer { - // async fn validate(&self) -> Result<(), ObjectStorageError>; - - /// configure the server - async fn start( - &self, - prometheus: PrometheusMetrics, - oidc_client: Option<OpenIdClient>, - ) -> anyhow::Result<()>; - - async fn init(&self) -> anyhow::Result<()>; - - fn validate(&self) -> anyhow::Result<()>; -} - -#[derive(Serialize, Debug, Deserialize, Default, Clone, Eq, PartialEq)] -pub struct IngestorMetadata { - pub version: String, - pub port: String, - pub domain_name: String, - pub bucket_name: String, - pub token: String, - pub ingestor_id: String, - pub flight_port: String, -} - -impl IngestorMetadata { - #[allow(clippy::too_many_arguments)] - pub fn new( - port: String, - domain_name: String, - version: String, - bucket_name: String, - username: &str, - password: &str, - ingestor_id: String, - flight_port: String, - ) -> Self { - let token = base64::prelude::BASE64_STANDARD.encode(format!("{}:{}", username, password)); - - let token = format!("Basic {}", token); - - Self { - port, - domain_name, - version, - bucket_name, - token, - ingestor_id, - flight_port, - } - } - - pub fn get_ingestor_id(&self) -> String { - self.ingestor_id.clone() - } -} - -#[cfg(test)] -mod test { - use actix_web::body::MessageBody; - use rstest::rstest; - - use super::{IngestorMetadata, DEFAULT_VERSION}; - - #[rstest] - fn test_deserialize_resource() { - let lhs: IngestorMetadata = IngestorMetadata::new( - "8000".to_string(), - "https://localhost:8000".to_string(), - DEFAULT_VERSION.to_string(), - "somebucket".to_string(), - "admin", - "admin", - "ingestor_id".to_string(), - "8002".to_string(), - ); - - let rhs = serde_json::from_slice::<IngestorMetadata>(br#"{"version":"v3","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic YWRtaW46YWRtaW4=", "ingestor_id": "ingestor_id","flight_port": "8002"}"#).unwrap(); - - assert_eq!(rhs, lhs); - } - - #[rstest] - fn test_serialize_resource() { - let im = IngestorMetadata::new( - "8000".to_string(), - "https://localhost:8000".to_string(), - DEFAULT_VERSION.to_string(), - "somebucket".to_string(), - "admin", - "admin", - "ingestor_id".to_string(), - "8002".to_string(), - ); - - let lhs = serde_json::to_string(&im) - .unwrap() - .try_into_bytes() - .unwrap(); - let rhs = br#"{"version":"v3","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic YWRtaW46YWRtaW4=","ingestor_id":"ingestor_id","flight_port":"8002"}"# - .try_into_bytes() - .unwrap(); - - assert_eq!(lhs, rhs); - } -} diff --git a/server/src/about.rs b/src/about.rs similarity index 95% 
rename from server/src/about.rs rename to src/about.rs index 5a1d20e03..1897e1299 100644 --- a/server/src/about.rs +++ b/src/about.rs @@ -24,24 +24,26 @@ use crate::utils::update::{self, LatestRelease}; use chrono::Duration; use chrono_humanize::{Accuracy, Tense}; use crossterm::style::Stylize; -use once_cell::sync::OnceCell; +use once_cell::sync::{Lazy, OnceCell}; use std::env; use std::path::Path; use sysinfo::System; use ulid::Ulid; + // Expose some static variables for internal usage pub static LATEST_RELEASE: OnceCell<Option<LatestRelease>> = OnceCell::new(); -static K8S_ENV_TO_CHECK: &str = "KUBERNETES_SERVICE_HOST"; - -fn is_docker() -> bool { - Path::new("/.dockerenv").exists() -} +static K8S_ENV_TO_CHECK: &str = "KUBERNETES_SERVICE_HOST"; fn is_k8s() -> bool { env::var(K8S_ENV_TO_CHECK).is_ok() } -pub fn platform() -> &'static str { +static DOCKERENV_FILE: &str = "/.dockerenv"; +fn is_docker() -> bool { + Path::new(DOCKERENV_FILE).exists() +} + +static PLATFORM: Lazy<&'static str> = Lazy::new(|| { if is_k8s() { "Kubernetes" } else if is_docker() { @@ -49,6 +51,10 @@ pub fn platform() -> &'static str { } else { "Native" } +}); + +pub fn platform() -> &'static str { + PLATFORM.as_ref() } pub fn set_latest_release(latest_release: Option<LatestRelease>) { diff --git a/server/src/alerts/mod.rs b/src/alerts/mod.rs similarity index 99% rename from server/src/alerts/mod.rs rename to src/alerts/mod.rs index 587bad773..9523e5e1f 100644 --- a/server/src/alerts/mod.rs +++ b/src/alerts/mod.rs @@ -31,9 +31,9 @@ pub mod rule; pub mod target; use crate::metrics::ALERTS_STATES; +use crate::option::CONFIG; use crate::utils::arrow::get_field; use crate::utils::uid; -use crate::CONFIG; use crate::{storage, utils}; pub use self::rule::Rule; diff --git a/server/src/alerts/parser.rs b/src/alerts/parser.rs similarity index 100% rename from server/src/alerts/parser.rs rename to src/alerts/parser.rs diff --git a/server/src/alerts/rule.rs b/src/alerts/rule.rs similarity index 100% rename from server/src/alerts/rule.rs rename to src/alerts/rule.rs diff --git a/server/src/alerts/target.rs b/src/alerts/target.rs similarity index 100% rename from server/src/alerts/target.rs rename to src/alerts/target.rs diff --git a/server/src/analytics.rs b/src/analytics.rs similarity index 100% rename from server/src/analytics.rs rename to src/analytics.rs diff --git a/server/src/banner.rs b/src/banner.rs similarity index 100% rename from server/src/banner.rs rename to src/banner.rs diff --git a/server/src/catalog/column.rs b/src/catalog/column.rs similarity index 100% rename from server/src/catalog/column.rs rename to src/catalog/column.rs diff --git a/server/src/catalog/manifest.rs b/src/catalog/manifest.rs similarity index 100% rename from server/src/catalog/manifest.rs rename to src/catalog/manifest.rs diff --git a/server/src/catalog.rs b/src/catalog/mod.rs similarity index 98% rename from server/src/catalog.rs rename to src/catalog/mod.rs index e93f6cdd4..5c502ac89 100644 --- a/server/src/catalog.rs +++ b/src/catalog/mod.rs @@ -19,10 +19,11 @@ use std::{io::ErrorKind, sync::Arc}; use self::{column::Column, snapshot::ManifestItem}; +use crate::handlers; use crate::handlers::http::base_path_without_preceding_slash; use crate::metadata::STREAM_INFO; use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE}; -use crate::option::CONFIG; +use crate::option::{Mode, CONFIG}; use crate::stats::{ event_labels_date, get_current_stats, storage_size_labels_date, update_deleted_stats, }; @@ -32,7 +33,6 @@ use crate::{ 
query::PartialTimeFilter, storage::{object_storage::manifest_path, ObjectStorage, ObjectStorageError}, }; -use crate::{handlers, Mode}; use bytes::Bytes; use chrono::{DateTime, Local, NaiveTime, Utc}; use relative_path::RelativePathBuf; @@ -101,7 +101,7 @@ fn get_file_bounds( } pub async fn update_snapshot( - storage: Arc, + storage: Arc, stream_name: &str, change: manifest::File, ) -> Result<(), ObjectStorageError> { @@ -239,7 +239,7 @@ pub async fn update_snapshot( async fn create_manifest( lower_bound: DateTime, change: manifest::File, - storage: Arc, + storage: Arc, stream_name: &str, update_snapshot: bool, mut meta: ObjectStoreFormat, @@ -318,7 +318,7 @@ async fn create_manifest( } pub async fn remove_manifest_from_snapshot( - storage: Arc, + storage: Arc, stream_name: &str, dates: Vec<String>, ) -> Result<Option<String>, ObjectStorageError> { @@ -343,7 +343,7 @@ pub async fn remove_manifest_from_snapshot( } pub async fn get_first_event( - storage: Arc, + storage: Arc, stream_name: &str, dates: Vec<String>, ) -> Result<Option<String>, ObjectStorageError> { diff --git a/server/src/catalog/snapshot.rs b/src/catalog/snapshot.rs similarity index 100% rename from server/src/catalog/snapshot.rs rename to src/catalog/snapshot.rs diff --git a/server/src/cli.rs b/src/cli.rs similarity index 100% rename from server/src/cli.rs rename to src/cli.rs diff --git a/server/src/event/format/json.rs b/src/event/format/json.rs similarity index 100% rename from server/src/event/format/json.rs rename to src/event/format/json.rs diff --git a/server/src/event/format.rs b/src/event/format/mod.rs similarity index 100% rename from server/src/event/format.rs rename to src/event/format/mod.rs diff --git a/server/src/event.rs b/src/event/mod.rs similarity index 100% rename from server/src/event.rs rename to src/event/mod.rs diff --git a/server/src/event/writer/file_writer.rs b/src/event/writer/file_writer.rs similarity index 100% rename from server/src/event/writer/file_writer.rs rename to src/event/writer/file_writer.rs diff --git a/server/src/event/writer/mem_writer.rs b/src/event/writer/mem_writer.rs similarity index 100% rename from server/src/event/writer/mem_writer.rs rename to src/event/writer/mem_writer.rs diff --git a/server/src/event/writer.rs b/src/event/writer/mod.rs similarity index 100% rename from server/src/event/writer.rs rename to src/event/writer/mod.rs diff --git a/server/src/handlers/airplane.rs b/src/handlers/airplane.rs similarity index 100% rename from server/src/handlers/airplane.rs rename to src/handlers/airplane.rs diff --git a/server/src/handlers/http/about.rs b/src/handlers/http/about.rs similarity index 100% rename from server/src/handlers/http/about.rs rename to src/handlers/http/about.rs diff --git a/server/src/handlers/http/cache.rs b/src/handlers/http/cache.rs similarity index 100% rename from server/src/handlers/http/cache.rs rename to src/handlers/http/cache.rs diff --git a/server/src/handlers/http/cluster/mod.rs b/src/handlers/http/cluster/mod.rs similarity index 100% rename from server/src/handlers/http/cluster/mod.rs rename to src/handlers/http/cluster/mod.rs diff --git a/server/src/handlers/http/cluster/utils.rs b/src/handlers/http/cluster/utils.rs similarity index 100% rename from server/src/handlers/http/cluster/utils.rs rename to src/handlers/http/cluster/utils.rs diff --git a/server/src/handlers/http/health_check.rs b/src/handlers/http/health_check.rs similarity index 100% rename from server/src/handlers/http/health_check.rs rename to src/handlers/http/health_check.rs diff --git a/server/src/handlers/http/ingest.rs 
b/src/handlers/http/ingest.rs similarity index 100% rename from server/src/handlers/http/ingest.rs rename to src/handlers/http/ingest.rs diff --git a/server/src/handlers/http/kinesis.rs b/src/handlers/http/kinesis.rs similarity index 100% rename from server/src/handlers/http/kinesis.rs rename to src/handlers/http/kinesis.rs diff --git a/server/src/handlers/http/llm.rs b/src/handlers/http/llm.rs similarity index 100% rename from server/src/handlers/http/llm.rs rename to src/handlers/http/llm.rs diff --git a/server/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs similarity index 100% rename from server/src/handlers/http/logstream.rs rename to src/handlers/http/logstream.rs diff --git a/server/src/handlers/http/middleware.rs b/src/handlers/http/middleware.rs similarity index 100% rename from server/src/handlers/http/middleware.rs rename to src/handlers/http/middleware.rs diff --git a/server/src/handlers/http.rs b/src/handlers/http/mod.rs similarity index 92% rename from server/src/handlers/http.rs rename to src/handlers/http/mod.rs index 2a2279800..6ccdaf3cc 100644 --- a/server/src/handlers/http.rs +++ b/src/handlers/http/mod.rs @@ -25,28 +25,28 @@ use crate::option::CONFIG; use self::{cluster::get_ingestor_info, query::Query}; -pub(crate) mod about; -mod cache; +pub mod about; +pub mod cache; pub mod cluster; -pub(crate) mod health_check; -pub(crate) mod ingest; +pub mod health_check; +pub mod ingest; mod kinesis; -pub(crate) mod llm; -pub(crate) mod logstream; -pub(crate) mod middleware; +pub mod llm; +pub mod logstream; +pub mod middleware; pub mod modal; -pub(crate) mod oidc; +pub mod oidc; mod otel; -pub(crate) mod query; -pub(crate) mod rbac; -pub(crate) mod role; -pub(crate) mod trino; +pub mod query; +pub mod rbac; +pub mod role; +pub mod trino; pub mod users; pub const MAX_EVENT_PAYLOAD_SIZE: usize = 10485760; pub const API_BASE_PATH: &str = "api"; pub const API_VERSION: &str = "v1"; -pub(crate) fn base_path() -> String { +pub fn base_path() -> String { format!("/{API_BASE_PATH}/{API_VERSION}") } diff --git a/server/src/handlers/http/modal/ingest/ingestor_ingest.rs b/src/handlers/http/modal/ingest/ingestor_ingest.rs similarity index 100% rename from server/src/handlers/http/modal/ingest/ingestor_ingest.rs rename to src/handlers/http/modal/ingest/ingestor_ingest.rs diff --git a/server/src/handlers/http/modal/ingest/ingestor_logstream.rs b/src/handlers/http/modal/ingest/ingestor_logstream.rs similarity index 99% rename from server/src/handlers/http/modal/ingest/ingestor_logstream.rs rename to src/handlers/http/modal/ingest/ingestor_logstream.rs index 88ad68765..16711ebbb 100644 --- a/server/src/handlers/http/modal/ingest/ingestor_logstream.rs +++ b/src/handlers/http/modal/ingest/ingestor_logstream.rs @@ -115,7 +115,7 @@ pub async fn put_enable_cache( } metadata::STREAM_INFO .upsert_stream_info( - &*storage, + storage.as_ref(), LogStream { name: stream_name.clone().to_owned(), }, diff --git a/server/src/handlers/http/modal/ingest/ingestor_rbac.rs b/src/handlers/http/modal/ingest/ingestor_rbac.rs similarity index 100% rename from server/src/handlers/http/modal/ingest/ingestor_rbac.rs rename to src/handlers/http/modal/ingest/ingestor_rbac.rs diff --git a/server/src/handlers/http/modal/ingest/ingestor_role.rs b/src/handlers/http/modal/ingest/ingestor_role.rs similarity index 100% rename from server/src/handlers/http/modal/ingest/ingestor_role.rs rename to src/handlers/http/modal/ingest/ingestor_role.rs diff --git a/server/src/handlers/http/modal/ingest/mod.rs 
b/src/handlers/http/modal/ingest/mod.rs similarity index 100% rename from server/src/handlers/http/modal/ingest/mod.rs rename to src/handlers/http/modal/ingest/mod.rs diff --git a/server/src/handlers/http/modal/ingest_server.rs b/src/handlers/http/modal/ingest_server.rs similarity index 77% rename from server/src/handlers/http/modal/ingest_server.rs rename to src/handlers/http/modal/ingest_server.rs index 1e0e9dd21..0e1226b83 100644 --- a/server/src/handlers/http/modal/ingest_server.rs +++ b/src/handlers/http/modal/ingest_server.rs @@ -15,10 +15,16 @@ * along with this program. If not, see . * */ + +use super::ingest::ingestor_logstream; +use super::ingest::ingestor_rbac; +use super::ingest::ingestor_role; +use super::server::Server; +use super::IngestorMetadata; +use super::OpenIdClient; +use super::ParseableServer; use crate::analytics; -use crate::banner; use crate::handlers::airplane; -use crate::handlers::http::health_check; use crate::handlers::http::ingest; use crate::handlers::http::logstream; use crate::handlers::http::middleware::DisAllowRootUser; @@ -28,9 +34,7 @@ use crate::localcache::LocalCacheManager; use crate::metrics; use crate::migration; use crate::migration::metadata_migration::migrate_ingester_metadata; -use crate::rbac; use crate::rbac::role::Action; -use crate::storage; use crate::storage::object_storage::ingestor_metadata_path; use crate::storage::object_storage::parseable_json_path; use crate::storage::staging; @@ -38,27 +42,11 @@ use crate::storage::ObjectStorageError; use crate::storage::PARSEABLE_ROOT_DIRECTORY; use crate::sync; -use std::sync::Arc; - -use super::ingest::ingestor_logstream; -use super::ingest::ingestor_rbac; -use super::ingest::ingestor_role; -use super::server::Server; -use super::ssl_acceptor::get_ssl_acceptor; -use super::IngestorMetadata; -use super::OpenIdClient; -use super::ParseableServer; - -use crate::{ - handlers::http::{base_path, cross_origin_config}, - option::CONFIG, -}; +use crate::{handlers::http::base_path, option::CONFIG}; use actix_web::body::MessageBody; -use actix_web::middleware::from_fn; +use actix_web::web; use actix_web::web::resource; use actix_web::Scope; -use actix_web::{web, App, HttpServer}; -use actix_web_prometheus::PrometheusMetrics; use anyhow::anyhow; use async_trait::async_trait; use base64::Engine; @@ -66,142 +54,15 @@ use bytes::Bytes; use once_cell::sync::Lazy; use relative_path::RelativePathBuf; use serde_json::Value; -use tokio::sync::{oneshot, Mutex}; /// ! 
have to use a guard before using it pub static INGESTOR_META: Lazy<IngestorMetadata> = Lazy::new(|| staging::get_ingestor_info().expect("Should Be valid Json")); -#[derive(Default)] pub struct IngestServer; -#[async_trait(?Send)] +#[async_trait] impl ParseableServer for IngestServer { - // we dont need oidc client here its just here to satisfy the trait - async fn start( - &self, - prometheus: PrometheusMetrics, - _oidc_client: Option<OpenIdClient>, - ) -> anyhow::Result<()> { - // set the ingestor metadata - self.set_ingestor_metadata().await?; - - // get the ssl stuff - let ssl = get_ssl_acceptor( - &CONFIG.parseable.tls_cert_path, - &CONFIG.parseable.tls_key_path, - &CONFIG.parseable.trusted_ca_certs_path, - )?; - - // fn that creates the app - let create_app_fn = move || { - App::new() - .wrap(prometheus.clone()) - .configure(|config| IngestServer::configure_routes(config, None)) - .wrap(from_fn(health_check::check_shutdown_middleware)) - .wrap(actix_web::middleware::Logger::default()) - .wrap(actix_web::middleware::Compress::default()) - .wrap(cross_origin_config()) - }; - - // Create a channel to trigger server shutdown - let (shutdown_trigger, shutdown_rx) = oneshot::channel::<()>(); - let server_shutdown_signal = Arc::new(Mutex::new(Some(shutdown_trigger))); - - // Clone the shutdown signal for the signal handler - let shutdown_signal = server_shutdown_signal.clone(); - - // Spawn the signal handler task - let signal_task = tokio::spawn(async move { - health_check::handle_signals(shutdown_signal).await; - log::info!("Received shutdown signal, notifying server to shut down..."); - }); - - // Create the HTTP server - let http_server = HttpServer::new(create_app_fn) - .workers(num_cpus::get()) - .shutdown_timeout(60); - - // Start the server with or without TLS - let srv = if let Some(config) = ssl { - http_server - .bind_rustls_0_22(&CONFIG.parseable.address, config)? - .run() - } else { - http_server.bind(&CONFIG.parseable.address)?.run() - }; - - // Graceful shutdown handling - let srv_handle = srv.handle(); - - let sync_task = tokio::spawn(async move { - // Wait for the shutdown signal - let _ = shutdown_rx.await; - - // Perform S3 sync and wait for completion - log::info!("Starting data sync to S3..."); - if let Err(e) = CONFIG.storage().get_object_store().sync(true).await { - log::warn!("Failed to sync local data with object store. {:?}", e); - } else { - log::info!("Successfully synced all data to S3."); - } - - // Initiate graceful shutdown - log::info!("Graceful shutdown of HTTP server triggered"); - srv_handle.stop(true).await; - }); - - // Await the HTTP server to run - let server_result = srv.await; - - // Await the signal handler to ensure proper cleanup - if let Err(e) = signal_task.await { - log::error!("Error in signal handler: {:?}", e); - } - - // Wait for the sync task to complete before exiting - if let Err(e) = sync_task.await { - log::error!("Error in sync task: {:?}", e); - } else { - log::info!("Sync task completed successfully."); - } - - // Return the result of the server - server_result?; - - Ok(()) - } - - /// implement the init method will just invoke the initialize method - async fn init(&self) -> anyhow::Result<()> { - self.validate()?; - - // check for querier state. 
Is it there, or was it there in the past - let parseable_json = self.check_querier_state().await?; - // to get the .parseable.json file in staging - self.validate_credentials().await?; - let metadata = storage::resolve_parseable_metadata(&parseable_json).await?; - - banner::print(&CONFIG, &metadata).await; - rbac::map::init(&metadata); - // set the info in the global metadata - metadata.set_global(); - self.initialize().await - } - - fn validate(&self) -> anyhow::Result<()> { - if CONFIG.get_storage_mode_string() == "Local drive" { - return Err(anyhow::Error::msg( - // Error Message can be better - "Ingest Server cannot be started in local storage mode. Please start the server in a supported storage mode.", - )); - } - - Ok(()) - } -} - -impl IngestServer { // configure the api routes fn configure_routes(config: &mut web::ServiceConfig, _oidc_client: Option<OpenIdClient>) { config @@ -221,6 +82,83 @@ impl IngestServer { .service(Server::get_ingest_otel_factory()); } + async fn load_metadata(&self) -> anyhow::Result<Option<Bytes>> { + // parseable can't use local storage for persistence when running a distributed setup + if CONFIG.get_storage_mode_string() == "Local drive" { + return Err(anyhow::Error::msg( + "This instance of the Parseable server has been configured to run in a distributed setup, it doesn't support local storage.", + )); + } + + // check for querier state. Is it there, or was it there in the past + let parseable_json = self.check_querier_state().await?; + // to get the .parseable.json file in staging + self.validate_credentials().await?; + + Ok(parseable_json) + } + + /// configure the server and start an instance to ingest data + async fn init(&self) -> anyhow::Result<()> { + // ! Undefined and Untested behaviour + if let Some(cache_manager) = LocalCacheManager::global() { + cache_manager + .validate(CONFIG.parseable.local_cache_size) + .await?; + }; + + let prometheus = metrics::build_metrics_handler(); + CONFIG.storage().register_store_metrics(&prometheus); + + migration::run_migration(&CONFIG).await?; + + let (localsync_handler, mut localsync_outbox, localsync_inbox) = + sync::run_local_sync().await; + let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) = + sync::object_store_sync().await; + + tokio::spawn(airplane::server()); + + // set the ingestor metadata + self.set_ingestor_metadata().await?; + + // Ingestors shouldn't have to deal with OpenId auth flow + let app = self.start(prometheus, None); + + tokio::pin!(app); + loop { + tokio::select! { + e = &mut app => { + // actix server finished .. stop other threads and stop the server + remote_sync_inbox.send(()).unwrap_or(()); + localsync_inbox.send(()).unwrap_or(()); + if let Err(e) = localsync_handler.await { + log::error!("Error joining remote_sync_handler: {:?}", e); + } + if let Err(e) = remote_sync_handler.await { + log::error!("Error joining remote_sync_handler: {:?}", e); + } + return e + }, + _ = &mut localsync_outbox => { + // crash the server if localsync fails for any reason + // panic!("Local Sync thread died. Server will fail now!") + return Err(anyhow::Error::msg("Failed to sync local data to drive. 
Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable")) + }, + _ = &mut remote_sync_outbox => { + // remote_sync failed, this is recoverable by just starting remote_sync thread again + if let Err(e) = remote_sync_handler.await { + log::error!("Error joining remote_sync_handler: {:?}", e); + } + (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await; + } + + }; + } + } +} + +impl IngestServer { fn analytics_factory() -> Scope { web::scope("/analytics").service( // GET "/analytics" ==> Get analytics data @@ -459,58 +397,4 @@ impl IngestServer { Ok(()) } - - async fn initialize(&self) -> anyhow::Result<()> { - // ! Undefined and Untested behaviour - if let Some(cache_manager) = LocalCacheManager::global() { - cache_manager - .validate(CONFIG.parseable.local_cache_size) - .await?; - }; - - let prometheus = metrics::build_metrics_handler(); - CONFIG.storage().register_store_metrics(&prometheus); - - migration::run_migration(&CONFIG).await?; - - let (localsync_handler, mut localsync_outbox, localsync_inbox) = - sync::run_local_sync().await; - let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) = - sync::object_store_sync().await; - - tokio::spawn(airplane::server()); - - let app = self.start(prometheus, CONFIG.parseable.openid.clone()); - - tokio::pin!(app); - loop { - tokio::select! { - e = &mut app => { - // actix server finished .. stop other threads and stop the server - remote_sync_inbox.send(()).unwrap_or(()); - localsync_inbox.send(()).unwrap_or(()); - if let Err(e) = localsync_handler.await { - log::error!("Error joining remote_sync_handler: {:?}", e); - } - if let Err(e) = remote_sync_handler.await { - log::error!("Error joining remote_sync_handler: {:?}", e); - } - return e - }, - _ = &mut localsync_outbox => { - // crash the server if localsync fails for any reason - // panic!("Local Sync thread died. Server will fail now!") - return Err(anyhow::Error::msg("Failed to sync local data to drive. Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable")) - }, - _ = &mut remote_sync_outbox => { - // remote_sync failed, this is recoverable by just starting remote_sync thread again - if let Err(e) = remote_sync_handler.await { - log::error!("Error joining remote_sync_handler: {:?}", e); - } - (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await; - } - - }; - } - } } diff --git a/src/handlers/http/modal/mod.rs b/src/handlers/http/modal/mod.rs new file mode 100644 index 000000000..57d5313f2 --- /dev/null +++ b/src/handlers/http/modal/mod.rs @@ -0,0 +1,268 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. 
+ * + */ + +pub mod ingest; +pub mod ingest_server; +pub mod query; +pub mod query_server; +pub mod server; +pub mod ssl_acceptor; +pub mod utils; + +use std::sync::Arc; + +use actix_web::middleware::from_fn; +use actix_web::web::ServiceConfig; +use actix_web::App; +use actix_web::HttpServer; +use actix_web_prometheus::PrometheusMetrics; +use async_trait::async_trait; +use base64::Engine; +use bytes::Bytes; +use openid::Discovered; +use serde::Deserialize; +use serde::Serialize; +use ssl_acceptor::get_ssl_acceptor; +use tokio::sync::{oneshot, Mutex}; + +use super::cross_origin_config; +use super::API_BASE_PATH; +use super::API_VERSION; +use crate::handlers::http::health_check; +use crate::oidc; +use crate::option::CONFIG; + +pub type OpenIdClient = Arc<openid::Client<Discovered, oidc::Claims>>; + +// to be decided on what the Default version should be +pub const DEFAULT_VERSION: &str = "v3"; + +include!(concat!(env!("OUT_DIR"), "/generated.rs")); + +#[async_trait] +pub trait ParseableServer { + /// configure the router + fn configure_routes(config: &mut ServiceConfig, oidc_client: Option<OpenIdClient>) + where + Self: Sized; + + /// load metadata/configuration from persistence for previous sessions of parseable + async fn load_metadata(&self) -> anyhow::Result<Option<Bytes>>; + + /// code that describes starting and setup procedures for each type of server + async fn init(&self) -> anyhow::Result<()>; + + /// configure the server + async fn start( + &self, + prometheus: PrometheusMetrics, + oidc_client: Option<OpenIdClient>, + ) -> anyhow::Result<()> + where + Self: Sized, + { + let oidc_client = match oidc_client { + Some(config) => { + let client = config + .connect(&format!("{API_BASE_PATH}/{API_VERSION}/o/code")) + .await?; + Some(Arc::new(client)) + } + + None => None, + }; + + // get the ssl stuff + let ssl = get_ssl_acceptor( + &CONFIG.parseable.tls_cert_path, + &CONFIG.parseable.tls_key_path, + &CONFIG.parseable.trusted_ca_certs_path, + )?; + + // fn that creates the app + let create_app_fn = move || { + App::new() + .wrap(prometheus.clone()) + .configure(|config| Self::configure_routes(config, oidc_client.clone())) + .wrap(from_fn(health_check::check_shutdown_middleware)) + .wrap(actix_web::middleware::Logger::default()) + .wrap(actix_web::middleware::Compress::default()) + .wrap(cross_origin_config()) + }; + + // Create a channel to trigger server shutdown + let (shutdown_trigger, shutdown_rx) = oneshot::channel::<()>(); + let server_shutdown_signal = Arc::new(Mutex::new(Some(shutdown_trigger))); + + // Clone the shutdown signal for the signal handler + let shutdown_signal = server_shutdown_signal.clone(); + + // Spawn the signal handler task + let signal_task = tokio::spawn(async move { + health_check::handle_signals(shutdown_signal).await; + println!("Received shutdown signal, notifying server to shut down..."); + }); + + // Create the HTTP server + let http_server = HttpServer::new(create_app_fn) + .workers(num_cpus::get()) + .shutdown_timeout(60); + + // Start the server with or without TLS + let srv = if let Some(config) = ssl { + http_server + .bind_rustls_0_22(&CONFIG.parseable.address, config)? 
+ .run() + } else { + http_server.bind(&CONFIG.parseable.address)?.run() + }; + + // Graceful shutdown handling + let srv_handle = srv.handle(); + + let sync_task = tokio::spawn(async move { + // Wait for the shutdown signal + let _ = shutdown_rx.await; + + // Perform S3 sync and wait for completion + log::info!("Starting data sync to S3..."); + if let Err(e) = CONFIG.storage().get_object_store().sync(true).await { + log::warn!("Failed to sync local data with object store. {:?}", e); + } else { + log::info!("Successfully synced all data to S3."); + } + + // Initiate graceful shutdown + log::info!("Graceful shutdown of HTTP server triggered"); + srv_handle.stop(true).await; + }); + + // Await the HTTP server to run + let server_result = srv.await; + + // Await the signal handler to ensure proper cleanup + if let Err(e) = signal_task.await { + log::error!("Error in signal handler: {:?}", e); + } + + // Wait for the sync task to complete before exiting + if let Err(e) = sync_task.await { + log::error!("Error in sync task: {:?}", e); + } else { + log::info!("Sync task completed successfully."); + } + + // Return the result of the server + server_result?; + + Ok(()) + } +} + +#[derive(Serialize, Debug, Deserialize, Default, Clone, Eq, PartialEq)] +pub struct IngestorMetadata { + pub version: String, + pub port: String, + pub domain_name: String, + pub bucket_name: String, + pub token: String, + pub ingestor_id: String, + pub flight_port: String, +} + +impl IngestorMetadata { + #[allow(clippy::too_many_arguments)] + pub fn new( + port: String, + domain_name: String, + version: String, + bucket_name: String, + username: &str, + password: &str, + ingestor_id: String, + flight_port: String, + ) -> Self { + let token = base64::prelude::BASE64_STANDARD.encode(format!("{}:{}", username, password)); + + let token = format!("Basic {}", token); + + Self { + port, + domain_name, + version, + bucket_name, + token, + ingestor_id, + flight_port, + } + } + + pub fn get_ingestor_id(&self) -> String { + self.ingestor_id.clone() + } +} + +#[cfg(test)] +mod test { + use actix_web::body::MessageBody; + use rstest::rstest; + + use super::{IngestorMetadata, DEFAULT_VERSION}; + + #[rstest] + fn test_deserialize_resource() { + let lhs: IngestorMetadata = IngestorMetadata::new( + "8000".to_string(), + "https://localhost:8000".to_string(), + DEFAULT_VERSION.to_string(), + "somebucket".to_string(), + "admin", + "admin", + "ingestor_id".to_string(), + "8002".to_string(), + ); + + let rhs = serde_json::from_slice::<IngestorMetadata>(br#"{"version":"v3","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic YWRtaW46YWRtaW4=", "ingestor_id": "ingestor_id","flight_port": "8002"}"#).unwrap(); + + assert_eq!(rhs, lhs); + } + + #[rstest] + fn test_serialize_resource() { + let im = IngestorMetadata::new( + "8000".to_string(), + "https://localhost:8000".to_string(), + DEFAULT_VERSION.to_string(), + "somebucket".to_string(), + "admin", + "admin", + "ingestor_id".to_string(), + "8002".to_string(), + ); + + let lhs = serde_json::to_string(&im) + .unwrap() + .try_into_bytes() + .unwrap(); + let rhs = br#"{"version":"v3","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic YWRtaW46YWRtaW4=","ingestor_id":"ingestor_id","flight_port":"8002"}"# + .try_into_bytes() + .unwrap(); + + assert_eq!(lhs, rhs); + } +} diff --git a/server/src/handlers/http/modal/query/mod.rs b/src/handlers/http/modal/query/mod.rs similarity index 100% rename from 
server/src/handlers/http/modal/query/mod.rs rename to src/handlers/http/modal/query/mod.rs diff --git a/server/src/handlers/http/modal/query/querier_ingest.rs b/src/handlers/http/modal/query/querier_ingest.rs similarity index 100% rename from server/src/handlers/http/modal/query/querier_ingest.rs rename to src/handlers/http/modal/query/querier_ingest.rs diff --git a/server/src/handlers/http/modal/query/querier_logstream.rs b/src/handlers/http/modal/query/querier_logstream.rs similarity index 100% rename from server/src/handlers/http/modal/query/querier_logstream.rs rename to src/handlers/http/modal/query/querier_logstream.rs diff --git a/server/src/handlers/http/modal/query/querier_rbac.rs b/src/handlers/http/modal/query/querier_rbac.rs similarity index 100% rename from server/src/handlers/http/modal/query/querier_rbac.rs rename to src/handlers/http/modal/query/querier_rbac.rs diff --git a/server/src/handlers/http/modal/query/querier_role.rs b/src/handlers/http/modal/query/querier_role.rs similarity index 100% rename from server/src/handlers/http/modal/query/querier_role.rs rename to src/handlers/http/modal/query/querier_role.rs diff --git a/server/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs similarity index 78% rename from server/src/handlers/http/modal/query_server.rs rename to src/handlers/http/modal/query_server.rs index 302bd977e..ffde5fb11 100644 --- a/server/src/handlers/http/modal/query_server.rs +++ b/src/handlers/http/modal/query_server.rs @@ -17,171 +17,38 @@ */ use crate::handlers::airplane; +use crate::handlers::http::base_path; use crate::handlers::http::cluster::{self, init_cluster_metrics_schedular}; use crate::handlers::http::logstream::create_internal_stream_if_not_exists; use crate::handlers::http::middleware::{DisAllowRootUser, RouteExt}; use crate::handlers::http::{self, role}; -use crate::handlers::http::{base_path, cross_origin_config, API_BASE_PATH, API_VERSION}; -use crate::handlers::http::{health_check, logstream, MAX_EVENT_PAYLOAD_SIZE}; +use crate::handlers::http::{logstream, MAX_EVENT_PAYLOAD_SIZE}; use crate::hottier::HotTierManager; use crate::rbac::role::Action; use crate::sync; use crate::users::dashboards::DASHBOARDS; use crate::users::filters::FILTERS; -use crate::{analytics, banner, metrics, migration, rbac, storage}; -use actix_web::middleware::from_fn; +use crate::{analytics, metrics, migration, storage}; use actix_web::web::{resource, ServiceConfig}; use actix_web::{web, Scope}; -use actix_web::{App, HttpServer}; use async_trait::async_trait; -use std::sync::Arc; -use tokio::sync::{oneshot, Mutex}; +use bytes::Bytes; -use crate::option::CONFIG; +use crate::{option::CONFIG, ParseableServer}; use super::query::{querier_ingest, querier_logstream, querier_rbac, querier_role}; use super::server::Server; -use super::ssl_acceptor::get_ssl_acceptor; -use super::{OpenIdClient, ParseableServer}; +use super::OpenIdClient; -#[derive(Default, Debug)] pub struct QueryServer; -#[async_trait(?Send)] +#[async_trait] impl ParseableServer for QueryServer { - async fn start( - &self, - prometheus: actix_web_prometheus::PrometheusMetrics, - oidc_client: Option<OpenIdClient>, - ) -> anyhow::Result<()> { - let oidc_client = match oidc_client { - Some(config) => { - let client = config - .connect(&format!("{API_BASE_PATH}/{API_VERSION}/o/code")) - .await?; - Some(Arc::new(client)) - } - - None => None, - }; - - let ssl = get_ssl_acceptor( - &CONFIG.parseable.tls_cert_path, - &CONFIG.parseable.tls_key_path, - &CONFIG.parseable.trusted_ca_certs_path, - 
)?; - - let create_app_fn = move || { - App::new() - .wrap(prometheus.clone()) - .configure(|config| QueryServer::configure_routes(config, oidc_client.clone())) - .wrap(from_fn(health_check::check_shutdown_middleware)) - .wrap(actix_web::middleware::Logger::default()) - .wrap(actix_web::middleware::Compress::default()) - .wrap(cross_origin_config()) - }; - - // Create a channel to trigger server shutdown - let (shutdown_trigger, shutdown_rx) = oneshot::channel::<()>(); - let server_shutdown_signal = Arc::new(Mutex::new(Some(shutdown_trigger))); - - // Clone the shutdown signal for the signal handler - let shutdown_signal = server_shutdown_signal.clone(); - - // Spawn the signal handler task - let signal_task = tokio::spawn(async move { - health_check::handle_signals(shutdown_signal).await; - log::info!("Received shutdown signal, notifying server to shut down..."); - }); - - // Create the HTTP server - let http_server = HttpServer::new(create_app_fn) - .workers(num_cpus::get()) - .shutdown_timeout(120); - - // Start the server with or without TLS - let srv = if let Some(config) = ssl { - http_server - .bind_rustls_0_22(&CONFIG.parseable.address, config)? - .run() - } else { - http_server.bind(&CONFIG.parseable.address)?.run() - }; - - // Graceful shutdown handling - let srv_handle = srv.handle(); - - let sync_task = tokio::spawn(async move { - // Wait for the shutdown signal - let _ = shutdown_rx.await; - - // Perform S3 sync and wait for completion - log::info!("Starting data sync to S3..."); - if let Err(e) = CONFIG.storage().get_object_store().sync(true).await { - log::warn!("Failed to sync local data with object store. {:?}", e); - } else { - log::info!("Successfully synced all data to S3."); - } - - // Initiate graceful shutdown - log::info!("Graceful shutdown of HTTP server triggered"); - srv_handle.stop(true).await; - }); - - // Await the HTTP server to run - let server_result = srv.await; - - // Await the signal handler to ensure proper cleanup - if let Err(e) = signal_task.await { - log::error!("Error in signal handler: {:?}", e); - } - - // Wait for the sync task to complete before exiting - if let Err(e) = sync_task.await { - log::error!("Error in sync task: {:?}", e); - } else { - log::info!("Sync task completed successfully."); - } - - // Return the result of the server - server_result?; - - Ok(()) - } - - /// implementation of init should just invoke a call to initialize - async fn init(&self) -> anyhow::Result<()> { - self.validate()?; - migration::run_file_migration(&CONFIG).await?; - let parseable_json = CONFIG.validate_storage().await?; - migration::run_metadata_migration(&CONFIG, &parseable_json).await?; - let metadata = storage::resolve_parseable_metadata(&parseable_json).await?; - banner::print(&CONFIG, &metadata).await; - // initialize the rbac map - rbac::map::init(&metadata); - // keep metadata info in mem - metadata.set_global(); - self.initialize().await - } - - fn validate(&self) -> anyhow::Result<()> { - if CONFIG.get_storage_mode_string() == "Local drive" { - return Err(anyhow::anyhow!( - "Query Server cannot be started in local storage mode. 
Please start the server in a supported storage mode.", - )); - } - - Ok(()) - } -} - -impl QueryServer { // configure the api routes fn configure_routes(config: &mut ServiceConfig, oidc_client: Option<OpenIdClient>) { config .service( web::scope(&base_path()) - // POST "/query" ==> Get results of the SQL query passed in request body .service(Server::get_query_factory()) .service(Server::get_trino_factory()) .service(Server::get_cache_webscope()) @@ -201,6 +68,91 @@ impl QueryServer { .service(Server::get_generated()); } + async fn load_metadata(&self) -> anyhow::Result<Option<Bytes>> { + // parseable can't use local storage for persistence when running a distributed setup + if CONFIG.get_storage_mode_string() == "Local drive" { + return Err(anyhow::anyhow!( + "This instance of the Parseable server has been configured to run in a distributed setup, it doesn't support local storage.", + )); + } + + migration::run_file_migration(&CONFIG).await?; + let parseable_json = CONFIG.validate_storage().await?; + migration::run_metadata_migration(&CONFIG, &parseable_json).await?; + + Ok(parseable_json) + } + + /// initialize the server, run migrations as needed and start an instance + async fn init(&self) -> anyhow::Result<()> { + let prometheus = metrics::build_metrics_handler(); + CONFIG.storage().register_store_metrics(&prometheus); + + migration::run_migration(&CONFIG).await?; + + //create internal stream at server start + create_internal_stream_if_not_exists().await?; + + FILTERS.load().await?; + DASHBOARDS.load().await?; + // track all parquet files already in the data directory + storage::retention::load_retention_from_global(); + + // all internal data structures populated now. + // start the analytics scheduler if enabled + if CONFIG.parseable.send_analytics { + analytics::init_analytics_scheduler()?; + } + + if matches!(init_cluster_metrics_schedular(), Ok(())) { + log::info!("Cluster metrics scheduler started successfully"); + } + if let Some(hot_tier_manager) = HotTierManager::global() { + hot_tier_manager.put_internal_stream_hot_tier().await?; + hot_tier_manager.download_from_s3()?; + }; + let (localsync_handler, mut localsync_outbox, localsync_inbox) = + sync::run_local_sync().await; + let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) = + sync::object_store_sync().await; + + tokio::spawn(airplane::server()); + let app = self.start(prometheus, CONFIG.parseable.openid.clone()); + + tokio::pin!(app); + loop { + tokio::select! { + e = &mut app => { + // actix server finished .. stop other threads and stop the server + remote_sync_inbox.send(()).unwrap_or(()); + localsync_inbox.send(()).unwrap_or(()); + if let Err(e) = localsync_handler.await { + log::error!("Error joining localsync_handler: {:?}", e); + } + if let Err(e) = remote_sync_handler.await { + log::error!("Error joining remote_sync_handler: {:?}", e); + } + return e + }, + _ = &mut localsync_outbox => { + // crash the server if localsync fails for any reason + // panic!("Local Sync thread died. Server will fail now!") + return Err(anyhow::Error::msg("Failed to sync local data to drive. 
Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable")) + }, + _ = &mut remote_sync_outbox => { + // remote_sync failed, this is recoverable by just starting remote_sync thread again + if let Err(e) = remote_sync_handler.await { + log::error!("Error joining remote_sync_handler: {:?}", e); + } + (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await; + } + + }; + } + } +} + +impl QueryServer { // get the role webscope fn get_user_role_webscope() -> Scope { web::scope("/role") @@ -439,72 +391,4 @@ impl QueryServer { ), ) } - - /// initialize the server, run migrations as needed and start the server - async fn initialize(&self) -> anyhow::Result<()> { - let prometheus = metrics::build_metrics_handler(); - CONFIG.storage().register_store_metrics(&prometheus); - - migration::run_migration(&CONFIG).await?; - - //create internal stream at server start - create_internal_stream_if_not_exists().await?; - - FILTERS.load().await?; - DASHBOARDS.load().await?; - // track all parquet files already in the data directory - storage::retention::load_retention_from_global(); - - // all internal data structures populated now. - // start the analytics scheduler if enabled - if CONFIG.parseable.send_analytics { - analytics::init_analytics_scheduler()?; - } - - if matches!(init_cluster_metrics_schedular(), Ok(())) { - log::info!("Cluster metrics scheduler started successfully"); - } - if let Some(hot_tier_manager) = HotTierManager::global() { - hot_tier_manager.put_internal_stream_hot_tier().await?; - hot_tier_manager.download_from_s3()?; - }; - let (localsync_handler, mut localsync_outbox, localsync_inbox) = - sync::run_local_sync().await; - let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) = - sync::object_store_sync().await; - - tokio::spawn(airplane::server()); - let app = self.start(prometheus, CONFIG.parseable.openid.clone()); - - tokio::pin!(app); - loop { - tokio::select! { - e = &mut app => { - // actix server finished .. stop other threads and stop the server - remote_sync_inbox.send(()).unwrap_or(()); - localsync_inbox.send(()).unwrap_or(()); - if let Err(e) = localsync_handler.await { - log::error!("Error joining localsync_handler: {:?}", e); - } - if let Err(e) = remote_sync_handler.await { - log::error!("Error joining remote_sync_handler: {:?}", e); - } - return e - }, - _ = &mut localsync_outbox => { - // crash the server if localsync fails for any reason - // panic!("Local Sync thread died. Server will fail now!") - return Err(anyhow::Error::msg("Failed to sync local data to drive. 
Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable")) - }, - _ = &mut remote_sync_outbox => { - // remote_sync failed, this is recoverable by just starting remote_sync thread again - if let Err(e) = remote_sync_handler.await { - log::error!("Error joining remote_sync_handler: {:?}", e); - } - (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await; - } - - }; - } - } } diff --git a/server/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs similarity index 83% rename from server/src/handlers/http/modal/server.rs rename to src/handlers/http/modal/server.rs index ba1ab055c..48e931619 100644 --- a/server/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -17,7 +17,6 @@ */ use crate::analytics; -use crate::banner; use crate::handlers; use crate::handlers::http::about; use crate::handlers::http::base_path; @@ -27,32 +26,26 @@ use crate::handlers::http::query; use crate::handlers::http::trino; use crate::handlers::http::users::dashboards; use crate::handlers::http::users::filters; -use crate::handlers::http::API_BASE_PATH; -use crate::handlers::http::API_VERSION; use crate::hottier::HotTierManager; use crate::localcache::LocalCacheManager; use crate::metrics; use crate::migration; -use crate::rbac; use crate::storage; use crate::sync; use crate::users::dashboards::DASHBOARDS; use crate::users::filters::FILTERS; -use actix_web::middleware::from_fn; -use std::sync::Arc; -use tokio::sync::{oneshot, Mutex}; +use actix_web::web; use actix_web::web::resource; use actix_web::Resource; use actix_web::Scope; -use actix_web::{web, App, HttpServer}; -use actix_web_prometheus::PrometheusMetrics; use actix_web_static_files::ResourceFiles; use async_trait::async_trait; +use bytes::Bytes; use crate::{ handlers::http::{ - self, cross_origin_config, ingest, llm, logstream, + self, ingest, llm, logstream, middleware::{DisAllowRootUser, RouteExt}, oidc, role, MAX_EVENT_PAYLOAD_SIZE, }, @@ -62,139 +55,18 @@ use crate::{ // use super::generate; use super::generate; -use super::ssl_acceptor::get_ssl_acceptor; use super::OpenIdClient; use super::ParseableServer; -#[derive(Default)] + pub struct Server; -#[async_trait(?Send)] +#[async_trait] impl ParseableServer for Server { - async fn start( - &self, - prometheus: PrometheusMetrics, - oidc_client: Option, - ) -> anyhow::Result<()> { - let oidc_client = match oidc_client { - Some(config) => { - let client = config - .connect(&format!("{API_BASE_PATH}/{API_VERSION}/o/code")) - .await?; - Some(Arc::new(client)) - } - None => None, - }; - - let create_app_fn = move || { - App::new() - .wrap(prometheus.clone()) - .configure(|cfg| Server::configure_routes(cfg, oidc_client.clone())) - .wrap(from_fn(health_check::check_shutdown_middleware)) - .wrap(actix_web::middleware::Logger::default()) - .wrap(actix_web::middleware::Compress::default()) - .wrap(cross_origin_config()) - }; - - let ssl = get_ssl_acceptor( - &CONFIG.parseable.tls_cert_path, - &CONFIG.parseable.tls_key_path, - &CONFIG.parseable.trusted_ca_certs_path, - )?; - - // Create a channel to trigger server shutdown - let (shutdown_trigger, shutdown_rx) = oneshot::channel::<()>(); - let server_shutdown_signal = Arc::new(Mutex::new(Some(shutdown_trigger))); - - // Clone the shutdown signal for the signal handler - let shutdown_signal = server_shutdown_signal.clone(); - - // Spawn the signal handler task - let signal_task = tokio::spawn(async move { - 
health_check::handle_signals(shutdown_signal).await; - log::info!("Received shutdown signal, notifying server to shut down..."); - }); - - // Create the HTTP server - let http_server = HttpServer::new(create_app_fn) - .workers(num_cpus::get()) - .shutdown_timeout(60); - - // Start the server with or without TLS - let srv = if let Some(config) = ssl { - http_server - .bind_rustls_0_22(&CONFIG.parseable.address, config)? - .run() - } else { - http_server.bind(&CONFIG.parseable.address)?.run() - }; - - // Graceful shutdown handling - let srv_handle = srv.handle(); - - let sync_task = tokio::spawn(async move { - // Wait for the shutdown signal - let _ = shutdown_rx.await; - - // Perform S3 sync and wait for completion - log::info!("Starting data sync to S3..."); - if let Err(e) = CONFIG.storage().get_object_store().sync(true).await { - log::warn!("Failed to sync local data with object store. {:?}", e); - } else { - log::info!("Successfully synced all data to S3."); - } - - // Initiate graceful shutdown - log::info!("Graceful shutdown of HTTP server triggered"); - srv_handle.stop(true).await; - }); - - // Await the HTTP server to run - let server_result = srv.await; - - // Await the signal handler to ensure proper cleanup - if let Err(e) = signal_task.await { - log::error!("Error in signal handler: {:?}", e); - } - - // Wait for the sync task to complete before exiting - if let Err(e) = sync_task.await { - log::error!("Error in sync task: {:?}", e); - } else { - log::info!("Sync task completed successfully."); - } - - // Return the result of the server - server_result?; - - Ok(()) - } - - /// implementation of init should just invoke a call to initialize - async fn init(&self) -> anyhow::Result<()> { - self.validate()?; - migration::run_file_migration(&CONFIG).await?; - let parseable_json = CONFIG.validate_storage().await?; - migration::run_metadata_migration(&CONFIG, &parseable_json).await?; - let metadata = storage::resolve_parseable_metadata(&parseable_json).await?; - banner::print(&CONFIG, &metadata).await; - rbac::map::init(&metadata); - metadata.set_global(); - self.initialize().await?; - Ok(()) - } - - fn validate(&self) -> anyhow::Result<()> { - Ok(()) - } -} - -impl Server { fn configure_routes(config: &mut web::ServiceConfig, oidc_client: Option) { // there might be a bug in the configure routes method config .service( web::scope(&base_path()) - // POST "/query" ==> Get results of the SQL query passed in request body .service(Self::get_query_factory()) .service(Self::get_trino_factory()) .service(Self::get_cache_webscope()) @@ -215,6 +87,85 @@ impl Server { .service(Self::get_generated()); } + async fn load_metadata(&self) -> anyhow::Result> { + migration::run_file_migration(&CONFIG).await?; + let parseable_json = CONFIG.validate_storage().await?; + migration::run_metadata_migration(&CONFIG, &parseable_json).await?; + + Ok(parseable_json) + } + + // configure the server and start an instance of the single server setup + async fn init(&self) -> anyhow::Result<()> { + if let Some(cache_manager) = LocalCacheManager::global() { + cache_manager + .validate(CONFIG.parseable.local_cache_size) + .await?; + }; + + let prometheus = metrics::build_metrics_handler(); + CONFIG.storage().register_store_metrics(&prometheus); + + migration::run_migration(&CONFIG).await?; + + FILTERS.load().await?; + DASHBOARDS.load().await?; + + storage::retention::load_retention_from_global(); + + if let Some(hot_tier_manager) = HotTierManager::global() { + hot_tier_manager.download_from_s3()?; + }; + + let 
(localsync_handler, mut localsync_outbox, localsync_inbox) = + sync::run_local_sync().await; + let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) = + sync::object_store_sync().await; + + if CONFIG.parseable.send_analytics { + analytics::init_analytics_scheduler()?; + } + + tokio::spawn(handlers::livetail::server()); + tokio::spawn(handlers::airplane::server()); + + let app = self.start(prometheus, CONFIG.parseable.openid.clone()); + + tokio::pin!(app); + + loop { + tokio::select! { + e = &mut app => { + // actix server finished .. stop other threads and stop the server + remote_sync_inbox.send(()).unwrap_or(()); + localsync_inbox.send(()).unwrap_or(()); + if let Err(e) = localsync_handler.await { + log::error!("Error joining localsync_handler: {:?}", e); + } + if let Err(e) = remote_sync_handler.await { + log::error!("Error joining remote_sync_handler: {:?}", e); + } + return e + }, + _ = &mut localsync_outbox => { + // crash the server if localsync fails for any reason + // panic!("Local Sync thread died. Server will fail now!") + return Err(anyhow::Error::msg("Failed to sync local data to drive. Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable")) + }, + _ = &mut remote_sync_outbox => { + // remote_sync failed, this is recoverable by just starting remote_sync thread again + if let Err(e) = remote_sync_handler.await { + log::error!("Error joining remote_sync_handler: {:?}", e); + } + (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await; + } + + }; + } + } +} + +impl Server { // get the trino factory pub fn get_trino_factory() -> Resource { web::resource("/trinoquery") @@ -292,6 +243,7 @@ impl Server { } // get the query factory + // POST "/query" ==> Get results of the SQL query passed in request body pub fn get_query_factory() -> Resource { web::resource("/query").route(web::post().to(query::query).authorize(Action::Query)) } @@ -591,72 +543,4 @@ impl Server { pub fn get_generated() -> ResourceFiles { ResourceFiles::new("/", generate()).resolve_not_found_to_root() } - - async fn initialize(&self) -> anyhow::Result<()> { - if let Some(cache_manager) = LocalCacheManager::global() { - cache_manager - .validate(CONFIG.parseable.local_cache_size) - .await?; - }; - - let prometheus = metrics::build_metrics_handler(); - CONFIG.storage().register_store_metrics(&prometheus); - - migration::run_migration(&CONFIG).await?; - - FILTERS.load().await?; - DASHBOARDS.load().await?; - - storage::retention::load_retention_from_global(); - - if let Some(hot_tier_manager) = HotTierManager::global() { - hot_tier_manager.download_from_s3()?; - }; - - let (localsync_handler, mut localsync_outbox, localsync_inbox) = - sync::run_local_sync().await; - let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) = - sync::object_store_sync().await; - - if CONFIG.parseable.send_analytics { - analytics::init_analytics_scheduler()?; - } - - tokio::spawn(handlers::livetail::server()); - tokio::spawn(handlers::airplane::server()); - - let app = self.start(prometheus, CONFIG.parseable.openid.clone()); - - tokio::pin!(app); - - loop { - tokio::select! { - e = &mut app => { - // actix server finished .. 
stop other threads and stop the server - remote_sync_inbox.send(()).unwrap_or(()); - localsync_inbox.send(()).unwrap_or(()); - if let Err(e) = localsync_handler.await { - log::error!("Error joining remote_sync_handler: {:?}", e); - } - if let Err(e) = remote_sync_handler.await { - log::error!("Error joining remote_sync_handler: {:?}", e); - } - return e - }, - _ = &mut localsync_outbox => { - // crash the server if localsync fails for any reason - // panic!("Local Sync thread died. Server will fail now!") - return Err(anyhow::Error::msg("Failed to sync local data to drive. Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable")) - }, - _ = &mut remote_sync_outbox => { - // remote_sync failed, this is recoverable by just starting remote_sync thread again - if let Err(e) = remote_sync_handler.await { - log::error!("Error joining remote_sync_handler: {:?}", e); - } - (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await; - } - - }; - } - } } diff --git a/server/src/handlers/http/modal/ssl_acceptor.rs b/src/handlers/http/modal/ssl_acceptor.rs similarity index 100% rename from server/src/handlers/http/modal/ssl_acceptor.rs rename to src/handlers/http/modal/ssl_acceptor.rs diff --git a/server/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs similarity index 100% rename from server/src/handlers/http/modal/utils/ingest_utils.rs rename to src/handlers/http/modal/utils/ingest_utils.rs diff --git a/server/src/handlers/http/modal/utils/logstream_utils.rs b/src/handlers/http/modal/utils/logstream_utils.rs similarity index 100% rename from server/src/handlers/http/modal/utils/logstream_utils.rs rename to src/handlers/http/modal/utils/logstream_utils.rs diff --git a/server/src/handlers/http/modal/utils/mod.rs b/src/handlers/http/modal/utils/mod.rs similarity index 100% rename from server/src/handlers/http/modal/utils/mod.rs rename to src/handlers/http/modal/utils/mod.rs diff --git a/server/src/handlers/http/modal/utils/rbac_utils.rs b/src/handlers/http/modal/utils/rbac_utils.rs similarity index 100% rename from server/src/handlers/http/modal/utils/rbac_utils.rs rename to src/handlers/http/modal/utils/rbac_utils.rs diff --git a/server/src/handlers/http/oidc.rs b/src/handlers/http/oidc.rs similarity index 100% rename from server/src/handlers/http/oidc.rs rename to src/handlers/http/oidc.rs diff --git a/server/src/handlers/http/otel.rs b/src/handlers/http/otel.rs similarity index 100% rename from server/src/handlers/http/otel.rs rename to src/handlers/http/otel.rs diff --git a/server/src/handlers/http/otel/opentelemetry.proto.common.v1.rs b/src/handlers/http/otel/opentelemetry.proto.common.v1.rs similarity index 100% rename from server/src/handlers/http/otel/opentelemetry.proto.common.v1.rs rename to src/handlers/http/otel/opentelemetry.proto.common.v1.rs diff --git a/server/src/handlers/http/otel/opentelemetry.proto.logs.v1.rs b/src/handlers/http/otel/opentelemetry.proto.logs.v1.rs similarity index 100% rename from server/src/handlers/http/otel/opentelemetry.proto.logs.v1.rs rename to src/handlers/http/otel/opentelemetry.proto.logs.v1.rs diff --git a/server/src/handlers/http/otel/opentelemetry.proto.resource.v1.rs b/src/handlers/http/otel/opentelemetry.proto.resource.v1.rs similarity index 100% rename from server/src/handlers/http/otel/opentelemetry.proto.resource.v1.rs rename to src/handlers/http/otel/opentelemetry.proto.resource.v1.rs 
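Both `init` implementations above (QueryServer and Server) supervise their background work with the same `tokio::select!` shape: the pinned actix future races against the local-sync and object-store-sync outbox channels, and only the object-store sync is respawned in place when it dies. Below is a minimal, self-contained sketch of that supervise-and-restart pattern, assuming tokio with the `full` feature set; `spawn_worker` is a hypothetical stand-in for `sync::object_store_sync()` and its `(handle, outbox, inbox)` triple.

```rust
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use tokio::time::{sleep, Duration};

// Mimics sync::object_store_sync(): the task signals failure on `outbox`
// and shuts down when the supervisor fires `inbox`.
fn spawn_worker() -> (JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<()>) {
    let (outbox_tx, outbox_rx) = oneshot::channel();
    let (inbox_tx, inbox_rx) = oneshot::channel::<()>();
    let handle = tokio::spawn(async move {
        tokio::select! {
            // supervisor asked us to stop
            _ = inbox_rx => {}
            // pretend the periodic sync hit a fatal error after a while
            _ = sleep(Duration::from_millis(100)) => {
                let _ = outbox_tx.send(());
            }
        }
    });
    (handle, outbox_rx, inbox_tx)
}

#[tokio::main]
async fn main() {
    // stand-in for the actix server future that `init` pins
    let app = sleep(Duration::from_millis(350));
    let (mut handle, mut outbox, mut inbox) = spawn_worker();
    tokio::pin!(app);
    loop {
        tokio::select! {
            _ = &mut app => {
                // server finished: tell the worker to stop, then join it
                inbox.send(()).unwrap_or(());
                let _ = handle.await;
                return;
            }
            _ = &mut outbox => {
                // worker died: join the old task and spawn a replacement
                let _ = handle.await;
                (handle, outbox, inbox) = spawn_worker();
                println!("worker restarted");
            }
        }
    }
}
```

The asymmetry in the real code is deliberate: a dead local sync risks losing staged data, so the server returns an error and exits, while a dead remote sync only delays uploads and can simply be restarted.
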
diff --git a/server/src/handlers/http/otel/opentelemetry/proto/README.md b/src/handlers/http/otel/opentelemetry/proto/README.md similarity index 100% rename from server/src/handlers/http/otel/opentelemetry/proto/README.md rename to src/handlers/http/otel/opentelemetry/proto/README.md diff --git a/server/src/handlers/http/otel/opentelemetry/proto/common/v1/common.proto b/src/handlers/http/otel/opentelemetry/proto/common/v1/common.proto similarity index 100% rename from server/src/handlers/http/otel/opentelemetry/proto/common/v1/common.proto rename to src/handlers/http/otel/opentelemetry/proto/common/v1/common.proto diff --git a/server/src/handlers/http/otel/opentelemetry/proto/logs/v1/logs.proto b/src/handlers/http/otel/opentelemetry/proto/logs/v1/logs.proto similarity index 100% rename from server/src/handlers/http/otel/opentelemetry/proto/logs/v1/logs.proto rename to src/handlers/http/otel/opentelemetry/proto/logs/v1/logs.proto diff --git a/server/src/handlers/http/otel/opentelemetry/proto/resource/v1/resource.proto b/src/handlers/http/otel/opentelemetry/proto/resource/v1/resource.proto similarity index 100% rename from server/src/handlers/http/otel/opentelemetry/proto/resource/v1/resource.proto rename to src/handlers/http/otel/opentelemetry/proto/resource/v1/resource.proto diff --git a/server/src/handlers/http/otel/proto.rs b/src/handlers/http/otel/proto.rs similarity index 100% rename from server/src/handlers/http/otel/proto.rs rename to src/handlers/http/otel/proto.rs diff --git a/server/src/handlers/http/query.rs b/src/handlers/http/query.rs similarity index 100% rename from server/src/handlers/http/query.rs rename to src/handlers/http/query.rs diff --git a/server/src/handlers/http/rbac.rs b/src/handlers/http/rbac.rs similarity index 100% rename from server/src/handlers/http/rbac.rs rename to src/handlers/http/rbac.rs diff --git a/server/src/handlers/http/role.rs b/src/handlers/http/role.rs similarity index 100% rename from server/src/handlers/http/role.rs rename to src/handlers/http/role.rs diff --git a/server/src/handlers/http/trino.rs b/src/handlers/http/trino.rs similarity index 100% rename from server/src/handlers/http/trino.rs rename to src/handlers/http/trino.rs diff --git a/server/src/handlers/http/users/dashboards.rs b/src/handlers/http/users/dashboards.rs similarity index 100% rename from server/src/handlers/http/users/dashboards.rs rename to src/handlers/http/users/dashboards.rs diff --git a/server/src/handlers/http/users/filters.rs b/src/handlers/http/users/filters.rs similarity index 100% rename from server/src/handlers/http/users/filters.rs rename to src/handlers/http/users/filters.rs diff --git a/server/src/handlers/http/users/mod.rs b/src/handlers/http/users/mod.rs similarity index 100% rename from server/src/handlers/http/users/mod.rs rename to src/handlers/http/users/mod.rs diff --git a/server/src/handlers/livetail.rs b/src/handlers/livetail.rs similarity index 100% rename from server/src/handlers/livetail.rs rename to src/handlers/livetail.rs diff --git a/server/src/handlers.rs b/src/handlers/mod.rs similarity index 100% rename from server/src/handlers.rs rename to src/handlers/mod.rs diff --git a/server/src/hottier.rs b/src/hottier.rs similarity index 99% rename from server/src/hottier.rs rename to src/hottier.rs index 0528a1228..b6f29f609 100644 --- a/server/src/hottier.rs +++ b/src/hottier.rs @@ -289,7 +289,7 @@ impl HotTierManager { stream: &str, manifest_files_to_download: &mut BTreeMap>, parquet_file_size: &mut u64, - object_store: Arc, + object_store: Arc, ) 
-> Result<(), HotTierError> { if manifest_files_to_download.is_empty() { return Ok(()); diff --git a/server/src/main.rs b/src/lib.rs similarity index 55% rename from server/src/main.rs rename to src/lib.rs index 1a8562160..140c32dcc 100644 --- a/server/src/main.rs +++ b/src/lib.rs @@ -18,60 +18,34 @@ mod about; mod alerts; -mod analytics; -mod banner; +pub mod analytics; +pub mod banner; mod catalog; mod cli; mod event; -mod handlers; -mod hottier; +pub mod handlers; +pub mod hottier; mod livetail; -mod localcache; +pub mod localcache; mod metadata; -mod metrics; -mod migration; +pub mod metrics; +pub mod migration; mod oidc; -mod option; +pub mod option; mod query; mod querycache; -mod rbac; +pub mod rbac; mod response; mod static_schema; mod stats; -mod storage; -mod sync; -mod users; +pub mod storage; +pub mod sync; +pub mod users; mod utils; mod validator; -use std::sync::Arc; - -use handlers::http::modal::ParseableServer; -use option::{Mode, CONFIG}; -use tracing_subscriber::EnvFilter; - -use crate::handlers::http::modal::{ - ingest_server::IngestServer, query_server::QueryServer, server::Server, +pub use handlers::http::modal::{ + ingest_server::IngestServer, query_server::QueryServer, server::Server, ParseableServer, }; -pub const STORAGE_UPLOAD_INTERVAL: u32 = 60; - -#[actix_web::main] -async fn main() -> anyhow::Result<()> { - tracing_subscriber::fmt() - .with_env_filter(EnvFilter::from_default_env()) - .compact() - .init(); - - // these are empty ptrs so mem footprint should be minimal - let server: Arc = match CONFIG.parseable.mode { - Mode::Query => Arc::new(QueryServer), - Mode::Ingest => Arc::new(IngestServer), - - Mode::All => Arc::new(Server), - }; - - server.init().await?; - - Ok(()) -} +pub const STORAGE_UPLOAD_INTERVAL: u32 = 60; diff --git a/server/src/livetail.rs b/src/livetail.rs similarity index 100% rename from server/src/livetail.rs rename to src/livetail.rs diff --git a/server/src/localcache.rs b/src/localcache.rs similarity index 100% rename from server/src/localcache.rs rename to src/localcache.rs diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 000000000..9399c52e2 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,48 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ * + */ + +use parseable::{ + banner, + option::{Mode, CONFIG}, + rbac, storage, IngestServer, ParseableServer, QueryServer, Server, +}; + +#[actix_web::main] +async fn main() -> anyhow::Result<()> { + env_logger::init(); + + // the server flavors are zero-sized unit structs, so boxing them costs almost nothing + let server: Box<dyn ParseableServer> = match CONFIG.parseable.mode { + Mode::Query => Box::new(QueryServer), + Mode::Ingest => Box::new(IngestServer), + Mode::All => Box::new(Server), + }; + + // load metadata from persistence + let parseable_json = server.load_metadata().await?; + let metadata = storage::resolve_parseable_metadata(&parseable_json).await?; + banner::print(&CONFIG, &metadata).await; + // initialize the rbac map + rbac::map::init(&metadata); + // keep metadata info in mem + metadata.set_global(); + + server.init().await?; + + Ok(()) +} diff --git a/server/src/metadata.rs b/src/metadata.rs similarity index 100% rename from server/src/metadata.rs rename to src/metadata.rs diff --git a/server/src/metrics/mod.rs b/src/metrics/mod.rs similarity index 100% rename from server/src/metrics/mod.rs rename to src/metrics/mod.rs diff --git a/server/src/metrics/prom_utils.rs b/src/metrics/prom_utils.rs similarity index 100% rename from server/src/metrics/prom_utils.rs rename to src/metrics/prom_utils.rs diff --git a/server/src/metrics/storage.rs b/src/metrics/storage.rs similarity index 100% rename from server/src/metrics/storage.rs rename to src/metrics/storage.rs diff --git a/server/src/migration/metadata_migration.rs b/src/migration/metadata_migration.rs similarity index 100% rename from server/src/migration/metadata_migration.rs rename to src/migration/metadata_migration.rs diff --git a/server/src/migration.rs b/src/migration/mod.rs similarity index 98% rename from server/src/migration.rs rename to src/migration/mod.rs index c57eafcff..53ab5a511 100644 --- a/server/src/migration.rs +++ b/src/migration/mod.rs @@ -357,7 +357,7 @@ pub async fn run_file_migration(config: &Config) -> anyhow::Result<()> { } async fn run_meta_file_migration( - object_store: &Arc<dyn ObjectStorage + Send + Sync>, + object_store: &Arc<dyn ObjectStorage>, old_meta_file_path: RelativePathBuf, ) -> anyhow::Result<()> { // get the list of all meta files @@ -388,9 +388,7 @@ async fn run_meta_file_migration( Ok(()) } -async fn run_stream_files_migration( - object_store: &Arc<dyn ObjectStorage + Send + Sync>, -) -> anyhow::Result<()> { +async fn run_stream_files_migration(object_store: &Arc<dyn ObjectStorage>) -> anyhow::Result<()> { let streams = object_store .list_old_streams() .await? 
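The new `src/main.rs` above is now the whole binary: pick a server flavor by mode, box it behind the `ParseableServer` trait, then run the shared startup sequence. The trait definition itself lives in `src/handlers/http/modal/mod.rs` and is not part of this diff, so the sketch below is a reduced approximation inferred from the call sites (`load_metadata`, `init`); the `stub_server!` macro and the `P_MODE` variable are illustration only, standing in for the real implementations and for `CONFIG.parseable.mode`.

```rust
use async_trait::async_trait;
use bytes::Bytes;

enum Mode {
    Query,
    Ingest,
    All,
}

#[async_trait]
trait ParseableServer: Send + Sync {
    // fetch the remote .parseable.json (if any); the real impls also run
    // file and metadata migrations before reading it
    async fn load_metadata(&self) -> anyhow::Result<Option<Bytes>>;
    // run remaining migrations, spawn sync tasks, drive the HTTP server
    async fn init(&self) -> anyhow::Result<()>;
}

struct QueryServer;
struct IngestServer;
struct Server;

// stub impls so the sketch runs; the real ones are in the modal modules
macro_rules! stub_server {
    ($ty:ty, $name:literal) => {
        #[async_trait]
        impl ParseableServer for $ty {
            async fn load_metadata(&self) -> anyhow::Result<Option<Bytes>> {
                Ok(None)
            }
            async fn init(&self) -> anyhow::Result<()> {
                println!("{} running", $name);
                Ok(())
            }
        }
    };
}

stub_server!(QueryServer, "query server");
stub_server!(IngestServer, "ingest server");
stub_server!(Server, "standalone server");

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // stand-in for CONFIG.parseable.mode
    let mode = match std::env::var("P_MODE").as_deref() {
        Ok("query") => Mode::Query,
        Ok("ingest") => Mode::Ingest,
        _ => Mode::All,
    };

    // dispatch through a boxed trait object, as the new main.rs does
    let server: Box<dyn ParseableServer> = match mode {
        Mode::Query => Box::new(QueryServer),
        Mode::Ingest => Box::new(IngestServer),
        Mode::All => Box::new(Server),
    };

    let _parseable_json = server.load_metadata().await?;
    server.init().await
}
```

Since the three server types are unit structs, each `Box::new` allocates nothing; the trait object is effectively just a vtable pointer, which is what the "mem footprint should be minimal" remark in main.rs is getting at.
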
diff --git a/server/src/migration/schema_migration.rs b/src/migration/schema_migration.rs similarity index 100% rename from server/src/migration/schema_migration.rs rename to src/migration/schema_migration.rs diff --git a/server/src/migration/stream_metadata_migration.rs b/src/migration/stream_metadata_migration.rs similarity index 100% rename from server/src/migration/stream_metadata_migration.rs rename to src/migration/stream_metadata_migration.rs diff --git a/server/src/oidc.rs b/src/oidc.rs similarity index 100% rename from server/src/oidc.rs rename to src/oidc.rs diff --git a/server/src/option.rs b/src/option.rs similarity index 99% rename from server/src/option.rs rename to src/option.rs index 73b701c6e..fb9f3c75c 100644 --- a/server/src/option.rs +++ b/src/option.rs @@ -39,7 +39,7 @@ pub static CONFIG: Lazy> = Lazy::new(|| Arc::new(Config::new())); #[derive(Debug)] pub struct Config { pub parseable: Cli, - storage: Arc, + storage: Arc, pub storage_name: &'static str, } @@ -170,7 +170,7 @@ Cloud Native, log analytics platform for modern applications."#, Err(ObjectStorageError::Custom(format!("Could not start the server because bucket '{}' contains stale data, please use an empty bucket and restart the server.\n{}", self.storage.get_endpoint(), JOIN_COMMUNITY))) } - pub fn storage(&self) -> Arc { + pub fn storage(&self) -> Arc { self.storage.clone() } diff --git a/server/src/query/filter_optimizer.rs b/src/query/filter_optimizer.rs similarity index 100% rename from server/src/query/filter_optimizer.rs rename to src/query/filter_optimizer.rs diff --git a/server/src/query/listing_table_builder.rs b/src/query/listing_table_builder.rs similarity index 99% rename from server/src/query/listing_table_builder.rs rename to src/query/listing_table_builder.rs index 71a61998c..685a34a4b 100644 --- a/server/src/query/listing_table_builder.rs +++ b/src/query/listing_table_builder.rs @@ -56,7 +56,7 @@ impl ListingTableBuilder { pub async fn populate_via_listing( self, - storage: Arc, + storage: Arc, client: Arc, time_filters: &[PartialTimeFilter], ) -> Result { diff --git a/server/src/query.rs b/src/query/mod.rs similarity index 99% rename from server/src/query.rs rename to src/query/mod.rs index 24dc39d58..b41a066f8 100644 --- a/server/src/query.rs +++ b/src/query/mod.rs @@ -61,9 +61,7 @@ pub struct Query { impl Query { // create session context for this query - pub fn create_session_context( - storage: Arc, - ) -> SessionContext { + pub fn create_session_context(storage: Arc) -> SessionContext { let runtime_config = storage .get_datafusion_runtime() .with_disk_manager(DiskManagerConfig::NewOs); diff --git a/server/src/query/stream_schema_provider.rs b/src/query/stream_schema_provider.rs similarity index 99% rename from server/src/query/stream_schema_provider.rs rename to src/query/stream_schema_provider.rs index 6f1ceecf4..f27cb6998 100644 --- a/server/src/query/stream_schema_provider.rs +++ b/src/query/stream_schema_provider.rs @@ -18,7 +18,7 @@ use crate::catalog::manifest::File; use crate::hottier::HotTierManager; -use crate::Mode; +use crate::option::Mode; use crate::{ catalog::snapshot::{self, Snapshot}, storage::{ObjectStoreFormat, STREAM_ROOT_DIRECTORY}, @@ -75,7 +75,7 @@ use crate::catalog::Snapshot as CatalogSnapshot; // schema provider for stream based on global data pub struct GlobalSchemaProvider { - pub storage: Arc, + pub storage: Arc, } #[async_trait::async_trait] @@ -614,7 +614,7 @@ async fn get_hottier_exectuion_plan( #[allow(clippy::too_many_arguments)] async fn 
legacy_listing_table( stream: String, - glob_storage: Arc, + glob_storage: Arc, object_store: Arc, time_filters: &[PartialTimeFilter], schema: Arc, diff --git a/server/src/querycache.rs b/src/querycache.rs similarity index 100% rename from server/src/querycache.rs rename to src/querycache.rs diff --git a/server/src/rbac/map.rs b/src/rbac/map.rs similarity index 100% rename from server/src/rbac/map.rs rename to src/rbac/map.rs diff --git a/server/src/rbac.rs b/src/rbac/mod.rs similarity index 100% rename from server/src/rbac.rs rename to src/rbac/mod.rs diff --git a/server/src/rbac/role.rs b/src/rbac/role.rs similarity index 100% rename from server/src/rbac/role.rs rename to src/rbac/role.rs diff --git a/server/src/rbac/user.rs b/src/rbac/user.rs similarity index 100% rename from server/src/rbac/user.rs rename to src/rbac/user.rs diff --git a/server/src/response.rs b/src/response.rs similarity index 100% rename from server/src/response.rs rename to src/response.rs diff --git a/server/src/static_schema.rs b/src/static_schema.rs similarity index 100% rename from server/src/static_schema.rs rename to src/static_schema.rs diff --git a/server/src/stats.rs b/src/stats.rs similarity index 99% rename from server/src/stats.rs rename to src/stats.rs index b7845ecc9..52ffc3b24 100644 --- a/server/src/stats.rs +++ b/src/stats.rs @@ -100,7 +100,7 @@ pub fn get_current_stats(stream_name: &str, format: &'static str) -> Option, + storage: Arc, stream_name: &str, meta: ObjectStoreFormat, dates: Vec, diff --git a/server/src/storage/azure_blob.rs b/src/storage/azure_blob.rs similarity index 99% rename from server/src/storage/azure_blob.rs rename to src/storage/azure_blob.rs index c5491be6f..d50e2d901 100644 --- a/server/src/storage/azure_blob.rs +++ b/src/storage/azure_blob.rs @@ -163,7 +163,7 @@ impl ObjectStorageProvider for AzureBlobConfig { RuntimeConfig::new().with_object_store_registry(Arc::new(object_store_registry)) } - fn get_object_store(&self) -> Arc { + fn get_object_store(&self) -> Arc { let azure = self.get_default_builder().build().unwrap(); // limit objectstore to a concurrent request limit let azure = LimitStore::new(azure, super::MAX_OBJECT_STORE_REQUESTS); diff --git a/server/src/storage/localfs.rs b/src/storage/localfs.rs similarity index 99% rename from server/src/storage/localfs.rs rename to src/storage/localfs.rs index a84247f0b..b3d3e09cd 100644 --- a/server/src/storage/localfs.rs +++ b/src/storage/localfs.rs @@ -67,7 +67,7 @@ impl ObjectStorageProvider for FSConfig { RuntimeConfig::new() } - fn get_object_store(&self) -> Arc { + fn get_object_store(&self) -> Arc { Arc::new(LocalFS::new(self.root.clone())) } diff --git a/server/src/storage/metrics_layer.rs b/src/storage/metrics_layer.rs similarity index 100% rename from server/src/storage/metrics_layer.rs rename to src/storage/metrics_layer.rs diff --git a/server/src/storage.rs b/src/storage/mod.rs similarity index 100% rename from server/src/storage.rs rename to src/storage/mod.rs diff --git a/server/src/storage/object_storage.rs b/src/storage/object_storage.rs similarity index 99% rename from server/src/storage/object_storage.rs rename to src/storage/object_storage.rs index ff2a56953..78f51685d 100644 --- a/server/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -58,15 +58,15 @@ use std::{ time::{Duration, Instant}, }; -pub trait ObjectStorageProvider: StorageMetrics + std::fmt::Debug { +pub trait ObjectStorageProvider: StorageMetrics + std::fmt::Debug + Send + Sync { fn get_datafusion_runtime(&self) -> 
RuntimeConfig; - fn get_object_store(&self) -> Arc<dyn ObjectStorage + Send + Sync>; + fn get_object_store(&self) -> Arc<dyn ObjectStorage>; fn get_endpoint(&self) -> String; fn register_store_metrics(&self, handler: &PrometheusMetrics); } #[async_trait] -pub trait ObjectStorage: Sync + 'static { +pub trait ObjectStorage: Send + Sync + 'static { async fn get_object(&self, path: &RelativePath) -> Result<Bytes, ObjectStorageError>; // TODO: make the filter function optional as we may want to get all objects async fn get_objects( diff --git a/server/src/storage/retention.rs b/src/storage/retention.rs similarity index 100% rename from server/src/storage/retention.rs rename to src/storage/retention.rs diff --git a/server/src/storage/s3.rs b/src/storage/s3.rs similarity index 99% rename from server/src/storage/s3.rs rename to src/storage/s3.rs index 6a546a148..89f5b361d 100644 --- a/server/src/storage/s3.rs +++ b/src/storage/s3.rs @@ -289,7 +289,7 @@ impl ObjectStorageProvider for S3Config { RuntimeConfig::new().with_object_store_registry(Arc::new(object_store_registry)) } - fn get_object_store(&self) -> Arc<dyn ObjectStorage + Send + Sync> { + fn get_object_store(&self) -> Arc<dyn ObjectStorage> { let s3 = self.get_default_builder().build().unwrap(); // limit objectstore to a concurrent request limit diff --git a/server/src/storage/staging.rs b/src/storage/staging.rs similarity index 100% rename from server/src/storage/staging.rs rename to src/storage/staging.rs diff --git a/server/src/storage/store_metadata.rs b/src/storage/store_metadata.rs similarity index 96% rename from server/src/storage/store_metadata.rs rename to src/storage/store_metadata.rs index 54735ab71..dac0d26be 100644 --- a/server/src/storage/store_metadata.rs +++ b/src/storage/store_metadata.rs @@ -65,8 +65,8 @@ pub struct StorageMetadata { pub default_role: Option<String>, } -impl StorageMetadata { - pub fn new() -> Self { +impl Default for StorageMetadata { + fn default() -> Self { Self { version: CURRENT_STORAGE_METADATA_VERSION.to_string(), mode: CONFIG.storage_name.to_owned(), @@ -80,6 +80,9 @@ impl StorageMetadata { default_role: None, } } +} + +impl StorageMetadata { pub fn global() -> &'static StaticStorageMetadata { STORAGE_METADATA .get() @@ -103,13 +106,10 @@ pub async fn resolve_parseable_metadata( parseable_metadata: &Option<Bytes>, ) -> Result<StorageMetadata, ObjectStorageError> { let staging_metadata = get_staging_metadata()?; - let mut remote_metadata: Option<StorageMetadata> = None; - if parseable_metadata.is_some() { - remote_metadata = Some( - serde_json::from_slice(parseable_metadata.as_ref().unwrap()) - .expect("parseable config is valid json"), - ); - } + let remote_metadata = parseable_metadata + .as_ref() + .map(|meta| serde_json::from_slice(meta).expect("parseable config is valid json")); + + // Env Change needs to be updated let check = determine_environment(staging_metadata, remote_metadata); // flags for if metadata needs to be synced @@ -169,7 +169,7 @@ pub async fn resolve_parseable_metadata( } EnvChange::CreateBoth => { create_dir_all(CONFIG.staging_dir())?; - let metadata = StorageMetadata::new(); + let metadata = StorageMetadata::default(); // new metadata needs to be set // if mode is query or all then both staging and remote match CONFIG.parseable.mode { diff --git a/server/src/sync.rs b/src/sync.rs similarity index 100% rename from server/src/sync.rs rename to src/sync.rs diff --git a/server/src/users/dashboards.rs b/src/users/dashboards.rs similarity index 100% rename from server/src/users/dashboards.rs rename to src/users/dashboards.rs diff --git a/server/src/users/filters.rs b/src/users/filters.rs similarity index 100% rename from server/src/users/filters.rs rename to 
src/users/filters.rs diff --git a/server/src/users/mod.rs b/src/users/mod.rs similarity index 100% rename from server/src/users/mod.rs rename to src/users/mod.rs diff --git a/server/src/utils/actix.rs b/src/utils/actix.rs similarity index 100% rename from server/src/utils/actix.rs rename to src/utils/actix.rs diff --git a/server/src/utils/arrow/batch_adapter.rs b/src/utils/arrow/batch_adapter.rs similarity index 100% rename from server/src/utils/arrow/batch_adapter.rs rename to src/utils/arrow/batch_adapter.rs diff --git a/server/src/utils/arrow/flight.rs b/src/utils/arrow/flight.rs similarity index 100% rename from server/src/utils/arrow/flight.rs rename to src/utils/arrow/flight.rs diff --git a/server/src/utils/arrow/merged_reader.rs b/src/utils/arrow/merged_reader.rs similarity index 100% rename from server/src/utils/arrow/merged_reader.rs rename to src/utils/arrow/merged_reader.rs diff --git a/server/src/utils/arrow.rs b/src/utils/arrow/mod.rs similarity index 98% rename from server/src/utils/arrow.rs rename to src/utils/arrow/mod.rs index f88be6048..87af65735 100644 --- a/server/src/utils/arrow.rs +++ b/src/utils/arrow/mod.rs @@ -34,8 +34,9 @@ pub use merged_reader::MergedRecordReader; use serde_json::{Map, Value}; /// example function for concat recordbatch(may not work) -/// use arrow::record_batch::RecordBatch; -/// use arrow::error::Result; +/// ```rust +/// # use arrow::record_batch::RecordBatch; +/// # use arrow::error::Result; /// /// fn concat_batches(batch1: RecordBatch, batch2: RecordBatch) -> Result { /// let schema = batch1.schema(); @@ -53,6 +54,7 @@ use serde_json::{Map, Value}; /// /// RecordBatch::try_new(schema.clone(), columns) /// } +/// ``` /// /// Replaces columns in a record batch with new arrays. diff --git a/server/src/utils/arrow/reverse_reader.rs b/src/utils/arrow/reverse_reader.rs similarity index 100% rename from server/src/utils/arrow/reverse_reader.rs rename to src/utils/arrow/reverse_reader.rs diff --git a/server/src/utils/header_parsing.rs b/src/utils/header_parsing.rs similarity index 100% rename from server/src/utils/header_parsing.rs rename to src/utils/header_parsing.rs diff --git a/server/src/utils/json/flatten.rs b/src/utils/json/flatten.rs similarity index 100% rename from server/src/utils/json/flatten.rs rename to src/utils/json/flatten.rs diff --git a/server/src/utils/json.rs b/src/utils/json/mod.rs similarity index 100% rename from server/src/utils/json.rs rename to src/utils/json/mod.rs diff --git a/server/src/utils.rs b/src/utils/mod.rs similarity index 100% rename from server/src/utils.rs rename to src/utils/mod.rs diff --git a/server/src/utils/uid.rs b/src/utils/uid.rs similarity index 100% rename from server/src/utils/uid.rs rename to src/utils/uid.rs diff --git a/server/src/utils/update.rs b/src/utils/update.rs similarity index 100% rename from server/src/utils/update.rs rename to src/utils/update.rs diff --git a/server/src/validator.rs b/src/validator.rs similarity index 100% rename from server/src/validator.rs rename to src/validator.rs
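Most of the churn in the storage, query, stats, and migration hunks above is one mechanical change: `ObjectStorageProvider` and `ObjectStorage` now declare `Send + Sync` as supertraits (see the `src/storage/object_storage.rs` hunk), so every `Arc<dyn ObjectStorageProvider + Send + Sync>` at the call sites collapses to `Arc<dyn ObjectStorageProvider>`. A minimal sketch of why that works, with hypothetical `StorageAfter`/`MemStore` types standing in for the real traits:

```rust
use std::sync::Arc;

// Before: the trait promised nothing about thread safety, so callers had to
// write `Arc<dyn StorageBefore + Send + Sync>` to move the handle across
// threads.
#[allow(dead_code)]
trait StorageBefore {
    fn endpoint(&self) -> String;
}

// After: the supertrait bounds make `dyn StorageAfter` Send + Sync by
// construction, since every implementor must satisfy them.
trait StorageAfter: Send + Sync {
    fn endpoint(&self) -> String;
}

struct MemStore;

impl StorageAfter for MemStore {
    fn endpoint(&self) -> String {
        "mem://".to_string()
    }
}

fn spawn_with(store: Arc<dyn StorageAfter>) {
    // compiles because `dyn StorageAfter: Send + Sync` is implied by the trait
    std::thread::spawn(move || println!("{}", store.endpoint()))
        .join()
        .unwrap();
}

fn main() {
    spawn_with(Arc::new(MemStore));
}
```

Moving the bounds into the trait definitions also means no call site can forget them, which matters once these handles cross `tokio::spawn` boundaries in the new library crate.
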