bedrock.endpoints.endpoint_with_kafka_sync
from base64 import b64decode  # pragma: unit
from json import loads  # pragma: unit
from typing import Any  # pragma: unit
from bedrock.endpoints.endpoint import Endpoint  # pragma: unit
from bedrock.log import log_config  # pragma: unit
from bedrock.endpoints.dto.bedrock_response import BedrockErrorResponse  # pragma: unit

log = log_config("Endpoint")  # pragma: unit


def get_records(event: dict) -> list[dict]:  # pragma: unit
    """
    Obtains the records from a Kafka event and sorts them by timestamp (oldest first).

    The records found under every key of `event['records']` are merged into one flat list before sorting.

    :param event: MSK event (See [AWS page](https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html#msk-sample-event))
    :return: A flat list with every record of the event, in ascending `timestamp` order
    """
    flattened_records = [record for key in event['records'] for record in event['records'][key]]
    flattened_records.sort(key=lambda x: x['timestamp'])
    return flattened_records


def decode_messages(records: list[dict]) -> list[dict]:  # pragma: unit
    """
    Takes in a list of MSK records, extracts each record's `value` field, decodes it from base64 and converts it to
    a dictionary.

    Note: this method injects `kafkaRecordTimestamp` and `kafkaTopic` fields into the message that gets sent.
    If the decoded message already has fields with those names, they are overwritten.

    :param records: A list of MSK records
    :return: One dictionary per record, in the same order as `records`
    """
    return [{
        **loads(b64decode(r['value']).decode('utf-8')),
        'kafkaRecordTimestamp': r['timestamp'],
        'kafkaTopic': r['topic']
    } for r in records]


