Skip to content

Commit

Permalink
more sort
Browse files Browse the repository at this point in the history
  • Loading branch information
scsmithr committed Dec 24, 2024
1 parent 405cf40 commit 9e088f7
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@ where
let mapping = (0..batch.num_rows()).zip(self.row_count..(self.row_count + batch.num_rows()));

match to.datatype.physical_type() {
PhysicalType::Int8 => append_copy::<PhysicalI8, _>(from, mapping, to)?,
PhysicalType::Int32 => append_copy::<PhysicalI32, _>(from, mapping, to)?,
PhysicalType::Utf8 => append_copy::<PhysicalUtf8, _>(from, mapping, to)?,
PhysicalType::Int8 => copy_rows::<PhysicalI8, _>(from, mapping, to)?,
PhysicalType::Int32 => copy_rows::<PhysicalI32, _>(from, mapping, to)?,
PhysicalType::Utf8 => copy_rows::<PhysicalUtf8, _>(from, mapping, to)?,
_ => unimplemented!(),
}
}
Expand All @@ -96,6 +96,30 @@ where
Ok(())
}

/// Copies one row out of `source` into this block.
///
/// Arrays are paired positionally: the array at index `i` in `source` is the
/// copy source for the array at index `i` in `self`. For each pair, the value
/// at `source_row` is copied to `dest_row` via `copy_rows`.
///
/// # Errors
///
/// Returns an error if the two blocks hold a different number of arrays, or
/// if copying any individual array fails.
///
/// # Panics
///
/// Panics (via `unimplemented!`) if a destination array has a physical type
/// other than `Int8`, `Int32`, or `Utf8`.
pub fn copy_row_from_other(
    &mut self,
    dest_row: usize,
    source: &BatchCollectionBlock<B>,
    source_row: usize,
) -> Result<()> {
    let arrays_match = self.arrays.len() == source.arrays.len();
    if !arrays_match {
        return Err(RayexecError::new("Number of arrays in self and other differ"));
    }

    let column_pairs = source.arrays().iter().zip(self.arrays.iter_mut());
    for (src_array, dest_array) in column_pairs {
        // A single (from, to) pair: exactly one row is copied per array.
        let row_pair = [(source_row, dest_row)];

        // Dispatch on the destination's physical type to pick the storage
        // type used for the copy.
        match dest_array.datatype.physical_type() {
            PhysicalType::Int8 => copy_rows::<PhysicalI8, _>(src_array, row_pair, dest_array)?,
            PhysicalType::Int32 => copy_rows::<PhysicalI32, _>(src_array, row_pair, dest_array)?,
            PhysicalType::Utf8 => copy_rows::<PhysicalUtf8, _>(src_array, row_pair, dest_array)?,
            _ => unimplemented!(),
        }
    }

    Ok(())
}

/// Reorder rows in the collection based on a selection.
pub fn select(&mut self, manager: &B, selection: &[usize]) -> Result<()> {
for array in &mut self.arrays {
Expand All @@ -106,7 +130,11 @@ where
}
}

