-
-
Save jgdshkovi/a66bd58c6c8df1633714f2d9ed28d36b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import warnings

import numpy as np
import scipy.sparse as sp
from joblib import Parallel, delayed
from sklearn.cluster import KMeans
from sklearn.cluster import _k_means_fast as _k_means
from sklearn.cluster.k_means_ import (
    _check_sample_weight,
    _init_centroids,
    _labels_inertia,
    _tolerance,
    _validate_center_shape,
)
from sklearn.preprocessing import normalize
from sklearn.utils import check_array, check_random_state
from sklearn.utils.extmath import row_norms, squared_norm
from sklearn.utils.validation import _num_samples


def spherical_k_means(
    X,
    n_clusters,
    sample_weight=None,
    init="k-means++",
    n_init=10,
    max_iter=300,
    verbose=False,
    tol=1e-4,
    random_state=None,
    copy_x=True,
    n_jobs=1,
    return_n_iter=False,
):
    """Run spherical k-means ``n_init`` times and keep the lowest-inertia run.

    Each run is performed by ``_spherical_kmeans_single_lloyd``, which
    re-normalizes the centers after every update step.

    Parameters
    ----------
    X : array-like or sparse matrix
        Data to cluster. Validated with ``check_array`` (CSR accepted,
        float32/float64). This function does not normalize ``X`` itself.
    n_clusters : int
        Number of clusters; must not exceed the number of samples.
    sample_weight : array-like or None, default=None
        Per-sample weights, forwarded to every run.
    init : str or array-like, default="k-means++"
        Initialization passed to ``_init_centroids``. When an array is given
        it is validated against ``(n_clusters, n_features)`` and only one
        run is performed.
    n_init : int, default=10
        Number of independent runs.
    max_iter : int, default=300
        Maximum number of iterations per run.
    verbose : bool, default=False
        Print progress from each run.
    tol : float, default=1e-4
        Convergence tolerance; rescaled with ``_tolerance(X, tol)`` before
        being handed to the runs.
    random_state : int, RandomState instance or None, default=None
        Seed source, resolved with ``check_random_state``.
    copy_x : bool, default=True
        Forwarded to ``check_array`` as ``copy``; also forces C order.
    n_jobs : int, default=1
        ``1`` runs sequentially and keeps only the best result in memory;
        any other value dispatches the runs through ``joblib.Parallel``.
    return_n_iter : bool, default=False
        Also return the iteration count of the best run.

    Returns
    -------
    best_centers, best_labels, best_inertia[, best_n_iter]
        Results of the run with the smallest inertia.

    Raises
    ------
    ValueError
        If ``n_init`` or ``max_iter`` is not positive, or if there are
        fewer samples than clusters.
    """
    if n_init <= 0:
        raise ValueError(
            "Invalid number of initializations."
            " n_init=%d must be bigger than zero." % n_init
        )
    random_state = check_random_state(random_state)

    if max_iter <= 0:
        raise ValueError(
            "Number of iterations should be a positive number,"
            " got %d instead" % max_iter
        )

    order = "C" if copy_x else None
    X = check_array(
        X,
        accept_sparse="csr",
        dtype=[np.float64, np.float32],
        order=order,
        copy=copy_x,
    )
    n_samples = _num_samples(X)
    if n_samples < n_clusters:
        raise ValueError(
            "n_samples=%d should be >= n_clusters=%d" % (n_samples, n_clusters)
        )
    tol = _tolerance(X, tol)

    if hasattr(init, "__array__"):
        init = check_array(init, dtype=X.dtype.type, order="C", copy=True)
        _validate_center_shape(X, n_clusters, init)
        if n_init != 1:
            # Explicit centers make every run identical, so repeat only once.
            warnings.warn(
                "Explicit initial center position passed: "
                "performing only one init in k-means instead of n_init=%d" % n_init,
                RuntimeWarning,
                stacklevel=2,
            )
            n_init = 1

    # precompute squared norms of data points
    x_squared_norms = row_norms(X, squared=True)

    if n_jobs == 1:
        # Sequential path: store only the best run seen so far.
        best_labels, best_inertia, best_centers, best_n_iter = None, None, None, None
        for _ in range(n_init):
            labels, inertia, centers, n_iter_ = _spherical_kmeans_single_lloyd(
                X,
                n_clusters,
                sample_weight,
                max_iter=max_iter,
                init=init,
                verbose=verbose,
                tol=tol,
                x_squared_norms=x_squared_norms,
                random_state=random_state,
            )
            if best_inertia is None or inertia < best_inertia:
                best_labels = labels.copy()
                best_centers = centers.copy()
                best_inertia = inertia
                best_n_iter = n_iter_
    else:
        # Parallel path: give every run its own seed so the runs differ.
        seeds = random_state.randint(np.iinfo(np.int32).max, size=n_init)
        results = Parallel(n_jobs=n_jobs, verbose=0)(
            delayed(_spherical_kmeans_single_lloyd)(
                X,
                n_clusters,
                sample_weight,
                max_iter=max_iter,
                init=init,
                verbose=verbose,
                tol=tol,
                x_squared_norms=x_squared_norms,
                random_state=seed,
            )
            for seed in seeds
        )
        labels, inertia, centers, n_iters = zip(*results)
        best = np.argmin(inertia)
        best_labels = labels[best]
        best_inertia = inertia[best]
        best_centers = centers[best]
        best_n_iter = n_iters[best]

    if return_n_iter:
        return best_centers, best_labels, best_inertia, best_n_iter
    return best_centers, best_labels, best_inertia


