diff --git a/Cargo.lock b/Cargo.lock index 7f12ce97c..69f33c3e8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2374,6 +2374,7 @@ checksum = "168fb715dda47215e360912c096649d23d58bf392ac62f73919e831745e40f26" dependencies = [ "equivalent", "hashbrown 0.14.5", + "serde", ] [[package]] @@ -3049,6 +3050,8 @@ dependencies = [ "uptime_lib", "ureq", "url", + "utoipa", + "utoipa-swagger-ui", "vergen", "xxhash-rust", "xz2", @@ -3531,6 +3534,7 @@ checksum = "566cafdd92868e0939d3fb961bd0dc25fcfaaed179291093b3d43e6b3150ea10" dependencies = [ "base64 0.22.0", "bytes", + "futures-channel", "futures-core", "futures-util", "h2 0.4.5", @@ -3612,6 +3616,40 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "rust-embed" +version = "8.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa66af4a4fdd5e7ebc276f115e895611a34739a9c1c01028383d612d550953c0" +dependencies = [ + "rust-embed-impl", + "rust-embed-utils", + "walkdir", +] + +[[package]] +name = "rust-embed-impl" +version = "8.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6125dbc8867951125eec87294137f4e9c2c96566e61bf72c45095a7c77761478" +dependencies = [ + "proc-macro2", + "quote", + "rust-embed-utils", + "syn 2.0.60", + "walkdir", +] + +[[package]] +name = "rust-embed-utils" +version = "8.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e5347777e9aacb56039b0e1f28785929a8a3b709e87482e7442c72e7c12529d" +dependencies = [ + "sha2", + "walkdir", +] + [[package]] name = "rustc-demangle" version = "0.1.23" @@ -3887,6 +3925,19 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_yaml" +version = "0.9.34+deprecated" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47" +dependencies = [ + "indexmap 2.2.6", + "itoa", + "ryu", + "serde", + "unsafe-libyaml", +] + [[package]] name = "sha1" version = "0.10.6" @@ -4590,6 +4641,12 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e51733f11c9c4f72aa0c160008246859e340b00807569a0da0e7a1079b27ba85" +[[package]] +name = "unsafe-libyaml" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861" + [[package]] name = "untrusted" version = "0.9.0" @@ -4642,6 +4699,50 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" +[[package]] +name = "utoipa" +version = "4.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c5afb1a60e207dca502682537fefcfd9921e71d0b83e9576060f09abc6efab23" +dependencies = [ + "indexmap 2.2.6", + "serde", + "serde_json", + "serde_yaml", + "utoipa-gen", +] + +[[package]] +name = "utoipa-gen" +version = "4.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7bf0e16c02bc4bf5322ab65f10ab1149bdbcaa782cba66dc7057370a3f8190be" +dependencies = [ + "proc-macro-error", + "proc-macro2", + "quote", + "regex", + "syn 2.0.60", +] + +[[package]] +name = "utoipa-swagger-ui" +version = "7.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "943e0ff606c6d57d410fd5663a4d7c074ab2c5f14ab903b9514565e59fa1189e" +dependencies = [ + "actix-web", + "mime_guess", + "regex", + "reqwest 0.12.4", + "rust-embed", + "serde", + "serde_json", + "url", + "utoipa", + "zip", +] + [[package]] name = "uuid" version = "1.8.0" diff --git a/openapi.yaml b/openapi.yaml new file mode 100644 index 000000000..89893214c --- /dev/null +++ b/openapi.yaml @@ -0,0 +1,1081 @@ +openapi: 3.0.3 +info: + title: parseable + description: Parseable API documents [https://www.parseable.com/docs/](https://www.parseable.com/docs/) + contact: + name: Parseable + url: https://www.parseable.com/ + email: hi@parseable.com + license: + name: '' + version: 1.4.0 +paths: + /api/v1/ingest: + post: + tags: + - Log Stream Ingestion + operationId: Ingest with HTTP Header + parameters: + - name: X-P-META-{someTag} + in: header + description: (Optional) Creates a meta tag with the given name + required: true + schema: + type: string + - name: X-P-Stream + in: header + description: (Mandatory) Name of stream + required: true + schema: + type: string + requestBody: + description: Log events + content: + application/json: + schema: + type: array + items: + type: object + example: "[\n {key: value}\n ]\n " + required: true + responses: + '200': + description: Ingested event + content: + application/json: + schema: + type: array + items: + type: string + '400': + description: Error + '404': + description: Stream not found + '500': + description: Failure + security: + - basic_auth: [] + /api/v1/liveness: + get: + tags: + - Health Status + operationId: liveness + responses: + '200': + description: The server is live. + /api/v1/logstream/{streamName}: + post: + tags: + - Log Stream Ingestion + operationId: Send logs to log stream + parameters: + - name: streamName + in: path + description: Name of stream + required: true + schema: + type: string + - name: X-P-Time-Partition + in: header + description: (Optional) Identifies a field from the logs as the time partition field (a field `p_timestamp` is created and used otherwise + required: true + schema: + type: string + example: DateTime + - name: X-P-Time-Partition-Limit + in: header + description: (Mandatory if `X-P-Time-Partition` is being used) The number of days, prior to log creation time, till which data from the given source is to be ingested. 30 days by default + required: true + schema: + type: string + example: 50d + requestBody: + description: Log events + content: + application/json: + schema: + type: array + items: + type: object + required: true + responses: + '200': + description: Ingested event + content: + application/json: + schema: + type: array + items: + type: string + '400': + description: Error + '404': + description: Stream not found + '500': + description: Failure + security: + - basic_auth: [] + put: + tags: + - Log Stream Management + operationId: Create a log stream + parameters: + - name: streamName + in: path + description: Name of stream + required: true + schema: + type: string + requestBody: + description: '' + content: + application/octet-stream: + schema: + type: string + format: binary + required: true + responses: + '200': + description: Created new stream + content: + application/json: + schema: + type: array + items: + type: string + '400': + description: Error + '404': + description: Stream not found + '405': + description: Method not found + '500': + description: Failure + security: + - basic_auth: [] + /api/v1/logstream/{streamName}/alert: + get: + tags: + - Log Stream Alerts + operationId: Get alert for a stream + parameters: + - name: streamName + in: path + description: Name of stream + required: true + schema: + type: string + responses: + '200': + description: Fetched alert for stream + content: + application/json: + schema: + type: object + '400': + description: Error + '404': + description: Stream not found + '405': + description: Method not found + '500': + description: Failure + security: + - basic_auth: [] + put: + tags: + - Log Stream Alerts + operationId: Set alert for a stream + parameters: + - name: streamName + in: path + description: Name of stream + required: true + schema: + type: string + requestBody: + description: Alert to be set + content: + application/json: + schema: + $ref: '#/components/schemas/Alerts' + required: true + responses: + '200': + description: Put alert for stream + content: + application/json: + schema: + type: array + items: + type: string + '400': + description: Error + '404': + description: Stream not found + '405': + description: Log stream not initialized + '500': + description: Failure + security: + - basic_auth: [] + /api/v1/logstream/{streamName}/cache: + get: + tags: + - Log Stream Management + operationId: Get cache state state for a log stream + parameters: + - name: streamName + in: path + description: Name of stream + required: true + schema: + type: string + responses: + '200': + description: '' + content: + text/plain: + schema: + type: boolean + '400': + description: Error + '404': + description: Stream not found + '405': + description: Method not found + '500': + description: Failure + security: + - basic_auth: [] + put: + tags: + - Log Stream Management + operationId: Enable cache for a log stream + parameters: + - name: streamName + in: path + description: Name of stream + required: true + schema: + type: string + requestBody: + description: '' + content: + text/plain: + schema: + type: boolean + required: true + responses: + '200': + description: Enabled cache for stream + '400': + description: Error + '404': + description: Stream not found + '405': + description: Method not found + '500': + description: Failure + security: + - basic_auth: [] + /api/v1/logstream/{streamName}/hottier: + get: + tags: + - Log Stream Hottier + operationId: Get hottier state for a log stream + parameters: + - name: streamName + in: path + description: Name of stream + required: true + schema: + type: string + responses: + '200': + description: Fetched hottier for stream + content: + application/json: + schema: + $ref: '#/components/schemas/StreamHotTier' + '400': + description: Error + '404': + description: Stream not found + '405': + description: Method not found + '500': + description: Failure + security: + - basic_auth: [] + put: + tags: + - Log Stream Hottier + operationId: Enable hottier for a log stream + parameters: + - name: streamName + in: path + description: Name of stream + required: true + schema: + type: string + requestBody: + description: '' + content: + application/json: + schema: {} + required: true + responses: + '200': + description: Enabled hottier for stream + '400': + description: Error + '404': + description: Stream not found + '405': + description: Method not found + '500': + description: Failure + security: + - basic_auth: [] + delete: + tags: + - Log Stream Hottier + operationId: Delete hottier for a log stream + parameters: + - name: streamName + in: path + description: Name of stream + required: true + schema: + type: string + responses: + '200': + description: Deleted hottier for stream + '400': + description: Error + '404': + description: Stream not found + '405': + description: Method not found + '500': + description: Failure + security: + - basic_auth: [] + /api/v1/logstream/{streamName}/info: + get: + tags: + - Log Stream Management + operationId: Get info for log stream + parameters: + - name: streamName + in: path + description: Name of stream + required: true + schema: + type: string + responses: + '200': + description: Stream info + content: + application/json: + schema: + $ref: '#/components/schemas/StreamInfo' + '400': + description: Error + '404': + description: Stream not found + '405': + description: Method not found + '500': + description: Failure + security: + - basic_auth: [] + /api/v1/logstream/{streamName}/retention: + get: + tags: + - Log Stream Retention + operationId: Get retention for a log stream + parameters: + - name: streamName + in: path + description: Name of stream + required: true + schema: + type: string + responses: + '200': + description: Fetched retention for stream + content: + application/json: + schema: + $ref: '#/components/schemas/Retention' + '400': + description: Error + '404': + description: Stream not found + '405': + description: Method not found + '500': + description: Failure + security: + - basic_auth: [] + put: + tags: + - Log Stream Retention + operationId: Set retention for a log stream + parameters: + - name: streamName + in: path + description: Name of stream + required: true + schema: + type: string + requestBody: + description: Retention details + content: + application/json: + schema: + $ref: '#/components/schemas/Retention' + required: true + responses: + '200': + description: Put retention for stream + content: + application/json: + schema: + $ref: '#/components/schemas/Retention' + '400': + description: Error + '404': + description: Stream not found + '405': + description: Method not found + '500': + description: Failure + security: + - basic_auth: [] + /api/v1/logstream/{streamName}/schema: + get: + tags: + - Log Stream Management + operationId: Get schema of a log stream + parameters: + - name: streamName + in: path + description: Name of stream + required: true + schema: + type: string + responses: + '200': + description: Fetched schema for stream + content: + application/json: + schema: + type: object + '400': + description: Error + '404': + description: Stream not found + '405': + description: Method not found + '500': + description: Failure + security: + - basic_auth: [] + /api/v1/logstream/{streamName}/stats: + get: + tags: + - Log Stream Management + operationId: Get stats of a log stream + parameters: + - name: streamName + in: path + description: Name of stream + required: true + schema: + type: string + responses: + '200': + description: Fetched stats for stream + content: + application/json: + schema: + $ref: '#/components/schemas/QueriedStats' + '400': + description: Error + '404': + description: Stream not found + '405': + description: Method not found + '500': + description: Failure + security: + - basic_auth: [] + /api/v1/query: + post: + tags: + - Log Stream Query + operationId: Query a Log Stream + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/Query' + required: true + responses: + '200': + description: Queries successfully + content: + application/json: + schema: + $ref: '#/components/schemas/QueryResponse' + '400': + description: Error + '500': + description: Failure + security: + - basic_auth: [] + /api/v1/readiness: + get: + tags: + - Health Status + operationId: readiness + responses: + '200': + description: The object store is live. + '503': + description: Service Unavailable. + security: + - basic_auth: [] + /api/v1/role: + get: + tags: + - Role Management + operationId: List all roles + responses: + '200': + description: Fetches all roles in the system + content: + application/json: + schema: + type: object + '400': + description: Cannot perform this operation as role is assigned to an existing user. + '500': + description: 'Failed to connect to storage: 0' + security: + - basic_auth: [] + /api/v1/role/default: + get: + tags: + - Role Management + operationId: Get default role + responses: + '200': + description: Fetched the default role + content: + application/json: + schema: + type: object + '400': + description: Cannot perform this operation as role is assigned to an existing user. + '500': + description: 'Failed to connect to storage: 0' + security: + - basic_auth: [] + put: + tags: + - Role Management + operationId: Set defautl role + requestBody: + description: Name of the role + content: + application/json: + schema: + type: string + required: true + responses: + '200': + description: Created a default role with the given name + '400': + description: Cannot perform this operation as role is assigned to an existing user. + '500': + description: 'Failed to connect to storage: 0' + security: + - basic_auth: [] + /api/v1/role/{name}: + get: + tags: + - Role Management + operationId: Get a role + parameters: + - name: name + in: path + description: Name of the role to fetch + required: true + schema: + type: string + responses: + '200': + description: Fetched role + '400': + description: Cannot perform this operation as role is assigned to an existing user. + '500': + description: 'Failed to connect to storage: 0' + security: + - basic_auth: [] + put: + tags: + - Role Management + operationId: Create a role + parameters: + - name: name + in: path + description: Name of the role to create or update + required: true + schema: + type: string + requestBody: + description: Privilege and resource stream + content: + application/json: + schema: + type: array + items: + type: object + required: true + responses: + '200': + description: Created/updated the role + '400': + description: Cannot perform this operation as role is assigned to an existing user. + '500': + description: 'Failed to connect to storage: 0' + security: + - basic_auth: [] + delete: + tags: + - Role Management + operationId: Delete a role + parameters: + - name: name + in: path + description: Deletes the given role + required: true + schema: + type: string + responses: + '200': + description: Deleted given role + '400': + description: Cannot perform this operation as role is assigned to an existing user. + '500': + description: 'Failed to connect to storage: 0' + security: + - basic_auth: [] + /api/v1/user: + get: + tags: + - User Management + operationId: Get all users + responses: + '200': + description: Fetches all users in the system + content: + application/json: + schema: + type: object + security: + - basic_auth: [] + /api/v1/user/{username}: + post: + tags: + - User Management + operationId: Create a user + parameters: + - name: username + in: path + description: username of the user to be created + required: true + schema: + type: string + requestBody: + description: List of roles to be assigned to the user + content: + application/json: + schema: + type: object + required: true + responses: + '200': + description: Created user and returned the initial password + content: + text/plain: + schema: + type: string + '400': + description: Error + '404': + description: User not found + '500': + description: Failure + security: + - basic_auth: [] + delete: + tags: + - User Management + operationId: Delete a user + parameters: + - name: username + in: path + description: username of the user to be deleted + required: true + schema: + type: string + responses: + '200': + description: Deleted the user + content: + text/plain: + schema: + type: string + '400': + description: Error + '404': + description: User not found + '500': + description: Failure + security: + - basic_auth: [] + /api/v1/user/{username}/generate-new-password: + post: + tags: + - User Management + operationId: Set a new password for user + parameters: + - name: username + in: path + description: username of the user + required: true + schema: + type: string + requestBody: + description: Password for the user + content: + text/plain: + schema: + type: string + required: true + responses: + '200': + description: Assigned new password + content: + text/plain: + schema: + type: string + '400': + description: Error + '404': + description: User not found + '500': + description: Failure + security: + - basic_auth: [] + /api/v1/user/{username}/role: + get: + tags: + - User Management + operationId: Get a user's roles + parameters: + - name: username + in: path + description: username for which the role needs to be fetched + required: true + schema: + type: string + responses: + '200': + description: Fetched role for the given user + content: + application/json: + schema: + type: object + '400': + description: Error + '404': + description: User not found + '500': + description: Failure + security: + - basic_auth: [] + put: + tags: + - RBAC + operationId: Add role to a user + parameters: + - name: username + in: path + description: username to which the roles need to be assigned + required: true + schema: + type: string + requestBody: + description: Roles to be assigned to the user + content: + application/json: + schema: + type: array + items: + type: string + required: true + responses: + '200': + description: Assigned roles to the user + content: + application/json: + schema: + type: object + '400': + description: Error + '404': + description: User not found + '500': + description: Failure + security: + - basic_auth: [] +components: + schemas: + Alert: + allOf: + - type: string + - type: object + required: + - name + - rule + - targets + properties: + id: + type: string + name: + type: string + rule: + type: string + targets: + type: array + items: + type: object + AlertVersion: + type: string + enum: + - v1 + Alerts: + type: object + required: + - version + - alerts + properties: + version: + $ref: '#/components/schemas/AlertVersion' + alerts: + type: array + items: + $ref: '#/components/schemas/Alert' + IngestionStats: + type: object + required: + - count + - size + - format + - lifetime_count + - lifetime_size + - deleted_count + - deleted_size + properties: + count: + type: integer + format: int64 + minimum: 0 + size: + type: string + format: + type: string + lifetime_count: + type: integer + format: int64 + minimum: 0 + lifetime_size: + type: string + deleted_count: + type: integer + format: int64 + minimum: 0 + deleted_size: + type: string + QueriedStats: + type: object + required: + - stream + - time + - ingestion + - storage + properties: + stream: + type: string + time: + type: string + format: date-time + ingestion: + $ref: '#/components/schemas/IngestionStats' + storage: + $ref: '#/components/schemas/StorageStats' + Query: + type: object + description: Query Request through http endpoint. + required: + - query + - startTime + - endTime + properties: + query: + type: string + example: SELECT * FROM test; + startTime: + type: string + example: 2024-07-06T14:34:00.000Z + endTime: + type: string + example: 2024-07-06T14:45:00.000Z + sendNull: + type: boolean + QueryResponse: + type: object + required: + - records + - fields + - fill_null + - with_fields + properties: + records: + type: array + items: + type: object + fields: + type: array + items: + type: string + fill_null: + type: boolean + with_fields: + type: boolean + Retention: + type: object + required: + - tasks + properties: + tasks: + type: array + items: + $ref: '#/components/schemas/Task' + StorageStats: + type: object + required: + - size + - format + - lifetime_size + - deleted_size + properties: + size: + type: string + format: + type: string + lifetime_size: + type: string + deleted_size: + type: string + StreamHotTier: + type: object + required: + - size + properties: + size: + type: string + used_size: + type: string + nullable: true + available_size: + type: string + nullable: true + oldest_date_time_entry: + type: string + nullable: true + StreamInfo: + type: object + required: + - created-at + - stream_type + properties: + created-at: + type: string + first-event-at: + type: string + nullable: true + cache_enabled: + type: boolean + time_partition: + type: string + nullable: true + time_partition_limit: + type: string + nullable: true + custom_partition: + type: string + nullable: true + static_schema_flag: + type: string + nullable: true + stream_type: + type: string + Task: + type: object + required: + - description + - action + - days + properties: + description: + type: string + action: + type: string + days: + type: integer + format: int32 + minimum: 0 + securitySchemes: + basic_auth: + type: http + scheme: basic +tags: +- name: About + description: Details about this Parseable executable +- name: Health Status + description: Health of Parseable server +- name: Log Stream Ingestion + description: Sending data to log streams +- name: Log Stream Alerts + description: Manipulation of alerts for log streams +- name: Log Stream Management + description: Create, List, Delete, log streams +- name: Log Stream Query + description: Query log streams +- name: Log Stream Retention + description: Get and Set retention policies for log streams +- name: Log Stream Hottier + description: Hottier related actions for a log stream +- name: User Management + description: Actions pertaining to Users +- name: Role Management + description: Actions pertaining to Roles +- name: RBAC + description: Add a role to a user diff --git a/server/Cargo.toml b/server/Cargo.toml index d9307a77e..b12c7a275 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -22,6 +22,8 @@ arrow-flight = { version = "52.1.0", features = [ "tls" ] } tonic = {version = "0.11.0", features = ["tls", "transport", "gzip", "zstd"] } tonic-web = "0.11.0" tower-http = { version = "0.4.4", features = ["cors"] } +utoipa = {version = "4.2.3", features = ["actix_extras","chrono","openapi_extensions","preserve_order","yaml"]} +utoipa-swagger-ui = {version = "7.1.0", features = ["actix-web"]} ### actix dependencies actix-web-httpauth = "0.8" diff --git a/server/src/alerts/mod.rs b/server/src/alerts/mod.rs index 587bad773..7c5682781 100644 --- a/server/src/alerts/mod.rs +++ b/server/src/alerts/mod.rs @@ -24,7 +24,9 @@ use datafusion::arrow::compute::kernels::cast; use datafusion::arrow::datatypes::Schema; use regex::Regex; use serde::{Deserialize, Serialize}; + use std::fmt; +use utoipa::ToSchema; pub mod parser; pub mod rule; @@ -39,29 +41,33 @@ use crate::{storage, utils}; pub use self::rule::Rule; use self::target::Target; -#[derive(Default, Debug, serde::Serialize, serde::Deserialize)] +#[derive(Default, Debug, serde::Serialize, serde::Deserialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct Alerts { - pub version: AlertVerison, + pub version: AlertVersion, pub alerts: Vec, } -#[derive(Default, Debug, serde::Serialize, serde::Deserialize)] +#[derive(Default, Debug, serde::Serialize, serde::Deserialize, ToSchema)] #[serde(rename_all = "lowercase")] -pub enum AlertVerison { +pub enum AlertVersion { #[default] V1, } -#[derive(Debug, serde::Serialize, serde::Deserialize)] +#[derive(Debug, serde::Serialize, serde::Deserialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct Alert { #[serde(default = "crate::utils::uid::gen")] + #[schema(value_type = String)] pub id: uid::Uid, pub name: String, #[serde(flatten)] + #[schema(value_type = String)] pub message: Message, + #[schema(value_type = String)] pub rule: Rule, + #[schema(value_type = Vec)] pub targets: Vec, } diff --git a/server/src/handlers/http/about.rs b/server/src/handlers/http/about.rs index 4acd01ad2..52bfbbafe 100644 --- a/server/src/handlers/http/about.rs +++ b/server/src/handlers/http/about.rs @@ -48,6 +48,19 @@ use std::path::PathBuf; /// "path": store_endpoint /// } /// } +#[utoipa::path( + get, + tag = "About", + operation_id = "About Parseable Server", + context_path = "/api/v1", + path = "/about", + responses( + (status = 200, body = Value) + ), + security( + ("basic_auth" = []) + ) +)] pub async fn about() -> Json { let meta = StorageMetadata::global(); diff --git a/server/src/handlers/http/cluster/utils.rs b/server/src/handlers/http/cluster/utils.rs index 6f41755f4..9e25ae6df 100644 --- a/server/src/handlers/http/cluster/utils.rs +++ b/server/src/handlers/http/cluster/utils.rs @@ -26,8 +26,9 @@ use itertools::Itertools; use reqwest::Response; use serde::{Deserialize, Serialize}; use url::Url; +use utoipa::ToSchema; -#[derive(Debug, Default, Serialize, Deserialize)] +#[derive(Debug, Default, Serialize, Deserialize, ToSchema)] pub struct QueriedStats { pub stream: String, pub time: DateTime, @@ -81,7 +82,7 @@ impl ClusterInfo { } } -#[derive(Debug, Default, Serialize, Deserialize)] +#[derive(Debug, Default, Serialize, Deserialize, ToSchema)] pub struct IngestionStats { pub count: u64, pub size: String, @@ -114,7 +115,7 @@ impl IngestionStats { } } -#[derive(Debug, Default, Serialize, Deserialize)] +#[derive(Debug, Default, Serialize, Deserialize, ToSchema)] pub struct StorageStats { pub size: String, pub format: String, diff --git a/server/src/handlers/http/health_check.rs b/server/src/handlers/http/health_check.rs index bb988b933..ca89507aa 100644 --- a/server/src/handlers/http/health_check.rs +++ b/server/src/handlers/http/health_check.rs @@ -21,10 +21,32 @@ use actix_web::HttpResponse; use crate::option::CONFIG; +#[utoipa::path( + get, + tag = "Health Status", + context_path = "/api/v1", + path = "/liveness", + responses( + (status = 200, description = "The server is live.") + ) +)] pub async fn liveness() -> HttpResponse { HttpResponse::new(StatusCode::OK) } +#[utoipa::path( + get, + tag = "Health Status", + context_path = "/api/v1", + path = "/readiness", + responses( + (status = 200, description = "The object store is live."), + (status = 503, description = "Service Unavailable.") + ), + security( + ("basic_auth" = []) + ) +)] pub async fn readiness() -> HttpResponse { if CONFIG.storage().get_object_store().check().await.is_ok() { return HttpResponse::new(StatusCode::OK); diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index 698bda10a..306151886 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -45,10 +45,39 @@ use http::StatusCode; use serde_json::Value; use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; +use utoipa::ToSchema; // Handler for POST /api/v1/ingest // ingests events by extracting stream name from header // creates if stream does not exist +#[utoipa::path( + post, + tag = "Log Stream Ingestion", + operation_id = "Ingest with HTTP Header", + context_path = "/api/v1", + path = "/ingest", + params( + ("X-P-META-{someTag}" = String, Header, description = "(Optional) Creates a meta tag with the given name"), + ("X-P-Stream" = String, Header, description = "(Mandatory) Name of stream"), + ), + request_body( + content = Vec, description = "Log events", content_type = "application/json", example = json!( + "[ + {key: value} + ] + " + ) + ), + responses( + (status = 200, description = "Ingested event", body = Vec), + (status = 400, description = "Error"), + (status = 500, description = "Failure"), + (status = 404, description = "Stream not found"), + ), + security( + ("basic_auth" = []) + ) +)] pub async fn ingest(req: HttpRequest, body: Bytes) -> Result { if let Some((_, stream_name)) = req .headers() @@ -174,6 +203,30 @@ async fn flatten_and_push_logs( // Handler for POST /api/v1/logstream/{logstream} // only ingests events into the specified logstream // fails if the logstream does not exist +#[utoipa::path( + post, + tag = "Log Stream Ingestion", + operation_id = "Send logs to log stream", + context_path = "/api/v1", + path = "/logstream/{streamName}", + params( + ("streamName" = String, Path, description = "Name of stream"), + ("X-P-Time-Partition" = String, Header, description = "(Optional) Identifies a field from the logs as the time partition field (a field `p_timestamp` is created and used otherwise", example = "DateTime"), + ("X-P-Time-Partition-Limit" = String, Header, description = "(Mandatory if `X-P-Time-Partition` is being used) The number of days, prior to log creation time, till which data from the given source is to be ingested. 30 days by default", example = "50d"), + ), + request_body( + content = Vec, description = "Log events" + ), + responses( + (status = 200, description = "Ingested event", body = Vec), + (status = 400, description = "Error"), + (status = 500, description = "Failure"), + (status = 404, description = "Stream not found"), + ), + security( + ("basic_auth" = []) + ) +)] pub async fn post_event(req: HttpRequest, body: Bytes) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); let internal_stream_names = STREAM_INFO.list_internal_streams(); @@ -462,7 +515,7 @@ pub async fn create_stream_if_not_exists( Ok(()) } -#[derive(Debug, thiserror::Error)] +#[derive(Debug, thiserror::Error, ToSchema)] pub enum PostError { #[error("Stream {0} not found")] StreamNotFound(String), diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index 711a88aa2..eac92400f 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -59,6 +59,25 @@ use std::num::NonZeroU32; use std::str::FromStr; use std::sync::Arc; +#[utoipa::path( + delete, + tag = "Log Stream Management", + operation_id = "Delete a log stream", + context_path = "/api/v1", + path = "/logstream/{streamName}", + params( + ("streamName" = String, Path, description = "Name of stream") + ), + responses( + (status = 200, description = "Deleted stream", body = Vec), + (status = 400, description = "Error"), + (status = 500, description = "Failure"), + (status = 404, description = "Stream not found"), + ), + security( + ("basic_auth" = []) + ) +)] pub async fn delete(req: HttpRequest) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); if !metadata::STREAM_INFO.stream_exists(&stream_name) { @@ -129,6 +148,23 @@ pub async fn retention_cleanup( Ok((first_event_at, StatusCode::OK)) } +#[utoipa::path( + get, + tag = "Log Stream Management", + operation_id = "List all log streams", + context_path = "/api/v1", + path = "/logstream", + responses( + (status = 200, description = "Fetched all streams in the system", body = Vec), + (status = 400, description = "Error", body = HttpResponse), + (status = 500, description = "Failure", body = HttpResponse), + (status = 404, description = "Stream not found", body = HttpResponse), + (status = 405, description = "Method not found", body = HttpResponse), + ), + security( + ("basic_auth" = []) + ) +)] pub async fn list(_: HttpRequest) -> impl Responder { let res: Vec = STREAM_INFO .list_streams() @@ -139,12 +175,52 @@ pub async fn list(_: HttpRequest) -> impl Responder { web::Json(res) } +#[utoipa::path( + get, + tag = "Log Stream Management", + operation_id = "Get schema of a log stream", + context_path = "/api/v1", + path = "/logstream/{streamName}/schema", + params( + ("streamName" = String, Path, description = "Name of stream") + ), + responses( + (status = 200, description = "Fetched schema for stream", body = Object), + (status = 400, description = "Error"), + (status = 500, description = "Failure"), + (status = 404, description = "Stream not found"), + (status = 405, description = "Method not found"), + ), + security( + ("basic_auth" = []) + ) +)] pub async fn schema(req: HttpRequest) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); let schema = STREAM_INFO.schema(&stream_name)?; Ok((web::Json(schema), StatusCode::OK)) } +#[utoipa::path( + get, + tag = "Log Stream Alerts", + operation_id = "Get alert for a stream", + context_path = "/api/v1", + path = "/logstream/{streamName}/alert", + params( + ("streamName" = String, Path, description = "Name of stream") + ), + responses( + (status = 200, description = "Fetched alert for stream", body = Object), + (status = 400, description = "Error"), + (status = 500, description = "Failure"), + (status = 404, description = "Stream not found"), + (status = 405, description = "Method not found"), + ), + security( + ("basic_auth" = []) + ) +)] pub async fn get_alert(req: HttpRequest) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); @@ -178,6 +254,26 @@ pub async fn get_alert(req: HttpRequest) -> Result Ok((web::Json(alerts), StatusCode::OK)) } +#[utoipa::path( + put, + tag = "Log Stream Management", + operation_id = "Create a log stream", + context_path = "/api/v1", + path = "/logstream/{streamName}", + params( + ("streamName" = String, Path, description = "Name of stream"), + ), + responses( + (status = 200, description = "Created new stream", body = Vec), + (status = 400, description = "Error"), + (status = 500, description = "Failure"), + (status = 404, description = "Stream not found"), + (status = 405, description = "Method not found"), + ), + security( + ("basic_auth" = []) + ) +)] pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); @@ -399,6 +495,30 @@ async fn create_update_stream( Ok(req.headers().clone()) } + +#[utoipa::path( + put, + tag = "Log Stream Alerts", + operation_id = "Set alert for a stream", + context_path = "/api/v1", + path = "/logstream/{streamName}/alert", + params( + ("streamName" = String, Path, description = "Name of stream") + ), + request_body( + content = Alerts, description = "Alert to be set" + ), + responses( + (status = 200, description = "Put alert for stream", body = Vec), + (status = 400, description = "Error"), + (status = 500, description = "Failure"), + (status = 404, description = "Stream not found"), + (status = 405, description = "Log stream not initialized"), + ), + security( + ("basic_auth" = []) + ) +)] pub async fn put_alert( req: HttpRequest, body: web::Json, @@ -456,6 +576,26 @@ pub async fn put_alert( )) } +#[utoipa::path( + get, + tag = "Log Stream Retention", + operation_id = "Get retention for a log stream", + context_path = "/api/v1", + path = "/logstream/{streamName}/retention", + params( + ("streamName" = String, Path, description = "Name of stream") + ), + responses( + (status = 200, description = "Fetched retention for stream", body = Retention), + (status = 400, description = "Error"), + (status = 500, description = "Failure"), + (status = 404, description = "Stream not found"), + (status = 405, description = "Method not found"), + ), + security( + ("basic_auth" = []) + ) +)] pub async fn get_retention(req: HttpRequest) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); if !STREAM_INFO.stream_exists(&stream_name) { @@ -475,6 +615,29 @@ pub async fn get_retention(req: HttpRequest) -> Result, @@ -503,6 +666,26 @@ pub async fn put_retention( )) } +#[utoipa::path( + get, + tag = "Log Stream Management", + operation_id = "Get cache state state for a log stream", + context_path = "/api/v1", + path = "/logstream/{streamName}/cache", + params( + ("streamName" = String, Path, description = "Name of stream") + ), + responses( + (status = 200, body = bool), + (status = 400, description = "Error"), + (status = 500, description = "Failure"), + (status = 404, description = "Stream not found"), + (status = 405, description = "Method not found"), + ), + security( + ("basic_auth" = []) + ) +)] pub async fn get_cache_enabled(req: HttpRequest) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); @@ -519,6 +702,26 @@ pub async fn get_cache_enabled(req: HttpRequest) -> Result, @@ -619,6 +822,26 @@ pub async fn get_stats_date(stream_name: &str, date: &str) -> Result Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); @@ -903,6 +1126,26 @@ pub async fn create_stream( Ok(()) } +#[utoipa::path( + get, + tag = "Log Stream Management", + operation_id = "Get info for log stream", + context_path = "/api/v1", + path = "/logstream/{streamName}/info", + params( + ("streamName" = String, Path, description = "Name of stream") + ), + responses( + (status = 200, description = "Stream info", body = StreamInfo), + (status = 400, description = "Error"), + (status = 500, description = "Failure"), + (status = 404, description = "Stream not found"), + (status = 405, description = "Method not found"), + ), + security( + ("basic_auth" = []) + ) +)] pub async fn get_stream_info(req: HttpRequest) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); if !metadata::STREAM_INFO.stream_exists(&stream_name) { @@ -943,6 +1186,26 @@ pub async fn get_stream_info(req: HttpRequest) -> Result, @@ -1011,6 +1274,26 @@ pub async fn put_stream_hot_tier( )) } +#[utoipa::path( + get, + tag = "Log Stream Hottier", + operation_id = "Get hottier state for a log stream", + context_path = "/api/v1", + path = "/logstream/{streamName}/hottier", + params( + ("streamName" = String, Path, description = "Name of stream") + ), + responses( + (status = 200, description = "Fetched hottier for stream", body = StreamHotTier), + (status = 400, description = "Error"), + (status = 500, description = "Failure"), + (status = 404, description = "Stream not found"), + (status = 405, description = "Method not found"), + ), + security( + ("basic_auth" = []) + ) +)] pub async fn get_stream_hot_tier(req: HttpRequest) -> Result { if CONFIG.parseable.mode != Mode::Query { return Err(StreamError::Custom { @@ -1040,6 +1323,26 @@ pub async fn get_stream_hot_tier(req: HttpRequest) -> Result Result { if CONFIG.parseable.mode != Mode::Query { return Err(StreamError::Custom { @@ -1099,6 +1402,7 @@ pub mod error { use actix_web::http::header::ContentType; use http::StatusCode; + use utoipa::ToSchema; use crate::{ hottier::HotTierError, @@ -1127,7 +1431,7 @@ pub mod error { SerdeError(#[from] serde_json::Error), } - #[derive(Debug, thiserror::Error)] + #[derive(Debug, thiserror::Error, ToSchema)] pub enum StreamError { #[error("{0}")] CreateStream(#[from] CreateStreamError), diff --git a/server/src/handlers/http/modal/mod.rs b/server/src/handlers/http/modal/mod.rs index 8af1119e3..0ec13ccb3 100644 --- a/server/src/handlers/http/modal/mod.rs +++ b/server/src/handlers/http/modal/mod.rs @@ -17,6 +17,7 @@ */ pub mod ingest_server; +pub mod openapi; pub mod query_server; pub mod server; pub mod ssl_acceptor; diff --git a/server/src/handlers/http/modal/openapi.rs b/server/src/handlers/http/modal/openapi.rs new file mode 100644 index 000000000..ecd9c8eb3 --- /dev/null +++ b/server/src/handlers/http/modal/openapi.rs @@ -0,0 +1,135 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use crate::{ + alerts::{Alert, AlertVersion, Alerts}, + handlers::http::{ + // about, + cluster::utils::{IngestionStats, QueriedStats, StorageStats}, + health_check, + ingest::{self}, + logstream::{self}, + query::{self}, + rbac::{self}, + role, + }, + hottier::StreamHotTier, + response, + storage::{ + retention::{Retention, Task}, + StreamInfo, + }, +}; +use utoipa::{ + openapi::security::{HttpBuilder, SecurityScheme}, + Modify, OpenApi, +}; + +#[derive(OpenApi)] +#[openapi( +paths( + query::query, + health_check::liveness, + health_check::readiness, + // about::about, + role::list, + role::put_default, + role::delete, + role::get_default, + role::get, + role::put, + rbac::post_user, + rbac::list_users, + rbac::delete_user, + rbac::get_role, + rbac::put_role, + rbac::post_gen_password, + logstream::put_stream, + ingest::post_event, + ingest::ingest, + logstream::get_stream_info, + logstream::put_alert, + logstream::get_alert, + logstream::schema, + logstream::get_stats, + logstream::get_retention, + logstream::put_retention, + logstream::put_enable_cache, + logstream::get_cache_enabled, + logstream::put_stream_hot_tier, + logstream::get_stream_hot_tier, + logstream::delete_stream_hot_tier, +), +components( + schemas( + response::QueryResponse, + query::Query, + StreamInfo, + Alerts, + QueriedStats, + IngestionStats, + StorageStats, + Retention, + StreamHotTier, + AlertVersion, + Alert, + Task, + ) +), +info( + description = "Parseable API documents [https://www.parseable.com/docs/](https://www.parseable.com/docs/)", + contact(name = "Parseable", email = "hi@parseable.com", url = "https://www.parseable.com/"), +), +modifiers(&SecurityAddon), +tags( + (name = "About", description = "Details about this Parseable executable"), + (name = "Health Status", description = "Health of Parseable server"), + (name = "Log Stream Ingestion", description = "Sending data to log streams"), + (name = "Log Stream Alerts", description = "Manipulation of alerts for log streams"), + (name = "Log Stream Management", description = "Create, List, Delete, log streams"), + (name = "Log Stream Query", description = "Query log streams"), + (name = "Log Stream Retention", description = "Get and Set retention policies for log streams"), + (name = "Log Stream Hottier", description = "Hottier related actions for a log stream"), + (name = "User Management", description = "Actions pertaining to Users"), + (name = "Role Management", description = "Actions pertaining to Roles"), + (name = "RBAC", description = "Add a role to a user"), +) +)] +pub struct ApiDoc; + +pub struct SecurityAddon; + +impl Modify for SecurityAddon { + fn modify(&self, openapi: &mut utoipa::openapi::OpenApi) { + let components = openapi.components.as_mut().unwrap(); + // components.add_security_scheme( + // "Authorization", + // SecurityScheme::ApiKey(utoipa::openapi::security::ApiKey::Header( + // utoipa::openapi::security::ApiKeyValue::new("Authorization"), + // )), + // ); + components.add_security_scheme( + "basic_auth", + SecurityScheme::Http( + HttpBuilder::new() + .scheme(utoipa::openapi::security::HttpAuthScheme::Basic) + .build(), + ), + ) + } +} diff --git a/server/src/handlers/http/modal/server.rs b/server/src/handlers/http/modal/server.rs index e17080e31..2cf3b981b 100644 --- a/server/src/handlers/http/modal/server.rs +++ b/server/src/handlers/http/modal/server.rs @@ -45,6 +45,8 @@ use actix_web::{web, App, HttpServer}; use actix_web_prometheus::PrometheusMetrics; use actix_web_static_files::ResourceFiles; use async_trait::async_trait; +use utoipa::OpenApi; +use utoipa_swagger_ui::SwaggerUi; use crate::{ handlers::http::{ @@ -58,6 +60,7 @@ use crate::{ // use super::generate; use super::generate; +use super::openapi::ApiDoc; use super::ssl_acceptor::get_ssl_acceptor; use super::OpenIdClient; use super::ParseableServer; @@ -112,6 +115,10 @@ impl ParseableServer for Server { /// implementation of init should just invoke a call to initialize async fn init(&self) -> anyhow::Result<()> { + let openapi = ApiDoc::openapi(); + let yaml_spec = openapi.to_yaml()?; + std::fs::write("./openapi.yaml", yaml_spec)?; + self.validate()?; migration::run_file_migration(&CONFIG).await?; let parseable_json = CONFIG.validate_storage().await?; @@ -133,6 +140,10 @@ impl Server { fn configure_routes(config: &mut web::ServiceConfig, oidc_client: Option) { // there might be a bug in the configure routes method config + .service( + SwaggerUi::new("/swagger-ui/{_:.*}") + .url("/api-docs/openapi.json", ApiDoc::openapi()), + ) .service( web::scope(&base_path()) // POST "/query" ==> Get results of the SQL query passed in request body diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs index 576f838fe..a255a7888 100644 --- a/server/src/handlers/http/query.rs +++ b/server/src/handlers/http/query.rs @@ -50,13 +50,17 @@ use crate::response::QueryResponse; use crate::storage::object_storage::commit_schema_to_storage; use crate::storage::ObjectStorageError; use crate::utils::actix::extract_session_key_from_req; +use utoipa::ToSchema; /// Query Request through http endpoint. -#[derive(Debug, serde::Deserialize, serde::Serialize)] +#[derive(Debug, serde::Deserialize, serde::Serialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct Query { + #[schema(example = "SELECT * FROM test;", required = true)] pub query: String, + #[schema(example = "2024-07-06T14:34:00.000Z", required = true)] pub start_time: String, + #[schema(example = "2024-07-06T14:45:00.000Z", required = true)] pub end_time: String, #[serde(default)] pub send_null: bool, @@ -66,6 +70,22 @@ pub struct Query { pub filter_tags: Option>, } +#[utoipa::path( + post, + tag = "Log Stream Query", + operation_id = "Query a Log Stream", + context_path = "/api/v1", + path = "/query", + request_body = Query, + responses( + (status = 200, description = "Queries successfully", body = QueryResponse), + (status = 400, description = "Error"), + (status = 500, description = "Failure"), + ), + security( + ("basic_auth" = []) + ) +)] pub async fn query(req: HttpRequest, query_request: Query) -> Result { let session_state = QUERY_SESSION.state(); @@ -451,7 +471,7 @@ fn transform_query_for_ingestor(query: &Query) -> Option { Some(q) } -#[derive(Debug, thiserror::Error)] +#[derive(Debug, thiserror::Error, ToSchema)] pub enum QueryError { #[error("Query cannot be empty")] EmptyQuery, diff --git a/server/src/handlers/http/rbac.rs b/server/src/handlers/http/rbac.rs index 71f66918f..f1ef9493e 100644 --- a/server/src/handlers/http/rbac.rs +++ b/server/src/handlers/http/rbac.rs @@ -27,11 +27,12 @@ use crate::{ use actix_web::{http::header::ContentType, web, Responder}; use http::StatusCode; use tokio::sync::Mutex; +use utoipa::ToSchema; // async aware lock for updating storage metadata and user map atomicically static UPDATE_LOCK: Mutex<()> = Mutex::const_new(()); -#[derive(serde::Serialize)] +#[derive(serde::Serialize, ToSchema)] struct User { id: String, method: String, @@ -53,12 +54,47 @@ impl From<&user::User> for User { // Handler for GET /api/v1/user // returns list of all registerd users +#[utoipa::path( + get, + tag = "User Management", + operation_id = "Get all users", + context_path = "/api/v1", + path = "/user", + responses( + (status = 200, description = "Fetches all users in the system", body = Object>), + ), + security( + ("basic_auth" = []) + ) +)] pub async fn list_users() -> impl Responder { web::Json(Users.collect_user::()) } // Handler for POST /api/v1/user/{username} // Creates a new user by username if it does not exists +#[utoipa::path( + post, + tag = "User Management", + operation_id = "Create a user", + context_path = "/api/v1/user", + path = "/{username}", + params( + ("username" = String, Path, description = "username of the user to be created"), + ), + request_body( + content = Object, description = "List of roles to be assigned to the user" + ), + responses( + (status = 200, description = "Created user and returned the initial password", body = String), + (status = 400, description = "Error"), + (status = 500, description = "Failure"), + (status = 404, description = "User not found"), + ), + security( + ("basic_auth" = []) + ) +)] pub async fn post_user( username: web::Path, body: Option>, @@ -97,6 +133,28 @@ pub async fn post_user( // Handler for POST /api/v1/user/{username}/generate-new-password // Resets password for the user to a newly generated one and returns it +#[utoipa::path( + post, + tag = "User Management", + operation_id = "Set a new password for user", + context_path = "/api/v1/user", + path = "/{username}/generate-new-password", + params( + ("username" = String, Path, description = "username of the user"), + ), + request_body( + content = String, description = "Password for the user" + ), + responses( + (status = 200, description = "Assigned new password", body = String), + (status = 400, description = "Error"), + (status = 500, description = "Failure"), + (status = 404, description = "User not found"), + ), + security( + ("basic_auth" = []) + ) +)] pub async fn post_gen_password(username: web::Path) -> Result { let username = username.into_inner(); let _ = UPDATE_LOCK.lock().await; @@ -122,6 +180,25 @@ pub async fn post_gen_password(username: web::Path) -> Result), + (status = 400, description = "Error"), + (status = 500, description = "Failure"), + (status = 404, description = "User not found"), + ), + security( + ("basic_auth" = []) + ) +)] pub async fn get_role(username: web::Path) -> Result { if !Users.contains(&username) { return Err(RBACError::UserDoesNotExist); @@ -140,6 +217,25 @@ pub async fn get_role(username: web::Path) -> Result) -> Result { let username = username.into_inner(); let _ = UPDATE_LOCK.lock().await; @@ -158,6 +254,28 @@ pub async fn delete_user(username: web::Path) -> Result Put roles for user // Put roles for given user +#[utoipa::path( + put, + tag = "RBAC", + operation_id = "Add role to a user", + context_path = "/api/v1/user", + path = "/{username}/role", + params( + ("username" = String, Path, description = "username to which the roles need to be assigned"), + ), + request_body( + content = Vec, description = "Roles to be assigned to the user", + ), + responses( + (status = 200, description = "Assigned roles to the user", body = Object), + (status = 400, description = "Error"), + (status = 500, description = "Failure"), + (status = 404, description = "User not found"), + ), + security( + ("basic_auth" = []) + ) +)] pub async fn put_role( username: web::Path, role: web::Json>, @@ -203,7 +321,7 @@ async fn put_metadata(metadata: &StorageMetadata) -> Result<(), ObjectStorageErr Ok(()) } -#[derive(Debug, thiserror::Error)] +#[derive(Debug, thiserror::Error, ToSchema)] pub enum RBACError { #[error("User exists already")] UserExists, diff --git a/server/src/handlers/http/role.rs b/server/src/handlers/http/role.rs index 56759fec5..9cc242562 100644 --- a/server/src/handlers/http/role.rs +++ b/server/src/handlers/http/role.rs @@ -18,6 +18,7 @@ use actix_web::{http::header::ContentType, web, HttpResponse, Responder}; use http::StatusCode; +use utoipa::ToSchema; use crate::{ option::CONFIG, @@ -30,6 +31,27 @@ use crate::{ // Handler for PUT /api/v1/role/{name} // Creates a new role or update existing one +#[utoipa::path( + put, + tag = "Role Management", + operation_id = "Create a role", + context_path = "/api/v1/role", + path = "/{name}", + params( + ("name" = String, Path, description = "Name of the role to create or update") + ), + request_body( + content = Vec, description = "Privilege and resource stream" + ), + responses( + (status = 200, description = "Created/updated the role"), + (status = 500, description = "Failed to connect to storage: 0"), + (status = 400, description = "Cannot perform this operation as role is assigned to an existing user.") + ), + security( + ("basic_auth" = []) + ) +)] pub async fn put( name: web::Path, body: web::Json>, @@ -45,6 +67,24 @@ pub async fn put( // Handler for GET /api/v1/role/{name} // Fetch role by name +#[utoipa::path( + get, + tag = "Role Management", + operation_id = "Get a role", + context_path = "/api/v1/role", + path = "/{name}", + params( + ("name" = String, Path, description = "Name of the role to fetch") + ), + responses( + (status = 200, description = "Fetched role"), + (status = 500, description = "Failed to connect to storage: 0"), + (status = 400, description = "Cannot perform this operation as role is assigned to an existing user.") + ), + security( + ("basic_auth" = []) + ) +)] pub async fn get(name: web::Path) -> Result { let name = name.into_inner(); let metadata = get_metadata().await?; @@ -54,6 +94,21 @@ pub async fn get(name: web::Path) -> Result { // Handler for GET /api/v1/role // Fetch all roles in the system +#[utoipa::path( + get, + tag = "Role Management", + operation_id = "List all roles", + context_path = "/api/v1", + path = "/role", + responses( + (status = 200, description = "Fetches all roles in the system", body = Object>), + (status = 500, description = "Failed to connect to storage: 0"), + (status = 400, description = "Cannot perform this operation as role is assigned to an existing user.") + ), + security( + ("basic_auth" = []) + ) +)] pub async fn list() -> Result { let metadata = get_metadata().await?; let roles: Vec = metadata.roles.keys().cloned().collect(); @@ -62,6 +117,24 @@ pub async fn list() -> Result { // Handler for DELETE /api/v1/role/{username} // Delete existing role +#[utoipa::path( + delete, + tag = "Role Management", + operation_id = "Delete a role", + context_path = "/api/v1/role", + path = "/{name}", + params( + ("name" = String, Path, description = "Deletes the given role") + ), + responses( + (status = 200, description = "Deleted given role"), + (status = 500, description = "Failed to connect to storage: 0"), + (status = 400, description = "Cannot perform this operation as role is assigned to an existing user.") + ), + security( + ("basic_auth" = []) + ) +)] pub async fn delete(name: web::Path) -> Result { let name = name.into_inner(); let mut metadata = get_metadata().await?; @@ -75,7 +148,23 @@ pub async fn delete(name: web::Path) -> Result) -> Result { let name = name.into_inner(); let mut metadata = get_metadata().await?; @@ -86,7 +175,22 @@ pub async fn put_default(name: web::Json) -> Result), + (status = 500, description = "Failed to connect to storage: 0"), + (status = 400, description = "Cannot perform this operation as role is assigned to an existing user.") + ), + security( + ("basic_auth" = []) + ) +)] pub async fn get_default() -> Result { let res = match DEFAULT_ROLE.lock().unwrap().clone() { Some(role) => serde_json::Value::String(role), @@ -112,7 +216,7 @@ async fn put_metadata(metadata: &StorageMetadata) -> Result<(), ObjectStorageErr Ok(()) } -#[derive(Debug, thiserror::Error)] +#[derive(Debug, thiserror::Error, ToSchema)] pub enum RoleError { #[error("Failed to connect to storage: {0}")] ObjectStorageError(#[from] ObjectStorageError), diff --git a/server/src/hottier.rs b/server/src/hottier.rs index 8501ba5c2..fee2e7871 100644 --- a/server/src/hottier.rs +++ b/server/src/hottier.rs @@ -48,13 +48,14 @@ use sysinfo::{Disks, System}; use tokio::fs::{self, DirEntry}; use tokio::io::AsyncWriteExt; use tokio_stream::wrappers::ReadDirStream; +use utoipa::ToSchema; pub const STREAM_HOT_TIER_FILENAME: &str = ".hot_tier.json"; pub const MIN_STREAM_HOT_TIER_SIZE_BYTES: u64 = 10737418240; // 10 GiB const HOT_TIER_SYNC_DURATION: Interval = clokwerk::Interval::Minutes(1); pub const INTERNAL_STREAM_HOT_TIER_SIZE_BYTES: u64 = 10485760; //10 MiB -#[derive(Debug, serde::Deserialize, serde::Serialize)] +#[derive(Debug, serde::Deserialize, serde::Serialize, ToSchema)] pub struct StreamHotTier { #[serde(rename = "size")] pub size: String, diff --git a/server/src/response.rs b/server/src/response.rs index e2abfa2d2..1f0f88481 100644 --- a/server/src/response.rs +++ b/server/src/response.rs @@ -28,11 +28,17 @@ use datafusion::arrow::record_batch::RecordBatch; use itertools::Itertools; use serde_json::{json, Value}; use tonic::{Response, Status}; +use utoipa::ToSchema; +#[derive(ToSchema)] pub struct QueryResponse { + #[schema(value_type = Vec)] pub records: Vec, + #[schema(value_type = Vec)] pub fields: Vec, + #[schema(value_type = bool)] pub fill_null: bool, + #[schema(value_type = bool)] pub with_fields: bool, } diff --git a/server/src/storage.rs b/server/src/storage.rs index 539f84ce7..9fe8abb37 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -21,6 +21,7 @@ use crate::{ }; use chrono::Local; +use utoipa::ToSchema; use std::fmt::Debug; @@ -105,7 +106,7 @@ pub struct ObjectStoreFormat { pub stream_type: String, } -#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, ToSchema)] pub struct StreamInfo { #[serde(rename = "created-at")] pub created_at: String, diff --git a/server/src/storage/retention.rs b/server/src/storage/retention.rs index c65c94c88..e1519d9a9 100644 --- a/server/src/storage/retention.rs +++ b/server/src/storage/retention.rs @@ -27,6 +27,7 @@ use clokwerk::TimeUnits; use derive_more::Display; use once_cell::sync::Lazy; use tokio::task::JoinHandle; +use utoipa::ToSchema; use crate::metadata::STREAM_INFO; @@ -87,17 +88,19 @@ pub fn init_scheduler() { log::info!("Scheduler is initialized") } -#[derive(Debug, Clone, PartialEq, Eq, Default, serde::Serialize, serde::Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Default, serde::Serialize, serde::Deserialize, ToSchema)] #[serde(try_from = "Vec")] #[serde(into = "Vec")] pub struct Retention { tasks: Vec, } -#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, ToSchema)] pub struct Task { description: String, + #[schema(value_type = String)] action: Action, + #[schema(value_type = u32)] days: NonZeroU32, }