bedrock.db.model_helper

  1from datetime import datetime
  2from decimal import Decimal
  3from enum import Enum
  4from typing import Type
  5from uuid import UUID
  6
  7from sqlalchemy import UniqueConstraint
  8from sqlalchemy.orm import declarative_base
  9from sqlalchemy.orm.exc import DetachedInstanceError
 10
 11from bedrock._helpers.classes import find_all_attributes, is_serialisable
 12from bedrock._helpers.crypto import create_uuid
 13from bedrock._helpers.string import camelCase_to_snake_case, snake_case_to_camelCase
 14from bedrock.helpers.dictionary import merge_dictionaries, has_key_value, get_snake_or_camel_case_key
 15from bedrock.db.alchemy_helper import AlchemyHelper
 16from bedrock.db.constraints import NonIdentifyingUniqueConstraint
 17from bedrock.db.bedrock_model import BedrockModel
 18
 19# Here so it can be easily imported from the same place as ModelHelper
 20Base = declarative_base(cls=BedrockModel)
 21"""
 22Base is the class to be used by any application models. See [`BedrockModel`](./bedrock_model.html#BedrockModel).
 23"""
 24
 25
 26class ModelHelper(AlchemyHelper):
 27    """
 28    ModelHelper is a class that provides some helper methods for SQLAlchemy models (GORMish style).
 29    It must be inherited by a class that also inherits from Base. E.g.:
 30    class MyModel(Base, ModelHelper):
 31        __tablename__ = "my_table"
 32        # etc
 33    """
 34
 35    @classmethod
 36    def should_create_if_not_found(cls):  # pragma: no cover
 37        """
 38        Override this method to return True if you want to automatically create a new record if one is not found.
 39        :return: `False` by default
 40        """
 41        return False
 42
 43    @classmethod
 44    def get_tables(cls) -> dict:  # pragma: unit
 45        """
 46        Gets all tables that are registered with the current session.
 47
 48        Note: Relies on SQLAlchemy functionality
 49        :return: A dictionary with the key being the table name and the value being the class associated with the table
 50        """
 51        return dict([(a.entity.__tablename__, a.entity) for a in cls.registry.mappers])
 52
 53    @classmethod
 54    def get_entities(cls) -> dict:  # pragma: integration
 55        """
 56        Gets all model classes that are registered with the current session.
 57
 58        Note: Relies on SQLAlchemy functionality
 59        :return: A dictionary with the key being the class name and the value being the class
 60        """
 61        return dict([(a.entity.__name__, a.entity) for a in cls.registry.mappers])
 62
 63    @classmethod
 64    def has_custom_json(cls) -> bool:  # pragma: unit
 65        """
 66        Override this method to return `True` if you want to use a custom JSON representation of the model (rather than
 67        the default `as_json`).
 68        """
 69        return False
 70
 71    @classmethod
 72    def has_custom_schema(cls) -> bool:  # pragma: unit
 73        """
 74        Override this method to return `True` if you want to use a custom schema for the model. This is used when
 75        generating the OpenAPI spec. It defaults to the same value of `has_custom_json`.
 76        """
 77        return cls.has_custom_json()
 78
 79    @classmethod
 80    def get_primary_key_names(cls) -> list[str]:  # pragma: unit
 81        """
 82        Gets a list primary key column names.
 83
 84        Makes use of `get_primary_key_columns`.
 85        """
 86        return cls.get_primary_key_columns().keys()
 87
 88    @classmethod
 89    def get_primary_key_columns(cls) -> dict:  # pragma: unit
 90        """
 91        Gets a dictionary of primary key columns, with the key being the column name and the value being the column.
 92        """
 93        return cls.__table__.primary_key.columns
 94
 95    @classmethod
 96    def get_unique_constraint_names(cls) -> list[str]:  # pragma: unit
 97        """
 98        Gets a list of unique constraint column names (ignoring those in `NonIdentifyingUniqueConstraint`s).
 99
100        If no constraints are present, it will return the primary key names.
101        """
102        unique_columns = [u.name for c in cls.__table__.constraints
103                          if isinstance(c, UniqueConstraint) and not isinstance(c, NonIdentifyingUniqueConstraint)
104                          for u in c.columns]
105        return unique_columns if unique_columns else cls.get_primary_key_names()
106
107    @classmethod
108    def get_unique_identifier_names(cls) -> list[str]:  # pragma: unit
109        """
110        Gets a list of unique identifier column names, i.e. primary keys and any column in unique constraints.
111        """
112        unique_columns = [u.name for c in cls.__table__.constraints if isinstance(c, UniqueConstraint) for u in
113                          c.columns]
114        return [*cls.get_primary_key_names(), *unique_columns]
115
116    @classmethod
117    def get_foreign_keys(cls) -> dict[str, tuple[Type, str]]:  # pragma: unit
118        """
119        Returns a dictionary of foreign keys, with the key being the name of the foreign key and the value being a tuple
120        of the Class where the foreign key is pointing to and the name of the column in that class
121
122        Example: if the foreign key is "city_uuid" and it points to "uuid" in City, the dictionary will be:
123        ```python
124        {  "city_uuid": (City, "uuid")  }
125        ```
126        """
127        return dict([(k.parent.name, (cls.get_tables()[k.column.table.name], k.column.name))
128                     for k in cls.__table__.foreign_keys])
129
130    @classmethod
131    def get_column_names(cls) -> list[str]:  # pragma: unit
132        """
133        Gets a list of column names.
134        """
135        return [c.key for c in cls.__table__.columns]
136
137    @classmethod
138    def get_column_definition(cls, column_name):  # pragma: unit
139        """
140        Gets the column definition for a given column name.
141        :param column_name: The name of the database table column
142        """
143        try:
144            return next(
145                col for col in cls.metadata.tables[cls.__tablename__].columns._all_columns if col.name == column_name)
146        except:
147            return None
148
149    @classmethod
150    def get_column_definitions(cls):  # pragma: unit
151        try:
152            return {col.name: col for col in cls.metadata.tables[cls.__tablename__].columns._all_columns}
153        except:  # pragma: no cover
154            return None
155
156    @classmethod
157    def get_common_reference_columns(cls, other_cls):  # pragma: unit
158        """
159        Gets a list of column names that are foreign keys in both this class and the provided class.
160        :param other_cls: Another classe to compare this to.
161        :return: A list of common column names between both classes.
162        """
163        column_names = cls.get_foreign_keys().keys()
164        other_column_names = other_cls.get_column_names()
165        return [col for col in column_names if col in other_column_names]
166
167    @classmethod
168    def from_json(cls, content: dict, primary_key: str = "uuid"):  # pragma: unit
169        """
170        Alias for `from_dict`.
171        """
172        return cls.from_dict(content, primary_key)
173
174    @classmethod
175    def from_dict(cls, dictionary: dict, primary_key: str = "uuid"):  # pragma: unit
176        """
177        Creates a new model instance from a dictionary.
178
179        Automatically converts camelCase to snake_case in order to find the correct attribute.
180
181        Ignores:
182        * any attributes that are not in the model
183        * any attributes that are callable
184        * any attributes that are nested models
185        """
186        class_attributes = find_all_attributes(cls)
187        nested_models = [m.key for m in cls.get_nested_models()]
188        self = cls()
189        for key in dictionary:
190            attribute = camelCase_to_snake_case(key)
191            column = cls.get_column_definition(attribute)
192            if column is not None \
193                    and attribute not in nested_models \
194                    and attribute in class_attributes \
195                    and not callable(getattr(self, attribute)):
196                value = dictionary[key]
197                data_type = column.type.python_type
198                if data_type == datetime and isinstance(value, int):
199                    value = datetime.fromtimestamp(value / 1000)
200                setattr(self, attribute, value)
201
202        if primary_key not in dictionary:
203            setattr(self, primary_key, create_uuid())
204
205        return self
206
207    @classmethod
208    def get_nested_models(cls) -> list:  # pragma: unit
209        """
210        Gets a list of nested models by looking for any attributes that look like joins.
211        """
212        return [getattr(cls, attribute)
213                for attribute in find_all_attributes(cls)
214                if hasattr(getattr(cls, attribute), "expression")
215                and (
216                        hasattr(getattr(cls, attribute).expression, "right")
217                        or hasattr(getattr(cls, attribute).expression, "clauses")
218                )]
219
220    def as_json(self, excludes=[], nested_excludes=[], extra={}) -> dict:  # pragma: unit
221        """
222        Converts the object to a JSON string. Makes use of `as_dict` with `casing_function=snake_case_to_camelCase`.
223        :param excludes: Any attributes to exclude from the JSON.
224        :param nested_excludes: Any attributes to exclude from nested models.
225        :param extra: Any extra attributes to add to the JSON.
226        :return: The object in JSON format (with keys as camelCase).
227        """
228        cls = self.__class__  # pragma: unit
229        return self.as_dict(excludes=[*excludes, *cls.__excluded_attributes_from_json__],
230                            nested_excludes=nested_excludes, extra=extra,
231                            casing_function=snake_case_to_camelCase)
232
233    def as_dict(self, excludes=[], nested_excludes=[], extra={},
234                casing_function=camelCase_to_snake_case,
235                resolve_defaults=False, visited=None) -> dict:  # pragma: unit
236        """
237        Converts the object to a dictionary by introspecting the class.
238
239        :param excludes: Any attributes to exclude from the dictionary.
240        :param nested_excludes: Any attributes to exclude from nested models.
241        :param extra: Any extra attributes to add to the dictionary.
242        :param casing_function: How to convert the attribute names to the dictionary keys.
243        :param resolve_defaults: Whether to resolve default values of the model.
244        :param visited: A set used to track already-visited objects during serialisation. Prevents infinite recursion.
245        :return: The object in JSON format (with keys as snake_case by default).
246        """
247
248        if visited is None:
249            visited = set()
250
251        identifier = getattr(self, "uuid", None)
252        if identifier in visited:
253            return {"uuid": str(getattr(self, "uuid", None))}
254
255        visited.add(identifier)
256
257        cls = self.__class__
258        class_attributes = find_all_attributes(cls)
259        json_obj = {}
260        marked_for_exclusion = []
261        for class_attribute in class_attributes:
262            cased_attribute = casing_function(class_attribute)
263            try:
264                value = getattr(self, class_attribute)
265            except DetachedInstanceError:  # pragma: no cover
266                # If we hit this, it generally means that we're trying to fetch a nested model that is detached.
267                # In these scenarios we probably want to ignore it as it's likely a loop in the relationship.
268                continue
269            if class_attribute not in excludes and cased_attribute not in excludes:
270                if resolve_defaults and value is None:
271                    column = cls.get_column_definition(class_attribute)
272                    if column.default is not None:
273                        value = column.default.arg(self) if callable(column.default.arg) else column.default.arg
274
275                if isinstance(value, list) and len(value) > 0 and isinstance(value[0], AlchemyHelper):
276                    json_obj[cased_attribute] = [
277                        item.as_dict(excludes=nested_excludes,
278                                     nested_excludes=nested_excludes,
279                                     casing_function=casing_function,
280                                     visited=visited) for item in value
281                    ]
282                elif not isinstance(value, list) and isinstance(value, AlchemyHelper):
283                    if f"{class_attribute}_uuid" in class_attributes:  # pragma: no cover - this is literally always true due to how Bedrock models work
284                        marked_for_exclusion.append(casing_function(f"{class_attribute}_uuid"))
285                    json_obj[cased_attribute] = value.as_dict(excludes=nested_excludes,
286                                                              nested_excludes=nested_excludes,
287                                                              casing_function=casing_function,
288                                                              visited=visited)
289                elif is_serialisable(value):
290                    json_obj[cased_attribute] = value
291                elif isinstance(value, datetime):
292                    json_obj[cased_attribute] = int(value.timestamp() * 1000)
293                elif isinstance(value, UUID):
294                    json_obj[cased_attribute] = str(value)
295                elif isinstance(value, Decimal):
296                    json_obj[cased_attribute] = float(value)
297                elif isinstance(value, Enum):
298                    json_obj[cased_attribute] = value.name
299        for key in marked_for_exclusion:
300            try:
301                del json_obj[key]
302            except KeyError:
303                pass
304        return {**json_obj, **extra}
305
306    def update_with_json(self, content: dict, primary_key="uuid", keep_as_json=False):  # pragma: unit
307        """
308        Updates the object with the given JSON content.
309
310        :param content: The JSON content to update the object with.
311        :param primary_key: The primary key of the object.
312        :param keep_as_json: Whether to return the updated object as JSON or as an object.
313        :return: A json or the object itself with the updated values.
314        """
315        original = self.as_json()
316        if primary_key in content:
317            del content[primary_key]
318        updated_json = merge_dictionaries(original, content)
319        if keep_as_json:
320            return updated_json
321        return self.__class__.from_json(updated_json, primary_key=primary_key)
322
323    def matches_by_unique_key(self, dictionary) -> bool:  # pragma: unit
324        """
325        Checks whether this object matches the provided dictionary by using the unique keys of the object.
326        :param dictionary: The dictionary to check against. It should include the unique keys of the object (in either camel or snake case).
327        :return: True if a match is found.
328        """
329        unique_columns = self.get_unique_constraint_names()
330        for unique_column in unique_columns:
331            value = getattr(self, unique_column)
332            if isinstance(value, UUID):
333                value = str(value)
334            if not has_key_value(dictionary, unique_column, value, snake_and_camel_case=True):
335                return False
336        return True
class Base(sqlalchemy.orm.decl_api._DynamicAttributesType, sqlalchemy.inspection.Inspectable[sqlalchemy.orm.mapper.Mapper[typing.Any]]):

