Parallelize the fit and decision_function methods of FeatureBagging #197

Open · wants to merge 6 commits into base: development
137 changes: 68 additions & 69 deletions pyod/models/feature_bagging.py
@@ -6,24 +6,21 @@
from __future__ import division
from __future__ import print_function

import numpy as np
import numbers

from joblib import Parallel
from joblib.parallel import delayed
import itertools
import numpy as np
from joblib import delayed, Parallel

from sklearn.base import clone
from sklearn.utils import check_random_state
from sklearn.utils import check_array
from sklearn.utils.validation import check_is_fitted
from sklearn.utils.random import sample_without_replacement

from .lof import LOF
from .base import BaseDetector
from .sklearn_base import _partition_estimators
from .combination import average, maximization
from ..utils.utility import check_parameter
from ..utils.utility import generate_indices
from ..utils.utility import generate_bagging_indices
from ..utils.utility import check_detector

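The imports above switch to pulling delayed and Parallel straight from joblib. The idiom used throughout this PR is joblib's fan-out/collect pattern: Parallel consumes a generator of delayed(...) calls, executes them across workers, and returns the results in submission order, which is what lets the chunked results be stitched back together deterministically. A minimal, self-contained illustration:

import math
from joblib import Parallel, delayed

# Each delayed(...) wraps one call; results come back in input order,
# regardless of which worker finishes first.
results = Parallel(n_jobs=2)(delayed(math.sqrt)(i) for i in range(4))
assert results == [math.sqrt(i) for i in range(4)]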
@@ -69,23 +66,47 @@ def _set_random_states(estimator, random_state=None):
estimator.set_params(**to_set)


# def _parallel_decision_function(estimators, estimators_features, X):
# n_samples = X.shape[0]
# scores = np.zeros((n_samples, len(estimators)))
#
# for i, (estimator, features) in enumerate(
# zip(estimators, estimators_features)):
# if hasattr(estimator, 'decision_function'):
# estimator_score = estimator.decision_function(
# X[:, features])
# scores[:, i] = estimator_score
# else:
# raise NotImplementedError(
# 'current base detector has no decision_function')
# return scores
def _parallel_decision_function(estimators, estimators_features, X):
n_samples = X.shape[0]
scores = np.zeros((n_samples, len(estimators)))

for i, (estimator, features) in enumerate(
zip(estimators, estimators_features)):
if hasattr(estimator, 'decision_function'):
estimator_score = estimator.decision_function(
X[:, features])
scores[:, i] = estimator_score
else:
raise NotImplementedError(
'current base detector has no decision_function')
return scores
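Each worker gets a contiguous slice of the fitted estimators and returns an (n_samples, len(slice)) block of scores; decision_function then concatenates the blocks along axis 1. The slices come from _partition_estimators, vendored from scikit-learn's ensemble module. A sketch of its behavior (the vendored code may differ in detail):

import numpy as np

def partition_estimators_sketch(n_estimators, n_jobs):
    # Spread estimators as evenly as possible across jobs; the first
    # n_estimators % n_jobs jobs each take one extra estimator.
    n_jobs = min(n_jobs, n_estimators)
    counts = np.full(n_jobs, n_estimators // n_jobs, dtype=int)
    counts[:n_estimators % n_jobs] += 1
    starts = np.concatenate(([0], np.cumsum(counts)))
    return n_jobs, counts.tolist(), starts.tolist()

# e.g. 10 estimators over 3 jobs -> counts [4, 3, 3], starts [0, 4, 7, 10]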


def _parallel_fit_estimators(n_estimators, ensemble, X, seeds):
estimators = []
estimators_features = []

for i in range(n_estimators):
random_state = np.random.RandomState(seeds[i])

# max_features is incremented by one since random
# function is [min_features, max_features)
features = generate_bagging_indices(random_state,
ensemble.bootstrap_features,
ensemble.n_features_,
ensemble.min_features_,
ensemble.max_features_ + 1)
# initialize and append estimators
estimator = ensemble._make_estimator(append=False,
random_state=random_state)
estimator.fit(X[:, features])

estimators.append(estimator)
estimators_features.append(features)

return estimators, estimators_features
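Reproducibility is preserved because every estimator draws its own RandomState from the pre-computed seeds array, so the fitted ensemble is identical no matter how the work is partitioned across jobs (the new test_parallel test checks exactly this). As a rough sketch of the feature sampling behind generate_bagging_indices, an assumption for illustration rather than pyod's exact code (the bootstrap_features branch, which samples with replacement, is omitted), the +1 on max_features compensates for randint's half-open interval:

import numpy as np

def sample_features_sketch(random_state, n_features,
                           min_features, max_features):
    # RandomState.randint draws from [low, high), so the call site
    # passes max_features_ + 1 to make the upper bound inclusive.
    n_sub = random_state.randint(min_features, max_features)
    return random_state.permutation(n_features)[:n_sub]

# Mirrors the call site: pass the true maximum plus one.
sample_features_sketch(np.random.RandomState(0), n_features=10,
                       min_features=5, max_features=10 + 1)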


# TODO: should support parallelization at the model level
# TODO: detector score combination through BFS should be implemented
# See https://github.com/yzhao062/pyod/issues/59
class FeatureBagging(BaseDetector):
@@ -255,36 +276,25 @@ def fit(self, X, y=None):
param_name='max_features', high=self.n_features_,
include_left=True, include_right=True)

self.estimators_ = []
self.estimators_features_ = []

n_more_estimators = self.n_estimators - len(self.estimators_)
seeds = random_state.randint(MAX_INT, size=self.n_estimators)
self._seeds = seeds

if n_more_estimators < 0:
raise ValueError('n_estimators=%d must be larger or equal to '
'len(estimators_)=%d when warm_start==True'
% (self.n_estimators, len(self.estimators_)))
n_jobs, n_estimators, starts = _partition_estimators(self.n_estimators,
self.n_jobs)

seeds = random_state.randint(MAX_INT, size=n_more_estimators)
self._seeds = seeds
all_results = Parallel(n_jobs=n_jobs, verbose=self.verbose)(
delayed(_parallel_fit_estimators)(
n_estimators[i],
self,
X,
seeds[starts[i]:starts[i + 1]])
for i in range(n_jobs))

for i in range(self.n_estimators):
random_state = np.random.RandomState(seeds[i])

# max_features is incremented by one since random
# function is [min_features, max_features)
features = generate_bagging_indices(random_state,
self.bootstrap_features,
self.n_features_,
self.min_features_,
self.max_features_ + 1)
# initialize and append estimators
estimator = self._make_estimator(append=False,
random_state=random_state)
estimator.fit(X[:, features])
self.estimators_ = list(itertools.chain.from_iterable(
t[0] for t in all_results))

self.estimators_.append(estimator)
self.estimators_features_.append(features)
self.estimators_features_ = list(itertools.chain.from_iterable(
t[1] for t in all_results))

# decision score matrix from all estimators
all_decision_scores = self._get_decision_scores()
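fit now has a simple map-reduce shape: the seeds array is sliced by the starts offsets, each job fits its share of estimators via _parallel_fit_estimators, and the per-job (estimators, features) pairs are flattened back into the flat lists the rest of the class expects. A toy illustration of the flatten step (values are illustrative only):

import itertools

# Each job returns an (estimators, features) pair; chaining the first
# and second elements separately rebuilds flat, order-preserving lists.
all_results = [(['e0', 'e1'], ['f0', 'f1']), (['e2'], ['f2'])]
estimators = list(itertools.chain.from_iterable(t[0] for t in all_results))
features = list(itertools.chain.from_iterable(t[1] for t in all_results))
assert estimators == ['e0', 'e1', 'e2']
assert features == ['f0', 'f1', 'f2']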
@@ -300,7 +310,6 @@ def fit(self, X, y=None):

def decision_function(self, X):
"""Predict raw anomaly score of X using the fitted detector.

The anomaly score of an input sample is computed based on different
detector algorithms. For consistency, outliers are assigned
larger anomaly scores.
@@ -326,33 +335,23 @@ def decision_function(self, X):
"input n_features is {1}."
"".format(self.n_features_, X.shape[1]))

# Parallel loop
# n_jobs, n_estimators, starts = _partition_estimators(self.n_estimators,
# self.n_jobs)
# all_pred_scores = Parallel(n_jobs=n_jobs, verbose=self.verbose)(
# delayed(_parallel_decision_function)(
# self.estimators_[starts[i]:starts[i + 1]],
# self.estimators_features_[starts[i]:starts[i + 1]],
# X)
# for i in range(n_jobs))
#
# # Reduce
# all_pred_scores = np.concatenate(all_pred_scores, axis=1)
all_pred_scores = self._predict_decision_scores(X)
n_jobs, n_estimators, starts = _partition_estimators(self.n_estimators,
self.n_jobs)
all_pred_scores = Parallel(n_jobs=n_jobs, verbose=self.verbose)(
delayed(_parallel_decision_function)(
self.estimators_[starts[i]:starts[i + 1]],
self.estimators_features_[starts[i]:starts[i + 1]],
X)
for i in range(n_jobs))

# Reduce
all_pred_scores = np.concatenate(all_pred_scores, axis=1)

if self.combination == 'average':
return average(all_pred_scores)
else:
return maximization(all_pred_scores)
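average and maximization come from pyod.models.combination and reduce the (n_samples, n_estimators) score matrix across estimators, yielding one score per sample; conceptually they behave like:

import numpy as np

scores = np.array([[0.1, 0.9],
                   [0.4, 0.2]])   # shape (n_samples, n_estimators)
avg = scores.mean(axis=1)         # average():      [0.5, 0.3]
mx = scores.max(axis=1)           # maximization(): [0.9, 0.4]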

def _predict_decision_scores(self, X):
all_pred_scores = np.zeros([X.shape[0], self.n_estimators])
for i in range(self.n_estimators):
features = self.estimators_features_[i]
all_pred_scores[:, i] = self.estimators_[i].decision_function(
X[:, features])
return all_pred_scores

def _get_decision_scores(self):
all_decision_scores = np.zeros([self.n_samples_, self.n_estimators])
for i in range(self.n_estimators):
38 changes: 38 additions & 0 deletions pyod/test/test_feature_bagging.py
@@ -15,6 +15,7 @@
from sklearn.utils.testing import assert_greater_equal
from sklearn.utils.testing import assert_less_equal
from sklearn.utils.testing import assert_raises
from sklearn.utils.testing import assert_array_almost_equal

from sklearn.utils.estimator_checks import check_estimator

@@ -125,6 +126,43 @@ def test_predict_rank_normalized(self):
assert_array_less(pred_ranks, 1.01)
assert_array_less(-0.1, pred_ranks)

def test_parallel(self):
feat_bag = FeatureBagging(
n_jobs=3,
random_state=42).fit(self.X_train, self.y_train)

# predict_proba
feat_bag.set_params(n_jobs=1)
y1 = feat_bag.predict_proba(self.X_test)
feat_bag.set_params(n_jobs=2)
y2 = feat_bag.predict_proba(self.X_test)
assert_array_almost_equal(y1, y2)

feat_bag = FeatureBagging(
n_jobs=1,
random_state=42).fit(self.X_train, self.y_train)

y3 = feat_bag.predict_proba(self.X_test)
assert_array_almost_equal(y1, y3)

# decision_function
feat_bag = FeatureBagging(
n_jobs=3,
random_state=42).fit(self.X_train, self.y_train)

feat_bag.set_params(n_jobs=1)
decisions1 = feat_bag.decision_function(self.X_test)
feat_bag.set_params(n_jobs=2)
decisions2 = feat_bag.decision_function(self.X_test)
assert_array_almost_equal(decisions1, decisions2)

feat_bag = FeatureBagging(
n_jobs=1,
random_state=42).fit(self.X_train, self.y_train)

decisions3 = feat_bag.decision_function(self.X_test)
assert_array_almost_equal(decisions1, decisions3)

def tearDown(self):
pass
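For reference, the invariant the new test asserts can be reproduced outside the test suite; a minimal standalone check on synthetic data (illustrative only):

import numpy as np
from pyod.models.feature_bagging import FeatureBagging

rng = np.random.RandomState(0)
X_train, X_test = rng.randn(200, 5), rng.randn(50, 5)

serial = FeatureBagging(n_jobs=1, random_state=42).fit(X_train)
parallel = FeatureBagging(n_jobs=3, random_state=42).fit(X_train)

# Scores must be identical regardless of the degree of parallelism.
np.testing.assert_array_almost_equal(
    serial.decision_function(X_test),
    parallel.decision_function(X_test))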
