Skip to content

Commit

Permalink
Merge branch 'main' into hive-multiple-paths
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Jun 22, 2024
2 parents 4f2e7b6 + 0c1274f commit b5e0c56
Show file tree
Hide file tree
Showing 87 changed files with 1,086 additions and 845 deletions.
77 changes: 45 additions & 32 deletions crates/polars-core/src/chunked_array/ops/bit_repr.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use arrow::buffer::Buffer;

use crate::prelude::*;
use crate::series::BitRepr;

/// Reinterprets the type of a [`ChunkedArray`]. T and U must have the same size
/// and alignment.
Expand Down Expand Up @@ -103,41 +104,41 @@ impl<T> ToBitRepr for ChunkedArray<T>
where
T: PolarsNumericType,
{
fn bit_repr_is_large() -> bool {
std::mem::size_of::<T::Native>() == 8
}
fn to_bit_repr(&self) -> BitRepr {
let is_large = std::mem::size_of::<T::Native>() == 8;

fn bit_repr_large(&self) -> UInt64Chunked {
if std::mem::size_of::<T::Native>() == 8 {
if is_large {
if matches!(self.dtype(), DataType::UInt64) {
let ca = self.clone();
// Convince the compiler we are this type. This keeps flags.
return unsafe { std::mem::transmute::<ChunkedArray<T>, UInt64Chunked>(ca) };
return BitRepr::Large(unsafe {
std::mem::transmute::<ChunkedArray<T>, UInt64Chunked>(ca)
});
}
reinterpret_chunked_array(self)
} else {
unreachable!()
}
}

fn bit_repr_small(&self) -> UInt32Chunked {
if std::mem::size_of::<T::Native>() == 4 {
if matches!(self.dtype(), DataType::UInt32) {
let ca = self.clone();
// Convince the compiler we are this type. This preserves flags.
return unsafe { std::mem::transmute::<ChunkedArray<T>, UInt32Chunked>(ca) };
}
reinterpret_chunked_array(self)
BitRepr::Large(reinterpret_chunked_array(self))
} else {
// SAFETY: an unchecked cast to uint32 (which has no invariants) is
// always sound.
unsafe {
self.cast_unchecked(&DataType::UInt32)
.unwrap()
.u32()
.unwrap()
.clone()
}
BitRepr::Small(if std::mem::size_of::<T::Native>() == 4 {
if matches!(self.dtype(), DataType::UInt32) {
let ca = self.clone();
// Convince the compiler we are this type. This preserves flags.
return BitRepr::Small(unsafe {
std::mem::transmute::<ChunkedArray<T>, UInt32Chunked>(ca)
});
}

reinterpret_chunked_array(self)
} else {
// SAFETY: an unchecked cast to uint32 (which has no invariants) is
// always sound.
unsafe {
self.cast_unchecked(&DataType::UInt32)
.unwrap()
.u32()
.unwrap()
.clone()
}
})
}
}
}
Expand All @@ -160,7 +161,10 @@ impl Reinterpret for Int64Chunked {
}

fn reinterpret_unsigned(&self) -> Series {
self.bit_repr_large().into_series()
let BitRepr::Large(b) = self.to_bit_repr() else {
unreachable!()
};
b.into_series()
}
}

Expand All @@ -183,7 +187,10 @@ impl Reinterpret for Int32Chunked {
}

fn reinterpret_unsigned(&self) -> Series {
self.bit_repr_small().into_series()
let BitRepr::Small(b) = self.to_bit_repr() else {
unreachable!()
};
b.into_series()
}
}

Expand Down Expand Up @@ -250,7 +257,10 @@ impl Float32Chunked {
where
F: Fn(&Series) -> Series,
{
let s = self.bit_repr_small().into_series();
let BitRepr::Small(s) = self.to_bit_repr() else {
unreachable!()
};
let s = s.into_series();
let out = f(&s);
let out = out.u32().unwrap();
out._reinterpret_float().into()
Expand All @@ -261,7 +271,10 @@ impl Float64Chunked {
where
F: Fn(&Series) -> Series,
{
let s = self.bit_repr_large().into_series();
let BitRepr::Large(s) = self.to_bit_repr() else {
unreachable!()
};
let s = s.into_series();
let out = f(&s);
let out = out.u64().unwrap();
out._reinterpret_float().into()
Expand Down
7 changes: 2 additions & 5 deletions crates/polars-core/src/chunked_array/ops/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use serde::{Deserialize, Serialize};
pub use sort::options::*;

use crate::chunked_array::cast::CastOptions;
use crate::series::IsSorted;
use crate::series::{BitRepr, IsSorted};
#[cfg(feature = "reinterpret")]
pub trait Reinterpret {
fn reinterpret_signed(&self) -> Series {
Expand All @@ -59,10 +59,7 @@ pub trait Reinterpret {
/// This is useful in hashing context and reduces no.
/// of compiled code paths.
pub(crate) trait ToBitRepr {
fn bit_repr_is_large() -> bool;

fn bit_repr_large(&self) -> UInt64Chunked;
fn bit_repr_small(&self) -> UInt32Chunked;
fn to_bit_repr(&self) -> BitRepr;
}

pub trait ChunkAnyValue {
Expand Down
103 changes: 51 additions & 52 deletions crates/polars-core/src/frame/explode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@ fn get_exploded(series: &Series) -> PolarsResult<(Series, OffsetsBuffer<i64>)> {
}
}

/// Arguments for `[DataFrame::melt]` function
/// Arguments for `[DataFrame::unpivot]` function
#[derive(Clone, Default, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde-lazy", derive(Serialize, Deserialize))]
pub struct MeltArgs {
pub id_vars: Vec<SmartString>,
pub value_vars: Vec<SmartString>,
pub struct UnpivotArgs {
pub on: Vec<SmartString>,
pub index: Vec<SmartString>,
pub variable_name: Option<SmartString>,
pub value_name: Option<SmartString>,
/// Whether the melt may be done
/// Whether the unpivot may be done
/// in the streaming engine
/// This will not have a stable ordering
pub streamable: bool,
Expand Down Expand Up @@ -189,10 +189,10 @@ impl DataFrame {
///
/// # Arguments
///
/// * `id_vars` - String slice that represent the columns to use as id variables.
/// * `value_vars` - String slice that represent the columns to use as value variables.
/// * `on` - String slice that represent the columns to use as value variables.
/// * `index` - String slice that represent the columns to use as id variables.
///
/// If `value_vars` is empty all columns that are not in `id_vars` will be used.
/// If `on` is empty all columns that are not in `index` will be used.
///
/// ```ignore
/// # use polars_core::prelude::*;
Expand All @@ -202,9 +202,9 @@ impl DataFrame {
/// "D" => &[2, 4, 6]
/// )?;
///
/// let melted = df.melt(&["A", "B"], &["C", "D"])?;
/// let unpivoted = df.unpivot(&["A", "B"], &["C", "D"])?;
/// println!("{:?}", df);
/// println!("{:?}", melted);
/// println!("{:?}", unpivoted);
/// # Ok::<(), PolarsError>(())
/// ```
/// Outputs:
Expand Down Expand Up @@ -239,51 +239,51 @@ impl DataFrame {
/// | "a" | 5 | "D" | 6 |
/// +-----+-----+----------+-------+
/// ```
pub fn melt<I, J>(&self, id_vars: I, value_vars: J) -> PolarsResult<Self>
pub fn unpivot<I, J>(&self, on: I, index: J) -> PolarsResult<Self>
where
I: IntoVec<SmartString>,
J: IntoVec<SmartString>,
{
let id_vars = id_vars.into_vec();
let value_vars = value_vars.into_vec();
self.melt2(MeltArgs {
id_vars,
value_vars,
let index = index.into_vec();
let on = on.into_vec();
self.unpivot2(UnpivotArgs {
on,
index,
..Default::default()
})
}

/// Similar to melt, but without generics. This may be easier if you want to pass
/// an empty `id_vars` or empty `value_vars`.
pub fn melt2(&self, args: MeltArgs) -> PolarsResult<Self> {
let id_vars = args.id_vars;
let mut value_vars = args.value_vars;
/// Similar to unpivot, but without generics. This may be easier if you want to pass
/// an empty `index` or empty `on`.
pub fn unpivot2(&self, args: UnpivotArgs) -> PolarsResult<Self> {
let index = args.index;
let mut on = args.on;

let variable_name = args.variable_name.as_deref().unwrap_or("variable");
let value_name = args.value_name.as_deref().unwrap_or("value");

let len = self.height();

// if value vars is empty we take all columns that are not in id_vars.
if value_vars.is_empty() {
if on.is_empty() {
// return empty frame if there are no columns available to use as value vars
if id_vars.len() == self.width() {
if index.len() == self.width() {
let variable_col = Series::new_empty(variable_name, &DataType::String);
let value_col = Series::new_empty(variable_name, &DataType::Null);

let mut out = self.select(id_vars).unwrap().clear().columns;
let mut out = self.select(index).unwrap().clear().columns;
out.push(variable_col);
out.push(value_col);

return Ok(unsafe { DataFrame::new_no_checks(out) });
}

let id_vars_set = PlHashSet::from_iter(id_vars.iter().map(|s| s.as_str()));
value_vars = self
let index_set = PlHashSet::from_iter(index.iter().map(|s| s.as_str()));
on = self
.get_columns()
.iter()
.filter_map(|s| {
if id_vars_set.contains(s.name()) {
if index_set.contains(s.name()) {
None
} else {
Some(s.name().into())
Expand All @@ -294,7 +294,7 @@ impl DataFrame {

// values will all be placed in single column, so we must find their supertype
let schema = self.schema();
let mut iter = value_vars.iter().map(|v| {
let mut iter = on.iter().map(|v| {
schema
.get(v)
.ok_or_else(|| polars_err!(ColumnNotFound: "{}", v))
Expand All @@ -304,31 +304,30 @@ impl DataFrame {
st = try_get_supertype(&st, dt?)?;
}

// The column name of the variable that is melted
let mut variable_col =
MutableBinaryViewArray::<str>::with_capacity(len * value_vars.len() + 1);
// The column name of the variable that is unpivoted
let mut variable_col = MutableBinaryViewArray::<str>::with_capacity(len * on.len() + 1);
// prepare ids
let ids_ = self.select_with_schema_unchecked(id_vars, &schema)?;
let ids_ = self.select_with_schema_unchecked(index, &schema)?;
let mut ids = ids_.clone();
if ids.width() > 0 {
for _ in 0..value_vars.len() - 1 {
for _ in 0..on.len() - 1 {
ids.vstack_mut_unchecked(&ids_)
}
}
ids.as_single_chunk_par();
drop(ids_);

let mut values = Vec::with_capacity(value_vars.len());
let mut values = Vec::with_capacity(on.len());

for value_column_name in &value_vars {
for value_column_name in &on {
variable_col.extend_constant(len, Some(value_column_name.as_str()));
// ensure we go via the schema so we are O(1)
// self.column() is linear
// together with this loop that would make it O^2 over value_vars
// together with this loop that would make it O^2 over `on`
let (pos, _name, _dtype) = schema.try_get_full(value_column_name)?;
let col = &self.columns[pos];
let value_col = col.cast(&st).map_err(
|_| polars_err!(InvalidOperation: "'melt/unpivot' not supported for dtype: {}", col.dtype()),
|_| polars_err!(InvalidOperation: "'unpivot' not supported for dtype: {}", col.dtype()),
)?;
values.extend_from_slice(value_col.chunks())
}
Expand Down Expand Up @@ -434,28 +433,28 @@ mod test {

#[test]
#[cfg_attr(miri, ignore)]
fn test_melt() -> PolarsResult<()> {
fn test_unpivot() -> PolarsResult<()> {
let df = df!("A" => &["a", "b", "a"],
"B" => &[1, 3, 5],
"C" => &[10, 11, 12],
"D" => &[2, 4, 6]
)
.unwrap();

let melted = df.melt(["A", "B"], ["C", "D"])?;
let unpivoted = df.unpivot(["C", "D"], ["A", "B"])?;
assert_eq!(
Vec::from(melted.column("value")?.i32()?),
Vec::from(unpivoted.column("value")?.i32()?),
&[Some(10), Some(11), Some(12), Some(2), Some(4), Some(6)]
);

let args = MeltArgs {
id_vars: vec![],
value_vars: vec![],
let args = UnpivotArgs {
on: vec![],
index: vec![],
..Default::default()
};

let melted = df.melt2(args).unwrap();
let value = melted.column("value")?;
let unpivoted = df.unpivot2(args).unwrap();
let value = unpivoted.column("value")?;
// String because of supertype
let value = value.str()?;
let value = value.into_no_null_iter().collect::<Vec<_>>();
Expand All @@ -464,22 +463,22 @@ mod test {
&["a", "b", "a", "1", "3", "5", "10", "11", "12", "2", "4", "6"]
);

let args = MeltArgs {
id_vars: vec!["A".into()],
value_vars: vec![],
let args = UnpivotArgs {
on: vec![],
index: vec!["A".into()],
..Default::default()
};

let melted = df.melt2(args).unwrap();
let value = melted.column("value")?;
let unpivoted = df.unpivot2(args).unwrap();
let value = unpivoted.column("value")?;
let value = value.i32()?;
let value = value.into_no_null_iter().collect::<Vec<_>>();
assert_eq!(value, &[1, 3, 5, 10, 11, 12, 2, 4, 6]);
let variable = melted.column("variable")?;
let variable = unpivoted.column("variable")?;
let variable = variable.str()?;
let variable = variable.into_no_null_iter().collect::<Vec<_>>();
assert_eq!(variable, &["B", "B", "B", "C", "C", "C", "D", "D", "D"]);
assert!(melted.column("A").is_ok());
assert!(unpivoted.column("A").is_ok());
Ok(())
}
}
Loading

0 comments on commit b5e0c56

Please sign in to comment.