bedrock.generators.code_generators.endpoint.endpoint_header
1from bedrock._helpers.string import kebab_case_to_camelCase 2from bedrock.generators.code_generators.common.imports import imports_list_to_string, make_model_import, \ 3 make_parent_model_import 4 5 6def make_endpoint_header(kebab_name: str, kafka_topics: list[str], model: str = None, prefixes: list[str] = []) -> str: 7 imports = _generate_imports(model, len(kafka_topics) > 0, prefixes) 8 name = kebab_case_to_camelCase(kebab_name, True) 9 return f"""{imports_list_to_string(imports)} 10 11log = log_config('{name}Endpoint') 12""" 13 14 15def _generate_imports(model: str, is_kafka: bool, prefixes: list[str]) -> list[str]: 16 imports = [ 17 "json", 18 "bedrock.log.log_config", 19 "bedrock.exceptions.NotFoundException", 20 "bedrock.endpoints.decorators.protected", 21 "bedrock.endpoints.decorators.with_query_param", 22 "bedrock.endpoints.decorators.filter_on_columns", 23 "bedrock.endpoints.decorators.paginated", 24 "bedrock.endpoints.dto.bedrock_response.BedrockResponse", 25 "bedrock.config.keys.get_keys", 26 ] 27 if is_kafka: 28 imports.append("bedrock.endpoints.endpoint_with_kafka_sync.EndpointWithKafkaSync") 29 imports.append("bedrock.endpoints.decorators.kafka_listener") 30 else: 31 imports.append("bedrock.endpoints.endpoint.Endpoint") 32 imports.append(make_model_import(model)) 33 imports.append(make_parent_model_import(prefixes)) 34 return imports
def
make_endpoint_header( kebab_name: str, kafka_topics: list[str], model: str = None, prefixes: list[str] = []) -> str:
7def make_endpoint_header(kebab_name: str, kafka_topics: list[str], model: str = None, prefixes: list[str] = []) -> str: 8 imports = _generate_imports(model, len(kafka_topics) > 0, prefixes) 9 name = kebab_case_to_camelCase(kebab_name, True) 10 return f"""{imports_list_to_string(imports)} 11 12log = log_config('{name}Endpoint') 13"""