Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Restapi] Allow metrics information to be associated to logical plan nodes #7786

Open
wants to merge 16 commits into
base: dev
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.seatunnel.api.common.SeaTunnelPluginLifeCycle;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.source.SeaTunnelJobAware;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;

Expand Down Expand Up @@ -135,4 +136,13 @@ default Optional<Serializer<CommitInfoT>> getCommitInfoSerializer() {
default Optional<Serializer<AggregatedCommitInfoT>> getAggregatedCommitInfoSerializer() {
return Optional.empty();
}

/**
* Get the catalog table of the sink.
*
* @return Optional of catalog table.
*/
default Optional<CatalogTable> getWriteCatalogTable() {
liugddx marked this conversation as resolved.
Show resolved Hide resolved
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@
import org.apache.seatunnel.api.sink.SinkCommitter;
import org.apache.seatunnel.api.sink.SinkCommonOptions;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.factory.MultiTableFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;

import lombok.Getter;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -157,7 +159,18 @@ public Optional<Serializer<MultiTableCommitInfo>> getCommitInfoSerializer() {
}

public List<TablePath> getSinkTables() {
return sinks.keySet().stream().map(TablePath::of).collect(Collectors.toList());

List<TablePath> tablePaths = new ArrayList<>();
List<SeaTunnelSink> values = new ArrayList<>(sinks.values());
for (int i = 0; i < values.size(); i++) {
if (values.get(i).getWriteCatalogTable().isPresent()) {
tablePaths.add(
((CatalogTable) values.get(i).getWriteCatalogTable().get()).getTablePath());
} else {
tablePaths.add(TablePath.of(sinks.keySet().toArray(new String[0])[i]));
}
}
return tablePaths;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.CATALOG_TABLE_RULES;
Expand All @@ -56,6 +57,7 @@ public class AssertSink extends AbstractSimpleSink<SeaTunnelRow, Void>
private final AssertTableRule assertTableRule;
private final Map<String, AssertCatalogTableRule> assertCatalogTableRule;
private final String catalogTableName;
private final CatalogTable catalogTable;

public AssertSink(ReadonlyConfig pluginConfig, CatalogTable catalogTable) {
this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
Expand Down Expand Up @@ -93,6 +95,7 @@ public AssertSink(ReadonlyConfig pluginConfig, CatalogTable catalogTable) {
new ConfigException.BadValue(
RULES.key(), "Assert rule config is empty, please add rule config."));
}
this.catalogTable = catalogTable;
}

private void initTableRule(CatalogTable catalogTable, Config tableConfig, String tableName) {
Expand Down Expand Up @@ -130,4 +133,9 @@ public AssertSinkWriter createWriter(SinkWriter.Context context) {
public String getPluginName() {
return "Assert";
}

@Override
public Optional<CatalogTable> getWriteCatalogTable() {
return Optional.of(catalogTable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;

import java.util.Optional;

import static org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkFactory.LOG_PRINT_DATA;
import static org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkFactory.LOG_PRINT_DELAY;

Expand All @@ -32,11 +35,13 @@ public class ConsoleSink extends AbstractSimpleSink<SeaTunnelRow, Void>
private final SeaTunnelRowType seaTunnelRowType;
private final boolean isPrintData;
private final int delayMs;
private final CatalogTable catalogTable;

public ConsoleSink(SeaTunnelRowType seaTunnelRowType, ReadonlyConfig options) {
this.seaTunnelRowType = seaTunnelRowType;
public ConsoleSink(CatalogTable catalogTable, ReadonlyConfig options) {
this.catalogTable = catalogTable;
this.isPrintData = options.get(LOG_PRINT_DATA);
this.delayMs = options.get(LOG_PRINT_DELAY);
this.seaTunnelRowType = catalogTable.getTableSchema().toPhysicalRowDataType();
}

@Override
Expand All @@ -48,4 +53,9 @@ public ConsoleSinkWriter createWriter(SinkWriter.Context context) {
public String getPluginName() {
return "Console";
}

@Override
public Optional<CatalogTable> getWriteCatalogTable() {
return Optional.ofNullable(catalogTable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,6 @@ public OptionRule optionRule() {
@Override
public TableSink createSink(TableSinkFactoryContext context) {
ReadonlyConfig options = context.getOptions();
return () ->
new ConsoleSink(
context.getCatalogTable().getTableSchema().toPhysicalRowDataType(),
options);
return () -> new ConsoleSink(context.getCatalogTable(), options);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,4 +133,9 @@ public Optional<SaveModeHandler> getSaveModeHandler() {
catalogTable,
config.get(DorisOptions.CUSTOM_SQL)));
}

@Override
public Optional<CatalogTable> getWriteCatalogTable() {
return Optional.of(catalogTable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;

import java.io.IOException;
import java.util.Optional;

import static org.apache.seatunnel.connectors.druid.config.DruidConfig.BATCH_SIZE;
import static org.apache.seatunnel.connectors.druid.config.DruidConfig.COORDINATOR_URL;
Expand Down Expand Up @@ -58,4 +59,9 @@ public DruidWriter createWriter(SinkWriter.Context context) throws IOException {
config.get(DATASOURCE),
config.get(BATCH_SIZE));
}

@Override
public Optional<CatalogTable> getWriteCatalogTable() {
return Optional.ofNullable(catalogTable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,9 @@ public Optional<SaveModeHandler> getSaveModeHandler() {
new DefaultSaveModeHandler(
schemaSaveMode, dataSaveMode, catalog, tablePath, null, null));
}

@Override
public Optional<CatalogTable> getWriteCatalogTable() {
return Optional.ofNullable(catalogTable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -237,4 +237,9 @@ public Optional<SaveModeHandler> getSaveModeHandler() {
}
return Optional.empty();
}

@Override
public Optional<CatalogTable> getWriteCatalogTable() {
return Optional.ofNullable(catalogTable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,9 @@ public Optional<SaveModeHandler> getSaveModeHandler() {
catalogTable,
sinkConfig.getCustomSql()));
}

@Override
public Optional<CatalogTable> getWriteCatalogTable() {
return Optional.of(catalogTable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,9 @@ public Optional<SaveModeHandler> getSaveModeHandler() {
new DefaultSaveModeHandler(
schemaSaveMode, dataSaveMode, catalog, tablePath, null, null));
}

@Override
public Optional<CatalogTable> getWriteCatalogTable() {
return Optional.of(catalogTable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import lombok.NoArgsConstructor;

import java.io.Serializable;
import java.util.List;

@Data
@NoArgsConstructor
Expand All @@ -35,4 +36,6 @@ public class VertexInfo implements Serializable {
private PluginType type;

private String connectorType;

private List<String> tablePaths;
liugddx marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,14 @@

package org.apache.seatunnel.engine.server.dag;

import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.multitablesink.MultiTableSink;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.engine.common.config.EngineConfig;
import org.apache.seatunnel.engine.core.dag.actions.ActionUtils;
import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.dag.logical.LogicalVertex;
import org.apache.seatunnel.engine.core.job.Edge;
Expand All @@ -28,9 +34,11 @@
import org.apache.seatunnel.engine.server.dag.execution.ExecutionPlanGenerator;
import org.apache.seatunnel.engine.server.dag.execution.Pipeline;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -63,13 +71,57 @@ public static JobDAGInfo getJobDAGInfo(
pipeline.getVertexes()
.forEach(
(id, vertex) -> {
List<String> tablePaths = new ArrayList<>();
liugddx marked this conversation as resolved.
Show resolved Hide resolved
if (vertex.getAction() instanceof SourceAction) {
SourceAction sourceAction =
(SourceAction) vertex.getAction();
List<CatalogTable> producedCatalogTables =
sourceAction
.getSource()
.getProducedCatalogTables();
List<String> collect =
producedCatalogTables.stream()
.map(
catalogTable ->
catalogTable
.getTablePath()
.toString())
.collect(Collectors.toList());
tablePaths.addAll(collect);
} else if (vertex.getAction() instanceof SinkAction) {
SeaTunnelSink seaTunnelSink =
((SinkAction<?, ?, ?, ?>)
vertex.getAction())
.getSink();
if (seaTunnelSink instanceof MultiTableSink) {
List<String> collect =
((MultiTableSink) seaTunnelSink)
.getSinkTables().stream()
.map(
TablePath
::toString)
.collect(
Collectors
.toList());
tablePaths.addAll(collect);
} else {
Optional<CatalogTable> catalogTable =
seaTunnelSink.getWriteCatalogTable();
catalogTable.ifPresent(
table ->
tablePaths.add(
table.getTablePath()
.getFullName()));
}
}
vertexInfoMap.put(
id,
new VertexInfo(
vertex.getVertexId(),
ActionUtils.getActionType(
vertex.getAction()),
vertex.getAction().getName()));
vertex.getAction().getName(),
tablePaths));
});
});
return new JobDAGInfo(
Expand All @@ -85,11 +137,54 @@ public static JobDAGInfo getJobDAGInfo(
Map<Long, VertexInfo> vertexInfoMap =
logicalVertexMap.values().stream()
.map(
v ->
new VertexInfo(
v.getVertexId(),
ActionUtils.getActionType(v.getAction()),
v.getAction().getName()))
v -> {
List<String> tablePaths = new ArrayList<>();
if (v.getAction() instanceof SourceAction) {
SourceAction sourceAction =
(SourceAction) v.getAction();
List<CatalogTable> producedCatalogTables =
sourceAction
.getSource()
.getProducedCatalogTables();
List<String> collect =
producedCatalogTables.stream()
.map(
catalogTable ->
catalogTable
.getTablePath()
.toString())
.collect(Collectors.toList());
tablePaths.addAll(collect);
} else if (v.getAction() instanceof SinkAction) {
SeaTunnelSink seaTunnelSink =
((SinkAction<?, ?, ?, ?>) v.getAction())
.getSink();
if (seaTunnelSink instanceof MultiTableSink) {
List<String> collect =
((MultiTableSink) seaTunnelSink)
.getSinkTables().stream()
.map(TablePath::toString)
.collect(
Collectors
.toList());
tablePaths.addAll(collect);
} else {
Optional<CatalogTable> catalogTable =
seaTunnelSink.getWriteCatalogTable();
catalogTable.ifPresent(
table ->
tablePaths.add(
table.getTablePath()
.getFullName()));
}
}

return new VertexInfo(
v.getVertexId(),
ActionUtils.getActionType(v.getAction()),
v.getAction().getName(),
tablePaths);
})
.collect(
Collectors.toMap(VertexInfo::getVertexId, Function.identity()));

Expand Down
Loading
Loading