Fix MutableBuffer::into_buffer leaking its extra capacity into the final buffer #6300

Closed

teh-cmc commented Aug 24, 2024

The extra capacity that MutableBuffer initially allocates to append data efficiently ends up leaking into the final immutable buffer, where it lingers indefinitely and hogs memory.


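MutableBuffer grows its allocation geometrically as data is appended (much like Vec), so by the time building finishes the allocation can hold close to twice the data actually written. A minimal sketch of the effect, using Vec<u8> as a stand-in for MutableBuffer (illustrative only, not arrow-rs code):

fn capacity_slack() {
    let mut v: Vec<u8> = Vec::new();
    for i in 0..60_000u32 {
        v.push((i % 255) as u8);
    }
    // The backing allocation is larger than the data it holds; freezing the
    // buffer without shrinking it first keeps all of that slack alive.
    println!("len = {}, capacity = {}", v.len(), v.capacity());
}
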
Consider this code:

use arrow::{
    array::{Array, ArrayRef, ListArray, PrimitiveArray},
    buffer::OffsetBuffer,
    datatypes::{Field, UInt8Type},
};

fn main() {
    let array0: PrimitiveArray<UInt8Type> = (0..200 * 300)
        .map(|v| (v % 255) as u8)
        .collect::<Vec<_>>()
        .into();
    let array0: ArrayRef = Arc::new(array0);

    let (global, local) = memory_use(|| {
        let concatenated = concatenate(array0.clone());
        dbg!(concatenated.data_type());
        eprintln!("expected: ~{}", how_many_bytes(concatenated.clone()));
        concatenated
    });
    eprintln!("global: {global} bytes");
    eprintln!("local: {local} bytes");

    eprintln!("---");

    let array1 = ListArray::new(
        Field::new_list_field(array0.data_type().clone(), false).into(),
        OffsetBuffer::from_lengths(std::iter::once(array0.len())),
        array0.clone(),
        None,
    );
    let array1: ArrayRef = Arc::new(array1);

    let (global, local) = memory_use(|| {
        let concatenated = concatenate(array1.clone());
        dbg!(concatenated.data_type());
        eprintln!("expected: ~{}", how_many_bytes(concatenated.clone()));
        concatenated
    });
    eprintln!("global: {global} bytes");
    eprintln!("local: {local} bytes");
}

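// Concatenates `array` onto an accumulator 1000 times, so the result ends up
// holding 1001 copies of the input values.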
fn concatenate(array: ArrayRef) -> ArrayRef {
    let mut concatenated = array.clone();

    for _ in 0..1000 {
        concatenated = arrow::compute::kernels::concat::concat(&[&*concatenated, &*array]).unwrap();
    }

    concatenated
}

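// Walks through any List nesting down to the inner UInt8 values and returns
// their length, i.e. the expected number of data bytes.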
fn how_many_bytes(array: ArrayRef) -> u64 {
    let mut array = array;
    loop {
        match array.data_type() {
            arrow::datatypes::DataType::UInt8 => break,
            arrow::datatypes::DataType::List(_) => {
                let list = array.as_any().downcast_ref::<ListArray>().unwrap();
                array = list.values().clone();
            }
            _ => unreachable!(),
        }
    }

    array.len() as _
}

// --- Memory tracking ---

use std::sync::{
    atomic::{AtomicUsize, Ordering::Relaxed},
    Arc,
};

static LIVE_BYTES_GLOBAL: AtomicUsize = AtomicUsize::new(0);

thread_local! {
    static LIVE_BYTES_IN_THREAD: AtomicUsize = AtomicUsize::new(0);
}

pub struct TrackingAllocator {
    allocator: std::alloc::System,
}

#[global_allocator]
pub static GLOBAL_ALLOCATOR: TrackingAllocator = TrackingAllocator {
    allocator: std::alloc::System,
};

#[allow(unsafe_code)]
// SAFETY:
// We just do book-keeping and then let another allocator do all the actual work.
unsafe impl std::alloc::GlobalAlloc for TrackingAllocator {
    #[allow(clippy::let_and_return)]
    unsafe fn alloc(&self, layout: std::alloc::Layout) -> *mut u8 {
        LIVE_BYTES_IN_THREAD.with(|bytes| bytes.fetch_add(layout.size(), Relaxed));
        LIVE_BYTES_GLOBAL.fetch_add(layout.size(), Relaxed);

        // SAFETY:
        // Just deferring
        unsafe { self.allocator.alloc(layout) }
    }

    unsafe fn dealloc(&self, ptr: *mut u8, layout: std::alloc::Layout) {
        LIVE_BYTES_IN_THREAD.with(|bytes| bytes.fetch_sub(layout.size(), Relaxed));
        LIVE_BYTES_GLOBAL.fetch_sub(layout.size(), Relaxed);

        // SAFETY:
        // Just deferring
        unsafe { self.allocator.dealloc(ptr, layout) };
    }
}

fn live_bytes_local() -> usize {
    LIVE_BYTES_IN_THREAD.with(|bytes| bytes.load(Relaxed))
}

fn live_bytes_global() -> usize {
    LIVE_BYTES_GLOBAL.load(Relaxed)
}

/// Returns `(num_bytes_allocated, num_bytes_allocated_by_this_thread)`.
fn memory_use<R>(run: impl Fn() -> R) -> (usize, usize) {
    let used_bytes_start_local = live_bytes_local();
    let used_bytes_start_global = live_bytes_global();
    let ret = run();
    let bytes_used_local = live_bytes_local() - used_bytes_start_local;
    let bytes_used_global = live_bytes_global() - used_bytes_start_global;
    drop(ret);
    (bytes_used_global, bytes_used_local)
}

HEAD:

[src/main.rs:16:9] concatenated.data_type() = UInt8
expected: ~60060000
global: 60060200 bytes
local: 60060200 bytes
---
[src/main.rs:35:9] concatenated.data_type() = List(
    Field {
        name: "item",
        data_type: UInt8,
        nullable: false,
        dict_id: 0,
        dict_is_ordered: false,
        metadata: {},
    },
)
expected: ~60060000
global: 120004384 bytes
local: 120004384 bytes

This patch:

[src/main.rs:16:9] concatenated.data_type() = UInt8
expected: ~60060000
global: 60060200 bytes
local: 60060200 bytes
---
[src/main.rs:35:9] concatenated.data_type() = List(
    Field {
        name: "item",
        data_type: UInt8,
        nullable: false,
        dict_id: 0,
        dict_is_ordered: false,
        metadata: {},
    },
)
expected: ~60060000
global: 60064416 bytes
local: 60064416 bytes


alamb (Contributor) left a comment

Thanks @teh-cmc -- looks like an epic debugging exercise

I am concerned that this change will decrease performance by forcing an extra copy in all situations, though I may not understand the implications

This PR currently seems to have some CI failures

Some other thoughts:

  1. Have you considered making the initial capacity calculations more accurate (I am not sure this is possible) so the delta between capacity and actual size is lower?
  2. Adding another API (if it doesn't already exist) to "shrink_to_fit" for Arrays in general -- that would permit users to decide if they preferred larger allocations / fewer copies or smaller allocations / more copies

-pub(super) fn into_buffer(self) -> Buffer {
+pub(super) fn into_buffer(mut self) -> Buffer {
+    // Don't leak our extra capacity into the final buffer.
+    self.shrink_to_fit();

Does this force a copy in the case where the capacity is larger than the actual need?

teh-cmc commented Aug 31, 2024

I am concerned that this change will decrease performance by forcing an extra copy in all situations, though I may not understand the implications

I wouldn't expect this to have a noticeable impact on performance, for two reasons:

  1. MutableBuffer already reallocates memory and copies data around as it grows in any case (that's the root of this issue to begin with, after all).
  2. I don't think the extra copy is a guarantee -- see my answer to the next question.

Does this force a copy in the case where the capacity is larger than the actual need?

As far as I understand from reading the code, at the end of the day all of this machinery bottoms out in realloc().
Whether that actually reallocates memory and copies data around is therefore unspecified: it is left as an implementation detail of whichever global allocator is currently in use.

I'd expect any decent allocator to do whatever is most efficient depending on the exact situation at hand.

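For reference, a minimal sketch (assumed shape, not the actual arrow-rs internals) of why shrinking is not necessarily a copy -- std::alloc::realloc leaves it entirely up to the allocator whether to shrink in place or to allocate, copy and free:

use std::alloc::{self, Layout};

// SAFETY: the usual realloc requirements apply -- `ptr` must have been
// allocated with `old_layout`, and `new_size` must be non-zero.
unsafe fn shrink_allocation(ptr: *mut u8, old_layout: Layout, new_size: usize) -> *mut u8 {
    // The allocator decides whether this is an in-place shrink or an
    // allocate-copy-free; the caller only sees the returned pointer.
    alloc::realloc(ptr, old_layout, new_size)
}
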
Have you considered making the initial capacity calculations more accurate (I am not sure this is possible) so the delta between capacity and actual size is lower?

That doesn't seem possible given the nature of MutableBuffer and how it is used across the different layers of the codebase.

This PR currently seems to have some CI failures

I've updated the tests to reflect the new expected capacity values, so they should now pass.

Beware that I am changing these values with absolutely zero knowledge of the context in which these tests were written though: it would be fair to say that I have effectively no idea what I'm doing.


That's it for the theory... now for the practice.

I found two benchmark suites relevant to this change:

  • --bench concatenate_kernel
  • --bench mutable_array

--bench concatenate_kernel

Performance looks roughly on par with master for this benchmark suite. In fact, it even looks like a slight overall improvement, for some reason.

taskset -c 7 cargo bench -p arrow --bench concatenate_kernel --features="test_utils"

concat i32 1024         time:   [453.90 ns 454.31 ns 454.75 ns]
                        change: [-3.7808% -3.5907% -3.4059%] (p = 0.00 < 0.05)
                        Performance has improved.

concat i32 nulls 1024   time:   [652.64 ns 652.88 ns 653.18 ns]
                        change: [+0.0226% +0.2489% +0.4726%] (p = 0.03 < 0.05)
                        Change within noise threshold.

concat 1024 arrays i32 4
                        time:   [109.22 µs 109.38 µs 109.57 µs]
                        change: [-7.2900% -6.8379% -6.4195%] (p = 0.00 < 0.05)
                        Performance has improved.

concat str 1024         time:   [9.3471 µs 9.3609 µs 9.3764 µs]
                        change: [-0.1653% +0.4566% +1.0820%] (p = 0.16 > 0.05)
                        No change in performance detected.

concat str nulls 1024   time:   [5.7129 µs 5.7328 µs 5.7480 µs]
                        change: [-4.0256% -3.3280% -2.6276%] (p = 0.00 < 0.05)
                        Performance has improved.

concat str_dict 1024    time:   [2.4110 µs 2.4143 µs 2.4177 µs]
                        change: [+2.5143% +2.6708% +2.8211%] (p = 0.00 < 0.05)
                        Performance has regressed.

concat str_dict_sparse 1024
                        time:   [4.7248 µs 4.7301 µs 4.7371 µs]
                        change: [+1.7168% +1.8403% +1.9509%] (p = 0.00 < 0.05)
                        Performance has regressed.

concat str nulls 1024 #2
                        time:   [5.7316 µs 5.7364 µs 5.7415 µs]
                        change: [-3.8916% -3.0466% -2.2002%] (p = 0.00 < 0.05)
                        Performance has improved.

concat fixed size lists time:   [136.77 µs 136.97 µs 137.18 µs]
                        change: [-0.5076% -0.1102% +0.2847%] (p = 0.63 > 0.05)
                        No change in performance detected.

--bench mutable_array

This one is a different beast -- it is much slower on this branch:

mutable str 1024        time:   [31.213 ms 31.302 ms 31.420 ms]
                        change: [+73.378% +74.185% +74.973%] (p = 0.00 < 0.05)
                        Performance has regressed.

mutable str nulls 1024  time:   [11.210 ms 11.273 ms 11.376 ms]
                        change: [-13.933% -13.406% -12.652%] (p = 0.00 < 0.05)
                        Performance has improved.

which means my first assumption above was wrong: you can certainly feel the extra copy when the right conditions are met, even though MutableBuffer already does a number of copies of its own.

That's with the default system allocator though, which is notoriously not a good idea... Running these benchmarks with mimalloc tells a very different story:

diff --git a/arrow/benches/mutable_array.rs b/arrow/benches/mutable_array.rs
index b04e5cd84..d9b388e68 100644
--- a/arrow/benches/mutable_array.rs
+++ b/arrow/benches/mutable_array.rs
@@ -15,6 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use mimalloc::MiMalloc;
+
+#[global_allocator]
+static GLOBAL: MiMalloc = MiMalloc;
+
 #[macro_use]
 extern crate criterion;
 use criterion::Criterion;

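(For completeness -- not part of the diff above -- this assumes mimalloc has been added as a dev-dependency of the arrow crate, e.g. mimalloc = "0.1" in arrow/Cargo.toml.)
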
Results:

mutable str 1024        time:   [11.755 ms 11.790 ms 11.825 ms]
                        change: [+0.1332% +0.5256% +0.9202%] (p = 0.01 < 0.05)
                        Change within noise threshold.
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high mild

mutable str nulls 1024  time:   [5.8176 ms 5.8273 ms 5.8385 ms]
                        change: [-1.0864% -0.7649% -0.4585%] (p = 0.00 < 0.05)
                        Change within noise threshold.

which means my second assumption was correct: it is left to the allocator implementation to do the right thing.

Conclusion

Impact of this patch:

  • Memory usage is halved, regardless of the allocator used.
  • Severe performance degradation with the default system allocator.
  • No measurable performance impact with a production-grade allocator.

So, should this go in as is? I honestly have no idea: I have almost zero knowledge of this codebase and am not equipped to make that call.

I sure hope we can find a solution for this ASAP though, whether it's this patch or something else, as this is the one remaining blocker for us to migrate to arrow-rs (we're on a fork of arrow2 right now).
This issue results in 2x memory usage in some real workloads in our case. 😞

teh-cmc commented Aug 31, 2024

Uh-oh, I missed one!

Adding another API (if it doesn't already exist) to "shrink_to_fit" for Arrays in general

That could be an alternative solution if this one doesn't cut it, yes.

Beyond the fact that it is likely much more work, it does sound like an antipattern though: given that Arrow arrays are immutable once created, is there any situation in which you would ever want all that extra capacity to hang around?
The only reason I can think of is if getting rid of that extra capacity comes at a performance cost, but as the benchmarks above show, that is not necessarily the case...

tustvold commented Sep 2, 2024

Beyond the fact that it is likely much more work, it does sound like an antipattern though: given that Arrow arrays are immutable once created, is there any situation in which you would ever want all that extra capacity to hang around?

When the buffer is merely an intermediate that won't live for very long. This is extremely common in query processing, and in fact one of the major motivations for the new binary view types is to avoid copying at the expense of less efficient memory usage.

Memory usage is halved, regardless of the allocator used.

I believe this is only the case where the bump allocator is being used, which is almost always terrible from a performance standpoint. Provided capacity estimation is done correctly, as most of the kernels go to great pains to do, this shouldn't be a major issue. Perhaps there is a particular codepath you are running into that is not doing this?

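For illustration, the capacity-estimation pattern referred to here looks roughly like this (a generic sketch, not any particular kernel): when the output size is known up front, the mutable buffer is created with its exact capacity, so it never grows and never accumulates slack to leak.

use arrow::buffer::{Buffer, MutableBuffer};

fn build_exact(values: &[u8]) -> Buffer {
    // Reserve the exact size up front: no reallocation while appending, and
    // essentially no spare capacity left over (beyond alignment padding).
    let mut buf = MutableBuffer::with_capacity(values.len());
    buf.extend_from_slice(values);
    // Freeze into an immutable Buffer.
    buf.into()
}
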
Adding another API (if it doesn't already exist) to "shrink_to_fit" for Arrays in general

I think this would be my preference, if only because it is much easier to understand the ramifications of such a change. Such a method should be relatively easy to add to the relevant ArrayBuilders, and therefore not hugely disruptive.

teh-cmc commented Sep 4, 2024

I think this would be my preference, if only because it is much easier to understand the ramifications of such a change. Such a method should be relatively easy to add to the relevant ArrayBuilders, and therefore not hugely disruptive.

I'm fine going down that route, but in our case we would need shrink_to_fit to be implemented on Array directly, not on the ArrayBuilders.

We keep a lot of Arrow data in long-lived memory storage, and therefore must be guaranteed that the data has been optimized for space before it gets committed to storage, regardless of how it got built or how it got there.

Is that still okay?

tustvold commented Sep 4, 2024

We keep a lot of Arrow data in long-lived memory storage, and therefore must be guaranteed that the data has been optimized for space before it gets committed to storage, regardless of how it got built or how it got there.

I think that would be generally useful, and would also handle the case of sliced arrays. I'd recommend something that returns a new Array, e.g. fn shrink_to_fit(&self) -> ArrayRef. You could also provide a strongly typed non-trait version, much like we do for slice.

Edit: I guess one question would be if such a method should also optimise dictionaries... 🤔 Might be worth filing a ticket for discussion

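For a sense of the shape being discussed, here is a rough sketch (hypothetical -- not the eventual arrow-rs API) that copies each buffer into an exactly-sized allocation and rebuilds the array. Child data, validity buffers and dictionary values are left untouched for brevity; a real implementation would need to recurse into them.

use arrow::array::{make_array, Array, ArrayRef};
use arrow::buffer::Buffer;

fn shrink_to_fit(array: &dyn Array) -> ArrayRef {
    let data = array.to_data();
    // Copy every data buffer into a freshly allocated, exactly-sized buffer.
    let compacted: Vec<Buffer> = data
        .buffers()
        .iter()
        .map(|buf| Buffer::from_slice_ref(buf.as_slice()))
        .collect();
    let data = data
        .into_builder()
        .buffers(compacted)
        .build()
        .expect("same layout and lengths, rebuilding should not fail");
    make_array(data)
}
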
teh-cmc commented Sep 5, 2024

👍 Alright, closing this in favor of this issue then:

I won't be able to attend to it for a couple weeks though.

teh-cmc closed this Sep 5, 2024

emilk commented Dec 3, 2024

This has now been fixed in another way:
