bedrock.db.query_helper

  1import json
  2import re
  3from datetime import datetime
  4from dateutil.tz import tz
  5from typing import Type
  6from sqlalchemy import Integer, DateTime, ColumnOperators, func
  7from sqlalchemy.dialects.postgresql import JSONB
  8from sqlalchemy.orm import PropComparator
  9from uuid import UUID as PyUUID
 10from sqlalchemy.dialects.postgresql import UUID as PGUUID
 11from bedrock._helpers.classes import find_all_attributes
 12from bedrock._helpers.string import camelCase_to_snake_case
 13from bedrock.exceptions.bad_request_exception import BadRequestException
 14from sqlalchemy.sql.functions import _FunctionGenerator
 15from sqlalchemy.sql import or_, and_
 16from bedrock.log import log_config
 17
# Module-level logger, obtained from bedrock's log_config helper under the name "connection".
 18log = log_config("connection")
 19
 20
 21def unsupported_operation(operation_type):
 22    raise BadRequestException(f"Operation '{operation_type}' not supported for JSON fields")
 23
 24
 25# Dictionary mapping operations supported for filtering functionality to their SQLAlchemy equivalents.
 26OPERATIONS = {
 27    'eq': ColumnOperators.__eq__, 'ne': ColumnOperators.__ne__,
 28    'ge': ColumnOperators.__ge__, 'le': ColumnOperators.__le__,
 29    'gt': ColumnOperators.__gt__, 'lt': ColumnOperators.__lt__,
 30    'in': ColumnOperators.in_, 'ni': ColumnOperators.not_in,
 31    'like': ColumnOperators.ilike, 'nlike': ColumnOperators.not_ilike,
 32    'is': ColumnOperators.is_, 'isnt': ColumnOperators.isnot,
 33    'any': PropComparator.any
 34}
 35OPERATIONS_RELATED_NONE_MATCHER = {
 36    ColumnOperators.__eq__: ColumnOperators.is_,
 37    ColumnOperators.in_: ColumnOperators.is_,
 38    ColumnOperators.ilike: ColumnOperators.is_,
 39    ColumnOperators.is_: ColumnOperators.is_,
 40
 41    ColumnOperators.__ne__: ColumnOperators.isnot,
 42    ColumnOperators.not_in: ColumnOperators.isnot,
 43    ColumnOperators.not_ilike: ColumnOperators.isnot,
 44    ColumnOperators.isnot: ColumnOperators.isnot
 45}
 46
 47OPERATIONS_RELATED_NONE_MATCHER_BOOLEAN_OPERATION = {
 48    ColumnOperators.__eq__: or_,
 49    ColumnOperators.in_: or_,
 50    ColumnOperators.ilike: or_,
 51    ColumnOperators.is_: or_,
 52
 53    ColumnOperators.__ne__: and_,
 54    ColumnOperators.not_in: and_,
 55    ColumnOperators.not_ilike: and_,
 56    ColumnOperators.isnot: and_
 57}
 58
 59JSON_OPERATIONS = {'eq': lambda json_path, value: (f"{json_path} ? (@ == $VALUE)", {"VALUE": value}),
 60                   'le': lambda json_path, value: (f"{json_path} ? (@ <= $VALUE)", {"VALUE": value}),
 61                   'ge': lambda json_path, value: (f"{json_path} ? (@ >= $VALUE)", {"VALUE": value}),
 62                   'gt': lambda json_path, value: (f"{json_path} ? (@ < $VALUE)", {"VALUE": value}),
 63                   'lt': lambda json_path, value: (f"{json_path} ? (@ > $VALUE)", {"VALUE": value}),
 64                   'ne': lambda json_path, value: (f"{json_path} ? (@ != $VALUE)", {"VALUE": value}),
 65                   'in': lambda json_path, value: (f"{json_path} ? (@ == $VALUE)", {"VALUE": value}),
 66                   'ni': lambda json_path, value: (f"{json_path} ? (@ != $VALUE)", {"VALUE": value}),
 67                   'like': lambda json_path, value: (f'{json_path} ? (@ like_regex "{value}")', None),
 68                   'any': lambda json_path, value: unsupported_operation("any"),
 69                   'empty': lambda json_path, value: unsupported_operation("empty"),
 70                   'is': lambda json_path, value: (f"{json_path} ? (@ == $VALUE)", {"VALUE": None}),
 71                   'isnt': lambda json_path, value: (f"{json_path} ? (@ != $VALUE)", {"VALUE": None})}
 72
 73
 74def build_filter_query(filter_key, filter_value, cls):  # pragma: unit
 75    """
 76    This method converts a filter query (provided as 'filter_key' and 'filter_value' parameters) for an entity ('cls')
 77    into a form that can be understood by SQLAlchemy.
 78
 79    Notes:
 80    * The 'filter_key' should specify the column and operation to filter on using the format '<column_name>[operation]'.
 81    * The value to filter on can be a list of values, or a single value. If this is a list, the operation in
 82    'filter_key' should be either 'in' (IN) or 'ni' (NOT IN), other operators will be converted into an 'in' operation.
 83    * The result of this method is a tuple with the following three elements:
 84        1) the SQLAlchemy `Column` object corresponding to the string column provided
 85        2) the SQLAlchemy 'ColumnOperators' operator corresponding to the string column provided
 86        3) the value(s) converted to the type of the Column object in 1.
 87
 88    :param filter_key: The filter column and operation provided in the format '<column_name>[operation]' (e.g. 'label[eq]').
 89    :param filter_value: The values in the column to check for.
 90    :param cls: The class representing the entity being filtered.
 91    :return: A tuple representing the SQLAlchemy operation to perform e.g (AClass.column, ColumnOperators.__ne__, "Abc")
 92    """
 93    # TODO: This code is becoming super complex and hard to read. Refactor it to make it more readable.
 94    column_name, column_operator, json_path, match = _split_column_and_operator(filter_key)
 95    column = getattr(cls, column_name)
 96    is_deep_json_null_comparison = hasattr(column, 'type') and isinstance(column.type,
 97                                                                          JSONB) and match and json_path is not None
 98    if column_operator in ['is', 'isnt'] and not is_deep_json_null_comparison:  # Null values
 99        if filter_value is not None and f"{filter_value}".lower() not in ["none", "null"]:
100            raise BadRequestException(
101                "Invalid value provided for 'is' or 'isnt' operation (only 'none' or 'null' allowed)")
102        operation = OPERATIONS.get(column_operator)
103        return column, operation, None
104    elif hasattr(column, 'type') and isinstance(column.type, JSONB):
105        _json_path = '.'.join(f'"{segment}"' for segment in json_path.split('.')) if json_path else None
106        json_operation, json_value = JSON_OPERATIONS[column_operator](f'$.{_json_path}', filter_value)
107        return column, func.jsonb_path_exists, json_operation, json.dumps(json_value) if json_value else '{}'
108    elif column_operator == 'empty':  # Empty relationships
109        return column, OPERATIONS.get('any'), filter_value == 'false'
110    elif type(filter_value) == list:  # Backwards compatibility
111        if column_operator not in ['in', 'ni']:
112            column_operator = 'in'
113        updated_filter_value = [convert_value_to_column_type(v, column.type) for v in filter_value if v is not None]
114        extra_or_operation = None
115        if None in filter_value:
116            extra_or_operation = OPERATIONS.get("is") if column_operator == 'in' else OPERATIONS.get("isnt")
117        return column, OPERATIONS.get(column_operator), updated_filter_value, extra_or_operation
118    else:  # Everything else
119        operation = OPERATIONS.get(column_operator)
120        # replace '*' from like query with '%' to be compatible with SQL
121        updated_filter_value = None
122        if filter_value is not None:
123            _filter_value = filter_value.replace('*', '%') if operation == ColumnOperators.ilike else filter_value
124            updated_filter_value = convert_value_to_column_type(_filter_value, column.type)
125        return column, operation, updated_filter_value
126
127
def convert_value_to_column_type(value, column_type):  # pragma: unit
    """
    Casts the given 'value' to the Python type matching the SQLAlchemy 'column_type' specified.

    * UUID columns: returns a `uuid.UUID`, or the value unchanged if it cannot be parsed as one.
    * Integer columns: returns `int(...)` of the value's string form.
    * DateTime columns: tries an ISO formatted string first (any 'Z' is rewritten to '+00:00'), then a
      timestamp in milliseconds since the epoch (UTC); returns the value unchanged if neither works.
    * Any other column type, and `None` values: the value is returned unchanged.

    :param value: The raw filter value.
    :param column_type: The SQLAlchemy type instance of the column being filtered on.
    :return: The converted value, or the original value when no conversion applies.
    :raises ValueError: If the column is an Integer column and the value is not a valid integer.
    """
    if value is None:
        return None

    # UUIDs
    if isinstance(column_type, PGUUID):
        if isinstance(value, PyUUID):
            return value
        try:
            return PyUUID(str(value))
        except (ValueError, TypeError):
            return value

    # Integers
    if isinstance(column_type, Integer):
        return int(f"{value}")

    # DateTime
    if isinstance(column_type, DateTime):
        try:
            _value = value.replace('Z', '+00:00') if isinstance(value, str) and 'Z' in value else value
            return datetime.fromisoformat(_value)
        except (ValueError, TypeError, AttributeError):
            try:
                return datetime.fromtimestamp(int(value) / 1000.0, tz=tz.gettz('UTC'))
            except (TypeError, ValueError, OverflowError, OSError):
                # OverflowError/OSError: the number is outside the range supported for timestamps.
                # Like any other value that cannot be parsed, it is handed back unchanged.
                return value

    return value
