From 2572ad34097110f8451a21bfddeacb8cf74d4fd6 Mon Sep 17 00:00:00 2001 From: marc-adaptive Date: Thu, 29 Aug 2024 13:27:38 -0400 Subject: [PATCH 01/17] Fix scenario in framer where initiator disconnects before follower session in library can respond to logon leading to sequence index bump --- .../artio/engine/framer/EngineStreamInfo.java | 9 +++ .../artio/engine/framer/Framer.java | 3 +- .../MessageBasedAcceptorSystemTest.java | 70 +++++++++++++++++-- .../artio/system_tests/SystemTestUtil.java | 46 ++++++++++++ 4 files changed, 121 insertions(+), 7 deletions(-) diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/EngineStreamInfo.java b/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/EngineStreamInfo.java index c87bbbd775..0c60b025d1 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/EngineStreamInfo.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/EngineStreamInfo.java @@ -4,6 +4,7 @@ public final class EngineStreamInfo { private final long inboundIndexSubscriptionRegistrationId; private final long outboundIndexSubscriptionRegistrationId; + private final long librarySubscriptionRegistrationId; private final int inboundPublicationSessionId; private final long inboundPublicationPosition; private final int outboundPublicationSessionId; @@ -12,6 +13,7 @@ public final class EngineStreamInfo EngineStreamInfo( final long inboundIndexSubscriptionRegistrationId, final long outboundIndexSubscriptionRegistrationId, + final long librarySubscriptionRegistrationId, final int inboundPublicationSessionId, final long inboundPublicationPosition, final int outboundPublicationSessionId, @@ -19,6 +21,7 @@ public final class EngineStreamInfo { this.inboundIndexSubscriptionRegistrationId = inboundIndexSubscriptionRegistrationId; this.outboundIndexSubscriptionRegistrationId = outboundIndexSubscriptionRegistrationId; + this.librarySubscriptionRegistrationId = librarySubscriptionRegistrationId; this.inboundPublicationSessionId = inboundPublicationSessionId; this.inboundPublicationPosition = inboundPublicationPosition; this.outboundPublicationSessionId = outboundPublicationSessionId; @@ -35,6 +38,11 @@ public long outboundIndexSubscriptionRegistrationId() return outboundIndexSubscriptionRegistrationId; } + public long librarySubscriptionRegistrationId() + { + return librarySubscriptionRegistrationId; + } + public int inboundPublicationSessionId() { return inboundPublicationSessionId; @@ -60,6 +68,7 @@ public String toString() return "EngineStreamInfo{" + "inboundIndexSubscriptionRegistrationId=" + inboundIndexSubscriptionRegistrationId + ", outboundIndexSubscriptionRegistrationId=" + outboundIndexSubscriptionRegistrationId + + ", librarySubscriptionRegistrationId=" + librarySubscriptionRegistrationId + ", inboundPublicationSessionId=" + inboundPublicationSessionId + ", inboundPublicationPosition=" + inboundPublicationPosition + ", outboundPublicationSessionId=" + outboundPublicationSessionId + diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/Framer.java b/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/Framer.java index 76a07bc1d1..3f613724f4 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/Framer.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/Framer.java @@ -1723,7 +1723,7 @@ private void checkOfflineSequenceReset(final long sessionId, final long messageT if (entry != null) { final SessionContext context = entry.getValue(); - context.onSequenceReset(clock.nanoTime()); + context.onSequenceIndex(clock.nanoTime(), sequenceIndex); } } else if (messageType == SEQUENCE_RESET_MESSAGE_TYPE) @@ -3503,6 +3503,7 @@ public void onEngineStreamInfoRequest(final EngineStreamInfoRequestCommand comma command.complete(new EngineStreamInfo( inboundIndexRegistrationId, outboundIndexRegistrationId, + librarySubscription.registrationId(), inboundPublication.sessionId(), inboundPublication.position(), outboundPublication.sessionId(), diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java index a56413918a..9bb10748f4 100644 --- a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java @@ -23,12 +23,11 @@ import uk.co.real_logic.artio.builder.*; import uk.co.real_logic.artio.decoder.*; import uk.co.real_logic.artio.engine.SessionInfo; +import uk.co.real_logic.artio.engine.framer.LibraryInfo; import uk.co.real_logic.artio.library.FixLibrary; -import uk.co.real_logic.artio.messages.DisconnectReason; -import uk.co.real_logic.artio.messages.InitialAcceptedSessionOwner; -import uk.co.real_logic.artio.messages.SessionReplyStatus; -import uk.co.real_logic.artio.messages.ThrottleConfigurationStatus; +import uk.co.real_logic.artio.messages.*; import uk.co.real_logic.artio.session.Session; +import uk.co.real_logic.artio.session.SessionWriter; import uk.co.real_logic.artio.util.MutableAsciiBuffer; import java.io.IOException; @@ -55,8 +54,7 @@ import static uk.co.real_logic.artio.messages.InitialAcceptedSessionOwner.ENGINE; import static uk.co.real_logic.artio.messages.InitialAcceptedSessionOwner.SOLE_LIBRARY; import static uk.co.real_logic.artio.messages.ThrottleConfigurationStatus.OK; -import static uk.co.real_logic.artio.system_tests.AbstractGatewayToGatewaySystemTest.LONG_TEST_TIMEOUT_IN_MS; -import static uk.co.real_logic.artio.system_tests.AbstractGatewayToGatewaySystemTest.TEST_TIMEOUT_IN_MS; +import static uk.co.real_logic.artio.system_tests.AbstractGatewayToGatewaySystemTest.*; import static uk.co.real_logic.artio.system_tests.FixConnection.BUFFER_SIZE; import static uk.co.real_logic.artio.system_tests.MessageBasedInitiatorSystemTest.assertConnectionDisconnects; import static uk.co.real_logic.artio.system_tests.SystemTestUtil.*; @@ -609,6 +607,66 @@ public void shouldSupportLogonBasedSequenceNumberResetWithMessagesSentBeforeLogo }); } + @Test(timeout = TEST_TIMEOUT_IN_MS) + public void shouldSupportFollowerSessionLogonWithoutSequenceResetOnDisconnectBeforeLibraryLogonResponse() + throws IOException + { + setup(true, true); + setupLibrary(); + + final SessionWriter sessionWriter = createFollowerSession( + TEST_TIMEOUT_IN_MS, testSystem, library, INITIATOR_ID, ACCEPTOR_ID); + assertEquals(SessionReplyStatus.OK, requestSession(library, sessionWriter.id(), testSystem)); + + try (FixConnection connection = FixConnection.initiate(port)) + { + connection.logon(true); + Timing.assertEventuallyTrue("Library did not transition session to connected", + () -> + { + library.poll(1); + final List sessions = library.sessions(); + return sessions.size() == 1 && sessions.get(0).state() == SessionState.CONNECTED; + } + ); + } + + Timing.assertEventuallyTrue("Fix connection was not disconnected", + () -> + { + final Reply> libraryReply = engine.libraries(); + while (!libraryReply.hasCompleted()) + { + sleep(100); + } + + final List allLibraryInfo = libraryReply.resultIfPresent(); + for (final LibraryInfo libraryInfo : allLibraryInfo) { + if (libraryInfo.libraryId() == libraryId) { + return libraryInfo.sessions().isEmpty(); + } + } + return false; + } + ); + + Timing.assertEventuallyTrue("Library did not transition session to active", + () -> + { + library.poll(1); + final List sessions = library.sessions(); + return sessions.size() == 1 && sessions.get(0).state() == SessionState.ACTIVE; + } + ); + + assertEngineSubscriptionCaughtUpToLibraryPublication( + testSystem, mediaDriver.mediaDriver().aeronDirectoryName(), engine, library); + + final List framerSessionContext = engine.allSessions(); + assertEquals(1, framerSessionContext.size()); + assertEquals(0, framerSessionContext.get(0).sequenceIndex()); + } + @Test(timeout = TEST_TIMEOUT_IN_MS) public void shouldHandleOnlineResetFollowedByDisconnectAndRestart() throws IOException { diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/SystemTestUtil.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/SystemTestUtil.java index d8b78853b4..1eb27e1995 100644 --- a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/SystemTestUtil.java +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/SystemTestUtil.java @@ -707,4 +707,50 @@ static void awaitIndexerCaughtUp( () -> {}); } } + + static void assertEngineSubscriptionCaughtUpToLibraryPublication( + final TestSystem testSystem, + final String aeronDirectoryName, + final FixEngine engine, + final FixLibrary library) + { + final EngineStreamInfo engineStreamInfo = + testSystem.awaitCompletedReply(FixEngineInternals.engineStreamInfo(engine)).resultIfPresent(); + + final LibraryStreamInfo libraryStreamInfo = FixLibraryInternals.libraryStreamInfo(library); + + final Aeron.Context aeronCtx = new Aeron.Context().aeronDirectoryName(aeronDirectoryName); + try (Aeron aeron = Aeron.connect(aeronCtx)) + { + final CountersReader countersReader = aeron.countersReader(); + + final SubPosMatcher subPosMatcher = new SubPosMatcher( + countersReader, + engineStreamInfo.librarySubscriptionRegistrationId(), + libraryStreamInfo.outboundPublicationSessionId(), + libraryStreamInfo.outboundPublicationPosition()); + + countersReader.forEach((counterId, typeId, keyBuffer, label) -> + subPosMatcher.tryMatch(counterId, typeId, keyBuffer)); + + assertEventuallyTrue( + () -> + { + final StringBuilder builder = new StringBuilder(); + builder.append("expected sub-pos counters:\n"); + builder.append(subPosMatcher).append('\n'); + builder.append("\nbut counters were:\n"); + countersReader.forEach((value, counterId, label) -> + builder.append(String.format("%d: %d - %s%n", counterId, value, label))); + return builder.toString(); + }, + () -> + { + testSystem.poll(); + return subPosMatcher.isCaughtUp(); + }, + DEFAULT_TIMEOUT_IN_MS, + () -> {}); + } + } } From c36713c42525cb6b0d0561bb96c96d4669769f2c Mon Sep 17 00:00:00 2001 From: marc-adaptive Date: Thu, 29 Aug 2024 13:36:50 -0400 Subject: [PATCH 02/17] Fix scenario in framer where initiator disconnects before follower session in library can respond to logon leading to sequence index bump --- .../MessageBasedAcceptorSystemTest.java | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java index 9bb10748f4..0749dd385e 100644 --- a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java @@ -614,9 +614,17 @@ public void shouldSupportFollowerSessionLogonWithoutSequenceResetOnDisconnectBef setup(true, true); setupLibrary(); + final List noSessionContext = engine.allSessions(); + assertEquals(0, noSessionContext.size()); + final SessionWriter sessionWriter = createFollowerSession( TEST_TIMEOUT_IN_MS, testSystem, library, INITIATOR_ID, ACCEPTOR_ID); - assertEquals(SessionReplyStatus.OK, requestSession(library, sessionWriter.id(), testSystem)); + final SessionReplyStatus requestSessionReply = requestSession(library, sessionWriter.id(), testSystem); + assertEquals(SessionReplyStatus.OK, requestSessionReply); + + final List sessionContextAfterFollower = engine.allSessions(); + assertEquals(1, sessionContextAfterFollower.size()); + assertEquals(SessionInfo.UNKNOWN_SEQUENCE_INDEX, sessionContextAfterFollower.get(0).sequenceIndex()); try (FixConnection connection = FixConnection.initiate(port)) { @@ -629,6 +637,10 @@ public void shouldSupportFollowerSessionLogonWithoutSequenceResetOnDisconnectBef return sessions.size() == 1 && sessions.get(0).state() == SessionState.CONNECTED; } ); + + final List sessionContextAfterFramerAuthenticates = engine.allSessions(); + assertEquals(1, sessionContextAfterFramerAuthenticates.size()); + assertEquals(0, sessionContextAfterFramerAuthenticates.get(0).sequenceIndex()); } Timing.assertEventuallyTrue("Fix connection was not disconnected", @@ -662,9 +674,9 @@ public void shouldSupportFollowerSessionLogonWithoutSequenceResetOnDisconnectBef assertEngineSubscriptionCaughtUpToLibraryPublication( testSystem, mediaDriver.mediaDriver().aeronDirectoryName(), engine, library); - final List framerSessionContext = engine.allSessions(); - assertEquals(1, framerSessionContext.size()); - assertEquals(0, framerSessionContext.get(0).sequenceIndex()); + final List sessionContextAfterLogonNoSenderEndpoint = engine.allSessions(); + assertEquals(1, sessionContextAfterLogonNoSenderEndpoint.size()); + assertEquals(0, sessionContextAfterLogonNoSenderEndpoint.get(0).sequenceIndex()); } @Test(timeout = TEST_TIMEOUT_IN_MS) From f34f4b887751db12c68e7b268412aa35ef47e4e2 Mon Sep 17 00:00:00 2001 From: marc-adaptive Date: Thu, 29 Aug 2024 13:37:43 -0400 Subject: [PATCH 03/17] Fix scenario in framer where initiator disconnects before follower session in library can respond to logon leading to sequence index bump --- .../artio/system_tests/MessageBasedAcceptorSystemTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java index 0749dd385e..8b094fc828 100644 --- a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java @@ -622,9 +622,9 @@ public void shouldSupportFollowerSessionLogonWithoutSequenceResetOnDisconnectBef final SessionReplyStatus requestSessionReply = requestSession(library, sessionWriter.id(), testSystem); assertEquals(SessionReplyStatus.OK, requestSessionReply); - final List sessionContextAfterFollower = engine.allSessions(); - assertEquals(1, sessionContextAfterFollower.size()); - assertEquals(SessionInfo.UNKNOWN_SEQUENCE_INDEX, sessionContextAfterFollower.get(0).sequenceIndex()); + final List sessionContextAfterFramerCreatesFollower = engine.allSessions(); + assertEquals(1, sessionContextAfterFramerCreatesFollower.size()); + assertEquals(SessionInfo.UNKNOWN_SEQUENCE_INDEX, sessionContextAfterFramerCreatesFollower.get(0).sequenceIndex()); try (FixConnection connection = FixConnection.initiate(port)) { From cbfffe3c1ad736d0ea0d03946c12c22d0fa77fff Mon Sep 17 00:00:00 2001 From: marc-adaptive Date: Thu, 29 Aug 2024 14:04:39 -0400 Subject: [PATCH 04/17] Fix scenario in framer where initiator disconnects before follower session in library can respond to logon leading to sequence index bump --- .../artio/system_tests/MessageBasedAcceptorSystemTest.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java index 8b094fc828..52909b6a8f 100644 --- a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java @@ -622,10 +622,6 @@ public void shouldSupportFollowerSessionLogonWithoutSequenceResetOnDisconnectBef final SessionReplyStatus requestSessionReply = requestSession(library, sessionWriter.id(), testSystem); assertEquals(SessionReplyStatus.OK, requestSessionReply); - final List sessionContextAfterFramerCreatesFollower = engine.allSessions(); - assertEquals(1, sessionContextAfterFramerCreatesFollower.size()); - assertEquals(SessionInfo.UNKNOWN_SEQUENCE_INDEX, sessionContextAfterFramerCreatesFollower.get(0).sequenceIndex()); - try (FixConnection connection = FixConnection.initiate(port)) { connection.logon(true); @@ -649,7 +645,7 @@ public void shouldSupportFollowerSessionLogonWithoutSequenceResetOnDisconnectBef final Reply> libraryReply = engine.libraries(); while (!libraryReply.hasCompleted()) { - sleep(100); + sleep(500); } final List allLibraryInfo = libraryReply.resultIfPresent(); From 0147841750ae4ac8dad4614e0d64ef3c85e8c9cb Mon Sep 17 00:00:00 2001 From: marc-adaptive Date: Thu, 29 Aug 2024 14:11:50 -0400 Subject: [PATCH 05/17] Fix scenario in framer where initiator disconnects before follower session in library can respond to logon leading to sequence index bump --- .../artio/system_tests/MessageBasedAcceptorSystemTest.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java index 52909b6a8f..c034a8c910 100644 --- a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java @@ -633,10 +633,6 @@ public void shouldSupportFollowerSessionLogonWithoutSequenceResetOnDisconnectBef return sessions.size() == 1 && sessions.get(0).state() == SessionState.CONNECTED; } ); - - final List sessionContextAfterFramerAuthenticates = engine.allSessions(); - assertEquals(1, sessionContextAfterFramerAuthenticates.size()); - assertEquals(0, sessionContextAfterFramerAuthenticates.get(0).sequenceIndex()); } Timing.assertEventuallyTrue("Fix connection was not disconnected", From 45131201926b831569b9721a0c677f481d4db9d5 Mon Sep 17 00:00:00 2001 From: marc-adaptive Date: Thu, 29 Aug 2024 17:22:52 -0400 Subject: [PATCH 06/17] Fix scenario in framer where initiator disconnects before follower session in library can respond to logon leading to sequence index bump --- .../AbstractGatewayToGatewaySystemTest.java | 17 ++--- .../MessageBasedAcceptorSystemTest.java | 2 +- ...uenceNumberGatewayToGatewaySystemTest.java | 66 +++++++++++++++++++ 3 files changed, 74 insertions(+), 11 deletions(-) diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/AbstractGatewayToGatewaySystemTest.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/AbstractGatewayToGatewaySystemTest.java index 55e0a81903..5803b9f220 100644 --- a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/AbstractGatewayToGatewaySystemTest.java +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/AbstractGatewayToGatewaySystemTest.java @@ -830,17 +830,14 @@ void assertReplayReceivedMessages() void sleep(final int timeInMs) { - testSystem.awaitBlocking(() -> + try { - try - { - Thread.sleep(timeInMs); - } - catch (final InterruptedException e) - { - e.printStackTrace(); - } - }); + Thread.sleep(timeInMs); + } + catch (final InterruptedException e) + { + e.printStackTrace(); + } } void assertResendsCompleted(final int count, final Matcher> items) diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java index c034a8c910..a7654bd684 100644 --- a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java @@ -611,7 +611,7 @@ public void shouldSupportLogonBasedSequenceNumberResetWithMessagesSentBeforeLogo public void shouldSupportFollowerSessionLogonWithoutSequenceResetOnDisconnectBeforeLibraryLogonResponse() throws IOException { - setup(true, true); + setup(false, true); setupLibrary(); final List noSessionContext = engine.allSessions(); diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/PersistentSequenceNumberGatewayToGatewaySystemTest.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/PersistentSequenceNumberGatewayToGatewaySystemTest.java index 50758dd97b..98166b9f95 100644 --- a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/PersistentSequenceNumberGatewayToGatewaySystemTest.java +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/PersistentSequenceNumberGatewayToGatewaySystemTest.java @@ -27,6 +27,8 @@ import uk.co.real_logic.artio.builder.*; import uk.co.real_logic.artio.engine.EngineConfiguration; import uk.co.real_logic.artio.engine.FixEngine; +import uk.co.real_logic.artio.engine.SessionInfo; +import uk.co.real_logic.artio.engine.framer.LibraryInfo; import uk.co.real_logic.artio.fields.UtcTimestampEncoder; import uk.co.real_logic.artio.library.DynamicLibraryScheduler; import uk.co.real_logic.artio.messages.DisconnectReason; @@ -42,6 +44,7 @@ import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.Arrays; +import java.util.List; import java.util.function.Consumer; import java.util.function.LongSupplier; import java.util.function.Predicate; @@ -1015,6 +1018,69 @@ private void connectPersistingSessions() connectPersistingSessions(AUTOMATIC_INITIAL_SEQUENCE_NUMBER, false); } + @Test(timeout = TEST_TIMEOUT_IN_MS) + public void shouldSupportFollowerSessionLogonWithoutSequenceResetOnDisconnectBeforeLibraryLogonResponse() + throws IOException + { + launch(this::nothing); + + final List noSessionContext = acceptingEngine.allSessions(); + assertEquals(0, noSessionContext.size()); + + final SessionWriter sessionWriter = createFollowerSession( + TEST_TIMEOUT_IN_MS, testSystem, acceptingLibrary, INITIATOR_ID, ACCEPTOR_ID); + final SessionReplyStatus requestSessionReply = requestSession(acceptingLibrary, sessionWriter.id(), testSystem); + assertEquals(SessionReplyStatus.OK, requestSessionReply); + + try (FixConnection connection = FixConnection.initiate(port)) + { + connection.logon(true); + Timing.assertEventuallyTrue("Library did not transition session to connected", + () -> + { + acceptingLibrary.poll(1); + final List sessions = acceptingLibrary.sessions(); + return sessions.size() == 1 && sessions.get(0).state() == SessionState.CONNECTED; + } + ); + } + + Timing.assertEventuallyTrue("Fix connection was not disconnected", + () -> + { + final Reply> libraryReply = acceptingEngine.libraries(); + while (!libraryReply.hasCompleted()) + { + sleep(500); + } + + final List allLibraryInfo = libraryReply.resultIfPresent(); + for (final LibraryInfo libraryInfo : allLibraryInfo) { + if (libraryInfo.libraryId() == acceptingLibrary.libraryId()) { + return libraryInfo.sessions().isEmpty(); + } + } + return false; + } + ); + + Timing.assertEventuallyTrue("Library did not transition session to active", + () -> + { + acceptingLibrary.poll(1); + final List sessions = acceptingLibrary.sessions(); + return sessions.size() == 1 && sessions.get(0).state() == SessionState.ACTIVE; + } + ); + + assertEngineSubscriptionCaughtUpToLibraryPublication( + testSystem, mediaDriver.mediaDriver().aeronDirectoryName(), acceptingEngine, acceptingLibrary); + + final List sessionContextAfterLogonNoSenderEndpoint = acceptingEngine.allSessions(); + assertEquals(1, sessionContextAfterLogonNoSenderEndpoint.size()); + assertEquals(0, sessionContextAfterLogonNoSenderEndpoint.get(0).sequenceIndex()); + } + private void resetSequenceNumbers() { testSystem.resetSequenceNumber(initiatingEngine, initiatingSession.id()); From 02bab7c1fce16e339ddafbe03246ccff3713dde5 Mon Sep 17 00:00:00 2001 From: marc-adaptive Date: Thu, 29 Aug 2024 17:25:25 -0400 Subject: [PATCH 07/17] Move test to persistent sequence numbers --- .../MessageBasedAcceptorSystemTest.java | 74 ++----------------- 1 file changed, 6 insertions(+), 68 deletions(-) diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java index a7654bd684..a56413918a 100644 --- a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java @@ -23,11 +23,12 @@ import uk.co.real_logic.artio.builder.*; import uk.co.real_logic.artio.decoder.*; import uk.co.real_logic.artio.engine.SessionInfo; -import uk.co.real_logic.artio.engine.framer.LibraryInfo; import uk.co.real_logic.artio.library.FixLibrary; -import uk.co.real_logic.artio.messages.*; +import uk.co.real_logic.artio.messages.DisconnectReason; +import uk.co.real_logic.artio.messages.InitialAcceptedSessionOwner; +import uk.co.real_logic.artio.messages.SessionReplyStatus; +import uk.co.real_logic.artio.messages.ThrottleConfigurationStatus; import uk.co.real_logic.artio.session.Session; -import uk.co.real_logic.artio.session.SessionWriter; import uk.co.real_logic.artio.util.MutableAsciiBuffer; import java.io.IOException; @@ -54,7 +55,8 @@ import static uk.co.real_logic.artio.messages.InitialAcceptedSessionOwner.ENGINE; import static uk.co.real_logic.artio.messages.InitialAcceptedSessionOwner.SOLE_LIBRARY; import static uk.co.real_logic.artio.messages.ThrottleConfigurationStatus.OK; -import static uk.co.real_logic.artio.system_tests.AbstractGatewayToGatewaySystemTest.*; +import static uk.co.real_logic.artio.system_tests.AbstractGatewayToGatewaySystemTest.LONG_TEST_TIMEOUT_IN_MS; +import static uk.co.real_logic.artio.system_tests.AbstractGatewayToGatewaySystemTest.TEST_TIMEOUT_IN_MS; import static uk.co.real_logic.artio.system_tests.FixConnection.BUFFER_SIZE; import static uk.co.real_logic.artio.system_tests.MessageBasedInitiatorSystemTest.assertConnectionDisconnects; import static uk.co.real_logic.artio.system_tests.SystemTestUtil.*; @@ -607,70 +609,6 @@ public void shouldSupportLogonBasedSequenceNumberResetWithMessagesSentBeforeLogo }); } - @Test(timeout = TEST_TIMEOUT_IN_MS) - public void shouldSupportFollowerSessionLogonWithoutSequenceResetOnDisconnectBeforeLibraryLogonResponse() - throws IOException - { - setup(false, true); - setupLibrary(); - - final List noSessionContext = engine.allSessions(); - assertEquals(0, noSessionContext.size()); - - final SessionWriter sessionWriter = createFollowerSession( - TEST_TIMEOUT_IN_MS, testSystem, library, INITIATOR_ID, ACCEPTOR_ID); - final SessionReplyStatus requestSessionReply = requestSession(library, sessionWriter.id(), testSystem); - assertEquals(SessionReplyStatus.OK, requestSessionReply); - - try (FixConnection connection = FixConnection.initiate(port)) - { - connection.logon(true); - Timing.assertEventuallyTrue("Library did not transition session to connected", - () -> - { - library.poll(1); - final List sessions = library.sessions(); - return sessions.size() == 1 && sessions.get(0).state() == SessionState.CONNECTED; - } - ); - } - - Timing.assertEventuallyTrue("Fix connection was not disconnected", - () -> - { - final Reply> libraryReply = engine.libraries(); - while (!libraryReply.hasCompleted()) - { - sleep(500); - } - - final List allLibraryInfo = libraryReply.resultIfPresent(); - for (final LibraryInfo libraryInfo : allLibraryInfo) { - if (libraryInfo.libraryId() == libraryId) { - return libraryInfo.sessions().isEmpty(); - } - } - return false; - } - ); - - Timing.assertEventuallyTrue("Library did not transition session to active", - () -> - { - library.poll(1); - final List sessions = library.sessions(); - return sessions.size() == 1 && sessions.get(0).state() == SessionState.ACTIVE; - } - ); - - assertEngineSubscriptionCaughtUpToLibraryPublication( - testSystem, mediaDriver.mediaDriver().aeronDirectoryName(), engine, library); - - final List sessionContextAfterLogonNoSenderEndpoint = engine.allSessions(); - assertEquals(1, sessionContextAfterLogonNoSenderEndpoint.size()); - assertEquals(0, sessionContextAfterLogonNoSenderEndpoint.get(0).sequenceIndex()); - } - @Test(timeout = TEST_TIMEOUT_IN_MS) public void shouldHandleOnlineResetFollowedByDisconnectAndRestart() throws IOException { From d9c1014a392d3551d54d2f4de1946fd004a34437 Mon Sep 17 00:00:00 2001 From: marc-adaptive Date: Thu, 29 Aug 2024 21:11:46 -0400 Subject: [PATCH 08/17] Formatting --- ...stentSequenceNumberGatewayToGatewaySystemTest.java | 11 ++++++----- .../real_logic/artio/system_tests/SystemTestUtil.java | 10 +++++----- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/PersistentSequenceNumberGatewayToGatewaySystemTest.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/PersistentSequenceNumberGatewayToGatewaySystemTest.java index 98166b9f95..32cbed382b 100644 --- a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/PersistentSequenceNumberGatewayToGatewaySystemTest.java +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/PersistentSequenceNumberGatewayToGatewaySystemTest.java @@ -1020,15 +1020,14 @@ private void connectPersistingSessions() @Test(timeout = TEST_TIMEOUT_IN_MS) public void shouldSupportFollowerSessionLogonWithoutSequenceResetOnDisconnectBeforeLibraryLogonResponse() - throws IOException + throws IOException { launch(this::nothing); final List noSessionContext = acceptingEngine.allSessions(); assertEquals(0, noSessionContext.size()); - final SessionWriter sessionWriter = createFollowerSession( - TEST_TIMEOUT_IN_MS, testSystem, acceptingLibrary, INITIATOR_ID, ACCEPTOR_ID); + final SessionWriter sessionWriter = createFollowerSession(TEST_TIMEOUT_IN_MS); final SessionReplyStatus requestSessionReply = requestSession(acceptingLibrary, sessionWriter.id(), testSystem); assertEquals(SessionReplyStatus.OK, requestSessionReply); @@ -1055,8 +1054,10 @@ public void shouldSupportFollowerSessionLogonWithoutSequenceResetOnDisconnectBef } final List allLibraryInfo = libraryReply.resultIfPresent(); - for (final LibraryInfo libraryInfo : allLibraryInfo) { - if (libraryInfo.libraryId() == acceptingLibrary.libraryId()) { + for (final LibraryInfo libraryInfo : allLibraryInfo) + { + if (libraryInfo.libraryId() == acceptingLibrary.libraryId()) + { return libraryInfo.sessions().isEmpty(); } } diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/SystemTestUtil.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/SystemTestUtil.java index 1eb27e1995..84bc4fc74b 100644 --- a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/SystemTestUtil.java +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/SystemTestUtil.java @@ -709,13 +709,13 @@ static void awaitIndexerCaughtUp( } static void assertEngineSubscriptionCaughtUpToLibraryPublication( - final TestSystem testSystem, - final String aeronDirectoryName, - final FixEngine engine, - final FixLibrary library) + final TestSystem testSystem, + final String aeronDirectoryName, + final FixEngine engine, + final FixLibrary library) { final EngineStreamInfo engineStreamInfo = - testSystem.awaitCompletedReply(FixEngineInternals.engineStreamInfo(engine)).resultIfPresent(); + testSystem.awaitCompletedReply(FixEngineInternals.engineStreamInfo(engine)).resultIfPresent(); final LibraryStreamInfo libraryStreamInfo = FixLibraryInternals.libraryStreamInfo(library); From 40f2b893f2ea9e1460d5ad8a5a0486de8653e188 Mon Sep 17 00:00:00 2001 From: marc-adaptive Date: Thu, 29 Aug 2024 13:27:38 -0400 Subject: [PATCH 09/17] Fix scenario in framer where initiator disconnects before follower session in library can respond to logon leading to sequence index bump --- .../artio/engine/framer/EngineStreamInfo.java | 9 +++ .../artio/engine/framer/Framer.java | 3 +- .../MessageBasedAcceptorSystemTest.java | 70 +++++++++++++++++-- .../artio/system_tests/SystemTestUtil.java | 46 ++++++++++++ 4 files changed, 121 insertions(+), 7 deletions(-) diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/EngineStreamInfo.java b/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/EngineStreamInfo.java index c87bbbd775..0c60b025d1 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/EngineStreamInfo.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/EngineStreamInfo.java @@ -4,6 +4,7 @@ public final class EngineStreamInfo { private final long inboundIndexSubscriptionRegistrationId; private final long outboundIndexSubscriptionRegistrationId; + private final long librarySubscriptionRegistrationId; private final int inboundPublicationSessionId; private final long inboundPublicationPosition; private final int outboundPublicationSessionId; @@ -12,6 +13,7 @@ public final class EngineStreamInfo EngineStreamInfo( final long inboundIndexSubscriptionRegistrationId, final long outboundIndexSubscriptionRegistrationId, + final long librarySubscriptionRegistrationId, final int inboundPublicationSessionId, final long inboundPublicationPosition, final int outboundPublicationSessionId, @@ -19,6 +21,7 @@ public final class EngineStreamInfo { this.inboundIndexSubscriptionRegistrationId = inboundIndexSubscriptionRegistrationId; this.outboundIndexSubscriptionRegistrationId = outboundIndexSubscriptionRegistrationId; + this.librarySubscriptionRegistrationId = librarySubscriptionRegistrationId; this.inboundPublicationSessionId = inboundPublicationSessionId; this.inboundPublicationPosition = inboundPublicationPosition; this.outboundPublicationSessionId = outboundPublicationSessionId; @@ -35,6 +38,11 @@ public long outboundIndexSubscriptionRegistrationId() return outboundIndexSubscriptionRegistrationId; } + public long librarySubscriptionRegistrationId() + { + return librarySubscriptionRegistrationId; + } + public int inboundPublicationSessionId() { return inboundPublicationSessionId; @@ -60,6 +68,7 @@ public String toString() return "EngineStreamInfo{" + "inboundIndexSubscriptionRegistrationId=" + inboundIndexSubscriptionRegistrationId + ", outboundIndexSubscriptionRegistrationId=" + outboundIndexSubscriptionRegistrationId + + ", librarySubscriptionRegistrationId=" + librarySubscriptionRegistrationId + ", inboundPublicationSessionId=" + inboundPublicationSessionId + ", inboundPublicationPosition=" + inboundPublicationPosition + ", outboundPublicationSessionId=" + outboundPublicationSessionId + diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/Framer.java b/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/Framer.java index 49fd8167ac..4950f7d5be 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/Framer.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/Framer.java @@ -1722,7 +1722,7 @@ private void checkOfflineSequenceReset(final long sessionId, final long messageT if (entry != null) { final SessionContext context = entry.getValue(); - context.onSequenceReset(clock.nanoTime()); + context.onSequenceIndex(clock.nanoTime(), sequenceIndex); } } else if (messageType == SEQUENCE_RESET_MESSAGE_TYPE) @@ -3502,6 +3502,7 @@ public void onEngineStreamInfoRequest(final EngineStreamInfoRequestCommand comma command.complete(new EngineStreamInfo( inboundIndexRegistrationId, outboundIndexRegistrationId, + librarySubscription.registrationId(), inboundPublication.sessionId(), inboundPublication.position(), outboundPublication.sessionId(), diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java index a56413918a..9bb10748f4 100644 --- a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java @@ -23,12 +23,11 @@ import uk.co.real_logic.artio.builder.*; import uk.co.real_logic.artio.decoder.*; import uk.co.real_logic.artio.engine.SessionInfo; +import uk.co.real_logic.artio.engine.framer.LibraryInfo; import uk.co.real_logic.artio.library.FixLibrary; -import uk.co.real_logic.artio.messages.DisconnectReason; -import uk.co.real_logic.artio.messages.InitialAcceptedSessionOwner; -import uk.co.real_logic.artio.messages.SessionReplyStatus; -import uk.co.real_logic.artio.messages.ThrottleConfigurationStatus; +import uk.co.real_logic.artio.messages.*; import uk.co.real_logic.artio.session.Session; +import uk.co.real_logic.artio.session.SessionWriter; import uk.co.real_logic.artio.util.MutableAsciiBuffer; import java.io.IOException; @@ -55,8 +54,7 @@ import static uk.co.real_logic.artio.messages.InitialAcceptedSessionOwner.ENGINE; import static uk.co.real_logic.artio.messages.InitialAcceptedSessionOwner.SOLE_LIBRARY; import static uk.co.real_logic.artio.messages.ThrottleConfigurationStatus.OK; -import static uk.co.real_logic.artio.system_tests.AbstractGatewayToGatewaySystemTest.LONG_TEST_TIMEOUT_IN_MS; -import static uk.co.real_logic.artio.system_tests.AbstractGatewayToGatewaySystemTest.TEST_TIMEOUT_IN_MS; +import static uk.co.real_logic.artio.system_tests.AbstractGatewayToGatewaySystemTest.*; import static uk.co.real_logic.artio.system_tests.FixConnection.BUFFER_SIZE; import static uk.co.real_logic.artio.system_tests.MessageBasedInitiatorSystemTest.assertConnectionDisconnects; import static uk.co.real_logic.artio.system_tests.SystemTestUtil.*; @@ -609,6 +607,66 @@ public void shouldSupportLogonBasedSequenceNumberResetWithMessagesSentBeforeLogo }); } + @Test(timeout = TEST_TIMEOUT_IN_MS) + public void shouldSupportFollowerSessionLogonWithoutSequenceResetOnDisconnectBeforeLibraryLogonResponse() + throws IOException + { + setup(true, true); + setupLibrary(); + + final SessionWriter sessionWriter = createFollowerSession( + TEST_TIMEOUT_IN_MS, testSystem, library, INITIATOR_ID, ACCEPTOR_ID); + assertEquals(SessionReplyStatus.OK, requestSession(library, sessionWriter.id(), testSystem)); + + try (FixConnection connection = FixConnection.initiate(port)) + { + connection.logon(true); + Timing.assertEventuallyTrue("Library did not transition session to connected", + () -> + { + library.poll(1); + final List sessions = library.sessions(); + return sessions.size() == 1 && sessions.get(0).state() == SessionState.CONNECTED; + } + ); + } + + Timing.assertEventuallyTrue("Fix connection was not disconnected", + () -> + { + final Reply> libraryReply = engine.libraries(); + while (!libraryReply.hasCompleted()) + { + sleep(100); + } + + final List allLibraryInfo = libraryReply.resultIfPresent(); + for (final LibraryInfo libraryInfo : allLibraryInfo) { + if (libraryInfo.libraryId() == libraryId) { + return libraryInfo.sessions().isEmpty(); + } + } + return false; + } + ); + + Timing.assertEventuallyTrue("Library did not transition session to active", + () -> + { + library.poll(1); + final List sessions = library.sessions(); + return sessions.size() == 1 && sessions.get(0).state() == SessionState.ACTIVE; + } + ); + + assertEngineSubscriptionCaughtUpToLibraryPublication( + testSystem, mediaDriver.mediaDriver().aeronDirectoryName(), engine, library); + + final List framerSessionContext = engine.allSessions(); + assertEquals(1, framerSessionContext.size()); + assertEquals(0, framerSessionContext.get(0).sequenceIndex()); + } + @Test(timeout = TEST_TIMEOUT_IN_MS) public void shouldHandleOnlineResetFollowedByDisconnectAndRestart() throws IOException { diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/SystemTestUtil.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/SystemTestUtil.java index d8b78853b4..1eb27e1995 100644 --- a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/SystemTestUtil.java +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/SystemTestUtil.java @@ -707,4 +707,50 @@ static void awaitIndexerCaughtUp( () -> {}); } } + + static void assertEngineSubscriptionCaughtUpToLibraryPublication( + final TestSystem testSystem, + final String aeronDirectoryName, + final FixEngine engine, + final FixLibrary library) + { + final EngineStreamInfo engineStreamInfo = + testSystem.awaitCompletedReply(FixEngineInternals.engineStreamInfo(engine)).resultIfPresent(); + + final LibraryStreamInfo libraryStreamInfo = FixLibraryInternals.libraryStreamInfo(library); + + final Aeron.Context aeronCtx = new Aeron.Context().aeronDirectoryName(aeronDirectoryName); + try (Aeron aeron = Aeron.connect(aeronCtx)) + { + final CountersReader countersReader = aeron.countersReader(); + + final SubPosMatcher subPosMatcher = new SubPosMatcher( + countersReader, + engineStreamInfo.librarySubscriptionRegistrationId(), + libraryStreamInfo.outboundPublicationSessionId(), + libraryStreamInfo.outboundPublicationPosition()); + + countersReader.forEach((counterId, typeId, keyBuffer, label) -> + subPosMatcher.tryMatch(counterId, typeId, keyBuffer)); + + assertEventuallyTrue( + () -> + { + final StringBuilder builder = new StringBuilder(); + builder.append("expected sub-pos counters:\n"); + builder.append(subPosMatcher).append('\n'); + builder.append("\nbut counters were:\n"); + countersReader.forEach((value, counterId, label) -> + builder.append(String.format("%d: %d - %s%n", counterId, value, label))); + return builder.toString(); + }, + () -> + { + testSystem.poll(); + return subPosMatcher.isCaughtUp(); + }, + DEFAULT_TIMEOUT_IN_MS, + () -> {}); + } + } } From d8078cec99bf4de6928b6b2169eee9144c44807c Mon Sep 17 00:00:00 2001 From: marc-adaptive Date: Thu, 29 Aug 2024 13:36:50 -0400 Subject: [PATCH 10/17] Fix scenario in framer where initiator disconnects before follower session in library can respond to logon leading to sequence index bump --- .../MessageBasedAcceptorSystemTest.java | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java index 9bb10748f4..0749dd385e 100644 --- a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java @@ -614,9 +614,17 @@ public void shouldSupportFollowerSessionLogonWithoutSequenceResetOnDisconnectBef setup(true, true); setupLibrary(); + final List noSessionContext = engine.allSessions(); + assertEquals(0, noSessionContext.size()); + final SessionWriter sessionWriter = createFollowerSession( TEST_TIMEOUT_IN_MS, testSystem, library, INITIATOR_ID, ACCEPTOR_ID); - assertEquals(SessionReplyStatus.OK, requestSession(library, sessionWriter.id(), testSystem)); + final SessionReplyStatus requestSessionReply = requestSession(library, sessionWriter.id(), testSystem); + assertEquals(SessionReplyStatus.OK, requestSessionReply); + + final List sessionContextAfterFollower = engine.allSessions(); + assertEquals(1, sessionContextAfterFollower.size()); + assertEquals(SessionInfo.UNKNOWN_SEQUENCE_INDEX, sessionContextAfterFollower.get(0).sequenceIndex()); try (FixConnection connection = FixConnection.initiate(port)) { @@ -629,6 +637,10 @@ public void shouldSupportFollowerSessionLogonWithoutSequenceResetOnDisconnectBef return sessions.size() == 1 && sessions.get(0).state() == SessionState.CONNECTED; } ); + + final List sessionContextAfterFramerAuthenticates = engine.allSessions(); + assertEquals(1, sessionContextAfterFramerAuthenticates.size()); + assertEquals(0, sessionContextAfterFramerAuthenticates.get(0).sequenceIndex()); } Timing.assertEventuallyTrue("Fix connection was not disconnected", @@ -662,9 +674,9 @@ public void shouldSupportFollowerSessionLogonWithoutSequenceResetOnDisconnectBef assertEngineSubscriptionCaughtUpToLibraryPublication( testSystem, mediaDriver.mediaDriver().aeronDirectoryName(), engine, library); - final List framerSessionContext = engine.allSessions(); - assertEquals(1, framerSessionContext.size()); - assertEquals(0, framerSessionContext.get(0).sequenceIndex()); + final List sessionContextAfterLogonNoSenderEndpoint = engine.allSessions(); + assertEquals(1, sessionContextAfterLogonNoSenderEndpoint.size()); + assertEquals(0, sessionContextAfterLogonNoSenderEndpoint.get(0).sequenceIndex()); } @Test(timeout = TEST_TIMEOUT_IN_MS) From a6eb87f34f80c108eda9ed7ef1ffc910b6601b90 Mon Sep 17 00:00:00 2001 From: marc-adaptive Date: Thu, 29 Aug 2024 13:37:43 -0400 Subject: [PATCH 11/17] Fix scenario in framer where initiator disconnects before follower session in library can respond to logon leading to sequence index bump --- .../artio/system_tests/MessageBasedAcceptorSystemTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java index 0749dd385e..8b094fc828 100644 --- a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java @@ -622,9 +622,9 @@ public void shouldSupportFollowerSessionLogonWithoutSequenceResetOnDisconnectBef final SessionReplyStatus requestSessionReply = requestSession(library, sessionWriter.id(), testSystem); assertEquals(SessionReplyStatus.OK, requestSessionReply); - final List sessionContextAfterFollower = engine.allSessions(); - assertEquals(1, sessionContextAfterFollower.size()); - assertEquals(SessionInfo.UNKNOWN_SEQUENCE_INDEX, sessionContextAfterFollower.get(0).sequenceIndex()); + final List sessionContextAfterFramerCreatesFollower = engine.allSessions(); + assertEquals(1, sessionContextAfterFramerCreatesFollower.size()); + assertEquals(SessionInfo.UNKNOWN_SEQUENCE_INDEX, sessionContextAfterFramerCreatesFollower.get(0).sequenceIndex()); try (FixConnection connection = FixConnection.initiate(port)) { From b2f0a77304e262ff295aec75623c318d660b3b09 Mon Sep 17 00:00:00 2001 From: marc-adaptive Date: Thu, 29 Aug 2024 14:04:39 -0400 Subject: [PATCH 12/17] Fix scenario in framer where initiator disconnects before follower session in library can respond to logon leading to sequence index bump --- .../artio/system_tests/MessageBasedAcceptorSystemTest.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java index 8b094fc828..52909b6a8f 100644 --- a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java @@ -622,10 +622,6 @@ public void shouldSupportFollowerSessionLogonWithoutSequenceResetOnDisconnectBef final SessionReplyStatus requestSessionReply = requestSession(library, sessionWriter.id(), testSystem); assertEquals(SessionReplyStatus.OK, requestSessionReply); - final List sessionContextAfterFramerCreatesFollower = engine.allSessions(); - assertEquals(1, sessionContextAfterFramerCreatesFollower.size()); - assertEquals(SessionInfo.UNKNOWN_SEQUENCE_INDEX, sessionContextAfterFramerCreatesFollower.get(0).sequenceIndex()); - try (FixConnection connection = FixConnection.initiate(port)) { connection.logon(true); @@ -649,7 +645,7 @@ public void shouldSupportFollowerSessionLogonWithoutSequenceResetOnDisconnectBef final Reply> libraryReply = engine.libraries(); while (!libraryReply.hasCompleted()) { - sleep(100); + sleep(500); } final List allLibraryInfo = libraryReply.resultIfPresent(); From 4dac60a56d23ccde655b54a7dd1c2a5c3eb69e74 Mon Sep 17 00:00:00 2001 From: marc-adaptive Date: Thu, 29 Aug 2024 14:11:50 -0400 Subject: [PATCH 13/17] Fix scenario in framer where initiator disconnects before follower session in library can respond to logon leading to sequence index bump --- .../artio/system_tests/MessageBasedAcceptorSystemTest.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java index 52909b6a8f..c034a8c910 100644 --- a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java @@ -633,10 +633,6 @@ public void shouldSupportFollowerSessionLogonWithoutSequenceResetOnDisconnectBef return sessions.size() == 1 && sessions.get(0).state() == SessionState.CONNECTED; } ); - - final List sessionContextAfterFramerAuthenticates = engine.allSessions(); - assertEquals(1, sessionContextAfterFramerAuthenticates.size()); - assertEquals(0, sessionContextAfterFramerAuthenticates.get(0).sequenceIndex()); } Timing.assertEventuallyTrue("Fix connection was not disconnected", From 0af4191d76c8e1d7ac980ea3afa58c433f0c11cb Mon Sep 17 00:00:00 2001 From: marc-adaptive Date: Thu, 29 Aug 2024 17:22:52 -0400 Subject: [PATCH 14/17] Fix scenario in framer where initiator disconnects before follower session in library can respond to logon leading to sequence index bump --- .../AbstractGatewayToGatewaySystemTest.java | 17 ++--- .../MessageBasedAcceptorSystemTest.java | 2 +- ...uenceNumberGatewayToGatewaySystemTest.java | 66 +++++++++++++++++++ 3 files changed, 74 insertions(+), 11 deletions(-) diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/AbstractGatewayToGatewaySystemTest.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/AbstractGatewayToGatewaySystemTest.java index 55e0a81903..5803b9f220 100644 --- a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/AbstractGatewayToGatewaySystemTest.java +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/AbstractGatewayToGatewaySystemTest.java @@ -830,17 +830,14 @@ void assertReplayReceivedMessages() void sleep(final int timeInMs) { - testSystem.awaitBlocking(() -> + try { - try - { - Thread.sleep(timeInMs); - } - catch (final InterruptedException e) - { - e.printStackTrace(); - } - }); + Thread.sleep(timeInMs); + } + catch (final InterruptedException e) + { + e.printStackTrace(); + } } void assertResendsCompleted(final int count, final Matcher> items) diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java index c034a8c910..a7654bd684 100644 --- a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java @@ -611,7 +611,7 @@ public void shouldSupportLogonBasedSequenceNumberResetWithMessagesSentBeforeLogo public void shouldSupportFollowerSessionLogonWithoutSequenceResetOnDisconnectBeforeLibraryLogonResponse() throws IOException { - setup(true, true); + setup(false, true); setupLibrary(); final List noSessionContext = engine.allSessions(); diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/PersistentSequenceNumberGatewayToGatewaySystemTest.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/PersistentSequenceNumberGatewayToGatewaySystemTest.java index 50758dd97b..98166b9f95 100644 --- a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/PersistentSequenceNumberGatewayToGatewaySystemTest.java +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/PersistentSequenceNumberGatewayToGatewaySystemTest.java @@ -27,6 +27,8 @@ import uk.co.real_logic.artio.builder.*; import uk.co.real_logic.artio.engine.EngineConfiguration; import uk.co.real_logic.artio.engine.FixEngine; +import uk.co.real_logic.artio.engine.SessionInfo; +import uk.co.real_logic.artio.engine.framer.LibraryInfo; import uk.co.real_logic.artio.fields.UtcTimestampEncoder; import uk.co.real_logic.artio.library.DynamicLibraryScheduler; import uk.co.real_logic.artio.messages.DisconnectReason; @@ -42,6 +44,7 @@ import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.Arrays; +import java.util.List; import java.util.function.Consumer; import java.util.function.LongSupplier; import java.util.function.Predicate; @@ -1015,6 +1018,69 @@ private void connectPersistingSessions() connectPersistingSessions(AUTOMATIC_INITIAL_SEQUENCE_NUMBER, false); } + @Test(timeout = TEST_TIMEOUT_IN_MS) + public void shouldSupportFollowerSessionLogonWithoutSequenceResetOnDisconnectBeforeLibraryLogonResponse() + throws IOException + { + launch(this::nothing); + + final List noSessionContext = acceptingEngine.allSessions(); + assertEquals(0, noSessionContext.size()); + + final SessionWriter sessionWriter = createFollowerSession( + TEST_TIMEOUT_IN_MS, testSystem, acceptingLibrary, INITIATOR_ID, ACCEPTOR_ID); + final SessionReplyStatus requestSessionReply = requestSession(acceptingLibrary, sessionWriter.id(), testSystem); + assertEquals(SessionReplyStatus.OK, requestSessionReply); + + try (FixConnection connection = FixConnection.initiate(port)) + { + connection.logon(true); + Timing.assertEventuallyTrue("Library did not transition session to connected", + () -> + { + acceptingLibrary.poll(1); + final List sessions = acceptingLibrary.sessions(); + return sessions.size() == 1 && sessions.get(0).state() == SessionState.CONNECTED; + } + ); + } + + Timing.assertEventuallyTrue("Fix connection was not disconnected", + () -> + { + final Reply> libraryReply = acceptingEngine.libraries(); + while (!libraryReply.hasCompleted()) + { + sleep(500); + } + + final List allLibraryInfo = libraryReply.resultIfPresent(); + for (final LibraryInfo libraryInfo : allLibraryInfo) { + if (libraryInfo.libraryId() == acceptingLibrary.libraryId()) { + return libraryInfo.sessions().isEmpty(); + } + } + return false; + } + ); + + Timing.assertEventuallyTrue("Library did not transition session to active", + () -> + { + acceptingLibrary.poll(1); + final List sessions = acceptingLibrary.sessions(); + return sessions.size() == 1 && sessions.get(0).state() == SessionState.ACTIVE; + } + ); + + assertEngineSubscriptionCaughtUpToLibraryPublication( + testSystem, mediaDriver.mediaDriver().aeronDirectoryName(), acceptingEngine, acceptingLibrary); + + final List sessionContextAfterLogonNoSenderEndpoint = acceptingEngine.allSessions(); + assertEquals(1, sessionContextAfterLogonNoSenderEndpoint.size()); + assertEquals(0, sessionContextAfterLogonNoSenderEndpoint.get(0).sequenceIndex()); + } + private void resetSequenceNumbers() { testSystem.resetSequenceNumber(initiatingEngine, initiatingSession.id()); From 37731d71e13e92137ad81d767466b70810f147c1 Mon Sep 17 00:00:00 2001 From: marc-adaptive Date: Thu, 29 Aug 2024 17:25:25 -0400 Subject: [PATCH 15/17] Move test to persistent sequence numbers --- .../MessageBasedAcceptorSystemTest.java | 74 ++----------------- 1 file changed, 6 insertions(+), 68 deletions(-) diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java index a7654bd684..a56413918a 100644 --- a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java @@ -23,11 +23,12 @@ import uk.co.real_logic.artio.builder.*; import uk.co.real_logic.artio.decoder.*; import uk.co.real_logic.artio.engine.SessionInfo; -import uk.co.real_logic.artio.engine.framer.LibraryInfo; import uk.co.real_logic.artio.library.FixLibrary; -import uk.co.real_logic.artio.messages.*; +import uk.co.real_logic.artio.messages.DisconnectReason; +import uk.co.real_logic.artio.messages.InitialAcceptedSessionOwner; +import uk.co.real_logic.artio.messages.SessionReplyStatus; +import uk.co.real_logic.artio.messages.ThrottleConfigurationStatus; import uk.co.real_logic.artio.session.Session; -import uk.co.real_logic.artio.session.SessionWriter; import uk.co.real_logic.artio.util.MutableAsciiBuffer; import java.io.IOException; @@ -54,7 +55,8 @@ import static uk.co.real_logic.artio.messages.InitialAcceptedSessionOwner.ENGINE; import static uk.co.real_logic.artio.messages.InitialAcceptedSessionOwner.SOLE_LIBRARY; import static uk.co.real_logic.artio.messages.ThrottleConfigurationStatus.OK; -import static uk.co.real_logic.artio.system_tests.AbstractGatewayToGatewaySystemTest.*; +import static uk.co.real_logic.artio.system_tests.AbstractGatewayToGatewaySystemTest.LONG_TEST_TIMEOUT_IN_MS; +import static uk.co.real_logic.artio.system_tests.AbstractGatewayToGatewaySystemTest.TEST_TIMEOUT_IN_MS; import static uk.co.real_logic.artio.system_tests.FixConnection.BUFFER_SIZE; import static uk.co.real_logic.artio.system_tests.MessageBasedInitiatorSystemTest.assertConnectionDisconnects; import static uk.co.real_logic.artio.system_tests.SystemTestUtil.*; @@ -607,70 +609,6 @@ public void shouldSupportLogonBasedSequenceNumberResetWithMessagesSentBeforeLogo }); } - @Test(timeout = TEST_TIMEOUT_IN_MS) - public void shouldSupportFollowerSessionLogonWithoutSequenceResetOnDisconnectBeforeLibraryLogonResponse() - throws IOException - { - setup(false, true); - setupLibrary(); - - final List noSessionContext = engine.allSessions(); - assertEquals(0, noSessionContext.size()); - - final SessionWriter sessionWriter = createFollowerSession( - TEST_TIMEOUT_IN_MS, testSystem, library, INITIATOR_ID, ACCEPTOR_ID); - final SessionReplyStatus requestSessionReply = requestSession(library, sessionWriter.id(), testSystem); - assertEquals(SessionReplyStatus.OK, requestSessionReply); - - try (FixConnection connection = FixConnection.initiate(port)) - { - connection.logon(true); - Timing.assertEventuallyTrue("Library did not transition session to connected", - () -> - { - library.poll(1); - final List sessions = library.sessions(); - return sessions.size() == 1 && sessions.get(0).state() == SessionState.CONNECTED; - } - ); - } - - Timing.assertEventuallyTrue("Fix connection was not disconnected", - () -> - { - final Reply> libraryReply = engine.libraries(); - while (!libraryReply.hasCompleted()) - { - sleep(500); - } - - final List allLibraryInfo = libraryReply.resultIfPresent(); - for (final LibraryInfo libraryInfo : allLibraryInfo) { - if (libraryInfo.libraryId() == libraryId) { - return libraryInfo.sessions().isEmpty(); - } - } - return false; - } - ); - - Timing.assertEventuallyTrue("Library did not transition session to active", - () -> - { - library.poll(1); - final List sessions = library.sessions(); - return sessions.size() == 1 && sessions.get(0).state() == SessionState.ACTIVE; - } - ); - - assertEngineSubscriptionCaughtUpToLibraryPublication( - testSystem, mediaDriver.mediaDriver().aeronDirectoryName(), engine, library); - - final List sessionContextAfterLogonNoSenderEndpoint = engine.allSessions(); - assertEquals(1, sessionContextAfterLogonNoSenderEndpoint.size()); - assertEquals(0, sessionContextAfterLogonNoSenderEndpoint.get(0).sequenceIndex()); - } - @Test(timeout = TEST_TIMEOUT_IN_MS) public void shouldHandleOnlineResetFollowedByDisconnectAndRestart() throws IOException { From 1d0e17a44d4bc24f19904cbdfb743d090a1432eb Mon Sep 17 00:00:00 2001 From: marc-adaptive Date: Thu, 29 Aug 2024 21:11:46 -0400 Subject: [PATCH 16/17] Formatting --- ...stentSequenceNumberGatewayToGatewaySystemTest.java | 11 ++++++----- .../real_logic/artio/system_tests/SystemTestUtil.java | 10 +++++----- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/PersistentSequenceNumberGatewayToGatewaySystemTest.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/PersistentSequenceNumberGatewayToGatewaySystemTest.java index 98166b9f95..32cbed382b 100644 --- a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/PersistentSequenceNumberGatewayToGatewaySystemTest.java +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/PersistentSequenceNumberGatewayToGatewaySystemTest.java @@ -1020,15 +1020,14 @@ private void connectPersistingSessions() @Test(timeout = TEST_TIMEOUT_IN_MS) public void shouldSupportFollowerSessionLogonWithoutSequenceResetOnDisconnectBeforeLibraryLogonResponse() - throws IOException + throws IOException { launch(this::nothing); final List noSessionContext = acceptingEngine.allSessions(); assertEquals(0, noSessionContext.size()); - final SessionWriter sessionWriter = createFollowerSession( - TEST_TIMEOUT_IN_MS, testSystem, acceptingLibrary, INITIATOR_ID, ACCEPTOR_ID); + final SessionWriter sessionWriter = createFollowerSession(TEST_TIMEOUT_IN_MS); final SessionReplyStatus requestSessionReply = requestSession(acceptingLibrary, sessionWriter.id(), testSystem); assertEquals(SessionReplyStatus.OK, requestSessionReply); @@ -1055,8 +1054,10 @@ public void shouldSupportFollowerSessionLogonWithoutSequenceResetOnDisconnectBef } final List allLibraryInfo = libraryReply.resultIfPresent(); - for (final LibraryInfo libraryInfo : allLibraryInfo) { - if (libraryInfo.libraryId() == acceptingLibrary.libraryId()) { + for (final LibraryInfo libraryInfo : allLibraryInfo) + { + if (libraryInfo.libraryId() == acceptingLibrary.libraryId()) + { return libraryInfo.sessions().isEmpty(); } } diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/SystemTestUtil.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/SystemTestUtil.java index 1eb27e1995..84bc4fc74b 100644 --- a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/SystemTestUtil.java +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/SystemTestUtil.java @@ -709,13 +709,13 @@ static void awaitIndexerCaughtUp( } static void assertEngineSubscriptionCaughtUpToLibraryPublication( - final TestSystem testSystem, - final String aeronDirectoryName, - final FixEngine engine, - final FixLibrary library) + final TestSystem testSystem, + final String aeronDirectoryName, + final FixEngine engine, + final FixLibrary library) { final EngineStreamInfo engineStreamInfo = - testSystem.awaitCompletedReply(FixEngineInternals.engineStreamInfo(engine)).resultIfPresent(); + testSystem.awaitCompletedReply(FixEngineInternals.engineStreamInfo(engine)).resultIfPresent(); final LibraryStreamInfo libraryStreamInfo = FixLibraryInternals.libraryStreamInfo(library); From 0862cec5bbd015d333cfe1b48d5f7e6537891450 Mon Sep 17 00:00:00 2001 From: marc-adaptive Date: Tue, 3 Sep 2024 19:42:21 -0400 Subject: [PATCH 17/17] Add counter check --- .../uk/co/real_logic/artio/system_tests/SystemTestUtil.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/SystemTestUtil.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/SystemTestUtil.java index 84bc4fc74b..f3e6c62f6e 100644 --- a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/SystemTestUtil.java +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/SystemTestUtil.java @@ -733,6 +733,11 @@ static void assertEngineSubscriptionCaughtUpToLibraryPublication( countersReader.forEach((counterId, typeId, keyBuffer, label) -> subPosMatcher.tryMatch(counterId, typeId, keyBuffer)); + if (!subPosMatcher.hasCounterId()) + { + throw new IllegalStateException("did not match counter: " + subPosMatcher); + } + assertEventuallyTrue( () -> {