bedrock.endpoints.dto.bedrock_response

  1from bedrock.config.headers import ValidHeaders  # pragma: unit
  2from bedrock.log import log_config  # pragma: unit
  3from bedrock.config import get_config_params
  4
  5log = log_config("bedrock_response")  # pragma: unit
  6
  7
  8class PaginationMetadata:  # pragma: unit
  9    def __init__(self, sort_column: str, sort_order: str, limit: int, offset: int, total: int = None):
 10        self.sort_column = sort_column
 11        self.sort_order = sort_order
 12        self.limit = limit
 13        self.offset = offset
 14        self.total = total
 15
 16    def as_json(self):
 17        return {
 18            "sortColumn": self.sort_column,
 19            "sortOrder": self.sort_order,
 20            "limit": self.limit,
 21            "offset": self.offset,
 22            "total": self.total
 23        }
 24
 25
 26class BedrockResponse:  # pragma: unit
 27    def __init__(self,
 28                 data: dict | list | None,
 29                 event: dict,
 30                 errors: list[dict] = None,
 31                 warnings: list[dict] = None,
 32                 original_data: dict = None):
 33        self.data = data
 34        self.original_data = original_data
 35        self.event = event
 36        self.errors = errors or []
 37        self.warnings = warnings or []
 38
 39    def as_json(self):
 40        return self.data
 41
 42    def as_json_for_kafka(self):
 43        return self.data
 44
 45    def as_unenveloped_json(self):
 46        return self.data
 47
 48
 49class BedrockUnenvelopedResponse(BedrockResponse):  # pragma: unit
 50    """
 51    Use this response object when you want to return raw data without ever enveloping it in an envelope.
 52    """
 53
 54    def __init__(self, data: dict | list, event: dict):
 55        super().__init__(data, event)
 56
 57
 58class BedrockErrorResponse(BedrockResponse):  # pragma: unit
 59    def __init__(self, errors: dict | list, event: dict):
 60        super().__init__(None, event)
 61        if isinstance(errors, dict):
 62            message = errors.get("message") or errors.get("error")
 63            exception = errors.get("exception") or errors.get("originalException")
 64            self.errors.append({
 65                "code": errors.get("code", "-1"),
 66                "message": message,
 67                "exception": exception,
 68                "error": message,  # Deprecated: use "message" instead
 69                "originalException": exception  # Deprecated: use "exception" instead
 70            })
 71        elif isinstance(errors, list):
 72            self.errors = _normalise_errors(errors)
 73        else:
 74            log.warning("Provided 'errors' was not a dict or list")
 75            log.debug(errors)
 76            self.errors.append({"error": "An unknown error occurred"})
 77
 78    def as_json(self):
 79        return {
 80            "_errors": self.errors,
 81            "errors": self.errors  # Deprecated: use "_errors" instead
 82        }
 83
 84    def as_json_for_kafka(self):
 85        return self.as_unenveloped_json()
 86
 87    def as_unenveloped_json(self):
 88        return self.errors[0] if self.errors else {}
 89
 90
 91class BedrockEnvelopedResponse(BedrockResponse):  # pragma: unit
 92    def __init__(self,
 93                 data: dict | list,
 94                 event: dict,
 95                 pagination: PaginationMetadata = None,
 96                 errors: list[dict] = None,
 97                 warnings: list[dict] = None,
 98                 original_data: dict = None):
 99        super().__init__(data, event,
100                         errors=errors,
101                         warnings=warnings,
102                         original_data=original_data)
103        self.pagination = pagination
104
105    @classmethod
106    def from_bedrock_response(cls, response: BedrockResponse, pagination):
107        return cls(response.data, response.event,
108                   pagination=pagination,
109                   errors=response.errors,
110                   warnings=response.warnings,
111                   original_data=response.original_data)
112
113    def as_bedrock_response(self):
114        return BedrockResponse(self.data, self.event, self.errors, self.warnings, self.original_data)
115
116    def as_json(self):
117        return {
118            "data": self.data,
119            "originalData": self.original_data,
120            "_metadata": {
121                "pagination": self.pagination.as_json() if self.pagination and isinstance(self.data, list) else None
122            },
123            "_warnings": self.warnings,
124            "_errors": self.errors
125        }
126
127
128class BedrockDeletionResponse(BedrockEnvelopedResponse):  # pragma: unit
129    def __init__(self, data, event, extra_objects={}):
130        data_object = data
131        if isinstance(data, dict) and "originalData" in data:
132            data_object = data["originalData"]
133        super().__init__(data_object, event)
134        self.extra_objects = {k: v for k, v in extra_objects.items() if k not in ['originalData', 'originalObject', 'data']}
135
136    def as_json(self):
137        return {
138            "deleted": True,
139            "data": None,
140            "originalData": self.data,
141            "originalObject": self.data,  # Deprecated: use "originalData" instead
142            **self.extra_objects
143        }
144
145    def as_unenveloped_json(self):
146        return {
147            "deleted": True,
148            "originalData": self.data,
149            "originalObject": self.data,  # Deprecated: use "originalData" instead
150            **self.extra_objects
151        }
152
153
154def resolve_response(response: BedrockResponse | dict | list, event: dict, default_args: dict,
155                     arguments: dict):  # pragma: unit
156    if isinstance(response, BedrockUnenvelopedResponse):
157        return response
158    if isinstance(response, BedrockDeletionResponse):
159        return response
160    if isinstance(response, dict) and "deleted" in response:
161        log.debug("Response was a deletion, wrapping it in a BedrockDeletionResponse")
162        return BedrockDeletionResponse(response, event)
163    if isinstance(response, dict) and "error" in response:
164        log.debug("Response was an error, wrapping it in a BedrockErrorResponse")
165        return BedrockErrorResponse(response, event)
166    _response = response
167    if isinstance(response, dict) or isinstance(response, list):
168        _response = BedrockResponse(response, event)
169        log.debug("Response was not a BedrockResponse, wrapping it in one")
170    if has_envelope_header(event) and not isinstance(_response, BedrockEnvelopedResponse):
171        pagination_data = PaginationMetadata(
172            arguments.get("sort_column", default_args.get("sort_column")),
173            arguments.get("sort_order", default_args.get("sort_order")),
174            arguments.get("limit", default_args.get("limit")),
175            arguments.get("offset", default_args.get("offset"))
176        )
177        _response = BedrockEnvelopedResponse.from_bedrock_response(_response, pagination_data)
178    if not has_envelope_header(event) and isinstance(_response, BedrockEnvelopedResponse):
179        _response = _response.as_bedrock_response()
180    return _response
181
182
183def has_envelope_header(event):  # pragma: unit
184    config = get_config_params()
185    default_envelope_value = config.get("envelope_by_default", True)
186    return _has_envelope_header(event, default_envelope_value)
187
188
189def _has_envelope_header(event, default_value: bool):  # pragma: unit
190    try:
191        headers = event["headers"]
192    except KeyError:
193        return default_value
194    try:
195        if headers[ValidHeaders.ENVELOPE_RESPONSE.value]:
196            if f"{headers[ValidHeaders.ENVELOPE_RESPONSE.value]}".lower() == "false":
197                return False
198            if f"{headers[ValidHeaders.ENVELOPE_RESPONSE.value]}".lower() == "true":
199                return True
200            return default_value
201    except KeyError:
202        return default_value
203    return default_value
204
205
206def _normalise_errors(errors: list[dict]) -> list[dict]:  # pragma: unit
207    return [_normalise_error(error) for error in errors]
208
209
210def _normalise_error(error: dict) -> dict:  # pragma: unit
211    try:
212        message = error.get("message") or error.get("error") or "Unknown error"
213        exception = error.get("exception") or error.get("originalException")
214        return {
215            "code": error.get("code", "-1"),
216            "message": message,
217            "exception": exception,
218            "error": message,  # Deprecated: use "message" instead
219            "originalException": exception  # Deprecated: use "exception" instead
220        }
221    except Exception as e:
222        error_message = f"An unknown error occurred: {error}"
223        exception_str = f"{e}"
224        return {
225            "code": "-1",
226            "message": error_message,
227            "exception": exception_str,
228            "error": error_message,  # Deprecated: use "message" instead
229            "originalException": exception_str  # Deprecated: use "exception" instead
230        }
log = <MyLogger BEDROCK-bedrock_response (INFO)>
class PaginationMetadata:
 9class PaginationMetadata:  # pragma: unit