160
161
def create_filters_list_from_dict(query_dict, cls):  # pragma: unit
    """
    Builds the list of filter tuples for a dictionary of string query filters by passing every
    key/value pair through `build_filter_query`.

    For example, the query dictionary `{'col1[eq]': '10', 'col_snake_case[eq]': '2023'}` becomes
    [(AClass.col1, ColumnOperators.__eq__, '10'), (AClass.col_snake_case, ColumnOperators.__eq__, 2023)]

    :param query_dict: Mapping of '<column_name>[operation]' keys to the value(s) to filter on.
    :param cls: The class representing the entity being filtered.
    :return: One filter tuple per entry of 'query_dict', in the dictionary's iteration order.
    """
    return [
        build_filter_query(filter_key, filter_value, cls)
        for filter_key, filter_value in query_dict.items()
    ]
170
171
def add_default_equality_operator(query_key):  # pragma: unit
    """
    Ensures a criteria key carries an operation, defaulting to equality.

    :param query_key: The criteria key, with or without a '[operation]' suffix.
    :return: 'query_key' unchanged if it contains a '[', otherwise 'query_key' followed by '[eq]'.
    """
    if '[' not in query_key:
        return f"{query_key}[eq]"
    return query_key
177
178
def get_filters_relevant_to_class(cls: Type, filters: dict) -> dict:  # pragma: unit
    """
    Returns the subset of 'filters' that refers to attributes present in 'cls'.

    A filter is kept when the part of its key before '[' either
    * starts (up to the first '.') with the snake_case name of a JSONB column - the key is kept as provided,
    * is a column or attribute of the class as provided - the key is kept as provided, or
    * is a column or attribute of the class once converted to snake_case - the key is stored with the
      converted name.

    An empty dictionary is returned when the class has no `get_column_definitions` method (a warning is
    logged) or when it reports no column definitions.

    NOTE(review): for JSON columns the key is kept as provided even when only the snake_case form of the
    column name matched - confirm that the later attribute lookup copes with that.

    :param cls: The class representing the entity being filtered.
    :param filters: Mapping of '<column_name>[operation]' keys to filter values.
    :return: A new dictionary holding only the relevant filters.
    """
    try:
        column_definitions = cls.get_column_definitions()
    except AttributeError as e:
        # The class does not expose column definitions, so nothing can be filtered on it.
        log.warning(f"Unexpected error: {e}")
        return {}

    if not column_definitions:
        return {}

    attributes = find_all_attributes(cls)

    def _is_known(name):
        # True when the name is either a mapped column or another attribute of the class.
        return name in column_definitions or name in attributes

    relevant_filters = {}
    for key, value in filters.items():
        # 'col[eq]' -> 'col' (for JSON filters this still includes the dotted path)
        col = key.partition('[')[0]
        _col = camelCase_to_snake_case(col)

        # JSON columns: only the segment before the first '.' names the column
        json_column_name = _col.split(".")[0]
        if json_column_name in column_definitions \
                and isinstance(column_definitions[json_column_name].type, JSONB):
            relevant_filters[key] = value
            continue

        if _is_known(col):
            relevant_filters[key] = value
        elif _is_known(_col):
            relevant_filters[key.replace(col, _col)] = value

    return relevant_filters
