bedrock.websockets.routes

  1import json  # pragma: unit
  2
  3from jwt import ExpiredSignatureError, InvalidSignatureError  # pragma: unit
  4
  5from bedrock.cache import get_cache  # pragma: unit
  6from bedrock.cache.websockets import (create_websocket_connection_data_from_connect_event, \
  7                                      create_websocket_connection_data_from_authenticate_event, \
  8                                      subscribe_connection_to_topic, \
  9                                      unsubscribe_connection, \
 10                                      save_connection)  # pragma: unit
 11from bedrock.config import internals  # pragma: unit
 12from bedrock.config.headers import ValidHeaders  # pragma: unit
 13from bedrock.config.websockets import get_websockets_dictionary  # pragma: unit
 14from bedrock.endpoints.dto.bedrock_response import BedrockErrorResponse, BedrockEnvelopedResponse  # pragma: unit
 15from bedrock.external.websocket import send_message_to_connection, close_connection  # pragma: unit
 16from bedrock.log import log_config  # pragma: unit
 17from bedrock.websockets.connections import ping, disconnect  # pragma: unit
 18from bedrock.websockets.websocket_error_code import WebSocketErrorCode  # pragma: unit
 19
# Built once at import time from the `websockets_location` and `websockets_prefix` entries of `internals`.
# `route_send_message_handler` iterates over its values, instantiating every handler class whose
# `__websocket_topics__` contains the requested topic.
WEBSOCKETS_DICTIONARY = get_websockets_dictionary(internals["websockets_location"],
                                                  internals["websockets_prefix"])  # pragma: unit
 22
 23
 24def route_connect_handler(event, context):  # pragma: unit
 25    """
 26    This handler gets called when creating a new WebSocket connection.
 27
 28    It creates a new `WebsocketConnectionData` object from the event and saves it to the cache.
 29    Note: The connection is not authenticated at this stage, that happens in `route_authenticate_handler`.
 30    """
 31    logger = log_config("ConnectRoute")
 32    try:
 33        websocket_connection_data = create_websocket_connection_data_from_connect_event(event)
 34        save_connection(websocket_connection_data)
 35        return {"statusCode": 200, "body": "Connected"}
 36    except KeyError as e:
 37        logger.error(f"Event incorrectly formatted, missing field: {e}")
 38        return {"statusCode": 400, "body": f"Missing field: {e}"}
 39    except Exception as e:
 40        logger.error(f"Error in $connect: {type(e).__name__}: {e}.")
 41        return {"statusCode": 500, "body": "Internal Server Error"}
 42
 43
 44def route_default_handler(event, context):  # pragma: unit
 45    """
 46    This handler gets called when a message is received on the WebSocket connection that doesn't match any of the other routes.
 47    """
 48    logger = log_config("DefaultRoute")
 49    try:
 50        connection_id = event["requestContext"]["connectionId"]
 51        logger.warning(f"Default route called for connection ID: {connection_id}")
 52        send_message_to_connection(
 53            connection_id,
 54            BedrockErrorResponse({"error": "Route not found"}, event).as_json()
 55        )
 56
 57        return {
 58            "statusCode": 404,
 59            "body": "Requested route not found"
 60        }
 61    except KeyError as e:
 62        logger.error(f"Event incorrectly formatted, missing field: {e}")
 63        return {"statusCode": 400, "body": f"Missing field: {e}"}
 64    except Exception as e:
 65        logger.error(f"Error in $default: {type(e).__name__}: {e}.")
 66        return {
 67            "statusCode": 500,
 68            "body": "Internal Server Error"
 69        }
 70
 71
 72def route_disconnect_handler(event, context):  # pragma: unit
 73    """
 74    This handler gets called when terminating a WebSocket connection.
 75    It removes the connection from the cache and unsubscribes it from any topics it was subscribed to.
 76    """
 77    logger = log_config("DisconnectRoute")
 78    try:
 79        connection_id = event["requestContext"]["connectionId"]
 80        disconnect(connection_id)
 81        return {"statusCode": 200, "body": "Disconnected"}
 82    except KeyError as e:
 83        logger.error(f"Event incorrectly formatted, missing field: {e}")
 84        return {"statusCode": 400, "body": f"Missing field: {e}"}
 85    except Exception as e:
 86        logger.error(f"Error in $disconnect: {type(e).__name__}: {e}.")
 87        return {"statusCode": 500, "body": "Internal Server Error"}
 88
 89
 90def route_ping_handler(event, context):  # pragma: unit
 91    """
 92    This handler gets called when a ping message is received on the WebSocket connection.
 93    We successfully respond with a pong message if the connection is authorised and the token is still valid, otherwise we return an error.
 94    """
 95    logger = log_config("PingRoute")
 96    try:
 97        connection_id = event["requestContext"]["connectionId"]
 98        if not is_authorised(connection_id):
 99            send_websocket_auth_error(connection_id,
100                                      error_message="Connection is not authorised, cannot ping",
101                                      event=event)
102            return {"statusCode": 401, "body": "Not authorised"}
103        if ping(connection_id):
104            send_message_to_connection(
105                connection_id,
106                BedrockEnvelopedResponse({"message": "PONG"}, event)
107            )
108            return {"statusCode": 200, "body": "Ping successful"}
109        else:
110            logger.error(f"Ping failed for connection ID: {connection_id}")
111            send_message_to_connection(
112                connection_id,
113                BedrockEnvelopedResponse({"error": "Ping failed"}, event)
114            )
115            close_connection(connection_id)
116            disconnect(connection_id)
117
118            return {"statusCode": 401, "body": "Invalid Token"}
119
120    except KeyError as e:
121        logger.error(f"PingRoute event missing field: {e}")
122        return {"statusCode": 400, "body": f"Event incorrectly formatted, missing field: {e}"}
123    except Exception as e:
124        logger.error(f"Error in ping: {type(e).__name__}: {e}.")
125        return {"statusCode": 500, "body": "Internal Server Error"}
126
127
def route_subscribe_handler(event, context):  # pragma: unit
    """
    Handle a request to subscribe the connection to a topic.

    The JSON body must contain a "topic" and may contain "filters". For an authorised connection the
    subscription is registered with `subscribe_connection_to_topic` and a confirmation is sent back;
    an unauthorised connection receives an auth error and 401.

    Invalid JSON or a `KeyError` (e.g. no "topic") yields 400; any other exception yields 500.
    """
    logger = log_config("SubscribeRoute")
    try:
        connection_id = event["requestContext"]["connectionId"]
        if not is_authorised(connection_id):
            send_websocket_auth_error(
                connection_id,
                error_message="Connection is not authorised, cannot subscribe",
                event=event,
            )
            return {"statusCode": 401, "body": "Not authorised"}

        payload = json.loads(event["body"])
        topic = payload["topic"]
        subscribe_connection_to_topic(connection_id, topic, payload.get("filters"))

        confirmation = BedrockEnvelopedResponse({"message": f"Subscribed to topic {topic}"}, event)
        send_message_to_connection(connection_id, confirmation)
        return {"statusCode": 200, "body": "Subscribed"}
    except json.JSONDecodeError as decode_error:
        logger.error(f"Invalid JSON: {decode_error}")
        return {"statusCode": 400, "body": "Invalid JSON body"}
    except KeyError as missing_field:
        logger.error(f"Event incorrectly formatted, missing field: {missing_field}")
        return {"statusCode": 400, "body": f"Missing field: {missing_field}"}
    except Exception as error:
        logger.error(f"Error in subscribe: {type(error).__name__}: {error}.")
        return {"statusCode": 500, "body": "Internal Server Error"}
