# Copyright (c) 2012-2014, GPy authors (see AUTHORS.txt).
# Licensed under the BSD 3-clause license (see LICENSE.txt)

import numpy as np
import logging
from .. import kern
from ..likelihoods import Gaussian
from GPy.core.parameterization.variational import NormalPosterior, NormalPrior
from .sparse_gp_minibatch import SparseGPMiniBatch
from ..core.parameterization.param import Param


class BayesianGPLVMMiniBatch(SparseGPMiniBatch):
    """
    Bayesian Gaussian Process Latent Variable Model

    :param Y: observed data
    :type Y: np.ndarray
    :param input_dim: latent dimensionality
    :type input_dim: int
    :param X: initial latent positions; when None they are computed from Y
        with the method named by `init`
    :param X_variance: variance of the variational posterior over X.
        None draws it from uniform(0, .1); False disables the variational
        posterior altogether and treats X as a plain parameter (sparse GPLVM)
    :param init: initialisation method for the latent space
    :type init: 'PCA'|'random'
    :param num_inducing: number of inducing inputs drawn when Z is None
    :type num_inducing: int
    :param Z: inducing inputs; must have as many columns as X. When None,
        `num_inducing` rows of a random permutation of X are used
    :param kernel: covariance function; defaults to an ARD RBF kernel whose
        lengthscales are 1/fracs, with fracs as returned by the latent
        initialisation (or all ones when X is given)
    :param inference_method: defaults to VarDTC with limit 3, or limit
        Y.shape[1] when `missing_data` is set
    :param likelihood: defaults to Gaussian()
    :param name: model name, forwarded to SparseGPMiniBatch
    :param normalizer: forwarded to SparseGPMiniBatch
    :param missing_data: forwarded to SparseGPMiniBatch; also selects the
        VarDTC limit and the KL weighting in `parameters_changed`
    :param stochastic: forwarded to SparseGPMiniBatch
    :param batchsize: forwarded to SparseGPMiniBatch

    """
    def __init__(self, Y, input_dim, X=None, X_variance=None, init='PCA', num_inducing=10,
                 Z=None, kernel=None, inference_method=None, likelihood=None,
                 name='bayesian gplvm', normalizer=None,
                 missing_data=False, stochastic=False, batchsize=1):
        self.logger = logging.getLogger(self.__class__.__name__)
        if X is None:
            from ..util.initialization import initialize_latent
            self.logger.info("initializing latent space X with method %s", init)
            X, fracs = initialize_latent(init, input_dim, Y)
        else:
            fracs = np.ones(input_dim)

        self.init = init

        if Z is None:
            self.logger.info("initializing inducing inputs")
            # np.random.permutation already returns a shuffled copy, so X
            # itself is left untouched without an extra explicit copy.
            Z = np.random.permutation(X)[:num_inducing]
        if Z.shape[1] != X.shape[1]:
            # Explicit raise instead of `assert` so the check is not stripped
            # under `python -O`; AssertionError is kept so callers that
            # relied on the former assert see the same exception type.
            raise AssertionError(
                "inducing inputs Z have {} columns but the latent space X has {}".format(
                    Z.shape[1], X.shape[1]))

        if X_variance is False:
            self.logger.info('no variance on X, activating sparse GPLVM')
            X = Param("latent space", X)
        else:
            if X_variance is None:
                self.logger.info("initializing latent space variance ~ uniform(0,.1)")
                X_variance = np.random.uniform(0, .1, X.shape)
            self.variational_prior = NormalPrior()
            X = NormalPosterior(X, X_variance)

        if kernel is None:
            self.logger.info("initializing kernel RBF")
            kernel = kern.RBF(input_dim, lengthscale=1./fracs, ARD=True)

        if likelihood is None:
            likelihood = Gaussian()

        # Weight of the KL term in the objective; read in parameters_changed,
        # where a value <= 0 skips the KL term entirely.
        self.kl_factr = 1.

        if inference_method is None:
            from ..inference.latent_function_inference.var_dtc import VarDTC
            self.logger.debug("creating inference_method var_dtc")
            inference_method = VarDTC(limit=3 if not missing_data else Y.shape[1])

        super(BayesianGPLVMMiniBatch, self).__init__(X, Y, Z, kernel, likelihood=likelihood,
                                                     name=name, inference_method=inference_method,
                                                     normalizer=normalizer,
                                                     missing_data=missing_data, stochastic=stochastic,
                                                     batchsize=batchsize)
        self.X = X
        self.link_parameter(self.X, 0)

    def _outer_values_update(self, full_values):
        """
        Here you put the values, which were collected before in the right places.
        E.g. set the gradients of parameters, etc.

        After the parent update this sets the gradient of X: from the
        kernel's psi-statistic expectations when X has a variational
        posterior, otherwise from the kernel gradients w.r.t. X.

        :param full_values: mapping that holds the collected 'dL_dpsi0',
            'dL_dpsi1' and 'dL_dpsi2' entries (uncertain inputs) or the
            'dL_dKnm' and 'dL_dKdiag' entries (plain inputs)
        """
        super(BayesianGPLVMMiniBatch, self)._outer_values_update(full_values)
        if self.has_uncertain_inputs():
            meangrad_tmp, vargrad_tmp = self.kern.gradients_qX_expectations(
                variational_posterior=self.X,
                Z=self.Z, dL_dpsi0=full_values['dL_dpsi0'],
                dL_dpsi1=full_values['dL_dpsi1'],
                dL_dpsi2=full_values['dL_dpsi2'],
                psi0=self.psi0, psi1=self.psi1, psi2=self.psi2)

            self.X.mean.gradient = meangrad_tmp
            self.X.variance.gradient = vargrad_tmp
        else:
            self.X.gradient = self.kern.gradients_X(full_values['dL_dKnm'], self.X, self.Z)
            self.X.gradient += self.kern.gradients_X_diag(full_values['dL_dKdiag'], self.X)

    def _outer_init_full_values(self):
        """Return the parent's initial full values unchanged."""
        return super(BayesianGPLVMMiniBatch, self)._outer_init_full_values()

    def parameters_changed(self):
        """
        Run the parent update, then add the KL term of the variational
        prior to the gradient of X and subtract it from the log marginal
        likelihood.

        The KL term is weighted by `self.kl_factr` and is skipped when that
        weight is not positive or when X has no variational posterior. When
        stochastics are active without missing data it is additionally
        scaled by `stochastics.batchsize / output_dim`.
        """
        super(BayesianGPLVMMiniBatch, self).parameters_changed()

        kl_fctr = self.kl_factr
        if kl_fctr > 0 and self.has_uncertain_inputs():
            # Stash the gradient accumulated so far; update_gradients_KL then
            # writes the KL gradient into a zeroed buffer so it can be scaled
            # on its own before the two are summed again.
            Xgrad = self.X.gradient.copy()
            self.X.gradient[:] = 0
            self.variational_prior.update_gradients_KL(self.X)

            # Evaluated once: the same condition selects the scaling of both
            # the KL gradient and the KL value below.
            unscaled_kl = self.missing_data or not self.stochastics

            if unscaled_kl:
                self.X.mean.gradient = kl_fctr*self.X.mean.gradient
                self.X.variance.gradient = kl_fctr*self.X.variance.gradient
            else:
                d = self.output_dim
                self.X.mean.gradient = kl_fctr*self.X.mean.gradient*self.stochastics.batchsize/d
                self.X.variance.gradient = kl_fctr*self.X.variance.gradient*self.stochastics.batchsize/d
            self.X.gradient += Xgrad

            kl_term = kl_fctr*self.variational_prior.KL_divergence(self.X)
            if not unscaled_kl:
                kl_term = kl_term*self.stochastics.batchsize/self.output_dim
            self._log_marginal_likelihood -= kl_term

        # Snapshot of the final gradient of X, kept on the instance.
        self._Xgrad = self.X.gradient.copy()