diff --git a/velox/exec/OperatorUtils.cpp b/velox/exec/OperatorUtils.cpp
index 536706fb9fb0..ccc6e3ffb8c1 100644
--- a/velox/exec/OperatorUtils.cpp
+++ b/velox/exec/OperatorUtils.cpp
@@ -258,6 +258,78 @@ vector_size_t processFilterResults(
   }
 }
 
+VectorPtr wrapOne(
+    vector_size_t wrapSize,
+    BufferPtr wrapIndices,
+    const VectorPtr& inputVector,
+    BufferPtr wrapNulls,
+    WrapState& wrapState) {
+  if (!wrapIndices) {
+    VELOX_CHECK_NULL(wrapNulls);
+    return inputVector;
+  }
+
+  if (inputVector->encoding() != VectorEncoding::Simple::DICTIONARY) {
+    return BaseVector::wrapInDictionary(
+        wrapNulls, wrapIndices, wrapSize, inputVector);
+  }
+
+  if (wrapState.transposeResults.empty()) {
+    wrapState.nulls = wrapNulls.get();
+  } else {
+    VELOX_CHECK(
+        wrapState.nulls == wrapNulls.get(),
+        "Must have identical wrapNulls for all wrapped columns");
+  }
+  auto baseIndices = inputVector->wrapInfo();
+  auto baseValues = inputVector->valueVector();
+  // The base will be wrapped again without loading any lazy. The
+  // rewrapping is permitted in this case.
+  baseValues->clearContainingLazyAndWrapped();
+  auto* rawBaseNulls = inputVector->rawNulls();
+  if (rawBaseNulls) {
+    // Dictionary adds nulls.
+    BufferPtr newIndices =
+        AlignedBuffer::allocate<vector_size_t>(wrapSize, inputVector->pool());
+    BufferPtr newNulls =
+        AlignedBuffer::allocate<bool>(wrapSize, inputVector->pool());
+    const uint64_t* rawWrapNulls =
+        wrapNulls ? wrapNulls->as<uint64_t>() : nullptr;
+    BaseVector::transposeIndicesWithNulls(
+        baseIndices->as<vector_size_t>(),
+        rawBaseNulls,
+        wrapSize,
+        wrapIndices->as<vector_size_t>(),
+        rawWrapNulls,
+        newIndices->asMutable<vector_size_t>(),
+        newNulls->asMutable<uint64_t>());
+
+    return BaseVector::wrapInDictionary(
+        newNulls, newIndices, wrapSize, baseValues);
+  }
+
+  // If another column had the same indices as this one and this one does not
+  // add nulls, we use the same transposed wrapping.
+  auto it = wrapState.transposeResults.find(baseIndices.get());
+  if (it != wrapState.transposeResults.end()) {
+    return BaseVector::wrapInDictionary(
+        wrapNulls, BufferPtr(it->second), wrapSize, baseValues);
+  }
+
+  auto newIndices =
+      AlignedBuffer::allocate<vector_size_t>(wrapSize, inputVector->pool());
+  BaseVector::transposeIndices(
+      baseIndices->as<vector_size_t>(),
+      wrapSize,
+      wrapIndices->as<vector_size_t>(),
+      newIndices->asMutable<vector_size_t>());
+  // If another column has the same wrapping and does not add nulls, we can use
+  // the same transposed indices.
+  wrapState.transposeResults[baseIndices.get()] = newIndices.get();
+  return BaseVector::wrapInDictionary(
+      wrapNulls, newIndices, wrapSize, baseValues);
+}
+
 VectorPtr wrapChild(
     vector_size_t size,
     BufferPtr mapping,
@@ -295,8 +367,9 @@ RowVectorPtr wrap(
   }
   std::vector<VectorPtr> wrappedChildren;
   wrappedChildren.reserve(childVectors.size());
+  WrapState state;
   for (auto& child : childVectors) {
-    wrappedChildren.emplace_back(wrapChild(size, mapping, child));
+    wrappedChildren.emplace_back(wrapOne(size, mapping, child, nullptr, state));
   }
   return std::make_shared<RowVector>(
       pool, rowType, nullptr, size, wrappedChildren);
@@ -425,6 +498,31 @@ void projectChildren(
   }
 }
 