158
159
def route_send_message_handler(event, context):  # pragma: unit
    """
    Handle a message that targets a topic.

    For an authorised connection, every handler class in `WEBSOCKETS_DICTIONARY` whose
    `__websocket_topics__` contains the requested topic is instantiated and its
    `_handle_with_context(event, context)` is called. An unauthorised connection receives an
    auth error and 401.

    Returns 200 once the matching handlers have run, 400 when the body is not valid JSON or does not
    carry a "topic" (including a missing body or a JSON value that is not an object), and 500 for
    any other exception.
    """
    logger = log_config("SendMessageRoute")
    try:
        connection_id = event["requestContext"]["connectionId"]
        if not is_authorised(connection_id):
            send_websocket_auth_error(connection_id,
                                      error_message="Connection is not authorised, cannot send messages",
                                      event=event)
            return {"statusCode": 401, "body": "Not authorised"}

        # A "body" key that is present but None would make json.loads raise TypeError (-> 500);
        # treat it the same as an absent body.
        raw_body = event.get("body")
        if raw_body is None:
            raw_body = "{}"
        body = json.loads(raw_body)
        # Valid JSON that is not an object (e.g. a list) has no "topic" either.
        topic = body.get("topic") if isinstance(body, dict) else None
        if not topic:
            logger.error("Missing 'topic' in send_message body")
            return {"statusCode": 400, "body": "Missing 'topic' in message body"}

        logger.info(f"Sending message to topic: {topic}")
        for websocket_handler in WEBSOCKETS_DICTIONARY.values():
            if topic in websocket_handler.__websocket_topics__:
                handler_instance = websocket_handler()
                handler_instance._handle_with_context(event, context)

        return {"statusCode": 200, "body": "Message handled"}

    except json.JSONDecodeError as e:
        logger.error(f"Invalid JSON in send_message: {e}")
        return {"statusCode": 400, "body": "Invalid JSON body"}
    except Exception as e:
        # Previously logged as "Error in subscribe", which pointed at the wrong route.
        logger.error(f"Error in send_message: {type(e).__name__}: {e}.")
        return {"statusCode": 500, "body": "Internal Server Error"}
193
194
def route_unsubscribe_handler(event, context):  # pragma: unit
    """
    Handle a request to unsubscribe the connection from a topic.

    For an authorised connection the topic is passed to `unsubscribe_connection` and a confirmation
    is sent back. An unauthorised connection receives an auth error and 401.

    Returns 200 on success, 400 when the body is not valid JSON, does not carry a "topic" (including
    a missing body or a JSON value that is not an object) or a `KeyError` is raised, and 500 for any
    other exception.
    """
    logger = log_config("UnsubscribeRoute")
    try:
        connection_id = event["requestContext"]["connectionId"]
        logger.info(f"Unsubscribing from {connection_id}")
        if not is_authorised(connection_id):
            send_websocket_auth_error(connection_id,
                                      error_message="Connection is not authorised, cannot unsubscribe",
                                      event=event)
            return {"statusCode": 401, "body": "Not authorised"}

        # A "body" key that is present but None would make json.loads raise TypeError (-> 500);
        # treat it the same as an absent body.
        raw_body = event.get("body")
        if raw_body is None:
            raw_body = "{}"
        body = json.loads(raw_body)
        topic = body.get("topic") if isinstance(body, dict) else None
        if not topic:
            # Without this guard the call below would receive [None] and the client would be told
            # "Unsubscribed from topic None".
            logger.error("Missing 'topic' in unsubscribe body")
            return {"statusCode": 400, "body": "Missing 'topic' in message body"}

        unsubscribe_connection(connection_id, [topic])
        send_message_to_connection(
            connection_id,
            BedrockEnvelopedResponse({"message": f"Unsubscribed from topic {topic}"}, event))
        return {"statusCode": 200, "body": "Unsubscribed"}
    except json.JSONDecodeError as e:
        logger.error(f"Invalid JSON: {e}")
        return {"statusCode": 400, "body": "Invalid JSON body"}
    except KeyError as e:
        logger.error(f"Event incorrectly formatted, missing field: {e}")
        return {"statusCode": 400, "body": f"Missing field: {e}"}
    except Exception as e:
        logger.error(f"Error in unsubscribe: {type(e).__name__}: {e}.")
        return {"statusCode": 500, "body": "Internal Server Error"}
