bedrock.endpoints.local_websocket_api_gateway
import asyncio
import json

from bedrock.log import log_config

logger = log_config("local_websocket_api_gateway")  # pragma: unit


class LocalWebsocketApiGateway:
    """Local stand-in for a WebSocket API Gateway management client.

    Keeps a registry of live websocket objects keyed by connection id and
    exposes ``post_to_connection`` / ``delete_connection`` so that code
    written against the AWS-style client can run locally.  Websocket
    coroutines are scheduled on the event loop registered via ``set_loop``.
    """

    # Defined on the class, so the registry is shared by every instance.
    CONNECTIONS = {}

    def set_loop(self, loop):  # pragma: unit
        """Register the event loop on which websocket coroutines are scheduled."""
        self.loop = loop

    def add_websocket_connection(self, connection_id, websocket):  # pragma: unit
        """Register ``websocket`` under ``connection_id``, replacing any previous entry."""
        self.CONNECTIONS[connection_id] = websocket

    def remove_websocket_connection(self, connection_id):  # pragma: unit
        """Forget ``connection_id``; a no-op when the id is not registered.

        ``delete_connection`` already removes the entry, so a later disconnect
        handler calling this method must not fail with ``KeyError``.
        """
        self.CONNECTIONS.pop(connection_id, None)

    def post_to_connection(self, Data=None, ConnectionId=None):  # pragma: unit
        """Send ``Data`` to the websocket registered under ``ConnectionId``.

        ``dict`` and ``list`` payloads are serialised to JSON first; any other
        value is handed to ``websocket.send`` unchanged.  The send is only
        scheduled on ``self.loop`` (thread-safe), it is not awaited here.

        Raises:
            Exception: if no websocket is registered under ``ConnectionId``.
        """
        websocket = self.CONNECTIONS.get(ConnectionId)
        if websocket is None:
            raise Exception(f"Connection {ConnectionId} not found")

        if isinstance(Data, (dict, list)):
            Data = json.dumps(Data)

        def _log_send_failure(future):
            # Runs once the scheduled send finishes; without it a failed send
            # would disappear silently because nobody reads the future.
            if future.cancelled():
                return
            error = future.exception()
            if error is not None:
                logger.error(f"Failed to send message to connection {ConnectionId}: {error}")

        send_coro = None
        try:
            send_coro = websocket.send(Data)
            # Schedule send in the main event loop
            future = asyncio.run_coroutine_threadsafe(send_coro, self.loop)
        except Exception as e:  # pragma: no cover
            if asyncio.iscoroutine(send_coro):
                # Scheduling failed: close the coroutine so it does not
                # trigger a "never awaited" warning.
                send_coro.close()
            logger.error(f"Failed to send message to connection {ConnectionId}: {e}")
        else:
            future.add_done_callback(_log_send_failure)

    def delete_connection(self, ConnectionId):  # pragma: unit
        """Unregister and close the websocket stored under ``ConnectionId``.

        Returns:
            ``{"ResponseMetadata": {"HTTPStatusCode": 200}}`` on success.

        Raises:
            Exception: if the connection is unknown (message mentions
                ``GoneException``).
        """
        websocket = self.CONNECTIONS.pop(ConnectionId, None)
        if websocket is None:
            # mimic AWS GoneException when connection already gone
            raise Exception(f"Connection {ConnectionId} not found (GoneException)")

        try:
            running_loop = asyncio.get_running_loop()
        except RuntimeError:
            running_loop = None
        server_loop = getattr(self, "loop", None)

        if (
            server_loop is not None
            and server_loop is not running_loop
            and server_loop.is_running()
        ):
            # Called from outside the registered loop (e.g. a worker thread):
            # hand the close over to that loop, as post_to_connection does.
            asyncio.run_coroutine_threadsafe(websocket.close(), server_loop)
        elif running_loop is not None:
            close_task = running_loop.create_task(websocket.close())
            # The loop only keeps weak references to tasks; hold a strong one
            # until the close has finished.
            pending = self.__dict__.setdefault("_pending_closes", set())
            pending.add(close_task)
            close_task.add_done_callback(pending.discard)
        else:
            try:
                loop = asyncio.get_event_loop()
            except RuntimeError:
                # No event loop is available in this thread; use a throwaway one.
                loop = asyncio.new_event_loop()
                try:
                    loop.run_until_complete(websocket.close())
                finally:
                    loop.close()
            else:
                loop.run_until_complete(websocket.close())

        return {"ResponseMetadata": {"HTTPStatusCode": 200}}
