# Source: GitHub Gist by @jgdshkovi
# Gist: jgdshkovi/a66bd58c6c8df1633714f2d9ed28d36b
# Last active: June 16, 2020 19:53
# (GitHub page navigation text from the scrape was removed here; it is not
# part of the program.)
import warnings
import numpy as np
import scipy.sparse as sp
from joblib import Parallel, delayed
from sklearn.cluster import KMeans
from sklearn.cluster import _k_means_fast as _k_means
from sklearn.cluster.k_means_ import (
_check_sample_weight,
_init_centroids,
_labels_inertia,
_tolerance,
_validate_center_shape,
)
from sklearn.preprocessing import normalize
from sklearn.utils import check_array, check_random_state
from sklearn.utils.extmath import row_norms, squared_norm
from sklearn.utils.validation import _num_samples
def spherical_k_means(
    X,
    n_clusters,
    sample_weight=None,
    init="k-means++",
    n_init=10,
    max_iter=300,
    verbose=False,
    tol=1e-4,
    random_state=None,
    copy_x=True,
    n_jobs=1,
    return_n_iter=False,
):
    """Run spherical k-means ``n_init`` times and keep the lowest-inertia run.

    Every run is delegated to ``_spherical_kmeans_single_lloyd``; this
    function validates the inputs, prepares shared state (row norms,
    tolerance, initial centers) and selects the best result.

    Parameters
    ----------
    X : array-like or sparse matrix
        Data to cluster. It is validated with ``check_array`` (CSR sparse
        input accepted, float32/float64 dtypes).
    n_clusters : int
        Number of clusters; must not exceed the number of samples.
    sample_weight : array-like or None, default=None
        Per-sample weights, forwarded to every run.
    init : str or array-like, default="k-means++"
        Initialization method, or explicit initial centers. When an array is
        given only one run is performed.
    n_init : int, default=10
        Number of independent runs. Must be positive.
    max_iter : int, default=300
        Maximum number of iterations per run. Must be positive.
    verbose : bool, default=False
        Print progress messages from each run.
    tol : float, default=1e-4
        Convergence tolerance, rescaled by scikit-learn's ``_tolerance``
        helper before use.
    random_state : int, RandomState instance or None, default=None
        Seed or generator used for initialization.
    copy_x : bool, default=True
        Forwarded to ``check_array`` as ``copy`` (and selects C ordering).
    n_jobs : int, default=1
        ``1`` runs the initializations sequentially; any other value is
        handed to ``joblib.Parallel``.
    return_n_iter : bool, default=False
        Also return the iteration count of the best run.

    Returns
    -------
    best_centers, best_labels, best_inertia[, best_n_iter]
        Results of the run with the smallest inertia. ``best_n_iter`` is
        included only when ``return_n_iter`` is true.

    Raises
    ------
    ValueError
        If ``n_init`` or ``max_iter`` is not positive, or if there are fewer
        samples than clusters.
    """
    if n_init <= 0:
        raise ValueError(
            "Invalid number of initializations."
            " n_init=%d must be bigger than zero." % n_init
        )
    random_state = check_random_state(random_state)

    if max_iter <= 0:
        raise ValueError(
            "Number of iterations should be a positive number,"
            " got %d instead" % max_iter
        )

    # np.inf instead of np.infty: the latter alias was removed in NumPy 2.0.
    best_inertia = np.inf
    best_labels = best_centers = best_n_iter = None

    order = "C" if copy_x else None
    X = check_array(
        X,
        accept_sparse="csr",
        dtype=[np.float64, np.float32],
        order=order,
        copy=copy_x,
    )
    n_samples = _num_samples(X)
    if n_samples < n_clusters:
        raise ValueError(
            "n_samples=%d should be >= n_clusters=%d" % (n_samples, n_clusters)
        )
    tol = _tolerance(X, tol)

    if hasattr(init, "__array__"):
        init = check_array(init, dtype=X.dtype.type, order="C", copy=True)
        _validate_center_shape(X, n_clusters, init)
        if n_init != 1:
            # Identical explicit centers would make repeated runs redundant.
            warnings.warn(
                "Explicit initial center position passed: "
                "performing only one init in k-means instead of n_init=%d" % n_init,
                RuntimeWarning,
                stacklevel=2,
            )
            n_init = 1

    # precompute squared norms of data points
    x_squared_norms = row_norms(X, squared=True)

    if n_jobs == 1:
        # Sequential runs share one RandomState, so each run draws a
        # different initialization from it.
        for _ in range(n_init):
            labels, inertia, centers, n_iter_ = _spherical_kmeans_single_lloyd(
                X,
                n_clusters,
                sample_weight=sample_weight,
                max_iter=max_iter,
                init=init,
                verbose=verbose,
                x_squared_norms=x_squared_norms,
                random_state=random_state,
                tol=tol,
            )
            if best_labels is None or inertia < best_inertia:
                best_labels = labels.copy()
                best_centers = centers.copy()
                best_inertia = inertia
                best_n_iter = n_iter_
    else:
        # Each parallel run gets its own integer seed; without passing it the
        # workers would ignore ``random_state`` entirely.
        seeds = random_state.randint(np.iinfo(np.int32).max, size=n_init)
        results = Parallel(n_jobs=n_jobs, verbose=0)(
            delayed(_spherical_kmeans_single_lloyd)(
                X,
                n_clusters,
                sample_weight=sample_weight,
                max_iter=max_iter,
                init=init,
                verbose=verbose,
                x_squared_norms=x_squared_norms,
                random_state=seed,
                tol=tol,
            )
            for seed in seeds
        )
        labels, inertia, centers, n_iters = zip(*results)
        best = np.argmin(inertia)
        best_labels = labels[best]
        best_inertia = inertia[best]
        best_centers = centers[best]
        best_n_iter = n_iters[best]

    if return_n_iter:
        return best_centers, best_labels, best_inertia, best_n_iter
    return best_centers, best_labels, best_inertia


