forked from Ulm-IQO/qudi
-
Notifications
You must be signed in to change notification settings - Fork 0
/
module.py
346 lines (291 loc) · 12.8 KB
/
module.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
# -*- coding: utf-8 -*-
"""
This file contains the Qudi module base class.
Qudi is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
Qudi is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with Qudi. If not, see <http://www.gnu.org/licenses/>.
Copyright (c) the Qudi Developers. See the COPYRIGHT.txt file at the
top-level directory of this distribution and at <https://github.com/Ulm-IQO/qudi/>
"""
import copy
import logging
import warnings
from fysom import Fysom # provides a finite state machine
from collections import OrderedDict
from qtpy import QtCore
from .meta import ModuleMeta
from .configoption import MissingOption
from .connector import Connector
from .statusvariable import StatusVar
class ModuleStateMachine(QtCore.QObject, Fysom):
    """
    Finite state machine describing the lifecycle of a module.

    The machine starts in 'deactivated' and knows the states 'deactivated',
    'idle', 'running' and 'locked'; the permitted transitions are listed in
    __init__. It is both a QObject and a Fysom, so that trigger() can be used
    as a Qt slot and state changes are announced via sigStateChanged.
    """
    # do not copy declaration of trigger(self, event, *args, **kwargs), just apply Slot decorator
    trigger = QtCore.Slot(str, result=bool)(Fysom.trigger)

    # signals
    # emitted from onchangestate() with the Fysom event object as payload
    sigStateChanged = QtCore.Signal(object)

    def __init__(self, parent, callbacks=None, **kwargs):
        """ Create the state machine and define its states and transitions.

        @param object parent: owner of this state machine; its logger is used by
                              the activate/deactivate wrappers
        @param dict callbacks: Fysom callbacks to be run on state machine
                               transitions (defaults to no callbacks)
        @param kwargs: passed on to the base class constructors
        """
        # stored before the base class init because the wrappers built by
        # _build_event refer to it
        self._parent = parent
        if callbacks is None:
            callbacks = {}

        # State machine definition
        # the abbreviations for the event list are the following:
        #   name:   event name,
        #   src:    source state,
        #   dst:    destination state
        state_machine_cfg = {
            'initial': 'deactivated',
            'events': [
                {'name': 'activate', 'src': 'deactivated', 'dst': 'idle'},
                {'name': 'deactivate', 'src': 'idle', 'dst': 'deactivated'},
                {'name': 'deactivate', 'src': 'running', 'dst': 'deactivated'},
                {'name': 'deactivate', 'src': 'locked', 'dst': 'deactivated'},
                {'name': 'run', 'src': 'idle', 'dst': 'running'},
                {'name': 'stop', 'src': 'running', 'dst': 'idle'},
                {'name': 'lock', 'src': 'idle', 'dst': 'locked'},
                {'name': 'lock', 'src': 'running', 'dst': 'locked'},
                {'name': 'unlock', 'src': 'locked', 'dst': 'idle'},
                {'name': 'runlock', 'src': 'locked', 'dst': 'running'},
            ],
            'callbacks': callbacks
        }

        # Initialise state machine:
        super().__init__(parent=parent, cfg=state_machine_cfg, **kwargs)

    def __call__(self):
        """
        Returns the current state.
        """
        return self.current

    def _build_event(self, event):
        """
        Overrides fysom _build_event to wrap on_activate and on_deactivate to
        catch and log exceptions.

        @param str event: name of the event to build

        @return callable: for 'activate' and 'deactivate' a wrapper that returns
                          True on success and False if the event raised an
                          exception (which is logged via the parent's logger);
                          for every other event the unmodified fysom event
                          function
        """
        base_event = super()._build_event(event)
        if event not in ('activate', 'deactivate'):
            return base_event

        noun = 'activation' if event == 'activate' else 'deactivation'

        def wrap_event(*args, **kwargs):
            self._parent.log.debug('{0} in thread {1}'.format(
                noun.capitalize(),
                QtCore.QThread.currentThreadId()))
            try:
                base_event(*args, **kwargs)
            except Exception:
                # Only real errors are turned into a False return value;
                # KeyboardInterrupt and SystemExit are allowed to propagate.
                self._parent.log.exception('Error during {0}'.format(noun))
                return False
            return True

        return wrap_event

    def onchangestate(self, e):
        """ Fysom callback for state transition.

        @param object e: Fysom state transition description
        """
        self.sigStateChanged.emit(e)
