Skip to content

Commit

Permalink
experiments
Browse files Browse the repository at this point in the history
Signed-off-by: Marc-Antoine Perennou <Marc-Antoine@Perennou.com>
  • Loading branch information
Keruspe committed Aug 23, 2024
1 parent 17ba64c commit e22adba
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 11 deletions.
115 changes: 115 additions & 0 deletions examples/t.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
use lapin::{
message::DeliveryResult, options::*, publisher_confirm::Confirmation, types::FieldTable,
BasicProperties, Connection, ConnectionProperties,
};
use tracing::info;

fn main() {
if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", "info");
}

tracing_subscriber::fmt::init();

let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());
let recovery_config = lapin::experimental::RecoveryConfig {
auto_recover_channels: true,
};

async_global_executor::block_on(async {
let conn = Connection::connect(&addr, ConnectionProperties::default().with_experimental_recovery_config(recovery_config))
.await
.expect("connection error");

info!("CONNECTED");

{
let channel1 = conn.create_channel().await.expect("create_channel");
let channel2 = conn.create_channel().await.expect("create_channel");
channel1
.queue_declare(
"recover-test",
QueueDeclareOptions::default(),
FieldTable::default(),
)
.await
.expect("queue_declare");

info!("will consume");
let channel = channel2.clone();
channel2
.basic_consume(
"recover-test",
"my_consumer",
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await
.expect("basic_consume")
.set_delegate(move |delivery: DeliveryResult| {
let channel = channel.clone();
async move {
info!(message=?delivery, "received message");
if let Ok(Some(delivery)) = delivery {
delivery
.ack(BasicAckOptions::default())
.await
.expect("basic_ack");
if &delivery.data[..] == b"after" {
channel
.basic_cancel("my_consumer", BasicCancelOptions::default())
.await
.expect("basic_cancel");
}
}
}
});

info!("will publish");
let confirm = channel1
.basic_publish(
"",
"recover-test",
BasicPublishOptions::default(),
b"before",
BasicProperties::default(),
)
.await
.expect("basic_publish")
.await
.expect("publisher-confirms");
assert_eq!(confirm, Confirmation::NotRequested);

info!("before fail");
assert!(channel1
.queue_declare(
"fake queue",
QueueDeclareOptions {
passive: true,
..QueueDeclareOptions::default()
},
FieldTable::default(),
)
.await
.is_err());
info!("after fail");

info!("publish after");
let confirm = channel1
.basic_publish(
"",
"recover-test",
BasicPublishOptions::default(),
b"after",
BasicProperties::default(),
)
.await
.expect("basic_publish")
.await
.expect("publisher-confirms");
assert_eq!(confirm, Confirmation::NotRequested);
}

conn.run().expect("conn.run");
});
}
31 changes: 22 additions & 9 deletions src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -573,13 +573,18 @@ impl Channel {
}

fn on_channel_close_ok_sent(&self, error: Option<Error>) {
self.set_closed(
error
.clone()
.unwrap_or(Error::InvalidChannelState(ChannelState::Closing)),
);
if let Some(error) = error {
self.error_handler.on_error(error);
match (self.recovery_config.auto_recover_channels, error) {
(true, Some(error)) if error.is_amqp_soft_error() => self.status.set_reconnecting(error),
(_, error) => {
self.set_closed(
error
.clone()
.unwrap_or(Error::InvalidChannelState(ChannelState::Closing)),
);
if let Some(error) = error {
self.error_handler.on_error(error);
}
}
}
}

Expand Down Expand Up @@ -862,6 +867,9 @@ impl Channel {
resolver: PromiseResolver<Channel>,
channel: Channel,
) -> Result<()> {
if let Some(error) = self.status.reconnect_cause() {
self.frames.clear_expected_replies(self.id, error);
}
self.set_state(ChannelState::Connected);
resolver.resolve(channel);
Ok(())
Expand Down Expand Up @@ -901,8 +909,13 @@ impl Channel {
self.set_closing(error.clone().ok());
let error = error.map_err(|error| info!(channel=%self.id, ?method, code_to_error=%error, "Channel closed with a non-error code")).ok();
let channel = self.clone();
self.internal_rpc
.register_internal_future(async move { channel.channel_close_ok(error).await });
self.internal_rpc.register_internal_future(async move {
channel.channel_close_ok(error).await?;
if channel.recovery_config.auto_recover_channels {
channel.clone().channel_open(channel).await?;
}
Ok(())
});
Ok(())
}

Expand Down
17 changes: 15 additions & 2 deletions src/channel_status.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
channel_receiver_state::{ChannelReceiverStates, DeliveryCause},
types::{ChannelId, Identifier, PayloadSize},
Result,
Error, Result,
};
use parking_lot::Mutex;
use std::{fmt, sync::Arc};
Expand All @@ -12,7 +12,7 @@ pub struct ChannelStatus(Arc<Mutex<Inner>>);

impl ChannelStatus {
pub fn initializing(&self) -> bool {
self.0.lock().state == ChannelState::Initial
[ChannelState::Initial, ChannelState::Reconnecting].contains(&self.0.lock().state)
}

pub fn closing(&self) -> bool {
Expand All @@ -23,6 +23,10 @@ impl ChannelStatus {
self.0.lock().state == ChannelState::Connected
}

pub(crate) fn reconnect_cause(&self) -> Option<Error> {
self.0.lock().reconnect_cause.take()
}

pub(crate) fn can_receive_messages(&self) -> bool {
[ChannelState::Closing, ChannelState::Connected].contains(&self.0.lock().state)
}
Expand All @@ -44,6 +48,12 @@ impl ChannelStatus {
self.0.lock().state = state;
}

pub(crate) fn set_reconnecting(&self, error: Error) {
let mut inner = self.0.lock();
inner.state = ChannelState::Reconnecting;
inner.reconnect_cause = Some(error);
}

pub(crate) fn auto_close(&self, id: ChannelId) -> bool {
id != 0 && self.0.lock().state == ChannelState::Connected
}
Expand Down Expand Up @@ -116,6 +126,7 @@ impl ChannelStatus {
pub enum ChannelState {
#[default]
Initial,
Reconnecting,
Connected,
Closing,
Closed,
Expand All @@ -141,6 +152,7 @@ struct Inner {
send_flow: bool,
state: ChannelState,
receiver_state: ChannelReceiverStates,
reconnect_cause: Option<Error>,
}

impl Default for Inner {
Expand All @@ -150,6 +162,7 @@ impl Default for Inner {
send_flow: true,
state: ChannelState::default(),
receiver_state: ChannelReceiverStates::default(),
reconnect_cause: None,
}
}
}

0 comments on commit e22adba

Please sign in to comment.