bedrock.db.query_helper
"""Helpers that translate string query filters into SQLAlchemy filter criteria."""
import json
import re
from datetime import datetime
from typing import Type
from uuid import UUID as PyUUID

from dateutil.tz import tz
from sqlalchemy import ColumnOperators, DateTime, Integer, func
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.dialects.postgresql import UUID as PGUUID
from sqlalchemy.orm import PropComparator
from sqlalchemy.sql import and_, or_
from sqlalchemy.sql.functions import _FunctionGenerator

from bedrock._helpers.classes import find_all_attributes
from bedrock._helpers.string import camelCase_to_snake_case
from bedrock.exceptions.bad_request_exception import BadRequestException
from bedrock.log import log_config

log = log_config("connection")


def unsupported_operation(operation_type):
    """Raise a BadRequestException stating that `operation_type` is not supported for JSON fields."""
    raise BadRequestException(f"Operation '{operation_type}' not supported for JSON fields")


# Dictionary mapping operations supported for filtering functionality to their SQLAlchemy equivalents.
OPERATIONS = {
    'eq': ColumnOperators.__eq__, 'ne': ColumnOperators.__ne__,
    'ge': ColumnOperators.__ge__, 'le': ColumnOperators.__le__,
    'gt': ColumnOperators.__gt__, 'lt': ColumnOperators.__lt__,
    'in': ColumnOperators.in_, 'ni': ColumnOperators.not_in,
    'like': ColumnOperators.ilike, 'nlike': ColumnOperators.not_ilike,
    'is': ColumnOperators.is_, 'isnt': ColumnOperators.isnot,
    'any': PropComparator.any
}

# Maps each "positive" operator to `is_` and each "negative" operator to `isnot`.
OPERATIONS_RELATED_NONE_MATCHER = {
    ColumnOperators.__eq__: ColumnOperators.is_,
    ColumnOperators.in_: ColumnOperators.is_,
    ColumnOperators.ilike: ColumnOperators.is_,
    ColumnOperators.is_: ColumnOperators.is_,

    ColumnOperators.__ne__: ColumnOperators.isnot,
    ColumnOperators.not_in: ColumnOperators.isnot,
    ColumnOperators.not_ilike: ColumnOperators.isnot,
    ColumnOperators.isnot: ColumnOperators.isnot
}

# Maps each "positive" operator to `or_` and each "negative" operator to `and_`.
OPERATIONS_RELATED_NONE_MATCHER_BOOLEAN_OPERATION = {
    ColumnOperators.__eq__: or_,
    ColumnOperators.in_: or_,
    ColumnOperators.ilike: or_,
    ColumnOperators.is_: or_,

    ColumnOperators.__ne__: and_,
    ColumnOperators.not_in: and_,
    ColumnOperators.not_ilike: and_,
    ColumnOperators.isnot: and_
}


def _json_comparison(symbol):
    """Build a JSON_OPERATIONS entry that compares the current jsonpath item to `$VALUE` using `symbol`."""
    def build(json_path, value):
        return f"{json_path} ? (@ {symbol} $VALUE)", {"VALUE": value}
    return build


def _json_null_comparison(symbol):
    """Build a JSON_OPERATIONS entry that compares the current jsonpath item to null, ignoring the value given."""
    compare = _json_comparison(symbol)
    return lambda json_path, value: compare(json_path, None)


# Each entry receives the jsonpath of the target and the filter value, and returns a tuple of
# (jsonpath filter expression, dictionary of jsonpath variables or None).
JSON_OPERATIONS = {
    'eq': _json_comparison("=="),
    'le': _json_comparison("<="),
    'ge': _json_comparison(">="),
    # 'gt' and 'lt' used to be swapped ('gt' produced '@ < $VALUE' and vice versa).
    'gt': _json_comparison(">"),
    'lt': _json_comparison("<"),
    'ne': _json_comparison("!="),
    'in': _json_comparison("=="),
    'ni': _json_comparison("!="),
    # NOTE(review): the value is interpolated into the jsonpath text unescaped, so a value containing
    # '"' changes the structure of the expression. Escaping is not done here because it would alter how
    # existing regex values are interpreted - confirm the intended escaping rules.
    'like': lambda json_path, value: (f'{json_path} ? (@ like_regex "{value}")', None),
    'any': lambda json_path, value: unsupported_operation("any"),
    'empty': lambda json_path, value: unsupported_operation("empty"),
    'is': _json_null_comparison("=="),
    'isnt': _json_null_comparison("!="),
}


