Skip to content

Commit

Permalink
When using CatchupReplayer to replay inbound archive messages, skip m…
Browse files Browse the repository at this point in the history
…essages with CATCHUP_REPLAY MessageStatus (#529)
  • Loading branch information
marc-adaptive committed Jan 14, 2025
1 parent d138075 commit fee41bd
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,11 @@ public Action onFragment(
messageHeaderDecoder.blockLength(),
version);

if (messageDecoder.status() == CATCHUP_REPLAY)
{
return CONTINUE;
}

final long messageType = MessageTypeExtractor.getMessageType(messageDecoder);

messageDecoder.skipMetaData();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,15 @@
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
import static org.mockito.Mockito.verify;
import static uk.co.real_logic.artio.CommonConfiguration.DEFAULT_REPLY_TIMEOUT_IN_MS;
import static uk.co.real_logic.artio.Constants.EXECUTION_REPORT_MESSAGE_AS_STR;
import static uk.co.real_logic.artio.Constants.RESEND_REQUEST_MESSAGE_AS_STR;
import static uk.co.real_logic.artio.SessionRejectReason.COMPID_PROBLEM;
import static uk.co.real_logic.artio.TestFixtures.cleanupMediaDriver;
import static uk.co.real_logic.artio.dictionary.SessionConstants.*;
import static uk.co.real_logic.artio.engine.EngineConfiguration.DEFAULT_RECEIVER_BUFFER_SIZE;
import static uk.co.real_logic.artio.engine.logger.Replayer.MOST_RECENT_MESSAGE;
import static uk.co.real_logic.artio.library.FixLibrary.CURRENT_SEQUENCE;
import static uk.co.real_logic.artio.messages.InitialAcceptedSessionOwner.ENGINE;
import static uk.co.real_logic.artio.messages.InitialAcceptedSessionOwner.SOLE_LIBRARY;
import static uk.co.real_logic.artio.messages.ThrottleConfigurationStatus.OK;
Expand Down Expand Up @@ -684,6 +686,29 @@ public void shouldSupportReplayingReceivedMessagesWithDifferentFromAndToSequence
});
}

@Test(timeout = TEST_TIMEOUT_IN_MS)
public void shouldFilterCatchupReplayMessagesWhenRequestingSession() throws IOException
{
setup(false, true);
setupLibrary();

try (FixConnection connection = FixConnection.initiate(port))
{
logon(connection);

final Session session = acquireSession(0, CURRENT_SEQUENCE);
assertThat(otfAcceptor.messages(), hasSize(1));

testSystem.awaitReply(library.releaseToGateway(session, DEFAULT_REPLY_TIMEOUT_IN_MS));
otfAcceptor.messages().clear();

connection.exchangeTestRequestHeartbeat("ABC");

testSystem.awaitReply(library.requestSession(session.id(), 0, CURRENT_SEQUENCE, DEFAULT_REPLY_TIMEOUT_IN_MS));
assertThat(otfAcceptor.messages(), hasSize(2));
}
}

@Test(timeout = TEST_TIMEOUT_IN_MS)
public void shouldGracefullyHandleExceptionsInOnSessionStart() throws IOException
{
Expand Down

0 comments on commit fee41bd

Please sign in to comment.