bedrock.endpoints.endpoint_with_kafka_sync

from base64 import b64decode  # pragma: unit
from json import loads  # pragma: unit
from typing import Any  # pragma: unit
from bedrock.endpoints.endpoint import Endpoint  # pragma: unit
from bedrock.log import log_config  # pragma: unit
from bedrock.endpoints.dto.bedrock_response import BedrockErrorResponse  # pragma: unit

log = log_config("Endpoint")  # pragma: unit

def get_records(event: dict) -> list[dict]:  # pragma: unit
    """
    Obtains the records from a Kafka event and sorts them by timestamp, oldest first, so they can be processed in order.
    :param event: MSK event (See [AWS page](https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html#msk-sample-event))
    :return: The flattened, timestamp-sorted list of records
    """
    flattened_records = [record for key in event['records'] for record in event['records'][key]]
    flattened_records.sort(key=lambda x: x['timestamp'])
    return flattened_records

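To illustrate the flattening and ordering above, here is a minimal sketch using a trimmed MSK event (topic names and timestamps are made up for the example; real events carry base64-encoded `value` payloads):

```python
# Trimmed MSK event: records arrive grouped under one key per topic-partition.
event = {
    "records": {
        "planet-changed-0": [
            {"topic": "planet-changed", "timestamp": 1700000002000, "value": "..."},
            {"topic": "planet-changed", "timestamp": 1700000000000, "value": "..."},
        ],
        "planet-changed-1": [
            {"topic": "planet-changed", "timestamp": 1700000001000, "value": "..."},
        ],
    }
}

# Same flatten-and-sort steps as get_records above.
flattened = [record for key in event["records"] for record in event["records"][key]]
flattened.sort(key=lambda r: r["timestamp"])

print([r["timestamp"] for r in flattened])
# [1700000000000, 1700000001000, 1700000002000]
```

Sorting across partitions matters because a single event batch can interleave records from several partitions, and the handlers below apply them strictly in timestamp order.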
def decode_messages(records: list[dict]) -> list[dict]:  # pragma: unit
    """
    Takes a list of MSK records, extracts each record's `value` field, decodes it from base64 and parses the JSON into a dictionary.

    Note: this method injects `kafkaRecordTimestamp` and `kafkaTopic` fields into each decoded message.

    :param records: A list of MSK records
    :return: A list of decoded message dictionaries
    """
    return [{
        **loads(b64decode(r['value']).decode('utf-8')),
        'kafkaRecordTimestamp': r['timestamp'],
        'kafkaTopic': r['topic']
    } for r in records]

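A minimal round-trip sketch of the decode-and-enrich step above (the payload keys `planetUuid`/`planetName` are taken from the `Planet` example further down; the timestamp and topic are made up):

```python
from base64 import b64decode, b64encode
from json import dumps, loads

# Build a record the way MSK delivers it: the payload is base64-encoded JSON.
payload = b64encode(dumps({"planetUuid": "abc-123", "planetName": "Mars"}).encode()).decode()
record = {"value": payload, "timestamp": 1700000000000, "topic": "planet-changed"}

# Same decode-and-enrich step as decode_messages above.
message = {
    **loads(b64decode(record["value"]).decode("utf-8")),
    "kafkaRecordTimestamp": record["timestamp"],
    "kafkaTopic": record["topic"],
}

print(message["planetName"], message["kafkaTopic"])
# Mars planet-changed
```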
class EndpointWithKafkaSync(Endpoint):  # pragma: unit
    """
    You can make your endpoint inherit from this instead of `Endpoint` to automatically handle Kafka events.

    It will automatically handle `CREATE`/`UPDATE`/`DELETE` Kafka events for an entity that a service does not control
    but still wants to know about (i.e. keep its own copy).

    For the default behaviour to work:
    * The class must have a model assigned to its `related_model` attribute
    * The `related_model` of the class must have a `from_kafka_event` method
    * The Kafka event must have a `changeType` attribute with a value of `"CREATED"`, `"UPDATED"`, or `"DELETED"`.
      If it does not, then it will be treated as `changeType="UPDATED"`.

    # Example usage:

    Take the scenario where Service A owns `Planet` entities and Service B owns `Country` entities. Service B needs to
    keep a record of which planets exist. Service A can send CREATE/UPDATE/DELETE events to Kafka for `Planet` entities
    and Service B can listen to those events and update its local copy of `Planet` entities accordingly.
    Service B only cares about the planet name and UUID, so it can ignore all other fields.

    This means Service B's `Planet` entity will look something like this:
    ```python
    class Planet(Base, ModelHelper):
        __tablename__ = "servicea_planet"

        name = Column(String(255), nullable=False)

        @classmethod
        def from_kafka_event(cls, event):
            new_planet = cls()
            new_planet.uuid = event["planetUuid"]
            new_planet.name = event["planetName"]
            return new_planet
    ```

    And the `Planets` endpoint will look something like this:

    ```python
    @kafka_listener("planet-changed")
    class Planets(EndpointWithKafkaSync):
        def __init__(self):
            super().__init__("/planets/", related_model=Planet)

        # ...
    ```

    # How does this work?
    ```mermaid
    flowchart LR
        qm[Some Other Service]
        ba[Bedrock-based Service]
        qm --kafka event--> ba
    ```
    Then inside the Bedrock-based Service (by default):
    ```mermaid
    sequenceDiagram
        Kafka-)Endpoint: Kafka event (e.g. with changeType=UPDATED)
        Endpoint->>Endpoint: Decode Kafka event
        Endpoint->>Endpoint: Route event data (e.g. to msk_update)
        Endpoint->>Model: Create Model from Kafka event (Model.from_kafka_event)
        Model-->>Endpoint: Return Model
        Endpoint->>Endpoint: Call related generic method (e.g. put_single_generic)
    ```
    """

    def handle_msk_event(self, event) -> tuple[int, list[Any]]:  # pragma: unit
        """
        Override this method to handle Kafka events in your own custom way.

        See the source for how the event gets decoded by default.

        :param event: Kafka event
        :return: A tuple of (status_code, all_event_results)
        """
        return self._handle_msk_event_with_context(event)

    def _handle_msk_event_with_context(self, event, context=None) -> tuple[int, list[Any]]:  # pragma: unit
        """
        This method is called by `handle_msk_event` to process the Kafka event.
        The reason for this separation is to avoid a breaking change to the interface of `handle_msk_event` in case it has been overridden.
        :param event: Kafka event
        :param context: Optional invocation context (unused by the default implementation)
        :return: A tuple of (status_code, all_event_results)
        """
        flattened_records = get_records(event)
        # Process the decoded messages in timestamp order, routing each by its changeType
        return_values = []
        for msk_message in decode_messages(flattened_records):
            if "changeType" not in msk_message or msk_message["changeType"] == "UPDATED":
                return_values.append(self.msk_update(msk_message, self.related_model))
            elif msk_message["changeType"] == "CREATED":
                return_values.append(self.msk_create(msk_message, self.related_model))
            elif msk_message["changeType"] == "DELETED":
                return_values.append(self.msk_delete(msk_message, self.related_model))
            else:
                log.error(f"Unknown changeType {msk_message['changeType']}")
                return_values.append((400, BedrockErrorResponse({
                    "error": f"Unknown changeType {msk_message['changeType']}",
                    "originalException": None
                }, event)))
        # Report the worst (highest) status code across the batch, with all per-message results
        status_codes = [status for status, _ in return_values]
        results = [values.as_json_for_kafka() for _, values in return_values]
        return max(status_codes), results

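The routing and aggregation logic above can be sketched in isolation like this (a standalone illustration, not the class itself; the handler names mirror the methods below):

```python
# A message with no changeType falls back to UPDATED; unknown values become a 400 error.
def route(message: dict) -> str:
    change_type = message.get("changeType", "UPDATED")
    handlers = {"CREATED": "msk_create", "UPDATED": "msk_update", "DELETED": "msk_delete"}
    return handlers.get(change_type, "error")

print(route({"changeType": "CREATED"}))  # msk_create
print(route({}))                         # msk_update (missing changeType defaults to UPDATED)
print(route({"changeType": "???"}))      # error (becomes a 400 BedrockErrorResponse)

# The batch's overall status is the worst per-message status code:
print(max([200, 201, 400]))  # 400
```

Because the overall status is the maximum across the batch, a single failing message marks the whole batch as failed even when the other messages were applied successfully.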
    def msk_create(self, message_body: dict, model_cls: type) -> tuple[int, Any]:  # pragma: no cover
        """
        Override this method to handle Kafka creation events in your own custom way.
        By default, this gets called by `handle_msk_event` when the event has a `changeType` of `CREATED`.

        :param message_body: Decoded Kafka message
        :param model_cls: The model class to create (usually `self.related_model`)
        :return: A tuple of (status_code, result)
        """
        model = model_cls.from_kafka_event(message_body)

        # in case creates are being sent for entities we already have
        existing_model = model_cls.get(model.uuid)
        if existing_model is not None:
            log.debug("Entity already exists, updating instead of creating.")
            return self.msk_update(message_body, model_cls)

        return self.post_global_generic({"body": model.as_json()}, model_cls)

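The existence check above (mirrored by the inverse check in `msk_update`) makes the pair behave like an upsert, so replayed or out-of-order events do not fail. A minimal sketch of that behaviour, with an in-memory dict standing in for the database lookup that `model_cls.get` performs (the `store` and `upsert` names are hypothetical):

```python
# In-memory stand-in for the persisted entities, keyed by UUID.
store: dict[str, str] = {}

def upsert(uuid: str, name: str) -> str:
    # A "create" for an existing entity degrades to an update, and vice versa.
    action = "updated" if uuid in store else "created"
    store[uuid] = name
    return action

print(upsert("abc-123", "Mars"))   # created
print(upsert("abc-123", "Mars!"))  # updated (duplicate create is tolerated)
```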
    def msk_update(self, message_body: dict, model_cls: type) -> tuple[int, Any]:  # pragma: no cover
        """
        Override this method to handle Kafka update events in your own custom way.
        By default, this gets called by `handle_msk_event` when the event has a `changeType` of `UPDATED` or no `changeType` at all.

        :param message_body: Decoded Kafka message
        :param model_cls: The model class to update (usually `self.related_model`)
        :return: A tuple of (status_code, result)
        """
        model = model_cls.from_kafka_event(message_body)

        # just in case updates are being sent for entities we don't have
        existing_model = model_cls.get(model.uuid)
        if existing_model is None:
            log.debug("Entity does not exist, creating instead of updating.")
            return self.msk_create(message_body, model_cls)

        return self.put_single_generic({"body": model.as_json()}, model_cls)

    def msk_delete(self, message_body: dict, model_cls: type) -> tuple[int, Any]:  # pragma: no cover
        """
        Override this method to handle Kafka deletion events in your own custom way.
        By default, this gets called by `handle_msk_event` when the event has a `changeType` of `DELETED`.

        :param message_body: Decoded Kafka message
        :param model_cls: The model class to delete (usually `self.related_model`)
        :return: A tuple of (status_code, result)
        """
        model = model_cls.from_kafka_event(message_body)
        return self.delete_single_generic({"body": model.as_json()}, model_cls, model.uuid)