def _spherical_kmeans_single_lloyd(
    X,
    n_clusters,
    sample_weight=None,
    max_iter=300,
    init="k-means++",
    verbose=False,
    x_squared_norms=None,
    random_state=None,
    tol=1e-4,
    precompute_distances=True,
):
    """Perform one Lloyd-style run of spherical k-means.

    Each iteration assigns labels with ``_labels_inertia``, recomputes the
    centers with scikit-learn's ``_centers_dense`` / ``_centers_sparse`` and
    then row-normalizes the centers with ``normalize``.

    Parameters
    ----------
    X : ndarray or sparse matrix
        Data to cluster.
    n_clusters : int
        Number of clusters.
    sample_weight : array-like or None, default=None
        Per-sample weights, passed through ``_check_sample_weight``.
    max_iter : int, default=300
        Maximum number of iterations.
    init : str or array-like, default="k-means++"
        Initialization forwarded to ``_init_centroids``.
    verbose : bool, default=False
        Print per-iteration progress.
    x_squared_norms : ndarray or None, default=None
        Squared row norms of ``X``; computed here when omitted.
    random_state : int, RandomState instance or None, default=None
        Seed or generator used for initialization.
    tol : float, default=1e-4
        Stop when the squared norm of the center shift is at most ``tol``.
    precompute_distances : bool, default=True
        Forwarded unchanged to ``_labels_inertia``.

    Returns
    -------
    best_labels, best_inertia, best_centers, n_iter
        Labels, inertia and centers of the best iteration, plus the number
        of iterations executed.
    """
    random_state = check_random_state(random_state)
    sample_weight = _check_sample_weight(sample_weight, X)
    if x_squared_norms is None:
        # Lets the helper be called with just (X, n_clusters).
        x_squared_norms = row_norms(X, squared=True)

    best_labels, best_inertia, best_centers = None, None, None

    centers = _init_centroids(
        X, n_clusters, init, random_state=random_state, x_squared_norms=x_squared_norms
    )
    if verbose:
        print("Initialization complete")

    # Scratch buffer handed to the scikit-learn helpers on every iteration.
    distances = np.zeros(shape=(X.shape[0],), dtype=X.dtype)

    for i in range(max_iter):
        centers_old = centers.copy()

        # Assignment step.
        labels, inertia = _labels_inertia(
            X,
            sample_weight,
            x_squared_norms,
            centers,
            precompute_distances=precompute_distances,
            distances=distances,
        )

        # Update step.
        if sp.issparse(X):
            centers = _k_means._centers_sparse(
                X, sample_weight, labels, n_clusters, distances
            )
        else:
            # np.float64 instead of np.float: the alias was removed in
            # NumPy 1.24 and always meant float64.
            centers = _k_means._centers_dense(
                X.astype(np.float64),
                sample_weight.astype(np.float64),
                labels,
                n_clusters,
                distances.astype(np.float64),
            )

        # The spherical step: rescale every center (row) to unit norm.
        centers = normalize(centers)

        if verbose:
            print("Iteration %2d, inertia %.3f" % (i, inertia))

        if best_inertia is None or inertia < best_inertia:
            best_labels = labels.copy()
            best_centers = centers.copy()
            best_inertia = inertia

        center_shift_total = squared_norm(centers_old - centers)
        if center_shift_total <= tol:
            if verbose:
                print(
                    "Converged at iteration %d: "
                    "center shift %e within tolerance %e" % (i, center_shift_total, tol)
                )
            break

    if center_shift_total > 0:
        # The centers moved after the last assignment, so recompute labels
        # and inertia to make them consistent with ``best_centers``.
        best_labels, best_inertia = _labels_inertia(
            X,
            sample_weight,
            x_squared_norms,
            best_centers,
            precompute_distances=precompute_distances,
            distances=distances,
        )

    return best_labels, best_inertia, best_centers, i + 1


