bedrock.external.websocket

 1import json  # pragma: unit
 2
 3from bedrock.cache import get_cache  # pragma: unit
 4from bedrock.cache.websockets import get_connection_data, get_connection_topic_filters  # pragma: unit
 5from bedrock.cache.websockets.websocket_connection_data import WebsocketConnectionData  # pragma: unit
 6from bedrock.endpoints.dto.bedrock_response import BedrockEnvelopedResponse  # pragma: unit
 7from bedrock.endpoints.dto.bedrock_response_filter import is_filter_criteria_met  # pragma: unit
 8from bedrock.exceptions.not_implemented_exception import NotImplementedException  # pragma: unit
 9from bedrock.log import log_config  # pragma: unit
10from bedrock.websockets.websocket_gateway_factory import get_websocket_api_gateway_client  # pragma: unit
11
12log = log_config("websockets")  # pragma: unit
13
14
15def broadcast_websocket_message(response: BedrockEnvelopedResponse, topics: list[str], model=None):  # pragma: unit
16    """
17    Fan-out `response` to every connection subscribed to any of `topics`.
18    """
19    for topic in topics:
20        try:
21            for connection_id in get_connections_for_topic(topic):
22                filters = get_connection_topic_filters(connection_id, topic)
23                filtered_connection_responses = _get_filtered_connection_responses(get_connection_data(connection_id),
24                                                                                   response, model, filters)
25                for response in filtered_connection_responses:
26                    try:
27                        send_message_to_connection(connection_id, response)
28                    except Exception as e:  # pragma: no cover
29                        log.error(f"Broadcast failed for {connection_id}: {e}")
30        except Exception as e:  # pragma: no cover
31            log.error(f"Failed to retrieve connections for topic {topic}: {e}")
32
33
34def get_connections_for_topic(topic: str) -> list[str]:  # pragma: unit
35    return list(get_cache().smembers(topic))
36
37
38def send_message_to_connection(connection_id: str, response: BedrockEnvelopedResponse):  # pragma: unit
39    """
40    Sends a message to a specific connection.
41    """
42    try:
43        payload = response.as_json() if hasattr(response, "as_json") else response
44        api_gw_mmgmt_api = get_websocket_api_gateway_client()
45        api_gw_mmgmt_api.post_to_connection(
46            Data=json.dumps(payload),
47            ConnectionId=connection_id
48        )
49    except Exception as e:  # pragma: no cover
50        log.error(f"Failed to send message to connection {connection_id}: {e}")
51        raise e
52
53
54def close_connection(connection_id: str):  # pragma: unit
55    """
56    Closes a specific WebSocket connection.
57    """
58    api_gw_mmgmt_api = get_websocket_api_gateway_client()
59    try:
60        api_gw_mmgmt_api.delete_connection(ConnectionId=connection_id)
61        log.info(f"Connection {connection_id} closed successfully.")
62    except Exception as e:  # pragma: no cover
63        log.error(f"Failed to close connection {connection_id}: {e}")
64        raise e
65
66
67def _get_filtered_connection_responses(connection_data: WebsocketConnectionData, response: BedrockEnvelopedResponse,
68                                       model, filters = None) -> list[BedrockEnvelopedResponse]:  # pragma: unit
69    if connection_data is None:
70        return []
71
72    default_filter = connection_data.broadcast_filters
73    filters = filters or {}
74    criteria_filter = {**default_filter, **filters}
75    entity = response.data
76
77    if isinstance(entity, list):
78        raise NotImplementedException()
79
80    if is_filter_criteria_met(entity, criteria_filter, model_class=model):
81        return [response]
82    else:
83        return []
log = <MyLogger BEDROCK-websockets (INFO)>
def broadcast_websocket_message( response: bedrock.endpoints.dto.bedrock_response.BedrockEnvelopedResponse, topics: list[str], model=None):
16def broadcast_websocket_message(response: BedrockEnvelopedResponse, topics: list[str], model=None):  # pragma: unit
17    """
18    Fan-out `response` to every connection subscribed to any of `topics`.
19    """
20    for topic in topics:
21        try:
22            for connection_id in get_connections_for_topic(topic):
23                filters = get_connection_topic_filters(connection_id, topic)
24                filtered_connection_responses = _get_filtered_connection_responses(get_connection_data(connection_id),
25                                                                                   response, model, filters)
26                for response in filtered_connection_responses:
27                    try:
28                        send_message_to_connection(connection_id, response)
29                    except Exception as e:  # pragma: no cover
30                        log.error(f"Broadcast failed for {connection_id}: {e}")
31        except Exception as e:  # pragma: no cover
32            log.error(f"Failed to retrieve connections for topic {topic}: {e}")

Fan-out response to every connection subscribed to any of topics.

def get_connections_for_topic(topic: str) -> list[str]:
35def get_connections_for_topic(topic: str) -> list[str]:  # pragma: unit
36    return list(get_cache().smembers(topic))
def send_message_to_connection( connection_id: str, response: bedrock.endpoints.dto.bedrock_response.BedrockEnvelopedResponse):
39def send_message_to_connection(connection_id: str, response: BedrockEnvelopedResponse):  # pragma: unit
40    """
41    Sends a message to a specific connection.
42    """
43    try:
44        payload = response.as_json() if hasattr(response, "as_json") else response
45        api_gw_mmgmt_api = get_websocket_api_gateway_client()
46        api_gw_mmgmt_api.post_to_connection(
47            Data=json.dumps(payload),
48            ConnectionId=connection_id
49        )
50    except Exception as e:  # pragma: no cover
51        log.error(f"Failed to send message to connection {connection_id}: {e}")
52        raise e

Sends a message to a specific connection.

def close_connection(connection_id: str):
55def close_connection(connection_id: str):  # pragma: unit
56    """
57    Closes a specific WebSocket connection.
58    """
59    api_gw_mmgmt_api = get_websocket_api_gateway_client()
60    try:
61        api_gw_mmgmt_api.delete_connection(ConnectionId=connection_id)
62        log.info(f"Connection {connection_id} closed successfully.")
63    except Exception as e:  # pragma: no cover
64        log.error(f"Failed to close connection {connection_id}: {e}")
65        raise e

Closes a specific WebSocket connection.