Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CNDB-11092 main: Fix IndexInputLeakDetector #1310

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class IndexInputReader extends IndexInput
private final RandomAccessReader input;
private final Runnable doOnClose;

private IndexInputReader(RandomAccessReader input, Runnable doOnClose)
protected IndexInputReader(RandomAccessReader input, Runnable doOnClose)
{
super(input.getFile().toString(), input.order());
this.input = input;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,8 +322,9 @@ public void reset() throws IOException
@Override
public void close() throws IOException
{
this.termsData.close();
this.reader.close();
blockOffsets.close();
termsData.close();
reader.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.util.zip.CRC32;

import com.google.common.annotations.VisibleForTesting;
Expand All @@ -32,7 +31,6 @@
import io.github.jbellis.jvector.disk.BufferedRandomAccessWriter;
import net.nicoulaj.compilecommand.annotations.DontInline;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.index.sai.disk.io.IndexInput;
import org.apache.cassandra.index.sai.disk.io.IndexInputReader;
import org.apache.cassandra.index.sai.disk.io.IndexOutputWriter;
import org.apache.cassandra.io.compress.BufferType;
Expand All @@ -55,13 +53,15 @@ public class IndexFileUtils
.finishOnClose(true)
.build();

public static final IndexFileUtils instance = new IndexFileUtils();
public static volatile IndexFileUtils instance = new IndexFileUtils(defaultWriterOption);

private static final SequentialWriterOption writerOption = defaultWriterOption;
private final SequentialWriterOption writerOption;

@VisibleForTesting
protected IndexFileUtils()
{}
protected IndexFileUtils(SequentialWriterOption writerOption)
{
this.writerOption = writerOption;
}

public IndexOutputWriter openOutput(File file, ByteOrder order, boolean append) throws IOException
{
Expand Down Expand Up @@ -91,12 +91,12 @@ public BufferedRandomAccessWriter openRandomAccessOutput(File file, boolean appe
return out;
}

public IndexInput openInput(FileHandle handle)
public IndexInputReader openInput(FileHandle handle)
{
return IndexInputReader.create(handle);
}

public IndexInput openBlockingInput(FileHandle fileHandle)
public IndexInputReader openBlockingInput(FileHandle fileHandle)
{
final RandomAccessReader randomReader = fileHandle.createReader();
return IndexInputReader.create(randomReader, fileHandle::close);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,78 +19,114 @@
package org.apache.cassandra.index.sai.disk.io;

import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import com.google.common.base.Throwables;

import org.junit.Assert;

import org.apache.cassandra.index.sai.utils.IndexFileUtils;
import org.apache.cassandra.io.util.FileHandle;
import org.apache.cassandra.io.util.SequentialWriterOption;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import org.apache.lucene.index.CorruptIndexException;

public class TrackingIndexFileUtils extends IndexFileUtils
{
private final Map<TrackingIndexInput, String> openInputs = Collections.synchronizedMap(new HashMap<>());
private final Set<TrackingIndexInput> closedInputs = Collections.synchronizedSet(new HashSet<>());

public TrackingIndexFileUtils(SequentialWriterOption writerOption)
{
setWriterOption(writerOption);
super(writerOption);
}

@Override
public IndexInput openInput(FileHandle handle)
public IndexInputReader openInput(FileHandle handle)
{
TrackingIndexInput input = new TrackingIndexInput(super.openInput(handle));
openInputs.put(input, Throwables.getStackTraceAsString(new RuntimeException("Input created")));
return input;
}

public Map<IndexInput, String> getOpenInputs()
@Override
public IndexInputReader openBlockingInput(FileHandle fileHandle)
{
return new HashMap<>(openInputs);
TrackingIndexInput input = new TrackingIndexInput(super.openBlockingInput(fileHandle));
openInputs.put(input, Throwables.getStackTraceAsString(new RuntimeException("Blocking input created")));
return input;
}

public static void reset()
public Map<IndexInput, String> getOpenInputs()
{
setWriterOption(IndexFileUtils.defaultWriterOption);
return new HashMap<>(openInputs);
}

public class TrackingIndexInput extends FilterIndexInput
private class TrackingIndexInput extends IndexInputReader
{
TrackingIndexInput(IndexInput delegate)
private final IndexInputReader delegate;

protected TrackingIndexInput(IndexInputReader delegate)
{
super(delegate);
super(delegate.reader(), () -> {});
this.delegate = delegate;
}

@Override
public void close() throws IOException
public synchronized void close()
{
super.close();
delegate.close();
final String creationStackTrace = openInputs.remove(this);
assertNotNull("Closed unregistered input: " + this, creationStackTrace);

if (closedInputs.add(this) && creationStackTrace == null)
{
Assert.fail("Closed unregistered input: " + this);
}
}
}

public static void setWriterOption(SequentialWriterOption option)
{
try
@Override
public long getFilePointer()
{
Field writerOption = IndexFileUtils.class.getDeclaredField("writerOption");
writerOption.setAccessible(true);
Field modifiersField = Field.class.getDeclaredField("modifiers");
modifiersField.setAccessible(true);
modifiersField.setInt(writerOption, writerOption.getModifiers() & ~Modifier.FINAL);
writerOption.set(null, option);
return delegate.getFilePointer();
}
catch (Throwable e)

@Override
public void seek(long pos)
{
delegate.seek(pos);
}

@Override
public long length()
{
return delegate.length();
}

@Override
public IndexInput slice(String sliceDescription, long offset, long length) throws CorruptIndexException
{
return delegate.slice(sliceDescription, offset, length);
}

@Override
public byte readByte() throws IOException
{
return delegate.readByte();
}

@Override
public void readBytes(byte[] b, int offset, int len) throws IOException
{
delegate.readBytes(b, offset, len);
}

@Override
public String toString()
{
fail();
return delegate.toString();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ private void doTestAdvance(boolean crypto) throws IOException
assertEquals(PostingList.END_OF_STREAM, intersection.advance(numRows + 1));

intersection.close();
reader.close();
}

@Test
Expand All @@ -237,9 +238,12 @@ public void testResourcesReleaseWhenQueryDoesntMatchAnything() throws Exception
}

final BKDReader reader = finishAndOpenReaderOneDim(50, buffer);

final PostingList intersection = reader.intersect(buildQuery(1017, 1096), (QueryEventListener.BKDIndexEventListener)NO_OP_BKD_LISTENER, new QueryContext());

assertEquals(PostingList.EMPTY, intersection);

intersection.close();
reader.close();
}

private BKDReader.IntersectVisitor buildQuery(int queryMin, int queryMax)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.cassandra.index.sai.disk.io.TrackingIndexFileUtils;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.util.SequentialWriterOption;
import org.apache.cassandra.schema.TableMetadata;

import static org.junit.Assert.assertTrue;

Expand All @@ -41,6 +40,7 @@ public IndexDescriptor newIndexDescriptor(Descriptor descriptor, SequentialWrite
{
TrackingIndexFileUtils trackingIndexFileUtils = new TrackingIndexFileUtils(sequentialWriterOption);
trackedIndexFileUtils.add(trackingIndexFileUtils);
IndexFileUtils.instance = trackingIndexFileUtils;
return IndexDescriptor.empty(descriptor);
}

Expand All @@ -58,6 +58,6 @@ protected void afterIfSuccessful()
protected void afterAlways(List<Throwable> errors)
{
trackedIndexFileUtils.clear();
TrackingIndexFileUtils.reset();
IndexFileUtils.instance = null;
}
}