215
216
def criteria_to_sql_alchemy_operation(item):  # pragma: integration
    """
    Turns a filter tuple (as produced by `build_filter_query`) into a SQLAlchemy expression.

    The tuple is interpreted as (column, operation, value[, extra]):
    * JSONB column with a `jsonb_path_exists` function: calls the function with the column, the jsonpath
      expression ('value') and the JSON-encoded variables ('extra').
    * Relationship attribute with the `any` operation: `column.any()` when 'value' is truthy, its negation
      otherwise.
    * List value with an extra null-check operator: combines the list operation and the null check with
      `or_` when the extra operator is the 'is' operator and with `and_` otherwise.
    * Anything else: applies the operation to the column and the value.

    :param item: A sequence of three or four elements as described above.
    :return: The SQLAlchemy expression implementing the criterion.
    """
    column, operation, value = item[0], item[1], item[2]
    # The fourth element only exists for JSON and list criteria; tolerate three-element tuples.
    extra = item[3] if len(item) > 3 else None

    is_json_path_query = (
        hasattr(column, 'type')
        and isinstance(column.type, JSONB)
        and isinstance(operation, _FunctionGenerator)
        and 'jsonb_path_exists' in operation._FunctionGenerator__names
    )
    if is_json_path_query:
        return operation(column, value, extra)

    is_relationship_check = (
        column.comparator
        and hasattr(column.comparator, 'entity')
        and column.comparator.entity
        and operation == PropComparator.any
    )
    if is_relationship_check:
        if value:
            return operation(column)
        return ~operation(column)

    if isinstance(value, list) and extra:
        # 'in' + None -> "IN (...) OR IS NULL"; 'ni' + None -> "NOT IN (...) AND IS NOT NULL"
        boolean_operator = or_ if extra == OPERATIONS.get("is") else and_
        return boolean_operator(operation(column, value), extra(column, None))

    return operation(column, value)
235
236
237def _split_column_and_operator(filter_key):
238    match = re.search(r'(.*)\[(.*)\]', filter_key)
239    if not match:
240        raise BadRequestException("Filter provided cannot be recognised")
241    else:
242        column_name = match.groups()[0] if "." not in match.groups()[0] else match.groups()[0].split(".")[0]
243        json_path = None if "." not in match.groups()[0] else ".".join(match.groups()[0].split(".")[1:])
244        column_operator = match.groups()[1]
245        return column_name, column_operator, json_path, match
log = <MyLogger BEDROCK-connection (INFO)>
def unsupported_operation(operation_type):
def unsupported_operation(operation_type):
    """
    Rejects a filter operation that cannot be applied to JSON fields.

    :param operation_type: Name of the operation that was requested (e.g. 'any').
    :raises BadRequestException: Always; the message names the rejected operation.
    """
    message = f"Operation '{operation_type}' not supported for JSON fields"
    raise BadRequestException(message)
