diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/FlushReason.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/FlushReason.java index 96d2a6d0..100bef2d 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/FlushReason.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/FlushReason.java @@ -26,10 +26,13 @@ public enum FlushReason { // No need to flush NONE, - // Should commit the data + // Trigger the commit condition, such as flush interval, + // and should flush first COMMIT, // Cache is full, and need flush on or more tables CACHE_FULL, // The number of buffered rows reaches the limit - BUFFER_ROWS_REACH_LIMIT + BUFFER_ROWS_REACH_LIMIT, + // Force flush, such as StreamLoadManagerV2.flush + FORCE } diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/StreamLoadManagerV2.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/StreamLoadManagerV2.java index 2dd2f982..cc2c13e4 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/StreamLoadManagerV2.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/StreamLoadManagerV2.java @@ -176,7 +176,7 @@ public void init() { if (savepoint) { for (TransactionTableRegion region : flushQ) { - boolean flush = region.flush(FlushReason.COMMIT); + boolean flush = region.flush(FlushReason.FORCE); LOG.debug("Trigger flush table region {} because of savepoint, region cache bytes: {}, flush: {}", region.getUniqueKey(), region.getCacheBytes(), flush); } @@ -187,19 +187,20 @@ public void init() { for (TransactionTableRegion region : flushQ) { // savepoint makes sure no more data is written, so these conditions // can guarantee commit after all data has been written to StarRocks - if (region.getCacheBytes() == 0 && !region.isFlushing()) { - boolean success = region.commit(); - if (success) { - committedRegions += 1; - region.resetAge(); - } - LOG.debug("Commit region {} for savepoint, success: {}", region.getUniqueKey(), success); + boolean success = region.commit(); + if (success && region.getCacheBytes() == 0) { + committedRegions += 1; + region.resetAge(); } + LOG.debug("Commit region {} for savepoint, success: {}", region.getUniqueKey(), success); } if (committedRegions == flushQ.size()) { allRegionsCommitted = true; LOG.info("All regions committed for savepoint, number of regions: {}", committedRegions); + } else { + LOG.debug("Some regions not committed for savepoint, expected num: {}, actual num: {}", + flushQ.size(), committedRegions); } } LockSupport.unpark(current); diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/TransactionTableRegion.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/TransactionTableRegion.java index 26bfb457..c1f410c4 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/TransactionTableRegion.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/TransactionTableRegion.java @@ -219,11 +219,11 @@ public FlushReason shouldFlush() { } public boolean flush(FlushReason reason) { + LOG.debug("Try to flush db: {}, table: {}, label: {}, cacheBytes: {}, cacheRows: {}, reason: {}", + database, table, label, cacheBytes, cacheRows, reason); if (state.compareAndSet(State.ACTIVE, State.FLUSHING)) { for (;;) { if (ctl.compareAndSet(false, true)) { - LOG.info("Flush uniqueKey : {}, label : {}, bytes : {}, rows: {}, reason: {}", - uniqueKey, label, cacheBytes.get(), cacheRows.get(), reason); if (reason != FlushReason.BUFFER_ROWS_REACH_LIMIT || activeChunk.numRows() >= properties.getMaxBufferRows()) { switchChunk(); @@ -233,6 +233,8 @@ public boolean flush(FlushReason reason) { } } if (!inactiveChunks.isEmpty()) { + LOG.info("Flush db: {}, table: {}, label: {}, cacheBytes: {}, cacheRows: {}, reason: {}", + database, table, label, cacheBytes.get(), cacheRows.get(), reason); streamLoad(0); return true; } else { @@ -251,6 +253,7 @@ public boolean flush(FlushReason reason) { // indicates the commit should not be triggered, such as it is FLUSHING, // or it's still doing commit asynchronously public boolean commit() { + LOG.debug("Try to commit, db: {}, table: {}, label: {}", database, table, label); boolean commitTriggered = false; if (!state.compareAndSet(State.ACTIVE, State.COMMITTING)) { if (state.get() != State.COMMITTING) { @@ -263,6 +266,7 @@ public boolean commit() { // label will be set to null after commit executes successfully if (label == null) { state.compareAndSet(State.COMMITTING, State.ACTIVE); + LOG.debug("Success to commit, db: {}, table: {}", database, table); return true; } else { // wait for the commit to finish @@ -271,10 +275,14 @@ public boolean commit() { } if (label == null) { - // if the data has never been flushed (label == null), the commit should fail so that StreamLoadManagerV2#init - // will schedule to flush the data first, and then trigger commit again + // if the data has never been flushed (label == null), the commit should fail + // so that StreamLoadManagerV2#init will schedule to flush the data first, and + // then trigger commit again boolean commitSuccess = cacheBytes.get() == 0; state.compareAndSet(State.COMMITTING, State.ACTIVE); + if (commitSuccess) { + LOG.debug("Success to commit, db: {}, table: {}", database, table); + } return commitSuccess; } @@ -345,14 +353,15 @@ public void complete(StreamLoadResponse response) { numRetries = 0; firstException = null; - LOG.info("Stream load flushed, db: {}, table: {}, label : {}", database, table, label); if (!inactiveChunks.isEmpty()) { - LOG.info("Stream load continue, db: {}, table: {}, label : {}", database, table, label); + LOG.info("Stream load continue, db: {}, table: {}, label: {}, cacheBytes: {}, cacheRows: {}", + database, table, label, cacheBytes, cacheRows); streamLoad(0); return; } if (state.compareAndSet(State.FLUSHING, State.ACTIVE)) { - LOG.info("Stream load completed, db: {}, table: {}, label : {}", database, table, label); + LOG.info("Stream load completed, db: {}, table: {}, label: {}, cacheBytes: {}, cacheRows: {}", + database, table, label, cacheBytes, cacheRows); } }