Skip to content

Commit

Permalink
Update all dependencies for 2023.
Browse files Browse the repository at this point in the history
  • Loading branch information
tychedelia committed Sep 16, 2023
1 parent 5570236 commit 02d2fbc
Show file tree
Hide file tree
Showing 11 changed files with 878 additions and 641 deletions.
1,214 changes: 765 additions & 449 deletions Cargo.lock

Large diffs are not rendered by default.

48 changes: 24 additions & 24 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,36 +13,36 @@ path = "src/main.rs"
name = "josefine"

[dependencies]
anyhow = "1.0.52"
async-trait = "0.1.51"
anyhow = "1.0.75"
async-trait = "0.1.73"
bincode = "1.3.3"
byteorder = "1.4.3"
bytes = "1.0.1"
clap = "2.33.3"
config = "0.11.0"
ctrlc = "3.2.1"
bytes = "1.5.0"
clap = { version = "4.4.3", features = ["derive"] }
config = "0.13.3"
ctrlc = "3.4.1"
derive_more = "0.99.17"
futures = "0.3.15"
futures-util = "0.3.15"
kafka-protocol = "0.1.0"
futures = "0.3.28"
futures-util = "0.3.28"
kafka-protocol = "0.7.0"
memmap = "0.7.0"
rand = "0.8.4"
regex = "1.5.4"
serde = "1.0.126"
serde_derive = "1.0.126"
serde_json = "1.0.64"
sled = "0.34.6"
rand = "0.8.5"
regex = "1.9.5"
serde = "1.0.188"
serde_derive = "1.0.188"
serde_json = "1.0.107"
sled = "0.34.7"
string = "0.3.0"
tempfile = "3.2.0"
tokio = { version = "1.8.1", features = ["macros", "net", "io-util", "time", "sync", "rt", "rt-multi-thread", "tracing"] }
tempfile = "3.8.0"
tokio = { version = "1.32.0", features = ["macros", "net", "io-util", "time", "sync", "rt", "rt-multi-thread", "tracing"] }
tokio-serde = { version = "0.8.0", features = ["json"] }
tokio-stream = "0.1.7"
tokio-stream = "0.1.14"
tokio-tower = "0.6.0"
tokio-util = { version = "0.6.7", features = ["codec"] }
tower = "0.4.1"
tokio-util = { version = "0.7.8", features = ["codec"] }
tower = "0.4.13"
tracing = "0.1"
tracing-appender = "0.2.0"
tracing-appender = "0.2.2"
tracing-futures = "0.2.5"
tracing-subscriber = "0.2"
tracing-test = "0.1"
uuid = { version = "0.8.2", features = [ "v4", "serde" ] }
tracing-subscriber = "0.3"
tracing-test = "0.2"
uuid = { version = "1.4.1", features = [ "v4", "serde" ] }
103 changes: 23 additions & 80 deletions src/broker/handler/api_versions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ use kafka_protocol::messages::api_versions_response::ApiVersion;
use kafka_protocol::messages::*;
use kafka_protocol::protocol::Message;

fn api_version<T: Message>() -> ApiVersion {
let mut v = ApiVersion::default();
v.max_version = T::VERSIONS.max;
v.min_version = T::VERSIONS.min;
v
}

