Significant performance drop after update from 0.16 to 0.18 #7222
Comments
Thanks for reporting this! That sounds quite concerning. Should be easy enough to repro, but if you have a concrete snippet or example data to try, that would of course be very appreciated.
Repro
Python:
import numpy as np
import rerun as rr
import time
rr.init("rerun_example_image", spawn=True)
for i in range(0, 100):
    rr.set_time_sequence("frame", i)
    for j in range(0, 100):
        image = np.zeros((200, 300, 3), dtype=np.uint8)
        image[:, :, 0] = (i + j) % 255
        image[50:150, 50:150] = (0, 255 - ((i + j) % 255), 0)
        rr.log(f"images/{j}", rr.Image(image))
    time.sleep(0.010)
Rust:
use ndarray::{s, Array, ShapeBuilder};
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let rec = rerun::RecordingStreamBuilder::new("rerun_example_image").spawn()?;
    for i in 0..100 {
        // NOTE: uncomment this line to fix the perf issue.
        rec.set_time_sequence("frame", i);
        for j in 0..100 {
            let mut image = Array::<u8, _>::zeros((200, 300, 3).f());
            image.slice_mut(s![.., .., 0]).fill((i + j) % 255);
            image
                .slice_mut(s![50..150, 50..150, 1])
                .fill(255 - ((i + j) % 255));
            rec.log(
                format!("images/{j}"),
                &rerun::Image::from_color_model_and_tensor(rerun::ColorModel::RGB, image)?,
            )?;
        }
        std::thread::sleep(std::time::Duration::from_millis(10));
    }
    Ok(())
}
As I expected, this is caused by the concatenation kernel being extremely slow for that specific type for some reason ( But I'm also seeing weird RSS behavior, I have to dig deeper 🤔
Regarding the performance, it looks like things are... "correctly slow" 🤷♂️ It seems that my expectations were just off base. Here are standalone benchmarks that faithfully represent a similar workload:
Arrow2
use arrow2::{
array::{Array, ListArray, PrimitiveArray},
offset::Offsets,
};
fn main() {
let array0 = PrimitiveArray::from_vec((0..200 * 300).map(|v| (v % 255) as u8).collect());
concatenate_and_measure(array0.to_boxed());
let array1 = ListArray::new(
ListArray::<i32>::default_datatype(array0.data_type().clone()),
Offsets::<i32>::try_from_lengths(std::iter::once(array0.len()))
.unwrap()
.into(),
array0.boxed(),
None,
);
concatenate_and_measure(array1.to_boxed());
let array2 = ListArray::new(
ListArray::<i32>::default_datatype(array1.data_type().clone()),
Offsets::<i32>::try_from_lengths(std::iter::once(array1.len()))
.unwrap()
.into(),
array1.boxed(),
None,
);
concatenate_and_measure(array2.to_boxed());
}
fn concatenate_and_measure(array: Box<dyn Array>) {
let mut concatenated = array.clone();
let now = std::time::Instant::now();
for _ in 0..1000 {
concatenated =
arrow2::compute::concatenate::concatenate(&[&*concatenated, &*array]).unwrap();
}
let elapsed = now.elapsed();
// dbg!(&array);
dbg!(concatenated.data_type());
eprintln!(
"1000 accumulated concatenations in {elapsed:?} ({:.3} MiB per sec)",
how_many_bytes(concatenated) as f64 / 1024.0 / 1024.0 / elapsed.as_secs_f64(),
);
}
fn how_many_bytes(array: Box<dyn Array>) -> u64 {
let mut array = array;
loop {
match array.data_type() {
arrow2::datatypes::DataType::UInt8 => break,
arrow2::datatypes::DataType::List(_) => {
let list = array.as_any().downcast_ref::<ListArray<i32>>().unwrap();
array = list.values().to_boxed();
}
_ => unreachable!(),
}
}
array.len() as _
}
Arrow1
use std::sync::Arc;
use arrow::{
array::{Array, ArrayRef, ListArray, PrimitiveArray},
buffer::OffsetBuffer,
datatypes::{Field, UInt8Type},
};
fn main() {
let array0: PrimitiveArray<UInt8Type> = (0..200 * 300)
.map(|v| (v % 255) as u8)
.collect::<Vec<_>>()
.into();
let array0: ArrayRef = Arc::new(array0);
concatenate_and_measure(array0.clone());
let array1 = ListArray::new(
Field::new_list_field(array0.data_type().clone(), false).into(),
OffsetBuffer::from_lengths(std::iter::once(array0.len())),
array0.clone(),
None,
);
let array1: ArrayRef = Arc::new(array1);
concatenate_and_measure(array1.clone());
let array2 = ListArray::new(
Field::new_list_field(array1.data_type().clone(), false).into(),
OffsetBuffer::from_lengths(std::iter::once(array1.len())),
array1.clone(),
None,
);
let array2: ArrayRef = Arc::new(array2);
concatenate_and_measure(array2.clone());
}
fn concatenate_and_measure(array: ArrayRef) {
let mut concatenated = array.clone();
let now = std::time::Instant::now();
for _ in 0..1000 {
concatenated = arrow::compute::kernels::concat::concat(&[&*concatenated, &*array]).unwrap();
}
let elapsed = now.elapsed();
// dbg!(&array);
dbg!(concatenated.data_type());
eprintln!(
"1000 accumulated concatenations in {elapsed:?} ({:.3} MiB per sec)",
how_many_bytes(concatenated) as f64 / 1024.0 / 1024.0 / elapsed.as_secs_f64(),
);
}
fn how_many_bytes(array: ArrayRef) -> u64 {
let mut array = array;
loop {
match array.data_type() {
arrow::datatypes::DataType::UInt8 => break,
arrow::datatypes::DataType::List(_) => {
let list = array.as_any().downcast_ref::<ListArray>().unwrap();
array = list.values().clone();
}
_ => unreachable!(),
}
}
array.len() as _
}
Native
fn main() {
let array0 = (0..200 * 300).map(|v| (v % 255) as u8).collect::<Vec<u8>>();
concatenate_and_measure(array0);
}
fn concatenate_and_measure(array: Vec<u8>) {
let mut concatenated = array.clone();
let now = std::time::Instant::now();
for _ in 0..1000 {
concatenated = concatenated
.into_iter()
.chain(array.iter().copied())
.collect();
}
let elapsed = now.elapsed();
eprintln!(
"1000 accumulated concatenations in {elapsed:?} ({:.3} MiB per sec)",
std::mem::size_of_val(concatenated.as_slice()) as f64
/ 1024.0
/ 1024.0
/ elapsed.as_secs_f64()
);
}
Conclusion
If anything,
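For a sense of scale on why this workload can be "correctly slow": each benchmark above rebuilds the accumulated array on every iteration, so the amount of data copied grows quadratically with the number of concatenations even though the final array stays small. The sketch below is a std-only illustration of that arithmetic, not the arrow or arrow2 kernel. The function names are made up for this example, and it assumes that every concatenation copies both inputs into a fresh buffer.

```rust
/// Total bytes written by `n` accumulated concatenations of a `chunk_len`-byte
/// chunk, assuming each concatenation copies both inputs into a new buffer.
fn bytes_copied_by_accumulated_concat(chunk_len: u64, n: u64) -> u64 {
    // Step k (1-based) produces a buffer that holds k + 1 chunks.
    (1..=n).map(|k| (k + 1) * chunk_len).sum()
}

/// Size in bytes of the final buffer after `n` accumulated concatenations.
fn final_len(chunk_len: u64, n: u64) -> u64 {
    (n + 1) * chunk_len
}

/// Performs the accumulation on a plain `Vec<u8>` and counts the bytes written,
/// as a cross-check of the closed-form count above.
fn accumulate_naive(chunk: &[u8], n: usize) -> (Vec<u8>, u64) {
    let mut acc = chunk.to_vec();
    let mut copied = 0u64;
    for _ in 0..n {
        // Fresh allocation on every step, mirroring a concat that returns a new array.
        let mut next = Vec::with_capacity(acc.len() + chunk.len());
        next.extend_from_slice(&acc);
        next.extend_from_slice(chunk);
        copied += next.len() as u64;
        acc = next;
    }
    (acc, copied)
}
```

With the benchmark's parameters (60,000-byte chunks, 1000 concatenations), `bytes_copied_by_accumulated_concat(60_000, 1000)` comes to 30,090,000,000 bytes copied for a final buffer of 60,060,000 bytes. The MiB-per-second figures printed by the benchmarks divide the final size by the elapsed time, so under this assumption they understate the raw copy throughput by a wide margin.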
Alright, I've found the leak, although I haven't had time to look for a solution yet. The issue stems from the implementation of the arrow concatenation kernel for things wrapped in one or more layers of
I originally felt this could be the source of the problem, and tested it out on a
Interestingly, the problem appears both in
I'll look for a fix tomorrow.
Arrow
use std::sync::{
atomic::{AtomicUsize, Ordering::Relaxed},
Arc,
};
static LIVE_BYTES_GLOBAL: AtomicUsize = AtomicUsize::new(0);
thread_local! {
static LIVE_BYTES_IN_THREAD: AtomicUsize = AtomicUsize::new(0);
}
pub struct TrackingAllocator {
allocator: std::alloc::System,
}
#[global_allocator]
pub static GLOBAL_ALLOCATOR: TrackingAllocator = TrackingAllocator {
allocator: std::alloc::System,
};
#[allow(unsafe_code)]
// SAFETY:
// We just do book-keeping and then let another allocator do all the actual work.
unsafe impl std::alloc::GlobalAlloc for TrackingAllocator {
#[allow(clippy::let_and_return)]
unsafe fn alloc(&self, layout: std::alloc::Layout) -> *mut u8 {
LIVE_BYTES_IN_THREAD.with(|bytes| bytes.fetch_add(layout.size(), Relaxed));
LIVE_BYTES_GLOBAL.fetch_add(layout.size(), Relaxed);
// SAFETY:
// Just deferring
unsafe { self.allocator.alloc(layout) }
}
unsafe fn dealloc(&self, ptr: *mut u8, layout: std::alloc::Layout) {
LIVE_BYTES_IN_THREAD.with(|bytes| bytes.fetch_sub(layout.size(), Relaxed));
LIVE_BYTES_GLOBAL.fetch_sub(layout.size(), Relaxed);
// SAFETY:
// Just deferring
unsafe { self.allocator.dealloc(ptr, layout) };
}
}
fn live_bytes_local() -> usize {
LIVE_BYTES_IN_THREAD.with(|bytes| bytes.load(Relaxed))
}
fn live_bytes_global() -> usize {
LIVE_BYTES_GLOBAL.load(Relaxed)
}
/// Returns `(num_bytes_allocated, num_bytes_allocated_by_this_thread)`.
fn memory_use<R>(run: impl Fn() -> R) -> (usize, usize) {
let used_bytes_start_local = live_bytes_local();
let used_bytes_start_global = live_bytes_global();
let ret = run();
let bytes_used_local = live_bytes_local() - used_bytes_start_local;
let bytes_used_global = live_bytes_global() - used_bytes_start_global;
drop(ret);
(bytes_used_global, bytes_used_local)
}
// ----------------------------------------------------------------------------
use arrow::{
array::{Array, ArrayRef, ListArray, PrimitiveArray},
buffer::OffsetBuffer,
datatypes::{Field, UInt8Type},
};
fn main() {
let array0: PrimitiveArray<UInt8Type> = (0..200 * 300)
.map(|v| (v % 255) as u8)
.collect::<Vec<_>>()
.into();
let array0: ArrayRef = Arc::new(array0);
let (global, local) = memory_use(|| {
let concatenated = concatenate_and_measure(array0.clone());
eprintln!("expected: {}", how_many_bytes(concatenated.clone()));
concatenated
});
eprintln!("global: {global} bytes");
eprintln!("local: {local} bytes");
let array1 = ListArray::new(
Field::new_list_field(array0.data_type().clone(), false).into(),
OffsetBuffer::from_lengths(std::iter::once(array0.len())),
array0.clone(),
None,
);
let array1: ArrayRef = Arc::new(array1);
let (global, local) = memory_use(|| {
let concatenated = concatenate_and_measure(array1.clone());
eprintln!("expected: {}", how_many_bytes(concatenated.clone()));
concatenated
});
eprintln!("global: {global} bytes");
eprintln!("local: {local} bytes");
let array2 = ListArray::new(
Field::new_list_field(array1.data_type().clone(), false).into(),
OffsetBuffer::from_lengths(std::iter::once(array1.len())),
array1.clone(),
None,
);
let array2: ArrayRef = Arc::new(array2);
let (global, local) = memory_use(|| {
let concatenated = concatenate_and_measure(array2.clone());
eprintln!("expected: {}", how_many_bytes(concatenated.clone()));
concatenated
});
eprintln!("global: {global} bytes");
eprintln!("local: {local} bytes");
}
fn concatenate_and_measure(array: ArrayRef) -> ArrayRef {
let mut concatenated = array.clone();
let now = std::time::Instant::now();
for _ in 0..1000 {
concatenated = arrow::compute::kernels::concat::concat(&[&*concatenated, &*array]).unwrap();
}
let elapsed = now.elapsed();
// dbg!(&array);
dbg!(concatenated.data_type());
eprintln!(
"1000 accumulated concatenations in {elapsed:?} ({:.3} MiB per sec)",
how_many_bytes(concatenated.clone()) as f64 / 1024.0 / 1024.0 / elapsed.as_secs_f64(),
);
concatenated
}
fn how_many_bytes(array: ArrayRef) -> u64 {
let mut array = array;
loop {
match array.data_type() {
arrow::datatypes::DataType::UInt8 => break,
arrow::datatypes::DataType::List(_) => {
let list = array.as_any().downcast_ref::<ListArray>().unwrap();
array = list.values().clone();
}
_ => unreachable!(),
}
}
array.len() as _
}
Arrow2
use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
static LIVE_BYTES_GLOBAL: AtomicUsize = AtomicUsize::new(0);
thread_local! {
static LIVE_BYTES_IN_THREAD: AtomicUsize = AtomicUsize::new(0);
}
pub struct TrackingAllocator {
allocator: std::alloc::System,
}
#[global_allocator]
pub static GLOBAL_ALLOCATOR: TrackingAllocator = TrackingAllocator {
allocator: std::alloc::System,
};
#[allow(unsafe_code)]
// SAFETY:
// We just do book-keeping and then let another allocator do all the actual work.
unsafe impl std::alloc::GlobalAlloc for TrackingAllocator {
#[allow(clippy::let_and_return)]
unsafe fn alloc(&self, layout: std::alloc::Layout) -> *mut u8 {
LIVE_BYTES_IN_THREAD.with(|bytes| bytes.fetch_add(layout.size(), Relaxed));
LIVE_BYTES_GLOBAL.fetch_add(layout.size(), Relaxed);
// SAFETY:
// Just deferring
unsafe { self.allocator.alloc(layout) }
}
unsafe fn dealloc(&self, ptr: *mut u8, layout: std::alloc::Layout) {
LIVE_BYTES_IN_THREAD.with(|bytes| bytes.fetch_sub(layout.size(), Relaxed));
LIVE_BYTES_GLOBAL.fetch_sub(layout.size(), Relaxed);
// SAFETY:
// Just deferring
unsafe { self.allocator.dealloc(ptr, layout) };
}
}
fn live_bytes_local() -> usize {
LIVE_BYTES_IN_THREAD.with(|bytes| bytes.load(Relaxed))
}
fn live_bytes_global() -> usize {
LIVE_BYTES_GLOBAL.load(Relaxed)
}
/// Returns `(num_bytes_allocated, num_bytes_allocated_by_this_thread)`.
fn memory_use<R>(run: impl Fn() -> R) -> (usize, usize) {
let used_bytes_start_local = live_bytes_local();
let used_bytes_start_global = live_bytes_global();
let ret = run();
let bytes_used_local = live_bytes_local() - used_bytes_start_local;
let bytes_used_global = live_bytes_global() - used_bytes_start_global;
drop(ret);
(bytes_used_global, bytes_used_local)
}
// ----------------------------------------------------------------------------
use arrow2::{
array::{Array, ListArray, PrimitiveArray},
offset::Offsets,
};
fn main() {
let array0 = PrimitiveArray::from_vec((0..200 * 300).map(|v| (v % 255) as u8).collect());
let (global, local) = memory_use(|| {
let concatenated = concatenate_and_measure(array0.to_boxed());
eprintln!("expected: {}", how_many_bytes(concatenated.to_boxed()));
concatenated
});
eprintln!("global: {global} bytes");
eprintln!("local: {local} bytes");
let array1 = ListArray::new(
ListArray::<i32>::default_datatype(array0.data_type().clone()),
Offsets::<i32>::try_from_lengths(std::iter::once(array0.len()))
.unwrap()
.into(),
array0.boxed(),
None,
);
let (global, local) = memory_use(|| {
let concatenated = concatenate_and_measure(array1.to_boxed());
eprintln!("expected: {}", how_many_bytes(concatenated.to_boxed()));
concatenated
});
eprintln!("global: {global} bytes");
eprintln!("local: {local} bytes");
let array2 = ListArray::new(
ListArray::<i32>::default_datatype(array1.data_type().clone()),
Offsets::<i32>::try_from_lengths(std::iter::once(array1.len()))
.unwrap()
.into(),
array1.boxed(),
None,
);
let (global, local) = memory_use(|| {
let concatenated = concatenate_and_measure(array2.to_boxed());
eprintln!("expected: {}", how_many_bytes(concatenated.to_boxed()));
concatenated
});
eprintln!("global: {global} bytes");
eprintln!("local: {local} bytes");
}
fn concatenate_and_measure(array: Box<dyn Array>) -> Box<dyn Array> {
let mut concatenated = array.clone();
let now = std::time::Instant::now();
for _ in 0..1000 {
concatenated =
arrow2::compute::concatenate::concatenate(&[&*concatenated, &*array]).unwrap();
}
let elapsed = now.elapsed();
// dbg!(&array);
dbg!(concatenated.data_type());
eprintln!(
"1000 accumulated concatenations in {elapsed:?} ({:.3} MiB per sec)",
how_many_bytes(concatenated.clone()) as f64 / 1024.0 / 1024.0 / elapsed.as_secs_f64(),
);
concatenated
}
fn how_many_bytes(array: Box<dyn Array>) -> u64 {
let mut array = array;
loop {
match array.data_type() {
arrow2::datatypes::DataType::UInt8 => break,
arrow2::datatypes::DataType::List(_) => {
let list = array.as_any().downcast_ref::<ListArray<i32>>().unwrap();
array = list.values().to_boxed();
}
_ => unreachable!(),
}
}
array.len() as _
}
This fixes the extra capacity from the temporary growable vector leaking into the final buffer and therefore hanging around indefinitely. See rerun-io/rerun#7222 (comment)
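As a rough analogy for the kind of leak described here, the sketch below uses a plain `Vec<u8>` rather than the re_arrow2 growable types. A temporary vector that reserved more than it ended up needing keeps that spare capacity alive for as long as the resulting buffer lives, unless it is trimmed before being handed out. The over-reservation factor and the function names are assumptions for illustration; this is not the actual patch.

```rust
/// Concatenates `chunks` through a temporary growable `Vec` that over-reserves,
/// optionally trimming the spare capacity before returning the buffer.
fn concat_via_growable(chunks: &[&[u8]], shrink: bool) -> Vec<u8> {
    let total: usize = chunks.iter().map(|c| c.len()).sum();
    // Hypothetical over-reservation: twice what is actually needed.
    let mut growable: Vec<u8> = Vec::with_capacity(total * 2);
    for chunk in chunks {
        growable.extend_from_slice(chunk);
    }
    if shrink {
        // Ask the allocator to release the unused tail of the allocation.
        growable.shrink_to_fit();
    }
    growable
}

/// Bytes that are allocated but not used by the buffer's contents.
fn spare_bytes(buf: &Vec<u8>) -> usize {
    buf.capacity() - buf.len()
}
```

Calling `concat_via_growable(&[&a[..], &b[..]], false)` returns a buffer whose `spare_bytes` is at least as large as its contents, and that memory stays allocated until the buffer is dropped. Passing `true` returns the same bytes with the spare capacity trimmed; `shrink_to_fit` only promises to get as close to the length as the allocator allows.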
The memory leak has been fixed in rerun-io/re_arrow2#9, which I've released as part of
I still need to send the same patch to
@teh-cmc Thanks for investigating and fixing that memory leak! Any news on the performance drop? Rerun became so slow with 0.18 that it is unusable for us in its current state 😢
Both the memleak and the performance issue should be fixed on
With branch
I tried some bisection of the rerun visualizations in my application and the bottleneck now seems to be this code:
(there are 250 poses)
Without this visualization
Thanks for the update @Danvil, we'll have a look asap.
This adventure continues in:
Describe the bug
I have a workload where I visualize about 10 small (200x200) images and tensors at about 20 Hz.
With rerun 0.16, the visualization was basically running in real time without problems.
After upgrading to 0.18 (both library and desktop app), ingestion is significantly slower and manages maybe ~0.5 Hz. The slowdown is so extreme that data keeps trickling in for minutes after the source program has been closed.
The problem seems limited to ingestion speed: replay when scrolling through the timeline is unaffected.
Desktop (please complete the following information):
Linux
Rerun version
0.18