Skip to content

Commit

Permalink
propagate recovery_config in each Channel
Browse files Browse the repository at this point in the history
Signed-off-by: Marc-Antoine Perennou <Marc-Antoine@Perennou.com>
  • Loading branch information
Keruspe committed Aug 23, 2024
1 parent fbb273b commit 17ba64c
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 3 deletions.
7 changes: 6 additions & 1 deletion src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::{
consumer::Consumer,
consumers::Consumers,
error_handler::ErrorHandler,
experimental::RecoveryConfig,
frames::{ExpectedReply, Frames},
internal_rpc::InternalRPCHandle,
message::{BasicGetMessage, BasicReturnMessage, Delivery},
Expand Down Expand Up @@ -59,6 +60,7 @@ pub struct Channel {
executor: Arc<dyn FullExecutor + Send + Sync>,
channel_closer: Option<Arc<ChannelCloser>>,
connection_closer: Option<Arc<ConnectionCloser>>,
recovery_config: RecoveryConfig,
}

impl PartialEq for Channel {
Expand Down Expand Up @@ -94,6 +96,7 @@ impl Channel {
frames: Frames,
executor: Arc<dyn FullExecutor + Send + Sync>,
connection_closer: Option<Arc<ConnectionCloser>>,
recovery_config: RecoveryConfig,
) -> Channel {
let returned_messages = ReturnedMessages::default();
let status = ChannelStatus::default();
Expand All @@ -106,7 +109,7 @@ impl Channel {
internal_rpc.clone(),
)))
};
Channel {
Self {
id: channel_id,
configuration,
status,
Expand All @@ -124,6 +127,7 @@ impl Channel {
executor,
channel_closer,
connection_closer,
recovery_config,
}
}

Expand Down Expand Up @@ -267,6 +271,7 @@ impl Channel {
executor: self.executor.clone(),
channel_closer: None,
connection_closer: self.connection_closer.clone(),
recovery_config: self.recovery_config.clone(),
}
}

Expand Down
9 changes: 7 additions & 2 deletions src/channels.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{
connection_closer::ConnectionCloser,
error_handler::ErrorHandler,
experimental::RecoveryConfig,
frames::Frames,
id_sequence::IdSequence,
internal_rpc::InternalRPCHandle,
Expand Down Expand Up @@ -38,9 +39,10 @@ impl Channels {
internal_rpc: InternalRPCHandle,
frames: Frames,
executor: Arc<dyn FullExecutor + Send + Sync>,
recovery_config: RecoveryConfig,
) -> Self {
Self {
inner: Arc::new(Mutex::new(Inner::new(configuration, waker))),
inner: Arc::new(Mutex::new(Inner::new(configuration, waker, recovery_config))),
connection_status,
global_registry,
internal_rpc,
Expand Down Expand Up @@ -296,15 +298,17 @@ struct Inner {
channel_id: IdSequence<ChannelId>,
configuration: Configuration,
waker: SocketStateHandle,
recovery_config: RecoveryConfig,
}

impl Inner {
fn new(configuration: Configuration, waker: SocketStateHandle) -> Self {
fn new(configuration: Configuration, waker: SocketStateHandle, recovery_config: RecoveryConfig) -> Self {
Self {
channels: HashMap::default(),
channel_id: IdSequence::new(false),
configuration,
waker,
recovery_config,
}
}

Expand All @@ -329,6 +333,7 @@ impl Inner {
frames,
executor,
connection_closer,
self.recovery_config.clone(),
);
self.channels.insert(id, channel.clone_internal());
channel
Expand Down
6 changes: 6 additions & 0 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::{
connection_closer::ConnectionCloser,
connection_properties::ConnectionProperties,
connection_status::{ConnectionState, ConnectionStatus, ConnectionStep},
experimental::RecoveryConfig,
frames::Frames,
heartbeat::Heartbeat,
internal_rpc::{InternalRPC, InternalRPCHandle},
Expand Down Expand Up @@ -53,6 +54,7 @@ impl Connection {
internal_rpc: InternalRPCHandle,
frames: Frames,
executor: Arc<dyn FullExecutor + Send + Sync>,
recovery_config: RecoveryConfig,
) -> Self {
let configuration = Configuration::default();
let status = ConnectionStatus::default();
Expand All @@ -65,6 +67,7 @@ impl Connection {
internal_rpc.clone(),
frames,
executor,
recovery_config,
);
let closer = Arc::new(ConnectionCloser::new(status.clone(), internal_rpc));
let connection = Self {
Expand Down Expand Up @@ -343,6 +346,7 @@ impl Connection {
internal_rpc.handle(),
frames.clone(),
executor.clone(),
options.recovery_config.clone().unwrap_or_default(),
);
let status = conn.status.clone();
let configuration = conn.configuration.clone();
Expand Down Expand Up @@ -507,6 +511,7 @@ mod tests {
internal_rpc.handle(),
Frames::default(),
executor.clone(),
RecoveryConfig::default(),
);
conn.status.set_state(ConnectionState::Connected);
conn.configuration.set_channel_max(2047);
Expand Down Expand Up @@ -586,6 +591,7 @@ mod tests {
internal_rpc.handle(),
Frames::default(),
executor.clone(),
RecoveryConfig::default(),
);
conn.status.set_state(ConnectionState::Connected);
conn.configuration.set_channel_max(2047);
Expand Down

0 comments on commit 17ba64c

Please sign in to comment.