Base is the class to be used by any application models. See BedrockModel.

Base(**kwargs: Any)
2167def _declarative_constructor(self: Any, **kwargs: Any) -> None:
2168    """A simple constructor that allows initialization from kwargs.
2169
2170    Sets attributes on the constructed instance using the names and
2171    values in ``kwargs``.
2172
2173    Only keys that are present as
2174    attributes of the instance's class are allowed. These could be,
2175    for example, any mapped columns or relationships.
2176    """
2177    cls_ = type(self)
2178    for k in kwargs:
2179        if not hasattr(cls_, k):
2180            raise TypeError(
2181                "%r is an invalid keyword argument for %s" % (k, cls_.__name__)
2182            )
2183        setattr(self, k, kwargs[k])

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance's class are allowed. These could be, for example, any mapped columns or relationships.

registry = <sqlalchemy.orm.decl_api.registry object>
metadata = MetaData()
class ModelHelper(bedrock.db.alchemy_helper.AlchemyHelper):
 27class ModelHelper(AlchemyHelper):
 28    """
 29    ModelHelper is a class that provides some helper methods for SQLAlchemy models (GORMish style).
 30    It must be inherited by a class that also inherits from Base. E.g.:
 31    class MyModel(Base, ModelHelper):
 32        __tablename__ = "my_table"
 33        # etc
 34    """
 35
 36    @classmethod
 37    def should_create_if_not_found(cls):  # pragma: no cover
 38        """
 39        Override this method to return True if you want to automatically create a new record if one is not found.
 40        :return: `False` by default
 41        """
 42        return False
 43
 44    @classmethod
 45    def get_tables(cls) -> dict:  # pragma: unit
 46        """
 47        Gets all tables that are registered with the current session.
 48
 49        Note: Relies on SQLAlchemy functionality
 50        :return: A dictionary with the key being the table name and the value being the class associated with the table
 51        """
 52        return dict([(a.entity.__tablename__, a.entity) for a in cls.registry.mappers])
 53
 54    @classmethod
 55    def get_entities(cls) -> dict:  # pragma: integration
 56        """
 57        Gets all model classes that are registered with the current session.
 58
 59        Note: Relies on SQLAlchemy functionality
 60        :return: A dictionary with the key being the class name and the value being the class
 61        """
 62        return dict([(a.entity.__name__, a.entity) for a in cls.registry.mappers])
 63
 64    @classmethod
 65    def has_custom_json(cls) -> bool:  # pragma: unit
 66        """
 67        Override this method to return `True` if you want to use a custom JSON representation of the model (rather than
 68        the default `as_json`).
 69        """
 70        return False
 71
 72    @classmethod
 73    def has_custom_schema(cls) -> bool:  # pragma: unit
 74        """
 75        Override this method to return `True` if you want to use a custom schema for the model. This is used when
 76        generating the OpenAPI spec. It defaults to the same value of `has_custom_json`.
 77        """
 78        return cls.has_custom_json()
 79
 80    @classmethod
 81    def get_primary_key_names(cls) -> list[str]:  # pragma: unit
 82        """
 83        Gets a list primary key column names.
 84
 85        Makes use of `get_primary_key_columns`.
 86        """
 87        return cls.get_primary_key_columns().keys()
 88
 89    @classmethod
 90    def get_primary_key_columns(cls) -> dict:  # pragma: unit
 91        """
 92        Gets a dictionary of primary key columns, with the key being the column name and the value being the column.
 93        """
 94        return cls.__table__.primary_key.columns
 95
 96    @classmethod
 97    def get_unique_constraint_names(cls) -> list[str]:  # pragma: unit
 98        """
 99        Gets a list of unique constraint column names (ignoring those in `NonIdentifyingUniqueConstraint`s).
100
101        If no constraints are present, it will return the primary key names.
102        """
103        unique_columns = [u.name for c in cls.__table__.constraints
104                          if isinstance(c, UniqueConstraint) and not isinstance(c, NonIdentifyingUniqueConstraint)
105                          for u in c.columns]
106        return unique_columns if unique_columns else cls.get_primary_key_names()
107
108    @classmethod
109    def get_unique_identifier_names(cls) -> list[str]:  # pragma: unit
110        """
111        Gets a list of unique identifier column names, i.e. primary keys and any column in unique constraints.
112        """
113        unique_columns = [u.name for c in cls.__table__.constraints if isinstance(c, UniqueConstraint) for u in
114                          c.columns]
115        return [*cls.get_primary_key_names(), *unique_columns]
116
117    @classmethod
118    def get_foreign_keys(cls) -> dict[str, tuple[Type, str]]:  # pragma: unit
119        """
120        Returns a dictionary of foreign keys, with the key being the name of the foreign key and the value being a tuple
121        of the Class where the foreign key is pointing to and the name of the column in that class
122
123        Example: if the foreign key is "city_uuid" and it points to "uuid" in City, the dictionary will be:
124        ```python
125        {  "city_uuid": (City, "uuid")  }
126        ```
127        """
128        return dict([(k.parent.name, (cls.get_tables()[k.column.table.name], k.column.name))
129                     for k in cls.__table__.foreign_keys])
130
131    @classmethod
132    def get_column_names(cls) -> list[str]:  # pragma: unit
133        """
134        Gets a list of column names.
135        """
136        return [c.key for c in cls.__table__.columns]
137
138    @classmethod
139    def get_column_definition(cls, column_name):  # pragma: unit
140        """
141        Gets the column definition for a given column name.
142        :param column_name: The name of the database table column
143        """
144        try:
145            return next(
146                col for col in cls.metadata.tables[cls.__tablename__].columns._all_columns if col.name == column_name)
147        except:
148            return None
149
150    @classmethod
151    def get_column_definitions(cls):  # pragma: unit
152        try:
153            return {col.name: col for col in cls.metadata.tables[cls.__tablename__].columns._all_columns}
154        except:  # pragma: no cover
155            return None
156
157    @classmethod
158    def get_common_reference_columns(cls, other_cls):  # pragma: unit
159        """
160        Gets a list of column names that are foreign keys in both this class and the provided class.
161        :param other_cls: Another classe to compare this to.
162        :return: A list of common column names between both classes.
163        """
164        column_names = cls.get_foreign_keys().keys()
165        other_column_names = other_cls.get_column_names()
166        return [col for col in column_names if col in other_column_names]
167
168    @classmethod
169    def from_json(cls, content: dict, primary_key: str = "uuid"):  # pragma: unit
170        """
171        Alias for `from_dict`.
172        """
173        return cls.from_dict(content, primary_key)
174
175    @classmethod
176    def from_dict(cls, dictionary: dict, primary_key: str = "uuid"):  # pragma: unit
177        """
178        Creates a new model instance from a dictionary.
179
180        Automatically converts camelCase to snake_case in order to find the correct attribute.
181
182        Ignores:
183        * any attributes that are not in the model
184        * any attributes that are callable
185        * any attributes that are nested models
186        """
187        class_attributes = find_all_attributes(cls)
188        nested_models = [m.key for m in cls.get_nested_models()]
189        self = cls()
190        for key in dictionary:
191            attribute = camelCase_to_snake_case(key)
192            column = cls.get_column_definition(attribute)
193            if column is not None \
194                    and attribute not in nested_models \
195                    and attribute in class_attributes \
196                    and not callable(getattr(self, attribute)):
197                value = dictionary[key]
198                data_type = column.type.python_type
199                if data_type == datetime and isinstance(value, int):
200                    value = datetime.fromtimestamp(value / 1000)
201                setattr(self, attribute, value)
202
203        if primary_key not in dictionary:
204            setattr(self, primary_key, create_uuid())
205
206        return self
207
208    @classmethod
209    def get_nested_models(cls) -> list:  # pragma: unit
210        """
211        Gets a list of nested models by looking for any attributes that look like joins.
212        """
213        return [getattr(cls, attribute)
214                for attribute in find_all_attributes(cls)
215                if hasattr(getattr(cls, attribute), "expression")
216                and (
217                        hasattr(getattr(cls, attribute).expression, "right")
218                        or hasattr(getattr(cls, attribute).expression, "clauses")
219                )]
220
221    def as_json(self, excludes=[], nested_excludes=[], extra={}) -> dict:  # pragma: unit
222        """
223        Converts the object to a JSON string. Makes use of `as_dict` with `casing_function=snake_case_to_camelCase`.
224        :param excludes: Any attributes to exclude from the JSON.
225        :param nested_excludes: Any attributes to exclude from nested models.
226        :param extra: Any extra attributes to add to the JSON.
227        :return: The object in JSON format (with keys as camelCase).
228        """
229        cls = self.__class__  # pragma: unit
230        return self.as_dict(excludes=[*excludes, *cls.__excluded_attributes_from_json__],
231                            nested_excludes=nested_excludes, extra=extra,
232                            casing_function=snake_case_to_camelCase)
233
234    def as_dict(self, excludes=[], nested_excludes=[], extra={},
235                casing_function=camelCase_to_snake_case,
236                resolve_defaults=False, visited=None) -> dict:  # pragma: unit
237        """
238        Converts the object to a dictionary by introspecting the class.
239
240        :param excludes: Any attributes to exclude from the dictionary.
241        :param nested_excludes: Any attributes to exclude from nested models.
242        :param extra: Any extra attributes to add to the dictionary.
243        :param casing_function: How to convert the attribute names to the dictionary keys.
244        :param resolve_defaults: Whether to resolve default values of the model.
245        :param visited: A set used to track already-visited objects during serialisation. Prevents infinite recursion.
246        :return: The object in JSON format (with keys as snake_case by default).
247        """
248
249        if visited is None:
250            visited = set()
251
252        identifier = getattr(self, "uuid", None)
253        if identifier in visited:
254            return {"uuid": str(getattr(self, "uuid", None))}
255
256        visited.add(identifier)
257
258        cls = self.__class__
259        class_attributes = find_all_attributes(cls)
260        json_obj = {}
261        marked_for_exclusion = []
262        for class_attribute in class_attributes:
263            cased_attribute = casing_function(class_attribute)
264            try:
265                value = getattr(self, class_attribute)
266            except DetachedInstanceError:  # pragma: no cover
267                # If we hit this, it generally means that we're trying to fetch a nested model that is detached.
268                # In these scenarios we probably want to ignore it as it's likely a loop in the relationship.
269                continue
270            if class_attribute not in excludes and cased_attribute not in excludes:
271                if resolve_defaults and value is None:
272                    column = cls.get_column_definition(class_attribute)
273                    if column.default is not None:
274                        value = column.default.arg(self) if callable(column.default.arg) else column.default.arg
275
276                if isinstance(value, list) and len(value) > 0 and isinstance(value[0], AlchemyHelper):
277                    json_obj[cased_attribute] = [
278                        item.as_dict(excludes=nested_excludes,
279                                     nested_excludes=nested_excludes,
280                                     casing_function=casing_function,
281                                     visited=visited) for item in value
282                    ]
283                elif not isinstance(value, list) and isinstance(value, AlchemyHelper):
284                    if f"{class_attribute}_uuid" in class_attributes:  # pragma: no cover - this is literally always true due to how Bedrock models work
285                        marked_for_exclusion.append(casing_function(f"{class_attribute}_uuid"))
286                    json_obj[cased_attribute] = value.as_dict(excludes=nested_excludes,
287                                                              nested_excludes=nested_excludes,
288                                                              casing_function=casing_function,
289                                                              visited=visited)
290                elif is_serialisable(value):
291                    json_obj[cased_attribute] = value
292                elif isinstance(value, datetime):
293                    json_obj[cased_attribute] = int(value.timestamp() * 1000)
294                elif isinstance(value, UUID):
295                    json_obj[cased_attribute] = str(value)
296                elif isinstance(value, Decimal):
297                    json_obj[cased_attribute] = float(value)
298                elif isinstance(value, Enum):
299                    json_obj[cased_attribute] = value.name
300        for key in marked_for_exclusion:
301            try:
302                del json_obj[key]
303            except KeyError:
304                pass
305        return {**json_obj, **extra}
306
307    def update_with_json(self, content: dict, primary_key="uuid", keep_as_json=False):  # pragma: unit
308        """
309        Updates the object with the given JSON content.
310
311        :param content: The JSON content to update the object with.
312        :param primary_key: The primary key of the object.
313        :param keep_as_json: Whether to return the updated object as JSON or as an object.
314        :return: A json or the object itself with the updated values.
315        """
316        original = self.as_json()
317        if primary_key in content:
318            del content[primary_key]
319        updated_json = merge_dictionaries(original, content)
320        if keep_as_json:
321            return updated_json
322        return self.__class__.from_json(updated_json, primary_key=primary_key)
323
324    def matches_by_unique_key(self, dictionary) -> bool:  # pragma: unit
325        """
326        Checks whether this object matches the provided dictionary by using the unique keys of the object.
327        :param dictionary: The dictionary to check against. It should include the unique keys of the object (in either camel or snake case).
328        :return: True if a match is found.
329        """
330        unique_columns = self.get_unique_constraint_names()
331        for unique_column in unique_columns:
332            value = getattr(self, unique_column)
333            if isinstance(value, UUID):
334                value = str(value)
335            if not has_key_value(dictionary, unique_column, value, snake_and_camel_case=True):
336                return False
337        return True

