bedrock.endpoints.dto.bedrock_response_filter
1import operator # pragma: unit 2from typing import Any, Dict # pragma: unit 3 4from bedrock._helpers.string import snake_case_to_camelCase # pragma: unit 5 6# Operator mapping 7OPERATORS = { # pragma: unit 8 "eq": operator.eq, 9 "ne": operator.ne, 10 "gt": operator.gt, 11 "gte": operator.ge, 12 "lt": operator.lt, 13 "lte": operator.le, 14} 15 16 17def apply_operator(op: str, message_value: Any, filter_value: Any) -> bool: # pragma: unit 18 """Apply the given operator between message_value and filter_value.""" 19 if op == "in": 20 if isinstance(message_value, list): 21 return any(val in filter_value for val in message_value) 22 return message_value in filter_value 23 24 if op == "not_in": 25 if isinstance(message_value, list): 26 return all(val not in filter_value for val in message_value) 27 return message_value not in filter_value 28 29 if op not in OPERATORS: 30 raise ValueError(f"Unsupported operator: {op}") 31 32 return OPERATORS[op](message_value, filter_value) 33 34 35def traverse_path(message: Dict[str, Any], path: str, casing_function=snake_case_to_camelCase) -> Any: # pragma: unit 36 """ 37 Traverse a nested dictionary using a filter path hint. 38 Example path: 'Self->Property->Account.chase_account_number' 39 """ 40 current = message 41 for part in path.replace("->", ".").split("."): 42 part = casing_function(part) 43 if part.lower() == "self": 44 continue 45 if isinstance(current, dict) and part in current: 46 current = current[part] 47 else: 48 return None 49 return current 50 51 52def is_filter_criteria_met( 53 entity: Dict[str, Any], 54 filters: Dict[str, Any], 55 model_class: Any = None 56) -> bool: # pragma: unit 57 """ 58 Check if a websocket entity message passes all filter conditions. 59 60 :param entity: The broadcasted entity (dict). 61 :param filters: The connection's filter dict. 62 :param model_class: Optional model class with __filter_path_hints__. 63 """ 64 for key, filter_value in filters.items(): 65 if "[" in key and key.endswith("]"): 66 field, op = key[:-1].split("[", 1) 67 else: 68 field, op = key, "eq" 69 70 message_value = traverse_path(entity, field) 71 72 if message_value is None and model_class and hasattr(model_class, "__filter_path_hints__"): 73 for hint in model_class.__filter_path_hints__: 74 if hint.endswith(f".{field}"): 75 message_value = traverse_path(entity, hint) 76 if message_value is not None: 77 break 78 79 if message_value is None: 80 continue 81 82 if not apply_operator(op, message_value, filter_value): 83 return False 84 85 return True
OPERATORS =
{'eq': <built-in function eq>, 'ne': <built-in function ne>, 'gt': <built-in function gt>, 'gte': <built-in function ge>, 'lt': <built-in function lt>, 'lte': <built-in function le>}
def
apply_operator(op: str, message_value: Any, filter_value: Any) -> bool:
18def apply_operator(op: str, message_value: Any, filter_value: Any) -> bool: # pragma: unit 19 """Apply the given operator between message_value and filter_value.""" 20 if op == "in": 21 if isinstance(message_value, list): 22 return any(val in filter_value for val in message_value) 23 return message_value in filter_value 24 25 if op == "not_in": 26 if isinstance(message_value, list): 27 return all(val not in filter_value for val in message_value) 28 return message_value not in filter_value 29 30 if op not in OPERATORS: 31 raise ValueError(f"Unsupported operator: {op}") 32 33 return OPERATORS[op](message_value, filter_value)
Apply the given operator between message_value and filter_value.
def
traverse_path( message: Dict[str, Any], path: str, casing_function=<function snake_case_to_camelCase>) -> Any:
36def traverse_path(message: Dict[str, Any], path: str, casing_function=snake_case_to_camelCase) -> Any: # pragma: unit 37 """ 38 Traverse a nested dictionary using a filter path hint. 39 Example path: 'Self->Property->Account.chase_account_number' 40 """ 41 current = message 42 for part in path.replace("->", ".").split("."): 43 part = casing_function(part) 44 if part.lower() == "self": 45 continue 46 if isinstance(current, dict) and part in current: 47 current = current[part] 48 else: 49 return None 50 return current
Traverse a nested dictionary using a filter path hint. Example path: 'Self->Property->Account.chase_account_number'
def
is_filter_criteria_met( entity: Dict[str, Any], filters: Dict[str, Any], model_class: Any = None) -> bool:
53def is_filter_criteria_met( 54 entity: Dict[str, Any], 55 filters: Dict[str, Any], 56 model_class: Any = None 57) -> bool: # pragma: unit 58 """ 59 Check if a websocket entity message passes all filter conditions. 60 61 :param entity: The broadcasted entity (dict). 62 :param filters: The connection's filter dict. 63 :param model_class: Optional model class with __filter_path_hints__. 64 """ 65 for key, filter_value in filters.items(): 66 if "[" in key and key.endswith("]"): 67 field, op = key[:-1].split("[", 1) 68 else: 69 field, op = key, "eq" 70 71 message_value = traverse_path(entity, field) 72 73 if message_value is None and model_class and hasattr(model_class, "__filter_path_hints__"): 74 for hint in model_class.__filter_path_hints__: 75 if hint.endswith(f".{field}"): 76 message_value = traverse_path(entity, hint) 77 if message_value is not None: 78 break 79 80 if message_value is None: 81 continue 82 83 if not apply_operator(op, message_value, filter_value): 84 return False 85 86 return True
Check if a websocket entity message passes all filter conditions.
Parameters
- entity: The broadcasted entity (dict).
- filters: The connection's filter dict.
- model_class: Optional model class with __filter_path_hints__.