def build_filter_query(filter_key, filter_value, cls):  # pragma: unit
    """
    Convert one string filter ('filter_key' / 'filter_value') for the entity 'cls' into a tuple that describes
    the SQLAlchemy operation to perform.

    Notes:
     * The 'filter_key' should specify the column and operation to filter on using the format
       '<column_name>[operation]'. For JSONB columns a dotted path may follow the column name
       ('<column_name>.<path>[operation]').
     * The value to filter on can be a list of values, or a single value. If this is a list, the operation in
       'filter_key' should be either 'in' (IN) or 'ni' (NOT IN), other operators are converted into 'in'.
     * The returned tuple has one of the following shapes:
        - (column, operator, value): single value filters; value is None for 'is' / 'isnt'.
        - (column, PropComparator.any, flag): 'empty' filters; flag is True when the value is 'false'.
        - (column, operator, values, null_operator): list filters; None entries are removed from the
          values and null_operator is `is_` / `isnot` when the list contained None, otherwise None.
        - (column, func.jsonb_path_exists, jsonpath, variables): JSONB columns; variables is a JSON string.

    :param filter_key: The filter column and operation in the format '<column_name>[operation]' (e.g. 'label[eq]').
    :param filter_value: The values in the column to check for.
    :param cls: The class representing the entity being filtered.
    :return: A tuple representing the SQLAlchemy operation to perform e.g (AClass.column, ColumnOperators.__ne__, "Abc")
    :raises BadRequestException: If the key cannot be parsed, the operation is unknown, or a value other than
        'none' / 'null' is given for an 'is' / 'isnt' operation.
    """
    # TODO: This code is becoming super complex and hard to read. Refactor it to make it more readable.
    column_name, column_operator, json_path, _ = _split_column_and_operator(filter_key)
    column = getattr(cls, column_name)
    is_json_column = isinstance(getattr(column, 'type', None), JSONB)

    # Null checks; a null check on a path *inside* a JSONB column is handled by the JSON branch below.
    if column_operator in ('is', 'isnt') and not (is_json_column and json_path is not None):
        if filter_value is not None and f"{filter_value}".lower() not in ("none", "null"):
            raise BadRequestException(
                "Invalid value provided for 'is' or 'isnt' operation (only 'none' or 'null' allowed)")
        return column, OPERATIONS[column_operator], None

    if is_json_column:
        build_json_operation = JSON_OPERATIONS.get(column_operator)
        if build_json_operation is None:
            raise BadRequestException(f"Operation '{column_operator}' not supported for JSON fields")
        if json_path:
            quoted_path = '.'.join(f'"{segment}"' for segment in json_path.split('.'))
            target = f'$.{quoted_path}'
        else:
            # No sub-path: target the document root instead of the literal key "None".
            target = '$'
        json_operation, json_value = build_json_operation(target, filter_value)
        return column, func.jsonb_path_exists, json_operation, json.dumps(json_value) if json_value else '{}'

    if column_operator == 'empty':  # Empty relationships
        return column, OPERATIONS.get('any'), filter_value == 'false'

    if isinstance(filter_value, list):  # Backwards compatibility
        if column_operator not in ('in', 'ni'):
            column_operator = 'in'
        converted_values = [convert_value_to_column_type(v, column.type) for v in filter_value if v is not None]
        null_operation = None
        if None in filter_value:
            null_operation = OPERATIONS.get("is") if column_operator == 'in' else OPERATIONS.get("isnt")
        return column, OPERATIONS.get(column_operator), converted_values, null_operation

    # Everything else
    operation = OPERATIONS.get(column_operator)
    if operation is None:
        raise BadRequestException(f"Operation '{column_operator}' not supported")
    if filter_value is None:
        return column, operation, None
    # replace '*' from (not) like queries with '%' to be compatible with SQL
    if operation in (ColumnOperators.ilike, ColumnOperators.not_ilike) and isinstance(filter_value, str):
        filter_value = filter_value.replace('*', '%')
    return column, operation, convert_value_to_column_type(filter_value, column.type)


