Source code for swiftemulator.comparison.penalty

"""
Penalty functions and their specifications.
"""

import attr
import unyt
import numpy as np
import matplotlib.pyplot as plt

from velociraptor.observations import ObservationalData
from scipy.interpolate import interp1d

from swiftemulator.backend.model_values import ModelValues

from typing import Dict, Hashable, Optional, Union, Callable, List


[docs]@attr.s class PenaltyCalculator(object): """ Base class for the penalty functions. Extend this with your own, following the following pattern: 1. Configuration parameters for the penalty function are taken as initialisation parameters to the class. 2. The observational data set to be matched to is passed to register_observation. 3. The penalty function is calculated using the penalty() method, taking the exact arguments that are taken here, for an individual model. Provided for convenience is penalties() which calculates the penalty function for all data in a `ModelValues` container. """ observation: ObservationalData interpolator_values: interp1d interpolator_error: Optional[interp1d] log_independent: bool log_dependent: bool independent_units: unyt.unyt_quantity dependent_units: unyt.unyt_quantity
[docs] def register_observation( self, observation: ObservationalData, log_independent: bool, log_dependent: bool, independent_units: unyt.unyt_quantity, dependent_units: unyt.unyt_quantity, ) -> None: """ Registers the observation for use in `penalty` with the class. Parameters ---------- observation: ObservationalData Instance of the velociraptor observational data used for comparisons. log_independent: bool Take the base-10 log of the independent data before comparison? log_dependent: bool Take the base-10 log of the dependent data before comparison? independent_units: unyt.unyt_quantity The units that the model was calculated in (independent) dependent_units: unyt.unyt_quantity The units that the model was calculated in (dependent) """ self.observation = observation self.log_independent = log_independent self.log_dependent = log_dependent self.independent_units = independent_units self.dependent_units = dependent_units self.observation_interpolation() return
[docs] def observation_interpolation(self): """ Produces the interpolation for the internal observation. """ x = self.observation.x.to(self.independent_units) y = self.observation.y.to(self.dependent_units) if self.log_independent: x = np.log10(x.value) if self.log_dependent: y = np.log10(y.value) fill_value = lambda x, y: (y[x.argmin()], y[x.argmax()]) self.interpolator_values = interp1d( x=x, y=y, kind="linear", copy=False, bounds_error=False, fill_value=fill_value(x, y), ) # Set observational error to None for now. self.interpolator_error = None return
[docs] def penalty( self, independent: np.array, dependent: np.array, dependent_error: Optional[np.array] = None, ) -> List[float]: """ Calculate the penalty function, relative to the observational data, for this model. It is highly recommended that you evaluate the model at the same independent variables as the observational data. The observational data is linearly interpolated to find a prediction at the independent variables that you provide. independent: np.array The independent data. dependent: np.array The dependent data for comparison. dependent_error: np.array, optional The dependent errors, for comparison. Returns ------- penalty, List[float] The penalties for this model, between 0 and 1 each. """ raise NotImplementedError
[docs] def penalties( self, model: ModelValues, collate_with: Callable ) -> Dict[Hashable, float]: """ Calculate the penalty function for all models in the model values container. It is highly recommended that you evaluate the model at the same independent variables as the observational data. The observational data is linearly interpolated to find a prediction at the independent variables that you provide. Parameters ---------- model: ModelValues The set of model (values) to calculate the penalty function for. collate_with: Callable A function that takes a numpy array and returns the 'global' penalty for a model given the input for all of the valid points in the array. Examples could be ``np.max``, ``np.mean``, ``np.median``. Returns ------- penalties: Dict[Hashable, float] Penalty functions for each of the models, with the key being the unique identifier. """ penalties = {} for unique_id, data in model.model_values.items(): independent = data["independent"] dependent = data["dependent"] dependent_error = data.get("dependent_error", None) penalties[unique_id] = collate_with( self.penalty( independent=independent, dependent=dependent, dependent_error=dependent_error, ) ) return penalties
[docs] def plot_penalty( self, low_independent: Union[unyt.unyt_quantity, float], high_independent: Union[unyt.unyt_quantity, float], low_dependent: Union[unyt.unyt_quantity, float], high_dependent: Union[unyt.unyt_quantity, float], filename: str, x_label: Optional[str] = None, y_label: Optional[str] = None, resolution: int = 128, marker: Optional[str] = None, ): """ Create a figure of the penalty function, over the limits given. Limits are given in log space if logged, linear space if not (with units required in that case). """ fig, ax = plt.subplots() if not self.log_independent: low_independent = low_independent.to(self.independent_units) high_independent = high_independent.to(self.independent_units) if not self.log_dependent: low_dependent = low_dependent.to(self.dependent_units) high_dependent = high_dependent.to(self.dependent_units) x = np.linspace(low_independent, high_independent, resolution) y = np.linspace(low_dependent, high_dependent, resolution) xs, ys = np.meshgrid(x, y) output = np.empty_like(xs) for index, (independent, dependent) in enumerate(zip(xs.flat, ys.flat)): output.flat[index] = self.penalty( independent=np.array([independent]), dependent=np.array([dependent]), dependent_error=None, ) mappable = ax.pcolormesh(xs, ys, output, vmin=0.0, vmax=1.0, rasterized=True) fig.colorbar(mappable=mappable, ax=ax, label="Penalty Function") # Plot obs data x = self.observation.x.to(self.independent_units) y = self.observation.y.to(self.dependent_units) if self.log_independent: x = np.log10(x.value) if self.log_dependent: y = np.log10(y.value) ax.plot(x, y, linestyle="dashed", marker=marker) ax.set_xlabel(x_label) ax.set_ylabel(y_label) ax.set_xlim(low_independent, high_independent) ax.set_ylim(low_dependent, high_dependent) fig.savefig(filename) return
[docs]@attr.s class L1PenaltyCalculator(PenaltyCalculator): """ Penalty calculator for an L1-type norm, i.e. a linear penalty function away from the data. This penalty function is capped after a (vertical) distance, provided with units if the provided observation is used in linear space, or provided as a logarithmic offset in dex if in log space. Parameters ---------- offset: Union[unyt.unyt_quantity, float] The vertical offset at which to set the L1 norm to the maximum. This is required as the penalty function is not allowed to be unlimited. lower: Union[unyt.unyt_quantity, float] The lowest independent value to calculate the model offset at. upper: Union[unyt.unyt_quantity, float] The highest independent value to calculate the model offset at. """ offset: Union[unyt.unyt_quantity, float] = attr.ib() lower: Union[unyt.unyt_quantity, float] = attr.ib() upper: Union[unyt.unyt_quantity, float] = attr.ib()
[docs] def observation_interpolation(self): super().observation_interpolation() # Convert limits to sensible units and log them # if necessary if not self.log_independent: self.lower.convert_to_units(self.independent_units) self.upper.convert_to_units(self.independent_units) if not self.log_dependent: self.offset.convert_to_units(self.dependent_units) return
[docs] def penalty( self, independent: np.array, dependent: np.array, dependent_error: Optional[np.array] = None, ) -> float: valid_data_mask = np.logical_and( independent >= self.lower, independent < self.upper ) number_of_valid_points = valid_data_mask.sum() if number_of_valid_points == 0: return 0.0 obs_dependent = self.interpolator_values(independent[valid_data_mask]) penalties = np.abs(obs_dependent - dependent[valid_data_mask]) / self.offset return np.minimum(penalties, 1.0)
[docs]@attr.s class L1PenaltyCalculatorOneSided(PenaltyCalculator): """ Penalty calculator for an L1-type norm, i.e. a linear penalty function away from the data, but one-sided only. Values above/below the line are given a maximal penalty. This penalty function is capped after a (vertical) distance, provided with units if the provided observation is used in linear space, or provided as a logarithmic offset in dex if in log space. Parameters ---------- offset: Union[unyt.unyt_quantity, float] The vertical offset at which to set the L1 norm to the maximum. This is required as the penalty function is not allowed to be unlimited. maximum_penalty: str Give the maximum penalty above or below the line. Accepted values are "above" or "below" as strings. lower: Union[unyt.unyt_quantity, float] The lowest independent value to calculate the model offset at. upper: Union[unyt.unyt_quantity, float] The highest independent value to calculate the model offset at. """ offset: Union[unyt.unyt_quantity, float] = attr.ib() maximum_penalty: str = attr.ib() lower: Union[unyt.unyt_quantity, float] = attr.ib() upper: Union[unyt.unyt_quantity, float] = attr.ib() @maximum_penalty.validator def _check_maximum_penalty(self, attribute, value): valid = ["above", "below"] if value not in valid: raise AttributeError("maximum_penalty must be one of above or below.")
[docs] def observation_interpolation(self): super().observation_interpolation() # Convert limits to sensible units and log them # if necessary if not self.log_independent: self.lower.convert_to_units(self.independent_units) self.upper.convert_to_units(self.independent_units) if not self.log_dependent: self.offset.convert_to_units(self.dependent_units) return
[docs] def penalty( self, independent: np.array, dependent: np.array, dependent_error: Optional[np.array] = None, ) -> float: valid_data_mask = np.logical_and( independent >= self.lower, independent < self.upper ) number_of_valid_points = valid_data_mask.sum() if number_of_valid_points == 0: return 0.0 obs_dependent = self.interpolator_values(independent[valid_data_mask]) penalties = np.abs(obs_dependent - dependent[valid_data_mask]) / self.offset if self.maximum_penalty == "above": penalties[dependent[valid_data_mask] > obs_dependent] = 1.0 elif self.maximum_penalty == "below": penalties[dependent[valid_data_mask] < obs_dependent] = 1.0 return np.minimum(penalties, 1.0)
[docs]@attr.s class L2PenaltyCalculator(PenaltyCalculator): """ Penalty calculator for an L2-type norm, i.e. a square penalty function away from the data. This penalty function is capped after a (vertical) distance, provided with units if the provided observation is used in linear space, or provided as a logarithmic offset in dex if in log space. Parameters ---------- offset: Union[unyt.unyt_quantity, float] The vertical offset at which to set the L1 norm to the maximum. This is required as the penalty function is not allowed to be unlimited. lower: Union[unyt.unyt_quantity, float] The lowest independent value to calculate the model offset at. upper: Union[unyt.unyt_quantity, float] The highest independent value to calculate the model offset at. """ offset: Union[unyt.unyt_quantity, float] = attr.ib() lower: Union[unyt.unyt_quantity, float] = attr.ib() upper: Union[unyt.unyt_quantity, float] = attr.ib()
[docs] def observation_interpolation(self): super().observation_interpolation() # Convert limits to sensible units and log them # if necessary if not self.log_independent: self.lower.convert_to_units(self.independent_units) self.upper.convert_to_units(self.independent_units) if not self.log_dependent: self.offset.convert_to_units(self.dependent_units) return
[docs] def penalty( self, independent: np.array, dependent: np.array, dependent_error: Optional[np.array] = None, ) -> float: valid_data_mask = np.logical_and( independent >= self.lower, independent < self.upper ) number_of_valid_points = valid_data_mask.sum() if number_of_valid_points == 0: return 0.0 obs_dependent = self.interpolator_values(independent[valid_data_mask]) penalties = (obs_dependent - dependent[valid_data_mask]) ** 2 / self.offset**2 return np.minimum(penalties, 1.0)
[docs]@attr.s class L1VariablePenaltyCalculator(PenaltyCalculator): """ Penalty calculator for an L1-type norm, i.e. a linear penalty function away from the data. This penalty function is capped after a (vertical) distance, provided with units if the provided observation is used in linear space, or provided as a logarithmic offset in dex if in log space. In this version the offset is variable, using a logistic curve. Parameters ---------- offset_lower: Union[unyt.unyt_quantity, float] The vertical offset at which to set the L1 norm to the maximum, at the lower end of the independent range. offset_upper: Union[unyt.unyt_quantity, float] The vertical offset at which to set the L1 norm to the maximum, at the upper end of the independent range. offset_transition: Union[unyt.unyt_quantity, float] The independent variable at which you would like the offset to transition from ``offset_lower`` to ``offset_upper``. transition_width: Union[unyt.unyt_quantity, float] The width of the transition between offsets, centered around ``offset_transition``. lower: Union[unyt.unyt_quantity, float] The lowest independent value to calculate the model offset at. upper: Union[unyt.unyt_quantity, float] The highest independent value to calculate the model offset at. offset_below_above_ratio: float, optional Ratio of the allowed offset below or above the data. If this takes a value of less than 1.0, models below the data are penalised more (by that factor) than models above. Default: 1.0 """ offset_lower: Union[unyt.unyt_quantity, float] = attr.ib() offset_upper: Union[unyt.unyt_quantity, float] = attr.ib() offset_transition: Union[unyt.unyt_quantity, float] = attr.ib() transition_width: Union[unyt.unyt_quantity, float] = attr.ib() lower: Union[unyt.unyt_quantity, float] = attr.ib() upper: Union[unyt.unyt_quantity, float] = attr.ib() offset_below_above_ratio: float = attr.ib(default=1.0)
[docs] def observation_interpolation(self): super().observation_interpolation() # Convert limits to sensible units and log them # if necessary if not self.log_independent: self.lower.convert_to_units(self.independent_units) self.upper.convert_to_units(self.independent_units) self.offset_transition.convert_to_units(self.independent_units) self.transition_width.convert_to_units(self.independent_units) if not self.log_dependent: self.offset_lower.convert_to_units(self.dependent_units) self.offset_upper.convert_to_units(self.dependent_units) return
[docs] def penalty( self, independent: np.array, dependent: np.array, dependent_error: Optional[np.array] = None, ) -> float: valid_data_mask = np.logical_and( independent >= self.lower, independent < self.upper ) number_of_valid_points = valid_data_mask.sum() if number_of_valid_points == 0: return 0.0 offsets = self.offset_lower + (self.offset_upper - self.offset_lower) * ( 1.0 / ( 1.0 + np.exp( (self.offset_transition - independent[valid_data_mask]) / self.transition_width ) ) ) obs_dependent = self.interpolator_values(independent[valid_data_mask]) valid_and_low_mask = dependent[valid_data_mask] < obs_dependent offsets[valid_and_low_mask] *= self.offset_below_above_ratio penalties = np.abs(obs_dependent - dependent[valid_data_mask]) / offsets return np.minimum(penalties, 1.0)
[docs]@attr.s class L1SqueezePenaltyCalculator(PenaltyCalculator): """ Penalty calculator for an L1-type norm, i.e. a linear penalty function away from the data. This penalty function is capped after a (vertical) distance, provided with units if the provided observation is used in linear space, or provided as a logarithmic offset in dex if in log space. In this version the offset is variable, with it being 'squeezed' at a point over a width. Parameters ---------- offset_squeeze: Union[unyt.unyt_quantity, float] The vertical offset at which to set the L1 norm to the maximum at the pinch point. offset_normal: Union[unyt.unyt_quantity, float] The usual vertical offset at which to set the L1 norm to the maximum. offset_transition: Union[unyt.unyt_quantity, float] The independent variable at which you would like the offset to transition from ``offset_lower`` to ``offset_upper``. transition_width: Union[unyt.unyt_quantity, float] The width of the transition between offsets, centered around ``offset_transition``. lower: Union[unyt.unyt_quantity, float] The lowest independent value to calculate the model offset at. upper: Union[unyt.unyt_quantity, float] The highest independent value to calculate the model offset at. offset_below_above_ratio: float, optional Ratio of the allowed offset below or above the data. If this takes a value of less than 1.0, models below the data are penalised more (by that factor) than models above. Default: 1.0 """ offset_squeeze: Union[unyt.unyt_quantity, float] = attr.ib() offset_normal: Union[unyt.unyt_quantity, float] = attr.ib() offset_transition: Union[unyt.unyt_quantity, float] = attr.ib() transition_width: Union[unyt.unyt_quantity, float] = attr.ib() lower: Union[unyt.unyt_quantity, float] = attr.ib() upper: Union[unyt.unyt_quantity, float] = attr.ib() offset_below_above_ratio: float = attr.ib(default=1.0)
[docs] def observation_interpolation(self): super().observation_interpolation() # Convert limits to sensible units and log them # if necessary if not self.log_independent: self.lower.convert_to_units(self.independent_units) self.upper.convert_to_units(self.independent_units) self.offset_transition.convert_to_units(self.independent_units) self.transition_width.convert_to_units(self.independent_units) if not self.log_dependent: self.offset_squeeze.convert_to_units(self.dependent_units) self.offset_normal.convert_to_units(self.dependent_units) return
[docs] def penalty( self, independent: np.array, dependent: np.array, dependent_error: Optional[np.array] = None, ) -> float: valid_data_mask = np.logical_and( independent >= self.lower, independent < self.upper ) number_of_valid_points = valid_data_mask.sum() if number_of_valid_points == 0: return 0.0 offsets = self.offset_normal - (self.offset_normal - self.offset_squeeze) * ( np.exp( -0.5 * ( ( (independent[valid_data_mask] - self.offset_transition) / self.transition_width ) ** 2 ) ) ) obs_dependent = self.interpolator_values(independent[valid_data_mask]) valid_and_low_mask = dependent[valid_data_mask] < obs_dependent offsets[valid_and_low_mask] *= self.offset_below_above_ratio penalties = np.abs(obs_dependent - dependent[valid_data_mask]) / offsets return np.minimum(penalties, 1.0)
[docs]@attr.s class GaussianDataErrorsPenaltyCalculator(PenaltyCalculator): """ Penalty calculator for observations that include errors. This penalty function uses a Gaussian distribution around the data, based on the observational errors. Capped at a input number of sigmas away from the data. Parameters ---------- sigma_max: Union[unyt.unyt_quantity, float] The number of sigmas at which the function is capped. lower: Union[unyt.unyt_quantity, float] The lowest independent value to calculate the model offset at. upper: Union[unyt.unyt_quantity, float] The highest independent value to calculate the model offset at. """ sigma_max: Union[unyt.unyt_quantity, float] = attr.ib() lower: Union[unyt.unyt_quantity, float] = attr.ib() upper: Union[unyt.unyt_quantity, float] = attr.ib() error_interpolator_values: interp1d
[docs] def observation_interpolation(self): super().observation_interpolation() x = self.observation.x.to(self.independent_units) y_scatter = self.observation.y_scatter.to(self.dependent_units) if self.log_independent: x = np.log10(x.value) # Propagate errors to log space if needed if self.log_independent: y = self.observation.y.to(self.dependent_units) y_scatter = np.abs(y_scatter / (y * np.log(10))) # Account for unsymmetric errors if y_scatter.ndim > 1: y_scatter = np.mean(y_scatter, axis=0) fill_value = lambda x, y_scatter: (y_scatter[x.argmin()], y_scatter[x.argmax()]) self.error_interpolator_values = interp1d( x=x, y=y_scatter, kind="linear", copy=False, bounds_error=False, fill_value=fill_value(x, y_scatter), ) # Convert limits to sensible units and log them # if necessary if not self.log_independent: self.lower.convert_to_units(self.independent_units) self.upper.convert_to_units(self.independent_units) return
[docs] def penalty( self, independent: np.array, dependent: np.array, dependent_error: Optional[np.array] = None, ) -> List[float]: valid_data_mask = np.logical_and( independent >= self.lower, independent < self.upper ) number_of_valid_points = valid_data_mask.sum() if number_of_valid_points == 0: return 0.0 obs_dependent = self.interpolator_values(independent[valid_data_mask]) obs_dependent_errors = self.error_interpolator_values( independent[valid_data_mask] ) penalties = 1.0 - np.exp( -0.5 * ( (dependent[valid_data_mask] - obs_dependent) ** 2 / (obs_dependent_errors**2) ) ) sigma_max_mask_value = 1.0 - np.exp(-0.5 * (self.sigma_max**2)) penalties[penalties > sigma_max_mask_value] = 1.0 return np.minimum(penalties, 1.0)
[docs]@attr.s class GaussianPercentErrorsPenaltyCalculator(PenaltyCalculator): """ Penalty calculator that that uses Gaussian errors with widths based on the percentages difference between the model and the data. Parameters ---------- percent_error: float percent error that sets the one-sigma deviation, in units of percent (0-100). sigma_max: float The number of sigmas at which the function is capped. lower: Union[unyt.unyt_quantity, float] The lowest independent value to calculate the model offset at. upper: Union[unyt.unyt_quantity, float] The highest independent value to calculate the model offset at. """ percent_error: float = attr.ib() sigma_max: float = attr.ib() lower: Union[unyt.unyt_quantity, float] = attr.ib() upper: Union[unyt.unyt_quantity, float] = attr.ib()
[docs] def observation_interpolation(self): super().observation_interpolation() # Convert limits to sensible units and log them # if necessary if not self.log_independent: self.lower.convert_to_units(self.independent_units) self.upper.convert_to_units(self.independent_units) return
[docs] def penalty( self, independent: np.array, dependent: np.array, dependent_error: Optional[np.array] = None, ) -> List[float]: valid_data_mask = np.logical_and( independent >= self.lower, independent < self.upper ) number_of_valid_points = valid_data_mask.sum() if number_of_valid_points == 0: return 0.0 obs_dependent = self.interpolator_values(independent[valid_data_mask]) penalties = 1.0 - np.exp( -0.5 * ( (dependent[valid_data_mask] - obs_dependent) ** 2 / ((obs_dependent * (self.percent_error / 100)) ** 2) ) ) sigma_max_mask_value = 1.0 - np.exp(-0.5 * (self.sigma_max**2)) penalties[penalties > sigma_max_mask_value] = 1.0 return np.minimum(penalties, 1.0)
[docs]@attr.s class GaussianDataErrorsPercentFloorPenaltyCalculator(PenaltyCalculator): """ Penalty calculator that that uses Gaussian errors based on the observational data. Includes a floor based on a percent error. It will pick the worst out of the two. This is meant as a way to not fit better then the emulator allows, while also not constraining stronger than observations. Parameters ---------- percent_error: float percent error that sets the one-sigma deviation, in units of percent (0-100). sigma_max: float The number of sigmas at which the function is capped. lower: Union[unyt.unyt_quantity, float] The lowest independent value to calculate the model offset at. upper: Union[unyt.unyt_quantity, float] The highest independent value to calculate the model offset at. """ percent_error: float = attr.ib() sigma_max: float = attr.ib() lower: Union[unyt.unyt_quantity, float] = attr.ib() upper: Union[unyt.unyt_quantity, float] = attr.ib() error_interpolator_values: interp1d
[docs] def observation_interpolation(self): super().observation_interpolation() x = self.observation.x.to(self.independent_units) y_scatter = self.observation.y_scatter.to(self.dependent_units) if self.log_independent: x = np.log10(x.value) # Propagate errors to log space if needed if self.log_independent: y = self.observation.y.to(self.dependent_units) y_scatter = np.abs(y_scatter / (y * np.log(10))) # Account for unsymmetric errors if y_scatter.ndim > 1: y_scatter = np.mean(y_scatter, axis=0) fill_value = lambda x, y_scatter: (y_scatter[x.argmin()], y_scatter[x.argmax()]) self.error_interpolator_values = interp1d( x=x, y=y_scatter, kind="linear", copy=False, bounds_error=False, fill_value=fill_value(x, y_scatter), ) # Convert limits to sensible units and log them # if necessary if not self.log_independent: self.lower.convert_to_units(self.independent_units) self.upper.convert_to_units(self.independent_units) return
[docs] def penalty( self, independent: np.array, dependent: np.array, dependent_error: Optional[np.array] = None, ) -> List[float]: valid_data_mask = np.logical_and( independent >= self.lower, independent < self.upper ) number_of_valid_points = valid_data_mask.sum() if number_of_valid_points == 0: return 0.0 obs_dependent = self.interpolator_values(independent[valid_data_mask]) obs_dependent_errors = self.error_interpolator_values( independent[valid_data_mask] ) penalties = 1.0 - np.exp( -0.5 * ( (dependent[valid_data_mask] - obs_dependent) ** 2 / ( obs_dependent_errors**2 + (obs_dependent * (self.percent_error / 100)) ** 2 ) ) ) sigma_max_mask_value = 1.0 - np.exp(-0.5 * (self.sigma_max**2)) penalties[penalties > sigma_max_mask_value] = 1.0 return np.minimum(penalties, 1.0)
[docs]@attr.s class GaussianWeightedDataErrorsPercentFloorPenaltyCalculator(PenaltyCalculator): """ Penalty calculator that that uses Gaussian errors based on the observational data. Includes a floor based on a percent error. It will pick the worst out of the two. This is meant as a way to not fit better then the emulator allows, while also not constraining stronger than observations. Parameters ---------- percent_error: float percent error that sets the one-sigma deviation, in units of percent (0-100). sigma_max: float The number of sigmas at which the function is capped. lower: Union[unyt.unyt_quantity, float] The lowest independent value to calculate the model offset at. upper: Union[unyt.unyt_quantity, float] The highest independent value to calculate the model offset at. weight: A general weight that scales the entire range of errors, but keeps relative weights intact. """ percent_error: float = attr.ib() sigma_max: float = attr.ib() weight: float = attr.ib() lower: Union[unyt.unyt_quantity, float] = attr.ib() upper: Union[unyt.unyt_quantity, float] = attr.ib() error_interpolator_values: interp1d
[docs] def observation_interpolation(self): super().observation_interpolation() x = self.observation.x.to(self.independent_units) y_scatter = self.observation.y_scatter.to(self.dependent_units) if self.log_independent: x = np.log10(x.value) # Propagate errors to log space if needed if self.log_independent: y = self.observation.y.to(self.dependent_units) y_scatter = np.abs(y_scatter / (y * np.log(10))) # Account for unsymmetric errors if y_scatter.ndim > 1: y_scatter = np.mean(y_scatter, axis=0) fill_value = lambda x, y_scatter: (y_scatter[x.argmin()], y_scatter[x.argmax()]) self.error_interpolator_values = interp1d( x=x, y=y_scatter, kind="linear", copy=False, bounds_error=False, fill_value=fill_value(x, y_scatter), ) # Convert limits to sensible units and log them # if necessary if not self.log_independent: self.lower.convert_to_units(self.independent_units) self.upper.convert_to_units(self.independent_units) return
[docs] def penalty( self, independent: np.array, dependent: np.array, dependent_error: Optional[np.array] = None, ) -> List[float]: valid_data_mask = np.logical_and( independent >= self.lower, independent < self.upper ) number_of_valid_points = valid_data_mask.sum() if number_of_valid_points == 0: return 0.0 obs_dependent = self.interpolator_values(independent[valid_data_mask]) obs_dependent_errors = self.error_interpolator_values( independent[valid_data_mask] ) penalties = 1.0 - np.exp( -0.5 * ( (dependent[valid_data_mask] - obs_dependent) ** 2 / ( obs_dependent_errors**2 + (obs_dependent * (self.percent_error / 100)) ** 2 ) ) ) sigma_max_mask_value = 1.0 - np.exp(-0.5 * (self.sigma_max**2)) penalties[penalties > sigma_max_mask_value] = 1.0 return np.minimum(penalties, 1.0)