Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-23304 #4821

Open
wants to merge 33 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
32a4b69
IGNITE-23304 Rework safe time tracking logic.
ascherbakoff Nov 22, 2024
2def25f
IGNITE-23304 Rework safe time tracking logic.
ascherbakoff Nov 26, 2024
32ecacf
IGNITE-23304 Test wip.
ascherbakoff Nov 26, 2024
cfdbe94
IGNITE-23304 HLC inside raft wip.
ascherbakoff Nov 28, 2024
a12413f
IGNITE-23304 Primary replica ts propagation.
ascherbakoff Nov 29, 2024
7bc211e
IGNITE-23304 Add marshaller in metastore.
ascherbakoff Dec 2, 2024
4d0b9c5
IGNITE-23304 Use maxClockSkew to set election timeout.
ascherbakoff Dec 2, 2024
c0e1842
Merge branch 'main' of https://gitbox.apache.org/repos/asf/ignite-3 i…
ascherbakoff Dec 3, 2024
836b7d8
IGNITE-23304 Fix metastore tests.
ascherbakoff Dec 3, 2024
e6e3dec
IGNITE-23304 Fix tests wip 1.
ascherbakoff Dec 3, 2024
661f502
IGNITE-23304 Fix tests wip 2.
ascherbakoff Dec 6, 2024
3e4af63
IGNITE-23304 Fix tests wip 3.
ascherbakoff Dec 6, 2024
ce04313
IGNITE-23304 Fix tests wip 4.
ascherbakoff Dec 6, 2024
3dbd60e
IGNITE-23304 Fix tests wip 6.
ascherbakoff Dec 9, 2024
4a6f8d0
IGNITE-23304 Fix tests wip 7.
ascherbakoff Dec 9, 2024
eb99fda
IGNITE-23304 Fix tests wip 8.
ascherbakoff Dec 9, 2024
2500708
IGNITE-23304 Fix tests wip 10.
ascherbakoff Dec 10, 2024
f6767b2
IGNITE-23304 Fix tests wip 11.
ascherbakoff Dec 10, 2024
dbe3fe1
Merge branch 'main' of https://gitbox.apache.org/repos/asf/ignite-3 i…
ascherbakoff Dec 10, 2024
8279069
IGNITE-23304 Fix tests wip 12.
ascherbakoff Dec 10, 2024
638d15e
IGNITE-23304 Cleanup.
ascherbakoff Dec 11, 2024
da99340
IGNITE-23304 Fix review comments wip 1.
ascherbakoff Dec 18, 2024
f06f65c
IGNITE-23304 Fix review comments wip 2.
ascherbakoff Dec 18, 2024
688735b
IGNITE-23304 Fix review comments wip 3.
ascherbakoff Dec 18, 2024
6868f38
IGNITE-23304 Fix review comments wip 4.
ascherbakoff Dec 18, 2024
d9e71b9
IGNITE-23304 Fix review comments wip 5.
ascherbakoff Dec 18, 2024
6f606c8
IGNITE-23304 Fix review comments wip 7.
ascherbakoff Dec 19, 2024
0037235
IGNITE-23304 Fix review comments wip 8.
ascherbakoff Dec 23, 2024
e3c9d23
IGNITE-23304 Fix review comments wip 9.
ascherbakoff Dec 23, 2024
f61d2c7
IGNITE-23304 Fix review comments wip 10.
ascherbakoff Dec 24, 2024
7376767
IGNITE-23304 Fix review comments wip 11.
ascherbakoff Dec 24, 2024
311c7c3
IGNITE-23304 Fix review comments wip 12.
ascherbakoff Dec 25, 2024
29cb8fa
Merge branch 'main' of https://gitbox.apache.org/repos/asf/ignite-3 i…
ascherbakoff Dec 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -389,9 +389,6 @@ public static class Replicator {

/** Stopping replica exception code. */
public static final int REPLICA_STOPPING_ERR = REPLICATOR_ERR_GROUP.registerErrorCode((short) 8);

/** Replication safe time reordering. */
public static final int REPLICATION_SAFE_TIME_REORDERING_ERR = REPLICATOR_ERR_GROUP.registerErrorCode((short) 9);
}

/** Storage error group. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ public interface HybridClock {
*/
long nowLong();

/**
* Creates a timestamp for new event. A timestamp is guarantied to be unique and monotonically grown and follow the causal.
*
* @return The hybrid timestamp.
*/
long nowLong(HybridTimestamp causal);

