Skip to content

Commit

Permalink
Replace ResultContext with ErrContext in error_set usage
Browse files Browse the repository at this point in the history
  • Loading branch information
hubcio committed Dec 31, 2024
1 parent 526ca77 commit 9936028
Show file tree
Hide file tree
Showing 90 changed files with 821 additions and 765 deletions.
392 changes: 199 additions & 193 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions server/src/archiver/disk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::archiver::{Archiver, COMPONENT};
use crate::configs::server::DiskArchiverConfig;
use crate::server_error::ArchiverError;
use async_trait::async_trait;
use error_set::ResultContext;
use error_set::ErrContext;
use std::path::Path;
use tokio::fs;
use tracing::{debug, info};
Expand All @@ -25,7 +25,7 @@ impl Archiver for DiskArchiver {
info!("Creating disk archiver directory: {}", self.config.path);
fs::create_dir_all(&self.config.path)
.await
.with_error(|err| {
.with_error_context(|err| {
format!(
"ARCHIVER - failed to create directory: {} with error: {err}",
self.config.path
Expand Down Expand Up @@ -68,10 +68,10 @@ impl Archiver for DiskArchiver {
let destination_path = destination.to_str().unwrap_or_default().to_owned();
fs::create_dir_all(destination.parent().unwrap())
.await
.with_error(|err| {
.with_error_context(|err| {
format!("{COMPONENT} - failed to create file: {file} at path: {destination_path} with error: {err}",)
})?;
fs::copy(source, destination).await.with_error(|err| {
fs::copy(source, destination).await.with_error_context(|err| {
format!("{COMPONENT} - failed to copy file: {file} to destination: {destination_path} with error: {err}")
})?;
debug!("Archived file: {file} at: {destination_path}");
Expand Down
14 changes: 7 additions & 7 deletions server/src/archiver/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::configs::server::S3ArchiverConfig;
use crate::server_error::ArchiverError;
use crate::streaming::utils::file;
use async_trait::async_trait;
use error_set::ResultContext;
use error_set::ErrContext;
use s3::creds::Credentials;
use s3::{Bucket, Region};
use std::path::Path;
Expand Down Expand Up @@ -61,13 +61,13 @@ impl S3Archiver {
debug!("Creating temporary S3 upload directory: {destination_path}");
fs::create_dir_all(destination.parent().unwrap())
.await
.with_error(|err| {
.with_error_context(|err| {
format!(
"{COMPONENT} - failed to create temporary S3 upload directory for path: {destination_path} with error: {err}"
)
})?;
debug!("Copying file: {path} to temporary S3 upload path: {destination_path}");
fs::copy(source, &destination).await.with_error(|err| {
fs::copy(source, &destination).await.with_error_context(|err| {
format!("{COMPONENT} - failed to copy file: {path} to temporary S3 upload path: {destination_path} with error: {err}")
})?;
debug!("File: {path} copied to temporary S3 upload path: {destination_path}");
Expand Down Expand Up @@ -140,7 +140,7 @@ impl Archiver for S3Archiver {
debug!("Archiving file: {source} on S3.");
let mut file = file::open(&source)
.await
.with_error(|err| format!("{COMPONENT} - failed to open source file: {source} for archiving with error: {err}"))?;
.with_error_context(|err| format!("{COMPONENT} - failed to open source file: {source} for archiving with error: {err}"))?;
let base_directory = base_directory.as_deref().unwrap_or_default();
let destination = Path::new(&base_directory).join(path);
let destination_path = destination.to_str().unwrap_or_default().to_owned();
Expand All @@ -150,7 +150,7 @@ impl Archiver for S3Archiver {
.await;
if let Err(error) = response {
error!("Cannot archive file: {path} on S3: {}", error);
fs::remove_file(&source).await.with_error(|err| {
fs::remove_file(&source).await.with_error_context(|err| {
format!("{COMPONENT} - failed to remove temporary file: {source} after S3 failure with error: {err}")
})?;
return Err(ArchiverError::CannotArchiveFile {
Expand All @@ -162,14 +162,14 @@ impl Archiver for S3Archiver {
let status = response.status_code();
if status == 200 {
debug!("Archived file: {path} on S3.");
fs::remove_file(&source).await.with_error(|err| {
fs::remove_file(&source).await.with_error_context(|err| {
format!("{COMPONENT} - failed to remove temporary file: {source} after successful archive with error: {err}")
})?;
continue;
}

error!("Cannot archive file: {path} on S3, received an invalid status code: {status}.");
fs::remove_file(&source).await.with_error(|err| {
fs::remove_file(&source).await.with_error_context(|err| {
format!("{COMPONENT} - failed to remove temporary file: {source} after invalid status code with error: {err}")
})?;
return Err(ArchiverError::CannotArchiveFile {
Expand Down
20 changes: 13 additions & 7 deletions server/src/binary/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::binary::COMPONENT;
use crate::command::ServerCommand;
use crate::streaming::session::Session;
use crate::streaming::systems::system::SharedSystem;
use error_set::ResultContext;
use error_set::ErrContext;
use iggy::error::IggyError;
use tracing::{debug, error};

Expand All @@ -40,16 +40,22 @@ pub async fn handle(
Err(error) => {
error!("Command was not handled successfully, session: {session}, error: {error}.");
if let IggyError::ClientNotFound(_) = error {
sender.send_error_response(error).await.with_error(|_| {
format!("{COMPONENT} - failed to send error response, session: {session}")
})?;
sender
.send_error_response(error)
.await
.with_error_context(|_| {
format!("{COMPONENT} - failed to send error response, session: {session}")
})?;
debug!("TCP error response was sent to: {session}.");
error!("Session: {session} will be deleted.");
Err(IggyError::ClientNotFound(session.client_id))
} else {
sender.send_error_response(error).await.with_error(|_| {
format!("{COMPONENT} - failed to send error response, session: {session}")
})?;
sender
.send_error_response(error)
.await
.with_error_context(|_| {
format!("{COMPONENT} - failed to send error response, session: {session}")
})?;
debug!("TCP error response was sent to: {session}.");
Ok(())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::state::command::EntryCommand;
use crate::streaming::session::Session;
use crate::streaming::systems::system::SharedSystem;
use anyhow::Result;
use error_set::ResultContext;
use error_set::ErrContext;
use iggy::consumer_groups::create_consumer_group::CreateConsumerGroup;
use iggy::error::IggyError;
use tracing::{debug, instrument};
Expand All @@ -30,7 +30,7 @@ pub async fn handle(
&command.name,
)
.await
.with_error(|_| {
.with_error_context(|_| {
format!(
"{COMPONENT} - failed to create consumer group for stream_id: {}, topic_id: {}, group_id: {:?}, session: {:?}",
command.stream_id, command.topic_id, command.group_id, session
Expand All @@ -52,7 +52,7 @@ pub async fn handle(
EntryCommand::CreateConsumerGroup(command),
)
.await
.with_error(|_| {
.with_error_context(|_| {
format!(
"{COMPONENT} - failed to apply create consumer group for stream_id: {}, topic_id: {}, group_id: {:?}, session: {}",
stream_id, topic_id, group_id, session
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::state::command::EntryCommand;
use crate::streaming::session::Session;
use crate::streaming::systems::system::SharedSystem;
use anyhow::Result;
use error_set::ResultContext;
use error_set::ErrContext;
use iggy::consumer_groups::delete_consumer_group::DeleteConsumerGroup;
use iggy::error::IggyError;
use tracing::{debug, instrument};
Expand All @@ -26,7 +26,7 @@ pub async fn handle(
&command.topic_id,
&command.group_id,
)
.await.with_error(|_| format!(
.await.with_error_context(|_| format!(
"{COMPONENT} - failed to delete consumer group for stream_id: {}, topic_id: {}, group_id: {:?}, session: {}",
command.stream_id, command.topic_id, command.group_id, session
))?;
Expand All @@ -44,7 +44,7 @@ pub async fn handle(
EntryCommand::DeleteConsumerGroup(command),
)
.await
.with_error(|_| {
.with_error_context(|_| {
format!(
"{COMPONENT} - failed to apply delete consumer group for stream_id: {}, topic_id: {}, group_id: {:?}, session: {}",
stream_id, topic_id, group_id, session
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::binary::sender::Sender;
use crate::streaming::session::Session;
use crate::streaming::systems::system::SharedSystem;
use anyhow::Result;
use error_set::ResultContext;
use error_set::ErrContext;
use iggy::consumer_groups::get_consumer_groups::GetConsumerGroups;
use iggy::error::IggyError;
use tracing::debug;
Expand All @@ -19,7 +19,7 @@ pub async fn handle(
let system = system.read().await;
let consumer_groups = system
.get_consumer_groups(session, &command.stream_id, &command.topic_id)
.with_error(|_| {
.with_error_context(|_| {
format!(
"{COMPONENT} - failed on getting consumer groups for stream_id: {}, topic_id: {}, session: {}",
command.stream_id, command.topic_id, session
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::binary::sender::Sender;
use crate::streaming::session::Session;
use crate::streaming::systems::system::SharedSystem;
use anyhow::Result;
use error_set::ResultContext;
use error_set::ErrContext;
use iggy::consumer_groups::join_consumer_group::JoinConsumerGroup;
use iggy::error::IggyError;
use tracing::{debug, instrument};
Expand All @@ -25,7 +25,7 @@ pub async fn handle(
&command.group_id,
)
.await
.with_error(|_| {
.with_error_context(|_| {
format!(
"{COMPONENT} - failed to join consumer group for stream_id: {}, topic_id: {}, group_id: {}, session: {}",
command.stream_id, command.topic_id, command.group_id, session
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::binary::sender::Sender;
use crate::streaming::session::Session;
use crate::streaming::systems::system::SharedSystem;
use anyhow::Result;
use error_set::ResultContext;
use error_set::ErrContext;
use iggy::consumer_groups::leave_consumer_group::LeaveConsumerGroup;
use iggy::error::IggyError;
use tracing::{debug, instrument};
Expand All @@ -25,7 +25,7 @@ pub async fn handle(
&command.group_id,
)
.await
.with_error(|_| {
.with_error_context(|_| {
format!(
"{COMPONENT} - failed to leave consumer group for stream_id: {}, topic_id: {}, group_id: {}, session: {}",
command.stream_id, command.topic_id, command.group_id, session
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::binary::sender::Sender;
use crate::streaming::session::Session;
use crate::streaming::systems::system::SharedSystem;
use anyhow::Result;
use error_set::ResultContext;
use error_set::ErrContext;
use iggy::consumer_offsets::store_consumer_offset::StoreConsumerOffset;
use iggy::error::IggyError;
use tracing::debug;
Expand All @@ -26,7 +26,7 @@ pub async fn handle(
command.offset,
)
.await
.with_error(|_| format!("{COMPONENT} - failed to store consumer offset for stream_id: {}, topic_id: {}, partition_id: {:?}, offset: {}, session: {}",
.with_error_context(|_| format!("{COMPONENT} - failed to store consumer offset for stream_id: {}, topic_id: {}, partition_id: {:?}, offset: {}, session: {}",
command.stream_id, command.topic_id, command.partition_id, command.offset, session
))?;
sender.send_empty_ok_response().await?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::binary::sender::Sender;
use crate::streaming::session::Session;
use crate::streaming::systems::system::SharedSystem;
use anyhow::Result;
use error_set::ResultContext;
use error_set::ErrContext;
use iggy::error::IggyError;
use iggy::messages::flush_unsaved_buffer::FlushUnsavedBuffer;
use tracing::{debug, instrument};
Expand All @@ -24,7 +24,7 @@ pub async fn handle(
system
.flush_unsaved_buffer(session, stream_id, topic_id, partition_id, fsync)
.await
.with_error(|_| {
.with_error_context(|_| {
format!(
"{COMPONENT} - failed to flush unsaved buffer for stream_id: {}, topic_id: {}, partition_id: {}, session: {}",
command.stream_id, command.topic_id, command.partition_id, session
Expand Down
4 changes: 2 additions & 2 deletions server/src/binary/handlers/messages/poll_messages_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::streaming::session::Session;
use crate::streaming::systems::messages::PollingArgs;
use crate::streaming::systems::system::SharedSystem;
use anyhow::Result;
use error_set::ResultContext;
use error_set::ErrContext;
use iggy::error::IggyError;
use iggy::messages::poll_messages::PollMessages;
use tracing::debug;
Expand All @@ -28,7 +28,7 @@ pub async fn handle(
PollingArgs::new(command.strategy, command.count, command.auto_commit),
)
.await
.with_error(|_| format!(
.with_error_context(|_| format!(
"{COMPONENT} - failed to poll messages for consumer: {}, stream_id: {}, topic_id: {}, partition_id: {:?}, session: {}",
command.consumer, command.stream_id, command.topic_id, command.partition_id, session
))?;
Expand Down
4 changes: 2 additions & 2 deletions server/src/binary/handlers/messages/send_messages_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::binary::sender::Sender;
use crate::streaming::session::Session;
use crate::streaming::systems::system::SharedSystem;
use anyhow::Result;
use error_set::ResultContext;
use error_set::ErrContext;
use iggy::error::IggyError;
use iggy::messages::send_messages::SendMessages;
use tracing::debug;
Expand All @@ -23,7 +23,7 @@ pub async fn handle(
system
.append_messages(session, stream_id, topic_id, partitioning, messages)
.await
.with_error(|_| {
.with_error_context(|_| {
format!(
"{COMPONENT} - failed to append messages for stream_id: {}, topic_id: {}, partitioning: {}, session: {}",
command.stream_id, command.topic_id, command.partitioning, session
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::state::command::EntryCommand;
use crate::streaming::session::Session;
use crate::streaming::systems::system::SharedSystem;
use anyhow::Result;
use error_set::ResultContext;
use error_set::ErrContext;
use iggy::error::IggyError;
use iggy::partitions::create_partitions::CreatePartitions;
use tracing::{debug, instrument};
Expand All @@ -27,7 +27,7 @@ pub async fn handle(
command.partitions_count,
)
.await
.with_error(|_| {
.with_error_context(|_| {
format!(
"{COMPONENT} - failed to create partitions for stream_id: {}, topic_id: {}, session: {}",
command.stream_id, command.topic_id, session
Expand All @@ -46,7 +46,7 @@ pub async fn handle(
EntryCommand::CreatePartitions(command),
)
.await
.with_error(|_| {
.with_error_context(|_| {
format!(
"{COMPONENT} - failed to apply create partitions for stream_id: {}, topic_id: {}, session: {}",
stream_id, topic_id, session
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::state::command::EntryCommand;
use crate::streaming::session::Session;
use crate::streaming::systems::system::SharedSystem;
use anyhow::Result;
use error_set::ResultContext;
use error_set::ErrContext;
use iggy::error::IggyError;
use iggy::partitions::delete_partitions::DeletePartitions;
use tracing::{debug, instrument};
Expand All @@ -30,7 +30,7 @@ pub async fn handle(
command.partitions_count,
)
.await
.with_error(|_| {
.with_error_context(|_| {
format!(
"{COMPONENT} - failed to delete partitions for stream_id: {}, topic_id: {}, session: {}",
stream_id, topic_id, session
Expand All @@ -46,7 +46,7 @@ pub async fn handle(
EntryCommand::DeletePartitions(command),
)
.await
.with_error(|_| {
.with_error_context(|_| {
format!(
"{COMPONENT} - failed to apply delete partitions for stream_id: {}, topic_id: {}, session: {}",
stream_id, topic_id, session
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::streaming::personal_access_tokens::personal_access_token::PersonalAcc
use crate::streaming::session::Session;
use crate::streaming::systems::system::SharedSystem;
use anyhow::Result;
use error_set::ResultContext;
use error_set::ErrContext;
use iggy::error::IggyError;
use iggy::personal_access_tokens::create_personal_access_token::CreatePersonalAccessToken;
use tracing::{debug, instrument};
Expand All @@ -27,7 +27,7 @@ pub async fn handle(
let token = system
.create_personal_access_token(session, &command.name, command.expiry)
.await
.with_error(|_| {
.with_error_context(|_| {
format!(
"{COMPONENT} - failed to create personal access token with name: {}, session: {session}",
command.name
Expand All @@ -51,7 +51,7 @@ pub async fn handle(
}),
)
.await
.with_error(|_| {
.with_error_context(|_| {
format!(
"{COMPONENT} - failed to create personal access token with name: {}, session: {session}",
command.name
Expand Down
Loading

0 comments on commit 9936028

Please sign in to comment.