From 3314894f13a0408fc33e3a80e3b7e4f9aca4a920 Mon Sep 17 00:00:00 2001 From: PengFei Li Date: Mon, 21 Aug 2023 11:42:36 +0800 Subject: [PATCH] [Bugfix] Fix NPE caused by concurrent access Signed-off-by: PengFei Li --- .../load/stream/v2/StreamLoadManagerV2.java | 23 ++++++++----------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/StreamLoadManagerV2.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/StreamLoadManagerV2.java index 43094330..09fbcfad 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/StreamLoadManagerV2.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/StreamLoadManagerV2.java @@ -35,10 +35,10 @@ import java.io.Serializable; import java.nio.charset.StandardCharsets; -import java.util.HashMap; -import java.util.LinkedList; import java.util.Map; import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -84,7 +84,7 @@ enum State { private final long maxCacheBytes; // threshold to block write private final long maxWriteBlockCacheBytes; - private final Map regions = new HashMap<>(); + private final Map regions = new ConcurrentHashMap<>(); private final AtomicLong currentCacheBytes = new AtomicLong(0L); private final AtomicLong totalFlushRows = new AtomicLong(0L); @@ -105,7 +105,7 @@ enum State { private final AtomicReference state = new AtomicReference<>(State.INACTIVE); private volatile Throwable e; - private final Queue flushQ = new LinkedList<>(); + private final Queue flushQ = new ConcurrentLinkedQueue<>(); /** * Whether write() has triggered a flush after currentCacheBytes > maxCacheBytes. @@ -419,16 +419,11 @@ protected TableRegion getCacheRegion(String uniqueKey, String database, String t TableRegion region = regions.get(uniqueKey); if (region == null) { - synchronized (regions) { - region = regions.get(uniqueKey); - if (region == null) { - StreamLoadTableProperties tableProperties = properties.getTableProperties(uniqueKey); - region = new TransactionTableRegion(uniqueKey, database, table, this, - tableProperties, streamLoader, maxRetries, retryIntervalInMs); - regions.put(uniqueKey, region); - flushQ.offer((TransactionTableRegion) region); - } - } + StreamLoadTableProperties tableProperties = properties.getTableProperties(uniqueKey); + region = new TransactionTableRegion(uniqueKey, database, table, this, + tableProperties, streamLoader, maxRetries, retryIntervalInMs); + regions.put(uniqueKey, region); + flushQ.offer((TransactionTableRegion) region); } return region; }