bedrock.endpoints.decorators.audit_log

 1import functools  # pragma: unit
 2import json  # pragma: unit
 3from datetime import datetime, timezone  # pragma: unit
 4
 5from bedrock.endpoints.decorators.protected import get_auth_header_type, AuthTypes, get_auth_header_value  # pragma: unit
 6from bedrock.exceptions import UnauthorisedException  # pragma: unit
 7from bedrock.log import log_config  # pragma: unit
 8
 9log = log_config("AuditLog")  # pragma: unit
10
11
def extract_username_from_token(token):  # pragma: unit
    """Return the first usable identity attribute of a JWT token object.

    The attributes ``full_name``, ``email`` and ``user_id`` are tried in that
    order. An attribute that is missing or falsy is skipped. When none of
    them yields a value (including when ``token`` is ``None``) the literal
    ``"anonymous"`` is returned.
    """
    for attribute in ("full_name", "email", "user_id"):
        identifier = getattr(token, attribute, None)
        if identifier:
            return identifier
    return "anonymous"
20
21
def audit_log(log_request_body: bool = True, log_response_body: bool = True):  # pragma: unit
    """
    Decorator to log audit events for AWS Lambda endpoints using Bedrock.

    The wrapped callable is invoked with ``(self, event, ...)`` positional
    arguments and ``event`` is read as a dict. Exactly one entry is passed to
    ``log.audit`` per call, whether the endpoint returns or raises; an
    exception raised by the endpoint is logged with status 500 and then
    re-raised unchanged.

    :param log_request_body: Whether to log request body (only for write methods)
    :param log_response_body: Whether to log response body
    :raises UnauthorisedException: If no authentication header type is found on the event
    """

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Positional layout is (self, event, ...); only the event is inspected.
            event = args[1]

            auth_type = get_auth_header_type(event)
            if not auth_type:
                raise UnauthorisedException("Missing authentication header")

            if auth_type == AuthTypes.API_KEY:
                api_key = get_auth_header_value(event, auth_type)
                # Record only the first and last four characters of the key.
                user = f"API Key: {api_key[0:4]}...{api_key[-4:]}"
            else:  # i.e. it's AuthTypes.BEARER
                token = event.get("tkc_token")
                user = extract_username_from_token(token)

            ip = (
                event.get("requestContext", {})
                .get("identity", {})
                .get("sourceIp", "UNKNOWN")
            )
            method = event.get("httpMethod", "UNKNOWN")
            path = event.get("path", "UNKNOWN")
            # Captured before the endpoint runs, so it marks the request start.
            timestamp = datetime.now(timezone.utc).isoformat()

            request_body = None
            if log_request_body and method in {"POST", "PUT", "PATCH", "DELETE"}:
                # The "body" key can be present but None/empty (e.g. a DELETE
                # without a payload); treat that like an empty JSON object.
                raw_body = event.get("body") or "{}"
                try:
                    request_body = json.loads(raw_body)
                except (TypeError, ValueError):
                    # Auditing must not reject a request the endpoint has not
                    # seen yet: keep the undecodable body as-is in the entry.
                    request_body = raw_body

            def write_entry(status_code, response_body):
                """Assemble the audit entry and hand it to the audit logger."""
                audit_log_entry = {
                    "audit": True,
                    "tag": "audit-log",
                    "timestamp": timestamp,
                    "user": user,
                    "ip": ip,
                    "method": method,
                    "path": path,
                    "status": status_code,
                }

                if request_body is not None:
                    audit_log_entry["requestBody"] = request_body
                if response_body is not None:
                    audit_log_entry["responseBody"] = response_body

                log.audit(audit_log_entry)

            try:
                response = func(*args, **kwargs)
            except Exception as exc:
                # Log the failure, then let the original exception propagate
                # instead of masking it with an unbound `response`.
                write_entry(500, str(exc) if log_response_body else None)
                raise

            try:
                status_code = response[0]
                response_body = response[1] if log_response_body else None
            except (TypeError, LookupError) as exc:
                # The endpoint returned something that is not an indexable
                # (status, body) pair: audit it as a 500 but still return it.
                status_code = 500
                response_body = str(exc) if log_response_body else None

            write_entry(status_code, response_body)
            return response

        return wrapper

    return decorator
