
[Bug] [Connector-V2] connector-maxcompute: The source reader may cause data duplication #8379

Open
3 tasks done
liangcw1111 opened this issue Dec 25, 2024 · 1 comment


Search before asking

  • I have searched the issues and found no similar issues.

What happened

When the MaxCompute source split enumerator assigns pending splits, an assignSplitOperation is sent to the task group worker, and the source reader's pollNext(Collector output) reads those splits to completion. If the enumerator's signalNoMoreSplits operation has not arrived by then, pollNext(Collector output) may be executed again, and the same set of splits is read more than once. This happens easily when the cluster's system load is high.
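The race can be reproduced without SeaTunnel or MaxCompute in a small, hypothetical Java model. It assumes, as the phrase "the set of splits" suggests, that the affected reader reads every split it holds on each pollNext() call without removing it, and only stops once the no-more-splits signal has been seen. That assumption, the class name DuplicateSplitDemo and the pollsBeforeSignal parameter are illustrative and not taken from the connector source. The model compares that pattern with polling (removing) each split before reading it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical model of the race (not connector code). Each loop iteration stands for
 * one pollNext() call; the no-more-splits signal becomes visible only from call number
 * pollsBeforeSignal (0-based). "Reading" a split just records its id.
 */
public class DuplicateSplitDemo {

    /** Assumed old pattern: read every held split on each call, never remove it. */
    public static List<Integer> readWithoutRemoving(Set<Integer> splits, int pollsBeforeSignal) {
        List<Integer> reads = new ArrayList<>();
        for (int poll = 0; ; poll++) {
            boolean noMoreSplit = poll >= pollsBeforeSignal;
            reads.addAll(splits); // the same splits are read again on every call
            if (noMoreSplit) {
                return reads; // only now does the reader signal the end of data
            }
        }
    }

    /** Poll-based pattern: a split is removed from the queue before it is read. */
    public static List<Integer> readWithPoll(Deque<Integer> splits, int pollsBeforeSignal) {
        List<Integer> reads = new ArrayList<>();
        for (int poll = 0; ; poll++) {
            boolean noMoreSplit = poll >= pollsBeforeSignal;
            Integer split = splits.poll();
            if (split != null) {
                reads.add(split);
            } else if (noMoreSplit) {
                return reads;
            }
            // otherwise the real reader sleeps and pollNext() is called again
        }
    }

    public static void main(String[] args) {
        List<Integer> ids = Arrays.asList(1, 2, 3);
        // Late signal: it is only visible from the third call onwards.
        System.out.println(readWithoutRemoving(new LinkedHashSet<>(ids), 2));
        // prints [1, 2, 3, 1, 2, 3, 1, 2, 3]
        System.out.println(readWithPoll(new ArrayDeque<>(ids), 2));
        // prints [1, 2, 3]
    }
}
```

With the signal delayed by two calls, the first method reads each split three times, while the polling version reads each split once no matter how late the signal arrives.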

SeaTunnel Version

2.3.7

SeaTunnel Config

seatunnel:
  engine:
    classloader-cache-mode: true
    history-job-expire-minutes: 1440
    backup-count: 1
    print-execution-info-interval: 60
    print-job-metrics-info-interval: 60
    queue-type: blockingqueue
    slot-service:
      dynamic-slot: false
      slot-num: 20
    checkpoint:
      interval: 30000
      timeout: 2147483647
      max-concurrent: 5
      tolerable-failure: 2
      storage:
        type: oss

Running Command

sh /alidata1/za-seatunnel/seatunnel-2.3.7/bin/seatunnel-cluster.sh -d -r master/worker

Error Exception

No exception is thrown, but the data of one or more splits is read repeatedly.

Zeta or Flink or Spark Version

zeta

Java or Scala Version

java 1.8

Screenshots

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct


liangcw1111 commented Dec 25, 2024

Failed to connect to github.com port 443 after 21089 ms: Couldn't connect to server.
This code can fix it (the full MaxcomputeSourceReader.java):
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.seatunnel.connectors.seatunnel.maxcompute.source;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException;
import org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeTypeMapper;
import org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeUtil;

import com.aliyun.odps.data.Record;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.io.TunnelRecordReader;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;

@Slf4j
public class MaxcomputeSourceReader implements SourceReader<SeaTunnelRow, MaxcomputeSourceSplit> {
    private final SourceReader.Context context;
    // Splits assigned to this reader that have not been read yet. A split is
    // removed (polled) before it is read, so it cannot be read a second time.
    private final Deque<MaxcomputeSourceSplit> sourceSplits = new ConcurrentLinkedDeque<>();
    private final Config pluginConfig;
    // Set by handleNoMoreSplits() and read by pollNext(), which may run on different threads.
    private volatile boolean noMoreSplit;
    private final SeaTunnelRowType seaTunnelRowType;

    public MaxcomputeSourceReader(
            Config pluginConfig, SourceReader.Context context, SeaTunnelRowType seaTunnelRowType) {
        this.pluginConfig = pluginConfig;
        this.context = context;
        this.seaTunnelRowType = seaTunnelRowType;
    }

    @Override
    public void open() {}

    @Override
    public void close() {}

    @Override
    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
        synchronized (output.getCheckpointLock()) {
            // Take one split off the queue; a later pollNext() call cannot see it again.
            MaxcomputeSourceSplit split = sourceSplits.poll();
            if (null != split) {
                try {
                    TableTunnel.DownloadSession session =
                            MaxcomputeUtil.getDownloadSession(
                                    ReadonlyConfig.fromConfig(pluginConfig));
                    TunnelRecordReader recordReader =
                            session.openRecordReader(split.getSplitId(), split.getRowNum());
                    log.info("open record reader success");
                    try {
                        Record record;
                        while ((record = recordReader.read()) != null) {
                            SeaTunnelRow seaTunnelRow =
                                    MaxcomputeTypeMapper.getSeaTunnelRowData(
                                            record, seaTunnelRowType);
                            output.collect(seaTunnelRow);
                        }
                    } finally {
                        // Release the tunnel reader even if reading fails midway.
                        recordReader.close();
                    }
                } catch (Exception e) {
                    throw new MaxcomputeConnectorException(
                            CommonErrorCodeDeprecated.READER_OPERATION_FAILED, e);
                }
            } else if (noMoreSplit && sourceSplits.isEmpty()) {
                // signal to the source that we have reached the end of the data.
                log.info("Closed the bounded maxcompute source");
                context.signalNoMoreElement();
            } else {
                // No split available and no "no more splits" signal yet: wait and retry.
                Thread.sleep(1000L);
            }
        }
    }

    @Override
    public List<MaxcomputeSourceSplit> snapshotState(long checkpointId) throws Exception {
        // Only splits that have not been polled yet are part of the reader state.
        return new ArrayList<>(sourceSplits);
    }

    @Override
    public void addSplits(List<MaxcomputeSourceSplit> splits) {
        sourceSplits.addAll(splits);
    }

    @Override
    public void handleNoMoreSplits() {
        this.noMoreSplit = true;
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {}
}
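To sanity-check the polling design without a MaxCompute project, the following sketch models the reader's control flow with a hypothetical PollingReaderSketch class: integer split ids, a ConcurrentLinkedDeque, and a coordinator thread that assigns splits in batches and only sends the no-more-splits signal at the end, while a separate reader thread keeps polling. The flag is declared volatile here because in this sketch it is written and read by different threads. It is an illustration under these assumptions, not SeaTunnel's SourceReader API or its actual threading model.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;

/**
 * Hypothetical model of a poll-based reader (not SeaTunnel's SourceReader API).
 * Split ids are integers; "reading" a split only records its id.
 */
public class PollingReaderSketch {
    private final Deque<Integer> sourceSplits = new ConcurrentLinkedDeque<>();
    // Written by the coordinator thread, read by the reader thread.
    private volatile boolean noMoreSplit;
    private boolean finished; // only touched by the reader thread
    private final List<Integer> read = Collections.synchronizedList(new ArrayList<>());

    public void addSplits(List<Integer> splits) {
        sourceSplits.addAll(splits);
    }

    public void handleNoMoreSplits() {
        noMoreSplit = true;
    }

    /** Same shape as a poll-based pollNext(): poll a split, else finish, else wait. */
    public void pollNext() throws InterruptedException {
        Integer split = sourceSplits.poll(); // removed before it is "read"
        if (split != null) {
            read.add(split);
        } else if (noMoreSplit && sourceSplits.isEmpty()) {
            finished = true;
        } else {
            Thread.sleep(1L); // signal still pending: the loop will poll again
        }
    }

    /** Assigns splits in batches from this thread while a reader thread keeps polling. */
    public static List<Integer> run(int batches, int splitsPerBatch) {
        PollingReaderSketch reader = new PollingReaderSketch();
        Thread readerThread = new Thread(() -> {
            try {
                while (!reader.finished) {
                    reader.pollNext();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        readerThread.start();
        try {
            int nextId = 0;
            for (int b = 0; b < batches; b++) {
                List<Integer> batch = new ArrayList<>();
                for (int i = 0; i < splitsPerBatch; i++) {
                    batch.add(nextId++);
                }
                reader.addSplits(batch);
                Thread.sleep(2L); // the reader polls repeatedly before the signal arrives
            }
            reader.handleNoMoreSplits();
            readerThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while running the sketch", e);
        }
        return new ArrayList<>(reader.read);
    }

    public static void main(String[] args) {
        List<Integer> read = run(5, 4);
        // prints "20 reads, 20 distinct"
        System.out.println(read.size() + " reads, " + new HashSet<>(read).size() + " distinct");
    }
}
```

Because each split is removed before it is read, the extra pollNext() calls made while the signal is pending only find an empty queue and sleep. The isEmpty() re-check before finishing guards against ending the read when splits arrived between the poll() and the flag check.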
