From ee7df97f6fdecce4af9706d883cbfff5abbd96ae Mon Sep 17 00:00:00 2001 From: Gang Liao Date: Thu, 27 Jan 2022 16:47:20 -0500 Subject: [PATCH] feat(arena): optimize data frame deserialization (#453) * feat(arena): optimize data frame deserialization * fix: no serialization for response --- flock-function/src/aws/actor.rs | 30 +++--- flock-function/src/aws/nexmark/source.rs | 2 +- flock/src/runtime/arena/mod.rs | 123 +++++++++++++++++------ 3 files changed, 106 insertions(+), 49 deletions(-) diff --git a/flock-function/src/aws/actor.rs b/flock-function/src/aws/actor.rs index 5ab3fd3c..f5d02026 100644 --- a/flock-function/src/aws/actor.rs +++ b/flock-function/src/aws/actor.rs @@ -112,13 +112,11 @@ pub async fn handler( let (input, status) = prepare_data_sources(ctx, arena, event).await?; if status == HashAggregateStatus::Processed { - let info = format!("[Ok] Function {}: data is already processed.", ctx.name); - info!("{}", info); - return Ok(json!({ "response": info })); + info!("[Ok] Function {}: data is already processed.", ctx.name); + return Ok(Value::Null); } else if status == HashAggregateStatus::NotReady { - let info = format!("[Ok] Function {}: data aggregation is not ready.", ctx.name); - info!("{}", info); - return Ok(json!({ "response": info })); + info!("[Ok] Function {}: data aggregation is not ready.", ctx.name); + return Ok(Value::Null); } let output = collect(ctx, input).await?; @@ -181,7 +179,8 @@ async fn prepare_data_sources( if status == HashAggregateStatus::Ready { info!("Received all data packets for the window: {:?}", window_id); arena - .take_batches(&window_id) + .take(&window_id) + .await? .into_iter() .for_each(|b| input.push(b)); PROCESSED_WINDOWS.lock().unwrap().insert(window_id); @@ -223,7 +222,8 @@ async fn prepare_data_sources( if arena.is_complete(&window_id) { info!("Received all data packets for the window: {:?}", window_id); arena - .take_batches(&window_id) + .take(&window_id) + .await? .into_iter() .for_each(|b| input.push(b)); status = HashAggregateStatus::Ready; @@ -287,7 +287,7 @@ async fn invoke_next_functions( .write(sink_type.clone(), DataSinkFormat::SerdeBinary) .await } else { - Ok(json!({ "response": "No data to sink." })) + Ok(Value::Null) } } CloudFunction::Lambda(group_name) => { @@ -352,9 +352,7 @@ async fn invoke_next_functions( ); lambda::invoke_function(group_name, &invocation_type, Some(bytes.into())).await?; } - Ok(json!({ - "response": format!("next function: {}", group_name) - })) + Ok(Value::Null) } CloudFunction::Group((group_name, _)) => { if !ctx.is_shuffling().await? { @@ -419,9 +417,7 @@ async fn invoke_next_functions( futures::future::join_all(tasks).await; - Ok(json!({ - "response": format!("next function group: {}", group_name) - })) + Ok(Value::Null) } else { let output = Arc::new(output); let mut rng = StdRng::seed_from_u64(0xDEAD); // Predictable RNG clutch @@ -533,9 +529,7 @@ async fn invoke_next_functions( .collect::>>>(); futures::future::join_all(tasks).await; - Ok(json!({ - "response": format!("next function group: {}", group_name) - })) + Ok(Value::Null) } } } diff --git a/flock-function/src/aws/nexmark/source.rs b/flock-function/src/aws/nexmark/source.rs index eb5b1a28..b005949b 100644 --- a/flock-function/src/aws/nexmark/source.rs +++ b/flock-function/src/aws/nexmark/source.rs @@ -74,5 +74,5 @@ pub async fn handler(ctx: &mut ExecutionContext, payload: Payload) -> Result unimplemented!(), }; - Ok(json!({"name": &ctx.name, "type": "nexmark_bench".to_string()})) + Ok(Value::Null) } diff --git a/flock/src/runtime/arena/mod.rs b/flock/src/runtime/arena/mod.rs index c94ca9a9..fdc0ef78 100644 --- a/flock/src/runtime/arena/mod.rs +++ b/flock/src/runtime/arena/mod.rs @@ -18,12 +18,18 @@ mod bitmap; pub use bitmap::Bitmap; +use crate::encoding::Encoding; use crate::error::{FlockError, Result}; -use crate::runtime::payload::Payload; +use crate::runtime::payload::{DataFrame, Payload}; +use crate::transmute::*; use datafusion::arrow::datatypes::SchemaRef; use datafusion::arrow::record_batch::RecordBatch; +use datafusion::arrow_flight::utils::flight_data_to_arrow_batch; +use datafusion::arrow_flight::FlightData; use hashbrown::HashMap; +use rayon::prelude::*; use std::ops::{Deref, DerefMut}; +use tokio::task::JoinHandle; type QueryId = String; type ShuffleId = usize; @@ -61,29 +67,38 @@ pub struct Arena(HashMap); pub struct WindowSession { /// The number of data fragments in the window. /// [`WindowSession::size`] equals to [`Uuid::seq_len`]. - pub size: usize, - /// Aggregate record batches for the first relation. - pub r1_records: Vec>, - /// Aggregate record batches for the second relation. - pub r2_records: Vec>, + pub size: usize, + /// Aggregate the encoded data frames for the first relation. + /// https://arrow.apache.org/blog/2019/10/13/introducing-arrow-flight/ + pub r1_flight_data: Vec>, + /// The schema of the first relation. + pub r1_schema: Vec, + /// Aggregate the encoded data frames for the second relation. + /// https://arrow.apache.org/blog/2019/10/13/introducing-arrow-flight/ + pub r2_flight_data: Vec>, + /// The schema of the second relation. + pub r2_schema: Vec, /// Bitmap indicating the data existence in the window. - pub bitmap: Bitmap, + pub bitmap: Bitmap, + /// The compression method. + pub encoding: Encoding, } impl WindowSession { /// Return the schema of data fragments in the temporal window. pub fn schema(&self) -> Result<(SchemaRef, Option)> { - if self.r1_records.is_empty() || self.r1_records[0].is_empty() { + if self.r1_schema.is_empty() { return Err(FlockError::Internal( "Record batches are empty.".to_string(), )); } - if !self.r2_records.is_empty() && !self.r2_records[0].is_empty() { - Ok((self.r1_records[0][0].schema(), None)) + + if self.r2_schema.is_empty() { + Ok((schema_from_bytes(&self.r1_schema)?, None)) } else { Ok(( - self.r1_records[0][0].schema(), - Some(self.r2_records[0][0].schema()), + schema_from_bytes(&self.r1_schema)?, + Some(schema_from_bytes(&self.r2_schema)?), )) } } @@ -95,12 +110,59 @@ impl Arena { Arena(HashMap::::new()) } - /// Get the data fragments in the temporal window via the key. - pub fn take_batches(&mut self, window_id: &WindowId) -> Vec>> { + /// Take a window from the arena. + pub async fn take(&mut self, window_id: &WindowId) -> Result>>> { + let to_batches = |df: Vec, schema: SchemaRef| -> Vec { + df.into_par_iter() + .map(|d| { + flight_data_to_arrow_batch( + &FlightData { + data_body: d.body, + data_header: d.header, + app_metadata: vec![], + flight_descriptor: None, + }, + schema.clone(), + &[], + ) + .unwrap() + }) + .collect() + }; + if let Some(window) = (*self).remove(window_id) { - vec![window.r1_records, window.r2_records] + let (schema1, schema2) = window.schema()?; + + let mut tasks: Vec>>> = vec![]; + + let encoding = window.encoding.clone(); + tasks.push(tokio::spawn(async move { + window + .r1_flight_data + .into_par_iter() + .map(|d| to_batches(unmarshal(d, encoding.clone()), schema1.clone())) + .collect() + })); + + if schema2.is_some() { + let encoding = window.encoding.clone(); + let schema2 = schema2.unwrap(); + tasks.push(tokio::spawn(async move { + window + .r2_flight_data + .into_par_iter() + .map(|d| to_batches(unmarshal(d, encoding.clone()), schema2.clone())) + .collect() + })); + } + + Ok(futures::future::join_all(tasks) + .await + .into_iter() + .map(|r| r.unwrap()) + .collect()) } else { - vec![vec![], vec![]] + Ok(vec![vec![], vec![]]) } } @@ -112,7 +174,7 @@ impl Arena { /// Return true if the temporal window is empty. pub fn is_complete(&self, window_id: &WindowId) -> bool { self.get(window_id) - .map(|window| window.size == window.r1_records.len()) + .map(|window| window.size == window.r1_flight_data.len()) .unwrap_or(false) } @@ -132,12 +194,11 @@ impl Arena { Some(window) => { assert!(uuid.seq_len == window.size); if !window.bitmap.is_set(uuid.seq_num) { - let (r1, r2) = payload.to_record_batch(); - window.r1_records.push(r1); - window.r2_records.push(r2); - assert!(window.r1_records.len() == window.r2_records.len()); + window.r1_flight_data.push(payload.data); + window.r2_flight_data.push(payload.data2); + assert!(window.r1_flight_data.len() == window.r2_flight_data.len()); window.bitmap.set(uuid.seq_num); - if window.size == window.r1_records.len() { + if window.size == window.r1_flight_data.len() { HashAggregateStatus::Ready } else { HashAggregateStatus::NotReady @@ -147,12 +208,14 @@ impl Arena { } } None => { - let (r1, r2) = payload.to_record_batch(); let mut window = WindowSession { - size: uuid.seq_len, - r1_records: vec![r1], - r2_records: vec![r2], - bitmap: Bitmap::new(uuid.seq_len + 1), // Starts from 1. + size: uuid.seq_len, + r1_flight_data: vec![payload.data], + r2_flight_data: vec![payload.data2], + r1_schema: payload.schema, + r2_schema: payload.schema2, + bitmap: Bitmap::new(uuid.seq_len + 1), // Starts from 1. + encoding: payload.encoding, }; // SEQ_NUM is used to indicate the data existence in the window via bitmap. window.bitmap.set(uuid.seq_num); @@ -243,12 +306,12 @@ mod tests { if let Some(window) = (*arena).get(&window_id) { assert_eq!(8, window.size); - assert_eq!(8, window.r1_records.len()); + assert_eq!(8, window.r1_flight_data.len()); (0..8).for_each(|i| assert!(window.bitmap.is_set(i + 1))); } - assert_eq!(8, arena.take_batches(&window_id)[0].len()); - assert_eq!(0, arena.take_batches(&("no exists".to_owned(), 0))[0].len()); + assert_eq!(8, arena.take(&window_id).await?[0].len()); + assert_eq!(0, arena.take(&("no exists".to_owned(), 0)).await?[0].len()); Ok(()) }