1# Copyright (c) 2017 the GPy Austhors (see AUTHORS.txt) 2# Licensed under the BSD 3-clause license (see LICENSE.txt) 3 4from ..core import Model 5from ..core.parameterization import Param 6from ..core import Mapping 7from ..kern import Kern, RBF 8from ..inference.latent_function_inference import ExactStudentTInference 9from ..util.normalizer import Standardize 10 11import numpy as np 12from scipy import stats 13from paramz import ObsAr 14from paramz.transformations import Logexp 15 16import warnings 17 18 19class TPRegression(Model): 20 """ 21 Student-t Process model for regression, as presented in 22 23 Shah, A., Wilson, A. and Ghahramani, Z., 2014, April. Student-t processes as alternatives to Gaussian processes. 24 In Artificial Intelligence and Statistics (pp. 877-885). 25 26 :param X: input observations 27 :param Y: observed values 28 :param kernel: a GPy kernel, defaults to rbf 29 :param deg_free: initial value for the degrees of freedom hyperparameter 30 :param Norm normalizer: [False] 31 32 Normalize Y with the norm given. 33 If normalizer is False, no normalization will be done 34 If it is None, we use GaussianNorm(alization) 35 36 .. Note:: Multiple independent outputs are allowed using columns of Y 37 38 """ 39 40 def __init__(self, X, Y, kernel=None, deg_free=5., normalizer=None, mean_function=None, name='TP regression'): 41 super(TPRegression, self).__init__(name=name) 42 # X 43 assert X.ndim == 2 44 self.set_X(X) 45 self.num_data, self.input_dim = self.X.shape 46 47 # Y 48 assert Y.ndim == 2 49 if normalizer is True: 50 self.normalizer = Standardize() 51 elif normalizer is False: 52 self.normalizer = None 53 else: 54 self.normalizer = normalizer 55 56 self.set_Y(Y) 57 58 if Y.shape[0] != self.num_data: 59 # There can be cases where we want inputs than outputs, for example if we have multiple latent 60 # function values 61 warnings.warn("There are more rows in your input data X, \ 62 than in your output data Y, be VERY sure this is what you want") 63 self.output_dim = self.Y.shape[1] 64 65 # Kernel 66 kernel = kernel or RBF(self.X.shape[1]) 67 assert isinstance(kernel, Kern) 68 self.kern = kernel 69 self.link_parameter(self.kern) 70 71 if self.kern._effective_input_dim != self.X.shape[1]: 72 warnings.warn( 73 "Your kernel has a different input dimension {} then the given X dimension {}. Be very sure this is " 74 "what you want and you have not forgotten to set the right input dimenion in your kernel".format( 75 self.kern._effective_input_dim, self.X.shape[1])) 76 77 # Mean function 78 self.mean_function = mean_function 79 if mean_function is not None: 80 assert isinstance(self.mean_function, Mapping) 81 assert mean_function.input_dim == self.input_dim 82 assert mean_function.output_dim == self.output_dim 83 self.link_parameter(mean_function) 84 85 # Degrees of freedom 86 self.nu = Param('deg_free', float(deg_free), Logexp()) 87 self.link_parameter(self.nu) 88 89 # Inference 90 self.inference_method = ExactStudentTInference() 91 self.posterior = None 92 self._log_marginal_likelihood = None 93 94 # Insert property for plotting (not used) 95 self.Y_metadata = None 96 97 def _update_posterior_dof(self, dof, which): 98 if self.posterior is not None: 99 self.posterior.nu = dof 100 101 @property 102 def _predictive_variable(self): 103 return self.X 104 105 def set_XY(self, X, Y): 106 """ 107 Set the input / output data of the model 108 This is useful if we wish to change our existing data but maintain the same model 109 110 :param X: input observations 111 :type X: np.ndarray 112 :param Y: output observations 113 :type Y: np.ndarray or ObsAr 114 """ 115 self.update_model(False) 116 self.set_Y(Y) 117 self.set_X(X) 118 self.update_model(True) 119 120 def set_X(self, X): 121 """ 122 Set the input data of the model 123 124 :param X: input observations 125 :type X: np.ndarray 126 """ 127 assert isinstance(X, np.ndarray) 128 state = self.update_model() 129 self.update_model(False) 130 self.X = ObsAr(X) 131 self.update_model(state) 132 133 def set_Y(self, Y): 134 """ 135 Set the output data of the model 136 137 :param Y: output observations 138 :type Y: np.ndarray or ObsArray 139 """ 140 assert isinstance(Y, (np.ndarray, ObsAr)) 141 state = self.update_model() 142 self.update_model(False) 143 if self.normalizer is not None: 144 self.normalizer.scale_by(Y) 145 self.Y_normalized = ObsAr(self.normalizer.normalize(Y)) 146 self.Y = Y 147 else: 148 self.Y = ObsAr(Y) if isinstance(Y, np.ndarray) else Y 149 self.Y_normalized = self.Y 150 self.update_model(state) 151 152 def parameters_changed(self): 153 """ 154 Method that is called upon any changes to :class:`~GPy.core.parameterization.param.Param` variables within the model. 155 In particular in this class this method re-performs inference, recalculating the posterior, log marginal likelihood and gradients of the model 156 157 .. warning:: 158 This method is not designed to be called manually, the framework is set up to automatically call this method upon changes to parameters, if you call 159 this method yourself, there may be unexpected consequences. 160 """ 161 self.posterior, self._log_marginal_likelihood, grad_dict = self.inference_method.inference(self.kern, 162 self.X, 163 self.Y_normalized, 164 self.nu + 2 + np.finfo( 165 float).eps, 166 self.mean_function) 167 self.kern.update_gradients_full(grad_dict['dL_dK'], self.X) 168 if self.mean_function is not None: 169 self.mean_function.update_gradients(grad_dict['dL_dm'], self.X) 170 self.nu.gradient = grad_dict['dL_dnu'] 171 172 def log_likelihood(self): 173 """ 174 The log marginal likelihood of the model, :math:`p(\mathbf{y})`, this is the objective function of the model being optimised 175 """ 176 return self._log_marginal_likelihood or self.inference()[1] 177 178 def _raw_predict(self, Xnew, full_cov=False, kern=None): 179 """ 180 For making predictions, does not account for normalization or likelihood 181 182 full_cov is a boolean which defines whether the full covariance matrix 183 of the prediction is computed. If full_cov is False (default), only the 184 diagonal of the covariance is returned. 185 186 .. math:: 187 p(f*|X*, X, Y) = \int^{\inf}_{\inf} p(f*|f,X*)p(f|X,Y) df 188 = MVN\left(\nu + N,f*| K_{x*x}(K_{xx})^{-1}Y, 189 \frac{\nu + \beta - 2}{\nu + N - 2}K_{x*x*} - K_{xx*}(K_{xx})^{-1}K_{xx*}\right) 190 \nu := \texttt{Degrees of freedom} 191 """ 192 mu, var = self.posterior._raw_predict(kern=self.kern if kern is None else kern, Xnew=Xnew, 193 pred_var=self._predictive_variable, full_cov=full_cov) 194 if self.mean_function is not None: 195 mu += self.mean_function.f(Xnew) 196 return mu, var 197 198 def predict(self, Xnew, full_cov=False, kern=None, **kwargs): 199 """ 200 Predict the function(s) at the new point(s) Xnew. For Student-t processes, this method is equivalent to 201 predict_noiseless as no likelihood is included in the model. 202 """ 203 return self.predict_noiseless(Xnew, full_cov=full_cov, kern=kern) 204 205 def predict_noiseless(self, Xnew, full_cov=False, kern=None): 206 """ 207 Predict the underlying function f at the new point(s) Xnew. 208 209 :param Xnew: The points at which to make a prediction 210 :type Xnew: np.ndarray (Nnew x self.input_dim) 211 :param full_cov: whether to return the full covariance matrix, or just the diagonal 212 :type full_cov: bool 213 :param kern: The kernel to use for prediction (defaults to the model kern). 214 215 :returns: (mean, var): 216 mean: posterior mean, a Numpy array, Nnew x self.input_dim 217 var: posterior variance, a Numpy array, Nnew x 1 if full_cov=False, Nnew x Nnew otherwise 218 219 If full_cov and self.input_dim > 1, the return shape of var is Nnew x Nnew x self.input_dim. 220 If self.input_dim == 1, the return shape is Nnew x Nnew. 221 This is to allow for different normalizations of the output dimensions. 222 """ 223 # Predict the latent function values 224 mu, var = self._raw_predict(Xnew, full_cov=full_cov, kern=kern) 225 226 # Un-apply normalization 227 if self.normalizer is not None: 228 mu, var = self.normalizer.inverse_mean(mu), self.normalizer.inverse_variance(var) 229 230 return mu, var 231 232 def predict_quantiles(self, X, quantiles=(2.5, 97.5), kern=None, **kwargs): 233 """ 234 Get the predictive quantiles around the prediction at X 235 236 :param X: The points at which to make a prediction 237 :type X: np.ndarray (Xnew x self.input_dim) 238 :param quantiles: tuple of quantiles, default is (2.5, 97.5) which is the 95% interval 239 :type quantiles: tuple 240 :param kern: optional kernel to use for prediction 241 :type predict_kw: dict 242 :returns: list of quantiles for each X and predictive quantiles for interval combination 243 :rtype: [np.ndarray (Xnew x self.output_dim), np.ndarray (Xnew x self.output_dim)] 244 """ 245 mu, var = self._raw_predict(X, full_cov=False, kern=kern) 246 quantiles = [stats.t.ppf(q / 100., self.nu + 2 + self.num_data) * np.sqrt(var) + mu for q in quantiles] 247 248 if self.normalizer is not None: 249 quantiles = [self.normalizer.inverse_mean(q) for q in quantiles] 250 251 return quantiles 252 253 def posterior_samples(self, X, size=10, full_cov=False, Y_metadata=None, likelihood=None, **predict_kwargs): 254 """ 255 Samples the posterior GP at the points X, equivalent to posterior_samples_f due to the absence of a likelihood. 256 """ 257 return self.posterior_samples_f(X, size, full_cov=full_cov, **predict_kwargs) 258 259 def posterior_samples_f(self, X, size=10, full_cov=True, **predict_kwargs): 260 """ 261 Samples the posterior TP at the points X. 262 263 :param X: The points at which to take the samples. 264 :type X: np.ndarray (Nnew x self.input_dim) 265 :param size: the number of a posteriori samples. 266 :type size: int. 267 :param full_cov: whether to return the full covariance matrix, or just the diagonal. 268 :type full_cov: bool. 269 :returns: fsim: set of simulations 270 :rtype: np.ndarray (D x N x samples) (if D==1 we flatten out the first dimension) 271 """ 272 mu, var = self._raw_predict(X, full_cov=full_cov, **predict_kwargs) 273 if self.normalizer is not None: 274 mu, var = self.normalizer.inverse_mean(mu), self.normalizer.inverse_variance(var) 275 276 def sim_one_dim(m, v): 277 nu = self.nu + 2 + self.num_data 278 v = np.diag(v.flatten()) if not full_cov else v 279 Z = np.random.multivariate_normal(np.zeros(X.shape[0]), v, size).T 280 g = np.tile(np.random.gamma(nu / 2., 2. / nu, size), (X.shape[0], 1)) 281 return m + Z / np.sqrt(g) 282 283 if self.output_dim == 1: 284 return sim_one_dim(mu, var) 285 else: 286 fsim = np.empty((self.output_dim, self.num_data, size)) 287 for d in range(self.output_dim): 288 if full_cov and var.ndim == 3: 289 fsim[d] = sim_one_dim(mu[:, d], var[:, :, d]) 290 elif (not full_cov) and var.ndim == 2: 291 fsim[d] = sim_one_dim(mu[:, d], var[:, d]) 292 else: 293 fsim[d] = sim_one_dim(mu[:, d], var) 294 return fsim 295