Skip to content

Commit

Permalink
refactor: Optimize IndexedPriorityQueue::addOrUpdate 20 times faster
Browse files Browse the repository at this point in the history
Summary:
Optimize `IndexedPriorityQueue` to use more compact data structures (binary heap) to leverage cache locality and avoid unnecessary memory allocations.  `addOrUpdate` is now about 20 times faster for large data (~1 million elements).  This allows us to replace the custom heap implementation in `ApproxMostFrequentStreamSummary` with `IndexedPriorityQueue`.

Note that `pop` becomes about twice slower in the new version.  This is expected because we shift some of the reordering cost from insertion time to pop time. This shall be good because for each `pop`, we have at least one corresponding `addOrUpdate` which guarantees net performance gain, and in most of the case we do
not even have `pop` on critical path (e.g. in case of `ApproxMostFrequentStreamSummary`).

Differential Revision: D67626564
  • Loading branch information
Yuhta authored and facebook-github-bot committed Dec 24, 2024
1 parent 8ebd3a8 commit a37c7ce
Show file tree
Hide file tree
Showing 7 changed files with 351 additions and 318 deletions.
295 changes: 170 additions & 125 deletions velox/common/base/IndexedPriorityQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,178 +16,223 @@

#pragma once

#include <folly/container/F14Map.h>
#include <folly/container/F14Set.h>
#include <set>

#include "velox/common/base/Exceptions.h"

#include <folly/container/F14Map.h>

