diff --git a/velox/exec/LocalPartition.cpp b/velox/exec/LocalPartition.cpp index dce6dff0ad38..2fe0f234032f 100644 --- a/velox/exec/LocalPartition.cpp +++ b/velox/exec/LocalPartition.cpp @@ -307,7 +307,7 @@ void LocalPartition::addInput(RowVectorPtr input) { if (numPartitions_ == 1) { ContinueFuture future; auto blockingReason = - queues_[0]->enqueue(input, input->retainedSize(), &future); + queues_[0]->enqueue(input, input->estimateFlatSize(), &future); if (blockingReason != BlockingReason::kNotBlocked) { blockingReasons_.push_back(blockingReason); futures_.push_back(std::move(future)); @@ -320,7 +320,7 @@ void LocalPartition::addInput(RowVectorPtr input) { if (singlePartition.has_value()) { ContinueFuture future; auto blockingReason = queues_[singlePartition.value()]->enqueue( - input, input->retainedSize(), &future); + input, input->estimateFlatSize(), &future); if (blockingReason != BlockingReason::kNotBlocked) { blockingReasons_.push_back(blockingReason); futures_.push_back(std::move(future)); @@ -342,7 +342,7 @@ void LocalPartition::addInput(RowVectorPtr input) { ++maxIndex[partition]; } - const int64_t totalSize = input->retainedSize(); + const int64_t totalSize = input->estimateFlatSize(); for (auto i = 0; i < numPartitions_; i++) { auto partitionSize = maxIndex[i]; if (partitionSize == 0) {