10    def __init__(self, sort_column: str, sort_order: str, limit: int, offset: int, total: int = None):
11        self.sort_column = sort_column
12        self.sort_order = sort_order
13        self.limit = limit
14        self.offset = offset
15        self.total = total
16
17    def as_json(self):
18        return {
19            "sortColumn": self.sort_column,
20            "sortOrder": self.sort_order,
21            "limit": self.limit,
22            "offset": self.offset,
23            "total": self.total
24        }
PaginationMetadata( sort_column: str, sort_order: str, limit: int, offset: int, total: int = None)
10    def __init__(self, sort_column: str, sort_order: str, limit: int, offset: int, total: int = None):
11        self.sort_column = sort_column
12        self.sort_order = sort_order
13        self.limit = limit
14        self.offset = offset
15        self.total = total
sort_column
sort_order
limit
offset
total
def as_json(self):
17    def as_json(self):
18        return {
19            "sortColumn": self.sort_column,
20            "sortOrder": self.sort_order,
21            "limit": self.limit,
22            "offset": self.offset,
23            "total": self.total
24        }
class BedrockResponse:
27class BedrockResponse:  # pragma: unit
28    def __init__(self,
29                 data: dict | list | None,
30                 event: dict,
31                 errors: list[dict] = None,
32                 warnings: list[dict] = None,
33                 original_data: dict = None):
34        self.data = data
35        self.original_data = original_data
36        self.event = event
37        self.errors = errors or []
38        self.warnings = warnings or []
39
40    def as_json(self):
41        return self.data
42
43    def as_json_for_kafka(self):
44        return self.data
45
46    def as_unenveloped_json(self):
47        return self.data
BedrockResponse( data: dict | list | None, event: dict, errors: list[dict] = None, warnings: list[dict] = None, original_data: dict = None)
28    def __init__(self,
29                 data: dict | list | None,
30                 event: dict,
31                 errors: list[dict] = None,
32                 warnings: list[dict] = None,
33                 original_data: dict = None):
34        self.data = data
35        self.original_data = original_data
36        self.event = event
37        self.errors = errors or []
38        self.warnings = warnings or []
data
original_data
event
errors
warnings
def as_json(self):
40    def as_json(self):
41        return self.data
def as_json_for_kafka(self):
43    def as_json_for_kafka(self):
44        return self.data
def as_unenveloped_json(self):
46    def as_unenveloped_json(self):
47        return self.data
class BedrockUnenvelopedResponse(BedrockResponse):
50class BedrockUnenvelopedResponse(BedrockResponse):  # pragma: unit
51    """
52    Use this response object when you want to return raw data without ever enveloping it in an envelope.
53    """
54
55    def __init__(self, data: dict | list, event: dict):
56        super().__init__(data, event)

