bedrock.external.api_gateway

 1from bedrock.config import get_config_params
 2from bedrock.config._common import validate_config
 3from bedrock.exceptions import BedrockException
 4from bedrock.external.aws import make_client
 5from bedrock.log import log_config
 6
 7log = log_config("AWS_API_Gateway")
 8
 9
def init_aws_websocket_api_gateway_client():  # pragma: integration
    """
    Creates an AWS websocket API client, used for posting to websocket
    connections e.g. client.post_to_connection(connection_id, data)

    The endpoint URL is assembled from the ``aws.region``,
    ``aws.api_gateway.gateway_id`` and ``aws.api_gateway.stage_name``
    configuration values, which are passed through
    ``_validate_websocket_api_gateway_config`` before being read.

    Returns:
        The object returned by ``make_client`` for the
        ``apigatewaymanagementapi`` service bound to that endpoint URL.

    Raises:
        BedrockException: if ``make_client`` raises. The original exception
            is passed as the second argument and is also chained as the
            ``__cause__`` of the raised exception.
    """
    config = get_config_params()
    _validate_websocket_api_gateway_config(config)

    # Read each nested section once instead of re-indexing the config dict.
    aws_config = config["aws"]
    gateway_config = aws_config["api_gateway"]
    gateway_id = gateway_config["gateway_id"]
    stage_name = gateway_config["stage_name"]
    region = aws_config["region"]

    url = f"https://{gateway_id}.execute-api.{region}.amazonaws.com/{stage_name}"
    try:
        return make_client("apigatewaymanagementapi", endpoint_url=url)
    except Exception as e:  # pragma: no cover
        log.exception("Failed to create WebSocket API client")
        # Chain explicitly so the traceback marks the original error as the
        # direct cause; ``e`` is still passed positionally for existing callers.
        raise BedrockException("Failed to create WebSocket API client", e) from e
26
27
def _validate_websocket_api_gateway_config(config: dict):  # pragma: integration
    """
    Runs ``validate_config`` over the settings used to build the websocket
    API gateway endpoint.

    The ``aws.api_gateway`` section (an empty dict when the key is absent)
    is checked for ``gateway_id`` and ``stage_name``; the ``aws`` section is
    then checked for ``region``.
    """
    aws_section = config["aws"]
    gateway_section = aws_section.get("api_gateway", {})
    validate_config(gateway_section, ["gateway_id", "stage_name"], "aws.api_gateway.")
    validate_config(aws_section, ["region"], "aws.")
log = <MyLogger BEDROCK-AWS_API_Gateway (INFO)>
def init_aws_websocket_api_gateway_client():
def init_aws_websocket_api_gateway_client():  # pragma: integration
    """
    Creates an AWS websocket API client, used for posting to websocket
    connections e.g. client.post_to_connection(connection_id, data)

    The endpoint URL is assembled from the ``aws.region``,
    ``aws.api_gateway.gateway_id`` and ``aws.api_gateway.stage_name``
    configuration values, which are passed through
    ``_validate_websocket_api_gateway_config`` before being read.

    Returns:
        The object returned by ``make_client`` for the
        ``apigatewaymanagementapi`` service bound to that endpoint URL.

    Raises:
        BedrockException: if ``make_client`` raises. The original exception
            is passed as the second argument and is also chained as the
            ``__cause__`` of the raised exception.
    """
    config = get_config_params()
    _validate_websocket_api_gateway_config(config)

    # Read each nested section once instead of re-indexing the config dict.
    aws_config = config["aws"]
    gateway_config = aws_config["api_gateway"]
    gateway_id = gateway_config["gateway_id"]
    stage_name = gateway_config["stage_name"]
    region = aws_config["region"]

    url = f"https://{gateway_id}.execute-api.{region}.amazonaws.com/{stage_name}"
    try:
        return make_client("apigatewaymanagementapi", endpoint_url=url)
    except Exception as e:  # pragma: no cover
        log.exception("Failed to create WebSocket API client")
        # Chain explicitly so the traceback marks the original error as the
        # direct cause; ``e`` is still passed positionally for existing callers.
        raise BedrockException("Failed to create WebSocket API client", e) from e

Creates an AWS websocket API client, used for posting to websocket connections, e.g. client.post_to_connection(connection_id, data).