Skip to content

Commit

Permalink
fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
orlp committed Sep 10, 2024
1 parent 5d336b6 commit d6a5f38
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 32 deletions.
2 changes: 1 addition & 1 deletion crates/polars-arrow/src/array/static_array_collect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ use crate::array::{
MutableBinaryArray, MutableBinaryValuesArray, MutableBinaryViewArray, PrimitiveArray,
StructArray, Utf8Array, Utf8ViewArray,
};
use crate::storage::SharedStorage;
use crate::bitmap::Bitmap;
use crate::datatypes::ArrowDataType;
#[cfg(feature = "dtype-array")]
use crate::legacy::prelude::fixed_size_list::AnonymousBuilder as AnonymousFixedSizeListArrayBuilder;
use crate::legacy::prelude::list::AnonymousBuilder as AnonymousListArrayBuilder;
use crate::legacy::trusted_len::TrustedLenPush;
use crate::storage::SharedStorage;
use crate::trusted_len::TrustedLen;
use crate::types::NativeType;

Expand Down
23 changes: 15 additions & 8 deletions crates/polars-arrow/src/bitmap/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use crate::bitmap::aligned::AlignedBitmapSlice;
use crate::bitmap::iterator::{
FastU32BitmapIter, FastU56BitmapIter, FastU64BitmapIter, TrueIdxIter,
};
use crate::storage::SharedStorage;
use crate::legacy::utils::FromTrustedLenIterator;
use crate::storage::SharedStorage;
use crate::trusted_len::TrustedLen;

const UNKNOWN_BIT_COUNT: u64 = u64::MAX;
Expand Down Expand Up @@ -294,7 +294,8 @@ impl Bitmap {
// Subtract the null count of the chunks we slice off.
let slice_end = self.offset + offset + length;
let head_count = count_zeros(&self.storage, self.offset, offset);
let tail_count = count_zeros(&self.storage, slice_end, self.length - length - offset);
let tail_count =
count_zeros(&self.storage, slice_end, self.length - length - offset);
let new_count = *unset_bit_count_cache - head_count as u64 - tail_count as u64;
*unset_bit_count_cache = new_count;
} else {
Expand Down Expand Up @@ -370,7 +371,7 @@ impl Bitmap {
Err(storage) => {
self.storage = storage;
Either::Left(self)
}
},
}
}

Expand Down Expand Up @@ -399,10 +400,9 @@ impl Bitmap {
// We intentionally leak 1MiB of zeroed memory once so we don't have to
// refcount it.
const GLOBAL_ZERO_SIZE: usize = 1024 * 1024;
static GLOBAL_ZEROES: LazyLock<SharedStorage<u8>> = LazyLock::new(|| {
SharedStorage::from_static(vec![0; GLOBAL_ZERO_SIZE].leak())
});

static GLOBAL_ZEROES: LazyLock<SharedStorage<u8>> =
LazyLock::new(|| SharedStorage::from_static(vec![0; GLOBAL_ZERO_SIZE].leak()));

let bytes_needed = length.div_ceil(8);
let storage = if bytes_needed <= GLOBAL_ZERO_SIZE {
GLOBAL_ZEROES.clone()
Expand All @@ -427,7 +427,14 @@ impl Bitmap {
vec![0; length.saturating_add(7) / 8]
};
let unset_bits = if value { 0 } else { length };
unsafe { Bitmap::from_inner_unchecked(SharedStorage::from_vec(bytes), 0, length, Some(unset_bits)) }
unsafe {
Bitmap::from_inner_unchecked(
SharedStorage::from_vec(bytes),
0,
length,
Some(unset_bits),
)
}
}

/// Counts the nulls (unset bits) starting from `offset` bits and for `length` bits.
Expand Down
10 changes: 7 additions & 3 deletions crates/polars-arrow/src/buffer/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use either::Either;
use num_traits::Zero;

use super::IntoIter;
use crate::storage::SharedStorage;
use crate::array::{ArrayAccessor, Splitable};
use crate::storage::SharedStorage;

/// [`Buffer`] is a contiguous memory region that can be shared across
/// thread boundaries.
Expand Down Expand Up @@ -82,7 +82,11 @@ impl<T> Buffer<T> {
pub(crate) fn from_storage(storage: SharedStorage<T>) -> Self {
let ptr = storage.as_ptr();
let length = storage.len();
Buffer { storage, ptr, length, }
Buffer {
storage,
ptr,
length,
}
}

/// Returns the number of bytes in the buffer
Expand Down Expand Up @@ -213,7 +217,7 @@ impl<T> Buffer<T> {
Err(slf) => {
self.storage = slf;
Either::Left(self)
}
},
}
}

