Skip to content

Commit

Permalink
Fix topology replay during bootstrap and startup, decouple Accord fro…
Browse files Browse the repository at this point in the history
…m TCM

Includes multiple changes, primary ones:
  * Removed nodes now live in TCM, no need to discover historic epochs in order to find removed nodes
  * CommandStore <-> RangesForEpochs mappings required for startup are now stored in journal, and CS can be set up _without_ topology replay
  * Topology replay is fully done via journal (where we store topologies themselves), and topology metadata table (where we store redundant/closed information)
  * Fixed various bugs related to propagation and staleness
     * TCM was previously relied on for "fetching" epoch: we can not rely on it as there's no guarantee we will see a consecutive epoch when grabbing Metadata#current
     * Redundant / closed during replay was set with incorrect ranges in 1 of the code paths
     * TCM was contacted multiple times for historical epochs, which made startup much longer under some circumstances
  • Loading branch information
ifesdjeen committed Dec 16, 2024
1 parent 8fdcbe2 commit a1e2c61
Show file tree
Hide file tree
Showing 39 changed files with 883 additions and 362 deletions.
2 changes: 1 addition & 1 deletion .gitmodules
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[submodule "modules/accord"]
path = modules/accord
url = https://github.com/ifesdjeen/cassandra-accord.git
branch = CASSANDRA-20115
branch = topology-rebuild-on-bounce
1 change: 1 addition & 0 deletions src/java/org/apache/cassandra/config/AccordSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ public enum TransactionalRangeMigration
public boolean ephemeralReadEnabled = true;
public boolean state_cache_listener_jfr_enabled = true;
public final JournalSpec journal = new JournalSpec();
// TODO: do we need more retries; rename
public final MinEpochRetrySpec minEpochSyncRetry = new MinEpochRetrySpec();

public static class MinEpochRetrySpec extends RetrySpec
Expand Down
3 changes: 3 additions & 0 deletions src/java/org/apache/cassandra/exceptions/RequestFailure.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class RequestFailure
public static final RequestFailure COORDINATOR_BEHIND = new RequestFailure(RequestFailureReason.COORDINATOR_BEHIND);
public static final RequestFailure READ_TOO_MANY_INDEXES = new RequestFailure(RequestFailureReason.READ_TOO_MANY_INDEXES);
public static final RequestFailure RETRY_ON_DIFFERENT_TRANSACTION_SYSTEM = new RequestFailure(RequestFailureReason.RETRY_ON_DIFFERENT_TRANSACTION_SYSTEM);
public static final RequestFailure BOOTING = new RequestFailure(RequestFailureReason.RETRY_ON_DIFFERENT_TRANSACTION_SYSTEM);

