Skip to content

Commit

Permalink
Add user driven back-pressure handling to FIXP protocols through the …
Browse files Browse the repository at this point in the history
…handlers returning Action instances
  • Loading branch information
RichardWarburton committed Jan 13, 2022
1 parent 4cf4166 commit ed7f8ad
Show file tree
Hide file tree
Showing 17 changed files with 416 additions and 256 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package uk.co.real_logic.artio.binary_entrypoint;

import b3.entrypoint.fixp.sbe.*;
import io.aeron.logbuffer.ControlledFragmentHandler.Action;
import org.agrona.DirectBuffer;
import uk.co.real_logic.artio.DebugLogger;
import uk.co.real_logic.artio.fixp.AbstractFixPParser;
Expand Down Expand Up @@ -70,7 +71,7 @@ public int version(final DirectBuffer buffer, final int offset)
return header.version();
}

public long onMessage(final DirectBuffer buffer, final int start)
public Action onMessage(final DirectBuffer buffer, final int start)
{
int offset = start + SOFH_LENGTH;

Expand Down Expand Up @@ -112,7 +113,7 @@ public long onMessage(final DirectBuffer buffer, final int start)
}
}

private long onRetransmitRequest(
private Action onRetransmitRequest(
final DirectBuffer buffer, final int offset, final int blockLength, final int version)
{
final RetransmitRequestDecoder retransmitRequest = this.retransmitRequest;
Expand All @@ -127,7 +128,7 @@ private long onRetransmitRequest(
retransmitRequest.count());
}

private long onFinishedSending(
private Action onFinishedSending(
final DirectBuffer buffer, final int offset, final int blockLength, final int version)
{
final FinishedSendingDecoder finishedSending = this.finishedSending;
Expand All @@ -141,7 +142,7 @@ private long onFinishedSending(
finishedSending.lastSeqNo());
}

private long onFinishedReceiving(
private Action onFinishedReceiving(
final DirectBuffer buffer, final int offset, final int blockLength, final int version)
{
final FinishedReceivingDecoder finishedReceiving = this.finishedReceiving;
Expand All @@ -154,7 +155,7 @@ private long onFinishedReceiving(
finishedReceiving.sessionVerID());
}

private long onSequence(final DirectBuffer buffer, final int offset, final int blockLength, final int version)
private Action onSequence(final DirectBuffer buffer, final int offset, final int blockLength, final int version)
{
sequence.wrap(buffer, offset, blockLength, version);

Expand All @@ -163,7 +164,7 @@ private long onSequence(final DirectBuffer buffer, final int offset, final int b
return handler.onSequence(sequence.nextSeqNo());
}

private long onTerminate(final DirectBuffer buffer, final int offset, final int blockLength, final int version)
private Action onTerminate(final DirectBuffer buffer, final int offset, final int blockLength, final int version)
{
terminate.wrap(buffer, offset, blockLength, version);

Expand All @@ -175,7 +176,7 @@ private long onTerminate(final DirectBuffer buffer, final int offset, final int
terminate.terminationCode());
}

private long onEstablish(final DirectBuffer buffer, final int offset, final int blockLength, final int version)
private Action onEstablish(final DirectBuffer buffer, final int offset, final int blockLength, final int version)
{
establish.wrap(buffer, offset, blockLength, version);

Expand All @@ -191,7 +192,7 @@ private long onEstablish(final DirectBuffer buffer, final int offset, final int
establish.codTimeoutWindow().time());
}

private long onNegotiate(final DirectBuffer buffer, final int offset, final int blockLength, final int version)
private Action onNegotiate(final DirectBuffer buffer, final int offset, final int blockLength, final int version)
{
negotiate.wrap(buffer, offset, blockLength, version);

Expand Down
Loading

0 comments on commit ed7f8ad

Please sign in to comment.