Source code for pyokx.ws_clients.WsPublicAsync


import asyncio
import json
import logging
from typing import List, Set

from websockets import ConnectionClosedError

from pyokx.ws_clients.WebSocketFactory import WebSocketFactory

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("WsPublicAsync")


[docs] class WsPublicAsync: def __init__(self, url: str, callback): self.url = url # self.subscriptions = set() self.loop = asyncio.get_event_loop() self.factory = WebSocketFactory(url) self.callback = callback self.max_size = 2 ** 20 self.websocket = None self.channel_params = []
[docs] async def connect(self): self.websocket = await self.factory.connect(max_size=self.max_size)
[docs] async def consume(self): try: async for message in self.websocket: logger.debug("Received message: {%s}", message) if self.callback: await self.callback(message) except ConnectionClosedError as e: logger.error(f"WebSocket connection closed: {e}") # Handle reconnection logic here await self.restart() except Exception as e: logger.error(f"WebSocket unhandled error: {e}") await self.restart()
[docs] async def subscribe(self, params: list): payload = json.dumps({ "op": "subscribe", "args": params }) await self.websocket.send(payload) self.channel_params = self.channel_params + params
[docs] async def unsubscribe(self, params: List[dict]): payload = json.dumps({ "op": "unsubscribe", "args": params }) logger.info(f"unsubscribe: {payload}") await self.websocket.send(payload) self.channel_params = list(set(self.channel_params) - set(params))
[docs] async def start(self): logger.info("Connecting to WebSocket...") await self.connect() self.loop.create_task(self.consume())
[docs] async def restart(self): logger.info("Restarting WebSocket...") channel_params = self.channel_params for i in range(3): try: if self.websocket: await self.factory.close() await self.start() await self.subscribe(channel_params) break except KeyboardInterrupt: logger.info("Keyboard Interrupt") try: if self.websocket: await self.factory.close() self.stop_sync() except Exception as e: logger.error(f"Error gracefully closing websocket: {e}") finally: break except Exception as e: logger.error(f"Error restarting websocket: {e}") await asyncio.sleep(1) continue
[docs] async def stop(self): await self.factory.close() self.loop.stop()
[docs] def stop_sync(self): self.loop.run_until_complete(self.stop())