bedrock.workers.invoker
import asyncio  # pragma: no cover
import json  # pragma: no cover
from json import JSONDecodeError  # pragma: no cover

from bedrock.log import log_config  # pragma: no cover
from bedrock.config import get_config_params  # pragma: no cover
from bedrock.config._modules import _get_workers  # pragma: no cover
from bedrock.external.secrets_handler import find_secret  # pragma: no cover
from bedrock.db.connection import create_url, get_connection_details  # pragma: no cover
from bedrock.external.aws import make_client  # pragma: no cover

log = log_config("Invoker")  # pragma: no cover


def _get_worker_module(module_name):  # pragma: no cover
    """
    Look up a worker module by name.
    :param module_name: Key of the worker in the mapping returned by `_get_workers("./workers", "workers.")`
    :return: The entry stored under `module_name` (raises `KeyError` if there is none)
    """
    return _get_workers("./workers", "workers.")[module_name]


def _local_invoke(name: str, payload: dict, require_response: bool):  # pragma: no cover
    """
    Run a worker in-process instead of through AWS Lambda.
    :param name: Name of the worker to run
    :param payload: The payload handed to the worker's `handle` function
    :param require_response: Kept for signature compatibility; the worker is always run to completion
    :return: Whatever the worker's `handle` function returns
    """
    # The helper used to be called a second time without being awaited when
    # `require_response` was False; that only created a coroutine that never
    # ran (and a "never awaited" RuntimeWarning), so the call was removed.
    return asyncio.run(_local_invoke_helper(name, payload))


async def _local_invoke_helper(name: str, payload: dict):  # pragma: no cover
    """
    Resolve the worker module for `name` and call its `handle` function.
    :param name: Name of the worker to run
    :param payload: The payload passed as the first argument to `handle`
    :return: The return value of `handle`
    """
    worker_module = _get_worker_module(name)
    # The second argument is always None here (presumably the Lambda context
    # object when running on AWS - confirm against the worker handlers).
    return worker_module.handle(payload, None)


def invoke(name: str, payload: dict, require_response: bool = False, config_override=None):  # pragma: no cover
    """
    Invoke a worker by name, locally or as a lambda function.
    The current config is injected into `payload["config"]` (the caller's dict is modified in place).
    Any failure is logged and swallowed; the function then returns `None`.
    :param name: Name of the worker (the name of the worker's directory under `app/workers/`)
    :param payload: The payload to send to the lambda function
    :param require_response: Whether to wait for a response from the lambda function.
    If set to `False`, the lambda is invoked with the `Event` invocation type and nothing is returned
    :param config_override: Override config values before injecting into payload
    :return: The response from the lambda function if `require_response` is `True` (`None` otherwise)
    """
    invocation_type = "RequestResponse" if require_response else "Event"
    try:
        log.debug(f"Invoking {name} with {invocation_type}")
        config = get_config_params()
        log.debug("Injecting config into payload...")
        injected = {**config}
        if "database" in injected:
            secret = find_secret("database")
            injected["database"] = {
                **injected["database"],
                "url": create_url(*get_connection_details(secret)),
            }
        # Overrides are applied last so they win over both the base config and
        # the generated database entry.
        payload["config"] = {**injected, **(config_override or {})}

        # The environment check reads the original config, not the overridden one.
        if config['environment'] == 'local':
            result = _local_invoke(name, payload, require_response)
        else:
            arn = config["workers"][name]
            lambda_client = make_client("lambda")
            lambda_result = lambda_client.invoke(FunctionName=arn,
                                                 InvocationType=invocation_type,
                                                 Payload=json.dumps(payload))
            status_code = lambda_result['StatusCode']
            if not 200 <= status_code <= 299:
                raise Exception(f"Failed to invoke function {name} with status code {status_code}")
            # Read the response stream exactly once: a second read() returns
            # nothing, which used to turn every non-JSON response into "".
            raw_response = lambda_result['Payload'].read().decode("utf-8")
            try:
                result = json.loads(raw_response)
            except JSONDecodeError:
                result = raw_response
        if require_response:
            return result
    except Exception:
        log.error(f"Failed to invoke function {name}", exc_info=True)
    return None
log =
<MyLogger BEDROCK-Invoker (INFO)>
def
invoke( name: str, payload: dict, require_response: bool = False, config_override={}):
def invoke(name: str, payload: dict, require_response: bool = False, config_override=None):  # pragma: no cover
    """
    Invoke a worker by name, locally or as a lambda function.
    The current config is injected into `payload["config"]` (the caller's dict is modified in place).
    Any failure is logged and swallowed; the function then returns `None`.
    :param name: Name of the worker (the name of the worker's directory under `app/workers/`)
    :param payload: The payload to send to the lambda function
    :param require_response: Whether to wait for a response from the lambda function.
    If set to `False`, the lambda is invoked with the `Event` invocation type and nothing is returned
    :param config_override: Override config values before injecting into payload
    :return: The response from the lambda function if `require_response` is `True` (`None` otherwise)
    """
    invocation_type = "RequestResponse" if require_response else "Event"
    try:
        log.debug(f"Invoking {name} with {invocation_type}")
        config = get_config_params()
        log.debug("Injecting config into payload...")
        injected = {**config}
        if "database" in injected:
            secret = find_secret("database")
            injected["database"] = {
                **injected["database"],
                "url": create_url(*get_connection_details(secret)),
            }
        # Overrides are applied last so they win over both the base config and
        # the generated database entry.
        payload["config"] = {**injected, **(config_override or {})}

        # The environment check reads the original config, not the overridden one.
        if config['environment'] == 'local':
            result = _local_invoke(name, payload, require_response)
        else:
            arn = config["workers"][name]
            lambda_client = make_client("lambda")
            lambda_result = lambda_client.invoke(FunctionName=arn,
                                                 InvocationType=invocation_type,
                                                 Payload=json.dumps(payload))
            status_code = lambda_result['StatusCode']
            if not 200 <= status_code <= 299:
                raise Exception(f"Failed to invoke function {name} with status code {status_code}")
            # Read the response stream exactly once: a second read() returns
            # nothing, which used to turn every non-JSON response into "".
            raw_response = lambda_result['Payload'].read().decode("utf-8")
            try:
                result = json.loads(raw_response)
            except JSONDecodeError:
                result = raw_response
        if require_response:
            return result
    except Exception:
        log.error(f"Failed to invoke function {name}", exc_info=True)
    return None
Invoke a lambda function given an arn and payload.
Parameters
- name: Name of the worker (the name of the worker's directory under `app/workers/`)
- payload: The payload to send to the lambda function
- require_response: Whether to wait for a response from the lambda function. If set to `False`, the function will fire and forget
- config_override: Override config values before injecting into payload
Returns
The response from the lambda function if `require_response` is `True` (`None` otherwise)