225
226
def route_authenticate_handler(event, context):  # pragma: unit
    """
    Handle a request to authenticate the WebSocket connection.

    The JSON body must contain a "token". The token is placed in the event's headers as a Bearer
    authorization header (replacing any existing `headers` entry), connection data is built with
    `create_websocket_connection_data_from_authenticate_event` and saved, and a confirmation is
    sent back to the client.

    Returns:
    - 200 when the connection was authenticated;
    - 400 when the token is missing, the body is not valid JSON, or a `KeyError` is raised;
    - 401 when the token is expired, has an invalid signature, or fails any other PyJWT validation
      (an error message is also sent to the client);
    - 500 for any other exception (an error message is also sent to the client).
    """
    # Local import: the module only imports two specific PyJWT errors, but every PyJWT validation
    # failure derives from InvalidTokenError and should be reported as a client error.
    from jwt import InvalidTokenError

    logger = log_config("AuthenticateRoute")
    connection_id = None
    try:
        connection_id = event["requestContext"]["connectionId"]
        body = json.loads(event["body"])
        token = body.get("token", None)
        if not token:
            send_websocket_auth_error(connection_id, WebSocketErrorCode.MISSING_TOKEN, "Missing token", None, event)
            return {"statusCode": 400, "body": "Missing token"}

        event["headers"] = {f"{ValidHeaders.AUTHORIZATION.value}": f"Bearer {token}"}
        websocket_connection_data = create_websocket_connection_data_from_authenticate_event(event)
        save_connection(websocket_connection_data)
        send_message_to_connection(connection_id,
                                   BedrockEnvelopedResponse(
                                       data={"statusCode": 200, "body": f"Connection {connection_id} Authenticated"},
                                       event=event))
        return {"statusCode": 200, "body": "Connected"}
    except json.JSONDecodeError as e:
        logger.error(f"Invalid JSON in authenticate: {e}")
        return {"statusCode": 400, "body": "Invalid JSON body"}
    except KeyError as e:
        logger.error(f"Event incorrectly formatted, missing field: {e}")
        return {"statusCode": 400, "body": f"Missing field: {e}"}
    except ExpiredSignatureError as e:
        logger.error(f"Token expired: {e}")
        send_websocket_auth_error(
            connection_id,
            error_code=WebSocketErrorCode.TOKEN_EXPIRED,
            error_message="Token has expired",
            original_exception=f"{e}",
            event=event)
        return {"statusCode": 401, "body": "Token expired"}
    except InvalidSignatureError as e:
        logger.error(f"Invalid token signature: {e}")
        send_websocket_auth_error(
            connection_id,
            error_code=WebSocketErrorCode.INVALID_TOKEN,
            error_message="Invalid token signature",
            original_exception=f"{e}",
            event=event)
        return {"statusCode": 401, "body": "Invalid token signature"}
    except InvalidTokenError as e:
        # Remaining PyJWT failures (e.g. DecodeError for a malformed token) used to fall through
        # to the catch-all below and were reported as a 500 instead of a 401.
        logger.error(f"Invalid token: {e}")
        send_websocket_auth_error(
            connection_id,
            error_code=WebSocketErrorCode.INVALID_TOKEN,
            error_message="Invalid token",
            original_exception=f"{e}",
            event=event)
        return {"statusCode": 401, "body": "Invalid token"}
    except Exception as e:
        logger.error(f"Error in authenticate: {type(e).__name__}: {e}.")
        send_websocket_auth_error(
            connection_id,
            error_message="Authentication failed due to an internal error",
            original_exception=f"{e}",
            event=event)
        return {"statusCode": 500, "body": "Internal Server Error"}
282
283
def is_authorised(connection_id: str) -> bool:  # pragma: unit
    """
    Tell whether the cached connection data for `connection_id` is flagged as authorised.

    Returns False when the cache has no entry for the connection, when the entry is gone by the time
    it is read, or when its "isAuthorised" field is absent or not equal to True.
    """
    cache = get_cache()
    if not cache.exists(connection_id):
        return False
    raw_connection_data = cache.get(connection_id)
    if raw_connection_data is None:
        # The entry can vanish between exists() and get(); if the backend then hands back None,
        # json.loads(None) would raise TypeError, so treat it as "not authorised".
        return False
    connection_data = json.loads(raw_connection_data)
    # `== True` is kept on purpose so the accepted values stay exactly as before.
    return connection_data.get("isAuthorised", False) == True  # noqa: E712
289
290
def send_websocket_auth_error(connection_id: str,
                              error_code: WebSocketErrorCode = WebSocketErrorCode.UNAUTHORIZED,
                              error_message: str = "Connection is not authorised",
                              original_exception: str = None,
                              event: dict = None):
    """
    Send an authentication/authorisation error to a WebSocket connection.

    The error code, message and original exception text are wrapped in a `BedrockErrorResponse`
    (together with `event`) and its JSON form is sent with `send_message_to_connection`.
    """
    error_details = {
        "code": error_code,
        "error": error_message,
        "originalException": original_exception,
    }
    error_response = BedrockErrorResponse(error_details, event)
    send_message_to_connection(connection_id, error_response.as_json())
