Skip to content

Commit

Permalink
Add the ability to reply asynchronously to a ResendRequest
Browse files Browse the repository at this point in the history
  • Loading branch information
pcdv committed Apr 5, 2024
1 parent 1911973 commit 01ed098
Show file tree
Hide file tree
Showing 9 changed files with 831 additions and 30 deletions.
15 changes: 15 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Helps IDEA users apply some of the formatting rules enforced by checkstyle

root = true

[*.java]
indent_size = 4
max_line_length = 120
ij_java_method_brace_style = next_line
ij_java_block_brace_style = next_line
ij_java_else_on_new_line = true
ij_java_class_brace_style = next_line
ij_java_space_after_type_cast = false
ij_any_catch_on_new_line = true
ij_any_spaces_around_equality_operators = true
ij_java_continuation_indent_size = 4
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package uk.co.real_logic.artio.session;

import uk.co.real_logic.artio.decoder.AbstractResendRequestDecoder;
import uk.co.real_logic.artio.util.AsciiBuffer;

/**
* Customer interface to control whether resend requests are responded to or not.
Expand All @@ -33,11 +34,16 @@ public interface ResendRequestController
* (eg: begin sequence number > end sequence number or begin sequence number > last sent sequence number)
* then this callback won't be invoked.
*
* SessionProxy is now also notified immediately after this method is called, with additional parameters that
* allow to delay the processing of the ResendRequest. The SessionProxy can thus override the decision made by
* ResendRequestController.
*
* @param session the session that has received the resend request.
* @param resendRequest the decoded resend request in question.
* @param correctedEndSeqNo the end sequence number that Artio will reply with. This is useful if, for example, the
* resend request uses 0 for its endSeqNo parameter.
* @param response respond to the resend request by calling methods on this object.
* @see SessionProxy#onResend(Session, AbstractResendRequestDecoder, int, ResendRequestResponse, AsciiBuffer, int, int)
*/
void onResend(
Session session,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
package uk.co.real_logic.artio.session;

import uk.co.real_logic.artio.builder.AbstractRejectEncoder;
import uk.co.real_logic.artio.util.AsciiBuffer;

public class ResendRequestResponse
{
private boolean result;
private boolean resendNow;
private boolean delayProcessing;

private int refTagId;
private AbstractRejectEncoder rejectEncoder;
Expand All @@ -29,7 +31,8 @@ public class ResendRequestResponse
*/
public void resend()
{
result = true;
resendNow = true;
delayProcessing = false;
}

/**
Expand All @@ -41,14 +44,16 @@ public void reject(final int refTagId)
{
this.refTagId = refTagId;

result = false;
resendNow = false;
delayProcessing = false;
}

public void reject(final AbstractRejectEncoder rejectEncoder)
{
this.rejectEncoder = rejectEncoder;

result = false;
resendNow = false;
delayProcessing = false;
}

AbstractRejectEncoder rejectEncoder()
Expand All @@ -58,11 +63,36 @@ AbstractRejectEncoder rejectEncoder()

boolean result()
{
return result;
return resendNow;
}

int refTagId()
{
return refTagId;
}

/**
* Since version 0.148(?) it is possible to postpone the execution of a ResendRequest. This method indicates
* that the request must not be processed nor rejected. It is the responsibility of the caller to call
* Session.executeResendRequest() when ready.
*
* @see Session#executeResendRequest(int, int, AsciiBuffer, int, int)
* @return true if response to the request must not be done immediately
*/
public boolean shouldDelay()
{
return delayProcessing;
}

/**
* This method indicates that the request must not be processed nor rejected. It is the responsibility of
* the caller to call Session.executeResendRequest() when ready.
*
* @see Session#executeResendRequest(int, int, AsciiBuffer, int, int)
*/
public void delay()
{
resendNow = false;
delayProcessing = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2097,50 +2097,99 @@ Action onResendRequest(
final ResendRequestResponse resendRequestResponse = this.resendRequestResponse;
if (!backpressuredResendRequestResponse)
{
// historic behavior
resendRequestController.onResend(this, resendRequest, correctedEndSeqNo, resendRequestResponse);

// also invoke the proxy
if (Pressure.isBackPressured(proxy.onResend(this, resendRequest,
correctedEndSeqNo, resendRequestResponse, messageBuffer, messageOffset, messageLength)))
{
return ABORT;
}
}

if (resendRequestResponse.result())
{
final long correlationId = generateReplayCorrelationId();

// Notify the sender end point that a replay is going to happen.
if (!backpressuredResendRequestResponse || backpressuredOutboundValidResendRequest)
return executeResendRequest(
beginSeqNum, correctedEndSeqNo, oldLastReceivedMsgSeqNum, messageBuffer, messageOffset, messageLength
);
}
else if (!resendRequestResponse.shouldDelay())
{
final AbstractRejectEncoder rejectEncoder = resendRequestResponse.rejectEncoder();
if (rejectEncoder != null)
{
if (saveValidResendRequest(beginSeqNum, messageBuffer, messageOffset, messageLength, correctedEndSeqNo,
correlationId, outboundPublication))
{
lastReceivedMsgSeqNum(oldLastReceivedMsgSeqNum);
backpressuredResendRequestResponse = true;
backpressuredOutboundValidResendRequest = true;
return ABORT;
}

backpressuredOutboundValidResendRequest = false;
return sendCustomReject(oldLastReceivedMsgSeqNum, rejectEncoder);
}

return sendReject(msgSeqNum, resendRequestResponse.refTagId(), OTHER, oldLastReceivedMsgSeqNum);
}
else
{
return CONTINUE;
}
}

private Action executeResendRequest(
final int beginSeqNum, final int correctedEndSeqNo, final int oldLastReceivedMsgSeqNum,
final AsciiBuffer messageBuffer, final int messageOffset, final int messageLength
)
{
final long correlationId = generateReplayCorrelationId();

// Notify the sender end point that a replay is going to happen.
if (!backpressuredResendRequestResponse || backpressuredOutboundValidResendRequest)
{
if (saveValidResendRequest(beginSeqNum, messageBuffer, messageOffset, messageLength, correctedEndSeqNo,
correlationId, inboundPublication))
correlationId, outboundPublication))
{
lastReceivedMsgSeqNum(oldLastReceivedMsgSeqNum);
if (lastReceivedMsgSeqNum >= 0)
{
lastReceivedMsgSeqNum(oldLastReceivedMsgSeqNum);
}
backpressuredResendRequestResponse = true;
backpressuredOutboundValidResendRequest = true;
return ABORT;
}

backpressuredResendRequestResponse = false;
replaysInFlight++;
return CONTINUE;
backpressuredOutboundValidResendRequest = false;
}
else

if (saveValidResendRequest(beginSeqNum, messageBuffer, messageOffset, messageLength, correctedEndSeqNo,
correlationId, inboundPublication))
{
final AbstractRejectEncoder rejectEncoder = resendRequestResponse.rejectEncoder();
if (rejectEncoder != null)
if (lastReceivedMsgSeqNum >= 0)
{
return sendCustomReject(oldLastReceivedMsgSeqNum, rejectEncoder);
lastReceivedMsgSeqNum(oldLastReceivedMsgSeqNum);
}

return sendReject(msgSeqNum, resendRequestResponse.refTagId(), OTHER, oldLastReceivedMsgSeqNum);
backpressuredResendRequestResponse = true;
return ABORT;
}

backpressuredResendRequestResponse = false;
replaysInFlight++;
return CONTINUE;
}


/**
* Executes a resend request. Used to be done immediately when receiving such a request, but
* it is now possible to delay the execution, so this method must be called when ready.
*
* @param beginSeqNum begin sequence number found in received ResendRequest
* @param correctedEndSeqNo corrected end sequence number
* @param messageBuffer buffer containing the ResendRequest message
* @param messageOffset offset of message in buffer
* @param messageLength length of message in buffer
* @return an Action: be sure to handle back pressure!
* @see SessionProxy#onResend(Session, AbstractResendRequestDecoder, int, ResendRequestResponse, AsciiBuffer, int, int)
*/
public Action executeResendRequest(
final int beginSeqNum, final int correctedEndSeqNo,
final AsciiBuffer messageBuffer, final int messageOffset, final int messageLength
)
{
return executeResendRequest(beginSeqNum, correctedEndSeqNo, -1, messageBuffer, messageOffset, messageLength);
}

private Action sendCustomReject(final int oldLastReceivedMsgSeqNum, final AbstractRejectEncoder rejectEncoder)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
*/
package uk.co.real_logic.artio.session;

import uk.co.real_logic.artio.decoder.AbstractResendRequestDecoder;
import uk.co.real_logic.artio.dictionary.FixDictionary;
import uk.co.real_logic.artio.fields.RejectReason;
import uk.co.real_logic.artio.messages.CancelOnDisconnectOption;
import uk.co.real_logic.artio.messages.DisconnectReason;
import uk.co.real_logic.artio.util.AsciiBuffer;

/**
* A proxy that allows users to hook the sending of FIX session protocol messages through an external system. This can
Expand Down Expand Up @@ -116,4 +118,34 @@ long sendSequenceReset(
* @return true if asynchronous, false otherwise.
*/
boolean isAsync();

/**
* Equivalent to onResend() method in ResendRequestController, but with finer control. It receives the buffer
* containing the ResendRequest message, so a copy can be made in case we want to delay the processing of the
* Resend request.
*
* @param session the session that has received the resend request.
* @param resendRequest the decoded resend request in question.
* @param correctedEndSeqNo the end sequence number that Artio will reply with. This is useful if, for example, the
* resend request uses 0 for its endSeqNo parameter.
* @param response respond to the resend request by calling methods on this object.
* @param messageBuffer buffer containing the ResendRequest message
* @param messageOffset offset of message in buffer
* @param messageLength length of message in buffer
* @return a null or negative number if back pressured
* @see Session#executeResendRequest(int, int, AsciiBuffer, int, int)
* @see ResendRequestController#onResend(Session, AbstractResendRequestDecoder, int, ResendRequestResponse)
*/
default long onResend(
Session session,
AbstractResendRequestDecoder resendRequest,
int correctedEndSeqNo,
ResendRequestResponse response,
AsciiBuffer messageBuffer,
int messageOffset,
int messageLength
)
{
return 1;
}
}
Loading

0 comments on commit 01ed098

Please sign in to comment.