Source code for h2o_dashboard.pages.overview_page.overview_stream_widget
from datetime import datetime
from typing import List
from h2o_wave import Q, ui, on, data, run_on, AsyncSite, pack # noqa F401
from h2o_dashboard.util import add_card, convert_to_col_type
from pyokx.data_structures import AccountBalanceData
from pyokx.redis_structured_streams import get_stream_okx_account_messages, get_stream_okx_position_messages
from pyokx.ws_data_structures import AccountChannel, PositionsChannel, WSPosition
[docs]
class Overview_StreamWidget: # TODO: Will connect to the account streams for all exchanges, rn it's just okx
def __init__(self, q: Q, card_name: str, box: str):
self.q = q
self.count = 1 # Enforce only the latest data entry since it contains all latest data
self.card_name = card_name
self.colors = ['$blue', '$red', '$green', '$yellow', '$orange', '$pink', '$purple', '$cyan', '$gray']
self.okx_account_stream: List[AccountChannel] = []
self.okx_positions_stream: List[PositionsChannel] = []
self.okx_position_column_mappings = dict(
instId={'label': 'Instrument ID', 'type': 'str'},
posId={'label': 'Position ID', 'type': 'int'},
pos={'label': 'Position', 'type': 'float'},
upl={'label': 'UPnL', 'type': 'float'},
avgPx={'label': 'Avg Price', 'type': 'float'},
lever={'label': 'Lev', 'type': 'float'},
last={'label': 'Last', 'type': 'float'},
margin={'label': 'Margin', 'type': 'float'},
pTime={'label': 'Time', 'type': 'timestamp'},
)
self.box = box
async def _update_stream(self):
self.okx_account_stream: List[AccountChannel] = await get_stream_okx_account_messages(
async_redis=self.q.client.async_redis, count=self.count)
self.okx_positions_stream: List[PositionsChannel] = await get_stream_okx_position_messages(
async_redis=self.q.client.async_redis, count=self.count)
async def _is_initialized(self):
return len(self.okx_account_stream) > 0 and len(self.okx_positions_stream) > 0
[docs]
async def get_all_exchanges_account_breakdown_table_card(self, box: str):
# Table Exchange:Asset|Total-Asset-Value|FreeAssetCash
latest_account_report: AccountChannel = self.okx_account_stream[-1]
latest_account_data: AccountBalanceData = latest_account_report.data[0]
# TODO Use values from all exchanges, then generate the ui item for each exchange prior to ui.stat ...
okx_total_equity = latest_account_data.totalEq
bitget_total_equity = 0 # TODO
total_equity = float(okx_total_equity) + float(bitget_total_equity)
'''OKX Exchange'''
items = []
for account_balance_detail in latest_account_data.details:
label = f'OKX:{account_balance_detail.ccy}'
total_asset_value = round(float(account_balance_detail.eqUsd), 2)
unused_value = round(float(account_balance_detail.availBal) * (
float(account_balance_detail.disEq) / float(account_balance_detail.eq)), 2)
fraction_of_total_equity = float(account_balance_detail.eqUsd) / float(total_equity)
percentage_string = str(round(fraction_of_total_equity * 100, 0)) + '%'
update_time = datetime.fromtimestamp(int(account_balance_detail.uTime) / 1000, tz=None).strftime(
'%Y-%m-%d %H:%M:%S')
#
values = [total_asset_value, unused_value, percentage_string, update_time]
items.append(ui.stat_table_item(label=label, values=[str(value) for value in values]))
return ui.stat_table_card(
box=box,
title=f'Live Account - {{exchange}}:websockets@account - ${round(total_equity, 2)}',
columns=['Exchange:Asset', 'Total-Asset-Value', 'FreeAssetCash', '% of Total Equity', 'Update Time'],
items=items
)
[docs]
async def get_all_exchanges_live_positions_table_card(self, box: str):
latest_positions_report: PositionsChannel = self.okx_positions_stream[-1]
latest_positions_data: List[WSPosition] = latest_positions_report.data
items = []
label_value = ''
for position in latest_positions_data:
values = []
for i, (col_key, col_value) in enumerate(self.okx_position_column_mappings.items()):
if i == 0:
label_value = f'OKX:{getattr(position, col_key)}'
continue
pos_col = getattr(position, col_key)
try:
expected_type = col_value['type']
pos_col = convert_to_col_type(pos_col, expected_type)
except ValueError:
print(f"Error converting {col_key} to {expected_type}")
value = str(pos_col)
values.append(value)
items.append(ui.stat_table_item(label=label_value, values=values))
return ui.stat_table_card(
box=box,
title=f'Live Positions - {{exchange}}:websockets@positions - {len(latest_positions_data)} positions',
columns=[col_value['label'] for col_value in self.okx_position_column_mappings.values()],
items=items
)
[docs]
async def get_all_exchanges_account_breakdown_pie_chart_card(self, box: str):
latest_account_report: AccountChannel = self.okx_account_stream[-1]
latest_account_data: AccountBalanceData = latest_account_report.data[0]
# Pie Chart
pies = []
colors_copy = None
COLORS = ['$blue', '$red', '$green', '$yellow', '$orange', '$pink', '$purple', '$cyan', '$gray']
for account_balance_detail in latest_account_data.details:
if not colors_copy:
colors_copy = COLORS.copy()
color = colors_copy.pop()
fraction = float(account_balance_detail.eqUsd) / float(latest_account_data.totalEq)
percentage_string = str(round(fraction * 100, 0)) + '%'
pies.append(ui.pie(
# label: str,
# value: str,
# fraction: float,
# color: str,
# aux_value: str | None = None
label=account_balance_detail.ccy,
value='',
fraction=fraction,
color=color,
aux_value=f'({percentage_string})'
))
return ui.wide_pie_stat_card(
# box: str,
# title: str,
# pies: list[Pie],
# commands: list[Command] | None = Non
box=box,
title='All Exchanges Account Breakdown',
pies=pies
)
[docs]
async def update_all_exchanges_account_breakdown_table_card(self):
latest_account_report: AccountChannel = self.okx_account_stream[-1]
latest_account_data: AccountBalanceData = latest_account_report.data[0]
# TODO Use values from all exchanges, then generate the ui item for each exchange prior to ui.stat ...
okx_total_equity = latest_account_data.totalEq
bitget_total_equity = 0
total_equity = float(okx_total_equity) + float(bitget_total_equity)
account_breakdown_card: ui.stat_table_card = self.q.page[self.card_name + '_overview_accounts_table']
items = []
for account_balance_detail in latest_account_data.details:
label = f'OKX:{account_balance_detail.ccy}'
total_asset_value = round(float(account_balance_detail.eqUsd), 2)
unused_value = round(float(account_balance_detail.availBal) * (
float(account_balance_detail.disEq) / float(account_balance_detail.eq)), 2)
fraction_of_total_equity = float(account_balance_detail.eqUsd) / float(total_equity)
percentage_string = str(round(fraction_of_total_equity * 100, 0)) + '%'
update_time = datetime.fromtimestamp(int(account_balance_detail.uTime) / 1000, tz=None).strftime(
'%Y-%m-%d %H:%M:%S')
#
values = [total_asset_value, unused_value, percentage_string, update_time]
items.append(ui.stat_table_item(label=label, values=[str(value) for value in values]))
account_breakdown_card.items = items
account_breakdown_card.title = f'Live Account - {{exchange}}:websockets@account - ${round(total_equity, 2)}'
await self.q.page.save()
[docs]
async def update_all_exchanges_live_positions_table_card(self):
latest_positions_report: PositionsChannel = self.okx_positions_stream[-1]
latest_positions_data: List[WSPosition] = latest_positions_report.data
positions_table_card: ui.stat_table_card = self.q.page[self.card_name + '_overview_positions_table']
items = []
for position in latest_positions_data:
values = []
for i, (col_key, col_value) in enumerate(self.okx_position_column_mappings.items()):
if i == 0:
label_value = f'OKX:{getattr(position, col_key)}'
continue
pos_col = getattr(position, col_key)
try:
expected_type = col_value['type']
if expected_type == 'float':
pos_col = round(float(pos_col), 2)
elif expected_type == 'int':
pos_col = int(pos_col)
elif expected_type == 'timestamp':
pos_col = datetime.fromtimestamp(int(pos_col) / 1000, tz=None).strftime(
'%Y-%m-%d %H:%M:%S')
else:
pos_col = str(pos_col)
except ValueError:
pass
value = str(pos_col)
values.append(value)
items.append(ui.stat_table_item(label=label_value, values=values))
positions_table_card.items = items
await self.q.page.save()
[docs]
async def add_cards(self):
await self._update_stream()
# await add_card(self.q, self.card_name + 'Accounts_PieChart',
# await self.get_all_exchanges_account_breakdown_pie_chart_card(box='first_context_1'))
await add_card(self.q, self.card_name + '_overview_accounts_table',
await self.get_all_exchanges_account_breakdown_table_card(box=self.box))
await add_card(self.q, self.card_name + '_overview_positions_table',
await self.get_all_exchanges_live_positions_table_card(box=self.box))
await self.q.page.save()
[docs]
async def update_cards(self):
await self._update_stream()
await self.update_all_exchanges_account_breakdown_table_card()
await self.update_all_exchanges_live_positions_table_card()
await self.q.page.save()