Skip to content

Commit

Permalink
Add connector test
Browse files Browse the repository at this point in the history
Signed-off-by: PengFei Li <[email protected]>
  • Loading branch information
banmoy committed Jul 16, 2024
1 parent 0bddc52 commit 612c555
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,14 @@ public enum StreamLoadFormat {
"in the coming 2.0. Note that it's not compatible after changing the flag, that's, you can't recover from " +
"the previous job after changing the flag.");

public static final ConfigOption<Integer> SINK_GROUP_COMMIT_CHECK_LABEL_INTERVAL_MS =
ConfigOptions.key("sink.group-commit.check-label.interval-ms")
.intType().defaultValue(100);

public static final ConfigOption<Integer> SINK_GROUP_COMMIT_CHECK_LABEL_TIMEOUT_MS =
ConfigOptions.key("sink.group-commit.check-label.timeout-ms")
.intType().defaultValue(60000);

public static final ConfigOption<Integer> SINK_PARALLELISM = FactoryUtil.SINK_PARALLELISM;

// Sink semantic
Expand Down Expand Up @@ -377,6 +385,14 @@ public boolean isUseUnifiedSinkApi() {
return tableOptions.get(SINK_USE_NEW_SINK_API);
}

public int getGroupCommitCheckLabelIntervalMs() {
return tableOptions.get(SINK_GROUP_COMMIT_CHECK_LABEL_INTERVAL_MS);
}

public int getGroupCommitCheckLabelTimeoutMs() {
return tableOptions.get(SINK_GROUP_COMMIT_CHECK_LABEL_TIMEOUT_MS);
}

private void validateStreamLoadUrl() {
tableOptions.getOptional(LOAD_URL).ifPresent(urlList -> {
for (String host : urlList) {
Expand Down Expand Up @@ -573,7 +589,9 @@ public StreamLoadProperties getProperties(@Nullable StarRocksSinkTable table) {
// TODO not support retry currently
.maxRetries(0)
.retryIntervalInMs(getRetryIntervalMs())
.addHeaders(streamLoadProperties);
.addHeaders(streamLoadProperties)
.setCheckLabelIntervalMs(getGroupCommitCheckLabelIntervalMs())
.setCheckLabelTimeoutMs(getGroupCommitCheckLabelTimeoutMs());

for (StreamLoadTableProperties tableProperties : tablePropertiesList) {
builder.addTableProperties(tableProperties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -737,4 +737,13 @@ public void testJsonLz4Compression() throws Exception {
map.put("sink.at-least-once.use-transaction-stream-load", "false");
testConfigurationBase(map, env -> null);
}

@Test
public void testGroupCommit() throws Exception {
Map<String, String> options = new HashMap<>();
options.put("sink.properties.group_commit", "async");
options.put("sink.properties.format", "json");
options.put("sink.properties.compression", "lz4_frame");
testConfigurationBase(options, env -> null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ public class LabelManager implements Closeable, Serializable {
private final String[] hosts;
private final String user;
private final String password;
private final long retryIntervalMs;
private final long timeoutMs;
private final long checkLabelIntervalMs;
private final long checkLabelTimeoutMs;
private final Map<TableId, TableLabelHolder> labelHolderMap;
private transient ScheduledExecutorService scheduledExecutorService;
private transient ObjectMapper objectMapper;
Expand All @@ -59,8 +59,8 @@ public LabelManager(StreamLoadProperties properties) {
this.hosts = properties.getLoadUrls();
this.user = properties.getUsername();
this.password = properties.getPassword();
this.retryIntervalMs = properties.getCheckLabelIntervalMs();
this.timeoutMs = properties.getCheckLabelTimeoutMs();
this.checkLabelIntervalMs = properties.getCheckLabelIntervalMs();
this.checkLabelTimeoutMs = properties.getCheckLabelTimeoutMs();
this.labelHolderMap = new ConcurrentHashMap<>();
}

Expand All @@ -78,6 +78,8 @@ public void start() {
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
// filed names in StreamLoadResponseBody are case-insensitive
objectMapper.configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true);
LOG.info("Start label manager, checkLabelIntervalMs: {}, checkLabelTimeoutMs: {}",
checkLabelIntervalMs, checkLabelTimeoutMs);
}

@Override
Expand All @@ -94,7 +96,7 @@ public CompletableFuture<TransactionStatus> getLabelFinalStatusAsync(
.addLabel(label, expectFinishTimeMs);
if (labelMeta.isScheduled.compareAndSet(false, true)) {
long delayMs = expectFinishTimeMs > 0 ?
Math.max(0, expectFinishTimeMs - System.currentTimeMillis()) : retryIntervalMs;
Math.max(0, expectFinishTimeMs - System.currentTimeMillis()) : checkLabelIntervalMs;
scheduledExecutorService.schedule(() -> checkLabelState(labelMeta), delayMs, TimeUnit.MILLISECONDS);
LOG.info("Schedule to get label state, db: {}, table: {}, label: {}, delay: {}ms",
tableId.db, tableId.table, label, delayMs);
Expand Down Expand Up @@ -134,15 +136,15 @@ private void checkLabelState(LabelMeta labelMeta) {
return;
}

if (System.currentTimeMillis() - labelMeta.createTimeMs >= timeoutMs) {
if (System.currentTimeMillis() - labelMeta.createTimeMs >= checkLabelTimeoutMs) {
labelMeta.future.completeExceptionally(new RuntimeException("Get label state timeout"));
LOG.error("Failed to retry to get label state because of timeout, db: {}, table: {}, label: {}, timeout: {}ms",
labelMeta.tableId.db, labelMeta.tableId.table, labelMeta.label, timeoutMs);
labelMeta.tableId.db, labelMeta.tableId.table, labelMeta.label, checkLabelTimeoutMs);
return;
}

labelMeta.numRetries += 1;
scheduledExecutorService.schedule(() -> checkLabelState(labelMeta), retryIntervalMs, TimeUnit.MILLISECONDS);
scheduledExecutorService.schedule(() -> checkLabelState(labelMeta), checkLabelIntervalMs, TimeUnit.MILLISECONDS);
LOG.info("Retry to get label state, db: {}, table: {}, label: {}, retries: {}",
labelMeta.tableId.db, labelMeta.tableId.table, labelMeta.label, labelMeta.numRetries);
}
Expand Down

0 comments on commit 612c555

Please sign in to comment.