-
Notifications
You must be signed in to change notification settings - Fork 2
/
serialcompy.py
309 lines (242 loc) · 8.74 KB
/
serialcompy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
#! /usr/bin/env python3.7
# -*- coding: utf-8 -*-
# Created by kayrlas on August 10, 2019 (https://github.com/kayrlas)
# serialcompy.py
import time
from threading import Thread
from serial import Serial
from serial.tools import list_ports
from serial.serialutil import SerialException
class SerialCom:
    """Interactive serial console built on pySerial

    Typical flow: `find_comports` -> `select_comport` -> `open_comport`,
    then a blocking read loop on the calling thread and, in write mode,
    a second thread that sends whatever is typed on stdin.
    `start_serialcom` runs that whole flow.

    Args:
        `baudrate (int)`: Serial communication symbol rate
        `timeout (float)`: Timeout in seconds handed to `serial.Serial`
        `writemode (bool)`: Serial write accept
        `interval (float)`: Pause in seconds after each sent or received
            line (keyword-only, default 1.0)

    Example:
        ```python
        if __name__ == "__main__":
            com = SerialCom(baudrate=9600, timeout=0.1, writemode=True)
            com.start_serialcom()
        ```
    """

    # Timestamp layout shared by the TX prompt and the RX log lines.
    TIME_FORMAT = "%Y/%m/%d %H:%M:%S"

    # Class-level fallback so the loops still work on instances that were
    # created without running __init__ (e.g. by a subclass).
    interval = 1.0

    def __init__(self, baudrate: int, timeout: float, writemode: bool,
                 *, interval: float = 1.0):
        self.devices = []  # find_comports
        self.device = None  # select_comport
        # The port name is assigned later by register_comport; the port is
        # only opened by the explicit open() call in open_comport.
        self.serial = Serial(baudrate=baudrate, timeout=timeout)
        self.writemode = writemode
        self.interval = interval
        self.th_swrite = None  # start_serialwrite

    def find_comports(self) -> list:
        """Find comports and save to `self.devices`

        Returns:
            `list`: List of comports
        """
        self.devices = list(list_ports.comports())
        return self.devices

    def select_comport(self, *, devices=None) -> bool:
        """Select a comport from list

        With no candidate the call fails, with exactly one candidate it is
        chosen automatically, otherwise the user is asked for an index.

        Args (option):
            `devices`: List of comports. When omitted or empty,
                `self.devices` is used instead.

        Returns:
            `bool`: Saving `self.device` successfully
        """
        candidates = devices if devices else self.devices

        if not candidates:  # No device
            print("Device not found")
            return False

        if len(candidates) == 1:  # Only one device
            only = candidates[0]
            print(f"Only found {only}")
            self.device = only.device
            return True

        # Some devices: let the user pick one by index.
        print("Connected comports are as follows:")
        for index, port in enumerate(candidates):
            print(f"{index} : {port}")

        answer = input("Input the number of your target port >> ").strip()
        if not answer.isdecimal():
            print(f"{answer} is not a number!")
            return False

        # isdecimal() already rejected signs, so only the upper bound
        # needs checking.
        choice = int(answer)
        if choice >= len(candidates):
            print(f"{answer} is out of the number!")
            return False

        self.device = candidates[choice].device
        return True

    def register_comport(self, *, device=None) -> bool:
        """Save a comport to `self.serial.port`

        Args (option):
            `device`: Comport name. When omitted, `self.device` is used.

        Returns:
            `bool`: Registered comport successfully
        """
        target = self.device if device is None else device

        # Check device specified or not
        if target is None:
            print("Device has not been specified yet!")
            return False

        self.serial.port = target
        return True

    def open_comport(self, *, device=None) -> bool:
        """Open the comport after asking the user for confirmation

        Args (option):
            `device`: Comport name. When omitted, `self.device` is used.

        Returns:
            `bool`: Open comport successfully
        """
        # register_comport already falls back to self.device for None.
        if not self.register_comport(device=device):
            return False

        # Input Yes/No
        answer = input(f"Open {self.serial.port} ? [Yes/No] >> ").strip().lower()

        if answer in ("n", "no"):  # NO
            print("Canceled")
            return False

        if answer not in ("y", "yes"):  # Other than [Yes/No]
            print("Oops, you didn't enter [Yes/No]. Please try again.")
            return False

        print("Opening...")  # YES
        try:
            self.serial.open()
        except SerialException:
            print(f"{self.serial.port} was disconnected while you input [Yes/No]")
            return False
        return True

    def close_comport(self) -> bool:
        """Close the comport

        Returns:
            `bool`: Close comport successfully
        """
        try:
            self.serial.close()
        except AttributeError:
            print("open_comport has not been called yet!")
            return False
        print("Closing...")
        return True

    def serial_write(self):
        """Write strings to comport

        Loops while the port is open: prompts on stdin and sends the typed
        text UTF-8 encoded, exactly as entered (no line ending is added).
        End-of-input or a serial error closes the port, which ends the loop.

        Returns:
            None
        """
        while self.serial.is_open:
            stamp = time.strftime(self.TIME_FORMAT, time.localtime())
            try:
                text = input(stamp + " (TX) >> ")
                self.serial.write(text.encode("utf-8"))
            except EOFError:
                print("Sending texts canceled.")
                self.close_comport()
            except SerialException:
                print(f"{self.serial.port} was disconnected while writing texts")
                self.close_comport()
            else:
                time.sleep(self.interval)

    def start_serialwrite(self):
        """Start `serial_write` in another thread

        The thread object is kept in `self.th_swrite`.

        Returns:
            None
        """
        self.th_swrite = Thread(target=self.serial_write)
        self.th_swrite.start()

    def serial_read(self):
        """Read strings from comport

        Loops while the port is open and prints every non-empty line with a
        timestamp. A serial error closes the port, which ends the loop.

        Returns:
            None
        """
        while self.serial.is_open:
            try:
                received = self.serial.readline()
            except SerialException:
                print(f"{self.serial.port} was disconnected while reading comport")
                self.close_comport()
                continue

            if received == b'':  # nothing arrived
                continue

            stamp = time.strftime(self.TIME_FORMAT, time.localtime())
            # errors="replace": line noise or binary payloads must not
            # abort the read loop with UnicodeDecodeError.
            text = received.strip().decode("utf-8", errors="replace")
            print(stamp + " (RX) : " + text)
            time.sleep(self.interval)

    def _run_session(self) -> bool:
        """Run the read loop (and the writer thread in write mode) on an open port"""
        if self.writemode:
            self.start_serialwrite()

        try:
            self.serial_read()
        except KeyboardInterrupt:
            self.close_comport()

        if self.writemode:
            # The writer thread may still be blocked inside input().
            print("Please input Enter if this terminal is still locked.")
            self.th_swrite.join()

        print("Comport has been closed. See you.")
        return True

    def start_serialcom(self) -> bool:
        """Start serial communication

        Finds the comports, lets the user select and open one, then reads
        (and in write mode also writes) until the port is closed or Ctrl-C
        is pressed.

        Returns:
            `bool`: Serial communication normally
        """
        self.find_comports()

        if not self.select_comport():
            print("Device has not been specified yet!")
            return False

        if not self.open_comport():
            print("Cannot open the comport. Please try again.")
            return False

        return self._run_session()

    def start_serialcom_option(self) -> bool:
        """Start serial communication (using option args)

        Same flow as `start_serialcom`, but hands the found comports and the
        selected device to each step explicitly.

        Returns:
            `bool`: Serial communication normally
        """
        found = self.find_comports()

        if not self.select_comport(devices=found):
            print("Device has not been specified yet!")
            return False

        if not self.open_comport(device=self.get_selected_device()):
            print("Cannot open the comport. Please try again.")
            return False

        return self._run_session()

    def get_found_devices(self) -> list:
        """Return `self.devices`

        Returns:
            `list`: comports list
        """
        return self.devices

    def get_selected_device(self):
        """Return `self.device`

        Returns:
            `list_ports.comports().device`: selected comport
        """
        return self.device

    def get_write_available(self) -> bool:
        """Return `self.writemode`

        Returns:
            `bool`: serial write accept
        """
        return self.writemode
if __name__ == "__main__":
    # Script entry point: start an interactive session at 9600 baud with a
    # 0.1 s timeout and keyboard input forwarded to the port.
    com = SerialCom(
        writemode=True,
        timeout=0.1,
        baudrate=9600,
    )
    com.start_serialcom()