Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

First dumb of logic to get route index running on the journal #3743

Open
wants to merge 27 commits into
base: cep-15-accord
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[submodule "modules/accord"]
path = modules/accord
url = https://github.com/apache/cassandra-accord.git
branch = trunk
url = https://github.com/dcapwell/cassandra-accord.git
branch = CASSANDRA-20144
120 changes: 2 additions & 118 deletions src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import accord.api.Agent;
import accord.local.Cleanup;
import accord.local.CommandStores;
import accord.local.CommandStores.RangesForEpoch;
import accord.local.DurableBefore;
import accord.local.RedundantBefore;
import accord.local.StoreParticipants;
import accord.primitives.SaveStatus;
import accord.primitives.Status;
import accord.primitives.Status.Durability;
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
import accord.utils.Invariants;
Expand All @@ -67,7 +61,6 @@
import org.apache.cassandra.db.partitions.PurgeFunction;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
import org.apache.cassandra.db.rows.BTreeRow;
import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.db.rows.RangeTombstoneBoundMarker;
import org.apache.cassandra.db.rows.RangeTombstoneMarker;
Expand Down Expand Up @@ -96,8 +89,7 @@
import org.apache.cassandra.service.accord.AccordJournalValueSerializers;
import org.apache.cassandra.service.accord.AccordJournalValueSerializers.FlyweightSerializer;
import org.apache.cassandra.service.accord.AccordKeyspace;
import org.apache.cassandra.service.accord.AccordKeyspace.CommandRows;
import org.apache.cassandra.service.accord.AccordKeyspace.CommandsColumns;

import org.apache.cassandra.service.accord.AccordKeyspace.CommandsForKeyAccessor;
import org.apache.cassandra.service.accord.AccordKeyspace.TimestampsForKeyRows;
import org.apache.cassandra.service.accord.AccordService;
Expand All @@ -111,23 +103,15 @@
import org.apache.cassandra.utils.TimeUUID;

import static accord.local.Cleanup.ERASE;
import static accord.local.Cleanup.TRUNCATE_WITH_OUTCOME;
import static accord.local.Cleanup.shouldCleanupPartial;
import static com.google.common.base.Preconditions.checkState;
import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static org.apache.cassandra.config.Config.PaxosStatePurging.legacy;
import static org.apache.cassandra.config.DatabaseDescriptor.paxosStatePurging;
import static org.apache.cassandra.service.accord.AccordKeyspace.CommandRows.expungePartial;
import static org.apache.cassandra.service.accord.AccordKeyspace.CommandRows.saveStatusOnly;
import static org.apache.cassandra.service.accord.AccordKeyspace.CommandRows.truncatedApply;
import static org.apache.cassandra.service.accord.AccordKeyspace.CommandsForKeysAccessor;
import static org.apache.cassandra.service.accord.AccordKeyspace.TimestampsForKeyColumns.last_executed_micros;
import static org.apache.cassandra.service.accord.AccordKeyspace.TimestampsForKeyColumns.last_executed_timestamp;
import static org.apache.cassandra.service.accord.AccordKeyspace.TimestampsForKeyColumns.last_write_timestamp;
import static org.apache.cassandra.service.accord.AccordKeyspace.TimestampsForKeyRows.truncateTimestampsForKeyRow;
import static org.apache.cassandra.service.accord.AccordKeyspace.deserializeDurabilityOrNull;
import static org.apache.cassandra.service.accord.AccordKeyspace.deserializeParticipantsOrNull;
import static org.apache.cassandra.service.accord.AccordKeyspace.deserializeSaveStatusOrNull;
import static org.apache.cassandra.service.accord.AccordKeyspace.deserializeTimestampOrNull;