WEBSOCKETS_DICTIONARY = {}
def route_connect_handler(event, context):
def route_connect_handler(event, context):  # pragma: unit
    """
    Handle the opening of a new WebSocket connection.

    Connection data is built from the connect event and stored with `save_connection`.
    No authentication happens here; that is the job of `route_authenticate_handler`.

    Returns a dict with `statusCode` and `body`: 200 on success, 400 when a `KeyError` is raised
    while processing the event, and 500 for any other exception.
    """
    logger = log_config("ConnectRoute")
    try:
        save_connection(create_websocket_connection_data_from_connect_event(event))
    except KeyError as missing_field:
        logger.error(f"Event incorrectly formatted, missing field: {missing_field}")
        return {"statusCode": 400, "body": f"Missing field: {missing_field}"}
    except Exception as error:
        logger.error(f"Error in $connect: {type(error).__name__}: {error}.")
        return {"statusCode": 500, "body": "Internal Server Error"}
    return {"statusCode": 200, "body": "Connected"}

This handler gets called when creating a new WebSocket connection.

It creates a new WebsocketConnectionData object from the event and saves it to the cache. Note: The connection is not authenticated at this stage, that happens in route_authenticate_handler.

def route_default_handler(event, context):
def route_default_handler(event, context):  # pragma: unit
    """
    Fallback handler for messages that do not match any of the other routes.

    A "Route not found" error is sent to the calling connection and 404 is returned.
    A `KeyError` while processing the event yields 400; any other exception yields 500.
    """
    logger = log_config("DefaultRoute")
    try:
        connection_id = event["requestContext"]["connectionId"]
        logger.warning(f"Default route called for connection ID: {connection_id}")
        not_found = BedrockErrorResponse({"error": "Route not found"}, event)
        send_message_to_connection(connection_id, not_found.as_json())
        return {"statusCode": 404, "body": "Requested route not found"}
    except KeyError as missing_field:
        logger.error(f"Event incorrectly formatted, missing field: {missing_field}")
        return {"statusCode": 400, "body": f"Missing field: {missing_field}"}
    except Exception as error:
        logger.error(f"Error in $default: {type(error).__name__}: {error}.")
        return {"statusCode": 500, "body": "Internal Server Error"}

This handler gets called when a message is received on the WebSocket connection that doesn't match any of the other routes.

def route_disconnect_handler(event, context):
def route_disconnect_handler(event, context):  # pragma: unit
    """
    Handle the termination of a WebSocket connection.

    The connection id is read from the event and handed to `disconnect`, which is responsible for
    removing the connection from the cache and from the topics it was subscribed to.

    Returns 200 on success, 400 when a `KeyError` is raised, and 500 for any other exception.
    """
    logger = log_config("DisconnectRoute")
    try:
        disconnect(event["requestContext"]["connectionId"])
    except KeyError as missing_field:
        logger.error(f"Event incorrectly formatted, missing field: {missing_field}")
        return {"statusCode": 400, "body": f"Missing field: {missing_field}"}
    except Exception as error:
        logger.error(f"Error in $disconnect: {type(error).__name__}: {error}.")
        return {"statusCode": 500, "body": "Internal Server Error"}
    return {"statusCode": 200, "body": "Disconnected"}

This handler gets called when terminating a WebSocket connection. It removes the connection from the cache and unsubscribes it from any topics it was subscribed to.

def route_ping_handler(event, context):
 91def route_ping_handler(event, context):  # pragma: unit
 92    """
 93    This handler gets called when a ping message is received on the WebSocket connection.
 94    We successfully respond with a pong message if the connection is authorised and the token is still valid, otherwise we return an error.
 95    """
 96    logger = log_config("PingRoute")
 97    try:
 98        connection_id = event["requestContext"]["connectionId"]
 99        if not is_authorised(connection_id):
100            send_websocket_auth_error(connection_id,
101                                      error_message="Connection is not authorised, cannot ping",
102                                      event=event)
103            return {"statusCode": 401, "body": "Not authorised"}
104        if ping(connection_id):
105            send_message_to_connection(
106                connection_id,
107                BedrockEnvelopedResponse({"message": "PONG"}, event)
108            )
109            return {"statusCode": 200, "body": "Ping successful"}
110        else:
111            logger.error(f"Ping failed for connection ID: {connection_id}")
112            send_message_to_connection(
113                connection_id,
114                BedrockEnvelopedResponse({"error": "Ping failed"}, event)
115            )
116            close_connection(connection_id)
117            disconnect(connection_id)
118
119            return {"statusCode": 401, "body": "Invalid Token"}
120
121    except KeyError as e:
122        logger.error(f"PingRoute event missing field: {e}")
123        return {"statusCode": 400, "body": f"Event incorrectly formatted, missing field: {e}"}
124    except Exception as e:
125        logger.error(f"Error in ping: {type(e).__name__}: {e}.")
126        return {"statusCode": 500, "body": "Internal Server Error"}

