From ed2603de0d373b3ac77d708cf161d068fa89ec42 Mon Sep 17 00:00:00 2001 From: Zhao Yang Date: Wed, 28 Aug 2024 17:27:43 +0800 Subject: [PATCH] CNDB-10588: change ReadObserver to track source iterators from sstables and memtable instead of merged iterator --- .../db/PartitionRangeReadCommand.java | 6 +- .../org/apache/cassandra/db/ReadCommand.java | 51 +------- .../cassandra/db/ReadExecutionController.java | 11 ++ .../org/apache/cassandra/db/ReadObserver.java | 41 +++--- .../cassandra/db/ReadObserverFactory.java | 5 +- .../db/SinglePartitionReadCommand.java | 20 +-- .../apache/cassandra/db/ReadObserverTest.java | 118 ++++++++++++++++-- 7 files changed, 163 insertions(+), 89 deletions(-) diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java index a760aeee3351..b310ee16cceb 100644 --- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java +++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java @@ -298,7 +298,9 @@ public UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, Rea Memtable.MemtableUnfilteredPartitionIterator iter = memtable.makePartitionIterator(columnFilter(), dataRange()); if (useMinLocalDeletionTime) controller.updateMinOldestUnrepairedTombstone(iter.getMinLocalDeletionTime()); - inputCollector.addMemtableIterator(RTBoundValidator.validate(iter, RTBoundValidator.Stage.MEMTABLE, false)); + + UnfilteredPartitionIterator partitions = controller.observer().observeUnmergedPartitions(iter); + inputCollector.addMemtableIterator(RTBoundValidator.validate(partitions, RTBoundValidator.Stage.MEMTABLE, false)); } SSTableReadsListener readCountUpdater = newReadCountUpdater(); @@ -306,6 +308,8 @@ public UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, Rea { @SuppressWarnings("resource") // We close on exception and on closing the result returned by this method UnfilteredPartitionIterator iter = 
sstable.getScanner(columnFilter(), dataRange(), readCountUpdater); + iter = controller.observer().observeUnmergedPartitions(iter); + inputCollector.addSSTableIterator(sstable, RTBoundValidator.validate(iter, RTBoundValidator.Stage.SSTABLE, false)); if (!sstable.isRepaired()) diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java index 2d74d51eb584..4c0c6eb7f215 100644 --- a/src/java/org/apache/cassandra/db/ReadCommand.java +++ b/src/java/org/apache/cassandra/db/ReadCommand.java @@ -417,7 +417,7 @@ public UnfilteredPartitionIterator executeLocally(ReadExecutionController execut try { iterator = withStateTracking(iterator); - iterator = withReadObserver(iterator); + iterator = executionController.observer().observeMergedPartitions(iterator); iterator = RTBoundValidator.validate(withoutPurgeableTombstones(iterator, cfs, executionController), Stage.PURGED, false); iterator = withMetricsRecording(iterator, cfs.metric, startTimeNanos); @@ -462,55 +462,6 @@ public UnfilteredPartitionIterator executeLocally(ReadExecutionController execut } } - public UnfilteredPartitionIterator withReadObserver(UnfilteredPartitionIterator partitions) - { - ReadObserver observer = ReadObserverFactory.instance.create(this.metadata()); - - // skip if observer is disabled - if (observer == ReadObserver.NO_OP) - return partitions; - - class ReadObserverTransformation extends Transformation - { - @Override - protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition) - { - observer.onPartition(partition.partitionKey(), partition.partitionLevelDeletion()); - return Transformation.apply(partition, this); - } - - @Override - protected Row applyToStatic(Row row) - { - if (!row.isEmpty()) - observer.onStaticRow(row); - return row; - } - - @Override - protected Row applyToRow(Row row) - { - observer.onUnfiltered(row); - return row; - } - - @Override - protected RangeTombstoneMarker applyToMarker(RangeTombstoneMarker 
marker) - { - observer.onUnfiltered(marker); - return marker; - } - - @Override - protected void onClose() - { - observer.onComplete(); - } - } - - return Transformation.apply(partitions, new ReadObserverTransformation()); - } - public UnfilteredPartitionIterator searchStorage(Index.Searcher searcher, ReadExecutionController executionController) { return searcher.search(executionController); diff --git a/src/java/org/apache/cassandra/db/ReadExecutionController.java b/src/java/org/apache/cassandra/db/ReadExecutionController.java index 753e9f9138ad..017090c331e5 100644 --- a/src/java/org/apache/cassandra/db/ReadExecutionController.java +++ b/src/java/org/apache/cassandra/db/ReadExecutionController.java @@ -54,6 +54,8 @@ public class ReadExecutionController implements AutoCloseable public final Histogram sstablesScannedPerRowRead; + private final ReadObserver readObserver; + ReadExecutionController(ReadCommand command, OpOrder.Group baseOp, TableMetadata baseMetadata, @@ -72,6 +74,8 @@ public class ReadExecutionController implements AutoCloseable this.command = command; this.createdAtNanos = createdAtNanos; + this.readObserver = ReadObserverFactory.instance.create(baseMetadata); + this.sstablesScannedPerRowRead = new Histogram(new DecayingEstimatedHistogramReservoir(true)); if (trackRepairedStatus) @@ -101,6 +105,11 @@ public ReadExecutionController indexReadController() return indexController; } + public ReadObserver observer() + { + return readObserver; + } + public WriteContext getWriteContext() { return writeContext; @@ -217,6 +226,8 @@ public void close() } } + readObserver.onComplete(); + if (createdAtNanos != NO_SAMPLING) addSample(); diff --git a/src/java/org/apache/cassandra/db/ReadObserver.java b/src/java/org/apache/cassandra/db/ReadObserver.java index 65767af435fe..4ad12991fd7d 100644 --- a/src/java/org/apache/cassandra/db/ReadObserver.java +++ b/src/java/org/apache/cassandra/db/ReadObserver.java @@ -17,8 +17,8 @@ */ package org.apache.cassandra.db; -import 
org.apache.cassandra.db.rows.Row; -import org.apache.cassandra.db.rows.Unfiltered; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; /** * An interface that allows to capture what local data has been read @@ -30,29 +30,40 @@ public interface ReadObserver ReadObserver NO_OP = new ReadObserver() {}; /** - * Called on every partition read - * - * @param partitionKey the partition key - * @param deletionTime partition deletion time + * Observes the provided unmerged partitions. + * <p>
+ * This method allows tracking of the partitions from individual sstable or memtable */ - default void onPartition(DecoratedKey partitionKey, DeletionTime deletionTime) {} + default UnfilteredPartitionIterator observeUnmergedPartitions(UnfilteredPartitionIterator partitions) + { + return partitions; + } /** - * Called on every static row read. - * - * @param staticRow static row of the partition + * Observes the provided unmerged partition. + * <p>
+ * This method allows tracking of the partition from individual sstable or memtable */ - default void onStaticRow(Row staticRow) {} + default UnfilteredRowIterator observeUnmergedPartition(UnfilteredRowIterator partition) + { + return partition; + } /** - * Called on every unfiltered read. - * - * @param unfiltered either row or range tombstone. + * Observes the provided merged partitions. + * <p>
+ * This method allows tracking of partitions after they have been merged from multiple sstables and memtable. */ - default void onUnfiltered(Unfiltered unfiltered) {} + default UnfilteredPartitionIterator observeMergedPartitions(UnfilteredPartitionIterator partitions) + { + return partitions; + } /** * Called on read request completion + * <p>
+ * This method is used to signify that a read request has completed, allowing any necessary + * final tracking or cleanup. */ default void onComplete() {} } diff --git a/src/java/org/apache/cassandra/db/ReadObserverFactory.java b/src/java/org/apache/cassandra/db/ReadObserverFactory.java index a0539eb70eeb..bf4432ce245e 100644 --- a/src/java/org/apache/cassandra/db/ReadObserverFactory.java +++ b/src/java/org/apache/cassandra/db/ReadObserverFactory.java @@ -17,13 +17,14 @@ */ package org.apache.cassandra.db; +import javax.annotation.Nullable; + import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.utils.FBUtilities; import static org.apache.cassandra.config.CassandraRelevantProperties.CUSTOM_READ_OBSERVER_FACTORY; - /** * Provides custom factory that creates a {@link ReadObserver} instance per read request */ @@ -33,7 +34,7 @@ public interface ReadObserverFactory new ReadObserverFactory() {} : FBUtilities.construct(CassandraRelevantProperties.CUSTOM_READ_OBSERVER_FACTORY.getString(), "custom read observer factory"); - default ReadObserver create(TableMetadata table) + default ReadObserver create(@Nullable TableMetadata table) { return ReadObserver.NO_OP; } diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java index b576907c42e8..e69a6d50be38 100644 --- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java @@ -39,7 +39,6 @@ import org.apache.cassandra.db.transform.Transformation; import org.apache.cassandra.exceptions.RequestExecutionException; import org.apache.cassandra.index.Index; -import org.apache.cassandra.index.sai.utils.RowWithSourceTable; import org.apache.cassandra.io.sstable.format.RowIndexEntry; import org.apache.cassandra.io.sstable.format.SSTableReader; import 
org.apache.cassandra.io.sstable.format.SSTableReadsListener; @@ -649,6 +648,7 @@ private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator UnfilteredRowIterator iter = filter.getUnfilteredRowIterator(columnFilter(), partition); + iter = controller.observer().observeUnmergedPartition(iter); var wrapped = rowTransformer != null ? Transformation.apply(iter, rowTransformer.apply(memtable)) : iter; @@ -707,6 +707,8 @@ private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs UnfilteredRowIterator iter = intersects ? makeIterator(cfs, sstable, metricsCollector) : makeIteratorWithSkippedNonStaticContent(cfs, sstable, metricsCollector); + + iter = controller.observer().observeUnmergedPartition(iter); if (!intersects) { nonIntersectingSSTables++; @@ -887,7 +889,7 @@ private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ColumnFam if (partition == null) continue; - try (UnfilteredRowIterator iter = filter.getUnfilteredRowIterator(columnFilter(), partition)) + try (UnfilteredRowIterator iter = controller.observer().observeUnmergedPartition(filter.getUnfilteredRowIterator(columnFilter(), partition))) { if (iter.isEmpty()) continue; @@ -932,13 +934,13 @@ private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ColumnFam continue; } - try (UnfilteredRowIterator iter = StorageHook.instance.makeRowIterator(cfs, - sstable, - partitionKey(), - intersects ? filter.getSlices(metadata()) : Slices.NONE, - columnFilter(), - filter.isReversed(), - metricsCollector)) + try (UnfilteredRowIterator iter = controller.observer().observeUnmergedPartition(StorageHook.instance.makeRowIterator(cfs, + sstable, + partitionKey(), + intersects ? 
filter.getSlices(metadata()) : Slices.NONE, + columnFilter(), + filter.isReversed(), + metricsCollector))) { if (!hasRequiredStatics && !intersects && !iter.partitionLevelDeletion().isLive()) // => partitionLevelDelections == true { diff --git a/test/unit/org/apache/cassandra/db/ReadObserverTest.java b/test/unit/org/apache/cassandra/db/ReadObserverTest.java index 15531fe7ca7d..4b24fc83616b 100644 --- a/test/unit/org/apache/cassandra/db/ReadObserverTest.java +++ b/test/unit/org/apache/cassandra/db/ReadObserverTest.java @@ -18,10 +18,15 @@ package org.apache.cassandra.db; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.Function; import java.util.stream.Collectors; +import com.google.common.base.Preconditions; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -29,12 +34,17 @@ import org.apache.cassandra.Util; import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.ColumnIdentifier; +import org.apache.cassandra.cql3.Operator; +import org.apache.cassandra.cql3.statements.schema.IndexTarget; import org.apache.cassandra.db.marshal.AsciiType; import org.apache.cassandra.db.marshal.BytesType; -import org.apache.cassandra.db.rows.Unfiltered; +import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.repair.consistent.LocalSessionAccessor; import org.apache.cassandra.schema.CachingParams; +import org.apache.cassandra.schema.IndexMetadata; +import org.apache.cassandra.schema.Indexes; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.utils.ByteBufferUtil; @@ -42,9 +52,9 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static 
org.mockito.ArgumentMatchers.argThat; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.mock; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; public class ReadObserverTest @@ -56,7 +66,10 @@ public static class TestReadObserverFactory implements ReadObserverFactory @Override public ReadObserver create(TableMetadata table) { - ReadObserver observer = mock(ReadObserver.class); + if (table == null || !table.keyspace.equals(KEYSPACE)) + return ReadObserver.NO_OP; + + ReadObserver observer = spy(ReadObserver.class); issuedObservers.add(Pair.create(table, observer)); return observer; } @@ -71,6 +84,12 @@ public static void defineSchema() throws ConfigurationException CassandraRelevantProperties.CUSTOM_READ_OBSERVER_FACTORY.setString(TestReadObserverFactory.class.getName()); DatabaseDescriptor.daemonInitialization(); + DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance); + + Indexes.Builder indexes = Indexes.builder(); + + IndexTarget target = new IndexTarget(new ColumnIdentifier("a", true), IndexTarget.Type.SIMPLE); + indexes.add(IndexMetadata.fromIndexTargets(Collections.singletonList(target), "sai", IndexMetadata.Kind.CUSTOM, Map.of("class_name", "StorageAttachedIndex"))); TableMetadata.Builder metadata = TableMetadata.builder(KEYSPACE, CF) @@ -79,7 +98,8 @@ public static void defineSchema() throws ConfigurationException .addClusteringColumn("col", AsciiType.instance) .addRegularColumn("a", AsciiType.instance) .addRegularColumn("b", AsciiType.instance) - .caching(CachingParams.CACHE_EVERYTHING); + .indexes(indexes.build()) + .caching(CachingParams.CACHE_NOTHING); SchemaLoader.prepareServer(); SchemaLoader.createKeyspace(KEYSPACE, @@ -89,10 +109,36 @@ public static void defineSchema() throws ConfigurationException LocalSessionAccessor.startup(); } + @Before + public void beforeEach() + { + 
TestReadObserverFactory.issuedObservers.clear(); + } + + // TODO + // 4. multi-sstables + @Test + public void testObserverCallbacksSinglePartitionRead() + { + testObserverCallbacks(cfs -> Util.cmd(cfs, Util.dk("key")).build()); + } + @Test - public void testObserverCallbacks() + public void testObserverCallbacksRangeRead() + { + testObserverCallbacks(cfs -> Util.cmd(cfs).build()); + } + + @Test + public void testObserverCallbacksSAI() + { + testObserverCallbacks(cfs -> Util.cmd(cfs).filterOn("a", Operator.EQ, ByteBufferUtil.bytes("regular")).build()); + } + + private void testObserverCallbacks(Function commandBuilder) { ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF); + cfs.truncateBlocking(); new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes("key")) .clustering("cc") @@ -100,23 +146,71 @@ public void testObserverCallbacks() .build() .apply(); + cfs.forceBlockingFlush(ColumnFamilyStore.FlushReason.USER_FORCED); + + // duplicated row with newer timestamp + new RowUpdateBuilder(cfs.metadata(), 1, ByteBufferUtil.bytes("key")) + .clustering("cc") + .add("a", ByteBufferUtil.bytes("regular")) + .build() + .apply(); + + cfs.forceBlockingFlush(ColumnFamilyStore.FlushReason.USER_FORCED); + new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes("key")) .add("s", ByteBufferUtil.bytes("static")) .build() .apply(); - ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).build(); + cfs.forceBlockingFlush(ColumnFamilyStore.FlushReason.USER_FORCED); + + // duplicated row with newer timestamp in memtable + new RowUpdateBuilder(cfs.metadata(), 2, ByteBufferUtil.bytes("key")) + .clustering("cc") + .add("a", ByteBufferUtil.bytes("regular")) + .build() + .apply(); + + int sstables = cfs.getLiveSSTables().size(); + + ReadCommand readCommand = commandBuilder.apply(cfs); assertFalse(Util.getAll(readCommand).isEmpty()); List> observers = TestReadObserverFactory.issuedObservers.stream() .filter(p -> p.left.name.equals(CF)) 
.collect(Collectors.toList()); + + // expect one observer per query assertEquals(1, observers.size()); ReadObserver observer = observers.get(0).right; - verify(observer).onPartition(eq(Util.dk("key")), eq(DeletionTime.LIVE)); - verify(observer).onUnfiltered(argThat(Unfiltered::isRow)); - verify(observer).onStaticRow(argThat(row -> row.columns().stream().allMatch(col -> col.name.toCQLString().equals("s")))); - verify(observer).onComplete(); + int unmergedPartitionRead = -1; + int unmergedPartitionsRead = -1; + int mergedPartitionsRead = -1; + if (readCommand.indexQueryPlan != null) // SAI + { + unmergedPartitionRead = sstables + 1; // one unmerged partition per sstable and memtable + unmergedPartitionsRead = 0; + mergedPartitionsRead = 1; + + } + else if (readCommand instanceof PartitionRangeReadCommand) + { + unmergedPartitionRead = 0; + unmergedPartitionsRead = sstables + 1; // one unmerged partitions per sstable and memtable + mergedPartitionsRead = 1; + } + else + { + Preconditions.checkArgument(readCommand instanceof SinglePartitionReadCommand); + unmergedPartitionRead = sstables + 1; // one unmerged partition per sstable and memtable + unmergedPartitionsRead = 0; + mergedPartitionsRead = 1; + } + + verify(observer, times(unmergedPartitionRead)).observeUnmergedPartition(any()); // intercept unmerged partition + verify(observer, times(unmergedPartitionsRead)).observeUnmergedPartitions(any()); // intercept unmerged partitions + verify(observer, times(mergedPartitionsRead)).observeMergedPartitions(any()); // intercept merged partitions + verify(observer, times(1)).onComplete(); // onComplete() once per query } } \ No newline at end of file