OPERATIONS = {'eq': <function ColumnOperators.__eq__>, 'ne': <function ColumnOperators.__ne__>, 'ge': <function ColumnOperators.__ge__>, 'le': <function ColumnOperators.__le__>, 'gt': <function ColumnOperators.__gt__>, 'lt': <function ColumnOperators.__lt__>, 'in': <function ColumnOperators.in_>, 'ni': <function ColumnOperators.not_in>, 'like': <function ColumnOperators.ilike>, 'nlike': <function ColumnOperators.not_ilike>, 'is': <function ColumnOperators.is_>, 'isnt': <function ColumnOperators.is_not>, 'any': <function PropComparator.any>}
JSON_OPERATIONS = {'eq': <function <lambda>>, 'le': <function <lambda>>, 'ge': <function <lambda>>, 'gt': <function <lambda>>, 'lt': <function <lambda>>, 'ne': <function <lambda>>, 'in': <function <lambda>>, 'ni': <function <lambda>>, 'like': <function <lambda>>, 'any': <function <lambda>>, 'empty': <function <lambda>>, 'is': <function <lambda>>, 'isnt': <function <lambda>>}
def build_filter_query(filter_key, filter_value, cls):
 75def build_filter_query(filter_key, filter_value, cls):  # pragma: unit
 76    """
 77    This method converts a filter query (provided as 'filter_key' and 'filter_value' parameters) for an entity ('cls')
 78    into a form that can be understood by SQLAlchemy.
 79
 80    Notes:
 81    * The 'filter_key' should specify the column and operation to filter on using the format '<column_name>[operation]'.
 82    * The value to filter on can be a list of values, or a single value. If this is a list, the operation in
 83    'filter_key' should be either 'in' (IN) or 'ni' (NOT IN), other operators will be converted into an 'in' operation.
 84    * The result of this method is a tuple with the following three elements:
 85        1) the SQLAlchemy `Column` object corresponding to the string column provided
 86        2) the SQLAlchemy 'ColumnOperators' operator corresponding to the string column provided
 87        3) the value(s) converted to the type of the Column object in 1.
 88
 89    :param filter_key: The filter column and operation provided in the format '<column_name>[operation]' (e.g. 'label[eq]').
 90    :param filter_value: The values in the column to check for.
 91    :param cls: The class representing the entity being filtered.
 92    :return: A tuple representing the SQLAlchemy operation to perform e.g (AClass.column, ColumnOperators.__ne__, "Abc")
 93    """
 94    # TODO: This code is becoming super complex and hard to read. Refactor it to make it more readable.
 95    column_name, column_operator, json_path, match = _split_column_and_operator(filter_key)
 96    column = getattr(cls, column_name)
 97    is_deep_json_null_comparison = hasattr(column, 'type') and isinstance(column.type,
 98                                                                          JSONB) and match and json_path is not None
 99    if column_operator in ['is', 'isnt'] and not is_deep_json_null_comparison:  # Null values
100        if filter_value is not None and f"{filter_value}".lower() not in ["none", "null"]:
101            raise BadRequestException(
102                "Invalid value provided for 'is' or 'isnt' operation (only 'none' or 'null' allowed)")
103        operation = OPERATIONS.get(column_operator)
104        return column, operation, None
105    elif hasattr(column, 'type') and isinstance(column.type, JSONB):
106        _json_path = '.'.join(f'"{segment}"' for segment in json_path.split('.')) if json_path else None
107        json_operation, json_value = JSON_OPERATIONS[column_operator](f'$.{_json_path}', filter_value)
108        return column, func.jsonb_path_exists, json_operation, json.dumps(json_value) if json_value else '{}'
109    elif column_operator == 'empty':  # Empty relationships
110        return column, OPERATIONS.get('any'), filter_value == 'false'
111    elif type(filter_value) == list:  # Backwards compatibility
112        if column_operator not in ['in', 'ni']:
113            column_operator = 'in'
114        updated_filter_value = [convert_value_to_column_type(v, column.type) for v in filter_value if v is not None]
115        extra_or_operation = None
116        if None in filter_value:
117            extra_or_operation = OPERATIONS.get("is") if column_operator == 'in' else OPERATIONS.get("isnt")
118        return column, OPERATIONS.get(column_operator), updated_filter_value, extra_or_operation
119    else:  # Everything else
120        operation = OPERATIONS.get(column_operator)
121        # replace '*' from like query with '%' to be compatible with SQL
122        updated_filter_value = None
123        if filter_value is not None:
124            _filter_value = filter_value.replace('*', '%') if operation == ColumnOperators.ilike else filter_value
125            updated_filter_value = convert_value_to_column_type(_filter_value, column.type)
126        return column, operation, updated_filter_value

This method converts a filter query (provided as 'filter_key' and 'filter_value' parameters) for an entity ('cls') into a form that can be understood by SQLAlchemy.