This handler gets called when a ping message is received on the WebSocket connection. We successfully respond with a pong message if the connection is authorised and the token is still valid, otherwise we return an error.

def route_subscribe_handler(event, context):
def route_subscribe_handler(event, context):  # pragma: unit
    """
    Handle a request to subscribe the connection to a topic.

    The JSON body must contain a "topic" and may contain "filters". For an authorised connection the
    subscription is registered with `subscribe_connection_to_topic` and a confirmation is sent back;
    an unauthorised connection receives an auth error and 401.

    Invalid JSON or a `KeyError` (e.g. no "topic") yields 400; any other exception yields 500.
    """
    logger = log_config("SubscribeRoute")
    try:
        connection_id = event["requestContext"]["connectionId"]
        if not is_authorised(connection_id):
            send_websocket_auth_error(
                connection_id,
                error_message="Connection is not authorised, cannot subscribe",
                event=event,
            )
            return {"statusCode": 401, "body": "Not authorised"}

        payload = json.loads(event["body"])
        topic = payload["topic"]
        subscribe_connection_to_topic(connection_id, topic, payload.get("filters"))

        confirmation = BedrockEnvelopedResponse({"message": f"Subscribed to topic {topic}"}, event)
        send_message_to_connection(connection_id, confirmation)
        return {"statusCode": 200, "body": "Subscribed"}
    except json.JSONDecodeError as decode_error:
        logger.error(f"Invalid JSON: {decode_error}")
        return {"statusCode": 400, "body": "Invalid JSON body"}
    except KeyError as missing_field:
        logger.error(f"Event incorrectly formatted, missing field: {missing_field}")
        return {"statusCode": 400, "body": f"Missing field: {missing_field}"}
    except Exception as error:
        logger.error(f"Error in subscribe: {type(error).__name__}: {error}.")
        return {"statusCode": 500, "body": "Internal Server Error"}

This handler gets called when a message is received on the WebSocket connection to subscribe to a topic. If the connection is authorised, the cache is updated with the topic details and a confirmation message is sent back to the client. Otherwise, an error message is sent back.

def route_send_message_handler(event, context):
def route_send_message_handler(event, context):  # pragma: unit
    """
    Handle a message that targets a topic.

    For an authorised connection, every handler class in `WEBSOCKETS_DICTIONARY` whose
    `__websocket_topics__` contains the requested topic is instantiated and its
    `_handle_with_context(event, context)` is called. An unauthorised connection receives an
    auth error and 401.

    Returns 200 once the matching handlers have run, 400 when the body is not valid JSON or does not
    carry a "topic" (including a missing body or a JSON value that is not an object), and 500 for
    any other exception.
    """
    logger = log_config("SendMessageRoute")
    try:
        connection_id = event["requestContext"]["connectionId"]
        if not is_authorised(connection_id):
            send_websocket_auth_error(connection_id,
                                      error_message="Connection is not authorised, cannot send messages",
                                      event=event)
            return {"statusCode": 401, "body": "Not authorised"}

        # A "body" key that is present but None would make json.loads raise TypeError (-> 500);
        # treat it the same as an absent body.
        raw_body = event.get("body")
        if raw_body is None:
            raw_body = "{}"
        body = json.loads(raw_body)
        # Valid JSON that is not an object (e.g. a list) has no "topic" either.
        topic = body.get("topic") if isinstance(body, dict) else None
        if not topic:
            logger.error("Missing 'topic' in send_message body")
            return {"statusCode": 400, "body": "Missing 'topic' in message body"}

        logger.info(f"Sending message to topic: {topic}")
        for websocket_handler in WEBSOCKETS_DICTIONARY.values():
            if topic in websocket_handler.__websocket_topics__:
                handler_instance = websocket_handler()
                handler_instance._handle_with_context(event, context)

        return {"statusCode": 200, "body": "Message handled"}

    except json.JSONDecodeError as e:
        logger.error(f"Invalid JSON in send_message: {e}")
        return {"statusCode": 400, "body": "Invalid JSON body"}
    except Exception as e:
        # Previously logged as "Error in subscribe", which pointed at the wrong route.
        logger.error(f"Error in send_message: {type(e).__name__}: {e}.")
        return {"statusCode": 500, "body": "Internal Server Error"}

