diff --git a/Cargo.lock b/Cargo.lock index 67d2e28f6..eecaa27e3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4225,7 +4225,7 @@ dependencies = [ [[package]] name = "server" -version = "0.4.12" +version = "0.4.13" dependencies = [ "anyhow", "async-stream", diff --git a/examples/src/multi-tenant/consumer/main.rs b/examples/src/multi-tenant/consumer/main.rs index bb7626d83..f62c403c4 100644 --- a/examples/src/multi-tenant/consumer/main.rs +++ b/examples/src/multi-tenant/consumer/main.rs @@ -288,16 +288,14 @@ async fn ensure_stream_topics_access( let topic_id = Identifier::named(topic)?; client .get_topic(&available_stream.try_into()?, &topic_id) - .await - .unwrap_or_else(|_| { - panic!("No access to topic: {topic} in stream: {available_stream}") - }); + .await? + .unwrap_or_else(|| panic!("No access to topic: {topic} in stream: {available_stream}")); info!("Ensured access to topic: {topic} in stream: {available_stream}"); for stream in unavailable_streams { if client .get_topic(&Identifier::named(stream)?, &topic_id) - .await - .is_err() + .await? + .is_none() { info!("Ensured no access to topic: {topic} in stream: {stream}"); } else { diff --git a/examples/src/multi-tenant/producer/main.rs b/examples/src/multi-tenant/producer/main.rs index d6a18992b..18b1af8c7 100644 --- a/examples/src/multi-tenant/producer/main.rs +++ b/examples/src/multi-tenant/producer/main.rs @@ -275,14 +275,14 @@ async fn ensure_stream_access( ) -> Result<(), IggyError> { client .get_stream(&available_stream.try_into()?) - .await - .unwrap_or_else(|_| panic!("No access to stream: {available_stream}")); + .await? + .unwrap_or_else(|| panic!("No access to stream: {available_stream}")); info!("Ensured access to stream: {available_stream}"); for stream in unavailable_streams { if client .get_stream(&Identifier::named(stream)?) - .await - .is_err() + .await? + .is_none() { info!("Ensured no access to stream: {stream}"); } else { diff --git a/server/Cargo.toml b/server/Cargo.toml index 07ff02279..0e84f84ab 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "server" -version = "0.4.12" +version = "0.4.13" edition = "2021" build = "src/build.rs" diff --git a/server/src/streaming/systems/consumer_groups.rs b/server/src/streaming/systems/consumer_groups.rs index 730a2030b..4196e6eea 100644 --- a/server/src/streaming/systems/consumer_groups.rs +++ b/server/src/streaming/systems/consumer_groups.rs @@ -15,11 +15,10 @@ impl System { group_id: &Identifier, ) -> Result<&RwLock, IggyError> { self.ensure_authenticated(session)?; - let stream = self.get_stream(stream_id)?; - let topic = stream.get_topic(topic_id)?; + let topic = self.find_topic(session, stream_id, topic_id)?; self.permissioner.get_consumer_group( session.get_user_id(), - stream.stream_id, + topic.stream_id, topic.topic_id, )?; @@ -33,11 +32,10 @@ impl System { topic_id: &Identifier, ) -> Result>, IggyError> { self.ensure_authenticated(session)?; - let stream = self.get_stream(stream_id)?; - let topic = stream.get_topic(topic_id)?; + let topic = self.find_topic(session, stream_id, topic_id)?; self.permissioner.get_consumer_groups( session.get_user_id(), - stream.stream_id, + topic.stream_id, topic.topic_id, )?; @@ -54,11 +52,10 @@ impl System { ) -> Result<&RwLock, IggyError> { self.ensure_authenticated(session)?; { - let stream = self.get_stream(stream_id)?; - let topic = stream.get_topic(topic_id)?; + let topic = self.find_topic(session, stream_id, topic_id)?; self.permissioner.create_consumer_group( session.get_user_id(), - stream.stream_id, + topic.stream_id, topic.topic_id, )?; } @@ -78,14 +75,13 @@ impl System { let stream_id_value; let topic_id_value; { - let stream = self.get_stream(stream_id)?; - let topic = stream.get_topic(topic_id)?; + let topic = self.find_topic(session, stream_id, topic_id)?; self.permissioner.delete_consumer_group( session.get_user_id(), - stream.stream_id, + topic.stream_id, topic.topic_id, )?; - stream_id_value = stream.stream_id; + stream_id_value = topic.stream_id; topic_id_value = topic.topic_id; } @@ -124,14 +120,13 @@ impl System { let stream_id_value; let topic_id_value; { - let stream = self.get_stream(stream_id)?; - let topic = stream.get_topic(topic_id)?; + let topic = self.find_topic(session, stream_id, topic_id)?; self.permissioner.join_consumer_group( session.get_user_id(), - stream.stream_id, + topic.stream_id, topic.topic_id, )?; - stream_id_value = stream.stream_id; + stream_id_value = topic.stream_id; topic_id_value = topic.topic_id; } @@ -167,11 +162,10 @@ impl System { ) -> Result<(), IggyError> { self.ensure_authenticated(session)?; { - let stream = self.get_stream(stream_id)?; - let topic = stream.get_topic(topic_id)?; + let topic = self.find_topic(session, stream_id, topic_id)?; self.permissioner.leave_consumer_group( session.get_user_id(), - stream.stream_id, + topic.stream_id, topic.topic_id, )?; } diff --git a/server/src/streaming/systems/consumer_offsets.rs b/server/src/streaming/systems/consumer_offsets.rs index 3ae9434ea..e426d5d6b 100644 --- a/server/src/streaming/systems/consumer_offsets.rs +++ b/server/src/streaming/systems/consumer_offsets.rs @@ -16,11 +16,10 @@ impl System { offset: u64, ) -> Result<(), IggyError> { self.ensure_authenticated(session)?; - let stream = self.get_stream(stream_id)?; - let topic = stream.get_topic(topic_id)?; + let topic = self.find_topic(session, stream_id, topic_id)?; self.permissioner.store_consumer_offset( session.get_user_id(), - stream.stream_id, + topic.stream_id, topic.topic_id, )?; @@ -38,11 +37,10 @@ impl System { partition_id: Option, ) -> Result { self.ensure_authenticated(session)?; - let stream = self.get_stream(stream_id)?; - let topic = stream.get_topic(topic_id)?; + let topic = self.find_topic(session, stream_id, topic_id)?; self.permissioner.get_consumer_offset( session.get_user_id(), - stream.stream_id, + topic.stream_id, topic.topic_id, )?; diff --git a/server/src/streaming/systems/streams.rs b/server/src/streaming/systems/streams.rs index 7f1178604..443b80416 100644 --- a/server/src/streaming/systems/streams.rs +++ b/server/src/streaming/systems/streams.rs @@ -139,10 +139,14 @@ impl System { identifier: &Identifier, ) -> Result<&Stream, IggyError> { self.ensure_authenticated(session)?; - let stream = self.get_stream(identifier)?; - self.permissioner - .get_stream(session.get_user_id(), stream.stream_id)?; - Ok(stream) + let stream = self.get_stream(identifier); + if let Ok(stream) = stream { + self.permissioner + .get_stream(session.get_user_id(), stream.stream_id)?; + return Ok(stream); + } + + stream } pub fn get_stream(&self, identifier: &Identifier) -> Result<&Stream, IggyError> { diff --git a/server/src/streaming/systems/topics.rs b/server/src/streaming/systems/topics.rs index bfc9c5c1c..dca5236fa 100644 --- a/server/src/streaming/systems/topics.rs +++ b/server/src/streaming/systems/topics.rs @@ -16,11 +16,15 @@ impl System { topic_id: &Identifier, ) -> Result<&Topic, IggyError> { self.ensure_authenticated(session)?; - let stream = self.get_stream(stream_id)?; - let topic = stream.get_topic(topic_id)?; - self.permissioner - .get_topic(session.get_user_id(), stream.stream_id, topic.topic_id)?; - Ok(topic) + let stream = self.find_stream(session, stream_id)?; + let topic = stream.get_topic(topic_id); + if let Ok(topic) = topic { + self.permissioner + .get_topic(session.get_user_id(), stream.stream_id, topic.topic_id)?; + return Ok(topic); + } + + topic } pub fn find_topics( diff --git a/server/src/streaming/systems/users.rs b/server/src/streaming/systems/users.rs index 09f45cc91..f6619b892 100644 --- a/server/src/streaming/systems/users.rs +++ b/server/src/streaming/systems/users.rs @@ -113,13 +113,17 @@ impl System { pub fn find_user(&self, session: &Session, user_id: &Identifier) -> Result<&User, IggyError> { self.ensure_authenticated(session)?; - let user = self.get_user(user_id)?; - let session_user_id = session.get_user_id(); - if user.id != session_user_id { - self.permissioner.get_user(session_user_id)?; + let user = self.get_user(user_id); + if let Ok(user) = user { + let session_user_id = session.get_user_id(); + if user.id != session_user_id { + self.permissioner.get_user(session_user_id)?; + } + + return Ok(user); } - Ok(user) + user } pub fn get_user(&self, user_id: &Identifier) -> Result<&User, IggyError> {