Skip to content

API Reference

WebSocket module for real-time notifications and communication.

WebSocketManager

Manage active WebSocket connections per user.

Maintains a registry of authenticated WebSocket connections indexed by user ID, enabling message broadcasting and targeted notifications.

Attributes:

Name Type Description
active_connections dict[int, WebSocket]

Maps each user ID to that user's active WebSocket connection.

Source code in backend/app/websocket/manager.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
class WebSocketManager:
    """
    Manage active WebSocket connections per user.

    Maintains a registry of authenticated WebSocket connections
    indexed by user ID, enabling message broadcasting and
    targeted notifications. The registry holds a single connection
    per user ID; registering another one for the same ID replaces
    the stored entry.

    Attributes:
        active_connections: Maps user IDs to their WebSocket.
    """

    def __init__(self) -> None:
        """Initialize the WebSocket manager with empty connections."""
        self.active_connections: dict[int, WebSocket] = {}

    async def connect(self, user_id: int, websocket: WebSocket) -> None:
        """
        Accept and register a new WebSocket connection.

        The handshake is completed before the socket is stored. An
        existing entry for the same user is overwritten; the previous
        socket is not closed by this method.

        Args:
            user_id: The user's unique identifier.
            websocket: The WebSocket connection to register.
        """
        await websocket.accept()
        self.active_connections[user_id] = websocket
        core_logger.print_to_log(f"WebSocket connected for user {user_id}", "debug")

    def disconnect(self, user_id: int) -> None:
        """
        Remove a user's WebSocket connection.

        Only the registry entry is dropped; the socket itself is not
        closed here. Unknown user IDs are ignored silently.

        Args:
            user_id: The user's unique identifier.
        """
        if self.active_connections.pop(user_id, None):
            core_logger.print_to_log(
                f"WebSocket disconnected for user {user_id}",
                "debug",
            )

    async def send_message(self, user_id: int, message: dict) -> None:
        """
        Send a JSON message to a specific user.

        Does nothing when the user has no registered connection.

        Args:
            user_id: The user's unique identifier.
            message: JSON-serializable data to send.
        """
        websocket = self.active_connections.get(user_id)
        if websocket:
            await websocket.send_json(message)

    async def broadcast(self, message: dict) -> None:
        """
        Send a JSON message to all connected users.

        The recipient list is fixed when the broadcast starts, so
        connections registered or removed while a send is awaited
        cannot break the loop. Users removed in the meantime are
        skipped; users added in the meantime are not included. An
        error raised by a send still propagates to the caller.

        Args:
            message: JSON-serializable data to broadcast.
        """
        # Every await hands control back to the event loop, where
        # connect()/disconnect() may resize the registry. Iterating the
        # live dict view would then raise "dictionary changed size during
        # iteration", so walk a snapshot of the keys instead.
        for user_id in list(self.active_connections):
            websocket = self.active_connections.get(user_id)
            if websocket is not None:
                await websocket.send_json(message)

    def get_connection(self, user_id: int) -> WebSocket | None:
        """
        Retrieve a user's WebSocket connection.

        Args:
            user_id: The user's unique identifier.

        Returns:
            The WebSocket connection or None if not found.
        """
        return self.active_connections.get(user_id)

__init__

__init__()

Initialize the WebSocket manager with empty connections.

Source code in backend/app/websocket/manager.py
20
21
22
def __init__(self) -> None:
    """Create a manager that starts out with no registered connections."""
    registry: dict[int, WebSocket] = {}
    self.active_connections = registry

broadcast async

broadcast(message)

Send a JSON message to all connected users.

Parameters:

Name Type Description Default
message dict

JSON-serializable data to broadcast.

required
Source code in backend/app/websocket/manager.py
61
62
63
64
65
66
67
68
69
async def broadcast(self, message: dict) -> None:
    """
    Send a JSON message to all connected users.

    The recipient list is fixed when the broadcast starts, so
    connections registered or removed while a send is awaited
    cannot break the loop. Users removed in the meantime are
    skipped; users added in the meantime are not included. An
    error raised by a send still propagates to the caller.

    Args:
        message: JSON-serializable data to broadcast.
    """
    # Every await hands control back to the event loop, where the
    # registry may be resized. Iterating the live dict view would then
    # raise "dictionary changed size during iteration", so walk a
    # snapshot of the keys instead.
    for user_id in list(self.active_connections):
        websocket = self.active_connections.get(user_id)
        if websocket is not None:
            await websocket.send_json(message)

connect async

connect(user_id, websocket)

Accept and register a new WebSocket connection.

Parameters:

Name Type Description Default
user_id int

The user's unique identifier.

required
websocket WebSocket

The WebSocket connection to register.

