diff --git a/.editorconfig b/.editorconfig
new file mode 100644
index 0000000000..2ab6bda649
--- /dev/null
+++ b/.editorconfig
@@ -0,0 +1,15 @@
+# Helps IDEA users apply some of the formatting rules enforced by checkstyle
+
+root = true
+
+[*.java]
+indent_size = 4
+max_line_length = 120
+ij_java_method_brace_style = next_line
+ij_java_block_brace_style = next_line
+ij_java_else_on_new_line = true
+ij_java_class_brace_style = next_line
+ij_java_space_after_type_cast = false
+ij_any_catch_on_new_line = true
+ij_any_spaces_around_equality_operators = true
+ij_java_continuation_indent_size = 4
\ No newline at end of file
diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/session/ResendRequestController.java b/artio-core/src/main/java/uk/co/real_logic/artio/session/ResendRequestController.java
index be3b383cab..99897327ca 100644
--- a/artio-core/src/main/java/uk/co/real_logic/artio/session/ResendRequestController.java
+++ b/artio-core/src/main/java/uk/co/real_logic/artio/session/ResendRequestController.java
@@ -16,6 +16,7 @@
package uk.co.real_logic.artio.session;
import uk.co.real_logic.artio.decoder.AbstractResendRequestDecoder;
+import uk.co.real_logic.artio.util.AsciiBuffer;
/**
* Customer interface to control whether resend requests are responded to or not.
@@ -33,11 +34,16 @@ public interface ResendRequestController
* (eg: begin sequence number > end sequence number or begin sequence number > last sent sequence number)
* then this callback won't be invoked.
*
+ * SessionProxy is now also notified immediately after this method is called, with additional parameters that
+ * allow to delay the processing of the ResendRequest. The SessionProxy can thus override the decision made by
+ * ResendRequestController.
+ *
* @param session the session that has received the resend request.
* @param resendRequest the decoded resend request in question.
* @param correctedEndSeqNo the end sequence number that Artio will reply with. This is useful if, for example, the
* resend request uses 0 for its endSeqNo parameter.
* @param response respond to the resend request by calling methods on this object.
+ * @see SessionProxy#onResend(Session, AbstractResendRequestDecoder, int, ResendRequestResponse, AsciiBuffer, int, int)
*/
void onResend(
Session session,
diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/session/ResendRequestResponse.java b/artio-core/src/main/java/uk/co/real_logic/artio/session/ResendRequestResponse.java
index d715af54f2..5f05e62287 100644
--- a/artio-core/src/main/java/uk/co/real_logic/artio/session/ResendRequestResponse.java
+++ b/artio-core/src/main/java/uk/co/real_logic/artio/session/ResendRequestResponse.java
@@ -16,10 +16,12 @@
package uk.co.real_logic.artio.session;
import uk.co.real_logic.artio.builder.AbstractRejectEncoder;
+import uk.co.real_logic.artio.util.AsciiBuffer;
public class ResendRequestResponse
{
- private boolean result;
+ private boolean resendNow;
+ private boolean delayProcessing;
private int refTagId;
private AbstractRejectEncoder rejectEncoder;
@@ -29,7 +31,8 @@ public class ResendRequestResponse
*/
public void resend()
{
- result = true;
+ resendNow = true;
+ delayProcessing = false;
}
/**
@@ -41,14 +44,16 @@ public void reject(final int refTagId)
{
this.refTagId = refTagId;
- result = false;
+ resendNow = false;
+ delayProcessing = false;
}
public void reject(final AbstractRejectEncoder rejectEncoder)
{
this.rejectEncoder = rejectEncoder;
- result = false;
+ resendNow = false;
+ delayProcessing = false;
}
AbstractRejectEncoder rejectEncoder()
@@ -58,11 +63,36 @@ AbstractRejectEncoder rejectEncoder()
boolean result()
{
- return result;
+ return resendNow;
}
int refTagId()
{
return refTagId;
}
+
+ /**
+ * Since version 0.148(?) it is possible to postpone the execution of a ResendRequest. This method indicates
+ * that the request must not be processed nor rejected. It is the responsibility of the caller to call
+ * Session.executeResendRequest() when ready.
+ *
+ * @see Session#executeResendRequest(int, int, AsciiBuffer, int, int)
+ * @return true if response to the request must not be done immediately
+ */
+ public boolean shouldDelay()
+ {
+ return delayProcessing;
+ }
+
+ /**
+ * This method indicates that the request must not be processed nor rejected. It is the responsibility of
+ * the caller to call Session.executeResendRequest() when ready.
+ *
+ * @see Session#executeResendRequest(int, int, AsciiBuffer, int, int)
+ */
+ public void delay()
+ {
+ resendNow = false;
+ delayProcessing = true;
+ }
}
diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/session/Session.java b/artio-core/src/main/java/uk/co/real_logic/artio/session/Session.java
index 4378fd117b..106d57bdef 100644
--- a/artio-core/src/main/java/uk/co/real_logic/artio/session/Session.java
+++ b/artio-core/src/main/java/uk/co/real_logic/artio/session/Session.java
@@ -2097,50 +2097,99 @@ Action onResendRequest(
final ResendRequestResponse resendRequestResponse = this.resendRequestResponse;
if (!backpressuredResendRequestResponse)
{
+ // historic behavior
resendRequestController.onResend(this, resendRequest, correctedEndSeqNo, resendRequestResponse);
+
+ // also invoke the proxy
+ if (Pressure.isBackPressured(proxy.onResend(this, resendRequest,
+ correctedEndSeqNo, resendRequestResponse, messageBuffer, messageOffset, messageLength)))
+ {
+ return ABORT;
+ }
}
if (resendRequestResponse.result())
{
- final long correlationId = generateReplayCorrelationId();
-
- // Notify the sender end point that a replay is going to happen.
- if (!backpressuredResendRequestResponse || backpressuredOutboundValidResendRequest)
+ return executeResendRequest(
+ beginSeqNum, correctedEndSeqNo, oldLastReceivedMsgSeqNum, messageBuffer, messageOffset, messageLength
+ );
+ }
+ else if (!resendRequestResponse.shouldDelay())
+ {
+ final AbstractRejectEncoder rejectEncoder = resendRequestResponse.rejectEncoder();
+ if (rejectEncoder != null)
{
- if (saveValidResendRequest(beginSeqNum, messageBuffer, messageOffset, messageLength, correctedEndSeqNo,
- correlationId, outboundPublication))
- {
- lastReceivedMsgSeqNum(oldLastReceivedMsgSeqNum);
- backpressuredResendRequestResponse = true;
- backpressuredOutboundValidResendRequest = true;
- return ABORT;
- }
-
- backpressuredOutboundValidResendRequest = false;
+ return sendCustomReject(oldLastReceivedMsgSeqNum, rejectEncoder);
}
+ return sendReject(msgSeqNum, resendRequestResponse.refTagId(), OTHER, oldLastReceivedMsgSeqNum);
+ }
+ else
+ {
+ return CONTINUE;
+ }
+ }
+
+ private Action executeResendRequest(
+ final int beginSeqNum, final int correctedEndSeqNo, final int oldLastReceivedMsgSeqNum,
+ final AsciiBuffer messageBuffer, final int messageOffset, final int messageLength
+ )
+ {
+ final long correlationId = generateReplayCorrelationId();
+
+ // Notify the sender end point that a replay is going to happen.
+ if (!backpressuredResendRequestResponse || backpressuredOutboundValidResendRequest)
+ {
if (saveValidResendRequest(beginSeqNum, messageBuffer, messageOffset, messageLength, correctedEndSeqNo,
- correlationId, inboundPublication))
+ correlationId, outboundPublication))
{
- lastReceivedMsgSeqNum(oldLastReceivedMsgSeqNum);
+ if (lastReceivedMsgSeqNum >= 0)
+ {
+ lastReceivedMsgSeqNum(oldLastReceivedMsgSeqNum);
+ }
backpressuredResendRequestResponse = true;
+ backpressuredOutboundValidResendRequest = true;
return ABORT;
}
- backpressuredResendRequestResponse = false;
- replaysInFlight++;
- return CONTINUE;
+ backpressuredOutboundValidResendRequest = false;
}
- else
+
+ if (saveValidResendRequest(beginSeqNum, messageBuffer, messageOffset, messageLength, correctedEndSeqNo,
+ correlationId, inboundPublication))
{
- final AbstractRejectEncoder rejectEncoder = resendRequestResponse.rejectEncoder();
- if (rejectEncoder != null)
+ if (lastReceivedMsgSeqNum >= 0)
{
- return sendCustomReject(oldLastReceivedMsgSeqNum, rejectEncoder);
+ lastReceivedMsgSeqNum(oldLastReceivedMsgSeqNum);
}
-
- return sendReject(msgSeqNum, resendRequestResponse.refTagId(), OTHER, oldLastReceivedMsgSeqNum);
+ backpressuredResendRequestResponse = true;
+ return ABORT;
}
+
+ backpressuredResendRequestResponse = false;
+ replaysInFlight++;
+ return CONTINUE;
+ }
+
+
+ /**
+ * Executes a resend request. Used to be done immediately when receiving such a request, but
+ * it is now possible to delay the execution, so this method must be called when ready.
+ *
+ * @param beginSeqNum begin sequence number found in received ResendRequest
+ * @param correctedEndSeqNo corrected end sequence number
+ * @param messageBuffer buffer containing the ResendRequest message
+ * @param messageOffset offset of message in buffer
+ * @param messageLength length of message in buffer
+ * @return an Action: be sure to handle back pressure!
+ * @see SessionProxy#onResend(Session, AbstractResendRequestDecoder, int, ResendRequestResponse, AsciiBuffer, int, int)
+ */
+ public Action executeResendRequest(
+ final int beginSeqNum, final int correctedEndSeqNo,
+ final AsciiBuffer messageBuffer, final int messageOffset, final int messageLength
+ )
+ {
+ return executeResendRequest(beginSeqNum, correctedEndSeqNo, -1, messageBuffer, messageOffset, messageLength);
}
private Action sendCustomReject(final int oldLastReceivedMsgSeqNum, final AbstractRejectEncoder rejectEncoder)
diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/session/SessionProxy.java b/artio-core/src/main/java/uk/co/real_logic/artio/session/SessionProxy.java
index 66ac31e3f1..f629712ef0 100644
--- a/artio-core/src/main/java/uk/co/real_logic/artio/session/SessionProxy.java
+++ b/artio-core/src/main/java/uk/co/real_logic/artio/session/SessionProxy.java
@@ -15,10 +15,12 @@
*/
package uk.co.real_logic.artio.session;
+import uk.co.real_logic.artio.decoder.AbstractResendRequestDecoder;
import uk.co.real_logic.artio.dictionary.FixDictionary;
import uk.co.real_logic.artio.fields.RejectReason;
import uk.co.real_logic.artio.messages.CancelOnDisconnectOption;
import uk.co.real_logic.artio.messages.DisconnectReason;
+import uk.co.real_logic.artio.util.AsciiBuffer;
/**
* A proxy that allows users to hook the sending of FIX session protocol messages through an external system. This can
@@ -116,4 +118,34 @@ long sendSequenceReset(
* @return true if asynchronous, false otherwise.
*/
boolean isAsync();
+
+ /**
+ * Equivalent to onResend() method in ResendRequestController, but with finer control. It receives the buffer
+ * containing the ResendRequest message, so a copy can be made in case we want to delay the processing of the
+ * Resend request.
+ *
+ * @param session the session that has received the resend request.
+ * @param resendRequest the decoded resend request in question.
+ * @param correctedEndSeqNo the end sequence number that Artio will reply with. This is useful if, for example, the
+ * resend request uses 0 for its endSeqNo parameter.
+ * @param response respond to the resend request by calling methods on this object.
+ * @param messageBuffer buffer containing the ResendRequest message
+ * @param messageOffset offset of message in buffer
+ * @param messageLength length of message in buffer
+ * @return a null or negative number if back pressured
+ * @see Session#executeResendRequest(int, int, AsciiBuffer, int, int)
+ * @see ResendRequestController#onResend(Session, AbstractResendRequestDecoder, int, ResendRequestResponse)
+ */
+ default long onResend(
+ Session session,
+ AbstractResendRequestDecoder resendRequest,
+ int correctedEndSeqNo,
+ ResendRequestResponse response,
+ AsciiBuffer messageBuffer,
+ int messageOffset,
+ int messageLength
+ )
+ {
+ return 1;
+ }
}
diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/RaceResendResetTest.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/RaceResendResetTest.java
new file mode 100644
index 0000000000..158e0a1aa4
--- /dev/null
+++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/RaceResendResetTest.java
@@ -0,0 +1,354 @@
+package uk.co.real_logic.artio.system_tests;
+
+import org.agrona.ErrorHandler;
+import org.agrona.concurrent.EpochNanoClock;
+import org.junit.Ignore;
+import org.junit.Test;
+import uk.co.real_logic.artio.decoder.AbstractResendRequestDecoder;
+import uk.co.real_logic.artio.dictionary.generation.Exceptions;
+import uk.co.real_logic.artio.engine.EngineConfiguration;
+import uk.co.real_logic.artio.engine.FixEngine;
+import uk.co.real_logic.artio.fields.EpochFractionFormat;
+import uk.co.real_logic.artio.library.LibraryConfiguration;
+import uk.co.real_logic.artio.protocol.GatewayPublication;
+import uk.co.real_logic.artio.session.DirectSessionProxy;
+import uk.co.real_logic.artio.session.ResendRequestResponse;
+import uk.co.real_logic.artio.session.Session;
+import uk.co.real_logic.artio.session.SessionCustomisationStrategy;
+import uk.co.real_logic.artio.session.SessionIdStrategy;
+import uk.co.real_logic.artio.session.SessionProxy;
+import uk.co.real_logic.artio.util.AsciiBuffer;
+import uk.co.real_logic.artio.util.DebugFIXClient;
+import uk.co.real_logic.artio.util.DebugServer;
+import uk.co.real_logic.artio.util.MutableAsciiBuffer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static uk.co.real_logic.artio.TestFixtures.launchMediaDriver;
+import static uk.co.real_logic.artio.messages.InitialAcceptedSessionOwner.SOLE_LIBRARY;
+import static uk.co.real_logic.artio.system_tests.SystemTestUtil.ACCEPTOR_ID;
+import static uk.co.real_logic.artio.system_tests.SystemTestUtil.INITIATOR_ID;
+import static uk.co.real_logic.artio.system_tests.SystemTestUtil.acceptingConfig;
+import static uk.co.real_logic.artio.system_tests.SystemTestUtil.acceptingLibraryConfig;
+import static uk.co.real_logic.artio.system_tests.SystemTestUtil.connect;
+import static uk.co.real_logic.artio.system_tests.SystemTestUtil.initiatingConfig;
+import static uk.co.real_logic.artio.system_tests.SystemTestUtil.initiatingLibraryConfig;
+
+/**
+ * Reproduce race (issue #503) while sending ResendRequest and ResetSequence when both
+ * parties detect a gap on Logon.
+ *
+ * Also reproduces the fact that SessionProxy is not invoked when a ResetSequence message is sent during replay.
+ */
+public class RaceResendResetTest extends AbstractGatewayToGatewaySystemTest
+{
+ private boolean useProxy;
+ private boolean sendResendRequestCalled;
+ private boolean sendSequenceResetCalled;
+
+ /**
+ * When positive, simulate a SessionProxy that sends outbound FIX messages asynchronously,
+ * through an external cluster.
+ */
+ private long sleepBeforeSendResendRequest;
+
+ private final ArrayList autoClose = new ArrayList<>();
+
+ private void launch()
+ {
+ mediaDriver = launchMediaDriver();
+ launchAccepting();
+ launchInitiating();
+ testSystem = new TestSystem(acceptingLibrary, initiatingLibrary);
+ }
+
+ private void launchInitiating()
+ {
+ final EngineConfiguration initiatingConfig = initiatingConfig(libraryAeronPort, nanoClock)
+ .deleteLogFileDirOnStart(true)
+ .initialAcceptedSessionOwner(SOLE_LIBRARY);
+ initiatingEngine = FixEngine.launch(initiatingConfig);
+ final LibraryConfiguration lib = initiatingLibraryConfig(libraryAeronPort, initiatingHandler, nanoClock);
+ if (useProxy)
+ {
+ lib.sessionProxyFactory(this::sessionProxyFactory);
+ }
+ initiatingLibrary = connect(lib);
+ }
+
+ static class PendingResendRequest
+ {
+ final Session session;
+ final MutableAsciiBuffer message;
+ final int beginSeqNo;
+ final int endSeqNo;
+
+ PendingResendRequest(
+ final Session session, final int beginSeqNo, final int endSeqNo, final MutableAsciiBuffer message
+ )
+ {
+ this.session = session;
+ this.beginSeqNo = beginSeqNo;
+ this.endSeqNo = endSeqNo;
+ this.message = message;
+ }
+
+ public void execute()
+ {
+ System.err.println("Execute resend request");
+ session.executeResendRequest(beginSeqNo, endSeqNo, message, 0, message.capacity());
+ }
+ }
+
+ private void launchAccepting()
+ {
+ final EngineConfiguration acceptingConfig = acceptingConfig(port, ACCEPTOR_ID, INITIATOR_ID, nanoClock)
+ .deleteLogFileDirOnStart(true)
+ .initialAcceptedSessionOwner(SOLE_LIBRARY);
+ acceptingEngine = FixEngine.launch(acceptingConfig);
+
+ final LibraryConfiguration acceptingLibraryConfig = acceptingLibraryConfig(acceptingHandler, nanoClock);
+ acceptingLibrary = connect(acceptingLibraryConfig);
+ }
+
+ /**
+ * Sanity check that we can connect Artio to a debug server with canned messages.
+ */
+ @Test
+ public void testDebugServer() throws IOException
+ {
+ final DebugServer srv = new DebugServer(port);
+ srv.setWaitForData(true);
+ srv.addFIXResponse(
+ "8=FIX.4.4|9=94|35=A|49=acceptor|56=initiator|34=1|52=***|98=0|108=10|141=N|35002=0|35003=0|10=024|"
+ );
+ srv.start();
+
+ mediaDriver = launchMediaDriver();
+ launchInitiating();
+ testSystem = new TestSystem(initiatingLibrary);
+ connectAndAcquire();
+ }
+
+ class Proxy extends DirectSessionProxy
+ {
+ /**
+ * Stores details of received ResendRequest while we wait for ours to be sent.
+ */
+ private PendingResendRequest pendingResendRequest;
+
+ Proxy(
+ final int sessionBufferSize, final GatewayPublication gatewayPublication,
+ final SessionIdStrategy sessionIdStrategy, final SessionCustomisationStrategy customisationStrategy,
+ final EpochNanoClock clock, final long connectionId, final int libraryId,
+ final ErrorHandler errorHandler, final EpochFractionFormat epochFractionPrecision
+ )
+ {
+ super(sessionBufferSize, gatewayPublication, sessionIdStrategy, customisationStrategy, clock, connectionId,
+ libraryId, errorHandler, epochFractionPrecision);
+ }
+
+ @Override
+ public long onResend(
+ final Session session, final AbstractResendRequestDecoder resendRequest,
+ final int correctedEndSeqNo, final ResendRequestResponse response,
+ final AsciiBuffer messageBuffer, final int messageOffset, final int messageLength
+ )
+ {
+ onResendRequestReceived(session, resendRequest, correctedEndSeqNo, response,
+ messageBuffer, messageOffset, messageLength);
+ return 1;
+ }
+
+ private void onResendRequestReceived(
+ final Session session, final AbstractResendRequestDecoder request, final int endSeqNo,
+ final ResendRequestResponse response,
+ final AsciiBuffer messageBuffer, final int messageOffset, final int messageLength
+ )
+ {
+ System.err.println("onResendRequestReceived() called");
+ if (!useProxy || sleepBeforeSendResendRequest == 0)
+ {
+ response.resend();
+ }
+ else
+ {
+ response.delay();
+ final MutableAsciiBuffer buf = new MutableAsciiBuffer(new byte[messageLength]);
+ buf.putBytes(0, messageBuffer, messageOffset, messageLength);
+ pendingResendRequest = new PendingResendRequest(session, request.beginSeqNo(), endSeqNo, buf);
+ }
+ }
+
+ @Override
+ public long sendResendRequest(
+ final int msgSeqNo,
+ final int beginSeqNo,
+ final int endSeqNo,
+ final int sequenceIndex,
+ final int lastMsgSeqNumProcessed)
+ {
+ System.err.println("sendResendRequest called with msgSeqNo = " + msgSeqNo);
+ sendResendRequestCalled = true;
+ if (sleepBeforeSendResendRequest > 0)
+ {
+ new Thread(() ->
+ {
+ try
+ {
+ Thread.sleep(sleepBeforeSendResendRequest);
+ }
+ catch (final InterruptedException ignored)
+ {
+ }
+ System.err.println("Executing super.sendResendRequest() after delay: msgSeqNo = " + msgSeqNo);
+ super.sendResendRequest(msgSeqNo, beginSeqNo, endSeqNo, sequenceIndex, lastMsgSeqNumProcessed);
+ if (pendingResendRequest != null)
+ {
+ pendingResendRequest.execute();
+ }
+ else
+ {
+ System.err.println("onResend not called (async)");
+ }
+ }).start();
+ }
+ else
+ {
+ System.err.println("Directly executing sendResendRequest msgSeqNo = " + msgSeqNo);
+ super.sendResendRequest(msgSeqNo, beginSeqNo, endSeqNo, sequenceIndex, lastMsgSeqNumProcessed);
+ if (pendingResendRequest != null)
+ {
+ pendingResendRequest.execute();
+ }
+ else
+ {
+ System.err.println("onResend not called (direct)");
+ }
+ }
+ return 1;
+ }
+
+ @Override
+ public long sendSequenceReset(
+ final int msgSeqNo,
+ final int newSeqNo,
+ final int sequenceIndex,
+ final int lastMsgSeqNumProcessed)
+ {
+ sendSequenceResetCalled = true;
+ return super.sendSequenceReset(msgSeqNo, newSeqNo, sequenceIndex, lastMsgSeqNumProcessed);
+ }
+
+ @Override
+ public boolean isAsync()
+ {
+ return true;
+ }
+ }
+
+ private SessionProxy sessionProxyFactory(
+ final int sessionBufferSize,
+ final GatewayPublication gatewayPublication,
+ final SessionIdStrategy sessionIdStrategy,
+ final SessionCustomisationStrategy customisationStrategy,
+ final EpochNanoClock clock,
+ final long connectionId,
+ final int libraryId,
+ final ErrorHandler errorHandler,
+ final EpochFractionFormat epochFractionPrecision)
+ {
+ return new Proxy(sessionBufferSize, gatewayPublication, sessionIdStrategy, customisationStrategy,
+ clock, connectionId, libraryId, errorHandler, epochFractionPrecision);
+ }
+
+ @Test(timeout = TEST_TIMEOUT_IN_MS)
+ public void shouldNotInvertResendAndResetNoProxy() throws Exception
+ {
+ useProxy = false;
+ reconnectTest();
+ }
+
+ @Test(timeout = TEST_TIMEOUT_IN_MS)
+ public void shouldSendResendBeforeResetSyncProxy() throws Exception
+ {
+ useProxy = true;
+ sleepBeforeSendResendRequest = 0;
+ reconnectTest();
+ }
+
+ @Test(timeout = TEST_TIMEOUT_IN_MS)
+ public void shouldSendResendBeforeResetAsyncProxy() throws Exception
+ {
+ useProxy = true;
+ sleepBeforeSendResendRequest = 100;
+ reconnectTest();
+ }
+
+ @Ignore // SequenceReset is directly sent by replayer, does not go through SessionProxy
+ @Test(timeout = TEST_TIMEOUT_IN_MS)
+ public void shouldCallProxySendSequenceReset() throws Exception
+ {
+ useProxy = true;
+ reconnectTest();
+ assertTrue("SessionProxy.sendResendRequest() not called", sendResendRequestCalled);
+ assertTrue("SessionProxy.sendSequenceReset() not called", sendSequenceResetCalled);
+ }
+
+ private void reconnectTest() throws Exception
+ {
+ launch();
+
+ connectAndAcquire();
+
+ messagesCanBeExchanged();
+
+ disconnectSessions();
+ Exceptions.closeAll(this::closeAcceptingEngine);
+
+ assertEquals(3, acceptingSession.lastReceivedMsgSeqNum());
+ assertEquals(3, initiatingSession.lastReceivedMsgSeqNum());
+
+ final DebugServer srv = new DebugServer(port);
+ srv.setWaitForData(true);
+ srv.addFIXResponse(
+ "8=FIX.4.4|9=94|35=A|49=acceptor|56=initiator|34=5|52=***|98=0|108=10|141=N|35002=0|35003=0|10=024|",
+ "8=FIX.4.4|9=94|35=2|49=acceptor|56=initiator|34=6|52=***|7=4|16=0|10=024|"
+ );
+ srv.start();
+ autoClose.add(srv::stop);
+
+ connectPersistentSessions(4, 4, false);
+
+ final DebugFIXClient exchange = new DebugFIXClient(srv.popClient(5000));
+ autoClose.add(exchange::close);
+ exchange.popAndAssert("35=A 34=4");
+ exchange.popAndAssert("35=2 34=5 7=4 16=0"); // ResendRequest now always received first
+ exchange.popAndAssert("35=4 34=4 36=6");
+ }
+
+ @Override
+ public void close()
+ {
+ for (final AutoCloseable autoCloseable : autoClose)
+ {
+ try
+ {
+ autoCloseable.close();
+ }
+ catch (final Exception ignored)
+ {
+ }
+ }
+ super.close();
+ }
+
+ private void connectAndAcquire()
+ {
+ connectSessions();
+ acceptingSession = acceptingHandler.lastSession();
+ }
+}
diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/util/DebugFIXClient.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/util/DebugFIXClient.java
new file mode 100644
index 0000000000..635a7df540
--- /dev/null
+++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/util/DebugFIXClient.java
@@ -0,0 +1,98 @@
+package uk.co.real_logic.artio.util;
+
+import org.junit.Assert;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Scanner;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Helper to pop FIX messages received on a socket.
+ *
+ * @see DebugServer
+ */
+public class DebugFIXClient
+{
+ private final DebugServer.HasIOStream io;
+ private final Thread thread;
+
+ private final BlockingQueue