-
Notifications
You must be signed in to change notification settings - Fork 99
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
IGNITE-23304 #4821
base: main
Are you sure you want to change the base?
IGNITE-23304 #4821
Conversation
…nto ignite-23304 # Conflicts: # modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
…nto ignite-23304 # Conflicts: # modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java # modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
...main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java
Outdated
Show resolved
Hide resolved
.../apache/ignite/internal/partition/replicator/marshaller/PartitionCommandsMarshallerImpl.java
Outdated
Show resolved
Hide resolved
.../apache/ignite/internal/partition/replicator/marshaller/PartitionCommandsMarshallerImpl.java
Outdated
Show resolved
Hide resolved
modules/raft-api/src/main/java/org/apache/ignite/internal/raft/Command.java
Outdated
Show resolved
Hide resolved
...rc/integrationTest/java/org/apache/ignite/distributed/ItTxObservableTimePropagationTest.java
Outdated
Show resolved
Hide resolved
...table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
Show resolved
Hide resolved
...n/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
Show resolved
Hide resolved
.../org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnAppendEntries.java
Outdated
Show resolved
Hide resolved
...in/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshaller.java
Show resolved
Hide resolved
@@ -72,31 +63,40 @@ public final long nowLong() { | |||
} | |||
|
|||
@Override | |||
public final long currentLong() { | |||
long current = currentTime(); | |||
public final long nowLong(HybridTimestamp causal) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do you rewrite the logic instead of reuse HLC#update method? Moreover the update method is better, because more frequently use increment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it seems I unintentionally re-wrote update method.
Removed these methods and switched to existing "update".
modules/core/src/main/java/org/apache/ignite/internal/util/PendingComparableValuesTracker.java
Outdated
Show resolved
Hide resolved
modules/core/src/main/java/org/apache/ignite/internal/util/PendingComparableValuesTracker.java
Outdated
Show resolved
Hide resolved
modules/core/src/main/java/org/apache/ignite/internal/util/PendingComparableValuesTracker.java
Outdated
Show resolved
Hide resolved
.../apache/ignite/internal/partition/replicator/marshaller/PartitionCommandsMarshallerImpl.java
Show resolved
Hide resolved
modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java
Outdated
Show resolved
Hide resolved
...apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java
Show resolved
Hide resolved
modules/transactions/src/main/java/org/apache/ignite/internal/tx/UpdateCommandResult.java
Show resolved
Hide resolved
modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClock.java
Outdated
Show resolved
Hide resolved
* | ||
* @return The timestamp. | ||
*/ | ||
default @Nullable HybridTimestamp initiatorTime() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Do you mean that ReadCommand should also be initiatorTime aware?
- Do you mean that all WriteCommand should be initiatorTime aware?
- Why it's @nullable?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- optional
- optional
- this is nullable because this field is optional (protocol dependent). but it will always be not null for raft + writecommand.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Do we ever set initiatorTime to any ReadCommand?
- Is it ever valid not to provide initiatorTime to corresponding commands in case of jraft?
this is nullable because this field is optional (protocol dependent).
Are we going to have anything besides jraft in nearest future (say a year)? If not I'd rather remove @nullable, because otherwise we may forget to set the value while using jraft.
but it will always be not null for raft + writecommand.
Why HybridTimestamp initiatorTime()
is not in WriteCommand in that case?
BTW, do you mean that every WriteCommand should be initiatorTime aware?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- No. I've moved initiatorTime to WriteCommand.
- It's valid, I was wrong before. It can be null for CMG for example and any other group not requring mvcc extensions.
So it's not possible to remove @nullable in WriteCommand.
Correctness is ensured by SafeTimestampPropagatingCommand, where this annotation is removed, so it's not possible to create SafeTimestampPropagatingCommand without initialTime.
* | ||
* @param safeTs Safe timestamp. | ||
*/ | ||
default void patch(HybridTimestamp safeTs) {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The concept of patching the command isn't actually good. Meaning that command becomes mutable and thus not-thread safe. Let's say on the replica side, within one of the replica's threads ,UpdateAllCommand was created a bit later, exactly that command was patched with some other in-raft thread. Without extra synchronization, replica won't see proposed safeTime. Basically, it's quite fragile.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We already have this concept. See patchCommandBeforeApply. This looks really ugly.
My PR actually makes this concept clean.
Patching is hidden on replication layer and not visible outside. Message may or may not be patched, depending on protocol requirements. It still remains immutable then it enters messaging layer on initiator side.
No known to me thread safety issues exist in the PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We already have this concept.
Well, thats true. However you are currently reworking the logic. Why not to remove 'ugly' part of the code.
No known to me thread safety issues exist in the PR.
Reading safeTime anyware besides raft linearised logic isn't thread safe. Meaning that if i'll read original command.safeTime that was created on the Replica side I may not see patched value. I don't think that currently anyone does that, however it's just fragile. At least you may add a comment to safeTime getter explaining that it's not thread safe.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not to remove 'ugly' part of the code.
I did exactly that. The next step is to remove beforeApply handler from metastore.
Reading safeTime anyware besides raft linearised logic isn't thread safe
I don't get you. safeTime is no longer assigned on primary replica. It's assinged in a fully thread safe way on entering replication layer. All safeTs usages are thread safe in the PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't get you. safeTime is no longer assigned on primary replica. It's assinged in a fully thread safe way on entering replication layer. All safeTs usages are thread safe in the PR.
I mean what if someone will try to
- create UpdateCommand cmd in PartitionReplicaListener, send it to Raft whehere it'll be patched.
- will call cmd.safeTime() on the PartitionReplicaListener side. What will he see?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I got you.
Reading safeTime after patching the class in another thread is not thread safe in the current implementation. This is mutability consequences.
Javadoc on safeTime already mentions special processing for this field:
/** The Safe timestamp. A value for this field is auto generated and should not be set manually. */
So it should never be read back.
Thread safety issues can be fixed by cloning the command before submitting it to raft, but IMO doesn't give any profit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd also mention in safeTime();
javadoc that it's not thread-safe, and that proper visibility is only guaranteed inside raft.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🆗
modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
Show resolved
Hide resolved
@@ -447,6 +447,10 @@ public boolean startRaftNode( | |||
// Thread pools are shared by all raft groups. | |||
NodeOptions nodeOptions = opts.copy(); | |||
|
|||
// Then a new election starts on a node, it has local physical time higher than last generated safe ts | |||
// because we wait out the clock skew. | |||
nodeOptions.setElectionTimeoutMs(Math.max(nodeOptions.getElectionTimeoutMs(), groupOptions.maxClockSkew())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How are you going to guarantee it has local physical time higher than last generated safe ts
in case of immediate leader election?
E.g. if there is only one node in partition. (let's say that partition was rebalanced from A to B)
I'm not sure whether it's the only case of immediate leader election attempt.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generally, leader lease timeout enforces this condition.
I know only one scenario, where manual ordering propagation is required, see below comment on timeoutnowrequest.
for a single node partition I see zero issues.
can you provide more details ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've investigated this scenario and ensured everything is ok, because:
- Then a configuration is changed from A to B, on new configuration commit A steps down and sends timeoutnowrequest to B
- If it dies before sending the request, B will elect self a leader after previos leader (A) lease timeout.
Added a new test for this scenario: org.apache.ignite.distributed.ReplicasSafeTimePropagationTest#testSafeTimeReorderingOnClusterShrink
@@ -3733,6 +3750,11 @@ public Message handleTimeoutNowRequest(final TimeoutNowRequest request, final Rp | |||
.build(); | |||
} | |||
|
|||
// Keep ordering with current primary. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please elaborate this? Do you mean that we should use raft options in order to notify primary replica about in-raft safeTime updates?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a case in raft, then a leader can directly transfer leadership to follower.
For this case it's necessary to include ordering information, because such request ignores leader lease timeout.
Applicable only to timeoutnowrequest.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are there any better options besides using raft options in order to transfer safeTime? If not please add detailed comment similar to
There is a case in raft, then a leader can directly transfer leadership to follower.
For this case it's necessary to include ordering information, because such request ignores leader lease timeout.
Applicable only to timeoutnowrequest.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added the comment.
...replicator/src/main/java/org/apache/ignite/internal/replicator/CommandApplicationResult.java
Show resolved
Hide resolved
.../src/main/java/org/apache/ignite/internal/replicator/command/SafeTimePropagatingCommand.java
Show resolved
Hide resolved
@@ -86,6 +83,8 @@ public class ReplicasSafeTimePropagationTest extends IgniteAbstractTest { | |||
@InjectConfiguration("mock: { fsync: false }") | |||
private RaftConfiguration raftConfiguration; | |||
|
|||
private static final int MAX_CLOCK_SKEW = 500; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
max clock skew is a part of configuration, why not to inject it? Or at least adding a todo that will let us not to forget to inject it when maxClockSkew will be extracted from SchemaSynchronizationConfiguration.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test was already written this way. RaftGroupOptions is created manually. Let's keep it as is for simplicity.
Adding TODO requires a ticket - IMO overkill for this case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure that you got my point
I'm suggesting to add something like
@InjectConfiguration("mock: { maxClockSkew: 500 }")
private SchemaSynchronizationConfiguration schemaSynchronizationConfiguration;
instead private static final int MAX_CLOCK_SKEW = 500;
and using it
RaftGroupOptions.defaults()
.maxClockSkew(schemaSynchronizationConfiguration.maxClockSkew())
Please be aware that I didn't check that "mock: { maxClockSkew: 500 }"
is correct syntax. Please consider it as a presudocode.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🆗
// Because of clock.tick it's guaranteed that two different commands will have different safe timestamps. | ||
// maxObservableSafeTime may match proposedSafeTime only if it is the command that was previously validated and then retried | ||
// by raft client because of either TimeoutException or inner raft server recoverable exception. | ||
assert proposedSafeTime >= maxObservableSafeTimeVerifier : "Safe time reordering detected [current=" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should not remove such an assert. It's a method of verification.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This assert is moved to SafeTimeValuesTracker
} else { | ||
assert false : "Command was not found [cmd=" + command + ']'; | ||
} | ||
|
||
if (applied[0]) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree with Roma, that decreases the readability.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How do you suggest to fix this? Java doesn't support returning tuples unfortunately.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You may introduce a special class to return.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Refactored to return IgniteBiTuple.
Not sure if it became better.
modules/core/src/main/java/org/apache/ignite/internal/util/PendingComparableValuesTracker.java
Outdated
Show resolved
Hide resolved
…nto ignite-23304 # Conflicts: # modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
This PR addresses the safe timestamp generation behavior for partition replication groups.
safeTs is an entity tied to majority based replication protocols and is used for serializable backups reads.
Each raft command is assigned a mototonic ts and a replica updates its local ts value on receiving replication commands.
All reads at safe ts are serializable
Currently safeTs is assigined on primary replica, which involves additional synchronizatoin (currently uses huge critical section) and involves retries (added latency).
Also it's bad from the pluggable replication point of view, because not all protocols require this concept.
Safe ts behavior was modified in the following way:
3.1 raft election timeout now accounts max clock skew. Then a new election starts on a node, it has local time higher than last generated safe ts.
3.2 hlc is propagated in timoutnow requests, then a leader directly transfers ownership to other candidate to maintain proper clock ordering.
Benchmark results
oracle JDK 21.0.4, Xeon Silver 4314, aipersist engine (20G pagecache size)
master revision=32737c0dc9fcd0632ba37e2949a40b199429fddb
8 thread(new)
Benchmark (batch) (fsync) (partitionCount) Mode Cnt Score Error Units
UpsertKvBenchmark.upsert 1 false 1 thrpt 20 197936.874 ± 12727.709 ops/s
16 threads(new)
Benchmark (batch) (fsync) (partitionCount) Mode Cnt Score Error Units
UpsertKvBenchmark.upsert 1 false 1 thrpt 20 254981.169 ± 21278.635 ops/s
32 threads(new)
Benchmark (batch) (fsync) (partitionCount) Mode Cnt Score Error Units
UpsertKvBenchmark.upsert 1 false 1 thrpt 20 286127.032 ± 16145.256 ops/s
8 threads(old)
Benchmark (batch) (fsync) (partitionCount) Mode Cnt Score Error Units
UpsertKvBenchmark.upsert 1 false 1 thrpt 20 86624.141 ± 3472.632 ops/s
16 threads(old)
Benchmark (batch) (fsync) (partitionCount) Mode Cnt Score Error Units
UpsertKvBenchmark.upsert 1 false 1 thrpt 20 89446.504 ± 6623.490 ops/s
32 threads(old)
Benchmark (batch) (fsync) (partitionCount) Mode Cnt Score Error Units
UpsertKvBenchmark.upsert 1 false 1 thrpt 20 89516.016 ± 6092.740 ops/s
It's obvious old version has zero scaling on writing to partition.
LOGIT_STORAGE_ENABLED=true
IGNITE_SKIP_REPLICATION_IN_BENCHMARK=false
IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK=false
32 threads(new)
Benchmark (batch) (fsync) (partitionCount) Mode Cnt Score Error Units
UpsertKvBenchmark.upsert 1 false 32 thrpt 20 229083.089 ± 36856.962 ops/s
32 thread(old)
Benchmark (batch) (fsync) (partitionCount) Mode Cnt Score Error Units
UpsertKvBenchmark.upsert 1 false 32 thrpt 20 181908.090 ± 26821.026 ops/s