bedrock.external.auth_handler
from collections.abc import Callable

from jwt import ExpiredSignatureError
from bedrock.security import Token
from bedrock.exceptions import UnauthorisedException
from bedrock._helpers.crypto import decode_access_token

TOKENS = {}
"""
Tokens are cached in-memory for the duration of their validity or until the lambda goes to sleep.
"""


def default_construct_token(access_token) -> Token:  # pragma: integration
    """
    Default function for creating tokens. To override this, see `set_token_constructor`.

    :param access_token: The JWT, passed straight through to `Token`
    :return: The `Token` object
    """
    return Token(access_token)


TOKEN_CONSTRUCTOR = default_construct_token
"""
The function that is called to construct a `Token` from a JWT. By default, it is `default_construct_token`, but can be
set to a custom function (see `set_token_constructor`).
"""


def delete_token(key):  # pragma: unit
    """
    Delete a token from the cache. A key that is not cached is ignored.

    :param key: The JWT to delete
    """
    # pop() with a default is a no-op for missing keys, so no KeyError handling is needed
    TOKENS.pop(key, None)


def save_token(key, token):  # pragma: unit
    """
    Save a token to the cache, replacing any entry already stored under the same JWT.

    :param key: The JWT to save
    :param token: The `Token` object to save
    """
    TOKENS[key] = token


def set_token_constructor(fn: Callable[[str], Token]):
    """
    Setter for the token creation function so that a user of bedrock can provide custom logic to create a token from a JWT.
    This should get called in the lambda handler (`handler.template.py`) and the local handler (`local.handler.py`).

    For example (`TokenWithAccounts` and `fetch_accounts` stand in for user-provided code):
    ```python
    from bedrock.external.auth_handler import set_token_constructor

    set_token_constructor(
        lambda access_token: TokenWithAccounts(
            access_token,
            [account['number'] for account in fetch_accounts(access_token)],
        )
    )
    ```

    :param fn: Callable that takes the JWT and returns a `Token`
    """
    global TOKEN_CONSTRUCTOR
    TOKEN_CONSTRUCTOR = fn


def auth(access_token) -> Token:
    """
    Verify the JWT, create a `Token` based on it, then save it to the cache.

    :param access_token: The JWT to verify
    :return: The `Token` object
    """
    # Called for verification only; the return value is not used here
    decode_access_token(access_token)
    token = TOKEN_CONSTRUCTOR(access_token)
    save_token(access_token, token)
    return token


def get_token(access_token) -> Token:  # pragma: unit
    """
    Get a token from the cache or verify authorisation if it is not present.

    The JWT is verified on every call. Any failure evicts the JWT from the cache before the error is raised.

    :param access_token: The JWT to get
    :return: A `Token` object
    :raises UnauthorisedException: If the JWT has expired, or if verification or token construction fails
    """
    try:
        if access_token in TOKENS:
            # Re-verify on a cache hit so an expired JWT is rejected even though its token is cached
            decode_access_token(access_token)
            token = TOKENS[access_token]
        else:
            # `auth` verifies the JWT itself, so it is not decoded a second time here
            token = auth(access_token)
    except ExpiredSignatureError as err:
        delete_token(access_token)
        raise UnauthorisedException("Access Token has expired") from err
    except Exception as err:
        delete_token(access_token)
        raise UnauthorisedException("Invalid Access Token") from err
    return token
Tokens are cached in-memory for the duration of their validity or until the lambda goes to sleep.
def default_construct_token(access_token) -> Token:  # pragma: integration
    """
    Build the stock `Token` for a JWT. To supply a different constructor, see `set_token_constructor`.

    :param access_token: The JWT, passed straight through to `Token`
    :return: The `Token` object
    """
    token = Token(access_token)
    return token