def convert_value_to_column_type(value, column_type):  # pragma: unit
    """
    Cast 'value' to the Python type matching the SQLAlchemy 'column_type'.

    * None is returned unchanged.
    * UUID columns: the value is parsed into a UUID; if parsing fails the value is returned as given.
    * Integer columns: the value is converted with `int` (a ValueError propagates for invalid input).
    * DateTime columns: the value is parsed as an ISO string ('Z' is rewritten to '+00:00'); failing that it is
      read as an epoch timestamp in milliseconds (UTC); failing that it is returned as given.
    * Any other column type: the value is returned unchanged.
    """
    if value is None:
        return None

    if isinstance(column_type, PGUUID):
        if not isinstance(value, PyUUID):
            try:
                value = PyUUID(str(value))
            except (ValueError, TypeError):
                pass  # best effort: hand the unparsed value back to the caller
        return value

    if isinstance(column_type, Integer):
        return int(f"{value}")

    if isinstance(column_type, DateTime):
        iso_candidate = value
        if isinstance(value, str) and 'Z' in value:
            iso_candidate = value.replace('Z', '+00:00')
        try:
            return datetime.fromisoformat(iso_candidate)
        except (ValueError, TypeError, AttributeError):
            pass
        # Not an ISO string: try an epoch timestamp expressed in milliseconds.
        try:
            return datetime.fromtimestamp(int(value) / 1000.0, tz=tz.gettz('UTC'))
        except (TypeError, ValueError):
            return value

    return value


def create_filters_list_from_dict(query_dict, cls):  # pragma: unit
    """
    Build one filter tuple per entry of 'query_dict' by passing each key/value pair to `build_filter_query`.
    For example:
        A query dictionary `{'col1[eq]': '10', 'col_snake_case[eq]': '2023'}` ->
        [(AClass.col1, ColumnOperators.__eq__, '10'),(AClass.col_snake_case, ColumnOperators.__eq__,2023)]
    """
    return [build_filter_query(key, value, cls) for key, value in query_dict.items()]


def add_default_equality_operator(query_key):  # pragma: unit
    """
    Append '[eq]' to 'query_key' when it does not contain a '[' (i.e. no operation was given).
    """
    if '[' not in query_key:
        return f"{query_key}[eq]"
    return query_key


def get_filters_relevant_to_class(cls: Type, filters: dict) -> dict:  # pragma: unit
    """
    Return the subset of 'filters' that applies to 'cls'.

    A filter is kept when its column is a JSONB column of the class, or when the column name (as given, or
    converted to snake_case) is a column definition or attribute of the class. Filters matched through their
    snake_case name are returned under the snake_case key. An empty dictionary is returned when the class
    has no (or empty) column definitions.
    """
    try:
        column_definitions = cls.get_column_definitions()
    except AttributeError as e:
        # The class does not expose column definitions: nothing can be matched.
        log.warning(f"Unexpected error: {e}")
        return {}

    if not column_definitions:
        return {}

    attributes = find_all_attributes(cls)

    def is_known(name):
        return name in column_definitions or name in attributes

    relevant_filters = {}
    for key, value in filters.items():
        # split key of the form 'col[eq]' to get the column name
        raw_name = key.split('[')[0]
        snake_name = camelCase_to_snake_case(raw_name)

        # For dotted (JSON path) names only the first segment identifies the column.
        root_name = snake_name.split(".")[0]
        if root_name in column_definitions and isinstance(column_definitions[root_name].type, JSONB):
            relevant_filters[key] = value
        elif is_known(raw_name):
            relevant_filters[key] = value
        elif is_known(snake_name):
            relevant_filters[key.replace(raw_name, snake_name)] = value

    return relevant_filters


