Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-23304 #4821

Open
wants to merge 33 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
32a4b69
IGNITE-23304 Rework safe time tracking logic.
ascherbakoff Nov 22, 2024
2def25f
IGNITE-23304 Rework safe time tracking logic.
ascherbakoff Nov 26, 2024
32ecacf
IGNITE-23304 Test wip.
ascherbakoff Nov 26, 2024
cfdbe94
IGNITE-23304 HLC inside raft wip.
ascherbakoff Nov 28, 2024
a12413f
IGNITE-23304 Primary replica ts propagation.
ascherbakoff Nov 29, 2024
7bc211e
IGNITE-23304 Add marshaller in metastore.
ascherbakoff Dec 2, 2024
4d0b9c5
IGNITE-23304 Use maxClockSkew to set election timeout.
ascherbakoff Dec 2, 2024
c0e1842
Merge branch 'main' of https://gitbox.apache.org/repos/asf/ignite-3 i…
ascherbakoff Dec 3, 2024
836b7d8
IGNITE-23304 Fix metastore tests.
ascherbakoff Dec 3, 2024
e6e3dec
IGNITE-23304 Fix tests wip 1.
ascherbakoff Dec 3, 2024
661f502
IGNITE-23304 Fix tests wip 2.
ascherbakoff Dec 6, 2024
3e4af63
IGNITE-23304 Fix tests wip 3.
ascherbakoff Dec 6, 2024
ce04313
IGNITE-23304 Fix tests wip 4.
ascherbakoff Dec 6, 2024
3dbd60e
IGNITE-23304 Fix tests wip 6.
ascherbakoff Dec 9, 2024
4a6f8d0
IGNITE-23304 Fix tests wip 7.
ascherbakoff Dec 9, 2024
eb99fda
IGNITE-23304 Fix tests wip 8.
ascherbakoff Dec 9, 2024
2500708
IGNITE-23304 Fix tests wip 10.
ascherbakoff Dec 10, 2024
f6767b2
IGNITE-23304 Fix tests wip 11.
ascherbakoff Dec 10, 2024
dbe3fe1
Merge branch 'main' of https://gitbox.apache.org/repos/asf/ignite-3 i…
ascherbakoff Dec 10, 2024
8279069
IGNITE-23304 Fix tests wip 12.
ascherbakoff Dec 10, 2024
638d15e
IGNITE-23304 Cleanup.
ascherbakoff Dec 11, 2024
da99340
IGNITE-23304 Fix review comments wip 1.
ascherbakoff Dec 18, 2024
f06f65c
IGNITE-23304 Fix review comments wip 2.
ascherbakoff Dec 18, 2024
688735b
IGNITE-23304 Fix review comments wip 3.
ascherbakoff Dec 18, 2024
6868f38
IGNITE-23304 Fix review comments wip 4.
ascherbakoff Dec 18, 2024
d9e71b9
IGNITE-23304 Fix review comments wip 5.
ascherbakoff Dec 18, 2024
6f606c8
IGNITE-23304 Fix review comments wip 7.
ascherbakoff Dec 19, 2024
0037235
IGNITE-23304 Fix review comments wip 8.
ascherbakoff Dec 23, 2024
e3c9d23
IGNITE-23304 Fix review comments wip 9.
ascherbakoff Dec 23, 2024
f61d2c7
IGNITE-23304 Fix review comments wip 10.
ascherbakoff Dec 24, 2024
7376767
IGNITE-23304 Fix review comments wip 11.
ascherbakoff Dec 24, 2024
311c7c3
IGNITE-23304 Fix review comments wip 12.
ascherbakoff Dec 25, 2024
29cb8fa
Merge branch 'main' of https://gitbox.apache.org/repos/asf/ignite-3 i…
ascherbakoff Dec 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -389,9 +389,6 @@ public static class Replicator {

/** Stopping replica exception code. */
public static final int REPLICA_STOPPING_ERR = REPLICATOR_ERR_GROUP.registerErrorCode((short) 8);

/** Replication safe time reordering. */
public static final int REPLICATION_SAFE_TIME_REORDERING_ERR = REPLICATOR_ERR_GROUP.registerErrorCode((short) 9);
}

