Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug][Connector-V2][Assert] AssertSinkWriter supports multiple concurrent operations #8357

Open
2 of 3 tasks
zhangshenghang opened this issue Dec 20, 2024 · 2 comments
Open
2 of 3 tasks
Assignees

Comments

@zhangshenghang
Copy link
Member

Search before asking

  • I had searched in the feature and found no similar feature requirement.

Description

The current AssertSinkWriter has a bug in the Close method to verify whether the data has been run completely, especially when dealing with multiple concurrent instances.
For example, if there are two AssertSinkWriter threads A and B, and A has finished running while B has not, A will execute the close method to count whether the overall execution is complete.

The correct logic is to perform validation after all threads have completed running, or alternatively, thread A only verifies the data it runs.

image

Usage Scenario

No response

Related issues

No response

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@zhangshenghang
Copy link
Member Author

You can use this configuration file to reproduce this issue

env {
  job.mode = "BATCH"
}

source {
  FakeSource {
    parallelism = 1
    tables_configs = [
      {
        row.num = 1000
        schema = {
          table = "test.abc"
          columns = [
            {
              name = "id"
              type = "bigint"
            },
            {
              name = "name"
              type = "string"
            },
            {
              name = "age"
              type = "int"
            }
          ]
        }
      },
      {
        row.num = 1000
        schema = {
          table = "test.xyz"
          columns = [
            {
              name = "id"
              type = "bigint"
            },
            {
              name = "name"
              type = "string"
            },
            {
              name = "age"
              type = "int"
            }
          ]
        }
      },
      {
        row.num = 1000
        schema = {
          table = "test.www"
          columns = [
            {
              name = "id"
              type = "bigint"
            },
            {
              name = "name"
              type = "string"
            },
            {
              name = "age"
              type = "int"
            }
          ]
        }
      }
    ]
  }
}

transform {
  FilterRowKind {
    table_match_regex = "test.a.*"
    table_transform = [{
      table_path = "test.xyz"
      exclude_kinds = ["INSERT"]
    }]
    exclude_kinds = ["INSERT"]
  }
}

sink {
  Assert {
    rules =
      {
        tables_configs = [
          {
            table_path = "test.abc"
            row_rules = [
              {
                rule_type = MIN_ROW
                rule_value = 0
              },
              {
                rule_type = MAX_ROW
                rule_value = 0
              }
            ]
          },
          {
            table_path = "test.xyz"
            row_rules = [
              {
                rule_type = MIN_ROW
                rule_value = 0
              },
              {
                rule_type = MAX_ROW
                rule_value = 0
              }
            ]
          },
          {
            table_path = "test.www"
            row_rules = [
              {
                rule_type = MIN_ROW
                rule_value = 1000
              },
              {
                rule_type = MAX_ROW
                rule_value = 1000
              }
            ]
          }
        ]
      }
  }
}

@corgy-w
Copy link
Contributor

corgy-w commented Dec 24, 2024

I'm in 'rehabilitation training' [/dog], if I have time I can try this issue

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants