Skip to content

Commit

Permalink
[Bugfix] Fix NPE caused by concurrent access
Browse files Browse the repository at this point in the history
Signed-off-by: PengFei Li <[email protected]>
  • Loading branch information
banmoy committed Aug 21, 2023
1 parent b3ae21e commit 3314894
Showing 1 changed file with 9 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@

import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -84,7 +84,7 @@ enum State {
private final long maxCacheBytes;
// threshold to block write
private final long maxWriteBlockCacheBytes;
private final Map<String, TableRegion> regions = new HashMap<>();
private final Map<String, TableRegion> regions = new ConcurrentHashMap<>();
private final AtomicLong currentCacheBytes = new AtomicLong(0L);
private final AtomicLong totalFlushRows = new AtomicLong(0L);

Expand All @@ -105,7 +105,7 @@ enum State {
private final AtomicReference<State> state = new AtomicReference<>(State.INACTIVE);
private volatile Throwable e;

private final Queue<TransactionTableRegion> flushQ = new LinkedList<>();
private final Queue<TransactionTableRegion> flushQ = new ConcurrentLinkedQueue<>();

/**
* Whether write() has triggered a flush after currentCacheBytes > maxCacheBytes.
Expand Down Expand Up @@ -419,16 +419,11 @@ protected TableRegion getCacheRegion(String uniqueKey, String database, String t

TableRegion region = regions.get(uniqueKey);
if (region == null) {
synchronized (regions) {
region = regions.get(uniqueKey);
if (region == null) {
StreamLoadTableProperties tableProperties = properties.getTableProperties(uniqueKey);
region = new TransactionTableRegion(uniqueKey, database, table, this,
tableProperties, streamLoader, maxRetries, retryIntervalInMs);
regions.put(uniqueKey, region);
flushQ.offer((TransactionTableRegion) region);
}
}
StreamLoadTableProperties tableProperties = properties.getTableProperties(uniqueKey);
region = new TransactionTableRegion(uniqueKey, database, table, this,
tableProperties, streamLoader, maxRetries, retryIntervalInMs);
regions.put(uniqueKey, region);
flushQ.offer((TransactionTableRegion) region);
}
return region;
}
Expand Down

0 comments on commit 3314894

Please sign in to comment.