Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add request_order_book_snapshot method #1745

Merged
merged 3 commits into from
Jun 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions nautilus_trader/adapters/_template/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,3 +346,13 @@ async def _request_bars(
raise NotImplementedError(
"method `_request_bars` must be implemented in the subclass",
) # pragma: no cover

async def _request_order_book_snapshot(
self,
instrument_id: InstrumentId,
limit: int,
correlation_id: UUID4,
) -> None:
raise NotImplementedError(
"method `_request_order_book_snapshot(` must be implemented in the subclass",
) # pragma: no cover
33 changes: 33 additions & 0 deletions nautilus_trader/adapters/binance/common/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -692,6 +692,39 @@ async def _request_bars( # (too complex)
partial: Bar = bars.pop()
self._handle_bars(bar_type, bars, partial, correlation_id)

async def _request_order_book_snapshot(
self,
instrument_id: InstrumentId,
limit: int,
correlation_id: UUID4,
) -> None:
if limit not in [5, 10, 20, 50, 100, 500, 1000]:
self._log.error(
"Cannot get order book snapshots: "
f"invalid `limit`, was {limit}. "
"Valid limits are 5, 10, 20, 50, 100, 500 or 1000",
)
return
else:
snapshot: OrderBookDeltas = await self._http_market.request_order_book_snapshot(
instrument_id=instrument_id,
limit=limit,
ts_init=self._clock.timestamp_ns(),
)

data_type = DataType(
OrderBookDeltas,
metadata={
"instrument_id": instrument_id,
"limit": limit,
},
)
self._handle_data_response(
data_type=data_type,
data=snapshot,
correlation_id=correlation_id,
)

async def _aggregate_internal_from_minute_bars(
self,
bar_type: BarType,
Expand Down
12 changes: 12 additions & 0 deletions nautilus_trader/backtest/data_client.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -407,3 +407,15 @@ cdef class BacktestMarketDataClient(MarketDataClient):
Condition.not_none(correlation_id, "correlation_id")

# Do nothing else for backtest

cpdef void request_order_book_snapshot(
self,
InstrumentId instrument_id,
int limit,
UUID4 correlation_id,
):
Condition.not_none(instrument_id, "instrument_id")
Condition.not_negative_int(limit, "limit")
Condition.not_none(correlation_id, "correlation_id")

# Do nothing else for backtest
8 changes: 8 additions & 0 deletions nautilus_trader/common/actor.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,14 @@ cdef class Actor(Component):
ClientId client_id=*,
callback=*,
)

cpdef UUID4 request_order_book_snapshot(
self,
InstrumentId instrument_id,
int limit,
ClientId client_id=*,
callback=*,
)
cpdef bint is_pending_request(self, UUID4 request_id)
cpdef bint has_pending_requests(self)
cpdef set pending_requests(self)
Expand Down
63 changes: 63 additions & 0 deletions nautilus_trader/common/actor.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -2307,6 +2307,69 @@ cdef class Actor(Component):

return request_id


cpdef UUID4 request_order_book_snapshot(
self,
InstrumentId instrument_id,
int limit,
ClientId client_id=None,
callback: Callable[[UUID4], None] | None=None
):
"""
Request an order book snapshot.

Parameters
----------
instrument_id : InstrumentId
The instrument ID for the order book snapshot request.
limit : int, optional
The limit on the depth of the order book snapshot (default is None).
client_id : ClientId, optional
The specific client ID for the command.
If None, it will be inferred from the venue in the instrument ID.
callback : Callable[[UUID4], None], optional
The registered callback, to be called with the request ID when the response has completed processing.

Returns
-------
UUID4
The request_id for the request.

Raises
------
ValueError
If the instrument_id is None.
TypeError
If callback is not None and not of type Callable.
"""
# Preconditions and validations
Condition.true(self.trader_id is not None, "The actor has not been registered")
Condition.not_none(instrument_id, "instrument_id")
Condition.callable_or_none(callback, "callback")

# Generate a unique request ID
cdef UUID4 request_id = UUID4()

# Create the data request
cdef DataRequest request = DataRequest(
client_id=client_id,
venue=instrument_id.venue,
data_type=DataType(OrderBookDeltas, metadata={
"instrument_id": instrument_id,
"limit": limit,
}),
callback=self._handle_data_response,
request_id=request_id,
ts_init=self._clock.timestamp_ns(),
)

# Store pending request and send the data request
self._pending_requests[request_id] = callback
self._send_data_req(request)

return request_id


cpdef bint is_pending_request(self, UUID4 request_id):
"""
Return whether the request for the given identifier is pending processing.
Expand Down
7 changes: 7 additions & 0 deletions nautilus_trader/data/client.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,13 @@ cdef class MarketDataClient(DataClient):
datetime end=*,
)

cpdef void request_order_book_snapshot(
self,
InstrumentId instrument_id,
int limit,
UUID4 correlation_id
)

# -- DATA HANDLERS --------------------------------------------------------------------------------

cpdef void _handle_instrument(self, Instrument instrument, UUID4 correlation_id)
Expand Down
23 changes: 23 additions & 0 deletions nautilus_trader/data/client.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -963,6 +963,29 @@ cdef class MarketDataClient(DataClient):
f"You can implement by overriding the `request_bars` method for this client", # pragma: no cover # noqa
)

cpdef void request_order_book_snapshot(
self,
InstrumentId instrument_id,
int limit,
UUID4 correlation_id,
):
"""
Request order book snapshot data.

Parameters
----------
instrument_id : InstrumentId
The instrument ID for the order book snapshot request.
limit : int
The limit on the depth of the order book snapshot.
correction_id : UUID4
The correlation ID for the request.
"""
self._log.error(
f"Cannot request order book snapshot data for {instrument_id}: not implemented. "
"You can implement by overriding the `request_order_book_snapshot` method for this client."
)

# -- PYTHON WRAPPERS ------------------------------------------------------------------------------

# Convenient Python wrappers for the data handlers. Often Python methods
Expand Down
7 changes: 7 additions & 0 deletions nautilus_trader/data/engine.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -1260,6 +1260,13 @@ cdef class DataEngine(Component):
request.data_type.metadata.get("start"),
request.data_type.metadata.get("end"),
)
elif request.data_type.type == OrderBookDeltas:
Condition.true(isinstance(client, MarketDataClient), "client was not a MarketDataClient")
client.request_order_book_snapshot(
request.data_type.metadata.get("instrument_id"),
request.data_type.metadata.get("limit", 0),
request.id
)
else:
try:
client.request(request.data_type, request.id)
Expand Down
27 changes: 27 additions & 0 deletions nautilus_trader/live/data_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -762,6 +762,23 @@ def request_bars(
log_msg=f"request: bars {bar_type}",
)

def request_order_book_snapshot(
self,
instrument_id: InstrumentId,
limit: int,
correlation_id: UUID4,
) -> None:
limit_str = f" limit={limit}" if limit else ""
self._log.info(f"Request {instrument_id} order_book_snapshot{limit_str}", LogColor.BLUE)
self.create_task(
self._request_order_book_snapshot(
instrument_id=instrument_id,
limit=limit,
correlation_id=correlation_id,
),
log_msg=f"request: order_book_snapshot {instrument_id}",
)

############################################################################
# Coroutines to implement
############################################################################
Expand Down Expand Up @@ -949,3 +966,13 @@ async def _request_bars(
raise NotImplementedError( # pragma: no cover
"implement the `_request_bars` coroutine", # pragma: no cover
)

async def _request_order_book_snapshot(
self,
instrument_id: InstrumentId,
limit: int,
correlation_id: UUID4,
) -> None:
raise NotImplementedError(
"implement the `_request_order_book_snapshot` coroutine", # pragma: no cover
) # pra
31 changes: 31 additions & 0 deletions tests/unit_tests/common/test_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from nautilus_trader.model.currencies import USD
from nautilus_trader.model.data import Bar
from nautilus_trader.model.data import DataType
from nautilus_trader.model.data import OrderBookDeltas
from nautilus_trader.model.data import QuoteTick
from nautilus_trader.model.data import TradeTick
from nautilus_trader.model.enums import BookType
Expand Down Expand Up @@ -2150,3 +2151,33 @@ def test_request_bars_with_invalid_params_raises_value_error(self, start, stop):
# Act, Assert
with pytest.raises(ValueError):
actor.request_bars(bar_type, start, stop)

def test_request_order_book_snapshots_sends_request_to_data_engine(self) -> None:
# Arrange
handler: list[OrderBookDeltas] = []
actor = MockActor()
actor.register_base(
portfolio=self.portfolio,
msgbus=self.msgbus,
cache=self.cache,
clock=self.clock,
)
data_type = DataType(
OrderBookDeltas,
metadata={
"instrument_id": AUDUSD_SIM.id,
"limit": 5,
},
)
# Act
request_id = actor.request_data(
data_type,
ClientId("BLOOMBERG-01"),
callback=handler.append,
)

# Assert
assert self.data_engine.request_count == 1
assert actor.has_pending_requests()
assert actor.is_pending_request(request_id)
assert request_id in actor.pending_requests()
32 changes: 32 additions & 0 deletions tests/unit_tests/data/test_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -2170,6 +2170,38 @@ def test_request_instruments_for_venue_when_catalog_registered(self):
assert len(handler) == 1
assert len(handler[0].data) == 1

def test_request_order_book_snapshot_reaches_client(self):
# Arrange
self.data_engine.register_client(self.binance_client)

deltas = OrderBookDeltas(
instrument_id=ETHUSDT_BINANCE.id,
deltas=[TestDataStubs.order_book_delta(instrument_id=ETHUSDT_BINANCE.id)],
)

self.data_engine.process(deltas)

handler = []
request = DataRequest(
client_id=None,
venue=BINANCE,
data_type=DataType(
OrderBookDeltas,
metadata={
"instrument_id": ETHUSDT_BINANCE.id,
"limit": 10,
},
),
callback=handler.append,
request_id=UUID4(),
ts_init=self.clock.timestamp_ns(),
)
# Act
self.msgbus.request(endpoint="DataEngine.request", request=request)
# Assert
assert self.data_engine.request_count == 1
assert len(handler) == 0

# TODO: Implement with new Rust datafusion backend"
# def test_request_quote_ticks_when_catalog_registered_using_rust(self) -> None:
# # Arrange
Expand Down