Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug] [Connector-V2] Bug postgres sink does not support composite primary key in jdbc mode #8350

Open
2 of 3 tasks
ZhiYinZhang opened this issue Dec 20, 2024 · 0 comments
Open
2 of 3 tasks
Labels

Comments

@ZhiYinZhang
Copy link

Search before asking

  • I had searched in the issues and found no similar issues.

What happened

Sync data from MySQL to postgres in batch mode can not set composite primary key.

SeaTunnel Version

2.3.8

SeaTunnel Config

env{
  parallelism=1
  job.mode="BATCH"
}
source {
  jdbc {
    url = "jdbc:mysql://192.168.17.201:3306/test?serverTimezone=GMT%2b8&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 100
    user="root"
    password="xxxxxxxx"
    table_list=[{table_path="test.yh"}]
  }
}

sink{
  jdbc{
    url = "jdbc:postgresql://192.168.17.201:5432/postgres"
    driver = "org.postgresql.Driver"
    user = "postgres"
    password = "xxxxxxxx"
    database = "postgres"
    table = "myschema.${table_name}"
    generate_sink_sql = true
    primary_keys=["yhbh","zhxh"]
  }
}

Running Command

./bin/seatunnel.sh -c config/batch-mysql2pg.conf

Error Exception

Caused by: org.apache.seatunnel.engine.common.exception.JobException: org.apache.seatunnel.common.exception.SeaTunnelRuntimeException: ErrorCode:[API-09], ErrorDescription:[Handle save mode failed]
        at org.apache.seatunnel.engine.server.master.JobMaster.handleSaveMode(JobMaster.java:378)
        at org.apache.seatunnel.engine.server.master.JobMaster.handleSaveMode(JobMaster.java:384)
        at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
        at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
        at java.util.Iterator.forEachRemaining(Iterator.java:116)
        at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
        at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
        at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
        at org.apache.seatunnel.engine.server.master.JobMaster.init(JobMaster.java:242)
        at org.apache.seatunnel.engine.server.CoordinatorService.lambda$submitJob$4(CoordinatorService.java:511)
        at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:39)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.seatunnel.api.table.catalog.exception.CatalogException: ErrorCode:[API-03], ErrorDescription:[Catalog initialize failed] - Failed creating table postgres.myschema.yh
        at org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.psql.PostgresCatalog.createTableInternal(PostgresCatalog.java:185)
        at org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog.createTable(AbstractJdbcCatalog.java:425)
        at org.apache.seatunnel.connectors.seatunnel.jdbc.sink.savemode.JdbcSaveModeHandler.createTable(JdbcSaveModeHandler.java:48)
        at org.apache.seatunnel.api.sink.DefaultSaveModeHandler.createSchemaWhenNotExist(DefaultSaveModeHandler.java:115)
        at org.apache.seatunnel.api.sink.DefaultSaveModeHandler.handleSchemaSaveMode(DefaultSaveModeHandler.java:74)
        at org.apache.seatunnel.api.sink.SaveModeHandler.handleSaveMode(SaveModeHandler.java:40)
        at org.apache.seatunnel.api.sink.SaveModeExecuteWrapper.execute(SaveModeExecuteWrapper.java:36)
        at org.apache.seatunnel.engine.server.master.JobMaster.handleSaveMode(JobMaster.java:376)
        ... 21 more
Caused by: org.postgresql.util.PSQLException: ERROR: multiple primary keys for table "yh" are not allowed
  Position: 648
        at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2733)
        at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2420)
        at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:372)
        at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:517)
        at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:434)
        at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:194)
        at org.postgresql.jdbc.PgPreparedStatement.execute(PgPreparedStatement.java:180)
        at org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog.executeInternal(AbstractJdbcCatalog.java:620)
        at org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.psql.PostgresCatalog.createTableInternal(PostgresCatalog.java:167)
        ... 28 more

        at org.apache.seatunnel.engine.server.CoordinatorService.lambda$submitJob$4(CoordinatorService.java:518)
        at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:39)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)

Zeta or Flink or Spark Version

zeta

Java or Scala Version

java 1.8.0

Screenshots

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

1 participant