bedrock.external.websocket
1import json # pragma: unit 2 3from bedrock.cache import get_cache # pragma: unit 4from bedrock.cache.websockets import get_connection_data, get_connection_topic_filters # pragma: unit 5from bedrock.cache.websockets.websocket_connection_data import WebsocketConnectionData # pragma: unit 6from bedrock.endpoints.dto.bedrock_response import BedrockEnvelopedResponse # pragma: unit 7from bedrock.endpoints.dto.bedrock_response_filter import is_filter_criteria_met # pragma: unit 8from bedrock.exceptions.not_implemented_exception import NotImplementedException # pragma: unit 9from bedrock.log import log_config # pragma: unit 10from bedrock.websockets.websocket_gateway_factory import get_websocket_api_gateway_client # pragma: unit 11 12log = log_config("websockets") # pragma: unit 13 14 15def broadcast_websocket_message(response: BedrockEnvelopedResponse, topics: list[str], model=None): # pragma: unit 16 """ 17 Fan-out `response` to every connection subscribed to any of `topics`. 18 """ 19 for topic in topics: 20 try: 21 for connection_id in get_connections_for_topic(topic): 22 filters = get_connection_topic_filters(connection_id, topic) 23 filtered_connection_responses = _get_filtered_connection_responses(get_connection_data(connection_id), 24 response, model, filters) 25 for response in filtered_connection_responses: 26 try: 27 send_message_to_connection(connection_id, response) 28 except Exception as e: # pragma: no cover 29 log.error(f"Broadcast failed for {connection_id}: {e}") 30 except Exception as e: # pragma: no cover 31 log.error(f"Failed to retrieve connections for topic {topic}: {e}") 32 33 34def get_connections_for_topic(topic: str) -> list[str]: # pragma: unit 35 return list(get_cache().smembers(topic)) 36 37 38def send_message_to_connection(connection_id: str, response: BedrockEnvelopedResponse): # pragma: unit 39 """ 40 Sends a message to a specific connection. 41 """ 42 try: 43 payload = response.as_json() if hasattr(response, "as_json") else response 44 api_gw_mmgmt_api = get_websocket_api_gateway_client() 45 api_gw_mmgmt_api.post_to_connection( 46 Data=json.dumps(payload), 47 ConnectionId=connection_id 48 ) 49 except Exception as e: # pragma: no cover 50 log.error(f"Failed to send message to connection {connection_id}: {e}") 51 raise e 52 53 54def close_connection(connection_id: str): # pragma: unit 55 """ 56 Closes a specific WebSocket connection. 57 """ 58 api_gw_mmgmt_api = get_websocket_api_gateway_client() 59 try: 60 api_gw_mmgmt_api.delete_connection(ConnectionId=connection_id) 61 log.info(f"Connection {connection_id} closed successfully.") 62 except Exception as e: # pragma: no cover 63 log.error(f"Failed to close connection {connection_id}: {e}") 64 raise e 65 66 67def _get_filtered_connection_responses(connection_data: WebsocketConnectionData, response: BedrockEnvelopedResponse, 68 model, filters = None) -> list[BedrockEnvelopedResponse]: # pragma: unit 69 if connection_data is None: 70 return [] 71 72 default_filter = connection_data.broadcast_filters 73 filters = filters or {} 74 criteria_filter = {**default_filter, **filters} 75 entity = response.data 76 77 if isinstance(entity, list): 78 raise NotImplementedException() 79 80 if is_filter_criteria_met(entity, criteria_filter, model_class=model): 81 return [response] 82 else: 83 return []
log =
<MyLogger BEDROCK-websockets (INFO)>
def
broadcast_websocket_message( response: bedrock.endpoints.dto.bedrock_response.BedrockEnvelopedResponse, topics: list[str], model=None):
16def broadcast_websocket_message(response: BedrockEnvelopedResponse, topics: list[str], model=None): # pragma: unit 17 """ 18 Fan-out `response` to every connection subscribed to any of `topics`. 19 """ 20 for topic in topics: 21 try: 22 for connection_id in get_connections_for_topic(topic): 23 filters = get_connection_topic_filters(connection_id, topic) 24 filtered_connection_responses = _get_filtered_connection_responses(get_connection_data(connection_id), 25 response, model, filters) 26 for response in filtered_connection_responses: 27 try: 28 send_message_to_connection(connection_id, response) 29 except Exception as e: # pragma: no cover 30 log.error(f"Broadcast failed for {connection_id}: {e}") 31 except Exception as e: # pragma: no cover 32 log.error(f"Failed to retrieve connections for topic {topic}: {e}")
Fan-out response to every connection subscribed to any of topics.
def
get_connections_for_topic(topic: str) -> list[str]:
def
send_message_to_connection( connection_id: str, response: bedrock.endpoints.dto.bedrock_response.BedrockEnvelopedResponse):
39def send_message_to_connection(connection_id: str, response: BedrockEnvelopedResponse): # pragma: unit 40 """ 41 Sends a message to a specific connection. 42 """ 43 try: 44 payload = response.as_json() if hasattr(response, "as_json") else response 45 api_gw_mmgmt_api = get_websocket_api_gateway_client() 46 api_gw_mmgmt_api.post_to_connection( 47 Data=json.dumps(payload), 48 ConnectionId=connection_id 49 ) 50 except Exception as e: # pragma: no cover 51 log.error(f"Failed to send message to connection {connection_id}: {e}") 52 raise e
Sends a message to a specific connection.
def
close_connection(connection_id: str):
55def close_connection(connection_id: str): # pragma: unit 56 """ 57 Closes a specific WebSocket connection. 58 """ 59 api_gw_mmgmt_api = get_websocket_api_gateway_client() 60 try: 61 api_gw_mmgmt_api.delete_connection(ConnectionId=connection_id) 62 log.info(f"Connection {connection_id} closed successfully.") 63 except Exception as e: # pragma: no cover 64 log.error(f"Failed to close connection {connection_id}: {e}") 65 raise e
Closes a specific WebSocket connection.