Use this response object when you want to return raw data without ever enveloping it in an envelope.

BedrockUnenvelopedResponse(data: dict | list, event: dict)
55    def __init__(self, data: dict | list, event: dict):
56        super().__init__(data, event)
class BedrockErrorResponse(BedrockResponse):
59class BedrockErrorResponse(BedrockResponse):  # pragma: unit
60    def __init__(self, errors: dict | list, event: dict):
61        super().__init__(None, event)
62        if isinstance(errors, dict):
63            message = errors.get("message") or errors.get("error")
64            exception = errors.get("exception") or errors.get("originalException")
65            self.errors.append({
66                "code": errors.get("code", "-1"),
67                "message": message,
68                "exception": exception,
69                "error": message,  # Deprecated: use "message" instead
70                "originalException": exception  # Deprecated: use "exception" instead
71            })
72        elif isinstance(errors, list):
73            self.errors = _normalise_errors(errors)
74        else:
75            log.warning("Provided 'errors' was not a dict or list")
76            log.debug(errors)
77            self.errors.append({"error": "An unknown error occurred"})
78
79    def as_json(self):
80        return {
81            "_errors": self.errors,
82            "errors": self.errors  # Deprecated: use "_errors" instead
83        }
84
85    def as_json_for_kafka(self):
86        return self.as_unenveloped_json()
87
88    def as_unenveloped_json(self):
89        return self.errors[0] if self.errors else {}
BedrockErrorResponse(errors: dict | list, event: dict)
60    def __init__(self, errors: dict | list, event: dict):
61        super().__init__(None, event)
62        if isinstance(errors, dict):
63            message = errors.get("message") or errors.get("error")
64            exception = errors.get("exception") or errors.get("originalException")
65            self.errors.append({
66                "code": errors.get("code", "-1"),
67                "message": message,
68                "exception": exception,
69                "error": message,  # Deprecated: use "message" instead
70                "originalException": exception  # Deprecated: use "exception" instead
71            })
72        elif isinstance(errors, list):
73            self.errors = _normalise_errors(errors)
74        else:
75            log.warning("Provided 'errors' was not a dict or list")
76            log.debug(errors)
77            self.errors.append({"error": "An unknown error occurred"})
def as_json(self):
79    def as_json(self):
80        return {
81            "_errors": self.errors,
82            "errors": self.errors  # Deprecated: use "_errors" instead
83        }
def as_json_for_kafka(self):
85    def as_json_for_kafka(self):
86        return self.as_unenveloped_json()
def as_unenveloped_json(self):
88    def as_unenveloped_json(self):
89        return self.errors[0] if self.errors else {}
class BedrockEnvelopedResponse(BedrockResponse):
 92class BedrockEnvelopedResponse(BedrockResponse):  # pragma: unit
 93    def __init__(self,
 94                 data: dict | list,
 95                 event: dict,
 96                 pagination: PaginationMetadata = None,
 97                 errors: list[dict] = None,
 98                 warnings: list[dict] = None,
 99                 original_data: dict = None):
