Source code for swiftemulator.emulators.multi_gaussian_process

"""
A gaussian process emulator that uses _multiple_ internal emulators to
better predict functions that contain a 'break'.
"""

import attr
import numpy as np
import george
import math

from typing import Hashable, List, Optional, Dict

from swiftemulator.emulators.base import BaseEmulator
from swiftemulator.emulators.gaussian_process import GaussianProcessEmulator
from swiftemulator.backend.model_parameters import ModelParameters
from swiftemulator.backend.model_specification import ModelSpecification
from swiftemulator.backend.model_values import ModelValues
from swiftemulator.mean_models import MeanModel

from scipy.optimize import minimize


[docs]@attr.s class MultipleGaussianProcessEmulator(BaseEmulator): """ Generator for emulators for individual scaling relations, using multiple trained gaussian processes regression instances and linear models under the hood. Parameters ---------- kernel, george.kernels.Kernel, optional The ``george`` kernel to use. The GPE here uses a copy of this instance. By default, this is the ``ExpSquaredKernel`` in George mean_model, MeanModel, optional A mean model conforming to the ``swiftemulator`` mean model protocol (several pre-made models are available in the :mod:`swiftemulator.mean_models` module). independent_regions, List[List[float]] The regions over which to construct independent emulators. None can be used in the first and last element to specify there are no boundaries to overlap. Must be monotonically increasing. Overlaps between regions are allowed and predicted values will be a weighted linear combination of both. For example, you could use ``[[None, 1.0], [0.7, 2.0], [1.7, None]]`` for data that ran from 0.0 to 3.0 in the independent variable. Regions should not overlap more than once. This isn't checked, but will break the code. """ kernel: Optional[george.kernels.Kernel] = attr.ib(default=None) mean_model: Optional[MeanModel] = attr.ib(default=None) independent_regions: Optional[List[List[float]]] = attr.ib(default=[[None, None]]) model_specification: Optional[ModelSpecification] = None model_parameters: Optional[ModelParameters] = None model_values: Optional[ModelValues] = None model_values_regions: Optional[List[ModelValues]] = None emulators: Optional[List[george.GP]] = None def _build_arrays( self, model_specification: ModelSpecification, model_parameters: ModelParameters, model_values: ModelValues, ): """ Builds the arrays for passing to sub-emulators. Parameters ---------- model_specification: ModelSpecification Full instance of the model specification. model_parameters: ModelParameters Full instance of the model parameters. model_values: ModelValues Full instance of the model values describing this individual scaling relation. """ # Here we must take our full instances and convert # them into separate instances only containing the sub # independent regions unique_identifiers = list(model_values.keys()) values = [{} for _ in self.independent_regions] for unique_id in unique_identifiers: raw_values = model_values[unique_id] ind = raw_values["independent"] dep = raw_values["dependent"] no_error = False try: err = raw_values["dependent_error"] except KeyError: no_error = True for index, (low, high) in enumerate(self.independent_regions): mask = np.logical_and( ind > low if low is not None else np.ones_like(ind).astype(bool), ind < high if high is not None else np.ones_like(ind).astype(bool), ) values[index][unique_id] = { "independent": ind[mask], "dependent": dep[mask], } if not no_error: values[index][unique_id]["dependent_error"] = err[mask] self.model_values_regions = [ModelValues(x) for x in values] return
[docs] def fit_model( self, model_specification: ModelSpecification, model_parameters: ModelParameters, model_values: ModelValues, ): """ Fits the gaussian process model, as determined by the initialiser variables of the class (i.e. the kernel and the mean model). Parameters ---------- model_specification: ModelSpecification Full instance of the model specification. model_parameters: ModelParameters Full instance of the model parameters. model_values: ModelValues Full instance of the model values describing this individual scaling relation. Notes ----- This method uses copies of the internal kernel and mean model objects, as those objects contain slightly unhelpful state information. """ if self.independent_variables is None: # Creates independent_variables, dependent_variables. self._build_arrays( model_specification=model_specification, model_parameters=model_parameters, model_values=model_values, ) self.emulators = [ GaussianProcessEmulator(kernel=self.kernel, mean_model=self.mean_model) for _ in self.independent_regions ] for gp, model_vals in zip(self.emulators, self.model_values_regions): gp.fit_model( model_specification=model_specification, model_parameters=model_parameters, model_values=model_vals, ) return
[docs] def predict_values( self, independent: np.array, model_parameters: Dict[str, float], ) -> np.array: """ Predict values from the trained emulator contained within this object. Parameters ---------- independent, np.array Independent continuous variables to evaluate the emulator at. model_parameters: Dict[str, float] The point in model parameter space to create predicted values at. Returns ------- dependent_predictions, np.array Array of predictions, if the emulator is a function f, these are the predicted values of f(independent) evaluted at the position of the input model_parameters. dependent_prediction_errors, np.array Errors on the model predictions. Notes ----- This will use the originally defined regions and overlaps will be calculated by using the weighted linear sum corresponding to the independent variable's distance to the adjacent boundary. The errors use a weighted square sum. """ if self.emulators is None: raise AttributeError( "Please train the emulator with fit_model before attempting " "to make predictions." ) # First, do individual predictions. inputs = [] output = [] output_error = [] for index, (low, high) in enumerate(self.independent_regions): mask = np.logical_and( independent > low if low is not None else np.ones_like(independent).astype(bool), independent < high if high is not None else np.ones_like(independent).astype(bool), ) predicted, errors = self.emulators[index].predict_values( independent=independent[mask], model_parameters=model_parameters ) inputs.append(list(independent[mask])) output.append(list(predicted)) output_error.append(list(errors)) # Now that we've predicted it all, we need to explicitly deal # with overlap and non-overlap. overlap_ranges = {} for index in range(1, len(self.independent_regions)): left = self.independent_regions[index][0] right = self.independent_regions[index - 1][1] if right is None or left is None: continue elif right > left: overlap_ranges[index - 1] = [left, right] dependent_predictions = np.empty_like(independent) dependent_prediction_errors = np.empty_like(independent) current_emulator = 0 for index, x in enumerate(independent): if x not in inputs[current_emulator]: current_emulator += 1 # Is it in the prior overlap? low, high = overlap_ranges.get(current_emulator - 1, [float("inf")] * 2) if low <= x <= high: # We have already counted this independent variable. continue # Is it in this emulator's overlap? low, high = overlap_ranges.get(current_emulator, [float("inf")] * 2) if low <= x <= high: dependent_index_left = inputs[current_emulator].index(x) dependent_index_right = inputs[current_emulator + 1].index(x) ind_left = inputs[current_emulator][dependent_index_left] ind_right = inputs[current_emulator + 1][dependent_index_right] left_weight = (high - x) / (high - low) right_weight = (x - low) / (high - low) dependent_left = output[current_emulator][dependent_index_left] dependent_right = output[current_emulator + 1][dependent_index_right] dependent_error_left = output_error[current_emulator][ dependent_index_left ] dependent_error_right = output_error[current_emulator][ dependent_index_right ] dependent_predictions[index] = ( dependent_left * left_weight + dependent_right * right_weight ) dependent_prediction_errors[index] = math.sqrt( left_weight * dependent_error_left * dependent_error_left + right_weight * dependent_error_right * dependent_error_right ) else: # Easy! dependent_index = inputs[current_emulator].index(x) dependent_predictions[index] = output[current_emulator][dependent_index] dependent_prediction_errors[index] = output_error[current_emulator][ dependent_index ] return dependent_predictions, dependent_prediction_errors