Skip to content

Commit

Permalink
reformat
Browse files Browse the repository at this point in the history
  • Loading branch information
carlosmiei committed Oct 29, 2024
1 parent 79b7375 commit 3c0fec0
Show file tree
Hide file tree
Showing 22 changed files with 3,706 additions and 2,245 deletions.
4,434 changes: 2,786 additions & 1,648 deletions binance/client.py

Large diffs are not rendered by default.

136 changes: 91 additions & 45 deletions binance/depthcache.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@


class DepthCache(object):

def __init__(self, symbol, conv_type: Callable = float):
"""Initialise the DepthCache
Expand Down Expand Up @@ -113,27 +112,42 @@ def get_asks(self):
]
"""
return DepthCache.sort_depth(self._asks, reverse=False, conv_type=self.conv_type)
return DepthCache.sort_depth(
self._asks, reverse=False, conv_type=self.conv_type
)

@staticmethod
def sort_depth(vals, reverse=False, conv_type: Callable = float):
"""Sort bids or asks by price
"""
"""Sort bids or asks by price"""
if isinstance(vals, dict):
lst = [[conv_type(price), conv_type(quantity)] for price, quantity in vals.items()]
lst = [
[conv_type(price), conv_type(quantity)]
for price, quantity in vals.items()
]
elif isinstance(vals, list):
lst = [[conv_type(price), conv_type(quantity)] for price, quantity in vals]
else:
raise ValueError(f'Unknown order book depth data type: {type(vals)}')
raise ValueError(f"Unknown order book depth data type: {type(vals)}")
lst = sorted(lst, key=itemgetter(0), reverse=reverse)
return lst


DEFAULT_REFRESH = 60 * 30 # 30 minutes


class BaseDepthCacheManager:
TIMEOUT = 60

def __init__(self, client, symbol, loop=None, refresh_interval: Optional[int] = DEFAULT_REFRESH, bm=None, limit=10, conv_type=float):
def __init__(
self,
client,
symbol,
loop=None,
refresh_interval: Optional[int] = DEFAULT_REFRESH,
bm=None,
limit=10,
conv_type=float,
):
"""Create a DepthCacheManager instance
:param client: Binance API client
Expand Down Expand Up @@ -166,10 +180,7 @@ def __init__(self, client, symbol, loop=None, refresh_interval: Optional[int] =
self._log = logging.getLogger(__name__)

async def __aenter__(self):
await asyncio.gather(
self._init_cache(),
self._start_socket()
)
await asyncio.gather(self._init_cache(), self._start_socket())
await self._socket.__aenter__()
return self

Expand Down Expand Up @@ -221,7 +232,7 @@ async def _depth_event(self, msg):
if not msg:
return None

if 'e' in msg and msg['e'] == 'error':
if "e" in msg and msg["e"] == "error":
# close the socket
await self.close()

Expand Down Expand Up @@ -252,13 +263,13 @@ async def _process_depth_message(self, msg):

def _apply_orders(self, msg):
assert self._depth_cache
for bid in msg.get('b', []) + msg.get('bids', []):
for bid in msg.get("b", []) + msg.get("bids", []):
self._depth_cache.add_bid(bid)
for ask in msg.get('a', []) + msg.get('asks', []):
for ask in msg.get("a", []) + msg.get("asks", []):
self._depth_cache.add_ask(ask)

# keeping update time
self._depth_cache.update_time = msg.get('E') or msg.get('lastUpdateId')
self._depth_cache.update_time = msg.get("E") or msg.get("lastUpdateId")

def get_depth_cache(self):
"""Get the current depth cache
Expand All @@ -284,9 +295,16 @@ def get_symbol(self):


class DepthCacheManager(BaseDepthCacheManager):

def __init__(
self, client, symbol, loop=None, refresh_interval: Optional[int] = None, bm=None, limit=500, conv_type=float, ws_interval=None
self,
client,
symbol,
loop=None,
refresh_interval: Optional[int] = None,
bm=None,
limit=500,
conv_type=float,
ws_interval=None,
):
"""Initialise the DepthCacheManager
Expand Down Expand Up @@ -324,13 +342,13 @@ async def _init_cache(self):
# process bid and asks from the order book
self._apply_orders(res)
assert self._depth_cache
for bid in res['bids']:
for bid in res["bids"]:
self._depth_cache.add_bid(bid)
for ask in res['asks']:
for ask in res["asks"]:
self._depth_cache.add_ask(ask)

# set first update id
self._last_update_id = res['lastUpdateId']
self._last_update_id = res["lastUpdateId"]

# Apply any updates from the websocket
for msg in self._depth_message_buffer:
Expand All @@ -344,7 +362,7 @@ async def _start_socket(self):
:return:
"""
if not getattr(self, '_depth_message_buffer', None):
if not getattr(self, "_depth_message_buffer", None):
self._depth_message_buffer = []

