bedrock.endpoints.local_websocket_api_gateway

 1import asyncio
 2import json
 3
 4from bedrock.log import log_config
 5
# Module-level logger; LocalWebsocketApiGateway reports send errors through it.
logger = log_config("local_websocket_api_gateway")  # pragma: unit
 7
 8
 9class LocalWebsocketApiGateway:
10    CONNECTIONS = {}
11
12    def set_loop(self, loop):  # pragma: unit
13        self.loop = loop
14
15    def add_websocket_connection(self, connection_id, websocket):  # pragma: unit
16        self.CONNECTIONS[connection_id] = websocket
17
18    def remove_websocket_connection(self, connection_id):  # pragma: unit
19        self.CONNECTIONS.pop(connection_id)
20
21    def post_to_connection(self, Data=None, ConnectionId=None):  # pragma: unit
22        websocket = self.CONNECTIONS.get(ConnectionId)
23        if websocket is None:
24            raise Exception(f"Connection {ConnectionId} not found")
25
26        if isinstance(Data, (dict, list)):
27            Data = json.dumps(Data)
28
29        try:
30            # Schedule send in the main event loop
31            asyncio.run_coroutine_threadsafe(websocket.send(Data), self.loop)
32        except Exception as e:  # pragma: no cover
33            logger.error(f"Failed to send message to connection {ConnectionId}: {e}")
34
35    def delete_connection(self, ConnectionId):  # pragma: unit
36        websocket = self.CONNECTIONS.pop(ConnectionId, None)
37        if websocket:
38            loop = asyncio.get_event_loop()
39            if loop.is_running():
40                asyncio.create_task(websocket.close())
41            else:
42                loop.run_until_complete(websocket.close())
43            return {"ResponseMetadata": {"HTTPStatusCode": 200}}
44        else:
45            # mimic AWS GoneException when connection already gone
46            raise Exception(f"Connection {ConnectionId} not found (GoneException)")
logger = <MyLogger BEDROCK-local_websocket_api_gateway (INFO)>
class LocalWebsocketApiGateway:
class LocalWebsocketApiGateway:
    """Local stand-in that delivers messages to in-process websocket connections.

    Exposes ``post_to_connection`` and ``delete_connection`` with the
    ``Data`` / ``ConnectionId`` keyword names, backed by a registry of
    websocket objects.
    """

    # connection id -> websocket. Declared on the class, so the registry is
    # shared by every instance.
    CONNECTIONS = {}

    def set_loop(self, loop):  # pragma: unit
        """Remember the event loop that websocket coroutines are scheduled on."""
        self.loop = loop

    def add_websocket_connection(self, connection_id, websocket):  # pragma: unit
        """Register ``websocket`` under ``connection_id``, replacing any previous entry."""
        self.CONNECTIONS[connection_id] = websocket

    def remove_websocket_connection(self, connection_id):  # pragma: unit
        """Drop ``connection_id`` from the registry; a no-op if it is not registered."""
        # delete_connection() pops the entry as well, so a later cleanup call
        # for the same id must not fail with KeyError.
        self.CONNECTIONS.pop(connection_id, None)

    def post_to_connection(self, Data=None, ConnectionId=None):  # pragma: unit
        """Schedule ``Data`` to be sent on the websocket registered as ``ConnectionId``.

        ``dict`` and ``list`` payloads are JSON-encoded first; anything else is
        passed to ``websocket.send`` unchanged. The call returns as soon as the
        send has been scheduled on ``self.loop``; it does not wait for delivery.

        Raises:
            Exception: if ``ConnectionId`` is not registered.
        """
        websocket = self.CONNECTIONS.get(ConnectionId)
        if websocket is None:
            raise Exception(f"Connection {ConnectionId} not found")

        if isinstance(Data, (dict, list)):
            Data = json.dumps(Data)

        def _log_send_failure(future):  # pragma: no cover
            # The send runs later on the loop, so its exception never reaches
            # the try/except below; read it from the future instead.
            if future.cancelled():
                return
            error = future.exception()
            if error is not None:
                logger.error(
                    f"Failed to send message to connection {ConnectionId}: {error}"
                )

        try:
            # Schedule send in the main event loop
            future = asyncio.run_coroutine_threadsafe(websocket.send(Data), self.loop)
            future.add_done_callback(_log_send_failure)
        except Exception as e:  # pragma: no cover
            logger.error(f"Failed to send message to connection {ConnectionId}: {e}")

    def delete_connection(self, ConnectionId):  # pragma: unit
        """Unregister the websocket registered as ``ConnectionId`` and close it.

        Returns:
            ``{"ResponseMetadata": {"HTTPStatusCode": 200}}`` once the close has
            been scheduled (or, when no loop is running, completed).

        Raises:
            Exception: if ``ConnectionId`` is not registered.
        """
        websocket = self.CONNECTIONS.pop(ConnectionId, None)
        if websocket is None:
            # mimic AWS GoneException when connection already gone
            raise Exception(f"Connection {ConnectionId} not found (GoneException)")

        try:
            current_loop = asyncio.get_running_loop()
        except RuntimeError:
            current_loop = None

        owner_loop = getattr(self, "loop", None)
        if (
            owner_loop is not None
            and owner_loop is not current_loop
            and owner_loop.is_running()
        ):
            # Called from outside the loop passed to set_loop(): hand the close
            # over to that loop, the same way post_to_connection() hands over
            # sends, instead of looking up a loop for the calling thread.
            asyncio.run_coroutine_threadsafe(websocket.close(), owner_loop)
        else:
            loop = asyncio.get_event_loop()
            if loop.is_running():
                asyncio.create_task(websocket.close())
            else:
                loop.run_until_complete(websocket.close())
        return {"ResponseMetadata": {"HTTPStatusCode": 200}}
CONNECTIONS = {}
def set_loop(self, loop):
13    def set_loop(self, loop):  # pragma: unit
14        self.loop = loop
def add_websocket_connection(self, connection_id, websocket):
16    def add_websocket_connection(self, connection_id, websocket):  # pragma: unit
17        self.CONNECTIONS[connection_id] = websocket
def remove_websocket_connection(self, connection_id):
19    def remove_websocket_connection(self, connection_id):  # pragma: unit
20        self.CONNECTIONS.pop(connection_id)
def post_to_connection(self, Data=None, ConnectionId=None):
22    def post_to_connection(self, Data=None, ConnectionId=None):  # pragma: unit
23        websocket = self.CONNECTIONS.get(ConnectionId)
24        if websocket is None:
25            raise Exception(f"Connection {ConnectionId} not found")
26
27        if isinstance(Data, (dict, list)):
28            Data = json.dumps(Data)
29
30        try:
31            # Schedule send in the main event loop
32            asyncio.run_coroutine_threadsafe(websocket.send(Data), self.loop)
33        except Exception as e:  # pragma: no cover
34            logger.error(f"Failed to send message to connection {ConnectionId}: {e}")
def delete_connection(self, ConnectionId):
36    def delete_connection(self, ConnectionId):  # pragma: unit
37        websocket = self.CONNECTIONS.pop(ConnectionId, None)
38        if websocket:
39            loop = asyncio.get_event_loop()
40            if loop.is_running():
41                asyncio.create_task(websocket.close())
42            else:
43                loop.run_until_complete(websocket.close())
44            return {"ResponseMetadata": {"HTTPStatusCode": 200}}
45        else:
46            # mimic AWS GoneException when connection already gone
47            raise Exception(f"Connection {ConnectionId} not found (GoneException)")