diff --git a/Cargo.lock b/Cargo.lock index 0d10be325..c819a4a5a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1964,7 +1964,7 @@ dependencies = [ [[package]] name = "iggy" -version = "0.6.18" +version = "0.6.19" dependencies = [ "aes-gcm", "ahash 0.8.11", @@ -2012,7 +2012,7 @@ dependencies = [ [[package]] name = "iggy-cli" -version = "0.7.0" +version = "0.8.0" dependencies = [ "ahash 0.8.11", "anyhow", diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 16184f4e6..1ee5e52c1 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "iggy-cli" -version = "0.7.0" +version = "0.8.0" edition = "2021" authors = ["bartosz.ciesla@gmail.com"] repository = "https://github.com/iggy-rs/iggy" @@ -20,7 +20,7 @@ anyhow = "1.0.86" clap = { version = "4.5.17", features = ["derive"] } clap_complete = "4.5.25" figlet-rs = "0.1.5" -iggy = { path = "../sdk", features = ["iggy-cli"], version = "0.6.17" } +iggy = { path = "../sdk", features = ["iggy-cli"], version = "0.6.19" } keyring = { version = "3.2.0", features = ["sync-secret-service", "vendored"], optional = true } passterm = "2.0.1" thiserror = "1.0.61" diff --git a/cli/src/args/message.rs b/cli/src/args/message.rs index e25c3987f..d334de05d 100644 --- a/cli/src/args/message.rs +++ b/cli/src/args/message.rs @@ -80,8 +80,8 @@ pub(crate) struct SendMessagesArgs { /// spaces, it should be enclosed in quotes. Limit of the messages and size /// of each message is defined by the used shell. #[clap(verbatim_doc_comment)] + #[clap(group = "input_messages")] pub(crate) messages: Option>, - /// Comma separated list of key:kind:value, sent as header with the message /// /// Headers are comma seperated key-value pairs that can be sent with the message. @@ -90,6 +90,17 @@ pub(crate) struct SendMessagesArgs { #[clap(verbatim_doc_comment)] #[clap(short = 'H', long, value_parser = parse_key_val, value_delimiter = ',')] pub(crate) headers: Vec<(HeaderKey, HeaderValue)>, + /// Input file with messages to be sent + /// + /// File should contain messages stored in binary format. If the file does + /// not exist, the command will fail. If the file is not specified, the command + /// will read the messages from the standard input and each line will + /// be sent as a separate message. If the file is specified, the messages + /// will be read from the file and sent as is. Option cannot be used + /// with the messages option (messages given as command line arguments). + #[clap(verbatim_doc_comment)] + #[clap(long, value_parser = NonEmptyStringValueParser::new(), group = "input_messages")] + pub(crate) input_file: Option, } /// Parse Header Key, Kind and Value from the string separated by a ':' diff --git a/cli/src/main.rs b/cli/src/main.rs index 3657c8e54..d8154557b 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -245,6 +245,7 @@ fn get_command( send_args.message_key.clone(), send_args.messages.clone(), send_args.headers.clone(), + send_args.input_file.clone(), )), MessageAction::Poll(poll_args) => Box::new(PollMessagesCmd::new( poll_args.stream_id.clone(), diff --git a/integration/tests/cli/message/mod.rs b/integration/tests/cli/message/mod.rs index b1546f8fb..c5efea3e9 100644 --- a/integration/tests/cli/message/mod.rs +++ b/integration/tests/cli/message/mod.rs @@ -2,4 +2,6 @@ mod test_message_flush_command; mod test_message_help_command; mod test_message_poll_command; mod test_message_poll_to_file_command; +mod test_message_reply_via_file; mod test_message_send_command; +mod test_message_send_from_file_command; diff --git a/integration/tests/cli/message/test_message_reply_via_file.rs b/integration/tests/cli/message/test_message_reply_via_file.rs new file mode 100644 index 000000000..5c27d4d00 --- /dev/null +++ b/integration/tests/cli/message/test_message_reply_via_file.rs @@ -0,0 +1,76 @@ +use crate::cli::common::IggyCmdTest; +use crate::cli::message::test_message_poll_to_file_command::TestMessagePollToFileCmd; +use crate::cli::message::test_message_send_from_file_command::TestMessageSendFromFileCmd; +use iggy::messages::poll_messages::PollingStrategy; +use iggy::models::header::{HeaderKey, HeaderValue}; +use serial_test::parallel; +use std::collections::HashMap; +use std::str::FromStr; + +#[tokio::test] +#[parallel] +pub async fn should_be_successful() { + let mut iggy_cmd_test = IggyCmdTest::default(); + + let test_messages: Vec<&str> = vec![ + "Lorem ipsum dolor sit amet, consectetur adipiscing elit", + "sed do eiusmod tempor incididunt ut labore et dolore magna aliqua", + "Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris", + "nisi ut aliquip ex ea commodo consequat", + "Duis aute irure dolor in reprehenderit in voluptate velit esse", + "cillum dolore eu fugiat nulla pariatur", + "Excepteur sint occaecat cupidatat non proident, sunt in culpa", + "qui officia deserunt mollit anim id est laborum", + "Sed ut perspiciatis unde omnis iste natus error sit voluptatem", + "accusantium doloremque laudantium, totam rem aperiam, eaque ipsa", + ]; + + let test_headers = HashMap::from([ + ( + HeaderKey::from_str("HeaderKey1").unwrap(), + HeaderValue::from_str("HeaderValue1").unwrap(), + ), + ( + HeaderKey::from_str("HeaderKey2").unwrap(), + HeaderValue::from_str("HeaderValue2").unwrap(), + ), + ( + HeaderKey::from_str("HeaderKey3").unwrap(), + HeaderValue::from_str("HeaderValue3").unwrap(), + ), + ]); + + iggy_cmd_test.setup().await; + + let temp_file = tempfile::Builder::new().tempfile().unwrap(); + let temp_path = temp_file.path().to_path_buf(); + temp_file.close().unwrap(); + let temp_path_str = temp_path.to_str().unwrap(); + + let message_count = test_messages.len(); + + iggy_cmd_test + .execute_test(TestMessagePollToFileCmd::new( + "input_stream", + "input_topic", + &test_messages, + message_count, + PollingStrategy::offset(0), + test_headers.clone(), + temp_path_str, + false, + )) + .await; + + iggy_cmd_test + .execute_test(TestMessageSendFromFileCmd::new( + false, + temp_path_str, + "output_stream", + "output_topic", + &test_messages, + message_count, + test_headers, + )) + .await; +} diff --git a/integration/tests/cli/message/test_message_send_command.rs b/integration/tests/cli/message/test_message_send_command.rs index 04bea44c5..efddafad2 100644 --- a/integration/tests/cli/message/test_message_send_command.rs +++ b/integration/tests/cli/message/test_message_send_command.rs @@ -421,6 +421,16 @@ Options: Kind can be one of the following: raw, string, bool, int8, int16, int32, int64, int128, uint8, uint16, uint32, uint64, uint128, float32, float64 + --input-file + Input file with messages to be sent +{CLAP_INDENT} + File should contain messages stored in binary format. If the file does + not exist, the command will fail. If the file is not specified, the command + will read the messages from the standard input and each line will + be sent as a separate message. If the file is specified, the messages + will be read from the file and sent as is. Option cannot be used + with the messages option (messages given as command line arguments). + -h, --help Print help (see a summary with '-h') "#, @@ -451,6 +461,7 @@ Options: -p, --partition-id ID of the partition to which the message will be sent -m, --message-key Messages key which will be used to partition the messages -H, --headers Comma separated list of key:kind:value, sent as header with the message + --input-file Input file with messages to be sent -h, --help Print help (see more with '--help') "#, ), diff --git a/integration/tests/cli/message/test_message_send_from_file_command.rs b/integration/tests/cli/message/test_message_send_from_file_command.rs new file mode 100644 index 000000000..739b94b44 --- /dev/null +++ b/integration/tests/cli/message/test_message_send_from_file_command.rs @@ -0,0 +1,260 @@ +use crate::cli::common::{IggyCmdCommand, IggyCmdTest, IggyCmdTestCase}; +use assert_cmd::assert::Assert; +use async_trait::async_trait; +use bytes::Bytes; +use iggy::bytes_serializable::BytesSerializable; +use iggy::client::Client; +use iggy::consumer::Consumer; +use iggy::identifier::Identifier; +use iggy::messages::poll_messages::PollingStrategy; +use iggy::messages::send_messages::Message; +use iggy::models::header::{HeaderKey, HeaderValue}; +use iggy::utils::expiry::IggyExpiry; +use iggy::utils::topic_size::MaxTopicSize; +use predicates::str::{ends_with, is_match, starts_with}; +use serial_test::parallel; +use std::collections::HashMap; +use std::str::FromStr; +use tokio::io::AsyncWriteExt; + +pub(super) struct TestMessageSendFromFileCmd<'a> { + initialize: bool, + input_file: String, + stream_name: String, + topic_name: String, + messages: Vec<&'a str>, + message_count: usize, + headers: HashMap, +} + +impl<'a> TestMessageSendFromFileCmd<'a> { + #[allow(clippy::too_many_arguments)] + pub(super) fn new( + initialize: bool, + input_file: &str, + stream_name: &str, + topic_name: &str, + messages: &[&'a str], + message_count: usize, + headers: HashMap, + ) -> Self { + assert!(message_count <= messages.len()); + Self { + initialize, + input_file: input_file.into(), + stream_name: stream_name.into(), + topic_name: topic_name.into(), + messages: messages.to_owned(), + message_count, + headers, + } + } + + fn to_args(&self) -> Vec { + let command = vec![ + "--input-file".into(), + self.input_file.clone(), + "--partition-id".into(), + "1".into(), + self.stream_name.clone(), + self.topic_name.clone(), + ]; + + command + } +} + +#[async_trait] +impl IggyCmdTestCase for TestMessageSendFromFileCmd<'_> { + async fn prepare_server_state(&mut self, client: &dyn Client) { + let stream = client.create_stream(&self.stream_name, None).await; + assert!(stream.is_ok()); + + let stream_id = Identifier::from_str(self.stream_name.as_str()); + assert!(stream_id.is_ok()); + let stream_id = stream_id.unwrap(); + + let topic = client + .create_topic( + &stream_id, + &self.topic_name, + 1, + Default::default(), + None, + None, + IggyExpiry::NeverExpire, + MaxTopicSize::ServerDefault, + ) + .await; + assert!(topic.is_ok()); + + let topic_id = Identifier::from_str(self.topic_name.as_str()); + assert!(topic_id.is_ok()); + + if self.initialize { + let file = tokio::fs::OpenOptions::new() + .append(true) + .create(true) + .open(&self.input_file) + .await; + assert!( + file.is_ok(), + "Problem opening file for writing: {}", + self.input_file + ); + let mut file = file.unwrap(); + + let messages = self + .messages + .iter() + .map(|s| { + let payload = Bytes::from(s.as_bytes().to_vec()); + Message::new(None, payload, Some(self.headers.clone())) + }) + .collect::>(); + + for message in messages.iter() { + let message = Message::new( + Some(message.id), + message.payload.clone(), + message.headers.clone(), + ); + + let write_result = file.write_all(&message.to_bytes()).await; + assert!( + write_result.is_ok(), + "Problem writing message to file: {}", + self.input_file + ); + } + } + } + + fn get_command(&self) -> IggyCmdCommand { + IggyCmdCommand::new() + .arg("message") + .arg("send") + .args(self.to_args()) + .with_env_credentials() + } + + fn verify_command(&self, command_state: Assert) { + let message_prefix = format!( + "Executing send messages to topic with ID: {} and stream with ID: {}\n", + self.topic_name, self.stream_name + ); + let message_read = format!("Read [0-9]+ bytes from {} file", self.input_file); + let message_created = format!( + "Created {} using [0-9]+ bytes", + match self.message_count { + 1 => "1 message".into(), + _ => format!("{} messages", self.message_count), + } + ); + let message_sent = format!( + "Sent messages to topic with ID: {} and stream with ID: {}\n", + self.topic_name, self.stream_name + ); + + command_state + .success() + .stdout(starts_with(message_prefix)) + .stdout(is_match(message_read).unwrap().count(1)) + .stdout(is_match(message_created).unwrap().count(1)) + .stdout(ends_with(message_sent)); + } + + async fn verify_server_state(&self, client: &dyn Client) { + let stream_id = Identifier::from_str(self.stream_name.as_str()); + assert!(stream_id.is_ok()); + let stream_id = stream_id.unwrap(); + + let topic_id = Identifier::from_str(self.topic_name.as_str()); + assert!(topic_id.is_ok()); + let topic_id = topic_id.unwrap(); + + let messages = client + .poll_messages( + &stream_id, + &topic_id, + Some(1), + &Consumer::new(Identifier::default()), + &PollingStrategy::offset(0), + self.message_count as u32 * 2, + true, + ) + .await; + assert!(messages.is_ok()); + let messages = messages.unwrap(); + + // Check if there are only the expected number of messages + assert_eq!(messages.messages.len(), self.message_count); + + // Check message order and content (payload and headers) + for (i, message) in messages.messages.iter().enumerate() { + assert_eq!( + message.payload, + Bytes::from(self.messages[i].as_bytes().to_vec()) + ); + assert_eq!(message.headers.is_some(), !self.headers.is_empty()); + assert_eq!(message.headers.as_ref().unwrap(), &self.headers); + } + + let topic_delete = client.delete_topic(&stream_id, &topic_id).await; + assert!(topic_delete.is_ok()); + + let stream_delete = client.delete_stream(&stream_id).await; + assert!(stream_delete.is_ok()); + + let file_removal = std::fs::remove_file(&self.input_file); + assert!(file_removal.is_ok()); + } +} + +#[tokio::test] +#[parallel] +pub async fn should_be_successful() { + let mut iggy_cmd_test = IggyCmdTest::default(); + + let test_messages: Vec<&str> = vec![ + "Lorem ipsum dolor sit amet, consectetur adipiscing elit", + "sed do eiusmod tempor incididunt ut labore et dolore magna aliqua", + "Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris", + "nisi ut aliquip ex ea commodo consequat", + "Duis aute irure dolor in reprehenderit in voluptate velit esse", + "cillum dolore eu fugiat nulla pariatur", + "Excepteur sint occaecat cupidatat non proident, sunt in culpa", + "qui officia deserunt mollit anim id est laborum", + "Sed ut perspiciatis unde omnis iste natus error sit voluptatem", + "accusantium doloremque laudantium, totam rem aperiam, eaque ipsa", + ]; + + let test_headers = HashMap::from([ + ( + HeaderKey::from_str("HeaderKey1").unwrap(), + HeaderValue::from_str("HeaderValue1").unwrap(), + ), + ( + HeaderKey::from_str("HeaderKey2").unwrap(), + HeaderValue::from_str("HeaderValue2").unwrap(), + ), + ]); + + let temp_file = tempfile::Builder::new().tempfile().unwrap(); + let temp_path = temp_file.path().to_path_buf(); + temp_file.close().unwrap(); + let temp_path_str = temp_path.to_str().unwrap(); + + iggy_cmd_test.setup().await; + iggy_cmd_test + .execute_test(TestMessageSendFromFileCmd::new( + true, + temp_path_str, + "stream", + "topic", + &test_messages, + test_messages.len(), + test_headers, + )) + .await; +} diff --git a/sdk/Cargo.toml b/sdk/Cargo.toml index c240f18f7..b4cc0061d 100644 --- a/sdk/Cargo.toml +++ b/sdk/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "iggy" -version = "0.6.18" +version = "0.6.19" description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second." edition = "2021" license = "MIT" diff --git a/sdk/src/cli/message/send_messages.rs b/sdk/src/cli/message/send_messages.rs index d28e7d328..588c78dfe 100644 --- a/sdk/src/cli/message/send_messages.rs +++ b/sdk/src/cli/message/send_messages.rs @@ -1,3 +1,4 @@ +use crate::bytes_serializable::BytesSerializable; use crate::cli_command::{CliCommand, PRINT_TARGET}; use crate::client::Client; use crate::identifier::Identifier; @@ -5,8 +6,10 @@ use crate::messages::send_messages::{Message, Partitioning}; use crate::models::header::{HeaderKey, HeaderValue}; use anyhow::Context; use async_trait::async_trait; +use bytes::Bytes; use std::collections::HashMap; use std::io::{self, Read}; +use tokio::io::AsyncReadExt; use tracing::{event, Level}; pub struct SendMessagesCmd { @@ -15,6 +18,7 @@ pub struct SendMessagesCmd { partitioning: Partitioning, messages: Option>, headers: Vec<(HeaderKey, HeaderValue)>, + input_file: Option, } impl SendMessagesCmd { @@ -25,6 +29,7 @@ impl SendMessagesCmd { message_key: Option, messages: Option>, headers: Vec<(HeaderKey, HeaderValue)>, + input_file: Option, ) -> Self { let partitioning = match (partition_id, message_key) { (Some(_), Some(_)) => unreachable!(), @@ -44,6 +49,7 @@ impl SendMessagesCmd { partitioning, messages, headers, + input_file, } } @@ -73,18 +79,61 @@ impl CliCommand for SendMessagesCmd { } async fn execute_cmd(&mut self, client: &dyn Client) -> anyhow::Result<(), anyhow::Error> { - let mut messages = match &self.messages { - Some(messages) => messages - .iter() - .map(|s| Message::new(None, s.clone().into(), self.get_headers())) - .collect::>(), - None => { - let input = self.read_message_from_stdin()?; - - input - .lines() - .map(|m| Message::new(None, String::from(m).into(), self.get_headers())) - .collect() + let mut messages = if let Some(input_file) = &self.input_file { + let mut file = tokio::fs::OpenOptions::new() + .read(true) + .open(input_file) + .await + .with_context(|| format!("Problem opening file for reading: {input_file}"))?; + let mut buffer = Vec::new(); + file.read_to_end(&mut buffer) + .await + .with_context(|| format!("Problem reading file: {input_file}"))?; + + event!(target: PRINT_TARGET, Level::INFO, + "Read {} bytes from {} file", buffer.len(), input_file, + ); + + let mut messages: Vec = Vec::new(); + let mut bytes_read = 0usize; + let all_messages_bytes: Bytes = buffer.into(); + + while bytes_read < all_messages_bytes.len() { + let message_bytes = all_messages_bytes.slice(bytes_read..); + let message = Message::from_bytes(message_bytes); + match message { + Ok(message) => { + let message_size = message.get_size_bytes() as usize; + messages.push(message); + bytes_read += message_size; + } + Err(e) => { + event!(target: PRINT_TARGET, Level::ERROR, + "Failed to parse message from bytes: {e} at offset {bytes_read}", + ); + break; + } + } + } + event!(target: PRINT_TARGET, Level::INFO, + "Created {} messages using {bytes_read} bytes", messages.len(), + ); + + messages + } else { + match &self.messages { + Some(messages) => messages + .iter() + .map(|s| Message::new(None, s.clone().into(), self.get_headers())) + .collect::>(), + None => { + let input = self.read_message_from_stdin()?; + + input + .lines() + .map(|m| Message::new(None, String::from(m).into(), self.get_headers())) + .collect() + } } };