100        super().__init__(data, event,
101                         errors=errors,
102                         warnings=warnings,
103                         original_data=original_data)
104        self.pagination = pagination
105
106    @classmethod
107    def from_bedrock_response(cls, response: BedrockResponse, pagination):
108        return cls(response.data, response.event,
109                   pagination=pagination,
110                   errors=response.errors,
111                   warnings=response.warnings,
112                   original_data=response.original_data)
113
114    def as_bedrock_response(self):
115        return BedrockResponse(self.data, self.event, self.errors, self.warnings, self.original_data)
116
117    def as_json(self):
118        return {
119            "data": self.data,
120            "originalData": self.original_data,
121            "_metadata": {
122                "pagination": self.pagination.as_json() if self.pagination and isinstance(self.data, list) else None
123            },
124            "_warnings": self.warnings,
125            "_errors": self.errors
126        }
BedrockEnvelopedResponse( data: dict | list, event: dict, pagination: PaginationMetadata = None, errors: list[dict] = None, warnings: list[dict] = None, original_data: dict = None)
 93    def __init__(self,
 94                 data: dict | list,
 95                 event: dict,
 96                 pagination: PaginationMetadata = None,
 97                 errors: list[dict] = None,
 98                 warnings: list[dict] = None,
 99                 original_data: dict = None):
