From e22adbae0cb9c45250c34d7ebd213346e3a60e4d Mon Sep 17 00:00:00 2001 From: Marc-Antoine Perennou Date: Tue, 6 Aug 2024 22:27:00 +0200 Subject: [PATCH] experiments Signed-off-by: Marc-Antoine Perennou --- examples/t.rs | 115 ++++++++++++++++++++++++++++++++++++++++++ src/channel.rs | 31 ++++++++---- src/channel_status.rs | 17 ++++++- 3 files changed, 152 insertions(+), 11 deletions(-) create mode 100644 examples/t.rs diff --git a/examples/t.rs b/examples/t.rs new file mode 100644 index 00000000..23a0b25f --- /dev/null +++ b/examples/t.rs @@ -0,0 +1,115 @@ +use lapin::{ + message::DeliveryResult, options::*, publisher_confirm::Confirmation, types::FieldTable, + BasicProperties, Connection, ConnectionProperties, +}; +use tracing::info; + +fn main() { + if std::env::var("RUST_LOG").is_err() { + std::env::set_var("RUST_LOG", "info"); + } + + tracing_subscriber::fmt::init(); + + let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into()); + let recovery_config = lapin::experimental::RecoveryConfig { + auto_recover_channels: true, + }; + + async_global_executor::block_on(async { + let conn = Connection::connect(&addr, ConnectionProperties::default().with_experimental_recovery_config(recovery_config)) + .await + .expect("connection error"); + + info!("CONNECTED"); + + { + let channel1 = conn.create_channel().await.expect("create_channel"); + let channel2 = conn.create_channel().await.expect("create_channel"); + channel1 + .queue_declare( + "recover-test", + QueueDeclareOptions::default(), + FieldTable::default(), + ) + .await + .expect("queue_declare"); + + info!("will consume"); + let channel = channel2.clone(); + channel2 + .basic_consume( + "recover-test", + "my_consumer", + BasicConsumeOptions::default(), + FieldTable::default(), + ) + .await + .expect("basic_consume") + .set_delegate(move |delivery: DeliveryResult| { + let channel = channel.clone(); + async move { + info!(message=?delivery, "received message"); + if let Ok(Some(delivery)) = delivery { + delivery + .ack(BasicAckOptions::default()) + .await + .expect("basic_ack"); + if &delivery.data[..] == b"after" { + channel + .basic_cancel("my_consumer", BasicCancelOptions::default()) + .await + .expect("basic_cancel"); + } + } + } + }); + + info!("will publish"); + let confirm = channel1 + .basic_publish( + "", + "recover-test", + BasicPublishOptions::default(), + b"before", + BasicProperties::default(), + ) + .await + .expect("basic_publish") + .await + .expect("publisher-confirms"); + assert_eq!(confirm, Confirmation::NotRequested); + + info!("before fail"); + assert!(channel1 + .queue_declare( + "fake queue", + QueueDeclareOptions { + passive: true, + ..QueueDeclareOptions::default() + }, + FieldTable::default(), + ) + .await + .is_err()); + info!("after fail"); + + info!("publish after"); + let confirm = channel1 + .basic_publish( + "", + "recover-test", + BasicPublishOptions::default(), + b"after", + BasicProperties::default(), + ) + .await + .expect("basic_publish") + .await + .expect("publisher-confirms"); + assert_eq!(confirm, Confirmation::NotRequested); + } + + conn.run().expect("conn.run"); + }); +} diff --git a/src/channel.rs b/src/channel.rs index 198b45ff..58464342 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -573,13 +573,18 @@ impl Channel { } fn on_channel_close_ok_sent(&self, error: Option) { - self.set_closed( - error - .clone() - .unwrap_or(Error::InvalidChannelState(ChannelState::Closing)), - ); - if let Some(error) = error { - self.error_handler.on_error(error); + match (self.recovery_config.auto_recover_channels, error) { + (true, Some(error)) if error.is_amqp_soft_error() => self.status.set_reconnecting(error), + (_, error) => { + self.set_closed( + error + .clone() + .unwrap_or(Error::InvalidChannelState(ChannelState::Closing)), + ); + if let Some(error) = error { + self.error_handler.on_error(error); + } + } } } @@ -862,6 +867,9 @@ impl Channel { resolver: PromiseResolver, channel: Channel, ) -> Result<()> { + if let Some(error) = self.status.reconnect_cause() { + self.frames.clear_expected_replies(self.id, error); + } self.set_state(ChannelState::Connected); resolver.resolve(channel); Ok(()) @@ -901,8 +909,13 @@ impl Channel { self.set_closing(error.clone().ok()); let error = error.map_err(|error| info!(channel=%self.id, ?method, code_to_error=%error, "Channel closed with a non-error code")).ok(); let channel = self.clone(); - self.internal_rpc - .register_internal_future(async move { channel.channel_close_ok(error).await }); + self.internal_rpc.register_internal_future(async move { + channel.channel_close_ok(error).await?; + if channel.recovery_config.auto_recover_channels { + channel.clone().channel_open(channel).await?; + } + Ok(()) + }); Ok(()) } diff --git a/src/channel_status.rs b/src/channel_status.rs index 230020e2..e3e14bc2 100644 --- a/src/channel_status.rs +++ b/src/channel_status.rs @@ -1,7 +1,7 @@ use crate::{ channel_receiver_state::{ChannelReceiverStates, DeliveryCause}, types::{ChannelId, Identifier, PayloadSize}, - Result, + Error, Result, }; use parking_lot::Mutex; use std::{fmt, sync::Arc}; @@ -12,7 +12,7 @@ pub struct ChannelStatus(Arc>); impl ChannelStatus { pub fn initializing(&self) -> bool { - self.0.lock().state == ChannelState::Initial + [ChannelState::Initial, ChannelState::Reconnecting].contains(&self.0.lock().state) } pub fn closing(&self) -> bool { @@ -23,6 +23,10 @@ impl ChannelStatus { self.0.lock().state == ChannelState::Connected } + pub(crate) fn reconnect_cause(&self) -> Option { + self.0.lock().reconnect_cause.take() + } + pub(crate) fn can_receive_messages(&self) -> bool { [ChannelState::Closing, ChannelState::Connected].contains(&self.0.lock().state) } @@ -44,6 +48,12 @@ impl ChannelStatus { self.0.lock().state = state; } + pub(crate) fn set_reconnecting(&self, error: Error) { + let mut inner = self.0.lock(); + inner.state = ChannelState::Reconnecting; + inner.reconnect_cause = Some(error); + } + pub(crate) fn auto_close(&self, id: ChannelId) -> bool { id != 0 && self.0.lock().state == ChannelState::Connected } @@ -116,6 +126,7 @@ impl ChannelStatus { pub enum ChannelState { #[default] Initial, + Reconnecting, Connected, Closing, Closed, @@ -141,6 +152,7 @@ struct Inner { send_flow: bool, state: ChannelState, receiver_state: ChannelReceiverStates, + reconnect_cause: Option, } impl Default for Inner { @@ -150,6 +162,7 @@ impl Default for Inner { send_flow: true, state: ChannelState::default(), receiver_state: ChannelReceiverStates::default(), + reconnect_cause: None, } } }