bedrock.endpoints.decorators.audit_log
import functools  # pragma: unit
import json  # pragma: unit
from datetime import datetime, timezone  # pragma: unit

from bedrock.endpoints.decorators.protected import get_auth_header_type, AuthTypes, get_auth_header_value  # pragma: unit
from bedrock.exceptions import UnauthorisedException  # pragma: unit
from bedrock.log import log_config  # pragma: unit

log = log_config("AuditLog")  # pragma: unit


def extract_username_from_token(token):  # pragma: unit
    """Return an identifier for the user described by a JWT token object.

    The first truthy value among the ``full_name``, ``email`` and ``user_id``
    attributes is returned, in that order. Missing attributes are treated as
    empty, so ``None`` (or any object without these attributes) yields
    ``"anonymous"``.
    """
    return (
        getattr(token, "full_name", None)
        or getattr(token, "email", None)
        or getattr(token, "user_id", None)
        or "anonymous"
    )


def _mask_api_key(api_key):  # pragma: unit
    """Return a loggable label for *api_key* that never exposes the whole key."""
    # Printing the first and last four characters of a key that is eight
    # characters or shorter would write the complete key to the audit log.
    if len(api_key) <= 8:
        return "API Key: ****"
    return f"API Key: {api_key[0:4]}...{api_key[-4:]}"


def _parse_request_body(raw_body):  # pragma: unit
    """Decode the event body for the audit entry without ever raising.

    A missing, ``None`` or empty body is reported as ``{}``. A body that
    cannot be decoded as JSON is returned unchanged so that the audit entry
    still records what was sent and the endpoint itself decides how to
    respond to it.
    """
    if not raw_body:
        return {}
    try:
        return json.loads(raw_body)
    except (TypeError, ValueError):  # json.JSONDecodeError is a ValueError
        return raw_body


def audit_log(log_request_body: bool = True, log_response_body: bool = True):  # pragma: unit
    """
    Decorator to log audit events for AWS Lambda endpoints using Bedrock.

    One entry is written through ``log.audit`` per call, after the wrapped
    endpoint has returned or raised. The entry holds the caller identity,
    source IP, HTTP method, path, UTC timestamp taken before the endpoint
    runs, and the response status.

    If the endpoint raises, an entry with status 500 is written and the
    original exception is re-raised unchanged.

    :param log_request_body: Whether to log request body (only for write methods)
    :param log_response_body: Whether to log response body
    :raises UnauthorisedException: if the event carries no recognised
        authentication header
    """

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # The wrapped callable is an endpoint method: args[0] is the
            # endpoint instance and args[1] is the Lambda event.
            event = args[1]

            auth_type = get_auth_header_type(event)
            if not auth_type:
                raise UnauthorisedException("Missing authentication header")

            if auth_type == AuthTypes.API_KEY:
                user = _mask_api_key(get_auth_header_value(event, auth_type))
            else:  # i.e. it's AuthTypes.BEARER
                user = extract_username_from_token(event.get("tkc_token"))

            # "or {}" also covers keys that are present but set to None.
            identity = (event.get("requestContext") or {}).get("identity") or {}
            ip = identity.get("sourceIp", "UNKNOWN")
            method = event.get("httpMethod", "UNKNOWN")
            path = event.get("path", "UNKNOWN")
            timestamp = datetime.now(timezone.utc).isoformat()

            request_body = None
            # NOTE(review): PATCH is not in this set - confirm that is intended.
            if log_request_body and method in {"POST", "PUT", "DELETE"}:
                request_body = _parse_request_body(event.get("body"))

            def write_audit_entry(status_code, response_body):
                audit_log_entry = {
                    "audit": True,
                    "tag": "audit-log",
                    "timestamp": timestamp,
                    "user": user,
                    "ip": ip,
                    "method": method,
                    "path": path,
                    "status": status_code,
                }
                if request_body is not None:
                    audit_log_entry["requestBody"] = request_body
                if response_body is not None:
                    audit_log_entry["responseBody"] = response_body
                log.audit(audit_log_entry)

            try:
                response = func(*args, **kwargs)
            except Exception as exc:
                # Audit the failure, then let the original exception travel
                # on so the caller's own error handling still sees it.
                write_audit_entry(500, str(exc) if log_response_body else None)
                raise

            try:
                status_code = response[0]
                response_body = response[1] if log_response_body else None
            except (TypeError, LookupError) as exc:
                # Unexpected response shape: record it as a failure but
                # still hand the response back untouched.
                status_code = 500
                response_body = str(exc) if log_response_body else None

            write_audit_entry(status_code, response_body)
            return response

        return wrapper

    return decorator
