# Natural Language Toolkit: Expectation Maximization Clusterer
#
# Copyright (C) 2001-2019 NLTK Project
# Author: Trevor Cohn <tacohn@cs.mu.oz.au>
# URL: <http://nltk.org/>
# For license information, see LICENSE.TXT
from __future__ import print_function, unicode_literals

try:
    import numpy
except ImportError:
    pass

from nltk.compat import python_2_unicode_compatible
from nltk.cluster.util import VectorSpaceClusterer


@python_2_unicode_compatible
class EMClusterer(VectorSpaceClusterer):
- """
- The Gaussian EM clusterer models the vectors as being produced by
- a mixture of k Gaussian sources. The parameters of these sources
- (prior probability, mean and covariance matrix) are then found to
- maximise the likelihood of the given data. This is done with the
- expectation maximisation algorithm. It starts with k arbitrarily
- chosen means, priors and covariance matrices. It then calculates
- the membership probabilities for each vector in each of the
- clusters; this is the 'E' step. The cluster parameters are then
- updated in the 'M' step using the maximum likelihood estimate from
- the cluster membership probabilities. This process continues until
- the likelihood of the data does not significantly increase.
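
    A minimal usage sketch (the starting means, the bias and the query
    vector are illustrative assumptions, mirroring ``demo()`` at the end
    of this module)::

        import numpy
        from nltk.cluster import EMClusterer

        vectors = [numpy.array(f) for f in [[0.5, 0.5], [1.5, 0.5], [1, 3]]]
        clusterer = EMClusterer([[4, 2], [4, 2.01]], bias=0.1)
        clusters = clusterer.cluster(vectors, True)      # fit, then assign each vector
        label = clusterer.classify(numpy.array([2, 2]))  # classify an unseen vector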
- """

    def __init__(
        self,
        initial_means,
        priors=None,
        covariance_matrices=None,
        conv_threshold=1e-6,
        bias=0.1,
        normalise=False,
        svd_dimensions=None,
    ):
- """
- Creates an EM clusterer with the given starting parameters,
- convergence threshold and vector mangling parameters.
- :param initial_means: the means of the gaussian cluster centers
- :type initial_means: [seq of] numpy array or seq of SparseArray
- :param priors: the prior probability for each cluster
- :type priors: numpy array or seq of float
- :param covariance_matrices: the covariance matrix for each cluster
- :type covariance_matrices: [seq of] numpy array
- :param conv_threshold: maximum change in likelihood before deemed
- convergent
- :type conv_threshold: int or float
- :param bias: variance bias used to ensure non-singular covariance
- matrices
- :type bias: float
- :param normalise: should vectors be normalised to length 1
- :type normalise: boolean
- :param svd_dimensions: number of dimensions to use in reducing vector
- dimensionsionality with SVD
- :type svd_dimensions: int
- """
        VectorSpaceClusterer.__init__(self, normalise, svd_dimensions)
        self._means = numpy.array(initial_means, numpy.float64)
        self._num_clusters = len(initial_means)
        self._conv_threshold = conv_threshold
        self._covariance_matrices = covariance_matrices
        self._priors = priors
        self._bias = bias

    def num_clusters(self):
        return self._num_clusters

    def cluster_vectorspace(self, vectors, trace=False):
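        """
        Runs EM on ``vectors`` until the log likelihood changes by less
        than ``self._conv_threshold``, updating the cluster means,
        covariance matrices and priors as it goes.
        """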
        assert len(vectors) > 0

        # set the parameters to initial values
        dimensions = len(vectors[0])
        means = self._means
        priors = self._priors
        if priors is None:
            priors = self._priors = (
                numpy.ones(self._num_clusters, numpy.float64) / self._num_clusters
            )
        covariances = self._covariance_matrices
        if covariances is None:
            covariances = self._covariance_matrices = [
                numpy.identity(dimensions, numpy.float64)
                for i in range(self._num_clusters)
            ]

        # do the E and M steps until the likelihood plateaus
        lastl = self._loglikelihood(vectors, priors, means, covariances)
        converged = False

        while not converged:
            if trace:
                print('iteration; loglikelihood', lastl)

            # E-step, calculate hidden variables, h[i,j]
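            # Each h[i, j] is the responsibility of cluster j for vector i,
            # obtained by Bayes' rule:
            #   h[i, j] = priors[j] * N(x_i; means[j], covariances[j])
            #             / sum_k priors[k] * N(x_i; means[k], covariances[k])
            # (the division by the row sum below performs the normalisation)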
            h = numpy.zeros((len(vectors), self._num_clusters), numpy.float64)
            for i in range(len(vectors)):
                for j in range(self._num_clusters):
                    h[i, j] = priors[j] * self._gaussian(
                        means[j], covariances[j], vectors[i]
                    )
                h[i, :] /= sum(h[i, :])

            # M-step, update parameters - cvm, p, mean
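            # The updates below are the standard maximum-likelihood estimates
            # with the responsibilities h[i, j] used as soft counts:
            #   means[j]       = sum_i h[i, j] * x_i / sum_i h[i, j]
            #   covariances[j] = sum_i h[i, j] * outer(x_i - mean_j, x_i - mean_j)
            #                    / sum_i h[i, j]
            #   priors[j]      = sum_i h[i, j] / len(vectors)
            # (the covariance update uses the mean from the previous iteration)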
            for j in range(self._num_clusters):
                new_covariance = numpy.zeros((dimensions, dimensions), numpy.float64)
                new_mean = numpy.zeros(dimensions, numpy.float64)
                sum_hj = 0.0
                for i in range(len(vectors)):
                    delta = vectors[i] - means[j]
                    new_covariance += h[i, j] * numpy.multiply.outer(delta, delta)
                    sum_hj += h[i, j]
                    new_mean += h[i, j] * vectors[i]
                covariances[j] = new_covariance / sum_hj
                means[j] = new_mean / sum_hj
                priors[j] = sum_hj / len(vectors)

                # bias term to stop covariance matrix being singular
                covariances[j] += self._bias * numpy.identity(dimensions, numpy.float64)

            # calculate likelihood - FIXME: may be broken
            l = self._loglikelihood(vectors, priors, means, covariances)

            # check for convergence
            if abs(lastl - l) < self._conv_threshold:
                converged = True
            lastl = l

    def classify_vectorspace(self, vector):
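        """
        Assigns ``vector`` to the cluster ``j`` maximising the unnormalised
        posterior ``priors[j] * N(vector; means[j], covariances[j])``.
        """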
        best = None
        for j in range(self._num_clusters):
            p = self._priors[j] * self._gaussian(
                self._means[j], self._covariance_matrices[j], vector
            )
            if not best or p > best[0]:
                best = (p, j)
        return best[1]

    def likelihood_vectorspace(self, vector, cluster):
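        """
        Unnormalised likelihood of ``vector`` under the named ``cluster``,
        i.e. its prior probability times the Gaussian density at ``vector``.
        """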
        cid = self.cluster_names().index(cluster)
        return self._priors[cid] * self._gaussian(
            self._means[cid], self._covariance_matrices[cid], vector
        )

    def _gaussian(self, mean, cvm, x):
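        """
        Multivariate normal density of ``x`` given mean ``mean`` and
        covariance matrix ``cvm``; the code below evaluates::

            (2*pi)**(-m/2) * det(cvm)**(-1/2)
                * exp(-0.5 * (x - mean).T * inv(cvm) * (x - mean))

        where ``m`` is the dimensionality of ``x``.
        """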
        m = len(mean)
        assert cvm.shape == (m, m), 'bad sized covariance matrix, %s' % str(cvm.shape)
        try:
            det = numpy.linalg.det(cvm)
            inv = numpy.linalg.inv(cvm)
            a = det ** -0.5 * (2 * numpy.pi) ** (-m / 2.0)
            dx = x - mean
            b = -0.5 * numpy.dot(numpy.dot(dx, inv), dx)
            return a * numpy.exp(b)
        except OverflowError:
            # happens when the exponent is negative infinity - i.e. b = 0
            # i.e. the inverse of cvm is huge (cvm is almost zero)
            return 0

    def _loglikelihood(self, vectors, priors, means, covariances):
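        """
        Log likelihood of ``vectors`` under the current mixture, i.e.
        ``sum_i log(sum_j priors[j] * N(x_i; means[j], covariances[j]))``.
        """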
        llh = 0.0
        for vector in vectors:
            p = 0
            for j in range(len(priors)):
                p += priors[j] * self._gaussian(means[j], covariances[j], vector)
            llh += numpy.log(p)
        return llh

    def __repr__(self):
        return '<EMClusterer means=%s>' % list(self._means)


def demo():
    """
    Non-interactive demonstration of the clusterers with simple 2-D data.
    """
    from nltk import cluster

    # example from figure 14.10, page 519, Manning and Schutze
    vectors = [numpy.array(f) for f in [[0.5, 0.5], [1.5, 0.5], [1, 3]]]
    means = [[4, 2], [4, 2.01]]

    clusterer = cluster.EMClusterer(means, bias=0.1)
    clusters = clusterer.cluster(vectors, True, trace=True)
    print('Clustered:', vectors)
    print('As:       ', clusters)
    print()

    for c in range(2):
        print('Cluster:', c)
        print('Prior:  ', clusterer._priors[c])
        print('Mean:   ', clusterer._means[c])
        print('Covar:  ', clusterer._covariance_matrices[c])
        print()

    # classify a new vector
    vector = numpy.array([2, 2])
    print('classify(%s):' % vector, end=' ')
    print(clusterer.classify(vector))

    # show the classification probabilities
    vector = numpy.array([2, 2])
    print('classification_probdist(%s):' % vector)
    pdist = clusterer.classification_probdist(vector)
    for sample in pdist.samples():
        print('%s => %.0f%%' % (sample, pdist.prob(sample) * 100))

#
# The following demo code is broken.
#
# # use a set of tokens with 2D indices
# vectors = [numpy.array(f) for f in [[3, 3], [1, 2], [4, 2], [4, 0], [2, 3], [3, 1]]]
# # test the EM clusterer with means given by k-means (2) and
# # dimensionality reduction
# clusterer = cluster.KMeans(2, euclidean_distance, svd_dimensions=1)
# print('Clusterer:', clusterer)
# clusters = clusterer.cluster(vectors)
# means = clusterer.means()
# print('Means:', clusterer.means())
# print()
# clusterer = cluster.EMClusterer(means, svd_dimensions=1)
# clusters = clusterer.cluster(vectors, True)
# print('Clusterer:', clusterer)
# print('Clustered:', str(vectors)[:60], '...')
# print('As:', str(clusters)[:60], '...')
# print()
# # classify a new vector
# vector = numpy.array([3, 3])
# print('classify(%s):' % vector, end=' ')
# print(clusterer.classify(vector))
# print()
# # show the classification probabilities
# vector = numpy.array([2.2, 2])
# print('classification_probdist(%s)' % vector)
# pdist = clusterer.classification_probdist(vector)
# for sample in pdist.samples():
#     print('%s => %.0f%%' % (sample, pdist.prob(sample) * 100))

if __name__ == '__main__':
    demo()