Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add utilities for combining dictionary wrappers #11944

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 99 additions & 1 deletion velox/exec/OperatorUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,78 @@ vector_size_t processFilterResults(
}
}

// Applies the dictionary described by 'wrapIndices' / 'wrapNulls' to
// 'inputVector' and returns a vector of 'wrapSize' rows.
//
// - Null 'wrapIndices': 'wrapNulls' must be null as well and 'inputVector' is
//   returned untouched.
// - Input that is not dictionary encoded: the new wrap is put directly on top.
// - Dictionary-encoded input: the input's indices are combined with
//   'wrapIndices' and the result wraps the input's value vector. When the
//   input dictionary has no nulls, the combined indices are cached in
//   'wrapState' keyed by the input's indices buffer and reused for later
//   columns that share that buffer.
VectorPtr wrapOne(
    vector_size_t wrapSize,
    BufferPtr wrapIndices,
    const VectorPtr& inputVector,
    BufferPtr wrapNulls,
    WrapState& wrapState) {
  // Nothing to apply. Nulls without indices are rejected.
  if (!wrapIndices) {
    VELOX_CHECK_NULL(wrapNulls);
    return inputVector;
  }

  const bool inputIsDictionary =
      inputVector->encoding() == VectorEncoding::Simple::DICTIONARY;
  if (!inputIsDictionary) {
    return BaseVector::wrapInDictionary(
        wrapNulls, wrapIndices, wrapSize, inputVector);
  }

  // While no combined indices have been cached, remember the nulls buffer in
  // use. Once something is cached, every further dictionary-encoded column
  // has to come with that very same buffer.
  if (!wrapState.transposeResults.empty()) {
    VELOX_CHECK(
        wrapState.nulls == wrapNulls.get(),
        "Must have identical wrapNulls for all wrapped columns");
  } else {
    wrapState.nulls = wrapNulls.get();
  }

  auto innerIndices = inputVector->wrapInfo();
  auto innerValues = inputVector->valueVector();
  // The values get a new wrapper below without any lazy vector being
  // loaded; re-wrapping is allowed in this situation.
  innerValues->clearContainingLazyAndWrapped();

  if (auto* innerNulls = inputVector->rawNulls()) {
    // The input dictionary contributes nulls of its own, so both indices and
    // nulls have to be combined. This result is not cached.
    BufferPtr combinedIndices =
        AlignedBuffer::allocate<vector_size_t>(wrapSize, inputVector->pool());
    BufferPtr combinedNulls =
        AlignedBuffer::allocate<bool>(wrapSize, inputVector->pool());
    const uint64_t* outerNulls = nullptr;
    if (wrapNulls) {
      outerNulls = wrapNulls->as<uint64_t>();
    }
    BaseVector::transposeIndicesWithNulls(
        innerIndices->as<vector_size_t>(),
        innerNulls,
        wrapSize,
        wrapIndices->as<vector_size_t>(),
        outerNulls,
        combinedIndices->asMutable<vector_size_t>(),
        combinedNulls->asMutable<uint64_t>());

    return BaseVector::wrapInDictionary(
        combinedNulls, combinedIndices, wrapSize, innerValues);
  }

  // The input dictionary adds no nulls. If an earlier column used the same
  // indices buffer, reuse the indices combined for it.
  Buffer* const cacheKey = innerIndices.get();
  const auto cached = wrapState.transposeResults.find(cacheKey);
  if (cached != wrapState.transposeResults.end()) {
    return BaseVector::wrapInDictionary(
        wrapNulls, BufferPtr(cached->second), wrapSize, innerValues);
  }

  auto combinedIndices =
      AlignedBuffer::allocate<vector_size_t>(wrapSize, inputVector->pool());
  BaseVector::transposeIndices(
      innerIndices->as<vector_size_t>(),
      wrapSize,
      wrapIndices->as<vector_size_t>(),
      combinedIndices->asMutable<vector_size_t>());
  // Record the combination so later columns with the same input wrapping and
  // no added nulls can share it.
  wrapState.transposeResults[cacheKey] = combinedIndices.get();
  return BaseVector::wrapInDictionary(
      wrapNulls, combinedIndices, wrapSize, innerValues);
}

