Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallelized time series k means barycenter update procedure #321

Open
wants to merge 4 commits into
base: main
Choose a base branch
from

Conversation

enricofallacara
Copy link

First of all I want to thank you for the amazing job that you have done creating this python package which is awesome and very useful. I have used it in my final thesis for a master degree in which I had to cluster some type of players based on their behaviors. The problem in using Time series k-means wiht my dataset derived from the fact that the actual implementation performs parallel computations only in distance matrices (DTW in my case),thus i re-wrote the code related to the MM algorithm and to the barycenter update proceudre in the following way: the barceynter update procedure creates a process for each cluster of the algorithm (equal to the value of k) and then each process creates a pool of threads (15 in my implementation), that are used to perform the barycenter update procedure in a parallel and efficient way.

@enricofallacara
Copy link
Author

I forgot to mention that also some redundant calls to the functions to_time_series_dataset() were removed because they caused memory allocation problems and they were basically not useful

@kushalkolar
Copy link
Contributor

Hi @enricofallacara , I'm not the maintainer for tslearn but I thought I'd share some comments since this would be useful for me.

  1. tests are failing since you're importing psutil which is not in the requirements.txt, however you do not appear to use psutil.
  2. n_jobs for joblib is hard-coded to 15, would probably be better have it as a keyword argument

@enricofallacara
Copy link
Author

Hi @enricofallacara , I'm not the maintainer for tslearn but I thought I'd share some comments since this would be useful for me.

  1. tests are failing since you're importing psutil which is not in the requirements.txt, however you do not appear to use psutil.
  2. n_jobs for joblib is hard-coded to 15, would probably be better have it as a keyword argument

You are completly right, sorry for the psutil mistake. I have used it for some stuffs and i forgot to remove it. Also, I know that I had hard-coded the number of threads, but I was in a hurry and I loaded my implementation. Tomorrow I will fix this stuffs, thanks!

@rtavenar
Copy link
Member

rtavenar commented Jan 5, 2021

Hi @enricofallacara

Your PR is now in conflict with the master branch since we have refactored the code. I can take care of the merge to resolve the conflicts if you want, after which I could do a review of your code.

@enricofallacara
Copy link
Author

enricofallacara commented Jan 5, 2021 via email

rtavenar and others added 2 commits January 5, 2021 12:08
# Conflicts:
#	tslearn/barycenters/dba.py
#	tslearn/clustering/kmeans.py
#	tslearn/metrics/dtw_variants.py

Co-authored-by: enricofallacara <enrico.fallacara@gmail.com>
# Conflicts:
#	tslearn/barycenters/dba.py
#	tslearn/clustering/kmeans.py
#	tslearn/metrics/dtw_variants.py

