Skip to content

Commit

Permalink
work
Browse files Browse the repository at this point in the history
  • Loading branch information
rok committed Dec 23, 2024
1 parent c1e33ba commit 1125f97
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 54 deletions.
11 changes: 7 additions & 4 deletions parquet/src/arrow/arrow_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1749,8 +1749,8 @@ mod tests {

let decryption_properties = Some(
ciphers::FileDecryptionProperties::builder()
.with_column_key("kc1".as_bytes().to_vec(), column_1_key.to_vec())
.with_column_key("kc2".as_bytes().to_vec(), column_2_key.to_vec())
.with_column_key("float_field".as_bytes().to_vec(), column_1_key.to_vec())
.with_column_key("double_field".as_bytes().to_vec(), column_2_key.to_vec())
.build(),
);

Expand Down Expand Up @@ -1791,6 +1791,8 @@ mod tests {
#[test]
#[cfg(feature = "encryption")]
fn test_non_uniform_encryption() {
// Decryption configuration 2: Decrypt using key retriever callback that holds the
// keys of two encrypted columns and the footer key. Supply aad_prefix.
let testdata = arrow::util::test_util::parquet_test_data();
let path = format!("{testdata}/encrypt_columns_and_footer.parquet.encrypted");
let file = File::open(path).unwrap();
Expand All @@ -1802,8 +1804,9 @@ mod tests {
let decryption_properties = Some(
ciphers::FileDecryptionProperties::builder()
.with_footer_key(footer_key.to_vec())
.with_column_key("kc1".as_bytes().to_vec(), column_1_key.to_vec())
.with_column_key("kc2".as_bytes().to_vec(), column_2_key.to_vec())
.with_column_key("float_field".as_bytes().to_vec(), column_1_key.to_vec())
.with_column_key("double_field".as_bytes().to_vec(), column_2_key.to_vec())
.with_aad_prefix("tester".as_bytes().to_vec())
.build(),
);

Expand Down
21 changes: 18 additions & 3 deletions parquet/src/encryption/ciphers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use ring::aead::{Aad, LessSafeKey, NonceSequence, UnboundKey, AES_128_GCM};
use ring::rand::{SecureRandom, SystemRandom};
use zstd::zstd_safe::WriteBuf;
use crate::errors::{ParquetError, Result};
use crate::format::EncryptionAlgorithm;

Expand Down Expand Up @@ -231,32 +232,40 @@ fn create_module_aad(file_aad: &[u8], module_type: ModuleType, row_group_ordinal
/// Caller-supplied key material and AAD prefix used when decrypting a file.
///
/// Values are assembled through [`DecryptionPropertiesBuilder`]; every field is
/// optional and stays `None` unless the matching builder method was called.
pub struct FileDecryptionProperties {
/// Footer key bytes, if one was supplied.
footer_key: Option<Vec<u8>>,
/// Column keys, looked up by column name bytes (key = name, value = key bytes).
column_keys: Option<HashMap<Vec<u8>, Vec<u8>>>,
/// AAD prefix bytes, if one was supplied.
aad_prefix: Option<Vec<u8>>,
}

impl FileDecryptionProperties {
    /// Starts a [`DecryptionPropertiesBuilder`] with nothing configured yet.
    pub fn builder() -> DecryptionPropertiesBuilder {
        DecryptionPropertiesBuilder::with_defaults()
    }

    /// Reports whether a footer key has been supplied.
    pub fn has_footer_key(&self) -> bool {
        let Self { footer_key, .. } = self;
        footer_key.is_some()
    }

    /// Returns the configured AAD prefix, or `None` when none was supplied.
    pub fn aad_prefix(&self) -> Option<&Vec<u8>> {
        let Self { aad_prefix, .. } = self;
        aad_prefix.as_ref()
    }
}

/// Builder that accumulates the optional parts of [`FileDecryptionProperties`].
///
/// Each field mirrors the field of the same name on the built properties and is
/// moved across unchanged by `build`.
pub struct DecryptionPropertiesBuilder {
/// Footer key bytes, if one was supplied.
footer_key: Option<Vec<u8>>,
/// Column keys collected so far, keyed by column name bytes.
column_keys: Option<HashMap<Vec<u8>, Vec<u8>>>,
/// AAD prefix bytes, if one was supplied.
aad_prefix: Option<Vec<u8>>,
}

impl DecryptionPropertiesBuilder {
/// Creates a builder with no footer key, no column keys and no AAD prefix.
pub fn with_defaults() -> Self {
    // Nothing is configured until the `with_*` methods are called.
    let footer_key = None;
    let column_keys = None;
    let aad_prefix = None;
    Self {
        footer_key,
        column_keys,
        aad_prefix,
    }
}

pub fn build(self) -> FileDecryptionProperties {
FileDecryptionProperties {
footer_key: self.footer_key,
column_keys: self.column_keys,
aad_prefix: self.aad_prefix,
}
}

Expand All @@ -266,6 +275,11 @@ impl DecryptionPropertiesBuilder {
self
}

pub fn with_aad_prefix(mut self, value: Vec<u8>) -> Self {
self.aad_prefix = Some(value);
self
}

pub fn with_column_key(mut self, key: Vec<u8>, value: Vec<u8>) -> Self {
let mut column_keys= self.column_keys.unwrap_or_else(HashMap::new);
column_keys.insert(key, value);
Expand Down Expand Up @@ -311,9 +325,10 @@ impl FileDecryptor {
self.footer_decryptor.unwrap()
}

pub(crate) fn get_column_decryptor(&self, column_key: &[u8]) -> RingGcmBlockDecryptor {
let column_key = self.decryption_properties.column_keys.as_ref().unwrap().get(column_key).unwrap();
RingGcmBlockDecryptor::new(column_key)
/// Builds a block decryptor from the key registered under `column_name`.
///
/// # Panics
///
/// Panics if the decryption properties hold no column keys at all, or if no
/// key was registered for `column_name`.
pub(crate) fn get_column_decryptor(&self, column_name: &[u8]) -> RingGcmBlockDecryptor {
    // Borrow the key map and the key rather than cloning the whole map, the
    // lookup name and the key on every call; a `HashMap<Vec<u8>, _>` can be
    // queried directly with a `&[u8]`.
    let column_key = self
        .decryption_properties
        .column_keys
        .as_ref()
        .expect("no column keys configured in the decryption properties")
        .get(column_name)
        .expect("no decryption key configured for the requested column");
    RingGcmBlockDecryptor::new(column_key)
}

pub(crate) fn decryption_properties(&self) -> &FileDecryptionProperties {
Expand Down
59 changes: 48 additions & 11 deletions parquet/src/file/metadata/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,9 @@ mod writer;

use std::ops::Range;
use std::sync::Arc;

use crate::format::{
BoundaryOrder, ColumnChunk, ColumnIndex, ColumnMetaData, OffsetIndex, PageLocation, RowGroup,
SizeStatistics, SortingColumn,
};

use zstd::zstd_safe::WriteBuf;
use crate::format::{BoundaryOrder, ColumnChunk, ColumnCryptoMetaData, ColumnIndex, ColumnMetaData, OffsetIndex, PageLocation, RowGroup, SizeStatistics, SortingColumn};
use crate::encryption::ciphers::{create_footer_aad, create_page_aad, ModuleType};
use crate::basic::{ColumnOrder, Compression, Encoding, Type};
#[cfg(feature = "encryption")]
use crate::encryption::ciphers::FileDecryptor;
Expand All @@ -119,6 +116,9 @@ use crate::schema::types::{
pub use reader::ParquetMetaDataReader;
pub use writer::ParquetMetaDataWriter;
pub(crate) use writer::ThriftMetadataWriter;
use crate::data_type::AsBytes;
use crate::encryption::ciphers::{BlockDecryptor, DecryptionPropertiesBuilder, FileDecryptionProperties};
use crate::thrift::{TCompactSliceInputProtocol, TSerializable};

/// Page level statistics for each column chunk of each row group.
///
Expand Down Expand Up @@ -636,7 +636,7 @@ impl RowGroupMetaData {
}

/// Method to convert from Thrift.
pub fn from_thrift(schema_descr: SchemaDescPtr, mut rg: RowGroup) -> Result<RowGroupMetaData> {
pub fn from_thrift(schema_descr: SchemaDescPtr, mut rg: RowGroup, #[cfg(feature = "encryption")] decryptor: Option<&FileDecryptor>) -> Result<RowGroupMetaData> {
if schema_descr.num_columns() != rg.columns.len() {
return Err(general_err!(
"Column count mismatch. Schema has {} columns while Row Group has {}",
Expand All @@ -647,8 +647,45 @@ impl RowGroupMetaData {
let total_byte_size = rg.total_byte_size;
let num_rows = rg.num_rows;
let mut columns = vec![];
for (c, d) in rg.columns.drain(0..).zip(schema_descr.columns()) {
let cc = ColumnChunkMetaData::from_thrift(d.clone(), c)?;
for (i, (c, d)) in rg.columns.drain(0..).zip(schema_descr.columns()).enumerate() {
let cc;
#[cfg(feature = "encryption")]
if let Some(ColumnCryptoMetaData::ENCRYPTIONWITHCOLUMNKEY(crypto_metadata)) = c.crypto_metadata.clone() {
if decryptor.is_none() {
cc = ColumnChunkMetaData::from_thrift(d.clone(), c)?;
} else {
let column_name = crypto_metadata.path_in_schema.join(".");
let column_decryptor = decryptor.unwrap().get_column_decryptor(column_name.as_bytes());
let aad_file_unique = decryptor.unwrap().aad_file_unique();
let aad_prefix = decryptor.unwrap().decryption_properties().aad_prefix().unwrap();
let aad = [aad_prefix.clone(), aad_file_unique.clone()].concat();
// let s = aad.as_slice();
let column_aad = create_page_aad(
aad.as_slice(),
ModuleType::ColumnMetaData,
rg.ordinal.unwrap() as usize,
i as usize,
None,
)?;

let mut buf = c.encrypted_column_metadata.unwrap();
// let mut prot = TCompactSliceInputProtocol::new(buf.as_slice().clone());
// let mut prot = TCompactSliceInputProtocol::new(buf.as_slice());
// decrypted_fmd_buf =
// footer_decryptor.decrypt(prot.as_slice().as_ref(), aad_footer.as_ref())?;
let mut c2 = column_decryptor.decrypt(buf.as_ref(), column_aad.as_ref())?;
let mut prot = TCompactSliceInputProtocol::new(c2.as_slice());
let c3 = ColumnChunk::read_from_in_protocol(&mut prot)?;
// let md = ColumnMetaData::from_thrift(c2, d.clone())?;
// c2.meta_data = Some(md);
cc = ColumnChunkMetaData::from_thrift(d.clone(), c3)?;
// } else if let Some(ColumnCryptoMetaData::ENCRYPTIONWITHFOOTERKEY(x)) = c.crypto_metadata {
// todo!()
}
} else {
cc = ColumnChunkMetaData::from_thrift(d.clone(), c)?;
}

columns.push(cc);
}
let sorting_columns = rg.sorting_columns;
Expand Down Expand Up @@ -1641,7 +1678,7 @@ mod tests {
.unwrap();

let row_group_exp = row_group_meta.to_thrift();
let row_group_res = RowGroupMetaData::from_thrift(schema_descr, row_group_exp.clone())
let row_group_res = RowGroupMetaData::from_thrift(schema_descr, row_group_exp.clone(), #[cfg(feature = "encryption")] None)
.unwrap()
.to_thrift();

Expand Down Expand Up @@ -1723,7 +1760,7 @@ mod tests {
.unwrap();

let err =
RowGroupMetaData::from_thrift(schema_descr_3cols, row_group_meta_2cols.to_thrift())
RowGroupMetaData::from_thrift(schema_descr_3cols, row_group_meta_2cols.to_thrift(), #[cfg(feature = "encryption")] None)
.unwrap_err()
.to_string();
assert_eq!(
Expand Down
3 changes: 1 addition & 2 deletions parquet/src/file/metadata/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -755,8 +755,7 @@ impl ParquetMetaDataReader {
let mut row_groups = Vec::new();
// TODO: row group filtering
for rg in t_file_metadata.row_groups {
// rg.
row_groups.push(RowGroupMetaData::from_thrift(schema_descr.clone(), rg)?);
row_groups.push(RowGroupMetaData::from_thrift(schema_descr.clone(), rg, #[cfg(feature = "encryption")] decryptor.as_ref())?);
}
let column_orders = Self::parse_column_orders(t_file_metadata.column_orders, &schema_descr);

Expand Down
76 changes: 42 additions & 34 deletions parquet/src/file/serialized_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use std::collections::VecDeque;
use std::iter;
use std::{fs::File, io::Read, path::Path, sync::Arc};
use thrift::protocol::TCompactInputProtocol;
use crate::encryption::ciphers::RingGcmBlockDecryptor;

impl TryFrom<File> for SerializedFileReader<File> {
type Error = ParquetError;
Expand Down Expand Up @@ -346,40 +347,42 @@ pub(crate) fn read_page_header<T: Read>(
#[cfg(feature = "encryption")] crypto_context: Option<Arc<CryptoContext>>,
) -> Result<PageHeader> {
#[cfg(feature = "encryption")]
if let Some(crypto_context) = crypto_context {
let decryptor = &crypto_context.data_decryptor();
// todo: get column decryptor
// let file_decryptor = decryptor.get_column_decryptor(crypto_context.column_ordinal);
// if !decryptor.decryption_properties().has_footer_key() {
// return Err(general_err!("Missing footer decryptor"));
// }
let file_decryptor = decryptor.footer_decryptor();
let aad_file_unique = decryptor.aad_file_unique();

let module_type = if crypto_context.dictionary_page {
ModuleType::DictionaryPageHeader
} else {
ModuleType::DataPageHeader
};
let aad = create_page_aad(
aad_file_unique.as_slice(),
module_type,
crypto_context.row_group_ordinal,
crypto_context.column_ordinal,
crypto_context.page_ordinal,
)?;

let mut len_bytes = [0; 4];
input.read_exact(&mut len_bytes)?;
let ciphertext_len = u32::from_le_bytes(len_bytes) as usize;
let mut ciphertext = vec![0; 4 + ciphertext_len];
input.read_exact(&mut ciphertext[4..])?;
let buf = file_decryptor.unwrap().decrypt(&ciphertext, aad.as_ref())?;

let mut prot = TCompactSliceInputProtocol::new(buf.as_slice());
let page_header = PageHeader::read_from_in_protocol(&mut prot)?;
return Ok(page_header);
}
// if let Some(crypto_context) = crypto_context {
// // crypto_context.data_decryptor().get_column_decryptor()
// let decryptor = &crypto_context.data_decryptor();
// // todo: get column decryptor
// // let file_decryptor = decryptor.ge(crypto_context.column_ordinal);
// // if !decryptor.decryption_properties().has_footer_key() {
// // return Err(general_err!("Missing footer decryptor"));
// // }
// let file_decryptor = decryptor.footer_decryptor();
// let aad_file_unique = decryptor.aad_file_unique();
// let aad_prefix = decryptor.aad_prefix();
//
// let module_type = if crypto_context.dictionary_page {
// ModuleType::DictionaryPageHeader
// } else {
// ModuleType::DataPageHeader
// };
// let aad = create_page_aad(
// [aad_prefix.as_slice(), aad_file_unique.as_slice()].concat().as_slice(),
// module_type,
// crypto_context.row_group_ordinal,
// crypto_context.column_ordinal,
// crypto_context.page_ordinal,
// )?;
//
// let mut len_bytes = [0; 4];
// input.read_exact(&mut len_bytes)?;
// let ciphertext_len = u32::from_le_bytes(len_bytes) as usize;
// let mut ciphertext = vec![0; 4 + ciphertext_len];
// input.read_exact(&mut ciphertext[4..])?;
// let buf = file_decryptor.unwrap().decrypt(&ciphertext, aad.as_ref())?;
//
// let mut prot = TCompactSliceInputProtocol::new(buf.as_slice());
// let page_header = PageHeader::read_from_in_protocol(&mut prot)?;
// return Ok(page_header);
// }

let mut prot = TCompactInputProtocol::new(input);
let page_header = PageHeader::read_from_in_protocol(&mut prot)?;
Expand Down Expand Up @@ -454,6 +457,11 @@ pub(crate) fn decode_page(
let buffer: Bytes = if crypto_context.is_some() {
let crypto_context = crypto_context.as_ref().unwrap();
let decryptor = crypto_context.data_decryptor();
let Some(file_decryptor) = if let Some(f) = decryptor.footer_decryptor().clone() {
// Some(RingGcmBlockDecryptor::new(decryptor..as_ref()))
} else {
decryptor.
};
let file_decryptor = decryptor.footer_decryptor();

let module_type = if crypto_context.dictionary_page {
Expand Down

0 comments on commit 1125f97

Please sign in to comment.