Notes:

  • The 'filter_key' should specify the column and operation to filter on using the format '<column_name>[operation]'.
  • The value to filter on can be a list of values, or a single value. If this is a list, the operation in 'filter_key' should be either 'in' (IN) or 'ni' (NOT IN), other operators will be converted into an 'in' operation.
  • The result of this method is a tuple whose first three elements are: 1) the SQLAlchemy Column object corresponding to the string column provided 2) the SQLAlchemy 'ColumnOperators' operator corresponding to the string operation provided (for JSONB columns, func.jsonb_path_exists) 3) the value(s) converted to the type of the Column object in 1. Filters on JSONB columns and filters with a list of values return a fourth element as well.
Parameters
  • filter_key: The filter column and operation provided in the format '<column_name>[operation]' (e.g. 'label[eq]').
  • filter_value: The values in the column to check for.
  • cls: The class representing the entity being filtered.
Returns

A tuple representing the SQLAlchemy operation to perform e.g (AClass.column, ColumnOperators.__ne__, "Abc")

def convert_value_to_column_type(value, column_type):
def convert_value_to_column_type(value, column_type):  # pragma: unit
    """
    Casts the given 'value' to the Python type matching the SQLAlchemy 'column_type' specified.

    * UUID columns: returns a `uuid.UUID`, or the value unchanged if it cannot be parsed as one.
    * Integer columns: returns `int(...)` of the value's string form.
    * DateTime columns: tries an ISO formatted string first (any 'Z' is rewritten to '+00:00'), then a
      timestamp in milliseconds since the epoch (UTC); returns the value unchanged if neither works.
    * Any other column type, and `None` values: the value is returned unchanged.

    :param value: The raw filter value.
    :param column_type: The SQLAlchemy type instance of the column being filtered on.
    :return: The converted value, or the original value when no conversion applies.
    :raises ValueError: If the column is an Integer column and the value is not a valid integer.
    """
    if value is None:
        return None

    # UUIDs
    if isinstance(column_type, PGUUID):
        if isinstance(value, PyUUID):
            return value
        try:
            return PyUUID(str(value))
        except (ValueError, TypeError):
            return value

    # Integers
    if isinstance(column_type, Integer):
        return int(f"{value}")

    # DateTime
    if isinstance(column_type, DateTime):
        try:
            _value = value.replace('Z', '+00:00') if isinstance(value, str) and 'Z' in value else value
            return datetime.fromisoformat(_value)
        except (ValueError, TypeError, AttributeError):
            try:
                return datetime.fromtimestamp(int(value) / 1000.0, tz=tz.gettz('UTC'))
            except (TypeError, ValueError, OverflowError, OSError):
                # OverflowError/OSError: the number is outside the range supported for timestamps.
                # Like any other value that cannot be parsed, it is handed back unchanged.
                return value

    return value

Casts the given 'value' to the SQLAlchemy 'column_type' specified.

def create_filters_list_from_dict(query_dict, cls):
def create_filters_list_from_dict(query_dict, cls):  # pragma: unit
    """
    Builds the list of filter tuples for a dictionary of string query filters by passing every
    key/value pair through `build_filter_query`.

    For example, the query dictionary `{'col1[eq]': '10', 'col_snake_case[eq]': '2023'}` becomes
    [(AClass.col1, ColumnOperators.__eq__, '10'), (AClass.col_snake_case, ColumnOperators.__eq__, 2023)]

    :param query_dict: Mapping of '<column_name>[operation]' keys to the value(s) to filter on.
    :param cls: The class representing the entity being filtered.
    :return: One filter tuple per entry of 'query_dict', in the dictionary's iteration order.
    """
    return [
        build_filter_query(filter_key, filter_value, cls)
        for filter_key, filter_value in query_dict.items()
    ]

Converts a dictionary of string query filters into a list of tuples to be used as part of a SQLAlchemy query. For example: A query dictionary {'col1[eq]': '10', 'col_snake_case[eq]': '2023'} -> [(AClass.col1, ColumnOperators.__eq__, '10'),(AClass.col_snake_case, ColumnOperators.__eq__,2023)]

def add_default_equality_operator(query_key):
def add_default_equality_operator(query_key):  # pragma: unit
    """
    Ensures a criteria key carries an operation, defaulting to equality.

    :param query_key: The criteria key, with or without a '[operation]' suffix.
    :return: 'query_key' unchanged if it contains a '[', otherwise 'query_key' followed by '[eq]'.
    """
    if '[' not in query_key:
        return f"{query_key}[eq]"
    return query_key

