-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathutils.py
152 lines (112 loc) · 3.25 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
from base64 import urlsafe_b64decode
from struct import unpack
from datetime import datetime
from os import listdir, mkdir
from time import time
import logging
from typing import Tuple
# Character length of an inline message id that is unpacked with the
# 32-bit chat id layout ("<iiiq") in resolve_inline_message_id.
ID32_FORMAT_SIZE: int = 27
# NOTE(review): presumably the character length of the 64-bit chat id form;
# it is not referenced by the code visible in this file - confirm.
ID64_FORMAT_SIZE: int = 32

# Keyword that CustomAdapter pops from a logging call to build the
# "[...]" message prefix.
_log_prefix: str = "inline_message_id"

# Threshold for the logger itself and for the stream handler.
_log_level: int = logging.DEBUG
# Threshold for the file handler.
_log_level_file: int = logging.WARNING

# Record layout shared by the file and stream handlers.
_log_format: str = (
    "%(asctime)s - [%(levelname)s] - %(name)s"
    " - (%(filename)s).(%(lineno)d) - %(message)s"
)

# Directory (relative to the current working directory) holding log files.
_log_dir: str = "logs"
def decode_response_base64(string: str) -> bytes:
    """Decode an unpadded URL-safe base64 string into raw bytes.

    The input is expected to have had its trailing ``=`` padding stripped;
    the padding is restored here before decoding.

    Args:
        string: URL-safe base64 text without trailing padding.

    Returns:
        The decoded bytes.

    Raises:
        binascii.Error: If ``string`` is not valid base64 (for example when
            its length is one more than a multiple of four).
    """
    # Base64 text must be a multiple of 4 characters long, so the number of
    # missing "=" is (-len) mod 4. The previous "len % 4" produced three pad
    # characters where one was required and only worked because the decoder
    # tolerates surplus padding.
    missing_padding: int = -len(string) % 4
    return urlsafe_b64decode(string + "=" * missing_padding)
def resolve_inline_message_id(inline_message_id: str) -> Tuple[int, int, int, int]:
    """Unpack an inline message id into its numeric components.

    The id is base64-decoded and then interpreted with one of two
    little-endian layouts, chosen by the length of the *encoded* string:

    * ``ID32_FORMAT_SIZE`` characters: ``<iiiq`` - dc id, message id,
      chat id, access hash.
    * any other length: ``<iqiq`` - dc id, chat id, message id, access hash.

    Args:
        inline_message_id: The encoded inline message id.

    Returns:
        ``(dc_id, message_id, chat_id, access_hash)`` - always in this
        order, regardless of which layout was decoded.

    Raises:
        struct.error: If the decoded payload size does not match the
            selected layout.
    """
    payload: bytes = decode_response_base64(string=inline_message_id)

    if len(inline_message_id) != ID32_FORMAT_SIZE:
        # Wide layout: the 8-byte chat id precedes the message id.
        dc_id, chat_id, message_id, access_hash = unpack("<iqiq", payload)
        return dc_id, message_id, chat_id, access_hash

    # Narrow layout: fields are already stored in the returned order.
    dc_id, message_id, chat_id, access_hash = unpack("<iiiq", payload)
    return dc_id, message_id, chat_id, access_hash
def parse_chat_id(chat_id: int) -> Tuple[bool, int]:
    """Split a signed chat id into a flag and a non-negative id.

    Args:
        chat_id: Signed identifier; a negative value marks a chat.

    Returns:
        ``(is_chat, chat_id_)`` where ``is_chat`` is ``True`` when
        ``chat_id`` is negative and ``chat_id_`` is ``chat_id`` with the
        sign removed in that case, or ``chat_id`` unchanged otherwise.
    """
    negative: bool = chat_id < 0
    return negative, (-chat_id if negative else chat_id)
class CustomAdapter(logging.LoggerAdapter):
    """Logger adapter that prepends a bracketed key to every message."""

    def process(self, message: str, extra: dict) -> Tuple[str, dict]:
        """Prefix ``message`` with the value stored under ``_log_prefix``.

        Args:
            message: The log message text.
            extra: Mapping from which the ``_log_prefix`` entry is removed.
                ``logging.LoggerAdapter`` passes the keyword arguments of
                the logging call in this position, so the entry is taken
                out before they are forwarded to the underlying logger.

        Returns:
            ``("[<key>] <message>", extra)``; ``<key>`` is an empty string
            when ``extra`` has no ``_log_prefix`` entry.
        """
        prefix = extra.pop(_log_prefix, "")
        return f"[{prefix}] {message}", extra
def get_file_handler() -> logging.FileHandler:
    """Create a UTF-8 file handler writing to a timestamp-named log file.

    The file is placed in ``_log_dir`` and named
    ``<hour>.<minute>.<second> <day>.<month>.<year>.log`` from the current
    local time (numbers are not zero-padded). The handler uses
    ``_log_level_file`` as its threshold and ``_log_format`` as its layout.

    Returns:
        The configured ``logging.FileHandler``.
    """
    now: datetime = datetime.now()
    log_path: str = (
        f"{_log_dir}/{now.hour}.{now.minute}.{now.second} "
        f"{now.day}.{now.month}.{now.year}.log"
    )

    handler: logging.FileHandler = logging.FileHandler(
        filename=log_path,
        encoding="utf-8",
    )
    handler.setLevel(_log_level_file)
    handler.setFormatter(logging.Formatter(fmt=_log_format))
    return handler
def get_stream_handler() -> logging.StreamHandler:
    """Create a stream handler using the module's level and format.

    The handler is constructed without an explicit stream, uses
    ``_log_level`` as its threshold and ``_log_format`` as its layout.

    Returns:
        The configured ``logging.StreamHandler``.
    """
    handler: logging.StreamHandler = logging.StreamHandler()
    handler.setLevel(_log_level)
    handler.setFormatter(logging.Formatter(fmt=_log_format))
    return handler
def get_logger(name: str) -> logging.LoggerAdapter:
    """Return a ``CustomAdapter`` around the named logger.

    The underlying logger is set to ``_log_level``. The first time a given
    logger is configured, the ``_log_dir`` directory is created if needed
    and one file handler plus one stream handler are attached.

    Args:
        name: Name passed to ``logging.getLogger``.

    Returns:
        A ``CustomAdapter`` wrapping the configured logger.
    """
    base_logger: logging.Logger = logging.getLogger(name)
    base_logger.setLevel(_log_level)

    # logging.getLogger hands back the same object for a repeated name, so
    # attaching handlers unconditionally would duplicate every record and
    # open one more log file per call. Configure the logger only once.
    if not base_logger.handlers:
        # Create the directory directly instead of scanning the working
        # directory first: no window between the check and the creation.
        try:
            mkdir(_log_dir)
        except FileExistsError:
            pass

        base_logger.addHandler(get_file_handler())
        base_logger.addHandler(get_stream_handler())

    return CustomAdapter(
        logger=base_logger,
        extra={_log_prefix: None},
    )
def get_timestamp() -> int:
    """Return the current time as whole seconds since the epoch.

    The fractional part of ``time()`` is discarded by ``int``.
    """
    seconds_since_epoch: float = time()
    return int(seconds_since_epoch)