forked from Kometa-Team/Kometa
-
Notifications
You must be signed in to change notification settings - Fork 0
/
logs.py
299 lines (259 loc) · 12 KB
/
logs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
import io, logging, os, re, sys, traceback
from logging.handlers import RotatingFileHandler
LOG_DIR = "logs"
COLLECTION_DIR = "collections"
PLAYLIST_DIR = "playlists"
MAIN_LOG = "meta.log"
LIBRARY_LOG = "library.log"
COLLECTION_LOG = "collection.log"
PLAYLIST_LOG = "playlist.log"
PLAYLISTS_LOG = "playlists.log"
CRITICAL = 50
FATAL = CRITICAL
ERROR = 40
WARNING = 30
WARN = WARNING
INFO = 20
DEBUG = 10
TRACE = 0
def fmt_filter(record):
record.levelname = f"[{record.levelname}]"
record.filename = f"[{record.filename}:{record.lineno}]"
return True
_srcfile = os.path.normcase(fmt_filter.__code__.co_filename)
class MyLogger:
def __init__(self, logger_name, default_dir, screen_width, separating_character, ignore_ghost, is_debug, is_trace, log_requests):
self.logger_name = logger_name
self.default_dir = default_dir
self.screen_width = screen_width
self.separating_character = separating_character
self.is_debug = is_debug
self.is_trace = is_trace
self.log_requests = log_requests
self.ignore_ghost = ignore_ghost
self.log_dir = os.path.join(default_dir, LOG_DIR)
self.playlists_dir = os.path.join(self.log_dir, PLAYLIST_DIR)
self.main_log = os.path.join(self.log_dir, MAIN_LOG)
self.main_handler = None
self.save_errors = False
self.saved_errors = []
self.library_handlers = {}
self.collection_handlers = {}
self.playlist_handlers = {}
self.playlists_handler = None
self.secrets = []
self.spacing = 0
self.playlists_log = os.path.join(self.playlists_dir, PLAYLISTS_LOG)
os.makedirs(self.log_dir, exist_ok=True)
self._logger = logging.getLogger(None if self.log_requests else self.logger_name)
self._logger.setLevel(logging.DEBUG)
cmd_handler = logging.StreamHandler()
cmd_handler.setLevel(logging.DEBUG if self.debug else logging.INFO)
self._logger.addHandler(cmd_handler)
def clear_errors(self):
self.saved_errors = []
def _get_handler(self, log_file, count=3):
_handler = RotatingFileHandler(log_file, delay=True, mode="w", backupCount=count, encoding="utf-8")
self._formatter(handler=_handler)
if os.path.isfile(log_file):
self._logger.removeHandler(_handler)
_handler.doRollover()
self._logger.addHandler(_handler)
return _handler
def _formatter(self, handler=None, border=True, trace=False, log_only=False, space=False):
console = f"| %(message)-{self.screen_width - 2}s |" if border else f"%(message)-{self.screen_width - 2}s"
file = f"{' '*65}" if space else f"[%(asctime)s] %(filename)-27s {'[TRACE] ' if trace else '%(levelname)-10s'} "
handlers = [handler] if handler else self._logger.handlers
for h in handlers:
if not log_only or isinstance(h, RotatingFileHandler):
h.setFormatter(logging.Formatter(f"{file if isinstance(h, RotatingFileHandler) else ''}{console}"))
def add_main_handler(self):
self.main_handler = self._get_handler(self.main_log, count=9)
self.main_handler.addFilter(fmt_filter)
self._logger.addHandler(self.main_handler)
def remove_main_handler(self):
self._logger.removeHandler(self.main_handler)
def add_library_handler(self, library_key):
os.makedirs(os.path.join(self.log_dir, library_key, COLLECTION_DIR), exist_ok=True)
self.library_handlers[library_key] = self._get_handler(os.path.join(self.log_dir, library_key, LIBRARY_LOG))
self._logger.addHandler(self.library_handlers[library_key])
def remove_library_handler(self, library_key):
if library_key in self.library_handlers:
self._logger.removeHandler(self.library_handlers[library_key])
def re_add_library_handler(self, library_key):
if library_key in self.library_handlers:
self._logger.addHandler(self.library_handlers[library_key])
def add_playlists_handler(self):
os.makedirs(self.playlists_dir, exist_ok=True)
self.playlists_handler = self._get_handler(self.playlists_log, count=10)
self._logger.addHandler(self.playlists_handler)
def remove_playlists_handler(self):
self._logger.removeHandler(self.playlists_handler)
def add_collection_handler(self, library_key, collection_key):
collection_dir = os.path.join(self.log_dir, str(library_key), COLLECTION_DIR, str(collection_key))
os.makedirs(collection_dir, exist_ok=True)
if library_key not in self.collection_handlers:
self.collection_handlers[library_key] = {}
self.collection_handlers[library_key][collection_key] = self._get_handler(os.path.join(collection_dir, COLLECTION_LOG))
self._logger.addHandler(self.collection_handlers[library_key][collection_key])
def remove_collection_handler(self, library_key, collection_key):
if library_key in self.collection_handlers and collection_key in self.collection_handlers[library_key]:
self._logger.removeHandler(self.collection_handlers[library_key][collection_key])
def add_playlist_handler(self, playlist_key):
playlist_dir = os.path.join(self.playlists_dir, playlist_key)
os.makedirs(playlist_dir, exist_ok=True)
self.playlist_handlers[playlist_key] = self._get_handler(os.path.join(playlist_dir, PLAYLIST_LOG))
self._logger.addHandler(self.playlist_handlers[playlist_key])
def remove_playlist_handler(self, playlist_key):
if playlist_key in self.playlist_handlers:
self._logger.removeHandler(self.playlist_handlers[playlist_key])
def _centered(self, text, sep=" ", side_space=True, left=False):
if len(text) > self.screen_width - 2:
return text
space = self.screen_width - len(text) - 2
text = f"{' ' if side_space else sep}{text}{' ' if side_space else sep}"
if space % 2 == 1:
text += sep
space -= 1
side = int(space / 2) - 1
final_text = f"{text}{sep * side}{sep * side}" if left else f"{sep * side}{text}{sep * side}"
return final_text
def separator(self, text=None, space=True, border=True, debug=False, trace=False, side_space=True, left=False):
if trace and not self.is_trace:
return None
sep = " " if space else self.separating_character
border_text = f"|{self.separating_character * self.screen_width}|"
if border:
self.print(border_text, debug=debug, trace=trace)
if text:
text_list = text.split("\n")
for t in text_list:
msg = f"|{sep}{self._centered(t, sep=sep, side_space=side_space, left=left)}{sep}|"
self.print(msg, debug=debug, trace=trace)
if border:
self.print(border_text, debug=debug, trace=trace)
def print(self, msg, error=False, warning=False, debug=False, trace=False):
if error:
self.error(msg)
elif warning:
self.warning(msg)
elif debug:
self.debug(msg)
elif trace:
self.trace(msg)
else:
self.info(msg)
def debug(self, msg, *args, **kwargs):
if self._logger.isEnabledFor(DEBUG):
self._log(DEBUG, str(msg), args, **kwargs)
def info_center(self, msg, *args, **kwargs):
self.info(self._centered(str(msg)), *args, **kwargs)
def info(self, msg, *args, **kwargs):
if self._logger.isEnabledFor(INFO):
self._log(INFO, str(msg), args, **kwargs)
def warning(self, msg, *args, **kwargs):
if self._logger.isEnabledFor(WARNING):
self._log(WARNING, str(msg), args, **kwargs)
def trace(self, msg, *args, **kwargs):
if self.is_trace:
self._log(TRACE, str(msg), args, **kwargs)
def error(self, msg, *args, **kwargs):
if self.save_errors:
self.saved_errors.append(msg)
if self._logger.isEnabledFor(ERROR):
self._log(ERROR, str(msg), args, **kwargs)
def critical(self, msg, *args, **kwargs):
if self.save_errors:
self.saved_errors.append(msg)
if self._logger.isEnabledFor(CRITICAL):
self._log(CRITICAL, str(msg), args, **kwargs)
def stacktrace(self, trace=False):
self.print(traceback.format_exc(), debug=not trace, trace=trace)
def _space(self, display_title):
display_title = str(display_title)
space_length = self.spacing - len(display_title)
if space_length > 0:
display_title += " " * space_length
return display_title
def ghost(self, text):
if not self.ignore_ghost:
try:
print(self._space(f"| {text}"), end="\r")
except UnicodeEncodeError:
text = text.encode("utf-8")
print(self._space(f"| {text}"), end="\r")
self.spacing = len(text) + 2
def exorcise(self):
if not self.ignore_ghost:
print(self._space(" "), end="\r")
self.spacing = 0
def secret(self, text):
if text and str(text) not in self.secrets:
self.secrets.append(str(text))
def _log(self, level, msg, args, exc_info=None, extra=None, stack_info=False, stacklevel=1):
trace = level == TRACE
log_only = False
if trace:
level = DEBUG
if trace or msg.startswith("|"):
self._formatter(trace=trace, border=not msg.startswith("|"))
if self.spacing > 0:
self.exorcise()
if "\n" in msg:
for i, line in enumerate(msg.split("\n")):
self._log(level, line, args, exc_info=exc_info, extra=extra, stack_info=stack_info, stacklevel=stacklevel)
if i == 0:
self._formatter(log_only=True, space=True)
log_only = True
else:
for secret in self.secrets:
if secret in msg:
msg = msg.replace(secret, "(redacted)")
if "HTTPConnectionPool" in msg:
msg = re.sub("HTTPConnectionPool\\((.*?)\\)", "HTTPConnectionPool(redacted)", msg)
if "HTTPSConnectionPool" in msg:
msg = re.sub("HTTPSConnectionPool\\((.*?)\\)", "HTTPSConnectionPool(redacted)", msg)
try:
if not _srcfile:
raise ValueError
fn, lno, func, sinfo = self.findCaller(stack_info, stacklevel)
except ValueError:
fn, lno, func, sinfo = "(unknown file)", 0, "(unknown function)", None
if exc_info:
if isinstance(exc_info, BaseException):
exc_info = (type(exc_info), exc_info, exc_info.__traceback__)
elif not isinstance(exc_info, tuple):
exc_info = sys.exc_info()
record = self._logger.makeRecord(self._logger.name, level, fn, lno, msg, args, exc_info, func, extra, sinfo)
self._logger.handle(record)
if trace or log_only or msg.startswith("|"):
self._formatter()
def findCaller(self, stack_info=False, stacklevel=1):
f = logging.currentframe()
if f is not None:
f = f.f_back
orig_f = f
while f and stacklevel > 1:
f = f.f_back
stacklevel -= 1
if not f:
f = orig_f
rv = "(unknown file)", 0, "(unknown function)", None
while hasattr(f, "f_code"):
co = f.f_code
filename = os.path.normcase(co.co_filename)
if filename == _srcfile:
f = f.f_back
continue
sinfo = None
if stack_info:
sio = io.StringIO()
sio.write('Stack (most recent call last):\n')
traceback.print_stack(f, file=sio)
sinfo = sio.getvalue()
if sinfo[-1] == '\n':
sinfo = sinfo[:-1]
sio.close()
rv = (co.co_filename, f.f_lineno, co.co_name, sinfo)
break
return rv