bedrock.external.decorators.kafka

  1import functools
  2from enum import Enum
  3
  4from bedrock.external.kafka import send
  5from bedrock.exceptions import BedrockException
  6from bedrock.endpoints.dto.bedrock_response import BedrockResponse
  7
  8
  9class ChangeTypes(Enum):
 10    """
 11    Supported change types for Kafka broadcasts.
 12    """
 13    CREATED = "CREATED"
 14    UPDATED = "UPDATED"
 15    DELETED = "DELETED"
 16
 17
 18def broadcast(topics: str or list,
 19              change_type: ChangeTypes = None,
 20              send_only_on_success: bool = True,
 21              extra_payload: dict = {}):  # pragma: integration
 22    """
 23    Endpoint methods annotated with this decorator will have their response broadcast to the specified Kafka topic(s)
 24    before returning a response to the user.
 25    `delete_global` and `delete_single` assume the payload has the entity details in `response["originalData"]`.
 26
 27    Remember to update the kafka topics in our
 28    [documentation page](https://docs.keyholding.com/doc/event-topics-WLlQBvFDxf) when you've added a new topic.
 29
 30    The topic naming convention is `<service>-<entity>-changed` or, if it's not related to an entity or a more complex/specific
 31    topic is required, `<service>-<entity>-events`. If anything falls out of these definitions, bring it up for discussion.
 32
 33    :param topics: A string or list of strings representing the Kafka topic(s) to broadcast to.
 34    :param change_type: (optional) The type of change that has occurred. This is automatically inferred for POST, PUT and DELETE endpoints.
 35    :param send_only_on_success: (optional) Broadcasts the message only if the endpoint returns a successful 2xx code.
 36    :param extra_payload: (optional) A dictionary of extra data to include in the broadcast message.
 37
 38    Example usage: for a `POST` to `/countries/`:
 39    ```python
 40    from bedrock.endpoints.endpoint import Endpoint
 41    from bedrock.external.decorators import kafka
 42
 43    class Countries(Endpoint):
 44        # ...
 45
 46        @kafka.broadcast("countries-changed")
 47        def post_global(self, event):
 48            new_country = Country(**json.loads(event["body"]))
 49            new_country.save()
 50            return new_country
 51    ```
 52
 53    Assuming that the Country model has `name` and `code` fields, when you create a new Country via POSTing to
 54    `/countries/`, the following message will be broadcast to the `countries-changed` topic:
 55    ```json
 56    {
 57      "changeType": "CREATED",
 58      "name": "United Kingdom",
 59      "code": "UK"
 60    }
 61    ```
 62    """
 63    topic_list = [] if not topics else [topics] if isinstance(topics, str) else topics
 64
 65    def decorator(func):
 66        @functools.wraps(func)
 67        def wrapper(*args, **kwargs):
 68            status_code, content = func(*args, **{**kwargs})
 69            kafka_content = content
 70            if isinstance(kafka_content, BedrockResponse):
 71                kafka_content = content.as_unenveloped_json()
 72            _do_kafka_broadcast(topic_list,
 73                                status_code,
 74                                kafka_content,
 75                                extra_payload,
 76                                send_only_on_success,
 77                                change_type,
 78                                func.__name__)
 79            return status_code, content
 80
 81        return wrapper
 82
 83    return decorator
 84
 85
 86def _do_kafka_broadcast(topics, status_code, content, extra_payload, send_only_on_success, user_change_type,
 87                        function_name):  # pragma: integration
 88    change_type = user_change_type if user_change_type else _find_change_type(function_name)
 89    message = make_broadcast_payload(content, change_type, extra_payload)
 90    if (send_only_on_success and 200 <= status_code <= 299) or (send_only_on_success is False):
 91        for topic in topics:
 92            send(message, topic)
 93
 94
 95def _find_change_type(function_name):  # pragma: unit
 96    if function_name.startswith("get_"):
 97        raise BedrockException("Change type must be specified for GET endpoints")
 98    elif function_name.startswith("post_"):
 99        return ChangeTypes.CREATED
100    elif function_name.startswith("put_"):
101        return ChangeTypes.UPDATED
102    elif function_name.startswith("delete_"):
103        return ChangeTypes.DELETED
104    else:
105        raise BedrockException("Change type must be specified for non-CRUD endpoints")
106
107
108def make_broadcast_payload(content: dict, change_type: ChangeTypes,
109                           extra_payload: dict):  # pragma: unit
110    return {
111        "changeType": change_type.value,
112        **content,
113        **extra_payload
114    }
class ChangeTypes(enum.Enum):
10class ChangeTypes(Enum):
11    """
12    Supported change types for Kafka broadcasts.
13    """
14    CREATED = "CREATED"
15    UPDATED = "UPDATED"
16    DELETED = "DELETED"

