
OOM in GroupedHashAggregateStream::group_aggregate_batch() #13831

Open
avantgardnerio opened this issue Dec 18, 2024 · 12 comments
Labels
bug Something isn't working

Comments

@avantgardnerio
Contributor

Describe the bug

When attempting to accumulate large text fields with a group by, it was observed that group_aggregate_batch() can OOM despite ostensibly using the MemoryPool.

Query:

select truncated_time, count(*) AS cnt
from (
    select
        truncated_time, k8s_deployment_name, message
    from (
        SELECT
            priorityclass,
            timestamp,
            date_trunc('day', timestamp) AS truncated_time,
            k8s_deployment_name,
            message
        FROM agg_oom
        where priorityclass != 'low'
    )
    group by truncated_time, k8s_deployment_name, message
) group by truncated_time

This was run against 8 parquet files of ~50 MB each, where the message column can contain strings of up to 8192 bytes. When profiled, this was by far the largest use of memory:

(profiler screenshot)

With logging enabled, we can see it fails while interning:

converting 3 rows
interning 8192 rows with 1486954 bytes
interned 8192 rows, now I'm 13054176 bytes
resizing to 14103171
resizing to 14103171
reserving 28206342 extra bytes
converting 3 rows
interning 8192 rows with 1350859 bytes
memory allocation of 25690112 bytes failed
Aborted (core dumped)

To Reproduce

  1. Set up a test with:

     let memory_limit = 125_000_000;
     let memory_fraction = 1.0;
     let rt_config = RuntimeConfig::new()
         .with_memory_limit(memory_limit, memory_fraction);

  2. Set `ulimit -v 1152000`.
  3. Query some parquet files with long strings.

Expected behavior

group_aggregate_batch() doesn't make the assumption:

            // Here we can ignore `insufficient_capacity_err` because we will spill later,
            // but at least one batch should fit in the memory

But instead realizes that adding 1 row to a million-element Vec doesn't allocate space for 1,000,001 elements, but rather 2,000,000, because Vec resizes exponentially.
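The doubling behavior can be seen directly in a standalone sketch. (Rust's exact growth factor is an implementation detail of the standard library, but the current allocator grows geometrically, roughly 2x per step.)

```rust
fn main() {
    // Pushing one element past capacity does not allocate len + 1 slots;
    // Vec grows geometrically (~2x), so a Vec holding a million elements
    // jumps to roughly two million slots on the very next push.
    let mut v: Vec<u64> = Vec::with_capacity(4);
    let mut last_cap = v.capacity();
    for i in 0..1_000u64 {
        v.push(i);
        let cap = v.capacity();
        if cap != last_cap {
            // Each growth step at least doubles the previous capacity.
            assert!(cap >= last_cap * 2);
            last_cap = cap;
        }
    }
    println!("final len = {}, capacity = {}", v.len(), v.capacity());
}
```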

Additional context

Proposed solution:

Add

            self.reservation.try_resize(self.reservation.size() * 2)?;

Above

            self.group_values
                .intern(group_values, &mut self.current_group_indices)?;
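A standalone sketch of why pre-doubling the reservation surfaces the failure in the memory pool instead of in the allocator. (`Reservation` here is a hypothetical stand-in for DataFusion's `MemoryReservation`, not the real API.)

```rust
// Hypothetical stand-in for a pool-backed reservation; DataFusion's
// MemoryReservation exposes a similar size()/try_resize() surface.
struct Reservation {
    size: usize,
    pool_limit: usize,
}

impl Reservation {
    fn size(&self) -> usize {
        self.size
    }

    // Fails if the pool cannot cover the requested size.
    fn try_resize(&mut self, new_size: usize) -> Result<(), String> {
        if new_size > self.pool_limit {
            Err(format!(
                "cannot grow reservation to {new_size} bytes (limit {})",
                self.pool_limit
            ))
        } else {
            self.size = new_size;
            Ok(())
        }
    }
}

fn main() {
    let mut r = Reservation { size: 400, pool_limit: 600 };
    // The proposed guard: reserve for the worst-case doubled Vec *before*
    // interning, so an over-limit query fails with a pool error rather
    // than aborting the whole process.
    assert!(r.try_resize(r.size() * 2).is_err()); // 800 > 600: caught early
}
```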
@avantgardnerio added the `bug` (Something isn't working) label on Dec 18, 2024
@alamb
Contributor

alamb commented Dec 19, 2024

This is a good find.

Given your description it sounds like the storage for the group values is what is taking the memory

group by truncated_time, k8s_deployment_name, message

The aggregate operator does account for the memory stored in the groups here:

https://github.com/apache/datafusion/blob/63ce4865896b906ca34fcbf85fdc55bff3080c30/datafusion/physical-plan/src/aggregates/row_hash.rs#L900-L899

However, I believe the memory accounting is only updated after processing an entire batch of values.

So for example, if you are using a batch of 8000 rows and each row has ~8k of values, that means at least 256 MB will be allocated (and since your query has 3 group columns it may be even higher)

I can think of two possible solutions:

  1. Use a smaller batch size for such queries so the memory accounting is more fine grained
  2. Leave more "slop" in the configured limits (e.g. set the maximum memory limit to be 500MB less than you actually have, for example)
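The effect of workaround 1 can be quantified with back-of-the-envelope arithmetic (assuming, per the report, ~8 KiB per row in one group column): the reservation is only updated once per batch, so the worst-case untracked allocation scales with the batch size.

```rust
fn main() {
    // Worst-case bytes allocated between two accounting updates is roughly
    // batch_size * bytes_per_row (per group-by column), since the
    // reservation is only adjusted after each batch is interned.
    let bytes_per_row = 8 * 1024; // ~8 KiB message strings, per the report
    let untracked = |batch_size: usize| batch_size * bytes_per_row;

    // A batch of 8192 rows: 64 MiB can appear before accounting sees it.
    assert_eq!(untracked(8192), 64 * 1024 * 1024);
    // Dropping to 1024 rows shrinks the unaccounted window to 8 MiB.
    assert_eq!(untracked(1024), 8 * 1024 * 1024);
}
```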

@thinkharderdev
Contributor

> However, I believe the memory accounting is only updated after processing an entire batch of values.

I think part of the issue is that we are accounting for the memory used by the actual values, but the underlying Vec will always grow by 2x when it needs to grow. So in scenarios where we have a lot of group values, the memory accounting can diverge significantly from what we are actually allocating from the OS.
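The divergence is easy to demonstrate: accounting based on element count misses the slack capacity the allocator is actually holding. (A standalone sketch, not DataFusion's accounting code.)

```rust
fn main() {
    // Compare memory accounted by len (what the pool is told) vs capacity
    // (what the allocator actually holds): right after a doubling,
    // capacity can be nearly 2x len, so the pool under-counts by almost half.
    let mut groups: Vec<[u8; 64]> = Vec::new();
    for _ in 0..1_025 {
        groups.push([0u8; 64]);
    }
    let elem = std::mem::size_of::<[u8; 64]>();
    let accounted = groups.len() * elem;      // len-based tracking reports this
    let allocated = groups.capacity() * elem; // the OS actually gave us this
    assert!(allocated >= accounted);
    // 1025 elements just crossed a power of two, so capacity is ~2048:
    // the pool never saw almost half of the allocation.
    assert!(groups.capacity() >= 2 * 1024);
}
```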

@avantgardnerio
Contributor Author

avantgardnerio commented Dec 19, 2024

Perhaps we just manually call Vec::reserve(new_amount) to make it behave as expected?

n/m, it looks like with Vec::grow_amortized() it will always be at least exponential :(

@Dandandan
Contributor

> Perhaps we just manually call Vec::reserve(new_amount) to make it behave as expected?

I believe that won't change the allocation behavior of Vec?
I think it's important to keep the exponential growth behavior to maintain performance / runtime complexity.

I think we have to:

  • Fix / improve the memory tracking to record the bytes used by the underlying Vecs' capacity.
  • Try to improve storage by keeping values in large chunks rather than in a single Vec (to keep memory usage lower); this will, however, be performance-sensitive.
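The second bullet can be sketched as a chunked buffer. (A toy illustration of the idea only, not DataFusion's design: each growth step allocates one fixed-size chunk, so allocation is bounded and exactly accountable.)

```rust
/// Toy chunked buffer: values live in fixed-size chunks instead of one
/// doubling Vec, so each growth allocates exactly `chunk_size` slots and
/// the memory pool can be charged the precise amount.
struct ChunkedBuf<T> {
    chunks: Vec<Vec<T>>,
    chunk_size: usize,
}

impl<T> ChunkedBuf<T> {
    fn new(chunk_size: usize) -> Self {
        Self { chunks: Vec::new(), chunk_size }
    }

    fn push(&mut self, value: T) {
        let need_new_chunk = self
            .chunks
            .last()
            .map_or(true, |c| c.len() == self.chunk_size);
        if need_new_chunk {
            // Bounded growth: one chunk, never "double everything".
            self.chunks.push(Vec::with_capacity(self.chunk_size));
        }
        self.chunks.last_mut().unwrap().push(value);
    }

    fn len(&self) -> usize {
        self.chunks.iter().map(|c| c.len()).sum()
    }
}

fn main() {
    let mut buf = ChunkedBuf::new(4);
    for i in 0..10 {
        buf.push(i);
    }
    assert_eq!(buf.len(), 10);
    assert_eq!(buf.chunks.len(), 3); // ceil(10 / 4) chunks allocated
}
```

The performance sensitivity noted above is visible even here: random access now needs a divide/modulo by the chunk size to locate an element.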

@avantgardnerio
Contributor Author

FYI it looks like reserve_exact() could work...

@Dandandan
Contributor

> FYI it looks like reserve_exact() could work...

Yes, that would address the issue, but AFAIK it will lead to O(n^2) runtime (a reallocation and full copy on nearly every insertion) since insertions are frequent.
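The tradeoff can be seen in miniature (standalone sketch): reserve_exact keeps capacity tight, which is precisely why each subsequent push forces another reallocation and full copy.

```rust
fn main() {
    // Growing one element at a time with reserve_exact keeps capacity
    // tight (precise accounting), but n inserts then reallocate and copy
    // on nearly every push: O(n^2) elements moved in total.
    let mut v: Vec<u64> = Vec::new();
    for i in 0..100u64 {
        v.reserve_exact(1); // request exactly one more slot
        v.push(i);
    }
    // No geometric slack: capacity tracks length exactly here, at the
    // cost of ~100 reallocations for 100 pushes (vs ~7 with doubling).
    assert_eq!(v.capacity(), v.len());
}
```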

@avantgardnerio
Contributor Author

This might be another option: https://crates.io/crates/fallible_collections

@thinkharderdev
Contributor

> This might be another option: https://crates.io/crates/fallible_collections

I think the std Vec already has a fallible try_reserve, but I'm not sure that would make a difference. On Linux, userspace allocations essentially never fail (due to overcommit); your process just gets OOM-killed.
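For reference, the std fallible API looks like this (standalone sketch; as noted above, Linux overcommit means a "successful" reservation can still be OOM-killed later, when the pages are first touched):

```rust
fn main() {
    let mut v: Vec<u8> = Vec::new();

    // A request that overflows the isize::MAX allocation limit is
    // rejected up front with a TryReserveError instead of aborting.
    assert!(v.try_reserve(usize::MAX).is_err());

    // A reasonable request succeeds and the Vec can be used normally.
    assert!(v.try_reserve(1024).is_ok());
    v.extend_from_slice(b"ok");
    assert_eq!(v.len(), 2);
}
```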

@avantgardnerio
Contributor Author

avantgardnerio commented Dec 19, 2024

I have confirmed the fix suggested in the issue works:

self.reservation.try_resize(self.reservation.size() * 2)?;

yields:

     Running `target/debug/dpctl query <redacted> --from-file bad.dql`
Error: Error executing prepared query

Caused by:
    GrpcError(message = grpc error, source = Status { code: Internal, message: "Query failed with error: DatafusionError/ResourcesExhausted: Failed to allocate additional 557901120 bytes for GroupedHashAggregateStream[24] with 557901120 bytes already allocated - maximum available is 551244083", metadata: MetadataMap { headers: {"content-type": "application/grpc", "date": "Thu, 19 Dec 2024 19:06:37 GMT", "content-length": "0"} }, source: None })

@alamb
Contributor

alamb commented Dec 19, 2024

The "right" solution is probably the kind of thing that @Rachelint proposed in this PR:

(not use a single large allocation, but manage the growth in chunks)

However, that was a pretty serious amount of work. It might now be tractable to begin contemplating again.

@Rachelint
Contributor

> The "right" solution is probably the kind of thing that @Rachelint proposed in this PR:
>
> * [Sketch for aggregation intermediate results blocked management #11943](https://github.com/apache/datafusion/pull/11943)
>
> (not use a single large allocation, but manage the growth in chunks)

I have some new ideas about this epic. I think the new design will require far fewer code changes, and I have already made some attempts at it.

But I still have no bandwidth to keep pushing on it this month, due to busy work on my employer's side...

@alamb
Contributor

alamb commented Dec 25, 2024

> I have some new ideas about this epic. […]

Thank you @Rachelint -- I can't wait to see what you come up with. Good luck with your busy month!
