Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Enable Kafka integration #1021

Merged
merged 11 commits into from
Dec 20, 2024
51 changes: 51 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ num_cpus = "1.15"
once_cell = "1.17.1"
prometheus = { version = "0.13", features = ["process"] }
rand = "0.8.5"
rdkafka = {version = "0.36.2", default-features = false, features = ["tokio"]}
regex = "1.7.3"
relative-path = { version = "1.7", features = ["serde"] }
reqwest = { version = "0.11.27", default-features = false, features = [
Expand Down
68 changes: 68 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::path::PathBuf;
use url::Url;

use crate::{
kafka::SslProtocol,
oidc::{self, OpenidConfig},
option::{validation, Compression, Mode},
};
Expand Down Expand Up @@ -119,6 +120,14 @@ pub struct Cli {
pub trino_auth: Option<String>,
pub trino_schema: Option<String>,
pub trino_catalog: Option<String>,

// Kafka specific env vars
pub kafka_topic: Option<String>,
pub kafka_host: Option<String>,
pub kafka_group: Option<String>,
pub kafka_client_id: Option<String>,
pub kafka_security_protocol: Option<SslProtocol>,
pub kafka_partitions: Option<String>,
}

impl Cli {
Expand Down Expand Up @@ -164,6 +173,14 @@ impl Cli {
pub const TRINO_AUTHORIZATION: &'static str = "p-trino-authorization";
pub const TRINO_SCHEMA: &'static str = "p-trino-schema";

// Kafka specific env vars
pub const KAFKA_TOPIC: &'static str = "kafka-topic";
pub const KAFKA_HOST: &'static str = "kafka-host";
pub const KAFKA_GROUP: &'static str = "kafka-group";
pub const KAFKA_CLIENT_ID: &'static str = "kafka-client-id";
pub const KAFKA_SECURITY_PROTOCOL: &'static str = "kafka-security-protocol";
pub const KAFKA_PARTITIONS: &'static str = "kafka-partitions";
parmesant marked this conversation as resolved.
Show resolved Hide resolved

pub fn local_stream_data_path(&self, stream_name: &str) -> PathBuf {
self.local_staging_path.join(stream_name)
}
Expand All @@ -177,6 +194,48 @@ impl Cli {

pub fn create_cli_command_with_clap(name: &'static str) -> Command {
Command::new(name).next_line_help(false)
.arg(
Arg::new(Self::KAFKA_TOPIC)
.long(Self::KAFKA_TOPIC)
.env("P_KAFKA_TOPIC")
.value_name("STRING")
.help("Kafka topic to subscribe to"),
)
.arg(
Arg::new(Self::KAFKA_HOST)
.long(Self::KAFKA_HOST)
.env("P_KAFKA_HOST")
.value_name("STRING")
.help("Address and port for Kafka server"),
)
.arg(
Arg::new(Self::KAFKA_GROUP)
.long(Self::KAFKA_GROUP)
.env("P_KAFKA_GROUP")
.value_name("STRING")
.help("Kafka group"),
)
.arg(
Arg::new(Self::KAFKA_CLIENT_ID)
.long(Self::KAFKA_CLIENT_ID)
.env("P_KAFKA_CLIENT_ID")
.value_name("STRING")
.help("Kafka client id"),
)
.arg(
Arg::new(Self::KAFKA_SECURITY_PROTOCOL)
.long(Self::KAFKA_SECURITY_PROTOCOL)
.env("P_KAFKA_SECURITY_PROTOCOL")
.value_name("STRING")
.help("Kafka security protocol (ssl)"),
)
.arg(
Arg::new(Self::KAFKA_PARTITIONS)
.long(Self::KAFKA_PARTITIONS)
.env("P_KAFKA_PARTITIONS")
.value_name("STRING")
.help("Kafka partitions"),
)
.arg(
Arg::new(Self::TRINO_ENDPOINT)
.long(Self::TRINO_ENDPOINT)
Expand Down Expand Up @@ -520,6 +579,15 @@ impl FromArgMatches for Cli {
self.trino_schema = m.get_one::<String>(Self::TRINO_SCHEMA).cloned();
self.trino_username = m.get_one::<String>(Self::TRINO_USER_NAME).cloned();

self.kafka_topic = m.get_one::<String>(Self::KAFKA_TOPIC).cloned();
self.kafka_host = m.get_one::<String>(Self::KAFKA_HOST).cloned();
self.kafka_group = m.get_one::<String>(Self::KAFKA_GROUP).cloned();
self.kafka_client_id = m.get_one::<String>(Self::KAFKA_CLIENT_ID).cloned();
self.kafka_security_protocol = m
.get_one::<SslProtocol>(Self::KAFKA_SECURITY_PROTOCOL)
.cloned();
self.kafka_partitions = m.get_one::<String>(Self::KAFKA_PARTITIONS).cloned();

self.local_cache_path = m.get_one::<PathBuf>(Self::CACHE).cloned();
self.query_cache_path = m.get_one::<PathBuf>(Self::QUERY_CACHE).cloned();
self.tls_cert_path = m.get_one::<PathBuf>(Self::TLS_CERT).cloned();
Expand Down
Loading
Loading