diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/DefaultStreamLoader.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/DefaultStreamLoader.java index ffcbc5df..151d98b5 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/DefaultStreamLoader.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/DefaultStreamLoader.java @@ -51,6 +51,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -141,6 +142,11 @@ public void close() { } } + @Override + public ExecutorService getExecutorService() { + return executorService; + } + @Override public boolean begin(TableRegion region) { region.setLabel(region.getLabelGenerator().next()); diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/StreamLoader.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/StreamLoader.java index 0b02545c..d9723de2 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/StreamLoader.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/StreamLoader.java @@ -22,6 +22,7 @@ import com.starrocks.data.load.stream.properties.StreamLoadProperties; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; public interface StreamLoader { @@ -43,4 +44,8 @@ public interface StreamLoader { boolean prepare(StreamLoadSnapshot snapshot); boolean commit(StreamLoadSnapshot snapshot); boolean rollback(StreamLoadSnapshot snapshot); + + default ExecutorService getExecutorService() { + throw new UnsupportedOperationException(); + } } diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/TransactionTableRegion.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/TransactionTableRegion.java index dd395e00..841f9945 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/TransactionTableRegion.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/TransactionTableRegion.java @@ -235,44 +235,74 @@ public boolean flush(FlushReason reason) { return false; } + // Commit the load asynchronously + // 1. commit() should not be called concurrently + // 2. because commit is executed asynchronously, the caller should poll the + // method to see if it executes successfully + // 3. a true returned value indicates a successful commit, and a false value + // indicates the commit should not be triggered, such as it is FLUSHING, + // or it's still doing commit asynchronously public boolean commit() { + boolean commitTriggered = false; if (!state.compareAndSet(State.ACTIVE, State.COMMITTING)) { - return false; + if (state.get() != State.COMMITTING) { + return false; + } + commitTriggered = true; } - boolean commitSuccess; - if (label != null) { - StreamLoadSnapshot.Transaction transaction = new StreamLoadSnapshot.Transaction(database, table, label); - try { - if (!streamLoader.prepare(transaction)) { - String errorMsg = "Failed to prepare transaction, please check taskmanager log for details, " + transaction; - throw new StreamLoadFailException(errorMsg); - } - - if (!streamLoader.commit(transaction)) { - String errorMsg = "Failed to commit transaction, please check taskmanager log for details, " + transaction; - throw new StreamLoadFailException(errorMsg); - } - } catch (Exception e) { - LOG.error("TransactionTableRegion commit failed, db: {}, table: {}, label: {}", database, table, label, e); - fail(e); + if (commitTriggered) { + // label will be set to null after commit executes successfully + if (label == null) { + state.compareAndSet(State.COMMITTING, State.ACTIVE); + return true; + } else { + // wait for the commit to finish return false; } + } - label = null; - long commitTime = System.currentTimeMillis(); - long commitDuration = commitTime - lastCommitTimeMills; - lastCommitTimeMills = commitTime; - commitSuccess = true; - LOG.info("Success to commit transaction: {}, duration: {} ms", transaction, commitDuration); - } else { + if (label == null) { // if the data has never been flushed (label == null), the commit should fail so that StreamLoadManagerV2#init // will schedule to flush the data first, and then trigger commit again - commitSuccess = cacheBytes.get() == 0; + boolean commitSuccess = cacheBytes.get() == 0; + state.compareAndSet(State.COMMITTING, State.ACTIVE); + return commitSuccess; + } + + try { + streamLoader.getExecutorService().submit(this::doCommit); + } catch (Exception e) { + LOG.error("Failed to submit commit task, db: {}, table: {}, label: {}", database, table, label, e); + throw e; + } + + // wait for the commit to finish + return false; + } + + private void doCommit() { + StreamLoadSnapshot.Transaction transaction = new StreamLoadSnapshot.Transaction(database, table, label); + try { + if (!streamLoader.prepare(transaction)) { + String errorMsg = "Failed to prepare transaction, please check taskmanager log for details, " + transaction; + throw new StreamLoadFailException(errorMsg); + } + + if (!streamLoader.commit(transaction)) { + String errorMsg = "Failed to commit transaction, please check taskmanager log for details, " + transaction; + throw new StreamLoadFailException(errorMsg); + } + } catch (Throwable e) { + LOG.error("TransactionTableRegion commit failed, db: {}, table: {}, label: {}", database, table, label, e); + fail(e); } - state.compareAndSet(State.COMMITTING, State.ACTIVE); - return commitSuccess; + label = null; + long commitTime = System.currentTimeMillis(); + long commitDuration = commitTime - lastCommitTimeMills; + lastCommitTimeMills = commitTime; + LOG.info("Success to commit transaction: {}, duration: {} ms", transaction, commitDuration); } @Override