diff --git a/config/brokers.toml b/config/brokers.toml
index 7205d82ce..675af9409 100644
--- a/config/brokers.toml
+++ b/config/brokers.toml
@@ -1,10 +1,16 @@
-[questrade]
-refresh_token = ''
-access_token = ''
-api_server = 'https://api06.iq.questrade.com/'
-expires_in = 1800
-token_type = 'Bearer'
-expires_at = 1616095326.355846
+################
+# ---- CEXY ----
+################
+[binance]
+accounts.usdtm = 'futes'
+futes.use_testnet = true
+futes.api_key = ''
+futes.api_secret = ''
+
+accounts.spot = 'spot'
+spot.use_testnet = true
+spot.api_key = ''
+spot.api_secret = ''

 [deribit]

@@ -24,6 +30,18 @@
 key_secret = ''
 key_passphrase = ''

+################
+# -- BROKERZ ---
+################
+[questrade]
+refresh_token = ''
+access_token = ''
+api_server = 'https://api06.iq.questrade.com/'
+expires_in = 1800
+token_type = 'Bearer'
+expires_at = 1616095326.355846
+
+
 [ib]
 hosts = [
   '127.0.0.1',
diff --git a/piker/accounting/__init__.py b/piker/accounting/__init__.py
index 778bdd4ee..8f55217c5 100644
--- a/piker/accounting/__init__.py
+++ b/piker/accounting/__init__.py
@@ -40,6 +40,7 @@
     MktPair,
     Symbol,
     unpack_fqme,
+    _derivs as DerivTypes,
 )
 from ._allocate import (
     mk_allocator,
@@ -65,6 +66,7 @@
     'open_pps',
     'open_trade_ledger',
     'unpack_fqme',
+    'DerivTypes',
 ]
diff --git a/piker/accounting/_ledger.py b/piker/accounting/_ledger.py
index 5107f2bbc..04ee04b7e 100644
--- a/piker/accounting/_ledger.py
+++ b/piker/accounting/_ledger.py
@@ -123,6 +123,11 @@ def update_from_t(
         self,
         t: Transaction,
     ) -> None:
+        '''
+        Given an input `Transaction`, cast it to a `dict` and store it
+        keyed by its transaction id.
+
+        '''
         self.data[t.tid] = t.to_dict()

     def iter_trans(
@@ -259,6 +264,45 @@ def dyn_parse_to_dt(
             yield tid, data


+def load_ledger(
+    brokername: str,
+    acctid: str,
+
+) -> tuple[dict, Path]:
+    '''
+    Load a ledger (TOML) file from the user's config directory:
+    $CONFIG_DIR/accounting/ledgers/trades_<brokername>_<acctid>.toml
+
+    Return its `dict`-content and file path.
+
+    '''
+    import time
+    try:
+        import tomllib
+    except ModuleNotFoundError:
+        import tomli as tomllib
+
+    ldir: Path = config._config_dir / 'accounting' / 'ledgers'
+    if not ldir.is_dir():
+        ldir.mkdir()
+
+    fname = f'trades_{brokername}_{acctid}.toml'
+    fpath: Path = ldir / fname
+
+    if not fpath.is_file():
+        log.info(
+            f'Creating new local trades ledger: {fpath}'
+        )
+        fpath.touch()
+
+    with fpath.open(mode='rb') as cf:
+        start = time.time()
+        ledger_dict = tomllib.load(cf)
+        log.debug(f'Ledger load took {time.time() - start}s')
+
+    return ledger_dict, fpath
+
+
 @cm
 def open_trade_ledger(
     broker: str,
@@ -267,7 +311,7 @@
     # default is to sort by detected datetime-ish field
     tx_sort: Callable = iter_by_dt,

-) -> Generator[dict, None, None]:
+) -> Generator[TransactionLedger, None, None]:
     '''
     Idempotently create and read in a trade log file from the
     ``<configdir>/ledgers/`` directory.
@@ -277,7 +321,7 @@
     name as defined in the user's ``brokers.toml`` config.

     '''
-    ledger_dict, fpath = config.load_ledger(broker, account)
+    ledger_dict, fpath = load_ledger(broker, account)
     cpy = ledger_dict.copy()
     ledger = TransactionLedger(
         ledger_dict=cpy,
diff --git a/piker/accounting/_mktinfo.py b/piker/accounting/_mktinfo.py
index 2d2ebccdc..c1f14f9fb 100644
--- a/piker/accounting/_mktinfo.py
+++ b/piker/accounting/_mktinfo.py
@@ -39,6 +39,7 @@
 from ..data.types import Struct


+# TODO: make these literals..
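+# (a rough sketch of the `Literal` approach mentioned in the TODO
+#  above; the `AssetTypeName` alias is hypothetical and not part of
+#  this patch:
+#
+#    from typing import Literal
+#    AssetTypeName = Literal['stock', 'bond', 'crypto_currency', ...]
+#
+#  which would let type checkers reject unknown asset-type strings
+#  instead of only validating against these plain `list[str]` tables.)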
 _underlyings: list[str] = [
     'stock',
     'bond',
@@ -47,6 +48,10 @@
     'commodity',
 ]

+_crypto_derivs: list[str] = [
+    'perpetual_future',
+    'crypto_future',
+]

 _derivs: list[str] = [
     'swap',
@@ -66,6 +71,8 @@
     _underlyings
     +
     _derivs
+    +
+    _crypto_derivs
 ]

 # egs. stock, future, option, bond etc.
@@ -207,6 +214,33 @@ class MktPair(Struct, frozen=True):
     /.... -> ..
     ^ -- optional tokens ------------------------------- ^
+
+    Notes:
+    ------
+
+    Some venues provide a different semantic (which we frankly find
+    confusing and non-general) such as "base" and "quote" asset.
+    For example this is how `binance` defines the terms:
+
+    https://binance-docs.github.io/apidocs/websocket_api/en/#public-api-definitions
+    https://binance-docs.github.io/apidocs/futures/en/#public-endpoints-info
+
+    - *base* asset refers to the asset that is the *quantity* of a symbol.
+    - *quote* asset refers to the asset that is the *price* of a symbol.
+
+    In other words the "quote" asset is the asset that the market
+    is pricing "buys" *in*, and the *base* asset is the one that the
+    market allows you to "buy" an *amount of*. Put more simply, the
+    *quote* asset is our "source" asset and the *base* asset is our
+    "destination" asset.
+
+    This definition can be further understood reading our
+    `.brokers.binance.api.Pair` type wherein the
+    `Pair.[quote/base]AssetPrecision` field determines the (transfer)
+    transaction precision available per asset; i.e. the satoshis
+    unit in bitcoin for representing the minimum size of a
+    transaction that can take place on the blockchain.

     '''
     dst: str | Asset  # "destination asset" (name) used to buy *to*
@@ -513,10 +547,15 @@ def quantize(

     # TODO: BACKWARD COMPAT, TO REMOVE?
     @property
     def type_key(self) -> str:
+
+        # if set explicitly then use it!
+        if self._atype:
+            return self._atype
+
         if isinstance(self.dst, Asset):
             return str(self.dst.atype)

-        return self._atype
+        return 'unknown'

     @property
     def price_tick_digits(self) -> int:
diff --git a/piker/accounting/_pos.py b/piker/accounting/_pos.py
index 3af0eeef2..f50040cbc 100644
--- a/piker/accounting/_pos.py
+++ b/piker/accounting/_pos.py
@@ -42,6 +42,7 @@
     Transaction,
     iter_by_dt,
     open_trade_ledger,
+    TransactionLedger,
 )
 from ._mktinfo import (
     MktPair,
@@ -49,7 +50,6 @@
     unpack_fqme,
 )
 from .. import config
-from ..brokers import get_brokermod
 from ..clearing._messages import (
     BrokerdPosition,
     Status,
@@ -327,7 +327,8 @@ def calc_ppu(
         entry: dict[str, Any]
         for (tid, entry) in self.iter_clears():
             clear_size = entry['size']
-            clear_price = entry['price']
+            clear_price: str | float = entry['price']
+            is_clear: bool = not isinstance(clear_price, str)

             last_accum_size = asize_h[-1] if asize_h else 0
             accum_size = last_accum_size + clear_size
@@ -340,9 +341,18 @@
                 asize_h.append(0)
                 continue

-            if accum_size == 0:
-                ppu_h.append(0)
-                asize_h.append(0)
+            # on transfers we normally write some non-valid
+            # price since a withdrawal to another account/wallet
+            # has nothing to do with inter-asset-market prices.
+            # TODO: this should be better handled via a `type: 'tx'`
+            # field as per the existing issue surrounding all this:
+            # https://github.com/pikers/piker/issues/510
+            if isinstance(clear_price, str):
+                # TODO: we can't necessarily have this commit to
+                # the overall pos size since we also need to
+                # include other positions' contributions to this
+                # balance or we might end up with a -ve balance for
+                # the position..
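+                # (worked sketch of the intended skip-semantics: two
+                #  clears buying 2 @ 100 then 2 @ 200 give
+                #  ppu = (2*100 + 2*200) / 4 = 150; a transfer row
+                #  carrying a `str` price is skipped entirely here so
+                #  ppu stays 150 and only real clears re-weight it.)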
                 continue

             # test if the pp somehow went "past" a net zero size state
@@ -375,7 +385,10 @@
             # abs_clear_size = abs(clear_size)
             abs_new_size = abs(accum_size)

-            if abs_diff > 0:
+            if (
+                abs_diff > 0
+                and is_clear
+            ):

                 cost_basis = (
                     # cost basis for this clear
@@ -397,6 +410,12 @@
                 asize_h.append(accum_size)

             else:
+                # TODO: for PPU we should probably handle txs out
+                # (aka withdrawals) similarly by simply not having
+                # them contrib to the running PPU calc and only
+                # when the next entry clear comes in (which will
+                # then have a higher weighting on the PPU).
+
                 # on "exit" clears from a given direction,
                 # only the size changes not the price-per-unit
                 # need to be updated since the ppu remains constant
@@ -734,48 +753,63 @@
     )


-def load_pps_from_ledger(
-
+def load_account(
     brokername: str,
-    acctname: str,
-
-    # post normalization filter on ledger entries to be processed
-    filter_by: list[dict] | None = None,
+    acctid: str,

-) -> tuple[
-    dict[str, Transaction],
-    dict[str, Position],
-]:
+) -> tuple[dict, Path]:
     '''
-    Open a ledger file by broker name and account and read in and
-    process any trade records into our normalized ``Transaction`` form
-    and then update the equivalent ``Pptable`` and deliver the two
-    bs_mktid-mapped dict-sets of the transactions and pps.
+    Load an accounting (with positions) file from
+    $CONFIG_DIR/accounting/account.<brokername>.<acctid>.toml
+
+    Where normally $CONFIG_DIR = ~/.config/piker/
+    and we implicitly create an accounting subdir which should
+    normally be linked to a git repo managed by the user B)

     '''
-    with (
-        open_trade_ledger(brokername, acctname) as ledger,
-        open_pps(brokername, acctname) as table,
-    ):
-        if not ledger:
-            # null case, no ledger file with content
-            return {}
+    legacy_fn: str = f'pps.{brokername}.{acctid}.toml'
+    fn: str = f'account.{brokername}.{acctid}.toml'

-        mod = get_brokermod(brokername)
-        src_records: dict[str, Transaction] = mod.norm_trade_records(ledger)
+    dirpath: Path = config._config_dir / 'accounting'
+    if not dirpath.is_dir():
+        dirpath.mkdir()

-        if filter_by:
-            records = {}
-            bs_mktids = set(filter_by)
-            for tid, r in src_records.items():
-                if r.bs_mktid in bs_mktids:
-                    records[tid] = r
-        else:
-            records = src_records
+    conf, path = config.load(
+        path=dirpath / fn,
+        decode=tomlkit.parse,
+        touch_if_dne=True,
+    )

-        updated = table.update_from_trans(records)
+    if not conf:
+        legacypath = dirpath / legacy_fn
+        log.warning(
+            f'Your account file is using the legacy `pps.` prefix..\n'
+            f'Rewriting contents to new name -> {path}\n'
+            'Please delete the old file!\n'
+            f'|-> {legacypath}\n'
+        )
+        if legacypath.is_file():
+            legacy_config, _ = config.load(
+                path=legacypath,
+
+                # TODO: move to tomlkit:
+                # - needs to be fixed to support bidict?
+                #   https://github.com/sdispater/tomlkit/issues/289
+                # - we need to use our fork's fix to do multiline array
+                #   indenting.
+                decode=tomlkit.parse,
+            )
+            conf.update(legacy_config)
+
+            # XXX: override the presumably previously non-existent
+            # file with legacy's contents.
+            config.write(
+                conf,
+                path=path,
+                fail_empty=False,
+            )

-        return records, updated
+    return conf, path


 @cm
@@ -792,7 +826,7 @@ def open_pps(
     '''
     conf: dict
     conf_path: Path
-    conf, conf_path = config.load_account(brokername, acctid)
+    conf, conf_path = load_account(brokername, acctid)

     if brokername in conf:
         log.warning(
@@ -927,3 +961,56 @@
     finally:
         if write_on_exit:
             table.write_config()
+
+
+def load_pps_from_ledger(
+
+    brokername: str,
+    acctname: str,
+
+    # post normalization filter on ledger entries to be processed
+    filter_by_ids: list[str] | None = None,
+
+) -> tuple[
+    dict[str, Transaction],
+    PpTable,
+]:
+    '''
+    Open a ledger file by broker name and account and read in and
+    process any trade records into our normalized ``Transaction`` form
+    and then update the equivalent ``PpTable`` and deliver the two
+    bs_mktid-mapped dict-sets of the transactions and pps.
+
+    '''
+    ledger: TransactionLedger
+    table: PpTable
+    with (
+        open_trade_ledger(brokername, acctname) as ledger,
+        open_pps(brokername, acctname) as table,
+    ):
+        if not ledger:
+            # null case, no ledger file with content
+            return {}
+
+        from ..brokers import get_brokermod
+        mod = get_brokermod(brokername)
+        src_records: dict[str, Transaction] = mod.norm_trade_records(
+            ledger
+        )
+
+        if not filter_by_ids:
+            # records = src_records
+            records = ledger
+
+        else:
+            records = {}
+            bs_mktids = set(map(str, filter_by_ids))
+
+            # for tid, recdict in ledger.items():
+            for tid, r in src_records.items():
+                if r.bs_mktid in bs_mktids:
+                    records[tid] = r.to_dict()
+
+        # updated = table.update_from_trans(records)
+
+        return records, table
diff --git a/piker/accounting/cli.py b/piker/accounting/cli.py
index 0b18a3eb0..c184614ce 100644
--- a/piker/accounting/cli.py
+++ b/piker/accounting/cli.py
@@ -18,11 +18,7 @@
 CLI front end for trades ledger and position tracking management.

 '''
-from typing import (
-    AsyncContextManager,
-)
-from types import ModuleType
-
+from __future__ import annotations
 from rich.console import Console
 from rich.markdown import Markdown
 import tractor
@@ -34,65 +30,41 @@
     open_piker_runtime,
 )
 from ..clearing._messages import BrokerdPosition
-from ..config import load_ledger
 from ..calc import humanize
+from ..brokers._daemon import broker_init
+from ._ledger import (
+    load_ledger,
+    # open_trade_ledger,
+    TransactionLedger,
+)
+from ._pos import (
+    PpTable,
+    load_pps_from_ledger,
+    # load_account,
+)

 ledger = typer.Typer()


-def broker_init(
-    brokername: str,
-    loglevel: str | None = None,
-
-    **start_actor_kwargs,
-
-) -> tuple[
-    ModuleType,
-    dict,
-    AsyncContextManager,
-]:
-    '''
-    Given an input broker name, load all named arguments
-    which can be passed to a daemon + context spawn for
-    the relevant `brokerd` service endpoint.
-
-    '''
-    from ..brokers import get_brokermod
-    brokermod = get_brokermod(brokername)
-    modpath = brokermod.__name__
-
-    start_actor_kwargs['name'] = f'brokerd.{brokername}'
-    start_actor_kwargs.update(
-        getattr(
-            brokermod,
-            '_spawn_kwargs',
-            {},
-        )
-    )
-
-    # lookup actor-enabled modules declared by the backend offering the
-    # `brokerd` endpoint(s).
-    enabled = start_actor_kwargs['enable_modules'] = [modpath]
-    for submodname in getattr(
-        brokermod,
-        '__enable_modules__',
-        [],
-    ):
-        subpath = f'{modpath}.{submodname}'
-        enabled.append(subpath)
-
-    # TODO XXX: DO WE NEED THIS?
-    # enabled.append('piker.data.feed')
-
-    # non-blocking setup of brokerd service nursery
-    from ..brokers._daemon import _setup_persistent_brokerd
-
-    return (
-        brokermod,
-        start_actor_kwargs,  # to `ActorNursery.start_actor()`
-        _setup_persistent_brokerd,  # deamon service task ep
-    )
+def unpack_fqan(
+    fully_qualified_account_name: str,
+    console: Console | None = None,
+) -> tuple | bool:
+    try:
+        brokername, account = fully_qualified_account_name.split('.')
+        return brokername, account
+    except ValueError:
+        if console is not None:
+            md = Markdown(
+                f'=> `{fully_qualified_account_name}` <=\n\n'
+                'is not a valid '
+                '__fully qualified account name?__\n\n'
+                'Your account name needs to be of the form '
+                '`<brokername>.<account>`\n'
+            )
+            console.print(md)
+        return False


 @ledger.command()
@@ -108,19 +80,15 @@ def sync(
     log = get_logger(loglevel)
     console = Console()

-    try:
-        brokername, account = fully_qualified_account_name.split('.')
-    except ValueError:
-        md = Markdown(
-            f'=> `{fully_qualified_account_name}` <=\n\n'
-            'is not a valid '
-            '__fully qualified account name?__\n\n'
-            'Your account name needs to be of the form '
-            '`<brokername>.<account>`\n'
-        )
-        console.print(md)
+    pair: tuple[str, str]
+    if not (pair := unpack_fqan(
+        fully_qualified_account_name,
+        console,
+    )):
         return

+    brokername, account = pair
+
     brokermod, start_kwargs, daemon_ep = broker_init(
         brokername,
         loglevel=loglevel,
@@ -155,18 +123,30 @@ async def main():
             )

             brokerd_stream: tractor.MsgStream

-            async with open_brokerd_dialog(
-                brokermod,
-                portal,
-                exec_mode=(
-                    'paper' if account == 'paper'
-                    else 'live'
+            async with (
+                # engage the brokerd daemon context
+                portal.open_context(
+                    daemon_ep,
+                    brokername=brokername,
+                    loglevel=loglevel,
+                ),
+
+                # manually open the brokerd trade dialog EP
+                # (what the EMS normally does internally) B)
+                open_brokerd_dialog(
+                    brokermod,
+                    portal,
+                    exec_mode=(
+                        'paper'
+                        if account == 'paper'
+                        else 'live'
+                    ),
+                    loglevel=loglevel,
+                ) as (
+                    brokerd_stream,
+                    pp_msg_table,
+                    accounts,
                 ),
-                loglevel=loglevel,
-            ) as (
-                brokerd_stream,
-                pp_msg_table,
-                accounts,
             ):
                 try:
                     assert len(accounts) == 1
@@ -253,5 +233,42 @@
     trio.run(main)


+@ledger.command()
+def disect(
+    # "fully_qualified_account_name"
+    fqan: str,
+    bs_mktid: int,  # for ib
+    pdb: bool = False,
+
+    loglevel: str = typer.Option(
+        'error',
+        "-l",
+    ),
+):
+    pair: tuple[str, str]
+    if not (pair := unpack_fqan(fqan)):
+        raise ValueError(f'{fqan} malformed!?')
+
+    brokername, account = pair
+
+    ledger: TransactionLedger
+    table: PpTable
+    records, table = load_pps_from_ledger(
+        brokername,
+        account,
+        # filter_by_id = {568549458},
+        filter_by_ids={bs_mktid},
+    )
+    breakpoint()
+    # tractor.pause_from_sync()
+    # with open_trade_ledger(
+    #     brokername,
+    #     account,
+    # ) as ledger:
+    #     for tid, rec in ledger.items():
+    #         bs_mktid: str = rec['bs_mktid']
+
+
 if __name__ == "__main__":
     ledger()  # this is called from ``>> ledger ``
diff --git a/piker/brokers/__init__.py b/piker/brokers/__init__.py
index 986905e2c..87a0446ad 100644
--- a/piker/brokers/__init__.py
+++ b/piker/brokers/__init__.py
@@ -33,6 +33,7 @@
     DataUnavailable,
     DataThrottle,
     resproc,
+    get_logger,
 )

 __all__: list[str] = [
@@ -42,6 +43,7 @@
     'DataUnavailable',
     'DataThrottle',
     'resproc',
+    'get_logger',
 ]

 __brokers__: list[str] = [
diff --git a/piker/brokers/_daemon.py b/piker/brokers/_daemon.py
index 368e81164..ecb785f7b 100644
--- a/piker/brokers/_daemon.py
+++ b/piker/brokers/_daemon.py
@@ -23,7 +23,11 @@
 from contextlib import (
     asynccontextmanager as acm,
 )
-from typing import TYPE_CHECKING
+from types import ModuleType
+from typing import (
+    TYPE_CHECKING,
+    AsyncContextManager,
+)

 import exceptiongroup as eg
 import tractor
@@ -39,7 +43,7 @@
 # TODO: move this def to the `.data` subpkg..
 # NOTE: keeping this list as small as possible is part of our caps-sec
 # model and should be treated with utmost care!
-_data_mods = [
+_data_mods: list[str] = [
     'piker.brokers.core',
     'piker.brokers.data',
     'piker.brokers._daemon',
@@ -72,9 +76,13 @@ async def _setup_persistent_brokerd(
         loglevel or tractor.current_actor().loglevel,
         name=f'{_util.subsys}.{brokername}',
     )
+
     # set global for this actor to this new process-wide instance B)
     _util.log = log

+    # further, set the log level on any broker-specific
+    # logger instance.
+
     from piker.data import feed
     assert not feed._bus

@@ -111,6 +119,79 @@
     raise


+def broker_init(
+    brokername: str,
+    loglevel: str | None = None,
+
+    **start_actor_kwargs,
+
+) -> tuple[
+    ModuleType,
+    dict,
+    AsyncContextManager,
+]:
+    '''
+    Given an input broker name, load all named arguments
+    which can be passed to a daemon + context spawn
+    as required by every `brokerd` (actor) service.
+
+    This includes:
+    - load the appropriate `<brokername>.py` pkg module,
+    - read any declared `__enable_modules__: list[str]` which will be
+      passed to `tractor.ActorNursery.start_actor(enable_modules=)`
+      at actor start time,
+    - deliver a reference to the daemon lifetime fixture, which
+      for now is always the `_setup_persistent_brokerd()` context defined
+      above.
+
+    '''
+    from ..brokers import get_brokermod
+    brokermod = get_brokermod(brokername)
+    modpath: str = brokermod.__name__
+
+    start_actor_kwargs['name'] = f'brokerd.{brokername}'
+    start_actor_kwargs.update(
+        getattr(
+            brokermod,
+            '_spawn_kwargs',
+            {},
+        )
+    )
+
+    # XXX TODO: make this not so hacky/monkeypatched..
+    # -> we need a sane way to configure the logging level for all
+    #    code running in brokerd.
+    # if utilmod := getattr(brokermod, '_util', False):
+    #     utilmod.log.setLevel(loglevel.upper())
+
+    # lookup actor-enabled modules declared by the backend offering the
+    # `brokerd` endpoint(s).
+    enabled: list[str]
+    enabled = start_actor_kwargs['enable_modules'] = [
+        __name__,  # so that eps from THIS mod can be invoked
+        modpath,
+    ]
+    for submodname in getattr(
+        brokermod,
+        '__enable_modules__',
+        [],
+    ):
+        subpath: str = f'{modpath}.{submodname}'
+        enabled.append(subpath)
+
+    # TODO XXX: DO WE NEED THIS?
+    # enabled.append('piker.data.feed')
+
+    return (
+        brokermod,
+        start_actor_kwargs,  # to `ActorNursery.start_actor()`
+
+        # XXX see impl above; contains all (actor global)
+        # setup/teardown expected in all `brokerd` actor instances.
+        _setup_persistent_brokerd,
+    )
+
+
 async def spawn_brokerd(
     brokername: str,
     loglevel: str | None = None,

     **tractor_kwargs,

 ) -> bool:

-    from piker.service import Services
     from piker.service._util import log  # use service mngr log
-
     log.info(f'Spawning {brokername} broker daemon')

-    brokermod = get_brokermod(brokername)
-    dname = f'brokerd.{brokername}'
+    (
+        brokermod,
+        tractor_kwargs,
+        daemon_fixture_ep,
+    ) = broker_init(
+        brokername,
+        loglevel,
+        **tractor_kwargs,
+    )

     extra_tractor_kwargs = getattr(brokermod, '_spawn_kwargs', {})
     tractor_kwargs.update(extra_tractor_kwargs)

     # ask `pikerd` to spawn a new sub-actor and manage it under its
     # actor nursery
-    modpath = brokermod.__name__
-    broker_enable = [modpath]
-    for submodname in getattr(
-        brokermod,
-        '__enable_modules__',
-        [],
-    ):
-        subpath = f'{modpath}.{submodname}'
-        broker_enable.append(subpath)
+    from piker.service import Services
+    dname: str = tractor_kwargs.pop('name')  # f'brokerd.{brokername}'

     portal = await Services.actor_n.start_actor(
         dname,
-        enable_modules=_data_mods + broker_enable,
-        loglevel=loglevel,
+        enable_modules=_data_mods + tractor_kwargs.pop('enable_modules'),
         debug_mode=Services.debug_mode,
         **tractor_kwargs
     )

-    # non-blocking setup of brokerd service nursery
+    # NOTE: the service mngr expects an already spawned actor + its
+    # portal ref in order to do non-blocking setup of brokerd
+    # service nursery.
     await Services.start_service_task(
         dname,
         portal,

         # signature of target root-task endpoint
-        _setup_persistent_brokerd,
+        daemon_fixture_ep,
         brokername=brokername,
         loglevel=loglevel,
     )
@@ -174,8 +255,11 @@ async def maybe_spawn_brokerd(

 ) -> tractor.Portal:
     '''
-    Helper to spawn a brokerd service *from* a client
-    who wishes to use the sub-actor-daemon.
+    Helper to spawn a brokerd service *from* a client who wishes to
+    use the sub-actor-daemon but is fine with re-using any existing
+    and contactable `brokerd`.
+
+    More or less, acts as a cached-actor-getter factory.

     '''
     from piker.service import maybe_spawn_daemon
diff --git a/piker/brokers/_util.py b/piker/brokers/_util.py
index 7e7a3ec7d..30e36f2e1 100644
--- a/piker/brokers/_util.py
+++ b/piker/brokers/_util.py
@@ -1,5 +1,5 @@
 # piker: trading gear for hackers
-# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0)
+# Copyright (C) 2018-present Tyler Goodlet (in stewardship of pikers)

 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Affero General Public License as published by
@@ -32,6 +32,8 @@
 subsys: str = 'piker.brokers'

 # NOTE: level should be reset by any actor that is spawned
+# as well as given a (more) explicit name/key such
+# as `piker.brokers.binance` matching the subpkg.
 log = get_logger(subsys)

 get_console_log = partial(
diff --git a/piker/brokers/binance.py b/piker/brokers/binance.py
deleted file mode 100644
index a8791ae9f..000000000
--- a/piker/brokers/binance.py
+++ /dev/null
@@ -1,687 +0,0 @@
-# piker: trading gear for hackers
-# Copyright (C)
-#   Guillermo Rodriguez
-#   Tyler Goodlet
-#   (in stewardship for pikers)
-
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-# GNU Affero General Public License for more details.
-
-# You should have received a copy of the GNU Affero General Public License
-# along with this program.  If not, see <https://www.gnu.org/licenses/>.
-
-"""
-Binance backend
-
-"""
-from contextlib import (
-    asynccontextmanager as acm,
-    aclosing,
-)
-from datetime import datetime
-from decimal import Decimal
-import itertools
-from typing import (
-    Any, Union, Optional,
-    AsyncGenerator, Callable,
-)
-import time
-
-import trio
-from trio_typing import TaskStatus
-import pendulum
-import asks
-from fuzzywuzzy import process as fuzzy
-import numpy as np
-import tractor
-
-from .._cacheables import async_lifo_cache
-from ..accounting._mktinfo import (
-    Asset,
-    MktPair,
-    digits_to_dec,
-)
-from . import (
-    resproc,
-    SymbolNotFound,
-    DataUnavailable,
-    open_cached_client,
-)
-from ._util import (
-    get_logger,
-    get_console_log,
-)
-from piker.data.types import Struct
-from piker.data.validate import FeedInit
-from piker.data import def_iohlcv_fields
-from piker.data._web_bs import (
-    open_autorecon_ws,
-    NoBsWs,
-)
-
-
-log = get_logger(__name__)
-
-
-_url = 'https://api.binance.com'
-
-
-# Broker specific ohlc schema (rest)
-# XXX TODO? some additional fields are defined in the docs:
-# https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-data
-
-# _ohlc_dtype = [
-    # ('close_time', int),
-    # ('quote_vol', float),
-    # ('num_trades', int),
-    # ('buy_base_vol', float),
-    # ('buy_quote_vol', float),
-    # ('ignore', float),
-# ]
-
-# UI components allow this to be declared such that additional
-# (historical) fields can be exposed.
-# ohlc_dtype = np.dtype(_ohlc_dtype)
-
-_show_wap_in_history = False
-
-
-# https://binance-docs.github.io/apidocs/spot/en/#exchange-information
-
-# TODO: make this frozen again by pre-processing the
-# filters list to a dict at init time?
-class Pair(Struct, frozen=True):
-    symbol: str
-    status: str
-
-    baseAsset: str
-    baseAssetPrecision: int
-    cancelReplaceAllowed: bool
-    allowTrailingStop: bool
-    quoteAsset: str
-    quotePrecision: int
-    quoteAssetPrecision: int
-
-    baseCommissionPrecision: int
-    quoteCommissionPrecision: int
-
-    orderTypes: list[str]
-
-    icebergAllowed: bool
-    ocoAllowed: bool
-    quoteOrderQtyMarketAllowed: bool
-    isSpotTradingAllowed: bool
-    isMarginTradingAllowed: bool
-
-    defaultSelfTradePreventionMode: str
-    allowedSelfTradePreventionModes: list[str]
-
-    filters: dict[
-        str,
-        Union[str, int, float]
-    ]
-    permissions: list[str]
-
-    @property
-    def price_tick(self) -> Decimal:
-        # XXX: lul, after manually inspecting the response format we
-        # just directly pick out the info we need
-        step_size: str = self.filters['PRICE_FILTER']['tickSize'].rstrip('0')
-        return Decimal(step_size)
-
-    @property
-    def size_tick(self) -> Decimal:
-        step_size: str = self.filters['LOT_SIZE']['stepSize'].rstrip('0')
-        return Decimal(step_size)
-
-
-class OHLC(Struct):
-    '''
-    Description of the flattened OHLC quote format.
-
-    For schema details see:
-    https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-streams
-
-    '''
-    time: int
-
-    open: float
-    high: float
-    low: float
-    close: float
-    volume: float
-
-    close_time: int
-
-    quote_vol: float
-    num_trades: int
-    buy_base_vol: float
-    buy_quote_vol: float
-    ignore: int
-
-    # null the place holder for `bar_wap` until we
-    # figure out what to extract for this.
-    bar_wap: float = 0.0
-
-
-class L1(Struct):
-    # https://binance-docs.github.io/apidocs/spot/en/#individual-symbol-book-ticker-streams
-
-    update_id: int
-    sym: str
-
-    bid: float
-    bsize: float
-    ask: float
-    asize: float
-
-
-# convert datetime obj timestamp to unixtime in milliseconds
-def binance_timestamp(
-    when: datetime
-) -> int:
-    return int((when.timestamp() * 1000) + (when.microsecond / 1000))
-
-
-class Client:
-
-    def __init__(self) -> None:
-        self._sesh = asks.Session(connections=4)
-        self._sesh.base_location = _url
-        self._pairs: dict[str, Pair] = {}
-
-    async def _api(
-        self,
-        method: str,
-        params: dict,
-    ) -> dict[str, Any]:
-        resp = await self._sesh.get(
-            path=f'/api/v3/{method}',
-            params=params,
-            timeout=float('inf')
-        )
-        return resproc(resp, log)
-
-    async def exch_info(
-
-        self,
-        sym: str | None = None,
-
-    ) -> dict[str, Pair] | Pair:
-        '''
-        Fresh exchange-pairs info query for symbol ``sym: str``:
-        https://binance-docs.github.io/apidocs/spot/en/#exchange-information
-
-        '''
-        cached_pair = self._pairs.get(sym)
-        if cached_pair:
-            return cached_pair
-
-        # retrieve all symbols by default
-        params = {}
-        if sym is not None:
-            sym = sym.lower()
-            params = {'symbol': sym}
-
-        resp = await self._api('exchangeInfo', params=params)
-        entries = resp['symbols']
-        if not entries:
-            raise SymbolNotFound(f'{sym} not found:\n{resp}')
-
-        # pre-process .filters field into a table
-        pairs = {}
-        for item in entries:
-            symbol = item['symbol']
-            filters = {}
-            filters_ls: list = item.pop('filters')
-            for entry in filters_ls:
-                ftype = entry['filterType']
-                filters[ftype] = entry
-
-            pairs[symbol] = Pair(
-                filters=filters,
-                **item,
-            )
-
-        # pairs = {
-        #     item['symbol']: Pair(**item) for item in entries
-        # }
-        self._pairs.update(pairs)
-
-        if sym is not None:
-            return pairs[sym]
-        else:
-            return self._pairs
-
-    symbol_info = exch_info
-
-    async def search_symbols(
-        self,
-        pattern: str,
-        limit: int = None,
-    ) -> dict[str, Any]:
-        if self._pairs is not None:
-            data = self._pairs
-        else:
-            data = await self.exch_info()
-
-        matches = fuzzy.extractBests(
-            pattern,
-            data,
-            score_cutoff=50,
-        )
-        # repack in dict form
-        return {item[0]['symbol']: item[0]
-                for item in matches}
-
-    async def bars(
-        self,
-        symbol: str,
-        start_dt: Optional[datetime] = None,
-        end_dt: Optional[datetime] = None,
-        limit: int = 1000,  # <- max allowed per query
-        as_np: bool = True,
-
-    ) -> dict:
-
-        if end_dt is None:
-            end_dt = pendulum.now('UTC').add(minutes=1)
-
-        if start_dt is None:
-            start_dt = end_dt.start_of(
-                'minute').subtract(minutes=limit)
-
-        start_time = binance_timestamp(start_dt)
-        end_time = binance_timestamp(end_dt)
-
-        # https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-data
-        bars = await self._api(
-            'klines',
-            params={
-                'symbol': symbol.upper(),
-                'interval': '1m',
-                'startTime': start_time,
-                'endTime': end_time,
-                'limit': limit
-            }
-        )
-
-        # TODO: pack this bars scheme into a ``pydantic`` validator type:
-        # https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-data
-
-        # TODO: we should port this to ``pydantic`` to avoid doing
-        # manual validation ourselves..
-        new_bars = []
-        for i, bar in enumerate(bars):
-
-            bar = OHLC(*bar)
-            bar.typecast()
-
-            row = []
-            for j, (name, ftype) in enumerate(def_iohlcv_fields[1:]):
-
-                # TODO: maybe we should go nanoseconds on all
-                # history time stamps?
-                if name == 'time':
-                    # convert to epoch seconds: float
-                    row.append(bar.time / 1000.0)
-
-                else:
-                    row.append(getattr(bar, name))
-
-            new_bars.append((i,) + tuple(row))
-
-        array = np.array(
-            new_bars,
-            dtype=def_iohlcv_fields,
-        ) if as_np else bars
-        return array
-
-
-@acm
-async def get_client() -> Client:
-    client = Client()
-    log.info('Caching exchange infos..')
-    await client.exch_info()
-    yield client
-
-
-# validation type
-class AggTrade(Struct, frozen=True):
-    e: str  # Event type
-    E: int  # Event time
-    s: str  # Symbol
-    a: int  # Aggregate trade ID
-    p: float  # Price
-    q: float  # Quantity
-    f: int  # First trade ID
-    l: int  # Last trade ID
-    T: int  # Trade time
-    m: bool  # Is the buyer the market maker?
-    M: bool  # Ignore
-
-
-async def stream_messages(
-    ws: NoBsWs,
-) -> AsyncGenerator[NoBsWs, dict]:
-
-    # TODO: match syntax here!
-    msg: dict[str, Any]
-    async for msg in ws:
-        match msg:
-            # for l1 streams binance doesn't add an event type field so
-            # identify those messages by matching keys
-            # https://binance-docs.github.io/apidocs/spot/en/#individual-symbol-book-ticker-streams
-            case {
-                # NOTE: this is never an old value it seems, so
-                # they are always sending real L1 spread updates.
-                'u': upid,  # update id
-                's': sym,
-                'b': bid,
-                'B': bsize,
-                'a': ask,
-                'A': asize,
-            }:
-                # TODO: it would be super nice to have a `L1` piker type
-                # which "renders" incremental tick updates from a packed
-                # msg-struct:
-                # - backend msgs after packed into the type such that we
-                #   can reduce IPC usage but without each backend having
-                #   to do that incremental update logic manually B)
-                # - would it maybe be more efficient to use this instead?
-                #   https://binance-docs.github.io/apidocs/spot/en/#diff-depth-stream
-                l1 = L1(
-                    update_id=upid,
-                    sym=sym,
-                    bid=bid,
-                    bsize=bsize,
-                    ask=ask,
-                    asize=asize,
-                )
-                l1.typecast()
-
-                # repack into piker's tick-quote format
-                yield 'l1', {
-                    'symbol': l1.sym,
-                    'ticks': [
-                        {
-                            'type': 'bid',
-                            'price': l1.bid,
-                            'size': l1.bsize,
-                        },
-                        {
-                            'type': 'bsize',
-                            'price': l1.bid,
-                            'size': l1.bsize,
-                        },
-                        {
-                            'type': 'ask',
-                            'price': l1.ask,
-                            'size': l1.asize,
-                        },
-                        {
-                            'type': 'asize',
-                            'price': l1.ask,
-                            'size': l1.asize,
-                        }
-                    ]
-                }
-
-            # https://binance-docs.github.io/apidocs/spot/en/#aggregate-trade-streams
-            case {
-                'e': 'aggTrade',
-            }:
-                # NOTE: this is purely for a definition,
-                # ``msgspec.Struct`` does not runtime-validate until you
-                # decode/encode, see:
-                # https://jcristharif.com/msgspec/structs.html#type-validation
-                msg = AggTrade(**msg)
-                yield 'trade', {
-                    'symbol': msg.s,
-                    'last': msg.p,
-                    'brokerd_ts': time.time(),
-                    'ticks': [{
-                        'type': 'trade',
-                        'price': float(msg.p),
-                        'size': float(msg.q),
-                        'broker_ts': msg.T,
-                    }],
-                }
-
-
-def make_sub(pairs: list[str], sub_name: str, uid: int) -> dict[str, str]:
-    '''
-    Create a request subscription packet dict.
-
-    - spot:
-      https://binance-docs.github.io/apidocs/spot/en/#live-subscribing-unsubscribing-to-streams
-
-    - futes:
-      https://binance-docs.github.io/apidocs/futures/en/#websocket-market-streams
-
-    '''
-    return {
-        'method': 'SUBSCRIBE',
-        'params': [
-            f'{pair.lower()}@{sub_name}'
-            for pair in pairs
-        ],
-        'id': uid
-    }
-
-
-@acm
-async def open_history_client(
-    mkt: MktPair,
-
-) -> tuple[Callable, int]:
-
-    symbol: str = mkt.bs_fqme
-
-    # TODO implement history getter for the new storage layer.
-    async with open_cached_client('binance') as client:
-
-        async def get_ohlc(
-            timeframe: float,
-            end_dt: datetime | None = None,
-            start_dt: datetime | None = None,
-
-        ) -> tuple[
-            np.ndarray,
-            datetime,  # start
-            datetime,  # end
-        ]:
-            if timeframe != 60:
-                raise DataUnavailable('Only 1m bars are supported')
-
-            array = await client.bars(
-                symbol,
-                start_dt=start_dt,
-                end_dt=end_dt,
-            )
-            times = array['time']
-            if (
-                end_dt is None
-            ):
-                inow = round(time.time())
-                if (inow - times[-1]) > 60:
-                    await tractor.breakpoint()
-
-            start_dt = pendulum.from_timestamp(times[0])
-            end_dt = pendulum.from_timestamp(times[-1])
-
-            return array, start_dt, end_dt
-
-        yield get_ohlc, {'erlangs': 3, 'rate': 3}
-
-
-@async_lifo_cache()
-async def get_mkt_info(
-    fqme: str,
-
-) -> tuple[MktPair, Pair]:
-
-    async with open_cached_client('binance') as client:
-
-        pair: Pair = await client.exch_info(fqme.upper())
-        mkt = MktPair(
-            dst=Asset(
-                name=pair.baseAsset,
-                atype='crypto',
-                tx_tick=digits_to_dec(pair.baseAssetPrecision),
-            ),
-            src=Asset(
-                name=pair.quoteAsset,
-                atype='crypto',
-                tx_tick=digits_to_dec(pair.quoteAssetPrecision),
-            ),
-            price_tick=pair.price_tick,
-            size_tick=pair.size_tick,
-            bs_mktid=pair.symbol,
-            broker='binance',
-        )
-        both = mkt, pair
-        return both
-
-
-async def stream_quotes(
-
-    send_chan: trio.abc.SendChannel,
-    symbols: list[str],
-    feed_is_live: trio.Event,
-    loglevel: str = None,
-
-    # startup sync
-    task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
-
-) -> None:
-    # XXX: required to propagate ``tractor`` loglevel to piker logging
-    get_console_log(loglevel or tractor.current_actor().loglevel)
-
-    async with (
-        send_chan as send_chan,
-    ):
-        init_msgs: list[FeedInit] = []
-        for sym in symbols:
-            mkt, pair = await get_mkt_info(sym)
-
-            # build out init msgs according to latest spec
-            init_msgs.append(
-                FeedInit(mkt_info=mkt)
-            )
-
-        iter_subids = itertools.count()
-
-        @acm
-        async def subscribe(ws: NoBsWs):
-            # setup subs
-
-            subid: int = next(iter_subids)
-
-            # trade data (aka L1)
-            # https://binance-docs.github.io/apidocs/spot/en/#symbol-order-book-ticker
-            l1_sub = make_sub(symbols, 'bookTicker', subid)
-            await ws.send_msg(l1_sub)
-
-            # aggregate (each order clear by taker **not** by maker)
-            # trades data:
-            # https://binance-docs.github.io/apidocs/spot/en/#aggregate-trade-streams
-            agg_trades_sub = make_sub(symbols, 'aggTrade', subid)
-            await ws.send_msg(agg_trades_sub)
-
-            # might get ack from ws server, or maybe some
-            # other msg still in transit..
-            res = await ws.recv_msg()
-            subid: str | None = res.get('id')
-            if subid:
-                assert res['id'] == subid
-
-            yield
-
-            subs = []
-            for sym in symbols:
-                subs.append("{sym}@aggTrade")
-                subs.append("{sym}@bookTicker")
-
-            # unsub from all pairs on teardown
-            if ws.connected():
-                await ws.send_msg({
-                    "method": "UNSUBSCRIBE",
-                    "params": subs,
-                    "id": subid,
-                })
-
-                # XXX: do we need to ack the unsub?
-                # await ws.recv_msg()
-
-        async with (
-            open_autorecon_ws(
-                # XXX: see api docs which show diff addr?
-                # https://developers.binance.com/docs/binance-trading-api/websocket_api#general-api-information
-                # 'wss://ws-api.binance.com:443/ws-api/v3',
-                'wss://stream.binance.com/ws',
-                fixture=subscribe,
-            ) as ws,
-
-            # avoid stream-gen closure from breaking trio..
-            aclosing(stream_messages(ws)) as msg_gen,
-        ):
-            typ, quote = await anext(msg_gen)
-
-            # pull a first quote and deliver
-            while typ != 'trade':
-                typ, quote = await anext(msg_gen)
-
-            task_status.started((init_msgs, quote))
-
-            # signal to caller feed is ready for consumption
-            feed_is_live.set()
-
-            # import time
-            # last = time.time()
-
-            # start streaming
-            async for typ, msg in msg_gen:
-
-                # period = time.time() - last
-                # hz = 1/period if period else float('inf')
-                # if hz > 60:
-                #     log.info(f'Binance quotez : {hz}')
-
-                topic = msg['symbol'].lower()
-                await send_chan.send({topic: msg})
-                # last = time.time()
-
-
-@tractor.context
-async def open_symbol_search(
-    ctx: tractor.Context,
-) -> Client:
-    async with open_cached_client('binance') as client:
-
-        # load all symbols locally for fast search
-        cache = await client.exch_info()
-        await ctx.started()
-
-        async with ctx.open_stream() as stream:
-
-            async for pattern in stream:
-                # results = await client.exch_info(sym=pattern.upper())
-
-                matches = fuzzy.extractBests(
-                    pattern,
-                    cache,
-                    score_cutoff=50,
-                )
-                # repack in dict form
-                await stream.send({
-                    item[0].symbol: item[0]
-                    for item in matches
-                })
diff --git a/piker/brokers/binance/__init__.py b/piker/brokers/binance/__init__.py
new file mode 100644
index 000000000..fb5844685
--- /dev/null
+++ b/piker/brokers/binance/__init__.py
@@ -0,0 +1,53 @@
+# piker: trading gear for hackers
+# Copyright (C)
+#   Guillermo Rodriguez (aka ze jefe)
+#   Tyler Goodlet
+#   (in stewardship for pikers)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <https://www.gnu.org/licenses/>.
+
+"""
+binancial secs on the floor, in the office, behind the dumpster.
+
+"""
+from .api import (
+    get_client,
+)
+from .feed import (
+    get_mkt_info,
+    open_history_client,
+    open_symbol_search,
+    stream_quotes,
+)
+from .broker import (
+    open_trade_dialog,
+)
+
+
+__all__ = [
+    'get_client',
+    'get_mkt_info',
+    'open_trade_dialog',
+    'open_history_client',
+    'open_symbol_search',
+    'stream_quotes',
+]
+
+
+# `brokerd` modules
+__enable_modules__: list[str] = [
+    'api',
+    'feed',
+    'broker',
+]
diff --git a/piker/brokers/binance/api.py b/piker/brokers/binance/api.py
new file mode 100644
index 000000000..e6a4cbc9b
--- /dev/null
+++ b/piker/brokers/binance/api.py
@@ -0,0 +1,894 @@
+# piker: trading gear for hackers
+# Copyright (C)
+#   Guillermo Rodriguez (aka ze jefe)
+#   Tyler Goodlet
+#   (in stewardship for pikers)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <https://www.gnu.org/licenses/>.
+
+"""
+Binance clients for http and ws APIs.
+
+"""
+from __future__ import annotations
+from collections import ChainMap
+from contextlib import (
+    asynccontextmanager as acm,
+)
+from datetime import datetime
+from pprint import pformat
+from typing import (
+    Any,
+    Callable,
+    Type,
+)
+import hmac
+import hashlib
+from pathlib import Path
+
+import trio
+from pendulum import (
+    now,
+)
+import asks
+from fuzzywuzzy import process as fuzzy
+import numpy as np
+
+from piker import config
+from piker.clearing._messages import (
+    Order,
+)
+from piker.accounting import (
+    Asset,
+    digits_to_dec,
+)
+from piker.data.types import Struct
+from piker.data import def_iohlcv_fields
+from piker.brokers import (
+    resproc,
+    SymbolNotFound,
+    get_logger,
+)
+from .venues import (
+    PAIRTYPES,
+    Pair,
+    MarketType,
+
+    _spot_url,
+    _futes_url,
+    _testnet_futes_url,
+    _testnet_spot_url,
+)
+
+log = get_logger('piker.brokers.binance')
+
+
+def get_config() -> dict:
+
+    conf: dict
+    path: Path
+    conf, path = config.load()
+
+    section = conf.get('binance')
+
+    if not section:
+        log.warning(f'No config section found for binance in {path}')
+        return {}
+
+    return section
+
+
+# Broker specific ohlc schema (rest)
+# XXX TODO? some additional fields are defined in the docs:
+# https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-data
+
+# _ohlc_dtype = [
+    # ('close_time', int),
+    # ('quote_vol', float),
+    # ('num_trades', int),
+    # ('buy_base_vol', float),
+    # ('buy_quote_vol', float),
+    # ('ignore', float),
+# ]
+
+
+class OHLC(Struct):
+    '''
+    Description of the flattened OHLC quote format.
+
+    For schema details see:
+    https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-streams
+
+    '''
+    time: int  # epoch in ms
+
+    open: float
+    high: float
+    low: float
+    close: float
+    volume: float
+
+    close_time: int
+
+    quote_vol: float
+    num_trades: int
+    buy_base_vol: float
+    buy_quote_vol: float
+    ignore: int
+
+
+# convert datetime obj timestamp to unixtime in milliseconds
+def binance_timestamp(
+    when: datetime
+) -> int:
+    return int((when.timestamp() * 1000) + (when.microsecond / 1000))
+
+
+class Client:
+    '''
+    Async ReST API client using ``trio`` + ``asks`` B)
+
+    Supports all of the spot, margin and futures endpoints depending
+    on method.
+
+    '''
+    def __init__(
+        self,
+
+        # TODO: change this to `Client.[mkt_]venue: MarketType`?
+        mkt_mode: MarketType = 'spot',
+
+    ) -> None:
+        # build out pair info tables for each market type
+        # and wrap in a chain-map view for search / query.
+        self._spot_pairs: dict[str, Pair] = {}  # spot info table
+        self._ufutes_pairs: dict[str, Pair] = {}  # usd-futures table
+        self._venue2pairs: dict[str, dict] = {
+            'spot': self._spot_pairs,
+            'usdtm_futes': self._ufutes_pairs,
+        }
+
+        self._venue2assets: dict[
+            str,
+            dict[str, dict] | None,
+        ] = {
+            # NOTE: only the spot table contains a dict[str, Asset]
+            # since others (like futes, opts) can just do lookups
+            # from a list of names to the spot equivalent.
+            'spot': {},
+            'usdtm_futes': {},
+            # 'coinm_futes': {},
+        }
+
+        # NOTE: only stick in the spot table for now until exchange info
+        # is loaded, since at that point we'll suffix all the futes
+        # market symbols for use by search. See `.exch_info()`.
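+        # (illustrative only: a `ChainMap` resolves keys through each
+        #  venue subtable in order, e.g.
+        #    ChainMap({'BTCUSDT': spot_pair})['BTCUSDT']
+        #  still works after futes subtables are appended via
+        #  `._pairs.maps.append(...)` in `._cache_pairs()`.)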
+        self._pairs: ChainMap[str, Pair] = ChainMap()
+
+        # spot EPs sesh
+        self._sesh = asks.Session(connections=4)
+        self._sesh.base_location: str = _spot_url
+        # spot testnet
+        self._test_sesh: asks.Session = asks.Session(connections=4)
+        self._test_sesh.base_location: str = _testnet_spot_url
+
+        # margin and extended spot endpoints session.
+        self._sapi_sesh = asks.Session(connections=4)
+        self._sapi_sesh.base_location: str = _spot_url
+
+        # futes EPs sesh
+        self._fapi_sesh = asks.Session(connections=4)
+        self._fapi_sesh.base_location: str = _futes_url
+        # futes testnet
+        self._test_fapi_sesh: asks.Session = asks.Session(connections=4)
+        self._test_fapi_sesh.base_location: str = _testnet_futes_url
+
+        # global client "venue selection" mode.
+        # set this when you want to switch venues and not have to
+        # specify the venue for the next request.
+        self.mkt_mode: MarketType = mkt_mode
+
+        # per-venue (session, endpoint-parent-path) lookup
+        self.venue_sesh: dict[
+            str,  # venue key
+            tuple[asks.Session, str]  # session, eps path
+        ] = {
+            'spot': (self._sesh, '/api/v3/'),
+            'spot_testnet': (self._test_sesh, '/api/v3/'),
+
+            'margin': (self._sapi_sesh, '/sapi/v1/'),
+
+            'usdtm_futes': (self._fapi_sesh, '/fapi/v1/'),
+            'usdtm_futes_testnet': (self._test_fapi_sesh, '/fapi/v1/'),
+
+            # 'futes_coin': self._dapi,  # TODO
+        }
+
+        # lookup for going from `.mkt_mode: str` to the config
+        # subsection `key: str`
+        self.venue2configkey: dict[str, str] = {
+            'spot': 'spot',
+            'margin': 'spot',
+            'usdtm_futes': 'futes',
+            # 'coinm_futes': 'futes',
+        }
+        self.confkey2venuekeys: dict[str, list[str]] = {
+            'spot': ['spot'],  # 'margin'],
+            'futes': ['usdtm_futes'],
+        }
+
+        # for creating API keys see,
+        # https://www.binance.com/en/support/faq/how-to-create-api-keys-on-binance-360002502072
+        self.conf: dict = get_config()
+
+        for key, subconf in self.conf.items():
+            if api_key := subconf.get('api_key', ''):
+                venue_keys: list[str] = self.confkey2venuekeys[key]
+
+                venue_key: str
+                sesh: asks.Session
+                for venue_key in venue_keys:
+                    sesh, _ = self.venue_sesh[venue_key]
+
+                    api_key_header: dict = {
+                        # taken from official:
+                        # https://github.com/binance/binance-futures-connector-python/blob/main/binance/api.py#L47
+                        "Content-Type": "application/json;charset=utf-8",
+
+                        # TODO: prolly should just always query and copy
+                        # in the real latest ver?
+                        "User-Agent": "binance-connector/6.1.6smbz6",
+                        "X-MBX-APIKEY": api_key,
+                    }
+                    sesh.headers.update(api_key_header)
+
+                    # if `.use_testnet = true` in the config then
+                    # also add headers for the testnet session which
+                    # will be used for all order control
+                    if subconf.get('use_testnet', False):
+                        testnet_sesh, _ = self.venue_sesh[
+                            venue_key + '_testnet'
+                        ]
+                        testnet_sesh.headers.update(api_key_header)
+
+    def _mk_sig(
+        self,
+        data: dict,
+        venue: str,
+
+    ) -> str:
+
+        # look up subconfig (spot or futes) section using
+        # venue specific key lookup to figure out which mkt
+        # we need a key for.
+        section_name: str = self.venue2configkey[venue]
+        subconf: dict | None = self.conf.get(section_name)
+        if subconf is None:
+            raise config.ConfigurationError(
+                f'binance configuration is missing a `{section_name}` section '
+                'to define the creds for auth-ed endpoints!?'
+            )
+
+        # XXX: Info on security and authentication
+        # https://binance-docs.github.io/apidocs/#endpoint-security-type
+        if not (api_secret := subconf.get('api_secret')):
+            raise config.NoSignature(
+                "Can't generate a signature without setting up credentials"
+            )
+
+        query_str: str = '&'.join([
+            f'{key}={value}'
+            for key, value in data.items()
+        ])
+
+        msg_auth = hmac.new(
+            api_secret.encode('utf-8'),
+            query_str.encode('utf-8'),
+            hashlib.sha256
+        )
+        return msg_auth.hexdigest()
+
+    # TODO: factor all these _api methods into a single impl
+    # which looks up the parent path for eps depending on a
+    # mkt_mode: MarketType input!
+    async def _api(
+        self,
+        endpoint: str,  # ReST endpoint key
+        params: dict,
+
+        method: str = 'get',
+        venue: str | None = None,  # if None use `.mkt_mode` state
+        signed: bool = False,
+        allow_testnet: bool = False,
+
+    ) -> dict[str, Any]:
+        '''
+        Make a ReST API request to either the
+        - /api/v3/ SPOT, or
+        - /fapi/v1/ USD-M FUTURES, or
+        - /sapi/v1/ SPOT/MARGIN
+
+        account/market endpoint, depending on either the passed in
+        `venue: str` or the current `.mkt_mode: str` setting
+        (default `'spot'`).
+
+        Docs per venue API:
+
+        SPOT: market-data and spot-account-trade eps use this
+        ----  endpoint parent path:
+        - https://binance-docs.github.io/apidocs/spot/en/#market-data-endpoints
+        - https://binance-docs.github.io/apidocs/spot/en/#spot-account-trade
+
+        MARGIN: and advanced spot account eps:
+        ------
+        - https://binance-docs.github.io/apidocs/spot/en/#margin-account-trade
+        - https://binance-docs.github.io/apidocs/spot/en/#listen-key-spot
+        - https://binance-docs.github.io/apidocs/spot/en/#spot-algo-endpoints
+
+        USD-M FUTES:
+        -----------
+        - https://binance-docs.github.io/apidocs/futures/en/#market-data-endpoints
+
+        '''
+        venue_key: str = venue or self.mkt_mode
+
+        if signed:
+            params['signature'] = self._mk_sig(
+                params,
+                venue=venue_key,
+            )
+
+        sesh: asks.Session
+        path: str
+
+        # Check if we're configured to route order requests to the
+        # venue equivalent's testnet.
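+        # (e.g. with the `brokers.toml` section added at the top of
+        #  this patch,
+        #
+        #    [binance]
+        #    futes.use_testnet = true
+        #
+        #  any `allow_testnet=True` request below gets re-routed to the
+        #  `usdtm_futes_testnet` session.)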
+        use_testnet: bool = False
+        section_name: str = self.venue2configkey[venue_key]
+        if subconf := self.conf.get(section_name):
+            use_testnet = (
+                subconf.get('use_testnet', False)
+                and allow_testnet
+            )
+
+        if (
+            use_testnet
+            and endpoint not in {
+                'klines',
+                'exchangeInfo',
+            }
+        ):
+            # NOTE: only use testnet if user set brokers.toml config
+            # var to true **and** it's not one of the market data
+            # endpoints since we basically never want to display the
+            # test net feeds, we only are using it for testing order
+            # ctl machinery B)
+            venue_key += '_testnet'
+
+        sesh, path = self.venue_sesh[venue_key]
+
+        meth: Callable = getattr(sesh, method)
+        resp = await meth(
+            path=path + endpoint,
+            params=params,
+            timeout=float('inf'),
+        )
+        return resproc(resp, log)
+
+    async def _cache_pairs(
+        self,
+        venue: str,
+
+    ) -> None:
+        # lookup internal mkt-specific pair table to update
+        pair_table: dict[str, Pair] = self._venue2pairs[venue]
+        asset_table: dict[str, Asset] = self._venue2assets[venue]
+
+        # make API request(s)
+        resp = await self._api(
+            'exchangeInfo',
+            params={},  # NOTE: retrieve all symbols by default
+            # XXX: MUST explicitly pass the routing venue since we
+            # don't know the routing mode but want to cache market
+            # infos across all venues
+            venue=venue,
+            allow_testnet=False,  # XXX: never use testnet for symbol lookups
+        )
+        mkt_pairs = resp['symbols']
+        if not mkt_pairs:
+            raise SymbolNotFound(f'No market pairs found!?:\n{resp}')
+
+        pairs_view_subtable: dict[str, Pair] = {}
+
+        for item in mkt_pairs:
+            filters_ls: list = item.pop('filters', False)
+            if filters_ls:
+                filters = {}
+                for entry in filters_ls:
+                    ftype = entry['filterType']
+                    filters[ftype] = entry
+
+                item['filters'] = filters
+
+            pair_type: Type = PAIRTYPES[venue]
+            pair: Pair = pair_type(**item)
+            pair_table[pair.symbol.upper()] = pair
+
+            # update an additional top-level-cross-venue-table
+            # `._pairs: ChainMap` for search B)
+            pairs_view_subtable[pair.bs_fqme] = pair
+
+            if venue == 'spot':
+                if (name := pair.quoteAsset) not in asset_table:
+                    asset_table[name] = Asset(
+                        name=name,
+                        atype='crypto_currency',
+                        tx_tick=digits_to_dec(pair.quoteAssetPrecision),
+                    )
+
+                if (name := pair.baseAsset) not in asset_table:
+                    asset_table[name] = Asset(
+                        name=name,
+                        atype='crypto_currency',
+                        tx_tick=digits_to_dec(pair.baseAssetPrecision),
+                    )
+
+        # NOTE: make merged view of all market-type pairs but
+        # use market specific `Pair.bs_fqme` for keys!
+        # this allows searching for market pairs with different
+        # suffixes easily, for ex. `BTCUSDT.USDTM.PERP` will show
+        # up when a user uses the search endpoint with pattern
+        # `btc` B)
+        self._pairs.maps.append(pairs_view_subtable)
+
+        if venue == 'spot':
+            return
+
+        assets: list[dict] = resp.get('assets', ())
+        for entry in assets:
+            name: str = entry['asset']
+            asset_table[name] = self._venue2assets['spot'].get(name)
+
+    async def exch_info(
+        self,
+        sym: str | None = None,
+
+        venue: MarketType | None = None,
+
+    ) -> dict[str, Pair] | Pair:
+        '''
+        Fresh exchange-pairs info query for symbol ``sym: str``.
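+
+        A usage sketch (the symbol and venue values below are
+        illustrative only):
+
+        >>> pair = await client.exch_info('BTCUSDT', venue='spot')
+        >>> all_pairs = await client.exch_info()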
+
+        Depending on `mkt_type` different api eps are used:
+        - spot:
+          https://binance-docs.github.io/apidocs/spot/en/#exchange-information
+        - usd futes:
+          https://binance-docs.github.io/apidocs/futures/en/#exchange-information
+        - coin futes:
+          https://binance-docs.github.io/apidocs/delivery/en/#exchange-information
+
+        '''
+        pair_table: dict[str, Pair] = self._venue2pairs[
+            venue or self.mkt_mode
+        ]
+        if cached_pair := pair_table.get(sym):
+            return cached_pair
+
+        venues: list[str] = ['spot', 'usdtm_futes']
+        if venue:
+            venues: list[str] = [venue]
+
+        # batch per-venue download of all exchange infos
+        async with trio.open_nursery() as rn:
+            for ven in venues:
+                rn.start_soon(
+                    self._cache_pairs,
+                    ven,
+                )
+
+        return pair_table[sym] if sym else self._pairs
+
+    # TODO: unused except by `brokers.core.search_symbols()`?
+    async def search_symbols(
+        self,
+        pattern: str,
+        limit: int | None = None,
+
+    ) -> dict[str, Any]:
+
+        fq_pairs: dict = await self.exch_info()
+
+        matches = fuzzy.extractBests(
+            pattern,
+            fq_pairs,
+            score_cutoff=50,
+        )
+        # repack in dict form
+        return {item[0]['symbol']: item[0]
+                for item in matches}
+
+    async def bars(
+        self,
+        symbol: str,
+
+        start_dt: datetime | None = None,
+        end_dt: datetime | None = None,
+
+        as_np: bool = True,
+
+    ) -> list[tuple] | np.ndarray:
+
+        # NOTE: diff market-venues have diff datums limits:
+        # - spot max is 1k
+        #   https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-data
+        # - usdm futes max is 1500
+        #   https://binance-docs.github.io/apidocs/futures/en/#kline-candlestick-data
+        limits: dict[str, int] = {
+            'spot': 1000,
+            'usdtm_futes': 1500,
+        }
+        limit = limits[self.mkt_mode]
+
+        if end_dt is None:
+            end_dt = now('UTC').add(minutes=1)
+
+        if start_dt is None:
+            start_dt = end_dt.start_of(
+                'minute').subtract(minutes=limit)
+
+        start_time = binance_timestamp(start_dt)
+        end_time = binance_timestamp(end_dt)
+
+        # https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-data
+        bars = await self._api(
+            'klines',
+            params={
+                'symbol': symbol.upper(),
+                'interval': '1m',
+                'startTime': start_time,
+                'endTime': end_time,
+                'limit': limit
+            },
+            allow_testnet=False,
+        )
+        new_bars: list[tuple] = []
+        for i, bar_list in enumerate(bars):
+
+            bar = OHLC(*bar_list)
+            bar.typecast()
+
+            row = []
+            for j, (name, ftype) in enumerate(def_iohlcv_fields[1:]):
+
+                # TODO: maybe we should go nanoseconds on all
+                # history time stamps?
+                if name == 'time':
+                    # convert to epoch seconds: float
+                    row.append(bar.time / 1000.0)
+
+                else:
+                    row.append(getattr(bar, name))
+
+            new_bars.append((i,) + tuple(row))
+
+        if not as_np:
+            return bars
+
+        return np.array(
+            new_bars,
+            dtype=def_iohlcv_fields,
+        )
+
+    # TODO: maybe drop? Do we need this if we can simply request it
+    # over the user stream wss?
+    # async def get_positions(
+    #     self,
+    #     symbol: str,
+    #     recv_window: int = 60000
+
+    # ) -> tuple:
+
+    #     positions = {}
+    #     volumes = {}
+
+    #     params = dict([
+    #         ('symbol', symbol),
+    #         ('recvWindow', recv_window),
+    #         ('timestamp', binance_timestamp(now()))
+    #     ])
+    #     resp = await self._api(
+    #         'allOrders',
+    #         params=params,
+    #         signed=True
+    #     )
+    #     log.info(f'done. len {len(resp)}')
+
+    #     return positions, volumes
+
+    async def get_deposits(
+        self,
+        recv_window: int = 60000
+    ) -> list:
+
+        # TODO: can't we drop this since normal dicts are
+        # ordered implicitly in modern python?
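+        # (yes as of py3.7: plain `dict` literals preserve insertion
+        #  order, so `{'recvWindow': ..., 'timestamp': ...}` would
+        #  serialize to the exact same signed query string.)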
+        params = dict([
+            ('recvWindow', recv_window),
+            ('timestamp', binance_timestamp(now()))
+        ])
+        return await self._api(
+            'capital/deposit/hisrec',
+            params=params,
+            signed=True,
+            venue='margin',
+        )
+
+    async def get_withdrawls(
+        self,
+        recv_window: int = 60000
+
+    ) -> list:
+
+        params = dict([
+            ('recvWindow', recv_window),
+            ('timestamp', binance_timestamp(now()))
+        ])
+        return await self._api(
+            'capital/withdraw/history',
+            params=params,
+            signed=True,
+            venue='margin',
+        )
+
+    async def get_open_orders(
+        self,
+        symbol: str | None = None,
+
+    ) -> list[Order]:
+        '''
+        Get all open orders for venue-account.
+
+        WARNING: apparently not specifying the symbol is given
+        a much heavier API "weight" meaning you shouldn't call it
+        often to avoid getting throttled as per:
+
+        https://binance-docs.github.io/apidocs/futures/en/#current-all-open-orders-user_data
+
+        '''
+        params: dict[str, Any] = {
+            'timestamp': binance_timestamp(now()),
+        }
+        if symbol is not None:
+            params['symbol'] = symbol
+
+        resp = await self._api(
+            'openOrders',
+            params=params,
+            signed=True,
+            method='get',
+            allow_testnet=True,
+        )
+        # figure out which venue (in FQME terms) we're using
+        # since that normally maps 1-to-1 with the account (right?)
+        venue: str = self.mkt_mode.removesuffix('_futes')
+
+        orders: list[Order] = []
+        for entry in resp:
+            oid: str = entry['clientOrderId']
+            symbol: str = entry['symbol']
+
+            # build out a fqme-styled key that should map to a pair
+            # entry in `._pairs` cross-venue table.
+            bs_mktid, _, expiry = entry['symbol'].partition('_')
+            bs_mktid += f'.{venue.upper()}'
+
+            if expiry:
+                bs_mktid += f'.{expiry}'
+            else:
+                bs_mktid += '.PERP'
+
+            # should never key error if we've got it right B)
+            pair: Pair = self._pairs[bs_mktid]
+
+            orders.append(
+                Order(
+                    oid=oid,
+                    symbol=pair.bs_fqme.lower(),
+
+                    action=entry['side'].lower(),
+                    price=float(entry['price']),
+                    size=float(entry['origQty']),
+
+                    exec_mode='live',
+                    account=f'binance.{venue}',
+                )
+            )
+        return orders
+
+    async def submit_limit(
+        self,
+        symbol: str,
+        side: str,  # sell / buy
+        quantity: float,
+        price: float,
+
+        oid: int | None = None,
+        tif: str = 'GTC',
+        recv_window: int = 60000,
+
+        # iceberg_quantity: float | None = None,
+        resp_type: str = 'ACK',
+
+        # TODO: this is probably useful for doing stops, maybe we
+        # can set it only on dark-stops?
+        # close_all: bool = False,
+
+        modify: bool = False,
+
+    ) -> str:
+        '''
+        Submit or modify a live limit order to ze binance.
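+
+        (Auth sketch, per `._mk_sig()` above: the params below are
+        joined into a query string in insertion order then
+        HMAC-SHA256-signed with the configured `api_secret`, roughly
+        `hmac.new(secret, b'symbol=...&side=...', sha256).hexdigest()`;
+        any literal values shown are illustrative only.)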
+
+
+        For modify see:
+        - spot:
+        - futes https://binance-docs.github.io/apidocs/futures/en/#modify-order-trade
+
+        '''
+        # lookup the binance-native symbol from search table
+        bs_mktid: str = self._pairs[symbol.upper()].symbol
+        params: dict = dict([
+            ('symbol', bs_mktid),
+            ('side', side.upper()),
+            ('type', 'LIMIT'),
+            ('timeInForce', tif),
+            ('quantity', quantity),
+            ('price', price),
+            ('recvWindow', recv_window),
+            ('newOrderRespType', resp_type),
+            ('timestamp', binance_timestamp(now()))
+
+            # ('closeAll', close_all),
+        ])
+
+        method: str = 'post'
+
+        # NOTE: modifies only require diff key for user oid:
+        # https://binance-docs.github.io/apidocs/futures/en/#modify-order-trade
+        if modify:
+            assert oid
+            params['origClientOrderId'] = oid
+            method: str = 'put'
+
+        elif oid:
+            params['newClientOrderId'] = oid
+
+        log.info(
+            'Submitting ReST order request:\n'
+            f'{pformat(params)}'
+        )
+        resp = await self._api(
+            'order',
+            params=params,
+            signed=True,
+            method=method,
+            venue=self.mkt_mode,
+            allow_testnet=True,
+        )
+
+        # ensure our id is tracked by them
+        if (
+            oid
+            and not modify
+        ):
+            assert oid == resp['clientOrderId']
+
+        reqid: str = resp['orderId']
+        return reqid
+
+    async def submit_cancel(
+        self,
+        symbol: str,
+        oid: str,
+
+        recv_window: int = 60000
+
+    ) -> None:
+        bs_mktid: str = self._pairs[symbol.upper()].symbol
+        params = dict([
+            ('symbol', bs_mktid),
+            # ('orderId', oid),
+            ('origClientOrderId', oid),
+            ('recvWindow', recv_window),
+            ('timestamp', binance_timestamp(now()))
+        ])
+
+        log.cancel(
+            f'Submitting ReST order cancel: {oid}\n'
+            f'{pformat(params)}'
+        )
+        await self._api(
+            'order',
+            params=params,
+            signed=True,
+            method='delete',
+            allow_testnet=True,
+        )
+
+    async def get_listen_key(self) -> str:
+
+        resp = await self._api(
+            # 'userDataStream',  # spot
+            'listenKey',
+            params={},
+            method='post',
+            signed=True,
+            allow_testnet=True,
+        )
+        return resp['listenKey']
+
+    async def keep_alive_key(self, listen_key: str) -> None:
+        await self._api(
+            # 'userDataStream',
+            'listenKey',
+            params={'listenKey': listen_key},
+            method='put',
+            allow_testnet=True,
+        )
+
+    async def close_listen_key(self, listen_key: str) -> None:
+        await self._api(
+            # 'userDataStream',
+            'listenKey',
+            params={'listenKey': listen_key},
+            method='delete',
+            allow_testnet=True,
+        )
+
+    @acm
+    async def manage_listen_key(self):
+
+        async def periodic_keep_alive(
+            self,
+            listen_key: str,
+            timeout=60 * 29  # 29 minutes
+        ):
+            while True:
+                await trio.sleep(timeout)
+                await self.keep_alive_key(listen_key)
+
+        key = await self.get_listen_key()
+
+        async with trio.open_nursery() as n:
+            n.start_soon(periodic_keep_alive, self, key)
+            yield key
+            n.cancel_scope.cancel()
+
+        await self.close_listen_key(key)
+
+
+@acm
+async def get_client() -> Client:
+
+    client = Client()
+    await client.exch_info()
+    log.info(
+        f'{client} in {client.mkt_mode} mode: caching exchange infos..\n'
+        'Cached multi-market pairs:\n'
+        f'spot: {len(client._spot_pairs)}\n'
+        f'usdtm_futes: {len(client._ufutes_pairs)}\n'
+        f'Total: {len(client._pairs)}\n'
+    )
+
+    yield client
diff --git a/piker/brokers/binance/broker.py b/piker/brokers/binance/broker.py
new file mode 100644
index 000000000..067a21636
--- /dev/null
+++ b/piker/brokers/binance/broker.py
@@ -0,0 +1,674 @@
+# piker: trading gear for hackers
+# Copyright (C)
+#   Guillermo Rodriguez (aka ze jefe)
+#   Tyler Goodlet
+#   (in stewardship for pikers)

+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero
General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.

+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.

+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.

+'''
+Live order control B)

+'''
+from __future__ import annotations
+from pprint import pformat
+from typing import (
+    Any,
+    AsyncIterator,
+)
+import time
+from time import time_ns

+from bidict import bidict
+import tractor
+import trio

+from piker.accounting import (
+    Asset,
+    # MktPair,
+)
+from piker.brokers._util import (
+    get_logger,
+)
+from piker.data._web_bs import (
+    open_autorecon_ws,
+    NoBsWs,
+)
+from piker.brokers import (
+    open_cached_client,
+    BrokerError,
+)
+from piker.clearing import OrderDialogs
+from piker.clearing._messages import (
+    BrokerdOrder,
+    BrokerdOrderAck,
+    BrokerdStatus,
+    BrokerdPosition,
+    BrokerdFill,
+    BrokerdCancel,
+    BrokerdError,
+    Status,
+    Order,
+)
+from .venues import (
+    Pair,
+    _futes_ws,
+    _testnet_futes_ws,
+)
+from .api import Client

+log = get_logger('piker.brokers.binance')


+async def handle_order_requests(
+    ems_order_stream: tractor.MsgStream,
+    client: Client,
+    dids: bidict[str, str],
+    dialogs: OrderDialogs,

+) -> None:
+    '''
+    Receive order requests from `emsd`, translate them into API
+    calls and transmit.

+    '''
+    msg: dict | BrokerdOrder | BrokerdCancel
+    async for msg in ems_order_stream:
+        log.info(f'Rx order request:\n{pformat(msg)}')
+        match msg:
+            case {
+                'action': 'cancel',
+            }:
+                cancel = BrokerdCancel(**msg)
+                existing: BrokerdOrder | None = dialogs.get(cancel.oid)
+                if not existing:
+                    log.error(
+                        f'NO Existing order-dialog for {cancel.oid}!?'
+                    )
+                    await ems_order_stream.send(BrokerdError(
+                        oid=cancel.oid,

+                        # TODO: do we need the symbol?
+                        # https://github.com/pikers/piker/issues/514
+                        symbol='unknown',

+                        reason=(
+                            'Invalid `binance` order request dialog oid'
+                        )
+                    ))
+                    continue

+                else:
+                    symbol: str = existing['symbol']
+                    try:
+                        await client.submit_cancel(
+                            symbol,
+                            cancel.oid,
+                        )
+                    except BrokerError as be:
+                        await ems_order_stream.send(
+                            BrokerdError(
+                                oid=msg['oid'],
+                                symbol=symbol,
+                                reason=(
+                                    '`binance` CANCEL failed:\n'
+                                    f'{be}'
+                                ))
+                        )
+                        continue

+            case {
+                'account': ('binance.usdtm' | 'binance.spot') as account,
+                'action': action,
+            } if action in {'buy', 'sell'}:

+                # validate
+                order = BrokerdOrder(**msg)
+                oid: str = order.oid  # emsd order id
+                modify: bool = False

+                # NOTE: check and report edits
+                if existing := dialogs.get(order.oid):
+                    log.info(
+                        f'Existing order for {oid} updated:\n'
+                        f'{pformat(existing.maps[-1])} -> {pformat(msg)}'
+                    )
+                    modify = True

+                    # only add new msg AFTER the existing check
+                    dialogs.add_msg(oid, msg)

+                else:
+                    # XXX NOTE: update before the ack!
+                    # track latest request state such that map
+                    # lookups start at the most recent msg and then
+                    # scan reverse-chronologically.
+                    dialogs.add_msg(oid, msg)

+                # XXX: ACK the request **immediately** before sending
+                # the api side request to ensure the ems maps the oid ->
+                # reqid correctly!
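+                # (nothing has hit the venue yet at this point; the
+                # ack just lets the ems record the oid -> reqid
+                # mapping up front)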
+
+                resp = BrokerdOrderAck(
+                    oid=oid,  # ems order request id
+                    reqid=oid,  # our custom int mapping
+                    account='binance',  # piker account
+                )
+                await ems_order_stream.send(resp)

+                # call our client api to submit the order
+                # NOTE: modifies only require diff key for user oid:
+                # https://binance-docs.github.io/apidocs/futures/en/#modify-order-trade
+                try:
+                    reqid = await client.submit_limit(
+                        symbol=order.symbol,
+                        side=order.action,
+                        quantity=order.size,
+                        price=order.price,
+                        oid=oid,
+                        modify=modify,
+                    )

+                    # SMH they do gen their own order id: ints..
+                    # assert reqid == order.oid
+                    dids[order.oid] = reqid

+                except BrokerError as be:
+                    await ems_order_stream.send(
+                        BrokerdError(
+                            oid=msg['oid'],
+                            symbol=msg['symbol'],
+                            reason=(
+                                '`binance` request failed:\n'
+                                f'{be}'
+                            ))
+                    )
+                    continue

+            case _:
+                account = msg.get('account')
+                if account not in {'binance.spot', 'binance.usdtm'}:
+                    log.error(
+                        'Order request does not have a valid binance account name?\n'
+                        'Only one of\n'
+                        '- `binance.spot` or,\n'
+                        '- `binance.usdtm`\n'
+                        'is currently valid!'
+                    )
+                await ems_order_stream.send(
+                    BrokerdError(
+                        oid=msg['oid'],
+                        symbol=msg['symbol'],
+                        reason=(
+                            f'Invalid `binance` broker request msg:\n{msg}'
+                        ))
+                )


+@tractor.context
+async def open_trade_dialog(
+    ctx: tractor.Context,

+) -> AsyncIterator[dict[str, Any]]:

+    # TODO: how do we set this from the EMS such that
+    # positions are loaded from the correct venue on the user
+    # stream at startup? (that is in an attempt to support both
+    # spot and futes markets?)
+    # - I guess we just want to instead start 2 separate user
+    #   stream tasks right? unless we want another actor pool?
+    # XXX: see issue:
+    venue_name: str = 'futes'
+    venue_mode: str = 'usdtm_futes'
+    account_name: str = 'usdtm'
+    use_testnet: bool = False

+    async with open_cached_client('binance') as client:
+        subconf: dict = client.conf[venue_name]
+        use_testnet = subconf.get('use_testnet', False)

+    # XXX: if no futes.api_key or spot.api_key has been set we
+    # always fall back to the paper engine!
+    if not subconf.get('api_key'):
+        await ctx.started('paper')
+        return

+    async with (
+        open_cached_client('binance') as client,
+    ):
+        client.mkt_mode: str = venue_mode

+        # TODO: map these wss urls depending on spot or futes
+        # setting passed when this task is spawned?
+        wss_url: str = _futes_ws if not use_testnet else _testnet_futes_ws

+        wss: NoBsWs
+        async with (
+            client.manage_listen_key() as listen_key,
+            open_autorecon_ws(f'{wss_url}/ws/{listen_key}') as wss,
+        ):
+            nsid: int = time_ns()
+            await wss.send_msg({
+                # "method": "SUBSCRIBE",
+                "method": "REQUEST",
+                "params":
+                [
+                    f"{listen_key}@account",
+                    f"{listen_key}@balance",
+                    f"{listen_key}@position",

+                    # TODO: does this even work!? seems to cause
+                    # a hang on the first msg..? lelelel.
+ # f"{listen_key}@order", + ], + "id": nsid + }) + + with trio.fail_after(6): + msg = await wss.recv_msg() + assert msg['id'] == nsid + + # TODO: load other market wide data / statistics: + # - OI: https://binance-docs.github.io/apidocs/futures/en/#open-interest + # - OI stats: https://binance-docs.github.io/apidocs/futures/en/#open-interest-statistics + accounts: bidict[str, str] = bidict({'binance.usdtm': None}) + balances: dict[Asset, float] = {} + positions: list[BrokerdPosition] = [] + + for resp_dict in msg['result']: + resp: dict = resp_dict['res'] + req: str = resp_dict['req'] + + # @account response should be something like: + # {'accountAlias': 'sRFzFzAuuXsR', + # 'canDeposit': True, + # 'canTrade': True, + # 'canWithdraw': True, + # 'feeTier': 0} + if 'account' in req: + # NOTE: fill in the hash-like key/alias binance + # provides for the account. + alias: str = resp['accountAlias'] + accounts['binance.usdtm'] = alias + + # @balance response: + # {'accountAlias': 'sRFzFzAuuXsR', + # 'balances': [{'asset': 'BTC', + # 'availableBalance': '0.00000000', + # 'balance': '0.00000000', + # 'crossUnPnl': '0.00000000', + # 'crossWalletBalance': '0.00000000', + # 'maxWithdrawAmount': '0.00000000', + # 'updateTime': 0}] + # ... + # } + elif 'balance' in req: + for entry in resp['balances']: + name: str = entry['asset'] + balance: float = float(entry['balance']) + last_update_t: int = entry['updateTime'] + + spot_asset: Asset = client._venue2assets['spot'][name] + + if balance > 0: + balances[spot_asset] = (balance, last_update_t) + # await tractor.breakpoint() + + # @position response: + # {'positions': [{'entryPrice': '0.0', + # 'isAutoAddMargin': False, + # 'isolatedMargin': '0', + # 'leverage': 20, + # 'liquidationPrice': '0', + # 'marginType': 'CROSSED', + # 'markPrice': '0.60289650', + # 'markPrice': '0.00000000', + # 'maxNotionalValue': '25000', + # 'notional': '0', + # 'positionAmt': '0', + # 'positionSide': 'BOTH', + # 'symbol': 'ETHUSDT_230630', + # 'unRealizedProfit': '0.00000000', + # 'updateTime': 1672741444894} + # ... + # } + elif 'position' in req: + for entry in resp['positions']: + bs_mktid: str = entry['symbol'] + entry_size: float = float(entry['positionAmt']) + + pair: Pair | None = client._venue2pairs[ + venue_mode + ].get(bs_mktid) + if ( + pair + and entry_size > 0 + ): + entry_price: float = float(entry['entryPrice']) + + ppmsg = BrokerdPosition( + broker='binance', + account=f'binance.{account_name}', + + # TODO: maybe we should be passing back + # a `MktPair` here? + symbol=pair.bs_fqme.lower() + '.binance', + + size=entry_size, + avg_price=entry_price, + ) + positions.append(ppmsg) + + if pair is None: + log.warning( + f'`{bs_mktid}` Position entry but no market pair?\n' + f'{pformat(entry)}\n' + ) + + await ctx.started(( + positions, + list(accounts) + )) + + # TODO: package more state tracking into the dialogs API? + # - hmm maybe we could include `OrderDialogs.dids: + # bidict` as part of the interface and then ask for + # a reqid field to be passed at init? + # |-> `OrderDialog(reqid_field='orderId')` kinda thing? + # - also maybe bundle in some kind of dialog to account + # table? + dialogs = OrderDialogs() + dids: dict[str, int] = bidict() + + # TODO: further init setup things to get full EMS and + # .accounting support B) + # - live order loading via user stream subscription and + # update to the order dialog table. 
+ # - MAKE SURE we add live orders loaded during init + # into the dialogs table to ensure they can be + # cancelled, meaning we can do a symbol lookup. + # - position loading using `piker.accounting` subsys + # and comparison with binance's own position calcs. + # - load pps and accounts using accounting apis, write + # the ledger and account files + # - table: PpTable + # - ledger: TransactionLedger + + async with ( + trio.open_nursery() as tn, + ctx.open_stream() as ems_stream, + ): + # deliver all pre-exist open orders to EMS thus syncing + # state with existing live limits reported by them. + order: Order + for order in await client.get_open_orders(): + status_msg = Status( + time_ns=time.time_ns(), + resp='open', + oid=order.oid, + reqid=order.oid, + + # embedded order info + req=order, + src='binance', + ) + dialogs.add_msg(order.oid, order.to_dict()) + await ems_stream.send(status_msg) + + tn.start_soon( + handle_order_requests, + ems_stream, + client, + dids, + dialogs, + ) + tn.start_soon( + handle_order_updates, + venue_mode, + account_name, + client, + ems_stream, + wss, + dialogs, + + ) + + await trio.sleep_forever() + + +async def handle_order_updates( + venue: str, + account_name: str, + client: Client, + ems_stream: tractor.MsgStream, + wss: NoBsWs, + dialogs: OrderDialogs, + +) -> None: + ''' + Main msg handling loop for all things order management. + + This code is broken out to make the context explicit and state + variables defined in the signature clear to the reader. + + ''' + async for msg in wss: + log.info(f'Rx USERSTREAM msg:\n{pformat(msg)}') + match msg: + + # ORDER update + # spot: https://binance-docs.github.io/apidocs/spot/en/#payload-balance-update + # futes: https://binance-docs.github.io/apidocs/futures/en/#event-order-update + # futes: https://binance-docs.github.io/apidocs/futures/en/#event-balance-and-position-update + # {'o': { + # 'L': '0', + # 'N': 'USDT', + # 'R': False, + # 'S': 'BUY', + # 'T': 1687028772484, + # 'X': 'NEW', + # 'a': '0', + # 'ap': '0', + # 'b': '7012.06520', + # 'c': '518d4122-8d3e-49b0-9a1e-1fabe6f62e4c', + # 'cp': False, + # 'f': 'GTC', + # 'i': 3376956924, + # 'l': '0', + # 'm': False, + # 'n': '0', + # 'o': 'LIMIT', + # 'ot': 'LIMIT', + # 'p': '21136.80', + # 'pP': False, + # 'ps': 'BOTH', + # 'q': '0.047', + # 'rp': '0', + # 's': 'BTCUSDT', + # 'si': 0, + # 'sp': '0', + # 'ss': 0, + # 't': 0, + # 'wt': 'CONTRACT_PRICE', + # 'x': 'NEW', + # 'z': '0'} + # } + case { + # 'e': 'executionReport', + 'e': 'ORDER_TRADE_UPDATE', + 'T': int(epoch_ms), + 'o': { + 's': bs_mktid, + + # XXX NOTE XXX see special ids for market + # events or margin calls: + # // special client order id: + # // starts with "autoclose-": liquidation order + # // "adl_autoclose": ADL auto close order + # // "settlement_autoclose-": settlement order + # for delisting or delivery + 'c': oid, + # 'i': reqid, # binance internal int id + + # prices + 'a': submit_price, + 'ap': avg_price, + 'L': fill_price, + + # sizing + 'q': req_size, + 'l': clear_size_filled, # this event + 'z': accum_size_filled, # accum + + # commissions + 'n': cost, + 'N': cost_asset, + + # state + 'S': side, + 'X': status, + }, + } as order_msg: + log.info( + f'{status} for {side} ORDER oid: {oid}\n' + f'bs_mktid: {bs_mktid}\n\n' + + f'order size: {req_size}\n' + f'cleared size: {clear_size_filled}\n' + f'accum filled size: {accum_size_filled}\n\n' + + f'submit price: {submit_price}\n' + f'fill_price: {fill_price}\n' + f'avg clearing price: {avg_price}\n\n' + + f'cost: {cost}@{cost_asset}\n' + 
) + + # status remap from binance to piker's + # status set: + # - NEW + # - PARTIALLY_FILLED + # - FILLED + # - CANCELED + # - EXPIRED + # https://binance-docs.github.io/apidocs/futures/en/#event-order-update + + req_size: float = float(req_size) + accum_size_filled: float = float(accum_size_filled) + fill_price: float = float(fill_price) + + match status: + case 'PARTIALLY_FILLED' | 'FILLED': + status = 'fill' + + fill_msg = BrokerdFill( + time_ns=time_ns(), + # reqid=reqid, + reqid=oid, + + # just use size value for now? + # action=action, + size=clear_size_filled, + price=fill_price, + + # TODO: maybe capture more msg data + # i.e fees? + broker_details={'name': 'broker'} | order_msg, + broker_time=time.time(), + ) + await ems_stream.send(fill_msg) + + if accum_size_filled == req_size: + status = 'closed' + dialogs.pop(oid) + + case 'NEW': + status = 'open' + + case 'EXPIRED': + status = 'canceled' + dialogs.pop(oid) + + case _: + status = status.lower() + + resp = BrokerdStatus( + time_ns=time_ns(), + # reqid=reqid, + reqid=oid, + + # TODO: i feel like we don't need to make the + # ems and upstream clients aware of this? + # account='binance.usdtm', + + status=status, + + filled=accum_size_filled, + remaining=req_size - accum_size_filled, + broker_details={ + 'name': 'binance', + 'broker_time': epoch_ms / 1000. + } + ) + await ems_stream.send(resp) + + # ACCOUNT and POSITION update B) + # { + # 'E': 1687036749218, + # 'e': 'ACCOUNT_UPDATE' + # 'T': 1687036749215, + # 'a': {'B': [{'a': 'USDT', + # 'bc': '0', + # 'cw': '1267.48920735', + # 'wb': '1410.90245576'}], + # 'P': [{'cr': '-3292.10973007', + # 'ep': '26349.90000', + # 'iw': '143.41324841', + # 'ma': 'USDT', + # 'mt': 'isolated', + # 'pa': '0.038', + # 'ps': 'BOTH', + # 's': 'BTCUSDT', + # 'up': '5.17555453'}], + # 'm': 'ORDER'}, + # } + case { + 'T': int(epoch_ms), + 'e': 'ACCOUNT_UPDATE', + 'a': { + 'P': [{ + 's': bs_mktid, + 'pa': pos_amount, + 'ep': entry_price, + }], + }, + }: + # real-time relay position updates back to EMS + pair: Pair | None = client._venue2pairs[venue].get(bs_mktid) + ppmsg = BrokerdPosition( + broker='binance', + account=f'binance.{account_name}', + + # TODO: maybe we should be passing back + # a `MktPair` here? + symbol=pair.bs_fqme.lower() + '.binance', + + size=float(pos_amount), + avg_price=float(entry_price), + ) + await ems_stream.send(ppmsg) + + case _: + log.warning( + 'Unhandled event:\n' + f'{pformat(msg)}' + ) diff --git a/piker/brokers/binance/feed.py b/piker/brokers/binance/feed.py new file mode 100644 index 000000000..66a0bff04 --- /dev/null +++ b/piker/brokers/binance/feed.py @@ -0,0 +1,499 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for pikers) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +Real-time and historical data feed endpoints. 
+ +''' +from __future__ import annotations +from contextlib import ( + asynccontextmanager as acm, + aclosing, +) +from datetime import datetime +from functools import partial +import itertools +from typing import ( + Any, + AsyncGenerator, + Callable, + Generator, +) +import time + +import trio +from trio_typing import TaskStatus +from pendulum import ( + from_timestamp, +) +from fuzzywuzzy import process as fuzzy +import numpy as np +import tractor + +from piker.brokers import ( + open_cached_client, +) +from piker._cacheables import ( + async_lifo_cache, +) +from piker.accounting import ( + Asset, + DerivTypes, + MktPair, + unpack_fqme, + digits_to_dec, +) +from piker.data.types import Struct +from piker.data.validate import FeedInit +from piker.data._web_bs import ( + open_autorecon_ws, + NoBsWs, +) +from piker.brokers._util import ( + DataUnavailable, + get_logger, +) + +from .api import ( + Client, +) +from .venues import ( + Pair, + FutesPair, + get_api_eps, +) + +log = get_logger('piker.brokers.binance') + + +class L1(Struct): + # https://binance-docs.github.io/apidocs/spot/en/#individual-symbol-book-ticker-streams + + update_id: int + sym: str + + bid: float + bsize: float + ask: float + asize: float + + +# validation type +class AggTrade(Struct, frozen=True): + e: str # Event type + E: int # Event time + s: str # Symbol + a: int # Aggregate trade ID + p: float # Price + q: float # Quantity + f: int # First trade ID + l: int # noqa Last trade ID + T: int # Trade time + m: bool # Is the buyer the market maker? + M: bool | None = None # Ignore + + +async def stream_messages( + ws: NoBsWs, +) -> AsyncGenerator[NoBsWs, dict]: + + # TODO: match syntax here! + msg: dict[str, Any] + async for msg in ws: + match msg: + # for l1 streams binance doesn't add an event type field so + # identify those messages by matching keys + # https://binance-docs.github.io/apidocs/spot/en/#individual-symbol-book-ticker-streams + case { + # NOTE: this is never an old value it seems, so + # they are always sending real L1 spread updates. + 'u': upid, # update id + 's': sym, + 'b': bid, + 'B': bsize, + 'a': ask, + 'A': asize, + }: + # TODO: it would be super nice to have a `L1` piker type + # which "renders" incremental tick updates from a packed + # msg-struct: + # - backend msgs after packed into the type such that we + # can reduce IPC usage but without each backend having + # to do that incremental update logic manually B) + # - would it maybe be more efficient to use this instead? + # https://binance-docs.github.io/apidocs/spot/en/#diff-depth-stream + l1 = L1( + update_id=upid, + sym=sym, + bid=bid, + bsize=bsize, + ask=ask, + asize=asize, + ) + # for speed probably better to only specifically + # cast fields we need in numerical form? 
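+                # (e.g. stick with doing `float(l1.bid)` only at
+                # the use-sites below vs. a full struct typecast)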
+
+                # l1.typecast()

+                # repack into piker's tick-quote format
+                yield 'l1', {
+                    'symbol': l1.sym,
+                    'ticks': [
+                        {
+                            'type': 'bid',
+                            'price': float(l1.bid),
+                            'size': float(l1.bsize),
+                        },
+                        {
+                            'type': 'bsize',
+                            'price': float(l1.bid),
+                            'size': float(l1.bsize),
+                        },
+                        {
+                            'type': 'ask',
+                            'price': float(l1.ask),
+                            'size': float(l1.asize),
+                        },
+                        {
+                            'type': 'asize',
+                            'price': float(l1.ask),
+                            'size': float(l1.asize),
+                        }
+                    ]
+                }

+            # https://binance-docs.github.io/apidocs/spot/en/#aggregate-trade-streams
+            case {
+                'e': 'aggTrade',
+            }:
+                # NOTE: this is purely for a definition,
+                # ``msgspec.Struct`` does not runtime-validate until you
+                # decode/encode, see:
+                # https://jcristharif.com/msgspec/structs.html#type-validation
+                msg = AggTrade(**msg)  # TODO: should we .copy() ?
+                piker_quote: dict = {
+                    'symbol': msg.s,
+                    'last': float(msg.p),
+                    'brokerd_ts': time.time(),
+                    'ticks': [{
+                        'type': 'trade',
+                        'price': float(msg.p),
+                        'size': float(msg.q),
+                        'broker_ts': msg.T,
+                    }],
+                }
+                yield 'trade', piker_quote


+def make_sub(pairs: list[str], sub_name: str, uid: int) -> dict[str, str]:
+    '''
+    Create a request subscription packet dict.

+    - spot:
+      https://binance-docs.github.io/apidocs/spot/en/#live-subscribing-unsubscribing-to-streams

+    - futes:
+      https://binance-docs.github.io/apidocs/futures/en/#websocket-market-streams

+    '''
+    return {
+        'method': 'SUBSCRIBE',
+        'params': [
+            f'{pair.lower()}@{sub_name}'
+            for pair in pairs
+        ],
+        'id': uid
+    }


+@acm
+async def open_history_client(
+    mkt: MktPair,

+) -> tuple[Callable, int]:

+    # TODO implement history getter for the new storage layer.
+    async with open_cached_client('binance') as client:

+        async def get_ohlc(
+            timeframe: float,
+            end_dt: datetime | None = None,
+            start_dt: datetime | None = None,

+        ) -> tuple[
+            np.ndarray,
+            datetime,  # start
+            datetime,  # end
+        ]:
+            if timeframe != 60:
+                raise DataUnavailable('Only 1m bars are supported')

+            # TODO: better wrapping for venue / mode?
+            # - eventually logic for usd vs. coin settled futes
+            #   based on `MktPair.src` type/value?
+            # - maybe something like `async with
+            #   Client.use_venue('usdtm_futes')`
+            if mkt.type_key in DerivTypes:
+                client.mkt_mode = 'usdtm_futes'
+            else:
+                client.mkt_mode = 'spot'

+            # NOTE: always query using their native symbology!
+            mktid: str = mkt.bs_mktid
+            array = await client.bars(
+                mktid,
+                start_dt=start_dt,
+                end_dt=end_dt,
+            )
+            times = array['time']
+            if (
+                end_dt is None
+            ):
+                inow = round(time.time())
+                if (inow - times[-1]) > 60:
+                    await tractor.breakpoint()

+            start_dt = from_timestamp(times[0])
+            end_dt = from_timestamp(times[-1])

+            return array, start_dt, end_dt

+        yield get_ohlc, {'erlangs': 3, 'rate': 3}


+@async_lifo_cache()
+async def get_mkt_info(
+    fqme: str,

+) -> tuple[MktPair, Pair]:

+    # uppercase since binance bs_mktid is always upper
+    if 'binance' not in fqme:
+        fqme += '.binance'

+    bs_fqme, _, broker = fqme.rpartition('.')
+    broker, mkt_ep, venue, expiry = unpack_fqme(fqme)

+    # NOTE: see the `FutesPair.bs_fqme: str` implementation
+    # to understand the reverse market info lookup below.
+    mkt_mode = venue = venue.lower() or 'spot'
+    _atype: str = ''
+    if (
+        venue
+        and 'spot' not in venue.lower()

+        # XXX: catch all in case user doesn't know which
+        # venue they want (usdtm vs. coinm) and we can choose
+        # a default (via config?) once we support coin-m APIs.
+
+        or 'perp' in bs_fqme.lower()
+    ):
+        mkt_mode: str = f'{venue.lower()}_futes'
+        if 'perp' in expiry:
+            _atype = 'perpetual_future'

+        else:
+            _atype = 'future'

+    async with open_cached_client(
+        'binance',
+    ) as client:

+        # switch mode depending on input pattern parsing
+        client.mkt_mode = mkt_mode

+        pair_str: str = mkt_ep.upper()
+        pair: Pair = await client.exch_info(pair_str)

+        if 'futes' in mkt_mode:
+            assert isinstance(pair, FutesPair)

+        mkt = MktPair(
+            dst=Asset(
+                name=pair.baseAsset,
+                atype='crypto',
+                tx_tick=digits_to_dec(pair.baseAssetPrecision),
+            ),
+            src=Asset(
+                name=pair.quoteAsset,
+                atype='crypto',
+                tx_tick=digits_to_dec(pair.quoteAssetPrecision),
+            ),
+            price_tick=pair.price_tick,
+            size_tick=pair.size_tick,
+            bs_mktid=pair.symbol,
+            expiry=expiry,
+            venue=venue,
+            broker='binance',
+            _atype=_atype,
+        )
+        both = mkt, pair
+        return both


+@acm
+async def subscribe(
+    ws: NoBsWs,
+    symbols: list[str],

+    # defined once at import time to keep a global state B)
+    iter_subids: Generator[int, None, None] = itertools.count(),

+):
+    # setup subs

+    subid: int = next(iter_subids)

+    # trade data (aka L1)
+    # https://binance-docs.github.io/apidocs/spot/en/#symbol-order-book-ticker
+    l1_sub = make_sub(symbols, 'bookTicker', subid)
+    await ws.send_msg(l1_sub)

+    # aggregate (each order clear by taker **not** by maker)
+    # trades data:
+    # https://binance-docs.github.io/apidocs/spot/en/#aggregate-trade-streams
+    agg_trades_sub = make_sub(symbols, 'aggTrade', subid)
+    await ws.send_msg(agg_trades_sub)

+    # might get ack from ws server, or maybe some
+    # other msg still in transit..
+    res = await ws.recv_msg()
+    ack_id: int | None = res.get('id')
+    if ack_id is not None:
+        assert ack_id == subid

+    yield

+    subs = []
+    for sym in symbols:
+        subs.append(f"{sym}@aggTrade")
+        subs.append(f"{sym}@bookTicker")

+    # unsub from all pairs on teardown
+    if ws.connected():
+        await ws.send_msg({
+            "method": "UNSUBSCRIBE",
+            "params": subs,
+            "id": subid,
+        })

+        # XXX: do we need to ack the unsub?
+        # await ws.recv_msg()


+async def stream_quotes(

+    send_chan: trio.abc.SendChannel,
+    symbols: list[str],
+    feed_is_live: trio.Event,
+    loglevel: str | None = None,

+    # startup sync
+    task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,

+) -> None:

+    async with (
+        send_chan as send_chan,
+        open_cached_client('binance') as client,
+    ):
+        init_msgs: list[FeedInit] = []
+        for sym in symbols:
+            mkt, pair = await get_mkt_info(sym)

+            # build out init msgs according to latest spec
+            init_msgs.append(
+                FeedInit(mkt_info=mkt)
+            )

+        wss_url: str = get_api_eps(client.mkt_mode)[1]  # 2nd elem is wss url

+        # TODO: for sanity, but remove eventually Xp
+        if 'future' in mkt.type_key:
+            assert 'fstream' in wss_url

+        async with (
+            open_autorecon_ws(
+                url=wss_url,
+                fixture=partial(
+                    subscribe,
+                    symbols=[mkt.bs_mktid],
+                ),
+            ) as ws,

+            # avoid stream-gen closure from breaking trio..
+            aclosing(stream_messages(ws)) as msg_gen,
+        ):
+            # log.info('WAITING ON FIRST LIVE QUOTE..')
+            typ, quote = await anext(msg_gen)

+            # pull a first quote and deliver
+            while typ != 'trade':
+                typ, quote = await anext(msg_gen)

+            task_status.started((init_msgs, quote))

+            # signal to caller feed is ready for consumption
+            feed_is_live.set()

+            # import time
+            # last = time.time()

+            # XXX NOTE: can't include the `.binance` suffix
+            # or the sampling loop will not broadcast correctly
+            # since `bus._subscribers.setdefault(bs_fqme, set())`
+            # is used inside `.data.open_feed_bus()` !!!
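+            # e.g. this ends up as something like
+            # 'btcusdt.usdtm.perp' for the BTC perp.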
+
+            topic: str = mkt.bs_fqme

+            # start streaming
+            async for typ, quote in msg_gen:

+                # period = time.time() - last
+                # hz = 1/period if period else float('inf')
+                # if hz > 60:
+                #     log.info(f'Binance quotez : {hz}')
+                await send_chan.send({topic: quote})
+                # last = time.time()


+@tractor.context
+async def open_symbol_search(
+    ctx: tractor.Context,
+) -> Client:

+    async with open_cached_client('binance') as client:

+        # load all symbols locally for fast search
+        fqpairs_cache = await client.exch_info()
+        # TODO: maybe we should deliver the cache
+        # so that client's can always do a local-lookup-first
+        # style try and then update async as (new) match results
+        # are delivered from here?
+        await ctx.started()

+        async with ctx.open_stream() as stream:

+            pattern: str
+            async for pattern in stream:
+                matches = fuzzy.extractBests(
+                    pattern,
+                    fqpairs_cache,
+                    score_cutoff=50,
+                )

+                # repack in dict form
+                await stream.send({
+                    item[0].bs_fqme: item[0]
+                    for item in matches
+                })
diff --git a/piker/brokers/binance/venues.py b/piker/brokers/binance/venues.py
new file mode 100644
index 000000000..6c9afc779
--- /dev/null
+++ b/piker/brokers/binance/venues.py
@@ -0,0 +1,214 @@
+# piker: trading gear for hackers
+# Copyright (C)
+#   Guillermo Rodriguez (aka ze jefe)
+#   Tyler Goodlet
+#   (in stewardship for pikers)

+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.

+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.

+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.

+"""
+Per market data-type definitions and schema types.

+"""
+from __future__ import annotations
+from typing import (
+    Literal,
+)
+from decimal import Decimal

+from msgspec import field

+from piker.data.types import Struct


+# API endpoint paths by venue / sub-API
+_domain: str = 'binance.com'
+_spot_url = f'https://api.{_domain}'
+_futes_url = f'https://fapi.{_domain}'

+# WEBsocketz
+# NOTE XXX: see api docs which show diff addr?
+# https://developers.binance.com/docs/binance-trading-api/websocket_api#general-api-information
+_spot_ws: str = 'wss://stream.binance.com/ws'
+# or this one? ..
+# 'wss://ws-api.binance.com:443/ws-api/v3',

+# https://binance-docs.github.io/apidocs/futures/en/#websocket-market-streams
+_futes_ws: str = f'wss://fstream.{_domain}/ws/'
+_auth_futes_ws: str = f'wss://fstream-auth.{_domain}/ws/'

+# test nets
+# NOTE: spot test network only allows certain ep sets:
+# https://testnet.binance.vision/
+# https://www.binance.com/en/support/faq/how-to-test-my-functions-on-binance-testnet-ab78f9a1b8824cf0a106b4229c76496d
+_testnet_spot_url: str = 'https://testnet.binance.vision/api'
+_testnet_spot_ws: str = 'wss://testnet.binance.vision/ws'
+# or this one? ..
+# 'wss://testnet.binance.vision/ws-api/v3'

+_testnet_futes_url: str = 'https://testnet.binancefuture.com'
+_testnet_futes_ws: str = 'wss://stream.binancefuture.com'


+MarketType = Literal[
+    'spot',
+    # 'margin',
+    'usdtm_futes',
+    # 'coin_futes',
+]


+def get_api_eps(venue: MarketType) -> tuple[str, str]:
+    '''
+    Return API ep root paths per venue.
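+
+    E.g. `get_api_eps('usdtm_futes')` returns the
+    `(_futes_url, _futes_ws)` rest/wss pair.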
+ + ''' + return { + 'spot': ( + _spot_url, + _spot_ws, + ), + 'usdtm_futes': ( + _futes_url, + _futes_ws, + ), + }[venue] + + +class Pair(Struct, frozen=True, kw_only=True): + symbol: str + status: str + orderTypes: list[str] + + # src + quoteAsset: str + quotePrecision: int + + # dst + baseAsset: str + baseAssetPrecision: int + + filters: dict[ + str, + str | int | float, + ] = field(default_factory=dict) + + @property + def price_tick(self) -> Decimal: + # XXX: lul, after manually inspecting the response format we + # just directly pick out the info we need + step_size: str = self.filters['PRICE_FILTER']['tickSize'].rstrip('0') + return Decimal(step_size) + + @property + def size_tick(self) -> Decimal: + step_size: str = self.filters['LOT_SIZE']['stepSize'].rstrip('0') + return Decimal(step_size) + + @property + def bs_fqme(self) -> str: + return self.symbol + + +class SpotPair(Pair, frozen=True): + + cancelReplaceAllowed: bool + allowTrailingStop: bool + quoteAssetPrecision: int + + baseCommissionPrecision: int + quoteCommissionPrecision: int + + icebergAllowed: bool + ocoAllowed: bool + quoteOrderQtyMarketAllowed: bool + isSpotTradingAllowed: bool + isMarginTradingAllowed: bool + + defaultSelfTradePreventionMode: str + allowedSelfTradePreventionModes: list[str] + permissions: list[str] + + @property + def bs_fqme(self) -> str: + return f'{self.symbol}.SPOT' + + + +class FutesPair(Pair): + symbol: str # 'BTCUSDT', + pair: str # 'BTCUSDT', + baseAssetPrecision: int # 8, + contractType: str # 'PERPETUAL', + deliveryDate: int # 4133404800000, + liquidationFee: float # '0.012500', + maintMarginPercent: float # '2.5000', + marginAsset: str # 'USDT', + marketTakeBound: float # '0.05', + maxMoveOrderLimit: int # 10000, + onboardDate: int # 1569398400000, + pricePrecision: int # 2, + quantityPrecision: int # 3, + quoteAsset: str # 'USDT', + quotePrecision: int # 8, + requiredMarginPercent: float # '5.0000', + settlePlan: int # 0, + timeInForce: list[str] # ['GTC', 'IOC', 'FOK', 'GTX'], + triggerProtect: float # '0.0500', + underlyingSubType: list[str] # ['PoW'], + underlyingType: str # 'COIN' + + # NOTE: for compat with spot pairs and `MktPair.src: Asset` + # processing.. + @property + def quoteAssetPrecision(self) -> int: + return self.quotePrecision + + @property + def bs_fqme(self) -> str: + symbol: str = self.symbol + ctype: str = self.contractType + margin: str = self.marginAsset + + match ctype: + case 'PERPETUAL': + return f'{symbol}.{margin}M.PERP' + + case 'CURRENT_QUARTER': + pair, _, expiry = symbol.partition('_') + return f'{pair}.{margin}M.{expiry}' + + case '': + subtype: list[str] = self.underlyingSubType + if not subtype: + if self.status == 'PENDING_TRADING': + return f'{symbol}.{margin}M.PENDING' + + match subtype[0]: + case 'DEFI': + return f'{symbol}.{subtype}.PERP' + + # XXX: yeah no clue then.. 
+ return f'{symbol}.WTF.PWNED.BBQ' + + + +PAIRTYPES: dict[MarketType, Pair] = { + 'spot': SpotPair, + 'usdtm_futes': FutesPair, + + # TODO: support coin-margined venue: + # https://binance-docs.github.io/apidocs/delivery/en/#change-log + # 'coinm_futes': CoinFutesPair, +} diff --git a/piker/brokers/cli.py b/piker/brokers/cli.py index 5ebca3e71..937e936e1 100644 --- a/piker/brokers/cli.py +++ b/piker/brokers/cli.py @@ -195,7 +195,7 @@ async def bcheck_main(): @cli.command() @click.option('--keys', '-k', multiple=True, - help='Return results only for these keys') + help='Return results only for these keys') @click.argument('meth', nargs=1) @click.argument('kwargs', nargs=-1) @click.pass_obj @@ -485,3 +485,37 @@ async def main(func): return click.echo(colorize_json(quotes)) + + +@cli.command() +@click.argument('section', required=False) +@click.argument('value', required=False) +@click.option('--delete', '-d', flag_value=True, help='Delete section') +@click.pass_obj +def brokercfg(config, section, value, delete): + """If invoked with no arguments, open an editor to edit broker configs file + or get / update an individual section. + """ + from .. import config + + if section: + conf, path = config.load() + + if not delete: + if value: + config.set_value(conf, section, value) + + click.echo( + colorize_json( + config.get_value(conf, section)) + ) + else: + config.del_value(conf, section) + + config.write(config=conf) + + else: + conf, path = config.load(raw=True) + config.write( + raw=click.edit(text=conf) + ) diff --git a/piker/brokers/ib/__init__.py b/piker/brokers/ib/__init__.py index 80bc228f1..07ed8af58 100644 --- a/piker/brokers/ib/__init__.py +++ b/piker/brokers/ib/__init__.py @@ -34,12 +34,14 @@ stream_quotes, ) from .broker import ( - trades_dialogue, + open_trade_dialog, + norm_trade_records, ) __all__ = [ 'get_client', - 'trades_dialogue', + 'norm_trade_records', + 'open_trade_dialog', 'open_history_client', 'open_symbol_search', 'stream_quotes', diff --git a/piker/brokers/ib/_util.py b/piker/brokers/ib/_util.py index f23aa99b4..05417d98b 100644 --- a/piker/brokers/ib/_util.py +++ b/piker/brokers/ib/_util.py @@ -29,7 +29,7 @@ import tractor -from .._util import get_logger +from piker.brokers._util import get_logger if TYPE_CHECKING: from .api import Client diff --git a/piker/brokers/ib/api.py b/piker/brokers/ib/api.py index 171578aa9..fd0d024d6 100644 --- a/piker/brokers/ib/api.py +++ b/piker/brokers/ib/api.py @@ -85,8 +85,9 @@ # non-relative for backends so that non-builting backends # can be easily modelled after this style B) from piker import config -from piker.brokers._util import ( +from ._util import ( log, + # only for the ib_sync internal logging get_logger, ) diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index d6c361334..21d4baa5c 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -60,6 +60,7 @@ Position, Transaction, open_trade_ledger, + TransactionLedger, iter_by_dt, open_pps, PpTable, @@ -78,10 +79,10 @@ from piker.accounting import ( MktPair, ) +from ._util import log from .api import ( _accounts2clients, con2fqme, - log, get_config, open_client_proxies, Client, @@ -90,6 +91,7 @@ from ._flex_reports import parse_flex_dt + def pack_position( pos: IbPosition @@ -339,7 +341,7 @@ async def update_and_audit_msgs( acctid: str, # no `ib.` prefix is required! 
pps: list[Position], cids2pps: dict[tuple[str, int], BrokerdPosition], - validate: bool = False, + validate: bool = True, ) -> list[BrokerdPosition]: @@ -352,9 +354,9 @@ async def update_and_audit_msgs( # for comparison/audit versus the piker equivalent # breakeven pp calcs. ibppmsg = cids2pps.get((acctid, bs_mktid)) - if ibppmsg: - symbol = ibppmsg.symbol + + symbol: str = ibppmsg.symbol msg = BrokerdPosition( broker='ib', @@ -375,36 +377,41 @@ async def update_and_audit_msgs( ibfmtmsg = pformat(ibppmsg.to_dict()) pikerfmtmsg = pformat(msg.to_dict()) - if validate: - ibsize = ibppmsg.size - pikersize = msg.size - diff = pikersize - ibsize - - # if ib reports a lesser pp it's not as bad since we can - # presume we're at least not more in the shit then we - # thought. - if diff and pikersize: - reverse_split_ratio = pikersize / ibsize - split_ratio = 1/reverse_split_ratio - - if split_ratio >= reverse_split_ratio: - entry = f'split_ratio = {int(split_ratio)}' - else: - entry = f'split_ratio = 1/{int(reverse_split_ratio)}' - - # raise ValueError( - log.error( - f'Pos mismatch in ib vs. the piker ledger!\n' - f'IB:\n{ibfmtmsg}\n\n' - f'PIKER:\n{pikerfmtmsg}\n\n' - 'If you are expecting a (reverse) split in this ' - 'instrument you should probably put the following' - 'in the `pps.toml` section:\n' - f'{entry}\n' - # f'reverse_split_ratio: {reverse_split_ratio}\n' - # f'split_ratio: {split_ratio}\n\n' - ) - msg.size = ibsize + ibsize = ibppmsg.size + pikersize = msg.size + diff = pikersize - ibsize + + # if ib reports a lesser pp it's not as bad since we can + # presume we're at least not more in the shit then we + # thought. + if diff and pikersize: + reverse_split_ratio = pikersize / ibsize + split_ratio = 1/reverse_split_ratio + + if split_ratio >= reverse_split_ratio: + entry = f'split_ratio = {int(split_ratio)}' + else: + entry = f'split_ratio = 1/{int(reverse_split_ratio)}' + + msg.size = ibsize + + logmsg: str = ( + f'Pos mismatch in ib vs. the piker ledger!\n' + f'IB:\n{ibfmtmsg}\n\n' + f'PIKER:\n{pikerfmtmsg}\n\n' + 'If you are expecting a (reverse) split in this ' + 'instrument you should probably put the following' + 'in the `pps.toml` section:\n' + f'{entry}\n' + # f'reverse_split_ratio: {reverse_split_ratio}\n' + # f'split_ratio: {split_ratio}\n\n' + ) + + if validate: + raise ValueError(logmsg) + else: + # await tractor.pause() + log.error(logmsg) if ibppmsg.avg_price != msg.avg_price: # TODO: make this a "propaganda" log level? @@ -432,12 +439,16 @@ async def update_and_audit_msgs( size=p.size, avg_price=p.ppu, ) - if validate and p.size: - # raise ValueError( - log.error( + if p.size: + logmsg: str = ( f'UNEXPECTED POSITION says IB => {msg.symbol}\n' 'Maybe they LIQUIDATED YOU or are missing ledger entries?\n' ) + log.error(logmsg) + + # if validate: + # raise ValueError(logmsg) + msgs.append(msg) return msgs @@ -520,10 +531,8 @@ async def open_trade_event_stream( @tractor.context -async def trades_dialogue( - +async def open_trade_dialog( ctx: tractor.Context, - # loglevel: str = None, ) -> AsyncIterator[dict[str, Any]]: @@ -575,6 +584,7 @@ async def trades_dialogue( # open ledger and pptable wrapper for each # detected account. + ledger: TransactionLedger ledger = ledgers[acctid] = lstack.enter_context( open_trade_ledger( 'ib', @@ -643,13 +653,14 @@ async def trades_dialogue( # TODO: fix this `tractor` BUG! 
# https://github.com/goodboy/tractor/issues/354 - # await tractor.breakpoint() + # await tractor.pp() if trade_entries: # write ledger with all new api_trades # **AFTER** we've updated the `pps.toml` # from the original ledger state! (i.e. this # is currently done on exit) + for tid, entry in trade_entries.items(): ledger.setdefault(tid, {}).update(entry) @@ -670,6 +681,7 @@ async def trades_dialogue( # -> collect all ib-pp reported positions so that we can be # sure know which positions to update from the ledger if # any are missing from the ``pps.toml`` + # await tractor.pp() pos: IbPosition # named tuple subtype for pos in client.positions(): @@ -702,7 +714,7 @@ async def trades_dialogue( acctid, pps.values(), cids2pps, - validate=True, + validate=False, ) all_positions.extend(msg for msg in msgs) diff --git a/piker/brokers/kraken/__init__.py b/piker/brokers/kraken/__init__.py index 0589981b7..8ec19bcfa 100644 --- a/piker/brokers/kraken/__init__.py +++ b/piker/brokers/kraken/__init__.py @@ -35,7 +35,7 @@ stream_quotes, ) from .broker import ( - trades_dialogue, + open_trade_dialog, norm_trade_records, ) @@ -43,7 +43,7 @@ __all__ = [ 'get_client', 'get_mkt_info', - 'trades_dialogue', + 'open_trade_dialog', 'open_history_client', 'open_symbol_search', 'stream_quotes', diff --git a/piker/brokers/kraken/api.py b/piker/brokers/kraken/api.py index de2be68c1..a82714cf7 100644 --- a/piker/brokers/kraken/api.py +++ b/piker/brokers/kraken/api.py @@ -153,6 +153,10 @@ def price_tick(self) -> Decimal: def size_tick(self) -> Decimal: return digits_to_dec(self.lot_decimals) + @property + def bs_fqme(self) -> str: + return f'{self.symbol}.SPOT' + class Client: @@ -638,7 +642,10 @@ def normalize_symbol( the 'AssetPairs' endpoint, see methods above. ''' - return cls._ntable[ticker].lower() + try: + return cls._ntable[ticker] + except KeyError as ke: + raise SymbolNotFound(f'kraken has no {ke.args[0]}') @acm diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index fc2eff62d..7cb596725 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -18,7 +18,6 @@ Order api and machinery ''' -from collections import ChainMap, defaultdict from contextlib import ( asynccontextmanager as acm, aclosing, @@ -52,6 +51,9 @@ from piker.accounting._mktinfo import ( MktPair, ) +from piker.clearing import( + OrderDialogs, +) from piker.clearing._messages import ( Order, Status, @@ -124,7 +126,7 @@ async def handle_order_requests( client: Client, ems_order_stream: tractor.MsgStream, token: str, - apiflows: dict[int, ChainMap[dict[str, dict]]], + apiflows: OrderDialogs, ids: bidict[str, int], reqids2txids: dict[int, str], @@ -134,10 +136,8 @@ async def handle_order_requests( and deliver acks or errors. ''' - # XXX: UGH, let's unify this.. with ``msgspec``. - msg: dict[str, Any] - order: BrokerdOrder - + # XXX: UGH, let's unify this.. with ``msgspec``!!! + msg: dict | Order async for msg in ems_order_stream: log.info(f'Rx order msg:\n{pformat(msg)}') match msg: @@ -183,11 +183,12 @@ async def handle_order_requests( # logic from old `Client.submit_limit()` if order.oid in ids: - ep = 'editOrder' - reqid = ids[order.oid] # integer not txid + ep: str = 'editOrder' + reqid: int = ids[order.oid] # integer not txid try: - txid = reqids2txids[reqid] + txid: str = reqids2txids[reqid] except KeyError: + # XXX: not sure if this block ever gets hit now? 
log.error('TOO FAST EDIT') reqids2txids[reqid] = TooFastEdit(reqid) @@ -208,7 +209,7 @@ async def handle_order_requests( } else: - ep = 'addOrder' + ep: str = 'addOrder' reqid = BrokerClient.new_reqid() ids[order.oid] = reqid @@ -221,8 +222,12 @@ async def handle_order_requests( 'type': order.action, } - psym = order.symbol.upper() - pair = f'{psym[:3]}/{psym[3:]}' + # XXX strip any . token which should + # ONLY ever be '.spot' rn, until we support + # futes. + bs_fqme: str = order.symbol.replace('.spot', '') + psym: str = bs_fqme.upper() + pair: str = f'{psym[:3]}/{psym[3:]}' # XXX: ACK the request **immediately** before sending # the api side request to ensure the ems maps the oid -> @@ -260,7 +265,7 @@ async def handle_order_requests( await ws.send_msg(req) # placehold for sanity checking in relay loop - apiflows[reqid].maps.append(msg) + apiflows.add_msg(reqid, msg) case _: account = msg.get('account') @@ -372,10 +377,7 @@ def trades2pps( write_storage: bool = True, -) -> tuple[ - list[BrokerdPosition], - list[Transaction], -]: +) -> list[BrokerdPosition]: if new_trans: updated = table.update_from_trans( new_trans, @@ -420,23 +422,18 @@ def trades2pps( @tractor.context -async def trades_dialogue( +async def open_trade_dialog( ctx: tractor.Context, ) -> AsyncIterator[dict[str, Any]]: async with get_client() as client: - + # make ems flip to paper mode when no creds setup in + # `brokers.toml` B0 if not client._api_key: await ctx.started('paper') return - # TODO: make ems flip to paper mode via - # some returned signal if the user only wants to use - # the data feed or we return this? - # else: - # await ctx.started(({}, ['paper'])) - # NOTE: currently we expect the user to define a "source fiat" # (much like the web UI let's you set an "account currency") # such that all positions (nested or flat) will be translated to @@ -448,10 +445,7 @@ async def trades_dialogue( acc_name = 'kraken.' + acctid # task local msg dialog tracking - apiflows: defaultdict[ - int, - ChainMap[dict[str, dict]], - ] = defaultdict(ChainMap) + apiflows = OrderDialogs() # 2way map for ems ids to kraken int reqids.. ids: bidict[str, int] = bidict() @@ -648,7 +642,7 @@ def has_pp( # stage a first reqid of `0` reqids2txids[0] = last_trade_dict['ordertxid'] - ppmsgs = trades2pps( + ppmsgs: list[BrokerdPosition] = trades2pps( table, acctid, ) @@ -714,7 +708,7 @@ async def handle_order_updates( ws: NoBsWs, ws_stream: AsyncIterator, ems_stream: tractor.MsgStream, - apiflows: dict[int, ChainMap[dict[str, dict]]], + apiflows: OrderDialogs, ids: bidict[str, int], reqids2txids: bidict[int, str], table: PpTable, @@ -874,8 +868,9 @@ async def handle_order_updates( # 'vol_exec': exec_vlm} # 0.0000 match update_msg: - # EMS-unknown LIVE order that needs to be - # delivered and loaded on the client-side. + # EMS-unknown pre-exising-submitted LIVE + # order that needs to be delivered and + # loaded on the client-side. 
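+            # (i.e. the kraken-side `userref` has no entry in our
+            #  `ids` table; see the `ids.inverse.get(reqid) is None`
+            #  check below)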
case { 'userref': reqid, 'descr': { @@ -894,7 +889,7 @@ async def handle_order_updates( ids.inverse.get(reqid) is None ): # parse out existing live order - fqme = pair.replace('/', '').lower() + fqme = pair.replace('/', '').lower() + '.spot' price = float(price) size = float(vol) @@ -928,7 +923,7 @@ async def handle_order_updates( ), src='kraken', ) - apiflows[reqid].maps.append(status_msg.to_dict()) + apiflows.add_msg(reqid, status_msg.to_dict()) await ems_stream.send(status_msg) continue @@ -1064,7 +1059,7 @@ async def handle_order_updates( ), ) - apiflows[reqid].maps.append(update_msg) + apiflows.add_msg(reqid, update_msg) await ems_stream.send(resp) # fill msg. @@ -1143,9 +1138,8 @@ async def handle_order_updates( ) continue - # update the msg chain - chain = apiflows[reqid] - chain.maps.append(event) + # update the msg history + apiflows.add_msg(reqid, event) if status == 'error': # any of ``{'add', 'edit', 'cancel'}`` @@ -1155,11 +1149,16 @@ async def handle_order_updates( f'Failed to {action} order {reqid}:\n' f'{errmsg}' ) + + symbol: str = 'N/A' + if chain := apiflows.get(reqid): + symbol: str = chain.get('symbol', 'N/A') + await ems_stream.send(BrokerdError( oid=oid, # XXX: use old reqid in case it changed? reqid=reqid, - symbol=chain.get('symbol', 'N/A'), + symbol=symbol, reason=f'Failed {action}:\n{errmsg}', broker_details=event @@ -1201,8 +1200,8 @@ async def norm_trade_records( }[record['type']] # we normalize to kraken's `altname` always.. - bs_mktid = Client.normalize_symbol(record['pair']) - fqme = f'{bs_mktid}.kraken' + bs_mktid: str = Client.normalize_symbol(record['pair']) + fqme = f'{bs_mktid.lower()}.kraken' mkt: MktPair = (await get_mkt_info(fqme))[0] records[tid] = Transaction( diff --git a/piker/brokers/kraken/feed.py b/piker/brokers/kraken/feed.py index dc70672fc..d0b14f33e 100644 --- a/piker/brokers/kraken/feed.py +++ b/piker/brokers/kraken/feed.py @@ -24,7 +24,6 @@ ) from datetime import datetime from typing import ( - Any, AsyncGenerator, Callable, Optional, @@ -41,9 +40,11 @@ from piker.accounting._mktinfo import ( Asset, MktPair, + unpack_fqme, ) from piker.brokers import ( open_cached_client, + SymbolNotFound, ) from piker._cacheables import ( async_lifo_cache, @@ -63,7 +64,7 @@ ) -class OHLC(Struct): +class OHLC(Struct, frozen=True): ''' Description of the flattened OHLC quote format. @@ -74,6 +75,8 @@ class OHLC(Struct): chan_id: int # internal kraken id chan_name: str # eg. ohlc-1 (name-interval) pair: str # fx pair + + # unpacked from array time: float # Begin time of interval, in seconds since epoch etime: float # End time of interval, in seconds since epoch open: float # Open price of interval @@ -83,8 +86,6 @@ class OHLC(Struct): vwap: float # Volume weighted average price within interval volume: float # Accumulated volume **within interval** count: int # Number of trades within interval - # (sampled) generated tick data - ticks: list[Any] = [] async def stream_messages( @@ -148,14 +149,15 @@ async def process_data_feed_msgs( pair ]: if 'ohlc' in chan_name: + array: list = payload_array[0] ohlc = OHLC( chan_id, chan_name, pair, - *payload_array[0] + *map(float, array[:-1]), + count=array[-1], ) - ohlc.typecast() - yield 'ohlc', ohlc + yield 'ohlc', ohlc.copy() elif 'spread' in chan_name: @@ -195,24 +197,18 @@ async def process_data_feed_msgs( # yield msg -def normalize( - ohlc: OHLC, +def normalize(ohlc: OHLC) -> dict: + ''' + Norm an `OHLC` msg to piker's minimal (live-)quote schema. 
-) -> dict: + ''' quote = ohlc.to_dict() quote['broker_ts'] = quote['time'] quote['brokerd_ts'] = time.time() quote['symbol'] = quote['pair'] = quote['pair'].replace('/', '') quote['last'] = quote['close'] quote['bar_wap'] = ohlc.vwap - - # seriously eh? what's with this non-symmetry everywhere - # in subscription systems... - # XXX: piker style is always lowercases symbols. - topic = quote['pair'].replace('/', '').lower() - - # print(quote) - return topic, quote + return quote @acm @@ -221,7 +217,7 @@ async def open_history_client( ) -> AsyncGenerator[Callable, None]: - symbol: str = mkt.bs_fqme + symbol: str = mkt.bs_mktid # TODO implement history getter for the new storage layer. async with open_cached_client('kraken') as client: @@ -284,6 +280,18 @@ async def get_mkt_info( key-strs to `MktPair`s. ''' + venue: str = 'spot' + expiry: str = '' + if '.kraken' in fqme: + broker, pair, venue, expiry = unpack_fqme(fqme) + venue: str = venue or 'spot' + + if venue != 'spot': + raise SymbolNotFound( + 'kraken only supports spot markets right now!\n' + f'{fqme}\n' + ) + async with open_cached_client('kraken') as client: # uppercase since kraken bs_mktid is always upper @@ -304,6 +312,12 @@ async def get_mkt_info( size_tick=pair.size_tick, bs_mktid=bs_mktid, + expiry=expiry, + venue=venue or 'spot', + + # TODO: futes + # _atype=_atype, + broker='kraken', ) return mkt, pair @@ -410,50 +424,58 @@ async def subscribe(ws: NoBsWs): ): # pull a first quote and deliver typ, ohlc_last = await anext(msg_gen) - topic, quote = normalize(ohlc_last) + quote = normalize(ohlc_last) task_status.started((init_msgs, quote)) feed_is_live.set() # keep start of last interval for volume tracking - last_interval_start = ohlc_last.etime + last_interval_start: float = ohlc_last.etime # start streaming - async for typ, ohlc in msg_gen: - - if typ == 'ohlc': + topic: str = mkt.bs_fqme + async for typ, quote in msg_gen: + match typ: # TODO: can get rid of all this by using - # ``trades`` subscription... - - # generate tick values to match time & sales pane: - # https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m - volume = ohlc.volume - - # new OHLC sample interval - if ohlc.etime > last_interval_start: - last_interval_start = ohlc.etime - tick_volume = volume - - else: - # this is the tick volume *within the interval* - tick_volume = volume - ohlc_last.volume - - ohlc_last = ohlc - last = ohlc.close - - if tick_volume: - ohlc.ticks.append({ - 'type': 'trade', - 'price': last, - 'size': tick_volume, - }) - - topic, quote = normalize(ohlc) - - elif typ == 'l1': - quote = ohlc - topic = quote['symbol'].lower() + # ``trades`` subscription..? Not sure why this + # wasn't used originally? (music queues) zoltannn.. 
+ # https://docs.kraken.com/websockets/#message-trade + case 'ohlc': + # generate tick values to match time & sales pane: + # https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m + volume = quote.volume + + # new OHLC sample interval + if quote.etime > last_interval_start: + last_interval_start: float = quote.etime + tick_volume: float = volume + + else: + # this is the tick volume *within the interval* + tick_volume: float = volume - ohlc_last.volume + + ohlc_last = quote + last = quote.close + + quote = normalize(quote) + ticks = quote.setdefault( + 'ticks', + [], + ) + if tick_volume: + ticks.append({ + 'type': 'trade', + 'price': last, + 'size': tick_volume, + }) + + case 'l1': + # passthrough quote msg + pass + + case _: + log.warning(f'Unknown WSS message: {typ}, {quote}') await send_chan.send({topic: quote}) diff --git a/piker/clearing/__init__.py b/piker/clearing/__init__.py index ec796ac9d..19d6390f2 100644 --- a/piker/clearing/__init__.py +++ b/piker/clearing/__init__.py @@ -26,12 +26,14 @@ from ._ems import ( open_brokerd_dialog, ) +from ._util import OrderDialogs __all__ = [ 'open_ems', 'OrderClient', 'open_brokerd_dialog', + 'OrderDialogs', ] diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py index 436b4f8e1..9977f95d7 100644 --- a/piker/clearing/_client.py +++ b/piker/clearing/_client.py @@ -216,8 +216,8 @@ async def open_ems( loglevel: str = 'error', ) -> tuple[ - OrderClient, - tractor.MsgStream, + OrderClient, # client + tractor.MsgStream, # order ctl stream dict[ # brokername, acctid tuple[str, str], diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 1bb57ae79..c18631a66 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -1529,30 +1529,34 @@ async def _emsd_main( received in a stream from that client actor and then responses are streamed back up to the original calling task in the same client. - The primary ``emsd`` task trees are: + The primary ``emsd`` task tree is: - ``_setup_persistent_emsd()``: - is the ``emsd`` actor's primary root task which sets up an - actor-global ``Router`` instance and starts a relay loop task - which lives until the backend broker is shutdown or the ems is - terminated. - | - - (maybe) ``translate_and_relay_brokerd_events()``: - accept normalized trades responses from brokerd, process and - relay to ems client(s); this is a effectively a "trade event - reponse" proxy-broker. - - - ``_emsd_main()``: - attaches a brokerd real-time quote feed and trades dialogue with - brokderd trading api for every connecting client. - | - - ``clear_dark_triggers()``: - run (dark order) conditions on inputs and trigger brokerd "live" - order submissions. - | - - ``process_client_order_cmds()``: - accepts order cmds from requesting clients, registers dark orders and - alerts with clearing loop. + is the ``emsd`` actor's primary *service-fixture* task which + is opened by the `pikerd` service manager and sets up + a process-global (actor-local) ``Router`` instance and opens + a service nursery which lives until the backend broker is + shutdown or the ems is terminated; all tasks are + *dynamically* started (and persisted) within this service + nursery when the below endpoint context is opened: + | + - ``_emsd_main()``: + attaches a real-time quote feed and trades dialogue with + a `brokerd` actor which connects to the backend broker's + trading api for every connecting client. + | + - ``clear_dark_triggers()``: + run (dark order) conditions on inputs and trigger brokerd + "live" order submissions. 
diff --git a/piker/clearing/_paper_engine.py b/piker/clearing/_paper_engine.py
index 34e7ec58e..ac5f3d3fd 100644
--- a/piker/clearing/_paper_engine.py
+++ b/piker/clearing/_paper_engine.py
@@ -55,6 +55,7 @@
 from ..accounting import unpack_fqme
 from ._util import (
     log,  # sub-sys logger
+    get_console_log,
 )
 from ._messages import (
     BrokerdCancel,
@@ -536,7 +537,8 @@ async def open_trade_dialog(

 ) -> None:

-    tractor.log.get_console_log(loglevel)
+    # enable piker.clearing console log for *this* subactor
+    get_console_log(loglevel)

     ppt: PpTable
     ledger: TransactionLedger
diff --git a/piker/clearing/_util.py b/piker/clearing/_util.py
index ec93512dc..d3c0fb8ef 100644
--- a/piker/clearing/_util.py
+++ b/piker/clearing/_util.py
@@ -17,12 +17,15 @@
 Sub-sys module commons.

 """
+from collections import ChainMap
 from functools import partial
+from typing import Any

 from ..log import (
     get_logger,
     get_console_log,
 )
+from piker.data.types import Struct

 subsys: str = 'piker.clearing'
 log = get_logger(subsys)
@@ -31,3 +34,63 @@
     get_console_log,
     name=subsys,
 )
+
+
+# TODO: use this in other backends like kraken which currently has
+# a less formalized version more or less:
+# `apiflows[reqid].maps.append(status_msg.to_dict())`
+class OrderDialogs(Struct):
+    '''
+    Order control dialog (and thus transaction) tracking via
+    message recording.
+
+    Allows easily recording messages associated with a given set of
+    order control transactions and looking up the latest field
+    state using the entire (reverse chronological) msg flow.
+
+    '''
+    _flows: dict[str, ChainMap] = {}
+
+    def add_msg(
+        self,
+        oid: str,
+        msg: dict,
+    ) -> None:
+
+        # NOTE: manually enter a new map on the first msg add to
+        # avoid creating one with an empty dict first entry in
+        # `ChainMap.maps` which is the default if none passed at
+        # init.
+        cm: ChainMap = self._flows.get(oid)
+        if cm:
+            cm.maps.insert(0, msg)
+        else:
+            cm = ChainMap(msg)
+            self._flows[oid] = cm
+
+    # TODO: wrap all this in the `collections.abc.Mapping` interface?
+    def get(
+        self,
+        oid: str,
+
+    ) -> ChainMap[str, Any]:
+        '''
+        Return the dialog `ChainMap` for provided id.
+
+        '''
+        return self._flows.get(oid, None)
+
+    def pop(
+        self,
+        oid: str,
+
+    ) -> ChainMap[str, Any]:
+        '''
+        Pop and thus remove the `ChainMap` containing the msg flow
+        for the given order id.
+
+        '''
+        if (flow := self._flows.pop(oid, None)) is None:
+            log.warning(f'No flow found for oid: {oid}')
+
+        return flow
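Since `OrderDialogs` is exported from `piker.clearing` by the
`__init__.py` hunk above, a quick usage sketch follows; the order id
and msg contents are made up for illustration:

from piker.clearing import OrderDialogs

dialogs = OrderDialogs()
dialogs.add_msg('oid-1', {'status': 'pending', 'size': 100})
dialogs.add_msg('oid-1', {'status': 'filled'})

flow = dialogs.get('oid-1')
assert flow['status'] == 'filled'  # newest msg's field wins
assert flow['size'] == 100         # older fields still resolvable
assert len(flow.maps) == 2         # full reverse-chrono history kept
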
diff --git a/piker/cli/__init__.py b/piker/cli/__init__.py
index a812555e7..6972270d0 100644
--- a/piker/cli/__init__.py
+++ b/piker/cli/__init__.py
@@ -39,7 +39,7 @@
 from .. import config

-log = get_logger('cli')
+log = get_logger('piker.cli')


 @click.command()
@@ -71,16 +71,14 @@ def pikerd(
     Spawn the piker broker-daemon.

     '''
-    from .. import service
-
-    log = get_console_log(loglevel)
+    log = get_console_log(loglevel, name='cli')

     if pdb:
         log.warning((
             "\n"
-            "!!! You have enabled daemon DEBUG mode !!!\n"
-            "If a daemon crashes it will likely block"
-            " the service until resumed from console!\n"
+            "!!! YOU HAVE ENABLED DAEMON DEBUG MODE !!!\n"
+            "When a `piker` daemon crashes it will block the "
+            "task-thread until resumed from console!\n"
             "\n"
         ))

@@ -91,6 +89,8 @@ def pikerd(
         int(port) or _default_registry_port,
     )

+    from .. import service
+
     async def main():

         service_mngr: service.Services
diff --git a/piker/config.py b/piker/config.py
index 0220f3e6d..80f7b1d1a 100644
--- a/piker/config.py
+++ b/piker/config.py
@@ -22,7 +22,6 @@
 import sys
 import os
 import shutil
-import time
 from typing import (
     Callable,
     MutableMapping,
 )
@@ -173,6 +172,14 @@ def _posixify(name):
     )


+class ConfigurationError(Exception):
+    'Misconfigured settings, likely in a TOML file.'
+
+
+class NoSignature(ConfigurationError):
+    'No credentials setup for broker backend!'
+
+
 def _override_config_dir(
     path: str
 ) -> None:
@@ -302,98 +309,6 @@ def load(
     return config, path


-def load_account(
-    brokername: str,
-    acctid: str,
-
-) -> tuple[dict, Path]:
-    '''
-    Load a accounting (with positions) file from
-    $CONFIG_DIR/accounting/account...toml
-
-    Where normally $CONFIG_DIR = ~/.config/piker/
-    and we implicitly create a accounting subdir which should
-    normally be linked to a git repo managed by the user B)
-
-    '''
-    legacy_fn: str = f'pps.{brokername}.{acctid}.toml'
-    fn: str = f'account.{brokername}.{acctid}.toml'
-
-    dirpath: Path = _config_dir / 'accounting'
-    if not dirpath.is_dir():
-        dirpath.mkdir()
-
-    config, path = load(
-        path=dirpath / fn,
-        decode=tomlkit.parse,
-        touch_if_dne=True,
-    )
-
-    if not config:
-        legacypath = dirpath / legacy_fn
-        log.warning(
-            f'Your account file is using the legacy `pps.` prefix..\n'
-            f'Rewriting contents to new name -> {path}\n'
-            'Please delete the old file!\n'
-            f'|-> {legacypath}\n'
-        )
-        if legacypath.is_file():
-            legacy_config, _ = load(
-                path=legacypath,
-
-                # TODO: move to tomlkit:
-                # - needs to be fixed to support bidict?
-                #   https://github.com/sdispater/tomlkit/issues/289
-                # - we need to use or fork's fix to do multiline array
-                #   indenting.
-                decode=tomlkit.parse,
-            )
-            config.update(legacy_config)
-
-        # XXX: override the presumably previously non-existant
-        # file with legacy's contents.
-        write(
-            config,
-            path=path,
-            fail_empty=False,
-        )
-
-    return config, path
-
-
-def load_ledger(
-    brokername: str,
-    acctid: str,
-
-) -> tuple[dict, Path]:
-    '''
-    Load a ledger (TOML) file from user's config directory:
-    $CONFIG_DIR/accounting/ledgers/trades__.toml
-
-    Return its `dict`-content and file path.
-
-    '''
-    ldir: Path = _config_dir / 'accounting' / 'ledgers'
-    if not ldir.is_dir():
-        ldir.mkdir()
-
-    fname = f'trades_{brokername}_{acctid}.toml'
-    fpath: Path = ldir / fname
-
-    if not fpath.is_file():
-        log.info(
-            f'Creating new local trades ledger: {fpath}'
-        )
-        fpath.touch()
-
-    with fpath.open(mode='rb') as cf:
-        start = time.time()
-        ledger_dict = tomllib.load(cf)
-        log.debug(f'Ledger load took {time.time() - start}s')
-
-    return ledger_dict, fpath
-
-
 def write(
     config: dict,  # toml config as dict
@@ -463,3 +378,51 @@ def load_accounts(

     accounts['paper'] = None
     return accounts
+
+
+# XXX: Recursive getting & setting
+
+def get_value(_dict, _section):
+    # walk a dotted `_section` path, eg. 'a.b.c', down the nested dict
+    subs = _section.split('.')
+    if len(subs) > 1:
+        return get_value(
+            _dict[subs[0]],
+            '.'.join(subs[1:]),
+        )
+
+    else:
+        return _dict[_section]
+
+
+def set_value(_dict, _section, val):
+    # create intermediate tables as needed while walking the path
+    subs = _section.split('.')
+    if len(subs) > 1:
+        if subs[0] not in _dict:
+            _dict[subs[0]] = {}
+
+        return set_value(
+            _dict[subs[0]],
+            '.'.join(subs[1:]),
+            val
+        )
+
+    else:
+        _dict[_section] = val
+
+
+def del_value(_dict, _section):
+    # no-op if any table in the path is missing
+    subs = _section.split('.')
+    if len(subs) > 1:
+        if subs[0] not in _dict:
+            return
+
+        return del_value(
+            _dict[subs[0]],
+            '.'.join(subs[1:])
+        )
+
+    else:
+        if _section not in _dict:
+            return
+
+        del _dict[_section]
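The recursive helpers added above let callers address nested TOML
tables with dotted section paths. A small sketch exercising all three
(the key names are illustrative only):

from piker.config import get_value, set_value, del_value

conf: dict = {}
set_value(conf, 'kraken.key_descr', 'api_0')
assert conf == {'kraken': {'key_descr': 'api_0'}}
assert get_value(conf, 'kraken.key_descr') == 'api_0'

del_value(conf, 'kraken.key_descr')
assert conf == {'kraken': {}}  # empty parent tables are left in place
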
diff --git a/piker/data/__init__.py b/piker/data/__init__.py
index cd0a11833..087928ec7 100644
--- a/piker/data/__init__.py
+++ b/piker/data/__init__.py
@@ -35,11 +35,15 @@
     def_ohlcv_fields,
 )
 from .feed import (
+    Feed,
     open_feed,
 )
+from .flows import Flume


 __all__ = [
+    'Flume',
+    'Feed',
     'open_feed',
     'ShmArray',
     'iterticks',
diff --git a/piker/data/feed.py b/piker/data/feed.py
index ea7f360b9..fcd193da0 100644
--- a/piker/data/feed.py
+++ b/piker/data/feed.py
@@ -642,9 +642,12 @@ class Feed(Struct):
     '''
     mods: dict[str, ModuleType] = {}
     portals: dict[ModuleType, tractor.Portal] = {}
-    flumes: dict[str, Flume] = {}
+    flumes: dict[
+        str,  # FQME
+        Flume,
+    ] = {}
     streams: dict[
-        str,
+        str,  # broker name
         trio.abc.ReceiveChannel[dict[str, Any]],
     ] = {}
diff --git a/piker/log.py b/piker/log.py
index a36beec02..56776e1e8 100644
--- a/piker/log.py
+++ b/piker/log.py
@@ -40,7 +40,10 @@ def get_logger(
     Return the package log or a sub-log for `name` if provided.

     '''
-    return tractor.log.get_logger(name=name, _root_name=_proj_name)
+    return tractor.log.get_logger(
+        name=name,
+        _root_name=_proj_name,
+    )


 def get_console_log(
diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py
index 0b3a18970..9debfc582 100644
--- a/piker/ui/order_mode.py
+++ b/piker/ui/order_mode.py
@@ -587,7 +587,7 @@ def on_cancel(

     ) -> None:

-        msg = self.client._sent_orders.pop(uuid, None)
+        msg: Order = self.client._sent_orders.pop(uuid, None)

         if msg is not None:
             self.lines.remove_line(uuid=uuid)
@@ -715,7 +715,8 @@ async def open_order_mode(
     loglevel: str = 'info'

 ) -> None:
-    '''Activate chart-trader order mode loop:
+    '''
+    Activate chart-trader order mode loop:

       - connect to emsd
       - load existing positions
@@ -769,15 +770,20 @@ async def open_order_mode(
     accounts_def: bidict[str, str | None] = config.load_accounts(
         providers=[mkt.broker],
     )
+    # await tractor.pause()

     # XXX: ``brokerd`` delivers a set of account names that it
     # allows use of but the user also can define the accounts they'd
     # like to use, in order, in their `brokers.toml` file.
-    accounts = {}
+    accounts: dict[str, str] = {}
     for name in brokerd_accounts:
         # ensure name is in ``brokers.toml``
         accounts[name] = accounts_def[name]

+    # always add a paper entry so that paper cleared
+    # order dialogs can be tracked in the order mode UIs.
+    accounts['paper'] = 'paper'
+
     # first account listed is the one we select at startup
     # (aka order based selection).
     pp_account = next(
diff --git a/piker/ui/view_mode.py b/piker/ui/view_mode.py
index d785c67a2..82dfbf623 100644
--- a/piker/ui/view_mode.py
+++ b/piker/ui/view_mode.py
@@ -488,7 +488,8 @@ def overlay_viewlists(
                         upt.rng = r_up
                         upt.y_val = new_major_ymx
                         profiler(msg)
-                        print(msg)
+                        if debug_print:
+                            print(msg)

         # register curves by a "full" dispersion metric for
         # later sort order in the overlay (technique
diff --git a/tests/test_docker_services.py b/tests/test_docker_services.py
index 66438e336..084e5e63e 100644
--- a/tests/test_docker_services.py
+++ b/tests/test_docker_services.py
@@ -12,6 +12,7 @@
 from piker.service import elastic


+@pytest.mark.skip
 def test_marketstore_startup_and_version(
     open_test_pikerd: AsyncContextManager,
     loglevel: str,
@@ -38,7 +39,7 @@ async def main():
         ) as (
             _,  # host
             _,  # port
-            pikerd_portal,
+            _,
             services,
         ),
diff --git a/tests/test_ems.py b/tests/test_ems.py
index f9c010f08..c2f5d7a8e 100644
--- a/tests/test_ems.py
+++ b/tests/test_ems.py
@@ -164,7 +164,9 @@ def test_ems_err_on_bad_broker(
     async def load_bad_fqme():
         try:
             async with (
-                open_test_pikerd() as (_, _, _, _),
+                open_test_pikerd(
+                    debug_mode=False,
+                ) as (_, _, _, _),

                 open_ems(
                     'doggycoin.doggy',
@@ -173,8 +175,11 @@ async def load_bad_fqme():
                 ) as _
             ):
                 pytest.fail('EMS is working on non-broker!?')
-        except ModuleNotFoundError:
-            pass
+
+        # NOTE: emsd should error on the actor's enabled modules
+        # import phase, when looking for a backend named `doggy`.
+        except tractor.RemoteActorError as re:
+            assert re.type == ModuleNotFoundError

     run_and_tollerate_cancels(load_bad_fqme)

@@ -241,8 +246,9 @@ async def submit_and_check(

     '''
     broker: str = 'kraken'
+    venue: str = 'spot'
     mkt_key: str = 'xbtusdt'
-    fqme: str = f'{mkt_key}.{broker}'
+    fqme: str = f'{mkt_key}.{venue}.{broker}'

     startup_pps: dict[
         tuple[str, str],  # brokername, acctid