class EndpointWithKafkaSync(Endpoint):  # pragma: unit
    """
    You can make your endpoint inherit from this instead of `Endpoint` to automatically handle Kafka events.

    It will automatically handle `CREATE`/`UPDATE`/`DELETE` kafka events for an entity that a service does not control
    but still wants to know about (i.e. keep their own copy).

    For the default behaviour to work:
    * The class must have a model attributed to its `related_model`
    * The `related_model` of the class must have a `from_kafka_event` method
    * The kafka event must have a `changeType` attribute with a value of `"CREATED"`, `"UPDATED"`, or `"DELETED"`.
    If it does not, then it will be considered `changeType="UPDATED"`.


    # Example usage:

    Take the scenario where Service A owns `Planet` entities and Service B owns `Country` entities. Service B needs to
    keep a record of which planets exist. Service A can send CREATE/UPDATE/DELETE events to Kafka for `Planet` entities
    and Service B can listen to those events and update its local copy of `Planet` entities accordingly.
    Service B only cares about the planet name (and maybe the UUID?), so it can ignore all other fields.

    This means Service B's `Planet` entity will look something like this:
    ```python
    class Planet(Base, ModelHelper):
        __tablename__ = "servicea_planet"

        name = Column(String(255), nullable=False)

        @classmethod
        def from_kafka_event(cls, event):
            new_planet = Planet()
            new_planet.uuid = event["planetUuid"]
            new_planet.name = event["planetName"]
            return new_planet
    ```

    And the `Planets` endpoint will look something like this:

    ```python
    @kafka_listener("planet-changed")
    class Planets(EndpointWithKafkaSync):
        def __init__(self):
            super().__init__("/planets/", related_model=Planet)

        # ...
    ```

    # How does this work?
    ```mermaid
    flowchart LR
        qm[Some Other Service]
        ba[Bedrock-based Service]
        qm --kafka event--> ba
    ```
    Then inside the Bedrock-based Service (by default):
    ```mermaid
    sequenceDiagram
        Kafka-)Endpoint: Kafka event (e.g. with changeType=UPDATED)
        Endpoint->>Endpoint: Decode Kafka event
        Endpoint->>Endpoint: Route event data (e.g. to msk_update)
        Endpoint->>Model: Create Model from Kafka event (Model.from_kafka_event)
        Model-->>Endpoint: Return Model
        Endpoint->>Endpoint: Call related generic method (e.g. put_single_generic)
    ```
    """

    def handle_msk_event(self, event) -> tuple[int, list[Any]]:  # pragma: unit
        """
        Override this method to handle Kafka events in your own custom way.

        See the source for how the event gets decoded by default.

        :param event: Kafka event
        :return: A tuple of (status_code, all_event_results)
        """
        return self._handle_msk_event_with_context(event)

    def _handle_msk_event_with_context(self, event, context=None) -> tuple[int, list[Any]]:  # pragma: unit
        """
        Decodes every record of the Kafka event and routes each message to `msk_update`, `msk_create` or `msk_delete`.

        This method is called by `handle_msk_event` to process the Kafka event.
        The reason for this separation is to prevent a breaking change in the interface of `handle_msk_event` in case
        it has been overridden.

        Messages are handled in ascending `timestamp` order. A message without a `changeType` is treated as
        `UPDATED`. A message with an unrecognised `changeType` is logged and reported as a 400 entry; the remaining
        messages are still processed.

        :param event: Kafka event
        :param context: Not used by the default implementation
        :return: A tuple of (status_code, all_event_results), where status_code is the highest status code among the
            handled messages, or 200 when the event contained no records
        """
        return_values = []
        for msk_message in decode_messages(get_records(event)):
            change_type = msk_message.get("changeType", "UPDATED")
            if change_type == "UPDATED":
                return_values.append(self.msk_update(msk_message, self.related_model))
            elif change_type == "CREATED":
                return_values.append(self.msk_create(msk_message, self.related_model))
            elif change_type == "DELETED":
                return_values.append(self.msk_delete(msk_message, self.related_model))
            else:
                log.error(f"Unknown changeType {change_type}")
                return_values.append((400, BedrockErrorResponse({
                    "error": f"Unknown changeType {change_type}",
                    "originalException": None
                }, event)))
        # `max` raises ValueError on an empty sequence, so an event without records reports 200 and no results.
        status_code = max((status for status, _ in return_values), default=200)
        results = [values.as_json_for_kafka() for _, values in return_values]
        return status_code, results

    def msk_create(self, message_body: dict, model_cls: type) -> tuple[int, Any]:  # pragma: no cover
        """
        Override this method to handle Kafka creation events in your own custom way.
        By default, this gets called by `handle_msk_event` when the event has a `changeType` of `CREATED`.

        If `model_cls.get` already finds an entity with the same `uuid`, the message is handed to `msk_update`.

        :param message_body: The decoded Kafka message
        :param model_cls: The model class used to build (`from_kafka_event`) and look up (`get`) the entity
        :return: A tuple of (status_code, result)
        """

        model = model_cls.from_kafka_event(message_body)

        # in case creates are being sent for entities we already have
        existing_model = model_cls.get(model.uuid)
        if existing_model is not None:
            log.debug("Entity already exists, updating instead of creating.")
            return self.msk_update(message_body, model_cls)

        return self.post_global_generic({"body": model.as_json()}, model_cls)

    def msk_update(self, message_body: dict, model_cls: type) -> tuple[int, Any]:  # pragma: no cover
        """
        Override this method to handle Kafka update events in your own custom way.
        By default, this gets called by `handle_msk_event` when the event has a `changeType` of `UPDATED` or no
        `changeType` at all.

        If `model_cls.get` finds no entity with the same `uuid`, the message is handed to `msk_create`.

        :param message_body: The decoded Kafka message
        :param model_cls: The model class used to build (`from_kafka_event`) and look up (`get`) the entity
        :return: A tuple of (status_code, result)
        """
        model = model_cls.from_kafka_event(message_body)

        # just in case updates are being sent for entities we don't have
        existing_model = model_cls.get(model.uuid)
        if existing_model is None:
            log.debug("Entity does not exist, creating instead of updating.")
            return self.msk_create(message_body, model_cls)

        return self.put_single_generic({"body": model.as_json()}, model_cls)

    def msk_delete(self, message_body: dict, model_cls: type) -> tuple[int, Any]:  # pragma: no cover
        """
        Override this method to handle Kafka deletion events in your own custom way.
        By default, this gets called by `handle_msk_event` when the event has a `changeType` of `DELETED`.

        :param message_body: The decoded Kafka message
        :param model_cls: The model class used to build the entity via `from_kafka_event`
        :return: A tuple of (status_code, result)
        """
        model = model_cls.from_kafka_event(message_body)
        return self.delete_single_generic({"body": model.as_json()}, model_cls, model.uuid)
