Source code for pyokx.redis_structured_streams

from typing import List

import aioredis

from pyokx import ENFORCED_INSTRUMENT_TYPES
from pyokx.InstrumentSearcher import InstrumentSearcher
from pyokx.data_structures import FillHistoricalMetrics, Order, Algo_Order, FillEntry
from pyokx.ws_data_structures import AccountChannel, PositionsChannel, OrdersChannel, available_channel_models
from redis_tools.utils import deserialize_from_redis


async def get_instruments_searcher_from_redis(async_redis: aioredis.Redis) -> InstrumentSearcher:
    """Create an InstrumentSearcher from the newest cached instruments entry.

    Reads a single entry (``count=1``) from the ``okx:rest@instruments``
    stream via ``xrevrange``. When the stream is empty, or the entry has no
    ``data`` field, a message is printed and the searcher is built from
    ``ENFORCED_INSTRUMENT_TYPES`` instead of from the cache.

    Args:
        async_redis: Async Redis client used to read the stream.

    Returns:
        An InstrumentSearcher built either from the deserialized cached
        payload (passed as ``_instrument_map``) or from
        ``ENFORCED_INSTRUMENT_TYPES`` (passed as ``instTypes``).
    """
    latest_entries = await async_redis.xrevrange(f'okx:rest@instruments', count=1)
    if not latest_entries:
        print(f"no instruments in cache, creating InstrumentSearcher with instTypes {ENFORCED_INSTRUMENT_TYPES}")
        return InstrumentSearcher(instTypes=ENFORCED_INSTRUMENT_TYPES)

    # Each stream entry is indexed as (entry id, field mapping).
    newest_entry = latest_entries[0]
    redis_stream_id = newest_entry[0]
    payload = newest_entry[1].get("data")
    if payload:
        instrument_map = deserialize_from_redis(payload)
        return InstrumentSearcher(_instrument_map=instrument_map)

    # Entry exists but carries no usable payload: fall back to the defaults.
    print(
        f"A message in the instruments stream {'okx:rest@instruments'} with id {redis_stream_id} was empty"
        f", creating InstrumentSearcher with instTypes {ENFORCED_INSTRUMENT_TYPES}")
    return InstrumentSearcher(instTypes=ENFORCED_INSTRUMENT_TYPES)
async def get_stream_okx_all_messages(async_redis: aioredis.Redis, count: int = 10) -> List:
    """Fetch up to ``count`` entries from ``okx:websockets@all`` as channel models.

    The entries come from ``xrevrange`` and are walked in reverse of the
    order returned, so the result is the reverse of what Redis handed back
    (oldest to newest for a newest-first ``xrevrange`` reply).

    Args:
        async_redis: Async Redis client used to read the stream.
        count: Maximum number of stream entries to request.

    Returns:
        A list of structured messages, one per entry that had a ``data``
        field. Returns an empty list when the stream yields nothing.

    Raises:
        KeyError: If a message's channel is not in ``available_channel_models``.
    """
    raw_entries = await async_redis.xrevrange('okx:websockets@all', count=count)
    if not raw_entries:
        print(f"no messages in all cache!")
        return []

    structured_messages = []
    for entry in reversed(raw_entries):
        redis_stream_id = entry[0]
        payload = entry[1].get("data")
        if not payload:
            print(
                f"A message in the all stream {'okx:websockets@all'} with id {redis_stream_id} was empty, skipping")
            continue

        decoded = deserialize_from_redis(payload)
        channel_name = decoded.get("arg").get("channel")
        model = available_channel_models[channel_name]
        # Models exposing `from_array` are built through it; others directly.
        build = getattr(model, "from_array", model)
        structured_messages.append(build(**decoded))
    return structured_messages
async def get_stream_okx_account_messages(async_redis: aioredis.Redis, count: int = 10) -> List[AccountChannel]:
    """Return the latest ``count`` account messages, ordered oldest to newest.

    Entries are read from ``okx:websockets@account`` with ``xrevrange`` and
    then processed in reverse of the returned order. Entries without a
    ``data`` field are reported on stdout and skipped.

    Args:
        async_redis: Async Redis client used to read the stream.
        count: Maximum number of stream entries to request.

    Returns:
        A list of AccountChannel objects, or an empty list when the stream
        yields nothing.
    """
    raw_entries = await async_redis.xrevrange('okx:websockets@account', count=count)
    if not raw_entries:
        print(f"account information not ready in account cache!")
        return []

    parsed_messages = []
    for entry in reversed(raw_entries):
        redis_stream_id = entry[0]
        payload = entry[1].get("data")
        if not payload:
            print(
                f"A message in the account stream {'okx:websockets@account'} with id {redis_stream_id} was empty, skipping")
            continue
        fields = deserialize_from_redis(payload)
        parsed_messages.append(AccountChannel(**fields))
    return parsed_messages
async def get_stream_okx_position_messages(async_redis: aioredis.Redis, count: int = 10) -> List[PositionsChannel]:
    """Read up to ``count`` entries from ``okx:websockets@positions``.

    The ``xrevrange`` reply is iterated back-to-front, so the returned list
    is in the reverse of the order Redis returned. Entries lacking a
    ``data`` field are reported on stdout and skipped.

    Args:
        async_redis: Async Redis client used to read the stream.
        count: Maximum number of stream entries to request.

    Returns:
        A list of PositionsChannel objects, or an empty list when the
        stream yields nothing.
    """
    stream_entries = await async_redis.xrevrange('okx:websockets@positions', count=count)
    if not stream_entries:
        print(f"positions information not ready in position cache!")
        return []

    positions: List[PositionsChannel] = []
    for stream_entry in stream_entries[::-1]:
        redis_stream_id = stream_entry[0]
        raw_data = stream_entry[1].get("data")
        if raw_data:
            positions.append(PositionsChannel(**deserialize_from_redis(raw_data)))
        else:
            print(
                f"A message in the positions stream {'okx:websockets@positions'} with id {redis_stream_id} was empty, skipping")
    return positions
async def get_stream_okx_order_messages(async_redis: aioredis.Redis, count: int = 10) -> List[OrdersChannel]:
    """Read up to ``count`` entries from ``okx:websockets@orders``.

    Entries are fetched with ``xrevrange`` and visited in reverse of the
    returned order. An entry with no ``data`` field is reported on stdout
    and left out of the result.

    Args:
        async_redis: Async Redis client used to read the stream.
        count: Maximum number of stream entries to request.

    Returns:
        A list of OrdersChannel objects, or an empty list when the stream
        yields nothing.
    """
    fetched = await async_redis.xrevrange('okx:websockets@orders', count=count)
    if not fetched:
        print(f"orders information not ready in order cache or empty data!")
        return []

    orders_out = []
    for item in reversed(fetched):
        redis_stream_id = item[0]
        body = item[1].get("data")
        if not body:
            print(
                f"A message in the orders stream {'okx:websockets@orders'} with id {redis_stream_id} was empty, skipping")
            continue
        order_fields = deserialize_from_redis(body)
        orders_out.append(OrdersChannel(**order_fields))
    return orders_out
async def get_okx_fills_history(redis_client, count: int = 10) -> List[List[FillEntry]]:
    """Read up to ``count`` fill-history snapshots from ``okx:rest@fill@3months``.

    Each stream entry's ``data`` field is deserialized and every item in it
    is turned into a FillEntry, so each entry produces one inner list. The
    ``xrevrange`` reply is walked in reverse of the returned order.

    Args:
        redis_client: Async Redis client exposing ``xrevrange``.
        count: Maximum number of stream entries to request.

    Returns:
        A list with one ``List[FillEntry]`` per entry that had a ``data``
        field, or an empty list when the stream yields nothing.
    """
    # One name for the key so the diagnostic below always reports the stream
    # that is actually read (it previously named the fill-metrics stream).
    stream_key = 'okx:rest@fill@3months'
    entries = await redis_client.xrevrange(stream_key, count=count)
    if not entries:
        print(f"fills information not ready in fills cache!")
        return []

    fill_history_stream: List[List[FillEntry]] = []
    for entry in reversed(entries):
        redis_stream_id = entry[0]
        payload = entry[1].get("data")
        if not payload:
            print(
                f"A message in the fills stream {stream_key} with id {redis_stream_id} was empty, skipping")
            continue
        raw_fills = deserialize_from_redis(payload)
        fill_history_stream.append([FillEntry(**fill) for fill in raw_fills])
    return fill_history_stream