ModelHelper is a class that provides some helper methods for SQLAlchemy models (GORMish style). It must be inherited by a class that also inherits from Base. E.g.: class MyModel(Base, ModelHelper): __tablename__ = "my_table" # etc

@classmethod
def should_create_if_not_found(cls):
36    @classmethod
37    def should_create_if_not_found(cls):  # pragma: no cover
38        """
39        Override this method to return True if you want to automatically create a new record if one is not found.
40        :return: `False` by default
41        """
42        return False

Override this method to return True if you want to automatically create a new record if one is not found.

Returns

False by default

@classmethod
def get_tables(cls) -> dict:
44    @classmethod
45    def get_tables(cls) -> dict:  # pragma: unit
46        """
47        Gets all tables that are registered with the current session.
48
49        Note: Relies on SQLAlchemy functionality
50        :return: A dictionary with the key being the table name and the value being the class associated with the table
51        """
52        return dict([(a.entity.__tablename__, a.entity) for a in cls.registry.mappers])

Gets all tables that are registered with the current session.

Note: Relies on SQLAlchemy functionality

Returns

A dictionary with the key being the table name and the value being the class associated with the table

@classmethod
def get_entities(cls) -> dict:
54    @classmethod
55    def get_entities(cls) -> dict:  # pragma: integration
56        """
57        Gets all model classes that are registered with the current session.
58
59        Note: Relies on SQLAlchemy functionality
60        :return: A dictionary with the key being the class name and the value being the class
61        """
62        return dict([(a.entity.__name__, a.entity) for a in cls.registry.mappers])

