diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/DefaultStreamLoader.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/DefaultStreamLoader.java index d47f8dfd..508b3198 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/DefaultStreamLoader.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/DefaultStreamLoader.java @@ -57,6 +57,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import static com.starrocks.data.load.stream.StreamLoadConstants.RESULT_STATUS_FAILED; + public class DefaultStreamLoader implements StreamLoader, Serializable { private static final Logger log = LoggerFactory.getLogger(DefaultStreamLoader.class); @@ -336,7 +338,7 @@ protected StreamLoadResponse send(StreamLoadTableProperties tableProperties, Tab String errorMsg = String.format("Stream load failed because of error, db: %s, table: %s, label: %s, " + "\nresponseBody: %s\nerrorLog: %s", region.getDatabase(), region.getTable(), label, responseBody, errorLog); - throw new StreamLoadFailException(errorMsg); + throw new StreamLoadFailException(errorMsg, streamLoadBody); } return streamLoadResponse; } catch (StreamLoadFailException e) { @@ -393,6 +395,15 @@ protected String parseHttpResponse(String requestType, String db, String table, requestType, db, table, label, response.getStatusLine()); log.error("{}", errorMsg); throw new StreamLoadFailException(errorMsg); + } else if (401 == code) { + String errorMsg = String.format("Request %s failed because of access denied. You need to grant at least SELECT and INSERT " + + "privilege on %s.%s. label: %s, response status line: %s", requestType, db, table, label, response.getStatusLine()); + log.error("{}", errorMsg); + // Fake response body to judge the retryable error. See ErrorUtils#isRetryable + StreamLoadResponse.StreamLoadResponseBody responseBody = new StreamLoadResponse.StreamLoadResponseBody(); + responseBody.setStatus(RESULT_STATUS_FAILED); + responseBody.setMessage("Access denied; you need (at least one of) the INSERT privilege(s) for this operation"); + throw new StreamLoadFailException(errorMsg, responseBody); } else if (200 != code) { String errorMsg = String.format("Request %s failed because http response code is not 200. db: %s, table: %s," + "label: %s, response status line: %s", requestType, db, table, label, response.getStatusLine()); diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/StreamLoadConstants.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/StreamLoadConstants.java index ecd1387d..b851a76f 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/StreamLoadConstants.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/StreamLoadConstants.java @@ -34,6 +34,7 @@ public interface StreamLoadConstants { String RESULT_STATUS_OK = "OK"; String RESULT_STATUS_SUCCESS = "Success"; + String RESULT_STATUS_INTERNAL_ERROR = "INTERNAL_ERROR"; String RESULT_STATUS_FAILED = "Fail"; String RESULT_STATUS_LABEL_EXISTED = "Label Already Exists"; String RESULT_STATUS_TRANSACTION_NOT_EXISTED = "TXN_NOT_EXISTS"; diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/StreamLoadResponse.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/StreamLoadResponse.java index 7f620f3c..cc18a01d 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/StreamLoadResponse.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/StreamLoadResponse.java @@ -198,6 +198,10 @@ public String getExistingJobStatus() { return this.existingJobStatus; } + public String getMessage() { + return message; + } + public String getMsg() { return msg; } diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/TransactionStreamLoader.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/TransactionStreamLoader.java index 8ecd0635..35523202 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/TransactionStreamLoader.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/TransactionStreamLoader.java @@ -95,15 +95,13 @@ protected boolean isRedirectable(String method) { @Override public boolean begin(TableRegion region) { if (region.getLabel() == null) { - for (int i = 0; i < 5; i++) { - region.setLabel(region.getLabelGenerator().next()); - if (doBegin(region)) { - return true; - } else { - region.setLabel(null); - } + region.setLabel(region.getLabelGenerator().next()); + if (doBegin(region)) { + return true; + } else { + region.setLabel(null); + return false; } - return false; } return true; } @@ -146,15 +144,15 @@ protected boolean doBegin(TableRegion region) { throw new StreamLoadFailException(errMsg); } - switch (status) { - case StreamLoadConstants.RESULT_STATUS_OK: - return true; - case StreamLoadConstants.RESULT_STATUS_LABEL_EXISTED: - return false; - default: - log.error("Transaction start failed, db : {}, label : {}", region.getDatabase(), label); - return false; + if (StreamLoadConstants.RESULT_STATUS_OK.equals(status)) { + return true; } + + String errMsg = String.format("Transaction start failed, db: %s, table: %s, label: %s, responseBody: %s", + region.getDatabase(), region.getTable(), label, responseBody); + throw new StreamLoadFailException(errMsg); + } catch (StreamLoadFailException se) { + throw se; } catch (Exception e) { throw new RuntimeException(e); } @@ -216,12 +214,16 @@ public boolean prepare(StreamLoadSnapshot.Transaction transaction) { } String errorLog = getErrorLog(streamLoadBody.getErrorURL()); - log.error("Transaction prepare failed, db: {}, table: {}, label: {}, \nresponseBody: {}\nerrorLog: {}", - transaction.getDatabase(), transaction.getTable(), transaction.getLabel(), responseBody, errorLog); + String errorMsg = String.format("Transaction prepare failed, db: %s, table: %s, label: %s, " + + "\nresponseBody: %s\nerrorLog: %s", transaction.getDatabase(), transaction.getTable(), + transaction.getLabel(), responseBody, errorLog); + log.error(errorMsg); + throw new StreamLoadFailException(errorMsg); + } catch (StreamLoadFailException se) { + throw se; } catch (Exception e) { throw new RuntimeException(e); } - return false; } @Override diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/exception/ErrorUtils.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/exception/ErrorUtils.java new file mode 100644 index 00000000..efb9fec6 --- /dev/null +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/exception/ErrorUtils.java @@ -0,0 +1,70 @@ +/* + * Copyright 2021-present StarRocks, Inc. All rights reserved. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.starrocks.data.load.stream.exception; + +import com.starrocks.data.load.stream.StreamLoadResponse; + +import java.util.Arrays; +import java.util.List; + +import static com.starrocks.data.load.stream.StreamLoadConstants.RESULT_STATUS_FAILED; +import static com.starrocks.data.load.stream.StreamLoadConstants.RESULT_STATUS_INTERNAL_ERROR; + +public class ErrorUtils { + + private static final List UNRETRYABLE_FAIL_MESSAGE_KEY_WORDS = Arrays.asList( + "too many filtered rows".toLowerCase(), + "primary key size exceed the limit".toLowerCase(), + "Only primary key table support partial update".toLowerCase(), + "Access denied".toLowerCase(), + "current running txns on db".toLowerCase() + ); + + public static boolean isRetryable(Throwable e) { + if (!(e instanceof StreamLoadFailException)) { + return true; + } + + StreamLoadFailException exception = (StreamLoadFailException) e; + StreamLoadResponse.StreamLoadResponseBody responseBody = exception.getResponseBody(); + if (responseBody == null) { + return true; + } + + if (!RESULT_STATUS_FAILED.equalsIgnoreCase(responseBody.getStatus()) + || !RESULT_STATUS_INTERNAL_ERROR.equalsIgnoreCase(responseBody.getStatus())) { + return false; + } + + String failMsg = responseBody.getMessage(); + if (failMsg == null) { + return true; + } + + failMsg = failMsg.toLowerCase(); + for (String keyword : UNRETRYABLE_FAIL_MESSAGE_KEY_WORDS) { + if (failMsg.contains(keyword)) { + return false; + } + } + return true; + } +} diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/exception/StreamLoadFailException.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/exception/StreamLoadFailException.java index 3ac03d26..4014f5db 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/exception/StreamLoadFailException.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/exception/StreamLoadFailException.java @@ -20,13 +20,26 @@ package com.starrocks.data.load.stream.exception; +import com.starrocks.data.load.stream.StreamLoadResponse; + public class StreamLoadFailException extends RuntimeException { + private StreamLoadResponse.StreamLoadResponseBody responseBody; + public StreamLoadFailException(String message) { super(message); } + public StreamLoadFailException(String message, StreamLoadResponse.StreamLoadResponseBody responseBody) { + super(message); + this.responseBody = responseBody; + } + public StreamLoadFailException(String message, Throwable cause) { super(message, cause); } + + public StreamLoadResponse.StreamLoadResponseBody getResponseBody() { + return responseBody; + } } diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/TransactionTableRegion.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/TransactionTableRegion.java index c01c31f9..1d6626b6 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/TransactionTableRegion.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/TransactionTableRegion.java @@ -21,9 +21,9 @@ import com.starrocks.data.load.stream.Chunk; import com.starrocks.data.load.stream.LabelGenerator; import com.starrocks.data.load.stream.StreamLoadManager; -import com.starrocks.data.load.stream.StreamLoader; import com.starrocks.data.load.stream.StreamLoadResponse; import com.starrocks.data.load.stream.StreamLoadSnapshot; +import com.starrocks.data.load.stream.StreamLoader; import com.starrocks.data.load.stream.TableRegion; import com.starrocks.data.load.stream.exception.StreamLoadFailException; import com.starrocks.data.load.stream.http.StreamLoadEntityMeta; @@ -38,6 +38,8 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import static com.starrocks.data.load.stream.exception.ErrorUtils.isRetryable; + public class TransactionTableRegion implements TableRegion { enum State { @@ -268,7 +270,7 @@ public boolean commit() { @Override public void fail(Throwable e) { - if (numRetries >= maxRetries) { + if (numRetries >= maxRetries || !isRetryable(e)) { manager.callback(e); return; } diff --git a/starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/exception/ErrorUtilsTest.java b/starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/exception/ErrorUtilsTest.java new file mode 100644 index 00000000..74527ca4 --- /dev/null +++ b/starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/exception/ErrorUtilsTest.java @@ -0,0 +1,52 @@ +/* + * Copyright 2021-present StarRocks, Inc. All rights reserved. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.starrocks.data.load.stream.exception; + +import com.starrocks.data.load.stream.StreamLoadConstants; +import com.starrocks.data.load.stream.StreamLoadResponse; +import org.junit.Test; + +import static com.starrocks.data.load.stream.StreamLoadConstants.RESULT_STATUS_FAILED; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class ErrorUtilsTest { + + @Test + public void testRetryable() { + assertTrue(ErrorUtils.isRetryable(new Exception())); + assertTrue(ErrorUtils.isRetryable(new StreamLoadFailException("unknown"))); + + StreamLoadResponse.StreamLoadResponseBody body1 = new StreamLoadResponse.StreamLoadResponseBody(); + body1.setStatus(StreamLoadConstants.RESULT_STATUS_LABEL_EXISTED); + assertFalse(ErrorUtils.isRetryable(new StreamLoadFailException("test", body1))); + + StreamLoadResponse.StreamLoadResponseBody body2 = new StreamLoadResponse.StreamLoadResponseBody(); + body2.setStatus(RESULT_STATUS_FAILED); + body2.setMessage("memory exceed"); + assertTrue(ErrorUtils.isRetryable(new StreamLoadFailException("test", body2))); + + StreamLoadResponse.StreamLoadResponseBody body3 = new StreamLoadResponse.StreamLoadResponseBody(); + body3.setStatus(RESULT_STATUS_FAILED); + body3.setMessage("too many filtered rows"); + assertFalse(ErrorUtils.isRetryable(new StreamLoadFailException("test", body3))); + } +}