Co-authored-by: enricofallacara <enrico.fallacara@gmail.com>
@@ -148,7 +149,8 @@ def dtw_barycenter_averaging_petitjean(X, barycenter_size=None,
for dynamic time warping, with applications to clustering. Pattern
Recognition, Elsevier, 2011, Vol. 44, Num. 3, pp. 678-693
"""
X_ = to_time_series_dataset(X)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand that you want to avoid a copy here, but making sure that the data is in the right format is important. Maybe we shoud tweak the to_time_series_dataset function to add an avoid_copy_if_possible parameter, WDYT?

@@ -176,7 +178,7 @@ def dtw_barycenter_averaging_petitjean(X, barycenter_size=None,
return barycenter


def _mm_assignment(X, barycenter, weights, metric_params=None):
def _mm_assignment(X, barycenter, weights, metric_params=None, n_treads=15):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use n_jobs everywhere as a parameter name, and None as its default, we should stick to the same convention here, I think.

#with parallel_backend('threading'):
res = Parallel(backend='threading',prefer="threads",require='sharedmem',n_jobs=n_treads,verbose=10)(delayed(dtw_path)(barycenter, X[i], global_constraint="sakoe_chiba",sakoe_chiba_radius=1) for i in range(n))
paths, dists = zip(*res)
paths = list(paths)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you really need these 2 casts?

cost += dist_i ** 2 * weights[i]
list_p_k.append(path)
#with parallel_backend('threading'):
res = Parallel(backend='threading',prefer="threads",require='sharedmem',n_jobs=n_treads,verbose=10)(delayed(dtw_path)(barycenter, X[i], global_constraint="sakoe_chiba",sakoe_chiba_radius=1) for i in range(n))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we define a cdist_dtw_path function instead?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should design a high-efficiency cdist_dtw_path function instead. The main bottlenecks are _mm_assignment function and _subgradient_valence_warping from my view. However, as DBA is communication-intensive, we require a good tune on parallelizing DBA's implementation. Perhaps, a good target is to ensure that all the available computing units are at the desired utilization level.

return list_p_k, cost


def _subgradient_valence_warping(list_p_k, barycenter_size, weights):
def _subgradient_valence_warping(list_p_k, barycenter_size, weights, n_treads):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use n_jobs everywhere as a parameter name, and None as its default, we should stick to the same convention here, I think.

list_w_k.append(w_k * weights[k])
list_v_k.append(w_k.sum(axis=1) * weights[k])
return w_k
w_k_ones = Parallel(backend='threading',prefer='threads',require='sharedmem', n_jobs=n_treads, verbose = True)(delayed(create_w_k)(p_k,barycenter_size) for p_k in list_p_k)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this part really crucial to parallelize? I would expect that it would be neglectible in terms of running times.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function cannot fully utilize CPU cores, even if we force to parallelize dba on different clusters and then becomes the bottleneck.

return list_v_k, list_w_k


def _mm_valence_warping(list_p_k, barycenter_size, weights):
def _mm_valence_warping(list_p_k, barycenter_size, weights, n_treads):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use n_jobs everywhere as a parameter name, and None as its default, we should stick to the same convention here, I think.

#diag_sum_v_k = numpy.zeros(list_v_k[0].shape)
#for v_k in list_v_k:
# diag_sum_v_k += v_k
diag_sum_v_k = Parallel(backend='threading',prefer='threads',require='sharedmem', n_jobs=n_treads, verbose = True)(delayed(numpy.sum)(x) for x in zip(*numpy.array(list_v_k)))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once again, I'm not sure we need to parallelize this part.

return diag_sum_v_k, list_w_k


def _mm_update_barycenter(X, diag_sum_v_k, list_w_k):
def _mm_update_barycenter(X, diag_sum_v_k, list_w_k, n_treads):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use n_jobs everywhere as a parameter name, and None as its default, we should stick to the same convention here, I think.

#sum_w_x = numpy.zeros((barycenter_size, d))
#for k, (w_k, x_k) in enumerate(zip(list_w_k, X)):
# sum_w_x += w_k.dot(x_k[:ts_size(x_k)])
sum_w_x = Parallel(backend='threading',prefer='threads',require='sharedmem', n_jobs=n_treads, verbose = True)(delayed(numpy.dot)(w_k,x_k[:ts_size(x_k)]) for (w_k, x_k) in zip(list_w_k, X))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once again, I'm not sure we need to parallelize this part.

@@ -392,7 +412,7 @@ def _subgradient_update_barycenter(X, list_diag_v_k, list_w_k, weights_sum,


def dtw_barycenter_averaging(X, barycenter_size=None, init_barycenter=None,
max_iter=30, tol=1e-5, weights=None,
max_iter=30, tol=1e-5, weights=None, n_treads=15,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use n_jobs everywhere as a parameter name, and None as its default, we should stick to the same convention here, I think.

@@ -514,6 +534,7 @@ def dtw_barycenter_averaging(X, barycenter_size=None, init_barycenter=None,
def dtw_barycenter_averaging_one_init(X, barycenter_size=None,
init_barycenter=None,
max_iter=30, tol=1e-5, weights=None,
n_treads=15,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use n_jobs everywhere as a parameter name, and None as its default, we should stick to the same convention here, I think.

@@ -576,23 +597,23 @@ def dtw_barycenter_averaging_one_init(X, barycenter_size=None,
for Averaging in Dynamic Time Warping Spaces.
Pattern Recognition, 74, 340-358.
"""
X_ = to_time_series_dataset(X)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See comment above about the array copy issue.

@@ -692,23 +693,37 @@ def _assign(self, X, update_class_attributes=True):

def _update_centroids(self, X):
metric_params = self._get_metric_params()
X_list = []
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, I guess we could use a comprehension list for better readability.

self.cluster_centers_[k] = euclidean_barycenter(
X=X[self.labels_ == k])
X_list.append(X[self.labels_ == k])
cluster_centers = Parallel(backend='threading', prefer='threads', require='sharedmem', n_jobs=self.n_clusters, verbose=5)(delayed(dtw_barycenter_averaging)(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Take care that here, the function to be called depends on the value of self.metric

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simply parallelize DBA over the clusters only speedup less than 0.5x times. More effort is required.

@@ -9,48 +9,39 @@ def _cdist_generic(dist_fun, dataset1, dataset2, n_jobs, verbose,
compute_diagonal=True, dtype=numpy.float, *args, **kwargs):
"""Compute cross-similarity matrix with joblib parallelization for a given
similarity function.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please revert all these line suppressions? They tend to pack the docstrings.

@@ -60,7 +51,7 @@ def _cdist_generic(dist_fun, dataset1, dataset2, n_jobs, verbose,
k=0 if compute_diagonal else 1,
m=len(dataset1))
matrix[indices] = Parallel(n_jobs=n_jobs,
prefer="threads",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we always prefer this? Or should it be specified when calling the function?

dataset1[i], dataset2[j],
*args, **kwargs
#dataset2 = to_time_series_dataset(dataset2, dtype=dtype)
with parallel_backend('loky'):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it a good idea to specify the parallel_backend here? Will loky always be a better alternative or could it depend on some specificities or one's data?

@rtavenar
Copy link
Member

rtavenar commented Jan 5, 2021

I have left a few comments. Maybe the main point would be to assess the improvement (in terms of running times) that these changes offer on some benchmark.

@rtavenar
Copy link
Member

rtavenar commented Jan 5, 2021

Also, tests seem to fail at the moment (let me know if it's due to a bug in my merge).

Base automatically changed from master to main January 26, 2021 12:41
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants