# Source code for redis_tools.utils

import json
import os
from enum import Enum

import dotenv
from pydantic import BaseModel

from redis_tools.config import RedisConfig

# Populate the process environment from a .env file located by
# dotenv.find_dotenv(), so the os.getenv() lookups below can see its values.
dotenv.load_dotenv(dotenv.find_dotenv())

# Redis connection settings, read once at import time from the environment.
# Port and DB index are coerced to int and fall back to 6379 and 0; the
# password stays None when the variable is not set.
REDIS_SERVICE_PORT = int(os.environ.get("REDIS_SERVICE_PORT", 6379))
REDIS_SERVICE_DB = int(os.environ.get("REDIS_SERVICE_DB", 0))
REDIS_SERVICE_PASSWORD = os.environ.get("REDIS_SERVICE_PASSWORD")


def _serialize_for_redis(data):
    """Recursively convert ``data`` into something ``json.dumps`` can encode.

    Conversions applied:

    * ``Enum`` members are replaced by their ``value``.
    * ``None`` becomes the empty string ``""``.
    * ``dict``, ``list`` and ``tuple`` containers are rebuilt with every
      value/item converted (dict keys are left untouched).
    * pydantic ``BaseModel`` instances are dumped with ``model_dump()`` and
      the resulting dict is converted recursively.
    * Anything else is returned unchanged.
    """
    # Enum goes first so that str/int-mixin enums are reduced to their value
    # instead of being treated as plain scalars below.
    if isinstance(data, Enum):
        return data.value
    if data is None:
        return ""
    # JSON-native scalars need no conversion (bool is a subclass of int).
    if isinstance(data, (str, int, float)):
        return data
    if isinstance(data, dict):
        return {k: _serialize_for_redis(v) for k, v in data.items()}
    if isinstance(data, list):
        return [_serialize_for_redis(item) for item in data]
    if isinstance(data, tuple):
        return tuple(_serialize_for_redis(item) for item in data)
    if isinstance(data, BaseModel):
        # The dumped dict can still hold Enum members, None values or nested
        # containers, so it has to go through the same conversion; returning
        # it as-is would let those values reach json.dumps unconverted.
        return _serialize_for_redis(data.model_dump())
    return data


def serialize_for_redis(model_dict):
    """Return ``model_dict`` encoded as a JSON string.

    The input is first normalised with ``_serialize_for_redis`` (Enum members
    become their values, ``None`` becomes ``""``, pydantic models become
    dicts) and the result is passed to ``json.dumps``.

    Raises:
        TypeError: propagated from ``json.dumps`` if a value that is still
            not JSON-encodable remains after normalisation.
    """
    return json.dumps(_serialize_for_redis(model_dict))
def deserialize_from_redis(data):
    """Decode a value read back from Redis into Python data.

    * ``str``: parsed with ``json.loads``; if it is not valid JSON the string
      itself is returned. The parsed result is not processed any further.
    * ``dict`` / ``list`` / ``tuple``: rebuilt with every value/item decoded
      recursively (dict keys are left untouched).
    * Anything else, including ``Enum`` members, is returned unchanged.
    """
    if isinstance(data, str):
        try:
            return json.loads(data)
        except json.JSONDecodeError:
            # Not JSON: treat it as a plain string.
            return data
    if isinstance(data, dict):
        return {k: deserialize_from_redis(v) for k, v in data.items()}
    if isinstance(data, list):
        return [deserialize_from_redis(item) for item in data]
    if isinstance(data, tuple):
        return tuple(deserialize_from_redis(item) for item in data)
    # An Enum member is already a Python object and there is no enum class
    # available here to look a stored string up in, so it is passed through
    # like every other type. (Indexing the member with itself, as this branch
    # once did, raises TypeError because members are not subscriptable.)
    return data
import aioredis
async def connect_to_aioredis():
    """Try each known Redis host in turn and return the first live client.

    Candidates are ``localhost`` and ``redis``, both on
    ``REDIS_SERVICE_PORT``. For each one a ``RedisConfig`` is built, a client
    is created from its ``redis_url`` with ``decode_responses=True``, and the
    connection is verified with ``ping()``.

    Returns:
        The connected client, or ``None`` if every candidate failed. Failures
        are reported with ``print`` and never raised to the caller.
    """
    # Define possible Redis hosts and ports.
    # Add more (host, port) tuples for other scenarios as needed.
    local_redis = ('localhost', REDIS_SERVICE_PORT)
    docker_redis = ('redis', REDIS_SERVICE_PORT)
    redis_options = [local_redis, docker_redis]

    for host, port in redis_options:
        client = None
        try:
            redis_config = RedisConfig(
                host=host,
                port=port,
                db=REDIS_SERVICE_DB,
                password=REDIS_SERVICE_PASSWORD,
                ssl=False,
                encoding="utf-8",
            )
            client = aioredis.from_url(
                redis_config.redis_url, decode_responses=True
            )
            await client.ping()
            print(f"Connected to Redis at {host}:{port}")
            # Return as soon as a successful connection is made.
            return client
        except Exception as e:
            # Deliberately broad: any failure just means "try the next host".
            print(f"Redis connection attempt to {host}:{port} failed: {e}")
            if client is not None:
                # Best-effort cleanup of the client that failed its ping so
                # it is not left dangling; a failure to close must not stop
                # the remaining candidates from being tried.
                try:
                    await client.close()
                except Exception:
                    pass

    print("Unable to connect to Redis using any of the configured options.")
    return None
async def get_async_redis():
    """Return the shared async Redis client, connecting on first use.

    The client is cached as the ``async_redis`` attribute of the
    ``redis_tools`` package. If that attribute already holds an
    ``aioredis.Redis`` instance it is returned directly; otherwise a new
    connection is made with ``connect_to_aioredis()`` and stored there.

    Raises:
        ConnectionError: if no connection could be established. This is a
            subclass of ``Exception``, so handlers catching ``Exception``
            keep working.
    """
    # Imported here rather than at module level, as in the original code
    # (presumably to avoid a circular import with the package -- confirm).
    import redis_tools

    # getattr instead of ``from redis_tools import async_redis`` so a package
    # that has not defined the attribute yet does not raise ImportError.
    cached = getattr(redis_tools, "async_redis", None)
    if isinstance(cached, aioredis.Redis):
        return cached

    client = await connect_to_aioredis()
    if not client:
        raise ConnectionError("Redis connection failed")

    redis_tools.async_redis = client
    return client
async def stop_async_redis():
    """Close the shared async Redis client, if one exists.

    Looks up the ``async_redis`` attribute of the ``redis_tools`` package.
    When it holds an ``aioredis.Redis`` instance, the client is closed and
    the attribute is reset to ``None``; otherwise nothing is closed. Either
    outcome is reported with ``print``.
    """
    import redis_tools

    # getattr instead of ``from redis_tools import async_redis`` so a package
    # that has not defined the attribute yet does not raise ImportError.
    client = getattr(redis_tools, "async_redis", None)
    if not isinstance(client, aioredis.Redis):
        print("No async Redis connection to close")
        return

    await client.close()
    # Drop the cached reference so later lookups of redis_tools.async_redis
    # do not hand out the client that was just closed.
    redis_tools.async_redis = None
    print("Async Redis connection closed")