log = <MyLogger BEDROCK-AuditLog (INFO)>
def extract_username_from_token(token):
def extract_username_from_token(token):  # pragma: unit
    """Return the first usable identity attribute of a JWT token object.

    The attributes ``full_name``, ``email`` and ``user_id`` are tried in that
    order. An attribute that is missing or falsy is skipped. When none of
    them yields a value (including when ``token`` is ``None``) the literal
    ``"anonymous"`` is returned.
    """
    for attribute in ("full_name", "email", "user_id"):
        identifier = getattr(token, attribute, None)
        if identifier:
            return identifier
    return "anonymous"

Extract a stable identifier from a JWT token object.

def audit_log(log_request_body: bool = True, log_response_body: bool = True):
def audit_log(log_request_body: bool = True, log_response_body: bool = True):  # pragma: unit
    """
    Decorator to log audit events for AWS Lambda endpoints using Bedrock.

    The wrapped callable is invoked with ``(self, event, ...)`` positional
    arguments and ``event`` is read as a dict. Exactly one entry is passed to
    ``log.audit`` per call, whether the endpoint returns or raises; an
    exception raised by the endpoint is logged with status 500 and then
    re-raised unchanged.

    :param log_request_body: Whether to log request body (only for write methods)
    :param log_response_body: Whether to log response body
    :raises UnauthorisedException: If no authentication header type is found on the event
    """

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Positional layout is (self, event, ...); only the event is inspected.
            event = args[1]

            auth_type = get_auth_header_type(event)
            if not auth_type:
                raise UnauthorisedException("Missing authentication header")

            if auth_type == AuthTypes.API_KEY:
                api_key = get_auth_header_value(event, auth_type)
                # Record only the first and last four characters of the key.
                user = f"API Key: {api_key[0:4]}...{api_key[-4:]}"
            else:  # i.e. it's AuthTypes.BEARER
                token = event.get("tkc_token")
                user = extract_username_from_token(token)

            ip = (
                event.get("requestContext", {})
                .get("identity", {})
                .get("sourceIp", "UNKNOWN")
            )
            method = event.get("httpMethod", "UNKNOWN")
            path = event.get("path", "UNKNOWN")
            # Captured before the endpoint runs, so it marks the request start.
            timestamp = datetime.now(timezone.utc).isoformat()

            request_body = None
            if log_request_body and method in {"POST", "PUT", "PATCH", "DELETE"}:
                # The "body" key can be present but None/empty (e.g. a DELETE
                # without a payload); treat that like an empty JSON object.
                raw_body = event.get("body") or "{}"
                try:
                    request_body = json.loads(raw_body)
                except (TypeError, ValueError):
                    # Auditing must not reject a request the endpoint has not
                    # seen yet: keep the undecodable body as-is in the entry.
                    request_body = raw_body

            def write_entry(status_code, response_body):
                """Assemble the audit entry and hand it to the audit logger."""
                audit_log_entry = {
                    "audit": True,
                    "tag": "audit-log",
                    "timestamp": timestamp,
                    "user": user,
                    "ip": ip,
                    "method": method,
                    "path": path,
                    "status": status_code,
                }

                if request_body is not None:
                    audit_log_entry["requestBody"] = request_body
                if response_body is not None:
                    audit_log_entry["responseBody"] = response_body

                log.audit(audit_log_entry)

            try:
                response = func(*args, **kwargs)
            except Exception as exc:
                # Log the failure, then let the original exception propagate
                # instead of masking it with an unbound `response`.
                write_entry(500, str(exc) if log_response_body else None)
                raise

            try:
                status_code = response[0]
                response_body = response[1] if log_response_body else None
            except (TypeError, LookupError) as exc:
                # The endpoint returned something that is not an indexable
                # (status, body) pair: audit it as a 500 but still return it.
                status_code = 500
                response_body = str(exc) if log_response_body else None

            write_entry(status_code, response_body)
            return response

        return wrapper

    return decorator

Decorator to log audit events for AWS Lambda endpoints using Bedrock.

Parameters
  • log_request_body: Whether to log request body (only for write methods)
  • log_response_body: Whether to log response body