Skip to content

Commit

Permalink
improve consumer group creation (#976)
Browse files Browse the repository at this point in the history
Creating a consumer group should return detailed information about the
group - so the caller doesn't need to rely on picking the consumer group
id, the server could provide it.

---------

Co-authored-by: Piotr Gankiewicz <piotr.gankiewicz@gmail.com>
  • Loading branch information
gzsombor and spetz authored Jun 1, 2024
1 parent 3ff5fea commit 31df584
Show file tree
Hide file tree
Showing 15 changed files with 61 additions and 49 deletions.
1 change: 0 additions & 1 deletion cli/src/args/partition.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use clap::{Args, Subcommand};
use iggy::identifier::Identifier;
use std::convert::From;

#[derive(Debug, Clone, Subcommand)]
pub(crate) enum PartitionAction {
Expand Down
1 change: 0 additions & 1 deletion cli/src/args/personal_access_token.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::args::common::ListMode;
use clap::{Args, Subcommand};
use iggy::utils::personal_access_token_expiry::PersonalAccessTokenExpiry;
use std::convert::From;

#[derive(Debug, Clone, Subcommand)]
pub(crate) enum PersonalAccessTokenAction {
Expand Down
1 change: 0 additions & 1 deletion cli/src/args/topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use iggy::compression::compression_algorithm::CompressionAlgorithm;
use iggy::identifier::Identifier;
use iggy::utils::byte_size::IggyByteSize;
use iggy::utils::expiry::IggyExpiry;
use std::convert::From;

#[derive(Debug, Clone, Subcommand)]
pub(crate) enum TopicAction {
Expand Down
1 change: 0 additions & 1 deletion cli/src/args/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use crate::args::permissions::stream::StreamPermissionsArg;
use crate::args::permissions::UserStatusArg;
use clap::{Args, Subcommand};
use iggy::identifier::Identifier;
use std::convert::From;

use super::permissions::global::GlobalPermissionsArg;

Expand Down
27 changes: 14 additions & 13 deletions sdk/src/binary/consumer_groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,20 +64,21 @@ impl<B: BinaryClient> ConsumerGroupClient for B {
topic_id: &Identifier,
name: &str,
group_id: Option<u32>,
) -> Result<(), IggyError> {
) -> Result<ConsumerGroupDetails, IggyError> {
fail_if_not_authenticated(self).await?;
self.send_with_response(
CREATE_CONSUMER_GROUP_CODE,
CreateConsumerGroup {
stream_id: stream_id.clone(),
topic_id: topic_id.clone(),
name: name.to_string(),
group_id,
}
.as_bytes(),
)
.await?;
Ok(())
let response = self
.send_with_response(
CREATE_CONSUMER_GROUP_CODE,
CreateConsumerGroup {
stream_id: stream_id.clone(),
topic_id: topic_id.clone(),
name: name.to_string(),
group_id,
}
.as_bytes(),
)
.await?;
mapper::map_consumer_group(response)
}

async fn delete_consumer_group(
Expand Down
2 changes: 1 addition & 1 deletion sdk/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ pub trait ConsumerGroupClient {
topic_id: &Identifier,
name: &str,
group_id: Option<u32>,
) -> Result<(), IggyError>;
) -> Result<ConsumerGroupDetails, IggyError>;
/// Delete a consumer group by unique ID or name for the given stream and topic by unique IDs or names.
///
/// Authentication is required, and the permission to manage the streams or topics.
Expand Down
2 changes: 1 addition & 1 deletion sdk/src/clients/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -896,7 +896,7 @@ impl ConsumerGroupClient for IggyClient {
topic_id: &Identifier,
name: &str,
group_id: Option<u32>,
) -> Result<(), IggyError> {
) -> Result<ConsumerGroupDetails, IggyError> {
self.client
.read()
.await
Expand Down
26 changes: 14 additions & 12 deletions sdk/src/http/consumer_groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,20 @@ impl ConsumerGroupClient for HttpClient {
topic_id: &Identifier,
name: &str,
group_id: Option<u32>,
) -> Result<(), IggyError> {
self.post(
&get_path(&stream_id.as_cow_str(), &topic_id.as_cow_str()),
&CreateConsumerGroup {
stream_id: stream_id.clone(),
topic_id: topic_id.clone(),
name: name.to_string(),
group_id,
},
)
.await?;
Ok(())
) -> Result<ConsumerGroupDetails, IggyError> {
let response = self
.post(
&get_path(&stream_id.as_cow_str(), &topic_id.as_cow_str()),
&CreateConsumerGroup {
stream_id: stream_id.clone(),
topic_id: topic_id.clone(),
name: name.to_string(),
group_id,
},
)
.await?;
let consumer_group = response.json().await?;
Ok(consumer_group)
}

async fn delete_consumer_group(
Expand Down
2 changes: 1 addition & 1 deletion sdk/src/utils/expiry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use humantime::Duration as HumanDuration;
use std::fmt::Display;
use std::iter::Sum;
use std::ops::Add;
use std::str::FromStr;
use std::time::Duration;
use std::{convert::From, str::FromStr};

/// Helper enum for various time-based expiry related functionalities
#[derive(Debug, Clone, Eq, PartialEq)]
Expand Down
4 changes: 3 additions & 1 deletion server/server.http
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ Content-Type: application/json
{
"topic_id": {{topic_id}},
"name": "topic1",
"compression_algorithm": "none",
"partitions_count": 3,
"message_expiry": 0
}
Expand All @@ -237,6 +238,7 @@ Content-Type: application/json

{
"name": "topic1",
"compression_algorithm": "none",
"message_expiry": 1000
}

Expand Down Expand Up @@ -320,7 +322,7 @@ Content-Type: application/json

{
"consumer_group_id": {{consumer_group_id}},
"name": "consumer_group_1",
"name": "consumer_group_1"
}

###
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::binary::mapper;
use crate::binary::sender::Sender;
use crate::streaming::session::Session;
use crate::streaming::systems::system::SharedSystem;
Expand All @@ -14,7 +15,7 @@ pub async fn handle(
) -> Result<(), IggyError> {
debug!("session: {session}, command: {command}");
let mut system = system.write();
system
let consumer_group = system
.create_consumer_group(
session,
&command.stream_id,
Expand All @@ -23,6 +24,8 @@ pub async fn handle(
&command.name,
)
.await?;
sender.send_empty_ok_response().await?;
let consumer_group = consumer_group.read().await;
let consumer_group = mapper::map_consumer_group(&consumer_group).await;
sender.send_ok_response(&consumer_group).await?;
Ok(())
}
1 change: 0 additions & 1 deletion server/src/compat/snapshots/message_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use iggy::bytes_serializable::BytesSerializable;
use iggy::models::header::{self, HeaderKey, HeaderValue};
use iggy::models::messages::MessageState;
use std::collections::HashMap;
use std::convert::TryFrom;

#[derive(Debug)]
pub struct MessageSnapshot {
Expand Down
8 changes: 5 additions & 3 deletions server/src/http/consumer_groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ async fn create_consumer_group(
Extension(identity): Extension<Identity>,
Path((stream_id, topic_id)): Path<(String, String)>,
Json(mut command): Json<CreateConsumerGroup>,
) -> Result<StatusCode, CustomError> {
) -> Result<(StatusCode, Json<ConsumerGroupDetails>), CustomError> {
command.stream_id = Identifier::from_str_value(&stream_id)?;
command.topic_id = Identifier::from_str_value(&topic_id)?;
command.validate()?;
let mut system = state.system.write();
system
let consumer_group = system
.create_consumer_group(
&Session::stateless(identity.user_id, identity.ip_address),
&command.stream_id,
Expand All @@ -82,7 +82,9 @@ async fn create_consumer_group(
&command.name,
)
.await?;
Ok(StatusCode::CREATED)
let consumer_group = consumer_group.read().await;
let consumer_group = mapper::map_consumer_group(&consumer_group).await;
Ok((StatusCode::CREATED, Json(consumer_group)))
}

async fn delete_consumer_group(
Expand Down
5 changes: 2 additions & 3 deletions server/src/streaming/systems/consumer_groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl System {
topic_id: &Identifier,
group_id: Option<u32>,
name: &str,
) -> Result<(), IggyError> {
) -> Result<&RwLock<ConsumerGroup>, IggyError> {
self.ensure_authenticated(session)?;
{
let stream = self.get_stream(stream_id)?;
Expand All @@ -64,8 +64,7 @@ impl System {
}

let topic = self.get_stream_mut(stream_id)?.get_topic_mut(topic_id)?;
topic.create_consumer_group(group_id, name).await?;
Ok(())
topic.create_consumer_group(group_id, name).await
}

pub async fn delete_consumer_group(
Expand Down
22 changes: 15 additions & 7 deletions server/src/streaming/topics/consumer_groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl Topic {
&mut self,
group_id: Option<u32>,
name: &str,
) -> Result<(), IggyError> {
) -> Result<&RwLock<ConsumerGroup>, IggyError> {
let name = text::to_lowercase_non_whitespace(name);
if self.consumer_groups_ids.contains_key(&name) {
return Err(IggyError::ConsumerGroupNameAlreadyExists(
Expand Down Expand Up @@ -105,16 +105,16 @@ impl Topic {
self.consumer_groups.insert(id, RwLock::new(consumer_group));
self.consumer_groups_ids.insert(name, id);
let consumer_group = self.get_consumer_group_by_id(id)?;
let consumer_group = consumer_group.read().await;
let consumer_group_guard = consumer_group.read().await;
self.storage
.topic
.save_consumer_group(self, &consumer_group)
.save_consumer_group(self, &consumer_group_guard)
.await?;
info!(
"Created consumer group with ID: {} for topic with ID: {} and stream with ID: {}.",
id, self.topic_id, self.stream_id
);
Ok(())
Ok(consumer_group)
}

pub async fn delete_consumer_group(
Expand Down Expand Up @@ -202,16 +202,24 @@ mod tests {
let group_id = 1;
let name = "test";
let mut topic = get_topic();
let topic_id = topic.topic_id;
let result = topic.create_consumer_group(Some(group_id), name).await;
assert!(result.is_ok());
{
let created_consumer_group = result.unwrap().read().await;
assert_eq!(created_consumer_group.group_id, group_id);
assert_eq!(created_consumer_group.name, name);
assert_eq!(created_consumer_group.topic_id, topic_id);
}

assert_eq!(topic.consumer_groups.len(), 1);
let consumer_group = topic
.get_consumer_group(&Identifier::numeric(group_id).unwrap())
.unwrap();
let consumer_group = consumer_group.read().await;
assert_eq!(consumer_group.group_id, group_id);
assert_eq!(consumer_group.name, name);
assert_eq!(consumer_group.topic_id, topic.topic_id);
assert_eq!(consumer_group.topic_id, topic_id);
assert_eq!(
consumer_group.partitions_count,
topic.partitions.len() as u32
Expand All @@ -228,9 +236,9 @@ mod tests {
assert_eq!(topic.consumer_groups.len(), 1);
let result = topic.create_consumer_group(Some(group_id), "test2").await;
assert!(result.is_err());
assert_eq!(topic.consumer_groups.len(), 1);
let err = result.unwrap_err();
assert!(matches!(err, IggyError::ConsumerGroupIdAlreadyExists(_, _)));
assert_eq!(topic.consumer_groups.len(), 1);
}

#[tokio::test]
Expand All @@ -244,12 +252,12 @@ mod tests {
let group_id = group_id + 1;
let result = topic.create_consumer_group(Some(group_id), name).await;
assert!(result.is_err());
assert_eq!(topic.consumer_groups.len(), 1);
let err = result.unwrap_err();
assert!(matches!(
err,
IggyError::ConsumerGroupNameAlreadyExists(_, _)
));
assert_eq!(topic.consumer_groups.len(), 1);
}

#[tokio::test]
Expand Down

0 comments on commit 31df584

Please sign in to comment.