def _spherical_kmeans_single_lloyd(
    X,
    n_clusters,
    sample_weight=None,
    max_iter=300,
    init="k-means++",
    verbose=False,
    x_squared_norms=None,
    random_state=None,
    tol=1e-4,
    precompute_distances=True,
):
    """Perform one Lloyd-style run whose centers are re-normalized each step.

    Parameters
    ----------
    X : ndarray or sparse matrix
        Data to cluster.
    n_clusters : int
        Number of clusters.
    sample_weight : array-like or None, default=None
        Per-sample weights, checked with ``_check_sample_weight``.
    max_iter : int, default=300
        Maximum number of iterations; must be positive.
    init : str or array-like, default="k-means++"
        Initialization forwarded to ``_init_centroids``.
    verbose : bool, default=False
        Print progress messages.
    x_squared_norms : ndarray or None, default=None
        Squared row norms of ``X``; computed here when omitted.
    random_state : int, RandomState instance or None, default=None
        Seed source for the initialization.
    tol : float, default=1e-4
        Threshold compared directly against the squared norm of the center
        shift between two iterations.
    precompute_distances : bool, default=True
        Forwarded to ``_labels_inertia``.

    Returns
    -------
    best_labels, best_inertia, best_centers, n_iter
        Labels, inertia and centers of the best iteration, plus the number
        of iterations performed.
    """
    random_state = check_random_state(random_state)
    sample_weight = _check_sample_weight(sample_weight, X)
    if x_squared_norms is None:
        x_squared_norms = row_norms(X, squared=True)

    best_labels, best_inertia, best_centers = None, None, None

    centers = _init_centroids(
        X, n_clusters, init, random_state=random_state, x_squared_norms=x_squared_norms
    )
    if verbose:
        print("Initialization complete")

    # Buffer that _labels_inertia receives as its ``distances`` argument.
    distances = np.zeros(shape=(X.shape[0],), dtype=X.dtype)

    # The dense update works on float64 inputs; X and the weights do not
    # change between iterations, so cast them once up front.
    is_sparse = sp.issparse(X)
    if not is_sparse:
        X_dense = X.astype(np.float64, copy=False)
        weight_dense = sample_weight.astype(np.float64, copy=False)

    for i in range(max_iter):
        centers_old = centers.copy()

        # Assignment step.
        labels, inertia = _labels_inertia(
            X,
            sample_weight,
            x_squared_norms,
            centers,
            precompute_distances=precompute_distances,
            distances=distances,
        )

        # Update step.
        if is_sparse:
            centers = _k_means._centers_sparse(
                X, sample_weight, labels, n_clusters, distances
            )
        else:
            centers = _k_means._centers_dense(
                X_dense,
                weight_dense,
                labels,
                n_clusters,
                distances.astype(np.float64),
            )

        # The spherical step: rescale every center with sklearn's normalize.
        centers = normalize(centers)

        if verbose:
            print("Iteration %2d, inertia %.3f" % (i, inertia))

        if best_inertia is None or inertia < best_inertia:
            best_labels = labels.copy()
            best_centers = centers.copy()
            best_inertia = inertia

        center_shift_total = squared_norm(centers_old - centers)
        if center_shift_total <= tol:
            if verbose:
                print(
                    "Converged at iteration %d: "
                    "center shift %e within tolerance %e" % (i, center_shift_total, tol)
                )
            break

    if center_shift_total > 0:
        # The stored labels predate the last center update, so recompute
        # them against the centers that are actually returned.
        best_labels, best_inertia = _labels_inertia(
            X,
            sample_weight,
            x_squared_norms,
            best_centers,
            precompute_distances=precompute_distances,
            distances=distances,
        )

    return best_labels, best_inertia, best_centers, i + 1


