Skip to content

Commit

Permalink
fix: issues cause by prev commit
Browse files Browse the repository at this point in the history
  • Loading branch information
Eshanatnight committed May 16, 2024
1 parent c33d8e9 commit 0ea2511
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 29 deletions.
13 changes: 6 additions & 7 deletions server/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl Event {
&key,
self.rb.clone(),
self.parsed_timestamp,
self.custom_partition_values,
&self.custom_partition_values,
)?;

metadata::STREAM_INFO.update_stats(
Expand All @@ -96,18 +96,17 @@ impl Event {
Ok(())
}

pub fn process_unchecked(self) -> Result<Self, PostError> {
pub fn process_unchecked(&self) -> Result<(), PostError> {
let key = get_schema_key(&self.rb.schema().fields);

Self::process_event(
&self.stream_name,
&key,
self.rb.clone(),
self.parsed_timestamp,
&self.custom_partition_values,
)
.map_err(PostError::Event)?;

Ok(self)
.map_err(PostError::Event)
}

pub fn clear(&self, stream_name: &str) {
Expand All @@ -121,14 +120,14 @@ impl Event {
schema_key: &str,
rb: RecordBatch,
parsed_timestamp: NaiveDateTime,
custom_partition_values: HashMap<String, String>,
custom_partition_values: &HashMap<String, String>,
) -> Result<(), EventError> {
STREAM_WRITERS.append_to_local(
stream_name,
schema_key,
rb,
parsed_timestamp,
custom_partition_values,
custom_partition_values.clone(),
)?;
Ok(())
}
Expand Down
16 changes: 13 additions & 3 deletions server/src/event/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl Writer {
schema_key: &str,
rb: RecordBatch,
parsed_timestamp: NaiveDateTime,
custom_partition_values: HashMap<String, String>,
custom_partition_values: &HashMap<String, String>,
) -> Result<(), StreamWriterError> {
let rb = utils::arrow::replace_columns(
rb.schema(),
Expand Down Expand Up @@ -102,15 +102,22 @@ impl WriterTable {
schema_key,
record,
parsed_timestamp,
custom_partition_values,
&custom_partition_values,
)?;
}
None => {
drop(hashmap_guard);
let map = self.write().unwrap();
// check for race condition
// if map contains entry then just
self.handle_missing_writer(map, stream_name, schema_key, record, parsed_timestamp)?;
self.handle_missing_writer(
map,
stream_name,
schema_key,
record,
parsed_timestamp,
&custom_partition_values,
)?;
}
};
Ok(())
Expand All @@ -123,13 +130,15 @@ impl WriterTable {
schema_key: &str,
record: RecordBatch,
parsed_timestamp: NaiveDateTime,
custom_partition_values: &HashMap<String, String>,
) -> Result<(), StreamWriterError> {
if CONFIG.parseable.mode != Mode::Query {
stream_writer.lock().unwrap().push(
stream_name,
schema_key,
record,
parsed_timestamp,
custom_partition_values,
)?;
} else {
stream_writer
Expand All @@ -148,6 +157,7 @@ impl WriterTable {
schema_key: &str,
record: RecordBatch,
parsed_timestamp: NaiveDateTime,
custom_partition_values: &HashMap<String, String>,
) -> Result<(), StreamWriterError> {
match map.get(stream_name) {
Some(writer) => {
Expand Down
4 changes: 2 additions & 2 deletions server/src/event/writer/file_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl FileWriter {
schema_key: &str,
record: &RecordBatch,
parsed_timestamp: NaiveDateTime,
custom_partition_values: HashMap<String, String>,
custom_partition_values: &HashMap<String, String>,
) -> Result<(), StreamWriterError> {
match self.get_mut(schema_key) {
Some(writer) => {
Expand Down Expand Up @@ -89,7 +89,7 @@ fn init_new_stream_writer_file(
schema_key: &str,
record: &RecordBatch,
parsed_timestamp: NaiveDateTime,
custom_partition_values: HashMap<String, String>,
custom_partition_values: &HashMap<String, String>,
) -> Result<(PathBuf, StreamWriter<std::fs::File>), StreamWriterError> {
let dir = StorageDir::new(stream_name);
let path = dir.path_by_current_time(schema_key, parsed_timestamp, custom_partition_values);
Expand Down
11 changes: 7 additions & 4 deletions server/src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,16 +102,19 @@ pub async fn push_logs_unchecked(
batches: RecordBatch,
stream_name: &str,
) -> Result<event::Event, PostError> {
event::Event {
let unchecked_event = event::Event {
rb: batches,
stream_name: stream_name.to_string(),
origin_format: "json",
origin_size: 0,
parsed_timestamp: Utc::now().naive_utc(),
time_partition: None,
is_first_event: true, // NOTE: Maybe should be false
}
.process_unchecked()
is_first_event: true, // NOTE: Maybe should be false
custom_partition_values: HashMap::new(), // should be an empty map for unchecked push
};
unchecked_event.process_unchecked()?;

Ok(unchecked_event)
}

async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result<(), PostError> {
Expand Down
23 changes: 11 additions & 12 deletions server/src/storage/staging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ use std::{
};

const ARROW_FILE_EXTENSION: &str = "data.arrows";
const PARQUET_FILE_EXTENSION: &str = "data.parquet";
// const PARQUET_FILE_EXTENSION: &str = "data.parquet";

#[derive(Debug)]
pub struct StorageDir {
Expand All @@ -68,7 +68,7 @@ impl StorageDir {

pub fn file_time_suffix(
time: NaiveDateTime,
custom_partition_values: HashMap<String, String>,
custom_partition_values: &HashMap<String, String>,
extention: &str,
) -> String {
let mut uri = utils::date_to_prefix(time.date())
Expand All @@ -90,7 +90,7 @@ impl StorageDir {
fn filename_by_time(
stream_hash: &str,
time: NaiveDateTime,
custom_partition_values: HashMap<String, String>,
custom_partition_values: &HashMap<String, String>,
) -> String {
format!(
"{}.{}",
Expand All @@ -102,7 +102,7 @@ impl StorageDir {
fn filename_by_current_time(
stream_hash: &str,
parsed_timestamp: NaiveDateTime,
custom_partition_values: HashMap<String, String>,
custom_partition_values: &HashMap<String, String>,
) -> String {
Self::filename_by_time(stream_hash, parsed_timestamp, custom_partition_values)
}
Expand All @@ -111,7 +111,7 @@ impl StorageDir {
&self,
stream_hash: &str,
parsed_timestamp: NaiveDateTime,
custom_partition_values: HashMap<String, String>,
custom_partition_values: &HashMap<String, String>,
) -> PathBuf {
let server_time_in_min = Utc::now().format("%Y%m%dT%H%M").to_string();
let mut filename =
Expand Down Expand Up @@ -201,13 +201,12 @@ impl StorageDir {
}
}

#[allow(unused)]
pub fn to_parquet_path(stream_name: &str, time: NaiveDateTime) -> PathBuf {
let data_path = CONFIG.parseable.local_stream_data_path(stream_name);
let dir = StorageDir::file_time_suffix(time, HashMap::new(), PARQUET_FILE_EXTENSION);

data_path.join(dir)
}
// pub fn to_parquet_path(stream_name: &str, time: NaiveDateTime) -> PathBuf {
// let data_path = CONFIG.parseable.local_stream_data_path(stream_name);
// let dir = StorageDir::file_time_suffix(time, &HashMap::new(), PARQUET_FILE_EXTENSION);
//
// data_path.join(dir)
// }

pub fn convert_disk_files_to_parquet(
stream: &str,
Expand Down
2 changes: 1 addition & 1 deletion server/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ pub fn date_to_prefix(date: NaiveDate) -> String {
date.replace("UTC", "")
}

pub fn custom_partition_to_prefix(custom_partition: HashMap<String, String>) -> String {
pub fn custom_partition_to_prefix(custom_partition: &HashMap<String, String>) -> String {
let mut prefix = String::default();
for (key, value) in custom_partition.iter().sorted_by_key(|v| v.0) {
prefix.push_str(&format!("{key}={value}/", key = key, value = value));
Expand Down

0 comments on commit 0ea2511

Please sign in to comment.