+void projectChildren(
+    std::vector<VectorPtr>& projectedChildren,
+    const RowVectorPtr& src,
+    const std::vector<IdentityProjection>& projections,
+    int32_t size,
+    const BufferPtr& mapping,
+    WrapState* state) {
+  projectChildren(
+      projectedChildren, src->children(), projections, size, mapping, state);
+}
+
+void projectChildren(
+    std::vector<VectorPtr>& projectedChildren,
+    const std::vector<VectorPtr>& src,
+    const std::vector<IdentityProjection>& projections,
+    int32_t size,
+    const BufferPtr& mapping,
+    WrapState* state) {
+  for (const auto& projection : projections) {
+    projectedChildren[projection.outputChannel] = state
+        ? wrapOne(size, mapping, src[projection.inputChannel], nullptr, *state)
+        : wrapChild(size, mapping, src[projection.inputChannel]);
+  }
+}
+
 std::unique_ptr<Operator> BlockedOperatorFactory::toOperator(
     DriverCtx* ctx,
     int32_t id,
diff --git a/velox/exec/OperatorUtils.h b/velox/exec/OperatorUtils.h
index bea261f3d24a..c8438a67a781 100644
--- a/velox/exec/OperatorUtils.h
+++ b/velox/exec/OperatorUtils.h
@@ -86,6 +86,37 @@ RowVectorPtr wrap(
     const std::vector<VectorPtr>& childVectors,
     memory::MemoryPool* pool);
 
+/// Represents unique dictionary wrappers over a set of vectors when
+/// wrapping these inside another dictionary. When multiple wrapped
+/// vectors with the same wrapping get re-wrapped, we replace the
+/// wrapper with a composition of the two dictionaries. This needs to
+/// be done once per distinct wrapper in the input. WrapState records
+/// the compositions that are already made.
+struct WrapState {
+  // Records wrap nulls added in wrapping. If wrap nulls are added, the same
+  // wrap nulls must be applied to all columns.
+  Buffer* nulls;
+
+  // Set of distinct wrappers in input, each mapped to the wrap
+  // indices combining the former with the new wrap.
+  folly::F14FastMap<Buffer*, Buffer*> transposeResults;
+};
+
+/// Wraps 'inputVector' with 'wrapIndices' and
+/// 'wrapNulls'. 'wrapSize' is the size of 'wrapIndices' and of
+/// the resulting vector. Dictionary combining is deduplicated using
+/// 'wrapState'. If the same indices are added on top of dictionary
+/// encoded vectors sharing the same wrapping, the resulting vectors
+/// will share the same composition of the original wrap and
+/// 'wrapIndices'.
+VectorPtr wrapOne(
+    vector_size_t wrapSize,
+    BufferPtr wrapIndices,
+    const VectorPtr& inputVector,
+    BufferPtr wrapNulls,
+    WrapState& wrapState);
+
 // Ensures that all LazyVectors reachable from 'input' are loaded for all rows.
 void loadColumns(const RowVectorPtr& input, core::ExecCtx& execCtx);
@@ -156,6 +187,29 @@ void projectChildren(
     int32_t size,
     const BufferPtr& mapping);
 
+/// Projects children of 'src' row vector to 'dest' row vector
+/// according to 'projections' and 'mapping'. 'size' specifies number
+/// of projected rows in 'dest'. 'state' is used to
+/// deduplicate dictionary merging when applying the same dictionary
+/// over more than one identical set of indices.
+void projectChildren(
+    std::vector<VectorPtr>& projectedChildren,
+    const RowVectorPtr& src,
+    const std::vector<IdentityProjection>& projections,
+    int32_t size,
+    const BufferPtr& mapping,
+    WrapState* state);
+
+/// Overload of the above function that takes reference to const vector of
+/// VectorPtr as 'src' argument, instead of row vector.
+void projectChildren(
+    std::vector<VectorPtr>& projectedChildren,
+    const std::vector<VectorPtr>& src,
+    const std::vector<IdentityProjection>& projections,
+    int32_t size,
+    const BufferPtr& mapping,
+    WrapState* state);
+
 using BlockedOperatorCb = std::function<BlockingReason(ContinueFuture* future)>;
 
 /// An operator that blocks until the blockedCb tells it not. blockedCb is
diff --git a/velox/exec/tests/OperatorUtilsTest.cpp b/velox/exec/tests/OperatorUtilsTest.cpp
index 5d4bab633b4b..fe3177da0016 100644
--- a/velox/exec/tests/OperatorUtilsTest.cpp
+++ b/velox/exec/tests/OperatorUtilsTest.cpp
@@ -531,3 +531,131 @@ TEST_F(OperatorUtilsTest, outputBatchRows) {
   ASSERT_EQ(1000, mockOp.outputRows(3'000'000'000));
 }
 }
+
+TEST_F(OperatorUtilsTest, wrapMany) {
+  // Creates a RowVector with nullable and non-null vectors sharing
+  // different dictionary wraps. Rewraps these with a new wrap with
+  // and without nulls. Checks that the outcome has a single level of
+  // wrapping that combines the dictionaries and nulls and keeps the
+  // new wraps deduplicated where possible.
+ constexpr int32_t kSize = 1001; + auto indices1 = makeIndices(kSize, [](vector_size_t i) { return i; }); + auto indices2 = makeIndicesInReverse(kSize); + auto indices3 = makeIndicesInReverse(kSize); + auto wrapNulls = AlignedBuffer::allocate( + bits::nwords(kSize), pool_.get(), bits::kNotNull64); + for (auto i = 0; i < kSize; i += 5) { + bits::setNull(wrapNulls->asMutable(), i); + } + // Test dataset: *_a has no nulls, *_b has nulls. plain* is not wrapped. + // wrapped1* is wrapped in one dict, wrapped2* is wrapped in another, + // wrapped3* is wrapped in a dictionary that adds nulls. + auto row = makeRowVector( + {"plain_a", + "plain_b", + "wrapped1_a", + "wrapped1_b", + "wrapped2_a", + "wrapped2_b", + "wrapped3_a", + "wrapped3_b"}, + + {// plain_a + makeFlatVector(kSize, [](auto i) { return i; }), + // plain_b + makeFlatVector( + kSize, [](auto i) { return i; }, [](auto i) { return i % 4 == 0; }), + + // wrapped1-a + BaseVector::wrapInDictionary( + nullptr, + indices1, + kSize, + makeFlatVector(kSize, [](auto i) { return i; })), + // wrapped1_b + BaseVector::wrapInDictionary( + nullptr, + indices1, + kSize, + makeFlatVector( + kSize, + [](auto i) { return i; }, + [](auto i) { return i % 4 == 0; })), + + // wrapped2-a + BaseVector::wrapInDictionary( + nullptr, + indices2, + kSize, + makeFlatVector(kSize, [](auto i) { return i; })), + // wrapped2_b + BaseVector::wrapInDictionary( + nullptr, + indices2, + kSize, + makeFlatVector( + kSize, + [](auto i) { return i; }, + [](auto i) { return i % 4 == 0; })), + // wrapped3-a + BaseVector::wrapInDictionary( + wrapNulls, + indices3, + kSize, + makeFlatVector(kSize, [](auto i) { return i; })), + // wrapped3_b + BaseVector::wrapInDictionary( + wrapNulls, + indices3, + kSize, + makeFlatVector( + kSize, + [](auto i) { return i; }, + [](auto i) { return i % 4 == 0; })) + + }); + auto rowType = row->type(); + std::vector identicalProjections{}; + for (auto i = 0; i < rowType->size(); ++i) { + 
identicalProjections.emplace_back(i, i); + } + + // Now wrap 'row' in 'newIndices' keeping wraps to one level and deduplicating + // dictionary transposes. + auto newIndices = makeIndicesInReverse(kSize); + WrapState state; + std::vector projected(rowType->size()); + projectChildren( + projected, row, identicalProjections, kSize, newIndices, &state); + auto result = makeRowVector(projected); + for (auto i = 0; i < kSize; ++i) { + EXPECT_TRUE( + row->equalValueAt(result.get(), i, newIndices->as()[i])); + } + + // The two unwrapped columns get 'newIndices' directly. + EXPECT_EQ(projected[0]->wrapInfo(), newIndices); + EXPECT_EQ(projected[1]->wrapInfo(), newIndices); + + // The next two have the same wrapper and this is now combined with newIndices + // and used twice. + EXPECT_NE(projected[2]->wrapInfo(), newIndices); + EXPECT_NE(projected[2]->wrapInfo(), indices2); + EXPECT_EQ(projected[2]->wrapInfo(), projected[3]->wrapInfo()); + + // The next two share a different wrapper. + EXPECT_NE(projected[3]->wrapInfo(), projected[4]->wrapInfo()); + EXPECT_EQ(projected[4]->wrapInfo(), projected[5]->wrapInfo()); + + // The next two columns have nulls from their wrapper and thus they each get + // their own wrappers. + EXPECT_NE(projected[6]->wrapInfo(), projected[7]->wrapInfo()); + + // All columns have one level of wrapping. 
+ EXPECT_EQ( + projected[2]->valueVector()->encoding(), VectorEncoding::Simple::FLAT); + EXPECT_EQ( + projected[4]->valueVector()->encoding(), VectorEncoding::Simple::FLAT); + EXPECT_EQ( + projected[6]->valueVector()->encoding(), VectorEncoding::Simple::FLAT); +} diff --git a/velox/vector/BaseVector.cpp b/velox/vector/BaseVector.cpp index 180e4c33336f..d65db337726c 100644 --- a/velox/vector/BaseVector.cpp +++ b/velox/vector/BaseVector.cpp @@ -170,37 +170,27 @@ VectorPtr BaseVector::wrapInDictionary( return result; } -template -static VectorPtr -addSequence(BufferPtr lengths, vector_size_t size, VectorPtr vector) { - auto base = vector.get(); - auto pool = base->pool(); - auto lsize = lengths->size(); - return std::make_shared< - SequenceVector::WrapperType>>( - pool, - size, - std::move(vector), - std::move(lengths), - SimpleVectorStats::WrapperType>{}, - std::nullopt /*distinctCount*/, - std::nullopt, - false /*sorted*/, - base->representedBytes().has_value() - ? std::optional( - base->representedBytes().value() * size / - (1 + (lsize / sizeof(vector_size_t)))) - : std::nullopt); -} - // static VectorPtr BaseVector::wrapInSequence( BufferPtr lengths, vector_size_t size, VectorPtr vector) { - auto kind = vector->typeKind(); - return VELOX_DYNAMIC_TYPE_DISPATCH_ALL( - addSequence, kind, std::move(lengths), size, std::move(vector)); + const auto numLengths = lengths->size() / sizeof(vector_size_t); + int64_t numIndices = 0; + auto* rawLengths = lengths->as(); + for (auto i = 0; i < numLengths; ++i) { + numIndices += rawLengths[i]; + } + VELOX_CHECK_LT(numIndices, std::numeric_limits::max()); + BufferPtr indices = + AlignedBuffer::allocate(numIndices, vector->pool()); + auto* rawIndices = indices->asMutable(); + int32_t fill = 0; + for (auto i = 0; i < numLengths; ++i) { + std::fill(rawIndices + fill, rawIndices + fill + rawLengths[i], i); + fill += rawLengths[i]; + } + return wrapInDictionary(nullptr, indices, numIndices, vector); } template @@ -1007,6 +997,102 @@ 
std::string printIndices( return out.str(); } +// static +void BaseVector::transposeIndices( + const vector_size_t* baseIndices, + vector_size_t wrapSize, + const vector_size_t* wrapIndices, + vector_size_t* resultIndices) { + constexpr int32_t kBatch = xsimd::batch::size; + static_assert(kBatch == 8); + static_assert(sizeof(vector_size_t) == sizeof(int32_t)); + int32_t i = 0; + for (; i + kBatch <= wrapSize; i += kBatch) { + auto indexBatch = xsimd::load_unaligned(wrapIndices + i); + simd::gather(baseIndices, indexBatch).store_unaligned(resultIndices + i); + } + if (i < wrapSize) { + auto indexBatch = xsimd::load_unaligned(wrapIndices + i); + auto mask = simd::leadingMask(wrapSize - i); + simd::maskGather( + xsimd::batch::broadcast(0), mask, baseIndices, indexBatch) + .store_unaligned(resultIndices + i); + } +} + +// static +void BaseVector::transposeIndicesWithNulls( + const vector_size_t* baseIndices, + const uint64_t* baseNulls, + vector_size_t wrapSize, + const vector_size_t* wrapIndices, + const uint64_t* wrapNulls, + vector_size_t* resultIndices, + uint64_t* resultNulls) { + constexpr int32_t kBatch = xsimd::batch::size; + static_assert(kBatch == 8); + static_assert(sizeof(vector_size_t) == sizeof(int32_t)); + for (auto i = 0; i < wrapSize; i += kBatch) { + auto indexBatch = xsimd::load_unaligned(wrapIndices + i); + uint8_t wrapNullsByte = + i + kBatch > wrapSize ? bits::lowMask(wrapSize - i) : 0xff; + + if (wrapNulls) { + wrapNullsByte &= reinterpret_cast(wrapNulls)[i / 8]; + } + if (wrapNullsByte != 0xff) { + // Zero out indices at null positions. 
+ auto mask = simd::fromBitMask(wrapNullsByte); + indexBatch = indexBatch & + xsimd::load_unaligned(reinterpret_cast(&mask)); + } + if (baseNulls) { + uint8_t baseNullBits = simd::gather8Bits(baseNulls, indexBatch, 8); + wrapNullsByte &= baseNullBits; + } + reinterpret_cast(resultNulls)[i / 8] = wrapNullsByte; + simd::gather(baseIndices, indexBatch) + .store_unaligned(resultIndices + i); + } +} + +// static +void BaseVector::transposeDictionaryValues( + vector_size_t wrapSize, + BufferPtr& wrapNulls, + BufferPtr& wrapIndices, + std::shared_ptr& dictionaryValues) { + if (!wrapIndices->unique()) { + wrapIndices = AlignedBuffer::copy(dictionaryValues->pool(), wrapIndices); + } + auto* rawBaseNulls = dictionaryValues->rawNulls(); + auto baseIndices = dictionaryValues->wrapInfo(); + if (!rawBaseNulls && !wrapNulls) { + transposeIndices( + baseIndices->as(), + wrapSize, + wrapIndices->as(), + wrapIndices->asMutable()); + } else { + BufferPtr newNulls; + if (!wrapNulls || !wrapNulls->unique()) { + newNulls = AlignedBuffer::allocate( + wrapSize, dictionaryValues->pool(), bits::kNull); + } else { + newNulls = wrapNulls; + } + transposeIndicesWithNulls( + baseIndices->as(), + rawBaseNulls, + wrapSize, + wrapIndices->as(), + wrapNulls ? wrapNulls->as() : nullptr, + wrapIndices->asMutable(), + newNulls->asMutable()); + } + dictionaryValues = dictionaryValues->valueVector(); +} + template bool isAllSameFlat(const BaseVector& vector, vector_size_t size) { using T = typename KindToFlatVector::WrapperType; diff --git a/velox/vector/BaseVector.h b/velox/vector/BaseVector.h index 883ee403cfb2..65e014a193c0 100644 --- a/velox/vector/BaseVector.h +++ b/velox/vector/BaseVector.h @@ -540,12 +540,49 @@ class BaseVector { /// length. virtual VectorPtr slice(vector_size_t offset, vector_size_t length) const = 0; - /// Returns a vector of the type of 'source' where 'indices' contains - /// an index into 'source' for each element of 'source'. 
The - /// resulting vector has position i set to source[i]. This is - /// equivalent to wrapping 'source' in a dictionary with 'indices' - /// but this may reuse structure if said structure is uniquely owned - /// or if a copy is more efficient than dictionary wrapping. + /// Transposes two sets of dictionary indices into one level of indirection. + /// Sets result[i] = base[indices[i]] for i = 0 ... i < size. + static void transposeIndices( + const vector_size_t* base, + vector_size_t size, + const vector_size_t* indices, + vector_size_t* result); + + /// Transposes two levels of indices into a single level with nulls. sets + /// result[i] = base[indices[i]] where i is not null in 'wrapNulls' and + /// indices[i] is not null in 'baseNulls'. If indices[i] is null in + /// 'baseNulls' or i is null in 'wrapNulls', then 'resultNulls' is null at i. + /// 'wrapNulls' may be nullptr, meaning that no new nulls are added. + static void transposeIndicesWithNulls( + const vector_size_t* baseIndices, + const uint64_t* baseNulls, + vector_size_t wrapSize, + const vector_size_t* wrapIndices, + const uint64_t* wrapNulls, + vector_size_t* resultIndices, + uint64_t* resultNulls); + + /// Flattens 'dictionaryValues', which is a dictionary and replaces + /// it with its base. 'size' is the number of valid elements in + /// 'indices' and 'nulls'. Null positions may have an invalid + /// index. Rewrites 'indices' from being indices into + /// 'dictionaryValues' to being indices into the latter's + /// base. Rewrites 'nulls' to be nulls from 'dictionaryValues' and + /// its base vector. This is used when a dictionary vector loads a + /// lazy values vector and finds out that the loaded is itself a + /// dictionary. 
+ static void transposeDictionaryValues( + vector_size_t wrapSize, + BufferPtr& wrapNulls, + BufferPtr& wrapIndices, + std::shared_ptr& dictionaryValues); + + // Returns a vector of the type of 'source' where 'indices' contains + // an index into 'source' for each element of 'source'. The + // resulting vector has position i set to source[i]. This is + // equivalent to wrapping 'source' in a dictionary with 'indices' + // but this may reuse structure if said structure is uniquely owned + // or if a copy is more efficient than dictionary wrapping. static VectorPtr transpose(BufferPtr indices, VectorPtr&& source); static VectorPtr createConstant(