From 9c045e8932974f9ba11e812ff7b3d734313db42f Mon Sep 17 00:00:00 2001 From: "wangguangxin.cn" Date: Thu, 23 May 2024 08:27:16 +0800 Subject: [PATCH] support explode outer --- velox/core/PlanNode.cpp | 11 ++- velox/core/PlanNode.h | 19 ++++- velox/exec/Unnest.cpp | 105 +++++++++++++++++-------- velox/exec/Unnest.h | 8 ++ velox/exec/tests/UnnestTest.cpp | 41 ++++++++++ velox/exec/tests/utils/PlanBuilder.cpp | 6 +- velox/exec/tests/utils/PlanBuilder.h | 5 +- 7 files changed, 154 insertions(+), 41 deletions(-) diff --git a/velox/core/PlanNode.cpp b/velox/core/PlanNode.cpp index cdb724f1ef0f..9770265704cc 100644 --- a/velox/core/PlanNode.cpp +++ b/velox/core/PlanNode.cpp @@ -833,12 +833,14 @@ UnnestNode::UnnestNode( std::vector unnestVariables, const std::vector& unnestNames, const std::optional& ordinalityName, - const PlanNodePtr& source) + const PlanNodePtr& source, + const bool isOuter) : PlanNode(id), replicateVariables_{std::move(replicateVariables)}, unnestVariables_{std::move(unnestVariables)}, withOrdinality_{ordinalityName.has_value()}, - sources_{source} { + sources_{source}, + isOuter_{isOuter} { // Calculate output type. First come "replicate" columns, followed by // "unnest" columns, followed by an optional ordinality column. std::vector names; @@ -899,6 +901,7 @@ folly::dynamic UnnestNode::serialize() const { if (withOrdinality_) { obj["ordinalityName"] = outputType()->names().back(); } + obj["isOuter"] = isOuter_; return obj; } @@ -913,6 +916,7 @@ PlanNodePtr UnnestNode::create(const folly::dynamic& obj, void* context) { if (obj.count("ordinalityName")) { ordinalityName = obj["ordinalityName"].asString(); } + bool isOuter = obj["isOuter"].asBool(); return std::make_shared( deserializePlanNodeId(obj), @@ -920,7 +924,8 @@ PlanNodePtr UnnestNode::create(const folly::dynamic& obj, void* context) { std::move(unnestVariables), std::move(unnestNames), ordinalityName, - std::move(source)); + std::move(source), + isOuter); } AbstractJoinNode::AbstractJoinNode( diff --git a/velox/core/PlanNode.h b/velox/core/PlanNode.h index b4449989b8d5..0bf68bca0536 100644 --- a/velox/core/PlanNode.h +++ b/velox/core/PlanNode.h @@ -1950,13 +1950,25 @@ class UnnestNode : public PlanNode { /// names must appear in the same order as unnestVariables. /// @param ordinalityName Optional name for the ordinality columns. If not /// present, ordinality column is not produced. + /// @param isOuter If true, emit null data for empty array/map or array/map + /// with null elements. Used in SparkSQL's explode_outer. UnnestNode( const PlanNodeId& id, std::vector replicateVariables, std::vector unnestVariables, const std::vector& unnestNames, const std::optional& ordinalityName, - const PlanNodePtr& source); + const PlanNodePtr& source, + const bool isOuter); + + UnnestNode( + const PlanNodeId& id, + std::vector replicateVariables, + std::vector unnestVariables, + const std::vector& unnestNames, + const std::optional& ordinalityName, + const PlanNodePtr& source): + UnnestNode(id, replicateVariables, unnestVariables, unnestNames, ordinalityName, source, false) {} /// The order of columns in the output is: replicated columns (in the order /// specified), unnested columns (in the order specified, for maps: key @@ -1981,6 +1993,10 @@ class UnnestNode : public PlanNode { return withOrdinality_; } + bool isOuter() const { + return isOuter_; + } + std::string_view name() const override { return "Unnest"; } @@ -1996,6 +2012,7 @@ class UnnestNode : public PlanNode { const std::vector unnestVariables_; const bool withOrdinality_; const std::vector sources_; + const bool isOuter_; RowTypePtr outputType_; }; diff --git a/velox/exec/Unnest.cpp b/velox/exec/Unnest.cpp index 5e35a51675cd..324ee6c96d9b 100644 --- a/velox/exec/Unnest.cpp +++ b/velox/exec/Unnest.cpp @@ -30,7 +30,8 @@ Unnest::Unnest( unnestNode->id(), "Unnest"), withOrdinality_(unnestNode->withOrdinality()), - maxOutputSize_(outputBatchRows()) { + maxOutputSize_(outputBatchRows()), + isOuter_(unnestNode->isOuter()) { const auto& inputType = unnestNode->sources()[0]->outputType(); const auto& unnestVariables = unnestNode->unnestVariables(); for (const auto& variable : unnestVariables) { @@ -73,36 +74,12 @@ void Unnest::addInput(RowVectorPtr input) { rawOffsets_.resize(unnestChannels_.size()); rawIndices_.resize(unnestChannels_.size()); - for (auto channel = 0; channel < unnestChannels_.size(); ++channel) { - const auto& unnestVector = input_->childAt(unnestChannels_[channel]); - - auto& currentDecoded = unnestDecoded_[channel]; - currentDecoded.decode(*unnestVector); - - rawIndices_[channel] = currentDecoded.indices(); - - if (unnestVector->typeKind() == TypeKind::ARRAY) { - const auto* unnestBaseArray = currentDecoded.base()->as(); - rawSizes_[channel] = unnestBaseArray->rawSizes(); - rawOffsets_[channel] = unnestBaseArray->rawOffsets(); - } else { - VELOX_CHECK(unnestVector->typeKind() == TypeKind::MAP); - const auto* unnestBaseMap = currentDecoded.base()->as(); - rawSizes_[channel] = unnestBaseMap->rawSizes(); - rawOffsets_[channel] = unnestBaseMap->rawOffsets(); - } - - // Count max number of elements per row. - auto* currentSizes = rawSizes_[channel]; - auto* currentIndices = rawIndices_[channel]; - for (auto row = 0; row < size; ++row) { - if (!currentDecoded.isNullAt(row)) { - const auto unnestSize = currentSizes[currentIndices[row]]; - if (rawMaxSizes_[row] < unnestSize) { - rawMaxSizes_[row] = unnestSize; - } - } - } + // Count max number of elements per row. + if (isOuter_) { + rawOrdinalityIsNull_.resize(size, false); + countMaxNumElementsPerRow(size); + } else { + countMaxNumElementsPerRow(size); } } @@ -126,7 +103,13 @@ RowVectorPtr Unnest::getOutput() { return nullptr; } - const auto output = generateOutput(rowRange); + RowVectorPtr output = nullptr; + if (isOuter_) { + output = generateOutput(rowRange); + } else { + output = generateOutput(rowRange); + } + if (rowRange.lastRowEnd.has_value()) { // The last row is not processed completely. firstRowStart_ = rowRange.lastRowEnd.value(); @@ -258,6 +241,7 @@ const Unnest::UnnestChannelEncoding Unnest::generateEncodingForChannel( return {elementIndices, nulls, identityMapping}; } +template VectorPtr Unnest::generateOrdinalityVector(const RowRange& range) { auto ordinalityVector = BaseVector::create>( BIGINT(), range.numElements, pool()); @@ -268,10 +252,17 @@ VectorPtr Unnest::generateOrdinalityVector(const RowRange& range) { VELOX_DCHECK_GT(range.size, 0); + vector_size_t index = 0; range.forEachRow( - [&](vector_size_t /*row*/, vector_size_t start, vector_size_t size) { + [&](vector_size_t row, vector_size_t start, vector_size_t size) { + if constexpr(isOuter) { + if (rawOrdinalityIsNull_[row]) { + ordinalityVector->setNull(index, true); + } + } std::iota(rawOrdinality, rawOrdinality + size, start + 1); rawOrdinality += size; + index += size; }, rawMaxSizes_, firstRowStart_); @@ -279,6 +270,7 @@ VectorPtr Unnest::generateOrdinalityVector(const RowRange& range) { return ordinalityVector; } +template RowVectorPtr Unnest::generateOutput(const RowRange& range) { std::vector outputs(outputType_->size()); generateRepeatedColumns(range, outputs); @@ -309,7 +301,7 @@ RowVectorPtr Unnest::generateOutput(const RowRange& range) { if (withOrdinality_) { // Ordinality column is always at the end. - outputs.back() = generateOrdinalityVector(range); + outputs.back() = generateOrdinalityVector(range); } return std::make_shared( @@ -320,6 +312,51 @@ RowVectorPtr Unnest::generateOutput(const RowRange& range) { std::move(outputs)); } + +template +void Unnest::countMaxNumElementsPerRow(int32_t size) { + for (auto channel = 0; channel < unnestChannels_.size(); ++channel) { + const auto& unnestVector = input_->childAt(unnestChannels_[channel]); + + auto& currentDecoded = unnestDecoded_[channel]; + currentDecoded.decode(*unnestVector); + + rawIndices_[channel] = currentDecoded.indices(); + + if (unnestVector->typeKind() == TypeKind::ARRAY) { + const auto* unnestBaseArray = currentDecoded.base()->as(); + rawSizes_[channel] = unnestBaseArray->rawSizes(); + rawOffsets_[channel] = unnestBaseArray->rawOffsets(); + } else { + VELOX_CHECK(unnestVector->typeKind() == TypeKind::MAP); + const auto* unnestBaseMap = currentDecoded.base()->as(); + rawSizes_[channel] = unnestBaseMap->rawSizes(); + rawOffsets_[channel] = unnestBaseMap->rawOffsets(); + } + + // Count max number of elements per row. + auto* currentSizes = rawSizes_[channel]; + auto* currentIndices = rawIndices_[channel]; + for (auto row = 0; row < size; ++row) { + if (!currentDecoded.isNullAt(row)) { + auto unnestSize = currentSizes[currentIndices[row]]; + if constexpr (isOuter) { + if (unnestSize == 0) { + unnestSize = 1; + rawOrdinalityIsNull_[row] = true; + } + } + if (rawMaxSizes_[row] < unnestSize) { + rawMaxSizes_[row] = unnestSize; + } + } else if constexpr (isOuter) { + rawMaxSizes_[row] = 1; + rawOrdinalityIsNull_[row] = true; + } + } + } +} + VectorPtr Unnest::UnnestChannelEncoding::wrap( const VectorPtr& base, vector_size_t wrapSize) const { diff --git a/velox/exec/Unnest.h b/velox/exec/Unnest.h index c89766bcd139..879600bd46d7 100644 --- a/velox/exec/Unnest.h +++ b/velox/exec/Unnest.h @@ -98,6 +98,7 @@ class Unnest : public Operator { // Generate output for 'rowRange' represented rows. // @param rowRange Range of rows to process. + template RowVectorPtr generateOutput(const RowRange& rowRange); // Invoked by generateOutput function above to generate the repeated output @@ -106,6 +107,9 @@ class Unnest : public Operator { const RowRange& rowRange, std::vector& outputs); + template + void countMaxNumElementsPerRow(int32_t size); + struct UnnestChannelEncoding { BufferPtr indices; BufferPtr nulls; @@ -121,6 +125,7 @@ class Unnest : public Operator { const RowRange& rowRange); // Invoked by generateOutput for the ordinality column. + template VectorPtr generateOrdinalityVector(const RowRange& rowRange); const bool withOrdinality_; @@ -142,5 +147,8 @@ class Unnest : public Operator { // Next 'input_' row to process in getOutput(). vector_size_t nextInputRow_{0}; + + std::vector rawOrdinalityIsNull_; + const bool isOuter_; }; } // namespace facebook::velox::exec diff --git a/velox/exec/tests/UnnestTest.cpp b/velox/exec/tests/UnnestTest.cpp index bef3e44e7920..b226d26793ab 100644 --- a/velox/exec/tests/UnnestTest.cpp +++ b/velox/exec/tests/UnnestTest.cpp @@ -474,6 +474,47 @@ TEST_P(UnnestTest, batchSize) { ASSERT_EQ(expectedNumVectors, stats.at(unnestId).outputVectors); } +TEST_P(UnnestTest, basicArrayWithOuter) { + auto vector = makeRowVector({ + makeFlatVector({1, 2, 3}), + makeNullableArrayVector({ + {1, 2, std::nullopt}, + {}, + {3} + }), + }); + + createDuckDbTable({vector}); + + // is_outer = false + auto op1 = PlanBuilder().values({vector}).unnest({"c0"}, {"c1"}, std::nullopt, false /* is_outer */).planNode(); + auto params1 = makeCursorParameters(op1); + auto expected1 = makeRowVector({ + makeFlatVector({1, 1, 1, 3}), + makeNullableFlatVector({1, 2, std::nullopt, 3}), + }); + assertQuery(params1, expected1); + + // is_outer = true + auto op2 = PlanBuilder().values({vector}).unnest({"c0"}, {"c1"}, std::nullopt, true /* is_outer */).planNode(); + auto params2 = makeCursorParameters(op2); + auto expected2 = makeRowVector({ + makeFlatVector({1, 1, 1, 2, 3}), + makeNullableFlatVector({1, 2, std::nullopt, std::nullopt, 3}), + }); + assertQuery(params2, expected2); + + // ordinal = true && is_outer = true + auto op3 = PlanBuilder().values({vector}).unnest({"c0"}, {"c1"}, "ordinal", true /* is_outer */).planNode(); + auto params3 = makeCursorParameters(op3); + auto expected3 = makeRowVector({ + makeFlatVector({1, 1, 1, 2, 3}), + makeNullableFlatVector({1, 2, std::nullopt, std::nullopt, 3}), + makeNullableFlatVector({1, 2, 3, std::nullopt, 1}), + }); + assertQuery(params3, expected3); +} + VELOX_INSTANTIATE_TEST_SUITE_P( UnnestTest, UnnestTest, diff --git a/velox/exec/tests/utils/PlanBuilder.cpp b/velox/exec/tests/utils/PlanBuilder.cpp index e95ea01f101e..fd3f400ad7c8 100644 --- a/velox/exec/tests/utils/PlanBuilder.cpp +++ b/velox/exec/tests/utils/PlanBuilder.cpp @@ -1517,7 +1517,8 @@ PlanBuilder& PlanBuilder::nestedLoopJoin( PlanBuilder& PlanBuilder::unnest( const std::vector& replicateColumns, const std::vector& unnestColumns, - const std::optional& ordinalColumn) { + const std::optional& ordinalColumn, + bool isOuter) { VELOX_CHECK_NOT_NULL(planNode_, "Unnest cannot be the source node"); std::vector> replicateFields; @@ -1553,7 +1554,8 @@ PlanBuilder& PlanBuilder::unnest( unnestFields, unnestNames, ordinalColumn, - planNode_); + planNode_, + isOuter); return *this; } diff --git a/velox/exec/tests/utils/PlanBuilder.h b/velox/exec/tests/utils/PlanBuilder.h index f380ef4f03c7..1a3cb6d2ad7b 100644 --- a/velox/exec/tests/utils/PlanBuilder.h +++ b/velox/exec/tests/utils/PlanBuilder.h @@ -956,10 +956,13 @@ class PlanBuilder { /// @param ordinalColumn An optional name for the 'ordinal' column to produce. /// This column contains the index of the element of the unnested array or /// map. If not specified, the output will not contain this column. + /// @param isOuter If true, emit null data for empty array/map or array/map + /// with null elements. Used in SparkSQL's explode_outer. PlanBuilder& unnest( const std::vector& replicateColumns, const std::vector& unnestColumns, - const std::optional& ordinalColumn = std::nullopt); + const std::optional& ordinalColumn = std::nullopt, + bool isOuter = false); /// Add a WindowNode to compute one or more windowFunctions. /// @param windowFunctions A list of one or more window function SQL like