
Commit

Add memory limit to VegaFusionCache and implement SLRU eviction policy (#75)

* Add initial TaskValue::size_of associated function
* Add cache memory limit alongside size limit
* Update cache implementation to segmented LRU (see the sketch below)
jonmmease authored Feb 18, 2022
1 parent 9e48cee commit 9feb57f
Showing 13 changed files with 483 additions and 89 deletions.
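
For context on the new policy: a segmented LRU (SLRU) cache keeps a probationary segment for first-time entries and a protected segment for entries accessed at least twice, evicting from the probationary side first so a one-pass scan cannot flush frequently reused values. A minimal entry-count sketch of the idea in Python follows; the class name, the 80/20 split, and the eviction details are illustrative assumptions, not VegaFusion's actual implementation:

from collections import OrderedDict

class SlruCacheSketch:
    """Toy segmented LRU: new keys enter a probationary segment; a second
    access promotes them to a protected segment, so one-off scans evict
    only probationary entries. Counts entries rather than bytes."""

    def __init__(self, capacity, protected_fraction=0.8):
        self.capacity = capacity
        self.protected_capacity = max(1, int(capacity * protected_fraction))
        self.probationary = OrderedDict()  # least recently used first
        self.protected = OrderedDict()

    def get(self, key):
        if key in self.protected:
            self.protected.move_to_end(key)  # refresh recency
            return self.protected[key]
        if key in self.probationary:
            # Second hit: promote into the protected segment
            self.protected[key] = self.probationary.pop(key)
            # Protected overflow demotes its LRU entry back to probationary
            while len(self.protected) > self.protected_capacity:
                k, v = self.protected.popitem(last=False)
                self.probationary[k] = v
            return self.protected[key]
        return None

    def put(self, key, value):
        if key in self.protected:
            self.protected[key] = value
            self.protected.move_to_end(key)
            return
        self.probationary[key] = value
        self.probationary.move_to_end(key)
        while len(self.probationary) + len(self.protected) > self.capacity:
            # Evict from the probationary side first
            victim = self.probationary or self.protected
            victim.popitem(last=False)

The real cache additionally weighs entries by the new TaskValue::size_of and enforces a byte limit alongside the entry-count limit, per the commit message above.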
134 changes: 75 additions & 59 deletions python/vegafusion-jupyter/conda-win-64-cp310.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions python/vegafusion/setup.cfg
@@ -31,6 +31,7 @@ install_requires =
    altair>=4.2.0
    pyarrow>=6
    pandas
+    psutil

[options.extras_require]
embed = vegafusion-python-embed
51 changes: 48 additions & 3 deletions python/vegafusion/vegafusion/runtime.py
@@ -15,12 +15,14 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import multiprocessing
+import psutil


class VegaFusionRuntime:
-    def __init__(self, cache_capacity, worker_threads):
+    def __init__(self, cache_capacity, memory_limit, worker_threads):
        self._runtime = None
        self._cache_capacity = cache_capacity
+        self._memory_limit = memory_limit
        self._worker_threads = worker_threads

    @property
@@ -29,7 +31,7 @@ def runtime(self):
            # Try to initialize an embedded runtime
            from vegafusion_embed import PyTaskGraphRuntime

-            self._runtime = PyTaskGraphRuntime(self.cache_capacity, self.worker_threads)
+            self._runtime = PyTaskGraphRuntime(self.cache_capacity, self.memory_limit, self.worker_threads)
        return self._runtime

    def process_request_bytes(self, request):
@@ -50,6 +52,49 @@ def worker_threads(self, value):
        self._worker_threads = value
        self.reset()

+    @property
+    def total_memory(self):
+        if self._runtime:
+            return self._runtime.total_memory()
+        else:
+            return None
+
+    @property
+    def _protected_memory(self):
+        if self._runtime:
+            return self._runtime.protected_memory()
+        else:
+            return None
+
+    @property
+    def _probationary_memory(self):
+        if self._runtime:
+            return self._runtime.probationary_memory()
+        else:
+            return None
+
+    @property
+    def size(self):
+        if self._runtime:
+            return self._runtime.size()
+        else:
+            return None
+
+    @property
+    def memory_limit(self):
+        return self._memory_limit
+
+    @memory_limit.setter
+    def memory_limit(self, value):
+        """
+        Restart the runtime with the specified memory limit
+
+        :param value: Max approximate memory usage of the cache, in bytes
+        """
+        if value != self._memory_limit:
+            self._memory_limit = value
+            self.reset()
+
    @property
    def cache_capacity(self):
        return self._cache_capacity
@@ -78,4 +123,4 @@ def __repr__(self):
        )


-runtime = VegaFusionRuntime(16, multiprocessing.cpu_count())
+runtime = VegaFusionRuntime(64, psutil.virtual_memory().total // 2, psutil.cpu_count())
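
A hypothetical usage sketch for the new Python-side knobs (assuming the `runtime` singleton above is re-exported by the `vegafusion` package, and noting that the inspection properties return None until the embedded runtime has been initialized):

import vegafusion as vf

# Restart the runtime with a ~2 GiB cache memory limit; the default above
# is half of total system memory (psutil.virtual_memory().total // 2)
vf.runtime.memory_limit = 2 * 1024**3

# After some requests have been processed, inspect the SLRU cache
print(vf.runtime.size)          # number of cached task values
print(vf.runtime.total_memory)  # approximate bytes across both segments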
95 changes: 95 additions & 0 deletions vegafusion-core/src/task_graph/memory.rs
@@ -0,0 +1,95 @@
use crate::data::scalar::ScalarValue;
use crate::data::table::VegaFusionTable;
use arrow::array::ArrayRef;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use std::mem::{size_of, size_of_val};

/// Get the size of a Field value, including any inner heap-allocated data
fn size_of_field(field: &Field) -> usize {
    size_of::<Field>() + inner_size_of_dtype(field.data_type())
}

/// Get the size of inner heap-allocated data associated with a DataType value
fn inner_size_of_dtype(value: &DataType) -> usize {
    match value {
        DataType::Map(field, _) => size_of_field(field),
        DataType::Timestamp(_, Some(tz)) => size_of::<String>() + size_of_val(tz.as_bytes()),
        DataType::List(field) => size_of_field(field),
        DataType::LargeList(field) => size_of_field(field),
        DataType::FixedSizeList(field, _) => size_of_field(field),
        DataType::Struct(fields) => {
            size_of::<Vec<Field>>() + fields.iter().map(size_of_field).sum::<usize>()
        }
        DataType::Union(fields) => {
            size_of::<Vec<Field>>() + fields.iter().map(size_of_field).sum::<usize>()
        }
        DataType::Dictionary(key_dtype, value_dtype) => {
            2 * size_of::<DataType>()
                + inner_size_of_dtype(key_dtype)
                + inner_size_of_dtype(value_dtype)
        }
        _ => {
            // No inner heap-allocated data
            0
        }
    }
}

/// Get the size of inner heap-allocated data associated with a ScalarValue value
pub fn inner_size_of_scalar(value: &ScalarValue) -> usize {
    match value {
        ScalarValue::Utf8(Some(s)) => size_of_val(s.as_bytes()) + size_of::<String>(),
        ScalarValue::LargeUtf8(Some(s)) => size_of_val(s.as_bytes()) + size_of::<String>(),
        ScalarValue::Binary(Some(b)) => size_of_val(b.as_slice()) + size_of::<Vec<u8>>(),
        ScalarValue::LargeBinary(Some(b)) => size_of_val(b.as_slice()) + size_of::<Vec<u8>>(),
        ScalarValue::List(Some(values), dtype) => {
            let values_bytes: usize = size_of::<Vec<ScalarValue>>()
                + values
                    .iter()
                    .map(|v| size_of::<ScalarValue>() + inner_size_of_scalar(v))
                    .sum::<usize>();

            let dtype_bytes = size_of::<DataType>() + inner_size_of_dtype(dtype);

            values_bytes + dtype_bytes
        }
        ScalarValue::Struct(Some(values), fields) => {
            let values_bytes: usize = size_of::<Vec<ScalarValue>>()
                + values
                    .iter()
                    .map(|v| size_of::<ScalarValue>() + inner_size_of_scalar(v))
                    .sum::<usize>();

            let fields_bytes: usize =
                size_of::<Vec<DataType>>() + fields.iter().map(size_of_field).sum::<usize>();

            values_bytes + fields_bytes
        }
        _ => {
            // No inner heap-allocated data
            0
        }
    }
}

pub fn size_of_array_ref(array: &ArrayRef) -> usize {
    array.get_array_memory_size() + inner_size_of_dtype(array.data_type()) + size_of::<ArrayRef>()
}

pub fn size_of_schema(schema: &Schema) -> usize {
    size_of::<Schema>() + schema.fields().iter().map(size_of_field).sum::<usize>()
}

pub fn size_of_record_batch(batch: &RecordBatch) -> usize {
    let schema = batch.schema();
    let schema_size: usize = size_of_schema(schema.as_ref());
    let arrays_size: usize = batch.columns().iter().map(size_of_array_ref).sum();
    size_of::<RecordBatch>() + schema_size + arrays_size
}

pub fn inner_size_of_table(value: &VegaFusionTable) -> usize {
    let schema_size: usize = size_of_schema(&value.schema);
    let size_of_batches: usize = value.batches.iter().map(size_of_record_batch).sum();
    schema_size + size_of_batches
}
1 change: 1 addition & 0 deletions vegafusion-core/src/task_graph/mod.rs
@@ -17,6 +17,7 @@
 * If not, see http://www.gnu.org/licenses/.
 */
pub mod graph;
+pub mod memory;
pub mod scope;
pub mod task;
pub mod task_value;
10 changes: 10 additions & 0 deletions vegafusion-core/src/task_graph/task_value.rs
@@ -21,6 +21,7 @@ use crate::data::table::VegaFusionTable;
use crate::error::{Result, VegaFusionError};
use crate::proto::gen::tasks::task_value::Data;
use crate::proto::gen::tasks::TaskValue as ProtoTaskValue;
+use crate::task_graph::memory::{inner_size_of_scalar, inner_size_of_table};
use arrow::record_batch::RecordBatch;
use serde_json::Value;
use std::convert::TryFrom;
@@ -52,6 +53,15 @@ impl TaskValue {
            TaskValue::Table(value) => Ok(value.to_json()),
        }
    }
+
+    pub fn size_of(&self) -> usize {
+        let inner_size = match self {
+            TaskValue::Scalar(scalar) => inner_size_of_scalar(scalar),
+            TaskValue::Table(table) => inner_size_of_table(table),
+        };
+
+        std::mem::size_of::<Self>() + inner_size
+    }
}

impl TryFrom<&ProtoTaskValue> for TaskValue {
24 changes: 22 additions & 2 deletions vegafusion-python-embed/src/lib.rs
@@ -32,7 +32,11 @@ struct PyTaskGraphRuntime {
#[pymethods]
impl PyTaskGraphRuntime {
    #[new]
-    pub fn new(max_capacity: i32, worker_threads: Option<i32>) -> PyResult<Self> {
+    pub fn new(
+        max_capacity: Option<usize>,
+        memory_limit: Option<usize>,
+        worker_threads: Option<i32>,
+    ) -> PyResult<Self> {
        let mut tokio_runtime_builder = tokio::runtime::Builder::new_multi_thread();
        tokio_runtime_builder.enable_all();

@@ -46,7 +50,7 @@ impl PyTaskGraphRuntime {
            .external("Failed to create Tokio thread pool")?;

        Ok(Self {
-            runtime: TaskGraphRuntime::new(max_capacity as usize),
+            runtime: TaskGraphRuntime::new(max_capacity, memory_limit),
            tokio_runtime,
        })
    }
@@ -61,6 +65,22 @@ impl PyTaskGraphRuntime {
    pub fn clear_cache(&self) {
        self.tokio_runtime.block_on(self.runtime.clear_cache());
    }
+
+    pub fn size(&self) -> usize {
+        self.runtime.cache.size()
+    }
+
+    pub fn total_memory(&self) -> usize {
+        self.runtime.cache.total_memory()
+    }
+
+    pub fn protected_memory(&self) -> usize {
+        self.runtime.cache.protected_memory()
+    }
+
+    pub fn probationary_memory(&self) -> usize {
+        self.runtime.cache.probationary_memory()
+    }
}

/// A Python module implemented in Rust. The name of this function must match
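
A hedged sketch of exercising the embedded runtime directly (parameter meanings inferred from this diff: max_capacity caps the entry count, memory_limit caps approximate bytes, and None presumably leaves a limit unset; not an officially documented usage):

from vegafusion_embed import PyTaskGraphRuntime

# 64-entry cache capped at roughly 2 GiB, default worker-thread count
rt = PyTaskGraphRuntime(64, 2 * 1024**3, None)

# Per-segment accounting exposed by the new methods
print(rt.size(), rt.total_memory())
print(rt.protected_memory(), rt.probationary_memory())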
(Diffs for the remaining changed files are not shown.)
