From c540014fc5460778d588a0f6d0fd24af6ec6a18e Mon Sep 17 00:00:00 2001 From: Piotr Gankiewicz Date: Sun, 18 Aug 2024 18:15:51 +0200 Subject: [PATCH] Use aHash, improve permissioner (#1159) --- Cargo.lock | 27 ++++++++++++++++--- cli/Cargo.toml | 5 ++-- cli/src/args/permissions/mod.rs | 8 +++--- cli/src/args/permissions/stream.rs | 11 ++++---- examples/Cargo.toml | 1 + examples/src/multi-tenant/consumer/main.rs | 5 ++-- examples/src/multi-tenant/producer/main.rs | 3 ++- integration/Cargo.toml | 1 + .../cli/user/test_user_create_command.rs | 12 ++++----- .../tests/cli/user/test_user_get_command.rs | 7 +++-- .../cli/user/test_user_permissions_command.rs | 12 ++++----- sdk/Cargo.toml | 3 ++- sdk/src/models/permissions.rs | 14 +++++----- server/Cargo.toml | 3 ++- .../src/streaming/systems/consumer_groups.rs | 3 +-- server/src/streaming/systems/messages.rs | 10 +++---- server/src/streaming/systems/partitions.rs | 10 +++---- server/src/streaming/systems/topics.rs | 17 +++++------- server/src/streaming/users/permissioner.rs | 17 ++++++------ 19 files changed, 93 insertions(+), 76 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index eecaa27e3..dce7cbbe7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -63,6 +63,20 @@ dependencies = [ "version_check", ] +[[package]] +name = "ahash" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" +dependencies = [ + "cfg-if", + "getrandom", + "once_cell", + "serde", + "version_check", + "zerocopy", +] + [[package]] name = "aho-corasick" version = "1.1.3" @@ -1922,7 +1936,7 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" dependencies = [ - "ahash", + "ahash 0.7.8", ] [[package]] @@ -2245,9 +2259,10 @@ dependencies = [ [[package]] name = "iggy" -version = "0.6.11" +version = "0.6.12" dependencies = [ "aes-gcm", + "ahash 0.8.11", "anyhow", "async-broadcast 0.7.1", "async-dropper", @@ -2293,8 +2308,9 @@ dependencies = [ [[package]] name = "iggy-cli" -version = "0.5.11" +version = "0.5.12" dependencies = [ + "ahash 0.8.11", "anyhow", "clap", "clap_complete", @@ -2313,6 +2329,7 @@ dependencies = [ name = "iggy_examples" version = "0.0.4" dependencies = [ + "ahash 0.8.11", "anyhow", "bytes", "clap", @@ -2380,6 +2397,7 @@ dependencies = [ name = "integration" version = "0.0.1" dependencies = [ + "ahash 0.8.11", "assert_cmd", "async-trait", "bytes", @@ -4225,8 +4243,9 @@ dependencies = [ [[package]] name = "server" -version = "0.4.13" +version = "0.4.14" dependencies = [ + "ahash 0.8.11", "anyhow", "async-stream", "async-trait", diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 2458c09b4..3fd8f492d 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "iggy-cli" -version = "0.5.11" +version = "0.5.12" edition = "2021" authors = ["bartosz.ciesla@gmail.com"] repository = "https://github.com/iggy-rs/iggy" @@ -10,11 +10,12 @@ license = "MIT" keywords = ["iggy", "cli", "messaging", "streaming"] [dependencies] +ahash = { version = "0.8.11", features = ["serde"] } anyhow = "1.0.86" clap = { version = "4.5.4", features = ["derive"] } clap_complete = "4.5.16" figlet-rs = "0.1.5" -iggy = { path = "../sdk", features = ["iggy-cli"], version = "0.6.10" } +iggy = { path = "../sdk", features = ["iggy-cli"], version = "0.6.12" } keyring = "2.3.3" passterm = "2.0.1" thiserror = "1.0.61" diff --git a/cli/src/args/permissions/mod.rs b/cli/src/args/permissions/mod.rs index 9c963d9cd..b6309b0d3 100644 --- a/cli/src/args/permissions/mod.rs +++ b/cli/src/args/permissions/mod.rs @@ -1,10 +1,10 @@ use self::{global::GlobalPermissionsArg, stream::StreamPermissionsArg}; +use ahash::AHashMap; use clap::ValueEnum; use iggy::models::{ permissions::{Permissions, StreamPermissions}, user_status::UserStatus, }; -use std::collections::HashMap; pub(crate) mod constants; pub(crate) mod global; @@ -34,7 +34,7 @@ impl From for Option { .stream .into_iter() .map(|s| (s.stream_id, s.into())) - .collect::>(); + .collect::>(); match (value.global, stream_permissions.is_empty()) { (Some(global), true) => Some(Permissions { @@ -104,7 +104,7 @@ mod tests { Option::from(PermissionsArgs::new(None, Some(vec![stream]))); let permissions = Permissions { - streams: Some(HashMap::from([(1, StreamPermissions::default())])), + streams: Some(AHashMap::from([(1, StreamPermissions::default())])), ..Default::default() }; assert_eq!(permissions_args, Some(permissions)); @@ -118,7 +118,7 @@ mod tests { Option::from(PermissionsArgs::new(Some(global), Some(vec![stream]))); let mut permissions = Permissions { - streams: Some(HashMap::from([(1, StreamPermissions::default())])), + streams: Some(AHashMap::from([(1, StreamPermissions::default())])), ..Default::default() }; permissions.global.manage_topics = true; diff --git a/cli/src/args/permissions/stream.rs b/cli/src/args/permissions/stream.rs index c6605cb7e..5cb564830 100644 --- a/cli/src/args/permissions/stream.rs +++ b/cli/src/args/permissions/stream.rs @@ -4,8 +4,9 @@ use super::constants::{ READ_TOPICS_SHORT, SEND_MESSAGES_LONG, SEND_MESSAGES_SHORT, }; use crate::args::permissions::topic::TopicPermissionsArg; +use ahash::AHashMap; use iggy::models::permissions::StreamPermissions; -use std::{collections::HashMap, str::FromStr}; +use std::str::FromStr; #[derive(Debug, PartialEq)] pub(super) enum StreamPermission { @@ -65,7 +66,7 @@ impl StreamPermissionsArg { } if !topic_permissions.is_empty() { - let mut permissions = HashMap::new(); + let mut permissions = AHashMap::new(); for permission in topic_permissions { permissions.insert(permission.topic_id, permission.permissions); } @@ -363,7 +364,7 @@ mod tests { read_topics: false, poll_messages: false, send_messages: false, - topics: Some(HashMap::from([ + topics: Some(AHashMap::from([ ( 2, TopicPermissions { @@ -398,7 +399,7 @@ mod tests { read_topics: false, poll_messages: false, send_messages: false, - topics: Some(HashMap::from([ + topics: Some(AHashMap::from([ ( 2, TopicPermissions { @@ -496,7 +497,7 @@ mod tests { read_topics: false, poll_messages: false, send_messages: false, - topics: Some(HashMap::from([ + topics: Some(AHashMap::from([ ( 2, TopicPermissions { diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 0d0bbfd28..1549240a8 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -52,6 +52,7 @@ name = "new-sdk-producer" path = "src/new-sdk/producer/main.rs" [dependencies] +ahash = { version = "0.8.11", features = ["serde"] } anyhow = "1.0.86" bytes = "1.6.0" clap = { version = "4.5.4", features = ["derive"] } diff --git a/examples/src/multi-tenant/consumer/main.rs b/examples/src/multi-tenant/consumer/main.rs index f62c403c4..97de8658e 100644 --- a/examples/src/multi-tenant/consumer/main.rs +++ b/examples/src/multi-tenant/consumer/main.rs @@ -1,3 +1,4 @@ +use ahash::AHashMap; use clap::Parser; use futures_util::future::join_all; use futures_util::StreamExt; @@ -174,7 +175,7 @@ async fn create_user( .get_stream(&stream_name.try_into()?) .await? .expect("Stream does not exist"); - let mut topic_permissions = HashMap::new(); + let mut topic_permissions = AHashMap::new(); for topic in topics { let topic_id = Identifier::named(topic)?; let topic = client @@ -192,7 +193,7 @@ async fn create_user( ); } - let mut streams_permissions = HashMap::new(); + let mut streams_permissions = AHashMap::new(); streams_permissions.insert( stream.id, StreamPermissions { diff --git a/examples/src/multi-tenant/producer/main.rs b/examples/src/multi-tenant/producer/main.rs index 18b1af8c7..cac1712a2 100644 --- a/examples/src/multi-tenant/producer/main.rs +++ b/examples/src/multi-tenant/producer/main.rs @@ -1,3 +1,4 @@ +use ahash::AHashMap; use clap::Parser; use futures_util::future::join_all; use iggy::client::{Client, StreamClient, UserClient}; @@ -310,7 +311,7 @@ async fn create_stream_and_user( ) -> Result<(), IggyError> { let stream = client.create_stream(stream_name, None).await?; info!("Created stream: {stream_name} with ID: {}", stream.id); - let mut streams_permissions = HashMap::new(); + let mut streams_permissions = AHashMap::new(); streams_permissions.insert( stream.id, StreamPermissions { diff --git a/integration/Cargo.toml b/integration/Cargo.toml index f0deee532..5d9fb97de 100644 --- a/integration/Cargo.toml +++ b/integration/Cargo.toml @@ -4,6 +4,7 @@ version = "0.0.1" edition = "2021" [dependencies] +ahash = { version = "0.8.11", features = ["serde"] } assert_cmd = "2.0.14" async-trait = "0.1.80" bytes = "1.6.0" diff --git a/integration/tests/cli/user/test_user_create_command.rs b/integration/tests/cli/user/test_user_create_command.rs index fbff9d15c..5d469370e 100644 --- a/integration/tests/cli/user/test_user_create_command.rs +++ b/integration/tests/cli/user/test_user_create_command.rs @@ -2,6 +2,7 @@ use crate::cli::common::{ IggyCmdCommand, IggyCmdTest, IggyCmdTestCase, TestHelpCmd, CLAP_INDENT, USAGE_PREFIX, }; use crate::cli::user::common::PermissionsTestArgs; +use ahash::AHashMap; use assert_cmd::assert::Assert; use async_trait::async_trait; use iggy::client::Client; @@ -9,7 +10,6 @@ use iggy::models::permissions::{GlobalPermissions, StreamPermissions, TopicPermi use iggy::models::{permissions::Permissions, user_status::UserStatus}; use predicates::str::diff; use serial_test::parallel; -use std::collections::HashMap; #[derive(Debug, Clone)] enum UserStatusTest { @@ -175,7 +175,7 @@ pub async fn should_be_successful() { vec![String::from("3")], Some(Permissions { global: GlobalPermissions::default(), - streams: Some(HashMap::from([(3u32, StreamPermissions::default())])), + streams: Some(AHashMap::from([(3u32, StreamPermissions::default())])), }), ), )) @@ -190,10 +190,10 @@ pub async fn should_be_successful() { vec![String::from("1#1:m_top,r_top,p_msg,s_msg")], Some(Permissions { global: GlobalPermissions::default(), - streams: Some(HashMap::from([( + streams: Some(AHashMap::from([( 1u32, StreamPermissions { - topics: Some(HashMap::from([( + topics: Some(AHashMap::from([( 1, TopicPermissions { manage_topic: true, @@ -230,10 +230,10 @@ pub async fn should_be_successful() { poll_messages: false, send_messages: false, }, - streams: Some(HashMap::from([( + streams: Some(AHashMap::from([( 2u32, StreamPermissions { - topics: Some(HashMap::from([( + topics: Some(AHashMap::from([( 1, TopicPermissions { manage_topic: false, diff --git a/integration/tests/cli/user/test_user_get_command.rs b/integration/tests/cli/user/test_user_get_command.rs index cd4d9d526..34327134b 100644 --- a/integration/tests/cli/user/test_user_get_command.rs +++ b/integration/tests/cli/user/test_user_get_command.rs @@ -1,9 +1,8 @@ -use std::collections::HashMap; - use crate::cli::common::{ IggyCmdCommand, IggyCmdTest, IggyCmdTestCase, TestHelpCmd, TestUserId, CLAP_INDENT, USAGE_PREFIX, }; +use ahash::AHashMap; use assert_cmd::assert::Assert; use async_trait::async_trait; use iggy::client::Client; @@ -75,10 +74,10 @@ impl TestUserGetCmd { if let Some(topic_id) = self.check_topic_perms { stream_perms.topics = - Some(HashMap::from([(topic_id, TopicPermissions::default())])); + Some(AHashMap::from([(topic_id, TopicPermissions::default())])); }; - permissions.streams = Some(HashMap::from([(stream_id, stream_perms)])); + permissions.streams = Some(AHashMap::from([(stream_id, stream_perms)])); } Some(permissions) diff --git a/integration/tests/cli/user/test_user_permissions_command.rs b/integration/tests/cli/user/test_user_permissions_command.rs index 7a1e407d2..bceacb606 100644 --- a/integration/tests/cli/user/test_user_permissions_command.rs +++ b/integration/tests/cli/user/test_user_permissions_command.rs @@ -3,6 +3,7 @@ use crate::cli::common::{ USAGE_PREFIX, }; use crate::cli::user::common::PermissionsTestArgs; +use ahash::AHashMap; use assert_cmd::assert::Assert; use async_trait::async_trait; use iggy::client::Client; @@ -12,7 +13,6 @@ use iggy::models::user_info::UserId; use iggy::models::user_status::UserStatus; use predicates::str::diff; use serial_test::parallel; -use std::collections::HashMap; struct TestUserPermissionsCmd { username: String, @@ -140,7 +140,7 @@ pub async fn should_be_successful() { vec![String::from("3")], Some(Permissions { global: GlobalPermissions::default(), - streams: Some(HashMap::from([(3u32, StreamPermissions::default())])), + streams: Some(AHashMap::from([(3u32, StreamPermissions::default())])), }), ), TestUserId::Numeric, @@ -154,10 +154,10 @@ pub async fn should_be_successful() { vec![String::from("1#2:m_top,r_top,p_msg,s_msg")], Some(Permissions { global: GlobalPermissions::default(), - streams: Some(HashMap::from([( + streams: Some(AHashMap::from([( 1u32, StreamPermissions { - topics: Some(HashMap::from([( + topics: Some(AHashMap::from([( 2, TopicPermissions { manage_topic: true, @@ -193,10 +193,10 @@ pub async fn should_be_successful() { poll_messages: false, send_messages: false, }, - streams: Some(HashMap::from([( + streams: Some(AHashMap::from([( 2u32, StreamPermissions { - topics: Some(HashMap::from([( + topics: Some(AHashMap::from([( 2u32, TopicPermissions { manage_topic: false, diff --git a/sdk/Cargo.toml b/sdk/Cargo.toml index 745991cf8..0e0cb0f52 100644 --- a/sdk/Cargo.toml +++ b/sdk/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "iggy" -version = "0.6.11" +version = "0.6.12" description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second." edition = "2021" license = "MIT" @@ -13,6 +13,7 @@ readme = "../README.md" [dependencies] aes-gcm = "0.10.3" +ahash = { version = "0.8.11", features = ["serde"] } anyhow = "1.0.86" async-broadcast = { version = "0.7.1" } async-dropper = { version = "0.3.1", features = ["tokio", "simple"] } diff --git a/sdk/src/models/permissions.rs b/sdk/src/models/permissions.rs index b79ee6834..6c79bd2e8 100644 --- a/sdk/src/models/permissions.rs +++ b/sdk/src/models/permissions.rs @@ -1,8 +1,8 @@ use crate::bytes_serializable::BytesSerializable; use crate::error::IggyError; +use ahash::AHashMap; use bytes::{Buf, BufMut, Bytes, BytesMut}; use serde::{Deserialize, Serialize}; -use std::collections::HashMap; use std::fmt::Display; /// `Permissions` is used to define the permissions of a user. @@ -15,7 +15,7 @@ pub struct Permissions { pub global: GlobalPermissions, /// Stream permissions are applied to a specific stream. - pub streams: Option>, + pub streams: Option>, } /// `GlobalPermissions` are applied to all streams without a need to specify them one by one in the `streams` field. @@ -122,7 +122,7 @@ pub struct StreamPermissions { pub send_messages: bool, /// The `topics` field allows to define the granular permissions for each topic of a stream. - pub topics: Option>, + pub topics: Option>, } /// `TopicPermissions` are applied to a specific topic of a stream. This is the lowest level of permissions. @@ -274,7 +274,7 @@ impl BytesSerializable for Permissions { let send_messages = bytes.get_u8() == 1; let mut streams = None; if bytes.get_u8() == 1 { - let mut streams_map = HashMap::new(); + let mut streams_map = AHashMap::new(); loop { let stream_id = bytes.get_u32_le(); let manage_stream = bytes.get_u8() == 1; @@ -285,7 +285,7 @@ impl BytesSerializable for Permissions { let send_messages = bytes.get_u8() == 1; let mut topics = None; if bytes.get_u8() == 1 { - let mut topics_map = HashMap::new(); + let mut topics_map = AHashMap::new(); loop { let topic_id = bytes.get_u32_le(); let manage_topic = bytes.get_u8() == 1; @@ -362,7 +362,7 @@ mod tests { poll_messages: true, send_messages: true, }, - streams: Some(HashMap::from([ + streams: Some(AHashMap::from([ ( 1, StreamPermissions { @@ -372,7 +372,7 @@ mod tests { read_topics: true, poll_messages: true, send_messages: true, - topics: Some(HashMap::from([ + topics: Some(AHashMap::from([ ( 1, TopicPermissions { diff --git a/server/Cargo.toml b/server/Cargo.toml index 0e84f84ab..cd5defba6 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "server" -version = "0.4.13" +version = "0.4.14" edition = "2021" build = "src/build.rs" @@ -10,6 +10,7 @@ jemalloc = ["dep:tikv-jemallocator"] tokio-console = ["dep:console-subscriber", "tokio/tracing"] [dependencies] +ahash = { version = "0.8.11" } anyhow = "1.0.86" async-stream = "0.3.5" async-trait = "0.1.80" diff --git a/server/src/streaming/systems/consumer_groups.rs b/server/src/streaming/systems/consumer_groups.rs index 4196e6eea..d9958b3f7 100644 --- a/server/src/streaming/systems/consumer_groups.rs +++ b/server/src/streaming/systems/consumer_groups.rs @@ -132,8 +132,7 @@ impl System { let group_id; { - let stream = self.get_stream(stream_id)?; - let topic = stream.get_topic(topic_id)?; + let topic = self.find_topic(session, stream_id, topic_id)?; { let consumer_group = topic.get_consumer_group(consumer_group_id)?; diff --git a/server/src/streaming/systems/messages.rs b/server/src/streaming/systems/messages.rs index 78b69a32b..0ad76fe88 100644 --- a/server/src/streaming/systems/messages.rs +++ b/server/src/streaming/systems/messages.rs @@ -25,10 +25,9 @@ impl System { return Err(IggyError::InvalidMessagesCount); } - let stream = self.get_stream(stream_id)?; - let topic = stream.get_topic(topic_id)?; + let topic = self.find_topic(session, stream_id, topic_id)?; self.permissioner - .poll_messages(session.get_user_id(), stream.stream_id, topic.topic_id)?; + .poll_messages(session.get_user_id(), topic.stream_id, topic.topic_id)?; if !topic.has_partitions() { return Err(IggyError::NoPartitions(topic.topic_id, topic.stream_id)); @@ -94,11 +93,10 @@ impl System { messages: Vec, ) -> Result<(), IggyError> { self.ensure_authenticated(session)?; - let stream = self.get_stream(&stream_id)?; - let topic = stream.get_topic(&topic_id)?; + let topic = self.find_topic(session, &stream_id, &topic_id)?; self.permissioner.append_messages( session.get_user_id(), - stream.stream_id, + topic.stream_id, topic.topic_id, )?; diff --git a/server/src/streaming/systems/partitions.rs b/server/src/streaming/systems/partitions.rs index c2ad53cbd..86db98ec5 100644 --- a/server/src/streaming/systems/partitions.rs +++ b/server/src/streaming/systems/partitions.rs @@ -13,11 +13,10 @@ impl System { ) -> Result<(), IggyError> { self.ensure_authenticated(session)?; { - let stream = self.get_stream(stream_id)?; - let topic = stream.get_topic(topic_id)?; + let topic = self.find_topic(session, stream_id, topic_id)?; self.permissioner.create_partitions( session.get_user_id(), - stream.stream_id, + topic.stream_id, topic.topic_id, )?; } @@ -39,11 +38,10 @@ impl System { ) -> Result<(), IggyError> { self.ensure_authenticated(session)?; { - let stream = self.get_stream(stream_id)?; - let topic = stream.get_topic(topic_id)?; + let topic = self.find_topic(session, stream_id, topic_id)?; self.permissioner.delete_partitions( session.get_user_id(), - stream.stream_id, + topic.stream_id, topic.topic_id, )?; } diff --git a/server/src/streaming/systems/topics.rs b/server/src/streaming/systems/topics.rs index dca5236fa..3000b457b 100644 --- a/server/src/streaming/systems/topics.rs +++ b/server/src/streaming/systems/topics.rs @@ -94,11 +94,10 @@ impl System { ) -> Result<(), IggyError> { self.ensure_authenticated(session)?; { - let stream = self.get_stream(stream_id)?; - let topic = stream.get_topic(topic_id)?; + let topic = self.find_topic(session, stream_id, topic_id)?; self.permissioner.update_topic( session.get_user_id(), - stream.stream_id, + topic.stream_id, topic.topic_id, )?; } @@ -129,14 +128,13 @@ impl System { self.ensure_authenticated(session)?; let stream_id_value; { - let stream = self.get_stream(stream_id)?; - let topic = stream.get_topic(topic_id)?; + let topic = self.find_topic(session, stream_id, topic_id)?; self.permissioner.delete_topic( session.get_user_id(), - stream.stream_id, + topic.stream_id, topic.topic_id, )?; - stream_id_value = stream.stream_id; + stream_id_value = topic.stream_id; } let topic = self @@ -163,10 +161,9 @@ impl System { stream_id: &Identifier, topic_id: &Identifier, ) -> Result<(), IggyError> { - let stream = self.get_stream(stream_id)?; - let topic = stream.get_topic(topic_id)?; + let topic = self.find_topic(session, stream_id, topic_id)?; self.permissioner - .purge_topic(session.get_user_id(), stream.stream_id, topic.topic_id)?; + .purge_topic(session.get_user_id(), topic.stream_id, topic.topic_id)?; topic.purge().await } } diff --git a/server/src/streaming/users/permissioner.rs b/server/src/streaming/users/permissioner.rs index babfa801d..b13a40ba0 100644 --- a/server/src/streaming/users/permissioner.rs +++ b/server/src/streaming/users/permissioner.rs @@ -1,17 +1,16 @@ use crate::streaming::users::user::User; +use ahash::{AHashMap, AHashSet}; use iggy::models::permissions::{GlobalPermissions, Permissions, StreamPermissions}; use iggy::models::user_info::UserId; -use serde::{Deserialize, Serialize}; -use std::collections::{HashMap, HashSet}; -#[derive(Debug, Serialize, Deserialize, Default)] +#[derive(Debug, Default)] pub struct Permissioner { - pub(super) users_permissions: HashMap, - pub(super) users_streams_permissions: HashMap<(UserId, u32), StreamPermissions>, - pub(super) users_that_can_poll_messages_from_all_streams: HashSet, - pub(super) users_that_can_send_messages_to_all_streams: HashSet, - pub(super) users_that_can_poll_messages_from_specific_streams: HashSet<(UserId, u32)>, - pub(super) users_that_can_send_messages_to_specific_streams: HashSet<(UserId, u32)>, + pub(super) users_permissions: AHashMap, + pub(super) users_streams_permissions: AHashMap<(UserId, u32), StreamPermissions>, + pub(super) users_that_can_poll_messages_from_all_streams: AHashSet, + pub(super) users_that_can_send_messages_to_all_streams: AHashSet, + pub(super) users_that_can_poll_messages_from_specific_streams: AHashSet<(UserId, u32)>, + pub(super) users_that_can_send_messages_to_specific_streams: AHashSet<(UserId, u32)>, } impl Permissioner {