Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CNDB-10588: change ReadObserver to track source iterators from sstables and memtable instead of merged iterator #1287

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -298,14 +298,18 @@ public UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, Rea
Memtable.MemtableUnfilteredPartitionIterator iter = memtable.makePartitionIterator(columnFilter(), dataRange());
if (useMinLocalDeletionTime)
controller.updateMinOldestUnrepairedTombstone(iter.getMinLocalDeletionTime());
inputCollector.addMemtableIterator(RTBoundValidator.validate(iter, RTBoundValidator.Stage.MEMTABLE, false));

UnfilteredPartitionIterator partitions = controller.observer().observeUnmergedPartitions(iter);
inputCollector.addMemtableIterator(RTBoundValidator.validate(partitions, RTBoundValidator.Stage.MEMTABLE, false));
}

SSTableReadsListener readCountUpdater = newReadCountUpdater();
for (SSTableReader sstable : view.sstables)
{
@SuppressWarnings("resource") // We close on exception and on closing the result returned by this method
UnfilteredPartitionIterator iter = sstable.getScanner(columnFilter(), dataRange(), readCountUpdater);
iter = controller.observer().observeUnmergedPartitions(iter);

inputCollector.addSSTableIterator(sstable, RTBoundValidator.validate(iter, RTBoundValidator.Stage.SSTABLE, false));

if (!sstable.isRepaired())
Expand Down
51 changes: 1 addition & 50 deletions src/java/org/apache/cassandra/db/ReadCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ public UnfilteredPartitionIterator executeLocally(ReadExecutionController execut
try
{
iterator = withStateTracking(iterator);
iterator = withReadObserver(iterator);
iterator = executionController.observer().observeMergedPartitions(iterator);
iterator = RTBoundValidator.validate(withoutPurgeableTombstones(iterator, cfs, executionController), Stage.PURGED, false);
iterator = withMetricsRecording(iterator, cfs.metric, startTimeNanos);

Expand Down Expand Up @@ -462,55 +462,6 @@ public UnfilteredPartitionIterator executeLocally(ReadExecutionController execut
}
}

/**
 * Wraps the given partitions so that a {@link ReadObserver} is notified about what is read through them.
 * <p>
 * The observer is obtained from {@link ReadObserverFactory} using this command's table metadata. When the
 * factory hands back {@link ReadObserver#NO_OP}, the input iterator is returned as-is.
 *
 * @param partitions the partitions to observe
 * @return {@code partitions} itself when observing is disabled, otherwise the iterator with the
 *         observing transformation applied
 */
public UnfilteredPartitionIterator withReadObserver(UnfilteredPartitionIterator partitions)
{
    final ReadObserver readObserver = ReadObserverFactory.instance.create(metadata());

    // A NO_OP observer means there is nothing to report, so avoid the wrapping altogether
    if (readObserver == ReadObserver.NO_OP)
        return partitions;

    Transformation<UnfilteredRowIterator> reporter = new Transformation<UnfilteredRowIterator>()
    {
        @Override
        protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
        {
            readObserver.onPartition(partition.partitionKey(), partition.partitionLevelDeletion());
            // apply this same transformation to the partition so that its static row, rows and
            // markers go through the callbacks below
            return Transformation.apply(partition, this);
        }

        @Override
        protected Row applyToStatic(Row row)
        {
            // empty static rows are not reported to the observer
            if (row.isEmpty())
                return row;

            readObserver.onStaticRow(row);
            return row;
        }

        @Override
        protected Row applyToRow(Row row)
        {
            readObserver.onUnfiltered(row);
            return row;
        }

        @Override
        protected RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
        {
            readObserver.onUnfiltered(marker);
            return marker;
        }

        @Override
        protected void onClose()
        {
            readObserver.onComplete();
        }
    };

    return Transformation.apply(partitions, reporter);
}

public UnfilteredPartitionIterator searchStorage(Index.Searcher searcher, ReadExecutionController executionController)
{
return searcher.search(executionController);
Expand Down
11 changes: 11 additions & 0 deletions src/java/org/apache/cassandra/db/ReadExecutionController.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ public class ReadExecutionController implements AutoCloseable

public final Histogram sstablesScannedPerRowRead;

private final ReadObserver readObserver;

ReadExecutionController(ReadCommand command,
OpOrder.Group baseOp,
TableMetadata baseMetadata,
Expand All @@ -72,6 +74,8 @@ public class ReadExecutionController implements AutoCloseable
this.command = command;
this.createdAtNanos = createdAtNanos;

this.readObserver = ReadObserverFactory.instance.create(baseMetadata);

this.sstablesScannedPerRowRead = new Histogram(new DecayingEstimatedHistogramReservoir(true));

if (trackRepairedStatus)
Expand Down Expand Up @@ -101,6 +105,11 @@ public ReadExecutionController indexReadController()
return indexController;
}

/**
 * Gives access to the read observer associated with this controller.
 *
 * @return the {@link ReadObserver} held by this controller
 */
public ReadObserver observer()
{
    return this.readObserver;
}

public WriteContext getWriteContext()
{
return writeContext;
Expand Down Expand Up @@ -217,6 +226,8 @@ public void close()
}
}

readObserver.onComplete();

if (createdAtNanos != NO_SAMPLING)
addSample();

Expand Down
41 changes: 26 additions & 15 deletions src/java/org/apache/cassandra/db/ReadObserver.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
*/
package org.apache.cassandra.db;

import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.Unfiltered;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;

/**
* An interface that allows capturing what local data has been read
Expand All @@ -30,29 +30,40 @@ public interface ReadObserver
ReadObserver NO_OP = new ReadObserver() {};

/**
* Called on every partition read
*
* @param partitionKey the partition key
* @param deletionTime partition deletion time
* Observes the provided unmerged partitions.
* <p>
* This method allows tracking of the partitions from an individual sstable or memtable.
*/
default void onPartition(DecoratedKey partitionKey, DeletionTime deletionTime) {} // no-op by default
default UnfilteredPartitionIterator observeUnmergedPartitions(UnfilteredPartitionIterator partitions)
{
// default implementation observes nothing: the iterator is handed back untouched
return partitions;
}

/**
* Called on every static row read.
*
* @param staticRow static row of the partition
* Observes the provided unmerged partition.
* <p>
* This method allows tracking of the partition from an individual sstable or memtable.
*/
default void onStaticRow(Row staticRow) {} // no-op by default
default UnfilteredRowIterator observeUnmergedPartition(UnfilteredRowIterator partition)
{
// default implementation observes nothing: the iterator is handed back untouched
return partition;
}

/**
* Called on every unfiltered read.
*
* @param unfiltered either row or range tombstone.
* Observes the provided merged partitions.
* <p>
* This method allows tracking of partitions after they have been merged from multiple sstables and memtables.
*/
default void onUnfiltered(Unfiltered unfiltered) {} // no-op by default
default UnfilteredPartitionIterator observeMergedPartitions(UnfilteredPartitionIterator partitions)
{
// default implementation observes nothing: the iterator is handed back untouched
return partitions;
}

/**
* Called on read request completion
* <p>
* This method is used to signify that a read request has completed, allowing any necessary
* final tracking or cleanup.
*/
default void onComplete() {} // no-op by default
}
5 changes: 3 additions & 2 deletions src/java/org/apache/cassandra/db/ReadObserverFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@
*/
package org.apache.cassandra.db;

import javax.annotation.Nullable;

import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.utils.FBUtilities;

import static org.apache.cassandra.config.CassandraRelevantProperties.CUSTOM_READ_OBSERVER_FACTORY;


/**
* Provides a custom factory that creates a {@link ReadObserver} instance per read request
*/
Expand All @@ -33,7 +34,7 @@ public interface ReadObserverFactory
new ReadObserverFactory() {} :
FBUtilities.construct(CassandraRelevantProperties.CUSTOM_READ_OBSERVER_FACTORY.getString(), "custom read observer factory");

default ReadObserver create(TableMetadata table)
default ReadObserver create(@Nullable TableMetadata table)
{
// default factory ignores the (possibly null) table and always hands out the NO_OP observer
return ReadObserver.NO_OP;
}
Expand Down
20 changes: 11 additions & 9 deletions src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.apache.cassandra.db.transform.Transformation;
import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.index.Index;
import org.apache.cassandra.index.sai.utils.RowWithSourceTable;
import org.apache.cassandra.io.sstable.format.RowIndexEntry;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableReadsListener;
Expand Down Expand Up @@ -649,6 +648,7 @@ private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs

@SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
UnfilteredRowIterator iter = filter.getUnfilteredRowIterator(columnFilter(), partition);
iter = controller.observer().observeUnmergedPartition(iter);

var wrapped = rowTransformer != null ? Transformation.apply(iter, rowTransformer.apply(memtable)) : iter;

Expand Down Expand Up @@ -707,6 +707,8 @@ private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs
UnfilteredRowIterator iter = intersects
? makeIterator(cfs, sstable, metricsCollector)
: makeIteratorWithSkippedNonStaticContent(cfs, sstable, metricsCollector);

iter = controller.observer().observeUnmergedPartition(iter);
if (!intersects)
{
nonIntersectingSSTables++;
Expand Down Expand Up @@ -887,7 +889,7 @@ private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ColumnFam
if (partition == null)
continue;

try (UnfilteredRowIterator iter = filter.getUnfilteredRowIterator(columnFilter(), partition))
try (UnfilteredRowIterator iter = controller.observer().observeUnmergedPartition(filter.getUnfilteredRowIterator(columnFilter(), partition)))
{
if (iter.isEmpty())
continue;
Expand Down Expand Up @@ -932,13 +934,13 @@ private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ColumnFam
continue;
}

try (UnfilteredRowIterator iter = StorageHook.instance.makeRowIterator(cfs,
sstable,
partitionKey(),
intersects ? filter.getSlices(metadata()) : Slices.NONE,
columnFilter(),
filter.isReversed(),
metricsCollector))
try (UnfilteredRowIterator iter = controller.observer().observeUnmergedPartition(StorageHook.instance.makeRowIterator(cfs,
sstable,
partitionKey(),
intersects ? filter.getSlices(metadata()) : Slices.NONE,
columnFilter(),
filter.isReversed(),
metricsCollector)))
{
if (!hasRequiredStatics && !intersects && !iter.partitionLevelDeletion().isLive()) // => partitionLevelDelections == true
{
Expand Down
Loading