diff --git a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicLookupFunction.java b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicLookupFunction.java index 719cd35e..d89cc14d 100644 --- a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicLookupFunction.java +++ b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicLookupFunction.java @@ -14,16 +14,17 @@ package com.starrocks.connector.flink.table.source; -import com.starrocks.connector.flink.table.source.struct.ColumnRichInfo; -import com.starrocks.connector.flink.table.source.struct.QueryBeXTablets; -import com.starrocks.connector.flink.table.source.struct.QueryInfo; -import com.starrocks.connector.flink.table.source.struct.SelectColumn; -import com.starrocks.connector.flink.tools.EnvUtils; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.functions.FunctionContext; import org.apache.flink.table.functions.TableFunction; import org.apache.flink.types.Row; + +import com.starrocks.connector.flink.table.source.struct.ColumnRichInfo; +import com.starrocks.connector.flink.table.source.struct.QueryBeXTablets; +import com.starrocks.connector.flink.table.source.struct.QueryInfo; +import com.starrocks.connector.flink.table.source.struct.SelectColumn; +import com.starrocks.connector.flink.tools.EnvUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -103,29 +104,54 @@ private void reloadData() { LOG.info("LookUpFunction SQL [{}]", sqlSb.toString()); this.queryInfo = StarRocksSourceCommonFunc.getQueryInfo(this.sourceOptions, sqlSb.toString()); List> lists = StarRocksSourceCommonFunc.splitQueryBeXTablets(1, queryInfo); - cacheMap = lists.get(0).parallelStream().flatMap(beXTablets -> { - StarRocksSourceBeReader beReader = new StarRocksSourceBeReader( - beXTablets.getBeNode(), - columnRichInfos, - selectColumns, + cacheMap = lists.get(0).parallelStream() + .flatMap(beXTablets -> scanBeTablets(beXTablets).stream()) + .collect(Collectors.groupingBy(row -> { + GenericRowData gRowData = (GenericRowData)row; + Object[] keyObj = new Object[filterRichInfos.length]; + for (int i = 0; i < filterRichInfos.length; i ++) { + keyObj[i] = gRowData.getField(filterRichInfos[i].getColumnIndexInSchema()); + } + return Row.of(keyObj); + })); + nextLoadTime = System.currentTimeMillis() + this.cacheExpireMs; + } + + private List scanBeTablets(QueryBeXTablets beXTablets) { + List tmpDataList = new ArrayList<>(); + RuntimeException exception = null; + StarRocksSourceBeReader beReader = new StarRocksSourceBeReader( + beXTablets.getBeNode(), + columnRichInfos, + selectColumns, + sourceOptions); + try { + beReader.openScanner(beXTablets.getTabletIds(), queryInfo.getQueryPlan().getOpaqued_query_plan(), sourceOptions); - beReader.openScanner(beXTablets.getTabletIds(), queryInfo.getQueryPlan().getOpaqued_query_plan(), sourceOptions); beReader.startToRead(); - List tmpDataList = new ArrayList<>(); while (beReader.hasNext()) { RowData row = beReader.getNext(); tmpDataList.add(row); } - return tmpDataList.stream(); - }).collect(Collectors.groupingBy(row -> { - GenericRowData gRowData = (GenericRowData)row; - Object[] keyObj = new Object[filterRichInfos.length]; - for (int i = 0; i < filterRichInfos.length; i ++) { - keyObj[i] = gRowData.getField(filterRichInfos[i].getColumnIndexInSchema()); + } catch (Exception e) { + LOG.error("Failed to scan tablets for BE node {}", beXTablets.getBeNode(), e); + exception = new RuntimeException("Failed to scan tablets for BE node " + beXTablets.getBeNode(), e); + } finally { + try { + beReader.close(); + LOG.info("Close reader for BE {}", beXTablets.getBeNode()); + } catch (Exception ie) { + LOG.error("Failed to close reader for BE {}", beXTablets.getBeNode(), ie); + if (exception == null) { + exception = new RuntimeException("Failed to close reader for BE node " + beXTablets.getBeNode(), ie); + } } - return Row.of(keyObj); - })); - nextLoadTime = System.currentTimeMillis() + this.cacheExpireMs; + } + + if (exception != null) { + throw exception; + } + return tmpDataList; } @Override diff --git a/src/test/java/com/starrocks/connector/flink/it/source/StarRocksSourceITTest.java b/src/test/java/com/starrocks/connector/flink/it/source/StarRocksSourceITTest.java index 42906c46..5ac4185a 100644 --- a/src/test/java/com/starrocks/connector/flink/it/source/StarRocksSourceITTest.java +++ b/src/test/java/com/starrocks/connector/flink/it/source/StarRocksSourceITTest.java @@ -31,6 +31,7 @@ import java.math.BigDecimal; import java.time.LocalDate; import java.time.LocalDateTime; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -335,4 +336,54 @@ private String createNestedTypeTable(String tablePrefix) throws Exception { executeSrSQL(createStarRocksTable); return tableName; } + + @Test + public void testDim() throws Exception { + String tableName = createDimTable("testDim"); + executeSrSQL( + String.format( + "INSERT INTO `%s`.`%s` VALUES (%s), (%s)", DB_NAME, tableName, "1", "2")); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + + String srcSQL = "CREATE TABLE sr_src(" + + "c0 INT," + + "proc_time AS PROCTIME()" + + ") WITH ( " + + "'connector' = 'starrocks'," + + "'jdbc-url'='" + getJdbcUrl() + "'," + + "'scan-url'='" + String.join(";", getHttpUrls()) + "'," + + "'database-name' = '" + DB_NAME + "'," + + "'table-name' = '" + tableName + "'," + + "'username' = 'root'," + + "'password' = ''" + + ")"; + tEnv.executeSql(srcSQL); + List results = + CollectionUtil.iteratorToList(tEnv.executeSql( + "SELECT t0.c0 FROM sr_src AS t0 JOIN sr_src " + + "FOR SYSTEM_TIME AS OF t0.proc_time AS t1 ON t0.c0 = t1.c0;") + .collect()); + assertThat(results).containsExactlyInAnyOrderElementsOf(Arrays.asList(Row.of(1), Row.of(2))); + } + + private String createDimTable(String tablePrefix) throws Exception { + String tableName = tablePrefix + "_" + genRandomUuid(); + String createStarRocksTable = + String.format( + "CREATE TABLE `%s`.`%s` (" + + "c0 INT" + + ") ENGINE = OLAP " + + "DUPLICATE KEY(c0) " + + "DISTRIBUTED BY HASH (c0) BUCKETS 8 " + + "PROPERTIES (" + + "\"replication_num\" = \"1\"" + + ")", + DB_NAME, tableName); + executeSrSQL(createStarRocksTable); + return tableName; + } + }