def get_records(event: dict) -> list[dict]:  # pragma: unit
    """
    Flattens the records of a Kafka event into a single list ordered by timestamp.

    The record lists found under every key of `event['records']` are merged, then sorted in ascending `timestamp`
    order (oldest record first). Records with equal timestamps keep their relative order.

    :param event: MSK event (See [AWS page](https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html#msk-sample-event))
    :return: A flat list with every record of the event, in ascending `timestamp` order
    """
    merged = []
    for partition_records in event['records'].values():
        merged.extend(partition_records)
    return sorted(merged, key=lambda record: record['timestamp'])
Obtains the records from a Kafka event and sorts them by timestamp (oldest first).
Parameters
- event: MSK event (See AWS page)
Returns
def decode_messages(records: list[dict]) -> list[dict]:  # pragma: unit
    """
    Turns MSK records into message dictionaries.

    For each record, the `value` field is base64-decoded, read as UTF-8 and parsed as JSON.

    Note: this method injects `kafkaRecordTimestamp` and `kafkaTopic` fields (taken from the record's `timestamp`
    and `topic`) into the message that gets sent, overwriting any fields of the same name in the decoded payload.

    :param records: A list of MSK records
    :return: One dictionary per record, in the same order as `records`
    """
    messages = []
    for record in records:
        raw_json = b64decode(record['value']).decode('utf-8')
        message = {**loads(raw_json)}
        message['kafkaRecordTimestamp'] = record['timestamp']
        message['kafkaTopic'] = record['topic']
        messages.append(message)
    return messages
