Skip to content

Commit

Permalink
[Enhancement] Support to configure socket timeout
Browse files Browse the repository at this point in the history
Signed-off-by: PengFei Li <[email protected]>
  • Loading branch information
banmoy committed Dec 14, 2023
1 parent afcdb07 commit 92c3477
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,9 @@ protected boolean isRedirectable(String method) {
httpPut.setHeader("label", label);
httpPut.setHeader("Authorization", getBasicAuthHeader(sinkOptions.getUsername(), sinkOptions.getPassword()));
httpPut.setEntity(new ByteArrayEntity(data));
httpPut.setConfig(RequestConfig.custom().setRedirectsEnabled(true).build());
httpPut.setConfig(RequestConfig.custom()
.setSocketTimeout(sinkOptions.getSocketTimeout())
.setRedirectsEnabled(true).build());
try (CloseableHttpResponse resp = httpclient.execute(httpPut)) {
HttpEntity respEntity = getHttpEntity(resp);
if (respEntity == null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,6 @@

package com.starrocks.connector.flink.table.sink;

import com.starrocks.connector.flink.manager.StarRocksSinkTable;
import com.starrocks.connector.flink.row.sink.StarRocksDelimiterParser;
import com.starrocks.data.load.stream.StreamLoadDataFormat;
import com.starrocks.data.load.stream.properties.StreamLoadProperties;
import com.starrocks.data.load.stream.properties.StreamLoadTableProperties;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
Expand All @@ -27,9 +22,17 @@
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.util.Preconditions;

import com.starrocks.connector.flink.manager.StarRocksSinkTable;
import com.starrocks.connector.flink.row.sink.StarRocksDelimiterParser;
import com.starrocks.data.load.stream.StreamLoadDataFormat;
import com.starrocks.data.load.stream.properties.StreamLoadProperties;
import com.starrocks.data.load.stream.properties.StreamLoadTableProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -39,7 +42,6 @@
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

import static org.apache.http.protocol.HttpRequestExecutor.DEFAULT_WAIT_FOR_CONTINUE;

Expand Down Expand Up @@ -82,6 +84,15 @@ public enum StreamLoadFormat {
.stringType().noDefaultValue().withDescription("The prefix of the stream load label. Available values are within [-_A-Za-z0-9]");
public static final ConfigOption<Integer> SINK_CONNECT_TIMEOUT = ConfigOptions.key("sink.connect.timeout-ms")
.intType().defaultValue(30000).withDescription("Timeout in millisecond for connecting to the `load-url`.");

public static final ConfigOption<Integer> SINK_SOCKET_TIMEOUT = ConfigOptions.key("sink.socket.timeout-ms")
.intType().defaultValue(-1).withDescription("Timeout in milliseconds that the http client waits for data or, put differently, " +
"a maximum period inactivity between two consecutive data packets. A timeout value of zero is interpreted as an infinite " +
"timeout. A negative value is interpreted as undefined (system default if applicable). You can use this option to fail " +
"the stream load from the connector side if the http client does not receive response from StarRocks before timeout. " +
"The other option 'sink.properties.timeout' take effects on the StarRocks side, but the response to the client may delay " +
"in some unexpected cases. If you want to have a strict timeout from the connector side, you can set this option to an " +
"acceptable value.");
public static final ConfigOption<Integer> SINK_WAIT_FOR_CONTINUE_TIMEOUT = ConfigOptions.key("sink.wait-for-continue.timeout-ms")
.intType().defaultValue(30000).withDescription("Timeout in millisecond to wait for 100-continue response for http client.");
public static final ConfigOption<Integer> SINK_IO_THREAD_COUNT = ConfigOptions.key("sink.io.thread-count")
Expand Down Expand Up @@ -265,6 +276,10 @@ public int getConnectTimeout() {
return Math.min(connectTimeout, 60000);
}

public int getSocketTimeout() {
return tableOptions.get(SINK_SOCKET_TIMEOUT);
}

public int getWaitForContinueTimeout() {
int waitForContinueTimeoutMs = tableOptions.get(SINK_WAIT_FOR_CONTINUE_TIMEOUT);
if (waitForContinueTimeoutMs < DEFAULT_WAIT_FOR_CONTINUE) {
Expand Down Expand Up @@ -546,6 +561,7 @@ public StreamLoadProperties getProperties(@Nullable StarRocksSinkTable table) {
.cacheMaxBytes(getSinkMaxBytes())
.connectTimeout(getConnectTimeout())
.waitForContinueTimeoutMs(getWaitForContinueTimeout())
.socketTimeout(getSocketTimeout())
.ioThreadCount(getIoThreadCount())
.scanningFrequency(getScanFrequency())
.labelPrefix(getLabelPrefix())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public class DefaultStreamLoader implements StreamLoader, Serializable {

private static final int ERROR_LOG_MAX_LENGTH = 3000;

private StreamLoadProperties properties;
protected StreamLoadProperties properties;
private StreamLoadManager manager;

private HttpClientBuilder clientBuilder;
Expand Down Expand Up @@ -283,7 +283,11 @@ protected StreamLoadResponse sendToSR(TableRegion region) {
String label = region.getLabel();

HttpPut httpPut = new HttpPut(sendUrl);
httpPut.setConfig(RequestConfig.custom().setExpectContinueEnabled(true).setRedirectsEnabled(true).build());
httpPut.setConfig(RequestConfig.custom()
.setSocketTimeout(properties.getSocketTimeout())
.setExpectContinueEnabled(true)
.setRedirectsEnabled(true)
.build());
httpPut.setEntity(region.getHttpEntity());

httpPut.setHeaders(defaultHeaders);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,11 @@ protected boolean doBegin(TableRegion region) {
httpPost.addHeader("db", region.getDatabase());
httpPost.addHeader("table", region.getTable());

httpPost.setConfig(RequestConfig.custom().setExpectContinueEnabled(true).setRedirectsEnabled(true).build());
httpPost.setConfig(RequestConfig.custom()
.setSocketTimeout(properties.getSocketTimeout())
.setExpectContinueEnabled(true)
.setRedirectsEnabled(true)
.build());

String db = region.getDatabase();
String table = region.getTable();
Expand Down Expand Up @@ -169,8 +173,11 @@ public boolean prepare(StreamLoadSnapshot.Transaction transaction) {
httpPost.addHeader("db", transaction.getDatabase());
httpPost.addHeader("table", transaction.getTable());

httpPost.setConfig(RequestConfig.custom().setExpectContinueEnabled(true).setRedirectsEnabled(true).build());

httpPost.setConfig(RequestConfig.custom()
.setSocketTimeout(properties.getSocketTimeout())
.setExpectContinueEnabled(true)
.setRedirectsEnabled(true)
.build());

log.info("Transaction prepare, label : {}, request : {}", transaction.getLabel(), httpPost);

Expand Down Expand Up @@ -237,8 +244,11 @@ public boolean commit(StreamLoadSnapshot.Transaction transaction) {
httpPost.addHeader("db", transaction.getDatabase());
httpPost.addHeader("table", transaction.getTable());

httpPost.setConfig(RequestConfig.custom().setExpectContinueEnabled(true).setRedirectsEnabled(true).build());

httpPost.setConfig(RequestConfig.custom()
.setSocketTimeout(properties.getSocketTimeout())
.setExpectContinueEnabled(true)
.setRedirectsEnabled(true)
.build());

log.info("Transaction commit, label: {}, request : {}", transaction.getLabel(), httpPost);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,8 @@ public static class Builder {
private Map<String, StreamLoadTableProperties> tablePropertiesMap = new HashMap<>();

private int connectTimeout = 60000;
private int socketTimeout;
// Default value -1 is the same as that in RequestConfig.Builder#socketTimeout
private int socketTimeout = -1;
private int waitForContinueTimeoutMs = DEFAULT_WAIT_FOR_CONTINUE;
private int ioThreadCount = Runtime.getRuntime().availableProcessors();

Expand Down Expand Up @@ -365,16 +366,13 @@ public Builder connectTimeout(int connectTimeout) {
public Builder waitForContinueTimeoutMs(int waitForContinueTimeoutMs) {
if (waitForContinueTimeoutMs < DEFAULT_WAIT_FOR_CONTINUE || waitForContinueTimeoutMs > 60000) {
throw new IllegalArgumentException("waitForContinueTimeoutMs `" + waitForContinueTimeoutMs +
"ms` set failed, must be in range in [100, 60000]");
"ms` set failed, must be in range in [3000, 60000]");
}
this.waitForContinueTimeoutMs = waitForContinueTimeoutMs;
return this;
}

public Builder socketTimeout(int socketTimeout) {
if (socketTimeout < 0) {
throw new IllegalArgumentException("socketTimeout `" + socketTimeout + "ms` set failed, must greater or equals to 0");
}
this.socketTimeout = socketTimeout;
return this;
}
Expand Down

0 comments on commit 92c3477

Please sign in to comment.