bedrock.cache.websockets.websocket_connection_data

 1from typing import Optional
 2
 3from bedrock._helpers.classes import find_all_attributes  # pragma: unit
 4from bedrock._helpers.string import camelCase_to_snake_case, snake_case_to_camelCase  # pragma: unit
 5from bedrock.log import log_config  # pragma: unit
 6
# Module-level logger obtained from bedrock's log_config under the name
# "websocket_connection_data"; from_dict below uses it for debug messages.
logger = log_config("websocket_connection_data")  # pragma: unit
 8
 9
class WebsocketConnectionData():  # pragma: unit
    """Plain container for the data kept about one websocket connection.

    Every field defaults to ``None`` (``is_authorised`` to ``False``) both at
    class level and in ``__init__``. Instances can be converted to and from
    dictionaries with ``as_dict``, ``as_json`` and ``from_dict``.
    """

    connection_id: Optional[str] = None
    source_ip: Optional[str] = None
    token: Optional[str] = None
    token_expiry: Optional[int] = None
    last_token_check: Optional[int] = None
    broadcast_filters: Optional[dict] = None
    is_authorised: bool = False

    def __init__(self, connection_id: Optional[str] = None,
                 source_ip: Optional[str] = None,
                 token: Optional[str] = None,
                 token_expiry: Optional[int] = None,
                 last_token_check: Optional[int] = None,
                 broadcast_filters: Optional[dict] = None,
                 is_authorised: bool = False):  # pragma: unit
        """Store each argument on the attribute of the same name."""
        self.connection_id = connection_id
        self.source_ip = source_ip
        self.broadcast_filters = broadcast_filters
        self.token = token
        self.token_expiry = token_expiry
        self.last_token_check = last_token_check
        self.is_authorised = is_authorised

    def as_dict(self, excludes=None, nested_excludes=None, extra=None,
                casing_function=camelCase_to_snake_case,
                resolve_defaults=False, visited=None) -> dict:  # pragma: unit
        """Return the instance's attributes as a new dictionary.

        Attribute names come from ``find_all_attributes`` on the instance's
        class. Each name not listed in ``excludes`` is passed through
        ``casing_function`` to produce its key. Entries from ``extra`` are
        added afterwards (their keys also go through ``casing_function``), so
        they override an attribute that maps to the same key.

        Args:
            excludes: Attribute names to leave out; ``None`` excludes nothing.
            nested_excludes: Accepted but not used by this class.
            extra: Additional key/value pairs to add; ``None`` adds nothing.
            casing_function: Callable applied to every key.
            resolve_defaults: Accepted but not used by this class.
            visited: Accepted but not used by this class.

        Returns:
            A freshly built dictionary; the arguments are never modified.
        """
        # None sentinels replace the former mutable ``[]`` / ``{}`` defaults,
        # which were shared between calls.
        excludes = () if excludes is None else excludes
        extra = {} if extra is None else extra

        dictionary = {
            casing_function(attribute): getattr(self, attribute)
            for attribute in find_all_attributes(self.__class__)
            if attribute not in excludes
        }

        for key, value in extra.items():
            dictionary[casing_function(key)] = value

        return dictionary

    def as_json(self, excludes=None, nested_excludes=None, extra=None,
                casing_function=snake_case_to_camelCase,
                resolve_defaults=False, visited=None) -> dict:  # pragma: unit
        """Return ``as_dict`` output, by default with keys from ``snake_case_to_camelCase``.

        All arguments are forwarded unchanged to ``as_dict``; see that method
        for their meaning.
        """
        return self.as_dict(excludes, nested_excludes, extra, casing_function, resolve_defaults, visited)

    @classmethod
    def from_dict(cls, data: dict) -> 'WebsocketConnectionData':  # pragma: unit
        """Build an instance from a dictionary.

        Each key is converted with ``camelCase_to_snake_case``. Keys that are
        not among ``find_all_attributes(cls)`` are skipped with a debug log
        message; all others are set on a default-constructed instance.

        Args:
            data: Mapping of attribute names to values.

        Returns:
            The populated instance.
        """
        websocket_connection_data = cls()
        class_attributes = find_all_attributes(cls)
        for key, value in data.items():
            snake_case_key = camelCase_to_snake_case(key)
            if snake_case_key not in class_attributes:
                logger.debug(f"Ignoring unknown key: {snake_case_key} in WebsocketConnectionData")
                continue
            setattr(websocket_connection_data, snake_case_key, value)
        return websocket_connection_data
