bedrock.generators.generate-code
1import sys 2 3from bedrock._helpers.string import camelCase_to_snake_case, kebab_case_to_snake_case 4from bedrock.generators.code_generators.common.helpers import get_model_class_name_from_endpoint, get_endpoint_name, \ 5 get_endpoint_prefixes 6from bedrock.generators.code_generators.model.helpers import resolve_model_attributes 7from bedrock.generators.code_generators.model.model_class import make_model_class 8from bedrock.generators.code_generators.model.model_header import make_model_header 9from bedrock.generators.code_generators.common.files import save_to 10from bedrock.generators.code_generators.endpoint.endpoint_class import make_endpoint_class 11from bedrock.generators.code_generators.endpoint.endpoint_header import make_endpoint_header 12 13 14def make_model(name: str, model_attributes: list[str], project_name: str = "", path_hint=None, table_name: str = None, 15 app_path="./app") -> tuple[str, dict]: 16 attributes = resolve_model_attributes(model_attributes) 17 header = make_model_header(attributes, path_hint, app_path) 18 class_, reverse_back_populate = make_model_class(name, attributes, project_name, path_hint, table_name, app_path) 19 return f"{header}\n\n{class_}\n", reverse_back_populate 20 21 22def validate(kebab_name, prefixes): 23 if "{" in kebab_name or "}" in kebab_name: 24 raise ValueError("Endpoint URLs should never finish with path parameter." 25 "For example: /thing/ not /thing/{thingUuid}") 26 url_segments = [kebab_name, *[prefix for prefix in prefixes if "{" not in prefix]] 27 for segment in url_segments: 28 if any(character.isupper() or character == "_" for character in segment): 29 raise ValueError("Endpoint URLs should be kebab-case, not camelCase or snake_case.") 30 31 32def make_endpoint(path: str, kafka_topics: list[str], model_name: str, protected: str, app_directory: str) -> str: 33 kebab_name = get_endpoint_name(path) 34 prefixes = get_endpoint_prefixes(path) 35 validate(kebab_name, prefixes) 36 37 header = make_endpoint_header(kebab_name, kafka_topics, model_name, prefixes) 38 class_ = make_endpoint_class(kebab_name, kafka_topics, model_name, protected, prefixes, app_directory) 39 return f"{header}\n\n{class_}\n" 40 41 42def do_endpoint(app_directory: str, endpoint_path: str, model_name: str): 43 try: 44 kafka_topics = sys.argv[5].split(",") if sys.argv[5] != "" else [] 45 except IndexError: 46 kafka_topics = [] 47 try: 48 protection = sys.argv[6] if sys.argv[6] != "" else "accounts" 49 except IndexError: 50 protection = "accounts" 51 52 endpoint_file_name = kebab_case_to_snake_case(get_endpoint_name(endpoint_path)) 53 endpoint_content = make_endpoint(endpoint_path, kafka_topics, model_name, protection, app_directory) 54 save_to(f"{app_directory}/endpoints", f"{endpoint_file_name}.py", endpoint_content) 55 56 57def do_model(app_directory: str, model_name: str): 58 try: 59 model_attributes = sys.argv[5].split(";") if sys.argv[5] != "" else [] 60 except IndexError: 61 model_attributes = [] 62 try: 63 model_path_hint = sys.argv[6] if sys.argv[6] != "" else None 64 except IndexError: 65 model_path_hint = None 66 try: 67 model_table_name = sys.argv[7] if sys.argv[7] != "" else None 68 except IndexError: 69 model_table_name = None 70 try: 71 project_name = sys.argv[8].replace("-", "_") if sys.argv[8] != "" else None 72 except IndexError: 73 project_name = None 74 75 model_file_name = camelCase_to_snake_case(model_name) 76 model_content, reverse_back_populate = make_model(model_name, model_attributes, project_name, model_path_hint, 77 model_table_name, app_directory) 78 save_to(f"{app_directory}/model", f"{model_file_name}.py", model_content) 79 if reverse_back_populate: 80 add_relationship_to_model(app_directory, reverse_back_populate["model"], reverse_back_populate["line"]) 81 82 83def add_relationship_to_model(app_directory: str, model_name: str, reverse_back_populate: str): 84 if not reverse_back_populate: 85 return 86 model_file_name = camelCase_to_snake_case(model_name) 87 with open(f"{app_directory}/model/{model_file_name}.py", "r") as model_file: 88 lines = model_file.readlines() 89 90 with open(f"{app_directory}/model/{model_file_name}.py", "w") as model_file: 91 new_lines = [] 92 started_lines_with_column = False 93 have_added_reverse_back_populate = False 94 for i, line in enumerate(lines): 95 new_lines.append(line) 96 is_code_line = line.strip() != "" or not line.strip().startswith("#") 97 98 if not have_added_reverse_back_populate: 99 if started_lines_with_column and "Column" not in line \ 100 or (i == len(lines) - 1 and not have_added_reverse_back_populate): 101 indent = len(line) - len(line.lstrip(' ')) 102 new_lines.append(f"{' ' * indent}{reverse_back_populate}\n") 103 have_added_reverse_back_populate = True 104 elif "Column" in line and not is_code_line: 105 started_lines_with_column = True 106 model_file.write(''.join(new_lines)) 107 108 109if __name__ == "__main__": 110 GENERATE_CODE_TYPE = sys.argv[1] 111 APP_DIRECTORY = sys.argv[2] 112 ENDPOINT_PATH = sys.argv[3] 113 try: 114 _MODEL_NAME = sys.argv[4] if sys.argv[4] != "" else None 115 except IndexError: 116 _MODEL_NAME = None 117 MODEL_NAME = get_model_class_name_from_endpoint(ENDPOINT_PATH, _MODEL_NAME) 118 119 if GENERATE_CODE_TYPE == "model": 120 do_model(APP_DIRECTORY, MODEL_NAME) 121 elif GENERATE_CODE_TYPE == "endpoint": 122 do_endpoint(APP_DIRECTORY, ENDPOINT_PATH, MODEL_NAME) 123 else: 124 raise ValueError("Invalid type")
def
make_model( name: str, model_attributes: list[str], project_name: str = '', path_hint=None, table_name: str = None, app_path='./app') -> tuple[str, dict]:
15def make_model(name: str, model_attributes: list[str], project_name: str = "", path_hint=None, table_name: str = None, 16 app_path="./app") -> tuple[str, dict]: 17 attributes = resolve_model_attributes(model_attributes) 18 header = make_model_header(attributes, path_hint, app_path) 19 class_, reverse_back_populate = make_model_class(name, attributes, project_name, path_hint, table_name, app_path) 20 return f"{header}\n\n{class_}\n", reverse_back_populate
def
validate(kebab_name, prefixes):
23def validate(kebab_name, prefixes): 24 if "{" in kebab_name or "}" in kebab_name: 25 raise ValueError("Endpoint URLs should never finish with path parameter." 26 "For example: /thing/ not /thing/{thingUuid}") 27 url_segments = [kebab_name, *[prefix for prefix in prefixes if "{" not in prefix]] 28 for segment in url_segments: 29 if any(character.isupper() or character == "_" for character in segment): 30 raise ValueError("Endpoint URLs should be kebab-case, not camelCase or snake_case.")
def
make_endpoint( path: str, kafka_topics: list[str], model_name: str, protected: str, app_directory: str) -> str:
33def make_endpoint(path: str, kafka_topics: list[str], model_name: str, protected: str, app_directory: str) -> str: 34 kebab_name = get_endpoint_name(path) 35 prefixes = get_endpoint_prefixes(path) 36 validate(kebab_name, prefixes) 37 38 header = make_endpoint_header(kebab_name, kafka_topics, model_name, prefixes) 39 class_ = make_endpoint_class(kebab_name, kafka_topics, model_name, protected, prefixes, app_directory) 40 return f"{header}\n\n{class_}\n"
def
do_endpoint(app_directory: str, endpoint_path: str, model_name: str):
43def do_endpoint(app_directory: str, endpoint_path: str, model_name: str): 44 try: 45 kafka_topics = sys.argv[5].split(",") if sys.argv[5] != "" else [] 46 except IndexError: 47 kafka_topics = [] 48 try: 49 protection = sys.argv[6] if sys.argv[6] != "" else "accounts" 50 except IndexError: 51 protection = "accounts" 52 53 endpoint_file_name = kebab_case_to_snake_case(get_endpoint_name(endpoint_path)) 54 endpoint_content = make_endpoint(endpoint_path, kafka_topics, model_name, protection, app_directory) 55 save_to(f"{app_directory}/endpoints", f"{endpoint_file_name}.py", endpoint_content)
def
do_model(app_directory: str, model_name: str):
58def do_model(app_directory: str, model_name: str): 59 try: 60 model_attributes = sys.argv[5].split(";") if sys.argv[5] != "" else [] 61 except IndexError: 62 model_attributes = [] 63 try: 64 model_path_hint = sys.argv[6] if sys.argv[6] != "" else None 65 except IndexError: 66 model_path_hint = None 67 try: 68 model_table_name = sys.argv[7] if sys.argv[7] != "" else None 69 except IndexError: 70 model_table_name = None 71 try: 72 project_name = sys.argv[8].replace("-", "_") if sys.argv[8] != "" else None 73 except IndexError: 74 project_name = None 75 76 model_file_name = camelCase_to_snake_case(model_name) 77 model_content, reverse_back_populate = make_model(model_name, model_attributes, project_name, model_path_hint, 78 model_table_name, app_directory) 79 save_to(f"{app_directory}/model", f"{model_file_name}.py", model_content) 80 if reverse_back_populate: 81 add_relationship_to_model(app_directory, reverse_back_populate["model"], reverse_back_populate["line"])
def
add_relationship_to_model(app_directory: str, model_name: str, reverse_back_populate: str):
84def add_relationship_to_model(app_directory: str, model_name: str, reverse_back_populate: str): 85 if not reverse_back_populate: 86 return 87 model_file_name = camelCase_to_snake_case(model_name) 88 with open(f"{app_directory}/model/{model_file_name}.py", "r") as model_file: 89 lines = model_file.readlines() 90 91 with open(f"{app_directory}/model/{model_file_name}.py", "w") as model_file: 92 new_lines = [] 93 started_lines_with_column = False 94 have_added_reverse_back_populate = False 95 for i, line in enumerate(lines): 96 new_lines.append(line) 97 is_code_line = line.strip() != "" or not line.strip().startswith("#") 98 99 if not have_added_reverse_back_populate: 100 if started_lines_with_column and "Column" not in line \ 101 or (i == len(lines) - 1 and not have_added_reverse_back_populate): 102 indent = len(line) - len(line.lstrip(' ')) 103 new_lines.append(f"{' ' * indent}{reverse_back_populate}\n") 104 have_added_reverse_back_populate = True 105 elif "Column" in line and not is_code_line: 106 started_lines_with_column = True 107 model_file.write(''.join(new_lines))