bedrock.websockets.connections
1from bedrock.cache.websockets import remove_connection, unsubscribe_connection, save_connection, \ 2 is_connection_token_valid # pragma: no cover 3from bedrock.cache.websockets.websocket_connection_data import WebsocketConnectionData # pragma: no cover 4from bedrock.log import log_config # pragma: no cover 5 6logger = log_config("websocket_connections") # pragma: no cover 7 8 9def connect(websocket_connection_data: WebsocketConnectionData) -> dict: # pragma: no cover 10 return save_connection(websocket_connection_data) 11 12 13def disconnect(connection_id: str): # pragma: no cover 14 remove_connection(connection_id) 15 unsubscribe_connection(connection_id) 16 17 18def ping(connection_id: str) -> bool: # pragma: no cover 19 return is_connection_token_valid(connection_id)
logger =
<MyLogger BEDROCK-websocket_connections (INFO)>
def
connect( websocket_connection_data: bedrock.cache.websockets.websocket_connection_data.WebsocketConnectionData) -> dict:
def
disconnect(connection_id: str):
def
ping(connection_id: str) -> bool: