Skip to content

Commit

Permalink
Revert "Mutex Poison Error if a large amount of data is sent" (#629)
Browse files Browse the repository at this point in the history
  • Loading branch information
nitisht authored Jan 19, 2024
1 parent 39d6c67 commit f21bfe4
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 30 deletions.
2 changes: 1 addition & 1 deletion server/src/event/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub static STREAM_WRITERS: Lazy<WriterTable> = Lazy::new(WriterTable::default);

#[derive(Default)]
pub struct Writer {
pub mem: MemWriter,
pub mem: MemWriter<16384>,
pub disk: FileWriter,
}

Expand Down
20 changes: 9 additions & 11 deletions server/src/event/writer/mem_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,22 @@ use arrow_schema::Schema;
use arrow_select::concat::concat_batches;
use itertools::Itertools;

use crate::{option::CONFIG, utils::arrow::adapt_batch};
use crate::utils::arrow::adapt_batch;

/// Structure to keep recordbatches in memory.
///
/// Any new schema is updated in the schema map.
/// Recordbatches are pushed to mutable buffer first and then concated together and pushed to read buffer
#[derive(Debug)]
pub struct MemWriter {
pub struct MemWriter<const N: usize> {
schema: Schema,
// for checking uniqueness of schema
schema_map: HashSet<String>,
read_buffer: Vec<RecordBatch>,
mutable_buffer: MutableBuffer,
mutable_buffer: MutableBuffer<N>,
}

impl Default for MemWriter {
impl<const N: usize> Default for MemWriter<N> {
fn default() -> Self {
Self {
schema: Schema::empty(),
Expand All @@ -49,7 +49,7 @@ impl Default for MemWriter {
}
}

impl MemWriter {
impl<const N: usize> MemWriter<N> {
pub fn push(&mut self, schema_key: &str, rb: RecordBatch) {
if !self.schema_map.contains(schema_key) {
self.schema_map.insert(schema_key.to_owned());
Expand Down Expand Up @@ -83,17 +83,15 @@ fn concat_records(schema: &Arc<Schema>, record: &[RecordBatch]) -> RecordBatch {
}

#[derive(Debug, Default)]
struct MutableBuffer {
struct MutableBuffer<const N: usize> {
pub inner: Vec<RecordBatch>,
pub rows: usize,
}

impl MutableBuffer {
impl<const N: usize> MutableBuffer<N> {
fn push(&mut self, rb: RecordBatch) -> Option<Vec<RecordBatch>> {
let maxima = CONFIG.parseable.records_per_request;

if self.rows + rb.num_rows() >= maxima {
let left = maxima - self.rows;
if self.rows + rb.num_rows() >= N {
let left = N - self.rows;
let right = rb.num_rows() - left;
let left_slice = rb.slice(0, left);
let right_slice = if left < rb.num_rows() {
Expand Down
18 changes: 0 additions & 18 deletions server/src/option.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,6 @@ pub struct Server {

/// Parquet compression algorithm
pub parquet_compression: Compression,

/// Max value a record can be before splitting the request
pub records_per_request: usize,
}

impl FromArgMatches for Server {
Expand All @@ -250,10 +247,6 @@ impl FromArgMatches for Server {
let openid_client_secret = m.get_one::<String>(Self::OPENID_CLIENT_SECRET).cloned();
let openid_issuer = m.get_one::<Url>(Self::OPENID_ISSUER).cloned();

self.records_per_request = m
.get_one(Self::BUFFER_SIZE)
.cloned()
.expect("default value for records per request");
self.address = m
.get_one::<String>(Self::ADDRESS)
.cloned()
Expand Down Expand Up @@ -369,7 +362,6 @@ impl Server {
pub const PARQUET_COMPRESSION_ALGO: &'static str = "compression-algo";
pub const DEFAULT_USERNAME: &'static str = "admin";
pub const DEFAULT_PASSWORD: &'static str = "admin";
pub const BUFFER_SIZE: &'static str = "buffer-size";

pub fn local_stream_data_path(&self, stream_name: &str) -> PathBuf {
self.local_staging_path.join(stream_name)
Expand Down Expand Up @@ -517,16 +509,6 @@ impl Server {
.value_parser(validation::url)
.help("OIDC provider's host address"),
)
.arg(
Arg::new(Self::BUFFER_SIZE)
.long(Self::BUFFER_SIZE)
.env("P_BUFFER_SIZE")
.value_name("NUMBER")
.default_value("16384")
.required(false)
.value_parser(value_parser!(usize))
.help("buffer size for internal request buffer"),
)
.arg(
Arg::new(Self::DOMAIN_URI)
.long(Self::DOMAIN_URI)
Expand Down

0 comments on commit f21bfe4

Please sign in to comment.