This handler gets called when a message is received on the WebSocket connection to send a message to a topic. If the connection is authorised, the event is passed to every registered websocket handler whose topics include the requested topic. Otherwise, an error message is sent back.

def route_unsubscribe_handler(event, context):
def route_unsubscribe_handler(event, context):  # pragma: unit
    """
    Handle a request to unsubscribe the connection from a topic.

    For an authorised connection the topic is passed to `unsubscribe_connection` and a confirmation
    is sent back. An unauthorised connection receives an auth error and 401.

    Returns 200 on success, 400 when the body is not valid JSON, does not carry a "topic" (including
    a missing body or a JSON value that is not an object) or a `KeyError` is raised, and 500 for any
    other exception.
    """
    logger = log_config("UnsubscribeRoute")
    try:
        connection_id = event["requestContext"]["connectionId"]
        logger.info(f"Unsubscribing from {connection_id}")
        if not is_authorised(connection_id):
            send_websocket_auth_error(connection_id,
                                      error_message="Connection is not authorised, cannot unsubscribe",
                                      event=event)
            return {"statusCode": 401, "body": "Not authorised"}

        # A "body" key that is present but None would make json.loads raise TypeError (-> 500);
        # treat it the same as an absent body.
        raw_body = event.get("body")
        if raw_body is None:
            raw_body = "{}"
        body = json.loads(raw_body)
        topic = body.get("topic") if isinstance(body, dict) else None
        if not topic:
            # Without this guard the call below would receive [None] and the client would be told
            # "Unsubscribed from topic None".
            logger.error("Missing 'topic' in unsubscribe body")
            return {"statusCode": 400, "body": "Missing 'topic' in message body"}

        unsubscribe_connection(connection_id, [topic])
        send_message_to_connection(
            connection_id,
            BedrockEnvelopedResponse({"message": f"Unsubscribed from topic {topic}"}, event))
        return {"statusCode": 200, "body": "Unsubscribed"}
    except json.JSONDecodeError as e:
        logger.error(f"Invalid JSON: {e}")
        return {"statusCode": 400, "body": "Invalid JSON body"}
    except KeyError as e:
        logger.error(f"Event incorrectly formatted, missing field: {e}")
        return {"statusCode": 400, "body": f"Missing field: {e}"}
    except Exception as e:
        logger.error(f"Error in unsubscribe: {type(e).__name__}: {e}.")
        return {"statusCode": 500, "body": "Internal Server Error"}

This handler gets called when a message is received on the WebSocket connection to unsubscribe from a topic. If the connection is authorised, the cache is updated to remove the topic details and a confirmation message is sent back to the client. Otherwise, an error message is sent back.

