Skip to content

Commit

Permalink
[Improve][transforms-v2][SQL] Support can use column.a, column['a'], …
Browse files Browse the repository at this point in the history
…and column[0] to get the data
  • Loading branch information
Zhouwen-CN committed Sep 12, 2023
1 parent 7b5b28b commit 09956b2
Show file tree
Hide file tree
Showing 5 changed files with 331 additions and 4 deletions.
95 changes: 95 additions & 0 deletions docs/en/transform-v2/sql.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,101 @@ sink {
}
```

## When Complex Types

Assuming this JSON from Kafka

```json
{
"c_string":"cs",
"c_array":[
"ca1",
"ca2",
"ca3"
],
"c_map":{
"cmk1":"cmv1",
"cmk2":"cmv2"
},
"c_row":{
"c_int":1,
"c_string":"cr1",
"next_row":{
"c_int":2,
"c_string":"cr2"
}
}
}
```

We use SQL query to transform the source data like this:

```
transform {
Sql {
source_table_name = "t1"
result_table_name = "t2"
query = "select c_string as c1,c_array[0] as c2,c_map['cmk1'] as c3,c_row.next_row.c_string as c4 from t1"
}
}
```

Then the data in result table `t2` will update to

| c1 | c2 | c3 | c4 |
|----|-----|------|-----|
| cs | ca1 | cmv1 | cr2 |

Use FakeSource replace KafkaSource for this Example with Fast Test:

```
env {
execution.parallelism = 1
job.mode = "BATCH"
}
source {
FakeSource {
result_table_name="t1"
schema = {
fields {
c_string = "string"
c_array = "array<string>"
c_map = "map<string,string>"
c_row {
c_int = "int"
c_string = "string"
next_row {
c_int = "int"
c_string = "string"
}
}
}
}
rows = [
{
kind = INSERT
fields = ["cs",["ca1","ca2","ca3"],{"cmk1":"cmv1","cmk2":"cmv2"},[1,"cr1",[2,"cr2"]]]
}
]
}
}
transform {
Sql {
source_table_name = "t1"
result_table_name = "t2"
query = "select c_string as c1,c_array[0] as c2,c_map['cmk1'] as c3,c_row.next_row.c_string as c4 from t1"
}
}
sink {
Console {
source_table_name = "t2"
}
}
```

## Changelog

### new version
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,5 +52,8 @@ public void testSQLTransform(TestContainer container) throws IOException, Interr
Container.ExecResult sqlAllColumns =
container.executeJob("/sql_transform/sql_all_columns.conf");
Assertions.assertEquals(0, sqlAllColumns.getExitCode());
Container.ExecResult sqlComplexType =
container.executeJob("/sql_transform/sql_complex_type.conf");
Assertions.assertEquals(0, sqlComplexType.getExitCode());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of streaming processing in seatunnel config
######

env {
execution.parallelism = 1
job.mode = "BATCH"
}

source {
FakeSource {
result_table_name="t1"
schema = {
fields {
c_string = "string"
c_array = "array<string>"
c_map = "map<string,string>"
c_row {
c_int = "int"
c_string = "string"
next_row {
c_int = "int"
c_string = "string"
}
}
}
}
rows = [
{
kind = INSERT
fields = ["cs",["ca1","ca2","ca3"],{"cmk1":"cmv1","cmk2":"cmv2"},[1,"cr1",[2,"cr2"]]]
}
]
}
}

transform {
Sql {
source_table_name = "t1"
result_table_name = "t2"
query = "select c_string as c1,c_array[0] as c2,c_map['cmk1'] as c3,c_row.next_row.c_string as c4 from t1"
}
}

sink {
Console {
source_table_name = "t2"
}
Assert {
source_table_name = "t2"
rules = {
field_rules = [
{
field_name = "c1"
field_type = "string"
field_value = [
{equals_to = "cs"}
]
},
{
field_name = "c2"
field_type = "string"
field_value = [
{equals_to = "ca1"}
]
},
{
field_name = "c3"
field_type = "string"
field_value = [
{equals_to = "cmv1"}
]
},
{
field_name = "c4"
field_type = "string"
field_value = [
{equals_to = "cr2"}
]
}
]
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@

package org.apache.seatunnel.transform.sql.zeta;

import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.table.type.SqlType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
Expand All @@ -28,6 +31,7 @@
import org.apache.seatunnel.transform.sql.zeta.functions.StringFunction;
import org.apache.seatunnel.transform.sql.zeta.functions.SystemFunction;

import net.sf.jsqlparser.expression.ArrayExpression;
import net.sf.jsqlparser.expression.BinaryExpression;
import net.sf.jsqlparser.expression.CastExpression;
import net.sf.jsqlparser.expression.DoubleValue;
Expand All @@ -47,11 +51,15 @@
import net.sf.jsqlparser.expression.operators.arithmetic.Subtraction;
import net.sf.jsqlparser.expression.operators.relational.ExpressionList;
import net.sf.jsqlparser.schema.Column;
import net.sf.jsqlparser.schema.Table;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ZetaSQLFunction {
// ============================internal functions=====================
Expand Down Expand Up @@ -192,8 +200,70 @@ public Object computeForValue(Expression expression, Object[] inputFields) {
return ((StringValue) expression).getValue();
}
if (expression instanceof Column) {
int idx = inputRowType.indexOf(((Column) expression).getColumnName());
return inputFields[idx];
Column column = (Column) expression;
String columnName = column.getColumnName();
Table table = column.getTable();
if (null == table) {
int index = inputRowType.indexOf(column.getColumnName());
return inputFields[index];
} else {
List<String> keys =
Arrays.stream(table.getFullyQualifiedName().split("\\."))
.collect(Collectors.toList());
keys.add(columnName);

int currentIndex = inputRowType.indexOf(keys.get(0));
SeaTunnelDataType<?> currentDataType = inputRowType.getFieldType(currentIndex);
Object currentFieldData = inputFields[currentIndex];
SeaTunnelRowType currentRowType;

for (int i = 1; i < keys.size(); i++) {
if (currentDataType instanceof SeaTunnelRowType) {
currentRowType = (SeaTunnelRowType) currentDataType;
currentIndex = currentRowType.indexOf(keys.get(i));
currentDataType = currentRowType.getFieldType(currentIndex);
SeaTunnelRow row = (SeaTunnelRow) currentFieldData;
if (i == keys.size() - 1) {
return row.getField(currentIndex);
}
currentFieldData = row.getField(currentIndex);
} else {
throw new TransformException(
CommonErrorCode.UNSUPPORTED_DATA_TYPE,
String.format("The column is not row type: %s ", keys.get(i - 1)));
}
}
}
}
if (expression instanceof ArrayExpression) {
ArrayExpression arrayExpression = (ArrayExpression) expression;
Expression indexExpression = arrayExpression.getIndexExpression();
Expression objExpression = arrayExpression.getObjExpression();
String objectName = objExpression.toString();

int fieldIndex = inputRowType.indexOf(objectName);
SeaTunnelDataType<?> type = inputRowType.getFieldType(fieldIndex);
Object fieldData = inputFields[fieldIndex];
if (indexExpression instanceof StringValue) {
if (type instanceof MapType) {
String key = ((StringValue) indexExpression).getValue();
return ((Map<?, ?>) fieldData).get(key);
} else {
throw new TransformException(
CommonErrorCode.UNSUPPORTED_DATA_TYPE,
String.format("The column is not map type: %s ", objectName));
}
}
if (indexExpression instanceof LongValue) {
if (type instanceof ArrayType) {
long index = ((LongValue) indexExpression).getValue();
return ((Object[]) fieldData)[(int) index];
} else {
throw new TransformException(
CommonErrorCode.UNSUPPORTED_DATA_TYPE,
String.format("The column is not array type: %s ", objectName));
}
}
}
if (expression instanceof Function) {
Function function = (Function) expression;
Expand Down
Loading

0 comments on commit 09956b2

Please sign in to comment.