Takes in a list of MSK records, extracts each record's value field, decodes it from base64 and converts it to a dictionary.
Note: this method injects kafkaRecordTimestamp and kafkaTopic fields into the message that gets sent.
Parameters
- records: A list of MSK records
Returns
class EndpointWithKafkaSync(Endpoint):  # pragma: unit
    """
    You can make your endpoint inherit from this instead of `Endpoint` to automatically handle Kafka events.

    It will automatically handle `CREATE`/`UPDATE`/`DELETE` kafka events for an entity that a service does not control
    but still wants to know about (i.e. keep their own copy).

    For the default behaviour to work:
    * The class must have a model attributed to its `related_model`
    * The `related_model` of the class must have a `from_kafka_event` method
    * The kafka event must have a `changeType` attribute with a value of `"CREATED"`, `"UPDATED"`, or `"DELETED"`.
    If it does not, then it will be considered `changeType="UPDATED"`.


    # Example usage:

    Take the scenario where Service A owns `Planet` entities and Service B owns `Country` entities. Service B needs to
    keep a record of which planets exist. Service A can send CREATE/UPDATE/DELETE events to Kafka for `Planet` entities
    and Service B can listen to those events and update its local copy of `Planet` entities accordingly.
    Service B only cares about the planet name (and maybe the UUID?), so it can ignore all other fields.

    This means Service B's `Planet` entity will look something like this:
    ```python
    class Planet(Base, ModelHelper):
        __tablename__ = "servicea_planet"

        name = Column(String(255), nullable=False)

        @classmethod
        def from_kafka_event(cls, event):
            new_planet = Planet()
            new_planet.uuid = event["planetUuid"]
            new_planet.name = event["planetName"]
            return new_planet
    ```

    And the `Planets` endpoint will look something like this:

    ```python
    @kafka_listener("planet-changed")
    class Planets(EndpointWithKafkaSync):
        def __init__(self):
            super().__init__("/planets/", related_model=Planet)

        # ...
    ```

    # How does this work?
    ```mermaid
    flowchart LR
        qm[Some Other Service]
        ba[Bedrock-based Service]
        qm --kafka event--> ba
    ```
    Then inside the Bedrock-based Service (by default):
    ```mermaid
    sequenceDiagram
        Kafka-)Endpoint: Kafka event (e.g. with changeType=UPDATED)
        Endpoint->>Endpoint: Decode Kafka event
        Endpoint->>Endpoint: Route event data (e.g. to msk_update)
        Endpoint->>Model: Create Model from Kafka event (Model.from_kafka_event)
        Model-->>Endpoint: Return Model
        Endpoint->>Endpoint: Call related generic method (e.g. put_single_generic)
    ```
    """

    def handle_msk_event(self, event) -> tuple[int, list[Any]]:  # pragma: unit
        """
        Override this method to handle Kafka events in your own custom way.

        See the source for how the event gets decoded by default.

        :param event: Kafka event
        :return: A tuple of (status_code, all_event_results)
        """
        return self._handle_msk_event_with_context(event)

    def _handle_msk_event_with_context(self, event, context=None) -> tuple[int, list[Any]]:  # pragma: unit
        """
        Decodes every record of the Kafka event and routes each message to `msk_update`, `msk_create` or `msk_delete`.

        This method is called by `handle_msk_event` to process the Kafka event.
        The reason for this separation is to prevent a breaking change in the interface of `handle_msk_event` in case
        it has been overridden.

        Messages are handled in the order returned by `get_records`. A message without a `changeType` is treated as
        `UPDATED`. A message with an unrecognised `changeType` is logged and reported as a 400 entry; the remaining
        messages are still processed.

        :param event: Kafka event
        :param context: Not used by the default implementation
        :return: A tuple of (status_code, all_event_results), where status_code is the highest status code among the
            handled messages, or 200 when the event contained no records
        """
        return_values = []
        for msk_message in decode_messages(get_records(event)):
            change_type = msk_message.get("changeType", "UPDATED")
            if change_type == "UPDATED":
                return_values.append(self.msk_update(msk_message, self.related_model))
            elif change_type == "CREATED":
                return_values.append(self.msk_create(msk_message, self.related_model))
            elif change_type == "DELETED":
                return_values.append(self.msk_delete(msk_message, self.related_model))
            else:
                log.error(f"Unknown changeType {change_type}")
                return_values.append((400, BedrockErrorResponse({
                    "error": f"Unknown changeType {change_type}",
                    "originalException": None
                }, event)))
        # `max` raises ValueError on an empty sequence, so an event without records reports 200 and no results.
        status_code = max((status for status, _ in return_values), default=200)
        results = [values.as_json_for_kafka() for _, values in return_values]
        return status_code, results

    def msk_create(self, message_body: dict, model_cls: type) -> tuple[int, Any]:  # pragma: no cover
        """
        Override this method to handle Kafka creation events in your own custom way.
        By default, this gets called by `handle_msk_event` when the event has a `changeType` of `CREATED`.

        If `model_cls.get` already finds an entity with the same `uuid`, the message is handed to `msk_update`.

        :param message_body: The decoded Kafka message
        :param model_cls: The model class used to build (`from_kafka_event`) and look up (`get`) the entity
        :return: A tuple of (status_code, result)
        """

        model = model_cls.from_kafka_event(message_body)

        # in case creates are being sent for entities we already have
        existing_model = model_cls.get(model.uuid)
        if existing_model is not None:
            log.debug("Entity already exists, updating instead of creating.")
            return self.msk_update(message_body, model_cls)

        return self.post_global_generic({"body": model.as_json()}, model_cls)

    def msk_update(self, message_body: dict, model_cls: type) -> tuple[int, Any]:  # pragma: no cover
        """
        Override this method to handle Kafka update events in your own custom way.
        By default, this gets called by `handle_msk_event` when the event has a `changeType` of `UPDATED` or no
        `changeType` at all.

        If `model_cls.get` finds no entity with the same `uuid`, the message is handed to `msk_create`.

        :param message_body: The decoded Kafka message
        :param model_cls: The model class used to build (`from_kafka_event`) and look up (`get`) the entity
        :return: A tuple of (status_code, result)
        """
        model = model_cls.from_kafka_event(message_body)

        # just in case updates are being sent for entities we don't have
        existing_model = model_cls.get(model.uuid)
        if existing_model is None:
            log.debug("Entity does not exist, creating instead of updating.")
            return self.msk_create(message_body, model_cls)

        return self.put_single_generic({"body": model.as_json()}, model_cls)

    def msk_delete(self, message_body: dict, model_cls: type) -> tuple[int, Any]:  # pragma: no cover
        """
        Override this method to handle Kafka deletion events in your own custom way.
        By default, this gets called by `handle_msk_event` when the event has a `changeType` of `DELETED`.

        :param message_body: The decoded Kafka message
        :param model_cls: The model class used to build the entity via `from_kafka_event`
        :return: A tuple of (status_code, result)
        """
        model = model_cls.from_kafka_event(message_body)
        return self.delete_single_generic({"body": model.as_json()}, model_cls, model.uuid)
