Skip to content

Commit

Permalink
RATIS-1743. Memory leak in SegmentedRaftLogWorker due to metrics.
Browse files Browse the repository at this point in the history
  • Loading branch information
szetszwo committed Nov 14, 2022
1 parent 0e89ce7 commit 78102d5
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,16 @@ static void setWriteBufferSize(RaftProperties properties, SizeInBytes writeBuffe
setSizeInBytes(properties::set, WRITE_BUFFER_SIZE_KEY, writeBufferSize);
}

String BYTE_ARRAY_SHARE_KEY = PREFIX + ".byte.array.share";
boolean BYTE_ARRAY_SHARE_DEFAULT = true;
static boolean byteArrayShare(RaftProperties properties) {
return getBoolean(properties::getBoolean,
BYTE_ARRAY_SHARE_KEY, BYTE_ARRAY_SHARE_DEFAULT, getDefaultLog());
}
static void setByteArrayShare(RaftProperties properties, boolean isShared) {
setBoolean(properties::setBoolean, BYTE_ARRAY_SHARE_KEY, isShared);
}

String FORCE_SYNC_NUM_KEY = PREFIX + ".force.sync.num";
int FORCE_SYNC_NUM_DEFAULT = 128;
static int forceSyncNum(RaftProperties properties) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import java.nio.channels.FileChannel;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import java.util.function.IntFunction;
import java.util.zip.Checksum;