VectorPtr wrapChild(
vector_size_t size,
BufferPtr mapping,
Expand Down Expand Up @@ -295,8 +367,9 @@ RowVectorPtr wrap(
}
std::vector<VectorPtr> wrappedChildren;
wrappedChildren.reserve(childVectors.size());
WrapState state;
for (auto& child : childVectors) {
wrappedChildren.emplace_back(wrapChild(size, mapping, child));
wrappedChildren.emplace_back(wrapOne(size, mapping, child, nullptr, state));
}
return std::make_shared<RowVector>(
pool, rowType, nullptr, size, wrappedChildren);
Expand Down Expand Up @@ -425,6 +498,31 @@ void projectChildren(
}
}

// Row-vector flavor of projectChildren(): takes the children of 'src' and
// hands them, with all other arguments unchanged, to the overload that works
// on a vector of children.
void projectChildren(
    std::vector<VectorPtr>& projectedChildren,
    const RowVectorPtr& src,
    const std::vector<IdentityProjection>& projections,
    int32_t size,
    const BufferPtr& mapping,
    WrapState* state) {
  const std::vector<VectorPtr>& srcChildren = src->children();
  projectChildren(
      projectedChildren, srcChildren, projections, size, mapping, state);
}

// For every entry of 'projections', wraps the child at 'inputChannel' of
// 'src' with 'mapping' ('size' rows) and stores it at 'outputChannel' of
// 'projectedChildren'. With a non-null 'state' the wrapping goes through
// wrapOne() (no wrap nulls) so that dictionary combining can be shared
// between columns; with a null 'state' it goes through wrapChild().
void projectChildren(
    std::vector<VectorPtr>& projectedChildren,
    const std::vector<VectorPtr>& src,
    const std::vector<IdentityProjection>& projections,
    int32_t size,
    const BufferPtr& mapping,
    WrapState* state) {
  for (const auto& projection : projections) {
    const auto& input = src[projection.inputChannel];
    auto& output = projectedChildren[projection.outputChannel];
    if (state) {
      output = wrapOne(size, mapping, input, nullptr, *state);
    } else {
      output = wrapChild(size, mapping, input);
    }
  }
}

