-
Notifications
You must be signed in to change notification settings - Fork 5
/
util.py
177 lines (148 loc) · 7.67 KB
/
util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
import tensorflow as tf
import pickle
import os
from sklearn.model_selection import train_test_split
import numpy as np
from models.metrics import get_custom_metrics
from keras.utils.np_utils import to_categorical
import random
import matplotlib.pyplot as plt
def convert_yaml_config(config):
    """
    Convert string entries of the YAML-derived config dict into objects.

    Two conversions are performed in place:
      * a ['l1'|'l2', weight] pair under
        config['convolution_parameter']['kernel_regularizer'] is replaced by
        the corresponding tf.keras regularizer instance;
      * config['metrics'] is built from config['tensorflow_metrics'] plus the
        flattened config['custom_metrics'].

    :param config: type dict, config parameters (mutated in place)
    :return: config : type dict (the same object, for chaining)
    """
    if 'convolution_parameter' in config:
        conv = config['convolution_parameter']
        if 'kernel_regularizer' in conv:
            reg_name, reg_weight = conv['kernel_regularizer'][0], conv['kernel_regularizer'][1]
            # NOTE(review): the `l=` keyword matches older TF releases; recent
            # Keras uses `l2=` / `l1=` — confirm against the pinned TF version.
            if reg_name == 'l2':
                conv['kernel_regularizer'] = tf.keras.regularizers.l2(l=reg_weight)
            elif reg_name == 'l1':
                conv['kernel_regularizer'] = tf.keras.regularizers.l1(l=reg_weight)

    def _flatten(x):
        # Remove redundant nesting (brackets) around metric entries.
        return [y for item in x for y in _flatten(item)] if isinstance(x, list) else [x]

    config['metrics'] = list(config['tensorflow_metrics']) + _flatten(config['custom_metrics'])
    return config
def split(config):
    """
    Split dataset paths into training, validation, and test path lists
    (patient leave-out approach) and persist them as pickle files.

    For each dataset name in config['dataset'] this
      * loads the tfrecord path lists (dict with keys 'image' and 'label')
        from the pickle under config['dir_list_tfrecord'],
      * holds out the subject indices in config['test_subject'][dataset] as
        the test set when given, otherwise splits randomly using
        config['ratio_test_to_train_val'],
      * splits the remainder into train/val using config['ratio_val_to_train'],
      * writes the resulting dict to
        config['dir_dataset_info']/split_paths_<dataset>[_bi].pickle.

    :param config: type dict: config parameter
    :return: None (side effect: pickle files written to disk)
    """
    for dt in config['dataset']:
        # Body-identification runs use separate '_bi'-suffixed pickle files.
        suffix = '_bi' if config['read_body_identification'] else ''
        filename = config['dir_list_tfrecord'] + '/' + config['filename_tfrec_pickle'][dt] + suffix + '.pickle'
        with open(filename, 'rb') as fp:
            dataset_path = pickle.load(fp)
        # pickle files dict patterns must be:
        # {'image': tfrecord list of images path, 'label': tfrecord list of labels path}
        dataset_image_path = dataset_path['image']  # type list of str
        dataset_label_path = dataset_path['label']  # type list of str
        dict_dataset = {}
        if config['test_subject'][dt] is not None:
            # Patient leave-out: the configured subject indices form the test set.
            test_indices = config['test_subject'][dt]
            test_set = set(test_indices)  # O(1) membership instead of O(n) list scans
            path_test_img = [dataset_image_path[idx] for idx in test_indices]
            path_test_label = [dataset_label_path[idx] for idx in test_indices]
            path_train_val_img = [p for idx, p in enumerate(dataset_image_path) if idx not in test_set]
            path_train_val_label = [p for idx, p in enumerate(dataset_label_path) if idx not in test_set]
        else:
            # No explicit hold-out subjects: random, seeded split.
            path_train_val_img, path_test_img, path_train_val_label, path_test_label = train_test_split(
                dataset_image_path, dataset_label_path,
                test_size=config['ratio_test_to_train_val'],
                random_state=config['seed_random_split'])
        dict_dataset['path_test_img'], dict_dataset['path_test_label'] = path_test_img, path_test_label
        dict_dataset['path_train_val_img'] = path_train_val_img
        dict_dataset['path_train_val_label'] = path_train_val_label
        path_train_img, path_val_img, path_train_label, path_val_label = train_test_split(
            path_train_val_img, path_train_val_label,
            test_size=config['ratio_val_to_train'],
            random_state=config['seed_random_split'])
        dict_dataset['path_train_img'], dict_dataset['path_train_label'] = path_train_img, path_train_label
        dict_dataset['path_val_img'], dict_dataset['path_val_label'] = path_val_img, path_val_label
        # exist_ok avoids the check-then-create race of an os.path.exists guard.
        os.makedirs(config['dir_dataset_info'], exist_ok=True)
        pickle_filename = config['dir_dataset_info'] + '/split_paths_' + dt + suffix + '.pickle'
        with open(pickle_filename, 'wb') as fp:
            pickle.dump(dict_dataset, fp, protocol=pickle.HIGHEST_PROTOCOL)
    print('Successfully saved split paths files.')
