Skip to content

Commit

Permalink
Improve high-level SDK perf, consumer groups for single client, use u…
Browse files Browse the repository at this point in the history
…uid v7 (#1118)
  • Loading branch information
spetz authored Aug 10, 2024
1 parent 7db0f52 commit 42fe472
Show file tree
Hide file tree
Showing 24 changed files with 407 additions and 420 deletions.
47 changes: 38 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion cli/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "cli"
version = "0.4.0"
version = "0.4.1"
edition = "2021"
authors = ["bartosz.ciesla@gmail.com"]
repository = "https://github.com/iggy-rs/iggy"
Expand Down
18 changes: 11 additions & 7 deletions examples/src/multi-tenant/consumer/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use futures_util::future::join_all;
use futures_util::StreamExt;
use iggy::client::{Client, StreamClient, TopicClient, UserClient};
use iggy::clients::client::IggyClient;
use iggy::clients::consumer::{AutoCommit, AutoCommitAfter, IggyConsumer};
use iggy::clients::consumer::{AutoCommit, AutoCommitWhen, IggyConsumer};
use iggy::error::IggyError;
use iggy::identifier::Identifier;
use iggy::messages::poll_messages::PollingStrategy;
Expand Down Expand Up @@ -84,7 +84,7 @@ async fn main() -> anyhow::Result<(), Box<dyn Error>> {
.parse::<bool>()
.expect("Invalid ensure stream access");

print_info("Multi-tenant consumers has started, tenants: {tenants_count}, consumers: {producers_count}, partitions: {partitions_count}");
print_info(&format!("Multi-tenant consumers has started, tenants: {tenants_count}, consumers: {consumers_count}"));
let address = args.tcp_server_address;

print_info("Creating root client to manage streams and users");
Expand Down Expand Up @@ -130,7 +130,9 @@ async fn main() -> anyhow::Result<(), Box<dyn Error>> {
}
}

print_info("Creating {consumers_count} consumer(s) for each tenant");
print_info(&format!(
"Creating {consumers_count} consumer(s) for each tenant"
));
for tenant in tenants.iter_mut() {
let consumers = create_consumers(
&tenant.client,
Expand All @@ -148,11 +150,13 @@ async fn main() -> anyhow::Result<(), Box<dyn Error>> {
);
}

print_info("Starting {consumers_count} consumers(s) for each tenant");
print_info(&format!(
"Starting {consumers_count} consumers(s) for each tenant"
));
let mut tasks = Vec::new();
for tenant in tenants.into_iter() {
let producers_tasks = start_consumers(tenant.id, tenant.consumers);
tasks.extend(producers_tasks);
let consumer_tasks = start_consumers(tenant.id, tenant.consumers);
tasks.extend(consumer_tasks);
}

join_all(tasks).await;
Expand Down Expand Up @@ -255,7 +259,7 @@ async fn create_consumers(
.poll_interval(IggyDuration::from_str(interval).expect("Invalid duration"))
.polling_strategy(PollingStrategy::next())
.auto_join_consumer_group()
.auto_commit(AutoCommit::After(AutoCommitAfter::PollingMessages))
.auto_commit(AutoCommit::When(AutoCommitWhen::PollingMessages))
.build();
consumer.init().await?;
consumers.push(TenantConsumer::new(
Expand Down
55 changes: 41 additions & 14 deletions examples/src/multi-tenant/producer/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ async fn main() -> anyhow::Result<(), Box<dyn Error>> {
.parse::<bool>()
.expect("Invalid ensure stream access");

print_info("Multi-tenant producer has started, tenants: {tenants_count}, producers: {producers_count}, partitions: {partitions_count}");
print_info(&format!("Multi-tenant producer has started, tenants: {tenants_count}, producers: {producers_count}, partitions: {partitions_count}"));
let address = args.tcp_server_address;

print_info("Creating root client to manage streams and users");
Expand Down Expand Up @@ -128,7 +128,9 @@ async fn main() -> anyhow::Result<(), Box<dyn Error>> {
}
}

print_info("Creating {producers_count} producer(s) for each tenant");
print_info(&format!(
"Creating {producers_count} producer(s) for each tenant"
));
for tenant in tenants.iter_mut() {
let producers = create_producers(
&tenant.client,
Expand All @@ -147,11 +149,17 @@ async fn main() -> anyhow::Result<(), Box<dyn Error>> {
);
}

print_info("Starting {producers_count} producer(s) for each tenant");
print_info(&format!(
"Starting {producers_count} producer(s) for each tenant"
));
let mut tasks = Vec::new();
for tenant in tenants.into_iter() {
let producers_tasks =
start_producers(tenant.id, tenant.producers, args.message_batches_limit);
let producers_tasks = start_producers(
tenant.id,
tenant.producers,
args.message_batches_limit,
args.messages_per_batch,
);
tasks.extend(producers_tasks);
}

Expand All @@ -164,6 +172,7 @@ fn start_producers(
tenant_id: u32,
producers: Vec<TenantProducer>,
batches_count: u64,
batch_size: u32,
) -> Vec<JoinHandle<()>> {
let mut tasks = Vec::new();
let topics_count = producers
Expand All @@ -175,25 +184,43 @@ fn start_producers(
let producer_id = producer.id;
let task = tokio::spawn(async move {
let mut counter = 1;
let mut events_id = 1;
let mut logs_id = 1;
let mut notifications_id = 1;
while counter <= topics_count * batches_count {
let message = match producer.topic.as_str() {
"events" => "event",
"logs" => "log",
"notifications" => "notification",
let (message_id, message) = match producer.topic.as_str() {
"events" => {
events_id += 1;
(events_id, "event")
}
"logs" => {
logs_id += 1;
(logs_id, "log")
}
"notifications" => {
notifications_id += 1;
(notifications_id, "notification")
}
_ => panic!("Invalid topic"),
};
let payload = format!("{message}-{producer_id}-{counter}");
let message = Message::from_str(&payload).expect("Invalid message");
if let Err(error) = producer.producer.send(vec![message]).await {

let mut messages = Vec::with_capacity(batch_size as usize);
for _ in 1..=batch_size {
let payload = format!("{message}-{producer_id}-{message_id}");
let message = Message::from_str(&payload).expect("Invalid message");
messages.push(message);
}

if let Err(error) = producer.producer.send(messages).await {
error!(
"Failed to send: '{payload}' to: {} -> {} by tenant: {tenant_id}, producer: {producer_id} with error: {error}", producer.stream, producer.topic,
"Failed to send: {batch_size} message(s) to: {} -> {} by tenant: {tenant_id}, producer: {producer_id} with error: {error}", producer.stream, producer.topic,
);
continue;
}

counter += 1;
info!(
"Sent: '{payload}' by tenant: {tenant_id}, producer: {producer_id}, to: {} -> {}",
"Sent: {batch_size} message(s) by tenant: {tenant_id}, producer: {producer_id}, to: {} -> {}",
producer.stream, producer.topic
);
}
Expand Down
4 changes: 2 additions & 2 deletions examples/src/new-sdk/consumer/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use futures_util::StreamExt;
use iggy::client_provider;
use iggy::client_provider::ClientProviderConfig;
use iggy::clients::client::IggyClient;
use iggy::clients::consumer::{AutoCommit, AutoCommitAfter, IggyConsumer};
use iggy::clients::consumer::{AutoCommit, AutoCommitWhen, IggyConsumer};
use iggy::consumer::ConsumerKind;
use iggy::messages::poll_messages::PollingStrategy;
use iggy::models::messages::PolledMessage;
Expand Down Expand Up @@ -39,7 +39,7 @@ async fn main() -> anyhow::Result<(), Box<dyn Error>> {
client.consumer_group(name, &args.stream_id, &args.topic_id)?
}
}
.auto_commit(AutoCommit::After(AutoCommitAfter::PollingMessages))
.auto_commit(AutoCommit::When(AutoCommitWhen::PollingMessages))
.create_consumer_group_if_not_exists()
.auto_join_consumer_group()
.polling_strategy(PollingStrategy::next())
Expand Down
4 changes: 2 additions & 2 deletions integration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ server = { path = "../server" }
tempfile = "3.10.1"
tokio = { version = "1.38.0", features = ["full"] }
tracing-subscriber = "0.3.18"
uuid = { version = "1.8.0", features = ["v4", "fast-rng", "zerocopy"] }
xxhash-rust = { version = "0.8.10", features = ["xxh32"] }
uuid = { version = "1.1.0", features = ["v7", "fast-rng", "zerocopy"] }
xxhash-rust = { version = "0.8.12", features = ["xxh32"] }

# Some tests are failing in CI due to lack of IPv6 interfaces
# inside the docker containers. This is a temporary workaround (hopefully).
Expand Down
2 changes: 1 addition & 1 deletion integration/src/test_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ impl TestServer {
}

pub fn get_random_path() -> String {
format!("{}{}", LOCAL_DATA_PREFIX, Uuid::new_v4().to_u128_le())
format!("{}{}", LOCAL_DATA_PREFIX, Uuid::now_v7().to_u128_le())
}

pub fn get_http_api_addr(&self) -> Option<String> {
Expand Down
2 changes: 1 addition & 1 deletion integration/tests/archiver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub struct DiskArchiverSetup {

impl DiskArchiverSetup {
pub async fn init() -> DiskArchiverSetup {
let base_path = format!("test_local_data_{}", Uuid::new_v4().to_u128_le());
let base_path = format!("test_local_data_{}", Uuid::now_v7().to_u128_le());
let archive_path = format!("{}/archive", base_path);
let config = DiskArchiverConfig {
path: archive_path.clone(),
Expand Down
2 changes: 1 addition & 1 deletion integration/tests/bench/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,5 +137,5 @@ pub fn run_bench_and_wait_for_finish(
}

pub fn get_random_path() -> String {
format!("{}{}", BENCH_FILES_PREFIX, Uuid::new_v4().to_u128_le())
format!("{}{}", BENCH_FILES_PREFIX, Uuid::now_v7().to_u128_le())
}
2 changes: 1 addition & 1 deletion integration/tests/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ impl StateSetup {
}

pub async fn create(encryption_key: Option<&[u8]>) -> StateSetup {
let directory_path = format!("state_{}", Uuid::new_v4().to_u128_le());
let directory_path = format!("state_{}", Uuid::now_v7().to_u128_le());
let log_path = format!("{}/log", directory_path);
create_dir(&directory_path).await.unwrap();

Expand Down
2 changes: 1 addition & 1 deletion integration/tests/streaming/common/test_setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ impl TestSetup {
}

pub async fn init_with_config(mut config: SystemConfig) -> TestSetup {
config.path = format!("local_data_{}", Uuid::new_v4().to_u128_le());
config.path = format!("local_data_{}", Uuid::now_v7().to_u128_le());

let config = Arc::new(config);
fs::create_dir(config.get_system_path()).await.unwrap();
Expand Down
7 changes: 4 additions & 3 deletions sdk/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "iggy"
version = "0.6.0"
version = "0.6.1"
description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
edition = "2021"
license = "MIT"
Expand Down Expand Up @@ -28,7 +28,8 @@ chrono = { version = "0.4.38" }
clap = { version = "4.5.4", features = ["derive"] }
comfy-table = { version = "7.1.1", optional = true }
crc32fast = "1.4.2"
derive_more = "0.99.18"
dashmap = "6.0.1"
derive_more = { version = "1.0.0", features = ["full"] }
dirs = "5.0.1"
fast-async-mutex = { version = "0.6.7", optional = true }
flume = "0.11.0"
Expand All @@ -54,7 +55,7 @@ tokio = { version = "1.38.0", features = ["full"] }
tokio-native-tls = "0.3.1"
toml = "0.8.14"
tracing = { version = "0.1.40" }
uuid = { version = "1.8.0", features = ["v4", "fast-rng"] }
uuid = { version = "1.1.0", features = ["v7", "fast-rng", "zerocopy"] }

[build-dependencies]
convert_case = "0.6.0"
Expand Down
Loading

0 comments on commit 42fe472

Please sign in to comment.