From 0cb0d4cbb58b1cdc768f15ee7ea58fd89fd07a45 Mon Sep 17 00:00:00 2001 From: Dmytro Vyazelenko <696855+vyazelenko@users.noreply.github.com> Date: Tue, 10 Sep 2024 12:12:20 +0200 Subject: [PATCH] [Java] Use unique `session-id` when creating live log recording and replaying data from the Cluster to ensure that the old replay data is not being picked up upon restart/reset. (cherry picked from commit 4d167e9f9bcda6ceb83ef9d082c6a0ea5064f26f) --- .../io/aeron/cluster/ClusterBackupAgent.java | 46 +++++++++++-------- .../io/aeron/cluster/ClusterBackupTest.java | 32 +++++++++++++ .../io/aeron/test/cluster/TestCluster.java | 27 +++++++---- 3 files changed, 77 insertions(+), 28 deletions(-) diff --git a/aeron-cluster/src/main/java/io/aeron/cluster/ClusterBackupAgent.java b/aeron-cluster/src/main/java/io/aeron/cluster/ClusterBackupAgent.java index 63c80e9983..19ff877aac 100644 --- a/aeron-cluster/src/main/java/io/aeron/cluster/ClusterBackupAgent.java +++ b/aeron-cluster/src/main/java/io/aeron/cluster/ClusterBackupAgent.java @@ -40,6 +40,7 @@ import io.aeron.cluster.service.ClusterMarkFile; import io.aeron.exceptions.TimeoutException; import io.aeron.logbuffer.Header; +import org.agrona.BitUtil; import org.agrona.CloseHelper; import org.agrona.DirectBuffer; import org.agrona.ErrorHandler; @@ -55,11 +56,7 @@ import java.util.concurrent.TimeUnit; import static io.aeron.Aeron.NULL_VALUE; -import static io.aeron.CommonContext.CONTROL_MODE_RESPONSE; -import static io.aeron.CommonContext.ENDPOINT_PARAM_NAME; -import static io.aeron.CommonContext.MDC_CONTROL_MODE_PARAM_NAME; -import static io.aeron.CommonContext.MDC_CONTROL_PARAM_NAME; -import static io.aeron.CommonContext.TAGS_PARAM_NAME; +import static io.aeron.CommonContext.*; import static io.aeron.archive.client.AeronArchive.NULL_LENGTH; import static io.aeron.archive.client.AeronArchive.NULL_POSITION; import static io.aeron.archive.client.AeronArchive.NULL_TIMESTAMP; @@ -142,7 +139,8 @@ public final class ClusterBackupAgent implements Agent private long liveLogRecordingId = NULL_VALUE; private long liveLogReplaySessionId = NULL_VALUE; private int leaderCommitPositionCounterId = NULL_VALUE; - private int liveLogRecCounterId = NULL_COUNTER_ID; + private int liveLogRecordingCounterId = NULL_COUNTER_ID; + private int liveLogRecordingSessionId = NULL_VALUE; ClusterBackupAgent(final ClusterBackup.Context ctx) { @@ -336,8 +334,9 @@ private void reset() logSupplierMember = null; leaderLogEntry = null; leaderLastTermEntry = null; - liveLogRecCounterId = NULL_COUNTER_ID; + liveLogRecordingCounterId = NULL_COUNTER_ID; liveLogRecordingId = NULL_VALUE; + liveLogRecordingSessionId = NULL_VALUE; snapshotsToRetrieve.clear(); snapshotsRetrieved.clear(); @@ -387,7 +386,7 @@ private void reset() private void onUnavailableCounter(final CountersReader counters, final long registrationId, final int counterId) { - if (counterId == liveLogRecCounterId) + if (counterId == liveLogRecordingCounterId) { if (null != eventsListener) { @@ -758,6 +757,11 @@ private int liveLogRecord(final long nowMs) if (NULL_VALUE == liveLogRecordingSubscriptionId) { + if (NULL_VALUE == liveLogRecordingSessionId) + { + liveLogRecordingSessionId = BitUtil.generateRandomisedId(); + } + final String catchupEndpoint = ctx.catchupEndpoint(); if (catchupEndpoint.endsWith(":0")) { @@ -766,10 +770,11 @@ private int liveLogRecord(final long nowMs) final ChannelUri channelUri = ChannelUri.parse(ctx.catchupChannel()); channelUri.remove(ENDPOINT_PARAM_NAME); channelUri.put(TAGS_PARAM_NAME, aeron.nextCorrelationId() + "," + aeron.nextCorrelationId()); + channelUri.put(SESSION_ID_PARAM_NAME, Integer.toString(liveLogRecordingSessionId)); recordingChannel = channelUri.toString(); - final String channel = recordingChannel + "|endpoint=" + catchupEndpoint; - recordingSubscription = aeron.addSubscription(channel, ctx.logStreamId()); + channelUri.put(ENDPOINT_PARAM_NAME, catchupEndpoint); + recordingSubscription = aeron.addSubscription(channelUri.toString(), ctx.logStreamId()); timeOfLastProgressMs = nowMs; return 1; } @@ -784,6 +789,7 @@ private int liveLogRecord(final long nowMs) final ChannelUri channelUri = ChannelUri.parse(ctx.catchupChannel()); channelUri.put(ENDPOINT_PARAM_NAME, catchupEndpoint); channelUri.replaceEndpointWildcardPort(resolvedEndpoint); + channelUri.put(SESSION_ID_PARAM_NAME, Integer.toString(liveLogRecordingSessionId)); replayChannel = channelUri.toString(); } @@ -792,6 +798,7 @@ private int liveLogRecord(final long nowMs) { final ChannelUri channelUri = ChannelUri.parse(ctx.catchupChannel()); channelUri.put(ENDPOINT_PARAM_NAME, catchupEndpoint); + channelUri.put(SESSION_ID_PARAM_NAME, Integer.toString(liveLogRecordingSessionId)); replayChannel = channelUri.toString(); recordingChannel = replayChannel; } @@ -849,16 +856,16 @@ else if (NULL_VALUE == liveLogReplaySessionId) timeOfLastProgressMs = nowMs; } } - else if (NULL_COUNTER_ID == liveLogRecCounterId) + else if (NULL_COUNTER_ID == liveLogRecordingCounterId) { final CountersReader countersReader = aeron.countersReader(); - liveLogRecCounterId = RecordingPos.findCounterIdBySession( + liveLogRecordingCounterId = RecordingPos.findCounterIdBySession( countersReader, (int)liveLogReplaySessionId, backupArchive.archiveId()); - if (NULL_COUNTER_ID != liveLogRecCounterId) + if (NULL_COUNTER_ID != liveLogRecordingCounterId) { - liveLogPositionCounter.setOrdered(countersReader.getCounterValue(liveLogRecCounterId)); - liveLogRecordingId = RecordingPos.getRecordingId(countersReader, liveLogRecCounterId); + liveLogPositionCounter.setOrdered(countersReader.getCounterValue(liveLogRecordingCounterId)); + liveLogRecordingId = RecordingPos.getRecordingId(countersReader, liveLogRecordingCounterId); timeOfLastBackupQueryMs = nowMs; timeOfLastProgressMs = nowMs; state(UPDATE_RECORDING_LOG, nowMs); @@ -959,15 +966,15 @@ private int backingUp(final long nowMs) workCount += 1; } - if (NULL_COUNTER_ID != liveLogRecCounterId) + if (NULL_COUNTER_ID != liveLogRecordingCounterId) { - final long liveLogPosition = aeron.countersReader().getCounterValue(liveLogRecCounterId); + final long liveLogPosition = aeron.countersReader().getCounterValue(liveLogRecordingCounterId); if (liveLogPositionCounter.proposeMaxOrdered(liveLogPosition)) { if (null != eventsListener) { - eventsListener.onLiveLogProgress(liveLogRecordingId, liveLogRecCounterId, liveLogPosition); + eventsListener.onLiveLogProgress(liveLogRecordingId, liveLogRecordingCounterId, liveLogPosition); } workCount += 1; @@ -1086,7 +1093,8 @@ private long startLogRecording() private boolean hasProgressStalled(final long nowMs) { - return (NULL_COUNTER_ID == liveLogRecCounterId) && (nowMs > (timeOfLastProgressMs + backupProgressTimeoutMs)); + return (NULL_COUNTER_ID == liveLogRecordingCounterId) && + (nowMs > (timeOfLastProgressMs + backupProgressTimeoutMs)); } private long replayStartPosition(final RecordingLog.Entry lastTerm) diff --git a/aeron-system-tests/src/test/java/io/aeron/cluster/ClusterBackupTest.java b/aeron-system-tests/src/test/java/io/aeron/cluster/ClusterBackupTest.java index 2fefa6260d..ec86fb8c33 100644 --- a/aeron-system-tests/src/test/java/io/aeron/cluster/ClusterBackupTest.java +++ b/aeron-system-tests/src/test/java/io/aeron/cluster/ClusterBackupTest.java @@ -20,6 +20,7 @@ import io.aeron.samples.archive.SampleAuthorisationService; import io.aeron.security.AuthenticatorSupplier; import io.aeron.security.AuthorisationServiceSupplier; +import io.aeron.security.NullCredentialsSupplier; import io.aeron.test.EventLogExtension; import io.aeron.test.InterruptAfter; import io.aeron.test.InterruptingTestCallback; @@ -37,6 +38,7 @@ import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.ValueSource; import static io.aeron.cluster.ClusterBackup.Configuration.ReplayStart.LATEST_SNAPSHOT; import static io.aeron.test.SystemTestWatcher.UNKNOWN_HOST_FILTER; @@ -627,6 +629,36 @@ void shouldBackupClusterAndJoinLive() cluster.awaitServiceMessageCount(node, 200); } + @ParameterizedTest + @ValueSource(ints = { 0, 8833 }) + @InterruptAfter(60) + void shouldResumeBackupIfStopped(final int catchupPort) + { + final TestCluster cluster = aCluster().withStaticNodes(3).start(); + systemTestWatcher.cluster(cluster); + + cluster.connectClient(); + final int initialMessageCount = 100_000; // minimum number of messages to trigger the bug + cluster.sendMessages(initialMessageCount); + cluster.awaitServicesMessageCount(initialMessageCount); + + final TestBackupNode backupNode = cluster.startClusterBackupNode( + true, new NullCredentialsSupplier(), ClusterBackup.SourceType.FOLLOWER, catchupPort); + cluster.awaitBackupLiveLogPosition(1); + backupNode.close(); + + final int delta = 1000; + cluster.sendMessages(delta); + + cluster.awaitServicesMessageCount(initialMessageCount + delta); + cluster.awaitResponseMessageCount(initialMessageCount + delta); + + cluster.startClusterBackupNode( + false, new NullCredentialsSupplier(), ClusterBackup.SourceType.FOLLOWER, catchupPort); + cluster.awaitBackupState(ClusterBackup.State.BACKING_UP); + cluster.awaitBackupLiveLogPosition(cluster.findLeader().service().cluster().logPosition()); + } + private static void awaitErrorLogged(final TestBackupNode testBackupNode, final String expectedErrorMessage) { final AtomicBuffer atomicBuffer = testBackupNode.clusterBackupErrorLog(); diff --git a/aeron-test-support/src/main/java/io/aeron/test/cluster/TestCluster.java b/aeron-test-support/src/main/java/io/aeron/test/cluster/TestCluster.java index 0e9f24fb88..87878533b4 100644 --- a/aeron-test-support/src/main/java/io/aeron/test/cluster/TestCluster.java +++ b/aeron-test-support/src/main/java/io/aeron/test/cluster/TestCluster.java @@ -355,6 +355,15 @@ public TestBackupNode startClusterBackupNode( final boolean cleanStart, final CredentialsSupplier credentialsSupplier, final ClusterBackup.SourceType sourceType) + { + return startClusterBackupNode(cleanStart, credentialsSupplier, sourceType, 0); + } + + public TestBackupNode startClusterBackupNode( + final boolean cleanStart, + final CredentialsSupplier credentialsSupplier, + final ClusterBackup.SourceType sourceType, + final int catchupEndpointPort) { final int index = staticMemberCount; final String baseDirName = clusterBaseDir + "-" + index; @@ -397,17 +406,17 @@ public TestBackupNode startClusterBackupNode( .clusterConsensusEndpoints(clusterConsensusEndpoints) .consensusChannel(consensusChannelUri.toString()) .clusterBackupCoolDownIntervalNs(TimeUnit.SECONDS.toNanos(1)) - .catchupEndpoint(hostname(index) + ":0") + .catchupEndpoint(hostname(index) + ":" + catchupEndpointPort) .archiveContext(new AeronArchive.Context() - .aeronDirectoryName(aeronDirName) - .controlRequestChannel(context.archiveContext.localControlChannel()) - .controlRequestStreamId(context.archiveContext.localControlStreamId()) - .controlResponseChannel("aeron:ipc?alias=backup-archive-local-resp") - .controlResponseStreamId(9090909 + index)) + .aeronDirectoryName(aeronDirName) + .controlRequestChannel(context.archiveContext.localControlChannel()) + .controlRequestStreamId(context.archiveContext.localControlStreamId()) + .controlResponseChannel("aeron:ipc?alias=backup-archive-local-resp") + .controlResponseStreamId(9090909 + index)) .clusterArchiveContext(new AeronArchive.Context() - .aeronDirectoryName(aeronDirName) - .controlRequestChannel(context.archiveContext.controlChannel()) - .controlResponseChannel(archiveControlResponseChannel(index))) + .aeronDirectoryName(aeronDirName) + .controlRequestChannel(context.archiveContext.controlChannel()) + .controlResponseChannel(archiveControlResponseChannel(index))) .clusterDir(new File(baseDirName, "cluster-backup")) .credentialsSupplier(credentialsSupplier) .sourceType(sourceType)