/**
Expand Down Expand Up @@ -239,8 +223,6 @@ private Transformation<UnfilteredRowIterator> purger(ColumnFamilyStore cfs, Supp
if (!requiresAccordSpecificPurger(cfs))
return new Purger(controller, nowInSec);

if (isAccordCommands(cfs))
return new AccordCommandsPurger(accordService);
if (isAccordTimestampsForKey(cfs))
return new AccordTimestampsForKeyPurger(accordService);
if (isAccordJournal(cfs))
Expand Down Expand Up @@ -805,98 +787,6 @@ protected Row applyToRow(Row row)
}
}

class AccordCommandsPurger extends AbstractPurger
{
final Agent agent;
final Int2ObjectHashMap<RedundantBefore> redundantBefores;
final Int2ObjectHashMap<DurableBefore> durableBefores;
final Int2ObjectHashMap<RangesForEpoch> ranges;

int storeId;
TxnId txnId;

AccordCommandsPurger(Supplier<IAccordService> accordService)
{
IAccordService service = accordService.get();
IAccordService.CompactionInfo compactionInfo = service.getCompactionInfo();
this.agent = service.agent();
this.redundantBefores = compactionInfo.redundantBefores;
this.ranges = compactionInfo.ranges;
this.durableBefores = compactionInfo.durableBefores;
}

protected void beginPartition(UnfilteredRowIterator partition)
{
ByteBuffer[] partitionKeyComponents = CommandRows.splitPartitionKey(partition.partitionKey());
storeId = CommandRows.getStoreId(partitionKeyComponents);
txnId = CommandRows.getTxnId(partitionKeyComponents);
}

@Override
protected Row applyToRow(Row row)
{
updateProgress();

RedundantBefore redundantBefore = redundantBefores.get(storeId);
DurableBefore durableBefore = durableBefores.get(storeId);
// TODO (expected): if the store has been retired, this should return null
if (redundantBefore == null)
return row;

Cell durabilityCell = row.getCell(CommandsColumns.durability);
Durability durability = deserializeDurabilityOrNull(durabilityCell);
Cell executeAtCell = row.getCell(CommandsColumns.execute_at);
Cell participantsCell = row.getCell(CommandsColumns.participants);
StoreParticipants participants = deserializeParticipantsOrNull(participantsCell);
Cell statusCell = row.getCell(CommandsColumns.status);
SaveStatus saveStatus = deserializeSaveStatusOrNull(statusCell);

if (saveStatus == null)
return row;

if (saveStatus.is(Status.Invalidated))
return saveStatusOnly(saveStatus, row, nowInSec);

Cleanup cleanup = shouldCleanupPartial(agent, txnId, saveStatus, durability, participants,
redundantBefore, durableBefore);
switch (cleanup)
{
default: throw new AssertionError(String.format("Unexpected cleanup task: %s", cleanup));
case EXPUNGE:
return null;

case EXPUNGE_PARTIAL:
return expungePartial(row, durabilityCell, executeAtCell, participantsCell);

case ERASE:
// Emit a tombstone so if this is slicing the command and making it not possible to determine if it
// can be truncated later it can still be dropped via the tombstone.
// Eventually the tombstone can be dropped by `durableBefore.min(txnId) == Universal`
// We can still encounter sliced command state just because compaction inputs are random
return BTreeRow.emptyDeletedRow(row.clustering(), new Row.Deletion(DeletionTime.build(row.primaryKeyLivenessInfo().timestamp(), nowInSec), false));

case VESTIGIAL:
case INVALIDATE:
return saveStatusOnly(cleanup.appliesIfNot, row, nowInSec);

case TRUNCATE_WITH_OUTCOME:
case TRUNCATE:
return truncatedApply(cleanup.appliesIfNot, row, nowInSec, durability, durabilityCell, executeAtCell, participantsCell, cleanup == TRUNCATE_WITH_OUTCOME);

case NO:
// TODO (required): when we port this to journal, make sure to expunge extra fields beyond those we need to retain
return row;
}
}

@Override
protected Row applyToStatic(Row row)
{
checkState(row.isStatic() && row.isEmpty());
return row;
}
}

class AccordTimestampsForKeyPurger extends AbstractPurger
{
final Int2ObjectHashMap<RedundantBefore> redundantBefores;
Expand Down Expand Up @@ -1225,8 +1115,7 @@ private static boolean isPaxos(ColumnFamilyStore cfs)
private static boolean requiresAccordSpecificPurger(ColumnFamilyStore cfs)
{
return cfs.getKeyspaceName().equals(SchemaConstants.ACCORD_KEYSPACE_NAME) &&
ImmutableSet.of(AccordKeyspace.COMMANDS,
AccordKeyspace.TIMESTAMPS_FOR_KEY,
ImmutableSet.of(AccordKeyspace.TIMESTAMPS_FOR_KEY,
AccordKeyspace.JOURNAL,
AccordKeyspace.COMMANDS_FOR_KEY)
.contains(cfs.getTableName());
Expand All @@ -1242,11 +1131,6 @@ private static boolean isAccordJournal(ColumnFamilyStore cfs)
return cfs.getKeyspaceName().equals(SchemaConstants.ACCORD_KEYSPACE_NAME) && cfs.name.startsWith(AccordKeyspace.JOURNAL);
}

private static boolean isAccordCommands(ColumnFamilyStore cfs)
{
return isAccordTable(cfs, AccordKeyspace.COMMANDS);
}

private static boolean isAccordTimestampsForKey(ColumnFamilyStore cfs)
{
return isAccordTable(cfs, AccordKeyspace.TIMESTAMPS_FOR_KEY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ public DataSet data()
{
try (AccordExecutor.ExclusiveGlobalCaches cache = executor.lockCaches())
{
addRow(ds, executor.executorId(), AccordKeyspace.COMMANDS, cache.commands.statsSnapshot());
addRow(ds, executor.executorId(), "commands", cache.commands.statsSnapshot());
addRow(ds, executor.executorId(), AccordKeyspace.COMMANDS_FOR_KEY, cache.commandsForKey.statsSnapshot());
addRow(ds, executor.executorId(), AccordKeyspace.TIMESTAMPS_FOR_KEY, cache.timestampsForKey.statsSnapshot());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class IndexMetrics
private final String indexName;
public final Timer memtableIndexWriteLatency;

public IndexMetrics(RouteIndex index)
public IndexMetrics(ParticipantsJournalIndex index)
{
this.ks = index.baseCfs().getKeyspaceName();
this.table = index.baseCfs().name;
Expand Down
Loading