Gets all model classes that are registered with the current session.

Note: Relies on SQLAlchemy functionality

Returns

A dictionary with the key being the class name and the value being the class

@classmethod
def has_custom_json(cls) -> bool:
64    @classmethod
65    def has_custom_json(cls) -> bool:  # pragma: unit
66        """
67        Override this method to return `True` if you want to use a custom JSON representation of the model (rather than
68        the default `as_json`).
69        """
70        return False

Override this method to return True if you want to use a custom JSON representation of the model (rather than the default as_json).

@classmethod
def has_custom_schema(cls) -> bool:
72    @classmethod
73    def has_custom_schema(cls) -> bool:  # pragma: unit
74        """
75        Override this method to return `True` if you want to use a custom schema for the model. This is used when
76        generating the OpenAPI spec. It defaults to the same value of `has_custom_json`.
77        """
78        return cls.has_custom_json()

Override this method to return True if you want to use a custom schema for the model. This is used when generating the OpenAPI spec. It defaults to the same value of has_custom_json.

@classmethod
def get_primary_key_names(cls) -> list[str]:
80    @classmethod
81    def get_primary_key_names(cls) -> list[str]:  # pragma: unit
82        """
83        Gets a list primary key column names.
84
85        Makes use of `get_primary_key_columns`.
86        """
87        return cls.get_primary_key_columns().keys()

Gets a list primary key column names.

Makes use of get_primary_key_columns.

@classmethod
def get_primary_key_columns(cls) -> dict:
89    @classmethod
90    def get_primary_key_columns(cls) -> dict:  # pragma: unit
91        """
92        Gets a dictionary of primary key columns, with the key being the column name and the value being the column.
93        """
94        return cls.__table__.primary_key.columns

Gets a dictionary of primary key columns, with the key being the column name and the value being the column.

@classmethod
def get_unique_constraint_names(cls) -> list[str]:
 96    @classmethod
 97    def get_unique_constraint_names(cls) -> list[str]:  # pragma: unit
 98        """
 99        Gets a list of unique constraint column names (ignoring those in `NonIdentifyingUniqueConstraint`s).
100
101        If no constraints are present, it will return the primary key names.
102        """
103        unique_columns = [u.name for c in cls.__table__.constraints
104                          if isinstance(c, UniqueConstraint) and not isinstance(c, NonIdentifyingUniqueConstraint)
105                          for u in c.columns]
106        return unique_columns if unique_columns else cls.get_primary_key_names()

Gets a list of unique constraint column names (ignoring those in NonIdentifyingUniqueConstraints).

If no constraints are present, it will return the primary key names.

@classmethod
def get_unique_identifier_names(cls) -> list[str]:
108    @classmethod
109    def get_unique_identifier_names(cls) -> list[str]:  # pragma: unit
110        """
111        Gets a list of unique identifier column names, i.e. primary keys and any column in unique constraints.
112        """
113        unique_columns = [u.name for c in cls.__table__.constraints if isinstance(c, UniqueConstraint) for u in
114                          c.columns]
115        return [*cls.get_primary_key_names(), *unique_columns]

Gets a list of unique identifier column names, i.e. primary keys and any column in unique constraints.

@classmethod
def get_foreign_keys(cls) -> dict[str, tuple[typing.Type, str]]:
117    @classmethod
118    def get_foreign_keys(cls) -> dict[str, tuple[Type, str]]:  # pragma: unit
119        """
120        Returns a dictionary of foreign keys, with the key being the name of the foreign key and the value being a tuple
121        of the Class where the foreign key is pointing to and the name of the column in that class
122
123        Example: if the foreign key is "city_uuid" and it points to "uuid" in City, the dictionary will be:
124        ```python
125        {  "city_uuid": (City, "uuid")  }
126        ```
127        """
128        return dict([(k.parent.name, (cls.get_tables()[k.column.table.name], k.column.name))
129                     for k in cls.__table__.foreign_keys])

Returns a dictionary of foreign keys, with the key being the name of the foreign key and the value being a tuple of the Class where the foreign key is pointing to and the name of the column in that class

Example: if the foreign key is "city_uuid" and it points to "uuid" in City, the dictionary will be:

{  "city_uuid": (City, "uuid")  }
@classmethod
def get_column_names(cls) -> list[str]:
131    @classmethod
132    def get_column_names(cls) -> list[str]:  # pragma: unit
133        """
134        Gets a list of column names.
135        """
136        return [c.key for c in cls.__table__.columns]

