Skip to content

Commit

Permalink
feat: Add utilities for combining dictionary wrappers
Browse files Browse the repository at this point in the history
- Adds functions to transpose dictionaries with and without nulls.

- Adds projection that wraps children of a RowVector into a dictionary
  so that dictionaries are combined instead of being nested. If
  multiple columns have the same wrapping indices, they continue to
  share the wrapping also after new indices are combined with the
  previous wrapper.

- This is preparation for limiting dictionary wrapping to one
  level. This will speed up access and simplify corner cases of
  expressions.
  • Loading branch information
Orri Erling committed Dec 24, 2024
1 parent 8ebd3a8 commit e3384c4
Show file tree
Hide file tree
Showing 5 changed files with 436 additions and 33 deletions.
100 changes: 99 additions & 1 deletion velox/exec/OperatorUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,78 @@ vector_size_t processFilterResults(
}
}

VectorPtr wrapOne(
vector_size_t wrapSize,
BufferPtr wrapIndices,
const VectorPtr& inputVector,
BufferPtr wrapNulls,
WrapState& wrapState) {
if (!wrapIndices) {
VELOX_CHECK_NULL(wrapNulls);
return inputVector;
}

if (inputVector->encoding() != VectorEncoding::Simple::DICTIONARY) {
return BaseVector::wrapInDictionary(
wrapNulls, wrapIndices, wrapSize, inputVector);
}

if (wrapState.transposeResults.empty()) {
wrapState.nulls = wrapNulls.get();
} else {
VELOX_CHECK(
wrapState.nulls == wrapNulls.get(),
"Must have identical wrapNulls for all wrapped columns");
}
auto baseIndices = inputVector->wrapInfo();
auto baseValues = inputVector->valueVector();
// The base will be wrapped again without loading any lazy. The
// rewrapping is permitted in this case.
baseValues->clearContainingLazyAndWrapped();
auto* rawBaseNulls = inputVector->rawNulls();
if (rawBaseNulls) {
// Dictionary adds nulls.
BufferPtr newIndices =
AlignedBuffer::allocate<vector_size_t>(wrapSize, inputVector->pool());
BufferPtr newNulls =
AlignedBuffer::allocate<bool>(wrapSize, inputVector->pool());
const uint64_t* rawWrapNulls =
wrapNulls ? wrapNulls->as<uint64_t>() : nullptr;
BaseVector::transposeIndicesWithNulls(
baseIndices->as<vector_size_t>(),
rawBaseNulls,
wrapSize,
wrapIndices->as<vector_size_t>(),
rawWrapNulls,
newIndices->asMutable<vector_size_t>(),
newNulls->asMutable<uint64_t>());

return BaseVector::wrapInDictionary(
newNulls, newIndices, wrapSize, baseValues);
}

// if another column had the same indices as this one and this one does not
// add nulls, we use the same transposed wrapping.
auto it = wrapState.transposeResults.find(baseIndices.get());
if (it != wrapState.transposeResults.end()) {
return BaseVector::wrapInDictionary(
wrapNulls, BufferPtr(it->second), wrapSize, baseValues);
}

auto newIndices =
AlignedBuffer::allocate<vector_size_t>(wrapSize, inputVector->pool());
BaseVector::transposeIndices(
baseIndices->as<vector_size_t>(),
wrapSize,
wrapIndices->as<vector_size_t>(),
newIndices->asMutable<vector_size_t>());
// If another column has the same wrapping and does not add nulls, we can use
// the same transposed indices.
wrapState.transposeResults[baseIndices.get()] = newIndices.get();
return BaseVector::wrapInDictionary(
wrapNulls, newIndices, wrapSize, baseValues);
}

VectorPtr wrapChild(
vector_size_t size,
BufferPtr mapping,
Expand Down Expand Up @@ -295,8 +367,9 @@ RowVectorPtr wrap(
}
std::vector<VectorPtr> wrappedChildren;
wrappedChildren.reserve(childVectors.size());
WrapState state;
for (auto& child : childVectors) {
wrappedChildren.emplace_back(wrapChild(size, mapping, child));
wrappedChildren.emplace_back(wrapOne(size, mapping, child, nullptr, state));
}
return std::make_shared<RowVector>(
pool, rowType, nullptr, size, wrappedChildren);
Expand Down Expand Up @@ -425,6 +498,31 @@ void projectChildren(
}
}

