Skip to content

Commit

Permalink
CNDB-10588: change ReadObserver to track source iterators from sstabl…
Browse files Browse the repository at this point in the history
…es and memtable instead of merged iterator
  • Loading branch information
jasonstack committed Sep 18, 2024
1 parent 44ef8ec commit ed2603d
Show file tree
Hide file tree
Showing 7 changed files with 163 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -298,14 +298,18 @@ public UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, Rea
Memtable.MemtableUnfilteredPartitionIterator iter = memtable.makePartitionIterator(columnFilter(), dataRange());
if (useMinLocalDeletionTime)
controller.updateMinOldestUnrepairedTombstone(iter.getMinLocalDeletionTime());
inputCollector.addMemtableIterator(RTBoundValidator.validate(iter, RTBoundValidator.Stage.MEMTABLE, false));

UnfilteredPartitionIterator partitions = controller.observer().observeUnmergedPartitions(iter);
inputCollector.addMemtableIterator(RTBoundValidator.validate(partitions, RTBoundValidator.Stage.MEMTABLE, false));
}

SSTableReadsListener readCountUpdater = newReadCountUpdater();
for (SSTableReader sstable : view.sstables)
{
@SuppressWarnings("resource") // We close on exception and on closing the result returned by this method
UnfilteredPartitionIterator iter = sstable.getScanner(columnFilter(), dataRange(), readCountUpdater);
iter = controller.observer().observeUnmergedPartitions(iter);

inputCollector.addSSTableIterator(sstable, RTBoundValidator.validate(iter, RTBoundValidator.Stage.SSTABLE, false));

if (!sstable.isRepaired())
Expand Down
51 changes: 1 addition & 50 deletions src/java/org/apache/cassandra/db/ReadCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ public UnfilteredPartitionIterator executeLocally(ReadExecutionController execut
try
{
iterator = withStateTracking(iterator);
iterator = withReadObserver(iterator);
iterator = executionController.observer().observeMergedPartitions(iterator);
iterator = RTBoundValidator.validate(withoutPurgeableTombstones(iterator, cfs, executionController), Stage.PURGED, false);
iterator = withMetricsRecording(iterator, cfs.metric, startTimeNanos);

Expand Down Expand Up @@ -462,55 +462,6 @@ public UnfilteredPartitionIterator executeLocally(ReadExecutionController execut
}
}

public UnfilteredPartitionIterator withReadObserver(UnfilteredPartitionIterator partitions)
{
ReadObserver observer = ReadObserverFactory.instance.create(this.metadata());

// skip if observer is disabled
if (observer == ReadObserver.NO_OP)
return partitions;

class ReadObserverTransformation extends Transformation<UnfilteredRowIterator>
{
@Override
protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
{
observer.onPartition(partition.partitionKey(), partition.partitionLevelDeletion());
return Transformation.apply(partition, this);
}

@Override
protected Row applyToStatic(Row row)
{
if (!row.isEmpty())
observer.onStaticRow(row);
return row;
}

@Override
protected Row applyToRow(Row row)
{
observer.onUnfiltered(row);
return row;
}

@Override
protected RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
{
observer.onUnfiltered(marker);
return marker;
}

@Override
protected void onClose()
{
observer.onComplete();
}
}

return Transformation.apply(partitions, new ReadObserverTransformation());
}

public UnfilteredPartitionIterator searchStorage(Index.Searcher searcher, ReadExecutionController executionController)
{
return searcher.search(executionController);
Expand Down
11 changes: 11 additions & 0 deletions src/java/org/apache/cassandra/db/ReadExecutionController.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ public class ReadExecutionController implements AutoCloseable

public final Histogram sstablesScannedPerRowRead;

private final ReadObserver readObserver;

ReadExecutionController(ReadCommand command,
OpOrder.Group baseOp,
TableMetadata baseMetadata,
Expand All @@ -72,6 +74,8 @@ public class ReadExecutionController implements AutoCloseable
this.command = command;
this.createdAtNanos = createdAtNanos;

this.readObserver = ReadObserverFactory.instance.create(baseMetadata);

this.sstablesScannedPerRowRead = new Histogram(new DecayingEstimatedHistogramReservoir(true));

if (trackRepairedStatus)
Expand Down Expand Up @@ -101,6 +105,11 @@ public ReadExecutionController indexReadController()
return indexController;
}

public ReadObserver observer()
{
return readObserver;
}

public WriteContext getWriteContext()
{
return writeContext;
Expand Down Expand Up @@ -217,6 +226,8 @@ public void close()
}
}

readObserver.onComplete();

if (createdAtNanos != NO_SAMPLING)
addSample();

Expand Down
41 changes: 26 additions & 15 deletions src/java/org/apache/cassandra/db/ReadObserver.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
*/
package org.apache.cassandra.db;

import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.Unfiltered;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;

/**
* An interface that allows to capture what local data has been read
Expand All @@ -30,29 +30,40 @@ public interface ReadObserver
ReadObserver NO_OP = new ReadObserver() {};

/**
* Called on every partition read
*
* @param partitionKey the partition key
* @param deletionTime partition deletion time
* Observes the provided unmerged partitions.
* <p>
* This method allows tracking of the partitions from individual sstable or memtable
*/
default void onPartition(DecoratedKey partitionKey, DeletionTime deletionTime) {}
default UnfilteredPartitionIterator observeUnmergedPartitions(UnfilteredPartitionIterator partitions)
{
return partitions;
}

/**
* Called on every static row read.
*
* @param staticRow static row of the partition
* Observes the provided unmerged partition.
* <p>
* This method allows tracking of the partition from individual sstable or memtable
*/
default void onStaticRow(Row staticRow) {}
default UnfilteredRowIterator observeUnmergedPartition(UnfilteredRowIterator partition)
{
return partition;
}

/**
* Called on every unfiltered read.
*
* @param unfiltered either row or range tombstone.
* Observes the provided merged partitions.
* <p>
* This method allows tracking of partitions after they have been merged from multiple sstables and memtable.
*/
default void onUnfiltered(Unfiltered unfiltered) {}
default UnfilteredPartitionIterator observeMergedPartitions(UnfilteredPartitionIterator partitions)
{
return partitions;
}

/**
* Called on read request completion
* <p>
* This method is used to signify that a read request has completed, allowing any necessary
* final tracking or cleanup.
*/
default void onComplete() {}
}
5 changes: 3 additions & 2 deletions src/java/org/apache/cassandra/db/ReadObserverFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@
*/
package org.apache.cassandra.db;

import javax.annotation.Nullable;

import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.utils.FBUtilities;

import static org.apache.cassandra.config.CassandraRelevantProperties.CUSTOM_READ_OBSERVER_FACTORY;


/**
* Provides custom factory that creates a {@link ReadObserver} instance per read request
*/
Expand All @@ -33,7 +34,7 @@ public interface ReadObserverFactory
new ReadObserverFactory() {} :
FBUtilities.construct(CassandraRelevantProperties.CUSTOM_READ_OBSERVER_FACTORY.getString(), "custom read observer factory");

default ReadObserver create(TableMetadata table)
default ReadObserver create(@Nullable TableMetadata table)
{
return ReadObserver.NO_OP;
}
Expand Down
20 changes: 11 additions & 9 deletions src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.apache.cassandra.db.transform.Transformation;
import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.index.Index;
import org.apache.cassandra.index.sai.utils.RowWithSourceTable;
import org.apache.cassandra.io.sstable.format.RowIndexEntry;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableReadsListener;
Expand Down Expand Up @@ -649,6 +648,7 @@ private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs

@SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
UnfilteredRowIterator iter = filter.getUnfilteredRowIterator(columnFilter(), partition);
iter = controller.observer().observeUnmergedPartition(iter);

var wrapped = rowTransformer != null ? Transformation.apply(iter, rowTransformer.apply(memtable)) : iter;

Expand Down Expand Up @@ -707,6 +707,8 @@ private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs
UnfilteredRowIterator iter = intersects
? makeIterator(cfs, sstable, metricsCollector)
: makeIteratorWithSkippedNonStaticContent(cfs, sstable, metricsCollector);

iter = controller.observer().observeUnmergedPartition(iter);
if (!intersects)
{
nonIntersectingSSTables++;
Expand Down Expand Up @@ -887,7 +889,7 @@ private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ColumnFam
if (partition == null)
continue;

try (UnfilteredRowIterator iter = filter.getUnfilteredRowIterator(columnFilter(), partition))
try (UnfilteredRowIterator iter = controller.observer().observeUnmergedPartition(filter.getUnfilteredRowIterator(columnFilter(), partition)))
{
if (iter.isEmpty())
continue;
Expand Down Expand Up @@ -932,13 +934,13 @@ private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ColumnFam
continue;
}

try (UnfilteredRowIterator iter = StorageHook.instance.makeRowIterator(cfs,
sstable,
partitionKey(),
intersects ? filter.getSlices(metadata()) : Slices.NONE,
columnFilter(),
filter.isReversed(),
metricsCollector))
try (UnfilteredRowIterator iter = controller.observer().observeUnmergedPartition(StorageHook.instance.makeRowIterator(cfs,
sstable,
partitionKey(),
intersects ? filter.getSlices(metadata()) : Slices.NONE,
columnFilter(),
filter.isReversed(),
metricsCollector)))
{
if (!hasRequiredStatics && !intersects && !iter.partitionLevelDeletion().isLive()) // => partitionLevelDelections == true
{
Expand Down
Loading

1 comment on commit ed2603d

@cassci-bot
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Build rejected: 3 NEW test failure(s) in 1 builds., Build 1: ran 17693 tests with 10 failures and 128 skipped.
Butler analysis done on ds-cassandra-pr-gate/cndb-10588-vs vs last 16 runs of ds-cassandra-build-nightly/main.
org.apache.cassandra.index.sai.cql.QueryWriteLifecycleTest.testWriteLifecycle[aa_CompoundKeyDataModel{primaryKey=p, c}]: test is constantly failing. No failures on upstream;
branch story: [F] vs upstream: [++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++]; [NEW]
org.apache.cassandra.index.sai.cql.TinySegmentQueryWriteLifecycleTest.testWriteLifecycle[ca_CompositePartitionKeyDataModel{primaryKey=p1, p2}]: test is constantly failing. No failures on upstream;
branch story: [F] vs upstream: [++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++]; [NEW]
org.apache.cassandra.index.sai.cql.VectorSiftSmallTest.testMultiSegmentBuild: test is constantly failing. No failures on upstream;
branch story: [F] vs upstream: [++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++]; [NEW]
butler comparison

Please sign in to comment.