# Copyright (C) 2004 Paul Harrison
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
""" MML mixture estimator.
This module has functionality similar to Chris Wallace's "Snob" programs
(but lacks some features found in Snob).
"""
from numarray import *
from numarray import random_array, records
import random
import estimate
class Composite_estimate(estimate.Estimate):
    """ Concatenate several sub-messages into one message.
    (Useful as a class estimate for Mixture_estimate)
    data: record.array with a field for each prior in "priors"
    priors: list of (Estimate class, prior parameters... )
        Specification of the Estimate class and prior parameters to use
        for each field in the data.
    Fields:
    estimates : list of Estimate
        An estimate for each column of the data.
    """
    def __init__(self, data, priors, weighting=None):
        estimate.Estimate.__init__(self, data, weighting)
        # Build one sub-estimate per record field, driven by the matching
        # entry of "priors" (first item is the Estimate class, the rest
        # are its prior parameters).
        self.estimates = [ ]
        for field_index, field_prior in enumerate(priors):
            estimator = field_prior[0]
            self.estimates.append(
                estimator(data.field(field_index), weighting=weighting, *field_prior[1:]))
        # The composite has a proper prior only if every part does, and
        # violates its prior if any part does.
        for sub_estimate in self.estimates:
            self.has_prior = self.has_prior and sub_estimate.has_prior
            self.violates_prior = self.violates_prior or sub_estimate.violates_prior
    def __repr__(self):
        # Summary line followed by each sub-estimate's repr, indented.
        pieces = [ estimate.Estimate.__repr__(self) ]
        for sub_estimate in self.estimates:
            pieces.append(("\n"+repr(sub_estimate)).replace("\n","\n "))
        return "".join(pieces)
    def dimensions(self):
        """ Total number of model dimensions over all sub-estimates. """
        return sum([ sub_estimate.dimensions() for sub_estimate in self.estimates ])
    def neglog_prior(self):
        """ Sum of the sub-estimates' negative log priors. """
        return sum([ sub_estimate.neglog_prior() for sub_estimate in self.estimates ])
    def neglog_fisher(self):
        """ Sum of the sub-estimates' negative log Fisher terms. """
        return sum([ sub_estimate.neglog_fisher() for sub_estimate in self.estimates ])
    def total_neglog_data(self):
        """ Sum of the sub-estimates' total data code lengths. """
        return sum([ sub_estimate.total_neglog_data() for sub_estimate in self.estimates ])
    def neglog_data(self):
        """ Per-datum code length: elementwise sum over sub-estimates. """
        return sum([ sub_estimate.neglog_data() for sub_estimate in self.estimates ])
