bedrock.db.alchemy_helper
1from enum import Enum 2from typing import Any 3from contextlib import suppress 4 5from sqlalchemy import ColumnOperators 6from sqlalchemy.dialects.postgresql import insert 7from sqlalchemy.exc import IntegrityError 8from sqlalchemy.orm import Query, noload 9from sqlalchemy.sql import or_, visitors 10from sqlalchemy.sql.base import ExecutableOption 11from sqlalchemy.sql.functions import func 12 13from bedrock.db.query_helper import get_filters_relevant_to_class, create_filters_list_from_dict, \ 14 add_default_equality_operator, criteria_to_sql_alchemy_operation, OPERATIONS_RELATED_NONE_MATCHER, \ 15 OPERATIONS_RELATED_NONE_MATCHER_BOOLEAN_OPERATION 16from bedrock.exceptions import ConflictingObjectException 17from bedrock.exceptions import ForbiddenException 18from bedrock.exceptions import ObjectValidationException 19from bedrock.exceptions import BedrockException 20from bedrock._helpers.classes import find_all_attributes 21from bedrock._helpers.crypto import create_uuid 22from bedrock.helpers.dictionary import has_key, get_snake_or_camel_case_value 23from bedrock.log import log_config 24from bedrock._helpers.nullables import value_is_populated 25from bedrock._helpers.string import snake_case_to_camelCase 26from bedrock.db.connection import get_session 27 28log = log_config("alchemy_helper") 29 30CACHE = {} 31 32 33class AlchemyHelper(object): 34 """ 35 AlchemyHelper is a class that provides some helper methods for SQLAlchemy CRUD operations. 36 Also, it helps establish automatic relationships between various Bedrock models (including auto-population of 37 nested objects). 38 39 ---- 40 41 For auto-population to work, you must define a class attribute called `__auto_populate__` on your model. This should 42 be a dictionary that looks like this: 43 ```python 44 # Imagine this is a model for an Icecream Shop. Shops have a many-to-many relationship with Flavours 45 # (via `IcecreamShopFlavourLink`), but they all get initialised with all the ice cream flavour types. 46 __auto_populate__ = { 47 "flavours": { # This is the name of the attribute on the model (typically the relationship name) 48 "model": IcecreamShopFlavourLink, # The model that represents the relationship 49 "partition_by": IcecreamFlavour, # This will use DB records in IcecreamFlavour to generate that many records for this auto-population 50 "template": { 51 "shop_uuid": "{{uuid}}", # The shop's uuid 52 "flavour_type_uuid": "{{partition.uuid}}", # The flavour type's (partition's) uuid 53 "flavour_type": "{{partition}}", # The partition 54 "inventory": 50 # Another column that IcecreamShopFlavourLink has 55 } 56 } 57 } 58 ``` 59 """ 60 __auto_populate__ = {} 61 62 __is_global_item__ = False 63 64 __excluded_attributes_from_json__ = [] 65 66 @classmethod 67 def get(cls, value, restrictions=None, load_options=None): # pragma: integration 68 """ 69 Gets a single object by its primary key(s). 70 71 Example: 72 ```python 73 City.get("661979d2-fb97-4bd2-91fb-64fdc72fb879") 74 ``` 75 ```python 76 City.get("661979d2-fb97-4bd2-91fb-64fdc72fb879", restrictions={"country_code": "GB"}) 77 ``` 78 79 :param value: Primary key value(s) 80 :param restrictions: Any filters to apply to the query. You'd usually use this to get an object by its primary key but also to ensure it belongs to the right "parents". 81 :param load_options: The load options to apply to the query when fetching nested objects, if any. 82 """ 83 # SQLAlchemy complains when a class has no primary key, so no need to check for that 84 unique_columns = cls.get_unique_identifier_names() 85 86 if isinstance(value, dict): 87 dict_value = value 88 else: 89 dict_value = {unique_columns[0]: value} 90 result = cls._find_all_by_unique_column(dict_value, 91 unique_column=unique_columns, 92 restrictions=restrictions, 93 load_options=load_options) 94 try: 95 return result[0] 96 except: 97 return None 98 99 @classmethod 100 def get_all(cls) -> list: # pragma: integration 101 """ 102 Gets all objects of this type. 103 104 Example: 105 ```python 106 City.get_all() 107 ``` 108 """ 109 if cls.__is_global_item__ and cls.__name__ in CACHE and CACHE[cls.__name__]: 110 return cls.post_fetch(CACHE[cls.__name__]) 111 s = get_session() 112 with s: 113 CACHE[cls.__name__] = s.query(cls).all() 114 return cls.post_fetch(CACHE[cls.__name__]) 115 116 @classmethod 117 def find_all_by(cls, key: str, value, restrictions=None, 118 load_options=None, with_count: bool = False) -> list or tuple[list, int]: # pragma: integration 119 """ 120 Find all objects by a key and value. If value is an array, it finds records that match any of the values. 121 122 Makes use of `find_all_matching_criteria`. 123 124 Example: 125 ```python 126 City.find_all_by("name", "London", restrictions={"country_code": "GB"}) 127 ``` 128 ```python 129 City.find_all_by("name", ["London", "Harrogate"], restrictions={"country_code": "GB"}) 130 ``` 131 132 :param key: Key to search by 133 :param value: Value(s) for the key. If it's an array it looks for any of the values. 134 :param restrictions: Any filters to apply to the query. You'd usually use this to get an object by its primary key but also to ensure it belongs to the right "parents". 135 :param load_options: The load options to apply to the query when fetching nested objects, if any. 136 :param with_count: Whether to return the count of the results as well. True makes this method return a tuple. 137 """ 138 _restrictions = restrictions if restrictions else {} 139 return cls.find_all_matching_criteria({add_default_equality_operator(key): value, **_restrictions}, 140 load_options=load_options, with_count=with_count) 141 142 @classmethod 143 def find_all_by_unique_column(cls, value, unique_column: str or list = None, 144 restrictions=None, load_options=None): # pragma: integration 145 """ 146 This method is a little like `get` but it allows you to search by any unique column(s) rather than just the primary key. 147 148 The true magic here is when `unique_column` is left up for the method to work out. 149 150 If the provided `unique_column` isn't actually unique, it behaves like `find_all_by`. 151 152 Makes use of `find_all_matching_criteria` (somewhere down the line) 153 154 Example: 155 ```python 156 # Assuming Country is defined with a UniqueConstraint on `country_code` 157 Country.find_all_by_unique_column("GB") 158 ``` 159 ```python 160 City.find_all_by_unique_column("London", unique_column="name", restrictions={"country_code": "GB"}) 161 ``` 162 163 :param value: Value(s) for the key. If it's an array it looks for any of the values. 164 :param unique_column: The unique column(s) to search by. If not provided, it will use the unique constraints to find the columns. 165 :param restrictions: Any filters to apply to the query. You'd usually use this to get an object by its primary key but also to ensure it belongs to the right "parents". 166 :param load_options: The load options to apply to the query when fetching nested objects, if any. 167 """ 168 _unique_column = unique_column if unique_column is not None else cls.get_unique_constraint_names() 169 if isinstance(value, list): 170 _value = {} 171 for item in value: 172 for k, v in item.items(): 173 if k not in _value: 174 _value[k] = [] 175 _value[k].append(v) 176 else: 177 _value = value 178 return cls._find_all_by_unique_column(_value, _unique_column, restrictions=restrictions, 179 load_options=load_options) 180 181 @classmethod 182 def _find_all_by_unique_column(cls, value_criteria, unique_column: str or list = "uuid", 183 restrictions=None, load_options=None): # pragma: integration 184 """ 185 Gets a number of objects (if a list is provided as the criteria) by their unique column(s). 186 187 :param value_criteria: Dictionary or a criteria 188 :param unique_column: A column name or a list of column names 189 :param restrictions: Any filters to apply to the query. You'd usually use this to get an object by its primary key but also to ensure it belongs to the right "parents". 190 :param load_options: The load options to apply to the query when fetching nested objects, if any. 191 192 """ 193 _restrictions = restrictions if restrictions else {} 194 if type(unique_column) is list: 195 criteria = {} 196 for key in unique_column: 197 if key is not None and has_key(value_criteria, key, snake_and_camel_case=True): 198 criteria[add_default_equality_operator(snake_case_to_camelCase(key))] \ 199 = get_snake_or_camel_case_value(value_criteria, key) 200 return cls.find_all_matching_criteria({**criteria, **_restrictions}, 201 load_options=load_options) 202 if unique_column in value_criteria and unique_column is not None: 203 return cls.find_all_by(unique_column, value_criteria[unique_column].lower(), 204 restrictions=_restrictions, 205 load_options=load_options) 206 return cls.post_fetch([]) 207 208 @classmethod 209 def _resolve_hinted_field(cls, path_hint: str, entities_dict: dict) -> tuple[list, str]: # pragma: integration 210 join_path = path_hint.split(".")[0] 211 protecting_column = path_hint.split(".")[1] 212 entities = [entities_dict[e] for e in join_path.split("->") if e.lower() != "self" and e in entities_dict] 213 return entities, protecting_column 214 215 @classmethod 216 def _resolve_hinted_fields(cls) -> list[tuple[list, str]]: # pragma: integration 217 """ 218 Resolves the hinted entity path. 219 It does this by walking through the `->` separated path to get from itself to the table that holds the end field. 220 221 This is often used to restrict access to an entity based on the user's permissions. 222 223 For example, for a City to be protected by its country code, it might look like: 224 ```python 225 "self->Region->Country.country_code" 226 ``` 227 """ 228 if not hasattr(cls, "__filter_path_hints__"): 229 cls.__filter_path_hints__ = [] 230 if not cls.__filter_path_hints__: 231 return [([], "")] 232 all_model_entities = cls.get_entities() 233 return [cls._resolve_hinted_field(path_hint, all_model_entities) for path_hint in cls.__filter_path_hints__] 234 235 @classmethod 236 def _criteria_includes_hinted_field(cls, criteria: dict, path_field_pairs: list[tuple[list, str]]): # pragma: unit 237 if not cls.__filter_path_hints__ or not criteria: 238 return False 239 hinted_fields = [pair[1] for pair in path_field_pairs] 240 hinted_fields_with_model = [f"{pair[0][-1].__name__}.{pair[1]}" for pair in path_field_pairs] 241 for column in cls._criteria_columns(criteria): 242 if column in hinted_fields or column in hinted_fields_with_model: 243 return True 244 245 return False 246 247 @classmethod 248 def _get_criteria_key_with_operation_from_column(cls, column, criteria): # pragma: unit 249 if not criteria or not column: 250 return None 251 criteria_with_operations = [c for c in criteria.keys() if c.split('[')[0] == column] 252 if criteria_with_operations: 253 return criteria_with_operations[0] 254 return None 255 256 @classmethod 257 def _criteria_columns(cls, criteria): # pragma: unit 258 return [c.split('[')[0] for c in criteria.keys()] 259 260 @classmethod 261 def find_all_with_query_matching_criteria(cls, query, criteria: dict, allow_listing_all=False, order_by=None, 262 order='asc', offset=None, limit=None, 263 load_options=None, 264 with_count=False, 265 ignore_joins=False) -> list or tuple[list, int]: # pragma: integration 266 """ 267 See `AlchemyHelper.find_all_with_query_matching_criteria_and_count` for more details. 268 The only difference is that this method lets you remove the count part with `with_count=False` (the default 269 behaviour for now). 270 :param query: The SQLAlchemy query to use (see `query` method). 271 :param criteria: The criteria to match. 272 :param allow_listing_all: Whether to allow listing all elements if the resolved criteria is empty. 273 :param order_by: The column to sort results of the query on, if any. 274 :param order: The order to sort results of the query in. 275 :param offset: The offset to use for pagination of results, if any. 276 :param limit: The number of rows to return in the results, if any. 277 :param load_options: The load options to apply to the query when fetching nested objects, if any. 278 :param with_count: Whether to include the total count of results (given the criteria) 279 :return: 280 """ 281 results, count = cls.find_all_with_query_matching_criteria_and_count( 282 query, 283 criteria, 284 allow_listing_all=allow_listing_all, 285 order_by=order_by, 286 order=order, 287 offset=offset, 288 limit=limit, 289 load_options=load_options, 290 with_count=with_count, 291 ignore_joins=ignore_joins) 292 if with_count: 293 return results, count 294 return results 295 296 @classmethod 297 def find_all_with_query_matching_criteria_and_count(cls, query, 298 criteria: dict, 299 allow_listing_all: bool = False, 300 order_by: str = None, 301 order: str = 'asc', 302 offset: int = None, 303 limit: int = None, 304 load_options=None, 305 with_count: bool = False, 306 ignore_joins=False): # pragma: integration 307 """ 308 Finds all elements that match the provided SQLAlchemy `query` that match the provided `criteria`. 309 Will return a tuple with the results and the total count of the results that match the criteria, regardless of 310 the offset and limit. 311 312 Notes: 313 * If column value in `criteria` is an array, it matches if the array contains the value (not if it is equal to the array). 314 * `criteria` are key-value pairs specified in the format 'column_name[operation]': '1' and these are converted 315 to SQLAlchemy operations just like conditions in an SQL WHERE clause, to apply as part of the query. 316 * `criteria` is resolved so that it only contains columns that exist on the model. 317 * The results of the query can be sorted by using the 'order_by' and 'order' parameters. By default, ordering is 318 done in ascending order based on 'created_at' column if it exists. 319 * Row-based pagination of results can be done using the 'offset' and 'limit' parameters. 320 321 Example: 322 ```python 323 City.find_all_with_query_matching_criteria(City.query(), {"name[eq]": "London"}) 324 ``` 325 ```python 326 City.find_all_with_query_matching_criteria(City.query(), {"name[eq]": ["London", "Harrogate"]}) 327 ``` 328 ```python 329 # If City doesn't have a 'code' column and is annotated with @filter_path_hint("self->Region->Country.code") 330 City.find_all_with_query_matching_criteria(City.query(), {"code[eq]": "UK"}) 331 ``` 332 ```python 333 # If City is annotated with @filter_path_hint("self->Region->Country.code") 334 City.find_all_with_query_matching_criteria(City.query(), {"Country.code[eq]": "UK"}) 335 ``` 336 337 :param query: The SQLAlchemy query to use (see `query` method). 338 :param criteria: The criteria for filtering the query. 339 :param allow_listing_all: Whether to allow listing all elements if the resolved criteria is empty. 340 :param order_by: The column to sort results of the query on, if any. 341 :param order: The order to sort results of the query in. 342 :param offset: The offset to use for pagination of results, if any. 343 :param limit: The number of rows to return in the results, if any. 344 :param load_options: The load options to apply to the query, if any. 345 :param with_count: Whether to include the total count of results (given the criteria). 346 :param ignore_joins: Whether to ignore automated joins between tables that come from filter path hints. 347 """ 348 query = cls.query_with_criteria(query, criteria, allow_listing_all, ignore_joins) 349 if query is None: 350 return cls.post_fetch([]), 0 351 return cls.get_results_with_count(query, order_by, order, offset, limit, load_options, with_count) 352 353 @classmethod 354 def get_results_with_count(cls, 355 query: Query, 356 order_by: str = None, 357 order: str = 'asc', 358 offset: int = None, 359 limit: int = None, 360 load_options=None, 361 with_count: bool = False): # pragma: integration 362 try: 363 s = get_session() 364 with s: 365 results_query = cls._apply_load_options(query, load_options) 366 results_query = cls._apply_order(results_query, order_by, order) 367 368 # If offset and limit specified for pagination, use these. If not, return all. 369 if offset: 370 results_query = results_query.offset(offset) 371 if limit: 372 results_query = results_query.limit(limit) 373 374 items, count = cls.results_with_count(results_query, query if with_count else None) 375 return cls.post_fetch(items), count 376 except Exception as e: # pragma: no cover - if something happens we don't want to break. Just log it. 377 log.error(e, exc_info=True) 378 return cls.post_fetch([]), 0 379 380 @classmethod 381 def query_with_criteria(cls, query, 382 criteria: dict, 383 allow_listing_all: bool = False, 384 ignore_joins: bool = False) -> Query | None: # pragma: integration 385 """ 386 Applies the provided `criteria` to the provided SQLAlchemy `query`. 387 See `find_all_with_query_matching_criteria_and_count` for more details. 388 389 :param query: The SQLAlchemy query to apply the criteria to (see `query` method). 390 :param criteria: The criteria for filtering the query. 391 :param allow_listing_all: Whether to allow listing all elements if the resolved criteria is empty. 392 :param ignore_joins: Whether to ignore automated joins between tables that come from filter path hints. 393 :return: A SQLAlchemy query with the criteria applied. 394 """ 395 relevant_criteria = get_filters_relevant_to_class(cls, criteria) if criteria else {} 396 hinted_fields = [] if ignore_joins else cls._resolve_hinted_fields() 397 # convert list of relevant criteria into format that can be understood by SQLAlchemy 398 query_filters = create_filters_list_from_dict(relevant_criteria, cls) 399 400 has_query_parameters = query_filters or cls._criteria_includes_hinted_field(criteria, hinted_fields) 401 if not has_query_parameters and not allow_listing_all: 402 return None 403 404 try: 405 for item in query_filters: 406 query = query.filter(criteria_to_sql_alchemy_operation(item)) 407 except Exception as e: 408 log.error("Error while trying to apply criteria: " + str(e), exc_info=True) 409 raise ForbiddenException(e) 410 411 # If hinted fields are present, join the relevant entities and apply the criteria 412 try: 413 joined_entities = [] 414 for hinted_pair in hinted_fields: 415 entities, protecting_column = hinted_pair 416 if not entities or not protecting_column: 417 continue 418 criteria_with_operation = cls._get_criteria_key_with_operation_from_column(protecting_column, criteria) 419 if not criteria_with_operation: 420 # Try Entity.column 421 criteria_with_operation = cls._get_criteria_key_with_operation_from_column( 422 f"{entities[-1].__name__}.{protecting_column}", criteria) 423 if protecting_column and criteria_with_operation: 424 value = criteria[criteria_with_operation] 425 # TODO: I think the bit below can be: 426 # (type(value) is list and None in value) or (value is None) 427 # and then we can auto-deal with "None" joins too. 428 # - tco 429 match_null_values = type(value) is list and None in value 430 for e in entities: 431 if e not in joined_entities and not _query_has_entity(query, e): 432 query = query.join(e, isouter=match_null_values) 433 joined_entities.append(e) 434 protecting_column_with_operation = criteria_with_operation.split(".")[-1] if "." in criteria_with_operation else criteria_with_operation 435 item = create_filters_list_from_dict({ 436 protecting_column_with_operation: value 437 }, entities[-1])[0] 438 join_filter = cls._get_join_filters_for_path_hints(item, match_null_values) 439 query = query.filter(join_filter) 440 except Exception as e: 441 log.error("Error while trying to apply hinted entity restriction: " + str(e), exc_info=True) 442 raise ForbiddenException(e) 443 return query 444 445 @classmethod 446 def _apply_load_options(cls, query: Query, load_options: ExecutableOption) -> Query: # pragma: integration 447 if load_options: 448 return query.options(load_options) 449 return query 450 451 @classmethod 452 def _apply_order(cls, query: Query, order_by: str, order: str) -> Query: # pragma: integration 453 # If sort column and order specified, use these with uuid as secondary. If not, order by 'created_at' column if present. 454 if order_by and hasattr(cls, "uuid"): 455 if order and order.lower() == 'desc': 456 return query.order_by(getattr(cls, order_by).desc(), getattr(cls, "uuid").desc()) 457 else: 458 return query.order_by(getattr(cls, order_by).asc(), getattr(cls, "uuid").asc()) 459 elif order_by: 460 if order and order.lower() == 'desc': 461 return query.order_by(getattr(cls, order_by).desc()) 462 else: 463 return query.order_by(getattr(cls, order_by).asc()) 464 elif hasattr(cls, "created_at"): 465 return query.order_by(getattr(cls, "created_at").asc()) 466 467 @classmethod 468 def results_with_count(cls, results_query, count_query=None): # pragma: integration 469 """ 470 Returns the results of a query and the count of the results. 471 472 :param results_query: The query with sort and pagination applied. 473 :param count_query: The query without any sort or pagination applied. 474 :return: 475 """ 476 results = results_query.all() 477 count = count_query.distinct(cls.uuid).options(noload('*')).count() if count_query else None 478 return results, count 479 480 @staticmethod 481 def _get_join_filters_for_path_hints(item, match_null_values): 482 if len(item) == 3: 483 column, operation, value = item 484 if value is None and operation is not ColumnOperators.is_ and operation is not ColumnOperators.is_not: 485 return AlchemyHelper._get_join_filters_for_path_hints(( 486 column, 487 OPERATIONS_RELATED_NONE_MATCHER[operation], 488 None 489 ), match_null_values) 490 491 if type(value) is not list: 492 return operation(column, value) 493 494 if type(value) is list and operation.__name__ == '__eq__': # this is to preserve some backwards compatibility that we had in the old version of this 495 return AlchemyHelper._get_join_filters_for_path_hints(( 496 column, 497 ColumnOperators.in_, 498 value 499 ), match_null_values) 500 501 if match_null_values: 502 none_operation = OPERATIONS_RELATED_NONE_MATCHER[operation] 503 none_boolean_operation = OPERATIONS_RELATED_NONE_MATCHER_BOOLEAN_OPERATION[operation] 504 return none_boolean_operation(operation(column, tuple(value)), none_operation(column, None)) 505 return operation(column, tuple(value)) 506 else: 507 return criteria_to_sql_alchemy_operation(item) 508 509 @staticmethod 510 def _DEPRECATED_get_join_filters_for_path_hints(value, entities, protecting_column, match_null_values): 511 """ 512 This method IS DEPRECATED and is only here to ensure the old behaviour is present in the new method for testing 513 purposes. 514 I kept it so that if there's a scenario we're not testing, we can always check the code for the original intent. 515 :param value: 516 :param entities: 517 :param protecting_column: 518 :param match_null_values: 519 :return: 520 """ 521 if type(value) is not list: 522 return getattr(entities[-1], protecting_column) == value 523 524 if match_null_values: 525 return or_(getattr(entities[-1], protecting_column).in_(tuple(value)), 526 getattr(entities[-1], protecting_column).is_(None)) 527 return getattr(entities[-1], protecting_column).in_(tuple(value)) 528 529 @classmethod 530 def post_fetch(cls, items: list) -> list: # pragma: unit 531 """ 532 Override this class method to do any post-fetch processing on the items. 533 534 :param items: Items that have been fetched by a query. 535 """ 536 return items 537 538 def pre_save(self): # pragma: integration 539 """ 540 Override this class method to do any pre-saving processing on the items. 541 """ 542 return self 543 544 def post_save(self): # pragma: integration 545 """ 546 Override this class method to do any post-saving processing on the items. 547 548 Note: this hook runs after the transaction has been committed; the in-memory instance may not include 549 database-side changes (e.g. triggers, server defaults) unless explicitly refreshed or refetched. 550 """ 551 return self 552 553 def pre_delete(self): # pragma: integration 554 """ 555 Override this class method to do any pre-deletion processing on the items. 556 """ 557 return self 558 559 def post_delete(self): # pragma: integration 560 """ 561 Override this class method to do any post-deletion processing on the items. 562 """ 563 return self 564 565 @classmethod 566 def find_all_matching_criteria(cls, criteria: dict, allow_listing_all=False, order_by=None, 567 order='asc', offset=None, limit=None, load_options=None, 568 with_count: bool = False, ignore_joins=False): # pragma: integration 569 """ 570 Finds all elements that match the provided `criteria`. 571 572 It is a shortcut for `find_all_with_query_matching_criteria` where it pre-populates the query with a simple `select from cls` as a start. 573 574 Subsequent complexities are added via `criteria`. 575 576 Example: 577 ```python 578 City.find_all_matching_criteria({"name": "London"}) 579 ``` 580 ```python 581 City.find_all_matching_criteria({"name": ["London", "Harrogate"]}) 582 ``` 583 584 :param criteria: The criteria to match. 585 :param allow_listing_all: Whether to allow listing all elements if the resolved criteria is empty. 586 :param order_by: The column to sort results of the query on, if any. 587 :param order: The order to sort results of the query in. 588 :param offset: The offset to use for pagination of results, if any. 589 :param limit: The number of rows to return in the results, if any. 590 :param load_options: The load options to apply to the query when fetching nested objects, if any. 591 :param with_count: Whether to return the count of the results as well. True makes this method return a tuple. 592 """ 593 return cls.find_all_with_query_matching_criteria(cls.query().select_from(cls), 594 criteria, 595 allow_listing_all=allow_listing_all, 596 order_by=order_by, 597 order=order, 598 offset=offset, 599 limit=limit, 600 load_options=load_options, 601 with_count=with_count, 602 ignore_joins=ignore_joins) 603 604 @classmethod 605 def find_first_matching_criteria(cls, criteria, load_options=None, 606 ignore_joins=False): # pragma: integration 607 """ 608 Behaves exactly like `find_all_matching_criteria` but returns only the first item (or `None` if no item is found). 609 610 :param criteria: The criteria to match. 611 :param load_options: The load options to apply to the query when fetching nested objects, if any. 612 :return: 613 """ 614 try: 615 return cls.find_all_matching_criteria(criteria, 616 load_options=load_options, 617 with_count=False, 618 ignore_joins=ignore_joins)[0] 619 except Exception as e: 620 log.warning(f"No {cls.__name__} found") 621 return None 622 623 @classmethod 624 def query(cls, *classes) -> Any or tuple[Any, int]: # pragma: integration 625 """ 626 Returns a SQLAlchemy query for the given class. 627 628 Example: 629 ```python 630 City.query().filter_by(name="London").all() 631 ``` 632 ```python 633 City.query(Country).join(Country).filter(Country.name == "United Kingdom").all() 634 ``` 635 636 :param classes: The classes to query. 637 """ 638 return get_session().query(cls, *classes) 639 640 @classmethod 641 def count_query(cls, *columns) -> Any or tuple[Any, int]: # pragma: integration 642 """ 643 Returns a SQLAlchemy query for the given columns and adds a count for each column. 644 This is useful for getting counts of grouped queries. 645 646 Example: 647 ```python 648 City.query(City.name).group_by(City.name).count() 649 ``` 650 ```python 651 City.query(City.name, Country.name).join(City.country_uuid == Country.uuid).filter(Country.name == "United Kingdom").group_by(City.name, Country.name).count() 652 ``` 653 654 :param classes: The classes to query. 655 """ 656 return get_session().query(*columns, *[func.count(c) for c in columns]) 657 658 @classmethod 659 def save_many(cls, items: list, duplicates_strategy: str = None, preserve_uuids=False): # pragma: integration 660 """ 661 Saves many items of the same type at once. 662 663 This includes any nested objects within the list of items. 664 When you need to maintain relationships between objects, you should set `preserve_uuids` to `True`. 665 666 :param items: The items to save. 667 :param duplicates_strategy: `None`, `'on_conflict_do_update'` or `'on_conflict_do_nothing'` 668 :param preserve_uuids: By default, any uuids are left for the DB to regenerate. Set this to `True` to preserve them. 669 """ 670 try: 671 if not items: 672 return 673 674 _items = items if isinstance(items[0], dict) else [item.pre_save() for item in items] 675 s = get_session() 676 with s: 677 if duplicates_strategy is None: 678 for item in _items: 679 item.validate() 680 s.merge(item) 681 else: 682 if isinstance(_items[0], dict): 683 items_as_dicts = _items 684 else: 685 nested_keys = [n.key for n in cls.get_nested_models()] 686 uuid = [] if preserve_uuids else ["uuid"] 687 excludes = ["created_at", "updated_at", *uuid, *nested_keys] 688 items_as_dicts = [i.as_dict(excludes=excludes, resolve_defaults=True) for i in _items] 689 if not preserve_uuids: 690 for i in items_as_dicts: 691 i["uuid"] = create_uuid() 692 statement = insert(cls).values(items_as_dicts) 693 if duplicates_strategy == "on_conflict_do_nothing": 694 s.execute(statement.on_conflict_do_nothing()) 695 if duplicates_strategy == "on_conflict_do_update": 696 upsert_statement = statement.on_conflict_do_update( 697 index_elements=[c for c in cls.get_unique_constraint_names()], 698 set_=dict((c, getattr(statement.excluded, c)) for c in cls.get_column_names()) 699 ) 700 s.execute(upsert_statement) 701 s.commit() 702 for item in _items: 703 if getattr(item, "post_save", None): 704 item.post_save() 705 except IntegrityError as integrity_error: 706 cls._handle_integrity_error(integrity_error) 707 708 @classmethod 709 def delete_many(cls, items: list): # pragma: integration 710 """ 711 Deletes many items of the same type at once. 712 :param items: The items to delete. 713 """ 714 _items = [item.pre_delete() for item in items] 715 s = get_session() 716 with s: 717 for item in _items: 718 s.delete(item) 719 s.commit() 720 for item in _items: 721 item.post_delete() 722 723 @classmethod 724 def _session(cls): # pragma: integration 725 return get_session() 726 727 @classmethod 728 def _handle_integrity_error(cls, integrity_error): # pragma: integration 729 if integrity_error.orig.pgcode == '23505': 730 log.error("Error '" + integrity_error.orig.pgcode + "' while saving due to constraint violation: " 731 + str(integrity_error)) 732 raise ConflictingObjectException(cls, cls.get_unique_identifier_names()) 733 else: 734 log.error("Error '" + integrity_error.orig.pgcode + "' while saving: " + str(integrity_error)) 735 raise BedrockException(str(integrity_error.orig.pgerror)) 736 737 def save(self): # pragma: integration 738 """ 739 Validates and saves the object to the database. 740 This also takes care of handling integrity errors by raising nicer exceptions. 741 742 Note: This method will remove all objects from the session before adding itself back in. 743 This is to avoid some weird shenanigans that I don't fully understand, but so far it seems to work. 744 """ 745 self.pre_save() 746 s = get_session() 747 try: 748 self.validate() 749 with s: 750 s.expunge_all() 751 s.add(self) 752 s.merge(self) 753 s.commit() 754 except IntegrityError as integrity_error: 755 self._handle_integrity_error(integrity_error) 756 return self.post_save() 757 758 def delete(self): # pragma: integration 759 """ 760 Deletes the object from the database. 761 """ 762 self.pre_delete() 763 s = get_session() 764 with s: 765 s.delete(self) 766 s.commit() 767 self.post_delete() 768 769 def refetch(self, load_options=None): # pragma: integration 770 """ 771 Re-fetches the object from the database. 772 It tries to do it by using `find_first_matching_criteria` with the primary key values. 773 :param load_options: The load options to apply when re-fetching the object, if any. 774 """ 775 criteria = dict((add_default_equality_operator(pk), getattr(self, pk)) for pk in self.get_primary_key_names()) 776 return self.find_first_matching_criteria(criteria, load_options=load_options, 777 ignore_joins=True) 778 779 def get_auto_populated_models(self, excludes=[]) -> dict: # pragma: integration 780 """ 781 Generates nested models based on the defined `__auto_populate__` dictionary. 782 783 See the top of this module, for an example of how to set this up. 784 785 :param excludes: Which column names to exclude from the auto-population. 786 """ 787 auto_populated_models = {} 788 for column_name, info in self.__auto_populate__.items(): 789 if column_name not in excludes: 790 partitions = info["partition_by"].get_all() 791 auto_populated_models[column_name] = self._get_auto_populated_models(info["model"], 792 info["template"], 793 partitions) 794 return auto_populated_models 795 796 def _get_auto_populated_models(self, model, template, partitions: list): # pragma: unit 797 auto_populated_models = [] 798 for p in partitions: 799 model_instance = model() 800 model_instance.uuid = create_uuid() 801 for model_column, _value in template.items(): 802 value = _value 803 if isinstance(_value, str) and "{{" in _value: 804 placeholder_key = _value.replace("{{", "").replace("}}", "") 805 if placeholder_key == "partition": 806 value = p 807 elif "partition." in placeholder_key: 808 value = getattr(p, placeholder_key.replace("partition.", "")) 809 else: 810 value = getattr(self, placeholder_key) 811 setattr(model_instance, model_column, value) 812 auto_populated_models.append(model_instance) 813 return auto_populated_models 814 815 def validate(self): # pragma: unit 816 """ 817 Validates whether nullables are filled in because SQLAlchemy's validation is a bit meh 818 """ 819 class_attributes = find_all_attributes(self.__class__) 820 invalid_items = [] 821 for attribute in class_attributes: 822 column = self.__class__.get_column_definition(attribute) 823 value = getattr(self, attribute) 824 if column is not None: 825 if column.server_default is None \ 826 and column.default is None \ 827 and not column.nullable \ 828 and (value is None or not value_is_populated(value)): 829 invalid_items.append(attribute) 830 if issubclass(column.type.python_type, Enum) \ 831 and not _is_valid_enum_value(value, column.type.python_type): 832 if not (value is None and column.nullable): 833 invalid_items.append(attribute) 834 if len(invalid_items) > 0: 835 raise ObjectValidationException(self.__class__, invalid_items) 836 return self 837 838 839def _is_valid_enum_value(value, enum): # pragma: unit 840 if isinstance(value, enum): 841 return value in enum 842 return value in enum._member_names_ 843 844 845def same_as(column_name: str): # pragma: unit 846 """ 847 Shortcut method to create a default value that is the same as whatever was specified for another column. 848 849 You'd use this for situations where `ColumnA` defaults to `ColumnB`'s value if `ColumnA` is not specified. 850 """ 851 852 def default_function(context): 853 if isinstance(context, AlchemyHelper): 854 return getattr(context, column_name) 855 if hasattr(context, "current_parameters"): 856 return context.current_parameters.get(column_name) 857 return None 858 859 return default_function 860 861 862def _query_has_entity(query: Query, model) -> bool: # pragma: unit 863 """ 864 This method checks whether the provided SQLAlchemy `query` already has a join for the provided `model`. 865 See https://stackoverflow.com/a/66855484 866 867 :param query: 868 :param entity_class: 869 :return: 870 """ 871 for visitor in visitors.iterate(query.statement): 872 # Checking for `.join(Parent.child)` clauses 873 if visitor.__visit_name__ == 'binary': 874 for vis in visitors.iterate(visitor): 875 # Visitor might not have table attribute 876 with suppress(AttributeError): 877 # Verify if already present based on table name 878 if model.__table__.fullname == vis.table.fullname: 879 return True 880 # Checking for `.join(Child)` clauses 881 if visitor.__visit_name__ == 'table': 882 # Visitor might be of ColumnCollection or so, 883 # which cannot be compared to model 884 with suppress(TypeError): 885 if model == visitor.entity_namespace: 886 return True 887 # Checking for `Model.column` clauses 888 if visitor.__visit_name__ == "column": 889 with suppress(AttributeError): 890 if model.__table__.fullname == visitor.table.fullname: 891 return True 892 return False
34class AlchemyHelper(object): 35 """ 36 AlchemyHelper is a class that provides some helper methods for SQLAlchemy CRUD operations. 37 Also, it helps establish automatic relationships between various Bedrock models (including auto-population of 38 nested objects). 39 40 ---- 41 42 For auto-population to work, you must define a class attribute called `__auto_populate__` on your model. This should 43 be a dictionary that looks like this: 44 ```python 45 # Imagine this is a model for an Icecream Shop. Shops have a many-to-many relationship with Flavours 46 # (via `IcecreamShopFlavourLink`), but they all get initialised with all the ice cream flavour types. 47 __auto_populate__ = { 48 "flavours": { # This is the name of the attribute on the model (typically the relationship name) 49 "model": IcecreamShopFlavourLink, # The model that represents the relationship 50 "partition_by": IcecreamFlavour, # This will use DB records in IcecreamFlavour to generate that many records for this auto-population 51 "template": { 52 "shop_uuid": "{{uuid}}", # The shop's uuid 53 "flavour_type_uuid": "{{partition.uuid}}", # The flavour type's (partition's) uuid 54 "flavour_type": "{{partition}}", # The partition 55 "inventory": 50 # Another column that IcecreamShopFlavourLink has 56 } 57 } 58 } 59 ``` 60 """ 61 __auto_populate__ = {} 62 63 __is_global_item__ = False 64 65 __excluded_attributes_from_json__ = [] 66 67 @classmethod 68 def get(cls, value, restrictions=None, load_options=None): # pragma: integration 69 """ 70 Gets a single object by its primary key(s). 71 72 Example: 73 ```python 74 City.get("661979d2-fb97-4bd2-91fb-64fdc72fb879") 75 ``` 76 ```python 77 City.get("661979d2-fb97-4bd2-91fb-64fdc72fb879", restrictions={"country_code": "GB"}) 78 ``` 79 80 :param value: Primary key value(s) 81 :param restrictions: Any filters to apply to the query. You'd usually use this to get an object by its primary key but also to ensure it belongs to the right "parents". 82 :param load_options: The load options to apply to the query when fetching nested objects, if any. 83 """ 84 # SQLAlchemy complains when a class has no primary key, so no need to check for that 85 unique_columns = cls.get_unique_identifier_names() 86 87 if isinstance(value, dict): 88 dict_value = value 89 else: 90 dict_value = {unique_columns[0]: value} 91 result = cls._find_all_by_unique_column(dict_value, 92 unique_column=unique_columns, 93 restrictions=restrictions, 94 load_options=load_options) 95 try: 96 return result[0] 97 except: 98 return None 99 100 @classmethod 101 def get_all(cls) -> list: # pragma: integration 102 """ 103 Gets all objects of this type. 104 105 Example: 106 ```python 107 City.get_all() 108 ``` 109 """ 110 if cls.__is_global_item__ and cls.__name__ in CACHE and CACHE[cls.__name__]: 111 return cls.post_fetch(CACHE[cls.__name__]) 112 s = get_session() 113 with s: 114 CACHE[cls.__name__] = s.query(cls).all() 115 return cls.post_fetch(CACHE[cls.__name__]) 116 117 @classmethod 118 def find_all_by(cls, key: str, value, restrictions=None, 119 load_options=None, with_count: bool = False) -> list or tuple[list, int]: # pragma: integration 120 """ 121 Find all objects by a key and value. If value is an array, it finds records that match any of the values. 122 123 Makes use of `find_all_matching_criteria`. 124 125 Example: 126 ```python 127 City.find_all_by("name", "London", restrictions={"country_code": "GB"}) 128 ``` 129 ```python 130 City.find_all_by("name", ["London", "Harrogate"], restrictions={"country_code": "GB"}) 131 ``` 132 133 :param key: Key to search by 134 :param value: Value(s) for the key. If it's an array it looks for any of the values. 135 :param restrictions: Any filters to apply to the query. You'd usually use this to get an object by its primary key but also to ensure it belongs to the right "parents". 136 :param load_options: The load options to apply to the query when fetching nested objects, if any. 137 :param with_count: Whether to return the count of the results as well. True makes this method return a tuple. 138 """ 139 _restrictions = restrictions if restrictions else {} 140 return cls.find_all_matching_criteria({add_default_equality_operator(key): value, **_restrictions}, 141 load_options=load_options, with_count=with_count) 142 143 @classmethod 144 def find_all_by_unique_column(cls, value, unique_column: str or list = None, 145 restrictions=None, load_options=None): # pragma: integration 146 """ 147 This method is a little like `get` but it allows you to search by any unique column(s) rather than just the primary key. 148 149 The true magic here is when `unique_column` is left up for the method to work out. 150 151 If the provided `unique_column` isn't actually unique, it behaves like `find_all_by`. 152 153 Makes use of `find_all_matching_criteria` (somewhere down the line) 154 155 Example: 156 ```python 157 # Assuming Country is defined with a UniqueConstraint on `country_code` 158 Country.find_all_by_unique_column("GB") 159 ``` 160 ```python 161 City.find_all_by_unique_column("London", unique_column="name", restrictions={"country_code": "GB"}) 162 ``` 163 164 :param value: Value(s) for the key. If it's an array it looks for any of the values. 165 :param unique_column: The unique column(s) to search by. If not provided, it will use the unique constraints to find the columns. 166 :param restrictions: Any filters to apply to the query. You'd usually use this to get an object by its primary key but also to ensure it belongs to the right "parents". 167 :param load_options: The load options to apply to the query when fetching nested objects, if any. 168 """ 169 _unique_column = unique_column if unique_column is not None else cls.get_unique_constraint_names() 170 if isinstance(value, list): 171 _value = {} 172 for item in value: 173 for k, v in item.items(): 174 if k not in _value: 175 _value[k] = [] 176 _value[k].append(v) 177 else: 178 _value = value 179 return cls._find_all_by_unique_column(_value, _unique_column, restrictions=restrictions, 180 load_options=load_options) 181 182 @classmethod 183 def _find_all_by_unique_column(cls, value_criteria, unique_column: str or list = "uuid", 184 restrictions=None, load_options=None): # pragma: integration 185 """ 186 Gets a number of objects (if a list is provided as the criteria) by their unique column(s). 187 188 :param value_criteria: Dictionary or a criteria 189 :param unique_column: A column name or a list of column names 190 :param restrictions: Any filters to apply to the query. You'd usually use this to get an object by its primary key but also to ensure it belongs to the right "parents". 191 :param load_options: The load options to apply to the query when fetching nested objects, if any. 192 193 """ 194 _restrictions = restrictions if restrictions else {} 195 if type(unique_column) is list: 196 criteria = {} 197 for key in unique_column: 198 if key is not None and has_key(value_criteria, key, snake_and_camel_case=True): 199 criteria[add_default_equality_operator(snake_case_to_camelCase(key))] \ 200 = get_snake_or_camel_case_value(value_criteria, key) 201 return cls.find_all_matching_criteria({**criteria, **_restrictions}, 202 load_options=load_options) 203 if unique_column in value_criteria and unique_column is not None: 204 return cls.find_all_by(unique_column, value_criteria[unique_column].lower(), 205 restrictions=_restrictions, 206 load_options=load_options) 207 return cls.post_fetch([]) 208 209 @classmethod 210 def _resolve_hinted_field(cls, path_hint: str, entities_dict: dict) -> tuple[list, str]: # pragma: integration 211 join_path = path_hint.split(".")[0] 212 protecting_column = path_hint.split(".")[1] 213 entities = [entities_dict[e] for e in join_path.split("->") if e.lower() != "self" and e in entities_dict] 214 return entities, protecting_column 215 216 @classmethod 217 def _resolve_hinted_fields(cls) -> list[tuple[list, str]]: # pragma: integration 218 """ 219 Resolves the hinted entity path. 220 It does this by walking through the `->` separated path to get from itself to the table that holds the end field. 221 222 This is often used to restrict access to an entity based on the user's permissions. 223 224 For example, for a City to be protected by its country code, it might look like: 225 ```python 226 "self->Region->Country.country_code" 227 ``` 228 """ 229 if not hasattr(cls, "__filter_path_hints__"): 230 cls.__filter_path_hints__ = [] 231 if not cls.__filter_path_hints__: 232 return [([], "")] 233 all_model_entities = cls.get_entities() 234 return [cls._resolve_hinted_field(path_hint, all_model_entities) for path_hint in cls.__filter_path_hints__] 235 236 @classmethod 237 def _criteria_includes_hinted_field(cls, criteria: dict, path_field_pairs: list[tuple[list, str]]): # pragma: unit 238 if not cls.__filter_path_hints__ or not criteria: 239 return False 240 hinted_fields = [pair[1] for pair in path_field_pairs] 241 hinted_fields_with_model = [f"{pair[0][-1].__name__}.{pair[1]}" for pair in path_field_pairs] 242 for column in cls._criteria_columns(criteria): 243 if column in hinted_fields or column in hinted_fields_with_model: 244 return True 245 246 return False 247 248 @classmethod 249 def _get_criteria_key_with_operation_from_column(cls, column, criteria): # pragma: unit 250 if not criteria or not column: 251 return None 252 criteria_with_operations = [c for c in criteria.keys() if c.split('[')[0] == column] 253 if criteria_with_operations: 254 return criteria_with_operations[0] 255 return None 256 257 @classmethod 258 def _criteria_columns(cls, criteria): # pragma: unit 259 return [c.split('[')[0] for c in criteria.keys()] 260 261 @classmethod 262 def find_all_with_query_matching_criteria(cls, query, criteria: dict, allow_listing_all=False, order_by=None, 263 order='asc', offset=None, limit=None, 264 load_options=None, 265 with_count=False, 266 ignore_joins=False) -> list or tuple[list, int]: # pragma: integration 267 """ 268 See `AlchemyHelper.find_all_with_query_matching_criteria_and_count` for more details. 269 The only difference is that this method lets you remove the count part with `with_count=False` (the default 270 behaviour for now). 271 :param query: The SQLAlchemy query to use (see `query` method). 272 :param criteria: The criteria to match. 273 :param allow_listing_all: Whether to allow listing all elements if the resolved criteria is empty. 274 :param order_by: The column to sort results of the query on, if any. 275 :param order: The order to sort results of the query in. 276 :param offset: The offset to use for pagination of results, if any. 277 :param limit: The number of rows to return in the results, if any. 278 :param load_options: The load options to apply to the query when fetching nested objects, if any. 279 :param with_count: Whether to include the total count of results (given the criteria) 280 :return: 281 """ 282 results, count = cls.find_all_with_query_matching_criteria_and_count( 283 query, 284 criteria, 285 allow_listing_all=allow_listing_all, 286 order_by=order_by, 287 order=order, 288 offset=offset, 289 limit=limit, 290 load_options=load_options, 291 with_count=with_count, 292 ignore_joins=ignore_joins) 293 if with_count: 294 return results, count 295 return results 296 297 @classmethod 298 def find_all_with_query_matching_criteria_and_count(cls, query, 299 criteria: dict, 300 allow_listing_all: bool = False, 301 order_by: str = None, 302 order: str = 'asc', 303 offset: int = None, 304 limit: int = None, 305 load_options=None, 306 with_count: bool = False, 307 ignore_joins=False): # pragma: integration 308 """ 309 Finds all elements that match the provided SQLAlchemy `query` that match the provided `criteria`. 310 Will return a tuple with the results and the total count of the results that match the criteria, regardless of 311 the offset and limit. 312 313 Notes: 314 * If column value in `criteria` is an array, it matches if the array contains the value (not if it is equal to the array). 315 * `criteria` are key-value pairs specified in the format 'column_name[operation]': '1' and these are converted 316 to SQLAlchemy operations just like conditions in an SQL WHERE clause, to apply as part of the query. 317 * `criteria` is resolved so that it only contains columns that exist on the model. 318 * The results of the query can be sorted by using the 'order_by' and 'order' parameters. By default, ordering is 319 done in ascending order based on 'created_at' column if it exists. 320 * Row-based pagination of results can be done using the 'offset' and 'limit' parameters. 321 322 Example: 323 ```python 324 City.find_all_with_query_matching_criteria(City.query(), {"name[eq]": "London"}) 325 ``` 326 ```python 327 City.find_all_with_query_matching_criteria(City.query(), {"name[eq]": ["London", "Harrogate"]}) 328 ``` 329 ```python 330 # If City doesn't have a 'code' column and is annotated with @filter_path_hint("self->Region->Country.code") 331 City.find_all_with_query_matching_criteria(City.query(), {"code[eq]": "UK"}) 332 ``` 333 ```python 334 # If City is annotated with @filter_path_hint("self->Region->Country.code") 335 City.find_all_with_query_matching_criteria(City.query(), {"Country.code[eq]": "UK"}) 336 ``` 337 338 :param query: The SQLAlchemy query to use (see `query` method). 339 :param criteria: The criteria for filtering the query. 340 :param allow_listing_all: Whether to allow listing all elements if the resolved criteria is empty. 341 :param order_by: The column to sort results of the query on, if any. 342 :param order: The order to sort results of the query in. 343 :param offset: The offset to use for pagination of results, if any. 344 :param limit: The number of rows to return in the results, if any. 345 :param load_options: The load options to apply to the query, if any. 346 :param with_count: Whether to include the total count of results (given the criteria). 347 :param ignore_joins: Whether to ignore automated joins between tables that come from filter path hints. 348 """ 349 query = cls.query_with_criteria(query, criteria, allow_listing_all, ignore_joins) 350 if query is None: 351 return cls.post_fetch([]), 0 352 return cls.get_results_with_count(query, order_by, order, offset, limit, load_options, with_count) 353 354 @classmethod 355 def get_results_with_count(cls, 356 query: Query, 357 order_by: str = None, 358 order: str = 'asc', 359 offset: int = None, 360 limit: int = None, 361 load_options=None, 362 with_count: bool = False): # pragma: integration 363 try: 364 s = get_session() 365 with s: 366 results_query = cls._apply_load_options(query, load_options) 367 results_query = cls._apply_order(results_query, order_by, order) 368 369 # If offset and limit specified for pagination, use these. If not, return all. 370 if offset: 371 results_query = results_query.offset(offset) 372 if limit: 373 results_query = results_query.limit(limit) 374 375 items, count = cls.results_with_count(results_query, query if with_count else None) 376 return cls.post_fetch(items), count 377 except Exception as e: # pragma: no cover - if something happens we don't want to break. Just log it. 378 log.error(e, exc_info=True) 379 return cls.post_fetch([]), 0 380 381 @classmethod 382 def query_with_criteria(cls, query, 383 criteria: dict, 384 allow_listing_all: bool = False, 385 ignore_joins: bool = False) -> Query | None: # pragma: integration 386 """ 387 Applies the provided `criteria` to the provided SQLAlchemy `query`. 388 See `find_all_with_query_matching_criteria_and_count` for more details. 389 390 :param query: The SQLAlchemy query to apply the criteria to (see `query` method). 391 :param criteria: The criteria for filtering the query. 392 :param allow_listing_all: Whether to allow listing all elements if the resolved criteria is empty. 393 :param ignore_joins: Whether to ignore automated joins between tables that come from filter path hints. 394 :return: A SQLAlchemy query with the criteria applied. 395 """ 396 relevant_criteria = get_filters_relevant_to_class(cls, criteria) if criteria else {} 397 hinted_fields = [] if ignore_joins else cls._resolve_hinted_fields() 398 # convert list of relevant criteria into format that can be understood by SQLAlchemy 399 query_filters = create_filters_list_from_dict(relevant_criteria, cls) 400 401 has_query_parameters = query_filters or cls._criteria_includes_hinted_field(criteria, hinted_fields) 402 if not has_query_parameters and not allow_listing_all: 403 return None 404 405 try: 406 for item in query_filters: 407 query = query.filter(criteria_to_sql_alchemy_operation(item)) 408 except Exception as e: 409 log.error("Error while trying to apply criteria: " + str(e), exc_info=True) 410 raise ForbiddenException(e) 411 412 # If hinted fields are present, join the relevant entities and apply the criteria 413 try: 414 joined_entities = [] 415 for hinted_pair in hinted_fields: 416 entities, protecting_column = hinted_pair 417 if not entities or not protecting_column: 418 continue 419 criteria_with_operation = cls._get_criteria_key_with_operation_from_column(protecting_column, criteria) 420 if not criteria_with_operation: 421 # Try Entity.column 422 criteria_with_operation = cls._get_criteria_key_with_operation_from_column( 423 f"{entities[-1].__name__}.{protecting_column}", criteria) 424 if protecting_column and criteria_with_operation: 425 value = criteria[criteria_with_operation] 426 # TODO: I think the bit below can be: 427 # (type(value) is list and None in value) or (value is None) 428 # and then we can auto-deal with "None" joins too. 429 # - tco 430 match_null_values = type(value) is list and None in value 431 for e in entities: 432 if e not in joined_entities and not _query_has_entity(query, e): 433 query = query.join(e, isouter=match_null_values) 434 joined_entities.append(e) 435 protecting_column_with_operation = criteria_with_operation.split(".")[-1] if "." in criteria_with_operation else criteria_with_operation 436 item = create_filters_list_from_dict({ 437 protecting_column_with_operation: value 438 }, entities[-1])[0] 439 join_filter = cls._get_join_filters_for_path_hints(item, match_null_values) 440 query = query.filter(join_filter) 441 except Exception as e: 442 log.error("Error while trying to apply hinted entity restriction: " + str(e), exc_info=True) 443 raise ForbiddenException(e) 444 return query 445 446 @classmethod 447 def _apply_load_options(cls, query: Query, load_options: ExecutableOption) -> Query: # pragma: integration 448 if load_options: 449 return query.options(load_options) 450 return query 451 452 @classmethod 453 def _apply_order(cls, query: Query, order_by: str, order: str) -> Query: # pragma: integration 454 # If sort column and order specified, use these with uuid as secondary. If not, order by 'created_at' column if present. 455 if order_by and hasattr(cls, "uuid"): 456 if order and order.lower() == 'desc': 457 return query.order_by(getattr(cls, order_by).desc(), getattr(cls, "uuid").desc()) 458 else: 459 return query.order_by(getattr(cls, order_by).asc(), getattr(cls, "uuid").asc()) 460 elif order_by: 461 if order and order.lower() == 'desc': 462 return query.order_by(getattr(cls, order_by).desc()) 463 else: 464 return query.order_by(getattr(cls, order_by).asc()) 465 elif hasattr(cls, "created_at"): 466 return query.order_by(getattr(cls, "created_at").asc()) 467 468 @classmethod 469 def results_with_count(cls, results_query, count_query=None): # pragma: integration 470 """ 471 Returns the results of a query and the count of the results. 472 473 :param results_query: The query with sort and pagination applied. 474 :param count_query: The query without any sort or pagination applied. 475 :return: 476 """ 477 results = results_query.all() 478 count = count_query.distinct(cls.uuid).options(noload('*')).count() if count_query else None 479 return results, count 480 481 @staticmethod 482 def _get_join_filters_for_path_hints(item, match_null_values): 483 if len(item) == 3: 484 column, operation, value = item 485 if value is None and operation is not ColumnOperators.is_ and operation is not ColumnOperators.is_not: 486 return AlchemyHelper._get_join_filters_for_path_hints(( 487 column, 488 OPERATIONS_RELATED_NONE_MATCHER[operation], 489 None 490 ), match_null_values) 491 492 if type(value) is not list: 493 return operation(column, value) 494 495 if type(value) is list and operation.__name__ == '__eq__': # this is to preserve some backwards compatibility that we had in the old version of this 496 return AlchemyHelper._get_join_filters_for_path_hints(( 497 column, 498 ColumnOperators.in_, 499 value 500 ), match_null_values) 501 502 if match_null_values: 503 none_operation = OPERATIONS_RELATED_NONE_MATCHER[operation] 504 none_boolean_operation = OPERATIONS_RELATED_NONE_MATCHER_BOOLEAN_OPERATION[operation] 505 return none_boolean_operation(operation(column, tuple(value)), none_operation(column, None)) 506 return operation(column, tuple(value)) 507 else: 508 return criteria_to_sql_alchemy_operation(item) 509 510 @staticmethod 511 def _DEPRECATED_get_join_filters_for_path_hints(value, entities, protecting_column, match_null_values): 512 """ 513 This method IS DEPRECATED and is only here to ensure the old behaviour is present in the new method for testing 514 purposes. 515 I kept it so that if there's a scenario we're not testing, we can always check the code for the original intent. 516 :param value: 517 :param entities: 518 :param protecting_column: 519 :param match_null_values: 520 :return: 521 """ 522 if type(value) is not list: 523 return getattr(entities[-1], protecting_column) == value 524 525 if match_null_values: 526 return or_(getattr(entities[-1], protecting_column).in_(tuple(value)), 527 getattr(entities[-1], protecting_column).is_(None)) 528 return getattr(entities[-1], protecting_column).in_(tuple(value)) 529 530 @classmethod 531 def post_fetch(cls, items: list) -> list: # pragma: unit 532 """ 533 Override this class method to do any post-fetch processing on the items. 534 535 :param items: Items that have been fetched by a query. 536 """ 537 return items 538 539 def pre_save(self): # pragma: integration 540 """ 541 Override this class method to do any pre-saving processing on the items. 542 """ 543 return self 544 545 def post_save(self): # pragma: integration 546 """ 547 Override this class method to do any post-saving processing on the items. 548 549 Note: this hook runs after the transaction has been committed; the in-memory instance may not include 550 database-side changes (e.g. triggers, server defaults) unless explicitly refreshed or refetched. 551 """ 552 return self 553 554 def pre_delete(self): # pragma: integration 555 """ 556 Override this class method to do any pre-deletion processing on the items. 557 """ 558 return self 559 560 def post_delete(self): # pragma: integration 561 """ 562 Override this class method to do any post-deletion processing on the items. 563 """ 564 return self 565 566 @classmethod 567 def find_all_matching_criteria(cls, criteria: dict, allow_listing_all=False, order_by=None, 568 order='asc', offset=None, limit=None, load_options=None, 569 with_count: bool = False, ignore_joins=False): # pragma: integration 570 """ 571 Finds all elements that match the provided `criteria`. 572 573 It is a shortcut for `find_all_with_query_matching_criteria` where it pre-populates the query with a simple `select from cls` as a start. 574 575 Subsequent complexities are added via `criteria`. 576 577 Example: 578 ```python 579 City.find_all_matching_criteria({"name": "London"}) 580 ``` 581 ```python 582 City.find_all_matching_criteria({"name": ["London", "Harrogate"]}) 583 ``` 584 585 :param criteria: The criteria to match. 586 :param allow_listing_all: Whether to allow listing all elements if the resolved criteria is empty. 587 :param order_by: The column to sort results of the query on, if any. 588 :param order: The order to sort results of the query in. 589 :param offset: The offset to use for pagination of results, if any. 590 :param limit: The number of rows to return in the results, if any. 591 :param load_options: The load options to apply to the query when fetching nested objects, if any. 592 :param with_count: Whether to return the count of the results as well. True makes this method return a tuple. 593 """ 594 return cls.find_all_with_query_matching_criteria(cls.query().select_from(cls), 595 criteria, 596 allow_listing_all=allow_listing_all, 597 order_by=order_by, 598 order=order, 599 offset=offset, 600 limit=limit, 601 load_options=load_options, 602 with_count=with_count, 603 ignore_joins=ignore_joins) 604 605 @classmethod 606 def find_first_matching_criteria(cls, criteria, load_options=None, 607 ignore_joins=False): # pragma: integration 608 """ 609 Behaves exactly like `find_all_matching_criteria` but returns only the first item (or `None` if no item is found). 610 611 :param criteria: The criteria to match. 612 :param load_options: The load options to apply to the query when fetching nested objects, if any. 613 :return: 614 """ 615 try: 616 return cls.find_all_matching_criteria(criteria, 617 load_options=load_options, 618 with_count=False, 619 ignore_joins=ignore_joins)[0] 620 except Exception as e: 621 log.warning(f"No {cls.__name__} found") 622 return None 623 624 @classmethod 625 def query(cls, *classes) -> Any or tuple[Any, int]: # pragma: integration 626 """ 627 Returns a SQLAlchemy query for the given class. 628 629 Example: 630 ```python 631 City.query().filter_by(name="London").all() 632 ``` 633 ```python 634 City.query(Country).join(Country).filter(Country.name == "United Kingdom").all() 635 ``` 636 637 :param classes: The classes to query. 638 """ 639 return get_session().query(cls, *classes) 640 641 @classmethod 642 def count_query(cls, *columns) -> Any or tuple[Any, int]: # pragma: integration 643 """ 644 Returns a SQLAlchemy query for the given columns and adds a count for each column. 645 This is useful for getting counts of grouped queries. 646 647 Example: 648 ```python 649 City.query(City.name).group_by(City.name).count() 650 ``` 651 ```python 652 City.query(City.name, Country.name).join(City.country_uuid == Country.uuid).filter(Country.name == "United Kingdom").group_by(City.name, Country.name).count() 653 ``` 654 655 :param classes: The classes to query. 656 """ 657 return get_session().query(*columns, *[func.count(c) for c in columns]) 658 659 @classmethod 660 def save_many(cls, items: list, duplicates_strategy: str = None, preserve_uuids=False): # pragma: integration 661 """ 662 Saves many items of the same type at once. 663 664 This includes any nested objects within the list of items. 665 When you need to maintain relationships between objects, you should set `preserve_uuids` to `True`. 666 667 :param items: The items to save. 668 :param duplicates_strategy: `None`, `'on_conflict_do_update'` or `'on_conflict_do_nothing'` 669 :param preserve_uuids: By default, any uuids are left for the DB to regenerate. Set this to `True` to preserve them. 670 """ 671 try: 672 if not items: 673 return 674 675 _items = items if isinstance(items[0], dict) else [item.pre_save() for item in items] 676 s = get_session() 677 with s: 678 if duplicates_strategy is None: 679 for item in _items: 680 item.validate() 681 s.merge(item) 682 else: 683 if isinstance(_items[0], dict): 684 items_as_dicts = _items 685 else: 686 nested_keys = [n.key for n in cls.get_nested_models()] 687 uuid = [] if preserve_uuids else ["uuid"] 688 excludes = ["created_at", "updated_at", *uuid, *nested_keys] 689 items_as_dicts = [i.as_dict(excludes=excludes, resolve_defaults=True) for i in _items] 690 if not preserve_uuids: 691 for i in items_as_dicts: 692 i["uuid"] = create_uuid() 693 statement = insert(cls).values(items_as_dicts) 694 if duplicates_strategy == "on_conflict_do_nothing": 695 s.execute(statement.on_conflict_do_nothing()) 696 if duplicates_strategy == "on_conflict_do_update": 697 upsert_statement = statement.on_conflict_do_update( 698 index_elements=[c for c in cls.get_unique_constraint_names()], 699 set_=dict((c, getattr(statement.excluded, c)) for c in cls.get_column_names()) 700 ) 701 s.execute(upsert_statement) 702 s.commit() 703 for item in _items: 704 if getattr(item, "post_save", None): 705 item.post_save() 706 except IntegrityError as integrity_error: 707 cls._handle_integrity_error(integrity_error) 708 709 @classmethod 710 def delete_many(cls, items: list): # pragma: integration 711 """ 712 Deletes many items of the same type at once. 713 :param items: The items to delete. 714 """ 715 _items = [item.pre_delete() for item in items] 716 s = get_session() 717 with s: 718 for item in _items: 719 s.delete(item) 720 s.commit() 721 for item in _items: 722 item.post_delete() 723 724 @classmethod 725 def _session(cls): # pragma: integration 726 return get_session() 727 728 @classmethod 729 def _handle_integrity_error(cls, integrity_error): # pragma: integration 730 if integrity_error.orig.pgcode == '23505': 731 log.error("Error '" + integrity_error.orig.pgcode + "' while saving due to constraint violation: " 732 + str(integrity_error)) 733 raise ConflictingObjectException(cls, cls.get_unique_identifier_names()) 734 else: 735 log.error("Error '" + integrity_error.orig.pgcode + "' while saving: " + str(integrity_error)) 736 raise BedrockException(str(integrity_error.orig.pgerror)) 737 738 def save(self): # pragma: integration 739 """ 740 Validates and saves the object to the database. 741 This also takes care of handling integrity errors by raising nicer exceptions. 742 743 Note: This method will remove all objects from the session before adding itself back in. 744 This is to avoid some weird shenanigans that I don't fully understand, but so far it seems to work. 745 """ 746 self.pre_save() 747 s = get_session() 748 try: 749 self.validate() 750 with s: 751 s.expunge_all() 752 s.add(self) 753 s.merge(self) 754 s.commit() 755 except IntegrityError as integrity_error: 756 self._handle_integrity_error(integrity_error) 757 return self.post_save() 758 759 def delete(self): # pragma: integration 760 """ 761 Deletes the object from the database. 762 """ 763 self.pre_delete() 764 s = get_session() 765 with s: 766 s.delete(self) 767 s.commit() 768 self.post_delete() 769 770 def refetch(self, load_options=None): # pragma: integration 771 """ 772 Re-fetches the object from the database. 773 It tries to do it by using `find_first_matching_criteria` with the primary key values. 774 :param load_options: The load options to apply when re-fetching the object, if any. 775 """ 776 criteria = dict((add_default_equality_operator(pk), getattr(self, pk)) for pk in self.get_primary_key_names()) 777 return self.find_first_matching_criteria(criteria, load_options=load_options, 778 ignore_joins=True) 779 780 def get_auto_populated_models(self, excludes=[]) -> dict: # pragma: integration 781 """ 782 Generates nested models based on the defined `__auto_populate__` dictionary. 783 784 See the top of this module, for an example of how to set this up. 785 786 :param excludes: Which column names to exclude from the auto-population. 787 """ 788 auto_populated_models = {} 789 for column_name, info in self.__auto_populate__.items(): 790 if column_name not in excludes: 791 partitions = info["partition_by"].get_all() 792 auto_populated_models[column_name] = self._get_auto_populated_models(info["model"], 793 info["template"], 794 partitions) 795 return auto_populated_models 796 797 def _get_auto_populated_models(self, model, template, partitions: list): # pragma: unit 798 auto_populated_models = [] 799 for p in partitions: 800 model_instance = model() 801 model_instance.uuid = create_uuid() 802 for model_column, _value in template.items(): 803 value = _value 804 if isinstance(_value, str) and "{{" in _value: 805 placeholder_key = _value.replace("{{", "").replace("}}", "") 806 if placeholder_key == "partition": 807 value = p 808 elif "partition." in placeholder_key: 809 value = getattr(p, placeholder_key.replace("partition.", "")) 810 else: 811 value = getattr(self, placeholder_key) 812 setattr(model_instance, model_column, value) 813 auto_populated_models.append(model_instance) 814 return auto_populated_models 815 816 def validate(self): # pragma: unit 817 """ 818 Validates whether nullables are filled in because SQLAlchemy's validation is a bit meh 819 """ 820 class_attributes = find_all_attributes(self.__class__) 821 invalid_items = [] 822 for attribute in class_attributes: 823 column = self.__class__.get_column_definition(attribute) 824 value = getattr(self, attribute) 825 if column is not None: 826 if column.server_default is None \ 827 and column.default is None \ 828 and not column.nullable \ 829 and (value is None or not value_is_populated(value)): 830 invalid_items.append(attribute) 831 if issubclass(column.type.python_type, Enum) \ 832 and not _is_valid_enum_value(value, column.type.python_type): 833 if not (value is None and column.nullable): 834 invalid_items.append(attribute) 835 if len(invalid_items) > 0: 836 raise ObjectValidationException(self.__class__, invalid_items) 837 return self
AlchemyHelper is a class that provides some helper methods for SQLAlchemy CRUD operations. Also, it helps establish automatic relationships between various Bedrock models (including auto-population of nested objects).
For auto-population to work, you must define a class attribute called __auto_populate__ on your model. This should
be a dictionary that looks like this:
# Imagine this is a model for an Icecream Shop. Shops have a many-to-many relationship with Flavours
# (via `IcecreamShopFlavourLink`), but they all get initialised with all the ice cream flavour types.
__auto_populate__ = {
"flavours": { # This is the name of the attribute on the model (typically the relationship name)
"model": IcecreamShopFlavourLink, # The model that represents the relationship
"partition_by": IcecreamFlavour, # This will use DB records in IcecreamFlavour to generate that many records for this auto-population
"template": {
"shop_uuid": "{{uuid}}", # The shop's uuid
"flavour_type_uuid": "{{partition.uuid}}", # The flavour type's (partition's) uuid
"flavour_type": "{{partition}}", # The partition
"inventory": 50 # Another column that IcecreamShopFlavourLink has
}
}
}
67 @classmethod 68 def get(cls, value, restrictions=None, load_options=None): # pragma: integration 69 """ 70 Gets a single object by its primary key(s). 71 72 Example: 73 ```python 74 City.get("661979d2-fb97-4bd2-91fb-64fdc72fb879") 75 ``` 76 ```python 77 City.get("661979d2-fb97-4bd2-91fb-64fdc72fb879", restrictions={"country_code": "GB"}) 78 ``` 79 80 :param value: Primary key value(s) 81 :param restrictions: Any filters to apply to the query. You'd usually use this to get an object by its primary key but also to ensure it belongs to the right "parents". 82 :param load_options: The load options to apply to the query when fetching nested objects, if any. 83 """ 84 # SQLAlchemy complains when a class has no primary key, so no need to check for that 85 unique_columns = cls.get_unique_identifier_names() 86 87 if isinstance(value, dict): 88 dict_value = value 89 else: 90 dict_value = {unique_columns[0]: value} 91 result = cls._find_all_by_unique_column(dict_value, 92 unique_column=unique_columns, 93 restrictions=restrictions, 94 load_options=load_options) 95 try: 96 return result[0] 97 except: 98 return None
Gets a single object by its primary key(s).
Example:
City.get("661979d2-fb97-4bd2-91fb-64fdc72fb879")
City.get("661979d2-fb97-4bd2-91fb-64fdc72fb879", restrictions={"country_code": "GB"})
Parameters
- value: Primary key value(s)
- restrictions: Any filters to apply to the query. You'd usually use this to get an object by its primary key but also to ensure it belongs to the right "parents".
- load_options: The load options to apply to the query when fetching nested objects, if any.
100 @classmethod 101 def get_all(cls) -> list: # pragma: integration 102 """ 103 Gets all objects of this type. 104 105 Example: 106 ```python 107 City.get_all() 108 ``` 109 """ 110 if cls.__is_global_item__ and cls.__name__ in CACHE and CACHE[cls.__name__]: 111 return cls.post_fetch(CACHE[cls.__name__]) 112 s = get_session() 113 with s: 114 CACHE[cls.__name__] = s.query(cls).all() 115 return cls.post_fetch(CACHE[cls.__name__])
Gets all objects of this type.
Example:
City.get_all()
117 @classmethod 118 def find_all_by(cls, key: str, value, restrictions=None, 119 load_options=None, with_count: bool = False) -> list or tuple[list, int]: # pragma: integration 120 """ 121 Find all objects by a key and value. If value is an array, it finds records that match any of the values. 122 123 Makes use of `find_all_matching_criteria`. 124 125 Example: 126 ```python 127 City.find_all_by("name", "London", restrictions={"country_code": "GB"}) 128 ``` 129 ```python 130 City.find_all_by("name", ["London", "Harrogate"], restrictions={"country_code": "GB"}) 131 ``` 132 133 :param key: Key to search by 134 :param value: Value(s) for the key. If it's an array it looks for any of the values. 135 :param restrictions: Any filters to apply to the query. You'd usually use this to get an object by its primary key but also to ensure it belongs to the right "parents". 136 :param load_options: The load options to apply to the query when fetching nested objects, if any. 137 :param with_count: Whether to return the count of the results as well. True makes this method return a tuple. 138 """ 139 _restrictions = restrictions if restrictions else {} 140 return cls.find_all_matching_criteria({add_default_equality_operator(key): value, **_restrictions}, 141 load_options=load_options, with_count=with_count)
Find all objects by a key and value. If value is an array, it finds records that match any of the values.
Makes use of find_all_matching_criteria.
Example:
City.find_all_by("name", "London", restrictions={"country_code": "GB"})
City.find_all_by("name", ["London", "Harrogate"], restrictions={"country_code": "GB"})
Parameters
- key: Key to search by
- value: Value(s) for the key. If it's an array it looks for any of the values.
- restrictions: Any filters to apply to the query. You'd usually use this to get an object by its primary key but also to ensure it belongs to the right "parents".
- load_options: The load options to apply to the query when fetching nested objects, if any.
- with_count: Whether to return the count of the results as well. True makes this method return a tuple.
143 @classmethod 144 def find_all_by_unique_column(cls, value, unique_column: str or list = None, 145 restrictions=None, load_options=None): # pragma: integration 146 """ 147 This method is a little like `get` but it allows you to search by any unique column(s) rather than just the primary key. 148 149 The true magic here is when `unique_column` is left up for the method to work out. 150 151 If the provided `unique_column` isn't actually unique, it behaves like `find_all_by`. 152 153 Makes use of `find_all_matching_criteria` (somewhere down the line) 154 155 Example: 156 ```python 157 # Assuming Country is defined with a UniqueConstraint on `country_code` 158 Country.find_all_by_unique_column("GB") 159 ``` 160 ```python 161 City.find_all_by_unique_column("London", unique_column="name", restrictions={"country_code": "GB"}) 162 ``` 163 164 :param value: Value(s) for the key. If it's an array it looks for any of the values. 165 :param unique_column: The unique column(s) to search by. If not provided, it will use the unique constraints to find the columns. 166 :param restrictions: Any filters to apply to the query. You'd usually use this to get an object by its primary key but also to ensure it belongs to the right "parents". 167 :param load_options: The load options to apply to the query when fetching nested objects, if any. 168 """ 169 _unique_column = unique_column if unique_column is not None else cls.get_unique_constraint_names() 170 if isinstance(value, list): 171 _value = {} 172 for item in value: 173 for k, v in item.items(): 174 if k not in _value: 175 _value[k] = [] 176 _value[k].append(v) 177 else: 178 _value = value 179 return cls._find_all_by_unique_column(_value, _unique_column, restrictions=restrictions, 180 load_options=load_options)
This method is a little like get but it allows you to search by any unique column(s) rather than just the primary key.
The true magic here is when unique_column is left up for the method to work out.
If the provided unique_column isn't actually unique, it behaves like find_all_by.
Makes use of find_all_matching_criteria (somewhere down the line)
Example:
# Assuming Country is defined with a UniqueConstraint on `country_code`
Country.find_all_by_unique_column("GB")
City.find_all_by_unique_column("London", unique_column="name", restrictions={"country_code": "GB"})
Parameters
- value: Value(s) for the key. If it's an array it looks for any of the values.
- unique_column: The unique column(s) to search by. If not provided, it will use the unique constraints to find the columns.
- restrictions: Any filters to apply to the query. You'd usually use this to get an object by its primary key but also to ensure it belongs to the right "parents".
- load_options: The load options to apply to the query when fetching nested objects, if any.
261 @classmethod 262 def find_all_with_query_matching_criteria(cls, query, criteria: dict, allow_listing_all=False, order_by=None, 263 order='asc', offset=None, limit=None, 264 load_options=None, 265 with_count=False, 266 ignore_joins=False) -> list or tuple[list, int]: # pragma: integration 267 """ 268 See `AlchemyHelper.find_all_with_query_matching_criteria_and_count` for more details. 269 The only difference is that this method lets you remove the count part with `with_count=False` (the default 270 behaviour for now). 271 :param query: The SQLAlchemy query to use (see `query` method). 272 :param criteria: The criteria to match. 273 :param allow_listing_all: Whether to allow listing all elements if the resolved criteria is empty. 274 :param order_by: The column to sort results of the query on, if any. 275 :param order: The order to sort results of the query in. 276 :param offset: The offset to use for pagination of results, if any. 277 :param limit: The number of rows to return in the results, if any. 278 :param load_options: The load options to apply to the query when fetching nested objects, if any. 279 :param with_count: Whether to include the total count of results (given the criteria) 280 :return: 281 """ 282 results, count = cls.find_all_with_query_matching_criteria_and_count( 283 query, 284 criteria, 285 allow_listing_all=allow_listing_all, 286 order_by=order_by, 287 order=order, 288 offset=offset, 289 limit=limit, 290 load_options=load_options, 291 with_count=with_count, 292 ignore_joins=ignore_joins) 293 if with_count: 294 return results, count 295 return results
See AlchemyHelper.find_all_with_query_matching_criteria_and_count for more details.
The only difference is that this method lets you remove the count part with with_count=False (the default
behaviour for now).
Parameters
- query: The SQLAlchemy query to use (see
querymethod). - criteria: The criteria to match.
- allow_listing_all: Whether to allow listing all elements if the resolved criteria is empty.
- order_by: The column to sort results of the query on, if any.
- order: The order to sort results of the query in.
- offset: The offset to use for pagination of results, if any.
- limit: The number of rows to return in the results, if any.
- load_options: The load options to apply to the query when fetching nested objects, if any.
- with_count: Whether to include the total count of results (given the criteria)
Returns
297 @classmethod 298 def find_all_with_query_matching_criteria_and_count(cls, query, 299 criteria: dict, 300 allow_listing_all: bool = False, 301 order_by: str = None, 302 order: str = 'asc', 303 offset: int = None, 304 limit: int = None, 305 load_options=None, 306 with_count: bool = False, 307 ignore_joins=False): # pragma: integration 308 """ 309 Finds all elements that match the provided SQLAlchemy `query` that match the provided `criteria`. 310 Will return a tuple with the results and the total count of the results that match the criteria, regardless of 311 the offset and limit. 312 313 Notes: 314 * If column value in `criteria` is an array, it matches if the array contains the value (not if it is equal to the array). 315 * `criteria` are key-value pairs specified in the format 'column_name[operation]': '1' and these are converted 316 to SQLAlchemy operations just like conditions in an SQL WHERE clause, to apply as part of the query. 317 * `criteria` is resolved so that it only contains columns that exist on the model. 318 * The results of the query can be sorted by using the 'order_by' and 'order' parameters. By default, ordering is 319 done in ascending order based on 'created_at' column if it exists. 320 * Row-based pagination of results can be done using the 'offset' and 'limit' parameters. 321 322 Example: 323 ```python 324 City.find_all_with_query_matching_criteria(City.query(), {"name[eq]": "London"}) 325 ``` 326 ```python 327 City.find_all_with_query_matching_criteria(City.query(), {"name[eq]": ["London", "Harrogate"]}) 328 ``` 329 ```python 330 # If City doesn't have a 'code' column and is annotated with @filter_path_hint("self->Region->Country.code") 331 City.find_all_with_query_matching_criteria(City.query(), {"code[eq]": "UK"}) 332 ``` 333 ```python 334 # If City is annotated with @filter_path_hint("self->Region->Country.code") 335 City.find_all_with_query_matching_criteria(City.query(), {"Country.code[eq]": "UK"}) 336 ``` 337 338 :param query: The SQLAlchemy query to use (see `query` method). 339 :param criteria: The criteria for filtering the query. 340 :param allow_listing_all: Whether to allow listing all elements if the resolved criteria is empty. 341 :param order_by: The column to sort results of the query on, if any. 342 :param order: The order to sort results of the query in. 343 :param offset: The offset to use for pagination of results, if any. 344 :param limit: The number of rows to return in the results, if any. 345 :param load_options: The load options to apply to the query, if any. 346 :param with_count: Whether to include the total count of results (given the criteria). 347 :param ignore_joins: Whether to ignore automated joins between tables that come from filter path hints. 348 """ 349 query = cls.query_with_criteria(query, criteria, allow_listing_all, ignore_joins) 350 if query is None: 351 return cls.post_fetch([]), 0 352 return cls.get_results_with_count(query, order_by, order, offset, limit, load_options, with_count)
Finds all elements that match the provided SQLAlchemy query that match the provided criteria.
Will return a tuple with the results and the total count of the results that match the criteria, regardless of
the offset and limit.
Notes:
- If column value in
criteriais an array, it matches if the array contains the value (not if it is equal to the array). criteriaare key-value pairs specified in the format 'column_name[operation]': '1' and these are converted to SQLAlchemy operations just like conditions in an SQL WHERE clause, to apply as part of the query.criteriais resolved so that it only contains columns that exist on the model.- The results of the query can be sorted by using the 'order_by' and 'order' parameters. By default, ordering is done in ascending order based on 'created_at' column if it exists.
- Row-based pagination of results can be done using the 'offset' and 'limit' parameters.
Example:
City.find_all_with_query_matching_criteria(City.query(), {"name[eq]": "London"})
City.find_all_with_query_matching_criteria(City.query(), {"name[eq]": ["London", "Harrogate"]})
# If City doesn't have a 'code' column and is annotated with @filter_path_hint("self->Region->Country.code")
City.find_all_with_query_matching_criteria(City.query(), {"code[eq]": "UK"})
# If City is annotated with @filter_path_hint("self->Region->Country.code")
City.find_all_with_query_matching_criteria(City.query(), {"Country.code[eq]": "UK"})
Parameters
- query: The SQLAlchemy query to use (see
querymethod). - criteria: The criteria for filtering the query.
- allow_listing_all: Whether to allow listing all elements if the resolved criteria is empty.
- order_by: The column to sort results of the query on, if any.
- order: The order to sort results of the query in.
- offset: The offset to use for pagination of results, if any.
- limit: The number of rows to return in the results, if any.
- load_options: The load options to apply to the query, if any.
- with_count: Whether to include the total count of results (given the criteria).
- ignore_joins: Whether to ignore automated joins between tables that come from filter path hints.
354 @classmethod 355 def get_results_with_count(cls, 356 query: Query, 357 order_by: str = None, 358 order: str = 'asc', 359 offset: int = None, 360 limit: int = None, 361 load_options=None, 362 with_count: bool = False): # pragma: integration 363 try: 364 s = get_session() 365 with s: 366 results_query = cls._apply_load_options(query, load_options) 367 results_query = cls._apply_order(results_query, order_by, order) 368 369 # If offset and limit specified for pagination, use these. If not, return all. 370 if offset: 371 results_query = results_query.offset(offset) 372 if limit: 373 results_query = results_query.limit(limit) 374 375 items, count = cls.results_with_count(results_query, query if with_count else None) 376 return cls.post_fetch(items), count 377 except Exception as e: # pragma: no cover - if something happens we don't want to break. Just log it. 378 log.error(e, exc_info=True) 379 return cls.post_fetch([]), 0
381 @classmethod 382 def query_with_criteria(cls, query, 383 criteria: dict, 384 allow_listing_all: bool = False, 385 ignore_joins: bool = False) -> Query | None: # pragma: integration 386 """ 387 Applies the provided `criteria` to the provided SQLAlchemy `query`. 388 See `find_all_with_query_matching_criteria_and_count` for more details. 389 390 :param query: The SQLAlchemy query to apply the criteria to (see `query` method). 391 :param criteria: The criteria for filtering the query. 392 :param allow_listing_all: Whether to allow listing all elements if the resolved criteria is empty. 393 :param ignore_joins: Whether to ignore automated joins between tables that come from filter path hints. 394 :return: A SQLAlchemy query with the criteria applied. 395 """ 396 relevant_criteria = get_filters_relevant_to_class(cls, criteria) if criteria else {} 397 hinted_fields = [] if ignore_joins else cls._resolve_hinted_fields() 398 # convert list of relevant criteria into format that can be understood by SQLAlchemy 399 query_filters = create_filters_list_from_dict(relevant_criteria, cls) 400 401 has_query_parameters = query_filters or cls._criteria_includes_hinted_field(criteria, hinted_fields) 402 if not has_query_parameters and not allow_listing_all: 403 return None 404 405 try: 406 for item in query_filters: 407 query = query.filter(criteria_to_sql_alchemy_operation(item)) 408 except Exception as e: 409 log.error("Error while trying to apply criteria: " + str(e), exc_info=True) 410 raise ForbiddenException(e) 411 412 # If hinted fields are present, join the relevant entities and apply the criteria 413 try: 414 joined_entities = [] 415 for hinted_pair in hinted_fields: 416 entities, protecting_column = hinted_pair 417 if not entities or not protecting_column: 418 continue 419 criteria_with_operation = cls._get_criteria_key_with_operation_from_column(protecting_column, criteria) 420 if not criteria_with_operation: 421 # Try Entity.column 422 criteria_with_operation = cls._get_criteria_key_with_operation_from_column( 423 f"{entities[-1].__name__}.{protecting_column}", criteria) 424 if protecting_column and criteria_with_operation: 425 value = criteria[criteria_with_operation] 426 # TODO: I think the bit below can be: 427 # (type(value) is list and None in value) or (value is None) 428 # and then we can auto-deal with "None" joins too. 429 # - tco 430 match_null_values = type(value) is list and None in value 431 for e in entities: 432 if e not in joined_entities and not _query_has_entity(query, e): 433 query = query.join(e, isouter=match_null_values) 434 joined_entities.append(e) 435 protecting_column_with_operation = criteria_with_operation.split(".")[-1] if "." in criteria_with_operation else criteria_with_operation 436 item = create_filters_list_from_dict({ 437 protecting_column_with_operation: value 438 }, entities[-1])[0] 439 join_filter = cls._get_join_filters_for_path_hints(item, match_null_values) 440 query = query.filter(join_filter) 441 except Exception as e: 442 log.error("Error while trying to apply hinted entity restriction: " + str(e), exc_info=True) 443 raise ForbiddenException(e) 444 return query
Applies the provided criteria to the provided SQLAlchemy query.
See find_all_with_query_matching_criteria_and_count for more details.
Parameters
- query: The SQLAlchemy query to apply the criteria to (see
querymethod). - criteria: The criteria for filtering the query.
- allow_listing_all: Whether to allow listing all elements if the resolved criteria is empty.
- ignore_joins: Whether to ignore automated joins between tables that come from filter path hints.
Returns
A SQLAlchemy query with the criteria applied.
468 @classmethod 469 def results_with_count(cls, results_query, count_query=None): # pragma: integration 470 """ 471 Returns the results of a query and the count of the results. 472 473 :param results_query: The query with sort and pagination applied. 474 :param count_query: The query without any sort or pagination applied. 475 :return: 476 """ 477 results = results_query.all() 478 count = count_query.distinct(cls.uuid).options(noload('*')).count() if count_query else None 479 return results, count
Returns the results of a query and the count of the results.
Parameters
- results_query: The query with sort and pagination applied.
- count_query: The query without any sort or pagination applied.
Returns
530 @classmethod 531 def post_fetch(cls, items: list) -> list: # pragma: unit 532 """ 533 Override this class method to do any post-fetch processing on the items. 534 535 :param items: Items that have been fetched by a query. 536 """ 537 return items
Override this class method to do any post-fetch processing on the items.
Parameters
- items: Items that have been fetched by a query.
539 def pre_save(self): # pragma: integration 540 """ 541 Override this class method to do any pre-saving processing on the items. 542 """ 543 return self
Override this class method to do any pre-saving processing on the items.
545 def post_save(self): # pragma: integration 546 """ 547 Override this class method to do any post-saving processing on the items. 548 549 Note: this hook runs after the transaction has been committed; the in-memory instance may not include 550 database-side changes (e.g. triggers, server defaults) unless explicitly refreshed or refetched. 551 """ 552 return self
Override this class method to do any post-saving processing on the items.
Note: this hook runs after the transaction has been committed; the in-memory instance may not include database-side changes (e.g. triggers, server defaults) unless explicitly refreshed or refetched.
554 def pre_delete(self): # pragma: integration 555 """ 556 Override this class method to do any pre-deletion processing on the items. 557 """ 558 return self
Override this class method to do any pre-deletion processing on the items.
560 def post_delete(self): # pragma: integration 561 """ 562 Override this class method to do any post-deletion processing on the items. 563 """ 564 return self
Override this class method to do any post-deletion processing on the items.
566 @classmethod 567 def find_all_matching_criteria(cls, criteria: dict, allow_listing_all=False, order_by=None, 568 order='asc', offset=None, limit=None, load_options=None, 569 with_count: bool = False, ignore_joins=False): # pragma: integration 570 """ 571 Finds all elements that match the provided `criteria`. 572 573 It is a shortcut for `find_all_with_query_matching_criteria` where it pre-populates the query with a simple `select from cls` as a start. 574 575 Subsequent complexities are added via `criteria`. 576 577 Example: 578 ```python 579 City.find_all_matching_criteria({"name": "London"}) 580 ``` 581 ```python 582 City.find_all_matching_criteria({"name": ["London", "Harrogate"]}) 583 ``` 584 585 :param criteria: The criteria to match. 586 :param allow_listing_all: Whether to allow listing all elements if the resolved criteria is empty. 587 :param order_by: The column to sort results of the query on, if any. 588 :param order: The order to sort results of the query in. 589 :param offset: The offset to use for pagination of results, if any. 590 :param limit: The number of rows to return in the results, if any. 591 :param load_options: The load options to apply to the query when fetching nested objects, if any. 592 :param with_count: Whether to return the count of the results as well. True makes this method return a tuple. 593 """ 594 return cls.find_all_with_query_matching_criteria(cls.query().select_from(cls), 595 criteria, 596 allow_listing_all=allow_listing_all, 597 order_by=order_by, 598 order=order, 599 offset=offset, 600 limit=limit, 601 load_options=load_options, 602 with_count=with_count, 603 ignore_joins=ignore_joins)
Finds all elements that match the provided criteria.
It is a shortcut for find_all_with_query_matching_criteria where it pre-populates the query with a simple select from cls as a start.
Subsequent complexities are added via criteria.
Example:
City.find_all_matching_criteria({"name": "London"})
City.find_all_matching_criteria({"name": ["London", "Harrogate"]})
Parameters
- criteria: The criteria to match.
- allow_listing_all: Whether to allow listing all elements if the resolved criteria is empty.
- order_by: The column to sort results of the query on, if any.
- order: The order to sort results of the query in.
- offset: The offset to use for pagination of results, if any.
- limit: The number of rows to return in the results, if any.
- load_options: The load options to apply to the query when fetching nested objects, if any.
- with_count: Whether to return the count of the results as well. True makes this method return a tuple.
605 @classmethod 606 def find_first_matching_criteria(cls, criteria, load_options=None, 607 ignore_joins=False): # pragma: integration 608 """ 609 Behaves exactly like `find_all_matching_criteria` but returns only the first item (or `None` if no item is found). 610 611 :param criteria: The criteria to match. 612 :param load_options: The load options to apply to the query when fetching nested objects, if any. 613 :return: 614 """ 615 try: 616 return cls.find_all_matching_criteria(criteria, 617 load_options=load_options, 618 with_count=False, 619 ignore_joins=ignore_joins)[0] 620 except Exception as e: 621 log.warning(f"No {cls.__name__} found") 622 return None
Behaves exactly like find_all_matching_criteria but returns only the first item (or None if no item is found).
Parameters
- criteria: The criteria to match.
- load_options: The load options to apply to the query when fetching nested objects, if any.
Returns
624 @classmethod 625 def query(cls, *classes) -> Any or tuple[Any, int]: # pragma: integration 626 """ 627 Returns a SQLAlchemy query for the given class. 628 629 Example: 630 ```python 631 City.query().filter_by(name="London").all() 632 ``` 633 ```python 634 City.query(Country).join(Country).filter(Country.name == "United Kingdom").all() 635 ``` 636 637 :param classes: The classes to query. 638 """ 639 return get_session().query(cls, *classes)
Returns a SQLAlchemy query for the given class.
Example:
City.query().filter_by(name="London").all()
City.query(Country).join(Country).filter(Country.name == "United Kingdom").all()
Parameters
- classes: The classes to query.
641 @classmethod 642 def count_query(cls, *columns) -> Any or tuple[Any, int]: # pragma: integration 643 """ 644 Returns a SQLAlchemy query for the given columns and adds a count for each column. 645 This is useful for getting counts of grouped queries. 646 647 Example: 648 ```python 649 City.query(City.name).group_by(City.name).count() 650 ``` 651 ```python 652 City.query(City.name, Country.name).join(City.country_uuid == Country.uuid).filter(Country.name == "United Kingdom").group_by(City.name, Country.name).count() 653 ``` 654 655 :param classes: The classes to query. 656 """ 657 return get_session().query(*columns, *[func.count(c) for c in columns])
Returns a SQLAlchemy query for the given columns and adds a count for each column. This is useful for getting counts of grouped queries.
Example:
City.query(City.name).group_by(City.name).count()
City.query(City.name, Country.name).join(City.country_uuid == Country.uuid).filter(Country.name == "United Kingdom").group_by(City.name, Country.name).count()
Parameters
- classes: The classes to query.
659 @classmethod 660 def save_many(cls, items: list, duplicates_strategy: str = None, preserve_uuids=False): # pragma: integration 661 """ 662 Saves many items of the same type at once. 663 664 This includes any nested objects within the list of items. 665 When you need to maintain relationships between objects, you should set `preserve_uuids` to `True`. 666 667 :param items: The items to save. 668 :param duplicates_strategy: `None`, `'on_conflict_do_update'` or `'on_conflict_do_nothing'` 669 :param preserve_uuids: By default, any uuids are left for the DB to regenerate. Set this to `True` to preserve them. 670 """ 671 try: 672 if not items: 673 return 674 675 _items = items if isinstance(items[0], dict) else [item.pre_save() for item in items] 676 s = get_session() 677 with s: 678 if duplicates_strategy is None: 679 for item in _items: 680 item.validate() 681 s.merge(item) 682 else: 683 if isinstance(_items[0], dict): 684 items_as_dicts = _items 685 else: 686 nested_keys = [n.key for n in cls.get_nested_models()] 687 uuid = [] if preserve_uuids else ["uuid"] 688 excludes = ["created_at", "updated_at", *uuid, *nested_keys] 689 items_as_dicts = [i.as_dict(excludes=excludes, resolve_defaults=True) for i in _items] 690 if not preserve_uuids: 691 for i in items_as_dicts: 692 i["uuid"] = create_uuid() 693 statement = insert(cls).values(items_as_dicts) 694 if duplicates_strategy == "on_conflict_do_nothing": 695 s.execute(statement.on_conflict_do_nothing()) 696 if duplicates_strategy == "on_conflict_do_update": 697 upsert_statement = statement.on_conflict_do_update( 698 index_elements=[c for c in cls.get_unique_constraint_names()], 699 set_=dict((c, getattr(statement.excluded, c)) for c in cls.get_column_names()) 700 ) 701 s.execute(upsert_statement) 702 s.commit() 703 for item in _items: 704 if getattr(item, "post_save", None): 705 item.post_save() 706 except IntegrityError as integrity_error: 707 cls._handle_integrity_error(integrity_error)
Saves many items of the same type at once.
This includes any nested objects within the list of items.
When you need to maintain relationships between objects, you should set preserve_uuids to True.
Parameters
- items: The items to save.
- duplicates_strategy:
None,'on_conflict_do_update'or'on_conflict_do_nothing' - preserve_uuids: By default, any uuids are left for the DB to regenerate. Set this to
Trueto preserve them.
709 @classmethod 710 def delete_many(cls, items: list): # pragma: integration 711 """ 712 Deletes many items of the same type at once. 713 :param items: The items to delete. 714 """ 715 _items = [item.pre_delete() for item in items] 716 s = get_session() 717 with s: 718 for item in _items: 719 s.delete(item) 720 s.commit() 721 for item in _items: 722 item.post_delete()
Deletes many items of the same type at once.
Parameters
- items: The items to delete.
738 def save(self): # pragma: integration 739 """ 740 Validates and saves the object to the database. 741 This also takes care of handling integrity errors by raising nicer exceptions. 742 743 Note: This method will remove all objects from the session before adding itself back in. 744 This is to avoid some weird shenanigans that I don't fully understand, but so far it seems to work. 745 """ 746 self.pre_save() 747 s = get_session() 748 try: 749 self.validate() 750 with s: 751 s.expunge_all() 752 s.add(self) 753 s.merge(self) 754 s.commit() 755 except IntegrityError as integrity_error: 756 self._handle_integrity_error(integrity_error) 757 return self.post_save()
Validates and saves the object to the database. This also takes care of handling integrity errors by raising nicer exceptions.
Note: This method will remove all objects from the session before adding itself back in. This is to avoid some weird shenanigans that I don't fully understand, but so far it seems to work.
759 def delete(self): # pragma: integration 760 """ 761 Deletes the object from the database. 762 """ 763 self.pre_delete() 764 s = get_session() 765 with s: 766 s.delete(self) 767 s.commit() 768 self.post_delete()
Deletes the object from the database.
770 def refetch(self, load_options=None): # pragma: integration 771 """ 772 Re-fetches the object from the database. 773 It tries to do it by using `find_first_matching_criteria` with the primary key values. 774 :param load_options: The load options to apply when re-fetching the object, if any. 775 """ 776 criteria = dict((add_default_equality_operator(pk), getattr(self, pk)) for pk in self.get_primary_key_names()) 777 return self.find_first_matching_criteria(criteria, load_options=load_options, 778 ignore_joins=True)
Re-fetches the object from the database.
It tries to do it by using find_first_matching_criteria with the primary key values.
Parameters
- load_options: The load options to apply when re-fetching the object, if any.
780 def get_auto_populated_models(self, excludes=[]) -> dict: # pragma: integration 781 """ 782 Generates nested models based on the defined `__auto_populate__` dictionary. 783 784 See the top of this module, for an example of how to set this up. 785 786 :param excludes: Which column names to exclude from the auto-population. 787 """ 788 auto_populated_models = {} 789 for column_name, info in self.__auto_populate__.items(): 790 if column_name not in excludes: 791 partitions = info["partition_by"].get_all() 792 auto_populated_models[column_name] = self._get_auto_populated_models(info["model"], 793 info["template"], 794 partitions) 795 return auto_populated_models
Generates nested models based on the defined __auto_populate__ dictionary.
See the top of this module, for an example of how to set this up.
Parameters
- excludes: Which column names to exclude from the auto-population.
816 def validate(self): # pragma: unit 817 """ 818 Validates whether nullables are filled in because SQLAlchemy's validation is a bit meh 819 """ 820 class_attributes = find_all_attributes(self.__class__) 821 invalid_items = [] 822 for attribute in class_attributes: 823 column = self.__class__.get_column_definition(attribute) 824 value = getattr(self, attribute) 825 if column is not None: 826 if column.server_default is None \ 827 and column.default is None \ 828 and not column.nullable \ 829 and (value is None or not value_is_populated(value)): 830 invalid_items.append(attribute) 831 if issubclass(column.type.python_type, Enum) \ 832 and not _is_valid_enum_value(value, column.type.python_type): 833 if not (value is None and column.nullable): 834 invalid_items.append(attribute) 835 if len(invalid_items) > 0: 836 raise ObjectValidationException(self.__class__, invalid_items) 837 return self
Validates whether nullables are filled in because SQLAlchemy's validation is a bit meh
846def same_as(column_name: str): # pragma: unit 847 """ 848 Shortcut method to create a default value that is the same as whatever was specified for another column. 849 850 You'd use this for situations where `ColumnA` defaults to `ColumnB`'s value if `ColumnA` is not specified. 851 """ 852 853 def default_function(context): 854 if isinstance(context, AlchemyHelper): 855 return getattr(context, column_name) 856 if hasattr(context, "current_parameters"): 857 return context.current_parameters.get(column_name) 858 return None 859 860 return default_function
Shortcut method to create a default value that is the same as whatever was specified for another column.
You'd use this for situations where ColumnA defaults to ColumnB's value if ColumnA is not specified.