From 31df5847192fe23aebc346fa7f6473f25d21a563 Mon Sep 17 00:00:00 2001 From: Zsombor Date: Sat, 1 Jun 2024 10:17:51 +0200 Subject: [PATCH] improve consumer group creation (#976) Creating a consumer group should return detailed information about the group - so the caller doesn't need to rely on picking the consumer group id, the server could provide it. --------- Co-authored-by: Piotr Gankiewicz --- cli/src/args/partition.rs | 1 - cli/src/args/personal_access_token.rs | 1 - cli/src/args/topic.rs | 1 - cli/src/args/user.rs | 1 - sdk/src/binary/consumer_groups.rs | 27 ++++++++++--------- sdk/src/client.rs | 2 +- sdk/src/clients/client.rs | 2 +- sdk/src/http/consumer_groups.rs | 26 +++++++++--------- sdk/src/utils/expiry.rs | 2 +- server/server.http | 4 ++- .../create_consumer_group_handler.rs | 7 +++-- .../src/compat/snapshots/message_snapshot.rs | 1 - server/src/http/consumer_groups.rs | 8 +++--- .../src/streaming/systems/consumer_groups.rs | 5 ++-- .../src/streaming/topics/consumer_groups.rs | 22 ++++++++++----- 15 files changed, 61 insertions(+), 49 deletions(-) diff --git a/cli/src/args/partition.rs b/cli/src/args/partition.rs index 508a56fb0..b1f085c14 100644 --- a/cli/src/args/partition.rs +++ b/cli/src/args/partition.rs @@ -1,6 +1,5 @@ use clap::{Args, Subcommand}; use iggy::identifier::Identifier; -use std::convert::From; #[derive(Debug, Clone, Subcommand)] pub(crate) enum PartitionAction { diff --git a/cli/src/args/personal_access_token.rs b/cli/src/args/personal_access_token.rs index 8f2c26663..4603aa636 100644 --- a/cli/src/args/personal_access_token.rs +++ b/cli/src/args/personal_access_token.rs @@ -1,7 +1,6 @@ use crate::args::common::ListMode; use clap::{Args, Subcommand}; use iggy::utils::personal_access_token_expiry::PersonalAccessTokenExpiry; -use std::convert::From; #[derive(Debug, Clone, Subcommand)] pub(crate) enum PersonalAccessTokenAction { diff --git a/cli/src/args/topic.rs b/cli/src/args/topic.rs index 3aeaa6af9..42252d1ed 100644 --- a/cli/src/args/topic.rs +++ b/cli/src/args/topic.rs @@ -4,7 +4,6 @@ use iggy::compression::compression_algorithm::CompressionAlgorithm; use iggy::identifier::Identifier; use iggy::utils::byte_size::IggyByteSize; use iggy::utils::expiry::IggyExpiry; -use std::convert::From; #[derive(Debug, Clone, Subcommand)] pub(crate) enum TopicAction { diff --git a/cli/src/args/user.rs b/cli/src/args/user.rs index 3ca8908b8..907880c49 100644 --- a/cli/src/args/user.rs +++ b/cli/src/args/user.rs @@ -3,7 +3,6 @@ use crate::args::permissions::stream::StreamPermissionsArg; use crate::args::permissions::UserStatusArg; use clap::{Args, Subcommand}; use iggy::identifier::Identifier; -use std::convert::From; use super::permissions::global::GlobalPermissionsArg; diff --git a/sdk/src/binary/consumer_groups.rs b/sdk/src/binary/consumer_groups.rs index 7c7e5d5b0..759b88b2c 100644 --- a/sdk/src/binary/consumer_groups.rs +++ b/sdk/src/binary/consumer_groups.rs @@ -64,20 +64,21 @@ impl ConsumerGroupClient for B { topic_id: &Identifier, name: &str, group_id: Option, - ) -> Result<(), IggyError> { + ) -> Result { fail_if_not_authenticated(self).await?; - self.send_with_response( - CREATE_CONSUMER_GROUP_CODE, - CreateConsumerGroup { - stream_id: stream_id.clone(), - topic_id: topic_id.clone(), - name: name.to_string(), - group_id, - } - .as_bytes(), - ) - .await?; - Ok(()) + let response = self + .send_with_response( + CREATE_CONSUMER_GROUP_CODE, + CreateConsumerGroup { + stream_id: stream_id.clone(), + topic_id: topic_id.clone(), + name: name.to_string(), + group_id, + } + .as_bytes(), + ) + .await?; + mapper::map_consumer_group(response) } async fn delete_consumer_group( diff --git a/sdk/src/client.rs b/sdk/src/client.rs index ff1b5c57a..fc612e13a 100644 --- a/sdk/src/client.rs +++ b/sdk/src/client.rs @@ -350,7 +350,7 @@ pub trait ConsumerGroupClient { topic_id: &Identifier, name: &str, group_id: Option, - ) -> Result<(), IggyError>; + ) -> Result; /// Delete a consumer group by unique ID or name for the given stream and topic by unique IDs or names. /// /// Authentication is required, and the permission to manage the streams or topics. diff --git a/sdk/src/clients/client.rs b/sdk/src/clients/client.rs index 1d81652c7..fde4f48a4 100644 --- a/sdk/src/clients/client.rs +++ b/sdk/src/clients/client.rs @@ -896,7 +896,7 @@ impl ConsumerGroupClient for IggyClient { topic_id: &Identifier, name: &str, group_id: Option, - ) -> Result<(), IggyError> { + ) -> Result { self.client .read() .await diff --git a/sdk/src/http/consumer_groups.rs b/sdk/src/http/consumer_groups.rs index e547298a7..ae0b6832b 100644 --- a/sdk/src/http/consumer_groups.rs +++ b/sdk/src/http/consumer_groups.rs @@ -44,18 +44,20 @@ impl ConsumerGroupClient for HttpClient { topic_id: &Identifier, name: &str, group_id: Option, - ) -> Result<(), IggyError> { - self.post( - &get_path(&stream_id.as_cow_str(), &topic_id.as_cow_str()), - &CreateConsumerGroup { - stream_id: stream_id.clone(), - topic_id: topic_id.clone(), - name: name.to_string(), - group_id, - }, - ) - .await?; - Ok(()) + ) -> Result { + let response = self + .post( + &get_path(&stream_id.as_cow_str(), &topic_id.as_cow_str()), + &CreateConsumerGroup { + stream_id: stream_id.clone(), + topic_id: topic_id.clone(), + name: name.to_string(), + group_id, + }, + ) + .await?; + let consumer_group = response.json().await?; + Ok(consumer_group) } async fn delete_consumer_group( diff --git a/sdk/src/utils/expiry.rs b/sdk/src/utils/expiry.rs index e6630c5ee..f262d8c4f 100644 --- a/sdk/src/utils/expiry.rs +++ b/sdk/src/utils/expiry.rs @@ -4,8 +4,8 @@ use humantime::Duration as HumanDuration; use std::fmt::Display; use std::iter::Sum; use std::ops::Add; +use std::str::FromStr; use std::time::Duration; -use std::{convert::From, str::FromStr}; /// Helper enum for various time-based expiry related functionalities #[derive(Debug, Clone, Eq, PartialEq)] diff --git a/server/server.http b/server/server.http index 2d9c03709..2ae9728b0 100644 --- a/server/server.http +++ b/server/server.http @@ -226,6 +226,7 @@ Content-Type: application/json { "topic_id": {{topic_id}}, "name": "topic1", + "compression_algorithm": "none", "partitions_count": 3, "message_expiry": 0 } @@ -237,6 +238,7 @@ Content-Type: application/json { "name": "topic1", + "compression_algorithm": "none", "message_expiry": 1000 } @@ -320,7 +322,7 @@ Content-Type: application/json { "consumer_group_id": {{consumer_group_id}}, - "name": "consumer_group_1", + "name": "consumer_group_1" } ### diff --git a/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs b/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs index 984a054f9..55d5f5c21 100644 --- a/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs +++ b/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs @@ -1,3 +1,4 @@ +use crate::binary::mapper; use crate::binary::sender::Sender; use crate::streaming::session::Session; use crate::streaming::systems::system::SharedSystem; @@ -14,7 +15,7 @@ pub async fn handle( ) -> Result<(), IggyError> { debug!("session: {session}, command: {command}"); let mut system = system.write(); - system + let consumer_group = system .create_consumer_group( session, &command.stream_id, @@ -23,6 +24,8 @@ pub async fn handle( &command.name, ) .await?; - sender.send_empty_ok_response().await?; + let consumer_group = consumer_group.read().await; + let consumer_group = mapper::map_consumer_group(&consumer_group).await; + sender.send_ok_response(&consumer_group).await?; Ok(()) } diff --git a/server/src/compat/snapshots/message_snapshot.rs b/server/src/compat/snapshots/message_snapshot.rs index c264604f5..806615774 100644 --- a/server/src/compat/snapshots/message_snapshot.rs +++ b/server/src/compat/snapshots/message_snapshot.rs @@ -6,7 +6,6 @@ use iggy::bytes_serializable::BytesSerializable; use iggy::models::header::{self, HeaderKey, HeaderValue}; use iggy::models::messages::MessageState; use std::collections::HashMap; -use std::convert::TryFrom; #[derive(Debug)] pub struct MessageSnapshot { diff --git a/server/src/http/consumer_groups.rs b/server/src/http/consumer_groups.rs index 8382dd409..fcb5dda7c 100644 --- a/server/src/http/consumer_groups.rs +++ b/server/src/http/consumer_groups.rs @@ -68,12 +68,12 @@ async fn create_consumer_group( Extension(identity): Extension, Path((stream_id, topic_id)): Path<(String, String)>, Json(mut command): Json, -) -> Result { +) -> Result<(StatusCode, Json), CustomError> { command.stream_id = Identifier::from_str_value(&stream_id)?; command.topic_id = Identifier::from_str_value(&topic_id)?; command.validate()?; let mut system = state.system.write(); - system + let consumer_group = system .create_consumer_group( &Session::stateless(identity.user_id, identity.ip_address), &command.stream_id, @@ -82,7 +82,9 @@ async fn create_consumer_group( &command.name, ) .await?; - Ok(StatusCode::CREATED) + let consumer_group = consumer_group.read().await; + let consumer_group = mapper::map_consumer_group(&consumer_group).await; + Ok((StatusCode::CREATED, Json(consumer_group))) } async fn delete_consumer_group( diff --git a/server/src/streaming/systems/consumer_groups.rs b/server/src/streaming/systems/consumer_groups.rs index 4ba440bcb..730a2030b 100644 --- a/server/src/streaming/systems/consumer_groups.rs +++ b/server/src/streaming/systems/consumer_groups.rs @@ -51,7 +51,7 @@ impl System { topic_id: &Identifier, group_id: Option, name: &str, - ) -> Result<(), IggyError> { + ) -> Result<&RwLock, IggyError> { self.ensure_authenticated(session)?; { let stream = self.get_stream(stream_id)?; @@ -64,8 +64,7 @@ impl System { } let topic = self.get_stream_mut(stream_id)?.get_topic_mut(topic_id)?; - topic.create_consumer_group(group_id, name).await?; - Ok(()) + topic.create_consumer_group(group_id, name).await } pub async fn delete_consumer_group( diff --git a/server/src/streaming/topics/consumer_groups.rs b/server/src/streaming/topics/consumer_groups.rs index cf6fbcb3a..126eea587 100644 --- a/server/src/streaming/topics/consumer_groups.rs +++ b/server/src/streaming/topics/consumer_groups.rs @@ -66,7 +66,7 @@ impl Topic { &mut self, group_id: Option, name: &str, - ) -> Result<(), IggyError> { + ) -> Result<&RwLock, IggyError> { let name = text::to_lowercase_non_whitespace(name); if self.consumer_groups_ids.contains_key(&name) { return Err(IggyError::ConsumerGroupNameAlreadyExists( @@ -105,16 +105,16 @@ impl Topic { self.consumer_groups.insert(id, RwLock::new(consumer_group)); self.consumer_groups_ids.insert(name, id); let consumer_group = self.get_consumer_group_by_id(id)?; - let consumer_group = consumer_group.read().await; + let consumer_group_guard = consumer_group.read().await; self.storage .topic - .save_consumer_group(self, &consumer_group) + .save_consumer_group(self, &consumer_group_guard) .await?; info!( "Created consumer group with ID: {} for topic with ID: {} and stream with ID: {}.", id, self.topic_id, self.stream_id ); - Ok(()) + Ok(consumer_group) } pub async fn delete_consumer_group( @@ -202,8 +202,16 @@ mod tests { let group_id = 1; let name = "test"; let mut topic = get_topic(); + let topic_id = topic.topic_id; let result = topic.create_consumer_group(Some(group_id), name).await; assert!(result.is_ok()); + { + let created_consumer_group = result.unwrap().read().await; + assert_eq!(created_consumer_group.group_id, group_id); + assert_eq!(created_consumer_group.name, name); + assert_eq!(created_consumer_group.topic_id, topic_id); + } + assert_eq!(topic.consumer_groups.len(), 1); let consumer_group = topic .get_consumer_group(&Identifier::numeric(group_id).unwrap()) @@ -211,7 +219,7 @@ mod tests { let consumer_group = consumer_group.read().await; assert_eq!(consumer_group.group_id, group_id); assert_eq!(consumer_group.name, name); - assert_eq!(consumer_group.topic_id, topic.topic_id); + assert_eq!(consumer_group.topic_id, topic_id); assert_eq!( consumer_group.partitions_count, topic.partitions.len() as u32 @@ -228,9 +236,9 @@ mod tests { assert_eq!(topic.consumer_groups.len(), 1); let result = topic.create_consumer_group(Some(group_id), "test2").await; assert!(result.is_err()); - assert_eq!(topic.consumer_groups.len(), 1); let err = result.unwrap_err(); assert!(matches!(err, IggyError::ConsumerGroupIdAlreadyExists(_, _))); + assert_eq!(topic.consumer_groups.len(), 1); } #[tokio::test] @@ -244,12 +252,12 @@ mod tests { let group_id = group_id + 1; let result = topic.create_consumer_group(Some(group_id), name).await; assert!(result.is_err()); - assert_eq!(topic.consumer_groups.len(), 1); let err = result.unwrap_err(); assert!(matches!( err, IggyError::ConsumerGroupNameAlreadyExists(_, _) )); + assert_eq!(topic.consumer_groups.len(), 1); } #[tokio::test]