Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
liugddx committed Oct 8, 2024
1 parent 3e02ca2 commit 18f47f4
Show file tree
Hide file tree
Showing 57 changed files with 384 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -183,4 +183,9 @@ public List<TablePath> getSinkTables() {
public void setJobContext(JobContext jobContext) {
sinks.values().forEach(sink -> sink.setJobContext(jobContext));
}

@Override
public Optional<CatalogTable> getWriteCatalogTable() {
return SeaTunnelSink.super.getWriteCatalogTable();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,30 +19,39 @@

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;

import java.io.IOException;
import java.util.Optional;

public class ActivemqSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
private final SeaTunnelRowType seaTunnelRowType;
private final ReadonlyConfig pluginConfig;
private final CatalogTable catalogTable;

@Override
public String getPluginName() {
return "ActiveMQ";
}

public ActivemqSink(ReadonlyConfig pluginConfig, SeaTunnelRowType rowType) {
public ActivemqSink(ReadonlyConfig pluginConfig, CatalogTable catalogTable) {
this.pluginConfig = pluginConfig;
this.seaTunnelRowType = rowType;
this.catalogTable = catalogTable;
this.seaTunnelRowType = catalogTable.getTableSchema().toPhysicalRowDataType();
}

@Override
public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context)
throws IOException {
return new ActivemqSinkWriter(pluginConfig, seaTunnelRowType);
}

@Override
public Optional<CatalogTable> getWriteCatalogTable() {
return Optional.of(catalogTable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,6 @@ public OptionRule optionRule() {

@Override
public TableSink createSink(TableSinkFactoryContext context) {
return () ->
new ActivemqSink(
context.getOptions(),
context.getCatalogTable().getTableSchema().toPhysicalRowDataType());
return () -> new ActivemqSink(context.getOptions(), context.getCatalogTable());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
Expand All @@ -36,6 +37,7 @@
import com.google.auto.service.AutoService;

import java.io.IOException;
import java.util.Optional;

import static org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.ACCESS_KEY_ID;
import static org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.REGION;
Expand Down Expand Up @@ -85,4 +87,9 @@ public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context co
throws IOException {
return new AmazonDynamoDBWriter(amazondynamodbSourceOptions, rowType);
}

@Override
public Optional<CatalogTable> getWriteCatalogTable() {
return super.getWriteCatalogTable();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,30 +19,39 @@

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;

import java.io.IOException;
import java.util.Optional;

public class AmazonSqsSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
private SeaTunnelRowType typeInfo;
private ReadonlyConfig pluginConfig;
private final SeaTunnelRowType typeInfo;
private final ReadonlyConfig pluginConfig;
private final CatalogTable catalogTable;

@Override
public String getPluginName() {
return "AmazonSqs";
}

public AmazonSqsSink(ReadonlyConfig pluginConfig, SeaTunnelRowType typeInfo) {
this.typeInfo = typeInfo;
public AmazonSqsSink(ReadonlyConfig pluginConfig, CatalogTable catalogTable) {
this.typeInfo = catalogTable.getTableSchema().toPhysicalRowDataType();
this.pluginConfig = pluginConfig;
this.catalogTable = catalogTable;
}

@Override
public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context)
throws IOException {
return new AmazonSqsSinkWriter(typeInfo, pluginConfig);
}

@Override
public Optional<CatalogTable> getWriteCatalogTable() {
return Optional.of(catalogTable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ public String factoryIdentifier() {
public TableSink createSink(TableSinkFactoryContext context) {
ReadonlyConfig config = context.getOptions();
CatalogTable catalogTable = context.getCatalogTable();
return () ->
new AmazonSqsSink(config, catalogTable.getTableSchema().toPhysicalRowDataType());
return () -> new AmazonSqsSink(config, catalogTable);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
Expand All @@ -43,6 +44,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

import static org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig.HOST;
import static org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig.KEYSPACE;
Expand Down Expand Up @@ -122,4 +124,9 @@ public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context co
throws IOException {
return new CassandraSinkWriter(cassandraParameters, seaTunnelRowType, tableSchema);
}

@Override
public Optional<CatalogTable> getWriteCatalogTable() {
return super.getWriteCatalogTable();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
Expand Down Expand Up @@ -212,4 +213,9 @@ public Optional<Serializer<CKFileCommitInfo>> getCommitInfoSerializer() {
public Optional<Serializer<CKFileAggCommitInfo>> getAggregatedCommitInfoSerializer() {
return Optional.of(new DefaultSerializer<>());
}

@Override
public Optional<CatalogTable> getWriteCatalogTable() {
return SeaTunnelSink.super.getWriteCatalogTable();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter.Context;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
Expand All @@ -35,6 +36,7 @@
import com.google.auto.service.AutoService;

import java.io.IOException;
import java.util.Optional;

import static org.apache.seatunnel.connectors.seatunnel.datahub.config.DataHubConfig.ACCESS_ID;
import static org.apache.seatunnel.connectors.seatunnel.datahub.config.DataHubConfig.ACCESS_KEY;
Expand Down Expand Up @@ -93,4 +95,9 @@ public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(Context context) thro
pluginConfig.getInt(TIMEOUT.key()),
pluginConfig.getInt(RETRY_TIMES.key()));
}

@Override
public Optional<CatalogTable> getWriteCatalogTable() {
return super.getWriteCatalogTable();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter.Context;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
Expand All @@ -32,6 +33,7 @@
import com.google.auto.service.AutoService;

import java.io.IOException;
import java.util.Optional;

import static org.apache.seatunnel.connectors.seatunnel.config.DingTalkConfig.SECRET;
import static org.apache.seatunnel.connectors.seatunnel.config.DingTalkConfig.URL;
Expand Down Expand Up @@ -73,4 +75,9 @@ public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(Context context) thro
return new DingTalkWriter(
pluginConfig.getString(URL.key()), pluginConfig.getString(SECRET.key()));
}

@Override
public Optional<CatalogTable> getWriteCatalogTable() {
return super.getWriteCatalogTable();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.easysearch.state.EasysearchAggregatedCommitInfo;
Expand All @@ -30,6 +31,8 @@

import com.google.auto.service.AutoService;

import java.util.Optional;

import static org.apache.seatunnel.connectors.seatunnel.easysearch.config.SinkConfig.MAX_BATCH_SIZE;
import static org.apache.seatunnel.connectors.seatunnel.easysearch.config.SinkConfig.MAX_RETRY_COUNT;

Expand Down Expand Up @@ -75,4 +78,9 @@ public SinkWriter<SeaTunnelRow, EasysearchCommitInfo, EasysearchSinkState> creat
return new EasysearchSinkWriter(
context, seaTunnelRowType, pluginConfig, maxBatchSize, maxRetryCount);
}

@Override
public Optional<CatalogTable> getWriteCatalogTable() {
return SeaTunnelSink.super.getWriteCatalogTable();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,17 @@
import org.apache.seatunnel.connectors.seatunnel.email.config.EmailConfig;
import org.apache.seatunnel.connectors.seatunnel.email.config.EmailSinkConfig;

import lombok.Getter;

import java.util.Optional;

public class EmailSink extends AbstractSimpleSink<SeaTunnelRow, Void>
implements SupportMultiTableSink {

private SeaTunnelRowType seaTunnelRowType;
private ReadonlyConfig readonlyConfig;
private CatalogTable catalogTable;
private EmailSinkConfig pluginConfig;
private final SeaTunnelRowType seaTunnelRowType;
@Getter private ReadonlyConfig readonlyConfig;
private final CatalogTable catalogTable;
private final EmailSinkConfig pluginConfig;

public EmailSink(ReadonlyConfig config, CatalogTable table) {
this.readonlyConfig = config;
Expand All @@ -51,4 +55,9 @@ public EmailSinkWriter createWriter(SinkWriter.Context context) {
public String getPluginName() {
return EmailConfig.CONNECTOR_IDENTITY;
}

@Override
public Optional<CatalogTable> getWriteCatalogTable() {
return Optional.of(catalogTable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,9 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
}
hadoopConf = CosConf.buildWithConfig(pluginConfig);
}

@Override
public Optional<CatalogTable> getWriteCatalogTable() {
return super.getWriteCatalogTable();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,21 @@
import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseMultipleTableFileSink;

public class FtpFileSink extends BaseMultipleTableFileSink {

private final CatalogTable catalogTable;

@Override
public String getPluginName() {
return FileSystemType.FTP.getFileSystemPluginName();
}

public FtpFileSink(ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
super(FtpConf.buildWithConfig(readonlyConfig), readonlyConfig, catalogTable);
this.catalogTable = catalogTable;
}

@Override
public Optional<CatalogTable> getWriteCatalogTable() {
return Optional.ofNullable(catalogTable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,9 @@ public String getPluginName() {
public void prepare(Config pluginConfig) throws PrepareFailException {
super.prepare(pluginConfig);
}

@Override
public Optional<CatalogTable> getWriteCatalogTable() {
return super.getWriteCatalogTable();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,9 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
}
hadoopConf = OssConf.buildWithConfig(pluginConfig);
}

@Override
public Optional<CatalogTable> getWriteCatalogTable() {
return super.getWriteCatalogTable();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,20 @@

public class LocalFileSink extends BaseMultipleTableFileSink {

private final CatalogTable catalogTable;

public LocalFileSink(ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
super(new LocalFileHadoopConf(), readonlyConfig, catalogTable);
this.catalogTable = catalogTable;
}

@Override
public String getPluginName() {
return FileSystemType.LOCAL.getFileSystemPluginName();
}

@Override
public Optional<CatalogTable> getWriteCatalogTable() {
return Optional.ofNullable(catalogTable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,9 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
}
hadoopConf = ObsConf.buildWithConfig(pluginConfig);
}

@Override
public Optional<CatalogTable> getWriteCatalogTable() {
return super.getWriteCatalogTable();
}
}
Loading

0 comments on commit 18f47f4

Please sign in to comment.