You can make your endpoint inherit from this instead of Endpoint to automatically handle Kafka events.
It will automatically handle CREATE/UPDATE/DELETE kafka events for an entity that a service does not control
but still wants to know about (i.e. keep their own copy).
For the default behaviour to work:
- The class must have a model attributed to its `related_model`
- The `related_model` of the class must have a `from_kafka_event` method
- The kafka event must have a `changeType` attribute with a value of `"CREATED"`, `"UPDATED"`, or `"DELETED"`. If it does not, then it will be considered `changeType="UPDATED"`.
Example usage:
Take the scenario where Service A owns Planet entities and Service B owns Country entities. Service B needs to
keep a record of which planets exist. Service A can send CREATE/UPDATE/DELETE events to Kafka for Planet entities
and Service B can listen to those events and update its local copy of Planet entities accordingly.
Service B only cares about the planet name (and maybe the UUID?), so it can ignore all other fields.
This means Service B's Planet entity will look something like this:
class Planet(Base, ModelHelper):
    __tablename__ = "servicea_planet"

    name = Column(String(255), nullable=False)

    @classmethod
    def from_kafka_event(self, event):
        new_planet = Planet()
        new_planet.uuid = event["planetUuid"]
        new_planet.name = event["planetName"]
        return new_planet
And the Planets endpoint will look something like this:
@kafka_listener("planet-changed")
class Planets(EndpointWithKafkaSync):
    def __init__(self):
        super().__init__("/planets/", related_model=Planet)

    # ...
How does this work?
flowchart LR
    qm[Some Other Service]
    ba[Bedrock-based Service]
    qm --kafka event--> ba
Then inside the Bedrock-based Service (by default):
sequenceDiagram
    Kafka-)Endpoint: Kafka event (e.g. with changeType=UPDATED)
    Endpoint->>Endpoint: Decode Kafka event
    Endpoint->>Endpoint: Route event data (e.g. to msk_update)
    Endpoint->>Model: Create Model from Kafka event (Model.from_kafka_event)
    Model-->>Endpoint: Return Model
    Endpoint->>Endpoint: Call related generic method (e.g. put_single_generic)
def handle_msk_event(self, event) -> tuple[int, list[Any]]:  # pragma: unit
    """
    Entry point for Kafka events; override it to handle them in your own custom way.

    The default implementation hands the event to `_handle_msk_event_with_context` and returns its result
    unchanged. See the source for how the event gets decoded by default.

    :param event: Kafka event
    :return: A tuple of (status_code, all_event_results)
    """
    outcome = self._handle_msk_event_with_context(event)
    return outcome
