bedrock.db.connection
This module provides some helper methods as well as a global initialised SESSION object (created via
create_session()).
"""
This module provides some helper methods as well as a global initialised session object (`_SESSION`, created via
`create_session()` at import time and exposed through `get_session()`).
"""

from contextlib import closing
from typing import Optional, Union

import sqlalchemy
from psycopg2 import OperationalError
from sqlalchemy import text
from sqlalchemy.exc import OperationalError as SQLAlchemyOperationalError
from sqlalchemy.orm import sessionmaker

from bedrock.external.secrets_handler import find_secret
from bedrock.helpers.dictionary import get_dict_key_with_fallback
from bedrock.config import get_config_params
from bedrock.log import log_config

log = log_config("connection")


def create_session(timeout=None):  # pragma: integration
    """
    Creates a session to the default database (with the given timeout) by looking up the `database` secret and/or
    config details.
    Makes use of `create_session_from_secret_name`
    :param timeout: Timeout in seconds. If not provided, the config timeout will be used.
    :return: A session to the database
    """
    return create_session_from_secret_name("database", timeout)


def create_session_from_secret_name(secret_name: str, timeout=None):  # pragma: integration
    """
    Creates a session to a database (with the given timeout) by looking up the provided secret.
    Makes use of `_create_session`
    :param secret_name: Name of the secret passed to `find_secret`.
    :param timeout: Timeout in seconds. If not provided, the config timeout will be used.
    :return: A session to the database
    """
    secret = find_secret(secret_name)
    url = create_url(*get_connection_details(secret))
    return _create_session(url, timeout)


def translate_engine(engine: str) -> str:  # pragma: unit
    """
    Translates the engine name to the name used in a connection URL.

    * `postgres` or `postgresql` -> `postgresql`
    * `mysql`, `mariadb` or `maria` -> `mysql`

    Any other name is returned lower-cased.

    :param engine: Engine name (case-insensitive).
    :return: Translated engine name
    """
    engine_name = engine.lower()
    if engine_name in ["postgres", "postgresql"]:
        return "postgresql"
    elif engine_name in ["mysql", "mariadb", "maria"]:
        return "mysql"
    return engine_name


def create_url(engine: str, username: str, password: str, host: str, database: str,
               port: Optional[Union[str, int]] = None, schema: str = None, options: str = None) -> str:  # pragma: unit
    """
    Creates a connection URL for the given database engine.

    Makes use of `create_generic_url`. Depending on the engine, some additional logic can be applied in the future.
    """
    # any engines that work differently, add some ifs here
    return create_generic_url(translate_engine(engine), username, password, host, database, port, schema, options)


def create_generic_url(engine, username, password, host, database, port: Optional[Union[str, int]] = None,
                       schema: str = None, options: str = None):  # pragma: unit
    """
    Creates a connection URL for the given database engine, essentially just formats the string into:
    ```python
    f"{engine}://{username}:{password}@{host}{_port}/{database}{_schema}{_options}"
    ```
    Where `_port`, `_schema` and `_options` are optional and thus have logic to only add them if they are provided.
    All values are interpolated verbatim; no URL-escaping is applied here.

    :param engine: Engine/dialect name placed before `://`.
    :param username: Database user.
    :param password: Database password.
    :param host: Database host.
    :param database: Database name.
    :param port: Optional
    :param schema: Optional
    :param options: Optional. A query param style string of options to append to the URL (e.g. `opt1=val1&opt2=val2`)
    :return: The formatted connection URL.
    """
    _options = f"?{options}" if options else ""
    _schema = f"/{schema}" if schema else ""
    _port = f":{port}" if port else ""

    return f"{engine}://{username}:{password}@{host}{_port}/{database}{_schema}{_options}"