class SphericalKMeans(KMeans):
    """K-means estimator whose centers are re-normalized every iteration.

    Parameters
    ----------
    n_clusters : int, default=8
        Number of clusters.
    init : str or array-like, default="k-means++"
        Initialization method or explicit initial centers.
    n_init : int, default=10
        Number of independent runs; the best one is kept.
    max_iter : int, default=300
        Maximum number of iterations per run.
    tol : float, default=1e-4
        Convergence tolerance.
    n_jobs : int, default=1
        ``1`` runs sequentially; other values are handed to joblib.
    verbose : int, default=0
        Verbosity flag forwarded to the solver.
    random_state : int, RandomState instance or None, default=None
        Seed or generator used for initialization.
    copy_x : bool, default=True
        Whether the input is copied during validation.
    normalize : bool, default=True
        Row-normalize ``X`` with ``sklearn.preprocessing.normalize`` before
        clustering.

    Attributes
    ----------
    cluster_centers_, labels_, inertia_, n_iter_
        Set by ``fit`` from the best run of ``spherical_k_means``.
    """

    def __init__(
        self,
        n_clusters=8,
        init="k-means++",
        n_init=10,
        max_iter=300,
        tol=1e-4,
        n_jobs=1,
        verbose=0,
        random_state=None,
        copy_x=True,
        normalize=True,
    ):
        # Let KMeans set up the attributes its inherited methods rely on.
        super().__init__(
            n_clusters=n_clusters,
            init=init,
            n_init=n_init,
            max_iter=max_iter,
            tol=tol,
            verbose=verbose,
            random_state=random_state,
            copy_x=copy_x,
        )
        # Attributes read by ``fit`` that the parent constructor does not
        # necessarily define.
        self.n_jobs = n_jobs
        self.normalize = normalize

    def fit(self, X, y=None, sample_weight=None):
        """Cluster ``X`` and store the result on the estimator.

        Parameters
        ----------
        X : array-like or sparse matrix
            Training data.
        y : ignored
            Present for scikit-learn API compatibility.
        sample_weight : array-like or None, default=None
            Per-sample weights.

        Returns
        -------
        self
        """
        if self.normalize:
            X = normalize(X)

        random_state = check_random_state(self.random_state)

        # return_n_iter=True is required: four values are unpacked below.
        (
            self.cluster_centers_,
            self.labels_,
            self.inertia_,
            self.n_iter_,
        ) = spherical_k_means(
            X,
            n_clusters=self.n_clusters,
            sample_weight=sample_weight,
            init=self.init,
            n_init=self.n_init,
            max_iter=self.max_iter,
            verbose=self.verbose,
            tol=self.tol,
            random_state=random_state,
            copy_x=self.copy_x,
            n_jobs=self.n_jobs,
            return_n_iter=True,
        )
        return self
# (GitHub page footer from the scrape: "Sign up for free to join this
# conversation on GitHub. Already have an account? Sign in to comment")