logger =
<MyLogger BEDROCK-local_websocket_api_gateway (INFO)>
class
LocalWebsocketApiGateway:
class LocalWebsocketApiGateway:
    """Local stand-in for a WebSocket API Gateway management client.

    Keeps a registry of live websocket objects keyed by connection id and
    exposes ``post_to_connection`` / ``delete_connection`` so that code
    written against the AWS-style client can run locally.  Websocket
    coroutines are scheduled on the event loop registered via ``set_loop``.
    """

    # Defined on the class, so the registry is shared by every instance.
    CONNECTIONS = {}

    def set_loop(self, loop):  # pragma: unit
        """Register the event loop on which websocket coroutines are scheduled."""
        self.loop = loop

    def add_websocket_connection(self, connection_id, websocket):  # pragma: unit
        """Register ``websocket`` under ``connection_id``, replacing any previous entry."""
        self.CONNECTIONS[connection_id] = websocket

    def remove_websocket_connection(self, connection_id):  # pragma: unit
        """Forget ``connection_id``; a no-op when the id is not registered.

        ``delete_connection`` already removes the entry, so a later disconnect
        handler calling this method must not fail with ``KeyError``.
        """
        self.CONNECTIONS.pop(connection_id, None)

    def post_to_connection(self, Data=None, ConnectionId=None):  # pragma: unit
        """Send ``Data`` to the websocket registered under ``ConnectionId``.

        ``dict`` and ``list`` payloads are serialised to JSON first; any other
        value is handed to ``websocket.send`` unchanged.  The send is only
        scheduled on ``self.loop`` (thread-safe), it is not awaited here.

        Raises:
            Exception: if no websocket is registered under ``ConnectionId``.
        """
        websocket = self.CONNECTIONS.get(ConnectionId)
        if websocket is None:
            raise Exception(f"Connection {ConnectionId} not found")

        if isinstance(Data, (dict, list)):
            Data = json.dumps(Data)

        def _log_send_failure(future):
            # Runs once the scheduled send finishes; without it a failed send
            # would disappear silently because nobody reads the future.
            if future.cancelled():
                return
            error = future.exception()
            if error is not None:
                logger.error(f"Failed to send message to connection {ConnectionId}: {error}")

        send_coro = None
        try:
            send_coro = websocket.send(Data)
            # Schedule send in the main event loop
            future = asyncio.run_coroutine_threadsafe(send_coro, self.loop)
        except Exception as e:  # pragma: no cover
            if asyncio.iscoroutine(send_coro):
                # Scheduling failed: close the coroutine so it does not
                # trigger a "never awaited" warning.
                send_coro.close()
            logger.error(f"Failed to send message to connection {ConnectionId}: {e}")
        else:
            future.add_done_callback(_log_send_failure)

    def delete_connection(self, ConnectionId):  # pragma: unit
        """Unregister and close the websocket stored under ``ConnectionId``.

        Returns:
            ``{"ResponseMetadata": {"HTTPStatusCode": 200}}`` on success.

        Raises:
            Exception: if the connection is unknown (message mentions
                ``GoneException``).
        """
        websocket = self.CONNECTIONS.pop(ConnectionId, None)
        if websocket is None:
            # mimic AWS GoneException when connection already gone
            raise Exception(f"Connection {ConnectionId} not found (GoneException)")

        try:
            running_loop = asyncio.get_running_loop()
        except RuntimeError:
            running_loop = None
        server_loop = getattr(self, "loop", None)

        if (
            server_loop is not None
            and server_loop is not running_loop
            and server_loop.is_running()
        ):
            # Called from outside the registered loop (e.g. a worker thread):
            # hand the close over to that loop, as post_to_connection does.
            asyncio.run_coroutine_threadsafe(websocket.close(), server_loop)
        elif running_loop is not None:
            close_task = running_loop.create_task(websocket.close())
            # The loop only keeps weak references to tasks; hold a strong one
            # until the close has finished.
            pending = self.__dict__.setdefault("_pending_closes", set())
            pending.add(close_task)
            close_task.add_done_callback(pending.discard)
        else:
            try:
                loop = asyncio.get_event_loop()
            except RuntimeError:
                # No event loop is available in this thread; use a throwaway one.
                loop = asyncio.new_event_loop()
                try:
                    loop.run_until_complete(websocket.close())
                finally:
                    loop.close()
            else:
                loop.run_until_complete(websocket.close())

        return {"ResponseMetadata": {"HTTPStatusCode": 200}}