log =
<MyLogger BEDROCK-AuditLog (INFO)>
def
extract_username_from_token(token):
def extract_username_from_token(token):  # pragma: unit
    """Return the first truthy identity attribute found on *token*.

    The attributes ``full_name``, ``email`` and ``user_id`` are tried in that
    order; an attribute that is missing counts as empty. When none of them
    holds a truthy value (for example when *token* is ``None``) the string
    ``"anonymous"`` is returned instead.
    """
    for attribute in ("full_name", "email", "user_id"):
        identifier = getattr(token, attribute, None)
        if identifier:
            return identifier
    return "anonymous"
Extract a stable identifier from a JWT token object.
def
audit_log(log_request_body: bool = True, log_response_body: bool = True):
def _mask_api_key(api_key):  # pragma: unit
    """Return a loggable label for *api_key* that never exposes the whole key."""
    # Printing the first and last four characters of a key that is eight
    # characters or shorter would write the complete key to the audit log.
    if len(api_key) <= 8:
        return "API Key: ****"
    return f"API Key: {api_key[0:4]}...{api_key[-4:]}"


def _parse_request_body(raw_body):  # pragma: unit
    """Decode the event body for the audit entry without ever raising.

    A missing, ``None`` or empty body is reported as ``{}``. A body that
    cannot be decoded as JSON is returned unchanged so that the audit entry
    still records what was sent and the endpoint itself decides how to
    respond to it.
    """
    if not raw_body:
        return {}
    try:
        return json.loads(raw_body)
    except (TypeError, ValueError):  # json.JSONDecodeError is a ValueError
        return raw_body


def audit_log(log_request_body: bool = True, log_response_body: bool = True):  # pragma: unit
    """
    Decorator to log audit events for AWS Lambda endpoints using Bedrock.

    One entry is written through ``log.audit`` per call, after the wrapped
    endpoint has returned or raised. The entry holds the caller identity,
    source IP, HTTP method, path, UTC timestamp taken before the endpoint
    runs, and the response status.

    If the endpoint raises, an entry with status 500 is written and the
    original exception is re-raised unchanged.

    :param log_request_body: Whether to log request body (only for write methods)
    :param log_response_body: Whether to log response body
    :raises UnauthorisedException: if the event carries no recognised
        authentication header
    """

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # The wrapped callable is an endpoint method: args[0] is the
            # endpoint instance and args[1] is the Lambda event.
            event = args[1]

            auth_type = get_auth_header_type(event)
            if not auth_type:
                raise UnauthorisedException("Missing authentication header")

            if auth_type == AuthTypes.API_KEY:
                user = _mask_api_key(get_auth_header_value(event, auth_type))
            else:  # i.e. it's AuthTypes.BEARER
                user = extract_username_from_token(event.get("tkc_token"))

            # "or {}" also covers keys that are present but set to None.
            identity = (event.get("requestContext") or {}).get("identity") or {}
            ip = identity.get("sourceIp", "UNKNOWN")
            method = event.get("httpMethod", "UNKNOWN")
            path = event.get("path", "UNKNOWN")
            timestamp = datetime.now(timezone.utc).isoformat()

            request_body = None
            # NOTE(review): PATCH is not in this set - confirm that is intended.
            if log_request_body and method in {"POST", "PUT", "DELETE"}:
                request_body = _parse_request_body(event.get("body"))

            def write_audit_entry(status_code, response_body):
                audit_log_entry = {
                    "audit": True,
                    "tag": "audit-log",
                    "timestamp": timestamp,
                    "user": user,
                    "ip": ip,
                    "method": method,
                    "path": path,
                    "status": status_code,
                }
                if request_body is not None:
                    audit_log_entry["requestBody"] = request_body
                if response_body is not None:
                    audit_log_entry["responseBody"] = response_body
                log.audit(audit_log_entry)

            try:
                response = func(*args, **kwargs)
            except Exception as exc:
                # Audit the failure, then let the original exception travel
                # on so the caller's own error handling still sees it.
                write_audit_entry(500, str(exc) if log_response_body else None)
                raise

            try:
                status_code = response[0]
                response_body = response[1] if log_response_body else None
            except (TypeError, LookupError) as exc:
                # Unexpected response shape: record it as a failure but
                # still hand the response back untouched.
                status_code = 500
                response_body = str(exc) if log_response_body else None

            write_audit_entry(status_code, response_body)
            return response

        return wrapper

    return decorator
Decorator to log audit events for AWS Lambda endpoints using Bedrock.
Parameters
- log_request_body: Whether to log request body (only for write methods)
- log_response_body: Whether to log response body