async def get_stream_okx_fill_metrics_report(async_redis: aioredis.Redis, count: int = 10) -> List[FillHistoricalMetrics]:
    """Read up to ``count`` fill-metrics reports from ``okx:reports@fill_metrics``.

    The ``xrevrange`` reply is processed in reverse of the returned order.
    Entries that carry no ``data`` field are reported on stdout and skipped.

    Args:
        async_redis: Async Redis client used to read the stream.
        count: Maximum number of stream entries to request.

    Returns:
        A list of FillHistoricalMetrics objects, or an empty list when the
        stream yields nothing.
    """
    report_entries = await async_redis.xrevrange('okx:reports@fill_metrics', count=count)
    if not report_entries:
        print(f"fills information not ready in fills cache!")
        return []

    reports: List[FillHistoricalMetrics] = []
    for report_entry in reversed(report_entries):
        redis_stream_id = report_entry[0]
        encoded = report_entry[1].get("data")
        if encoded:
            metrics_fields = deserialize_from_redis(encoded)
            reports.append(FillHistoricalMetrics(**metrics_fields))
        else:
            print(
                f"A message in the fills stream {'okx:reports@fill_metrics'} with id {redis_stream_id} was empty, skipping")
    return reports
async def get_stream_okx_incomplete_orders(async_redis: aioredis.Redis, count: int = 1) -> List[List[Order]]:
    """Read up to ``count`` incomplete-order snapshots from ``okx:rest@orders``.

    Each stream entry's ``data`` field is deserialized and every item in it
    is turned into an Order, so each entry produces one inner list. The
    ``xrevrange`` reply is walked in reverse of the returned order.

    Args:
        async_redis: Async Redis client used to read the stream.
        count: Maximum number of stream entries to request (default 1).

    Returns:
        A list with one ``List[Order]`` per entry that had a ``data``
        field, or an empty list when the stream yields nothing.
    """
    # One name for the key so the diagnostic below always reports the stream
    # that is actually read (it previously named a websockets stream).
    stream_key = 'okx:rest@orders'
    entries = await async_redis.xrevrange(stream_key, count=count)
    if not entries:
        print(f"incomplete orders information not ready in incomplete orders cache!")
        return []

    incomplete_orders: List[List[Order]] = []
    for entry in reversed(entries):
        redis_stream_id = entry[0]
        payload = entry[1].get("data")
        if not payload:
            print(
                f"A message in the incomplete orders stream {stream_key} with id {redis_stream_id} was empty, skipping")
            continue
        raw_orders = deserialize_from_redis(payload)
        incomplete_orders.append([Order(**order) for order in raw_orders])
    return incomplete_orders
async def get_stream_okx_incomplete_algo_orders(async_redis: aioredis.Redis, count: int = 1) -> List[List[Algo_Order]]:
    """Read up to ``count`` incomplete algo-order snapshots from ``okx:rest@algo-orders``.

    Each stream entry's ``data`` field is deserialized and every item in it
    is turned into an Algo_Order, so each entry produces one inner list.
    The ``xrevrange`` reply is walked in reverse of the returned order.

    Args:
        async_redis: Async Redis client used to read the stream.
        count: Maximum number of stream entries to request (default 1).

    Returns:
        A list with one ``List[Algo_Order]`` per entry that had a ``data``
        field, or an empty list when the stream yields nothing.
    """
    # One name for the key so the diagnostic below always reports the stream
    # that is actually read (it previously named a websockets stream).
    stream_key = 'okx:rest@algo-orders'
    entries = await async_redis.xrevrange(stream_key, count=count)
    if not entries:
        print(f"incomplete algo orders information not ready in incomplete algo orders cache!")
        return []

    incomplete_algo_orders: List[List[Algo_Order]] = []
    for entry in reversed(entries):
        redis_stream_id = entry[0]
        payload = entry[1].get("data")
        if not payload:
            print(
                f"A message in the incomplete algo orders stream {stream_key} with id {redis_stream_id} was empty, skipping")
            continue
        raw_algo_orders = deserialize_from_redis(payload)
        incomplete_algo_orders.append([Algo_Order(**algo_order) for algo_order in raw_algo_orders])
    return incomplete_algo_orders