Override this method to handle Kafka events in your own custom way.
See the source for how the event gets decoded by default.
Parameters
- event: Kafka event
Returns
A tuple of (status_code, all_event_results)
def msk_create(self, message_body: dict, model_cls: type) -> tuple[int, Any]:  # pragma: no cover
    """
    Default handler for Kafka creation events; override it to process them in your own custom way.
    By default, this gets called by `handle_msk_event` when the event has a `changeType` of `CREATED`.

    The model is built with `model_cls.from_kafka_event`. If `model_cls.get` already finds an entity with the same
    `uuid`, the message is handed to `msk_update`; otherwise the model is passed to `post_global_generic`.

    :param message_body: The Kafka message to build the model from
    :param model_cls: The model class used to build (`from_kafka_event`) and look up (`get`) the entity
    :return: A tuple of (status_code, result)
    """
    incoming = model_cls.from_kafka_event(message_body)

    # a create may arrive for an entity we already hold a copy of
    if model_cls.get(incoming.uuid) is None:
        return self.post_global_generic({"body": incoming.as_json()}, model_cls)

    log.debug("Entity already exists, updating instead of creating.")
    return self.msk_update(message_body, model_cls)
Override this method to handle Kafka creation events in your own custom way.
By default, this gets called by handle_msk_event when the event has a changeType of CREATED.
Parameters
- message_body:
- model_cls:
Returns
A tuple of (status_code, result)
def msk_update(self, message_body: dict, model_cls: type) -> tuple[int, Any]:  # pragma: no cover
    """
    Default handler for Kafka update events; override it to process them in your own custom way.
    By default, this gets called by `handle_msk_event` when the event has a `changeType` of `UPDATED` or no
    `changeType` at all.

    The model is built with `model_cls.from_kafka_event`. If `model_cls.get` finds no entity with the same `uuid`,
    the message is handed to `msk_create`; otherwise the model is passed to `put_single_generic`.

    :param message_body: The Kafka message to build the model from
    :param model_cls: The model class used to build (`from_kafka_event`) and look up (`get`) the entity
    :return: A tuple of (status_code, result)
    """
    incoming = model_cls.from_kafka_event(message_body)

    # an update may arrive for an entity we never stored
    if model_cls.get(incoming.uuid) is not None:
        return self.put_single_generic({"body": incoming.as_json()}, model_cls)

    log.debug("Entity does not exist, creating instead of updating.")
    return self.msk_create(message_body, model_cls)
Override this method to handle Kafka update events in your own custom way.
By default, this gets called by handle_msk_event when the event has a changeType of UPDATED or no changeType at all.
Parameters
- message_body:
- model_cls:
Returns
A tuple of (status_code, result)
def msk_delete(self, message_body: dict, model_cls: type) -> tuple[int, Any]:  # pragma: no cover
    """
    Default handler for Kafka deletion events; override it to process them in your own custom way.
    By default, this gets called by `handle_msk_event` when the event has a `changeType` of `DELETED`.

    The model is built with `model_cls.from_kafka_event` and passed, together with its `uuid`, to
    `delete_single_generic`.

    :param message_body: The Kafka message to build the model from
    :param model_cls: The model class used to build the entity via `from_kafka_event`
    :return: A tuple of (status_code, result)
    """
    target = model_cls.from_kafka_event(message_body)
    request = {"body": target.as_json()}
    return self.delete_single_generic(request, model_cls, target.uuid)
Override this method to handle Kafka deletion events in your own custom way.
By default, this gets called by handle_msk_event when the event has a changeType of DELETED.
Parameters
- message_body:
- model_cls:
Returns
A tuple of (status_code, result)
Inherited Members
- bedrock.endpoints.endpoint.Endpoint
- Endpoint
- DEFAULT_ENDPOINTS_DATA
- param_key
- param
- resource
- prefix
- parent_resources
- path_params
- global_endpoint
- single_endpoint
- global_is_array
- has_custom_body_schema
- has_custom_return_schema
- get_body_schema
- get_return_schema
- handle_wakeup
- get_global
- get_single
- get_global_generic
- get_single_generic
- post_global
- post_single
- post_global_generic
- put_global
- put_single
- put_single_generic
- delete_global
- delete_single
- delete_single_generic