Source code for h2o_dashboard.pages.okx_dashbaord_page.okx_positions_widget

from typing import List

from h2o_wave import Q, ui, on, data, run_on, AsyncSite, pack  # noqa F401

from h2o_dashboard.util import add_card
from pyokx.redis_structured_streams import get_stream_okx_position_messages
from pyokx.ws_data_structures import PositionsChannel, WSPosition


class OKX_Live_Positions_StreamWidget:
    """Dashboard widget that shows the latest OKX positions as a stat table.

    The widget reads position messages through
    ``get_stream_okx_position_messages`` (using the ``async_redis`` handle
    stored on ``q.client``) and renders the ``data`` of the most recent
    message as a ``ui.stat_table_card``.

    Args:
        q: The Wave query context; its ``client.async_redis`` and ``page``
            attributes are used by this widget.
        card_name: Prefix for the card key; the table card is registered as
            ``card_name + '_positions_table'``.
        box: Layout box passed to the table card when it is first added.
    """

    def __init__(self, q: Q, card_name: str, box: str):
        self.q = q
        # Enforce only the latest data entry since it contains all latest data.
        self.count = 1
        self.positions_stream: List[PositionsChannel] = []
        self.card_name = card_name
        self.box = box
        # Position attribute name -> column header. Insertion order matters:
        # the first entry becomes the row label, the rest the row values.
        self._column_mappings = dict(
            posId='Position ID',
            instId='Instrument ID',
            pos='Position',
            upl='UPL',
            avgPx='Average Price',
            lever='Leverage',
            last='Last',
            margin='Margin',
            pTime='Time',
        )

    async def _update_stream(self):
        """Re-read the positions stream, cache it on the widget and return it."""
        self.positions_stream = await get_stream_okx_position_messages(
            async_redis=self.q.client.async_redis, count=self.count)
        return self.positions_stream

    async def _is_initialized(self):
        """Return True once at least one positions message has been cached."""
        # bool() instead of len() so a None result is treated as "no data"
        # rather than raising.
        return bool(self.positions_stream)

    @staticmethod
    def _format_value(raw) -> str:
        """Render a cell: numbers rounded to 2 decimals, anything else via str().

        ``TypeError`` is caught alongside ``ValueError`` so that a field that
        is ``None`` (``float(None)`` raises ``TypeError``) falls back to its
        string form instead of aborting the whole table refresh.
        """
        try:
            return str(round(float(raw), 2))
        except (TypeError, ValueError):
            return str(raw)

    async def _build_table_items(self) -> list:
        """Build one ``ui.stat_table_item`` per position in the latest message.

        Returns an empty list when no message has been cached yet.
        """
        if not await self._is_initialized():
            return []

        latest_positions_report: PositionsChannel = self.positions_stream[-1]
        # Tolerate a message whose data is None by treating it as no positions.
        latest_positions_data = latest_positions_report.data or []

        # First mapped attribute is the row label; the rest are row values.
        label_field, *value_fields = self._column_mappings
        return [
            ui.stat_table_item(
                label=str(getattr(position, label_field)),
                values=[self._format_value(getattr(position, field))
                        for field in value_fields],
            )
            for position in latest_positions_data
        ]

    async def get_ui_live_positions_table(self, box: str):
        """Return a ``ui.stat_table_card`` for the currently cached positions.

        Args:
            box: Layout box in which the card is placed.
        """
        return ui.stat_table_card(
            box=box,
            title='Live Positions - okx:websockets@positions',
            columns=list(self._column_mappings.values()),
            items=await self._build_table_items(),
        )

    async def add_cards(self):
        """Refresh the stream, add the positions table card and save the page."""
        await self._update_stream()
        await add_card(self.q, self.card_name + '_positions_table',
                       await self.get_ui_live_positions_table(box=self.box))
        await self.q.page.save()

    async def update_cards(self):
        """Refresh the stream and replace the rows of the existing table card."""
        await self._update_stream()
        # Build the rows before touching the page, matching the order of the
        # page lookup relative to data extraction in the original flow.
        items = await self._build_table_items()
        positions_table_card = self.q.page[self.card_name + '_positions_table']
        positions_table_card.items = items
        await self.q.page.save()