required
Source code in backend/app/websocket/manager.py
24
25
26
27
28
29
30
31
32
33
34
async def connect(self, user_id: int, websocket: WebSocket) -> None:
    """
    Complete the WebSocket handshake and store the connection.

    Any connection already stored for the same user is replaced.

    Args:
        user_id: The user's unique identifier.
        websocket: The WebSocket connection to register.
    """
    # The handshake is finished first, so a socket is only stored once
    # accept() has returned.
    await websocket.accept()
    self.active_connections.update({user_id: websocket})

    log_line = f"WebSocket connected for user {user_id}"
    core_logger.print_to_log(log_line, "debug")

disconnect

disconnect(user_id)

Remove a user's WebSocket connection.

Parameters:

Name Type Description Default
user_id int

The user's unique identifier.

required
Source code in backend/app/websocket/manager.py
36
37
38
39
40
41
42
43
44
45
46
47
def disconnect(self, user_id: int) -> None:
    """
    Drop the registry entry for a user, if one exists.

    Only the registry entry is removed; the socket itself is not
    closed here. Unknown user IDs are ignored silently.

    Args:
        user_id: The user's unique identifier.
    """
    removed = self.active_connections.pop(user_id, None)
    if not removed:
        # Nothing to report when no connection was taken out.
        return

    core_logger.print_to_log(
        f"WebSocket disconnected for user {user_id}",
        "debug",
    )

get_connection

get_connection(user_id)

Retrieve a user's WebSocket connection.

Parameters:

Name Type Description Default
user_id int

The user's unique identifier.

required

Returns:

Type Description
WebSocket | None

The WebSocket connection or None if not found.

Source code in backend/app/websocket/manager.py
71
72
73
74
75
76
77
78
79
80
81
def get_connection(self, user_id: int) -> WebSocket | None:
    """
    Look up the WebSocket registered for a user.

    Args:
        user_id: The user's unique identifier.

    Returns:
        The stored WebSocket, or None when the user has no entry.
    """
    try:
        return self.active_connections[user_id]
    except KeyError:
        return None

send_message async

send_message(user_id, message)

Send a JSON message to a specific user.

Parameters:

Name Type Description Default
user_id int

The user's unique identifier.

required
message dict

JSON-serializable data to send.

required
Source code in backend/app/websocket/manager.py
49
50
51
52
53
54
55
56
57
58
59
async def send_message(self, user_id: int, message: dict) -> None:
    """
    Deliver a JSON message to one user's WebSocket.

    Returns without sending when the user has no registered
    connection.

    Args:
        user_id: The user's unique identifier.
        message: JSON-serializable data to send.
    """
    target = self.active_connections.get(user_id)
    if not target:
        return
    await target.send_json(message)

get_websocket_manager cached

get_websocket_manager()

Get the singleton WebSocket manager instance.

Returns:

Type Description
WebSocketManager

The shared WebSocketManager instance.

Source code in backend/app/websocket/manager.py
84
85
86
87
88
89
90
91
92
@lru_cache(maxsize=1)
def get_websocket_manager() -> WebSocketManager:
    """
    Return the shared WebSocket manager.

    The result is memoized by ``lru_cache``, so the manager is
    constructed on the first call and the same instance is handed
    back on later calls.

    Returns:
        The shared WebSocketManager instance.
    """
    manager = WebSocketManager()
    return manager

notify_frontend async

notify_frontend(user_id, websocket_manager, json_data)

Send a JSON message to a user's WebSocket connection.

Attempts to send data to the user's WebSocket. For MFA verification, raises an exception if no connection exists.

Parameters:

Name Type Description Default
user_id int

The target user's identifier.

required
websocket_manager WebSocketManager

The WebSocket connection manager.

required
json_data dict

JSON-serializable data to send.

required

Returns:

Type Description
bool

True if message was sent, False if no connection.

Raises:

Type Description
HTTPException

If the message is MFA_REQUIRED and the user has no WebSocket connection.

Source code in backend/app/websocket/utils.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
async def notify_frontend(
    user_id: int,
    websocket_manager: websocket_manager.WebSocketManager,
    json_data: dict,
) -> bool:
    """
    Push a JSON payload to a user's WebSocket, if one is registered.

    When the user has no connection the call normally just reports
    that nothing was sent. The exception is a payload whose
    "message" value is "MFA_REQUIRED": that case raises instead.

    Args:
        user_id: The target user's identifier.
        websocket_manager: The WebSocket connection manager.
        json_data: JSON-serializable data to send.

    Returns:
        True if message was sent, False if no connection.

    Raises:
        HTTPException: If MFA_REQUIRED but no connection exists.
    """
    connection = websocket_manager.get_connection(user_id)

    if not connection:
        # No socket to deliver to: an MFA prompt is reported as an
        # error, anything else as "not sent".
        is_mfa_prompt = json_data.get("message") == "MFA_REQUIRED"
        if is_mfa_prompt:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"No WebSocket connection for user {user_id}",
            )
        return False

    await connection.send_json(json_data)
    return True