public class SegmentedRaftLogOutputStream implements Closeable {
Expand All @@ -53,7 +53,7 @@ public class SegmentedRaftLogOutputStream implements Closeable {
private final File file;
private final BufferedWriteChannel out; // buffered FileChannel for writing
private final Checksum checksum;
private final Supplier<byte[]> sharedBuffer;
private final IntFunction<byte[]> byteArrayGet;

private final long segmentMaxSize;
private final long preallocatedSize;
Expand All @@ -65,13 +65,13 @@ public SegmentedRaftLogOutputStream(File file, boolean append, long segmentMaxSi
}

SegmentedRaftLogOutputStream(File file, boolean append, long segmentMaxSize,
long preallocatedSize, ByteBuffer byteBuffer, Supplier<byte[]> sharedBuffer)
long preallocatedSize, ByteBuffer byteBuffer, IntFunction<byte[]> byteArrayGet)
throws IOException {
this.file = file;
this.checksum = new PureJavaCrc32C();
this.segmentMaxSize = segmentMaxSize;
this.preallocatedSize = preallocatedSize;
this.sharedBuffer = sharedBuffer;
this.byteArrayGet = byteArrayGet;
this.out = BufferedWriteChannel.open(file, append, byteBuffer);

if (!append) {
Expand All @@ -98,7 +98,7 @@ public void write(LogEntryProto entry) throws IOException {
final int serialized = entry.getSerializedSize();
final int proto = CodedOutputStream.computeUInt32SizeNoTag(serialized) + serialized;
final int total = proto + 4; // proto and 4-byte checksum
final byte[] buf = sharedBuffer != null? sharedBuffer.get(): new byte[total];
final byte[] buf = byteArrayGet != null? byteArrayGet.apply(total): new byte[total];
Preconditions.assertTrue(total <= buf.length, () -> "total = " + total + " > buf.length " + buf.length);
preallocateIfNecessary(total);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import com.codahale.metrics.Gauge;
import com.codahale.metrics.Timer;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
import org.apache.ratis.protocol.ClientInvocationId;
Expand Down Expand Up @@ -51,6 +50,7 @@
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

Expand Down Expand Up @@ -132,6 +132,53 @@ synchronized void updateIndex(long i) {
}
}

static class ByteArray {
private final boolean shared;
private final int sizeLimit;
private volatile byte[] bytes;

ByteArray(boolean shared, int sizeLimit) {
this.shared = shared;
this.sizeLimit = sizeLimit;
}

byte[] get(int size) {
final byte[] byteArray = getImpl(size);
Preconditions.assertNotNull(byteArray, "byteArray");
Preconditions.assertTrue(byteArray.length >= size);
return byteArray;
}

private byte[] getImpl(int size) {
if (!shared) {
return new byte[size];
}

final byte[] existing = bytes;
if (existing != null && existing.length >= size) {
return existing;
}

synchronized (this) {
if (bytes != null && bytes.length >= size) {
return bytes;
}
bytes = new byte[Math.min(sizeLimit, roundUpPowerOfTwo(size))];
return bytes;
}
}

static int roundUpPowerOfTwo(int n) {
final long highestOne = Integer.highestOneBit(n);
if (highestOne == n) {
return n; // n is a power of two.
}
return Math.toIntExact(highestOne << 1);
}
}

private static final AtomicInteger COUNT = new AtomicInteger();

private final Consumer<Object> infoIndexChange = s -> LOG.info("{}: {}", this, s);
private final Consumer<Object> traceIndexChange = s -> LOG.trace("{}: {}", this, s);

Expand All @@ -154,7 +201,7 @@ synchronized void updateIndex(long i) {
private final Timer raftLogEnqueueingDelayTimer;
private final SegmentedRaftLogMetrics raftLogMetrics;
private final ByteBuffer writeBuffer;
private final Supplier<byte[]> sharedBuffer;
private final ByteArray byteArray;

/**
* The number of entries that have been written into the SegmentedRaftLogOutputStream but
Expand Down Expand Up @@ -185,7 +232,7 @@ synchronized void updateIndex(long i) {
SegmentedRaftLogWorker(RaftGroupMemberId memberId, StateMachine stateMachine, Runnable submitUpdateCommitEvent,
RaftServer.Division server, RaftStorage storage, RaftProperties properties,
SegmentedRaftLogMetrics metricRegistry) {
this.name = memberId + "-" + JavaUtils.getClassSimpleName(getClass());
this.name = memberId + "-" + JavaUtils.getClassSimpleName(getClass()) + COUNT.getAndIncrement();
LOG.info("new {} for {}", name, storage);

this.submitUpdateCommitEvent = submitUpdateCommitEvent;
Expand Down Expand Up @@ -219,8 +266,9 @@ synchronized void updateIndex(long i) {
final int bufferSize = RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt();
this.writeBuffer = ByteBuffer.allocateDirect(bufferSize);
final int logEntryLimit = RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties).getSizeInt();
final boolean share = RaftServerConfigKeys.Log.byteArrayShare(properties);
// 4 bytes (serialized size) + logEntryLimit + 4 bytes (checksum)
this.sharedBuffer = MemoizedSupplier.valueOf(() -> new byte[logEntryLimit + 8]);
this.byteArray = new ByteArray(share, logEntryLimit + 8);
this.unsafeFlush = RaftServerConfigKeys.Log.unsafeFlushEnabled(properties);
this.asyncFlush = RaftServerConfigKeys.Log.asyncFlushEnabled(properties);
if (asyncFlush && unsafeFlush) {
Expand Down Expand Up @@ -368,7 +416,6 @@ private boolean shouldFlush() {
return pendingFlushNum > 0 && queue.isEmpty();
}

@SuppressFBWarnings("NP_NULL_PARAM_DEREF")
private void flushIfNecessary() throws IOException {
if (shouldFlush()) {
raftLogMetrics.onRaftLogFlush();
Expand Down Expand Up @@ -754,6 +801,6 @@ private void freeSegmentedRaftLogOutputStream() {
private void allocateSegmentedRaftLogOutputStream(File file, boolean append) throws IOException {
Preconditions.assertTrue(out == null && writeBuffer.position() == 0);
out = new SegmentedRaftLogOutputStream(file, append, segmentMaxSize,
preallocatedSize, writeBuffer, sharedBuffer);
preallocatedSize, writeBuffer, byteArray::get);
}
}

0 comments on commit 78102d5

Please sign in to comment.