bedrock.external.kafka
1import json 2from datetime import datetime 3from typing import Callable 4 5from confluent_kafka import Producer, Consumer 6from confluent_kafka.cimpl import KafkaException 7 8from bedrock.config import get_config_params 9from bedrock.exceptions import InvalidConfigurationException 10from bedrock.log import log_config 11 12log = log_config("Kafka") 13 14 15def delivery_callback(err, msg): # pragma: no cover 16 """ 17 Default callback for kafka message delivery. 18 :param err: 19 :param msg: 20 :return: 21 """ 22 if err: 23 log.error(f"Failed to deliver kafka message: {msg}: {err}") 24 else: 25 log.debug(f"Kafka message sent: {msg}") 26 27 28def send(payload: dict, topic: str, callback: Callable = delivery_callback): # pragma: no cover 29 """ 30 Send a message to a kafka topic. 31 :param payload: A dictionary with the payload to be sent to the topic. 32 :param topic: The topic to send the message to. 33 :param callback: The callback function to be called when the message is delivered, defaults to `delivery_callback`. 34 """ 35 config = get_config_params() 36 msk_conf = _make_msk_config(config) 37 _send(payload, topic, msk_conf, callback, config["environment"]) 38 39 40def is_kafka_reachable(msk_config, timeout=5): # pragma: no cover 41 """ 42 Check if the kafka broker is reachable. 43 It does this by attempting to list the topics as a consumer. 44 Typically, if it times out while listing the topics, it means the brokers aren't reachable. 45 :param msk_config: The kafka configuration. 46 :param timeout: Timeout in seconds. Defaults to 5. 47 :return: 48 """ 49 try: 50 consumer = Consumer({ 51 **msk_config, 52 "group.id": "topic-checker" 53 }) 54 _ = consumer.list_topics(timeout=timeout).topics 55 return True 56 except KafkaException: 57 return False 58 59 60def _send(payload: dict, topic: str, msk_config: dict, callback: Callable, environment: str): # pragma: integration 61 if environment == 'local' and not is_kafka_reachable(msk_config): 62 log.error(f"Kafka broker is not reachable") 63 return None 64 log.debug(f"{datetime.now()} Creating Kafka producer...") 65 producer = Producer(msk_config) 66 stringified_payload = json.dumps(payload) 67 log.debug(f"{datetime.now()} Sending kafka message: {stringified_payload}") 68 69 def retry_callback(err, msg): # pragma: no cover 70 if err: 71 producer.produce(topic, stringified_payload, callback=callback) 72 producer.poll(1) 73 else: 74 callback(err, msg) 75 76 producer.produce(topic, stringified_payload, callback=retry_callback) 77 producer.poll(1) 78 log.debug(f"{datetime.now()} Sent") 79 return producer # Useful for testing 80 81 82def _make_msk_config(config): # pragma: unit 83 if "msk" not in config: 84 raise InvalidConfigurationException(["msk"]) 85 86 required_keys = { 87 "bootstrap.servers": "bootstrap_urls", 88 "client.id": "client_id", 89 "security.protocol": "connection_type", 90 "request.timeout.ms": "connection_timeout", 91 "socket.connection.setup.timeout.ms": "connection_timeout", 92 "connections.max.idle.ms": "connection_timeout", 93 "message.timeout.ms": "connection_timeout" 94 } 95 missing_keys = [] 96 msk_config = {} 97 for key, config_variable in required_keys.items(): 98 try: 99 if config["msk"][config_variable] is None or config["msk"][config_variable] == "": 100 missing_keys.append(config_variable) 101 msk_config[key] = config["msk"][config_variable] 102 except KeyError as e: 103 missing_keys.append(config_variable) 104 105 if len(missing_keys) > 0: 106 raise InvalidConfigurationException([f"msk.{k}" for k in missing_keys]) 107 108 return msk_config
log =
<MyLogger BEDROCK-Kafka (INFO)>
def
delivery_callback(err, msg):
16def delivery_callback(err, msg): # pragma: no cover 17 """ 18 Default callback for kafka message delivery. 19 :param err: 20 :param msg: 21 :return: 22 """ 23 if err: 24 log.error(f"Failed to deliver kafka message: {msg}: {err}") 25 else: 26 log.debug(f"Kafka message sent: {msg}")
Default callback for kafka message delivery.
Parameters
- err:
- msg:
Returns
def
send( payload: dict, topic: str, callback: Callable = <function delivery_callback>):
29def send(payload: dict, topic: str, callback: Callable = delivery_callback): # pragma: no cover 30 """ 31 Send a message to a kafka topic. 32 :param payload: A dictionary with the payload to be sent to the topic. 33 :param topic: The topic to send the message to. 34 :param callback: The callback function to be called when the message is delivered, defaults to `delivery_callback`. 35 """ 36 config = get_config_params() 37 msk_conf = _make_msk_config(config) 38 _send(payload, topic, msk_conf, callback, config["environment"])
Send a message to a kafka topic.
Parameters
- payload: A dictionary with the payload to be sent to the topic.
- topic: The topic to send the message to.
- callback: The callback function to be called when the message is delivered, defaults to
delivery_callback.
def
is_kafka_reachable(msk_config, timeout=5):
41def is_kafka_reachable(msk_config, timeout=5): # pragma: no cover 42 """ 43 Check if the kafka broker is reachable. 44 It does this by attempting to list the topics as a consumer. 45 Typically, if it times out while listing the topics, it means the brokers aren't reachable. 46 :param msk_config: The kafka configuration. 47 :param timeout: Timeout in seconds. Defaults to 5. 48 :return: 49 """ 50 try: 51 consumer = Consumer({ 52 **msk_config, 53 "group.id": "topic-checker" 54 }) 55 _ = consumer.list_topics(timeout=timeout).topics 56 return True 57 except KafkaException: 58 return False
Check if the kafka broker is reachable. It does this by attempting to list the topics as a consumer. Typically, if it times out while listing the topics, it means the brokers aren't reachable.
Parameters
- msk_config: The kafka configuration.
- timeout: Timeout in seconds. Defaults to 5.