Skip to content

Commit

Permalink
fix a race between closing the archive and resetting state
Browse files Browse the repository at this point in the history
  • Loading branch information
RichardWarburton committed Mar 11, 2022
1 parent cf34d51 commit 6d84f6f
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.agrona.collections.Long2LongHashMap;
import org.agrona.collections.Long2ObjectHashMap;
import org.agrona.collections.LongHashSet;
import org.agrona.collections.MutableLong;
import org.agrona.concurrent.IdleStrategy;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.status.CountersReader;
Expand All @@ -52,11 +53,13 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.LongConsumer;

import static io.aeron.Aeron.NULL_VALUE;
import static io.aeron.CommonContext.IPC_CHANNEL;
import static io.aeron.CommonContext.MTU_LENGTH_PARAM_NAME;
import static io.aeron.archive.client.AeronArchive.NULL_POSITION;
import static io.aeron.archive.codecs.SourceLocation.LOCAL;
import static io.aeron.archive.codecs.SourceLocation.REMOTE;
import static io.aeron.archive.status.RecordingPos.NULL_RECORDING_ID;
import static io.aeron.driver.Configuration.publicationReservedSessionIdHigh;
import static io.aeron.driver.Configuration.publicationReservedSessionIdLow;
import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
Expand Down Expand Up @@ -94,7 +97,8 @@ public static File recordingIdsFile(final EngineConfiguration configuration)
private final IdleStrategy idleStrategy = CommonConfiguration.backoffIdleStrategy();