def get_connection_details(secret: dict, db_config_suffix: str = None) -> tuple:  # pragma: unit
    """
    Gets the connection details from the secret and/or config.
    :param secret: The secret to get the details from.
    :param db_config_suffix: Optional. If provided, looks for config in the `database_<suffix>` section.
    :return: A tuple of the connection details. This must be unpackable into `create_url` (see order of args).
    """
    config = get_config_params()
    config_suffix = f"_{db_config_suffix}" if db_config_suffix else ""
    db_config = config[f"database{config_suffix}"]
    return get_dict_key_with_fallback(secret, "engine", db_config["engine"]), \
        get_dict_key_with_fallback(secret, "username", db_config["username"]), \
        get_dict_key_with_fallback(secret, "password", db_config["password"]), \
        get_dict_key_with_fallback(secret, "host", db_config["host"]), \
        get_dict_key_with_fallback(secret, "dbname", db_config["database_name"]), \
        get_dict_key_with_fallback(secret, "port", db_config["port"]), \
        get_dict_key_with_fallback(secret, "schema", db_config["schema"]), \
        get_dict_key_with_fallback(secret, "options", db_config["options"])


def _obfuscate_url(url: str) -> str:
    """
    Returns `url` with the password replaced by `***` so it can be logged safely.
    :param url: Connection string, expected in the shape produced by `create_generic_url`.
    :return: The URL with the password masked; the URL unchanged if it contains no `@`.
    """
    scheme, separator, remainder = url.partition("://")
    if not separator:
        scheme, remainder = "", url
    # Split on the LAST "@" so that a password containing "@" is never partially logged.
    user_info, at_sign, location = remainder.rpartition("@")
    if not at_sign:
        return url
    # Everything after the first ":" in the user info is the password.
    username = user_info.partition(":")[0]
    return f"{scheme}{separator}{username}:***@{location}"


def _create_session(url, timeout=None):  # pragma: integration
    """
    Creates a session to the database. If `timeout` isn't provided, it will use the one in the config.
    :param url: Connection string to the DB.
    :param timeout: Optional timeout.
    :return: A session bound to an engine created from `url`.
    """
    config = get_config_params()
    connection_timeout = timeout if timeout else config["database"]["timeout"]
    log.debug(f"Connecting to database with: {_obfuscate_url(url)} (timeout: {connection_timeout})")
    engine = sqlalchemy.create_engine(url, connect_args={"connect_timeout": connection_timeout})

    log.debug("Acquiring session")
    session = sessionmaker(bind=engine, expire_on_commit=False)()
    log.debug("Got session")

    return session


def refresh_session():  # pragma: integration
    """
    Refreshes the database session and also returns it.
    :return: A session object
    """
    global _SESSION
    try:
        _SESSION.close()
    except Exception as err:
        # Closing the old session is best-effort; record the failure instead of hiding it completely.
        log.debug(f"Ignoring error while closing the previous session: {err}")
    _SESSION = create_session()
    return _SESSION


def connection_is_active() -> bool:  # pragma: integration
    """
    Checks if the database connection is active by running `select 1` on a fresh session with a 1 second timeout.
    :return: Whether the connection is active.
    """
    try:
        # `closing` guarantees the probe session is released even when the query fails.
        with closing(create_session(1)) as session:
            session.execute(text("select 1"))
    except (OperationalError, SQLAlchemyOperationalError) as err:
        # SQLAlchemy re-raises driver errors as its own OperationalError, so both types are matched here.
        if "timeout" not in str(err) and "Connection refused" not in str(err):
            log.warning(f"Unexpected error: {err}")
        return False
    except Exception as e:
        log.warning(f"Unexpected error: {e}")
        return False
    return True


def has_table_access(table_name: str) -> bool:  # pragma: integration
    """
    Checks if the application has access to the given table by selecting a single row from it on a refreshed
    session.
    :param table_name: Table to check. NOTE(review): interpolated directly into the SQL text, so it must come
        from a trusted source.
    :return: Whether the table is accessible.
    """
    try:
        session = refresh_session()
        session.execute(text(f"select * from {table_name} limit 1"))
    except Exception as e:
        log.warning(f"Unexpected error: {e}")
        return False
    return True


_SESSION = create_session()


