bedrock.external.aws
1import boto3 2from bedrock.log import log_config 3from bedrock.config import get_config_params 4 5CLIENTS = {} 6RESOURCES = {} 7 8log = log_config("AWS") 9 10 11def has_config_params(): # pragma: integration 12 config = get_config_params() 13 try: 14 return config["aws"]["endpoint"] is not None and config["aws"]["endpoint"] != "" \ 15 and config["aws"]["region"] is not None and config["aws"]["region"] != "" \ 16 and config["aws"]["access_key_id"] is not None and config["aws"]["access_key_id"] != "" \ 17 and config["aws"]["secret_access_key"] is not None and config["aws"]["secret_access_key"] != "" 18 except KeyError: 19 return False 20 21 22def _make_client(service: str, **kwargs): # pragma: integration 23 config = get_config_params() 24 if not has_config_params(): 25 return boto3.client(service, **kwargs) 26 27 # Fill in parameters from config if not explicitly provided 28 if "region_name" not in kwargs: 29 kwargs["region_name"] = config["aws"]["region"] 30 if "aws_access_key_id" not in kwargs: 31 kwargs["aws_access_key_id"] = config["aws"]["access_key_id"] 32 if "aws_secret_access_key" not in kwargs: 33 kwargs["aws_secret_access_key"] = config["aws"]["secret_access_key"] 34 35 return boto3.client(service, **kwargs) 36 37 38def make_client(service: str, **kwargs): # pragma: integration 39 """ 40 Create a client for the given AWS service. 41 :param service: The name of the aws service. See `boto3.client` documentation. 42 :return: 43 """ 44 service_name = service + str(kwargs) 45 if service_name not in CLIENTS: 46 CLIENTS[service_name] = _make_client(service, **kwargs) 47 return CLIENTS[service_name] 48 49 50def _make_resource(service: str, **kwargs): # pragma: integration 51 config = get_config_params() 52 if not has_config_params(): 53 return boto3.resource(service, **kwargs) 54 55 # Fill in parameters from config if not explicitly provided 56 if "region_name" not in kwargs: 57 kwargs["region_name"] = config["aws"]["region"] 58 if "aws_access_key_id" not in kwargs: 59 kwargs["aws_access_key_id"] = config["aws"]["access_key_id"] 60 if "aws_secret_access_key" not in kwargs: 61 kwargs["aws_secret_access_key"] = config["aws"]["secret_access_key"] 62 63 return boto3.resource(service, **kwargs) 64 65 66def make_resource(service: str, **kwargs): # pragma: integration 67 """ 68 Create a resource for the given AWS service. 69 :param service: The name of the aws service. See `boto3.client` documentation. 70 :return: 71 """ 72 service_name = service + str(kwargs) 73 if service_name not in RESOURCES: 74 RESOURCES[service_name] = _make_resource(service, **kwargs) 75 return RESOURCES[service_name]
CLIENTS =
{'secretsmanager{}': <botocore.client.SecretsManager object>}
RESOURCES =
{}
log =
<MyLogger BEDROCK-AWS (INFO)>
def
has_config_params():
12def has_config_params(): # pragma: integration 13 config = get_config_params() 14 try: 15 return config["aws"]["endpoint"] is not None and config["aws"]["endpoint"] != "" \ 16 and config["aws"]["region"] is not None and config["aws"]["region"] != "" \ 17 and config["aws"]["access_key_id"] is not None and config["aws"]["access_key_id"] != "" \ 18 and config["aws"]["secret_access_key"] is not None and config["aws"]["secret_access_key"] != "" 19 except KeyError: 20 return False
def
make_client(service: str, **kwargs):
39def make_client(service: str, **kwargs): # pragma: integration 40 """ 41 Create a client for the given AWS service. 42 :param service: The name of the aws service. See `boto3.client` documentation. 43 :return: 44 """ 45 service_name = service + str(kwargs) 46 if service_name not in CLIENTS: 47 CLIENTS[service_name] = _make_client(service, **kwargs) 48 return CLIENTS[service_name]
Create a client for the given AWS service.
Parameters
- service: The name of the aws service. See
boto3.clientdocumentation.
Returns
def
make_resource(service: str, **kwargs):
67def make_resource(service: str, **kwargs): # pragma: integration 68 """ 69 Create a resource for the given AWS service. 70 :param service: The name of the aws service. See `boto3.client` documentation. 71 :return: 72 """ 73 service_name = service + str(kwargs) 74 if service_name not in RESOURCES: 75 RESOURCES[service_name] = _make_resource(service, **kwargs) 76 return RESOURCES[service_name]
Create a resource for the given AWS service.
Parameters
- service: The name of the aws service. See
boto3.clientdocumentation.