Skip to content

Commit

Permalink
Add shutdown function to Client trait (#1237)
Browse files Browse the repository at this point in the history
  • Loading branch information
spetz authored Sep 11, 2024
1 parent 4149a19 commit 15de5d9
Show file tree
Hide file tree
Showing 12 changed files with 112 additions and 2 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sdk/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "iggy"
version = "0.6.17"
version = "0.6.18"
description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
edition = "2021"
license = "MIT"
Expand Down
4 changes: 4 additions & 0 deletions sdk/src/binary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ pub mod users;
/// The state of the client.
#[derive(Debug, Copy, Clone, PartialEq, Display)]
pub enum ClientState {
/// The client is shutdown.
#[display("shutdown")]
Shutdown,
/// The client is disconnected.
#[display("disconnected")]
Disconnected,
Expand Down Expand Up @@ -61,6 +64,7 @@ pub trait BinaryTransport {

async fn fail_if_not_authenticated<T: BinaryTransport>(transport: &T) -> Result<(), IggyError> {
match transport.get_state().await {
ClientState::Shutdown => Err(IggyError::ClientShutdown),
ClientState::Disconnected | ClientState::Connecting | ClientState::Authenticating => {
Err(IggyError::Disconnected)
}
Expand Down
3 changes: 3 additions & 0 deletions sdk/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ pub trait Client:
/// Disconnect from the server. If the client is not connected, it will do nothing.
async fn disconnect(&self) -> Result<(), IggyError>;

// Shutdown the client and release all the resources.
async fn shutdown(&self) -> Result<(), IggyError>;

/// Subscribe to diagnostic events.
async fn subscribe_events(&self) -> Receiver<DiagnosticEvent>;
}
Expand Down
4 changes: 4 additions & 0 deletions sdk/src/clients/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@ impl Client for IggyClient {
self.client.read().await.disconnect().await
}

async fn shutdown(&self) -> Result<(), IggyError> {
self.client.read().await.shutdown().await
}

async fn subscribe_events(&self) -> Receiver<DiagnosticEvent> {
self.client.read().await.subscribe_events().await
}
Expand Down
7 changes: 7 additions & 0 deletions sdk/src/clients/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,13 @@ impl IggyConsumer {
while let Some(event) = receiver.next().await {
trace!("Received diagnostic event: {event}");
match event {
DiagnosticEvent::Shutdown => {
warn!("Consumer has been shutdown");
joined_consumer_group.store(false, ORDERING);
can_poll.store(false, ORDERING);
break;
}

DiagnosticEvent::Connected => {
trace!("Connected to the server");
joined_consumer_group.store(false, ORDERING);
Expand Down
4 changes: 4 additions & 0 deletions sdk/src/clients/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,10 @@ impl IggyProducer {
while let Some(event) = receiver.next().await {
trace!("Received diagnostic event: {event}");
match event {
DiagnosticEvent::Shutdown => {
can_send.store(false, ORDERING);
warn!("Client has been shutdown");
}
DiagnosticEvent::Connected => {
can_send.store(false, ORDERING);
trace!("Connected to the server");
Expand Down
2 changes: 2 additions & 0 deletions sdk/src/diagnostic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize, PartialEq, Display, Copy, Clone)]
#[serde(rename_all = "snake_case")]
pub enum DiagnosticEvent {
#[display("shutdown")]
Shutdown,
#[display("disconnected")]
Disconnected,
#[display("connected")]
Expand Down
2 changes: 2 additions & 0 deletions sdk/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ pub enum IggyError {
NotConnected = 61,
#[error("Request error")]
RequestError(#[from] reqwest::Error) = 62,
#[error("Client shutdown")]
ClientShutdown = 63,
#[error("Invalid encryption key")]
InvalidEncryptionKey = 70,
#[error("Cannot encrypt data")]
Expand Down
4 changes: 4 additions & 0 deletions sdk/src/http/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ impl Client for HttpClient {
HttpClient::disconnect(self).await
}

async fn shutdown(&self) -> Result<(), IggyError> {
Ok(())
}

async fn subscribe_events(&self) -> Receiver<DiagnosticEvent> {
self.events.1.clone()
}
Expand Down
30 changes: 30 additions & 0 deletions sdk/src/quic/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ impl Client for QuicClient {
QuicClient::disconnect(self).await
}

async fn shutdown(&self) -> Result<(), IggyError> {
QuicClient::shutdown(self).await
}

async fn subscribe_events(&self) -> Receiver<DiagnosticEvent> {
self.events.1.clone()
}
Expand Down Expand Up @@ -208,6 +212,10 @@ impl QuicClient {

async fn connect(&self) -> Result<(), IggyError> {
match self.get_state().await {
ClientState::Shutdown => {
trace!("Cannot connect. Client is shutdown.");
return Err(IggyError::ClientShutdown);
}
ClientState::Connected | ClientState::Authenticating | ClientState::Authenticated => {
trace!("Client is already connected.");
return Ok(());
Expand Down Expand Up @@ -328,6 +336,24 @@ impl QuicClient {
}
}

async fn shutdown(&self) -> Result<(), IggyError> {
if self.get_state().await == ClientState::Shutdown {
return Ok(());
}

info!("Shutting down the {NAME} QUIC client.");
let connection = self.connection.lock().await.take();
if let Some(connection) = connection {
connection.close(0u32.into(), b"");
}

self.endpoint.wait_idle().await;
self.set_state(ClientState::Shutdown).await;
self.publish_event(DiagnosticEvent::Shutdown).await;
info!("{NAME} QUIC client has been shutdown.");
Ok(())
}

async fn disconnect(&self) -> Result<(), IggyError> {
if self.get_state().await == ClientState::Disconnected {
return Ok(());
Expand All @@ -351,6 +377,10 @@ impl QuicClient {

async fn send_raw(&self, code: u32, payload: Bytes) -> Result<Bytes, IggyError> {
match self.get_state().await {
ClientState::Shutdown => {
trace!("Cannot send data. Client is shutdown.");
return Err(IggyError::ClientShutdown);
}
ClientState::Disconnected => {
trace!(
"Cannot send data. Client: {} is not connected.",
Expand Down
50 changes: 50 additions & 0 deletions sdk/src/tcp/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ pub(crate) trait ConnectionStream: Debug + Sync + Send {
async fn read(&mut self, buf: &mut [u8]) -> Result<usize, IggyError>;
async fn write(&mut self, buf: &[u8]) -> Result<(), IggyError>;
async fn flush(&mut self) -> Result<(), IggyError>;
async fn shutdown(&mut self) -> Result<(), IggyError>;
}

#[derive(Debug)]
Expand Down Expand Up @@ -121,6 +122,16 @@ impl ConnectionStream for TcpConnectionStream {
IggyError::from(error)
})
}

async fn shutdown(&mut self) -> Result<(), IggyError> {
self.writer.shutdown().await.map_err(|error| {
error!(
"Failed to shutdown the TCP connection by client: {} to the TCP connection: {error}",
self.client_address
);
IggyError::from(error)
})
}
}

#[async_trait]
Expand Down Expand Up @@ -154,6 +165,16 @@ impl ConnectionStream for TcpTlsConnectionStream {
IggyError::from(error)
})
}

async fn shutdown(&mut self) -> Result<(), IggyError> {
self.stream.shutdown().await.map_err(|error| {
error!(
"Failed to shutdown the TCP TLS connection by client: {} to the TCP TLS connection: {error}",
self.client_address
);
IggyError::from(error)
})
}
}

impl Default for TcpClient {
Expand All @@ -172,6 +193,10 @@ impl Client for TcpClient {
TcpClient::disconnect(self).await
}

async fn shutdown(&self) -> Result<(), IggyError> {
TcpClient::shutdown(self).await
}

async fn subscribe_events(&self) -> Receiver<DiagnosticEvent> {
self.events.1.clone()
}
Expand Down Expand Up @@ -334,6 +359,10 @@ impl TcpClient {

async fn connect(&self) -> Result<(), IggyError> {
match self.get_state().await {
ClientState::Shutdown => {
trace!("Cannot connect. Client is shutdown.");
return Err(IggyError::ClientShutdown);
}
ClientState::Connected | ClientState::Authenticating | ClientState::Authenticated => {
let client_address = self.get_client_address_value().await;
trace!("Client: {client_address} is already connected.");
Expand Down Expand Up @@ -487,8 +516,29 @@ impl TcpClient {
Ok(())
}

async fn shutdown(&self) -> Result<(), IggyError> {
if self.get_state().await == ClientState::Shutdown {
return Ok(());
}

let client_address = self.get_client_address_value().await;
info!("Shutting down the {NAME} TCP client: {client_address}");
let stream = self.stream.lock().await.take();
if let Some(mut stream) = stream {
stream.shutdown().await?;
}
self.set_state(ClientState::Shutdown).await;
self.publish_event(DiagnosticEvent::Shutdown).await;
info!("{NAME} TCP client: {client_address} has been shutdown.");
Ok(())
}

async fn send_raw(&self, code: u32, payload: Bytes) -> Result<Bytes, IggyError> {
match self.get_state().await {
ClientState::Shutdown => {
trace!("Cannot send data. Client is shutdown.");
return Err(IggyError::ClientShutdown);
}
ClientState::Disconnected => {
trace!("Cannot send data. Client is not connected.");
return Err(IggyError::NotConnected);
Expand Down

0 comments on commit 15de5d9

Please sign in to comment.