Skip to content

Commit

Permalink
Add request_order_book_snapshot method (#1745)
Browse files Browse the repository at this point in the history
  • Loading branch information
graceyangfan authored Jun 28, 2024
1 parent df2d816 commit 2202657
Show file tree
Hide file tree
Showing 11 changed files with 253 additions and 0 deletions.
10 changes: 10 additions & 0 deletions nautilus_trader/adapters/_template/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,3 +346,13 @@ async def _request_bars(
raise NotImplementedError(
"method `_request_bars` must be implemented in the subclass",
) # pragma: no cover

async def _request_order_book_snapshot(
self,
instrument_id: InstrumentId,
limit: int,
correlation_id: UUID4,
) -> None:
raise NotImplementedError(
"method `_request_order_book_snapshot(` must be implemented in the subclass",
) # pragma: no cover
33 changes: 33 additions & 0 deletions nautilus_trader/adapters/binance/common/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -692,6 +692,39 @@ async def _request_bars( # (too complex)
partial: Bar = bars.pop()
self._handle_bars(bar_type, bars, partial, correlation_id)

async def _request_order_book_snapshot(
self,
instrument_id: InstrumentId,
limit: int,
correlation_id: UUID4,
) -> None:
if limit not in [5, 10, 20, 50, 100, 500, 1000]:
self._log.error(
"Cannot get order book snapshots: "
f"invalid `limit`, was {limit}. "
"Valid limits are 5, 10, 20, 50, 100, 500 or 1000",
)
return
else:
snapshot: OrderBookDeltas = await self._http_market.request_order_book_snapshot(
instrument_id=instrument_id,
limit=limit,
ts_init=self._clock.timestamp_ns(),
)

data_type = DataType(
OrderBookDeltas,
metadata={
"instrument_id": instrument_id,
"limit": limit,
},
)
self._handle_data_response(
data_type=data_type,
data=snapshot,
correlation_id=correlation_id,
)

async def _aggregate_internal_from_minute_bars(
self,
bar_type: BarType,
Expand Down
12 changes: 12 additions & 0 deletions nautilus_trader/backtest/data_client.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -407,3 +407,15 @@ cdef class BacktestMarketDataClient(MarketDataClient):
Condition.not_none(correlation_id, "correlation_id")

# Do nothing else for backtest

cpdef void request_order_book_snapshot(
self,
InstrumentId instrument_id,
int limit,
UUID4 correlation_id,
):
Condition.not_none(instrument_id, "instrument_id")
Condition.not_negative_int(limit, "limit")
Condition.not_none(correlation_id, "correlation_id")

# Do nothing else for backtest
8 changes: 8 additions & 0 deletions nautilus_trader/common/actor.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,14 @@ cdef class Actor(Component):
ClientId client_id=*,
callback=*,
)

cpdef UUID4 request_order_book_snapshot(
self,
InstrumentId instrument_id,
int limit,
ClientId client_id=*,
callback=*,
)
cpdef bint is_pending_request(self, UUID4 request_id)
cpdef bint has_pending_requests(self)
cpdef set pending_requests(self)
Expand Down
63 changes: 63 additions & 0 deletions nautilus_trader/common/actor.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -2307,6 +2307,69 @@ cdef class Actor(Component):

return request_id


cpdef UUID4 request_order_book_snapshot(
self,
InstrumentId instrument_id,
int limit,
ClientId client_id=None,
callback: Callable[[UUID4], None] | None=None
):
"""
Request an order book snapshot.
Parameters
----------
instrument_id : InstrumentId
The instrument ID for the order book snapshot request.
limit : int, optional
The limit on the depth of the order book snapshot (default is None).
client_id : ClientId, optional
The specific client ID for the command.
If None, it will be inferred from the venue in the instrument ID.
callback : Callable[[UUID4], None], optional
The registered callback, to be called with the request ID when the response has completed processing.
Returns
-------
UUID4
The request_id for the request.
Raises
------
ValueError
If the instrument_id is None.
TypeError
If callback is not None and not of type Callable.
"""
# Preconditions and validations
Condition.true(self.trader_id is not None, "The actor has not been registered")
Condition.not_none(instrument_id, "instrument_id")
Condition.callable_or_none(callback, "callback")

# Generate a unique request ID
cdef UUID4 request_id = UUID4()

# Create the data request
cdef DataRequest request = DataRequest(
client_id=client_id,
venue=instrument_id.venue,
data_type=DataType(OrderBookDeltas, metadata={
"instrument_id": instrument_id,
"limit": limit,
}),
callback=self._handle_data_response,
request_id=request_id,
ts_init=self._clock.timestamp_ns(),
)

# Store pending request and send the data request
self._pending_requests[request_id] = callback
self._send_data_req(request)

return request_id


cpdef bint is_pending_request(self, UUID4 request_id):
"""
Return whether the request for the given identifier is pending processing.
Expand Down
7 changes: 7 additions & 0 deletions nautilus_trader/data/client.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,13 @@ cdef class MarketDataClient(DataClient):
datetime end=*,
)

cpdef void request_order_book_snapshot(
self,
InstrumentId instrument_id,
int limit,
UUID4 correlation_id
)

# -- DATA HANDLERS --------------------------------------------------------------------------------

cpdef void _handle_instrument(self, Instrument instrument, UUID4 correlation_id)
Expand Down
23 changes: 23 additions & 0 deletions nautilus_trader/data/client.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -963,6 +963,29 @@ cdef class MarketDataClient(DataClient):
f"You can implement by overriding the `request_bars` method for this client", # pragma: no cover # noqa
)

cpdef void request_order_book_snapshot(
self,
InstrumentId instrument_id,
int limit,
UUID4 correlation_id,
):
"""
Request order book snapshot data.
Parameters
----------
instrument_id : InstrumentId
The instrument ID for the order book snapshot request.
limit : int
The limit on the depth of the order book snapshot.
correction_id : UUID4
The correlation ID for the request.
"""
self._log.error(
f"Cannot request order book snapshot data for {instrument_id}: not implemented. "
"You can implement by overriding the `request_order_book_snapshot` method for this client."
)

# -- PYTHON WRAPPERS ------------------------------------------------------------------------------

# Convenient Python wrappers for the data handlers. Often Python methods
Expand Down
7 changes: 7 additions & 0 deletions nautilus_trader/data/engine.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -1260,6 +1260,13 @@ cdef class DataEngine(Component):
request.data_type.metadata.get("start"),
request.data_type.metadata.get("end"),
)
elif request.data_type.type == OrderBookDeltas:
Condition.true(isinstance(client, MarketDataClient), "client was not a MarketDataClient")
client.request_order_book_snapshot(
request.data_type.metadata.get("instrument_id"),
request.data_type.metadata.get("limit", 0),
request.id
)
else:
try:
client.request(request.data_type, request.id)
Expand Down
27 changes: 27 additions & 0 deletions nautilus_trader/live/data_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -762,6 +762,23 @@ def request_bars(
log_msg=f"request: bars {bar_type}",
)

def request_order_book_snapshot(
self,
instrument_id: InstrumentId,
limit: int,
correlation_id: UUID4,
) -> None:
limit_str = f" limit={limit}" if limit else ""
self._log.info(f"Request {instrument_id} order_book_snapshot{limit_str}", LogColor.BLUE)
self.create_task(
self._request_order_book_snapshot(
instrument_id=instrument_id,
limit=limit,
correlation_id=correlation_id,
),
log_msg=f"request: order_book_snapshot {instrument_id}",
)

############################################################################
# Coroutines to implement
############################################################################
Expand Down Expand Up @@ -949,3 +966,13 @@ async def _request_bars(
raise NotImplementedError( # pragma: no cover
"implement the `_request_bars` coroutine", # pragma: no cover
)

async def _request_order_book_snapshot(
self,
instrument_id: InstrumentId,
limit: int,
correlation_id: UUID4,
) -> None:
raise NotImplementedError(
"implement the `_request_order_book_snapshot` coroutine", # pragma: no cover
) # pra
31 changes: 31 additions & 0 deletions tests/unit_tests/common/test_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from nautilus_trader.model.currencies import USD
from nautilus_trader.model.data import Bar
from nautilus_trader.model.data import DataType
from nautilus_trader.model.data import OrderBookDeltas
from nautilus_trader.model.data import QuoteTick
from nautilus_trader.model.data import TradeTick
from nautilus_trader.model.enums import BookType
Expand Down Expand Up @@ -2150,3 +2151,33 @@ def test_request_bars_with_invalid_params_raises_value_error(self, start, stop):
# Act, Assert
with pytest.raises(ValueError):
actor.request_bars(bar_type, start, stop)

def test_request_order_book_snapshots_sends_request_to_data_engine(self) -> None:
# Arrange
handler: list[OrderBookDeltas] = []
actor = MockActor()
actor.register_base(
portfolio=self.portfolio,
msgbus=self.msgbus,
cache=self.cache,
clock=self.clock,
)
data_type = DataType(
OrderBookDeltas,
metadata={
"instrument_id": AUDUSD_SIM.id,
"limit": 5,
},
)
# Act
request_id = actor.request_data(
data_type,
ClientId("BLOOMBERG-01"),
callback=handler.append,
)

# Assert
assert self.data_engine.request_count == 1
assert actor.has_pending_requests()
assert actor.is_pending_request(request_id)
assert request_id in actor.pending_requests()
32 changes: 32 additions & 0 deletions tests/unit_tests/data/test_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -2170,6 +2170,38 @@ def test_request_instruments_for_venue_when_catalog_registered(self):
assert len(handler) == 1
assert len(handler[0].data) == 1

def test_request_order_book_snapshot_reaches_client(self):
# Arrange
self.data_engine.register_client(self.binance_client)

deltas = OrderBookDeltas(
instrument_id=ETHUSDT_BINANCE.id,
deltas=[TestDataStubs.order_book_delta(instrument_id=ETHUSDT_BINANCE.id)],
)

self.data_engine.process(deltas)

handler = []
request = DataRequest(
client_id=None,
venue=BINANCE,
data_type=DataType(
OrderBookDeltas,
metadata={
"instrument_id": ETHUSDT_BINANCE.id,
"limit": 10,
},
),
callback=handler.append,
request_id=UUID4(),
ts_init=self.clock.timestamp_ns(),
)
# Act
self.msgbus.request(endpoint="DataEngine.request", request=request)
# Assert
assert self.data_engine.request_count == 1
assert len(handler) == 0

# TODO: Implement with new Rust datafusion backend"
# def test_request_quote_ticks_when_catalog_registered_using_rust(self) -> None:
# # Arrange
Expand Down

0 comments on commit 2202657

Please sign in to comment.