From 329d39618a471a388335cde5987dc10e213a523d Mon Sep 17 00:00:00 2001 From: Jimmy Lu Date: Tue, 24 Dec 2024 15:06:09 -0800 Subject: [PATCH] refactor: Optimize IndexedPriorityQueue::addOrUpdate 20 times faster (#11955) Summary: Optimize `IndexedPriorityQueue` to use more compact data structures (binary heap) to leverage cache locality and avoid unnecessary memory allocations. `addOrUpdate` is now about 20 times faster for large data (~1 million elements). This allows us to replace the custom heap implementation in `ApproxMostFrequentStreamSummary` with `IndexedPriorityQueue`. Note that `pop` becomes about twice as slow in the new version. This is expected because we shift some of the reordering cost from insertion time to pop time. This is acceptable because each `pop` has at least one corresponding `addOrUpdate`, which guarantees a net performance gain, and in most cases `pop` is not even on the critical path (e.g. in the case of `ApproxMostFrequentStreamSummary`). Differential Revision: D67626564 --- velox/common/base/BitUtil.h | 16 + velox/common/base/IndexedPriorityQueue.h | 295 ++++++++++-------- velox/common/base/SkewedPartitionBalancer.cpp | 31 +- velox/common/base/SkewedPartitionBalancer.h | 6 +- velox/common/base/benchmarks/CMakeLists.txt | 8 + .../IndexedPriorityQueueBenchmark.cpp | 106 +++++++ .../base/tests/IndexedPriorityQueueTest.cpp | 51 +-- .../lib/ApproxMostFrequentStreamSummary.h | 173 +++------- 8 files changed, 368 insertions(+), 318 deletions(-) create mode 100644 velox/common/base/benchmarks/IndexedPriorityQueueBenchmark.cpp diff --git a/velox/common/base/BitUtil.h b/velox/common/base/BitUtil.h index 75b118ce094f..3ef7856ed554 100644 --- a/velox/common/base/BitUtil.h +++ b/velox/common/base/BitUtil.h @@ -18,6 +18,8 @@ #include "velox/common/base/Exceptions.h" +#include + #include #include #include @@ -28,6 +30,20 @@ #include #endif +// Remove once we upgrade folly. 
+#ifndef FOLLY_BUILTIN_MEMCPY +#if FOLLY_HAS_BUILTIN(__builtin_memcpy_inline) +#define FOLLY_BUILTIN_MEMCPY(dest, src, size) \ + void(__builtin_memcpy_inline((dest), (src), (size))) +#elif FOLLY_HAS_BUILTIN(__builtin_memcpy) +#define FOLLY_BUILTIN_MEMCPY(dest, src, size) \ + void(__builtin_memcpy((dest), (src), (size))) +#else +#define FOLLY_BUILTIN_MEMCPY(dest, src, size) \ + void(::std::memcpy((dest), (src), (size))) +#endif +#endif + namespace facebook { namespace velox { namespace bits { diff --git a/velox/common/base/IndexedPriorityQueue.h b/velox/common/base/IndexedPriorityQueue.h index 302cbf7aae8f..348915045c30 100644 --- a/velox/common/base/IndexedPriorityQueue.h +++ b/velox/common/base/IndexedPriorityQueue.h @@ -16,178 +16,223 @@ #pragma once -#include -#include -#include - #include "velox/common/base/Exceptions.h" +#include + namespace facebook::velox { + /// A priority queue with values of type 'T'. Each value has assigned priority -/// on insertion which determines the value's position in the quue. It supports +/// on insertion which determines the value's position in the queue. It supports /// to update the priority of the existing values, which adjusts the value's -/// position in the queue accordingly. Ties are broken by insertion order. If -/// 'MaxQueue' is true, it is a max priority queue, otherwise a min priority -/// queue. +/// position in the queue accordingly. Ties are broken by insertion and update +/// order. If 'kMaxQueue' is true, it is a max priority queue, otherwise a min +/// priority queue. /// /// NOTE: this class is not thread-safe. 
-template +template < + typename T, + bool kMaxQueue, + typename Allocator = std::allocator, + typename Hash = std::hash, + typename EqualTo = std::equal_to> class IndexedPriorityQueue { public: - IndexedPriorityQueue() = default; + explicit IndexedPriorityQueue(const Allocator& allocator = {}) + : values_(allocator), + priorities_(RebindAlloc(allocator)), + generations_(RebindAlloc(allocator)), + heap_(RebindAlloc(allocator)), + valueIndices_(RebindAlloc>(allocator)), + heapIndices_(RebindAlloc(allocator)) {} ~IndexedPriorityQueue() { - VELOX_CHECK_EQ(queue_.size(), index_.size()); + validate(); } - size_t size() const { - return queue_.size(); + int32_t size() const { + return values_.size(); } bool empty() const { - return queue_.empty(); + return values_.empty(); } /// Inserts 'value' into the queue with specified 'priority'. If 'value' /// already exists, then update its priority and the corresponding position in /// the queue. - bool addOrUpdate(T value, uint64_t priority) { - auto it = index_.find(value); - if (it != index_.end()) { - if (it->second->priority() == priority) { - return false; - } - queue_.erase(it->second.get()); - - it->second->updatePriority(priority); - queue_.insert(it->second.get()); + bool addOrUpdate(const T& value, int64_t priority) { + auto it = valueIndices_.find(value); + if (it == valueIndices_.end()) { + addNewValue(value, priority); + return true; + } + auto i = it->second; + if (priorities_[i] == priority) { return false; } - - auto newEntry = std::make_unique(value, priority, generation_++); - queue_.insert(newEntry.get()); - index_.emplace(value, std::move(newEntry)); + updatePriority(i, priority); return true; } - std::optional pop() { - VELOX_CHECK_EQ(queue_.size(), index_.size()); - if (queue_.empty()) { - return std::nullopt; - } - - auto it = queue_.begin(); - Entry* removedEntry = *it; - const auto value = removedEntry->value(); - queue_.erase(it); - VELOX_CHECK(index_.contains(removedEntry->value())); - 
index_.erase(removedEntry->value()); - return value; - } - - /// Describes the state of an inserted 'value' in the queue. - class Entry { - public: - Entry(T value, uint64_t priority, uint64_t generation) - : value_(std::move(value)), - priority_(priority), - generation_(generation) {} - - const T& value() const { - return value_; - } + const T& top() const { + VELOX_DCHECK(!heap_.empty()); + return values_[heap_[0]]; + } - uint64_t priority() const { - return priority_; - } + int64_t topPriority() const { + VELOX_DCHECK(!heap_.empty()); + return priorities_[heap_[0]]; + } - void updatePriority(uint64_t priority) { - priority_ = priority; + T pop() { + VELOX_DCHECK(!heap_.empty()); + auto i = heap_[0]; + heap_[0] = heap_.back(); + heapIndices_[heap_.back()] = 0; + heap_.pop_back(); + if (!heap_.empty()) { + percolateDown(0); } - - uint64_t generation() const { - return generation_; + auto result = values_[i]; + valueIndices_.erase(values_[i]); + if (i != size() - 1) { + valueIndices_[values_.back()] = i; + values_[i] = values_.back(); + priorities_[i] = priorities_.back(); + generations_[i] = generations_.back(); + heap_[heapIndices_.back()] = i; + heapIndices_[i] = heapIndices_.back(); } + values_.pop_back(); + priorities_.pop_back(); + generations_.pop_back(); + heapIndices_.pop_back(); + return result; + } - private: - const T value_; - uint64_t priority_; - const uint64_t generation_; - }; - - /// Used to iterate through the queue in priority order. 
- class Iterator { - public: - Iterator( - typename std::set::const_iterator cur, - typename std::set::const_iterator end) - : end_{end}, cur_{cur}, val_{0} { - if (cur_ != end_) { - val_ = (*cur_)->value(); - } - } + const T* values() const { + return values_.data(); + } - bool operator==(const Iterator& other) const { - return std::tie(cur_, end_) == std::tie(other.cur_, other.end_); - } + const int64_t* priorities() const { + return priorities_.data(); + } - bool operator!=(const Iterator& other) const { - return !operator==(other); + std::optional getValueIndex(const T& value) const { + auto it = valueIndices_.find(value); + if (it != valueIndices_.end()) { + return it->second; } + return std::nullopt; + } - Iterator& operator++() { - VELOX_DCHECK(cur_ != end_); - ++cur_; - if (cur_ != end_) { - val_ = (*cur_)->value(); - } - return *this; + void updatePriority(int index, int64_t priority) { + bool up = priority < priorities_[index]; + if constexpr (kMaxQueue) { + up = !up; } - - const T& operator*() const { - VELOX_DCHECK(cur_ != end_); - return val_; + priorities_[index] = priority; + generations_[index] = ++generation_; + if (up) { + percolateUp(heapIndices_[index]); + } else { + percolateDown(heapIndices_[index]); } + } - private: - const typename std::set::const_iterator end_; - typename std::set::const_iterator cur_; - T val_; - }; + void addNewValue(const T& value, int64_t priority) { + VELOX_DCHECK_LT(size(), std::numeric_limits::max()); + auto i = size(); + values_.push_back(value); + priorities_.push_back(priority); + generations_.push_back(++generation_); + VELOX_CHECK(valueIndices_.emplace(value, i).second); + heapIndices_.push_back(i); + heap_.push_back(i); + percolateUp(i); + } - Iterator begin() const { - return Iterator{queue_.cbegin(), queue_.cend()}; + void replaceTop(const T& value, int64_t priority) { + VELOX_DCHECK(!heap_.empty()); + int i = heap_[0]; + valueIndices_.erase(values_[i]); + values_[i] = value; + priorities_[i] = priority; + 
generations_[i] = ++generation_; + VELOX_CHECK(valueIndices_.emplace(value, i).second); + percolateDown(0); } - Iterator end() const { - return Iterator{queue_.cend(), queue_.cend()}; + /// Return negative number if value at i should be ordered before j; positive + /// number if j should be ordered before i. Otherwise return 0. + int64_t compare(int i, int j) const { + int64_t result = priorities_[i] - priorities_[j]; + if constexpr (kMaxQueue) { + result = -result; + } + return result != 0 ? result : generations_[i] - generations_[j]; } private: - struct EntrySetComparator { - bool operator()(Entry* lhs, Entry* rhs) const { - if (MaxQueue) { - if (lhs->priority() > rhs->priority()) { - return true; - } - if (lhs->priority() < rhs->priority()) { - return false; - } - } else { - if (lhs->priority() > rhs->priority()) { - return false; - } - if (lhs->priority() < rhs->priority()) { - return true; - } + template + using RebindAlloc = + typename std::allocator_traits::template rebind_alloc; + + void validate() const { + VELOX_DCHECK_EQ(values_.size(), priorities_.size()); + VELOX_DCHECK_EQ(values_.size(), generations_.size()); + VELOX_DCHECK_EQ(values_.size(), heap_.size()); + VELOX_DCHECK_EQ(values_.size(), valueIndices_.size()); + VELOX_DCHECK_EQ(values_.size(), heapIndices_.size()); + } + + void percolateUp(int pos) { + while (pos > 0) { + int parent = (pos - 1) / 2; + if (compare(heap_[pos], heap_[parent]) >= 0) { + break; } - return lhs->generation() < rhs->generation(); + std::swap(heap_[pos], heap_[parent]); + heapIndices_[heap_[pos]] = pos; + pos = parent; } - }; + heapIndices_[heap_[pos]] = pos; + } + + void percolateDown(int pos) { + for (;;) { + int left = 2 * pos + 1; + if (left >= heap_.size()) { + break; + } + int right = left + 1; + int next = right < heap_.size() && compare(heap_[right], heap_[left]) < 0 + ? 
right + : left; + if (compare(heap_[pos], heap_[next]) <= 0) { + break; + } + std::swap(heap_[pos], heap_[next]); + heapIndices_[heap_[pos]] = pos; + pos = next; + } + heapIndices_[heap_[pos]] = pos; + } - uint64_t generation_{0}; - folly::F14FastMap> index_; - std::set queue_; + int64_t generation_{0}; + std::vector values_; + std::vector> priorities_; + std::vector> generations_; + std::vector> heap_; + folly::F14FastMap< + T, + int32_t, + Hash, + EqualTo, + RebindAlloc>> + valueIndices_; + std::vector> heapIndices_; }; } // namespace facebook::velox diff --git a/velox/common/base/SkewedPartitionBalancer.cpp b/velox/common/base/SkewedPartitionBalancer.cpp index 1d2880f3c35b..8825971cb7d6 100644 --- a/velox/common/base/SkewedPartitionBalancer.cpp +++ b/velox/common/base/SkewedPartitionBalancer.cpp @@ -149,12 +149,8 @@ void SkewedPartitionRebalancer::rebalanceBasedOnTaskSkewness( std::vector>& taskMaxPartitions) { std::unordered_set scaledPartitions; - while (true) { - const auto maxTaskIdOpt = maxTasks.pop(); - if (!maxTaskIdOpt.has_value()) { - break; - } - const uint32_t maxTaskId = maxTaskIdOpt.value(); + while (!maxTasks.empty()) { + const auto maxTaskId = maxTasks.pop(); auto& maxPartitions = taskMaxPartitions[maxTaskId]; if (maxPartitions.empty()) { @@ -167,12 +163,8 @@ void SkewedPartitionRebalancer::rebalanceBasedOnTaskSkewness( break; } - while (true) { - const auto maxPartitionOpt = maxPartitions.pop(); - if (!maxPartitionOpt.has_value()) { - break; - } - const uint32_t maxPartition = maxPartitionOpt.value(); + while (!maxPartitions.empty()) { + const auto maxPartition = maxPartitions.pop(); // Rebalance partition only once in a single cycle to avoid aggressive // rebalancing. 
@@ -267,12 +259,14 @@ void SkewedPartitionRebalancer::calculatePartitionProcessedBytes() { std::vector SkewedPartitionRebalancer::findSkewedMinTasks( uint32_t maxTaskId, - const IndexedPriorityQueue& minTasks) const { - std::vector minSkewdTaskIds; - for (uint32_t minTaskId : minTasks) { + IndexedPriorityQueue& minTasks) const { + std::vector minSkewedTaskIds; + while (!minTasks.empty()) { + auto minTaskId = minTasks.top(); if (minTaskId == maxTaskId) { break; } + minTasks.pop(); const double skewness = ((double)(estimatedTaskBytesSinceLastRebalance_[maxTaskId] - estimatedTaskBytesSinceLastRebalance_[minTaskId])) / @@ -280,9 +274,12 @@ std::vector SkewedPartitionRebalancer::findSkewedMinTasks( if (skewness <= kTaskSkewnessThreshod_ || std::isnan(skewness)) { break; } - minSkewdTaskIds.push_back(minTaskId); + minSkewedTaskIds.push_back(minTaskId); + } + for (auto taskId : minSkewedTaskIds) { + minTasks.addOrUpdate(taskId, estimatedTaskBytesSinceLastRebalance_[taskId]); } - return minSkewdTaskIds; + return minSkewedTaskIds; } std::string SkewedPartitionRebalancer::Stats::toString() const { diff --git a/velox/common/base/SkewedPartitionBalancer.h b/velox/common/base/SkewedPartitionBalancer.h index c81709a31777..3c998920c1ee 100644 --- a/velox/common/base/SkewedPartitionBalancer.h +++ b/velox/common/base/SkewedPartitionBalancer.h @@ -113,9 +113,9 @@ class SkewedPartitionRebalancer { uint64_t calculateTaskDataSizeSinceLastRebalance( const IndexedPriorityQueue& maxPartitions) { uint64_t estimatedDataBytesSinceLastRebalance{0}; - for (uint32_t partition : maxPartitions) { + for (int i = 0; i < maxPartitions.size(); ++i) { estimatedDataBytesSinceLastRebalance += - partitionBytesSinceLastRebalancePerTask_[partition]; + partitionBytesSinceLastRebalancePerTask_[maxPartitions.values()[i]]; } return estimatedDataBytesSinceLastRebalance; } @@ -132,7 +132,7 @@ class SkewedPartitionRebalancer { // 'maxTaskId'. 
std::vector findSkewedMinTasks( uint32_t maxTaskId, - const IndexedPriorityQueue& minTasks) const; + IndexedPriorityQueue& minTasks) const; // Tries to assign 'targetTaskId' to 'rebalancePartition' for rebalancing. // Returns true if rebalanced, otherwise false. diff --git a/velox/common/base/benchmarks/CMakeLists.txt b/velox/common/base/benchmarks/CMakeLists.txt index 72fe153b52c8..b175e021e69f 100644 --- a/velox/common/base/benchmarks/CMakeLists.txt +++ b/velox/common/base/benchmarks/CMakeLists.txt @@ -24,3 +24,11 @@ target_link_libraries( velox_common_stringsearch_benchmarks PUBLIC Folly::follybenchmark PRIVATE velox_common_base Folly::folly) + +add_executable(velox_common_indexed_priority_queue_benchmark + IndexedPriorityQueueBenchmark.cpp) + +target_link_libraries( + velox_common_indexed_priority_queue_benchmark + PUBLIC Folly::follybenchmark + PRIVATE velox_common_base Folly::folly) diff --git a/velox/common/base/benchmarks/IndexedPriorityQueueBenchmark.cpp b/velox/common/base/benchmarks/IndexedPriorityQueueBenchmark.cpp new file mode 100644 index 000000000000..a7566ce9df18 --- /dev/null +++ b/velox/common/base/benchmarks/IndexedPriorityQueueBenchmark.cpp @@ -0,0 +1,106 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/common/base/IndexedPriorityQueue.h" + +#include +#include +#include + +#define VELOX_BENCHMARK(_type, _name, ...) 
\ + [[maybe_unused]] _type _name(FOLLY_PP_STRINGIZE(_name), __VA_ARGS__) + +namespace facebook::velox { +namespace { + +template +class IndexedPriorityQueueBenchmark { + public: + IndexedPriorityQueueBenchmark(const char* name, int numValues) + : values_(numValues), priorities_(2 * numValues) { + generateValues(); + for (int i = 0; i < 2 * numValues; ++i) { + priorities_[i] = folly::Random::rand32(); + } + folly::addBenchmark(__FILE__, fmt::format("{}_add", name), [this] { + IndexedPriorityQueue queue; + return add(queue); + }); + folly::addBenchmark(__FILE__, fmt::format("{}_update", name), [this] { + IndexedPriorityQueue queue; + BENCHMARK_SUSPEND { + add(queue); + } + return update(queue); + }); + folly::addBenchmark(__FILE__, fmt::format("{}_pop", name), [this] { + IndexedPriorityQueue queue; + BENCHMARK_SUSPEND { + add(queue); + update(queue); + } + return pop(queue); + }); + } + + private: + void generateValues(); + + unsigned add(IndexedPriorityQueue& queue) const { + for (int i = 0; i < values_.size(); ++i) { + queue.addOrUpdate(values_[i], priorities_[i]); + } + return values_.size(); + } + + unsigned update(IndexedPriorityQueue& queue) const { + for (int i = 0; i < values_.size(); ++i) { + queue.addOrUpdate(values_[i], priorities_[i + values_.size()]); + } + return values_.size(); + } + + unsigned pop(IndexedPriorityQueue& queue) const { + while (!queue.empty()) { + queue.pop(); + } + return values_.size(); + } + + std::vector values_; + std::vector priorities_; +}; + +template <> +void IndexedPriorityQueueBenchmark::generateValues() { + std::iota(values_.begin(), values_.end(), 0); +} + +} // namespace +} // namespace facebook::velox + +int main(int argc, char* argv[]) { + using namespace facebook::velox; + folly::Init follyInit(&argc, &argv); + VELOX_BENCHMARK(IndexedPriorityQueueBenchmark, int64_1000, 1000); + VELOX_BENCHMARK(IndexedPriorityQueueBenchmark, int64_10000, 10'000); + VELOX_BENCHMARK( + IndexedPriorityQueueBenchmark, int64_100000, 100'000); 
+ VELOX_BENCHMARK( + IndexedPriorityQueueBenchmark, int64_1000000, 1'000'000); + folly::runBenchmarks(); + return 0; +} diff --git a/velox/common/base/tests/IndexedPriorityQueueTest.cpp b/velox/common/base/tests/IndexedPriorityQueueTest.cpp index 0836b699b3d7..66f1403046f5 100644 --- a/velox/common/base/tests/IndexedPriorityQueueTest.cpp +++ b/velox/common/base/tests/IndexedPriorityQueueTest.cpp @@ -30,22 +30,8 @@ class IndexedPriorityQueueTest : public testing::Test { void verify( const IndexedPriorityQueue& queue, const std::vector& expectedValues) { - int i{0}; - for (auto value : queue) { - ASSERT_EQ(value, expectedValues[i++]); - } - } - - template - void verifyWithIterate( - const IndexedPriorityQueue& queue, - const std::vector& expectedValues) { - int i{0}; - auto it = queue.begin(); - while (it != queue.end()) { - ASSERT_EQ(*it, expectedValues[i++]); - ++it; - } + auto clone = queue; + verifyAndRemove(expectedValues, clone); } template @@ -55,7 +41,7 @@ class IndexedPriorityQueueTest : public testing::Test { ASSERT_EQ(expectedValues.size(), queue.size()); int i{0}; while (!queue.empty()) { - ASSERT_EQ(queue.pop().value(), expectedValues[i++]); + ASSERT_EQ(queue.pop(), expectedValues[i++]); } ASSERT_TRUE(queue.empty()); } @@ -74,9 +60,11 @@ class IndexedPriorityQueueTest : public testing::Test { queue.addOrUpdate(value, priority); } ASSERT_EQ(queue.size(), valuePriorities.size()); + auto size = queue.size(); std::unordered_set queuedValues; uint64_t prev = MaxQueue ? 
std::numeric_limits::max() : 0; - for (auto value : queue) { + while (!queue.empty()) { + auto value = queue.pop(); queuedValues.insert(value); ASSERT_TRUE(valuePriorities.find(value) != valuePriorities.end()); if (MaxQueue) { @@ -87,8 +75,8 @@ class IndexedPriorityQueueTest : public testing::Test { prev = valuePriorities[value]; } } - ASSERT_EQ(queuedValues.size(), queue.size()); - ASSERT_EQ(queue.size(), valuePriorities.size()); + ASSERT_EQ(queuedValues.size(), size); + ASSERT_EQ(size, valuePriorities.size()); } }; @@ -104,8 +92,6 @@ TEST_F(IndexedPriorityQueueTest, insertOnly) { ASSERT_EQ(minQueue.size(), 0); ASSERT_TRUE(maxQueue.empty()); ASSERT_TRUE(minQueue.empty()); - ASSERT_FALSE(maxQueue.pop().has_value()); - ASSERT_FALSE(minQueue.pop().has_value()); for (int value = 0; value < numValues; ++value) { maxQueue.addOrUpdate(value, priorities[value]); @@ -113,15 +99,6 @@ TEST_F(IndexedPriorityQueueTest, insertOnly) { } ASSERT_EQ(maxQueue.size(), numValues); ASSERT_EQ(minQueue.size(), numValues); - verify(maxQueue, expectedMaxValues); - verify(minQueue, expectedMinValues); - - ASSERT_EQ(maxQueue.size(), numValues); - ASSERT_EQ(minQueue.size(), numValues); - - verifyWithIterate(maxQueue, expectedMaxValues); - verifyWithIterate(minQueue, expectedMinValues); - verifyAndRemove(expectedMaxValues, maxQueue); verifyAndRemove(expectedMinValues, minQueue); } @@ -209,12 +186,12 @@ TEST_F(IndexedPriorityQueueTest, remove) { maxQueue.addOrUpdate(value2, /*priority=*/2); maxQueue.addOrUpdate(value3, /*priority=*/3); verify(maxQueue, {33, 32, 31}); - ASSERT_EQ(maxQueue.pop().value(), 33); + ASSERT_EQ(maxQueue.pop(), 33); verify(maxQueue, {32, 31}); maxQueue.addOrUpdate(value2, 0); - ASSERT_EQ(maxQueue.pop().value(), 31); + ASSERT_EQ(maxQueue.pop(), 31); verify(maxQueue, {32}); - ASSERT_EQ(maxQueue.pop().value(), 32); + ASSERT_EQ(maxQueue.pop(), 32); ASSERT_TRUE(maxQueue.empty()); IndexedPriorityQueue minQueue; @@ -222,12 +199,12 @@ TEST_F(IndexedPriorityQueueTest, remove) { 
minQueue.addOrUpdate(value2, /*priority=*/2); minQueue.addOrUpdate(value3, /*priority=*/3); verify(minQueue, {31, 32, 33}); - ASSERT_EQ(minQueue.pop().value(), 31); + ASSERT_EQ(minQueue.pop(), 31); verify(minQueue, {32, 33}); minQueue.addOrUpdate(value2, 20); - ASSERT_EQ(minQueue.pop().value(), 33); + ASSERT_EQ(minQueue.pop(), 33); verify(minQueue, {32}); - ASSERT_EQ(minQueue.pop().value(), 32); + ASSERT_EQ(minQueue.pop(), 32); ASSERT_TRUE(minQueue.empty()); } diff --git a/velox/functions/lib/ApproxMostFrequentStreamSummary.h b/velox/functions/lib/ApproxMostFrequentStreamSummary.h index dd15c7f77f4e..b9aff626bf03 100644 --- a/velox/functions/lib/ApproxMostFrequentStreamSummary.h +++ b/velox/functions/lib/ApproxMostFrequentStreamSummary.h @@ -16,13 +16,13 @@ #pragma once -#include - -#include - +#include "velox/common/base/BitUtil.h" #include "velox/common/base/Exceptions.h" +#include "velox/common/base/IndexedPriorityQueue.h" #include "velox/type/StringView.h" +#include + namespace facebook::velox::functions { /// Data structure to approximately compute the top frequent values from a large @@ -70,58 +70,27 @@ struct ApproxMostFrequentStreamSummary { /// Return the pointer to values data. The number of values equals to size(). const T* values() const { - return values_.data(); + return queue_.values(); } /// Return the pointer to counts data. The number of counts equals to size(). 
const int64_t* counts() const { - return counts_.data(); + return queue_.priorities(); } bool contains(T value) const { - return indices_.count(value) > 0; + return queue_.getValueIndex(value).has_value(); } private: - template - using RebindAlloc = - typename std::allocator_traits::template rebind_alloc; - - int heapCompare(int i, int j) const; - void percolateUp(int position); - void percolateDown(int position); - - int capacity() const { - return capacity_; - } - int capacity_ = 0; - int64_t currentGeneration_ = 0; - std::vector values_; - std::vector> counts_; - std::vector> generations_; - std::vector> heap_; - - folly::F14FastMap< - T, - int32_t, - std::hash, - std::equal_to, - RebindAlloc>> - indices_; - - std::vector> heapIndices_; + IndexedPriorityQueue queue_; }; template ApproxMostFrequentStreamSummary::ApproxMostFrequentStreamSummary( const A& allocator) - : values_(allocator), - counts_(RebindAlloc(allocator)), - generations_(RebindAlloc(allocator)), - heap_(RebindAlloc(allocator)), - indices_(RebindAlloc>(allocator)), - heapIndices_(RebindAlloc(allocator)) {} + : queue_(allocator) {} template void ApproxMostFrequentStreamSummary::setCapacity(int capacity) { @@ -135,92 +104,21 @@ void ApproxMostFrequentStreamSummary::setCapacity(int capacity) { template int ApproxMostFrequentStreamSummary::size() const { - VELOX_DCHECK_EQ(values_.size(), counts_.size()); - VELOX_DCHECK_EQ(values_.size(), generations_.size()); - VELOX_DCHECK_EQ(values_.size(), heap_.size()); - VELOX_DCHECK_EQ(values_.size(), heapIndices_.size()); - return values_.size(); -} - -template -int ApproxMostFrequentStreamSummary::heapCompare(int i, int j) const { - if (int ans = counts_[i] - counts_[j]; ans != 0) { - return ans; - } - // When the counts are same, we want to consider the previously generated - // value as minimum to prefer it over newly generated value with same count - // when we need to remove min. 
- return generations_[i] - generations_[j]; + return queue_.size(); } template void ApproxMostFrequentStreamSummary::insert(T value, int64_t count) { - if (auto it = indices_.find(value); it != indices_.end()) { - // The value to be counted is currently being tracked, we just need to - // increase the counter. - int i = it->second; - counts_[i] += count; - generations_[i] = ++currentGeneration_; - percolateDown(heapIndices_[i]); - return; - } - if (size() < capacity()) { - // There is still room available, just insert the value. - int i = size(); - values_.push_back(value); - counts_.push_back(count); - generations_.push_back(++currentGeneration_); - indices_.emplace(value, i); - heapIndices_.push_back(i); - heap_.push_back(i); - percolateUp(i); - return; - } - // Replace the element with least hits. - VELOX_DCHECK(!heap_.empty()); - int i = heap_[0]; - indices_.erase(values_[i]); - values_[i] = value; - counts_[i] += count; - generations_[i] = ++currentGeneration_; - indices_.emplace(value, i); - percolateDown(0); -} - -template -void ApproxMostFrequentStreamSummary::percolateUp(int pos) { - while (pos > 0) { - int parent = (pos - 1) / 2; - if (heapCompare(heap_[pos], heap_[parent]) >= 0) { - break; - } - std::swap(heap_[pos], heap_[parent]); - heapIndices_[heap_[pos]] = pos; - pos = parent; - } - heapIndices_[heap_[pos]] = pos; -} - -template -void ApproxMostFrequentStreamSummary::percolateDown(int pos) { - for (;;) { - int left = 2 * pos + 1; - if (left >= size()) { - break; - } - int child = left; - if (int right = left + 1; - right < size() && heapCompare(heap_[right], heap_[left]) < 0) { - child = right; - } - if (heapCompare(heap_[pos], heap_[child]) <= 0) { - break; - } - std::swap(heap_[pos], heap_[child]); - heapIndices_[heap_[pos]] = pos; - pos = child; + auto index = queue_.getValueIndex(value); + if (index.has_value()) { + auto oldCount = queue_.priorities()[*index]; + queue_.updatePriority(*index, oldCount + count); + } else if (size() < capacity_) { 
+ queue_.addNewValue(value, count); + } else { + auto oldCount = queue_.topPriority(); + queue_.replaceTop(value, oldCount + count); } - heapIndices_[heap_[pos]] = pos; } template @@ -237,12 +135,12 @@ void ApproxMostFrequentStreamSummary::topK( // elements. auto posEnd = reinterpret_cast(counts + k); auto posBeg = posEnd - k; - auto gt = [&](auto i, auto j) { return heapCompare(i, j) > 0; }; + auto gt = [&](auto i, auto j) { return queue_.compare(i, j) > 0; }; for (int i = 0; i < size(); ++i) { if (i < k) { posBeg[i] = i; std::push_heap(posBeg, posBeg + i + 1, gt); - } else if (heapCompare(i, *posBeg) > 0) { + } else if (queue_.compare(i, *posBeg) > 0) { std::pop_heap(posBeg, posEnd, gt); posBeg[k - 1] = i; std::push_heap(posBeg, posEnd, gt); @@ -251,8 +149,8 @@ void ApproxMostFrequentStreamSummary::topK( std::sort(posBeg, posEnd, gt); for (auto it = posBeg; it != posEnd; ++it) { auto i = *it; - *values++ = values_[i]; - *counts++ = counts_[i]; + *values++ = queue_.values()[i]; + *counts++ = queue_.priorities()[i]; } } @@ -275,7 +173,8 @@ template size_t ApproxMostFrequentStreamSummary::serializedByteSize() const { size_t ans = sizeof(int32_t) + sizeof(T) * size() + sizeof(int64_t) * size(); if constexpr (std::is_same_v) { - for (auto& v : values_) { + for (int i = 0; i < size(); ++i) { + auto& v = queue_.values()[i]; if (!v.isInline()) { ans += v.size(); } @@ -291,17 +190,18 @@ size_t ApproxMostFrequentStreamSummary::serializedByteSize() const { // 4. 
If the value type is StringView, the actual non-inlined string data template void ApproxMostFrequentStreamSummary::serialize(char* out) const { - auto cur = out; - *reinterpret_cast(cur) = size(); + auto* cur = out; + folly::storeUnaligned(cur, size()); cur += sizeof(int32_t); auto byteSize = sizeof(T) * size(); - memcpy(cur, values_.data(), byteSize); + memcpy(cur, queue_.values(), byteSize); cur += byteSize; byteSize = sizeof(int64_t) * size(); - memcpy(cur, counts_.data(), byteSize); + memcpy(cur, queue_.priorities(), byteSize); cur += byteSize; if constexpr (std::is_same_v) { - for (auto& v : values_) { + for (int i = 0; i < size(); ++i) { + auto& v = queue_.values()[i]; if (!v.isInline()) { memcpy(cur, v.data(), v.size()); cur += v.size(); @@ -313,23 +213,24 @@ void ApproxMostFrequentStreamSummary::serialize(char* out) const { template void ApproxMostFrequentStreamSummary::mergeSerialized(const char* other) { - auto size = *reinterpret_cast(other); + auto size = folly::loadUnaligned(other); other += sizeof size; - auto values = reinterpret_cast(other); + auto* values = other; other += sizeof(T) * size; - auto counts = reinterpret_cast(other); + auto* counts = other; if constexpr (std::is_same_v) { other += sizeof(int64_t) * size; } + T v; for (int i = 0; i < size; ++i) { - auto v = values[i]; + FOLLY_BUILTIN_MEMCPY(&v, values + i * sizeof(T), sizeof(T)); if constexpr (std::is_same_v) { if (!v.isInline()) { v = {other, static_cast(v.size())}; other += v.size(); } } - insert(v, counts[i]); + insert(v, folly::loadUnaligned(counts + i * sizeof(int64_t))); } }