class BaseMixin(metaclass=ModuleMeta):
    """
    Base class for all loadable modules

    * Ensure that the program will not die during the load of modules in any case,
      and therefore do nothing!!!
    * Initialize modules
    * Provides a self identification of the used module
    * Output redirection (instead of print)
    * Provides a self de-initialization of the used module
    * Reload the module with code changes
    * Get your own configuration (for save)
    * Get name of status variables
    * Get status variables
    * Reload module data (from saved variables)

    NOTE(review): the attributes _conn, _config_options and _stat_vars read in
    this class are not defined here; presumably they are provided by ModuleMeta.
    """
    _threaded = False
    # legacy connector declarations: connector name -> value stored under 'class'
    _connectors = {}

    def __init__(self, manager, name, config=None, callbacks=None, **kwargs):
        """ Initialise Base class object and set up its state machine.

        @param object self: the object being initialised
        @param object manager: the manager object, stored as self._manager
        @param str name: unique name for this object
        @param dict config: parameters from the configuration file
        @param dict callbacks: dictionary specifying functions to be run
                               on state machine transitions
        @param kwargs: passed on to the next class in the method resolution order

        @raises Exception: if a config option whose missing-policy is
                           MissingOption.error is absent from config
        """
        super().__init__(**kwargs)

        if config is None:
            config = {}
        if callbacks is None:
            callbacks = {}

        # user supplied callbacks take precedence over the defaults
        default_callbacks = {
            'onactivate': self.__load_status_vars_activate,
            'ondeactivate': self.__save_status_vars_deactivate
        }
        default_callbacks.update(callbacks)

        self.module_state = ModuleStateMachine(parent=self, callbacks=default_callbacks)

        # add connectors
        self.connectors = OrderedDict()
        for con in self._conn.values():
            self.connectors[con.name] = con

        # add connection base (legacy)
        for con_name, con_class in self._connectors.items():
            self.connectors[con_name] = OrderedDict([('class', con_class), ('object', None)])

        # add config options
        for opt in self._config_options.values():
            if opt.name in config:
                cfg_val = config[opt.name]
            else:
                if opt.missing == MissingOption.error:
                    raise Exception(
                        'Required variable >> {0} << not given in configuration.\n'
                        'Configuration is: {1}'.format(opt.name, config))
                elif opt.missing == MissingOption.warn:
                    self.log.warning(
                        'No variable >> {0} << configured, using default value {1} instead.'
                        ''.format(opt.name, opt.default))
                elif opt.missing == MissingOption.info:
                    self.log.info(
                        'No variable >> {0} << configured, using default value {1} instead.'
                        ''.format(opt.name, opt.default))
                cfg_val = opt.default

            # a value that does not pass the option's check is not assigned
            if not opt.check(cfg_val):
                continue
            converted_val = opt.convert(cfg_val)
            if opt.constructor_function is None:
                setattr(self, opt.var_name, converted_val)
            else:
                setattr(self, opt.var_name, opt.constructor_function(self, converted_val))

        self._manager = manager
        self._name = name
        self._configuration = config
        self._statusVariables = OrderedDict()

    def __load_status_vars_activate(self, event):
        """ Restore status variables before activation.

        @param event: Fysom event
        """
        # add status vars
        saved = self._statusVariables
        for var in self._stat_vars.values():
            if var.name not in saved:
                # nothing saved: start from a private copy of the default
                value = copy.deepcopy(var.default)
            elif isinstance(var.default, dict):
                # dict defaults are merged, so keys missing in the saved
                # value keep their default
                value = copy.deepcopy(var.default)
                value.update(saved[var.name])
            else:
                value = saved[var.name]

            if var.constructor_function is None:
                setattr(self, var.var_name, value)
            else:
                setattr(self, var.var_name, var.constructor_function(self, value))

        # activate
        self.on_activate()

    def __save_status_vars_deactivate(self, event):
        """ Save status variables after deactivation.

        An exception raised by on_deactivate() is propagated after the status
        variables have been saved.

        @param event: Fysom event
        """
        try:
            self.on_deactivate()
        finally:
            # save status vars even if deactivation failed
            for var in self._stat_vars.values():
                if not hasattr(self, var.var_name):
                    continue
                value = getattr(self, var.var_name)
                # NOTE(review): a StatusVar instance here presumably means the
                # variable was never initialised; such values are not saved.
                if isinstance(value, StatusVar):
                    continue
                if var.representer_function is None:
                    self._statusVariables[var.name] = value
                else:
                    self._statusVariables[var.name] = var.representer_function(self, value)

    @property
    def log(self):
        """
        Returns a logger object named after the module and class of this object
        """
        return logging.getLogger("{0}.{1}".format(
            self.__module__, self.__class__.__name__))

    @property
    def is_module_threaded(self):
        """
        Returns whether the module shall be started in a thread.
        """
        return self._threaded

    def on_activate(self):
        """ Method called when module is activated. If not overridden
            this method logs an error.
        """
        self.log.error('Please implement and specify the activation method '
                       'for {0}.'.format(self.__class__.__name__))

    def on_deactivate(self):
        """ Method called when module is deactivated. If not overridden
            this method logs an error.
        """
        self.log.error('Please implement and specify the deactivation '
                       'method for {0}.'.format(self.__class__.__name__))

    def getStatusVariables(self):
        """ Return a dict of variable names and their content representing
            the module state for saving.

        @return dict: variable names and contents.

        @deprecated declare and use StatusVar class variables directly
        """
        # stacklevel=2 attributes the warning to the caller of this method
        warnings.warn('getStatusVariables is deprecated and will be removed in future versions. Use '
                      'StatusVar instead.', DeprecationWarning, stacklevel=2)
        return self._statusVariables

    def setStatusVariables(self, variableDict):
        """ Give a module a dict of variable names and their content
            representing the module state.

        If variableDict is not a dict, an error is logged and the stored
        status variables are left unchanged.

        @param OrderedDict variableDict: variable names and contents.

        @deprecated declare and use StatusVar class variables
        """
        # stacklevel=2 attributes the warning to the caller of this method
        warnings.warn('setStatusVariables is deprecated and will be removed in future versions. Use '
                      'StatusVar instead.', DeprecationWarning, stacklevel=2)
        # OrderedDict is a dict subclass, so a single check covers both
        if not isinstance(variableDict, dict):
            self.log.error('Did not pass a dict or OrderedDict to '
                           'setStatusVariables in {0}.'.format(
                               self.__class__.__name__))
            return
        self._statusVariables = variableDict

    def getConfiguration(self):
        """ Return the configuration dictionary for this module.

        @return dict: configuration dictionary

        @deprecated declare and use ConfigOption class variables directly
        """
        # stacklevel=2 attributes the warning to the caller of this method
        warnings.warn('getConfiguration is deprecated and will be removed in future versions. Use '
                      'ConfigOptions instead.', DeprecationWarning, stacklevel=2)
        return self._configuration

    def get_connector(self, connector_name):
        """ Return module connected to the given named connector.

        @param str connector_name: name of the connector

        @return obj: module that is connected to the named connector

        @raises TypeError: if a legacy connector has no module connected
        @raises Exception: if the connector does not exist or its entry in the
                           connector dict is of an unexpected type

        @deprecated instead of get_connector(connector_name) just use connector_name(). Enabled by using Connector
        objects as class variables
        """
        # stacklevel=2 attributes the warning to the caller of this method
        warnings.warn('get_connector is deprecated and will be removed in future versions. Use '
                      'Connector() callable instead.', DeprecationWarning, stacklevel=2)
        if connector_name not in self.connectors:
            raise Exception(
                'Connector {0} does not exist.'
                ''.format(connector_name))

        connector = self.connectors[connector_name]
        # new style legacy connector
        if isinstance(connector, Connector):
            return connector()
        # legacy legacy connector
        if isinstance(connector, dict):
            obj = connector['object']
            if obj is None:
                raise TypeError('No module connected')
            return obj
        raise Exception(
            'Entry {0} in connector dict is of wrong type {1}.'
            ''.format(connector_name, type(connector)))
class Base(QtCore.QObject, BaseMixin):
    """
    Module base class combining QtCore.QObject with BaseMixin.

    Adds no members of its own.
    """