await super()._start_socket()
Expand All @@ -365,10 +383,10 @@ async def _process_depth_message(self, msg):
self._depth_message_buffer.append(msg)
return

if msg['u'] <= self._last_update_id:
if msg["u"] <= self._last_update_id:
# ignore any updates before the initial update id
return
elif msg['U'] != self._last_update_id + 1:
elif msg["U"] != self._last_update_id + 1:
# if not buffered check we get sequential updates
# otherwise init cache again
await self._init_cache()
Expand All @@ -379,7 +397,7 @@ async def _process_depth_message(self, msg):
# call the callback with the updated depth cache
res = self._depth_cache

self._last_update_id = msg['u']
self._last_update_id = msg["u"]

# after processing event see if we need to refresh the depth cache
if self._refresh_interval and int(time.time()) > self._refresh_time:
Expand All @@ -396,42 +414,49 @@ async def _process_depth_message(self, msg):
:return:
"""
msg = msg.get('data')
msg = msg.get("data")
return await super()._process_depth_message(msg)

def _apply_orders(self, msg):
assert self._depth_cache
self._depth_cache._bids = msg.get('b', [])
self._depth_cache._asks = msg.get('a', [])
self._depth_cache._bids = msg.get("b", [])
self._depth_cache._asks = msg.get("a", [])

# keeping update time
self._depth_cache.update_time = msg.get('E') or msg.get('lastUpdateId')
self._depth_cache.update_time = msg.get("E") or msg.get("lastUpdateId")

def _get_socket(self):
sock = self._bm.futures_depth_socket(self._symbol)
return sock


class OptionsDepthCacheManager(BaseDepthCacheManager):

def _get_socket(self):
return self._bm.options_depth_socket(self._symbol)


class ThreadedDepthCacheManager(ThreadedApiManager):

def __init__(
self, api_key: Optional[str] = None, api_secret: Optional[str] = None,
requests_params: Optional[Dict[str, str]] = None, tld: str = 'com',
testnet: bool = False
self,
api_key: Optional[str] = None,
api_secret: Optional[str] = None,
requests_params: Optional[Dict[str, str]] = None,
tld: str = "com",
testnet: bool = False,
):
super().__init__(api_key, api_secret, requests_params, tld, testnet)

def _start_depth_cache(
self, dcm_class, callback: Callable, symbol: str,
refresh_interval=None, bm=None, limit=10, conv_type=float, **kwargs
self,
dcm_class,
callback: Callable,
symbol: str,
refresh_interval=None,
bm=None,
limit=10,
conv_type=float,
**kwargs,
) -> str:

while not self._client:
time.sleep(0.01)

Expand All @@ -443,15 +468,24 @@ def _start_depth_cache(
bm=bm,
limit=limit,
conv_type=conv_type,
**kwargs
**kwargs,
)
path = symbol.lower() + '@depth' + str(limit)
path = symbol.lower() + "@depth" + str(limit)
self._socket_running[path] = True
self._loop.call_soon(asyncio.create_task, self.start_listener(dcm, path, callback))
self._loop.call_soon(
asyncio.create_task, self.start_listener(dcm, path, callback)
)
return path

def start_depth_cache(
self, callback: Callable, symbol: str, refresh_interval=None, bm=None, limit=10, conv_type=float, ws_interval=0
self,
callback: Callable,
symbol: str,
refresh_interval=None,
bm=None,
limit=10,
conv_type=float,
ws_interval=0,
) -> str:
return self._start_depth_cache(
dcm_class=DepthCacheManager,
Expand All @@ -461,11 +495,17 @@ def start_depth_cache(
bm=bm,
limit=limit,
conv_type=conv_type,
ws_interval=ws_interval
ws_interval=ws_interval,
)

def start_futures_depth_socket(
self, callback: Callable, symbol: str, refresh_interval=None, bm=None, limit=10, conv_type=float
self,
callback: Callable,
symbol: str,
refresh_interval=None,
bm=None,
limit=10,
conv_type=float,
) -> str:
return self._start_depth_cache(
dcm_class=FuturesDepthCacheManager,
Expand All @@ -474,11 +514,17 @@ def start_futures_depth_socket(
refresh_interval=refresh_interval,
bm=bm,
limit=limit,
conv_type=conv_type
conv_type=conv_type,
)

def start_options_depth_socket(
self, callback: Callable, symbol: str, refresh_interval=None, bm=None, limit=10, conv_type=float
self,
callback: Callable,
symbol: str,
refresh_interval=None,
bm=None,
limit=10,
conv_type=float,
) -> str:
return self._start_depth_cache(
dcm_class=OptionsDepthCacheManager,
Expand All @@ -487,5 +533,5 @@ def start_options_depth_socket(
refresh_interval=refresh_interval,
bm=bm,
limit=limit,
conv_type=conv_type
conv_type=conv_type,
)
Loading

0 comments on commit 3c0fec0

Please sign in to comment.