#[async_trait]
impl Handler<ApiVersionsRequest> for Broker {
async fn handle(
Expand All @@ -14,131 +21,67 @@ impl Handler<ApiVersionsRequest> for Broker {
) -> anyhow::Result<ApiVersionsResponse> {
res.api_keys.insert(
ApiKey::ProduceKey as i16,
ApiVersion {
max_version: ProduceRequest::VERSIONS.max,
min_version: ProduceRequest::VERSIONS.min,
..Default::default()
},
api_version::<ProduceRequest>(),
);
res.api_keys.insert(
ApiKey::FetchKey as i16,
ApiVersion {
max_version: FetchRequest::VERSIONS.max,
min_version: FetchRequest::VERSIONS.min,
..Default::default()
},
api_version::<FetchRequest>(),
);
res.api_keys.insert(
ApiKey::ListOffsetsKey as i16,
ApiVersion {
max_version: ListOffsetsRequest::VERSIONS.max,
min_version: ListOffsetsRequest::VERSIONS.min,
..Default::default()
},
api_version::<ListOffsetsRequest>(),
);
res.api_keys.insert(
ApiKey::MetadataKey as i16,
ApiVersion {
max_version: MetadataRequest::VERSIONS.max,
min_version: MetadataRequest::VERSIONS.min,
..Default::default()
},
api_version::<MetadataRequest>(),
);
res.api_keys.insert(
ApiKey::LeaderAndIsrKey as i16,
ApiVersion {
max_version: LeaderAndIsrRequest::VERSIONS.max,
min_version: LeaderAndIsrRequest::VERSIONS.min,
..Default::default()
},
api_version::<LeaderAndIsrRequest>(),
);
res.api_keys.insert(
ApiKey::StopReplicaKey as i16,
ApiVersion {
max_version: StopReplicaRequest::VERSIONS.max,
min_version: StopReplicaRequest::VERSIONS.min,
..Default::default()
},
api_version::<StopReplicaRequest>(),
);
res.api_keys.insert(
ApiKey::FindCoordinatorKey as i16,
ApiVersion {
max_version: FindCoordinatorRequest::VERSIONS.max,
min_version: FindCoordinatorRequest::VERSIONS.min,
..Default::default()
},
api_version::<FindCoordinatorRequest>(),
);
res.api_keys.insert(
ApiKey::JoinGroupKey as i16,
ApiVersion {
max_version: JoinGroupRequest::VERSIONS.max,
min_version: JoinGroupRequest::VERSIONS.min,
..Default::default()
},
api_version::<JoinGroupRequest>(),
);
res.api_keys.insert(
ApiKey::HeartbeatKey as i16,
ApiVersion {
max_version: HeartbeatRequest::VERSIONS.max,
min_version: HeartbeatRequest::VERSIONS.min,
..Default::default()
},
api_version::<HeartbeatRequest>(),
);
res.api_keys.insert(
ApiKey::ListGroupsKey as i16,
ApiVersion {
max_version: LeaveGroupRequest::VERSIONS.max,
min_version: LeaveGroupRequest::VERSIONS.min,
..Default::default()
},
api_version::<ListGroupsRequest>(),
);
res.api_keys.insert(
ApiKey::SyncGroupKey as i16,
ApiVersion {
max_version: SyncGroupRequest::VERSIONS.max,
min_version: SyncGroupRequest::VERSIONS.min,
..Default::default()
},
api_version::<SyncGroupRequest>(),
);
res.api_keys.insert(
ApiKey::CreateTopicsKey as i16,
ApiVersion {
max_version: CreateTopicsRequest::VERSIONS.max,
min_version: CreateTopicsRequest::VERSIONS.min,
..Default::default()
},
api_version::<CreateTopicsRequest>(),
);
res.api_keys.insert(
ApiKey::DeleteGroupsKey as i16,
ApiVersion {
max_version: DescribeGroupsRequest::VERSIONS.max,
min_version: DescribeGroupsRequest::VERSIONS.min,
..Default::default()
},
api_version::<DeleteGroupsRequest>(),
);
res.api_keys.insert(
ApiKey::ListGroupsKey as i16,
ApiVersion {
max_version: ListGroupsRequest::VERSIONS.max,
min_version: ListGroupsRequest::VERSIONS.min,
..Default::default()
},
api_version::<ListGroupsRequest>(),
);
res.api_keys.insert(
ApiKey::ApiVersionsKey as i16,
ApiVersion {
max_version: ApiVersionsRequest::VERSIONS.max,
min_version: ApiVersionsRequest::VERSIONS.min,
..Default::default()
},
api_version::<ApiVersionsRequest>(),
);
res.api_keys.insert(
ApiKey::DeleteTopicsKey as i16,
ApiVersion {
max_version: DeleteTopicsRequest::VERSIONS.max,
min_version: DeleteTopicsRequest::VERSIONS.min,
..Default::default()
},
api_version::<DeleteTopicsRequest>(),
);

Ok(res)
Expand Down
38 changes: 18 additions & 20 deletions src/broker/handler/metadata.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use crate::broker::handler::Handler;
use crate::broker::Broker;
use crate::kafka::util::ToStrBytes;
use async_trait::async_trait;
use bytes::Bytes;
use kafka_protocol::messages::{BrokerId, MetadataRequest, MetadataResponse, TopicName};
use kafka_protocol::messages::metadata_response::{
MetadataResponseBroker, MetadataResponsePartition, MetadataResponseTopic,
};
use kafka_protocol::messages::{BrokerId, MetadataRequest, MetadataResponse, TopicName};
use kafka_protocol::protocol::Builder;
use kafka_protocol::protocol::StrBytes;
use string::TryFrom;

use crate::broker::Broker;
use crate::broker::handler::Handler;
use crate::kafka::util::ToStrBytes;

#[async_trait]
impl Handler<MetadataRequest> for Broker {
Expand All @@ -19,20 +19,19 @@ impl Handler<MetadataRequest> for Broker {
) -> anyhow::Result<MetadataResponse> {
res.brokers.insert(
BrokerId(self.config.id.0),
MetadataResponseBroker {
host: self.config.ip.to_string().to_str_bytes(),
port: self.config.port as i32,
..Default::default()
},
MetadataResponseBroker::builder()
.host(self.config.ip.to_string().to_str_bytes())
.port(self.config.port as i32)
.build()?,
);
res.controller_id = BrokerId(1);
res.cluster_id = Some(StrBytes::from_str("josefine"));

let topics = self.store.get_topics()?;
for (name, topic) in topics.into_iter() {
let t = MetadataResponseTopic {
topic_id: topic.id,
partitions: topic
let t = MetadataResponseTopic::builder()
.topic_id(topic.id)
.partitions(topic
.partitions
.iter()
.map(|(k, _v)| {
Expand All @@ -51,9 +50,8 @@ impl Handler<MetadataRequest> for Broker {
}
Ok(mp)
})
.collect::<anyhow::Result<Vec<MetadataResponsePartition>>>()?,
..Default::default()
};
.collect::<anyhow::Result<Vec<MetadataResponsePartition>>>()?)
.build()?;
let s = name.to_str_bytes();
res.topics.insert(TopicName(s), t);
}
Expand All @@ -64,12 +62,12 @@ impl Handler<MetadataRequest> for Broker {

#[cfg(test)]
mod tests {
use anyhow::Result;
use kafka_protocol::messages::{MetadataRequest, MetadataResponse};
use kafka_protocol::protocol::Builder;

use crate::broker::handler::test::new_broker;
use crate::broker::handler::Handler;

use anyhow::Result;
use crate::broker::handler::test::new_broker;

#[tokio::test]
async fn execute() -> Result<()> {
Expand Down
6 changes: 2 additions & 4 deletions src/broker/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,8 @@ async fn stream_messages(
let res = cb_rx.await?;
let version = header.request_api_version;
let correlation_id = header.correlation_id;
let header = ResponseHeader {
correlation_id,
..Default::default()
};
let mut header = ResponseHeader::default();
header.correlation_id = correlation_id;
stream_out.send((version, header, res)).await?;
}
Ok(())
Expand Down
13 changes: 6 additions & 7 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,11 @@ pub struct JosefineConfig {
}

pub fn config<P: AsRef<std::path::Path>>(config_path: P) -> JosefineConfig {
let mut settings = config::Config::default();
settings
.merge(config::File::from(config_path.as_ref()))
.expect("Could not read configuration file")
.merge(config::Environment::with_prefix("JOSEFINE"))
.expect("Could not read environment variables");
let settings = config::Config::builder();
let config = settings
.add_source(config::File::from(config_path.as_ref()))
.add_source(config::Environment::with_prefix("JOSEFINE"))
.build().expect("Could not build configuration");

settings.try_into().expect("Could not create configuration")
config.try_deserialize().expect("Could not deserialize configuration")
}
32 changes: 11 additions & 21 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
use clap::App;
use clap::Arg;

use std::path::Path;
use std::path::{Path, PathBuf};
use tracing_subscriber::layer::SubscriberExt;
use clap::Parser;

use josefine::util::Shutdown;
use tracing_subscriber::{fmt, EnvFilter};

#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
config: PathBuf,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let subscriber = tracing_subscriber::registry()
Expand All @@ -18,22 +22,6 @@ async fn main() -> anyhow::Result<()> {
.with(fmt::Layer::new().pretty().with_writer(std::io::stdout));
tracing::subscriber::set_global_default(subscriber).expect("Unable to set a global collector");

let matches = App::new("Josefine")
.version("0.0.1")
.author("jcm")
.about("Distributed log in rust.")
.arg(
Arg::with_name("config")
.long("config")
.value_name("PATH")
.required(true)
.default_value("Config.toml")
.help("Location of the config file."),
)
.get_matches();

let config_path = matches.value_of("config").unwrap();

let shutdown = Shutdown::new();
let s = shutdown.clone();
ctrlc::set_handler(move || {
Expand All @@ -42,5 +30,7 @@ async fn main() -> anyhow::Result<()> {
})
.unwrap();

josefine::josefine(Path::new(&config_path), shutdown).await
let args = Args::parse();

josefine::josefine(&args.config, shutdown).await
}
Loading

0 comments on commit 02d2fbc

Please sign in to comment.