Expand Down
7 changes: 2 additions & 5 deletions crates/polars-arrow/src/ffi/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ use crate::array::*;
use crate::bitmap::utils::bytes_for;
use crate::bitmap::Bitmap;
use crate::buffer::Buffer;
use crate::storage::SharedStorage;
use crate::datatypes::{ArrowDataType, PhysicalType};
use crate::ffi::schema::get_child;
use crate::storage::SharedStorage;
use crate::types::NativeType;
use crate::{match_integer_type, with_match_primitive_type_full};

Expand Down Expand Up @@ -330,10 +330,7 @@ unsafe fn create_bitmap(
None
};
Ok(Bitmap::from_inner_unchecked(
storage,
offset,
len,
null_count,
storage, offset, len, null_count,
))
}

Expand Down
32 changes: 17 additions & 15 deletions crates/polars-arrow/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::ffi::InternalArrowArray;

enum BackingStorage {
Vec {
capacity: usize
capacity: usize,
},
InternalArrowArray(InternalArrowArray),
#[cfg(feature = "arrow_rs")]
Expand All @@ -29,8 +29,8 @@ impl<T> Drop for SharedStorageInner<T> {
Some(BackingStorage::ArrowBuffer(b)) => drop(b),
Some(BackingStorage::Vec { capacity }) => unsafe {
drop(Vec::from_raw_parts(self.ptr, self.length, capacity))
}
None => {}
},
None => {},
}
}
}
Expand All @@ -39,6 +39,9 @@ pub struct SharedStorage<T> {
inner: NonNull<SharedStorageInner<T>>,
}

unsafe impl<T: Sync + Send> Send for SharedStorage<T> {}
unsafe impl<T: Sync + Send> Sync for SharedStorage<T> {}

impl<T> SharedStorage<T> {
pub fn from_static(slice: &'static [T]) -> Self {
let length = slice.len();
Expand Down Expand Up @@ -69,7 +72,7 @@ impl<T> SharedStorage<T> {
inner: NonNull::new(Box::into_raw(Box::new(inner))).unwrap(),
}
}

pub fn from_internal_arrow_array(ptr: *const T, len: usize, arr: InternalArrowArray) -> Self {
let inner = SharedStorageInner {
ref_count: AtomicU64::new(1),
Expand Down Expand Up @@ -101,7 +104,7 @@ impl<T: crate::types::NativeType> SharedStorage<T> {
inner: NonNull::new(Box::into_raw(Box::new(inner))).unwrap(),
}
}

pub fn into_arrow_buffer(self) -> arrow_buffer::Buffer {
let ptr = NonNull::new(self.as_ptr() as *mut u8).unwrap();
let len = self.len() * std::mem::size_of::<T>();
Expand All @@ -115,20 +118,20 @@ impl<T> SharedStorage<T> {
pub fn len(&self) -> usize {
self.inner().length
}

#[inline(always)]
pub fn as_ptr(&self) -> *const T {
self.inner().ptr
}

#[inline(always)]
pub fn is_exclusive(&mut self) -> bool {
// Ordering semantics copied from Arc<T>.
self.inner().ref_count.load(Ordering::Acquire) == 1
}

/// Gets the reference count of this storage.
///
///
/// Because this function takes a shared reference this should not be used
/// in cases where we are checking if the refcount is one for safety,
/// someone else could increment it in the meantime.
Expand All @@ -137,14 +140,14 @@ impl<T> SharedStorage<T> {
// Ordering semantics copied from Arc<T>.
self.inner().ref_count.load(Ordering::Acquire)
}

pub fn try_as_mut_slice(&mut self) -> Option<&mut [T]> {
self.is_exclusive().then(|| {
let inner = self.inner();
unsafe { core::slice::from_raw_parts_mut(inner.ptr, inner.length) }
})
}

pub fn try_into_vec(mut self) -> Result<Vec<T>, Self> {
let Some(BackingStorage::Vec { capacity }) = self.inner().backing else {
return Err(self);
Expand All @@ -171,9 +174,6 @@ impl<T> SharedStorage<T> {
}
}

unsafe impl<T: Sync + Send> Send for SharedStorage<T> {}
unsafe impl<T: Sync + Send> Sync for SharedStorage<T> {}

impl<T> Deref for SharedStorage<T> {
type Target = [T];

Expand Down Expand Up @@ -203,11 +203,13 @@ impl<T> Drop for SharedStorage<T> {
if inner.backing.is_none() {
return;
}

// Ordering semantics copied from Arc<T>.
if inner.ref_count.fetch_sub(1, Ordering::Release) == 1 {
std::sync::atomic::fence(Ordering::Acquire);
unsafe { self.drop_slow(); }
unsafe {
self.drop_slow();
}
}
}
}

0 comments on commit d6a5f38

Please sign in to comment.