Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CASSANDRA-20118; keep old schema version when upgrading to 5.1 #3749

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public DistributedSchema(Keyspaces keyspaces, Epoch epoch)
Objects.requireNonNull(keyspaces);
this.keyspaces = keyspaces;
this.epoch = epoch;
this.version = new UUID(0, epoch.getEpoch());
this.version = epoch.isBefore(Epoch.EMPTY) ? SchemaKeyspace.calculateSchemaDigest() : new UUID(0, epoch.getEpoch());
validate();
}

Expand Down
26 changes: 26 additions & 0 deletions src/java/org/apache/cassandra/schema/SchemaKeyspace.java
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,32 @@ private static void flush()
ALL.forEach(table -> FBUtilities.waitOnFuture(getSchemaCFS(table).forceFlush(ColumnFamilyStore.FlushReason.INTERNALLY_FORCED)));
}

/**
* Read schema from system keyspace and calculate MD5 digest of every row, resulting digest
* will be converted into UUID which would act as content-based version of the schema.
*/
public static UUID calculateSchemaDigest()
{
Digest digest = Digest.forSchema();
for (String table : ALL)
{
ReadCommand cmd = getReadCommandForTableSchema(table);
try (ReadExecutionController executionController = cmd.executionController();
PartitionIterator schema = cmd.executeInternal(executionController))
{
while (schema.hasNext())
{
try (RowIterator partition = schema.next())
{
if (!isSystemKeyspaceSchemaPartition(partition.partitionKey()))
RowIterators.digest(partition, digest);
}
}
}
}
return UUID.nameUUIDFromBytes(digest.digest());
}

/**
* @param schemaTableName The name of the table responsible for part of the schema
* @return CFS responsible to hold low-level serialized schema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,41 @@ public void upgradeWithHintsTest() throws Throwable
});

IInvokableInstance inst = (IInvokableInstance)cluster.get(2);
long hintsDelivered = inst.callOnInstance(() -> {
return (long)HintsServiceMetrics.hintsSucceeded.getCount();
});
long hintsDelivered = inst.callOnInstance(() -> HintsServiceMetrics.hintsSucceeded.getCount());
assertEquals(rowCount, hintsDelivered);
}).run();
}

@Test
public void upgradeWithHintsMixedModeTest() throws Throwable
{
final int rowCount = 50;
new TestCase()
.nodes(2)
.nodesToUpgrade(1)
.withConfig((cfg) -> cfg.with(Feature.NETWORK, Feature.GOSSIP))
.upgradesToCurrentFrom(v50)
.setup((cluster) -> {
cluster.schemaChange(withKeyspace("ALTER KEYSPACE %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor':2}"));
cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (k int, v int, PRIMARY KEY (k))");
cluster.get(2).nodetoolResult("pausehandoff").asserts().success();

// insert some data while node1 is down so that hints are written
cluster.get(1).shutdown().get();
for (int i = 0; i < rowCount; i++)
cluster.coordinator(2).execute("INSERT INTO " + KEYSPACE + ".tbl(k,v) VALUES (?, ?)", ConsistencyLevel.ANY, i, i);
cluster.get(2).flush(KEYSPACE);
cluster.get(1).startup();
// Check that none of the writes got to node1
SimpleQueryResult rows = cluster.get(1).executeInternalWithResult("SELECT * FROM " + KEYSPACE + ".tbl");
assertFalse(rows.hasNext());
})
.runAfterNodeUpgrade((cluster, i) -> {
cluster.get(2).nodetoolResult("resumehandoff").asserts().success();
Awaitility.waitAtMost(20, TimeUnit.SECONDS).until(() -> {
SimpleQueryResult rows = cluster.get(1).executeInternalWithResult("SELECT * FROM " + KEYSPACE + ".tbl");
return rows.toObjectArrays().length == rowCount;
});
}).run();
}
}