redis_tools.consumers
Listen to Structured Websocket Streams
This module provides tools for setting up listeners for structured websocket streams, processing incoming data, and managing the lifecycle of these listeners. It leverages asyncio for asynchronous operations and integrates with FastAPI for web framework features.
Functions:
- add_listener_task: Add a new listener task to the global task registry.
- get_listener_task: Retrieve a listener task from the global task registry.
- remove_listener_task: Remove a listener task from the global task registry.
- get_all_listener_tasks: Retrieve all listener tasks from the global task registry.
- on_stream_data: Decorator for registering callback functions to specific redis streams.
- consumer: Asynchronous generator that consumes data from a specified stream.
- start_listening: Initialize listening tasks for provided streams and manage their lifecycle.
- redis_tools.consumers.add_listener_task(task_key, listener_task, shutdown_signal_handler)[source]
Register a new listener task along with its shutdown signal handler.
Parameters:
- task_key (str): A unique identifier for the listener task.
- listener_task (coroutine): The async task responsible for listening to the stream.
- shutdown_signal_handler (function): A callable to be invoked during the shutdown process.
This function does not return anything but updates the listener_tasks global dictionary.
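As a rough illustration of how such a registry could behave, the sketch below keeps a module-level dictionary that maps each task_key to a (listener_task, shutdown_signal_handler) tuple. It is not the module's source: the function bodies are assumptions, and in particular the choice to ignore a missing key on removal is a guess.

```python
from typing import Any, Callable, Dict, Optional, Tuple

# Stand-in for the module's global registry; the real contents may differ.
listener_tasks: Dict[str, Tuple[Any, Callable[[], None]]] = {}


def add_listener_task(task_key: str, listener_task: Any,
                      shutdown_signal_handler: Callable[[], None]) -> None:
    # Store the task together with its shutdown handler under one key.
    listener_tasks[task_key] = (listener_task, shutdown_signal_handler)


def get_listener_task(task_key: str) -> Optional[Tuple[Any, Callable[[], None]]]:
    # Returns the (task, handler) tuple, or None when the key is unknown.
    return listener_tasks.get(task_key)


def remove_listener_task(task_key: str) -> None:
    # Assumption: removing an unknown key is silently ignored.
    listener_tasks.pop(task_key, None)


def get_all_listener_tasks() -> Dict[str, Tuple[Any, Callable[[], None]]]:
    return listener_tasks


# Usage: register a placeholder task, invoke its handler, then remove it.
stopped = []
add_listener_task("orders", "placeholder-task", lambda: stopped.append("orders"))
task, handler = get_listener_task("orders")
handler()
remove_listener_task("orders")
```

Keeping the handler next to the task means shutdown code only needs the key to find both.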
- async redis_tools.consumers.consumer(stream_name, shutdown_event, last_ids=None)[source]
Asynchronously consume messages from a specified stream and process them using registered callbacks.
Parameters:
- stream_name (str): The name of the stream to consume data from.
- shutdown_event (asyncio.Event): An event to signal shutdown and cleanly stop the consumer.
- last_ids (dict, optional): A dictionary to track the last processed message ID for each stream.
This function is a coroutine and should be awaited. It runs indefinitely until a shutdown event is set.
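The shutdown-driven loop described above can be sketched without a Redis connection. In the example below an asyncio.Queue stands in for the stream, and the names consume_sketch, source, handled, and demo are hypothetical; the real consumer reads from Redis and dispatches to registered callbacks, which this sketch replaces with a plain list append.

```python
import asyncio
from typing import Dict, List, Optional


async def consume_sketch(stream_name: str, source: asyncio.Queue,
                         shutdown_event: asyncio.Event,
                         handled: List[str],
                         last_ids: Optional[Dict[str, str]] = None) -> Dict[str, str]:
    last_ids = {} if last_ids is None else last_ids
    # Poll with a short timeout so the shutdown event is checked regularly.
    while not shutdown_event.is_set():
        try:
            message_id, payload = await asyncio.wait_for(source.get(), timeout=0.05)
        except asyncio.TimeoutError:
            continue
        handled.append(payload)            # stand-in for invoking callbacks
        last_ids[stream_name] = message_id  # remember the last processed ID
    return last_ids


async def demo():
    source: asyncio.Queue = asyncio.Queue()
    shutdown_event = asyncio.Event()
    handled: List[str] = []
    for i, payload in enumerate(["a", "b", "c"], start=1):
        source.put_nowait((f"{i}-0", payload))
    task = asyncio.create_task(
        consume_sketch("orders", source, shutdown_event, handled))
    # Wait until every queued message has been handled, then signal shutdown.
    while len(handled) < 3:
        await asyncio.sleep(0.01)
    shutdown_event.set()
    last_ids = await task
    return handled, last_ids
```

Calling asyncio.run(demo()) drives the loop to completion. The timeout on the read is what lets the loop notice the event even when no messages arrive.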
- redis_tools.consumers.get_all_listener_tasks()[source]
Retrieve all listener tasks from the global task registry.
Returns: - dict: A dictionary of all registered listener tasks and their associated shutdown signal handlers.
- redis_tools.consumers.get_listener_task(task_key)[source]
Retrieve a listener task by its unique identifier.
Parameters: - task_key (str): The unique identifier of the listener task.
Returns: - tuple: A tuple containing the listener task and its associated shutdown signal handler, or None if the task_key is not found.
- redis_tools.consumers.on_stream_data(redis_stream)[source]
Decorator for registering callback functions to specific redis streams.
Parameters: - redis_stream (str): The redis stream to which the callback functions are to be registered.
Returns: - function: The decorator function that registers the provided callback to the specified stream.
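A decorator of this shape is commonly built as a function that returns a registering closure. The sketch below shows that pattern under stated assumptions: the registry name stream_callbacks is hypothetical, the callback is synchronous for brevity, and the real module may store or invoke callbacks differently.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List

# Hypothetical callback registry: stream name -> list of callbacks.
stream_callbacks: DefaultDict[str, List[Callable]] = defaultdict(list)


def on_stream_data(redis_stream: str) -> Callable[[Callable], Callable]:
    def decorator(func: Callable) -> Callable:
        # Register the callback and return it unchanged so it stays callable.
        stream_callbacks[redis_stream].append(func)
        return func
    return decorator


@on_stream_data("orders")
def log_order(message: dict) -> str:
    return f"order received: {message}"
```

Because the decorator returns the original function, log_order can still be called directly, for example in unit tests.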
- redis_tools.consumers.remove_listener_task(task_key)[source]
Remove a listener task from the global task registry.
Parameters: - task_key (str): The unique identifier of the listener task to be removed.
This function does not return anything but updates the listener_tasks global dictionary.
- async redis_tools.consumers.start_listening(streams: List[str], shutdown_event: Event | None = None)[source]
Initialize and manage listening tasks for the provided streams.
Parameters:
- streams (List[str]): A list of stream names to start listening to.
- shutdown_event (asyncio.Event, optional): An event to signal shutdown and cleanly stop all consumers.
Returns: - task_key (str): A unique key representing the set of started listening tasks.
Raises: - HTTPException: If listeners for the provided streams are already running.
This function is a coroutine and should be awaited. It initializes consumer tasks for each provided stream.
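The lifecycle described above (one task per stream, a key for the group, and rejection of duplicates) can be sketched as follows. Everything here is illustrative: ListenersAlreadyRunning stands in for the HTTPException the documented function raises, the key scheme of joining sorted stream names is an assumption, and _idle_consumer simply waits for shutdown instead of reading from Redis.

```python
import asyncio
from typing import Dict, List, Optional


class ListenersAlreadyRunning(Exception):
    """Stand-in for the HTTPException raised by the documented function."""


# Hypothetical registry of running task groups, keyed by task_key.
running: Dict[str, List[asyncio.Task]] = {}


async def _idle_consumer(stream_name: str, shutdown_event: asyncio.Event) -> None:
    # Placeholder consumer: does nothing until shutdown is signalled.
    await shutdown_event.wait()


async def start_listening_sketch(streams: List[str],
                                 shutdown_event: Optional[asyncio.Event] = None) -> str:
    if shutdown_event is None:
        shutdown_event = asyncio.Event()
    task_key = ",".join(sorted(streams))  # assumed key scheme
    if task_key in running:
        raise ListenersAlreadyRunning(task_key)
    # Start one consumer task per stream and remember the group.
    running[task_key] = [
        asyncio.create_task(_idle_consumer(name, shutdown_event))
        for name in streams
    ]
    return task_key


async def demo():
    shutdown_event = asyncio.Event()
    key = await start_listening_sketch(["orders", "payments"], shutdown_event)
    try:
        await start_listening_sketch(["payments", "orders"], shutdown_event)
        duplicate_rejected = False
    except ListenersAlreadyRunning:
        duplicate_rejected = True
    # Signal shutdown and wait for every consumer task to finish.
    shutdown_event.set()
    await asyncio.gather(*running.pop(key))
    return key, duplicate_rejected
```

Sharing a single shutdown_event across all consumers lets one signal stop the whole group.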