bedrock.db.connection

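This module provides helper functions for building database connection URLs and sessions, as well as a global, pre-initialised session object (created via create_session() when the module is imported and retrieved with get_session()).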

  1"""
  2This module provides some helper methods as well as a global initialised `SESSION` object (created via
  3`create_session()`).
  4"""
  5
from typing import Optional, Union

import sqlalchemy
from psycopg2 import OperationalError
from sqlalchemy import text
from sqlalchemy.orm import sessionmaker

from bedrock.external.secrets_handler import find_secret
from bedrock.helpers.dictionary import get_dict_key_with_fallback
from bedrock.config import get_config_params
from bedrock.log import log_config
 15
 16log = log_config("connection")
 17
 18
 19def create_session(timeout=None):  # pragma: integration
 20    """
 21    Creates a session to the default database (with the given timeout) by looking up the database secret and/or config
 22    details.
 23    Makes use of `create_session_from_secret_name`
 24    :param timeout: Timeout in seconds. If not provided, the config timeout will be used.
 25    :return: A session to the database
 26    """
 27    return create_session_from_secret_name("database", timeout)
 28
 29
 30def create_session_from_secret_name(secret_name: str, timeout=None):  # pragma: integration
 31    """
 32    Creates a session to a database (with the given timeout) by looking up the provided secret.
 33    Makes use of `_create_session`
 34    :param timeout: Timeout in seconds. If not provided, the config timeout will be used.
 35    :return: A session to the database
 36    """
 37    secret = find_secret(secret_name)
 38    url = create_url(*get_connection_details(secret))
 39    return _create_session(url, timeout)
 40
 41
 42def translate_engine(engine: str) -> str:  # pragma: unit
 43    """
 44    Translates the engine name to the name used in a connection URL.
 45
 46    * `postgres` or `postgresql` -> `postgresql`
 47    * `mysql`, `mariadb` or `maria` -> `mysql`
 48
 49    :param engine:
 50    :return: Translated engine name
 51    """
 52    if engine.lower() in ["postgres", "postgresql"]:
 53        return "postgresql"
 54    elif engine.lower() in ["mysql", "mariadb", "maria"]:
 55        return "mysql"
 56    return engine.lower()
 57
 58
 59def create_url(engine: str, username: str, password: str, host: str, database: str, port: str or int = None,
 60               schema: str = None, options: str = None) -> str:  # pragma: unit
 61    """
 62    Creates a connection URL for the given database engine.
 63
 64    Makes use of `create_generic_url`. Depending on the engine, some additional logic can be applied in the future.
 65    """
 66    # any engines that work differently, add some ifs here
 67    return create_generic_url(translate_engine(engine), username, password, host, database, port, schema, options)
 68
 69
 70def create_generic_url(engine, username, password, host, database, port: str or int = None, schema: str = None,
 71                       options: str = None):  # pragma: unit
 72    """
 73    Creates a connection URL for the given database engine, essentially just formats the string into:
 74    ```python
 75    f"{engine}://{username}:{password}@{host}{_port}/{database}{_schema}{_options}"
 76    ```
 77    Where `_port`, `_schema` and `_options` are optional and thus have logic to only add them if they are provided.
 78
 79    :param engine:
 80    :param username:
 81    :param password:
 82    :param host:
 83    :param database:
 84    :param port: Optional
 85    :param schema: Optional
 86    :param options: Optional. A query param style string of options to append to the URL (e.g. `opt1=val1&opt2=val2`)
 87    :return:
 88    """
 89    _options = f"?{options}" if options else ""
 90    _schema = f"/{schema}" if schema else ""
 91    _port = f":{port}" if port else ""
 92
 93    return f"{engine}://{username}:{password}@{host}{_port}/{database}{_schema}{_options}"
 94
 95
 96def get_connection_details(secret: dict, db_config_suffix: str = None) -> tuple:  # pragma: unit
 97    """
 98    Gets the connection details from the secret and/or config.
 99    :param secret: The secret to get the details from.
100    :param db_config_suffix: Optional. If provided, looks for config in the `database_<suffix>` section.
101    :return: A tuple of the connection details. This must be unpackable into `create_url` (see order of args).
102    """
103    config = get_config_params()
104    config_suffix = f"_{db_config_suffix}" if db_config_suffix else ""
105    return get_dict_key_with_fallback(secret, "engine", config[f"database{config_suffix}"]["engine"]), \
106        get_dict_key_with_fallback(secret, "username", config[f"database{config_suffix}"]["username"]), \
107        get_dict_key_with_fallback(secret, "password", config[f"database{config_suffix}"]["password"]), \
108        get_dict_key_with_fallback(secret, "host", config[f"database{config_suffix}"]["host"]), \
109        get_dict_key_with_fallback(secret, "dbname", config[f"database{config_suffix}"]["database_name"]), \
110        get_dict_key_with_fallback(secret, "port", config[f"database{config_suffix}"]["port"]), \
111        get_dict_key_with_fallback(secret, "schema", config[f"database{config_suffix}"]["schema"]), \
112        get_dict_key_with_fallback(secret, "options", config[f"database{config_suffix}"]["options"])
113
114
def _obfuscate_url(url):
    """
    Returns the connection URL with the password replaced by `***`, for safe logging.
    :param url: Connection string, e.g. `engine://user:password@host/db`.
    :return: The masked URL; URLs without an `@` (no credentials) are returned unchanged.
    """
    # Split on the LAST "@": a password that itself contains "@" then stays entirely on the masked side.
    credentials, at_sign, location = url.rpartition("@")
    if not at_sign:
        return url
    prefix, scheme_separator, user_info = credentials.partition("://")
    if not scheme_separator:
        # No scheme present: everything before the "@" is user info.
        prefix, user_info = "", credentials
    username = user_info.partition(":")[0]
    return f"{prefix}{scheme_separator}{username}:***@{location}"


def _create_session(url, timeout=None):  # pragma: integration
    """
    Creates a session to the database. If `timeout` isn't provided, it will use the one in the config.

    A new SQLAlchemy engine is created on every call and the returned session is bound to it.
    :param url: Connection string to the DB.
    :param timeout: Optional timeout, passed to the driver as `connect_timeout`. Falsy values fall back to the
        `timeout` entry of the `database` config section.
    :return: A session bound to the new engine, created with `expire_on_commit=False`.
    """
    config = get_config_params()
    connection_timeout = timeout if timeout else config["database"]["timeout"]
    # Only the masked URL is logged so the password never reaches the log output.
    log.debug(f"Connecting to database with: {_obfuscate_url(url)} (timeout: {connection_timeout})")
    engine = sqlalchemy.create_engine(url, connect_args={"connect_timeout": connection_timeout})

    log.debug("Acquiring session")
    # `expire_on_commit=False` is set through the factory, so no separate assignment is needed afterwards.
    session = sessionmaker(bind=engine, expire_on_commit=False)()
    log.debug("Got session")

    return session
134
135
def refresh_session():  # pragma: integration
    """
    Refreshes the database session and also returns it.

    The current global session is closed on a best-effort basis and then replaced by a new one from
    `create_session()`.
    :return: A session object
    """
    global _SESSION
    try:
        _SESSION.close()
    except Exception as e:
        # Best effort: a session that cannot be closed must not prevent creating the replacement,
        # but the failure is recorded instead of being dropped silently.
        log.debug(f"Ignoring error while closing the previous session: {e}")
    _SESSION = create_session()
    return _SESSION
148
149
def connection_is_active() -> bool:  # pragma: integration
    """
    Checks if the database connection is active.

    Opens a fresh session with a 1 second timeout and runs `select 1`. The probe session is always closed
    again, also when the query fails.
    :return: Whether the connection is active.
    """
    session = None
    try:
        session = create_session(1)
        session.execute(text("select 1"))
    except (OperationalError, sqlalchemy.exc.OperationalError) as err:
        # SQLAlchemy raises its own `sqlalchemy.exc.OperationalError` wrapper around driver errors, so both
        # classes are handled here. Timeouts and refused connections are the expected "database is down"
        # cases and are deliberately not logged.
        if "timeout" not in str(err) and "Connection refused" not in str(err):
            log.warning(f"Unexpected error: {err}")
        return False
    except Exception as e:
        log.warning(f"Unexpected error: {e}")
        return False
    finally:
        if session is not None:
            try:
                session.close()
            except Exception as close_error:
                log.debug(f"Ignoring error while closing the probe session: {close_error}")
    return True
167
168
def has_table_access(table_name: str) -> bool:  # pragma: integration
    """
    Checks if the application has access to the given table.

    Refreshes the global session and runs `select * from <table_name> limit 1` on it.
    :param table_name: Table to check. It is interpolated into the SQL text verbatim, so it must come from a
        trusted source.
    :return: Whether the table is accessible.
    """
    session = None
    try:
        session = refresh_session()
        session.execute(text(f"select * from {table_name} limit 1"))
    except Exception as e:
        log.warning(f"Unexpected error: {e}")
        if session is not None:
            # The refreshed session is the shared global one; roll back so a failed statement does not
            # leave it in an unusable transaction for later callers.
            try:
                session.rollback()
            except Exception as rollback_error:
                log.debug(f"Ignoring error while rolling back the session: {rollback_error}")
        return False
    return True
182
183
# Global session, created once when this module is imported (this calls `create_session()` at import time).
_SESSION = create_session()


def get_session():  # pragma: integration
    """
    Returns the module-wide session object (the one most recently stored in `_SESSION`).
    """
    # Reading a module global needs no `global` declaration.
    return _SESSION
log = <MyLogger BEDROCK-connection (INFO)>
def create_session(timeout=None):
def create_session(timeout=None):  # pragma: integration
    """
    Opens a session against the default database.

    Delegates to `create_session_from_secret_name`, always using the secret named `"database"`.
    :param timeout: Timeout in seconds, forwarded unchanged. If not provided, the config timeout will be used.
    :return: A session to the database
    """
    default_secret_name = "database"
    return create_session_from_secret_name(default_secret_name, timeout)

Creates a session to the default database (with the given timeout) by looking up the database secret and/or config details. Makes use of create_session_from_secret_name

Parameters
  • timeout: Timeout in seconds. If not provided, the config timeout will be used.
Returns

A session to the database

def create_session_from_secret_name(secret_name: str, timeout=None):
def create_session_from_secret_name(secret_name: str, timeout=None):  # pragma: integration
    """
    Opens a database session using the connection details stored under the given secret.

    The secret is resolved with `find_secret`, merged with config fallbacks by `get_connection_details`, turned into
    a connection URL by `create_url` and finally handed to `_create_session`.
    :param secret_name: Name of the secret to look up.
    :param timeout: Timeout in seconds. If not provided, the config timeout will be used.
    :return: A session to the database
    """
    connection_details = get_connection_details(find_secret(secret_name))
    return _create_session(create_url(*connection_details), timeout)

Creates a session to a database (with the given timeout) by looking up the provided secret. Makes use of _create_session

Parameters
  • timeout: Timeout in seconds. If not provided, the config timeout will be used.
Returns

A session to the database

def translate_engine(engine: str) -> str:
def translate_engine(engine: str) -> str:  # pragma: unit
    """
    Maps an engine name (case-insensitively) to the scheme name used in a connection URL.

    * `postgres` or `postgresql` -> `postgresql`
    * `mysql`, `mariadb` or `maria` -> `mysql`
    * anything else -> the lower-cased input

    :param engine: Engine name as found in the secret/config.
    :return: Translated engine name
    """
    url_names = {
        "postgres": "postgresql",
        "postgresql": "postgresql",
        "mysql": "mysql",
        "mariadb": "mysql",
        "maria": "mysql",
    }
    normalised = engine.lower()
    # Unknown engines pass through, lower-cased.
    return url_names.get(normalised, normalised)

Translates the engine name to the name used in a connection URL.

  • postgres or postgresql -> postgresql
  • mysql, mariadb or maria -> mysql
Parameters
  • engine:
Returns

Translated engine name

def create_url( engine: str, username: str, password: str, host: str, database: str, port: str = None, schema: str = None, options: str = None) -> str:
def create_url(engine: str, username: str, password: str, host: str, database: str,
               port: Optional[Union[str, int]] = None, schema: Optional[str] = None,
               options: Optional[str] = None) -> str:  # pragma: unit
    """
    Creates a connection URL for the given database engine.

    The engine name is first normalised with `translate_engine`; everything else is passed unchanged to
    `create_generic_url`. Depending on the engine, some additional logic can be applied in the future.

    :param engine: Engine name (e.g. `postgres`, `mariadb`); translated to its URL scheme.
    :param username: Database user.
    :param password: Database password.
    :param host: Database host.
    :param database: Database name.
    :param port: Optional port, as a string or an integer.
    :param schema: Optional
    :param options: Optional. A query param style string of options to append to the URL.
    :return: The connection URL.
    """
    # NOTE: the previous annotation `str or int` is a boolean expression that evaluates to plain `str`.
    # any engines that work differently, add some ifs here
    return create_generic_url(translate_engine(engine), username, password, host, database, port, schema, options)

Creates a connection URL for the given database engine.

Makes use of create_generic_url. Depending on the engine, some additional logic can be applied in the future.

def create_generic_url( engine, username, password, host, database, port: str = None, schema: str = None, options: str = None):
def create_generic_url(engine, username, password, host, database, port: Optional[Union[str, int]] = None,
                       schema: Optional[str] = None, options: Optional[str] = None):  # pragma: unit
    """
    Creates a connection URL for the given database engine, essentially just formats the string into:
    ```python
    f"{engine}://{username}:{password}@{host}{_port}/{database}{_schema}{_options}"
    ```
    Where `_port`, `_schema` and `_options` are optional and thus have logic to only add them if they are provided.

    All values are inserted verbatim: no URL-escaping is applied, so credentials containing characters such as
    `@`, `:` or `/` must already be escaped by the caller.

    :param engine: URL scheme / engine name, used as-is.
    :param username:
    :param password:
    :param host:
    :param database:
    :param port: Optional, string or integer. Falsy values (`None`, `""`, `0`) omit the port.
    :param schema: Optional
    :param options: Optional. A query param style string of options to append to the URL (e.g. `opt1=val1&opt2=val2`)
    :return: The formatted connection URL string.
    """
    # NOTE: the previous annotation `str or int` is a boolean expression that evaluates to plain `str`.
    _options = f"?{options}" if options else ""
    _schema = f"/{schema}" if schema else ""
    _port = f":{port}" if port else ""

    return f"{engine}://{username}:{password}@{host}{_port}/{database}{_schema}{_options}"

Creates a connection URL for the given database engine, essentially just formats the string into:

f"{engine}://{username}:{password}@{host}{_port}/{database}{_schema}{_options}"

Where _port, _schema and _options are optional and thus have logic to only add them if they are provided.

Parameters
  • engine:
  • username:
  • password:
  • host:
  • database:
  • port: Optional
  • schema: Optional
  • options: Optional. A query param style string of options to append to the URL (e.g. opt1=val1&opt2=val2)
Returns

The formatted connection URL string.

def get_connection_details(secret: dict, db_config_suffix: str = None) -> tuple:
 97def get_connection_details(secret: dict, db_config_suffix: str = None) -> tuple:  # pragma: unit
 98    """
 99    Gets the connection details from the secret and/or config.
100    :param secret: The secret to get the details from.
101    :param db_config_suffix: Optional. If provided, looks for config in the `database_<suffix>` section.
102    :return: A tuple of the connection details. This must be unpackable into `create_url` (see order of args).
103    """
104    config = get_config_params()
105    config_suffix = f"_{db_config_suffix}" if db_config_suffix else ""
106    return get_dict_key_with_fallback(secret, "engine", config[f"database{config_suffix}"]["engine"]), \
107        get_dict_key_with_fallback(secret, "username", config[f"database{config_suffix}"]["username"]), \
108        get_dict_key_with_fallback(secret, "password", config[f"database{config_suffix}"]["password"]), \
109        get_dict_key_with_fallback(secret, "host", config[f"database{config_suffix}"]["host"]), \
110        get_dict_key_with_fallback(secret, "dbname", config[f"database{config_suffix}"]["database_name"]), \
111        get_dict_key_with_fallback(secret, "port", config[f"database{config_suffix}"]["port"]), \
112        get_dict_key_with_fallback(secret, "schema", config[f"database{config_suffix}"]["schema"]), \
113        get_dict_key_with_fallback(secret, "options", config[f"database{config_suffix}"]["options"])

Gets the connection details from the secret and/or config.

Parameters
  • secret: The secret to get the details from.
  • db_config_suffix: Optional. If provided, looks for config in the database_<suffix> section.
Returns

A tuple of the connection details. This must be unpackable into create_url (see order of args).

def refresh_session():
def refresh_session():  # pragma: integration
    """
    Refreshes the database session and also returns it.

    The current global session is closed on a best-effort basis and then replaced by a new one from
    `create_session()`.
    :return: A session object
    """
    global _SESSION
    try:
        _SESSION.close()
    except Exception as e:
        # Best effort: a session that cannot be closed must not prevent creating the replacement,
        # but the failure is recorded instead of being dropped silently.
        log.debug(f"Ignoring error while closing the previous session: {e}")
    _SESSION = create_session()
    return _SESSION

Refreshes the database session and also returns it.

Returns

A session object

def connection_is_active() -> bool:
def connection_is_active() -> bool:  # pragma: integration
    """
    Checks if the database connection is active.

    Opens a fresh session with a 1 second timeout and runs `select 1`. The probe session is always closed
    again, also when the query fails.
    :return: Whether the connection is active.
    """
    session = None
    try:
        session = create_session(1)
        session.execute(text("select 1"))
    except (OperationalError, sqlalchemy.exc.OperationalError) as err:
        # SQLAlchemy raises its own `sqlalchemy.exc.OperationalError` wrapper around driver errors, so both
        # classes are handled here. Timeouts and refused connections are the expected "database is down"
        # cases and are deliberately not logged.
        if "timeout" not in str(err) and "Connection refused" not in str(err):
            log.warning(f"Unexpected error: {err}")
        return False
    except Exception as e:
        log.warning(f"Unexpected error: {e}")
        return False
    finally:
        if session is not None:
            try:
                session.close()
            except Exception as close_error:
                log.debug(f"Ignoring error while closing the probe session: {close_error}")
    return True

Checks if the database connection is active.

Returns

Whether the connection is active.

def has_table_access(table_name: str) -> bool:
def has_table_access(table_name: str) -> bool:  # pragma: integration
    """
    Checks if the application has access to the given table.

    Refreshes the global session and runs `select * from <table_name> limit 1` on it.
    :param table_name: Table to check. It is interpolated into the SQL text verbatim, so it must come from a
        trusted source.
    :return: Whether the table is accessible.
    """
    session = None
    try:
        session = refresh_session()
        session.execute(text(f"select * from {table_name} limit 1"))
    except Exception as e:
        log.warning(f"Unexpected error: {e}")
        if session is not None:
            # The refreshed session is the shared global one; roll back so a failed statement does not
            # leave it in an unusable transaction for later callers.
            try:
                session.rollback()
            except Exception as rollback_error:
                log.debug(f"Ignoring error while rolling back the session: {rollback_error}")
        return False
    return True

Checks if the application has access to the given table.

Parameters
  • table_name: Table to check.
Returns

Whether the table is accessible.

def get_session():
def get_session():  # pragma: integration
    """
    Returns the module-wide session object (the one most recently stored in `_SESSION`).
    """
    # Reading a module global needs no `global` declaration.
    return _SESSION

Gets the global session object.