private final SourceLocation outboundLocation;
private final LongHashSet trackedRegistrationIds = new LongHashSet();
private final MutableLong registrationId = new MutableLong(NULL_VALUE);
private final Long2LongHashMap trackedRegistrationIdToRecordingId = new Long2LongHashMap(NULL_RECORDING_ID);
private final Aeron aeron;
private final AeronArchive archive;
private final String channel;
Expand Down Expand Up @@ -289,9 +293,10 @@ private void extendRecording(
try
{
final String recordingChannel = ChannelUri.addSessionId(channel, sessionId);
final long recordingId = libraryExtendPosition.recordingId;
final long registrationId = archive.extendRecording(
libraryExtendPosition.recordingId, recordingChannel, streamId, LOCAL);
trackedRegistrationIds.add(registrationId);
recordingId, recordingChannel, streamId, LOCAL);
trackedRegistrationIdToRecordingId.put(registrationId, recordingId);
}
catch (final ArchiveException e)
{
Expand Down Expand Up @@ -411,14 +416,14 @@ private boolean startRecording(
{
if (recordingAlreadyStarted(sessionId))
{
// Can happen when a library reconnects, the registration id will already be stored
return true;
}

try
{
final String channel = ChannelUri.addSessionId(this.channel, sessionId);
final long registrationId = archive.startRecording(channel, streamId, local);
trackedRegistrationIds.add(registrationId);
registrationId.set(archive.startRecording(channel, streamId, local));

return true;
}
Expand All @@ -432,7 +437,7 @@ private boolean startRecording(

private boolean recordingAlreadyStarted(final int sessionId)
{
return RecordingPos.findCounterIdBySession(counters, sessionId) != Aeron.NULL_VALUE;
return RecordingPos.findCounterIdBySession(counters, sessionId) != NULL_VALUE;
}

// awaits the recording start and saves the file
Expand All @@ -441,6 +446,12 @@ private void checkRecordingStart(
{
final long recordingId = lookup.getRecordingId(sessionId);
recordingIds.add(recordingId);
final long registrationId = this.registrationId.get();
if (registrationId != NULL_VALUE)
{
trackedRegistrationIdToRecordingId.put(registrationId, recordingId);
this.registrationId.set(NULL_VALUE);
}

saveRecordingIdsFile();

Expand Down Expand Up @@ -524,11 +535,23 @@ private void awaitRecordingsCompletion(

private void shutdownArchiver()
{
final LongHashSet.LongIterator it = trackedRegistrationIds.iterator();
final Long2LongHashMap.EntryIterator it = trackedRegistrationIdToRecordingId.entrySet().iterator();

while (it.hasNext())
{
final long registrationId = it.nextValue();
it.next();
final long registrationId = it.getLongKey();
final long recordingId = it.getLongValue();
final int counterId = RecordingPos.findCounterIdByRecording(counters, recordingId);
archive.stopRecording(registrationId);
if (counterId != NULL_COUNTER_ID)
{
while (RecordingPos.isActive(counters, counterId, recordingId))
{
idleStrategy.idle();
}
idleStrategy.reset();
}
}

if (configuration.logAnyMessages())
Expand Down Expand Up @@ -573,6 +596,15 @@ boolean hasRecordingCompleted()

return false;
}

public String toString()
{
return "CompletingRecording{" +
"completedPosition=" + completedPosition +
", recordingId=" + recordingId +
", counterId=" + counterId +
'}';
}
}

RecordingIdLookup indexerInboundRecordingIdLookup()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@
import io.aeron.Aeron;
import io.aeron.archive.client.AeronArchive;
import org.agrona.CloseHelper;
import uk.co.real_logic.artio.DebugLogger;

import java.io.File;

import static uk.co.real_logic.artio.LogTag.ARCHIVE_SCAN;

class ResetArchiveState
{
private final File backupLocation;
Expand Down Expand Up @@ -63,6 +66,17 @@ private void truncateArchive()
recordingCoordinator.forEachRecording(recordingId ->
{
final long startPosition = archive.getStartPosition(recordingId);
// If we hit some error case where
if (archive.getStopPosition(recordingId) != AeronArchive.NULL_POSITION)
{
final boolean tryStop = archive.tryStopRecordingByIdentity(recordingId);

if (DebugLogger.isEnabled(ARCHIVE_SCAN))
{
DebugLogger.log(ARCHIVE_SCAN, "Recording " + recordingId + " not stopped, stopping " +
(tryStop ? "succeeded" : "failed"));
}
}
archive.truncateRecording(recordingId, startPosition);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,27 @@ public void shouldPerformEndOfDayOperationWithGatewaySession()
shouldPerformEndOfDayOperation(false);
}

// Reproduces a case where a library reconnect, continuing a previous recording
@Test
public void shouldResetStateEvenWithALibraryReconnect()
{
deleteLogs();
mediaDriver = launchMediaDriver();
launch(AUTOMATIC_INITIAL_SEQUENCE_NUMBER, AUTOMATIC_INITIAL_SEQUENCE_NUMBER);
acquireAcceptingSession();

messagesCanBeExchanged();

testSystem.remove(acceptingLibrary);
awaitLibraryDisconnect(acceptingEngine, testSystem);

testSystem.add(acceptingLibrary);
awaitLibraryCount(acceptingEngine, testSystem, 2);

testSystem.awaitBlocking(() -> acceptingEngine.close());
acceptingEngine.resetState(null);
}

private void shouldPerformEndOfDayOperation(final boolean libraryOwnsSession)
{
deleteLogs();
Expand Down Expand Up @@ -128,7 +149,7 @@ private void launch(

final Reply<Session> reply = connectPersistentSessions(
initiatorInitialSentSequenceNumber, initialReceivedSequenceNumber, false);
assertEquals("Repy failed: " + reply, Reply.State.COMPLETED, reply.state());
assertEquals("Reply failed: " + reply, Reply.State.COMPLETED, reply.state());
initiatingSession = reply.resultIfPresent();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -442,10 +442,15 @@ public static void awaitLibraryDisconnect(final FixEngine engine)
}

public static void awaitLibraryDisconnect(final FixEngine engine, final TestSystem testSystem)
{
awaitLibraryCount(engine, testSystem, 1);
}

public static void awaitLibraryCount(final FixEngine engine, final TestSystem testSystem, final int count)
{
assertEventuallyTrue(
() -> "libraries haven't disconnected yet",
() -> libraries(engine, testSystem).size() == 1,
() -> libraries(engine, testSystem).size() == count,
AWAIT_TIMEOUT_IN_MS,
() ->
{
Expand Down

0 comments on commit 6d84f6f

Please sign in to comment.