Skip to content

Commit

Permalink
Refine the log
Browse files Browse the repository at this point in the history
Signed-off-by: PengFei Li <[email protected]>
  • Loading branch information
banmoy committed Apr 28, 2024
1 parent 1eb5e3a commit 1598d66
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,13 @@
public enum FlushReason {
// No need to flush
NONE,
// Should commit the data
// Trigger the commit condition, such as flush interval,
// and should flush first
COMMIT,
// Cache is full, and need flush on or more tables
CACHE_FULL,
// The number of buffered rows reaches the limit
BUFFER_ROWS_REACH_LIMIT
BUFFER_ROWS_REACH_LIMIT,
// Force flush, such as StreamLoadManagerV2.flush
FORCE
}
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public void init() {

if (savepoint) {
for (TransactionTableRegion region : flushQ) {
boolean flush = region.flush(FlushReason.COMMIT);
boolean flush = region.flush(FlushReason.FORCE);
LOG.debug("Trigger flush table region {} because of savepoint, region cache bytes: {}, flush: {}",
region.getUniqueKey(), region.getCacheBytes(), flush);
}
Expand All @@ -187,19 +187,20 @@ public void init() {
for (TransactionTableRegion region : flushQ) {
// savepoint makes sure no more data is written, so these conditions
// can guarantee commit after all data has been written to StarRocks
if (region.getCacheBytes() == 0 && !region.isFlushing()) {
boolean success = region.commit();
if (success) {
committedRegions += 1;
region.resetAge();
}
LOG.debug("Commit region {} for savepoint, success: {}", region.getUniqueKey(), success);
boolean success = region.commit();
if (success && region.getCacheBytes() == 0) {
committedRegions += 1;
region.resetAge();
}
LOG.debug("Commit region {} for savepoint, success: {}", region.getUniqueKey(), success);
}

if (committedRegions == flushQ.size()) {
allRegionsCommitted = true;
LOG.info("All regions committed for savepoint, number of regions: {}", committedRegions);
} else {
LOG.debug("Some regions not committed for savepoint, expected num: {}, actual num: {}",
flushQ.size(), committedRegions);
}
}
LockSupport.unpark(current);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,11 +219,11 @@ public FlushReason shouldFlush() {
}

public boolean flush(FlushReason reason) {
LOG.debug("Try to flush db: {}, table: {}, label: {}, cacheBytes: {}, cacheRows: {}, reason: {}",
database, table, label, cacheBytes, cacheRows, reason);
if (state.compareAndSet(State.ACTIVE, State.FLUSHING)) {
for (;;) {
if (ctl.compareAndSet(false, true)) {
LOG.info("Flush uniqueKey : {}, label : {}, bytes : {}, rows: {}, reason: {}",
uniqueKey, label, cacheBytes.get(), cacheRows.get(), reason);
if (reason != FlushReason.BUFFER_ROWS_REACH_LIMIT ||
activeChunk.numRows() >= properties.getMaxBufferRows()) {
switchChunk();
Expand All @@ -233,6 +233,8 @@ public boolean flush(FlushReason reason) {
}
}
if (!inactiveChunks.isEmpty()) {
LOG.info("Flush db: {}, table: {}, label: {}, cacheBytes: {}, cacheRows: {}, reason: {}",
database, table, label, cacheBytes.get(), cacheRows.get(), reason);
streamLoad(0);
return true;
} else {
Expand All @@ -251,6 +253,7 @@ public boolean flush(FlushReason reason) {
// indicates the commit should not be triggered, such as it is FLUSHING,
// or it's still doing commit asynchronously
public boolean commit() {
LOG.debug("Try to commit, db: {}, table: {}, label: {}", database, table, label);
boolean commitTriggered = false;
if (!state.compareAndSet(State.ACTIVE, State.COMMITTING)) {
if (state.get() != State.COMMITTING) {
Expand All @@ -263,6 +266,7 @@ public boolean commit() {
// label will be set to null after commit executes successfully
if (label == null) {
state.compareAndSet(State.COMMITTING, State.ACTIVE);
LOG.debug("Success to commit, db: {}, table: {}", database, table);
return true;
} else {
// wait for the commit to finish
Expand All @@ -271,10 +275,14 @@ public boolean commit() {
}

if (label == null) {
// if the data has never been flushed (label == null), the commit should fail so that StreamLoadManagerV2#init
// will schedule to flush the data first, and then trigger commit again
// if the data has never been flushed (label == null), the commit should fail
// so that StreamLoadManagerV2#init will schedule to flush the data first, and
// then trigger commit again
boolean commitSuccess = cacheBytes.get() == 0;
state.compareAndSet(State.COMMITTING, State.ACTIVE);
if (commitSuccess) {
LOG.debug("Success to commit, db: {}, table: {}", database, table);
}
return commitSuccess;
}

Expand Down Expand Up @@ -345,14 +353,15 @@ public void complete(StreamLoadResponse response) {
numRetries = 0;
firstException = null;

LOG.info("Stream load flushed, db: {}, table: {}, label : {}", database, table, label);
if (!inactiveChunks.isEmpty()) {
LOG.info("Stream load continue, db: {}, table: {}, label : {}", database, table, label);
LOG.info("Stream load continue, db: {}, table: {}, label: {}, cacheBytes: {}, cacheRows: {}",
database, table, label, cacheBytes, cacheRows);
streamLoad(0);
return;
}
if (state.compareAndSet(State.FLUSHING, State.ACTIVE)) {
LOG.info("Stream load completed, db: {}, table: {}, label : {}", database, table, label);
LOG.info("Stream load completed, db: {}, table: {}, label: {}, cacheBytes: {}, cacheRows: {}",
database, table, label, cacheBytes, cacheRows);
}
}

Expand Down

0 comments on commit 1598d66

Please sign in to comment.