def criteria_to_sql_alchemy_operation(item):  # pragma: integration
    """
    Turn a filter tuple (as produced by `build_filter_query`) into a SQLAlchemy expression.

    * JSONB column with a `jsonb_path_exists` function: the function is called with the column and the
      third and fourth tuple elements.
    * `PropComparator.any` on an attribute whose comparator has an entity: `any()` when the third element is
      truthy, otherwise its negation.
    * List of values with a fourth element: the list operation is combined with the fourth element applied
      to None, using `or_` when that element is the 'is' operation and `and_` otherwise.
    * Anything else: the operator is applied to the column and the third element.
    """
    column, operation = item[0], item[1]

    is_json_path_call = (
        hasattr(column, 'type')
        and isinstance(column.type, JSONB)
        and type(operation) is _FunctionGenerator
        and 'jsonb_path_exists' in operation._FunctionGenerator__names
    )
    if is_json_path_call:
        return operation(column, item[2], item[3])

    comparator = column.comparator
    if comparator and hasattr(comparator, 'entity') and comparator.entity and operation == PropComparator.any:
        has_any = operation(column)
        return has_any if item[2] else ~has_any

    values = item[2]
    if isinstance(values, list) and item[3]:
        null_check = item[3]
        combine = or_ if null_check == OPERATIONS.get("is") else and_
        return combine(operation(column, values), null_check(column, None))

    return operation(column, values)


def _split_column_and_operator(filter_key):
    """
    Split a filter key of the form '<column>[operation]' or '<column>.<path>[operation]'.

    :return: A tuple (column_name, column_operator, json_path, match); json_path is None when the part before
        the brackets contains no '.', and match is the regular expression match object.
    :raises BadRequestException: If the key does not contain a bracketed operation.
    """
    match = re.search(r'(.*)\[(.*)\]', filter_key)
    if not match:
        raise BadRequestException("Filter provided cannot be recognised")
    target, column_operator = match.groups()
    column_name, separator, remainder = target.partition(".")
    json_path = remainder if separator else None
    return column_name, column_operator, json_path, match
def build_filter_query(filter_key, filter_value, cls):  # pragma: unit
    """
    Convert one string filter ('filter_key' / 'filter_value') for the entity 'cls' into a tuple that describes
    the SQLAlchemy operation to perform.

    Notes:
     * The 'filter_key' should specify the column and operation to filter on using the format
       '<column_name>[operation]'. For JSONB columns a dotted path may follow the column name
       ('<column_name>.<path>[operation]').
     * The value to filter on can be a list of values, or a single value. If this is a list, the operation in
       'filter_key' should be either 'in' (IN) or 'ni' (NOT IN), other operators are converted into 'in'.
     * The returned tuple has one of the following shapes:
        - (column, operator, value): single value filters; value is None for 'is' / 'isnt'.
        - (column, PropComparator.any, flag): 'empty' filters; flag is True when the value is 'false'.
        - (column, operator, values, null_operator): list filters; None entries are removed from the
          values and null_operator is `is_` / `isnot` when the list contained None, otherwise None.
        - (column, func.jsonb_path_exists, jsonpath, variables): JSONB columns; variables is a JSON string.

    :param filter_key: The filter column and operation in the format '<column_name>[operation]' (e.g. 'label[eq]').
    :param filter_value: The values in the column to check for.
    :param cls: The class representing the entity being filtered.
    :return: A tuple representing the SQLAlchemy operation to perform e.g (AClass.column, ColumnOperators.__ne__, "Abc")
    :raises BadRequestException: If the key cannot be parsed, the operation is unknown, or a value other than
        'none' / 'null' is given for an 'is' / 'isnt' operation.
    """
    # TODO: This code is becoming super complex and hard to read. Refactor it to make it more readable.
    column_name, column_operator, json_path, _ = _split_column_and_operator(filter_key)
    column = getattr(cls, column_name)
    is_json_column = isinstance(getattr(column, 'type', None), JSONB)

    # Null checks; a null check on a path *inside* a JSONB column is handled by the JSON branch below.
    if column_operator in ('is', 'isnt') and not (is_json_column and json_path is not None):
        if filter_value is not None and f"{filter_value}".lower() not in ("none", "null"):
            raise BadRequestException(
                "Invalid value provided for 'is' or 'isnt' operation (only 'none' or 'null' allowed)")
        return column, OPERATIONS[column_operator], None

    if is_json_column:
        build_json_operation = JSON_OPERATIONS.get(column_operator)
        if build_json_operation is None:
            raise BadRequestException(f"Operation '{column_operator}' not supported for JSON fields")
        # NOTE(review): path segments are wrapped in double quotes without escaping - confirm that keys
        # containing '"' cannot reach this point.
        if json_path:
            quoted_path = '.'.join(f'"{segment}"' for segment in json_path.split('.'))
            target = f'$.{quoted_path}'
        else:
            # No sub-path: target the document root instead of the literal key "None".
            target = '$'
        json_operation, json_value = build_json_operation(target, filter_value)
        return column, func.jsonb_path_exists, json_operation, json.dumps(json_value) if json_value else '{}'

    if column_operator == 'empty':  # Empty relationships
        return column, OPERATIONS.get('any'), filter_value == 'false'

    if isinstance(filter_value, list):  # Backwards compatibility
        if column_operator not in ('in', 'ni'):
            column_operator = 'in'
        converted_values = [convert_value_to_column_type(v, column.type) for v in filter_value if v is not None]
        null_operation = None
        if None in filter_value:
            null_operation = OPERATIONS.get("is") if column_operator == 'in' else OPERATIONS.get("isnt")
        return column, OPERATIONS.get(column_operator), converted_values, null_operation

    # Everything else
    operation = OPERATIONS.get(column_operator)
    if operation is None:
        raise BadRequestException(f"Operation '{column_operator}' not supported")
    if filter_value is None:
        return column, operation, None
    # replace '*' from (not) like queries with '%' to be compatible with SQL
    if operation in (ColumnOperators.ilike, ColumnOperators.not_ilike) and isinstance(filter_value, str):
        filter_value = filter_value.replace('*', '%')
    return column, operation, convert_value_to_column_type(filter_value, column.type)