logger = <MyLogger BEDROCK-websocket_connection_data (INFO)>
class WebsocketConnectionData:
class WebsocketConnectionData():  # pragma: unit
    """Record of the values held for a single websocket connection.

    Offers dictionary export (``as_dict`` / ``as_json``) and a dictionary
    based alternate constructor (``from_dict``).
    """

    connection_id: Optional[str] = None
    source_ip: Optional[str] = None
    token: Optional[str] = None
    token_expiry: Optional[int] = None
    last_token_check: Optional[int] = None
    broadcast_filters: Optional[dict] = None
    is_authorised: bool = False

    def __init__(self, connection_id: Optional[str] = None,
                 source_ip: Optional[str] = None,
                 token: Optional[str] = None,
                 token_expiry: Optional[int] = None,
                 last_token_check: Optional[int] = None,
                 broadcast_filters: Optional[dict] = None,
                 is_authorised: bool = False):  # pragma: unit
        """Copy every argument onto the instance attribute of the same name."""
        self.connection_id = connection_id
        self.source_ip = source_ip
        self.token = token
        self.token_expiry = token_expiry
        self.last_token_check = last_token_check
        self.broadcast_filters = broadcast_filters
        self.is_authorised = is_authorised

    def as_dict(self, excludes=[], nested_excludes=[], extra={},
                casing_function=camelCase_to_snake_case,
                resolve_defaults=False, visited=None) -> dict:  # pragma: unit
        """Export the attributes found by ``find_all_attributes`` as a dict.

        Names present in ``excludes`` are omitted. Every key, including those
        taken from ``extra``, is produced by ``casing_function``. ``extra``
        entries are applied last and therefore win on a key clash.
        ``nested_excludes``, ``resolve_defaults`` and ``visited`` are accepted
        but not used here.
        """
        exported = {
            casing_function(name): getattr(self, name)
            for name in find_all_attributes(self.__class__)
            if name not in excludes
        }
        exported.update(
            {casing_function(extra_key): extra_value
             for extra_key, extra_value in extra.items()}
        )
        return exported

    def as_json(self, excludes=[], nested_excludes=[], extra={},
                casing_function=snake_case_to_camelCase,
                resolve_defaults=False, visited=None) -> dict:  # pragma: unit
        """Delegate to ``as_dict``, defaulting the key casing to ``snake_case_to_camelCase``."""
        return self.as_dict(
            excludes,
            nested_excludes,
            extra,
            casing_function,
            resolve_defaults,
            visited,
        )

    @classmethod
    def from_dict(cls, data: dict) -> 'WebsocketConnectionData':  # pragma: unit
        """Create an instance and fill it from ``data``.

        Keys are normalised with ``camelCase_to_snake_case``; a key that is not
        reported by ``find_all_attributes(cls)`` is logged at debug level and
        ignored.
        """
        instance = cls()
        known_attributes = find_all_attributes(cls)
        for raw_key, value in data.items():
            snake_case_key = camelCase_to_snake_case(raw_key)
            if snake_case_key in known_attributes:
                setattr(instance, snake_case_key, value)
            else:
                logger.debug(f"Ignoring unknown key: {snake_case_key} in WebsocketConnectionData")
        return instance
WebsocketConnectionData( connection_id: Optional[str] = None, source_ip: Optional[str] = None, token: Optional[str] = None, token_expiry: Optional[int] = None, last_token_check: Optional[int] = None, broadcast_filters: Optional[dict] = None, is_authorised: bool = False)
20    def __init__(self, connection_id: Optional[str] = None,
21                 source_ip: Optional[str] = None,
22                 token: Optional[str] = None,
23                 token_expiry: Optional[int] = None,
24                 last_token_check: Optional[int] = None,
25                 broadcast_filters: Optional[dict] = None,
26                 is_authorised: bool = False):  # pragma: unit
27        self.connection_id = connection_id
28        self.source_ip = source_ip
29        self.broadcast_filters = broadcast_filters
30        self.token = token
31        self.token_expiry = token_expiry
32        self.last_token_check = last_token_check
33        self.is_authorised = is_authorised
connection_id: Optional[str] = None
source_ip: Optional[str] = None
token: Optional[str] = None
token_expiry: Optional[int] = None
last_token_check: Optional[int] = None
broadcast_filters: Optional[dict] = None
is_authorised: bool = False
def as_dict( self, excludes=[], nested_excludes=[], extra={}, casing_function=<function camelCase_to_snake_case>, resolve_defaults=False, visited=None) -> dict:
35    def as_dict(self, excludes=[], nested_excludes=[], extra={},
36                casing_function=camelCase_to_snake_case,
37                resolve_defaults=False, visited=None) -> dict:  # pragma: unit
38        cls = self.__class__
39        class_attributes = find_all_attributes(cls)
40        dictionary = {}
41
42        for attribute in class_attributes:
43            if attribute not in excludes:
44                dictionary[casing_function(attribute)] = getattr(self, attribute)
45
46        for key, value in extra.items():
47            dictionary[casing_function(key)] = value
48
49        return dictionary
def as_json( self, excludes=[], nested_excludes=[], extra={}, casing_function=<function snake_case_to_camelCase>, resolve_defaults=False, visited=None) -> dict:
51    def as_json(self, excludes=[], nested_excludes=[], extra={},
52                casing_function=snake_case_to_camelCase,
53                resolve_defaults=False, visited=None) -> dict:  # pragma: unit
54        return self.as_dict(excludes, nested_excludes, extra, casing_function, resolve_defaults, visited)
@classmethod
def from_dict( cls, data: dict) -> WebsocketConnectionData:
56    @classmethod
57    def from_dict(cls, data: dict) -> 'WebsocketConnectionData':  # pragma: unit
58        websocket_connection_data = cls()
59        class_attributes = find_all_attributes(cls)
60        for key, value in data.items():
61            snake_case_key = camelCase_to_snake_case(key)
62            if snake_case_key not in class_attributes:
63                logger.debug(f"Ignoring unknown key: {snake_case_key} in WebsocketConnectionData")
64                continue
65            setattr(websocket_connection_data, snake_case_key, value)
66        return websocket_connection_data