fn append_copy<S, B>(
/// Copy rows from `from` to `to`.
///
/// `mapping` provides a mapping of source to destination rows in the form of
/// pairs (from, to).
fn copy_rows<S, B>(
from: &Array<B>,
mapping: impl IntoExactSizeIterator<Item = (usize, usize)>,
to: &mut Array<B>,
Expand Down Expand Up @@ -197,6 +225,62 @@ mod tests {
assert_eq!(vec![4, 4, 6, 6, 5, 5], out);
}

/// Copies row 0 of a second block over row 1 of the first block and checks
/// that both the i32 and the string column reflect the overwrite while the
/// row count stays at 3.
#[test]
fn copy_row_i32_string() {
    let mut block1 =
        BatchCollectionBlock::new(&NopBufferManager, &[DataType::Int32, DataType::Utf8], 4096).unwrap();
    let mut block2 =
        BatchCollectionBlock::new(&NopBufferManager, &[DataType::Int32, DataType::Utf8], 4096).unwrap();

    // Destination block: three rows.
    let dest_batch = Batch::from_arrays(
        [
            Array::new_with_buffer(DataType::Int32, Int32BufferBuilder::from_iter([4, 5, 6]).unwrap()),
            Array::new_with_buffer(
                DataType::Utf8,
                StringBufferBuilder::from_iter(["a", "b", "c"]).unwrap(),
            ),
        ],
        true,
    )
    .unwrap();
    block1.append_batch_data(&dest_batch).unwrap();

    // Source block: two rows.
    let source_batch = Batch::from_arrays(
        [
            Array::new_with_buffer(DataType::Int32, Int32BufferBuilder::from_iter([7, 8]).unwrap()),
            Array::new_with_buffer(DataType::Utf8, StringBufferBuilder::from_iter(["dog", "cat"]).unwrap()),
        ],
        true,
    )
    .unwrap();
    block2.append_batch_data(&source_batch).unwrap();

    // Overwrite destination row 1 with source row 0.
    block1.copy_row_from_other(1, &block2, 0).unwrap();

    assert_eq!(3, block1.row_count());

    // Read back the integer column.
    let mut ints = vec![0; 3];
    let int_view = block1.arrays()[0].flat_view().unwrap();
    UnaryExecutor::for_each_flat::<PhysicalI32, _>(int_view, 0..3, |row, value| {
        ints[row] = value.copied().unwrap();
    })
    .unwrap();

    // Read back the string column.
    let mut strings = vec![String::new(); 3];
    let string_view = block1.arrays()[1].flat_view().unwrap();
    UnaryExecutor::for_each_flat::<PhysicalUtf8, _>(string_view, 0..3, |row, value| {
        strings[row] = value.as_ref().unwrap().to_string();
    })
    .unwrap();

    assert_eq!(vec![4, 7, 6], ints);
    assert_eq!(vec!["a".to_string(), "dog".to_string(), "c".to_string()], strings);
}

#[test]
fn append_string() {
let mut block = BatchCollectionBlock::new(&NopBufferManager, &[DataType::Utf8], 4096).unwrap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use physical_sort::partition_state::SortPartitionState;
use rayexec_error::Result;

use crate::arrays::batch::Batch;
use crate::arrays::buffer_manager::BufferManager;
use crate::explain::explainable::Explainable;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ use std::collections::{BinaryHeap, VecDeque};

use rayexec_error::Result;

use super::sort_data::{SortBlock, SortData, SortLayout};
use super::sort_data::{SortBlock, SortData};
use super::sort_layout::SortLayout;
use crate::arrays::buffer_manager::BufferManager;

/// A block containing sorted rows that's being merged with other blocks.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ pub mod partition_state;

mod merge;
mod sort_data;
mod sort_layout;

use std::task::Context;

use rayexec_error::{OptionExt, Result};
use sort_layout::SortLayout;

use super::{ExecutableOperator, ExecuteInOutState, OperatorState, PartitionState, PollExecute, PollFinalize};
use crate::explain::explainable::{ExplainConfig, ExplainEntry, Explainable};
Expand All @@ -15,6 +17,7 @@ use crate::expr::physical::{PhysicalScalarExpression, PhysicalSortExpression};

/// Holds what a physical sort needs: a `SortLayout` plus the list of sort
/// expressions.
// NOTE(review): presumably `layout` is derived from `exprs` via
// `SortLayout::new` — confirm at the construction site, which is not visible
// here.
#[derive(Debug)]
pub struct PhysicalSort {
/// Sort layout (key columns, key sizes, ordering flags) used by this sort.
pub(crate) layout: SortLayout,
/// The sort expressions for this sort.
pub(crate) exprs: Vec<PhysicalSortExpression>,
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use rayexec_error::Result;

use super::sort_layout::SortLayout;
use crate::arrays::batch::Batch;
use crate::arrays::buffer::physical_type::{PhysicalI8, PhysicalType};
use crate::arrays::buffer_manager::BufferManager;
Expand Down Expand Up @@ -111,45 +112,6 @@ where
}
}

/// Describes how sort keys are laid out for a set of input columns.
///
/// All `key_*` vectors have one entry per sort expression, in the order the
/// expressions were given to `SortLayout::new`.
#[derive(Debug)]
pub struct SortLayout {
/// Data types of the input columns; indexed by column index.
pub input_types: Vec<DataType>,
/// For each sort key, the index of the input column it reads.
pub key_columns: Vec<usize>,
/// For each sort key, the size in bytes of the key's value plus one
/// validity byte.
pub key_sizes: Vec<usize>,
/// For each sort key, the `nulls_first` flag taken from its sort expression.
pub key_nulls_first: Vec<bool>,
/// For each sort key, the `desc` flag taken from its sort expression.
pub key_desc: Vec<bool>,
}

impl SortLayout {
    /// Builds a layout for sorting input with column types `input_types` by
    /// the given sort expressions.
    ///
    /// Each sort expression contributes one entry to every `key_*` vector.
    ///
    /// # Panics
    ///
    /// Panics if a sort expression refers to a column index outside
    /// `input_types`, or (via `unimplemented!`) if a key column's physical
    /// type is neither `Int8` nor `Int32`.
    fn new(input_types: Vec<DataType>, exprs: &[PhysicalSortExpression]) -> Self {
        let key_count = exprs.len();
        let mut key_columns = Vec::with_capacity(key_count);
        let mut key_nulls_first = Vec::with_capacity(key_count);
        let mut key_desc = Vec::with_capacity(key_count);
        let mut key_sizes = Vec::with_capacity(key_count);

        for sort_expr in exprs {
            let column = sort_expr.column.idx;

            let value_width = match input_types[column].physical_type() {
                PhysicalType::Int8 => std::mem::size_of::<i8>(),
                PhysicalType::Int32 => std::mem::size_of::<i32>(),
                _ => unimplemented!(),
            };

            key_columns.push(column);
            key_nulls_first.push(sort_expr.nulls_first);
            key_desc.push(sort_expr.desc);
            // Account for validity byte. Currently we set it for everything.
            key_sizes.push(value_width + 1);
        }

        SortLayout {
            input_types,
            key_desc,
            key_sizes,
            key_columns,
            key_nulls_first,
        }
    }
}

/// Blocks containing unsorted input and encoded keys.
#[derive(Debug)]
pub struct SortBlock<B: BufferManager> {
Expand Down Expand Up @@ -187,13 +149,27 @@ where
&self.key_encode_buffer[start..end]
}

/// Returns the mutable byte range of the encoded sort key for `row_idx`.
///
/// The range is bounded by the offsets stored at `row_idx` and `row_idx + 1`
/// in `key_encode_offsets`.
///
/// # Panics
///
/// Panics if `row_idx + 1` is out of bounds for the offsets, or if the
/// resulting range is out of bounds for the key buffer.
pub fn get_sort_key_buf_mut(&mut self, row_idx: usize) -> &mut [u8] {
    let key_range = self.key_encode_offsets[row_idx]..self.key_encode_offsets[row_idx + 1];
    &mut self.key_encode_buffer[key_range]
}

/// Returns the number of rows in this sort block, as reported by the
/// underlying `block`.
pub fn row_count(&self) -> usize {
self.block.row_count()
}

/// Copy a row from another sort block into this sort block.
///
/// Both the encoded sort key and the row data at `dest_row` are overwritten
/// with those found at `source_row` in `source`.
///
/// # Errors
///
/// Returns an error if copying the row data between the underlying blocks
/// fails. The encoded key has already been overwritten at that point.
///
/// # Panics
///
/// Panics if `dest_row` is out of range for this block's key offsets, or if
/// the source and destination encoded keys differ in length
/// (`copy_from_slice` requires equal lengths). Presumably also panics if
/// `source_row` is out of range for `source` — confirm against
/// `get_sort_key_buf`.
pub fn copy_row_from_other(&mut self, dest_row: usize, source: &SortBlock<B>, source_row: usize) -> Result<()> {
    // Copy encoded keys.
    let source_buf = source.get_sort_key_buf(source_row);
    let dest_buf = self.get_sort_key_buf_mut(dest_row);
    dest_buf.copy_from_slice(source_buf);

    // Copy actual row data.
    self.block.copy_row_from_other(dest_row, &source.block, source_row)?;

    Ok(())
}

fn sort(mut self, manager: &B, layout: &SortLayout, sort_indices: &mut [usize]) -> Result<SortBlock<B>> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use crate::arrays::buffer::physical_type::PhysicalType;
use crate::arrays::datatype::DataType;
use crate::expr::physical::PhysicalSortExpression;

/// Describes how sort keys are laid out for a set of input columns.
///
/// All `key_*` vectors have one entry per sort expression, in the order the
/// expressions were given to `SortLayout::new`.
#[derive(Debug)]
pub struct SortLayout {
/// Data types of the input columns; indexed by column index.
pub input_types: Vec<DataType>,
/// For each sort key, the index of the input column it reads.
pub key_columns: Vec<usize>,
/// For each sort key, the size in bytes of the key's value plus one
/// validity byte.
pub key_sizes: Vec<usize>,
/// For each sort key, the `nulls_first` flag taken from its sort expression.
pub key_nulls_first: Vec<bool>,
/// For each sort key, the `desc` flag taken from its sort expression.
pub key_desc: Vec<bool>,
}

impl SortLayout {
    /// Builds a layout for sorting input with column types `input_types` by
    /// the given sort expressions.
    ///
    /// Each sort expression contributes one entry to every `key_*` vector.
    ///
    /// # Panics
    ///
    /// Panics if a sort expression refers to a column index outside
    /// `input_types`, or (via `unimplemented!`) if a key column's physical
    /// type is neither `Int8` nor `Int32`.
    pub fn new(input_types: Vec<DataType>, exprs: &[PhysicalSortExpression]) -> Self {
        let key_count = exprs.len();
        let mut key_columns = Vec::with_capacity(key_count);
        let mut key_nulls_first = Vec::with_capacity(key_count);
        let mut key_desc = Vec::with_capacity(key_count);
        let mut key_sizes = Vec::with_capacity(key_count);

        for sort_expr in exprs {
            let column = sort_expr.column.idx;

            let value_width = match input_types[column].physical_type() {
                PhysicalType::Int8 => std::mem::size_of::<i8>(),
                PhysicalType::Int32 => std::mem::size_of::<i32>(),
                _ => unimplemented!(),
            };

            key_columns.push(column);
            key_nulls_first.push(sort_expr.nulls_first);
            key_desc.push(sort_expr.desc);
            // Account for validity byte. Currently we set it for everything.
            key_sizes.push(value_width + 1);
        }

        SortLayout {
            input_types,
            key_desc,
            key_sizes,
            key_columns,
            key_nulls_first,
        }
    }
}

0 comments on commit 9e088f7

Please sign in to comment.