This method converts a filter query (provided as 'filter_key' and 'filter_value' parameters) for an entity ('cls') into a form that can be understood by SQLAlchemy.
Notes:
- The 'filter_key' should specify the column and operation to filter on using the format '<column_name>[operation]'.
- The value to filter on can be a list of values, or a single value. If this is a list, the operation in 'filter_key' should be either 'in' (IN) or 'ni' (NOT IN), other operators will be converted into an 'in' operation.
- The result of this method is a tuple with the following three elements:
1) the SQLAlchemy `Column` object corresponding to the string column provided
2) the SQLAlchemy 'ColumnOperators' operator corresponding to the string operation provided
3) the value(s) converted to the type of the Column object in 1.
Parameters
- filter_key: The filter column and operation provided in the format '<column_name>[operation]' (e.g. 'label[eq]').
- filter_value: The values in the column to check for.
- cls: The class representing the entity being filtered.
Returns
A tuple representing the SQLAlchemy operation to perform e.g (AClass.column, ColumnOperators.__ne__, "Abc")
def convert_value_to_column_type(value, column_type):  # pragma: unit
    """
    Cast 'value' to the Python type matching the SQLAlchemy 'column_type'.

    * None is returned unchanged.
    * UUID columns: the value is parsed into a UUID; if parsing fails the value is returned as given.
    * Integer columns: the value is converted with `int` (a ValueError propagates for invalid input).
    * DateTime columns: the value is parsed as an ISO string ('Z' is rewritten to '+00:00'); failing that it is
      read as an epoch timestamp in milliseconds (UTC); failing that it is returned as given.
    * Any other column type: the value is returned unchanged.
    """
    if value is None:
        return None

    if isinstance(column_type, PGUUID):
        if not isinstance(value, PyUUID):
            try:
                value = PyUUID(str(value))
            except (ValueError, TypeError):
                pass  # best effort: hand the unparsed value back to the caller
        return value

    if isinstance(column_type, Integer):
        return int(f"{value}")

    if isinstance(column_type, DateTime):
        iso_candidate = value
        if isinstance(value, str) and 'Z' in value:
            iso_candidate = value.replace('Z', '+00:00')
        try:
            return datetime.fromisoformat(iso_candidate)
        except (ValueError, TypeError, AttributeError):
            pass
        # Not an ISO string: try an epoch timestamp expressed in milliseconds.
        try:
            return datetime.fromtimestamp(int(value) / 1000.0, tz=tz.gettz('UTC'))
        except (TypeError, ValueError):
            return value

    return value
