Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

First dumb of logic to get route index running on the journal #3743

Open
wants to merge 27 commits into
base: cep-15-accord
Choose a base branch
from

Conversation

dcapwell
Copy link
Contributor

No description provided.

@dcapwell dcapwell requested a review from ifesdjeen December 13, 2024 21:29
@@ -1190,7 +1190,7 @@ public Runnable save(AccordCommandStore commandStore, TxnId txnId, @Nullable Com
return null;
}

return commandStore.appendToKeyspace(txnId, value);
return null;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to lower the diff I didn't want to rewrite the whole cache logic... null is the only thing possible to return now

@@ -126,6 +126,7 @@ public Collection<StaticSegment<JournalKey, V>> compact(Collection<StaticSegment
while ((advanced = reader.advance()) && reader.key().equals(key));

if (advanced) readers.offer(reader); // there is more to this reader, but not with this key
else reader.close();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this leak detection was caught by org.apache.cassandra.index.accord.RouteIndexTest#test. The fs would run out of space and when you looked it had very few files and only ~40mb of data, yet 4g was allocated! The root cause was that each time we open a reader we create a new channel and didn't close it, so we could never purge segments.

@@ -248,6 +248,8 @@ public static IVerbHandler<? extends Reply> responseHandlerOrNoop()

public synchronized static void startup(NodeId tcmId)
{
if (instance != null)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

safety check, I hit an issue and saw we didn't detect this so just added to be safer.

durableBefores.put(safeStore.commandStore().id(), safeStore.durableBefore());
}
}));
if (node.commandStores().all().length > 0)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in my tests we are testing journal but we don't have any tables marked for accord, so we had 0 stores... compaction tests were also impacted as we could have data but due to us dropping accord we no longer have stores

import org.apache.cassandra.utils.ByteArrayUtil;
import org.apache.cassandra.utils.ObjectSizes;

public class IndexRange implements Comparable<IndexRange>, IMeasurableMemory
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this was just extracted from the previous impl and put to the top level. With journal and tables both touching this, it was easier to make top level rather than an inner class


package org.apache.cassandra.concurrent;

public class ForwardingExecutorFactory implements ExecutorFactory
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

used by RouteIndexTest to make the close/release threads immediately run rather than async, this avoided race condition issues with validation

@@ -93,7 +94,9 @@ private boolean isTestType(Class<? extends IPartitioner> klass)
@Test
public void byteCompareSerde()
{
qt().forAll(AccordGenerators.fromQT(CassandraGenerators.token())).check(token -> {
// make sure to use simplify as local partitioner can have a type that could generate data too large causing this test to be flakey
Gen<Token> qt = CassandraGenerators.partitioners().flatMap(p -> CassandraGenerators.token(CassandraGenerators.simplify(p)));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

flakey test during my first CI run. LocalPartitioner can have very complex type, which leads to tokens too large causing the test to be flakey

@@ -54,7 +54,7 @@ public void partitionerToToken()
@Test
public void partitionerKeys()
{
qt().forAll(Gens.random(), toGen(CassandraGenerators.partitioners()))
qt().forAll(Gens.random(), toGen(CassandraGenerators.partitioners().map(CassandraGenerators::simplify)))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

flakey test during my first CI run. LocalPartitioner can have very complex type, which leads to tokens too large causing the test to be flakey

@@ -939,6 +985,8 @@ private StaticSegmentIterator()
StaticSegment.KeyOrderReader<K> reader = staticSegment.keyOrderReader();
if (reader.advance())
this.readers.add(reader);
else
reader.close();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

file leak found in testing

@@ -962,6 +1010,8 @@ public void readAllForKey(K key, RecordConsumer<K> reader)
reader.accept(next.descriptor.timestamp, next.offset, next.key(), next.record(), next.hosts(), next.descriptor.userVersion);
if (next.advance())
readers.add(next);
else
next.close();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

file leak found in testing

@@ -796,18 +832,30 @@ else if (mode == MapMode.READ_WRITE)
long pos = position;
try
{
while (local.hasRemaining())
// the channel could be closed... so always create a new channel to avoid this problem
try (FileChannel channel = provider().newFileChannel(path, Set.of(StandardOpenOption.WRITE)))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am finding cases where the journal file is deleted before we finish writing to it... which is fine from a FS point of view... so to better handle that without failing due to the channel being closed, I moved to always opening a new channel for the write... if the file doesn't exist we just no-op (which is the behavior on real FS anyways

@@ -67,6 +67,8 @@ public Collection<StaticSegment<JournalKey, V>> compact(Collection<StaticSegment
KeyOrderReader<JournalKey> reader = segment.keyOrderReader();
if (reader.advance())
readers.add(reader);
else
reader.close();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

another leak detected in the test

@@ -906,8 +941,19 @@ private String maybeAddDiskSpaceContext(String message)
@VisibleForTesting
public void truncateForTesting()
{
advanceSegment(null);
segments.set(Segments.none());
ActiveSegment<?, ?> discarding = currentSegment;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still don't feel this is safe... but its been stable for me...

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant