bedrock.endpoints.dto.bedrock_response_filter

 1import operator  # pragma: unit
 2from typing import Any, Dict  # pragma: unit
 3
 4from bedrock._helpers.string import snake_case_to_camelCase  # pragma: unit
 5
 6# Operator mapping
 7OPERATORS = {  # pragma: unit
 8    "eq": operator.eq,
 9    "ne": operator.ne,
10    "gt": operator.gt,
11    "gte": operator.ge,
12    "lt": operator.lt,
13    "lte": operator.le,
14}
15
16
17def apply_operator(op: str, message_value: Any, filter_value: Any) -> bool:  # pragma: unit
18    """Apply the given operator between message_value and filter_value."""
19    if op == "in":
20        if isinstance(message_value, list):
21            return any(val in filter_value for val in message_value)
22        return message_value in filter_value
23
24    if op == "not_in":
25        if isinstance(message_value, list):
26            return all(val not in filter_value for val in message_value)
27        return message_value not in filter_value
28
29    if op not in OPERATORS:
30        raise ValueError(f"Unsupported operator: {op}")
31
32    return OPERATORS[op](message_value, filter_value)
33
34
35def traverse_path(message: Dict[str, Any], path: str, casing_function=snake_case_to_camelCase) -> Any:  # pragma: unit
36    """
37    Traverse a nested dictionary using a filter path hint.
38    Example path: 'Self->Property->Account.chase_account_number'
39    """
40    current = message
41    for part in path.replace("->", ".").split("."):
42        part = casing_function(part)
43        if part.lower() == "self":
44            continue
45        if isinstance(current, dict) and part in current:
46            current = current[part]
47        else:
48            return None
49    return current
50
51
52def is_filter_criteria_met(
53        entity: Dict[str, Any],
54        filters: Dict[str, Any],
55        model_class: Any = None
56) -> bool:  # pragma: unit
57    """
58    Check if a websocket entity message passes all filter conditions.
59
60    :param entity: The broadcasted entity (dict).
61    :param filters: The connection's filter dict.
62    :param model_class: Optional model class with __filter_path_hints__.
63    """
64    for key, filter_value in filters.items():
65        if "[" in key and key.endswith("]"):
66            field, op = key[:-1].split("[", 1)
67        else:
68            field, op = key, "eq"
69
70        message_value = traverse_path(entity, field)
71
72        if message_value is None and model_class and hasattr(model_class, "__filter_path_hints__"):
73            for hint in model_class.__filter_path_hints__:
74                if hint.endswith(f".{field}"):
75                    message_value = traverse_path(entity, hint)
76                    if message_value is not None:
77                        break
78
79        if message_value is None:
80            continue
81
82        if not apply_operator(op, message_value, filter_value):
83            return False
84
85    return True
OPERATORS = {'eq': <built-in function eq>, 'ne': <built-in function ne>, 'gt': <built-in function gt>, 'gte': <built-in function ge>, 'lt': <built-in function lt>, 'lte': <built-in function le>}
def apply_operator(op: str, message_value: Any, filter_value: Any) -> bool:
18def apply_operator(op: str, message_value: Any, filter_value: Any) -> bool:  # pragma: unit
19    """Apply the given operator between message_value and filter_value."""
20    if op == "in":
21        if isinstance(message_value, list):
22            return any(val in filter_value for val in message_value)
23        return message_value in filter_value
24
25    if op == "not_in":
26        if isinstance(message_value, list):
27            return all(val not in filter_value for val in message_value)
28        return message_value not in filter_value
29
30    if op not in OPERATORS:
31        raise ValueError(f"Unsupported operator: {op}")
32
33    return OPERATORS[op](message_value, filter_value)

Apply the given operator between message_value and filter_value.

def traverse_path( message: Dict[str, Any], path: str, casing_function=<function snake_case_to_camelCase>) -> Any:
36def traverse_path(message: Dict[str, Any], path: str, casing_function=snake_case_to_camelCase) -> Any:  # pragma: unit
37    """
38    Traverse a nested dictionary using a filter path hint.
39    Example path: 'Self->Property->Account.chase_account_number'
40    """
41    current = message
42    for part in path.replace("->", ".").split("."):
43        part = casing_function(part)
44        if part.lower() == "self":
45            continue
46        if isinstance(current, dict) and part in current:
47            current = current[part]
48        else:
49            return None
50    return current

Traverse a nested dictionary using a filter path hint. Example path: 'Self->Property->Account.chase_account_number'

def is_filter_criteria_met( entity: Dict[str, Any], filters: Dict[str, Any], model_class: Any = None) -> bool:
53def is_filter_criteria_met(
54        entity: Dict[str, Any],
55        filters: Dict[str, Any],
56        model_class: Any = None
57) -> bool:  # pragma: unit
58    """
59    Check if a websocket entity message passes all filter conditions.
60
61    :param entity: The broadcasted entity (dict).
62    :param filters: The connection's filter dict.
63    :param model_class: Optional model class with __filter_path_hints__.
64    """
65    for key, filter_value in filters.items():
66        if "[" in key and key.endswith("]"):
67            field, op = key[:-1].split("[", 1)
68        else:
69            field, op = key, "eq"
70
71        message_value = traverse_path(entity, field)
72
73        if message_value is None and model_class and hasattr(model_class, "__filter_path_hints__"):
74            for hint in model_class.__filter_path_hints__:
75                if hint.endswith(f".{field}"):
76                    message_value = traverse_path(entity, hint)
77                    if message_value is not None:
78                        break
79
80        if message_value is None:
81            continue
82
83        if not apply_operator(op, message_value, filter_value):
84            return False
85
86    return True

Check if a websocket entity message passes all filter conditions.

Parameters
  • entity: The broadcasted entity (dict).
  • filters: The connection's filter dict.
  • model_class: Optional model class with __filter_path_hints__.