bedrock.cache.websockets.websocket_connection_data
from typing import Optional

from bedrock._helpers.classes import find_all_attributes  # pragma: unit
from bedrock._helpers.string import camelCase_to_snake_case, snake_case_to_camelCase  # pragma: unit
from bedrock.log import log_config  # pragma: unit

logger = log_config("websocket_connection_data")  # pragma: unit


class WebsocketConnectionData:  # pragma: unit
    """Container for the fields kept about a websocket connection.

    Besides holding the fields, the class offers helpers that convert an
    instance to a plain dictionary (``as_dict`` / ``as_json``) and build an
    instance back from one (``from_dict``).
    """

    # Class-level defaults; ``__init__`` overwrites each of them per instance.
    connection_id: Optional[str] = None
    source_ip: Optional[str] = None
    token: Optional[str] = None
    token_expiry: Optional[int] = None
    last_token_check: Optional[int] = None
    broadcast_filters: Optional[dict] = None
    is_authorised: bool = False

    def __init__(self, connection_id: Optional[str] = None,
                 source_ip: Optional[str] = None,
                 token: Optional[str] = None,
                 token_expiry: Optional[int] = None,
                 last_token_check: Optional[int] = None,
                 broadcast_filters: Optional[dict] = None,
                 is_authorised: bool = False):  # pragma: unit
        """Store every constructor argument on the instance under its own name."""
        # Tuple assignments run left to right, so attributes are still set in
        # the same order as before.
        self.connection_id, self.source_ip = connection_id, source_ip
        self.broadcast_filters = broadcast_filters
        self.token, self.token_expiry, self.last_token_check = token, token_expiry, last_token_check
        self.is_authorised = is_authorised

    def as_dict(self, excludes=[], nested_excludes=[], extra={},
                casing_function=camelCase_to_snake_case,
                resolve_defaults=False, visited=None) -> dict:  # pragma: unit
        """Return the instance's attributes as a dictionary.

        :param excludes: attribute names to leave out of the result.
        :param nested_excludes: accepted but not used by this implementation.
        :param extra: additional key/value pairs added after the attributes;
            their keys are passed through ``casing_function`` as well.
        :param casing_function: callable applied to every key of the result.
        :param resolve_defaults: accepted but not used by this implementation.
        :param visited: accepted but not used by this implementation.
        :return: a new dictionary keyed by the re-cased names.
        """
        # NOTE: the list/dict defaults are shared between calls; that is safe
        # here because they are only read, never mutated.
        serialised = {}
        for name in find_all_attributes(self.__class__):
            if name in excludes:
                continue
            serialised[casing_function(name)] = getattr(self, name)

        # Extras are applied last, so they overwrite an attribute with the same key.
        serialised.update(
            (casing_function(key), value) for key, value in extra.items()
        )
        return serialised

    def as_json(self, excludes=[], nested_excludes=[], extra={},
                casing_function=snake_case_to_camelCase,
                resolve_defaults=False, visited=None) -> dict:  # pragma: unit
        """Return ``as_dict`` output, with keys re-cased by ``snake_case_to_camelCase`` by default.

        All arguments are forwarded positionally, unchanged, to ``as_dict``.
        """
        forwarded = (excludes, nested_excludes, extra, casing_function, resolve_defaults, visited)
        return self.as_dict(*forwarded)

    @classmethod
    def from_dict(cls, data: dict) -> 'WebsocketConnectionData':  # pragma: unit
        """Build an instance from a dictionary.

        Each key is converted with ``camelCase_to_snake_case``. Keys that do
        not match an attribute reported by ``find_all_attributes`` are logged
        at debug level and skipped.

        :param data: mapping of field names to values.
        :return: a new instance with the recognised fields set.
        """
        instance = cls()
        known_attributes = find_all_attributes(cls)
        for raw_key, raw_value in data.items():
            snake_case_key = camelCase_to_snake_case(raw_key)
            if snake_case_key in known_attributes:
                setattr(instance, snake_case_key, raw_value)
            else:
                logger.debug(f"Ignoring unknown key: {snake_case_key} in WebsocketConnectionData")
        return instance
