bedrock.endpoints.dto.bedrock_response
1from bedrock.config.headers import ValidHeaders # pragma: unit 2from bedrock.log import log_config # pragma: unit 3from bedrock.config import get_config_params 4 5log = log_config("bedrock_response") # pragma: unit 6 7 8class PaginationMetadata: # pragma: unit 9 def __init__(self, sort_column: str, sort_order: str, limit: int, offset: int, total: int = None): 10 self.sort_column = sort_column 11 self.sort_order = sort_order 12 self.limit = limit 13 self.offset = offset 14 self.total = total 15 16 def as_json(self): 17 return { 18 "sortColumn": self.sort_column, 19 "sortOrder": self.sort_order, 20 "limit": self.limit, 21 "offset": self.offset, 22 "total": self.total 23 } 24 25 26class BedrockResponse: # pragma: unit 27 def __init__(self, 28 data: dict | list | None, 29 event: dict, 30 errors: list[dict] = None, 31 warnings: list[dict] = None, 32 original_data: dict = None): 33 self.data = data 34 self.original_data = original_data 35 self.event = event 36 self.errors = errors or [] 37 self.warnings = warnings or [] 38 39 def as_json(self): 40 return self.data 41 42 def as_json_for_kafka(self): 43 return self.data 44 45 def as_unenveloped_json(self): 46 return self.data 47 48 49class BedrockUnenvelopedResponse(BedrockResponse): # pragma: unit 50 """ 51 Use this response object when you want to return raw data without ever enveloping it in an envelope. 52 """ 53 54 def __init__(self, data: dict | list, event: dict): 55 super().__init__(data, event) 56 57 58class BedrockErrorResponse(BedrockResponse): # pragma: unit 59 def __init__(self, errors: dict | list, event: dict): 60 super().__init__(None, event) 61 if isinstance(errors, dict): 62 message = errors.get("message") or errors.get("error") 63 exception = errors.get("exception") or errors.get("originalException") 64 self.errors.append({ 65 "code": errors.get("code", "-1"), 66 "message": message, 67 "exception": exception, 68 "error": message, # Deprecated: use "message" instead 69 "originalException": exception # Deprecated: use "exception" instead 70 }) 71 elif isinstance(errors, list): 72 self.errors = _normalise_errors(errors) 73 else: 74 log.warning("Provided 'errors' was not a dict or list") 75 log.debug(errors) 76 self.errors.append({"error": "An unknown error occurred"}) 77 78 def as_json(self): 79 return { 80 "_errors": self.errors, 81 "errors": self.errors # Deprecated: use "_errors" instead 82 } 83 84 def as_json_for_kafka(self): 85 return self.as_unenveloped_json() 86 87 def as_unenveloped_json(self): 88 return self.errors[0] if self.errors else {} 89 90 91class BedrockEnvelopedResponse(BedrockResponse): # pragma: unit 92 def __init__(self, 93 data: dict | list, 94 event: dict, 95 pagination: PaginationMetadata = None, 96 errors: list[dict] = None, 97 warnings: list[dict] = None, 98 original_data: dict = None): 99 super().__init__(data, event, 100 errors=errors, 101 warnings=warnings, 102 original_data=original_data) 103 self.pagination = pagination 104 105 @classmethod 106 def from_bedrock_response(cls, response: BedrockResponse, pagination): 107 return cls(response.data, response.event, 108 pagination=pagination, 109 errors=response.errors, 110 warnings=response.warnings, 111 original_data=response.original_data) 112 113 def as_bedrock_response(self): 114 return BedrockResponse(self.data, self.event, self.errors, self.warnings, self.original_data) 115 116 def as_json(self): 117 return { 118 "data": self.data, 119 "originalData": self.original_data, 120 "_metadata": { 121 "pagination": self.pagination.as_json() if self.pagination and isinstance(self.data, list) else None 122 }, 123 "_warnings": self.warnings, 124 "_errors": self.errors 125 } 126 127 128class BedrockDeletionResponse(BedrockEnvelopedResponse): # pragma: unit 129 def __init__(self, data, event, extra_objects={}): 130 data_object = data 131 if isinstance(data, dict) and "originalData" in data: 132 data_object = data["originalData"] 133 super().__init__(data_object, event) 134 self.extra_objects = {k: v for k, v in extra_objects.items() if k not in ['originalData', 'originalObject', 'data']} 135 136 def as_json(self): 137 return { 138 "deleted": True, 139 "data": None, 140 "originalData": self.data, 141 "originalObject": self.data, # Deprecated: use "originalData" instead 142 **self.extra_objects 143 } 144 145 def as_unenveloped_json(self): 146 return { 147 "deleted": True, 148 "originalData": self.data, 149 "originalObject": self.data, # Deprecated: use "originalData" instead 150 **self.extra_objects 151 } 152 153 154def resolve_response(response: BedrockResponse | dict | list, event: dict, default_args: dict, 155 arguments: dict): # pragma: unit 156 if isinstance(response, BedrockUnenvelopedResponse): 157 return response 158 if isinstance(response, BedrockDeletionResponse): 159 return response 160 if isinstance(response, dict) and "deleted" in response: 161 log.debug("Response was a deletion, wrapping it in a BedrockDeletionResponse") 162 return BedrockDeletionResponse(response, event) 163 if isinstance(response, dict) and "error" in response: 164 log.debug("Response was an error, wrapping it in a BedrockErrorResponse") 165 return BedrockErrorResponse(response, event) 166 _response = response 167 if isinstance(response, dict) or isinstance(response, list): 168 _response = BedrockResponse(response, event) 169 log.debug("Response was not a BedrockResponse, wrapping it in one") 170 if has_envelope_header(event) and not isinstance(_response, BedrockEnvelopedResponse): 171 pagination_data = PaginationMetadata( 172 arguments.get("sort_column", default_args.get("sort_column")), 173 arguments.get("sort_order", default_args.get("sort_order")), 174 arguments.get("limit", default_args.get("limit")), 175 arguments.get("offset", default_args.get("offset")) 176 ) 177 _response = BedrockEnvelopedResponse.from_bedrock_response(_response, pagination_data) 178 if not has_envelope_header(event) and isinstance(_response, BedrockEnvelopedResponse): 179 _response = _response.as_bedrock_response() 180 return _response 181 182 183def has_envelope_header(event): # pragma: unit 184 config = get_config_params() 185 default_envelope_value = config.get("envelope_by_default", True) 186 return _has_envelope_header(event, default_envelope_value) 187 188 189def _has_envelope_header(event, default_value: bool): # pragma: unit 190 try: 191 headers = event["headers"] 192 except KeyError: 193 return default_value 194 try: 195 if headers[ValidHeaders.ENVELOPE_RESPONSE.value]: 196 if f"{headers[ValidHeaders.ENVELOPE_RESPONSE.value]}".lower() == "false": 197 return False 198 if f"{headers[ValidHeaders.ENVELOPE_RESPONSE.value]}".lower() == "true": 199 return True 200 return default_value 201 except KeyError: 202 return default_value 203 return default_value 204 205 206def _normalise_errors(errors: list[dict]) -> list[dict]: # pragma: unit 207 return [_normalise_error(error) for error in errors] 208 209 210def _normalise_error(error: dict) -> dict: # pragma: unit 211 try: 212 message = error.get("message") or error.get("error") or "Unknown error" 213 exception = error.get("exception") or error.get("originalException") 214 return { 215 "code": error.get("code", "-1"), 216 "message": message, 217 "exception": exception, 218 "error": message, # Deprecated: use "message" instead 219 "originalException": exception # Deprecated: use "exception" instead 220 } 221 except Exception as e: 222 error_message = f"An unknown error occurred: {error}" 223 exception_str = f"{e}" 224 return { 225 "code": "-1", 226 "message": error_message, 227 "exception": exception_str, 228 "error": error_message, # Deprecated: use "message" instead 229 "originalException": exception_str # Deprecated: use "exception" instead 230 }
log =
<MyLogger BEDROCK-bedrock_response (INFO)>
class
PaginationMetadata:
9class PaginationMetadata: # pragma: unit 10 def __init__(self, sort_column: str, sort_order: str, limit: int, offset: int, total: int = None): 11 self.sort_column = sort_column 12 self.sort_order = sort_order 13 self.limit = limit 14 self.offset = offset 15 self.total = total 16 17 def as_json(self): 18 return { 19 "sortColumn": self.sort_column, 20 "sortOrder": self.sort_order, 21 "limit": self.limit, 22 "offset": self.offset, 23 "total": self.total 24 }
class
BedrockResponse:
27class BedrockResponse: # pragma: unit 28 def __init__(self, 29 data: dict | list | None, 30 event: dict, 31 errors: list[dict] = None, 32 warnings: list[dict] = None, 33 original_data: dict = None): 34 self.data = data 35 self.original_data = original_data 36 self.event = event 37 self.errors = errors or [] 38 self.warnings = warnings or [] 39 40 def as_json(self): 41 return self.data 42 43 def as_json_for_kafka(self): 44 return self.data 45 46 def as_unenveloped_json(self): 47 return self.data
BedrockResponse( data: dict | list | None, event: dict, errors: list[dict] = None, warnings: list[dict] = None, original_data: dict = None)
28 def __init__(self, 29 data: dict | list | None, 30 event: dict, 31 errors: list[dict] = None, 32 warnings: list[dict] = None, 33 original_data: dict = None): 34 self.data = data 35 self.original_data = original_data 36 self.event = event 37 self.errors = errors or [] 38 self.warnings = warnings or []
50class BedrockUnenvelopedResponse(BedrockResponse): # pragma: unit 51 """ 52 Use this response object when you want to return raw data without ever enveloping it in an envelope. 53 """ 54 55 def __init__(self, data: dict | list, event: dict): 56 super().__init__(data, event)
Use this response object when you want to return raw data without ever enveloping it in an envelope.
Inherited Members
59class BedrockErrorResponse(BedrockResponse): # pragma: unit 60 def __init__(self, errors: dict | list, event: dict): 61 super().__init__(None, event) 62 if isinstance(errors, dict): 63 message = errors.get("message") or errors.get("error") 64 exception = errors.get("exception") or errors.get("originalException") 65 self.errors.append({ 66 "code": errors.get("code", "-1"), 67 "message": message, 68 "exception": exception, 69 "error": message, # Deprecated: use "message" instead 70 "originalException": exception # Deprecated: use "exception" instead 71 }) 72 elif isinstance(errors, list): 73 self.errors = _normalise_errors(errors) 74 else: 75 log.warning("Provided 'errors' was not a dict or list") 76 log.debug(errors) 77 self.errors.append({"error": "An unknown error occurred"}) 78 79 def as_json(self): 80 return { 81 "_errors": self.errors, 82 "errors": self.errors # Deprecated: use "_errors" instead 83 } 84 85 def as_json_for_kafka(self): 86 return self.as_unenveloped_json() 87 88 def as_unenveloped_json(self): 89 return self.errors[0] if self.errors else {}
BedrockErrorResponse(errors: dict | list, event: dict)
60 def __init__(self, errors: dict | list, event: dict): 61 super().__init__(None, event) 62 if isinstance(errors, dict): 63 message = errors.get("message") or errors.get("error") 64 exception = errors.get("exception") or errors.get("originalException") 65 self.errors.append({ 66 "code": errors.get("code", "-1"), 67 "message": message, 68 "exception": exception, 69 "error": message, # Deprecated: use "message" instead 70 "originalException": exception # Deprecated: use "exception" instead 71 }) 72 elif isinstance(errors, list): 73 self.errors = _normalise_errors(errors) 74 else: 75 log.warning("Provided 'errors' was not a dict or list") 76 log.debug(errors) 77 self.errors.append({"error": "An unknown error occurred"})
Inherited Members
92class BedrockEnvelopedResponse(BedrockResponse): # pragma: unit 93 def __init__(self, 94 data: dict | list, 95 event: dict, 96 pagination: PaginationMetadata = None, 97 errors: list[dict] = None, 98 warnings: list[dict] = None, 99 original_data: dict = None): 100 super().__init__(data, event, 101 errors=errors, 102 warnings=warnings, 103 original_data=original_data) 104 self.pagination = pagination 105 106 @classmethod 107 def from_bedrock_response(cls, response: BedrockResponse, pagination): 108 return cls(response.data, response.event, 109 pagination=pagination, 110 errors=response.errors, 111 warnings=response.warnings, 112 original_data=response.original_data) 113 114 def as_bedrock_response(self): 115 return BedrockResponse(self.data, self.event, self.errors, self.warnings, self.original_data) 116 117 def as_json(self): 118 return { 119 "data": self.data, 120 "originalData": self.original_data, 121 "_metadata": { 122 "pagination": self.pagination.as_json() if self.pagination and isinstance(self.data, list) else None 123 }, 124 "_warnings": self.warnings, 125 "_errors": self.errors 126 }
BedrockEnvelopedResponse( data: dict | list, event: dict, pagination: PaginationMetadata = None, errors: list[dict] = None, warnings: list[dict] = None, original_data: dict = None)
93 def __init__(self, 94 data: dict | list, 95 event: dict, 96 pagination: PaginationMetadata = None, 97 errors: list[dict] = None, 98 warnings: list[dict] = None, 99 original_data: dict = None): 100 super().__init__(data, event, 101 errors=errors, 102 warnings=warnings, 103 original_data=original_data) 104 self.pagination = pagination
Inherited Members
129class BedrockDeletionResponse(BedrockEnvelopedResponse): # pragma: unit 130 def __init__(self, data, event, extra_objects={}): 131 data_object = data 132 if isinstance(data, dict) and "originalData" in data: 133 data_object = data["originalData"] 134 super().__init__(data_object, event) 135 self.extra_objects = {k: v for k, v in extra_objects.items() if k not in ['originalData', 'originalObject', 'data']} 136 137 def as_json(self): 138 return { 139 "deleted": True, 140 "data": None, 141 "originalData": self.data, 142 "originalObject": self.data, # Deprecated: use "originalData" instead 143 **self.extra_objects 144 } 145 146 def as_unenveloped_json(self): 147 return { 148 "deleted": True, 149 "originalData": self.data, 150 "originalObject": self.data, # Deprecated: use "originalData" instead 151 **self.extra_objects 152 }
BedrockDeletionResponse(data, event, extra_objects={})
130 def __init__(self, data, event, extra_objects={}): 131 data_object = data 132 if isinstance(data, dict) and "originalData" in data: 133 data_object = data["originalData"] 134 super().__init__(data_object, event) 135 self.extra_objects = {k: v for k, v in extra_objects.items() if k not in ['originalData', 'originalObject', 'data']}
def
resolve_response( response: BedrockResponse | dict | list, event: dict, default_args: dict, arguments: dict):
155def resolve_response(response: BedrockResponse | dict | list, event: dict, default_args: dict, 156 arguments: dict): # pragma: unit 157 if isinstance(response, BedrockUnenvelopedResponse): 158 return response 159 if isinstance(response, BedrockDeletionResponse): 160 return response 161 if isinstance(response, dict) and "deleted" in response: 162 log.debug("Response was a deletion, wrapping it in a BedrockDeletionResponse") 163 return BedrockDeletionResponse(response, event) 164 if isinstance(response, dict) and "error" in response: 165 log.debug("Response was an error, wrapping it in a BedrockErrorResponse") 166 return BedrockErrorResponse(response, event) 167 _response = response 168 if isinstance(response, dict) or isinstance(response, list): 169 _response = BedrockResponse(response, event) 170 log.debug("Response was not a BedrockResponse, wrapping it in one") 171 if has_envelope_header(event) and not isinstance(_response, BedrockEnvelopedResponse): 172 pagination_data = PaginationMetadata( 173 arguments.get("sort_column", default_args.get("sort_column")), 174 arguments.get("sort_order", default_args.get("sort_order")), 175 arguments.get("limit", default_args.get("limit")), 176 arguments.get("offset", default_args.get("offset")) 177 ) 178 _response = BedrockEnvelopedResponse.from_bedrock_response(_response, pagination_data) 179 if not has_envelope_header(event) and isinstance(_response, BedrockEnvelopedResponse): 180 _response = _response.as_bedrock_response() 181 return _response
def
has_envelope_header(event):