From 63131d12461ddf1ed10ec6ca30be87750eb3d9e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9s=20de=20la=20Pe=C3=B1a?= Date: Mon, 30 Sep 2024 16:37:30 +0100 Subject: [PATCH 1/3] CNDB-11092: Fix IndexInputLeakDetector --- .../sai/disk/format/IndexDescriptor.java | 4 +- .../index/sai/disk/io/FilterIndexInput.java | 84 ----------------- .../index/sai/disk/io/IndexInputReader.java | 2 +- .../index/sai/disk/v1/TermsReader.java | 24 ++--- .../disk/v1/bitpack/BlockPackedReader.java | 2 +- .../bitpack/MonotonicBlockPackedReader.java | 2 +- .../index/sai/disk/v1/kdtree/BKDReader.java | 10 +- .../index/sai/utils/IndexFileUtils.java | 28 ++++-- .../index/sai/v1/AbstractOnDiskBenchmark.java | 4 +- .../sai/disk/io/TrackingIndexFileUtils.java | 94 +++++++++++++------ .../sai/disk/v1/kdtree/BKDReaderTest.java | 6 +- .../sai/utils/IndexInputLeakDetector.java | 4 +- 12 files changed, 118 insertions(+), 146 deletions(-) delete mode 100644 src/java/org/apache/cassandra/index/sai/disk/io/FilterIndexInput.java diff --git a/src/java/org/apache/cassandra/index/sai/disk/format/IndexDescriptor.java b/src/java/org/apache/cassandra/index/sai/disk/format/IndexDescriptor.java index 93f68cccdf39..7ae8ad51f825 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/format/IndexDescriptor.java +++ b/src/java/org/apache/cassandra/index/sai/disk/format/IndexDescriptor.java @@ -624,7 +624,7 @@ public FileHandle createIndexBuildTimeFileHandle() @Override public IndexInput openInput() { - return IndexFileUtils.instance.openBlockingInput(createFileHandle()); + return IndexFileUtils.instance().openBlockingInput(createFileHandle()); } @Override @@ -662,7 +662,7 @@ public IndexOutputWriter openOutput(boolean append) throws IOException component, file); - return IndexFileUtils.instance.openOutput(file, byteOrder(), append); + return IndexFileUtils.instance().openOutput(file, byteOrder(), append); } @Override diff --git a/src/java/org/apache/cassandra/index/sai/disk/io/FilterIndexInput.java b/src/java/org/apache/cassandra/index/sai/disk/io/FilterIndexInput.java deleted file mode 100644 index 5d0e86967d00..000000000000 --- a/src/java/org/apache/cassandra/index/sai/disk/io/FilterIndexInput.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.index.sai.disk.io; - -import java.io.IOException; - -public abstract class FilterIndexInput extends IndexInput -{ - private final IndexInput delegate; - - protected FilterIndexInput(IndexInput delegate) - { - super(delegate.toString(), delegate.order()); - this.delegate = delegate; - } - - public IndexInput getDelegate() - { - return delegate; - } - - @Override - public void close() throws IOException - { - delegate.close(); - } - - @Override - public long getFilePointer() - { - return delegate.getFilePointer(); - } - - @Override - public void seek(long pos) throws IOException - { - delegate.seek(pos); - } - - @Override - public long length() - { - return delegate.length(); - } - - @Override - public IndexInput slice(String sliceDescription, long offset, long length) throws IOException - { - return delegate.slice(sliceDescription, offset, length); - } - - @Override - public byte readByte() throws IOException - { - return delegate.readByte(); - } - - @Override - public void readBytes(byte[] b, int offset, int len) throws IOException - { - delegate.readBytes(b, offset, len); - } - - @Override - public String toString() - { - return delegate.toString(); - } -} diff --git a/src/java/org/apache/cassandra/index/sai/disk/io/IndexInputReader.java b/src/java/org/apache/cassandra/index/sai/disk/io/IndexInputReader.java index aaef8031e6a4..216eb8c57595 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/io/IndexInputReader.java +++ b/src/java/org/apache/cassandra/index/sai/disk/io/IndexInputReader.java @@ -31,7 +31,7 @@ public class IndexInputReader extends IndexInput private final RandomAccessReader input; private final Runnable doOnClose; - private IndexInputReader(RandomAccessReader input, Runnable doOnClose) + protected IndexInputReader(RandomAccessReader input, Runnable doOnClose) { super(input.getFile().toString(), input.order()); this.input = input; diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/TermsReader.java b/src/java/org/apache/cassandra/index/sai/disk/v1/TermsReader.java index a75871ec28cd..fb15305d0494 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/TermsReader.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/TermsReader.java @@ -92,7 +92,7 @@ public TermsReader(IndexContext indexContext, termDictionaryRoot = root; this.termDictionaryFileEncodingVersion = termsDataEncodingVersion; - try (final IndexInput indexInput = IndexFileUtils.instance.openInput(termDictionaryFile)) + try (final IndexInput indexInput = IndexFileUtils.instance().openInput(termDictionaryFile)) { // if the pointer is -1 then this is a previous version of the index // use the old way to validate the footer @@ -107,7 +107,7 @@ public TermsReader(IndexContext indexContext, } } - try (final IndexInput indexInput = IndexFileUtils.instance.openInput(postingsFile)) + try (final IndexInput indexInput = IndexFileUtils.instance().openInput(postingsFile)) { validate(indexInput); } @@ -168,8 +168,8 @@ public class TermQuery TermQuery(ByteComparable term, QueryEventListener.TrieIndexEventListener listener, QueryContext context) { this.listener = listener; - postingsInput = IndexFileUtils.instance.openInput(postingsFile); - postingsSummaryInput = IndexFileUtils.instance.openInput(postingsFile); + postingsInput = IndexFileUtils.instance().openInput(postingsFile); + postingsSummaryInput = IndexFileUtils.instance().openInput(postingsFile); this.term = term; lookupStartTime = System.nanoTime(); this.context = context; @@ -299,8 +299,8 @@ private PostingList readAndMergePostings(TrieTermsDictionaryReader reader) throw ArrayList postingLists = new ArrayList<>(); // index inputs will be closed with the onClose method of the returned merged posting list - IndexInput postingsInput = IndexFileUtils.instance.openInput(postingsFile); - IndexInput postingsSummaryInput = IndexFileUtils.instance.openInput(postingsFile); + IndexInput postingsInput = IndexFileUtils.instance().openInput(postingsFile); + IndexInput postingsSummaryInput = IndexFileUtils.instance().openInput(postingsFile); do { @@ -329,8 +329,8 @@ private PostingList readFilterAndMergePosting(TrieTermsDictionaryReader reader) ArrayList postingLists = new ArrayList<>(); // index inputs will be closed with the onClose method of the returned merged posting list - IndexInput postingsInput = IndexFileUtils.instance.openInput(postingsFile); - IndexInput postingsSummaryInput = IndexFileUtils.instance.openInput(postingsFile); + IndexInput postingsInput = IndexFileUtils.instance().openInput(postingsFile); + IndexInput postingsSummaryInput = IndexFileUtils.instance().openInput(postingsFile); do { @@ -379,8 +379,8 @@ private class TermsScanner implements TermsIterator private TermsScanner(Version version, AbstractType type) { this.termsDictionaryReader = new TrieTermsDictionaryReader(termDictionaryFile.instantiateRebufferer(), termDictionaryRoot, termDictionaryFileEncodingVersion); - this.postingsInput = IndexFileUtils.instance.openInput(postingsFile); - this.postingsSummaryInput = IndexFileUtils.instance.openInput(postingsFile); + this.postingsInput = IndexFileUtils.instance().openInput(postingsFile); + this.postingsSummaryInput = IndexFileUtils.instance().openInput(postingsFile); // We decode based on the logic used to encode the min and max terms in the trie. if (version.onOrAfter(Version.DB) && TypeUtil.isComposite(type)) { @@ -451,8 +451,8 @@ private class ReverseTermsScanner implements TermsIterator private ReverseTermsScanner() { this.iterator = new ReverseTrieTermsDictionaryReader(termDictionaryFile.instantiateRebufferer(), termDictionaryRoot); - this.postingsInput = IndexFileUtils.instance.openInput(postingsFile); - this.postingsSummaryInput = IndexFileUtils.instance.openInput(postingsFile); + this.postingsInput = IndexFileUtils.instance().openInput(postingsFile); + this.postingsSummaryInput = IndexFileUtils.instance().openInput(postingsFile); } @Override diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/bitpack/BlockPackedReader.java b/src/java/org/apache/cassandra/index/sai/disk/v1/bitpack/BlockPackedReader.java index 738ae5e709c1..64cc249bdc09 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/bitpack/BlockPackedReader.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/bitpack/BlockPackedReader.java @@ -101,7 +101,7 @@ public BlockPackedReader(FileHandle file, NumericValuesMeta meta) throws IOExcep @Override public LongArray open() { - var indexInput = IndexFileUtils.instance.openInput(file); + var indexInput = IndexFileUtils.instance().openInput(file); return new AbstractBlockPackedReader(indexInput, blockBitsPerValue, blockShift, blockMask, 0, valueCount) { @Override diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/bitpack/MonotonicBlockPackedReader.java b/src/java/org/apache/cassandra/index/sai/disk/v1/bitpack/MonotonicBlockPackedReader.java index 7c4129376cd9..3fbbb882a6b4 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/bitpack/MonotonicBlockPackedReader.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/bitpack/MonotonicBlockPackedReader.java @@ -86,7 +86,7 @@ public MonotonicBlockPackedReader(FileHandle file, NumericValuesMeta meta) throw @SuppressWarnings("resource") public LongArray open() { - var indexInput = IndexFileUtils.instance.openInput(file); + var indexInput = IndexFileUtils.instance().openInput(file); return new AbstractBlockPackedReader(indexInput, blockBitsPerValue, blockShift, blockMask, 0, valueCount) { @Override diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/kdtree/BKDReader.java b/src/java/org/apache/cassandra/index/sai/disk/v1/kdtree/BKDReader.java index 69a084e36a60..f399d2032078 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/kdtree/BKDReader.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/kdtree/BKDReader.java @@ -147,8 +147,8 @@ public IteratorState(DocMapper docMapper, boolean isAscending, BKDReader.Interse scratch = new byte[packedBytesLength]; final long firstLeafFilePointer = getMinLeafBlockFP(); - bkdInput = IndexFileUtils.instance.openInput(kdtreeFile); - bkdPostingsInput = IndexFileUtils.instance.openInput(postingsFile); + bkdInput = IndexFileUtils.instance().openInput(kdtreeFile); + bkdPostingsInput = IndexFileUtils.instance().openInput(postingsFile); bkdInput.seek(firstLeafFilePointer); NavigableMap leafNodeToLeafFP = getLeafOffsets(); @@ -420,9 +420,9 @@ public PostingList intersect(IntersectVisitor visitor, QueryEventListener.BKDInd } listener.onSegmentHit(); - IndexInput bkdInput = IndexFileUtils.instance.openInput(indexFile); - IndexInput postingsInput = IndexFileUtils.instance.openInput(postingsFile); - IndexInput postingsSummaryInput = IndexFileUtils.instance.openInput(postingsFile); + IndexInput bkdInput = IndexFileUtils.instance().openInput(indexFile); + IndexInput postingsInput = IndexFileUtils.instance().openInput(postingsFile); + IndexInput postingsSummaryInput = IndexFileUtils.instance().openInput(postingsFile); PackedIndexTree index = new PackedIndexTree(); Intersection completable = diff --git a/src/java/org/apache/cassandra/index/sai/utils/IndexFileUtils.java b/src/java/org/apache/cassandra/index/sai/utils/IndexFileUtils.java index 0aab2a99f44a..81ac8ccd131c 100644 --- a/src/java/org/apache/cassandra/index/sai/utils/IndexFileUtils.java +++ b/src/java/org/apache/cassandra/index/sai/utils/IndexFileUtils.java @@ -55,13 +55,29 @@ public class IndexFileUtils .finishOnClose(true) .build(); - public static final IndexFileUtils instance = new IndexFileUtils(); + private static final IndexFileUtils instance = new IndexFileUtils(defaultWriterOption); + private static IndexFileUtils overrideInstance = null; - private static final SequentialWriterOption writerOption = defaultWriterOption; + private final SequentialWriterOption writerOption; + + public static synchronized void setOverrideInstance(IndexFileUtils overrideInstance) + { + IndexFileUtils.overrideInstance = overrideInstance; + } + + public static IndexFileUtils instance() + { + if (overrideInstance == null) + return instance; + else + return overrideInstance; + } @VisibleForTesting - protected IndexFileUtils() - {} + protected IndexFileUtils(SequentialWriterOption writerOption) + { + this.writerOption = writerOption; + } public IndexOutputWriter openOutput(File file, ByteOrder order, boolean append) throws IOException { @@ -91,12 +107,12 @@ public BufferedRandomAccessWriter openRandomAccessOutput(File file, boolean appe return out; } - public IndexInput openInput(FileHandle handle) + public IndexInputReader openInput(FileHandle handle) { return IndexInputReader.create(handle); } - public IndexInput openBlockingInput(FileHandle fileHandle) + public IndexInputReader openBlockingInput(FileHandle fileHandle) { final RandomAccessReader randomReader = fileHandle.createReader(); return IndexInputReader.create(randomReader, fileHandle::close); diff --git a/test/microbench/org/apache/cassandra/test/microbench/index/sai/v1/AbstractOnDiskBenchmark.java b/test/microbench/org/apache/cassandra/test/microbench/index/sai/v1/AbstractOnDiskBenchmark.java index af64a1a88918..7b1cdbd90e1d 100644 --- a/test/microbench/org/apache/cassandra/test/microbench/index/sai/v1/AbstractOnDiskBenchmark.java +++ b/test/microbench/org/apache/cassandra/test/microbench/index/sai/v1/AbstractOnDiskBenchmark.java @@ -174,8 +174,8 @@ private long writePostings(int rows) throws IOException protected final PostingsReader openPostingsReader() throws IOException { - IndexInput input = IndexFileUtils.instance.openInput(postings); - IndexInput summaryInput = IndexFileUtils.instance.openInput(postings); + IndexInput input = IndexFileUtils.instance().openInput(postings); + IndexInput summaryInput = IndexFileUtils.instance().openInput(postings); PostingsReader.BlocksSummary summary = new PostingsReader.BlocksSummary(summaryInput, summaryPosition); return new PostingsReader(input, summary, QueryEventListener.PostingListEventListener.NO_OP); diff --git a/test/unit/org/apache/cassandra/index/sai/disk/io/TrackingIndexFileUtils.java b/test/unit/org/apache/cassandra/index/sai/disk/io/TrackingIndexFileUtils.java index b06451c87612..5270d62bf492 100644 --- a/test/unit/org/apache/cassandra/index/sai/disk/io/TrackingIndexFileUtils.java +++ b/test/unit/org/apache/cassandra/index/sai/disk/io/TrackingIndexFileUtils.java @@ -19,78 +19,114 @@ package org.apache.cassandra.index.sai.disk.io; import java.io.IOException; -import java.lang.reflect.Field; -import java.lang.reflect.Modifier; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import com.google.common.base.Throwables; +import org.junit.Assert; + import org.apache.cassandra.index.sai.utils.IndexFileUtils; import org.apache.cassandra.io.util.FileHandle; import org.apache.cassandra.io.util.SequentialWriterOption; - -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.fail; +import org.apache.lucene.index.CorruptIndexException; public class TrackingIndexFileUtils extends IndexFileUtils { private final Map openInputs = Collections.synchronizedMap(new HashMap<>()); + private final Set closedInputs = new HashSet<>(); public TrackingIndexFileUtils(SequentialWriterOption writerOption) { - setWriterOption(writerOption); + super(writerOption); } @Override - public IndexInput openInput(FileHandle handle) + public IndexInputReader openInput(FileHandle handle) { TrackingIndexInput input = new TrackingIndexInput(super.openInput(handle)); openInputs.put(input, Throwables.getStackTraceAsString(new RuntimeException("Input created"))); return input; } - public Map getOpenInputs() + @Override + public IndexInputReader openBlockingInput(FileHandle fileHandle) { - return new HashMap<>(openInputs); + TrackingIndexInput input = new TrackingIndexInput(super.openBlockingInput(fileHandle)); + openInputs.put(input, Throwables.getStackTraceAsString(new RuntimeException("Blocking input created"))); + return input; } - public static void reset() + public Map getOpenInputs() { - setWriterOption(IndexFileUtils.defaultWriterOption); + return new HashMap<>(openInputs); } - public class TrackingIndexInput extends FilterIndexInput + private class TrackingIndexInput extends IndexInputReader { - TrackingIndexInput(IndexInput delegate) + private final IndexInputReader delegate; + + protected TrackingIndexInput(IndexInputReader delegate) { - super(delegate); + super(delegate.reader(), () -> {}); + this.delegate = delegate; } @Override - public void close() throws IOException + public synchronized void close() { - super.close(); + delegate.close(); final String creationStackTrace = openInputs.remove(this); - assertNotNull("Closed unregistered input: " + this, creationStackTrace); + + if (closedInputs.add(this) && creationStackTrace == null) + { + Assert.fail("Closed unregistered input: " + this); + } } - } - public static void setWriterOption(SequentialWriterOption option) - { - try + @Override + public long getFilePointer() { - Field writerOption = IndexFileUtils.class.getDeclaredField("writerOption"); - writerOption.setAccessible(true); - Field modifiersField = Field.class.getDeclaredField("modifiers"); - modifiersField.setAccessible(true); - modifiersField.setInt(writerOption, writerOption.getModifiers() & ~Modifier.FINAL); - writerOption.set(null, option); + return delegate.getFilePointer(); } - catch (Throwable e) + + @Override + public void seek(long pos) + { + delegate.seek(pos); + } + + @Override + public long length() + { + return delegate.length(); + } + + @Override + public IndexInput slice(String sliceDescription, long offset, long length) throws CorruptIndexException + { + return delegate.slice(sliceDescription, offset, length); + } + + @Override + public byte readByte() throws IOException + { + return delegate.readByte(); + } + + @Override + public void readBytes(byte[] b, int offset, int len) throws IOException + { + delegate.readBytes(b, offset, len); + } + + @Override + public String toString() { - fail(); + return delegate.toString(); } } } diff --git a/test/unit/org/apache/cassandra/index/sai/disk/v1/kdtree/BKDReaderTest.java b/test/unit/org/apache/cassandra/index/sai/disk/v1/kdtree/BKDReaderTest.java index 4ef769d56862..df04899c82d7 100644 --- a/test/unit/org/apache/cassandra/index/sai/disk/v1/kdtree/BKDReaderTest.java +++ b/test/unit/org/apache/cassandra/index/sai/disk/v1/kdtree/BKDReaderTest.java @@ -217,6 +217,7 @@ private void doTestAdvance(boolean crypto) throws IOException assertEquals(PostingList.END_OF_STREAM, intersection.advance(numRows + 1)); intersection.close(); + reader.close(); } @Test @@ -237,9 +238,12 @@ public void testResourcesReleaseWhenQueryDoesntMatchAnything() throws Exception } final BKDReader reader = finishAndOpenReaderOneDim(50, buffer); - final PostingList intersection = reader.intersect(buildQuery(1017, 1096), (QueryEventListener.BKDIndexEventListener)NO_OP_BKD_LISTENER, new QueryContext()); + assertEquals(PostingList.EMPTY, intersection); + + intersection.close(); + reader.close(); } private BKDReader.IntersectVisitor buildQuery(int queryMin, int queryMax) diff --git a/test/unit/org/apache/cassandra/index/sai/utils/IndexInputLeakDetector.java b/test/unit/org/apache/cassandra/index/sai/utils/IndexInputLeakDetector.java index f3e0aef6ffcf..66448da479c8 100644 --- a/test/unit/org/apache/cassandra/index/sai/utils/IndexInputLeakDetector.java +++ b/test/unit/org/apache/cassandra/index/sai/utils/IndexInputLeakDetector.java @@ -29,7 +29,6 @@ import org.apache.cassandra.index.sai.disk.io.TrackingIndexFileUtils; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.util.SequentialWriterOption; -import org.apache.cassandra.schema.TableMetadata; import static org.junit.Assert.assertTrue; @@ -41,6 +40,7 @@ public IndexDescriptor newIndexDescriptor(Descriptor descriptor, SequentialWrite { TrackingIndexFileUtils trackingIndexFileUtils = new TrackingIndexFileUtils(sequentialWriterOption); trackedIndexFileUtils.add(trackingIndexFileUtils); + IndexFileUtils.setOverrideInstance(trackingIndexFileUtils); return IndexDescriptor.empty(descriptor); } @@ -58,6 +58,6 @@ protected void afterIfSuccessful() protected void afterAlways(List errors) { trackedIndexFileUtils.clear(); - TrackingIndexFileUtils.reset(); + IndexFileUtils.setOverrideInstance(null); } } From b28eb4278a2e8c72b98526de297f0591b4b5299b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9s=20de=20la=20Pe=C3=B1a?= Date: Wed, 2 Oct 2024 15:54:25 +0100 Subject: [PATCH 2/3] Address review comments --- .../sai/disk/format/IndexDescriptor.java | 4 ++-- .../index/sai/disk/v1/TermsReader.java | 24 +++++++++---------- .../disk/v1/bitpack/BlockPackedReader.java | 2 +- .../bitpack/MonotonicBlockPackedReader.java | 2 +- .../index/sai/disk/v1/kdtree/BKDReader.java | 10 ++++---- .../index/sai/utils/IndexFileUtils.java | 18 +------------- .../index/sai/v1/AbstractOnDiskBenchmark.java | 4 ++-- .../sai/disk/io/TrackingIndexFileUtils.java | 2 +- .../sai/utils/IndexInputLeakDetector.java | 4 ++-- 9 files changed, 27 insertions(+), 43 deletions(-) diff --git a/src/java/org/apache/cassandra/index/sai/disk/format/IndexDescriptor.java b/src/java/org/apache/cassandra/index/sai/disk/format/IndexDescriptor.java index 7ae8ad51f825..93f68cccdf39 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/format/IndexDescriptor.java +++ b/src/java/org/apache/cassandra/index/sai/disk/format/IndexDescriptor.java @@ -624,7 +624,7 @@ public FileHandle createIndexBuildTimeFileHandle() @Override public IndexInput openInput() { - return IndexFileUtils.instance().openBlockingInput(createFileHandle()); + return IndexFileUtils.instance.openBlockingInput(createFileHandle()); } @Override @@ -662,7 +662,7 @@ public IndexOutputWriter openOutput(boolean append) throws IOException component, file); - return IndexFileUtils.instance().openOutput(file, byteOrder(), append); + return IndexFileUtils.instance.openOutput(file, byteOrder(), append); } @Override diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/TermsReader.java b/src/java/org/apache/cassandra/index/sai/disk/v1/TermsReader.java index fb15305d0494..a75871ec28cd 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/TermsReader.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/TermsReader.java @@ -92,7 +92,7 @@ public TermsReader(IndexContext indexContext, termDictionaryRoot = root; this.termDictionaryFileEncodingVersion = termsDataEncodingVersion; - try (final IndexInput indexInput = IndexFileUtils.instance().openInput(termDictionaryFile)) + try (final IndexInput indexInput = IndexFileUtils.instance.openInput(termDictionaryFile)) { // if the pointer is -1 then this is a previous version of the index // use the old way to validate the footer @@ -107,7 +107,7 @@ public TermsReader(IndexContext indexContext, } } - try (final IndexInput indexInput = IndexFileUtils.instance().openInput(postingsFile)) + try (final IndexInput indexInput = IndexFileUtils.instance.openInput(postingsFile)) { validate(indexInput); } @@ -168,8 +168,8 @@ public class TermQuery TermQuery(ByteComparable term, QueryEventListener.TrieIndexEventListener listener, QueryContext context) { this.listener = listener; - postingsInput = IndexFileUtils.instance().openInput(postingsFile); - postingsSummaryInput = IndexFileUtils.instance().openInput(postingsFile); + postingsInput = IndexFileUtils.instance.openInput(postingsFile); + postingsSummaryInput = IndexFileUtils.instance.openInput(postingsFile); this.term = term; lookupStartTime = System.nanoTime(); this.context = context; @@ -299,8 +299,8 @@ private PostingList readAndMergePostings(TrieTermsDictionaryReader reader) throw ArrayList postingLists = new ArrayList<>(); // index inputs will be closed with the onClose method of the returned merged posting list - IndexInput postingsInput = IndexFileUtils.instance().openInput(postingsFile); - IndexInput postingsSummaryInput = IndexFileUtils.instance().openInput(postingsFile); + IndexInput postingsInput = IndexFileUtils.instance.openInput(postingsFile); + IndexInput postingsSummaryInput = IndexFileUtils.instance.openInput(postingsFile); do { @@ -329,8 +329,8 @@ private PostingList readFilterAndMergePosting(TrieTermsDictionaryReader reader) ArrayList postingLists = new ArrayList<>(); // index inputs will be closed with the onClose method of the returned merged posting list - IndexInput postingsInput = IndexFileUtils.instance().openInput(postingsFile); - IndexInput postingsSummaryInput = IndexFileUtils.instance().openInput(postingsFile); + IndexInput postingsInput = IndexFileUtils.instance.openInput(postingsFile); + IndexInput postingsSummaryInput = IndexFileUtils.instance.openInput(postingsFile); do { @@ -379,8 +379,8 @@ private class TermsScanner implements TermsIterator private TermsScanner(Version version, AbstractType type) { this.termsDictionaryReader = new TrieTermsDictionaryReader(termDictionaryFile.instantiateRebufferer(), termDictionaryRoot, termDictionaryFileEncodingVersion); - this.postingsInput = IndexFileUtils.instance().openInput(postingsFile); - this.postingsSummaryInput = IndexFileUtils.instance().openInput(postingsFile); + this.postingsInput = IndexFileUtils.instance.openInput(postingsFile); + this.postingsSummaryInput = IndexFileUtils.instance.openInput(postingsFile); // We decode based on the logic used to encode the min and max terms in the trie. if (version.onOrAfter(Version.DB) && TypeUtil.isComposite(type)) { @@ -451,8 +451,8 @@ private class ReverseTermsScanner implements TermsIterator private ReverseTermsScanner() { this.iterator = new ReverseTrieTermsDictionaryReader(termDictionaryFile.instantiateRebufferer(), termDictionaryRoot); - this.postingsInput = IndexFileUtils.instance().openInput(postingsFile); - this.postingsSummaryInput = IndexFileUtils.instance().openInput(postingsFile); + this.postingsInput = IndexFileUtils.instance.openInput(postingsFile); + this.postingsSummaryInput = IndexFileUtils.instance.openInput(postingsFile); } @Override diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/bitpack/BlockPackedReader.java b/src/java/org/apache/cassandra/index/sai/disk/v1/bitpack/BlockPackedReader.java index 64cc249bdc09..738ae5e709c1 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/bitpack/BlockPackedReader.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/bitpack/BlockPackedReader.java @@ -101,7 +101,7 @@ public BlockPackedReader(FileHandle file, NumericValuesMeta meta) throws IOExcep @Override public LongArray open() { - var indexInput = IndexFileUtils.instance().openInput(file); + var indexInput = IndexFileUtils.instance.openInput(file); return new AbstractBlockPackedReader(indexInput, blockBitsPerValue, blockShift, blockMask, 0, valueCount) { @Override diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/bitpack/MonotonicBlockPackedReader.java b/src/java/org/apache/cassandra/index/sai/disk/v1/bitpack/MonotonicBlockPackedReader.java index 3fbbb882a6b4..7c4129376cd9 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/bitpack/MonotonicBlockPackedReader.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/bitpack/MonotonicBlockPackedReader.java @@ -86,7 +86,7 @@ public MonotonicBlockPackedReader(FileHandle file, NumericValuesMeta meta) throw @SuppressWarnings("resource") public LongArray open() { - var indexInput = IndexFileUtils.instance().openInput(file); + var indexInput = IndexFileUtils.instance.openInput(file); return new AbstractBlockPackedReader(indexInput, blockBitsPerValue, blockShift, blockMask, 0, valueCount) { @Override diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/kdtree/BKDReader.java b/src/java/org/apache/cassandra/index/sai/disk/v1/kdtree/BKDReader.java index f399d2032078..69a084e36a60 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/kdtree/BKDReader.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/kdtree/BKDReader.java @@ -147,8 +147,8 @@ public IteratorState(DocMapper docMapper, boolean isAscending, BKDReader.Interse scratch = new byte[packedBytesLength]; final long firstLeafFilePointer = getMinLeafBlockFP(); - bkdInput = IndexFileUtils.instance().openInput(kdtreeFile); - bkdPostingsInput = IndexFileUtils.instance().openInput(postingsFile); + bkdInput = IndexFileUtils.instance.openInput(kdtreeFile); + bkdPostingsInput = IndexFileUtils.instance.openInput(postingsFile); bkdInput.seek(firstLeafFilePointer); NavigableMap leafNodeToLeafFP = getLeafOffsets(); @@ -420,9 +420,9 @@ public PostingList intersect(IntersectVisitor visitor, QueryEventListener.BKDInd } listener.onSegmentHit(); - IndexInput bkdInput = IndexFileUtils.instance().openInput(indexFile); - IndexInput postingsInput = IndexFileUtils.instance().openInput(postingsFile); - IndexInput postingsSummaryInput = IndexFileUtils.instance().openInput(postingsFile); + IndexInput bkdInput = IndexFileUtils.instance.openInput(indexFile); + IndexInput postingsInput = IndexFileUtils.instance.openInput(postingsFile); + IndexInput postingsSummaryInput = IndexFileUtils.instance.openInput(postingsFile); PackedIndexTree index = new PackedIndexTree(); Intersection completable = diff --git a/src/java/org/apache/cassandra/index/sai/utils/IndexFileUtils.java b/src/java/org/apache/cassandra/index/sai/utils/IndexFileUtils.java index 81ac8ccd131c..4f5d07881b5f 100644 --- a/src/java/org/apache/cassandra/index/sai/utils/IndexFileUtils.java +++ b/src/java/org/apache/cassandra/index/sai/utils/IndexFileUtils.java @@ -22,7 +22,6 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.channels.FileChannel; -import java.nio.file.StandardOpenOption; import java.util.zip.CRC32; import com.google.common.annotations.VisibleForTesting; @@ -32,7 +31,6 @@ import io.github.jbellis.jvector.disk.BufferedRandomAccessWriter; import net.nicoulaj.compilecommand.annotations.DontInline; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.index.sai.disk.io.IndexInput; import org.apache.cassandra.index.sai.disk.io.IndexInputReader; import org.apache.cassandra.index.sai.disk.io.IndexOutputWriter; import org.apache.cassandra.io.compress.BufferType; @@ -55,24 +53,10 @@ public class IndexFileUtils .finishOnClose(true) .build(); - private static final IndexFileUtils instance = new IndexFileUtils(defaultWriterOption); - private static IndexFileUtils overrideInstance = null; + public static volatile IndexFileUtils instance = new IndexFileUtils(defaultWriterOption); private final SequentialWriterOption writerOption; - public static synchronized void setOverrideInstance(IndexFileUtils overrideInstance) - { - IndexFileUtils.overrideInstance = overrideInstance; - } - - public static IndexFileUtils instance() - { - if (overrideInstance == null) - return instance; - else - return overrideInstance; - } - @VisibleForTesting protected IndexFileUtils(SequentialWriterOption writerOption) { diff --git a/test/microbench/org/apache/cassandra/test/microbench/index/sai/v1/AbstractOnDiskBenchmark.java b/test/microbench/org/apache/cassandra/test/microbench/index/sai/v1/AbstractOnDiskBenchmark.java index 7b1cdbd90e1d..af64a1a88918 100644 --- a/test/microbench/org/apache/cassandra/test/microbench/index/sai/v1/AbstractOnDiskBenchmark.java +++ b/test/microbench/org/apache/cassandra/test/microbench/index/sai/v1/AbstractOnDiskBenchmark.java @@ -174,8 +174,8 @@ private long writePostings(int rows) throws IOException protected final PostingsReader openPostingsReader() throws IOException { - IndexInput input = IndexFileUtils.instance().openInput(postings); - IndexInput summaryInput = IndexFileUtils.instance().openInput(postings); + IndexInput input = IndexFileUtils.instance.openInput(postings); + IndexInput summaryInput = IndexFileUtils.instance.openInput(postings); PostingsReader.BlocksSummary summary = new PostingsReader.BlocksSummary(summaryInput, summaryPosition); return new PostingsReader(input, summary, QueryEventListener.PostingListEventListener.NO_OP); diff --git a/test/unit/org/apache/cassandra/index/sai/disk/io/TrackingIndexFileUtils.java b/test/unit/org/apache/cassandra/index/sai/disk/io/TrackingIndexFileUtils.java index 5270d62bf492..0206420c72c4 100644 --- a/test/unit/org/apache/cassandra/index/sai/disk/io/TrackingIndexFileUtils.java +++ b/test/unit/org/apache/cassandra/index/sai/disk/io/TrackingIndexFileUtils.java @@ -37,7 +37,7 @@ public class TrackingIndexFileUtils extends IndexFileUtils { private final Map openInputs = Collections.synchronizedMap(new HashMap<>()); - private final Set closedInputs = new HashSet<>(); + private final Set closedInputs = Collections.synchronizedSet(new HashSet<>()); public TrackingIndexFileUtils(SequentialWriterOption writerOption) { diff --git a/test/unit/org/apache/cassandra/index/sai/utils/IndexInputLeakDetector.java b/test/unit/org/apache/cassandra/index/sai/utils/IndexInputLeakDetector.java index 66448da479c8..46643255b43b 100644 --- a/test/unit/org/apache/cassandra/index/sai/utils/IndexInputLeakDetector.java +++ b/test/unit/org/apache/cassandra/index/sai/utils/IndexInputLeakDetector.java @@ -40,7 +40,7 @@ public IndexDescriptor newIndexDescriptor(Descriptor descriptor, SequentialWrite { TrackingIndexFileUtils trackingIndexFileUtils = new TrackingIndexFileUtils(sequentialWriterOption); trackedIndexFileUtils.add(trackingIndexFileUtils); - IndexFileUtils.setOverrideInstance(trackingIndexFileUtils); + IndexFileUtils.instance = trackingIndexFileUtils; return IndexDescriptor.empty(descriptor); } @@ -58,6 +58,6 @@ protected void afterIfSuccessful() protected void afterAlways(List errors) { trackedIndexFileUtils.clear(); - IndexFileUtils.setOverrideInstance(null); + IndexFileUtils.instance = null; } } From 4db0525360a42d8cceb6ec56187f399998636718 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9s=20de=20la=20Pe=C3=B1a?= Date: Wed, 2 Oct 2024 18:11:10 +0100 Subject: [PATCH 3/3] Fix SortedTermsReader.Cursor not closing blockOffsets --- .../index/sai/disk/v2/sortedterms/SortedTermsReader.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/java/org/apache/cassandra/index/sai/disk/v2/sortedterms/SortedTermsReader.java b/src/java/org/apache/cassandra/index/sai/disk/v2/sortedterms/SortedTermsReader.java index 840b381ade41..c42e8773e517 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v2/sortedterms/SortedTermsReader.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v2/sortedterms/SortedTermsReader.java @@ -322,8 +322,9 @@ public void reset() throws IOException @Override public void close() throws IOException { - this.termsData.close(); - this.reader.close(); + blockOffsets.close(); + termsData.close(); + reader.close(); } } }