namespace facebook::velox {

/// A priority queue with values of type 'T'. Each value has assigned priority
/// on insertion which determines the value's position in the quue. It supports
/// on insertion which determines the value's position in the queue. It supports
/// to update the priority of the existing values, which adjusts the value's
/// position in the queue accordingly. Ties are broken by insertion order. If
/// 'MaxQueue' is true, it is a max priority queue, otherwise a min priority
/// queue.
/// position in the queue accordingly. Ties are broken by insertion and update
/// order. If 'kMaxQueue' is true, it is a max priority queue, otherwise a min
/// priority queue.
///
/// NOTE: this class is not thread-safe.
template <typename T, bool MaxQueue>
template <
typename T,
bool kMaxQueue,
typename Allocator = std::allocator<T>,
typename Hash = std::hash<T>,
typename EqualTo = std::equal_to<T>>
class IndexedPriorityQueue {
public:
IndexedPriorityQueue() = default;
explicit IndexedPriorityQueue(const Allocator& allocator = {})
: values_(allocator),
priorities_(RebindAlloc<int64_t>(allocator)),
generations_(RebindAlloc<int64_t>(allocator)),
heap_(RebindAlloc<int32_t>(allocator)),
valueIndices_(RebindAlloc<std::pair<const T, int32_t>>(allocator)),
heapIndices_(RebindAlloc<int32_t>(allocator)) {}

~IndexedPriorityQueue() {
VELOX_CHECK_EQ(queue_.size(), index_.size());
validate();
}

size_t size() const {
return queue_.size();
int32_t size() const {
return values_.size();
}

bool empty() const {
return queue_.empty();
return values_.empty();
}

/// Inserts 'value' into the queue with specified 'priority'. If 'value'
/// already exists, then update its priority and the corresponding position in
/// the queue.
bool addOrUpdate(T value, uint64_t priority) {
auto it = index_.find(value);
if (it != index_.end()) {
if (it->second->priority() == priority) {
return false;
}
queue_.erase(it->second.get());

it->second->updatePriority(priority);
queue_.insert(it->second.get());
bool addOrUpdate(const T& value, int64_t priority) {
auto it = valueIndices_.find(value);
if (it == valueIndices_.end()) {
addNewValue(value, priority);
return true;
}
auto i = it->second;
if (priorities_[i] == priority) {
return false;
}

auto newEntry = std::make_unique<Entry>(value, priority, generation_++);
queue_.insert(newEntry.get());
index_.emplace(value, std::move(newEntry));
updatePriority(i, priority);
return true;
}

std::optional<T> pop() {
VELOX_CHECK_EQ(queue_.size(), index_.size());
if (queue_.empty()) {
return std::nullopt;
}

auto it = queue_.begin();
Entry* removedEntry = *it;
const auto value = removedEntry->value();
queue_.erase(it);
VELOX_CHECK(index_.contains(removedEntry->value()));
index_.erase(removedEntry->value());
return value;
}

/// Describes the state of an inserted 'value' in the queue.
class Entry {
public:
Entry(T value, uint64_t priority, uint64_t generation)
: value_(std::move(value)),
priority_(priority),
generation_(generation) {}

const T& value() const {
return value_;
}
const T& top() const {
VELOX_DCHECK(!heap_.empty());
return values_[heap_[0]];
}

uint64_t priority() const {
return priority_;
}
int64_t topPriority() const {
VELOX_DCHECK(!heap_.empty());
return priorities_[heap_[0]];
}

void updatePriority(uint64_t priority) {
priority_ = priority;
T pop() {
VELOX_DCHECK(!heap_.empty());
auto i = heap_[0];
heap_[0] = heap_.back();
heapIndices_[heap_.back()] = 0;
heap_.pop_back();
if (!heap_.empty()) {
percolateDown(0);
}

uint64_t generation() const {
return generation_;
auto result = values_[i];
valueIndices_.erase(values_[i]);
if (i != size() - 1) {
valueIndices_[values_.back()] = i;
values_[i] = values_.back();
priorities_[i] = priorities_.back();
generations_[i] = generations_.back();
heap_[heapIndices_.back()] = i;
heapIndices_[i] = heapIndices_.back();
}
values_.pop_back();
priorities_.pop_back();
generations_.pop_back();
heapIndices_.pop_back();
return result;
}

private:
const T value_;
uint64_t priority_;
const uint64_t generation_;
};

/// Used to iterate through the queue in priority order.
class Iterator {
public:
Iterator(
typename std::set<Entry*>::const_iterator cur,
typename std::set<Entry*>::const_iterator end)
: end_{end}, cur_{cur}, val_{0} {
if (cur_ != end_) {
val_ = (*cur_)->value();
}
}
const T* values() const {
return values_.data();
}

bool operator==(const Iterator& other) const {
return std::tie(cur_, end_) == std::tie(other.cur_, other.end_);
}
const int64_t* priorities() const {
return priorities_.data();
}

bool operator!=(const Iterator& other) const {
return !operator==(other);
std::optional<int> getValueIndex(const T& value) const {
auto it = valueIndices_.find(value);
if (it != valueIndices_.end()) {
return it->second;
}
return std::nullopt;
}

Iterator& operator++() {
VELOX_DCHECK(cur_ != end_);
++cur_;
if (cur_ != end_) {
val_ = (*cur_)->value();
}
return *this;
void updatePriority(int index, int64_t priority) {
bool up = priority < priorities_[index];
if constexpr (kMaxQueue) {
up = !up;
}

const T& operator*() const {
VELOX_DCHECK(cur_ != end_);
return val_;
priorities_[index] = priority;
generations_[index] = ++generation_;
if (up) {
percolateUp(heapIndices_[index]);
} else {
percolateDown(heapIndices_[index]);
}
}

private:
const typename std::set<Entry*>::const_iterator end_;
typename std::set<Entry*>::const_iterator cur_;
T val_;
};
void addNewValue(const T& value, int64_t priority) {
VELOX_DCHECK_LT(size(), std::numeric_limits<int32_t>::max());
auto i = size();
values_.push_back(value);
priorities_.push_back(priority);
generations_.push_back(++generation_);
VELOX_CHECK(valueIndices_.emplace(value, i).second);
heapIndices_.push_back(i);
heap_.push_back(i);
percolateUp(i);
}

Iterator begin() const {
return Iterator{queue_.cbegin(), queue_.cend()};
void replaceTop(const T& value, int64_t priority) {
VELOX_DCHECK(!heap_.empty());
int i = heap_[0];
valueIndices_.erase(values_[i]);
values_[i] = value;
priorities_[i] = priority;
generations_[i] = ++generation_;
VELOX_CHECK(valueIndices_.emplace(value, i).second);
percolateDown(0);
}

Iterator end() const {
return Iterator{queue_.cend(), queue_.cend()};
/// Return negative number if value at i should be ordered before j; positive
/// number if j should be ordered before i. Otherwise return 0.
int64_t compare(int i, int j) const {
int64_t result = priorities_[i] - priorities_[j];
if constexpr (kMaxQueue) {
result = -result;
}
return result != 0 ? result : generations_[i] - generations_[j];
}

private:
struct EntrySetComparator {
bool operator()(Entry* lhs, Entry* rhs) const {
if (MaxQueue) {
if (lhs->priority() > rhs->priority()) {
return true;
}
if (lhs->priority() < rhs->priority()) {
return false;
}
} else {
if (lhs->priority() > rhs->priority()) {
return false;
}
if (lhs->priority() < rhs->priority()) {
return true;
}
template <typename U>
using RebindAlloc =
typename std::allocator_traits<Allocator>::template rebind_alloc<U>;

void validate() const {
VELOX_DCHECK_EQ(values_.size(), priorities_.size());
VELOX_DCHECK_EQ(values_.size(), generations_.size());
VELOX_DCHECK_EQ(values_.size(), heap_.size());
VELOX_DCHECK_EQ(values_.size(), valueIndices_.size());
VELOX_DCHECK_EQ(values_.size(), heapIndices_.size());
}

void percolateUp(int pos) {
while (pos > 0) {
int parent = (pos - 1) / 2;
if (compare(heap_[pos], heap_[parent]) >= 0) {
break;
}
return lhs->generation() < rhs->generation();
std::swap(heap_[pos], heap_[parent]);
heapIndices_[heap_[pos]] = pos;
pos = parent;
}
};
heapIndices_[heap_[pos]] = pos;
}

void percolateDown(int pos) {
for (;;) {
int left = 2 * pos + 1;
if (left >= heap_.size()) {
break;
}
int right = left + 1;
int next = right < heap_.size() && compare(heap_[right], heap_[left]) < 0
? right
: left;
if (compare(heap_[pos], heap_[next]) <= 0) {
break;
}
std::swap(heap_[pos], heap_[next]);
heapIndices_[heap_[pos]] = pos;
pos = next;
}
heapIndices_[heap_[pos]] = pos;
}

uint64_t generation_{0};
folly::F14FastMap<T, std::unique_ptr<Entry>> index_;
std::set<Entry*, EntrySetComparator> queue_;
int64_t generation_{0};
std::vector<T, Allocator> values_;
std::vector<int64_t, RebindAlloc<int64_t>> priorities_;
std::vector<int64_t, RebindAlloc<int64_t>> generations_;
std::vector<int32_t, RebindAlloc<int32_t>> heap_;
folly::F14FastMap<
T,
int32_t,
Hash,
EqualTo,
RebindAlloc<std::pair<const T, int32_t>>>
valueIndices_;
std::vector<int32_t, RebindAlloc<int32_t>> heapIndices_;
};

} // namespace facebook::velox
Loading

0 comments on commit a37c7ce

Please sign in to comment.