bedrock.external.kafka

  1import json
  2from datetime import datetime
  3from typing import Callable
  4
  5from confluent_kafka import Producer, Consumer
  6from confluent_kafka.cimpl import KafkaException
  7
  8from bedrock.config import get_config_params
  9from bedrock.exceptions import InvalidConfigurationException
 10from bedrock.log import log_config
 11
 12log = log_config("Kafka")
 13
 14
 15def delivery_callback(err, msg):  # pragma: no cover
 16    """
 17    Default callback for kafka message delivery.
 18    :param err:
 19    :param msg:
 20    :return:
 21    """
 22    if err:
 23        log.error(f"Failed to deliver kafka message: {msg}: {err}")
 24    else:
 25        log.debug(f"Kafka message sent: {msg}")
 26
 27
 28def send(payload: dict, topic: str, callback: Callable = delivery_callback):  # pragma: no cover
 29    """
 30    Send a message to a kafka topic.
 31    :param payload: A dictionary with the payload to be sent to the topic.
 32    :param topic: The topic to send the message to.
 33    :param callback: The callback function to be called when the message is delivered, defaults to `delivery_callback`.
 34    """
 35    config = get_config_params()
 36    msk_conf = _make_msk_config(config)
 37    _send(payload, topic, msk_conf, callback, config["environment"])
 38
 39
 40def is_kafka_reachable(msk_config, timeout=5):  # pragma: no cover
 41    """
 42    Check if the kafka broker is reachable.
 43    It does this by attempting to list the topics as a consumer.
 44    Typically, if it times out while listing the topics, it means the brokers aren't reachable.
 45    :param msk_config: The kafka configuration.
 46    :param timeout: Timeout in seconds. Defaults to 5.
 47    :return:
 48    """
 49    try:
 50        consumer = Consumer({
 51            **msk_config,
 52            "group.id": "topic-checker"
 53        })
 54        _ = consumer.list_topics(timeout=timeout).topics
 55        return True
 56    except KafkaException:
 57        return False
 58
 59
 60def _send(payload: dict, topic: str, msk_config: dict, callback: Callable, environment: str):  # pragma: integration
 61    if environment == 'local' and not is_kafka_reachable(msk_config):
 62        log.error(f"Kafka broker is not reachable")
 63        return None
 64    log.debug(f"{datetime.now()} Creating Kafka producer...")
 65    producer = Producer(msk_config)
 66    stringified_payload = json.dumps(payload)
 67    log.debug(f"{datetime.now()} Sending kafka message: {stringified_payload}")
 68
 69    def retry_callback(err, msg):  # pragma: no cover
 70        if err:
 71            producer.produce(topic, stringified_payload, callback=callback)
 72            producer.poll(1)
 73        else:
 74            callback(err, msg)
 75
 76    producer.produce(topic, stringified_payload, callback=retry_callback)
 77    producer.poll(1)
 78    log.debug(f"{datetime.now()} Sent")
 79    return producer  # Useful for testing
 80
 81
 82def _make_msk_config(config):  # pragma: unit
 83    if "msk" not in config:
 84        raise InvalidConfigurationException(["msk"])
 85
 86    required_keys = {
 87        "bootstrap.servers": "bootstrap_urls",
 88        "client.id": "client_id",
 89        "security.protocol": "connection_type",
 90        "request.timeout.ms": "connection_timeout",
 91        "socket.connection.setup.timeout.ms": "connection_timeout",
 92        "connections.max.idle.ms": "connection_timeout",
 93        "message.timeout.ms": "connection_timeout"
 94    }
 95    missing_keys = []
 96    msk_config = {}
 97    for key, config_variable in required_keys.items():
 98        try:
 99            if config["msk"][config_variable] is None or config["msk"][config_variable] == "":
100                missing_keys.append(config_variable)
101            msk_config[key] = config["msk"][config_variable]
102        except KeyError as e:
103            missing_keys.append(config_variable)
104
105    if len(missing_keys) > 0:
106        raise InvalidConfigurationException([f"msk.{k}" for k in missing_keys])
107
108    return msk_config
log = <MyLogger BEDROCK-Kafka (INFO)>
def delivery_callback(err, msg):
16def delivery_callback(err, msg):  # pragma: no cover
17    """
18    Default callback for kafka message delivery.
19    :param err:
20    :param msg:
21    :return:
22    """
23    if err:
24        log.error(f"Failed to deliver kafka message: {msg}: {err}")
25    else:
26        log.debug(f"Kafka message sent: {msg}")

Default callback for kafka message delivery.

Parameters
  • err:
  • msg:
Returns
def send( payload: dict, topic: str, callback: Callable = <function delivery_callback>):
29def send(payload: dict, topic: str, callback: Callable = delivery_callback):  # pragma: no cover
30    """
31    Send a message to a kafka topic.
32    :param payload: A dictionary with the payload to be sent to the topic.
33    :param topic: The topic to send the message to.
34    :param callback: The callback function to be called when the message is delivered, defaults to `delivery_callback`.
35    """
36    config = get_config_params()
37    msk_conf = _make_msk_config(config)
38    _send(payload, topic, msk_conf, callback, config["environment"])

Send a message to a kafka topic.

Parameters
  • payload: A dictionary with the payload to be sent to the topic.
  • topic: The topic to send the message to.
  • callback: The callback function to be called when the message is delivered, defaults to delivery_callback.
def is_kafka_reachable(msk_config, timeout=5):
41def is_kafka_reachable(msk_config, timeout=5):  # pragma: no cover
42    """
43    Check if the kafka broker is reachable.
44    It does this by attempting to list the topics as a consumer.
45    Typically, if it times out while listing the topics, it means the brokers aren't reachable.
46    :param msk_config: The kafka configuration.
47    :param timeout: Timeout in seconds. Defaults to 5.
48    :return:
49    """
50    try:
51        consumer = Consumer({
52            **msk_config,
53            "group.id": "topic-checker"
54        })
55        _ = consumer.list_topics(timeout=timeout).topics
56        return True
57    except KafkaException:
58        return False

Check if the kafka broker is reachable. It does this by attempting to list the topics as a consumer. Typically, if it times out while listing the topics, it means the brokers aren't reachable.

Parameters
  • msk_config: The kafka configuration.
  • timeout: Timeout in seconds. Defaults to 5.
Returns