-
Notifications
You must be signed in to change notification settings - Fork 0
/
ping_process.py
69 lines (51 loc) · 1.68 KB
/
ping_process.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import socket
from time import sleep
from network.icmp_echo_reply import ICMPEchoReply
from network.icmp_socket import ICMPSocket
from network.icmp_echo_request import ICMPEchoRequest
from utils.timer import Timer
from utils.printer import Printer
def run(host: str, port: int, count: int, interval: float, wait: float):
    """Ping ``host`` in a loop and print summary statistics when done.

    Sequence numbers start at 1 and grow by one per attempt.  The loop
    ends once the sequence number equals ``count``, or as soon as a
    ``KeyboardInterrupt`` is raised inside it.  If ``count`` never equals
    a sequence number, the loop only ends on ``KeyboardInterrupt``.
    Statistics are printed in both cases.

    Args:
        host: Target passed through to ``ping`` and to the printer.
        port: Port passed through to ``ping``.
        count: Sequence number at which the loop stops.
        interval: Pause value forwarded to ``ping``.
        wait: Wait value forwarded to ``ping``.
    """
    Printer.start(host)
    results = []
    seq = 1
    try:
        while True:
            # Every attempt's return value is recorded, whatever it is.
            results.append(ping(host, port, wait, seq, interval))
            if count == seq:
                break
            seq += 1
    except KeyboardInterrupt:
        # An interrupt ends the session early; the summary below still runs.
        pass
    Printer.print_statistics(host, results)
def ping(host: str, port: int, wait: float, seq: int, interval: float):
    """Send one echo request, report the outcome, then pause.

    The outcome is printed *before* the inter-ping pause so that each
    result line appears as soon as the reply has been processed rather
    than ``interval`` seconds later.

    Args:
        host: Target to ping.
        port: Port forwarded to the socket layer.
        wait: Wait value forwarded to ``_ping``.
        seq: Sequence number shown in the success line.
        interval: Seconds to sleep after a reply was received; ``None``
            disables the pause.  No pause happens after a timeout.

    Returns:
        ``timer.get_ms()`` when the reply has type 0 and code 0,
        otherwise ``None`` (unexpected type/code, or timeout).

    Raises:
        KeyboardInterrupt: after reporting an incorrect host
            (``socket.gaierror`` or ``ValueError``), so that the caller's
            interrupt handling terminates the session.
    """
    try:
        reply, timer = _ping(host, port, wait)
        elapsed_ms = None
        if reply.type != 0:
            Printer.print_unexpected_type(reply.type)
        elif reply.code != 0:
            Printer.print_unexpected_code(reply.code)
        else:
            Printer.print_success_ping(
                host, port, seq, reply.ttl, timer.get_ms_str()
            )
            elapsed_ms = timer.get_ms()
        # Pause only after the result has been shown; sleeping first would
        # delay every output line and lose it on Ctrl-C during the pause.
        if interval is not None:
            sleep(interval)
        return elapsed_ms
    except socket.timeout:
        Printer.print_timelimit()
    except (socket.gaierror, ValueError):
        Printer.print_incorrect_host()
        # Reuses the caller's KeyboardInterrupt path to stop the loop.
        raise KeyboardInterrupt
    return None
def _ping(host: str, port: int, wait: float) -> (ICMPEchoReply, Timer):
    """Perform a single echo exchange and time it.

    The timer is started before the socket is created and stopped after
    the reply has been wrapped in ``ICMPEchoReply``.  The socket is closed
    on every path, including when sending or parsing raises.

    Args:
        host: Target forwarded to ``ICMPSocket.send_to``.
        port: Port forwarded to ``ICMPSocket.send_to``.
        wait: Value passed to the ``ICMPSocket`` constructor
            (presumably the receive timeout -- confirm in ICMPSocket).

    Returns:
        A ``(reply, timer)`` pair: the parsed ``ICMPEchoReply`` and the
        stopped ``Timer``.
    """
    timer = Timer()
    timer.start()
    sock = ICMPSocket(wait)
    try:
        raw_reply = sock.send_to(ICMPEchoRequest(), host, port)
        icmp_reply = ICMPEchoReply(raw_reply)
        timer.stop()
    finally:
        # Without this, an exception from send_to/parsing leaked the socket.
        sock.close()
    return icmp_reply, timer