Skip to content

Commit

Permalink
managed
Browse files Browse the repository at this point in the history
  • Loading branch information
scsmithr committed Dec 21, 2024
1 parent 17fd410 commit 1435d1f
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 23 deletions.
101 changes: 84 additions & 17 deletions crates/rayexec_execution/src/arrays/batch.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::ops::Deref;

use iterutil::exact_size::IntoExactSizeIterator;
use rayexec_error::{not_implemented, RayexecError, Result};

Expand Down Expand Up @@ -30,7 +32,7 @@ where
for datatype in datatypes {
let buffer = init_array_buffer(manager, &datatype, capacity)?;
let array = Array::new(datatype, buffer);
arrays.push(BatchArray::Owned(array))
arrays.push(BatchArray::owned(array))
}

Ok(Batch {
Expand All @@ -40,23 +42,22 @@ where
})
}

pub fn get_array(&self, idx: usize) -> Result<&Array<B>> {
pub fn get_array(&self, idx: usize) -> Result<&BatchArray<B>> {
self.get_array_opt(idx)
.ok_or_else(|| RayexecError::new("Missing array").with_field("idx", idx))
}

pub fn get_array_opt(&self, idx: usize) -> Option<&Array<B>> {
self.arrays.get(idx).map(|a| a.as_ref())
pub fn get_array_opt(&self, idx: usize) -> Option<&BatchArray<B>> {
self.arrays.get(idx)
}

pub fn get_array_mut(&mut self, idx: usize) -> Result<&mut Array<B>> {
unimplemented!()
// self.get_array_mut_opt(idx)
// .ok_or_else(|| RayexecError::new("Missing array").with_field("idx", idx))
pub fn get_array_mut(&mut self, idx: usize) -> Result<&mut BatchArray<B>> {
self.get_array_mut_opt(idx)
.ok_or_else(|| RayexecError::new("Missing array").with_field("idx", idx))
}

pub fn get_array_mut_opt(&mut self, idx: usize) -> Result<Option<&mut Array<B>>> {
self.arrays.get_mut(idx).map(|a| a.try_as_mut()).transpose()
pub fn get_array_mut_opt(&mut self, idx: usize) -> Option<&mut BatchArray<B>> {
self.arrays.get_mut(idx)
}

pub fn num_rows(&self) -> usize {
Expand All @@ -69,21 +70,75 @@ where
}

#[derive(Debug)]
pub enum BatchArray<B: BufferManager> {
pub struct BatchArray<B: BufferManager> {
inner: BatchArrayInner<B>,
}

#[derive(Debug)]
enum BatchArrayInner<B: BufferManager> {
Managed(B::CowPtr<Array<B>>),
Owned(Array<B>),
Uninit,
}

impl<B> BatchArray<B>
where
B: BufferManager,
{
pub fn owned(array: Array<B>) -> Self {
BatchArray {
inner: BatchArrayInner::Owned(array),
}
}

pub fn is_managed(&self) -> bool {
matches!(self.inner, BatchArrayInner::Managed(_))
}

pub fn is_owned(&self) -> bool {
matches!(self.inner, BatchArrayInner::Owned(_))
}

/// Try to make the array managed by the buffer manager.
///
/// Does nothing if the array is already managed.
///
/// Returns an error if the array cannot be made to be managed. The array is
/// still valid (and remains in the 'owned' state).
pub fn make_managed(&mut self, manager: &B) -> Result<()> {
match &mut self.inner {
BatchArrayInner::Managed(_) => Ok(()), // Already managed.
BatchArrayInner::Owned(_) => {
let orig = std::mem::replace(&mut self.inner, BatchArrayInner::Uninit);
let array = match orig {
BatchArrayInner::Owned(array) => array,
_ => unreachable!("variant already checked"),
};

match manager.make_cow(array) {
Ok(managed) => {
self.inner = BatchArrayInner::Managed(managed);
Ok(())
}
Err(orig) => {
// Manager rejected it, put it back as owned and return
// an error.
self.inner = BatchArrayInner::Owned(orig);
Err(RayexecError::new("Failed to make batch array managed"))
}
}
}
BatchArrayInner::Uninit => panic!("array in uninit state"),
}
}

pub fn try_as_mut(&mut self) -> Result<&mut Array<B>> {
match self {
Self::Managed(_) => Err(RayexecError::new(
match &mut self.inner {
BatchArrayInner::Managed(_) => Err(RayexecError::new(
"Mut references from managed arrays not yet supported",
)),
Self::Owned(array) => Ok(array),
BatchArrayInner::Owned(array) => Ok(array),
BatchArrayInner::Uninit => panic!("array in uninit state"),
}
}
}
Expand All @@ -93,13 +148,25 @@ where
B: BufferManager,
{
fn as_ref(&self) -> &Array<B> {
match self {
Self::Managed(m) => m.as_ref(),
Self::Owned(array) => array,
match &self.inner {
BatchArrayInner::Managed(m) => m.as_ref(),
BatchArrayInner::Owned(array) => array,
BatchArrayInner::Uninit => panic!("array in uninit state"),
}
}
}

impl<B> Deref for BatchArray<B>
where
B: BufferManager,
{
type Target = Array<B>;

fn deref(&self) -> &Self::Target {
BatchArray::as_ref(&self)
}
}

fn init_array_buffer<B>(manager: &B, datatype: &DataType, cap: usize) -> Result<ArrayBuffer<B>>
where
B: BufferManager,
Expand Down
19 changes: 13 additions & 6 deletions crates/rayexec_execution/src/arrays/buffer_manager/mod.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,28 @@
use std::fmt::Debug;
use std::ops::Deref;
use std::sync::Arc;

use rayexec_error::Result;

pub trait BufferManager: Debug + Clone {
type Reservation: Reservation;
type CowPtr<T>: CowPtr<T>;
// TODO: T => Spillable or something.
type CowPtr<T>: CowPtr<T>
where
T: Debug;

/// Reserve some number of bytes.
fn reserve_external(&self, num_bytes: usize) -> Result<Self::Reservation>;

fn make_cow<T>(&self, item: T) -> Result<Self::CowPtr<T>>;
fn make_cow<T: Debug>(&self, item: T) -> Result<Self::CowPtr<T>, T>;
}

pub trait Reservation: Debug {
/// Combine two reservations into a single reservation.
fn combine(self, other: Self) -> Self;
}

pub trait CowPtr<T>: Clone + AsRef<T> {
pub trait CowPtr<T>: Debug + Clone + AsRef<T> + Deref<Target = T> {
// TODO: Clone on write.
//
// Will need to be able to get the underlying reservation in order to track
Expand All @@ -28,20 +32,23 @@ pub trait CowPtr<T>: Clone + AsRef<T> {
// yet.
}

impl<T> CowPtr<T> for Arc<T> {}
impl<T> CowPtr<T> for Arc<T> where T: Debug {}

#[derive(Debug, Clone, Copy)]
pub struct NopBufferManager;

impl BufferManager for NopBufferManager {
type Reservation = NopReservation;
type CowPtr<T> = Arc<T>;
type CowPtr<T>
= Arc<T>
where
T: Debug;

fn reserve_external(&self, _: usize) -> Result<Self::Reservation> {
Ok(NopReservation)
}

fn make_cow<T>(&self, item: T) -> Result<Self::CowPtr<T>> {
fn make_cow<T: Debug>(&self, item: T) -> Result<Self::CowPtr<T>, T> {
Ok(Arc::new(item))
}
}
Expand Down

0 comments on commit 1435d1f

Please sign in to comment.