void projectChildren(
std::vector<VectorPtr>& projectedChildren,
const RowVectorPtr& src,
const std::vector<IdentityProjection>& projections,
int32_t size,
const BufferPtr& mapping,
WrapState* state) {
projectChildren(
projectedChildren, src->children(), projections, size, mapping, state);
}

void projectChildren(
std::vector<VectorPtr>& projectedChildren,
const std::vector<VectorPtr>& src,
const std::vector<IdentityProjection>& projections,
int32_t size,
const BufferPtr& mapping,
WrapState* state) {
for (const auto& projection : projections) {
projectedChildren[projection.outputChannel] = state
? wrapOne(size, mapping, src[projection.inputChannel], nullptr, *state)
: wrapChild(size, mapping, src[projection.inputChannel]);
}
}

std::unique_ptr<Operator> BlockedOperatorFactory::toOperator(
DriverCtx* ctx,
int32_t id,
Expand Down
54 changes: 54 additions & 0 deletions velox/exec/OperatorUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,37 @@ RowVectorPtr wrap(
const std::vector<VectorPtr>& childVectors,
memory::MemoryPool* pool);

/// Represents unique dictionary wrappers over a set of vectors when
/// wrapping these inside another dictionary. When multiple wrapped
/// vectors with the same wrapping get re-wrapped, we replace the
/// wrapper with a composition of the two dictionaries. This needs to
/// be done once per distinct wrapper in the input. WrapState records
/// the compositions that are already made.
struct WrapState {
// Records wrap nulls added in wrapping. If wrap nulls are added, the same
// wrap nulls must be applied to all columns.
Buffer* nulls;

// Set of distinct wrappers in input, each mapped to the wrap
// indices combining the former with the new wrap.

folly::F14FastMap<Buffer*, Buffer*> transposeResults;
};

/// Wraps 'inputVector' with 'wrapIndices' and
/// 'wrapNulls'. 'wrapSize' is the size of of 'wrapIndices' and of
/// the resulting vector. Dictionary combining is deduplicated using
/// 'wrapState'. If the same indices are added on top of dictionary
/// encoded vectors sharing the same wrapping, the resulting vectors
/// will share the same composition of the original wrap and
/// 'wrapIndices'.
VectorPtr wrapOne(
vector_size_t wrapSize,
BufferPtr wrapIndices,
const VectorPtr& inputVector,
BufferPtr wrapNulls,
WrapState& wrapState);

// Ensures that all LazyVectors reachable from 'input' are loaded for all rows.
void loadColumns(const RowVectorPtr& input, core::ExecCtx& execCtx);

Expand Down Expand Up @@ -156,6 +187,29 @@ void projectChildren(
int32_t size,
const BufferPtr& mapping);

/// Projects children of 'src' row vector to 'dest' row vector
/// according to 'projections' and 'mapping'. 'size' specifies number
/// of projected rows in 'dest'. 'state' is used to
/// deduplicate dictionary merging when applying the same dictionary
/// over more than one identical set of indices.
void projectChildren(
std::vector<VectorPtr>& projectedChildren,
const RowVectorPtr& src,
const std::vector<IdentityProjection>& projections,
int32_t size,
const BufferPtr& mapping,
WrapState* state);

/// Overload of the above function that takes reference to const vector of
/// VectorPtr as 'src' argument, instead of row vector.
void projectChildren(
std::vector<VectorPtr>& projectedChildren,
const std::vector<VectorPtr>& src,
const std::vector<IdentityProjection>& projections,
int32_t size,
const BufferPtr& mapping,
WrapState* state);

using BlockedOperatorCb = std::function<BlockingReason(ContinueFuture* future)>;

/// An operator that blocks until the blockedCb tells it not. blockedCb is
Expand Down
128 changes: 128 additions & 0 deletions velox/exec/tests/OperatorUtilsTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -531,3 +531,131 @@ TEST_F(OperatorUtilsTest, outputBatchRows) {
ASSERT_EQ(1000, mockOp.outputRows(3'000'000'000));
}
}

TEST_F(OperatorUtilsTest, wrapMany) {
// Creates a RowVector with nullable and non-null vectors sharing
// different dictionary wraps. Rewraps these with a new wrap with
// and without nulls. Checks that the outcome has a single level of
// wrapping that combines the dictionaries and nulls and keeps the
// new wraps deduplicated where possible.
constexpr int32_t kSize = 1001;
auto indices1 = makeIndices(kSize, [](vector_size_t i) { return i; });
auto indices2 = makeIndicesInReverse(kSize);
auto indices3 = makeIndicesInReverse(kSize);
auto wrapNulls = AlignedBuffer::allocate<uint64_t>(
bits::nwords(kSize), pool_.get(), bits::kNotNull64);
for (auto i = 0; i < kSize; i += 5) {
bits::setNull(wrapNulls->asMutable<uint64_t>(), i);
}
// Test dataset: *_a has no nulls, *_b has nulls. plain* is not wrapped.
// wrapped1* is wrapped in one dict, wrapped2* is wrapped in another,
// wrapped3* is wrapped in a dictionary that adds nulls.
auto row = makeRowVector(
{"plain_a",
"plain_b",
"wrapped1_a",
"wrapped1_b",
"wrapped2_a",
"wrapped2_b",
"wrapped3_a",
"wrapped3_b"},

{// plain_a
makeFlatVector<int32_t>(kSize, [](auto i) { return i; }),
// plain_b
makeFlatVector<int32_t>(
kSize, [](auto i) { return i; }, [](auto i) { return i % 4 == 0; }),

// wrapped1-a
BaseVector::wrapInDictionary(
nullptr,
indices1,
kSize,
makeFlatVector<int32_t>(kSize, [](auto i) { return i; })),
// wrapped1_b
BaseVector::wrapInDictionary(
nullptr,
indices1,
kSize,
makeFlatVector<int32_t>(
kSize,
[](auto i) { return i; },
[](auto i) { return i % 4 == 0; })),

// wrapped2-a
BaseVector::wrapInDictionary(
nullptr,
indices2,
kSize,
makeFlatVector<int32_t>(kSize, [](auto i) { return i; })),
// wrapped2_b
BaseVector::wrapInDictionary(
nullptr,
indices2,
kSize,
makeFlatVector<int32_t>(
kSize,
[](auto i) { return i; },
[](auto i) { return i % 4 == 0; })),
// wrapped3-a
BaseVector::wrapInDictionary(
wrapNulls,
indices3,
kSize,
makeFlatVector<int32_t>(kSize, [](auto i) { return i; })),
// wrapped3_b
BaseVector::wrapInDictionary(
wrapNulls,
indices3,
kSize,
makeFlatVector<int32_t>(
kSize,
[](auto i) { return i; },
[](auto i) { return i % 4 == 0; }))

});
auto rowType = row->type();
std::vector<IdentityProjection> identicalProjections{};
for (auto i = 0; i < rowType->size(); ++i) {
identicalProjections.emplace_back(i, i);
}

// Now wrap 'row' in 'newIndices' keeping wraps to one level and deduplicating
// dictionary transposes.
auto newIndices = makeIndicesInReverse(kSize);
WrapState state;
std::vector<VectorPtr> projected(rowType->size());
projectChildren(
projected, row, identicalProjections, kSize, newIndices, &state);
auto result = makeRowVector(projected);
for (auto i = 0; i < kSize; ++i) {
EXPECT_TRUE(
row->equalValueAt(result.get(), i, newIndices->as<int32_t>()[i]));
}

// The two unwrapped columns get 'newIndices' directly.
EXPECT_EQ(projected[0]->wrapInfo(), newIndices);
EXPECT_EQ(projected[1]->wrapInfo(), newIndices);

// The next two have the same wrapper and this is now combined with newIndices
// and used twice.
EXPECT_NE(projected[2]->wrapInfo(), newIndices);
EXPECT_NE(projected[2]->wrapInfo(), indices2);
EXPECT_EQ(projected[2]->wrapInfo(), projected[3]->wrapInfo());

// The next two share a different wrapper.
EXPECT_NE(projected[3]->wrapInfo(), projected[4]->wrapInfo());
EXPECT_EQ(projected[4]->wrapInfo(), projected[5]->wrapInfo());

// The next two columns have nulls from their wrapper and thus they each get
// their own wrappers.
EXPECT_NE(projected[6]->wrapInfo(), projected[7]->wrapInfo());

// All columns have one level of wrapping.
EXPECT_EQ(
projected[2]->valueVector()->encoding(), VectorEncoding::Simple::FLAT);
EXPECT_EQ(
projected[4]->valueVector()->encoding(), VectorEncoding::Simple::FLAT);
EXPECT_EQ(
projected[6]->valueVector()->encoding(), VectorEncoding::Simple::FLAT);
}
Loading

0 comments on commit e3384c4

Please sign in to comment.