def route_authenticate_handler(event, context):
def route_authenticate_handler(event, context):  # pragma: unit
    """
    Handle a request to authenticate the WebSocket connection.

    The JSON body must contain a "token". The token is placed in the event's headers as a Bearer
    authorization header (replacing any existing `headers` entry), connection data is built with
    `create_websocket_connection_data_from_authenticate_event` and saved, and a confirmation is
    sent back to the client.

    Returns:
    - 200 when the connection was authenticated;
    - 400 when the token is missing, the body is not valid JSON, or a `KeyError` is raised;
    - 401 when the token is expired, has an invalid signature, or fails any other PyJWT validation
      (an error message is also sent to the client);
    - 500 for any other exception (an error message is also sent to the client).
    """
    # Local import: the module only imports two specific PyJWT errors, but every PyJWT validation
    # failure derives from InvalidTokenError and should be reported as a client error.
    from jwt import InvalidTokenError

    logger = log_config("AuthenticateRoute")
    connection_id = None
    try:
        connection_id = event["requestContext"]["connectionId"]
        body = json.loads(event["body"])
        token = body.get("token", None)
        if not token:
            send_websocket_auth_error(connection_id, WebSocketErrorCode.MISSING_TOKEN, "Missing token", None, event)
            return {"statusCode": 400, "body": "Missing token"}

        event["headers"] = {f"{ValidHeaders.AUTHORIZATION.value}": f"Bearer {token}"}
        websocket_connection_data = create_websocket_connection_data_from_authenticate_event(event)
        save_connection(websocket_connection_data)
        send_message_to_connection(connection_id,
                                   BedrockEnvelopedResponse(
                                       data={"statusCode": 200, "body": f"Connection {connection_id} Authenticated"},
                                       event=event))
        return {"statusCode": 200, "body": "Connected"}
    except json.JSONDecodeError as e:
        logger.error(f"Invalid JSON in authenticate: {e}")
        return {"statusCode": 400, "body": "Invalid JSON body"}
    except KeyError as e:
        logger.error(f"Event incorrectly formatted, missing field: {e}")
        return {"statusCode": 400, "body": f"Missing field: {e}"}
    except ExpiredSignatureError as e:
        logger.error(f"Token expired: {e}")
        send_websocket_auth_error(
            connection_id,
            error_code=WebSocketErrorCode.TOKEN_EXPIRED,
            error_message="Token has expired",
            original_exception=f"{e}",
            event=event)
        return {"statusCode": 401, "body": "Token expired"}
    except InvalidSignatureError as e:
        logger.error(f"Invalid token signature: {e}")
        send_websocket_auth_error(
            connection_id,
            error_code=WebSocketErrorCode.INVALID_TOKEN,
            error_message="Invalid token signature",
            original_exception=f"{e}",
            event=event)
        return {"statusCode": 401, "body": "Invalid token signature"}
    except InvalidTokenError as e:
        # Remaining PyJWT failures (e.g. DecodeError for a malformed token) used to fall through
        # to the catch-all below and were reported as a 500 instead of a 401.
        logger.error(f"Invalid token: {e}")
        send_websocket_auth_error(
            connection_id,
            error_code=WebSocketErrorCode.INVALID_TOKEN,
            error_message="Invalid token",
            original_exception=f"{e}",
            event=event)
        return {"statusCode": 401, "body": "Invalid token"}
    except Exception as e:
        logger.error(f"Error in authenticate: {type(e).__name__}: {e}.")
        send_websocket_auth_error(
            connection_id,
            error_message="Authentication failed due to an internal error",
            original_exception=f"{e}",
            event=event)
        return {"statusCode": 500, "body": "Internal Server Error"}

This handler gets called when a message is received on the WebSocket connection to authenticate the connection. It expects a JSON body with a "token" field. If the token is valid and the connection is successfully authenticated, a confirmation message is sent back to the client. Otherwise, an error message is sent back.

def is_authorised(connection_id: str) -> bool:
def is_authorised(connection_id: str) -> bool:  # pragma: unit
    """
    Tell whether the cached connection data for `connection_id` is flagged as authorised.

    Returns False when the cache has no entry for the connection, when the entry is gone by the time
    it is read, or when its "isAuthorised" field is absent or not equal to True.
    """
    cache = get_cache()
    if not cache.exists(connection_id):
        return False
    raw_connection_data = cache.get(connection_id)
    if raw_connection_data is None:
        # The entry can vanish between exists() and get(); if the backend then hands back None,
        # json.loads(None) would raise TypeError, so treat it as "not authorised".
        return False
    connection_data = json.loads(raw_connection_data)
    # `== True` is kept on purpose so the accepted values stay exactly as before.
    return connection_data.get("isAuthorised", False) == True  # noqa: E712
def send_websocket_auth_error( connection_id: str, error_code: bedrock.websockets.websocket_error_code.WebSocketErrorCode = <WebSocketErrorCode.UNAUTHORIZED: 'UNAUTHORIZED'>, error_message: str = 'Connection is not authorised', original_exception: str = None, event: dict = None):
def send_websocket_auth_error(connection_id: str,
                              error_code: WebSocketErrorCode = WebSocketErrorCode.UNAUTHORIZED,
                              error_message: str = "Connection is not authorised",
                              original_exception: str = None,
                              event: dict = None):
    """
    Send an authentication/authorisation error to a WebSocket connection.

    The error code, message and original exception text are wrapped in a `BedrockErrorResponse`
    (together with `event`) and its JSON form is sent with `send_message_to_connection`.
    """
    error_details = {
        "code": error_code,
        "error": error_message,
        "originalException": original_exception,
    }
    error_response = BedrockErrorResponse(error_details, event)
    send_message_to_connection(connection_id, error_response.as_json())