-
Notifications
You must be signed in to change notification settings - Fork 33
/
log-tail-check
executable file
·134 lines (105 loc) · 4.11 KB
/
log-tail-check
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
#!/usr/bin/env python
import os, sys, re, hashlib, struct, time, calendar, datetime as dt
class Conf:
    """Default settings for the log tail checker."""

    # Extended attribute names used to keep state on the log file between runs
    xattr_pos = 'user.log-tail-check.pos'
    xattr_ts = 'user.log-tail-check.ts'

    # Upper bound on how many trailing bytes get hashed into the position marker
    tail_check_size = 200

    # Log line pattern - must provide the named groups "ts" and "name"
    line_re = r'^(?P<ts>\S+)\s+(?P<name>\S+)\s*$'
    # Timestamp pattern - group 1 is the date/time part, group 2 is "Z" or a UTC offset
    ts_re = r'^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})(?:\.\d+)?([-+]\d{2}:\d{2}|Z)$'
    # strptime() template for the date/time part
    ts_tpl = '%Y-%m-%dT%H:%M:%S'

    # Names to check, and max allowed age of their last entry
    check_names = []
    check_timeout = 3600  # seconds (one hour)
class TailCheckError(Exception):
    """Raised when a stored tail-position marker is absent or does not match the file."""
def tail_pos_hash(buff):
    """Return the 9-byte keyed BLAKE2s digest of *buff* (bytes)."""
    hasher = hashlib.blake2s(digest_size=9, key=b'tail-chk')
    hasher.update(buff)
    return hasher.digest()


# Packed position marker, big-endian:
# uint32 file offset, uint16 marker length, 9-byte digest of the marker bytes.
tail_pos_struct = struct.Struct('>IH9s')
def tail_seek(src, pos):
    """Move *src* to just past the tail marker described by *pos*.

    Args:
        src: Seekable binary file object.
        pos: Packed marker bytes in tail_pos_struct layout
            (offset, marker length, marker digest), or an empty/None value.

    Raises:
        TailCheckError: if *pos* is empty, cannot be unpacked, or the bytes
            found at the recorded offset do not match the recorded digest.
    """
    if not pos:
        raise TailCheckError
    try:
        offset, mark_len, mark_chk = tail_pos_struct.unpack(pos)
    except struct.error as err:
        # A wrong-sized stored value is reported the same way as any other
        # unusable marker, instead of leaking struct.error to the caller.
        raise TailCheckError from err
    src.seek(offset)
    buff = src.read(mark_len)
    # A short read or digest mismatch means the file no longer has this tail
    if len(buff) != mark_len or tail_pos_hash(buff) != mark_chk:
        raise TailCheckError
def tail_pos(src, tail_check_size):
    """Build a packed position marker for the current offset of *src*.

    Steps back by up to *tail_check_size* bytes (less if the current offset
    is smaller), re-reads those bytes and packs their start offset, length
    and digest via tail_pos_struct. Reading the marker bytes moves the
    file position forward again by the marker length.
    """
    end = src.tell()
    step_back = min(tail_check_size, end)
    src.seek(-step_back, os.SEEK_CUR)
    start = src.tell()
    mark_len = end - start
    marker = src.read(mark_len)
    return tail_pos_struct.pack(start, mark_len, tail_pos_hash(marker))
def parse_ts(ts, ts_re, ts_tpl):
    """Convert a timestamp string with a UTC offset into integer unix time.

    Args:
        ts: Timestamp text, e.g. "2020-01-01T02:00:00+02:00".
        ts_re: Compiled regex yielding two groups - the date/time part and
            the offset part ("Z", "+HH:MM" or "-HH:MM").
        ts_tpl: strptime() template for the date/time group.

    Returns:
        Whole seconds since the unix epoch, in UTC.

    Raises:
        AssertionError: if *ts* does not match *ts_re*.
        ValueError: if the date/time or the offset part cannot be parsed.
    """
    m = ts_re.search(ts)
    if not m:
        # Explicit raise rather than "assert", so the check is not stripped
        # under "python -O"; AssertionError is kept for compatibility.
        raise AssertionError(ts)
    ts_str, tz_str = m.groups()
    ts_utc = dt.datetime.strptime(ts_str, ts_tpl)
    if tz_str != 'Z':
        tz_match = re.search(r'^([-+])(\d{2}):(\d{2})$', tz_str)
        if not tz_match:
            raise ValueError(f'unsupported UTC offset: {tz_str!r}')
        sign, hh, mm = tz_match.groups()
        offset = dt.timedelta(hours=int(hh), minutes=int(mm))
        # Local time is UTC plus the offset, so "+" offsets are subtracted
        ts_utc += -offset if sign == '+' else offset
    return calendar.timegm(ts_utc.timetuple())
def name_map_dump(name_ts_map):
    """Serialize a {name: timestamp} mapping into bytes.

    Each entry is written as the encoded name, a NUL byte, and the timestamp
    as a 4-byte big-endian unsigned integer, in mapping iteration order.
    """
    chunks = []
    for name, ts in name_ts_map.items():
        chunks.append(name.encode())
        chunks.append(b'\0')
        chunks.append(ts.to_bytes(4, 'big'))
    return b''.join(chunks)