100        super().__init__(data, event,
101                         errors=errors,
102                         warnings=warnings,
103                         original_data=original_data)
104        self.pagination = pagination
pagination
@classmethod
def from_bedrock_response( cls, response: BedrockResponse, pagination):
106    @classmethod
107    def from_bedrock_response(cls, response: BedrockResponse, pagination):
108        return cls(response.data, response.event,
109                   pagination=pagination,
110                   errors=response.errors,
111                   warnings=response.warnings,
112                   original_data=response.original_data)
def as_bedrock_response(self):
114    def as_bedrock_response(self):
115        return BedrockResponse(self.data, self.event, self.errors, self.warnings, self.original_data)
def as_json(self):
117    def as_json(self):
118        return {
119            "data": self.data,
120            "originalData": self.original_data,
121            "_metadata": {
122                "pagination": self.pagination.as_json() if self.pagination and isinstance(self.data, list) else None
123            },
124            "_warnings": self.warnings,
125            "_errors": self.errors
126        }
class BedrockDeletionResponse(BedrockEnvelopedResponse):
129class BedrockDeletionResponse(BedrockEnvelopedResponse):  # pragma: unit
130    def __init__(self, data, event, extra_objects={}):
131        data_object = data
132        if isinstance(data, dict) and "originalData" in data:
133            data_object = data["originalData"]
134        super().__init__(data_object, event)
135        self.extra_objects = {k: v for k, v in extra_objects.items() if k not in ['originalData', 'originalObject', 'data']}
136
137    def as_json(self):
138        return {
139            "deleted": True,
140            "data": None,
141            "originalData": self.data,
142            "originalObject": self.data,  # Deprecated: use "originalData" instead
143            **self.extra_objects
144        }
145
146    def as_unenveloped_json(self):
147        return {
148            "deleted": True,
149            "originalData": self.data,
150            "originalObject": self.data,  # Deprecated: use "originalData" instead
151            **self.extra_objects
152        }
BedrockDeletionResponse(data, event, extra_objects={})
130    def __init__(self, data, event, extra_objects={}):
131        data_object = data
132        if isinstance(data, dict) and "originalData" in data:
133            data_object = data["originalData"]
134        super().__init__(data_object, event)
135        self.extra_objects = {k: v for k, v in extra_objects.items() if k not in ['originalData', 'originalObject', 'data']}
extra_objects
def as_json(self):
137    def as_json(self):
138        return {
139            "deleted": True,
140            "data": None,
141            "originalData": self.data,
142            "originalObject": self.data,  # Deprecated: use "originalData" instead
143            **self.extra_objects
144        }
def as_unenveloped_json(self):
146    def as_unenveloped_json(self):
147        return {
148            "deleted": True,
149            "originalData": self.data,
150            "originalObject": self.data,  # Deprecated: use "originalData" instead
151            **self.extra_objects
152        }
def resolve_response( response: BedrockResponse | dict | list, event: dict, default_args: dict, arguments: dict):
155def resolve_response(response: BedrockResponse | dict | list, event: dict, default_args: dict,
156                     arguments: dict):  # pragma: unit
157    if isinstance(response, BedrockUnenvelopedResponse):
158        return response
159    if isinstance(response, BedrockDeletionResponse):
160        return response
161    if isinstance(response, dict) and "deleted" in response:
162        log.debug("Response was a deletion, wrapping it in a BedrockDeletionResponse")
163        return BedrockDeletionResponse(response, event)
164    if isinstance(response, dict) and "error" in response:
165        log.debug("Response was an error, wrapping it in a BedrockErrorResponse")
166        return BedrockErrorResponse(response, event)
167    _response = response
168    if isinstance(response, dict) or isinstance(response, list):
169        _response = BedrockResponse(response, event)
170        log.debug("Response was not a BedrockResponse, wrapping it in one")
171    if has_envelope_header(event) and not isinstance(_response, BedrockEnvelopedResponse):
172        pagination_data = PaginationMetadata(
173            arguments.get("sort_column", default_args.get("sort_column")),
174            arguments.get("sort_order", default_args.get("sort_order")),
175            arguments.get("limit", default_args.get("limit")),
176            arguments.get("offset", default_args.get("offset"))
177        )
178        _response = BedrockEnvelopedResponse.from_bedrock_response(_response, pagination_data)
179    if not has_envelope_header(event) and isinstance(_response, BedrockEnvelopedResponse):
180        _response = _response.as_bedrock_response()
181    return _response
def has_envelope_header(event):
184def has_envelope_header(event):  # pragma: unit
185    config = get_config_params()
186    default_envelope_value = config.get("envelope_by_default", True)
187    return _has_envelope_header(event, default_envelope_value)