Skip to content

Commit

Permalink
[BugFix] Close StarRocksSourceBeReader in StarRocksDynamicLookupFunct…
Browse files Browse the repository at this point in the history
…ion (#351)

Signed-off-by: PengFei Li <[email protected]>
  • Loading branch information
banmoy authored Apr 10, 2024
1 parent ed5cc8a commit 45fceec
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,17 @@

package com.starrocks.connector.flink.table.source;

import com.starrocks.connector.flink.table.source.struct.ColumnRichInfo;
import com.starrocks.connector.flink.table.source.struct.QueryBeXTablets;
import com.starrocks.connector.flink.table.source.struct.QueryInfo;
import com.starrocks.connector.flink.table.source.struct.SelectColumn;
import com.starrocks.connector.flink.tools.EnvUtils;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

import com.starrocks.connector.flink.table.source.struct.ColumnRichInfo;
import com.starrocks.connector.flink.table.source.struct.QueryBeXTablets;
import com.starrocks.connector.flink.table.source.struct.QueryInfo;
import com.starrocks.connector.flink.table.source.struct.SelectColumn;
import com.starrocks.connector.flink.tools.EnvUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -103,29 +104,54 @@ private void reloadData() {
LOG.info("LookUpFunction SQL [{}]", sqlSb.toString());
this.queryInfo = StarRocksSourceCommonFunc.getQueryInfo(this.sourceOptions, sqlSb.toString());
List<List<QueryBeXTablets>> lists = StarRocksSourceCommonFunc.splitQueryBeXTablets(1, queryInfo);
cacheMap = lists.get(0).parallelStream().flatMap(beXTablets -> {
StarRocksSourceBeReader beReader = new StarRocksSourceBeReader(
beXTablets.getBeNode(),
columnRichInfos,
selectColumns,
cacheMap = lists.get(0).parallelStream()
.flatMap(beXTablets -> scanBeTablets(beXTablets).stream())
.collect(Collectors.groupingBy(row -> {
GenericRowData gRowData = (GenericRowData)row;
Object[] keyObj = new Object[filterRichInfos.length];
for (int i = 0; i < filterRichInfos.length; i ++) {
keyObj[i] = gRowData.getField(filterRichInfos[i].getColumnIndexInSchema());
}
return Row.of(keyObj);
}));
nextLoadTime = System.currentTimeMillis() + this.cacheExpireMs;
}

private List<RowData> scanBeTablets(QueryBeXTablets beXTablets) {
List<RowData> tmpDataList = new ArrayList<>();
RuntimeException exception = null;
StarRocksSourceBeReader beReader = new StarRocksSourceBeReader(
beXTablets.getBeNode(),
columnRichInfos,
selectColumns,
sourceOptions);
try {
beReader.openScanner(beXTablets.getTabletIds(), queryInfo.getQueryPlan().getOpaqued_query_plan(),
sourceOptions);
beReader.openScanner(beXTablets.getTabletIds(), queryInfo.getQueryPlan().getOpaqued_query_plan(), sourceOptions);
beReader.startToRead();
List<RowData> tmpDataList = new ArrayList<>();
while (beReader.hasNext()) {
RowData row = beReader.getNext();
tmpDataList.add(row);
}
return tmpDataList.stream();
}).collect(Collectors.groupingBy(row -> {
GenericRowData gRowData = (GenericRowData)row;
Object[] keyObj = new Object[filterRichInfos.length];
for (int i = 0; i < filterRichInfos.length; i ++) {
keyObj[i] = gRowData.getField(filterRichInfos[i].getColumnIndexInSchema());
} catch (Exception e) {
LOG.error("Failed to scan tablets for BE node {}", beXTablets.getBeNode(), e);
exception = new RuntimeException("Failed to scan tablets for BE node " + beXTablets.getBeNode(), e);
} finally {
try {
beReader.close();
LOG.info("Close reader for BE {}", beXTablets.getBeNode());
} catch (Exception ie) {
LOG.error("Failed to close reader for BE {}", beXTablets.getBeNode(), ie);
if (exception == null) {
exception = new RuntimeException("Failed to close reader for BE node " + beXTablets.getBeNode(), ie);
}
}
return Row.of(keyObj);
}));
nextLoadTime = System.currentTimeMillis() + this.cacheExpireMs;
}

if (exception != null) {
throw exception;
}
return tmpDataList;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -335,4 +336,54 @@ private String createNestedTypeTable(String tablePrefix) throws Exception {
executeSrSQL(createStarRocksTable);
return tableName;
}

@Test
public void testDim() throws Exception {
String tableName = createDimTable("testDim");
executeSrSQL(
String.format(
"INSERT INTO `%s`.`%s` VALUES (%s), (%s)", DB_NAME, tableName, "1", "2"));

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

String srcSQL = "CREATE TABLE sr_src(" +
"c0 INT," +
"proc_time AS PROCTIME()" +
") WITH ( " +
"'connector' = 'starrocks'," +
"'jdbc-url'='" + getJdbcUrl() + "'," +
"'scan-url'='" + String.join(";", getHttpUrls()) + "'," +
"'database-name' = '" + DB_NAME + "'," +
"'table-name' = '" + tableName + "'," +
"'username' = 'root'," +
"'password' = ''" +
")";
tEnv.executeSql(srcSQL);
List<Row> results =
CollectionUtil.iteratorToList(tEnv.executeSql(
"SELECT t0.c0 FROM sr_src AS t0 JOIN sr_src " +
"FOR SYSTEM_TIME AS OF t0.proc_time AS t1 ON t0.c0 = t1.c0;")
.collect());
assertThat(results).containsExactlyInAnyOrderElementsOf(Arrays.asList(Row.of(1), Row.of(2)));
}

private String createDimTable(String tablePrefix) throws Exception {
String tableName = tablePrefix + "_" + genRandomUuid();
String createStarRocksTable =
String.format(
"CREATE TABLE `%s`.`%s` (" +
"c0 INT" +
") ENGINE = OLAP " +
"DUPLICATE KEY(c0) " +
"DISTRIBUTED BY HASH (c0) BUCKETS 8 " +
"PROPERTIES (" +
"\"replication_num\" = \"1\"" +
")",
DB_NAME, tableName);
executeSrSQL(createStarRocksTable);
return tableName;
}

}

0 comments on commit 45fceec

Please sign in to comment.