Add 'eq' as the default operating string for criteria if not present.

def get_filters_relevant_to_class(cls: Type, filters: dict) -> dict:
def get_filters_relevant_to_class(cls: Type, filters: dict) -> dict:  # pragma: unit
    """
    Returns the subset of 'filters' that refers to attributes present in 'cls'.

    A filter is kept when the part of its key before '[' either
    * starts (up to the first '.') with the snake_case name of a JSONB column - the key is kept as provided,
    * is a column or attribute of the class as provided - the key is kept as provided, or
    * is a column or attribute of the class once converted to snake_case - the key is stored with the
      converted name.

    An empty dictionary is returned when the class has no `get_column_definitions` method (a warning is
    logged) or when it reports no column definitions.

    NOTE(review): for JSON columns the key is kept as provided even when only the snake_case form of the
    column name matched - confirm that the later attribute lookup copes with that.

    :param cls: The class representing the entity being filtered.
    :param filters: Mapping of '<column_name>[operation]' keys to filter values.
    :return: A new dictionary holding only the relevant filters.
    """
    try:
        column_definitions = cls.get_column_definitions()
    except AttributeError as e:
        # The class does not expose column definitions, so nothing can be filtered on it.
        log.warning(f"Unexpected error: {e}")
        return {}

    if not column_definitions:
        return {}

    attributes = find_all_attributes(cls)

    def _is_known(name):
        # True when the name is either a mapped column or another attribute of the class.
        return name in column_definitions or name in attributes

    relevant_filters = {}
    for key, value in filters.items():
        # 'col[eq]' -> 'col' (for JSON filters this still includes the dotted path)
        col = key.partition('[')[0]
        _col = camelCase_to_snake_case(col)

        # JSON columns: only the segment before the first '.' names the column
        json_column_name = _col.split(".")[0]
        if json_column_name in column_definitions \
                and isinstance(column_definitions[json_column_name].type, JSONB):
            relevant_filters[key] = value
            continue

        if _is_known(col):
            relevant_filters[key] = value
        elif _is_known(_col):
            relevant_filters[key.replace(col, _col)] = value

    return relevant_filters

Filters a dictionary of query filters and removes queries on attributes not present in the class, keeping all filters related to attributes with a JSON object type.

def criteria_to_sql_alchemy_operation(item):
def criteria_to_sql_alchemy_operation(item):  # pragma: integration
    """
    Turns a filter tuple (as produced by `build_filter_query`) into a SQLAlchemy expression.

    The tuple is interpreted as (column, operation, value[, extra]):
    * JSONB column with a `jsonb_path_exists` function: calls the function with the column, the jsonpath
      expression ('value') and the JSON-encoded variables ('extra').
    * Relationship attribute with the `any` operation: `column.any()` when 'value' is truthy, its negation
      otherwise.
    * List value with an extra null-check operator: combines the list operation and the null check with
      `or_` when the extra operator is the 'is' operator and with `and_` otherwise.
    * Anything else: applies the operation to the column and the value.

    :param item: A sequence of three or four elements as described above.
    :return: The SQLAlchemy expression implementing the criterion.
    """
    column, operation, value = item[0], item[1], item[2]
    # The fourth element only exists for JSON and list criteria; tolerate three-element tuples.
    extra = item[3] if len(item) > 3 else None

    is_json_path_query = (
        hasattr(column, 'type')
        and isinstance(column.type, JSONB)
        and isinstance(operation, _FunctionGenerator)
        and 'jsonb_path_exists' in operation._FunctionGenerator__names
    )
    if is_json_path_query:
        return operation(column, value, extra)

    is_relationship_check = (
        column.comparator
        and hasattr(column.comparator, 'entity')
        and column.comparator.entity
        and operation == PropComparator.any
    )
    if is_relationship_check:
        if value:
            return operation(column)
        return ~operation(column)

    if isinstance(value, list) and extra:
        # 'in' + None -> "IN (...) OR IS NULL"; 'ni' + None -> "NOT IN (...) AND IS NOT NULL"
        boolean_operator = or_ if extra == OPERATIONS.get("is") else and_
        return boolean_operator(operation(column, value), extra(column, None))

    return operation(column, value)