def convert_tf_optimizer(config):
    """
    Parse str in the yaml file -> tensorflow optimizer functions

    :param config: type dict: config parameter; config['optimizer'] holds
        'name' (one of the keys below) and 'args' (kwargs for the optimizer)
    :return: function in tf.keras.optimizers, or None for an unknown name
    """
    # Dispatch table instead of a long if/elif chain.
    factories = {
        'SGD': tf.keras.optimizers.SGD,
        'Nadam': tf.keras.optimizers.Nadam,
        'Adadelta': tf.keras.optimizers.Adadelta,
        'Adagrad': tf.keras.optimizers.Adagrad,
        'Adam': tf.keras.optimizers.Adam,
        'Adamax': tf.keras.optimizers.Adamax,
        'Ftrl': tf.keras.optimizers.Ftrl,
        'RMSprop': tf.keras.optimizers.RMSprop,
    }
    factory = factories.get(config['optimizer']['name'])
    if factory is not None:
        return factory(**config['optimizer']['args'])
    return None
def convert_integers_to_onehot(img, num_classes=3):
    """
    Convert an integer class-label image to a one-hot encoded image.

    Implemented with NumPy instead of keras.utils.np_utils.to_categorical
    (that module was removed in modern Keras); output matches it: dtype
    float32, shape img.shape + (num_classes,).

    :param img: type ndarray of integer class ids in [0, num_classes-1]
    :param num_classes: type int
    :return: ndarray onehot image, float32, shape img.shape + (num_classes,)
    :raises ValueError: if img contains negative class ids
    :raises IndexError: if img contains values > num_classes-1
    """
    labels = np.asarray(img, dtype='int64')
    if labels.size and labels.min() < 0:
        # Negative ids would silently wrap via fancy indexing — reject them.
        raise ValueError('class ids must be non-negative')
    # Row i of the identity matrix is the one-hot vector for class i.
    return np.eye(num_classes, dtype='float32')[labels]
def convert_onehot_to_integers(img, axis=-1):
    """
    Convert a one-hot encoded image back to an integer class-label image.

    (Docstring fixed: it was copy-pasted from the inverse function and
    documented a nonexistent num_classes parameter.)

    :param img: type ndarray, one-hot encoded along `axis`
    :param axis: type int, axis holding the one-hot class dimension
    :return: ndarray of integer class ids, with `axis` removed
    """
    return np.argmax(img, axis=axis)
def convert_onehot_to_integers_add_bg(img, axis=-1):
    """
    Convert a one-hot image (without background channel) to integer labels,
    treating all-zero pixels as background (class 0).

    A zero channel is prepended on the last axis, so foreground class k of
    the input becomes label k+1 and pixels with no active channel map to 0.
    (Docstring fixed: it previously claimed to return a one-hot image.)

    :param img: type ndarray, one-hot encoded along the last axis, background omitted
    :param axis: type int, axis argmax is taken over (default: last; note the
        background channel is always prepended on the last axis)
    :return: ndarray of integer class ids (0 = background)
    """
    padded_shape = list(img.shape)
    padded_shape[-1] += 1  # room for the background channel at index 0
    padded = np.zeros(tuple(padded_shape))
    padded[..., 1:] = img
    return np.argmax(padded, axis=axis)
def get_thresholds(decision_map, n_classes=6, row_dim=1):
    """
    Special for the network body part identification.
    Calculates thresholds (lines) between class regions of a predicted
    decision map.

    The map is transposed, the most frequent class of each resulting row is
    taken, and index i+1 is recorded as a threshold when its majority class
    differs from row i but equals row i+2, i.e. the start of a stable new
    class band: ... class n, class n |threshold line| class n+1, class n+1 ...

    :param decision_map: type ndarray (2D) of integer class ids, each element
        represents one class
    :param n_classes: type int, number of classes
    :param row_dim: unused; kept for backward compatibility with callers
    :return: thresholds: type ndarray of shape (n_classes,), int32; threshold
        line indices of the classes, unfilled entries stay 0
    """
    # Rotate the decision map so each row is one column of the prediction.
    decision_map = decision_map.T
    # Majority class per row via a per-row histogram; ties resolve to the
    # lowest class id, matching np.argmax on a manual count array.
    row_max = np.array(
        [np.argmax(np.bincount(row, minlength=n_classes)) for row in decision_map],
        dtype='int32')
    thresholds = np.zeros(n_classes, dtype='int32')
    idx = 0
    for i in range(decision_map.shape[0] - 2):
        # row i+1 starts a new stable band -> it is the threshold line
        if row_max[i] != row_max[i + 1] and row_max[i + 1] == row_max[i + 2]:
            thresholds[idx] = i + 1
            idx += 1
            if idx == n_classes - 1:
                # At most n_classes-1 boundaries exist between n_classes bands.
                return thresholds
    return thresholds