def get_session():  # pragma: integration
    """
    Gets the global session object.
    :return: The module-level session, as last set at import time or by `refresh_session`.
    """
    return _SESSION
def create_session(timeout=None):  # pragma: integration
    """
    Opens a session to the default database.

    Delegates to `create_session_from_secret_name` using the secret named `database`.
    :param timeout: Timeout in seconds. If not provided, the config timeout will be used.
    :return: A session to the database
    """
    default_secret = "database"
    return create_session_from_secret_name(default_secret, timeout)
Creates a session to the default database (with the given timeout) by looking up the database secret and/or config
details.
Makes use of create_session_from_secret_name
Parameters
- timeout: Timeout in seconds. If not provided, the config timeout will be used.
Returns
A session to the database
def create_session_from_secret_name(secret_name: str, timeout=None):  # pragma: integration
    """
    Opens a session to the database described by the given secret.

    The secret is looked up with `find_secret`, turned into connection details and a URL, and handed to
    `_create_session`.
    :param secret_name: Name of the secret passed to `find_secret`.
    :param timeout: Timeout in seconds. If not provided, the config timeout will be used.
    :return: A session to the database
    """
    connection_details = get_connection_details(find_secret(secret_name))
    return _create_session(create_url(*connection_details), timeout)
Creates a session to a database (with the given timeout) by looking up the provided secret.
Makes use of _create_session
Parameters
- timeout: Timeout in seconds. If not provided, the config timeout will be used.
Returns
A session to the database
def translate_engine(engine: str) -> str:  # pragma: unit
    """
    Maps an engine name (case-insensitive) onto the name used in a connection URL.

    * `postgres` or `postgresql` -> `postgresql`
    * `mysql`, `mariadb` or `maria` -> `mysql`

    Any other name is returned lower-cased.

    :param engine: Engine name to translate.
    :return: Translated engine name
    """
    url_names = {
        "postgres": "postgresql",
        "postgresql": "postgresql",
        "mysql": "mysql",
        "mariadb": "mysql",
        "maria": "mysql",
    }
    lowered = engine.lower()
    return url_names.get(lowered, lowered)
Translates the engine name to the name used in a connection URL.
- postgres or postgresql -> postgresql
- mysql, mariadb or maria -> mysql
Parameters
- engine:
Returns
Translated engine name
def create_url(engine: str, username: str, password: str, host: str, database: str, port: str or int = None,
               schema: str = None, options: str = None) -> str:  # pragma: unit
    """
    Builds a connection URL for the given database engine.

    The engine name is normalised with `translate_engine` and the URL itself is produced by
    `create_generic_url`. Engine-specific handling can be added here in the future.
    """
    url_engine = translate_engine(engine)
    # any engines that work differently, add some ifs here
    return create_generic_url(url_engine, username, password, host, database, port, schema, options)
Creates a connection URL for the given database engine.
Makes use of create_generic_url. Depending on the engine, some additional logic can be applied in the future.
def create_generic_url(engine, username, password, host, database, port: str or int = None, schema: str = None,
                       options: str = None):  # pragma: unit
    """
    Assembles a connection URL of the form:
    ```python
    f"{engine}://{username}:{password}@{host}{_port}/{database}{_schema}{_options}"
    ```
    `_port` (`:port`), `_schema` (`/schema`) and `_options` (`?options`) are only included when the
    corresponding argument is truthy. All values are interpolated verbatim; no URL-escaping is applied.

    :param engine: Engine/dialect name placed before `://`.
    :param username: Database user.
    :param password: Database password.
    :param host: Database host.
    :param database: Database name.
    :param port: Optional
    :param schema: Optional
    :param options: Optional. A query param style string of options to append to the URL (e.g. `opt1=val1&opt2=val2`)
    :return: The formatted connection URL.
    """
    pieces = [f"{engine}://{username}:{password}@{host}"]
    if port:
        pieces.append(f":{port}")
    pieces.append(f"/{database}")
    if schema:
        pieces.append(f"/{schema}")
    if options:
        pieces.append(f"?{options}")
    return "".join(pieces)
