Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Optimize IndexedPriorityQueue::addOrUpdate 20 times faster #11955

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions velox/common/base/BitUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

#include "velox/common/base/Exceptions.h"

#include <folly/CPortability.h>

#include <algorithm>
#include <cstddef>
#include <cstdint>
Expand All @@ -28,6 +30,20 @@
#include <x86intrin.h>
#endif

// Remove once we upgrade folly.
#ifndef FOLLY_BUILTIN_MEMCPY
#if FOLLY_HAS_BUILTIN(__builtin_memcpy_inline)
#define FOLLY_BUILTIN_MEMCPY(dest, src, size) \
void(__builtin_memcpy_inline((dest), (src), (size)))
#elif FOLLY_HAS_BUILTIN(__builtin_memcpy)
#define FOLLY_BUILTIN_MEMCPY(dest, src, size) \
void(__builtin_memcpy((dest), (src), (size)))
#else
#define FOLLY_BUILTIN_MEMCPY(dest, src, size) \
void(::std::memcpy((dest), (src), (size)))
#endif
#endif

namespace facebook {
namespace velox {
namespace bits {
Expand Down
295 changes: 170 additions & 125 deletions velox/common/base/IndexedPriorityQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,178 +16,223 @@

#pragma once

#include <folly/container/F14Map.h>
#include <folly/container/F14Set.h>
#include <set>

#include "velox/common/base/Exceptions.h"

#include <folly/container/F14Map.h>

namespace facebook::velox {

/// A priority queue with values of type 'T'. Each value has assigned priority
/// on insertion which determines the value's position in the quue. It supports
/// on insertion which determines the value's position in the queue. It supports
/// to update the priority of the existing values, which adjusts the value's
/// position in the queue accordingly. Ties are broken by insertion order. If
/// 'MaxQueue' is true, it is a max priority queue, otherwise a min priority
/// queue.
/// position in the queue accordingly. Ties are broken by insertion and update
/// order. If 'kMaxQueue' is true, it is a max priority queue, otherwise a min
/// priority queue.
///
/// NOTE: this class is not thread-safe.
template <typename T, bool MaxQueue>
template <
typename T,
bool kMaxQueue,
typename Allocator = std::allocator<T>,
typename Hash = std::hash<T>,
typename EqualTo = std::equal_to<T>>
class IndexedPriorityQueue {
public:
IndexedPriorityQueue() = default;
explicit IndexedPriorityQueue(const Allocator& allocator = {})
: values_(allocator),
priorities_(RebindAlloc<int64_t>(allocator)),
generations_(RebindAlloc<int64_t>(allocator)),
heap_(RebindAlloc<int32_t>(allocator)),
valueIndices_(RebindAlloc<std::pair<const T, int32_t>>(allocator)),
heapIndices_(RebindAlloc<int32_t>(allocator)) {}

~IndexedPriorityQueue() {
VELOX_CHECK_EQ(queue_.size(), index_.size());
validate();
}

size_t size() const {
return queue_.size();
int32_t size() const {
return values_.size();
}

bool empty() const {
return queue_.empty();
return values_.empty();
}

/// Inserts 'value' into the queue with specified 'priority'. If 'value'
/// already exists, then update its priority and the corresponding position in
/// the queue.
bool addOrUpdate(T value, uint64_t priority) {
auto it = index_.find(value);
if (it != index_.end()) {
if (it->second->priority() == priority) {
return false;
}
queue_.erase(it->second.get());

it->second->updatePriority(priority);
queue_.insert(it->second.get());
bool addOrUpdate(const T& value, int64_t priority) {
auto it = valueIndices_.find(value);
if (it == valueIndices_.end()) {
addNewValue(value, priority);
return true;
}
auto i = it->second;
if (priorities_[i] == priority) {
return false;
}

auto newEntry = std::make_unique<Entry>(value, priority, generation_++);
queue_.insert(newEntry.get());
index_.emplace(value, std::move(newEntry));
updatePriority(i, priority);
return true;
}

std::optional<T> pop() {
VELOX_CHECK_EQ(queue_.size(), index_.size());
if (queue_.empty()) {
return std::nullopt;
}

auto it = queue_.begin();
Entry* removedEntry = *it;
const auto value = removedEntry->value();
queue_.erase(it);
VELOX_CHECK(index_.contains(removedEntry->value()));
index_.erase(removedEntry->value());
return value;
}

/// Describes the state of an inserted 'value' in the queue.
class Entry {
public:
Entry(T value, uint64_t priority, uint64_t generation)
: value_(std::move(value)),
priority_(priority),
generation_(generation) {}

const T& value() const {
return value_;
}
const T& top() const {
VELOX_DCHECK(!heap_.empty());
return values_[heap_[0]];
}

uint64_t priority() const {
return priority_;
}
int64_t topPriority() const {
VELOX_DCHECK(!heap_.empty());
return priorities_[heap_[0]];
}

void updatePriority(uint64_t priority) {
priority_ = priority;
T pop() {
VELOX_DCHECK(!heap_.empty());
auto i = heap_[0];
heap_[0] = heap_.back();
heapIndices_[heap_.back()] = 0;
heap_.pop_back();
if (!heap_.empty()) {
percolateDown(0);
}

uint64_t generation() const {
return generation_;
auto result = values_[i];
valueIndices_.erase(values_[i]);
if (i != size() - 1) {
valueIndices_[values_.back()] = i;
values_[i] = values_.back();
priorities_[i] = priorities_.back();
generations_[i] = generations_.back();
heap_[heapIndices_.back()] = i;
heapIndices_[i] = heapIndices_.back();
}
values_.pop_back();
priorities_.pop_back();
generations_.pop_back();
heapIndices_.pop_back();
return result;
}

private:
const T value_;
uint64_t priority_;
const uint64_t generation_;
};

/// Used to iterate through the queue in priority order.
class Iterator {
public:
Iterator(
typename std::set<Entry*>::const_iterator cur,
typename std::set<Entry*>::const_iterator end)
: end_{end}, cur_{cur}, val_{0} {
if (cur_ != end_) {
val_ = (*cur_)->value();
}
}
const T* values() const {
return values_.data();
}

bool operator==(const Iterator& other) const {
return std::tie(cur_, end_) == std::tie(other.cur_, other.end_);
}
const int64_t* priorities() const {
return priorities_.data();
}

bool operator!=(const Iterator& other) const {
return !operator==(other);
std::optional<int> getValueIndex(const T& value) const {
auto it = valueIndices_.find(value);
if (it != valueIndices_.end()) {
return it->second;
}
return std::nullopt;
}

Iterator& operator++() {
VELOX_DCHECK(cur_ != end_);
++cur_;
if (cur_ != end_) {
val_ = (*cur_)->value();
}
return *this;
void updatePriority(int index, int64_t priority) {
bool up = priority < priorities_[index];
if constexpr (kMaxQueue) {
up = !up;
}

const T& operator*() const {
VELOX_DCHECK(cur_ != end_);
return val_;
priorities_[index] = priority;
generations_[index] = ++generation_;
if (up) {
percolateUp(heapIndices_[index]);
} else {
percolateDown(heapIndices_[index]);
}
}

private:
const typename std::set<Entry*>::const_iterator end_;
typename std::set<Entry*>::const_iterator cur_;
T val_;
};
void addNewValue(const T& value, int64_t priority) {
VELOX_DCHECK_LT(size(), std::numeric_limits<int32_t>::max());
auto i = size();
values_.push_back(value);
priorities_.push_back(priority);
generations_.push_back(++generation_);
VELOX_CHECK(valueIndices_.emplace(value, i).second);
heapIndices_.push_back(i);
heap_.push_back(i);
percolateUp(i);
}

Iterator begin() const {
return Iterator{queue_.cbegin(), queue_.cend()};
void replaceTop(const T& value, int64_t priority) {
VELOX_DCHECK(!heap_.empty());
int i = heap_[0];
valueIndices_.erase(values_[i]);
values_[i] = value;
priorities_[i] = priority;
generations_[i] = ++generation_;
VELOX_CHECK(valueIndices_.emplace(value, i).second);
percolateDown(0);
}

Iterator end() const {
return Iterator{queue_.cend(), queue_.cend()};
/// Return negative number if value at i should be ordered before j; positive
/// number if j should be ordered before i. Otherwise return 0.
int64_t compare(int i, int j) const {
int64_t result = priorities_[i] - priorities_[j];
if constexpr (kMaxQueue) {
result = -result;
}
return result != 0 ? result : generations_[i] - generations_[j];
}

private:
struct EntrySetComparator {
bool operator()(Entry* lhs, Entry* rhs) const {
if (MaxQueue) {
if (lhs->priority() > rhs->priority()) {
return true;
}
if (lhs->priority() < rhs->priority()) {
return false;
}
} else {
if (lhs->priority() > rhs->priority()) {
return false;
}
if (lhs->priority() < rhs->priority()) {
return true;
}
template <typename U>
using RebindAlloc =
typename std::allocator_traits<Allocator>::template rebind_alloc<U>;

void validate() const {
VELOX_DCHECK_EQ(values_.size(), priorities_.size());
VELOX_DCHECK_EQ(values_.size(), generations_.size());
VELOX_DCHECK_EQ(values_.size(), heap_.size());
VELOX_DCHECK_EQ(values_.size(), valueIndices_.size());
VELOX_DCHECK_EQ(values_.size(), heapIndices_.size());
}

void percolateUp(int pos) {
while (pos > 0) {
int parent = (pos - 1) / 2;
if (compare(heap_[pos], heap_[parent]) >= 0) {
break;
}
return lhs->generation() < rhs->generation();
std::swap(heap_[pos], heap_[parent]);
heapIndices_[heap_[pos]] = pos;
pos = parent;
}
};
heapIndices_[heap_[pos]] = pos;
}

void percolateDown(int pos) {
for (;;) {
int left = 2 * pos + 1;
if (left >= heap_.size()) {
break;
}
int right = left + 1;
int next = right < heap_.size() && compare(heap_[right], heap_[left]) < 0
? right
: left;
if (compare(heap_[pos], heap_[next]) <= 0) {
break;
}
std::swap(heap_[pos], heap_[next]);
heapIndices_[heap_[pos]] = pos;
pos = next;
}
heapIndices_[heap_[pos]] = pos;
}

uint64_t generation_{0};
folly::F14FastMap<T, std::unique_ptr<Entry>> index_;
std::set<Entry*, EntrySetComparator> queue_;
int64_t generation_{0};
std::vector<T, Allocator> values_;
std::vector<int64_t, RebindAlloc<int64_t>> priorities_;
std::vector<int64_t, RebindAlloc<int64_t>> generations_;
std::vector<int32_t, RebindAlloc<int32_t>> heap_;
folly::F14FastMap<
T,
int32_t,
Hash,
EqualTo,
RebindAlloc<std::pair<const T, int32_t>>>
valueIndices_;
std::vector<int32_t, RebindAlloc<int32_t>> heapIndices_;
};

} // namespace facebook::velox
Loading
Loading