class _Mixture_estimate(estimate.Estimate):
    """ A mixture estimate, consisting of a set of classes.
    You should not instantiate this class directly, instead use
    Mixture_estimate.
    Fields:
    class_likelihoods : Discrete_estimate
        Estimate of the chance a random datum belongs to each class.
    classes : list of Estimate
        Class estimates.
    assignments : N-data x N-class array of float
        Partial-assignment matrix. The likelihood of each datum belonging
        to each class.
    """
    def __init__(self, data, prior, suggested_assignments, weighting=None):
        # data: passed through to each class estimator (built from "prior").
        # prior: (Estimate class, prior parameters...) for each class.
        # suggested_assignments: N-data x N-class partial-assignment matrix.
        # weighting: optional per-datum weights (handled by the base class).
        estimate.Estimate.__init__(self, data, weighting)
        n_classes = suggested_assignments.shape[1]
        # Scale each datum's class assignments by that datum's overall weight.
        weighted_assignments = suggested_assignments * self.weighting[:,NewAxis]
        # Total weight landing in each class (summed over data); used as the
        # "counts" for estimating the class membership probabilities.
        total = sum(weighted_assignments)
        self.class_likelihoods = estimate.Discrete_estimate(
            arange(n_classes), n_classes, weighting=total)
        # One estimate per class, each fitted with that class's column of
        # partial-assignment weights.
        self.classes = [ ]
        for i in xrange(suggested_assignments.shape[1]):
            self.classes.append( prior[0](data, weighting=weighted_assignments[:,i], *prior[1:]) )
        # neglog_likelihood_per_class[j,i] =
        #     -log P(class i) - log P(datum j | class i)
        neglog_likelihood_per_class = repeat([-log(self.class_likelihoods.probability)],
                                             len(data))
        for i in xrange(n_classes):
            neglog_likelihood_per_class[:,i] += self.classes[i].neglog_data()
        #self._neglog_data = -log(sum(exp(-neglog_likelihood_per_class),1))
        # ... however the exp can cause underflows and inaccuracy, so:
        # (log-sum-exp trick: factor out each row's minimum before
        # exponentiating, so the largest exponentiated term is exactly 1)
        min_neglog_likelihood = minimum.reduce(neglog_likelihood_per_class, 1)
        excess = exp( min_neglog_likelihood[:,NewAxis] - neglog_likelihood_per_class )
        self._neglog_data = min_neglog_likelihood - log(sum(excess,1))
        # Normalizing each row of "excess" yields the posterior probability
        # of each datum belonging to each class.
        excess /= sum(excess,1)[:,NewAxis]
        self.assignments = excess
        # The mixture has a proper prior only if every class does, and
        # violates its prior if any class does.
        for item in self.classes:
            self.has_prior = self.has_prior and item.has_prior
            self.violates_prior = self.violates_prior or item.violates_prior
    def __repr__(self):
        # [1:] drops the leading character of the base repr; each class is
        # shown with its mixing percentage, indented beneath.
        result = estimate.Estimate.__repr__(self)[1:]
        for i, class_ in enumerate(self.classes):
            result += "\n% 3.0f%% " % (self.class_likelihoods.probability[i]*100.0)
            result += repr(class_).replace("\n","\n ")
        return result
    def dimensions(self):
        """ Dimensions of the mixing proportions plus all class estimates. """
        result = self.class_likelihoods.dimensions()
        for class_ in self.classes:
            result += class_.dimensions()
        return result
    def neglog_prior(self):
        # Likelihood of this many classes = 2^-n
        result = len(self.classes) * log(2.0)
        result += self.class_likelihoods.neglog_prior()
        for class_ in self.classes:
            result += class_.neglog_prior()
        # All possible orderings of classes are considered equivalent
        # This allows a few bits to be reclaimed
        # (subtract log(n_classes!), computed as a sum of logs)
        result -= sum(log(arange(1,len(self.classes)+1)))
        return result
    def neglog_fisher(self):
        """ Fisher term: mixing proportions plus all class estimates. """
        result = self.class_likelihoods.neglog_fisher()
        for class_ in self.classes:
            result += class_.neglog_fisher()
        return result
    def neglog_data(self):
        """ Per-datum code length, precomputed in __init__. """
        return self._neglog_data
def _destroy_class(assignments, index):
    """ Remove a column from an assignment matrix. """
    # Splice out column "index", then renormalize each row so the
    # remaining assignments still sum to one.
    remaining = concatenate((assignments[:,:index], assignments[:,index+1:]), 1)
    row_totals = sum(remaining,1)
    # Avoid divide-by-zero
    empty_rows = (row_totals <= 0.0)
    if sometrue(empty_rows):
        # A row that lost all of its weight is reset to a uniform
        # assignment over the remaining columns.
        remaining = transpose(where(empty_rows, 1.0, transpose(remaining)))
        row_totals = where(empty_rows, remaining.shape[1], row_totals)
    remaining /= row_totals[:,NewAxis]
    return remaining
def _split_class(assignments, index):
    """ Add a new column to an assignment matrix by splitting an existing column
    at random. """
    # Duplicate column "index" (the copy lands at index+1) ...
    duplicated = concatenate((assignments[:,:index+1], assignments[:,index:]),1)
    # ... then randomly send each datum's weight wholly to one side,
    # with a split fraction itself chosen at random.
    split_fraction = random.random()
    for row in xrange(duplicated.shape[0]):
        if random.random() < split_fraction:
            duplicated[row,index] = 0.0
        else:
            duplicated[row,index+1] = 0.0
    return duplicated
def _prune(assignments, weighting):
    """ Remove any columns that are too small from an assignment matrix. """
    # Rescan from the first column after every removal, until no column
    # carries less than two units of total weight.
    removed_one = True
    while removed_one:
        removed_one = False
        for column in xrange(assignments.shape[1]):
            if sum(assignments[:,column] * weighting) < 2.0:
                assignments = _destroy_class(assignments, column)
                removed_one = True
                break
    return assignments
