Skip to content

Commit

Permalink
Merge branch 'dev' into create-pulsar-sink
Browse files Browse the repository at this point in the history
  • Loading branch information
lightzhao committed Sep 1, 2023
2 parents 5ee01e1 + 3e5d018 commit de45934
Show file tree
Hide file tree
Showing 7 changed files with 203 additions and 197 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ source {
MySQL-CDC {
result_table_name = "table1"

hostname = localhost
base-url="jdbc:mysql://localhost:3306/test"
"startup.mode"=INITIAL
catalog {
Expand Down
330 changes: 162 additions & 168 deletions docs/en/connector-v2/source/MySQL-CDC.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ public OptionRule optionRule() {
JdbcSourceOptions.SAMPLE_SHARDING_THRESHOLD,
JdbcSourceOptions.INVERSE_SAMPLING_RATE)
.optional(MySqlSourceOptions.STARTUP_MODE, MySqlSourceOptions.STOP_MODE)
.conditional(
MySqlSourceOptions.STARTUP_MODE,
StartupMode.INITIAL,
SourceOptions.EXACTLY_ONCE)
.conditional(
MySqlSourceOptions.STARTUP_MODE,
StartupMode.SPECIFIC,
Expand All @@ -81,18 +85,6 @@ public OptionRule optionRule() {
StopMode.SPECIFIC,
SourceOptions.STOP_SPECIFIC_OFFSET_FILE,
SourceOptions.STOP_SPECIFIC_OFFSET_POS)
.conditional(
MySqlSourceOptions.STARTUP_MODE,
StartupMode.TIMESTAMP,
SourceOptions.STARTUP_TIMESTAMP)
.conditional(
MySqlSourceOptions.STOP_MODE,
StopMode.TIMESTAMP,
SourceOptions.STOP_TIMESTAMP)
.conditional(
MySqlSourceOptions.STARTUP_MODE,
StartupMode.INITIAL,
SourceOptions.EXACTLY_ONCE)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,22 @@ public class MySqlSourceOptions {
Arrays.asList(
StartupMode.INITIAL,
StartupMode.EARLIEST,
StartupMode.LATEST))
StartupMode.LATEST,
StartupMode.SPECIFIC))
.defaultValue(StartupMode.INITIAL)
.withDescription(
"Optional startup mode for CDC source, valid enumerations are "
+ "\"initial\", \"earliest\", \"latest\", \"timestamp\"\n or \"specific\"");
+ "\"initial\", \"earliest\", \"latest\" or \"specific\"");

public static final SingleChoiceOption<StopMode> STOP_MODE =
(SingleChoiceOption)
Options.key(SourceOptions.STOP_MODE_KEY)
.singleChoice(StopMode.class, Arrays.asList(StopMode.NEVER))
.singleChoice(
StopMode.class,
Arrays.asList(
StopMode.LATEST, StopMode.SPECIFIC, StopMode.NEVER))
.defaultValue(StopMode.NEVER)
.withDescription(
"Optional stop mode for CDC source, valid enumerations are "
+ "\"never\", \"latest\", \"timestamp\"\n or \"specific\"");
+ "\"never\", \"latest\" or \"specific\"");
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,14 @@ public class MySqlTypeUtils {
private static final String MYSQL_BIGINT_UNSIGNED = "BIGINT UNSIGNED";
private static final String MYSQL_DECIMAL = "DECIMAL";
private static final String MYSQL_DECIMAL_UNSIGNED = "DECIMAL UNSIGNED";
private static final String MYSQL_NUMERIC = "NUMERIC";
private static final String MYSQL_NUMERIC_UNSIGNED = "NUMERIC UNSIGNED";
private static final String MYSQL_FLOAT = "FLOAT";
private static final String MYSQL_FLOAT_UNSIGNED = "FLOAT UNSIGNED";
private static final String MYSQL_DOUBLE = "DOUBLE";
private static final String MYSQL_DOUBLE_UNSIGNED = "DOUBLE UNSIGNED";
private static final String MYSQL_REAL = "REAL";
private static final String MYSQL_REAL_UNSIGNED = "REAL UNSIGNED";

// -------------------------string----------------------------
private static final String MYSQL_CHAR = "CHAR";
Expand All @@ -63,6 +67,7 @@ public class MySqlTypeUtils {
private static final String MYSQL_TEXT = "TEXT";
private static final String MYSQL_LONGTEXT = "LONGTEXT";
private static final String MYSQL_JSON = "JSON";
private static final String MYSQL_ENUM = "ENUM";

// ------------------------------time-------------------------
private static final String MYSQL_DATE = "DATE";
Expand All @@ -89,6 +94,7 @@ public static SeaTunnelDataType<?> convertFromColumn(Column column) {
return column.length() == 1 ? BasicType.BOOLEAN_TYPE : BasicType.INT_TYPE;
case MYSQL_TINYINT_UNSIGNED:
case MYSQL_SMALLINT:
return BasicType.SHORT_TYPE;
case MYSQL_SMALLINT_UNSIGNED:
case MYSQL_MEDIUMINT:
case MYSQL_MEDIUMINT_UNSIGNED:
Expand All @@ -103,15 +109,20 @@ public static SeaTunnelDataType<?> convertFromColumn(Column column) {
case MYSQL_BIGINT_UNSIGNED:
return new DecimalType(20, 0);
case MYSQL_DECIMAL:
case MYSQL_DECIMAL_UNSIGNED:
case MYSQL_NUMERIC:
case MYSQL_NUMERIC_UNSIGNED:
return new DecimalType(column.length(), column.scale().orElse(0));
case MYSQL_FLOAT:
return BasicType.FLOAT_TYPE;
case MYSQL_FLOAT_UNSIGNED:
log.warn("{} will probably cause value overflow.", MYSQL_FLOAT_UNSIGNED);
return BasicType.FLOAT_TYPE;
case MYSQL_DOUBLE:
case MYSQL_REAL:
return BasicType.DOUBLE_TYPE;
case MYSQL_DOUBLE_UNSIGNED:
case MYSQL_REAL_UNSIGNED:
log.warn("{} will probably cause value overflow.", MYSQL_DOUBLE_UNSIGNED);
return BasicType.DOUBLE_TYPE;
case MYSQL_CHAR:
Expand All @@ -120,6 +131,7 @@ public static SeaTunnelDataType<?> convertFromColumn(Column column) {
case MYSQL_TEXT:
case MYSQL_VARCHAR:
case MYSQL_JSON:
case MYSQL_ENUM:
return BasicType.STRING_TYPE;
case MYSQL_LONGTEXT:
log.warn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@ public static List<TableId> listTables(JdbcConnection jdbc, RelationalTableFilte
"SHOW DATABASES",
rs -> {
while (rs.next()) {
databaseNames.add(rs.getString(1));
String databaseName = rs.getString(1);
if (tableFilters.databaseFilter().test(databaseName)) {
databaseNames.add(databaseName);
}
}
});
LOG.info("\t list of available databases is: {}", databaseNames);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@
import org.junit.jupiter.api.Test;

import com.hazelcast.internal.serialization.Data;
import lombok.extern.slf4j.Slf4j;

import java.util.Collections;
import java.util.concurrent.TimeUnit;

import static org.awaitility.Awaitility.await;

@Slf4j
public class CheckpointTimeOutTest extends AbstractSeaTunnelServerTest {

public static String CONF_PATH = "stream_fake_to_console_checkpointTimeOut.conf";
Expand All @@ -45,20 +47,20 @@ public void testJobLevelCheckpointTimeOut() {

await().atMost(120000, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
Assertions.assertTrue(
server.getCoordinatorService()
.getJobStatus(JOB_ID)
.equals(JobStatus.RUNNING));
});
() ->
Assertions.assertEquals(
server.getCoordinatorService().getJobStatus(JOB_ID),
JobStatus.RUNNING));

await().atMost(120000, TimeUnit.MILLISECONDS)
await().atMost(360000, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
Assertions.assertTrue(
server.getCoordinatorService()
.getJobStatus(JOB_ID)
.equals(JobStatus.FAILED));
log.info(
"Job status: {}",
server.getCoordinatorService().getJobStatus(JOB_ID));
Assertions.assertEquals(
server.getCoordinatorService().getJobStatus(JOB_ID),
JobStatus.FAILED);
});
}

Expand Down

0 comments on commit de45934

Please sign in to comment.