Supported change types for Kafka broadcasts.

CREATED = <ChangeTypes.CREATED: 'CREATED'>
UPDATED = <ChangeTypes.UPDATED: 'UPDATED'>
DELETED = <ChangeTypes.DELETED: 'DELETED'>
def broadcast( topics: str, change_type: ChangeTypes = None, send_only_on_success: bool = True, extra_payload: dict = {}):
19def broadcast(topics: str or list,
20              change_type: ChangeTypes = None,
21              send_only_on_success: bool = True,
22              extra_payload: dict = {}):  # pragma: integration
23    """
24    Endpoint methods annotated with this decorator will have their response broadcast to the specified Kafka topic(s)
25    before returning a response to the user.
26    `delete_global` and `delete_single` assume the payload has the entity details in `response["originalData"]`.
27
28    Remember to update the kafka topics in our
29    [documentation page](https://docs.keyholding.com/doc/event-topics-WLlQBvFDxf) when you've added a new topic.
30
31    The topic naming convention is `<service>-<entity>-changed` or, if it's not related to an entity or a more complex/specific
32    topic is required, `<service>-<entity>-events`. If anything falls out of these definitions, bring it up for discussion.
33
34    :param topics: A string or list of strings representing the Kafka topic(s) to broadcast to.
35    :param change_type: (optional) The type of change that has occurred. This is automatically inferred for POST, PUT and DELETE endpoints.
36    :param send_only_on_success: (optional) Broadcasts the message only if the endpoint returns a successful 2xx code.
37    :param extra_payload: (optional) A dictionary of extra data to include in the broadcast message.
38
39    Example usage: for a `POST` to `/countries/`:
40    ```python
41    from bedrock.endpoints.endpoint import Endpoint
42    from bedrock.external.decorators import kafka
43
44    class Countries(Endpoint):
45        # ...
46
47        @kafka.broadcast("countries-changed")
48        def post_global(self, event):
49            new_country = Country(**json.loads(event["body"]))
50            new_country.save()
51            return new_country
52    ```
53
54    Assuming that the Country model has `name` and `code` fields, when you create a new Country via POSTing to
55    `/countries/`, the following message will be broadcast to the `countries-changed` topic:
56    ```json
57    {
58      "changeType": "CREATED",
59      "name": "United Kingdom",
60      "code": "UK"
61    }
62    ```
63    """
64    topic_list = [] if not topics else [topics] if isinstance(topics, str) else topics
65
66    def decorator(func):
67        @functools.wraps(func)
68        def wrapper(*args, **kwargs):
69            status_code, content = func(*args, **{**kwargs})
70            kafka_content = content
71            if isinstance(kafka_content, BedrockResponse):
72                kafka_content = content.as_unenveloped_json()
73            _do_kafka_broadcast(topic_list,
74                                status_code,
75                                kafka_content,
76                                extra_payload,
77                                send_only_on_success,
78                                change_type,
79                                func.__name__)
80            return status_code, content
81
82        return wrapper
83
84    return decorator

Endpoint methods annotated with this decorator will have their response broadcast to the specified Kafka topic(s) before returning a response to the user. delete_global and delete_single assume the payload has the entity details in response["originalData"].

Remember to update the kafka topics in our documentation page when you've added a new topic.

The topic naming convention is <service>-<entity>-changed or, if it's not related to an entity or a more complex/specific topic is required, <service>-<entity>-events. If anything falls out of these definitions, bring it up for discussion.

Parameters
  • topics: A string or list of strings representing the Kafka topic(s) to broadcast to.
  • change_type: (optional) The type of change that has occurred. This is automatically inferred for POST, PUT and DELETE endpoints.
  • send_only_on_success: (optional) Broadcasts the message only if the endpoint returns a successful 2xx code.
  • extra_payload: (optional) A dictionary of extra data to include in the broadcast message.

Example usage: for a POST to /countries/:

from bedrock.endpoints.endpoint import Endpoint
from bedrock.external.decorators import kafka

class Countries(Endpoint):
    # ...

    @kafka.broadcast("countries-changed")
    def post_global(self, event):
        new_country = Country(**json.loads(event["body"]))
        new_country.save()
        return new_country

Assuming that the Country model has name and code fields, when you create a new Country via POSTing to /countries/, the following message will be broadcast to the countries-changed topic:

{
  "changeType": "CREATED",
  "name": "United Kingdom",
  "code": "UK"
}
def make_broadcast_payload( content: dict, change_type: ChangeTypes, extra_payload: dict):
109def make_broadcast_payload(content: dict, change_type: ChangeTypes,
110                           extra_payload: dict):  # pragma: unit
111    return {
112        "changeType": change_type.value,
113        **content,
114        **extra_payload
115    }