bedrock.external.api_gateway
1from bedrock.config import get_config_params 2from bedrock.config._common import validate_config 3from bedrock.exceptions import BedrockException 4from bedrock.external.aws import make_client 5from bedrock.log import log_config 6 7log = log_config("AWS_API_Gateway") 8 9 10def init_aws_websocket_api_gateway_client(): # pragma: integration 11 """ 12 Creates an AWS websocket API client, used for posting to websocket 13 connections e.g. client.post_to_connection(connection_id, data) 14 """ 15 config = get_config_params() 16 _validate_websocket_api_gateway_config(config) 17 gateway_id = config["aws"]["api_gateway"]["gateway_id"] 18 region = config["aws"]["region"] 19 stage_name = config["aws"]["api_gateway"]["stage_name"] 20 url = f"https://{gateway_id}.execute-api.{region}.amazonaws.com/{stage_name}" 21 try: 22 return make_client("apigatewaymanagementapi", endpoint_url=url) 23 except Exception as e: # pragma: no cover 24 log.exception("Failed to create WebSocket API client") 25 raise BedrockException("Failed to create WebSocket API client", e) 26 27 28def _validate_websocket_api_gateway_config(config: dict): # pragma: integration 29 validate_config(config["aws"].get("api_gateway", {}), ["gateway_id", "stage_name"], "aws.api_gateway.") 30 validate_config(config["aws"], ["region"], "aws.")
log =
<MyLogger BEDROCK-AWS_API_Gateway (INFO)>
def
init_aws_websocket_api_gateway_client():
11def init_aws_websocket_api_gateway_client(): # pragma: integration 12 """ 13 Creates an AWS websocket API client, used for posting to websocket 14 connections e.g. client.post_to_connection(connection_id, data) 15 """ 16 config = get_config_params() 17 _validate_websocket_api_gateway_config(config) 18 gateway_id = config["aws"]["api_gateway"]["gateway_id"] 19 region = config["aws"]["region"] 20 stage_name = config["aws"]["api_gateway"]["stage_name"] 21 url = f"https://{gateway_id}.execute-api.{region}.amazonaws.com/{stage_name}" 22 try: 23 return make_client("apigatewaymanagementapi", endpoint_url=url) 24 except Exception as e: # pragma: no cover 25 log.exception("Failed to create WebSocket API client") 26 raise BedrockException("Failed to create WebSocket API client", e)
Creates an AWS websocket API client, used for posting to websocket connections e.g. client.post_to_connection(connection_id, data)