bedrock.workers.invoker

 1import asyncio  # pragma: no cover
 2import json  # pragma: no cover
 3from json import JSONDecodeError  # pragma: no cover
 4
 5from bedrock.log import log_config  # pragma: no cover
 6from bedrock.config import get_config_params  # pragma: no cover
 7from bedrock.config._modules import _get_workers  # pragma: no cover
 8from bedrock.external.secrets_handler import find_secret  # pragma: no cover
 9from bedrock.db.connection import create_url, get_connection_details  # pragma: no cover
10from bedrock.external.aws import make_client   # pragma: no cover
11
12log = log_config("Invoker")  # pragma: no cover
13
14
def _get_worker_module(module_name):  # pragma: no cover
    """
    Look up a single worker by name.
    :param module_name: Key of the worker in the collection returned by `_get_workers("./workers", "workers.")`
    :return: The entry stored under `module_name` (presumably the imported worker module - confirm against `_get_workers`)
    """
    available_workers = _get_workers("./workers", "workers.")
    return available_workers[module_name]
17
18
def _local_invoke(name: str, payload: dict, require_response: bool):  # pragma: no cover
    """
    Run a worker in-process (local environment) and return its handler's result.
    :param name: Name of the worker to run
    :param payload: The payload passed to the worker's `handle` function
    :param require_response: Kept for interface compatibility; the worker is executed to completion either way
    :return: Whatever the worker's `handle` function returns
    """
    # Calling `_local_invoke_helper(...)` without awaiting it only builds a coroutine
    # object that never executes and triggers a "coroutine ... was never awaited"
    # RuntimeWarning. The helper is therefore driven exactly once, via `asyncio.run`,
    # regardless of `require_response`.
    return asyncio.run(_local_invoke_helper(name, payload))
23
24
async def _local_invoke_helper(name: str, payload: dict):  # pragma: no cover
    """
    Resolve the worker called `name` and call its `handle` function.
    :param name: Name of the worker to resolve through `_get_worker_module`
    :param payload: The payload handed to the worker as its first argument
    :return: The value returned by the worker's `handle(payload, None)` call
    """
    # The second argument is passed as `None` (presumably the Lambda context slot - confirm with the workers).
    handler = _get_worker_module(name).handle
    return handler(payload, None)
28
29
def invoke(name: str, payload: dict, require_response: bool = False, config_override={}):  # pragma: no cover
    """
    Invoke a worker by name with the given payload.

    The current config is injected into `payload["config"]` (the passed-in dict is modified in place).
    When the config has a `database` section, a connection `url` built from the `database` secret is
    added to it. In the `local` environment the worker is run in-process; otherwise the worker's lambda
    function (looked up in `config["workers"][name]`) is invoked through the AWS lambda client.

    Any exception raised along the way is logged and swallowed, in which case `None` is returned.
    :param name: Name of the worker (the name of the worker's directory under `app/workers/`)
    :param payload: The payload to send to the lambda function
    :param require_response: Whether to wait for a response from the lambda function. If set to `False`, the function will fire and forget
    :param config_override: Override config values before injecting into payload
    :return: The response from the lambda function if `require_response` is `True` (`None` otherwise, and `None` on failure).
             The response is the JSON-decoded payload, or the raw response text when it is not valid JSON
    """
    # NOTE: `config_override` has a mutable default, but it is only ever read (never mutated) here.
    invocation_type = "RequestResponse" if require_response else "Event"
    try:
        log.debug(f"Invoking {name} with {invocation_type}")
        config = get_config_params()
        log.debug("Injecting config into payload...")
        payload["config"] = {**config}
        if "database" in payload["config"]:
            secret = find_secret("database")
            payload["config"]["database"] = {
                **payload["config"]["database"],
                "url": create_url(*get_connection_details(secret))
            }
        # Overrides are applied last so they win over both the config and the injected database entry.
        payload["config"] = {**payload["config"], **config_override}

        if config['environment'] == 'local':
            result = _local_invoke(name, payload, require_response)
        else:
            arn = config["workers"][name]
            payload_as_string = json.dumps(payload)
            lambda_client = make_client("lambda")
            lambda_result = lambda_client.invoke(FunctionName=arn,
                                                 InvocationType=invocation_type,
                                                 Payload=payload_as_string)
            status_code = lambda_result['StatusCode']
            if not 200 <= status_code <= 299:
                raise Exception(f"Failed to invoke function {name} with status code {status_code}")
            # Read and decode the response stream exactly once: a second `read()` on an
            # already-consumed stream yields nothing, which would turn every non-JSON
            # response into an empty string.
            response_text = lambda_result['Payload'].read().decode("utf-8")
            try:
                result = json.loads(response_text)
            except JSONDecodeError:
                # Not JSON (e.g. an empty body): hand back the raw text instead.
                result = response_text
        if require_response:
            return result
    except Exception:
        # Best-effort call: failures are logged with the traceback and reported to the caller as `None`.
        log.error(f"Failed to invoke function {name}", exc_info=True)
    return None
log = <MyLogger BEDROCK-Invoker (INFO)>
def invoke( name: str, payload: dict, require_response: bool = False, config_override={}):
def invoke(name: str, payload: dict, require_response: bool = False, config_override={}):  # pragma: no cover
    """
    Invoke a worker by name with the given payload.

    The current config is injected into `payload["config"]` (the passed-in dict is modified in place).
    When the config has a `database` section, a connection `url` built from the `database` secret is
    added to it. In the `local` environment the worker is run in-process; otherwise the worker's lambda
    function (looked up in `config["workers"][name]`) is invoked through the AWS lambda client.

    Any exception raised along the way is logged and swallowed, in which case `None` is returned.
    :param name: Name of the worker (the name of the worker's directory under `app/workers/`)
    :param payload: The payload to send to the lambda function
    :param require_response: Whether to wait for a response from the lambda function. If set to `False`, the function will fire and forget
    :param config_override: Override config values before injecting into payload
    :return: The response from the lambda function if `require_response` is `True` (`None` otherwise, and `None` on failure).
             The response is the JSON-decoded payload, or the raw response text when it is not valid JSON
    """
    # NOTE: `config_override` has a mutable default, but it is only ever read (never mutated) here.
    invocation_type = "RequestResponse" if require_response else "Event"
    try:
        log.debug(f"Invoking {name} with {invocation_type}")
        config = get_config_params()
        log.debug("Injecting config into payload...")
        payload["config"] = {**config}
        if "database" in payload["config"]:
            secret = find_secret("database")
            payload["config"]["database"] = {
                **payload["config"]["database"],
                "url": create_url(*get_connection_details(secret))
            }
        # Overrides are applied last so they win over both the config and the injected database entry.
        payload["config"] = {**payload["config"], **config_override}

        if config['environment'] == 'local':
            result = _local_invoke(name, payload, require_response)
        else:
            arn = config["workers"][name]
            payload_as_string = json.dumps(payload)
            lambda_client = make_client("lambda")
            lambda_result = lambda_client.invoke(FunctionName=arn,
                                                 InvocationType=invocation_type,
                                                 Payload=payload_as_string)
            status_code = lambda_result['StatusCode']
            if not 200 <= status_code <= 299:
                raise Exception(f"Failed to invoke function {name} with status code {status_code}")
            # Read and decode the response stream exactly once: a second `read()` on an
            # already-consumed stream yields nothing, which would turn every non-JSON
            # response into an empty string.
            response_text = lambda_result['Payload'].read().decode("utf-8")
            try:
                result = json.loads(response_text)
            except JSONDecodeError:
                # Not JSON (e.g. an empty body): hand back the raw text instead.
                result = response_text
        if require_response:
            return result
    except Exception:
        # Best-effort call: failures are logged with the traceback and reported to the caller as `None`.
        log.error(f"Failed to invoke function {name}", exc_info=True)
    return None

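Invoke a worker by name with the given payload. Outside the local environment the worker's lambda function is looked up in the config by that name and invoked; in the local environment the worker is run in-process instead.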

Parameters
  • name: Name of the worker (the name of the worker's directory under app/workers/)
  • payload: The payload to send to the lambda function
  • require_response: Whether to wait for a response from the lambda function. If set to False, the function will fire and forget
  • config_override: Override config values before injecting into payload
Returns

The response from the lambda function if require_response is True (None otherwise)