Creates a connection URL for the given database engine, essentially just formats the string into:
f"{engine}://{username}:{password}@{host}{_port}/{database}{_schema}{_options}"
Where _port, _schema and _options are optional and thus have logic to only add them if they are provided.
Parameters
- engine:
- username:
- password:
- host:
- database:
- port: Optional
- schema: Optional
- options: Optional. A query param style string of options to append to the URL (e.g.
opt1=val1&opt2=val2)
Returns
def get_connection_details(secret: dict, db_config_suffix: str = None) -> tuple:  # pragma: unit
    """
    Collects the connection details, reading each value via `get_dict_key_with_fallback` with the secret as the
    dictionary and the matching config entry as the fallback.
    :param secret: The secret to get the details from.
    :param db_config_suffix: Optional. If provided, looks for config in the `database_<suffix>` section.
    :return: A tuple of the connection details. This must be unpackable into `create_url` (see order of args).
    """
    section_name = "database" + (f"_{db_config_suffix}" if db_config_suffix else "")
    section = get_config_params()[section_name]
    # (key in the secret, key in the config section), listed in the argument order of `create_url`.
    key_pairs = (
        ("engine", "engine"),
        ("username", "username"),
        ("password", "password"),
        ("host", "host"),
        ("dbname", "database_name"),
        ("port", "port"),
        ("schema", "schema"),
        ("options", "options"),
    )
    return tuple(
        get_dict_key_with_fallback(secret, secret_key, section[config_key])
        for secret_key, config_key in key_pairs
    )
Gets the connection details from the secret and/or config.
Parameters
- secret: The secret to get the details from.
- db_config_suffix: Optional. If provided, looks for config in the
database_<suffix> section.
Returns
A tuple of the connection details. This must be unpackable into
create_url (see order of args).
def refresh_session():  # pragma: integration
    """
    Refreshes the database session and also returns it.

    The current global session is closed on a best-effort basis and replaced with a new one from
    `create_session()`.
    :return: A session object
    """
    global _SESSION
    try:
        _SESSION.close()
    except Exception as err:
        # Closing the old session is best-effort; record the failure instead of hiding it completely.
        log.debug(f"Ignoring error while closing the previous session: {err}")
    _SESSION = create_session()
    return _SESSION
Refreshes the database session and also returns it.
Returns
A session object
def connection_is_active() -> bool:  # pragma: integration
    """
    Checks if the database connection is active by running `select 1` on a fresh session with a 1 second timeout.
    :return: Whether the connection is active.
    """
    from contextlib import closing
    from sqlalchemy.exc import OperationalError as SQLAlchemyOperationalError

    try:
        # `closing` guarantees the probe session is released even when the query fails.
        with closing(create_session(1)) as session:
            session.execute(text("select 1"))
    except (OperationalError, SQLAlchemyOperationalError) as err:
        # SQLAlchemy re-raises driver errors as its own OperationalError, so both types are matched here.
        if "timeout" not in str(err) and "Connection refused" not in str(err):
            log.warning(f"Unexpected error: {err}")
        return False
    except Exception as e:
        log.warning(f"Unexpected error: {e}")
        return False
    return True
Checks if the database connection is active.
Returns
Whether the connection is active.
def has_table_access(table_name: str) -> bool:  # pragma: integration
    """
    Reports whether a single-row select against the given table succeeds on a refreshed session.
    :param table_name: Table to check. It is interpolated directly into the SQL text.
    :return: Whether the table is accessible.
    """
    try:
        # `refresh_session` returns the same object it stores in the global `_SESSION`.
        session = refresh_session()
        session.execute(text(f"select * from {table_name} limit 1"))
    except Exception as e:
        log.warning(f"Unexpected error: {e}")
        return False
    else:
        return True
Checks if the application has access to the given table.
Parameters
- table_name: Table to check.
Returns
Whether the table is accessible.
def get_session():  # pragma: integration
    """
    Returns the module-level session object.
    """
    # Reading a module global needs no `global` declaration.
    return _SESSION
Gets the global session object.