def
post_to_connection(self, Data=None, ConnectionId=None):
def post_to_connection(self, Data=None, ConnectionId=None):  # pragma: unit
    """Send ``Data`` to the websocket registered under ``ConnectionId``.

    ``dict`` and ``list`` payloads are serialised to JSON first; any other
    value is handed to ``websocket.send`` unchanged.  The send is only
    scheduled on ``self.loop`` (thread-safe), it is not awaited here.

    Raises:
        Exception: if no websocket is registered under ``ConnectionId``.
    """
    websocket = self.CONNECTIONS.get(ConnectionId)
    if websocket is None:
        raise Exception(f"Connection {ConnectionId} not found")

    if isinstance(Data, (dict, list)):
        Data = json.dumps(Data)

    def _log_send_failure(future):
        # Runs once the scheduled send finishes; without it a failed send
        # would disappear silently because nobody reads the future.
        if future.cancelled():
            return
        error = future.exception()
        if error is not None:
            logger.error(f"Failed to send message to connection {ConnectionId}: {error}")

    send_coro = None
    try:
        send_coro = websocket.send(Data)
        # Schedule send in the main event loop
        future = asyncio.run_coroutine_threadsafe(send_coro, self.loop)
    except Exception as e:  # pragma: no cover
        if asyncio.iscoroutine(send_coro):
            # Scheduling failed: close the coroutine so it does not
            # trigger a "never awaited" warning.
            send_coro.close()
        logger.error(f"Failed to send message to connection {ConnectionId}: {e}")
    else:
        future.add_done_callback(_log_send_failure)
def
delete_connection(self, ConnectionId):
def delete_connection(self, ConnectionId):  # pragma: unit
    """Unregister and close the websocket stored under ``ConnectionId``.

    The close coroutine is run on the most appropriate loop:

    * the loop stored in ``self.loop`` when it is running and this call is
      made from outside it (scheduled thread-safely, not awaited);
    * otherwise the loop running in the current thread (as a task);
    * otherwise it is run to completion synchronously.

    Returns:
        ``{"ResponseMetadata": {"HTTPStatusCode": 200}}`` on success.

    Raises:
        Exception: if the connection is unknown (message mentions
            ``GoneException``).
    """
    websocket = self.CONNECTIONS.pop(ConnectionId, None)
    if websocket is None:
        # mimic AWS GoneException when connection already gone
        raise Exception(f"Connection {ConnectionId} not found (GoneException)")

    try:
        running_loop = asyncio.get_running_loop()
    except RuntimeError:
        running_loop = None
    server_loop = getattr(self, "loop", None)

    if (
        server_loop is not None
        and server_loop is not running_loop
        and server_loop.is_running()
    ):
        # Called from outside the registered loop (e.g. a worker thread):
        # hand the close over to that loop, as post_to_connection does.
        asyncio.run_coroutine_threadsafe(websocket.close(), server_loop)
    elif running_loop is not None:
        close_task = running_loop.create_task(websocket.close())
        # The loop only keeps weak references to tasks; hold a strong one
        # until the close has finished.
        pending = self.__dict__.setdefault("_pending_closes", set())
        pending.add(close_task)
        close_task.add_done_callback(pending.discard)
    else:
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            # No event loop is available in this thread; use a throwaway one.
            loop = asyncio.new_event_loop()
            try:
                loop.run_until_complete(websocket.close())
            finally:
                loop.close()
        else:
            loop.run_until_complete(websocket.close())

    return {"ResponseMetadata": {"HTTPStatusCode": 200}}