From a189ad9ab447136b2d7dd858a93f9c6b5a099534 Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Wed, 11 Sep 2024 20:45:24 +0800 Subject: [PATCH] update --- .../api/table/factory/FactoryUtil.java | 8 +- .../seatunnel/e2e/transform/TestCopyIT.java | 7 + .../resources/copy_transform_multi_table.conf | 131 ++++++++++++++++++ .../parse/MultipleTableJobConfigParser.java | 31 +++-- 4 files changed, 158 insertions(+), 19 deletions(-) create mode 100644 seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/copy_transform_multi_table.conf diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java index 79c0c18706f..a86f132c33d 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java @@ -44,7 +44,6 @@ import java.io.Serializable; import java.net.URL; -import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -307,16 +306,15 @@ public static OptionRule sinkFullOptionRule(@NonNull TableSinkFactory factory) { return sinkOptionRule; } - public static SeaTunnelTransform createAndPrepareTransform( - CatalogTable catalogTable, + public static SeaTunnelTransform createAndPrepareMultiTableTransform( + List catalogTables, ReadonlyConfig options, ClassLoader classLoader, String factoryIdentifier) { final TableTransformFactory factory = discoverFactory(classLoader, TableTransformFactory.class, factoryIdentifier); TableTransformFactoryContext context = - new TableTransformFactoryContext( - Collections.singletonList(catalogTable), options, classLoader); + new TableTransformFactoryContext(catalogTables, options, classLoader); ConfigValidator.of(context.getOptions()).validate(factory.optionRule()); return factory.createTransform(context).createTransform(); } diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestCopyIT.java b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestCopyIT.java index b81d1fc6664..9217df50fdb 100644 --- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestCopyIT.java +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestCopyIT.java @@ -32,4 +32,11 @@ public void testCopy(TestContainer container) throws IOException, InterruptedExc Container.ExecResult execResult = container.executeJob("/copy_transform.conf"); Assertions.assertEquals(0, execResult.getExitCode()); } + + @TestTemplate + public void testCopyMultiTable(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult execResult = container.executeJob("/copy_transform_multi_table.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + } } diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/copy_transform_multi_table.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/copy_transform_multi_table.conf new file mode 100644 index 00000000000..b937b0a8cbe --- /dev/null +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/copy_transform_multi_table.conf @@ -0,0 +1,131 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + job.mode = "BATCH" +} + +source { + FakeSource { + result_table_name = "fake" + row.num = 100 + schema = { + fields { + id = "int" + name = "string" + c_row = { + c_row = { + c_int = int + } + } + } + } + } +} + +transform { + Copy { + source_table_name = "fake" + result_table_name = "fake1" + src_field = "name" + dest_field = "name1" + } + Copy { + source_table_name = "fake1" + result_table_name = "fake2" + fields { + id_1 = "id" + name2 = "name" + name3 = "name" + c_row_1 = "c_row" + } + } +} + +sink { + Assert { + source_table_name = "fake2" + rules = + { + row_rules = [ + { + rule_type = MIN_ROW + rule_value = 100 + } + ], + field_rules = [ + { + field_name = id + field_type = int + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = id_1 + field_type = int + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = name + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + } + { + field_name = name1 + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + } + { + field_name = name2 + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + } + { + field_name = name3 + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + } + ] + } + } +} \ No newline at end of file diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java index cb7f118a6e4..7b0537c7106 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java @@ -423,13 +423,6 @@ private void parseTransform( inputIds.stream() .map(tableWithActionMap::get) .filter(Objects::nonNull) - .peek( - input -> { - if (input.size() > 1) { - throw new JobDefineCheckException( - "Adding transform to multi-table source is not supported."); - } - }) .flatMap(Collection::stream) .collect(Collectors.toList()); if (inputs.isEmpty()) { @@ -450,14 +443,19 @@ private void parseTransform( inputs.stream() .map(Tuple2::_2) .collect(Collectors.toCollection(LinkedHashSet::new)); + + LinkedHashSet catalogTables = + inputs.stream() + .map(Tuple2::_1) + .collect(Collectors.toCollection(LinkedHashSet::new)); checkProducedTypeEquals(inputActions); int spareParallelism = inputs.get(0)._2().getParallelism(); int parallelism = readonlyConfig.getOptional(CommonOptions.PARALLELISM).orElse(spareParallelism); - CatalogTable catalogTable = inputs.get(0)._1(); SeaTunnelTransform transform = - FactoryUtil.createAndPrepareTransform( - catalogTable, readonlyConfig, classLoader, factoryId); + FactoryUtil.createAndPrepareMultiTableTransform( + new ArrayList<>(catalogTables), readonlyConfig, classLoader, factoryId); + transform.setJobContext(jobConfig.getJobContext()); long id = idGenerator.getNextId(); String actionName = JobConfigParser.createTransformActionName(index, factoryId); @@ -471,10 +469,15 @@ private void parseTransform( jarUrls, new HashSet<>()); transformAction.setParallelism(parallelism); - tableWithActionMap.put( - tableId, - Collections.singletonList( - new Tuple2<>(transform.getProducedCatalogTable(), transformAction))); + + List> actions = new ArrayList<>(); + List producedCatalogTables = transform.getProducedCatalogTables(); + + for (CatalogTable catalogTable : producedCatalogTables) { + actions.add(new Tuple2<>(catalogTable, transformAction)); + } + + tableWithActionMap.put(tableId, actions); } public static SeaTunnelDataType getProducedType(Action action) {