def Mixture_estimate(data, prior, iterations=1000, assignments=None, weighting=None):
    """ Construct a mixture estimate.
    data : data suitable for class estimator given by "prior"
    prior: (Estimate class, prior parameters... )
        Specification of the Estimate class and prior parameters to use
        when modelling the data.
    iterations : integer
        Number of iterations to use in refining the estimate.
    assignments : N-data x N-classes array of float
        If specified, an initial guess as to the assignments of data to classes.
        You can use a previous _Mixture_estimate.assignments here to perform further
        iterations on that model.
    Returns a _Mixture_estimate.
    TODO: The algorithm used here is rather simpler than that of Snob, and has not been
    thoroughly tested.
    """
    if assignments is None:
        # Start with every datum fully assigned to a single class.
        assignments = ones((len(data),1), Float)
    if weighting is None:
        weighting = ones(len(data), Float)
    best_length = 1e100
    best_model = None
    best_assignments = None
    # expected_length is the previous iteration's message length; when the
    # new length matches it to within 0.001 nits, EM has converged and a
    # random perturbation (revert/destroy/split) is applied.
    expected_length = None
    for i in xrange(iterations):
        # Drop any classes that have shrunk below the minimum weight.
        assignments = _prune(assignments, weighting)
        model = _Mixture_estimate(data, prior, assignments, weighting)
        # Reorder classes by decreasing likelihood (canonical ordering),
        # carrying the reordering into the next iteration's assignments.
        permutation = argsort(model.class_likelihoods.probability)
        assignments = take(model.assignments, permutation[::-1], 1)
        length = model.length()
        # Track the shortest-message model seen so far.
        if length < best_length:
            best_length = length
            best_model = model
            best_assignments = assignments
        if expected_length is not None and abs(length-expected_length) < 0.001:
            # Converged: escape the local optimum by either reverting to
            # the best model so far (10%), destroying a random class
            # (~45%, when more than one exists), or splitting one.
            expected_length = None
            if random.random() < 0.1:
                assignments = best_assignments
                expected_length = best_length
            elif assignments.shape[1] > 1 and random.random() < 0.5:
                assignments = _destroy_class(assignments, random.randrange(assignments.shape[1]))
            else:
                assignments = _split_class(assignments, random.randrange(assignments.shape[1]))
        else:
            expected_length = length
    return best_model
if __name__ == "__main__":
    # Run tests if not "import"ed.
    # Two fields of synthetic data: field 0 is a mixture of two gaussians
    # (10 points from N(0,10), 90 from N(20,2)); field 1 is N(5,10).
    data = records.array([
        concatenate([random_array.normal(0.0, 10.0, 10),random_array.normal(20.0,2.0,90)]),
        random_array.normal(5.0, 10.0, 100)
    ])
    # Gaussian prior per field: (class, (mean range), std dev bounds).
    priors = [
        (estimate.Gaussian_estimate, (-30.0, 30.0), 1.0, 30.0),
        (estimate.Gaussian_estimate, (-30.0, 30.0), 1.0, 30.0)
    ]
    # Test 1: composite (single-class) estimate of both fields.
    if 1:
        class_est = Composite_estimate(data, priors)
        print class_est
        print
    # Test 2: mixture of composite estimates.
    if 1:
        mixture_est = Mixture_estimate(data, (Composite_estimate, priors), 100)
        print mixture_est
        print
    # Test 3: as above, but weighting only the first 10 data points.
    if 1:
        weighting = zeros(len(data), Float)
        weighting[:10] = 1.0
        mixture_est = Mixture_estimate(data, (Composite_estimate, priors), 100, weighting=weighting)
        print mixture_est
        print
    # Rebind data as a plain N x 2 array for the multivariate estimators.
    data = transpose(array([data.field(0), data.field(1)]))
    # Test 4: mixture of hidden-factor estimates.
    if 1:
        prior = (estimate.Hidden_factor_estimate,
                 array([[-30.0,30.0],[-30.0,30.0]]), array([1.0,1.0]), array([30.0,30.0]))
        mixture_est = Mixture_estimate(data, prior, 10)
        print mixture_est
        print
    # Test 5: mixture of multivariate gaussians.
    if 1:
        prior = (estimate.Multivariate_gaussian_estimate, 1.0)
        mixture_est = Mixture_estimate(data, prior, 100)
        print mixture_est
        print