/**
* Gets a current timestamp as long. It is a fast way to get timestamp because it doesn't have to tick the logical part of the clock.
* This timestamp is not unique, and equal to or less than that value is returned by {@link this#nowLong()}.
Expand All @@ -44,7 +51,16 @@ public interface HybridClock {
HybridTimestamp now();

/**
* Gets a current timestamp. It is a fast way to get timestamp because it doesn't have to tick the logical part of the clock.
* Creates a timestamp for new event. A timestamp is guarantied to be unique and monotonically grown and follow the causal.
*
* @param causal The causal timestamp.
*
* @return The hybrid timestamp.
*/
HybridTimestamp now(HybridTimestamp causal);
rpuch marked this conversation as resolved.
Show resolved Hide resolved

/**
* Gets a current timestamp. It is a fast way to get timestamp because it doesn't have to tick.
* This timestamp is not unique, and equal to or less than that value is returned by {@link this#now()}.
*
* @return The hybrid timestamp.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,6 @@ public class HybridClockImpl implements HybridClock {

private final List<ClockUpdateListener> updateListeners = new CopyOnWriteArrayList<>();

/**
* Returns current physical time in milliseconds.
*
* @return Current time.
*/
protected long physicalTime() {
return System.currentTimeMillis();
}

@Override
public final long nowLong() {
while (true) {
Expand All @@ -72,31 +63,40 @@ public final long nowLong() {
}

@Override
public final long currentLong() {
long current = currentTime();
public final long nowLong(HybridTimestamp causal) {
ascherbakoff marked this conversation as resolved.
Show resolved Hide resolved
while (true) {
long now = max(currentTime(), causal.longValue());

return max(latestTime, current);
}
// Read the latest time after accessing UTC time to reduce contention.
long oldLatestTime = latestTime;

private void notifyUpdateListeners(long newTs) {
for (ClockUpdateListener listener : updateListeners) {
try {
listener.onUpdate(newTs);
} catch (Throwable e) {
log.error("ClockUpdateListener#onUpdate() failed for {} at {}", e, listener, newTs);
if (oldLatestTime >= now) {
return LATEST_TIME.incrementAndGet(this);
}

if (e instanceof Error) {
throw e;
}
if (LATEST_TIME.compareAndSet(this, oldLatestTime, now)) {
return now;
}
}
}

@Override
public final long currentLong() {
long current = currentTime();

return max(latestTime, current);
}

@Override
public final HybridTimestamp now() {
return hybridTimestamp(nowLong());
}

@Override
public HybridTimestamp now(HybridTimestamp causal) {
return hybridTimestamp(nowLong(causal));
}

@Override
public final HybridTimestamp current() {
return hybridTimestamp(currentLong());
Expand Down Expand Up @@ -138,10 +138,33 @@ public final HybridTimestamp update(HybridTimestamp requestTime) {
}
}

/**
* Returns current physical time in milliseconds.
*
* @return Current time.
*/
protected long physicalTime() {
return System.currentTimeMillis();
}

private long currentTime() {
return physicalTime() << LOGICAL_TIME_BITS_SIZE;
}

private void notifyUpdateListeners(long newTs) {
for (ClockUpdateListener listener : updateListeners) {
try {
listener.onUpdate(newTs);
} catch (Throwable e) {
log.error("ClockUpdateListener#onUpdate() failed for {} at {}", e, listener, newTs);

if (e instanceof Error) {
throw e;
}
}
}
}

@Override
public void addUpdateListener(ClockUpdateListener listener) {
updateListeners.add(listener);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,35 @@ public void update(T newValue, @Nullable R futureResult) {
}
}

/**
* Strict update with reordering check. Always called from the same updater thread.
rpuch marked this conversation as resolved.
Show resolved Hide resolved
*
* @param newValue New value.
* @param futureResult A result that will be used to complete a future returned by the
* {@link PendingComparableValuesTracker#waitFor(Comparable)}.
*/
public void updateStrict(T newValue, @Nullable R futureResult) {
rpuch marked this conversation as resolved.
Show resolved Hide resolved
if (!busyLock.readLock().tryLock()) {
throw new TrackerClosedException();
}

try {
Map.Entry<T, @Nullable R> current = this.current;

IgniteBiTuple<T, @Nullable R> newEntry = new IgniteBiTuple<>(newValue, futureResult);

// Entries from the same batch receive equal safe timestamps.
rpuch marked this conversation as resolved.
Show resolved Hide resolved
if (comparator.compare(newEntry, current) < 0) {
throw new AssertionError("Reordering detected: [old=" + current.getKey() + ", new=" + newEntry.get1() + ']');
}

CURRENT.set(this, newEntry);
completeWaitersOnUpdate(newValue, futureResult);
} finally {
busyLock.readLock().unlock();
}
}

/**
* Provides the future that is completed when this tracker's internal value reaches given one. If the internal value is greater or equal
* then the given one, returns completed future.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.ignite.internal.hlc.HybridClockImpl;

/**
* Test hybrid clock with custom supplier of current time. TODO delete
* Test hybrid clock with custom supplier of current time.
*/
public class TestHybridClock extends HybridClockImpl {
/** Supplier of current time in milliseconds. */
Expand Down
1 change: 1 addition & 0 deletions modules/distribution-zones/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ dependencies {
testImplementation project(':ignite-metastorage')
testImplementation project(':ignite-system-view-api')
testImplementation project(':ignite-configuration-system')
testImplementation project(':ignite-replicator')
sanpwc marked this conversation as resolved.
Show resolved Hide resolved

testImplementation(testFixtures(project(':ignite-core')))
testImplementation(testFixtures(project(':ignite-catalog')))
Expand Down
2 changes: 2 additions & 0 deletions modules/metastorage/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ dependencies {
implementation project(':ignite-configuration-system')
implementation project(':ignite-cluster-management')
implementation project(':ignite-network-api')
implementation project(':ignite-network')
rpuch marked this conversation as resolved.
Show resolved Hide resolved
implementation project(':ignite-vault')
implementation project(':ignite-raft-api')
implementation project(':ignite-raft')
Expand All @@ -38,6 +39,7 @@ dependencies {
implementation project(':ignite-failure-handler')
implementation project(':ignite-metrics')
implementation project(':ignite-system-disaster-recovery-api')
implementation project(':ignite-catalog')
rpuch marked this conversation as resolved.
Show resolved Hide resolved
implementation libs.jetbrains.annotations
implementation libs.fastutil.core
implementation libs.auto.service.annotations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
/** Base meta storage write command. */
public interface MetaStorageWriteCommand extends WriteCommand {
/** Time on the initiator node. */
@Override
HybridTimestamp initiatorTime();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ public interface DirectByteBufferStream {
*/
void writeInt(int val);

void writeFixedInt(int val);

void writeFixedLong(long val);

/**
* Writes {@code Integer}.
*
Expand Down Expand Up @@ -361,6 +365,10 @@ <K, V> void writeMap(Map<K, V> map, MessageCollectionItemType keyType, MessageCo
*/
int readInt();

int readFixedInt();

long readFixedLong();

/**
* Reads {@code Integer}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,38 @@ public void writeInt(int val) {
writeVarInt(val + 1);
}

/** {@inheritDoc} */
@Override
public void writeFixedInt(int val) {
lastFinished = remainingInternal() >= Integer.BYTES;

if (lastFinished) {
int pos = buf.position();

GridUnsafe.putInt(heapArr, baseOff + pos, val);
ascherbakoff marked this conversation as resolved.
Show resolved Hide resolved

pos += Integer.BYTES;

setPosition(pos);
}
}

/** {@inheritDoc} */
@Override
public void writeFixedLong(long val) {
lastFinished = remainingInternal() >= Long.BYTES;

if (lastFinished) {
int pos = buf.position();

GridUnsafe.putLong(heapArr, baseOff + pos, val);

pos += Long.BYTES;

setPosition(pos);
}
}

@Override
public void writeBoxedInt(@Nullable Integer val) {
if (val != null) {
Expand Down Expand Up @@ -1123,6 +1155,40 @@ public int readInt() {
return val;
}

@Override
public int readFixedInt() {
lastFinished = remainingInternal() >= Integer.BYTES;

int val = 0;

if (lastFinished) {
int pos = buf.position();

val = GridUnsafe.getInt(heapArr, baseOff + pos);

setPosition(pos + Integer.BYTES);
}

return val;
}

@Override
public long readFixedLong() {
lastFinished = remainingInternal() >= Long.BYTES;

long val = 0;

if (lastFinished) {
int pos = buf.position();

val = GridUnsafe.getLong(heapArr, baseOff + pos);

setPosition(pos + Long.BYTES);
}

return val;
}

@Override
public @Nullable Integer readBoxedInt() {
return readBoxedValue(this::readInt);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,6 @@ public interface PartitionCommandsMarshaller extends Marshaller {
* @return Catalog version. {@value #NO_VERSION_REQUIRED} if version is not required for the given command.
*/
int readRequiredCatalogVersion(ByteBuffer raw);

long readSafeTimestamp(ByteBuffer raw);
}
Loading