static
{
Expand Down Expand Up @@ -131,6 +132,7 @@ public static RequestFailure forReason(RequestFailureReason reason)
{
switch (reason)
{
// TODO: this really screws things up
default: throw new IllegalStateException("Unhandled request failure reason " + reason);
case UNKNOWN: return UNKNOWN;
case READ_TOO_MANY_TOMBSTONES: return READ_TOO_MANY_TOMBSTONES;
Expand All @@ -144,6 +146,7 @@ public static RequestFailure forReason(RequestFailureReason reason)
case COORDINATOR_BEHIND: return COORDINATOR_BEHIND;
case READ_TOO_MANY_INDEXES: return READ_TOO_MANY_INDEXES;
case RETRY_ON_DIFFERENT_TRANSACTION_SYSTEM: return RETRY_ON_DIFFERENT_TRANSACTION_SYSTEM;
case BOOTING: return BOOTING;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ public enum RequestFailureReason
NOT_CMS (7),
INVALID_ROUTING (8),
COORDINATOR_BEHIND (9),
READ_TOO_MANY_INDEXES (10),
READ_TOO_MANY_INDEXES (10),
RETRY_ON_DIFFERENT_TRANSACTION_SYSTEM (11),
BOOTING (12),
;

static
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public Set<NodeId> reconfigure(ClusterMetadata metadata, Map<String, Replication
{
if (!filter.apply(metadata, peerId))
{
tmpDirectory = tmpDirectory.without(peerId);
tmpDirectory = tmpDirectory.without(metadata.nextEpoch(), peerId);
tokenMap = tokenMap.unassignTokens(peerId);
}
}
Expand Down
43 changes: 42 additions & 1 deletion src/java/org/apache/cassandra/net/MessageDelivery.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import javax.annotation.Nullable;

import org.slf4j.Logger;
Expand All @@ -32,6 +31,7 @@
import org.apache.cassandra.exceptions.RequestFailure;
import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.repair.SharedContext;
import org.apache.cassandra.utils.Backoff;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.concurrent.Accumulator;
Expand Down Expand Up @@ -100,6 +100,43 @@ public default <REQ, RSP> Future<Message<RSP>> sendWithRetries(Backoff backoff,
return promise;
}

public default <REQ, RSP> Future<Message<RSP>> sendWithRetries(SharedContext sharedContext, Verb verb, REQ request, Collection<InetAddressAndPort> peers)
{
// TODO: move somewhere else
Iterator<InetAddressAndPort> candidates = new Iterator<>()
{
boolean firstRun = true;
Iterator<InetAddressAndPort> iter = peers.iterator();

public boolean hasNext()
{
return !peers.isEmpty();
}

public InetAddressAndPort next()
{
// At first, try all alive nodes
if (firstRun)
{
while (iter.hasNext())
{
InetAddressAndPort candidate = iter.next();
if (sharedContext.failureDetector().isAlive(candidate))
return candidate;
}
firstRun = false;
}

// After that, cycle through all nodes
if (!iter.hasNext())
iter = peers.iterator();

return iter.next();
}
};
return sendWithRetries(Backoff.None.INSTANCE, ImmediateRetryScheduler.instance, verb, request, candidates, RetryPredicate.ALWAYS_RETRY, RetryErrorMessage.EMPTY);
}

public default <REQ, RSP> void sendWithRetries(Backoff backoff, RetryScheduler retryThreads,
Verb verb, REQ request,
Iterator<InetAddressAndPort> candidates,
Expand Down Expand Up @@ -127,11 +164,15 @@ interface OnResult<T>

interface RetryPredicate
{
public static RetryPredicate times(int n) { return (attempt, from, failure) -> attempt < n; }
RetryPredicate ALWAYS_RETRY = (i1, i2, i3) -> true;
RetryPredicate ALWAYS_REJECT = (i1, i2, i3) -> false;
boolean test(int attempt, InetAddressAndPort from, RequestFailure failure);
}

interface RetryErrorMessage
{
RetryErrorMessage EMPTY = (i1, i2, i3, i4) -> null;
String apply(int attempt, ResponseFailureReason retryFailure, @Nullable InetAddressAndPort from, @Nullable RequestFailure reason);
}

Expand Down
4 changes: 4 additions & 0 deletions src/java/org/apache/cassandra/net/Verb.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.apache.cassandra.service.accord.AccordService;
import org.apache.cassandra.service.accord.AccordSyncPropagator;
import org.apache.cassandra.service.accord.AccordSyncPropagator.Notification;
import org.apache.cassandra.service.accord.FetchTopology;
import org.apache.cassandra.service.accord.FetchMinEpoch;
import org.apache.cassandra.service.accord.interop.AccordInteropApply;
import org.apache.cassandra.service.accord.interop.AccordInteropCommit;
Expand Down Expand Up @@ -356,8 +357,11 @@ public enum Verb
ACCORD_INTEROP_READ_REPAIR_RSP (158, P2, writeTimeout, IMMEDIATE, () -> AccordInteropReadRepair.replySerializer, AccordService::responseHandlerOrNoop),
ACCORD_INTEROP_READ_REPAIR_REQ (159, P2, writeTimeout, IMMEDIATE, () -> AccordInteropReadRepair.requestSerializer, AccordService::requestHandlerOrNoop, ACCORD_INTEROP_READ_REPAIR_RSP),
ACCORD_INTEROP_APPLY_REQ (160, P2, writeTimeout, IMMEDIATE, () -> AccordInteropApply.serializer, AccordService::requestHandlerOrNoop, ACCORD_APPLY_RSP),
// TODO: swap IDS?
ACCORD_FETCH_MIN_EPOCH_RSP (166, P2, writeTimeout, IMMEDIATE, () -> FetchMinEpoch.Response.serializer, RESPONSE_HANDLER),
ACCORD_FETCH_MIN_EPOCH_REQ (165, P2, writeTimeout, IMMEDIATE, () -> FetchMinEpoch.serializer, () -> FetchMinEpoch.handler, ACCORD_FETCH_MIN_EPOCH_RSP),
ACCORD_FETCH_TOPOLOGY_RSP (167, P2, writeTimeout, IMMEDIATE, () -> FetchTopology.Response.serializer, RESPONSE_HANDLER),
ACCORD_FETCH_TOPOLOGY_REQ (168, P2, writeTimeout, IMMEDIATE, () -> FetchTopology.serializer, () -> FetchTopology.handler, ACCORD_FETCH_TOPOLOGY_RSP),

// generic failure response
FAILURE_RSP (99, P0, noTimeout, REQUEST_RESPONSE, () -> RequestFailure.serializer, RESPONSE_HANDLER ),
Expand Down
1 change: 1 addition & 0 deletions src/java/org/apache/cassandra/repair/SharedContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
*
* See {@link Global#instance} for the main production path
*/
// TODO: move to util or something?
public interface SharedContext
{
InetAddressAndPort broadcastAddressAndPort();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ public AccordCommandStore(int id,
loadRangesForEpoch(journal.loadRangesForEpoch(id()));
}

static Factory factory(AccordJournal journal, IntFunction<AccordExecutor> executorFactory)
static Factory factory(Journal journal, IntFunction<AccordExecutor> executorFactory)
{
return (id, node, agent, dataStore, progressLogFactory, listenerFactory, rangesForEpoch) ->
new AccordCommandStore(id, node, agent, dataStore, progressLogFactory, listenerFactory, rangesForEpoch, journal, executorFactory.apply(id));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import accord.api.Agent;
import accord.api.DataStore;
import accord.api.Journal;
import accord.api.LocalListeners;
import accord.api.ProgressLog;
import accord.local.CommandStores;
Expand All @@ -47,8 +48,8 @@
import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;

import static org.apache.cassandra.config.AccordSpec.QueueShardModel.THREAD_PER_SHARD;
import static org.apache.cassandra.config.DatabaseDescriptor.getAccordQueueSubmissionModel;
import static org.apache.cassandra.config.DatabaseDescriptor.getAccordQueueShardCount;
import static org.apache.cassandra.config.DatabaseDescriptor.getAccordQueueSubmissionModel;
import static org.apache.cassandra.service.accord.AccordExecutor.Mode.RUN_WITHOUT_LOCK;
import static org.apache.cassandra.service.accord.AccordExecutor.Mode.RUN_WITH_LOCK;
import static org.apache.cassandra.service.accord.AccordExecutor.constant;
Expand All @@ -60,15 +61,16 @@ public class AccordCommandStores extends CommandStores implements CacheSize

private final CacheSizeMetrics cacheSizeMetrics;
private final AccordExecutor[] executors;

private long cacheSize, workingSetSize;
private int maxQueuedLoads, maxQueuedRangeLoads;
private boolean shrinkingOn;

AccordCommandStores(NodeCommandStoreService node, Agent agent, DataStore store, RandomSource random,
ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenerFactory,
AccordJournal journal, AccordExecutor[] executors)
Journal journal, AccordExecutor[] executors)
{
super(node, agent, store, random, shardDistributor, progressLogFactory, listenerFactory,
super(node, agent, store, random, journal, shardDistributor, progressLogFactory, listenerFactory,
AccordCommandStore.factory(journal, id -> executors[id % executors.length]));
this.executors = executors;
this.cacheSizeMetrics = new CacheSizeMetrics(ACCORD_STATE_CACHE, this);
Expand All @@ -80,9 +82,9 @@ public class AccordCommandStores extends CommandStores implements CacheSize
refreshCapacities();
}

static Factory factory(AccordJournal journal)
static Factory factory()
{
return (time, agent, store, random, shardDistributor, progressLogFactory, listenerFactory) -> {
return (NodeCommandStoreService time, Agent agent, DataStore store, RandomSource random, Journal journal, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory) -> {
AccordExecutor[] executors = new AccordExecutor[getAccordQueueShardCount()];
AccordExecutorFactory factory;
int maxThreads = Integer.MAX_VALUE;
Expand Down Expand Up @@ -119,7 +121,7 @@ static Factory factory(AccordJournal journal)
}
}

return new AccordCommandStores(time, agent, store, random, shardDistributor, progressLogFactory, listenerFactory, journal, executors);
return new AccordCommandStores(time, agent, store, random, shardDistributor, progressLogFactory, listenersFactory, journal, executors);
};
}

Expand Down
Loading

0 comments on commit a1e2c61

Please sign in to comment.