bedrock.external.decorators.kafka
1import functools 2from enum import Enum 3 4from bedrock.external.kafka import send 5from bedrock.exceptions import BedrockException 6from bedrock.endpoints.dto.bedrock_response import BedrockResponse 7 8 9class ChangeTypes(Enum): 10 """ 11 Supported change types for Kafka broadcasts. 12 """ 13 CREATED = "CREATED" 14 UPDATED = "UPDATED" 15 DELETED = "DELETED" 16 17 18def broadcast(topics: str or list, 19 change_type: ChangeTypes = None, 20 send_only_on_success: bool = True, 21 extra_payload: dict = {}): # pragma: integration 22 """ 23 Endpoint methods annotated with this decorator will have their response broadcast to the specified Kafka topic(s) 24 before returning a response to the user. 25 `delete_global` and `delete_single` assume the payload has the entity details in `response["originalData"]`. 26 27 Remember to update the kafka topics in our 28 [documentation page](https://docs.keyholding.com/doc/event-topics-WLlQBvFDxf) when you've added a new topic. 29 30 The topic naming convention is `<service>-<entity>-changed` or, if it's not related to an entity or a more complex/specific 31 topic is required, `<service>-<entity>-events`. If anything falls out of these definitions, bring it up for discussion. 32 33 :param topics: A string or list of strings representing the Kafka topic(s) to broadcast to. 34 :param change_type: (optional) The type of change that has occurred. This is automatically inferred for POST, PUT and DELETE endpoints. 35 :param send_only_on_success: (optional) Broadcasts the message only if the endpoint returns a successful 2xx code. 36 :param extra_payload: (optional) A dictionary of extra data to include in the broadcast message. 37 38 Example usage: for a `POST` to `/countries/`: 39 ```python 40 from bedrock.endpoints.endpoint import Endpoint 41 from bedrock.external.decorators import kafka 42 43 class Countries(Endpoint): 44 # ... 45 46 @kafka.broadcast("countries-changed") 47 def post_global(self, event): 48 new_country = Country(**json.loads(event["body"])) 49 new_country.save() 50 return new_country 51 ``` 52 53 Assuming that the Country model has `name` and `code` fields, when you create a new Country via POSTing to 54 `/countries/`, the following message will be broadcast to the `countries-changed` topic: 55 ```json 56 { 57 "changeType": "CREATED", 58 "name": "United Kingdom", 59 "code": "UK" 60 } 61 ``` 62 """ 63 topic_list = [] if not topics else [topics] if isinstance(topics, str) else topics 64 65 def decorator(func): 66 @functools.wraps(func) 67 def wrapper(*args, **kwargs): 68 status_code, content = func(*args, **{**kwargs}) 69 kafka_content = content 70 if isinstance(kafka_content, BedrockResponse): 71 kafka_content = content.as_unenveloped_json() 72 _do_kafka_broadcast(topic_list, 73 status_code, 74 kafka_content, 75 extra_payload, 76 send_only_on_success, 77 change_type, 78 func.__name__) 79 return status_code, content 80 81 return wrapper 82 83 return decorator 84 85 86def _do_kafka_broadcast(topics, status_code, content, extra_payload, send_only_on_success, user_change_type, 87 function_name): # pragma: integration 88 change_type = user_change_type if user_change_type else _find_change_type(function_name) 89 message = make_broadcast_payload(content, change_type, extra_payload) 90 if (send_only_on_success and 200 <= status_code <= 299) or (send_only_on_success is False): 91 for topic in topics: 92 send(message, topic) 93 94 95def _find_change_type(function_name): # pragma: unit 96 if function_name.startswith("get_"): 97 raise BedrockException("Change type must be specified for GET endpoints") 98 elif function_name.startswith("post_"): 99 return ChangeTypes.CREATED 100 elif function_name.startswith("put_"): 101 return ChangeTypes.UPDATED 102 elif function_name.startswith("delete_"): 103 return ChangeTypes.DELETED 104 else: 105 raise BedrockException("Change type must be specified for non-CRUD endpoints") 106 107 108def make_broadcast_payload(content: dict, change_type: ChangeTypes, 109 extra_payload: dict): # pragma: unit 110 return { 111 "changeType": change_type.value, 112 **content, 113 **extra_payload 114 }
10class ChangeTypes(Enum): 11 """ 12 Supported change types for Kafka broadcasts. 13 """ 14 CREATED = "CREATED" 15 UPDATED = "UPDATED" 16 DELETED = "DELETED"
Supported change types for Kafka broadcasts.
19def broadcast(topics: str or list, 20 change_type: ChangeTypes = None, 21 send_only_on_success: bool = True, 22 extra_payload: dict = {}): # pragma: integration 23 """ 24 Endpoint methods annotated with this decorator will have their response broadcast to the specified Kafka topic(s) 25 before returning a response to the user. 26 `delete_global` and `delete_single` assume the payload has the entity details in `response["originalData"]`. 27 28 Remember to update the kafka topics in our 29 [documentation page](https://docs.keyholding.com/doc/event-topics-WLlQBvFDxf) when you've added a new topic. 30 31 The topic naming convention is `<service>-<entity>-changed` or, if it's not related to an entity or a more complex/specific 32 topic is required, `<service>-<entity>-events`. If anything falls out of these definitions, bring it up for discussion. 33 34 :param topics: A string or list of strings representing the Kafka topic(s) to broadcast to. 35 :param change_type: (optional) The type of change that has occurred. This is automatically inferred for POST, PUT and DELETE endpoints. 36 :param send_only_on_success: (optional) Broadcasts the message only if the endpoint returns a successful 2xx code. 37 :param extra_payload: (optional) A dictionary of extra data to include in the broadcast message. 38 39 Example usage: for a `POST` to `/countries/`: 40 ```python 41 from bedrock.endpoints.endpoint import Endpoint 42 from bedrock.external.decorators import kafka 43 44 class Countries(Endpoint): 45 # ... 46 47 @kafka.broadcast("countries-changed") 48 def post_global(self, event): 49 new_country = Country(**json.loads(event["body"])) 50 new_country.save() 51 return new_country 52 ``` 53 54 Assuming that the Country model has `name` and `code` fields, when you create a new Country via POSTing to 55 `/countries/`, the following message will be broadcast to the `countries-changed` topic: 56 ```json 57 { 58 "changeType": "CREATED", 59 "name": "United Kingdom", 60 "code": "UK" 61 } 62 ``` 63 """ 64 topic_list = [] if not topics else [topics] if isinstance(topics, str) else topics 65 66 def decorator(func): 67 @functools.wraps(func) 68 def wrapper(*args, **kwargs): 69 status_code, content = func(*args, **{**kwargs}) 70 kafka_content = content 71 if isinstance(kafka_content, BedrockResponse): 72 kafka_content = content.as_unenveloped_json() 73 _do_kafka_broadcast(topic_list, 74 status_code, 75 kafka_content, 76 extra_payload, 77 send_only_on_success, 78 change_type, 79 func.__name__) 80 return status_code, content 81 82 return wrapper 83 84 return decorator
Endpoint methods annotated with this decorator will have their response broadcast to the specified Kafka topic(s)
before returning a response to the user.
delete_global and delete_single assume the payload has the entity details in response["originalData"].
Remember to update the kafka topics in our documentation page when you've added a new topic.
The topic naming convention is <service>-<entity>-changed or, if it's not related to an entity or a more complex/specific
topic is required, <service>-<entity>-events. If anything falls out of these definitions, bring it up for discussion.
Parameters
- topics: A string or list of strings representing the Kafka topic(s) to broadcast to.
- change_type: (optional) The type of change that has occurred. This is automatically inferred for POST, PUT and DELETE endpoints.
- send_only_on_success: (optional) Broadcasts the message only if the endpoint returns a successful 2xx code.
- extra_payload: (optional) A dictionary of extra data to include in the broadcast message.
Example usage: for a POST to /countries/:
from bedrock.endpoints.endpoint import Endpoint
from bedrock.external.decorators import kafka
class Countries(Endpoint):
# ...
@kafka.broadcast("countries-changed")
def post_global(self, event):
new_country = Country(**json.loads(event["body"]))
new_country.save()
return new_country
Assuming that the Country model has name and code fields, when you create a new Country via POSTing to
/countries/, the following message will be broadcast to the countries-changed topic:
{
"changeType": "CREATED",
"name": "United Kingdom",
"code": "UK"
}