Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support Spark explode outer #11954

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions velox/core/PlanNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -833,12 +833,14 @@ UnnestNode::UnnestNode(
std::vector<FieldAccessTypedExprPtr> unnestVariables,
const std::vector<std::string>& unnestNames,
const std::optional<std::string>& ordinalityName,
const PlanNodePtr& source)
const PlanNodePtr& source,
const bool isOuter)
: PlanNode(id),
replicateVariables_{std::move(replicateVariables)},
unnestVariables_{std::move(unnestVariables)},
withOrdinality_{ordinalityName.has_value()},
sources_{source} {
sources_{source},
isOuter_{isOuter} {
// Calculate output type. First come "replicate" columns, followed by
// "unnest" columns, followed by an optional ordinality column.
std::vector<std::string> names;
Expand Down Expand Up @@ -899,6 +901,7 @@ folly::dynamic UnnestNode::serialize() const {
if (withOrdinality_) {
obj["ordinalityName"] = outputType()->names().back();
}
obj["isOuter"] = isOuter_;
return obj;
}

Expand All @@ -913,14 +916,16 @@ PlanNodePtr UnnestNode::create(const folly::dynamic& obj, void* context) {
if (obj.count("ordinalityName")) {
ordinalityName = obj["ordinalityName"].asString();
}
bool isOuter = obj["isOuter"].asBool();

return std::make_shared<UnnestNode>(
deserializePlanNodeId(obj),
std::move(replicateVariables),
std::move(unnestVariables),
std::move(unnestNames),
ordinalityName,
std::move(source));
std::move(source),
isOuter);
}

AbstractJoinNode::AbstractJoinNode(
Expand Down
19 changes: 18 additions & 1 deletion velox/core/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -1950,13 +1950,25 @@ class UnnestNode : public PlanNode {
/// names must appear in the same order as unnestVariables.
/// @param ordinalityName Optional name for the ordinality columns. If not
/// present, ordinality column is not produced.
/// @param isOuter If true, emit null data for empty array/map or array/map
/// with null elements. Used in SparkSQL's explode_outer.
UnnestNode(
const PlanNodeId& id,
std::vector<FieldAccessTypedExprPtr> replicateVariables,
std::vector<FieldAccessTypedExprPtr> unnestVariables,
const std::vector<std::string>& unnestNames,
const std::optional<std::string>& ordinalityName,
const PlanNodePtr& source);
const PlanNodePtr& source,
const bool isOuter);

UnnestNode(
const PlanNodeId& id,
std::vector<FieldAccessTypedExprPtr> replicateVariables,
std::vector<FieldAccessTypedExprPtr> unnestVariables,
const std::vector<std::string>& unnestNames,
const std::optional<std::string>& ordinalityName,
const PlanNodePtr& source):
UnnestNode(id, replicateVariables, unnestVariables, unnestNames, ordinalityName, source, false) {}

/// The order of columns in the output is: replicated columns (in the order
/// specified), unnested columns (in the order specified, for maps: key
Expand All @@ -1981,6 +1993,10 @@ class UnnestNode : public PlanNode {
return withOrdinality_;
}

bool isOuter() const {
return isOuter_;
}

std::string_view name() const override {
return "Unnest";
}
Expand All @@ -1996,6 +2012,7 @@ class UnnestNode : public PlanNode {
const std::vector<FieldAccessTypedExprPtr> unnestVariables_;
const bool withOrdinality_;
const std::vector<PlanNodePtr> sources_;
const bool isOuter_;
RowTypePtr outputType_;
};

Expand Down
105 changes: 71 additions & 34 deletions velox/exec/Unnest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ Unnest::Unnest(
unnestNode->id(),
"Unnest"),
withOrdinality_(unnestNode->withOrdinality()),
maxOutputSize_(outputBatchRows()) {
maxOutputSize_(outputBatchRows()),
isOuter_(unnestNode->isOuter()) {
const auto& inputType = unnestNode->sources()[0]->outputType();
const auto& unnestVariables = unnestNode->unnestVariables();
for (const auto& variable : unnestVariables) {
Expand Down Expand Up @@ -73,36 +74,12 @@ void Unnest::addInput(RowVectorPtr input) {
rawOffsets_.resize(unnestChannels_.size());
rawIndices_.resize(unnestChannels_.size());

for (auto channel = 0; channel < unnestChannels_.size(); ++channel) {
const auto& unnestVector = input_->childAt(unnestChannels_[channel]);

auto& currentDecoded = unnestDecoded_[channel];
currentDecoded.decode(*unnestVector);

rawIndices_[channel] = currentDecoded.indices();

if (unnestVector->typeKind() == TypeKind::ARRAY) {
const auto* unnestBaseArray = currentDecoded.base()->as<ArrayVector>();
rawSizes_[channel] = unnestBaseArray->rawSizes();
rawOffsets_[channel] = unnestBaseArray->rawOffsets();
} else {
VELOX_CHECK(unnestVector->typeKind() == TypeKind::MAP);
const auto* unnestBaseMap = currentDecoded.base()->as<MapVector>();
rawSizes_[channel] = unnestBaseMap->rawSizes();
rawOffsets_[channel] = unnestBaseMap->rawOffsets();
}

// Count max number of elements per row.
auto* currentSizes = rawSizes_[channel];
auto* currentIndices = rawIndices_[channel];
for (auto row = 0; row < size; ++row) {
if (!currentDecoded.isNullAt(row)) {
const auto unnestSize = currentSizes[currentIndices[row]];
if (rawMaxSizes_[row] < unnestSize) {
rawMaxSizes_[row] = unnestSize;
}
}
}
// Count max number of elements per row.
if (isOuter_) {
rawOrdinalityIsNull_.resize(size, false);
countMaxNumElementsPerRow<true>(size);
} else {
countMaxNumElementsPerRow<false>(size);
}
}

Expand All @@ -126,7 +103,13 @@ RowVectorPtr Unnest::getOutput() {
return nullptr;
}

const auto output = generateOutput(rowRange);
RowVectorPtr output = nullptr;
if (isOuter_) {
output = generateOutput<true>(rowRange);
} else {
output = generateOutput<false>(rowRange);
}

if (rowRange.lastRowEnd.has_value()) {
// The last row is not processed completely.
firstRowStart_ = rowRange.lastRowEnd.value();
Expand Down Expand Up @@ -258,6 +241,7 @@ const Unnest::UnnestChannelEncoding Unnest::generateEncodingForChannel(
return {elementIndices, nulls, identityMapping};
}

template <bool isOuter>
VectorPtr Unnest::generateOrdinalityVector(const RowRange& range) {
auto ordinalityVector = BaseVector::create<FlatVector<int64_t>>(
BIGINT(), range.numElements, pool());
Expand All @@ -268,17 +252,25 @@ VectorPtr Unnest::generateOrdinalityVector(const RowRange& range) {

VELOX_DCHECK_GT(range.size, 0);

vector_size_t index = 0;
range.forEachRow(
[&](vector_size_t /*row*/, vector_size_t start, vector_size_t size) {
[&](vector_size_t row, vector_size_t start, vector_size_t size) {
if constexpr(isOuter) {
if (rawOrdinalityIsNull_[row]) {
ordinalityVector->setNull(index, true);
}
}
std::iota(rawOrdinality, rawOrdinality + size, start + 1);
rawOrdinality += size;
index += size;
},
rawMaxSizes_,
firstRowStart_);

return ordinalityVector;
}

template <bool isOuter>
RowVectorPtr Unnest::generateOutput(const RowRange& range) {
std::vector<VectorPtr> outputs(outputType_->size());
generateRepeatedColumns(range, outputs);
Expand Down Expand Up @@ -309,7 +301,7 @@ RowVectorPtr Unnest::generateOutput(const RowRange& range) {

if (withOrdinality_) {
// Ordinality column is always at the end.
outputs.back() = generateOrdinalityVector(range);
outputs.back() = generateOrdinalityVector<isOuter>(range);
}

return std::make_shared<RowVector>(
Expand All @@ -320,6 +312,51 @@ RowVectorPtr Unnest::generateOutput(const RowRange& range) {
std::move(outputs));
}


template <bool isOuter>
void Unnest::countMaxNumElementsPerRow(int32_t size) {
for (auto channel = 0; channel < unnestChannels_.size(); ++channel) {
const auto& unnestVector = input_->childAt(unnestChannels_[channel]);

auto& currentDecoded = unnestDecoded_[channel];
currentDecoded.decode(*unnestVector);

rawIndices_[channel] = currentDecoded.indices();

if (unnestVector->typeKind() == TypeKind::ARRAY) {
const auto* unnestBaseArray = currentDecoded.base()->as<ArrayVector>();
rawSizes_[channel] = unnestBaseArray->rawSizes();
rawOffsets_[channel] = unnestBaseArray->rawOffsets();
} else {
VELOX_CHECK(unnestVector->typeKind() == TypeKind::MAP);
const auto* unnestBaseMap = currentDecoded.base()->as<MapVector>();
rawSizes_[channel] = unnestBaseMap->rawSizes();
rawOffsets_[channel] = unnestBaseMap->rawOffsets();
}

// Count max number of elements per row.
auto* currentSizes = rawSizes_[channel];
auto* currentIndices = rawIndices_[channel];
for (auto row = 0; row < size; ++row) {
if (!currentDecoded.isNullAt(row)) {
auto unnestSize = currentSizes[currentIndices[row]];
if constexpr (isOuter) {
if (unnestSize == 0) {
unnestSize = 1;
rawOrdinalityIsNull_[row] = true;
}
}
if (rawMaxSizes_[row] < unnestSize) {
rawMaxSizes_[row] = unnestSize;
}
} else if constexpr (isOuter) {
rawMaxSizes_[row] = 1;
rawOrdinalityIsNull_[row] = true;
}
}
}
}

VectorPtr Unnest::UnnestChannelEncoding::wrap(
const VectorPtr& base,
vector_size_t wrapSize) const {
Expand Down
8 changes: 8 additions & 0 deletions velox/exec/Unnest.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ class Unnest : public Operator {

// Generate output for 'rowRange' represented rows.
// @param rowRange Range of rows to process.
template <bool isOuter>
RowVectorPtr generateOutput(const RowRange& rowRange);

// Invoked by generateOutput function above to generate the repeated output
Expand All @@ -106,6 +107,9 @@ class Unnest : public Operator {
const RowRange& rowRange,
std::vector<VectorPtr>& outputs);

template <bool isOuter>
void countMaxNumElementsPerRow(int32_t size);

struct UnnestChannelEncoding {
BufferPtr indices;
BufferPtr nulls;
Expand All @@ -121,6 +125,7 @@ class Unnest : public Operator {
const RowRange& rowRange);

// Invoked by generateOutput for the ordinality column.
template <bool isOuter>
VectorPtr generateOrdinalityVector(const RowRange& rowRange);

const bool withOrdinality_;
Expand All @@ -142,5 +147,8 @@ class Unnest : public Operator {

// Next 'input_' row to process in getOutput().
vector_size_t nextInputRow_{0};

std::vector<bool> rawOrdinalityIsNull_;
const bool isOuter_;
};
} // namespace facebook::velox::exec
41 changes: 41 additions & 0 deletions velox/exec/tests/UnnestTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,47 @@ TEST_P(UnnestTest, batchSize) {
ASSERT_EQ(expectedNumVectors, stats.at(unnestId).outputVectors);
}

TEST_P(UnnestTest, basicArrayWithOuter) {
auto vector = makeRowVector({
makeFlatVector<int64_t>({1, 2, 3}),
makeNullableArrayVector<int64_t>({
{1, 2, std::nullopt},
{},
{3}
}),
});

createDuckDbTable({vector});

// is_outer = false
auto op1 = PlanBuilder().values({vector}).unnest({"c0"}, {"c1"}, std::nullopt, false /* is_outer */).planNode();
auto params1 = makeCursorParameters(op1);
auto expected1 = makeRowVector({
makeFlatVector<int64_t>({1, 1, 1, 3}),
makeNullableFlatVector<int64_t>({1, 2, std::nullopt, 3}),
});
assertQuery(params1, expected1);

// is_outer = true
auto op2 = PlanBuilder().values({vector}).unnest({"c0"}, {"c1"}, std::nullopt, true /* is_outer */).planNode();
auto params2 = makeCursorParameters(op2);
auto expected2 = makeRowVector({
makeFlatVector<int64_t>({1, 1, 1, 2, 3}),
makeNullableFlatVector<int64_t>({1, 2, std::nullopt, std::nullopt, 3}),
});
assertQuery(params2, expected2);

// ordinal = true && is_outer = true
auto op3 = PlanBuilder().values({vector}).unnest({"c0"}, {"c1"}, "ordinal", true /* is_outer */).planNode();
auto params3 = makeCursorParameters(op3);
auto expected3 = makeRowVector({
makeFlatVector<int64_t>({1, 1, 1, 2, 3}),
makeNullableFlatVector<int64_t>({1, 2, std::nullopt, std::nullopt, 3}),
makeNullableFlatVector<int64_t>({1, 2, 3, std::nullopt, 1}),
});
assertQuery(params3, expected3);
}

VELOX_INSTANTIATE_TEST_SUITE_P(
UnnestTest,
UnnestTest,
Expand Down
6 changes: 4 additions & 2 deletions velox/exec/tests/utils/PlanBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1517,7 +1517,8 @@ PlanBuilder& PlanBuilder::nestedLoopJoin(
PlanBuilder& PlanBuilder::unnest(
const std::vector<std::string>& replicateColumns,
const std::vector<std::string>& unnestColumns,
const std::optional<std::string>& ordinalColumn) {
const std::optional<std::string>& ordinalColumn,
bool isOuter) {
VELOX_CHECK_NOT_NULL(planNode_, "Unnest cannot be the source node");
std::vector<std::shared_ptr<const core::FieldAccessTypedExpr>>
replicateFields;
Expand Down Expand Up @@ -1553,7 +1554,8 @@ PlanBuilder& PlanBuilder::unnest(
unnestFields,
unnestNames,
ordinalColumn,
planNode_);
planNode_,
isOuter);
return *this;
}

Expand Down
5 changes: 4 additions & 1 deletion velox/exec/tests/utils/PlanBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -956,10 +956,13 @@ class PlanBuilder {
/// @param ordinalColumn An optional name for the 'ordinal' column to produce.
/// This column contains the index of the element of the unnested array or
/// map. If not specified, the output will not contain this column.
/// @param isOuter If true, emit null data for empty array/map or array/map
/// with null elements. Used in SparkSQL's explode_outer.
PlanBuilder& unnest(
const std::vector<std::string>& replicateColumns,
const std::vector<std::string>& unnestColumns,
const std::optional<std::string>& ordinalColumn = std::nullopt);
const std::optional<std::string>& ordinalColumn = std::nullopt,
bool isOuter = false);

/// Add a WindowNode to compute one or more windowFunctions.
/// @param windowFunctions A list of one or more window function SQL like
Expand Down