-
Notifications
You must be signed in to change notification settings - Fork 180
/
task_runner.py
378 lines (305 loc) · 14.7 KB
/
task_runner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
# Copyright (C) 2020-2021 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
"""Interactive API package."""
from logging import getLogger
import numpy as np
from openfl.utilities import split_tensor_dict_for_holdouts
from openfl.utilities import TensorKey
class CoreTaskRunner(object):
    """Federated Learning Task Runner Class.

    Wraps the callables registered on a task provider (interface object) into
    tasks the collaborator can invoke, and converts their outputs into
    ``TensorKey``-indexed dictionaries for aggregation and local storage.
    """

    def _prepare_tensorkeys_for_agggregation(self, metric_dict, validation_flag,
                                             col_name, round_num):
        """
        Prepare tensorkeys for aggregation.

        Args:
            metric_dict (dict): Metric name -> value, as returned by the task.
            validation_flag: ``False`` for a training task; for a validation
                task, the suffix ``'_local'`` or ``'_agg'``.
            col_name (str): Collaborator name, used as the tensor origin.
            round_num (int): Current round number.

        Returns:
            tuple: ``(global_tensor_dict, local_tensor_dict)``. Metrics always
            go into the global dict; model tensors are only produced after a
            training task.
        """
        global_tensor_dict, local_tensor_dict = {}, {}
        origin = col_name
        if not validation_flag:
            tags = ('trained',)

            # Model tensors, including optimizer tensors, after training.
            output_model_dict = self.get_tensor_dict(with_opt_vars=True)
            global_model_dict, local_model_dict = split_tensor_dict_for_holdouts(
                self.logger, output_model_dict,
                **self.tensor_dict_split_fn_kwargs
            )

            # Tensors that are sent for aggregation.
            global_tensor_dict = {
                TensorKey(tensor_name, origin, round_num, False, tags): nparray
                for tensor_name, nparray in global_model_dict.items()
            }
            # Held-out tensors that stay local for this round ...
            local_tensorkey_model_dict = {
                TensorKey(tensor_name, origin, round_num, False, tags): nparray
                for tensor_name, nparray in local_model_dict.items()
            }
            # ... and are registered again for the next round with the
            # 'model' tag: the train/validate function of the next round
            # looks for the updated model parameters, so this ensures they
            # are resolved locally.
            next_local_tensorkey_model_dict = {
                TensorKey(tensor_name, origin, round_num + 1, False, ('model',)): nparray
                for tensor_name, nparray in local_model_dict.items()
            }
            local_tensor_dict = {
                **local_tensorkey_model_dict,
                **next_local_tensorkey_model_dict,
            }

            # Update the required tensors if they need to be pulled from the
            # aggregator.
            # TODO this logic can break if different collaborators have
            # different roles between rounds. For example, if a collaborator
            # only performs validation in the first round but training in the
            # second, it has no way of knowing the optimizer state tensor
            # names to request from the aggregator, because these are only
            # created after training occurs. A work around could involve
            # doing a single epoch of training on random data to get the
            # optimizer names, and then throwing away the model.
            if self.opt_treatment == 'CONTINUE_GLOBAL':
                self.initialize_tensorkeys_for_functions(with_opt_vars=True)

            # Signal that optimizer values are now present and can be loaded
            # when the model is rebuilt (checked in rebuild_model).
            self.training_round_completed = True
        else:
            tags = ('validate' + validation_flag,)

        # Metrics are reported for both training and validation tasks.
        tags = ('metric', *tags)
        metric_tensor_dict = {
            TensorKey(metric, origin, round_num, True, tags): np.array(value)
            for metric, value in metric_dict.items()
        }
        global_tensor_dict = {**global_tensor_dict, **metric_tensor_dict}

        return global_tensor_dict, local_tensor_dict

    def adapt_tasks(self):
        """
        Prepare tasks for the collaborator.

        Using functions from a task provider (deserialized interface object)
        and registered task contracts, prepares callable tasks to be invoked
        by the collaborator and stores them in ``self.TASK_REGISTRY``.
        Preparing includes conditional model rebuilding, and filling output
        dicts with tensors for aggregation and for storing in the local DB.

        There is an assumption that any training task accepts an optimizer as
        one of its arguments; a task whose contract has no optimizer is
        treated as a validation task.
        """
        def task_binder(task_name, callable_task):
            # A separate factory binds task_name/callable_task per task, so
            # the wrappers do not all share the loop's last values.
            def collaborator_adapted_task(col_name, round_num, input_tensor_dict, **kwargs):
                task_contract = self.task_provider.task_contract[task_name]
                is_validation = task_contract['optimizer'] is None
                task_settings = self.task_provider.task_settings[task_name]

                device = kwargs.get('device', 'cpu')

                self.rebuild_model(input_tensor_dict, validation=is_validation, device=device)

                task_kwargs = {}
                if is_validation:
                    loader = self.data_loader.get_valid_loader()
                    # Validation tasks carry an `apply` kwarg; it selects the
                    # '_local' or '_agg' tag suffix for the reported metrics.
                    validation_flag = '_local' if kwargs['apply'] == 'local' else '_agg'
                else:
                    loader = self.data_loader.get_train_loader()
                    validation_flag = False
                    # Training tasks also receive the optimizer.
                    task_kwargs[task_contract['optimizer']] = self.optimizer

                # The contract maps each entity to the task's keyword name.
                for entity_name, entity in zip(['model', 'data_loader', 'device'],
                                               [self.model, loader, device]):
                    task_kwargs[task_contract[entity_name]] = entity

                # Add task settings to the keyword arguments.
                task_kwargs.update(task_settings)

                # The user-provided training/validation method call.
                metric_dict = callable_task(**task_kwargs)

                return self._prepare_tensorkeys_for_agggregation(
                    metric_dict, validation_flag, col_name, round_num)

            return collaborator_adapted_task

        for task_name, callable_task in self.task_provider.task_registry.items():
            self.TASK_REGISTRY[task_name] = task_binder(task_name, callable_task)

    def __init__(self, **kwargs):
        """
        Initialize Task Runner.

        This class is a part of the Interactive python API release.
        It is no longer a user interface entity that should be subclassed,
        but a part of the OpenFL kernel.

        Args:
            **kwargs: Stored as-is on ``self.kwargs``.
        """
        self.set_logger()

        self.kwargs = kwargs

        self.TASK_REGISTRY = {}

        # Default optimizer treatment; may be changed via set_optimizer_treatment.
        self.opt_treatment = 'RESET'
        self.required_tensorkeys_for_function = {}

        # Becomes True once a training task has produced optimizer values.
        self.training_round_completed = False
        # Hold out one optimizer param (present in every child model that
        # does not overwrite get and set tensordict) that is not a numpy array.
        self.tensor_dict_split_fn_kwargs = {
            'holdout_tensor_names': ['__opt_state_needed']
        }

    def set_task_provider(self, task_provider):
        """
        Set task registry.

        This method receives a Task Interface object as an argument and uses
        the provided callables and information to prepare tasks that may be
        called by the collaborator component. ``None`` is ignored.
        """
        if task_provider is None:
            return
        self.task_provider = task_provider
        self.adapt_tasks()

    def set_data_loader(self, data_loader):
        """Register a data loader initialized with local data path."""
        self.data_loader = data_loader

    def set_model_provider(self, model_provider):
        """Retrieve a model and an optimizer from the interface object."""
        self.model_provider = model_provider
        self.model = self.model_provider.provide_model()
        self.optimizer = self.model_provider.provide_optimizer()

    def set_framework_adapter(self, framework_adapter):
        """
        Set framework adapter.

        Setting a framework adapter allows the first extraction of the model
        weights, in order to build the list of parameters to be aggregated.
        Optimizer parameters are included only for 'CONTINUE_GLOBAL'.
        """
        self.framework_adapter = framework_adapter
        aggregate_optimizer_parameters = self.opt_treatment == 'CONTINUE_GLOBAL'
        self.initialize_tensorkeys_for_functions(with_opt_vars=aggregate_optimizer_parameters)

    def set_logger(self):
        """Set up the log object."""
        self.logger = getLogger(__name__)

    def set_optimizer_treatment(self, opt_treatment):
        """Change the treatment of current instance optimizer."""
        # NOTE: this value is known at initialization time and does not
        # change during training; this setter is a candidate for removal.
        self.opt_treatment = opt_treatment

    def rebuild_model(self, input_tensor_dict, validation=False, device='cpu'):
        """
        Parse tensor names and update weights of model. Handles the optimizer treatment.

        Args:
            input_tensor_dict (dict): Tensors to load into the model.
            validation (bool): True when rebuilding for a validation task.
            device (str): Passed through to the framework adapter.

        Returns:
            None
        """
        if self.opt_treatment == 'RESET':
            # Fresh optimizer every time; only model tensors are loaded.
            self.reset_opt_vars()
            self.set_tensor_dict(input_tensor_dict, with_opt_vars=False, device=device)
        elif (self.training_round_completed
              and self.opt_treatment == 'CONTINUE_GLOBAL' and not validation):
            # Optimizer state exists only after a completed training round.
            self.set_tensor_dict(input_tensor_dict, with_opt_vars=True, device=device)
        else:
            self.set_tensor_dict(input_tensor_dict, with_opt_vars=False, device=device)

    def get_required_tensorkeys_for_function(self, func_name, **kwargs):
        """
        Get the required tensors for specified function that could be called as part of a task.

        By default, this is just all of the layers and optimizer of the model.

        Parameters
        ----------
        func_name : str
            Name of the task function (not used for the lookup).
        **kwargs
            Task keyword arguments. Validation tasks carry ``apply``
            (``'local'`` or ``'global'``); tasks without it are treated as
            training tasks.

        Returns
        -------
        List
            [TensorKey]

        Raises
        ------
        ValueError
            If ``apply`` is present but is neither ``'local'`` nor ``'global'``.
        """
        required = self.required_tensorkeys_for_function
        # We rely on validation type tasks parameter `apply`.
        # In the interface layer we add those parameters automatically.
        if 'apply' not in kwargs:
            return [
                TensorKey(tensor_name, 'GLOBAL', 0, False, ('model',))
                for tensor_name in required['global_model_dict']
            ] + [
                TensorKey(tensor_name, 'LOCAL', 0, False, ('model',))
                for tensor_name in required['local_model_dict']
            ]

        apply = kwargs['apply']
        if apply == 'local':
            # Validate the locally trained model: every tensor is resolved locally.
            return [
                TensorKey(tensor_name, 'LOCAL', 0, False, ('trained',))
                for tensor_name in {
                    **required['local_model_dict_val'],
                    **required['global_model_dict_val']
                }
            ]
        if apply == 'global':
            return [
                TensorKey(tensor_name, 'GLOBAL', 0, False, ('model',))
                for tensor_name in required['global_model_dict_val']
            ] + [
                TensorKey(tensor_name, 'LOCAL', 0, False, ('model',))
                for tensor_name in required['local_model_dict_val']
            ]
        # Previously this fell through and returned None.
        raise ValueError(
            f"Unsupported 'apply' value {apply!r} for function {func_name!r}; "
            "expected 'local' or 'global'."
        )

    def initialize_tensorkeys_for_functions(self, with_opt_vars=False):
        """Set the required tensors for all publicly accessible task methods.

        By default, this is just all of the layers and optimizer of the model.
        Custom tensors should be added to this function.

        Args:
            with_opt_vars (bool): Include optimizer tensors in the training
                task dicts. Validation dicts never include them.

        Returns:
            None
        """
        # TODO: Framework adapters should have separate methods for dealing with optimizer
        # Set model dict for validation tasks
        output_model_dict = self.get_tensor_dict(with_opt_vars=False)
        global_model_dict_val, local_model_dict_val = split_tensor_dict_for_holdouts(
            self.logger,
            output_model_dict,
            **self.tensor_dict_split_fn_kwargs
        )
        # Now set model dict for training tasks
        if with_opt_vars:
            output_model_dict = self.get_tensor_dict(with_opt_vars=True)
            global_model_dict, local_model_dict = split_tensor_dict_for_holdouts(
                self.logger, output_model_dict,
                **self.tensor_dict_split_fn_kwargs
            )
        else:
            global_model_dict = global_model_dict_val
            local_model_dict = local_model_dict_val

        self.required_tensorkeys_for_function['global_model_dict'] = global_model_dict
        self.required_tensorkeys_for_function['local_model_dict'] = local_model_dict
        self.required_tensorkeys_for_function['global_model_dict_val'] = global_model_dict_val
        self.required_tensorkeys_for_function['local_model_dict_val'] = local_model_dict_val

    def reset_opt_vars(self):
        """
        Reset optimizer variables.

        Replaces the optimizer with a new one obtained from the model provider.
        """
        self.optimizer = self.model_provider.provide_optimizer()

    def get_train_data_size(self):
        """
        Get the number of training examples.

        It will be used for weighted averaging in aggregation.

        Returns:
            int: The number of training examples.
        """
        return self.data_loader.get_train_data_size()

    def get_valid_data_size(self):
        """
        Get the number of examples.

        It will be used for weighted averaging in aggregation.

        Returns:
            int: The number of validation examples.
        """
        return self.data_loader.get_valid_data_size()

    def get_tensor_dict(self, with_opt_vars=False):
        """Return the tensor dictionary.

        Args:
            with_opt_vars (bool): Return the tensor dictionary including the
                optimizer tensors (Default=False)

        Returns:
            dict: Tensor dictionary {**dict, **optimizer_dict}
        """
        args = [self.model]
        if with_opt_vars:
            args.append(self.optimizer)

        return self.framework_adapter.get_tensor_dict(*args)

    def set_tensor_dict(self, tensor_dict, with_opt_vars=False, device='cpu'):
        """Set the tensor dictionary.

        Loads tensors for the model layers, and optionally the optimizer
        state, through the framework adapter.

        Args:
            tensor_dict: The tensor dictionary
            with_opt_vars (bool): Also pass the optimizer to the adapter so
                optimizer tensors are loaded (Default=False)
            device (str): Passed to the adapter as the ``device`` keyword.

        Returns:
            Whatever the framework adapter's ``set_tensor_dict`` returns.
        """
        args = [self.model, tensor_dict]
        if with_opt_vars:
            args.append(self.optimizer)

        return self.framework_adapter.set_tensor_dict(*args, device=device)