Skip to content

Commit

Permalink
Handle partitioned nodes in DiscoverySimulationTest
Browse files Browse the repository at this point in the history
  • Loading branch information
beobal committed Dec 16, 2024
1 parent 48dcf5e commit 97489eb
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public void testMoveToCMS() throws IOException
{
try (Cluster cluster = init(Cluster.build(4)
.withConfig(c -> c.with(Feature.GOSSIP, Feature.NETWORK))
.withoutVNodes()
.start()))
{
cluster.get(1).nodetoolResult("cms", "reconfigure", "3").asserts().success();
Expand Down
62 changes: 47 additions & 15 deletions test/unit/org/apache/cassandra/tcm/DiscoverySimulationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,18 @@
package org.apache.cassandra.tcm;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.lang3.NotImplementedException;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

Expand All @@ -49,6 +48,9 @@
import org.apache.cassandra.tcm.ownership.UniformRangePlacement;
import org.apache.cassandra.utils.concurrent.Future;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class DiscoverySimulationTest
{
static
Expand Down Expand Up @@ -84,6 +86,9 @@ public void discoveryTest() throws Throwable
Set<InetAddressAndPort> seeds = new HashSet<>();
seeds.add(InetAddressAndPort.getByName("127.0.100.1"));
seeds.add(InetAddressAndPort.getByName("127.0.100.100")); // add an unreachable node
// Run discovery on one thread per node, to reduce the chance that nodes which start
// first finish before later nodes have made first contact with the seed.
Executor executor = Executors.newFixedThreadPool(10);
for (int i = 1; i <= 10; i++)
{
InetAddressAndPort addr = InetAddressAndPort.getByName("127.0.100." + i);
Expand All @@ -94,15 +99,33 @@ public void discoveryTest() throws Throwable
messaging.handlers.put(Verb.TCM_DISCOVER_REQ, discovery.requestHandler);
}

List<CompletableFuture<Discovery.DiscoveredNodes>> futures = new ArrayList<>();
for (Discovery value : nodes.values())
futures.add(CompletableFuture.supplyAsync(() -> value.discover(5)));

for (CompletableFuture<Discovery.DiscoveredNodes> future : futures)
future.get();

for (CompletableFuture<Discovery.DiscoveredNodes> future : futures)
Assert.assertEquals(nodes.keySet(), future.get().nodes());
Map<InetAddressAndPort, CompletableFuture<Discovery.DiscoveredNodes>> futures = new HashMap<>();
nodes.forEach((addr, discovery) -> {
futures.put(addr, CompletableFuture.supplyAsync(() -> discovery.discover(5), executor));
});

Map<InetAddressAndPort, Discovery.DiscoveredNodes> discovered = new HashMap<>();
for (Map.Entry<InetAddressAndPort, CompletableFuture<Discovery.DiscoveredNodes>> future : futures.entrySet())
discovered.put(future.getKey(), future.getValue().get());

// It's possible that some node(s) in the cluster were completely unable to contact any seed. Therefore, these
// nodes are undiscoverable by the rest of the cluster so we exclude them from the expected results. We should
// also expect those partitioned nodes to have failed to discover any of their peers.
Set<InetAddressAndPort> connected = new HashSet<>();
cluster.forEach((addr, messaging) -> {
if (messaging.hasSentAtLeastOneRequest)
connected.add(addr);
});

for (Map.Entry<InetAddressAndPort, Discovery.DiscoveredNodes> result : discovered.entrySet())
{
InetAddressAndPort node = result.getKey();
Set<InetAddressAndPort> peersDiscovered = result.getValue().nodes();
if (connected.contains(node))
assertEquals(node + " was able to contact seed, but didn't discover expected peers", connected, peersDiscovered);
else
assertTrue(node + " was unable to contact seed, but discovered " + peersDiscovered, peersDiscovered.isEmpty());
}
}

/**
Expand All @@ -113,12 +136,17 @@ public void discoveryTest() throws Throwable
*/
public static class FakeMessageDelivery implements MessageDelivery
{
private static AtomicInteger counter = new AtomicInteger();
private final AtomicInteger counter = new AtomicInteger();

private final Map<InetAddressAndPort, FakeMessageDelivery> cluster;
private final Map<Long, RequestCallback<?>> callbacks;
private final Map<Verb, IVerbHandler<?>> handlers;
private final InetAddressAndPort addr;
// If we're unlucky, every attempt by this node to contact the single live seed may
// be chosen to fail by the failure injection. If this happens, no other node is able
// to discover that this one exists, so it should be excluded from the expected set
// when checking results.
public boolean hasSentAtLeastOneRequest = false;

public FakeMessageDelivery(Map<InetAddressAndPort, FakeMessageDelivery> cluster,
InetAddressAndPort addr)
Expand All @@ -139,6 +167,7 @@ public <REQ> void send(Message<REQ> message, InetAddressAndPort to)
{
if (message.verb().isResponse())
{
logger.info("{} sending response to {}", addr, to);
cluster.get(to).deliverResponse(Message.forgeIdentityForTests(message, addr));
return;
}
Expand All @@ -154,14 +183,17 @@ public <REQ, RSP> void sendWithCallback(Message<REQ> message, InetAddressAndPort
FakeMessageDelivery node = cluster.get(to);
if (node != null &&
// Inject some failures
counter.incrementAndGet() % 2 != 0)
counter.incrementAndGet() % 5 != 0)
{
hasSentAtLeastOneRequest = true;
logger.info("{} sending request to {}", addr, to);
callbacks.put(message.id(), cb);
IVerbHandler<REQ> handler = (IVerbHandler<REQ>) node.handlers.get(message.verb());
handler.doVerb(message);
}
else
{
logger.info("{} simulating failure sending request to {}", addr, to);
cb.onFailure(to, RequestFailureReason.TIMEOUT);
}
}
Expand All @@ -186,4 +218,4 @@ public <V> void respond(V response, Message<?> message)
throw new NotImplementedException();
}
}
}
}

0 comments on commit 97489eb

Please sign in to comment.