From 93b43a5241e8e0954e54c5b0f3f4012a861c1232 Mon Sep 17 00:00:00 2001 From: PengFei Li Date: Thu, 14 Dec 2023 18:07:07 +0800 Subject: [PATCH] [Enhancement] Support to configure socket timeout Signed-off-by: PengFei Li --- .../manager/StarRocksStreamLoadVisitor.java | 4 ++- .../StarRocksDynamicTableSinkFactory.java | 1 + .../table/sink/StarRocksSinkOptions.java | 28 +++++++++++++++---- .../data/load/stream/DefaultStreamLoader.java | 8 ++++-- .../load/stream/TransactionStreamLoader.java | 20 +++++++++---- .../properties/StreamLoadProperties.java | 8 ++---- 6 files changed, 50 insertions(+), 19 deletions(-) diff --git a/src/main/java/com/starrocks/connector/flink/manager/StarRocksStreamLoadVisitor.java b/src/main/java/com/starrocks/connector/flink/manager/StarRocksStreamLoadVisitor.java index ef89fce3..9b59362f 100644 --- a/src/main/java/com/starrocks/connector/flink/manager/StarRocksStreamLoadVisitor.java +++ b/src/main/java/com/starrocks/connector/flink/manager/StarRocksStreamLoadVisitor.java @@ -290,7 +290,9 @@ protected boolean isRedirectable(String method) { httpPut.setHeader("label", label); httpPut.setHeader("Authorization", getBasicAuthHeader(sinkOptions.getUsername(), sinkOptions.getPassword())); httpPut.setEntity(new ByteArrayEntity(data)); - httpPut.setConfig(RequestConfig.custom().setRedirectsEnabled(true).build()); + httpPut.setConfig(RequestConfig.custom() + .setSocketTimeout(sinkOptions.getSocketTimeout()) + .setRedirectsEnabled(true).build()); try (CloseableHttpResponse resp = httpclient.execute(httpPut)) { HttpEntity respEntity = getHttpEntity(resp); if (respEntity == null) diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicTableSinkFactory.java b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicTableSinkFactory.java index 21a2b7c6..251391df 100644 --- a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicTableSinkFactory.java +++ b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicTableSinkFactory.java @@ -77,6 +77,7 @@ public Set> optionalOptions() { optionalOptions.add(StarRocksSinkOptions.SINK_PARALLELISM); optionalOptions.add(StarRocksSinkOptions.SINK_LABEL_PREFIX); optionalOptions.add(StarRocksSinkOptions.SINK_CONNECT_TIMEOUT); + optionalOptions.add(StarRocksSinkOptions.SINK_SOCKET_TIMEOUT); optionalOptions.add(StarRocksSinkOptions.SINK_WAIT_FOR_CONTINUE_TIMEOUT); optionalOptions.add(StarRocksSinkOptions.SINK_IO_THREAD_COUNT); optionalOptions.add(StarRocksSinkOptions.SINK_CHUNK_LIMIT); diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java index e0fac69c..6f6a611d 100644 --- a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java +++ b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java @@ -14,11 +14,6 @@ package com.starrocks.connector.flink.table.sink; -import com.starrocks.connector.flink.manager.StarRocksSinkTable; -import com.starrocks.connector.flink.row.sink.StarRocksDelimiterParser; -import com.starrocks.data.load.stream.StreamLoadDataFormat; -import com.starrocks.data.load.stream.properties.StreamLoadProperties; -import com.starrocks.data.load.stream.properties.StreamLoadTableProperties; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; @@ -27,9 +22,17 @@ import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.util.Preconditions; + +import com.starrocks.connector.flink.manager.StarRocksSinkTable; +import com.starrocks.connector.flink.row.sink.StarRocksDelimiterParser; +import com.starrocks.data.load.stream.StreamLoadDataFormat; +import com.starrocks.data.load.stream.properties.StreamLoadProperties; +import com.starrocks.data.load.stream.properties.StreamLoadTableProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; @@ -39,7 +42,6 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; -import javax.annotation.Nullable; import static org.apache.http.protocol.HttpRequestExecutor.DEFAULT_WAIT_FOR_CONTINUE; @@ -82,6 +84,15 @@ public enum StreamLoadFormat { .stringType().noDefaultValue().withDescription("The prefix of the stream load label. Available values are within [-_A-Za-z0-9]"); public static final ConfigOption SINK_CONNECT_TIMEOUT = ConfigOptions.key("sink.connect.timeout-ms") .intType().defaultValue(30000).withDescription("Timeout in millisecond for connecting to the `load-url`."); + + public static final ConfigOption SINK_SOCKET_TIMEOUT = ConfigOptions.key("sink.socket.timeout-ms") + .intType().defaultValue(-1).withDescription("Timeout in milliseconds that the http client waits for data or, put differently, " + + "a maximum period inactivity between two consecutive data packets. A timeout value of zero is interpreted as an infinite " + + "timeout. A negative value is interpreted as undefined (system default if applicable). You can use this option to fail " + + "the stream load from the connector side if the http client does not receive response from StarRocks before timeout. " + + "The other option 'sink.properties.timeout' take effects on the StarRocks side, but the response to the client may delay " + + "in some unexpected cases. If you want to have a strict timeout from the connector side, you can set this option to an " + + "acceptable value."); public static final ConfigOption SINK_WAIT_FOR_CONTINUE_TIMEOUT = ConfigOptions.key("sink.wait-for-continue.timeout-ms") .intType().defaultValue(30000).withDescription("Timeout in millisecond to wait for 100-continue response for http client."); public static final ConfigOption SINK_IO_THREAD_COUNT = ConfigOptions.key("sink.io.thread-count") @@ -265,6 +276,10 @@ public int getConnectTimeout() { return Math.min(connectTimeout, 60000); } + public int getSocketTimeout() { + return tableOptions.get(SINK_SOCKET_TIMEOUT); + } + public int getWaitForContinueTimeout() { int waitForContinueTimeoutMs = tableOptions.get(SINK_WAIT_FOR_CONTINUE_TIMEOUT); if (waitForContinueTimeoutMs < DEFAULT_WAIT_FOR_CONTINUE) { @@ -546,6 +561,7 @@ public StreamLoadProperties getProperties(@Nullable StarRocksSinkTable table) { .cacheMaxBytes(getSinkMaxBytes()) .connectTimeout(getConnectTimeout()) .waitForContinueTimeoutMs(getWaitForContinueTimeout()) + .socketTimeout(getSocketTimeout()) .ioThreadCount(getIoThreadCount()) .scanningFrequency(getScanFrequency()) .labelPrefix(getLabelPrefix()) diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/DefaultStreamLoader.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/DefaultStreamLoader.java index e6898c8e..ffcbc5df 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/DefaultStreamLoader.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/DefaultStreamLoader.java @@ -65,7 +65,7 @@ public class DefaultStreamLoader implements StreamLoader, Serializable { private static final int ERROR_LOG_MAX_LENGTH = 3000; - private StreamLoadProperties properties; + protected StreamLoadProperties properties; private StreamLoadManager manager; private HttpClientBuilder clientBuilder; @@ -283,7 +283,11 @@ protected StreamLoadResponse sendToSR(TableRegion region) { String label = region.getLabel(); HttpPut httpPut = new HttpPut(sendUrl); - httpPut.setConfig(RequestConfig.custom().setExpectContinueEnabled(true).setRedirectsEnabled(true).build()); + httpPut.setConfig(RequestConfig.custom() + .setSocketTimeout(properties.getSocketTimeout()) + .setExpectContinueEnabled(true) + .setRedirectsEnabled(true) + .build()); httpPut.setEntity(region.getHttpEntity()); httpPut.setHeaders(defaultHeaders); diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/TransactionStreamLoader.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/TransactionStreamLoader.java index de5d7c00..9731249f 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/TransactionStreamLoader.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/TransactionStreamLoader.java @@ -118,7 +118,11 @@ protected boolean doBegin(TableRegion region) { httpPost.addHeader("db", region.getDatabase()); httpPost.addHeader("table", region.getTable()); - httpPost.setConfig(RequestConfig.custom().setExpectContinueEnabled(true).setRedirectsEnabled(true).build()); + httpPost.setConfig(RequestConfig.custom() + .setSocketTimeout(properties.getSocketTimeout()) + .setExpectContinueEnabled(true) + .setRedirectsEnabled(true) + .build()); String db = region.getDatabase(); String table = region.getTable(); @@ -169,8 +173,11 @@ public boolean prepare(StreamLoadSnapshot.Transaction transaction) { httpPost.addHeader("db", transaction.getDatabase()); httpPost.addHeader("table", transaction.getTable()); - httpPost.setConfig(RequestConfig.custom().setExpectContinueEnabled(true).setRedirectsEnabled(true).build()); - + httpPost.setConfig(RequestConfig.custom() + .setSocketTimeout(properties.getSocketTimeout()) + .setExpectContinueEnabled(true) + .setRedirectsEnabled(true) + .build()); log.info("Transaction prepare, label : {}, request : {}", transaction.getLabel(), httpPost); @@ -237,8 +244,11 @@ public boolean commit(StreamLoadSnapshot.Transaction transaction) { httpPost.addHeader("db", transaction.getDatabase()); httpPost.addHeader("table", transaction.getTable()); - httpPost.setConfig(RequestConfig.custom().setExpectContinueEnabled(true).setRedirectsEnabled(true).build()); - + httpPost.setConfig(RequestConfig.custom() + .setSocketTimeout(properties.getSocketTimeout()) + .setExpectContinueEnabled(true) + .setRedirectsEnabled(true) + .build()); log.info("Transaction commit, label: {}, request : {}", transaction.getLabel(), httpPost); diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/properties/StreamLoadProperties.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/properties/StreamLoadProperties.java index e255eba0..118a168c 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/properties/StreamLoadProperties.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/properties/StreamLoadProperties.java @@ -259,7 +259,8 @@ public static class Builder { private Map tablePropertiesMap = new HashMap<>(); private int connectTimeout = 60000; - private int socketTimeout; + // Default value -1 is the same as that in RequestConfig.Builder#socketTimeout + private int socketTimeout = -1; private int waitForContinueTimeoutMs = DEFAULT_WAIT_FOR_CONTINUE; private int ioThreadCount = Runtime.getRuntime().availableProcessors(); @@ -365,16 +366,13 @@ public Builder connectTimeout(int connectTimeout) { public Builder waitForContinueTimeoutMs(int waitForContinueTimeoutMs) { if (waitForContinueTimeoutMs < DEFAULT_WAIT_FOR_CONTINUE || waitForContinueTimeoutMs > 60000) { throw new IllegalArgumentException("waitForContinueTimeoutMs `" + waitForContinueTimeoutMs + - "ms` set failed, must be in range in [100, 60000]"); + "ms` set failed, must be in range in [3000, 60000]"); } this.waitForContinueTimeoutMs = waitForContinueTimeoutMs; return this; } public Builder socketTimeout(int socketTimeout) { - if (socketTimeout < 0) { - throw new IllegalArgumentException("socketTimeout `" + socketTimeout + "ms` set failed, must greater or equals to 0"); - } this.socketTimeout = socketTimeout; return this; }