def name_map_load(buff):
    """Parse bytes of NUL-terminated names, each followed by a 4-byte timestamp.

    Returns a {name: timestamp} dict; timestamps are read as big-endian
    unsigned integers. Raises ValueError if a name has no NUL terminator.
    """
    result = {}
    remaining = buff
    while remaining:
        raw_name, rest = remaining.split(b'\0', 1)
        ts = int.from_bytes(rest[:4], 'big')
        remaining = rest[4:]
        result[raw_name.decode()] = ts
    return result
def check_tail(src, conf, report=True, ts_base=None):
    """Process new lines of a log file and check names against the timeout.

    Resumes after the position stored in the conf.xattr_pos xattr when its
    marker still matches the file, otherwise starts from the beginning.
    Stores the new position and the per-name timestamps back into xattrs.

    Args:
        src: Log file opened in binary mode; src.name is used as xattr path.
        conf: Settings object with the attributes defined on Conf.
        report: Whether to print a message for each name that failed the check.
        ts_base: Optional {name: unix-time} mapping of already-known
            timestamps; read only, never modified here.

    Returns:
        Dict {name: unix-time} with the latest timestamp for every name seen
        in the processed lines, plus every checked name that has a known
        timestamp from any source.
    """
    if ts_base is None:
        ts_base = {}

    pos_old = None
    try:
        pos_old = os.getxattr(src.name, conf.xattr_pos)
        tail_seek(src, pos_old)
    except (TailCheckError, OSError):
        src.seek(0)  # no usable marker - process the whole file

    ts_last = {}
    line_re, ts_re = re.compile(conf.line_re), re.compile(conf.ts_re)
    for line in iter(src.readline, b''):
        try:
            line = line.decode()
        except UnicodeDecodeError as err:
            print(( 'tail check error - failed to decode line ([{}] {}),'
                ' skipping: {!r}' ).format(err.__class__.__name__, err, line))
            continue
        m = line_re.search(line.strip('\0'))  # nul-bytes is a common fs corruption
        if not m:
            continue
        ts_str, name = m.group('ts'), m.group('name')
        try:
            ts = parse_ts(ts_str, ts_re, conf.ts_tpl)
        except (AssertionError, ValueError) as err:
            # Same policy as for undecodable lines - one bad line must not
            # abort the whole check (and every following run with it).
            print(( 'tail check error - failed to parse timestamp ([{}] {}),'
                ' skipping: {!r}' ).format(err.__class__.__name__, err, line))
            continue
        ts_last[name] = max(ts_last.get(name, 0), ts)

    pos = tail_pos(src, conf.tail_check_size)
    if pos != pos_old:
        os.setxattr(src.name, conf.xattr_pos, pos)

    try:
        ts_file = name_map_load(os.getxattr(src.name, conf.xattr_ts))
    except (OSError, ValueError):
        # Missing or unparseable stored timestamps are treated as "none stored"
        ts_file = {}

    ts_now = time.time()
    for name in conf.check_names:
        ts_known = [
            t for t in (ts_base.get(name), ts_file.get(name), ts_last.get(name)) if t ]
        ts = max(ts_known) if ts_known else None
        if ts:
            ts_last[name] = ts
        if not ts or ts_now - ts > conf.check_timeout:
            if not ts:
                ts = ts_diff = None
            else:
                ts, ts1 = dt.datetime.fromtimestamp(ts), dt.datetime.fromtimestamp(ts_now)
                ts_diff = ts1 - ts
            if report:
                print(f'tail check failed for name: {name} (last: {ts}, delta: {ts_diff})')

    # Only rewrite the xattr when stored data actually differs
    if ts_file != ts_last:
        os.setxattr(src.name, conf.xattr_ts, name_map_dump(ts_last))
    return ts_last
def main(args=None, defaults=None):
    """Command-line entry point.

    Args:
        args: Argument list to parse; sys.argv[1:] is used when None.
        defaults: Settings object to use instead of a fresh Conf().

    Runs check_tail() on each given log path in order, passing the result of
    one file on as the base timestamps for the next. Only the file whose path
    equals the last argument is reported on; other paths are skipped when
    they do not exist.
    """
    conf = defaults or Conf()

    import argparse
    parser = argparse.ArgumentParser(
        description='Process new entries from a log file,'
            ' find names that fail to appear there within timeout interval from now.')
    parser.add_argument(
        'log', nargs='+',
        help='Path(s) to log file(s) to process. Last one should be latest.')
    parser.add_argument(
        '-n', '--names', metavar='name', action='append',
        help='Name(s) to track in specified file(s).')
    argv = sys.argv[1:] if args is None else args
    opts = parser.parse_args(argv)

    conf.check_names = opts.names or list()

    known_ts = dict()
    latest_path = opts.log[-1]
    for path in opts.log:
        is_latest = path == latest_path
        if not is_latest and not os.path.exists(path):
            continue
        with open(path, 'rb') as src:
            known_ts = check_tail(src, conf, report=is_latest, ts_base=known_ts)
# When run as a script, pass the return value of main() to sys.exit()
if __name__ == '__main__':
    sys.exit(main())