Skip to content

Commit

Permalink
[Enhancement] Don't retry for some exceptions
Browse files Browse the repository at this point in the history
Signed-off-by: PengFei Li <[email protected]>
  • Loading branch information
banmoy committed Sep 3, 2023
1 parent 9c43260 commit d3149e4
Show file tree
Hide file tree
Showing 8 changed files with 177 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.starrocks.data.load.stream.StreamLoadConstants.RESULT_STATUS_FAILED;

public class DefaultStreamLoader implements StreamLoader, Serializable {

private static final Logger log = LoggerFactory.getLogger(DefaultStreamLoader.class);
Expand Down Expand Up @@ -336,7 +338,7 @@ protected StreamLoadResponse send(StreamLoadTableProperties tableProperties, Tab
String errorMsg = String.format("Stream load failed because of error, db: %s, table: %s, label: %s, " +
"\nresponseBody: %s\nerrorLog: %s", region.getDatabase(), region.getTable(), label,
responseBody, errorLog);
throw new StreamLoadFailException(errorMsg);
throw new StreamLoadFailException(errorMsg, streamLoadBody);
}
return streamLoadResponse;
} catch (StreamLoadFailException e) {
Expand Down Expand Up @@ -393,6 +395,15 @@ protected String parseHttpResponse(String requestType, String db, String table,
requestType, db, table, label, response.getStatusLine());
log.error("{}", errorMsg);
throw new StreamLoadFailException(errorMsg);
} else if (401 == code) {
String errorMsg = String.format("Request %s failed because of access denied. You need to grant at least SELECT and INSERT " +
"privilege on %s.%s. label: %s, response status line: %s", requestType, db, table, label, response.getStatusLine());
log.error("{}", errorMsg);
// Attach a synthetic response body so that ErrorUtils#isRetryable can classify this access-denied error as non-retryable.
StreamLoadResponse.StreamLoadResponseBody responseBody = new StreamLoadResponse.StreamLoadResponseBody();
responseBody.setStatus(RESULT_STATUS_FAILED);
responseBody.setMessage("Access denied; you need (at least one of) the INSERT privilege(s) for this operation");
throw new StreamLoadFailException(errorMsg, responseBody);
} else if (200 != code) {
String errorMsg = String.format("Request %s failed because http response code is not 200. db: %s, table: %s," +
"label: %s, response status line: %s", requestType, db, table, label, response.getStatusLine());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public interface StreamLoadConstants {

String RESULT_STATUS_OK = "OK";
String RESULT_STATUS_SUCCESS = "Success";
String RESULT_STATUS_INTERNAL_ERROR = "INTERNAL_ERROR";
String RESULT_STATUS_FAILED = "Fail";
String RESULT_STATUS_LABEL_EXISTED = "Label Already Exists";
String RESULT_STATUS_TRANSACTION_NOT_EXISTED = "TXN_NOT_EXISTS";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,10 @@ public String getExistingJobStatus() {
return this.existingJobStatus;
}

/**
 * Returns the value of the {@code message} field.
 *
 * <p>NOTE(review): presumably the failure description carried in the stream load
 * response body, and distinct from {@code msg} — confirm which server responses
 * populate which field.
 *
 * @return the current {@code message} value; may be {@code null} if it was never set
 */
public String getMessage() {
return message;
}

/**
 * Returns the value of the {@code msg} field.
 *
 * <p>NOTE(review): this is a separate field from {@code message}; which one a given
 * server response fills in is not visible here — confirm before relying on either.
 *
 * @return the current {@code msg} value
 */
public String getMsg() {
return msg;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,13 @@ protected boolean isRedirectable(String method) {
@Override
public boolean begin(TableRegion region) {
if (region.getLabel() == null) {
for (int i = 0; i < 5; i++) {
region.setLabel(region.getLabelGenerator().next());
if (doBegin(region)) {
return true;
} else {
region.setLabel(null);
}
region.setLabel(region.getLabelGenerator().next());
if (doBegin(region)) {
return true;
} else {
region.setLabel(null);
return false;
}
return false;
}
return true;
}
Expand Down Expand Up @@ -146,15 +144,15 @@ protected boolean doBegin(TableRegion region) {
throw new StreamLoadFailException(errMsg);
}

switch (status) {
case StreamLoadConstants.RESULT_STATUS_OK:
return true;
case StreamLoadConstants.RESULT_STATUS_LABEL_EXISTED:
return false;
default:
log.error("Transaction start failed, db : {}, label : {}", region.getDatabase(), label);
return false;
if (StreamLoadConstants.RESULT_STATUS_OK.equals(status)) {
return true;
}

String errMsg = String.format("Transaction start failed, db: %s, table: %s, label: %s, responseBody: %s",
region.getDatabase(), region.getTable(), label, responseBody);
throw new StreamLoadFailException(errMsg);
} catch (StreamLoadFailException se) {
throw se;
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -216,12 +214,16 @@ public boolean prepare(StreamLoadSnapshot.Transaction transaction) {
}

String errorLog = getErrorLog(streamLoadBody.getErrorURL());
log.error("Transaction prepare failed, db: {}, table: {}, label: {}, \nresponseBody: {}\nerrorLog: {}",
transaction.getDatabase(), transaction.getTable(), transaction.getLabel(), responseBody, errorLog);
String errorMsg = String.format("Transaction prepare failed, db: %s, table: %s, label: %s, " +
"\nresponseBody: %s\nerrorLog: %s", transaction.getDatabase(), transaction.getTable(),
transaction.getLabel(), responseBody, errorLog);
log.error(errorMsg);
throw new StreamLoadFailException(errorMsg);
} catch (StreamLoadFailException se) {
throw se;
} catch (Exception e) {
throw new RuntimeException(e);
}
return false;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright 2021-present StarRocks, Inc. All rights reserved.
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.starrocks.data.load.stream.exception;

import com.starrocks.data.load.stream.StreamLoadResponse;

import java.util.Arrays;
import java.util.List;

import static com.starrocks.data.load.stream.StreamLoadConstants.RESULT_STATUS_FAILED;
import static com.starrocks.data.load.stream.StreamLoadConstants.RESULT_STATUS_INTERNAL_ERROR;

/**
 * Helpers for deciding whether a failed stream load is worth retrying.
 */
public class ErrorUtils {

    /**
     * Fragments of failure messages that mark an error as not retryable.
     *
     * <p>Entries must be lower case: they are matched against the lower-cased
     * failure message with {@link String#contains(CharSequence)}.
     */
    private static final List<String> UNRETRYABLE_FAIL_MESSAGE_KEY_WORDS = Arrays.asList(
            "too many filtered rows",
            "primary key size exceed the limit",
            "only primary key table support partial update",
            "access denied",
            "current running txns on db"
    );

    /**
     * Tells whether the operation that raised {@code e} may be retried.
     *
     * <p>Decision order:
     * <ol>
     *   <li>anything that is not a {@link StreamLoadFailException} is retryable;</li>
     *   <li>a {@link StreamLoadFailException} without a response body is retryable;</li>
     *   <li>a response whose status is neither {@code RESULT_STATUS_FAILED} nor
     *       {@code RESULT_STATUS_INTERNAL_ERROR} (compared case-insensitively) is
     *       not retryable;</li>
     *   <li>otherwise the error is retryable unless its message contains one of the
     *       known non-retryable key words (case-insensitive); a {@code null} message
     *       is retryable.</li>
     * </ol>
     *
     * @param e the failure to classify
     * @return {@code true} if a retry may be attempted, {@code false} otherwise
     */
    public static boolean isRetryable(Throwable e) {
        if (!(e instanceof StreamLoadFailException)) {
            return true;
        }

        StreamLoadResponse.StreamLoadResponseBody responseBody =
                ((StreamLoadFailException) e).getResponseBody();
        if (responseBody == null) {
            return true;
        }

        // Only "Fail" / "INTERNAL_ERROR" responses are candidates for a retry. The two
        // negated comparisons must be combined with AND: with OR the condition is always
        // true (a status cannot equal both constants), which made every error that
        // carries a response body non-retryable and the key word scan unreachable.
        String status = responseBody.getStatus();
        if (!RESULT_STATUS_FAILED.equalsIgnoreCase(status)
                && !RESULT_STATUS_INTERNAL_ERROR.equalsIgnoreCase(status)) {
            return false;
        }

        String failMsg = responseBody.getMessage();
        if (failMsg == null) {
            return true;
        }

        // Locale.ROOT keeps the match independent of the JVM default locale
        // (e.g. the Turkish dotless-i lower-casing rule).
        String normalizedMsg = failMsg.toLowerCase(java.util.Locale.ROOT);
        for (String keyword : UNRETRYABLE_FAIL_MESSAGE_KEY_WORDS) {
            if (normalizedMsg.contains(keyword)) {
                return false;
            }
        }
        return true;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,26 @@

package com.starrocks.data.load.stream.exception;

import com.starrocks.data.load.stream.StreamLoadResponse;

/**
 * Unchecked exception signalling that a stream load request failed.
 *
 * <p>It can optionally carry the response body that accompanied the failure so
 * that callers can inspect it, for example to decide whether a retry makes sense.
 */
public class StreamLoadFailException extends RuntimeException {

    /** Response body attached to this failure, or {@code null} when none was supplied. */
    private final StreamLoadResponse.StreamLoadResponseBody responseBody;

    /**
     * Creates an exception with a message and no response body.
     *
     * @param message description of the failure
     */
    public StreamLoadFailException(String message) {
        super(message);
        this.responseBody = null;
    }

    /**
     * Creates an exception with a message and the response body that describes the failure.
     *
     * @param message description of the failure
     * @param responseBody response body to expose through {@link #getResponseBody()};
     *                     may be {@code null}
     */
    public StreamLoadFailException(String message, StreamLoadResponse.StreamLoadResponseBody responseBody) {
        super(message);
        this.responseBody = responseBody;
    }

    /**
     * Creates an exception with a message and an underlying cause, without a response body.
     *
     * @param message description of the failure
     * @param cause the exception that triggered this failure
     */
    public StreamLoadFailException(String message, Throwable cause) {
        super(message, cause);
        this.responseBody = null;
    }

    /**
     * Returns the response body attached at construction time.
     *
     * @return the response body, or {@code null} if this exception was created without one
     */
    public StreamLoadResponse.StreamLoadResponseBody getResponseBody() {
        return responseBody;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import com.starrocks.data.load.stream.Chunk;
import com.starrocks.data.load.stream.LabelGenerator;
import com.starrocks.data.load.stream.StreamLoadManager;
import com.starrocks.data.load.stream.StreamLoader;
import com.starrocks.data.load.stream.StreamLoadResponse;
import com.starrocks.data.load.stream.StreamLoadSnapshot;
import com.starrocks.data.load.stream.StreamLoader;
import com.starrocks.data.load.stream.TableRegion;
import com.starrocks.data.load.stream.exception.StreamLoadFailException;
import com.starrocks.data.load.stream.http.StreamLoadEntityMeta;
Expand All @@ -38,6 +38,8 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import static com.starrocks.data.load.stream.exception.ErrorUtils.isRetryable;

public class TransactionTableRegion implements TableRegion {

enum State {
Expand Down Expand Up @@ -268,7 +270,7 @@ public boolean commit() {

@Override
public void fail(Throwable e) {
if (numRetries >= maxRetries) {
if (numRetries >= maxRetries || !isRetryable(e)) {
manager.callback(e);
return;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2021-present StarRocks, Inc. All rights reserved.
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.starrocks.data.load.stream.exception;

import com.starrocks.data.load.stream.StreamLoadConstants;
import com.starrocks.data.load.stream.StreamLoadResponse;
import org.junit.Test;

import static com.starrocks.data.load.stream.StreamLoadConstants.RESULT_STATUS_FAILED;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

/**
 * Unit test for {@link ErrorUtils#isRetryable(Throwable)}.
 */
public class ErrorUtilsTest {

    /**
     * Walks through the classification cases in turn: exceptions without a response
     * body, a response with a status other than the failed one, and failed responses
     * with and without a non-retryable key word in the message.
     */
    @Test
    public void testRetryable() {
        // No response body available: expected to be retryable.
        Throwable genericError = new Exception();
        assertTrue(ErrorUtils.isRetryable(genericError));
        Throwable loadErrorWithoutBody = new StreamLoadFailException("unknown");
        assertTrue(ErrorUtils.isRetryable(loadErrorWithoutBody));

        // "Label Already Exists" status, message left unset: expected to be non-retryable.
        StreamLoadResponse.StreamLoadResponseBody labelExisted =
                bodyWithStatus(StreamLoadConstants.RESULT_STATUS_LABEL_EXISTED);
        assertFalse(ErrorUtils.isRetryable(new StreamLoadFailException("test", labelExisted)));

        // Failed status with a message outside the key word list: expected to be retryable.
        StreamLoadResponse.StreamLoadResponseBody memoryExceed = bodyWithStatus(RESULT_STATUS_FAILED);
        memoryExceed.setMessage("memory exceed");
        assertTrue(ErrorUtils.isRetryable(new StreamLoadFailException("test", memoryExceed)));

        // Failed status with a non-retryable key word: expected to be non-retryable.
        StreamLoadResponse.StreamLoadResponseBody filteredRows = bodyWithStatus(RESULT_STATUS_FAILED);
        filteredRows.setMessage("too many filtered rows");
        assertFalse(ErrorUtils.isRetryable(new StreamLoadFailException("test", filteredRows)));
    }

    /** Creates a fresh response body carrying only the given status. */
    private static StreamLoadResponse.StreamLoadResponseBody bodyWithStatus(String status) {
        StreamLoadResponse.StreamLoadResponseBody body = new StreamLoadResponse.StreamLoadResponseBody();
        body.setStatus(status);
        return body;
    }
}

0 comments on commit d3149e4

Please sign in to comment.