Casts the given 'value' to the SQLAlchemy 'column_type' specified.
def create_filters_list_from_dict(query_dict, cls):  # pragma: unit
    """
    Build one filter tuple per entry of 'query_dict' by passing each key/value pair to `build_filter_query`.
    For example:
        A query dictionary `{'col1[eq]': '10', 'col_snake_case[eq]': '2023'}` ->
        [(AClass.col1, ColumnOperators.__eq__, '10'),(AClass.col_snake_case, ColumnOperators.__eq__,2023)]
    """
    return [build_filter_query(key, value, cls) for key, value in query_dict.items()]
Converts a dictionary of string query filters into a list of tuples to be used as part of a SQLAlchemy query.
For example:
A query dictionary {'col1[eq]': '10', 'col_snake_case[eq]': '2023'} ->
[(AClass.col1, ColumnOperators.__eq__, '10'),(AClass.col_snake_case, ColumnOperators.__eq__,2023)]
def add_default_equality_operator(query_key):  # pragma: unit
    """
    Append '[eq]' to 'query_key' when it does not contain a '[' (i.e. no operation was given).
    """
    if '[' not in query_key:
        return f"{query_key}[eq]"
    return query_key
Add 'eq' as the default operating string for criteria if not present.
def get_filters_relevant_to_class(cls: Type, filters: dict) -> dict:  # pragma: unit
    """
    Return the subset of 'filters' that applies to 'cls'.

    A filter is kept when its column is a JSONB column of the class, or when the column name (as given, or
    converted to snake_case) is a column definition or attribute of the class. Filters matched through their
    snake_case name are returned under the snake_case key. An empty dictionary is returned when the class
    has no (or empty) column definitions.
    """
    try:
        column_definitions = cls.get_column_definitions()
    except AttributeError as e:
        # The class does not expose column definitions: nothing can be matched.
        log.warning(f"Unexpected error: {e}")
        return {}

    if not column_definitions:
        return {}

    attributes = find_all_attributes(cls)

    def is_known(name):
        return name in column_definitions or name in attributes

    relevant_filters = {}
    for key, value in filters.items():
        # split key of the form 'col[eq]' to get the column name
        raw_name = key.split('[')[0]
        snake_name = camelCase_to_snake_case(raw_name)

        # For dotted (JSON path) names only the first segment identifies the column.
        root_name = snake_name.split(".")[0]
        if root_name in column_definitions and isinstance(column_definitions[root_name].type, JSONB):
            relevant_filters[key] = value
        elif is_known(raw_name):
            relevant_filters[key] = value
        elif is_known(snake_name):
            relevant_filters[key.replace(raw_name, snake_name)] = value

    return relevant_filters
Filters a dictionary of query filters and removes queries on attributes not present in the class, keeping all filters related to attributes with a JSON object type.
def criteria_to_sql_alchemy_operation(item):  # pragma: integration
    """
    Turn a filter tuple (as produced by `build_filter_query`) into a SQLAlchemy expression.

    * JSONB column with a `jsonb_path_exists` function: the function is called with the column and the
      third and fourth tuple elements.
    * `PropComparator.any` on an attribute whose comparator has an entity: `any()` when the third element is
      truthy, otherwise its negation.
    * List of values with a fourth element: the list operation is combined with the fourth element applied
      to None, using `or_` when that element is the 'is' operation and `and_` otherwise.
    * Anything else: the operator is applied to the column and the third element.
    """
    column, operation = item[0], item[1]

    is_json_path_call = (
        hasattr(column, 'type')
        and isinstance(column.type, JSONB)
        and type(operation) is _FunctionGenerator
        and 'jsonb_path_exists' in operation._FunctionGenerator__names
    )
    if is_json_path_call:
        return operation(column, item[2], item[3])

    comparator = column.comparator
    if comparator and hasattr(comparator, 'entity') and comparator.entity and operation == PropComparator.any:
        has_any = operation(column)
        return has_any if item[2] else ~has_any

    values = item[2]
    if isinstance(values, list) and item[3]:
        null_check = item[3]
        combine = or_ if null_check == OPERATIONS.get("is") else and_
        return combine(operation(column, values), null_check(column, None))

    return operation(column, values)