From 6faa920124a31ec1ae596e651b977a2bc1f741c7 Mon Sep 17 00:00:00 2001 From: Andrii Rosa Date: Mon, 23 Dec 2024 08:21:39 -0800 Subject: [PATCH] Use estimateFlatSize in LocalPartition Summary: Using retained size is problematic because it may account for memory shared between multiple vectors. For example, in the join operators, vectors are wrapped in dictionaries and shared between multiple output vectors. It was observed that a Join may produce vectors that retain over 40MB of data while having a flat size of a little over 2MB. When the reported size is high, the LocalPartition operator has to block frequently, reducing query performance. Differential Revision: D67601403 --- velox/exec/LocalPartition.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/velox/exec/LocalPartition.cpp b/velox/exec/LocalPartition.cpp index dce6dff0ad38..2fe0f234032f 100644 --- a/velox/exec/LocalPartition.cpp +++ b/velox/exec/LocalPartition.cpp @@ -307,7 +307,7 @@ void LocalPartition::addInput(RowVectorPtr input) { if (numPartitions_ == 1) { ContinueFuture future; auto blockingReason = - queues_[0]->enqueue(input, input->retainedSize(), &future); + queues_[0]->enqueue(input, input->estimateFlatSize(), &future); if (blockingReason != BlockingReason::kNotBlocked) { blockingReasons_.push_back(blockingReason); futures_.push_back(std::move(future)); @@ -320,7 +320,7 @@ void LocalPartition::addInput(RowVectorPtr input) { if (singlePartition.has_value()) { ContinueFuture future; auto blockingReason = queues_[singlePartition.value()]->enqueue( - input, input->retainedSize(), &future); + input, input->estimateFlatSize(), &future); if (blockingReason != BlockingReason::kNotBlocked) { blockingReasons_.push_back(blockingReason); futures_.push_back(std::move(future)); @@ -342,7 +342,7 @@ void LocalPartition::addInput(RowVectorPtr input) { ++maxIndex[partition]; } - const int64_t totalSize = input->retainedSize(); + const int64_t totalSize = input->estimateFlatSize(); for (auto i = 0; i < 
numPartitions_; i++) { auto partitionSize = maxIndex[i]; if (partitionSize == 0) {