bedrock.generators.code_generators.endpoint.helpers
1from bedrock._helpers.string import camelCase_to_snake_case, upper_case_first 2 3 4def _resolve_endpoint_method_vars(extras: list) -> list[str]: 5 if extras: 6 return ["self", "event", *extras, "filters={}"] 7 else: 8 return ["self", "event", "filters={}"] 9 10 11def _resolve_path_param_vars(prefixes: list[str], indent=4) -> list[str]: 12 lines = [] 13 for prefix in prefixes: 14 if "{" in prefix: 15 prefix_path_param = prefix.replace("{", "").replace("}", "") 16 prefix_var = camelCase_to_snake_case(prefix_path_param) 17 lines.extend(f"""try: 18 {prefix_var} = event["pathParameters"]["{prefix_path_param}"] 19 default_filter["{prefix_var}[eq]"] = {prefix_var} 20except Exception as e: 21 raise NotFoundException("{prefix_path_param} is not a path parameter")""".split("\n")) 22 return [f"{' ' * indent}{line}" for line in lines] 23 24 25def ensure_last_path_param_var_is_in_body(prefixes: list[str], indent=4) -> str: 26 prefix_vars = [prefix.replace("{", "").replace("}", "") for prefix in prefixes if "{" in prefix] 27 if prefix_vars: 28 last_prefix_var = prefix_vars[-1] 29 last_prefix_var_snake_case = camelCase_to_snake_case(last_prefix_var) 30 return f'{" " * indent}body["{last_prefix_var}"] = {last_prefix_var_snake_case}' 31 return "" 32 33 34def ensure_parent_object_exists(prefixes: list[str], protected, indent=4) -> list[str]: 35 lines = [] 36 prefix_vars = [prefix.replace("{", "").replace("}", "") for prefix in prefixes if "{" in prefix] 37 if prefix_vars: 38 last_prefix_var = prefix_vars[-1] 39 last_prefix_var_snake_case = camelCase_to_snake_case(last_prefix_var) 40 last_prefix_object = upper_case_first(last_prefix_var).replace("Uuid", "") 41 lines.append(f'parent_object = {last_prefix_object}.find_first_matching_criteria({{') 42 lines.append(f' "uuid[eq]": {last_prefix_var_snake_case},') 43 lines.append(' **default_filter') 44 lines.append('})') 45 lines.append('if parent_object is None:') 46 lines.append( 47 f' log.debug(f"{last_prefix_object} {{{last_prefix_var_snake_case}}} not found for {protected}: {{protected}}")') 48 lines.append(f' raise NotFoundException(f"{last_prefix_object} {{{last_prefix_var_snake_case}}} not found")') 49 return [f"{' ' * indent}{line}" for line in lines]
def
ensure_last_path_param_var_is_in_body(prefixes: list[str], indent=4) -> str:
26def ensure_last_path_param_var_is_in_body(prefixes: list[str], indent=4) -> str: 27 prefix_vars = [prefix.replace("{", "").replace("}", "") for prefix in prefixes if "{" in prefix] 28 if prefix_vars: 29 last_prefix_var = prefix_vars[-1] 30 last_prefix_var_snake_case = camelCase_to_snake_case(last_prefix_var) 31 return f'{" " * indent}body["{last_prefix_var}"] = {last_prefix_var_snake_case}' 32 return ""
def
ensure_parent_object_exists(prefixes: list[str], protected, indent=4) -> list[str]:
35def ensure_parent_object_exists(prefixes: list[str], protected, indent=4) -> list[str]: 36 lines = [] 37 prefix_vars = [prefix.replace("{", "").replace("}", "") for prefix in prefixes if "{" in prefix] 38 if prefix_vars: 39 last_prefix_var = prefix_vars[-1] 40 last_prefix_var_snake_case = camelCase_to_snake_case(last_prefix_var) 41 last_prefix_object = upper_case_first(last_prefix_var).replace("Uuid", "") 42 lines.append(f'parent_object = {last_prefix_object}.find_first_matching_criteria({{') 43 lines.append(f' "uuid[eq]": {last_prefix_var_snake_case},') 44 lines.append(' **default_filter') 45 lines.append('})') 46 lines.append('if parent_object is None:') 47 lines.append( 48 f' log.debug(f"{last_prefix_object} {{{last_prefix_var_snake_case}}} not found for {protected}: {{protected}}")') 49 lines.append(f' raise NotFoundException(f"{last_prefix_object} {{{last_prefix_var_snake_case}}} not found")') 50 return [f"{' ' * indent}{line}" for line in lines]