std::unique_ptr<Operator> BlockedOperatorFactory::toOperator(
DriverCtx* ctx,
int32_t id,
Expand Down
54 changes: 54 additions & 0 deletions velox/exec/OperatorUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,37 @@ RowVectorPtr wrap(
const std::vector<VectorPtr>& childVectors,
memory::MemoryPool* pool);

/// Represents unique dictionary wrappers over a set of vectors when
/// wrapping these inside another dictionary. When multiple wrapped
/// vectors with the same wrapping get re-wrapped, we replace the
/// wrapper with a composition of the two dictionaries. This needs to
/// be done once per distinct wrapper in the input. WrapState records
/// the compositions that are already made.
///
/// All pointers held here are raw and therefore non-owning.
/// NOTE(review): a WrapState presumably must not outlive the input and
/// output vectors whose buffers it points to, and should not be reused
/// across batches - confirm against callers.
struct WrapState {
  // Records wrap nulls added in wrapping. If wrap nulls are added, the same
  // wrap nulls must be applied to all columns. Initialized to nullptr so that
  // a default-constructed state never carries an indeterminate pointer.
  Buffer* nulls{nullptr};

  // Set of distinct wrappers in input, each mapped to the wrap
  // indices combining the former with the new wrap.
  folly::F14FastMap<Buffer*, Buffer*> transposeResults;
};

/// Wraps 'inputVector' with 'wrapIndices' and 'wrapNulls'. 'wrapSize' is
/// the size of 'wrapIndices' and of the resulting vector.
///
/// If 'wrapIndices' is null, 'wrapNulls' must be null as well and
/// 'inputVector' is returned as is. An input that is not dictionary
/// encoded is wrapped directly. A dictionary-encoded input is not wrapped
/// a second time: its indices are combined with 'wrapIndices' and the
/// result wraps the input's value vector.
///
/// Dictionary combining is deduplicated using 'wrapState'. If the same
/// indices are added on top of dictionary encoded vectors sharing the
/// same wrapping, the resulting vectors will share the same composition
/// of the original wrap and 'wrapIndices'. Inputs whose dictionary adds
/// nulls always get their own combined indices and nulls and do not take
/// part in the sharing. Once a composition is recorded in 'wrapState',
/// every further dictionary-encoded input must be passed the same
/// 'wrapNulls' buffer.
VectorPtr wrapOne(
vector_size_t wrapSize,
BufferPtr wrapIndices,
const VectorPtr& inputVector,
BufferPtr wrapNulls,
WrapState& wrapState);

// Ensures that all LazyVectors reachable from 'input' are loaded for all rows.
void loadColumns(const RowVectorPtr& input, core::ExecCtx& execCtx);

Expand Down Expand Up @@ -156,6 +187,29 @@ void projectChildren(
int32_t size,
const BufferPtr& mapping);

/// Projects children of the 'src' row vector into 'projectedChildren'
/// according to 'projections' and 'mapping': for each projection, the
/// child at its input channel is wrapped with 'mapping' and stored at its
/// output channel. 'size' specifies the number of projected rows.
/// 'state' is used to deduplicate dictionary merging when the same
/// 'mapping' is applied over several children that share an identical
/// set of indices. If 'state' is null, each child is wrapped without
/// such deduplication.
void projectChildren(
std::vector<VectorPtr>& projectedChildren,
const RowVectorPtr& src,
const std::vector<IdentityProjection>& projections,
int32_t size,
const BufferPtr& mapping,
WrapState* state);

/// Overload of the above function that takes a reference to a const vector
/// of VectorPtr as the 'src' argument, instead of a row vector. The
/// channels in 'projections' index into 'src' (input) and
/// 'projectedChildren' (output).
void projectChildren(
std::vector<VectorPtr>& projectedChildren,
const std::vector<VectorPtr>& src,
const std::vector<IdentityProjection>& projections,
int32_t size,
const BufferPtr& mapping,
WrapState* state);

using BlockedOperatorCb = std::function<BlockingReason(ContinueFuture* future)>;

/// An operator that blocks until the blockedCb tells it not. blockedCb is
Expand Down
128 changes: 128 additions & 0 deletions velox/exec/tests/OperatorUtilsTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -531,3 +531,131 @@ TEST_F(OperatorUtilsTest, outputBatchRows) {
ASSERT_EQ(1000, mockOp.outputRows(3'000'000'000));
}
}

TEST_F(OperatorUtilsTest, wrapMany) {
  // Creates a RowVector with nullable and non-null vectors sharing
  // different dictionary wraps, including a wrap that adds nulls. Rewraps
  // all columns with one new set of indices (no new wrap nulls). Checks
  // that the outcome has a single level of wrapping that combines the
  // dictionaries and nulls and keeps the new wraps deduplicated where
  // possible.
  constexpr int32_t kSize = 1001;
  auto indices1 = makeIndices(kSize, [](vector_size_t i) { return i; });
  auto indices2 = makeIndicesInReverse(kSize);
  auto indices3 = makeIndicesInReverse(kSize);
  auto wrapNulls = AlignedBuffer::allocate<uint64_t>(
      bits::nwords(kSize), pool_.get(), bits::kNotNull64);
  for (auto i = 0; i < kSize; i += 5) {
    bits::setNull(wrapNulls->asMutable<uint64_t>(), i);
  }
  // Test dataset: *_a has no nulls, *_b has nulls. plain* is not wrapped.
  // wrapped1* is wrapped in one dict, wrapped2* is wrapped in another,
  // wrapped3* is wrapped in a dictionary that adds nulls.
  auto row = makeRowVector(
      {"plain_a",
       "plain_b",
       "wrapped1_a",
       "wrapped1_b",
       "wrapped2_a",
       "wrapped2_b",
       "wrapped3_a",
       "wrapped3_b"},

      {// plain_a
       makeFlatVector<int32_t>(kSize, [](auto i) { return i; }),
       // plain_b
       makeFlatVector<int32_t>(
           kSize, [](auto i) { return i; }, [](auto i) { return i % 4 == 0; }),

       // wrapped1_a
       BaseVector::wrapInDictionary(
           nullptr,
           indices1,
           kSize,
           makeFlatVector<int32_t>(kSize, [](auto i) { return i; })),
       // wrapped1_b
       BaseVector::wrapInDictionary(
           nullptr,
           indices1,
           kSize,
           makeFlatVector<int32_t>(
               kSize,
               [](auto i) { return i; },
               [](auto i) { return i % 4 == 0; })),

       // wrapped2_a
       BaseVector::wrapInDictionary(
           nullptr,
           indices2,
           kSize,
           makeFlatVector<int32_t>(kSize, [](auto i) { return i; })),
       // wrapped2_b
       BaseVector::wrapInDictionary(
           nullptr,
           indices2,
           kSize,
           makeFlatVector<int32_t>(
               kSize,
               [](auto i) { return i; },
               [](auto i) { return i % 4 == 0; })),
       // wrapped3_a
       BaseVector::wrapInDictionary(
           wrapNulls,
           indices3,
           kSize,
           makeFlatVector<int32_t>(kSize, [](auto i) { return i; })),
       // wrapped3_b
       BaseVector::wrapInDictionary(
           wrapNulls,
           indices3,
           kSize,
           makeFlatVector<int32_t>(
               kSize,
               [](auto i) { return i; },
               [](auto i) { return i % 4 == 0; }))

      });
  auto rowType = row->type();
  std::vector<IdentityProjection> identicalProjections{};
  for (auto i = 0; i < rowType->size(); ++i) {
    identicalProjections.emplace_back(i, i);
  }

  // Now wrap 'row' in 'newIndices' keeping wraps to one level and deduplicating
  // dictionary transposes.
  auto newIndices = makeIndicesInReverse(kSize);
  // Value-initialize so that no member of the state starts out indeterminate.
  WrapState state{};
  std::vector<VectorPtr> projected(rowType->size());
  projectChildren(
      projected, row, identicalProjections, kSize, newIndices, &state);
  auto result = makeRowVector(projected);

  // Row i of the result must be row newIndices[i] of the input. Written in
  // this direction the check does not depend on 'newIndices' being its own
  // inverse.
  const auto* rawNewIndices = newIndices->as<vector_size_t>();
  for (auto i = 0; i < kSize; ++i) {
    EXPECT_TRUE(result->equalValueAt(row.get(), i, rawNewIndices[i]));
  }

  // The two unwrapped columns get 'newIndices' directly.
  EXPECT_EQ(projected[0]->wrapInfo(), newIndices);
  EXPECT_EQ(projected[1]->wrapInfo(), newIndices);

  // The next two have the same wrapper ('indices1') and this is now combined
  // with newIndices into a new buffer that is used twice.
  EXPECT_NE(projected[2]->wrapInfo(), newIndices);
  EXPECT_NE(projected[2]->wrapInfo(), indices1);
  EXPECT_EQ(projected[2]->wrapInfo(), projected[3]->wrapInfo());

  // The next two share a different wrapper.
  EXPECT_NE(projected[3]->wrapInfo(), projected[4]->wrapInfo());
  EXPECT_EQ(projected[4]->wrapInfo(), projected[5]->wrapInfo());

  // The next two columns have nulls from their wrapper and thus they each get
  // their own wrappers.
  EXPECT_NE(projected[6]->wrapInfo(), projected[7]->wrapInfo());

  // All columns have one level of wrapping: directly under each result
  // column sits a flat vector.
  for (const auto& column : projected) {
    EXPECT_EQ(column->valueVector()->encoding(), VectorEncoding::Simple::FLAT);
  }
}
Loading
Loading