bedrock.cache

 1from valkey import Valkey
 2
 3from bedrock.config import get_config_params
 4from bedrock.config._common import validate_config
 5from bedrock.exceptions import BedrockException
 6from bedrock.log import log_config
 7
 8CACHE: Valkey = None
 9
10logger = log_config("cache")
11
12
13def get_cache() -> Valkey:  # pragma: integration
14    """
15    Returns a cache instance.
16    Initialises and stores a global instance if not initialised.
17    """
18    global CACHE
19    if CACHE is None:
20        CACHE = init_cache()
21    return CACHE
22
23
24def init_cache() -> Valkey:  # pragma: integration
25    """
26    Initializes the global cache instance.
27    """
28    config = get_config_params()
29    _validate_cache_config(config["cache"])
30    logger.debug('Creating new Valkey connection')
31    try:
32        return Valkey(
33            host=config["cache"]["host"],
34            port=config["cache"]["port"],
35            decode_responses=True,
36            ssl=(config["cache"]["host"] not in ["localhost", "cache"]))
37    except Exception as e:  # pragma: no cover
38        logger.exception("Failed to create Valkey client")
39        raise BedrockException("Failed to create Valkey client", e)
40
41
42def get_cache_topics():  # pragma: no cover
43    config = get_config_params()
44    return _get_cache_topics(config["cache"]["topic_prefix"])
45
46
47def _get_cache_topics(topic_prefix: str):  # pragma: unit
48    """
49    Returns a list of all topics in the cache.
50    """
51    cache = get_cache()
52    topics = []
53    cursor = '0'
54    while cursor != 0:
55        cursor, new_topics = cache.scan(cursor, match=f"{topic_prefix}-*")
56        topics.extend(new_topics)
57    return topics
58
59
60def _validate_cache_config(config: dict):  # pragma: integration
61    validate_config(config, ["host", "port", "topic_prefix"], "cache.")
CACHE: valkey.client.Valkey = None
logger = <MyLogger BEDROCK-cache (INFO)>
def get_cache() -> valkey.client.Valkey:
14def get_cache() -> Valkey:  # pragma: integration
15    """
16    Returns a cache instance.
17    Initialises and stores a global instance if not initialised.
18    """
19    global CACHE
20    if CACHE is None:
21        CACHE = init_cache()
22    return CACHE

Returns a cache instance. Initialises and stores a global instance if not initialised.

def init_cache() -> valkey.client.Valkey:
25def init_cache() -> Valkey:  # pragma: integration
26    """
27    Initializes the global cache instance.
28    """
29    config = get_config_params()
30    _validate_cache_config(config["cache"])
31    logger.debug('Creating new Valkey connection')
32    try:
33        return Valkey(
34            host=config["cache"]["host"],
35            port=config["cache"]["port"],
36            decode_responses=True,
37            ssl=(config["cache"]["host"] not in ["localhost", "cache"]))
38    except Exception as e:  # pragma: no cover
39        logger.exception("Failed to create Valkey client")
40        raise BedrockException("Failed to create Valkey client", e)

Initializes the global cache instance.

def get_cache_topics():
43def get_cache_topics():  # pragma: no cover
44    config = get_config_params()
45    return _get_cache_topics(config["cache"]["topic_prefix"])