Gets a list of column names.

@classmethod
def get_column_definition(cls, column_name):
138    @classmethod
139    def get_column_definition(cls, column_name):  # pragma: unit
140        """
141        Gets the column definition for a given column name.
142        :param column_name: The name of the database table column
143        """
144        try:
145            return next(
146                col for col in cls.metadata.tables[cls.__tablename__].columns._all_columns if col.name == column_name)
147        except:
148            return None

Gets the column definition for a given column name.

Parameters
  • column_name: The name of the database table column
@classmethod
def get_column_definitions(cls):
150    @classmethod
151    def get_column_definitions(cls):  # pragma: unit
152        try:
153            return {col.name: col for col in cls.metadata.tables[cls.__tablename__].columns._all_columns}
154        except:  # pragma: no cover
155            return None
@classmethod
def get_common_reference_columns(cls, other_cls):
157    @classmethod
158    def get_common_reference_columns(cls, other_cls):  # pragma: unit
159        """
160        Gets a list of column names that are foreign keys in both this class and the provided class.
161        :param other_cls: Another classe to compare this to.
162        :return: A list of common column names between both classes.
163        """
164        column_names = cls.get_foreign_keys().keys()
165        other_column_names = other_cls.get_column_names()
166        return [col for col in column_names if col in other_column_names]

Gets a list of column names that are foreign keys in both this class and the provided class.

Parameters
  • other_cls: Another classe to compare this to.
Returns

A list of common column names between both classes.

@classmethod
def from_json(cls, content: dict, primary_key: str = 'uuid'):
168    @classmethod
169    def from_json(cls, content: dict, primary_key: str = "uuid"):  # pragma: unit
170        """
171        Alias for `from_dict`.
172        """
173        return cls.from_dict(content, primary_key)

Alias for from_dict.

@classmethod
def from_dict(cls, dictionary: dict, primary_key: str = 'uuid'):
175    @classmethod
176    def from_dict(cls, dictionary: dict, primary_key: str = "uuid"):  # pragma: unit
177        """
178        Creates a new model instance from a dictionary.
179
180        Automatically converts camelCase to snake_case in order to find the correct attribute.
181
182        Ignores:
183        * any attributes that are not in the model
184        * any attributes that are callable
185        * any attributes that are nested models
186        """
187        class_attributes = find_all_attributes(cls)
188        nested_models = [m.key for m in cls.get_nested_models()]
189        self = cls()
190        for key in dictionary:
191            attribute = camelCase_to_snake_case(key)
192            column = cls.get_column_definition(attribute)
193            if column is not None \
194                    and attribute not in nested_models \
195                    and attribute in class_attributes \
196                    and not callable(getattr(self, attribute)):
197                value = dictionary[key]
198                data_type = column.type.python_type
199                if data_type == datetime and isinstance(value, int):
200                    value = datetime.fromtimestamp(value / 1000)
201                setattr(self, attribute, value)
202
203        if primary_key not in dictionary:
204            setattr(self, primary_key, create_uuid())
205
206        return self

Creates a new model instance from a dictionary.

Automatically converts camelCase to snake_case in order to find the correct attribute.

Ignores:

  • any attributes that are not in the model
  • any attributes that are callable
  • any attributes that are nested models
@classmethod
def get_nested_models(cls) -> list:
208    @classmethod
209    def get_nested_models(cls) -> list:  # pragma: unit
210        """
211        Gets a list of nested models by looking for any attributes that look like joins.
212        """
213        return [getattr(cls, attribute)
214                for attribute in find_all_attributes(cls)
215                if hasattr(getattr(cls, attribute), "expression")
216                and (
217                        hasattr(getattr(cls, attribute).expression, "right")
218                        or hasattr(getattr(cls, attribute).expression, "clauses")
219                )]

Gets a list of nested models by looking for any attributes that look like joins.

def as_json(self, excludes=[], nested_excludes=[], extra={}) -> dict:
221    def as_json(self, excludes=[], nested_excludes=[], extra={}) -> dict:  # pragma: unit
222        """
223        Converts the object to a JSON string. Makes use of `as_dict` with `casing_function=snake_case_to_camelCase`.
224        :param excludes: Any attributes to exclude from the JSON.
225        :param nested_excludes: Any attributes to exclude from nested models.
226        :param extra: Any extra attributes to add to the JSON.
227        :return: The object in JSON format (with keys as camelCase).
228        """
229        cls = self.__class__  # pragma: unit
230        return self.as_dict(excludes=[*excludes, *cls.__excluded_attributes_from_json__],
231                            nested_excludes=nested_excludes, extra=extra,
232                            casing_function=snake_case_to_camelCase)

Converts the object to a JSON string. Makes use of as_dict with casing_function=snake_case_to_camelCase.

Parameters
  • excludes: Any attributes to exclude from the JSON.
  • nested_excludes: Any attributes to exclude from nested models.
  • extra: Any extra attributes to add to the JSON.
Returns

The object in JSON format (with keys as camelCase).