/** Storage error group. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public interface HybridClock {
HybridTimestamp now();

/**
* Gets a current timestamp. It is a fast way to get timestamp because it doesn't have to tick the logical part of the clock.
* Gets a current timestamp. It is a fast way to get timestamp because it doesn't have to tick.
* This timestamp is not unique, and equal to or less than that value is returned by {@link this#now()}.
*
* @return The hybrid timestamp.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,6 @@ public class HybridClockImpl implements HybridClock {

private final List<ClockUpdateListener> updateListeners = new CopyOnWriteArrayList<>();

/**
* Returns current physical time in milliseconds.
*
* @return Current time.
*/
protected long physicalTime() {
return System.currentTimeMillis();
}

@Override
public final long nowLong() {
while (true) {
Expand All @@ -78,20 +69,6 @@ public final long currentLong() {
return max(latestTime, current);
}

private void notifyUpdateListeners(long newTs) {
for (ClockUpdateListener listener : updateListeners) {
try {
listener.onUpdate(newTs);
} catch (Throwable e) {
log.error("ClockUpdateListener#onUpdate() failed for {} at {}", e, listener, newTs);

if (e instanceof Error) {
throw e;
}
}
}
}

@Override
public final HybridTimestamp now() {
return hybridTimestamp(nowLong());
Expand Down Expand Up @@ -138,10 +115,33 @@ public final HybridTimestamp update(HybridTimestamp requestTime) {
}
}

/**
* Returns current physical time in milliseconds.
*
* @return Current time.
*/
protected long physicalTime() {
return System.currentTimeMillis();
}

private long currentTime() {
return physicalTime() << LOGICAL_TIME_BITS_SIZE;
}

private void notifyUpdateListeners(long newTs) {
for (ClockUpdateListener listener : updateListeners) {
try {
listener.onUpdate(newTs);
} catch (Throwable e) {
log.error("ClockUpdateListener#onUpdate() failed for {} at {}", e, listener, newTs);

if (e instanceof Error) {
throw e;
}
}
}
}

@Override
public void addUpdateListener(ClockUpdateListener listener) {
updateListeners.add(listener);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
* ability to wait for certain value, see {@link #waitFor(Comparable)}.
*/
public class PendingComparableValuesTracker<T extends Comparable<T>, R> implements ManuallyCloseable {
private static final VarHandle CURRENT;
protected static final VarHandle CURRENT;

private static final VarHandle CLOSE_GUARD;

Expand All @@ -55,7 +55,7 @@ public class PendingComparableValuesTracker<T extends Comparable<T>, R> implemen

/** Current value along with associated result. */
@SuppressWarnings("FieldMayBeFinal") // Changed through CURRENT VarHandle.
private volatile Map.Entry<T, @Nullable R> current;
protected volatile Map.Entry<T, @Nullable R> current;

/** Prevents double closing. */
@SuppressWarnings("unused")
Expand All @@ -64,7 +64,7 @@ public class PendingComparableValuesTracker<T extends Comparable<T>, R> implemen
/** Busy lock to close synchronously. */
private final IgniteStripedReadWriteLock busyLock = new IgniteStripedReadWriteLock();

private final Comparator<Map.Entry<T, @Nullable R>> comparator;
protected final Comparator<Map.Entry<T, @Nullable R>> comparator;

/**
* Constructor with initial value.
Expand Down Expand Up @@ -203,11 +203,11 @@ public boolean isEmpty() {
return valueFutures.isEmpty();
}

private boolean enterBusy() {
protected boolean enterBusy() {
rpuch marked this conversation as resolved.
Show resolved Hide resolved
return !busyLock.isWriteLockedByCurrentThread() && busyLock.readLock().tryLock();
}

private void leaveBusy() {
protected void leaveBusy() {
busyLock.readLock().unlock();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.util;

import java.util.Map;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.jetbrains.annotations.Nullable;

/**
* Safe time values tracker. Checks for reordering. Must be updated from a single thread.
*/
public class SafeTimeValuesTracker extends PendingComparableValuesTracker<HybridTimestamp, Void> {
public SafeTimeValuesTracker(HybridTimestamp initialValue) {
super(initialValue);
}

@Override
public void update(HybridTimestamp newValue, @Nullable Void futureResult) {
if (!enterBusy()) {
throw new TrackerClosedException();
}

try {
Map.Entry<HybridTimestamp, @Nullable Void> current = this.current;

IgniteBiTuple<HybridTimestamp, @Nullable Void> newEntry = new IgniteBiTuple<>(newValue, futureResult);

// Entries from the same batch receive equal safe timestamps.
if (comparator.compare(newEntry, current) < 0) {
throw new AssertionError("Reordering detected: [old=" + current.getKey() + ", new=" + newEntry.get1() + ']');
}

CURRENT.set(this, newEntry);
completeWaitersOnUpdate(newValue, futureResult);
} finally {
leaveBusy();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.ignite.internal.hlc.HybridClockImpl;

/**
* Test hybrid clock with custom supplier of current time. TODO delete
* Test hybrid clock with custom supplier of current time.
*/
public class TestHybridClock extends HybridClockImpl {
/** Supplier of current time in milliseconds. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
/** Base meta storage write command. */
public interface MetaStorageWriteCommand extends WriteCommand {
/** Time on the initiator node. */
@Override
HybridTimestamp initiatorTime();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ public interface DirectByteBufferStream {
*/
void writeInt(int val);

void writeFixedInt(int val);

void writeFixedLong(long val);

/**
* Writes {@code Integer}.
*
Expand Down Expand Up @@ -361,6 +365,10 @@ <K, V> void writeMap(Map<K, V> map, MessageCollectionItemType keyType, MessageCo
*/
int readInt();

int readFixedInt();

long readFixedLong();

/**
* Reads {@code Integer}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,46 @@ public void writeInt(int val) {
writeVarInt(val + 1);
}

/** {@inheritDoc} */
@Override
public void writeFixedInt(int val) {
lastFinished = remainingInternal() >= Integer.BYTES;

if (lastFinished) {
int pos = buf.position();

if (IS_BIG_ENDIAN) {
GridUnsafe.putIntLittleEndian(heapArr, baseOff + pos, val);
} else {
GridUnsafe.putInt(heapArr, baseOff + pos, val);
}

pos += Integer.BYTES;

setPosition(pos);
}
}

/** {@inheritDoc} */
@Override
public void writeFixedLong(long val) {
lastFinished = remainingInternal() >= Long.BYTES;

if (lastFinished) {
int pos = buf.position();

if (IS_BIG_ENDIAN) {
GridUnsafe.putLongLittleEndian(heapArr, baseOff + pos, val);
} else {
GridUnsafe.putLong(heapArr, baseOff + pos, val);
}

pos += Long.BYTES;

setPosition(pos);
}
}

@Override
public void writeBoxedInt(@Nullable Integer val) {
if (val != null) {
Expand Down Expand Up @@ -1123,6 +1163,40 @@ public int readInt() {
return val;
}

@Override
public int readFixedInt() {
lastFinished = remainingInternal() >= Integer.BYTES;

int val = 0;

if (lastFinished) {
int pos = buf.position();

val = GridUnsafe.getInt(heapArr, baseOff + pos);

setPosition(pos + Integer.BYTES);
}

return val;
}

@Override
public long readFixedLong() {
lastFinished = remainingInternal() >= Long.BYTES;

long val = 0;

if (lastFinished) {
int pos = buf.position();

val = GridUnsafe.getLong(heapArr, baseOff + pos);

setPosition(pos + Long.BYTES);
}

return val;
}

@Override
public @Nullable Integer readBoxedInt() {
return readBoxedValue(this::readInt);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,12 @@ public interface PartitionCommandsMarshaller extends Marshaller {
* @return Catalog version. {@value #NO_VERSION_REQUIRED} if version is not required for the given command.
*/
int readRequiredCatalogVersion(ByteBuffer raw);

/**
* Reads safe timestamp from the provided buffer.
*
* @param raw Buffer to read from.
* @return The timestamp.
*/
long readSafeTimestamp(ByteBuffer raw);
}
Loading