
Commit

update
Hisoka-X committed Sep 11, 2024
1 parent 8813933 commit a189ad9
Showing 4 changed files with 158 additions and 19 deletions.
FactoryUtil.java (file 1 of 4)

@@ -44,7 +44,6 @@

 import java.io.Serializable;
 import java.net.URL;
-import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -307,16 +306,15 @@ public static OptionRule sinkFullOptionRule(@NonNull TableSinkFactory factory) {
         return sinkOptionRule;
     }

-    public static SeaTunnelTransform<?> createAndPrepareTransform(
-            CatalogTable catalogTable,
+    public static SeaTunnelTransform<?> createAndPrepareMultiTableTransform(
+            List<CatalogTable> catalogTables,
             ReadonlyConfig options,
             ClassLoader classLoader,
             String factoryIdentifier) {
         final TableTransformFactory factory =
                 discoverFactory(classLoader, TableTransformFactory.class, factoryIdentifier);
         TableTransformFactoryContext context =
-                new TableTransformFactoryContext(
-                        Collections.singletonList(catalogTable), options, classLoader);
+                new TableTransformFactoryContext(catalogTables, options, classLoader);
         ConfigValidator.of(context.getOptions()).validate(factory.optionRule());
         return factory.createTransform(context).createTransform();
     }
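
For callers migrating from the old single-table helper, the change is mechanical: pass a list of tables instead of one. A minimal usage sketch follows (not from the patch): tableA, tableB, and options are hypothetical placeholders, the import paths assume the SeaTunnel API module, and "Copy" is chosen as the factory identifier only because the Copy transform is exercised elsewhere in this commit.

import java.util.Arrays;
import java.util.List;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.factory.FactoryUtil;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;

public class MultiTableTransformSketch {

    // tableA, tableB, and options are assumed to come from upstream job parsing.
    static SeaTunnelTransform<?> buildCopyTransform(
            CatalogTable tableA, CatalogTable tableB, ReadonlyConfig options) {
        List<CatalogTable> upstreamTables = Arrays.asList(tableA, tableB);
        // The factory now receives the full table list through
        // TableTransformFactoryContext instead of Collections.singletonList(...).
        return FactoryUtil.createAndPrepareMultiTableTransform(
                upstreamTables,
                options,
                Thread.currentThread().getContextClassLoader(),
                "Copy");
    }
}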

Copy transform e2e test (file 2 of 4)

@@ -32,4 +32,11 @@ public void testCopy(TestContainer container) throws IOException, InterruptedException {
         Container.ExecResult execResult = container.executeJob("/copy_transform.conf");
         Assertions.assertEquals(0, execResult.getExitCode());
     }
+
+    @TestTemplate
+    public void testCopyMultiTable(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult = container.executeJob("/copy_transform_multi_table.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
 }

copy_transform_multi_table.conf (new file, 3 of 4)

@@ -0,0 +1,131 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of batch processing in SeaTunnel config
######

env {
  job.mode = "BATCH"
}

source {
  FakeSource {
    result_table_name = "fake"
    row.num = 100
    schema = {
      fields {
        id = "int"
        name = "string"
        c_row = {
          c_row = {
            c_int = int
          }
        }
      }
    }
  }
}

transform {
  Copy {
    source_table_name = "fake"
    result_table_name = "fake1"
    src_field = "name"
    dest_field = "name1"
  }
  Copy {
    source_table_name = "fake1"
    result_table_name = "fake2"
    fields {
      id_1 = "id"
      name2 = "name"
      name3 = "name"
      c_row_1 = "c_row"
    }
  }
}

sink {
  Assert {
    source_table_name = "fake2"
    rules = {
      row_rules = [
        {
          rule_type = MIN_ROW
          rule_value = 100
        }
      ],
      field_rules = [
        {
          field_name = id
          field_type = int
          field_value = [
            {
              rule_type = NOT_NULL
            }
          ]
        },
        {
          field_name = id_1
          field_type = int
          field_value = [
            {
              rule_type = NOT_NULL
            }
          ]
        },
        {
          field_name = name
          field_type = string
          field_value = [
            {
              rule_type = NOT_NULL
            }
          ]
        },
        {
          field_name = name1
          field_type = string
          field_value = [
            {
              rule_type = NOT_NULL
            }
          ]
        },
        {
          field_name = name2
          field_type = string
          field_value = [
            {
              rule_type = NOT_NULL
            }
          ]
        },
        {
          field_name = name3
          field_type = string
          field_value = [
            {
              rule_type = NOT_NULL
            }
          ]
        }
      ]
    }
  }
}
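
Taken together, the job runs one FakeSource table through two chained Copy steps: rows leaving fake2 carry the original id, name, and c_row plus name1 (added by the first Copy) and id_1, name2, name3, and c_row_1 (added by the second). The Assert sink then enforces a minimum of 100 rows and NOT_NULL on six of those fields.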

Job config parser, parseTransform (file 4 of 4)

@@ -423,13 +423,6 @@ private void parseTransform(
                 inputIds.stream()
                         .map(tableWithActionMap::get)
                         .filter(Objects::nonNull)
-                        .peek(
-                                input -> {
-                                    if (input.size() > 1) {
-                                        throw new JobDefineCheckException(
-                                                "Adding transform to multi-table source is not supported.");
-                                    }
-                                })
                         .flatMap(Collection::stream)
                         .collect(Collectors.toList());
         if (inputs.isEmpty()) {
@@ -450,14 +443,19 @@
                 inputs.stream()
                         .map(Tuple2::_2)
                         .collect(Collectors.toCollection(LinkedHashSet::new));
+
+        LinkedHashSet<CatalogTable> catalogTables =
+                inputs.stream()
+                        .map(Tuple2::_1)
+                        .collect(Collectors.toCollection(LinkedHashSet::new));
         checkProducedTypeEquals(inputActions);
         int spareParallelism = inputs.get(0)._2().getParallelism();
         int parallelism =
                 readonlyConfig.getOptional(CommonOptions.PARALLELISM).orElse(spareParallelism);
-        CatalogTable catalogTable = inputs.get(0)._1();
         SeaTunnelTransform<?> transform =
-                FactoryUtil.createAndPrepareTransform(
-                        catalogTable, readonlyConfig, classLoader, factoryId);
+                FactoryUtil.createAndPrepareMultiTableTransform(
+                        new ArrayList<>(catalogTables), readonlyConfig, classLoader, factoryId);
+
         transform.setJobContext(jobConfig.getJobContext());
         long id = idGenerator.getNextId();
         String actionName = JobConfigParser.createTransformActionName(index, factoryId);
@@ -471,10 +469,15 @@
                         jarUrls,
                         new HashSet<>());
         transformAction.setParallelism(parallelism);
-        tableWithActionMap.put(
-                tableId,
-                Collections.singletonList(
-                        new Tuple2<>(transform.getProducedCatalogTable(), transformAction)));
+
+        List<Tuple2<CatalogTable, Action>> actions = new ArrayList<>();
+        List<CatalogTable> producedCatalogTables = transform.getProducedCatalogTables();
+
+        for (CatalogTable catalogTable : producedCatalogTables) {
+            actions.add(new Tuple2<>(catalogTable, transformAction));
+        }
+
+        tableWithActionMap.put(tableId, actions);
     }

     public static SeaTunnelDataType<?> getProducedType(Action action) {
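
The registration logic above reads as a one-to-many mapping: every catalog table the transform produces is paired with the same TransformAction, so a multi-table transform still materializes as a single action node in the DAG. A stream-form sketch of that loop, assuming the surrounding parseTransform scope (transform, transformAction, tableId, tableWithActionMap) and the same imports as the patch:

// Stream-form equivalent of the for-loop in the patch. Each Tuple2 pairs a
// produced CatalogTable with the ONE transformAction built for this transform.
List<Tuple2<CatalogTable, Action>> actions =
        transform.getProducedCatalogTables().stream()
                .map(table -> new Tuple2<>(table, transformAction))
                .collect(Collectors.toList());
tableWithActionMap.put(tableId, actions);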