def as_dict( self, excludes=[], nested_excludes=[], extra={}, casing_function=<function camelCase_to_snake_case>, resolve_defaults=False, visited=None) -> dict:
234    def as_dict(self, excludes=[], nested_excludes=[], extra={},
235                casing_function=camelCase_to_snake_case,
236                resolve_defaults=False, visited=None) -> dict:  # pragma: unit
237        """
238        Converts the object to a dictionary by introspecting the class.
239
240        :param excludes: Any attributes to exclude from the dictionary.
241        :param nested_excludes: Any attributes to exclude from nested models.
242        :param extra: Any extra attributes to add to the dictionary.
243        :param casing_function: How to convert the attribute names to the dictionary keys.
244        :param resolve_defaults: Whether to resolve default values of the model.
245        :param visited: A set used to track already-visited objects during serialisation. Prevents infinite recursion.
246        :return: The object in JSON format (with keys as snake_case by default).
247        """
248
249        if visited is None:
250            visited = set()
251
252        identifier = getattr(self, "uuid", None)
253        if identifier in visited:
254            return {"uuid": str(getattr(self, "uuid", None))}
255
256        visited.add(identifier)
257
258        cls = self.__class__
259        class_attributes = find_all_attributes(cls)
260        json_obj = {}
261        marked_for_exclusion = []
262        for class_attribute in class_attributes:
263            cased_attribute = casing_function(class_attribute)
264            try:
265                value = getattr(self, class_attribute)
266            except DetachedInstanceError:  # pragma: no cover
267                # If we hit this, it generally means that we're trying to fetch a nested model that is detached.
268                # In these scenarios we probably want to ignore it as it's likely a loop in the relationship.
269                continue
270            if class_attribute not in excludes and cased_attribute not in excludes:
271                if resolve_defaults and value is None:
272                    column = cls.get_column_definition(class_attribute)
273                    if column.default is not None:
274                        value = column.default.arg(self) if callable(column.default.arg) else column.default.arg
275
276                if isinstance(value, list) and len(value) > 0 and isinstance(value[0], AlchemyHelper):
277                    json_obj[cased_attribute] = [
278                        item.as_dict(excludes=nested_excludes,
279                                     nested_excludes=nested_excludes,
280                                     casing_function=casing_function,
281                                     visited=visited) for item in value
282                    ]
283                elif not isinstance(value, list) and isinstance(value, AlchemyHelper):
284                    if f"{class_attribute}_uuid" in class_attributes:  # pragma: no cover - this is literally always true due to how Bedrock models work
285                        marked_for_exclusion.append(casing_function(f"{class_attribute}_uuid"))
286                    json_obj[cased_attribute] = value.as_dict(excludes=nested_excludes,
287                                                              nested_excludes=nested_excludes,
288                                                              casing_function=casing_function,
289                                                              visited=visited)
290                elif is_serialisable(value):
291                    json_obj[cased_attribute] = value
292                elif isinstance(value, datetime):
293                    json_obj[cased_attribute] = int(value.timestamp() * 1000)
294                elif isinstance(value, UUID):
295                    json_obj[cased_attribute] = str(value)
296                elif isinstance(value, Decimal):
297                    json_obj[cased_attribute] = float(value)
298                elif isinstance(value, Enum):
299                    json_obj[cased_attribute] = value.name
300        for key in marked_for_exclusion:
301            try:
302                del json_obj[key]
303            except KeyError:
304                pass
305        return {**json_obj, **extra}

Converts the object to a dictionary by introspecting the class.

Parameters
  • excludes: Any attributes to exclude from the dictionary.
  • nested_excludes: Any attributes to exclude from nested models.
  • extra: Any extra attributes to add to the dictionary.
  • casing_function: How to convert the attribute names to the dictionary keys.
  • resolve_defaults: Whether to resolve default values of the model.
  • visited: A set used to track already-visited objects during serialisation. Prevents infinite recursion.
Returns

The object in JSON format (with keys as snake_case by default).

def update_with_json(self, content: dict, primary_key='uuid', keep_as_json=False):
307    def update_with_json(self, content: dict, primary_key="uuid", keep_as_json=False):  # pragma: unit
308        """
309        Updates the object with the given JSON content.
310
311        :param content: The JSON content to update the object with.
312        :param primary_key: The primary key of the object.
313        :param keep_as_json: Whether to return the updated object as JSON or as an object.
314        :return: A json or the object itself with the updated values.
315        """
316        original = self.as_json()
317        if primary_key in content:
318            del content[primary_key]
319        updated_json = merge_dictionaries(original, content)
320        if keep_as_json:
321            return updated_json
322        return self.__class__.from_json(updated_json, primary_key=primary_key)

Updates the object with the given JSON content.

Parameters
  • content: The JSON content to update the object with.
  • primary_key: The primary key of the object.
  • keep_as_json: Whether to return the updated object as JSON or as an object.
Returns

A json or the object itself with the updated values.

def matches_by_unique_key(self, dictionary) -> bool:
324    def matches_by_unique_key(self, dictionary) -> bool:  # pragma: unit
325        """
326        Checks whether this object matches the provided dictionary by using the unique keys of the object.
327        :param dictionary: The dictionary to check against. It should include the unique keys of the object (in either camel or snake case).
328        :return: True if a match is found.
329        """
330        unique_columns = self.get_unique_constraint_names()
331        for unique_column in unique_columns:
332            value = getattr(self, unique_column)
333            if isinstance(value, UUID):
334                value = str(value)
335            if not has_key_value(dictionary, unique_column, value, snake_and_camel_case=True):
336                return False
337        return True

Checks whether this object matches the provided dictionary by using the unique keys of the object.

Parameters
  • dictionary: The dictionary to check against. It should include the unique keys of the object (in either camel or snake case).
Returns

True if a match is found.