diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/CatchupReplayer.java b/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/CatchupReplayer.java index 482e2093f8..9d3306dc39 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/CatchupReplayer.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/CatchupReplayer.java @@ -256,6 +256,11 @@ public Action onFragment( messageHeaderDecoder.blockLength(), version); + if (messageDecoder.status() == CATCHUP_REPLAY) + { + return CONTINUE; + } + final long messageType = MessageTypeExtractor.getMessageType(messageDecoder); messageDecoder.skipMetaData(); diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java index 63e8d3ddf7..6b907979d0 100644 --- a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java @@ -49,6 +49,7 @@ import static org.hamcrest.Matchers.*; import static org.junit.Assert.*; import static org.mockito.Mockito.verify; +import static uk.co.real_logic.artio.CommonConfiguration.DEFAULT_REPLY_TIMEOUT_IN_MS; import static uk.co.real_logic.artio.Constants.EXECUTION_REPORT_MESSAGE_AS_STR; import static uk.co.real_logic.artio.Constants.RESEND_REQUEST_MESSAGE_AS_STR; import static uk.co.real_logic.artio.SessionRejectReason.COMPID_PROBLEM; @@ -56,6 +57,7 @@ import static uk.co.real_logic.artio.dictionary.SessionConstants.*; import static uk.co.real_logic.artio.engine.EngineConfiguration.DEFAULT_RECEIVER_BUFFER_SIZE; import static uk.co.real_logic.artio.engine.logger.Replayer.MOST_RECENT_MESSAGE; +import static uk.co.real_logic.artio.library.FixLibrary.CURRENT_SEQUENCE; import static uk.co.real_logic.artio.messages.InitialAcceptedSessionOwner.ENGINE; import static uk.co.real_logic.artio.messages.InitialAcceptedSessionOwner.SOLE_LIBRARY; import static uk.co.real_logic.artio.messages.ThrottleConfigurationStatus.OK; @@ -684,6 +686,29 @@ public void shouldSupportReplayingReceivedMessagesWithDifferentFromAndToSequence }); } + @Test(timeout = TEST_TIMEOUT_IN_MS) + public void shouldFilterCatchupReplayMessagesWhenRequestingSession() throws IOException + { + setup(false, true); + setupLibrary(); + + try (FixConnection connection = FixConnection.initiate(port)) + { + logon(connection); + + final Session session = acquireSession(0, CURRENT_SEQUENCE); + assertThat(otfAcceptor.messages(), hasSize(1)); + + testSystem.awaitReply(library.releaseToGateway(session, DEFAULT_REPLY_TIMEOUT_IN_MS)); + otfAcceptor.messages().clear(); + + connection.exchangeTestRequestHeartbeat("ABC"); + + testSystem.awaitReply(library.requestSession(session.id(), 0, CURRENT_SEQUENCE, DEFAULT_REPLY_TIMEOUT_IN_MS)); + assertThat(otfAcceptor.messages(), hasSize(2)); + } + } + @Test(timeout = TEST_TIMEOUT_IN_MS) public void shouldGracefullyHandleExceptionsInOnSessionStart() throws IOException {