logger =
<MyLogger BEDROCK-websocket_connection_data (INFO)>
class
WebsocketConnectionData:
class WebsocketConnectionData:  # pragma: unit
    """Container for the fields kept about a websocket connection.

    Besides holding the fields, the class offers helpers that convert an
    instance to a plain dictionary (``as_dict`` / ``as_json``) and build an
    instance back from one (``from_dict``).
    """

    # Class-level defaults; ``__init__`` overwrites each of them per instance.
    connection_id: Optional[str] = None
    source_ip: Optional[str] = None
    token: Optional[str] = None
    token_expiry: Optional[int] = None
    last_token_check: Optional[int] = None
    broadcast_filters: Optional[dict] = None
    is_authorised: bool = False

    def __init__(self, connection_id: Optional[str] = None,
                 source_ip: Optional[str] = None,
                 token: Optional[str] = None,
                 token_expiry: Optional[int] = None,
                 last_token_check: Optional[int] = None,
                 broadcast_filters: Optional[dict] = None,
                 is_authorised: bool = False):  # pragma: unit
        """Store every constructor argument on the instance under its own name."""
        # Tuple assignments run left to right, so attributes are still set in
        # the same order as before.
        self.connection_id, self.source_ip = connection_id, source_ip
        self.broadcast_filters = broadcast_filters
        self.token, self.token_expiry, self.last_token_check = token, token_expiry, last_token_check
        self.is_authorised = is_authorised

    def as_dict(self, excludes=[], nested_excludes=[], extra={},
                casing_function=camelCase_to_snake_case,
                resolve_defaults=False, visited=None) -> dict:  # pragma: unit
        """Return the instance's attributes as a dictionary.

        :param excludes: attribute names to leave out of the result.
        :param nested_excludes: accepted but not used by this implementation.
        :param extra: additional key/value pairs added after the attributes;
            their keys are passed through ``casing_function`` as well.
        :param casing_function: callable applied to every key of the result.
        :param resolve_defaults: accepted but not used by this implementation.
        :param visited: accepted but not used by this implementation.
        :return: a new dictionary keyed by the re-cased names.
        """
        # NOTE: the list/dict defaults are shared between calls; that is safe
        # here because they are only read, never mutated.
        serialised = {}
        for name in find_all_attributes(self.__class__):
            if name in excludes:
                continue
            serialised[casing_function(name)] = getattr(self, name)

        # Extras are applied last, so they overwrite an attribute with the same key.
        serialised.update(
            (casing_function(key), value) for key, value in extra.items()
        )
        return serialised

    def as_json(self, excludes=[], nested_excludes=[], extra={},
                casing_function=snake_case_to_camelCase,
                resolve_defaults=False, visited=None) -> dict:  # pragma: unit
        """Return ``as_dict`` output, with keys re-cased by ``snake_case_to_camelCase`` by default.

        All arguments are forwarded positionally, unchanged, to ``as_dict``.
        """
        forwarded = (excludes, nested_excludes, extra, casing_function, resolve_defaults, visited)
        return self.as_dict(*forwarded)

    @classmethod
    def from_dict(cls, data: dict) -> 'WebsocketConnectionData':  # pragma: unit
        """Build an instance from a dictionary.

        Each key is converted with ``camelCase_to_snake_case``. Keys that do
        not match an attribute reported by ``find_all_attributes`` are logged
        at debug level and skipped.

        :param data: mapping of field names to values.
        :return: a new instance with the recognised fields set.
        """
        instance = cls()
        known_attributes = find_all_attributes(cls)
        for raw_key, raw_value in data.items():
            snake_case_key = camelCase_to_snake_case(raw_key)
            if snake_case_key in known_attributes:
                setattr(instance, snake_case_key, raw_value)
            else:
                logger.debug(f"Ignoring unknown key: {snake_case_key} in WebsocketConnectionData")
        return instance
WebsocketConnectionData( connection_id: Optional[str] = None, source_ip: Optional[str] = None, token: Optional[str] = None, token_expiry: Optional[int] = None, last_token_check: Optional[int] = None, broadcast_filters: Optional[dict] = None, is_authorised: bool = False)
def __init__(self, connection_id: Optional[str] = None,
             source_ip: Optional[str] = None,
             token: Optional[str] = None,
             token_expiry: Optional[int] = None,
             last_token_check: Optional[int] = None,
             broadcast_filters: Optional[dict] = None,
             is_authorised: bool = False):  # pragma: unit
    """Store every constructor argument on the instance under its own name."""
    # Tuple assignments run left to right, so attributes are still set in
    # the same order as before.
    self.connection_id, self.source_ip = connection_id, source_ip
    self.broadcast_filters = broadcast_filters
    self.token, self.token_expiry, self.last_token_check = token, token_expiry, last_token_check
    self.is_authorised = is_authorised
def
as_dict( self, excludes=[], nested_excludes=[], extra={}, casing_function=<function camelCase_to_snake_case>, resolve_defaults=False, visited=None) -> dict:
def as_dict(self, excludes=[], nested_excludes=[], extra={},
            casing_function=camelCase_to_snake_case,
            resolve_defaults=False, visited=None) -> dict:  # pragma: unit
    """Return the instance's attributes as a dictionary.

    :param excludes: attribute names to leave out of the result.
    :param nested_excludes: accepted but not used by this implementation.
    :param extra: additional key/value pairs added after the attributes;
        their keys are passed through ``casing_function`` as well.
    :param casing_function: callable applied to every key of the result.
    :param resolve_defaults: accepted but not used by this implementation.
    :param visited: accepted but not used by this implementation.
    :return: a new dictionary keyed by the re-cased names.
    """
    # NOTE: the list/dict defaults are shared between calls; that is safe
    # here because they are only read, never mutated.
    serialised = {}
    for name in find_all_attributes(self.__class__):
        if name in excludes:
            continue
        serialised[casing_function(name)] = getattr(self, name)

    # Extras are applied last, so they overwrite an attribute with the same key.
    serialised.update(
        (casing_function(key), value) for key, value in extra.items()
    )
    return serialised
def
as_json( self, excludes=[], nested_excludes=[], extra={}, casing_function=<function snake_case_to_camelCase>, resolve_defaults=False, visited=None) -> dict:
@classmethod
def from_dict(cls, data: dict) -> 'WebsocketConnectionData':  # pragma: unit
    """Build an instance from a dictionary.

    Each key is converted with ``camelCase_to_snake_case``. Keys that do
    not match an attribute reported by ``find_all_attributes`` are logged
    at debug level and skipped.

    :param data: mapping of field names to values.
    :return: a new instance with the recognised fields set.
    """
    instance = cls()
    known_attributes = find_all_attributes(cls)
    for raw_key, raw_value in data.items():
        snake_case_key = camelCase_to_snake_case(raw_key)
        if snake_case_key in known_attributes:
            setattr(instance, snake_case_key, raw_value)
        else:
            logger.debug(f"Ignoring unknown key: {snake_case_key} in WebsocketConnectionData")
    return instance