-
Notifications
You must be signed in to change notification settings - Fork 27
/
synthesizer.py
139 lines (103 loc) · 4.34 KB
/
synthesizer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import numpy as np
import itertools
import multiprocessing as mp
from sklearn.metrics import f1_score
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
import copy_reg
import types
def _pickle_method(m):
if m.im_self is None:
return getattr, (m.im_class, m.im_func.func_name)
else:
return getattr, (m.im_self, m.im_func.func_name)
# Register the reducer so method objects become picklable, which is required
# to pass bound methods to multiprocessing pools (see fit_function usage).
copy_reg.pickle(types.MethodType, _pickle_method)
class Synthesizer(object):
    """
    Synthesizes heuristics (small classifiers) from primitives and
    validation ground-truth labels.

    Attributes:
        val_primitive_matrix: (n_samples, n_primitives) validation feature matrix
        val_ground: ground-truth labels for the validation set
        p: number of primitives (columns of val_primitive_matrix)
        b: class prior of the most likely class
    """

    def __init__(self, primitive_matrix, val_ground, b=0.5):
        """
        Initialize Synthesizer object

        primitive_matrix: (n_samples, n_primitives) validation feature matrix
        val_ground: ground-truth labels for the validation set
        b: class prior of most likely class
        """
        self.val_primitive_matrix = primitive_matrix
        self.val_ground = val_ground
        self.p = np.shape(self.val_primitive_matrix)[1]
        self.b = b

    def generate_feature_combinations(self, max_cardinality=1):
        """
        Create a list of primitive index combinations for given cardinality

        max_cardinality: number of features each heuristic operates over
        Returns a list of index tuples, e.g. [(0,), (1,), ...] for cardinality 1.
        """
        return list(itertools.combinations(range(self.p), max_cardinality))

    def fit_function(self, comb, model):
        """
        Fits a single logistic regression or decision tree model

        comb: feature combination (tuple of column indices) to fit model over
        model: 'dt' for a decision tree, 'lr' for logistic regression
        Returns the fitted classifier; raises ValueError for an unknown model.
        """
        X = self.val_primitive_matrix[:, comb]
        # Tuple indexing normally yields a 2-D (n_samples, len(comb)) array;
        # guard only against a genuinely 1-D slice. (The original checked the
        # sample count instead, which would mangle a single-sample matrix.)
        if np.ndim(X) == 1:
            X = X.reshape(-1, 1)
        if model == 'dt':
            # Tree depth scales with the number of features the heuristic sees.
            clf = DecisionTreeClassifier(max_depth=len(comb))
        elif model == 'lr':
            clf = LogisticRegression()
        else:
            # Fail loudly instead of silently returning None.
            raise ValueError("model must be 'dt' or 'lr', got %r" % (model,))
        clf.fit(X, self.val_ground)
        return clf

    def generate_heuristics(self, model, max_cardinality=1):
        """
        Generates heuristics over given feature cardinality

        model: 'dt' or 'lr' (see fit_function)
        max_cardinality: number of features each heuristic operates over
        Returns (heuristics, feature_combinations), index-aligned.
        """
        feature_combinations = self.generate_feature_combinations(max_cardinality)
        heuristics = [self.fit_function(comb, model) for comb in feature_combinations]
        return heuristics, feature_combinations

    def beta_optimizer(self, marginals, ground):
        """
        Returns the best beta parameter for abstain threshold given marginals,
        chosen to maximize the micro-averaged F1 score.

        marginals: confidences for data from a single heuristic
        ground: ground-truth labels for the same data
        Points whose marginal lies strictly inside (b-beta, b+beta) abstain
        (label 0); the rest are labeled -1 / +1.
        """
        # 0.25 instead of 0.0 as a minimum keeps coverage under control.
        beta_params = np.linspace(0.25, 0.45, 10)
        f1_scores = []
        for beta in beta_params:
            labels_cutoff = np.zeros(np.shape(marginals))
            labels_cutoff[marginals <= (self.b - beta)] = -1.
            labels_cutoff[marginals >= (self.b + beta)] = 1.
            f1_scores.append(f1_score(ground, labels_cutoff, average='micro'))
        # NaN scores count as 0 so argmax is always well-defined.
        f1_scores = np.nan_to_num(f1_scores)
        return beta_params[np.argmax(np.array(f1_scores))]

    def find_optimal_beta(self, heuristics, X, ground):
        """
        Returns optimal beta for given heuristics

        heuristics: list of pre-trained models, one per single-feature
                    combination (heuristic i reads column i of X)
        ground: ground truth labels associated with X data
        """
        beta_opt = []
        for i, hf in enumerate(heuristics):
            # Column i as an (n_samples, 1) matrix: sklearn estimators
            # require 2-D input (a bare 1-D slice is rejected/misread).
            marginals = hf.predict_proba(X[:, i].reshape(-1, 1))[:, 1]
            beta_opt.append(self.beta_optimizer(marginals, ground))
        return beta_opt

    #TODO: function for getting accuracies and TP FP rates
#TODO: function for getting accuracies and TP FP rates