class SphericalKMeans(KMeans):
    """Estimator wrapper around ``spherical_k_means``.

    Parameters
    ----------
    n_clusters : int, default=8
        Number of clusters.
    init : str or array-like, default="k-means++"
        Initialization method or explicit initial centers.
    n_init : int, default=10
        Number of independent runs; the best one is kept.
    max_iter : int, default=300
        Maximum number of iterations per run.
    tol : float, default=1e-4
        Convergence tolerance.
    n_jobs : int, default=1
        Number of parallel jobs used for the runs.
    verbose : int, default=0
        Verbosity flag forwarded to the runs.
    random_state : int, RandomState instance or None, default=None
        Seed source.
    copy_x : bool, default=True
        Whether the input is copied during validation.
    normalize : bool, default=True
        Pass ``X`` through ``sklearn.preprocessing.normalize`` before
        clustering.
    """

    def __init__(
        self,
        n_clusters=8,
        init="k-means++",
        n_init=10,
        max_iter=300,
        tol=1e-4,
        n_jobs=1,
        verbose=0,
        random_state=None,
        copy_x=True,
        normalize=True,
    ):
        # Hyperparameters are stored verbatim under their argument names.
        self.n_clusters = n_clusters
        self.init = init
        self.n_init = n_init
        self.max_iter = max_iter
        self.tol = tol
        self.n_jobs = n_jobs
        self.verbose = verbose
        self.random_state = random_state
        self.copy_x = copy_x
        self.normalize = normalize

    def fit(self, X, y=None, sample_weight=None):
        """Cluster ``X`` and store the result on the estimator.

        Parameters
        ----------
        X : array-like or sparse matrix
            Training data.
        y : ignored
            Not used by this method.
        sample_weight : array-like or None, default=None
            Per-sample weights.

        Returns
        -------
        self
            With ``cluster_centers_``, ``labels_``, ``inertia_`` and
            ``n_iter_`` set.
        """
        if self.normalize:
            X = normalize(X)

        random_state = check_random_state(self.random_state)

        (
            self.cluster_centers_,
            self.labels_,
            self.inertia_,
            self.n_iter_,
        ) = spherical_k_means(
            X,
            n_clusters=self.n_clusters,
            sample_weight=sample_weight,
            init=self.init,
            n_init=self.n_init,
            max_iter=self.max_iter,
            verbose=self.verbose,
            tol=self.tol,
            random_state=random_state,
            copy_x=self.copy_x,
            n_jobs=self.n_jobs,
            return_n_iter=True,
        )
        return self
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment