bedrock.generators.code_generators.endpoint.endpoint_header

 1from bedrock._helpers.string import kebab_case_to_camelCase
 2from bedrock.generators.code_generators.common.imports import imports_list_to_string, make_model_import, \
 3    make_parent_model_import
 4
 5
 6def make_endpoint_header(kebab_name: str, kafka_topics: list[str], model: str = None, prefixes: list[str] = []) -> str:
 7    imports = _generate_imports(model, len(kafka_topics) > 0, prefixes)
 8    name = kebab_case_to_camelCase(kebab_name, True)
 9    return f"""{imports_list_to_string(imports)}
10
11log = log_config('{name}Endpoint')
12"""
13
14
15def _generate_imports(model: str, is_kafka: bool, prefixes: list[str]) -> list[str]:
16    imports = [
17        "json",
18        "bedrock.log.log_config",
19        "bedrock.exceptions.NotFoundException",
20        "bedrock.endpoints.decorators.protected",
21        "bedrock.endpoints.decorators.with_query_param",
22        "bedrock.endpoints.decorators.filter_on_columns",
23        "bedrock.endpoints.decorators.paginated",
24        "bedrock.endpoints.dto.bedrock_response.BedrockResponse",
25        "bedrock.config.keys.get_keys",
26    ]
27    if is_kafka:
28        imports.append("bedrock.endpoints.endpoint_with_kafka_sync.EndpointWithKafkaSync")
29        imports.append("bedrock.endpoints.decorators.kafka_listener")
30    else:
31        imports.append("bedrock.endpoints.endpoint.Endpoint")
32    imports.append(make_model_import(model))
33    imports.append(make_parent_model_import(prefixes))
34    return imports
def make_endpoint_header( kebab_name: str, kafka_topics: list[str], model: str = None, prefixes: list[str] = []) -> str:
 7def make_endpoint_header(kebab_name: str, kafka_topics: list[str], model: str = None, prefixes: list[str] = []) -> str:
 8    imports = _generate_imports(model, len(kafka_topics) > 0, prefixes)
 9    name = kebab_case_to_camelCase(kebab_name, True)
10    return f"""{imports_list_to_string(imports)}
11
12log = log_config('{name}Endpoint')
13"""