Default function for creating tokens. To override this, see set_token_constructor.
Parameters
- access_token: The JWT to construct the `Token` from
Returns
The `Token` object
def default_construct_token(access_token) -> Token:  # pragma: integration
    """
    Build the stock `Token` for a JWT. To supply a different constructor, see `set_token_constructor`.

    :param access_token: The JWT, passed straight through to `Token`
    :return: The `Token` object
    """
    token = Token(access_token)
    return token
The function that is called to construct a `Token` from a JWT. By default, it is `default_construct_token`, but can be set to a custom function
(see `set_token_constructor`).
def delete_token(key):  # pragma: unit
    """
    Evict a token from the cache; a key that is not cached is ignored.

    :param key: The JWT whose cached token should be removed
    """
    # pop() with a default is a no-op for missing keys
    TOKENS.pop(key, None)
Delete a token from the cache.
Parameters
- key: The JWT to delete
def save_token(key, token):  # pragma: unit
    """
    Store a token in the cache, replacing any entry already held under the same JWT.

    :param key: The JWT to file the token under
    :param token: The `Token` object to cache
    """
    TOKENS[key] = token
Save a token to the cache.
Parameters
- key: The JWT to save
- token: The `Token` object to save
Returns
- Nothing; the cache is updated in place
def set_token_constructor(fn: Callable[[str], Token]):
    """
    Install the function used to turn a JWT into a `Token`, so that a user of bedrock can plug in custom logic.
    Call this from the lambda handler (`handler.template.py`) and the local handler (`local.handler.py`).

    For example (`TokenWithAccounts` and `fetch_accounts` stand in for user-provided code):
    ```python
    from bedrock.external.auth_handler import set_token_constructor

    set_token_constructor(
        lambda access_token: TokenWithAccounts(
            access_token,
            [account['number'] for account in fetch_accounts(access_token)],
        )
    )
    ```

    :param fn: Callable that takes the JWT and returns a `Token`
    """
    # Rebind the module-level hook
    global TOKEN_CONSTRUCTOR
    TOKEN_CONSTRUCTOR = fn
Setter for the token creation function so that a user of bedrock can provide custom logic to create a token from a JWT.
This should get called in the lambda handler (handler.template.py) and the local handler (local.handler.py).
For example:
```python
from bedrock.external.auth_handler import set_token_constructor

set_token_constructor(lambda access_token: TokenWithAccounts(access_token, [account['number'] for account in fetch_accounts(access_token)]))
```
def auth(access_token) -> Token:
    """
    Check the JWT, build a `Token` from it with the configured constructor, and cache the result.

    :param access_token: The JWT to verify
    :return: The newly built `Token` object
    """
    # Called for verification only; the return value is not used here
    decode_access_token(access_token)
    fresh_token = TOKEN_CONSTRUCTOR(access_token)
    save_token(access_token, fresh_token)
    return fresh_token
Verify the JWT, create a Token based on it, then save it to the cache.
Parameters
- access_token: The JWT to verify
Returns
The `Token` object
def get_token(access_token) -> Token:  # pragma: unit
    """
    Get a token from the cache or verify authorisation if it is not present.

    The JWT is verified on every call. Any failure evicts the JWT from the cache before the error is raised.

    :param access_token: The JWT to get
    :return: A `Token` object
    :raises UnauthorisedException: If the JWT has expired, or if verification or token construction fails
    """
    try:
        if access_token in TOKENS:
            # Re-verify on a cache hit so an expired JWT is rejected even though its token is cached
            decode_access_token(access_token)
            token = TOKENS[access_token]
        else:
            # `auth` verifies the JWT itself, so it is not decoded a second time here
            token = auth(access_token)
    except ExpiredSignatureError as err:
        delete_token(access_token)
        raise UnauthorisedException("Access Token has expired") from err
    except Exception as err:
        delete_token(access_token)
        raise UnauthorisedException("Invalid Access Token") from err
    return token
Get a token from the cache or verify authorisation if it is not present.
Parameters
- access_token: The JWT to get
Returns
A `Token` object