Source code for vivarium.risk_distributions.risk_distributions

from __future__ import annotations

import copy
import warnings
from collections.abc import Callable
from typing import Any, TypeAlias, cast

import numpy as np
import numpy.typing as npt
import pandas as pd
from scipy import optimize, special, stats

from vivarium.risk_distributions.formatting import (
    Parameter,
    Parameters,
    cast_to_series,
    format_call_data,
    format_data,
)

Numeric: TypeAlias = "pd.Series[Any] | npt.NDArray[Any] | float"
NumericInput: TypeAlias = "Numeric | int"


[docs] class BaseDistribution: """Generic vectorized wrapper around scipy distributions.""" # scipy distributions (e.g. ``stats.beta``) are unstubbed; subclasses assign one. distribution: Any = None expected_parameters: tuple[str, ...] = () _extra_parameters: tuple[str, ...] = () def __init__( self, parameters: Parameters | None = None, mean: Parameter | None = None, sd: Parameter | None = None, computability_bound: float = 0.001, ) -> None: self.parameters = self.get_parameters(parameters, mean, sd, computability_bound) self.computability_bound = computability_bound
[docs] @classmethod def get_parameters( cls, parameters: Parameters | None = None, mean: Parameter | None = None, sd: Parameter | None = None, computability_bound: float = 0.001, ) -> pd.DataFrame: required_parameters = list( cls.expected_parameters + cls._extra_parameters + ("computability_min", "computability_max") ) if parameters is not None: if not (mean is None and sd is None): raise ValueError( "You may supply either pre-calculated parameters or" " mean and standard deviation but not both." ) parameters = format_data(parameters, required_parameters, "parameters") else: if mean is None or sd is None: raise ValueError( "You may supply either pre-calculated parameters or" " mean and standard deviation but not both." ) mean, sd = cast_to_series(mean, sd) parameters = pd.DataFrame(0.0, columns=required_parameters, index=mean.index) computable = cls.computable_parameter_index(mean, sd) # The scipy.stats distributions run optimization routines that handle FloatingPointErrors, # transforming them into RuntimeWarnings. This gets noisy in our logs. with warnings.catch_warnings(): warnings.simplefilter("ignore", RuntimeWarning) parameters.loc[ computable, list(cls.expected_parameters + cls._extra_parameters) ] = cls._get_parameters( mean.loc[computable], sd.loc[computable], computability_bound ) parameters.loc[ computable, ["computability_min", "computability_max"] ] = cls.get_computability_bounds(parameters.loc[computable], computability_bound) return parameters
[docs] @staticmethod def computable_parameter_index(mean: pd.Series[Any], sd: pd.Series[Any]) -> pd.Index[Any]: index: pd.Index[Any] = mean[ (mean != 0) & ~np.isnan(mean) & (sd != 0) & ~np.isnan(sd) ].index return index
[docs] @classmethod def get_computability_bounds( cls, parameters: pd.DataFrame, computability_bound: float ) -> pd.DataFrame: kwargs = {parameter: parameters[parameter] for parameter in cls.expected_parameters} compatability_min = cls.distribution(**kwargs).ppf(computability_bound) compatability_max = cls.distribution(**kwargs).ppf(1 - computability_bound) return pd.DataFrame( {"computability_min": compatability_min, "computability_max": compatability_max}, index=parameters.index, )
@staticmethod def _get_parameters( mean: pd.Series[Any], sd: pd.Series[Any], computability_bound: float ) -> pd.DataFrame: raise NotImplementedError()
[docs] def process( self, data: pd.Series[Any], parameters: pd.DataFrame, process_type: str ) -> pd.Series[Any]: """Function called before and after distribution looks to handle pre- and post-processing. This function should look like an if sieve on the `process_type` and fall back with a call to this method if no processing needs to be done. Parameters ---------- data The data to be processed. parameters Parameter data to be used in the processing. process_type One of `pdf_preprocess`, `pdf_postprocess`, `ppf_preprocess`, `ppf_post_process`. Returns ------- pandas.Series The processed data. """ return data
[docs] def pdf(self, x: NumericInput) -> Numeric: single_val = isinstance(x, (float, int)) values_only = isinstance(x, np.ndarray) x, parameters = format_call_data(x, self.parameters) computable = parameters[ (parameters.sum(axis=1) != 0) & ~np.isnan(x) & (parameters["computability_min"] <= x) & (x <= parameters["computability_max"]) ].index x.loc[computable] = self.process( x.loc[computable], parameters.loc[computable], "pdf_preprocess" ) p = pd.Series(np.nan, x.index) if not computable.empty: params = parameters.loc[computable, list(self.expected_parameters)] kwargs = {parameter: params[parameter] for parameter in self.expected_parameters} p.loc[computable] = self.distribution(**kwargs).pdf(x.loc[computable]) p.loc[computable] = self.process( p.loc[computable], parameters.loc[computable], "pdf_postprocess" ) result: Numeric = p if single_val: result = p.iloc[0] if values_only: result = cast("npt.NDArray[Any]", p.values) return result
[docs] def ppf(self, q: NumericInput) -> Numeric: single_val = isinstance(q, (float, int)) values_only = isinstance(q, np.ndarray) q, parameters = format_call_data(q, self.parameters) computable = parameters[ (parameters.sum(axis=1) != 0) & ~np.isnan(q) & (self.computability_bound <= q.to_numpy()) & (q.to_numpy() <= 1 - self.computability_bound) ].index q.loc[computable] = self.process( q.loc[computable], parameters.loc[computable], "ppf_preprocess" ) x = pd.Series(np.nan, q.index) if not computable.empty: params = parameters.loc[computable, list(self.expected_parameters)] kwargs = {parameter: params[parameter] for parameter in self.expected_parameters} x.loc[computable] = self.distribution(**kwargs).ppf(q.loc[computable]) x.loc[computable] = self.process( x.loc[computable], parameters.loc[computable], "ppf_postprocess" ) result: Numeric = x if single_val: result = x.iloc[0] if values_only: result = cast("npt.NDArray[Any]", x.values) return result
[docs] def cdf(self, x: NumericInput) -> Numeric: single_val = isinstance(x, (float, int)) values_only = isinstance(x, np.ndarray) x, parameters = format_call_data(x, self.parameters) computable = parameters[ (parameters.sum(axis=1) != 0) & ~np.isnan(x) & (parameters["computability_min"] <= x) & (x <= parameters["computability_max"]) ].index x.loc[computable] = self.process( x.loc[computable], parameters.loc[computable], "cdf_preprocess" ) c = pd.Series(np.nan, x.index) if not computable.empty: params = parameters.loc[computable, list(self.expected_parameters)] kwargs = {parameter: params[parameter] for parameter in self.expected_parameters} c.loc[computable] = self.distribution(**kwargs).cdf(x.loc[computable]) c.loc[computable] = self.process( c.loc[computable], parameters.loc[computable], "cdf_postprocess" ) result: Numeric = c if single_val: result = c.iloc[0] if values_only: result = cast("npt.NDArray[Any]", c.values) return result
[docs] class MirroredDistribution(BaseDistribution): _extra_parameters = ("mirror_point",) def __init__( self, parameters: Parameters | None = None, mean: Parameter | None = None, sd: Parameter | None = None, computability_bound: float = 0.001, ) -> None: super().__init__(parameters, mean, sd, computability_bound) # When called from EnsembleDistribution.ppf, only parameters are provided. if mean is not None and sd is not None: mean, sd = cast_to_series(mean, sd) self.mirror_point = self.compute_mirror_point(mean, sd, computability_bound) else: self.mirror_point = self.parameters["mirror_point"]
[docs] @staticmethod def compute_mirror_point( mean: pd.Series[Any], sd: pd.Series[Any], computability_bound: float ) -> pd.Series[Any]: """Computes the point around which the distribution is mirrored. NOTE: In the corresponding GBD code, this is called 'x_max'. """ alpha = 1 + sd**2 / mean**2 scale = mean / np.sqrt(alpha) s = np.sqrt(np.log(alpha)) mirror_point: pd.Series[Any] = pd.Series( stats.lognorm(s=s, scale=scale).ppf(1 - computability_bound), index=mean.index, ) return mirror_point
[docs] class Beta(BaseDistribution): distribution = stats.beta expected_parameters = ("a", "b", "scale", "loc")
[docs] @staticmethod def compute_scaling_bounds( mean: pd.Series[Any], sd: pd.Series[Any], computability_bound: float ) -> pd.DataFrame: """Gets the upper and lower bounds of the distribution support.""" # noinspection PyTypeChecker alpha = 1 + sd**2 / mean**2 scale = mean / np.sqrt(alpha) s = np.sqrt(np.log(alpha)) x_min = stats.lognorm(s=s, scale=scale).ppf(computability_bound) x_max = stats.lognorm(s=s, scale=scale).ppf(1 - computability_bound) return pd.DataFrame({"x_min": x_min, "x_max": x_max}, index=mean.index)
@staticmethod def _get_parameters( mean: pd.Series[Any], sd: pd.Series[Any], computability_bound: float ) -> pd.DataFrame: x_bounds = Beta.compute_scaling_bounds(mean, sd, computability_bound) x_min = x_bounds["x_min"] x_max = x_bounds["x_max"] scale = x_max - x_min a = 1 / scale * (mean - x_min) # noinspection PyTypeChecker b = (1 / scale * sd) ** 2 params = pd.DataFrame( { "a": a**2 / b * (1 - a) - a, "b": a / b * (1 - a) ** 2 + (a - 1), "scale": scale, "loc": x_min, }, index=mean.index, ) return params
[docs] class Exponential(BaseDistribution): distribution = stats.expon expected_parameters = ("scale",) @staticmethod def _get_parameters( mean: pd.Series[Any], sd: pd.Series[Any], computability_bound: float ) -> pd.DataFrame: return pd.DataFrame({"scale": mean}, index=mean.index)
[docs] class Gamma(BaseDistribution): distribution = stats.gamma expected_parameters = ("a", "scale") @staticmethod def _get_parameters( mean: pd.Series[Any], sd: pd.Series[Any], computability_bound: float ) -> pd.DataFrame: # noinspection PyTypeChecker params = pd.DataFrame( { "a": (mean / sd) ** 2, "scale": sd**2 / mean, }, index=mean.index, ) return params
[docs] class Gumbel(BaseDistribution): distribution = stats.gumbel_r expected_parameters = ("loc", "scale") @staticmethod def _get_parameters( mean: pd.Series[Any], sd: pd.Series[Any], computability_bound: float ) -> pd.DataFrame: params = pd.DataFrame( { "loc": mean - (np.euler_gamma * np.sqrt(6) / np.pi * sd), "scale": np.sqrt(6) / np.pi * sd, }, index=mean.index, ) return params
[docs] class InverseGamma(BaseDistribution): distribution = stats.invgamma expected_parameters = ("a", "scale") @staticmethod def _get_parameters( mean: pd.Series[Any], sd: pd.Series[Any], computability_bound: float ) -> pd.DataFrame: def target_function(guess: npt.NDArray[Any], m: float, s: float) -> float: alpha, beta = np.abs(guess) mean_guess = beta / (alpha - 1) var_guess = beta**2 / ((alpha - 1) ** 2 * (alpha - 2)) return float((m - mean_guess) ** 2 + (s**2 - var_guess) ** 2) opt_results = _get_optimization_result( mean, sd, target_function, lambda m, s: np.array((m, m * s)) ) result_indices = range(len(mean)) if not np.all([opt_results[k].success for k in result_indices]): raise NonConvergenceError("InverseGamma did not converge!!", "invgamma") params = pd.DataFrame( { "a": np.abs([opt_results[k].x[0] for k in result_indices]), "scale": np.abs([opt_results[k].x[1] for k in result_indices]), }, index=mean.index, ) return params
[docs] class InverseWeibull(BaseDistribution): distribution = stats.invweibull expected_parameters = ("c", "scale") @staticmethod def _get_parameters( mean: pd.Series[Any], sd: pd.Series[Any], computability_bound: float ) -> pd.DataFrame: # moments from Stat Papers (2011) 52: 591. https://doi.org/10.1007/s00362-009-0271-3 # it is much faster than using stats.invweibull.mean/var def target_function(guess: npt.NDArray[Any], m: float, s: float) -> float: shape, scale = np.abs(guess) mean_guess = scale * special.gamma(1 - 1 / shape) var_guess = scale**2 * special.gamma(1 - 2 / shape) - mean_guess**2 return float((m - mean_guess) ** 2 + (s**2 - var_guess) ** 2) opt_results = _get_optimization_result( mean, sd, target_function, lambda m, s: np.array((max(2.2, s / m), m)) ) result_indices = range(len(mean)) if not np.all([opt_results[k].success for k in result_indices]): raise NonConvergenceError("InverseWeibull did not converge!!", "invweibull") params = pd.DataFrame( { "c": np.abs([opt_results[k].x[0] for k in result_indices]), "scale": np.abs([opt_results[k].x[1] for k in result_indices]), }, index=mean.index, ) return params
[docs] class LogLogistic(BaseDistribution): distribution = stats.burr12 expected_parameters = ("c", "d", "scale") @staticmethod def _get_parameters( mean: pd.Series[Any], sd: pd.Series[Any], computability_bound: float ) -> pd.DataFrame: def target_function(guess: npt.NDArray[Any], m: float, s: float) -> float: shape, scale = np.abs(guess) b = np.pi / shape mean_guess = scale * b / np.sin(b) var_guess = scale**2 * 2 * b / np.sin(2 * b) - mean_guess**2 return float((m - mean_guess) ** 2 + (s**2 - var_guess) ** 2) opt_results = _get_optimization_result( mean, sd, target_function, lambda m, s: np.array((max(2, m), m)) ) result_indices = range(len(mean)) if not np.all([opt_results[k].success for k in result_indices]): raise NonConvergenceError("LogLogistic did not converge!!", "llogis") params = pd.DataFrame( { "c": np.abs([opt_results[k].x[0] for k in result_indices]), "d": 1, "scale": np.abs([opt_results[k].x[1] for k in result_indices]), }, index=mean.index, ) return params
[docs] class LogNormal(BaseDistribution): distribution = stats.lognorm expected_parameters = ("s", "scale") @staticmethod def _get_parameters( mean: pd.Series[Any], sd: pd.Series[Any], computability_bound: float ) -> pd.DataFrame: # noinspection PyTypeChecker alpha = 1 + sd**2 / mean**2 params = pd.DataFrame( { "s": np.sqrt(np.log(alpha)), "scale": mean / np.sqrt(alpha), }, index=mean.index, ) return params
[docs] class MirroredGumbel(MirroredDistribution): distribution = stats.gumbel_r expected_parameters = ("loc", "scale") @staticmethod def _get_parameters( mean: pd.Series[Any], sd: pd.Series[Any], computability_bound: float ) -> pd.DataFrame: # Persist mirror_point in params so it's available when re-instantiated with only parameters. mirror_point = MirroredGumbel.compute_mirror_point(mean, sd, computability_bound) params = pd.DataFrame( { "loc": mirror_point - mean - (np.euler_gamma * np.sqrt(6) / np.pi * sd), "scale": np.sqrt(6) / np.pi * sd, "mirror_point": mirror_point, }, index=mean.index, ) return params
[docs] def process( self, data: pd.Series[Any], parameters: pd.DataFrame, process_type: str ) -> pd.Series[Any]: if process_type in ["pdf_preprocess", "cdf_preprocess"]: value = self.mirror_point - data elif process_type in ["ppf_preprocess", "cdf_postprocess"]: # noinspection PyTypeChecker value = 1 - data elif process_type == "ppf_postprocess": value = self.mirror_point - data else: value = super().process(data, parameters, process_type) return value
[docs] class MirroredGamma(MirroredDistribution): distribution = stats.gamma expected_parameters = ("a", "scale") @staticmethod def _get_parameters( mean: pd.Series[Any], sd: pd.Series[Any], computability_bound: float ) -> pd.DataFrame: # noinspection PyTypeChecker mirror_point = MirroredGamma.compute_mirror_point(mean, sd, computability_bound) params = pd.DataFrame( { "a": ((mirror_point - mean) / sd) ** 2, "scale": sd**2 / (mirror_point - mean), "mirror_point": mirror_point, }, index=mean.index, ) return params
[docs] def process( self, data: pd.Series[Any], parameters: pd.DataFrame, process_type: str ) -> pd.Series[Any]: if process_type in ["pdf_preprocess", "cdf_preprocess"]: value = self.mirror_point - data elif process_type in ["ppf_preprocess", "cdf_postprocess"]: # noinspection PyTypeChecker value = 1 - data elif process_type == "ppf_postprocess": value = self.mirror_point - data else: value = super().process(data, parameters, process_type) return value
[docs] class Normal(BaseDistribution): distribution = stats.norm expected_parameters = ("loc", "scale") @staticmethod def _get_parameters( mean: pd.Series[Any], sd: pd.Series[Any], computability_bound: float ) -> pd.DataFrame: params = pd.DataFrame( { "loc": mean, "scale": sd, }, mean.index, ) return params
[docs] class Weibull(BaseDistribution): distribution = stats.weibull_min expected_parameters = ("c", "scale") @staticmethod def _get_parameters( mean: pd.Series[Any], sd: pd.Series[Any], computability_bound: float ) -> pd.DataFrame: def target_function(guess: npt.NDArray[Any], m: float, s: float) -> float: shape, scale = np.abs(guess) mean_guess = scale * special.gamma(1 + 1 / shape) var_guess = scale**2 * special.gamma(1 + 2 / shape) - mean_guess**2 return float((m - mean_guess) ** 2 + (s**2 - var_guess) ** 2) opt_results = _get_optimization_result( mean, sd, target_function, lambda m, s: np.array((m, m / s)) ) result_indices = range(len(mean)) if not np.all([opt_results[k].success is True for k in result_indices]): raise NonConvergenceError("Weibull did not converge!!", "weibull") params = pd.DataFrame( { "c": np.abs([opt_results[k].x[0] for k in result_indices]), "scale": np.abs([opt_results[k].x[1] for k in result_indices]), }, index=mean.index, ) return params
[docs] class EnsembleDistribution: """Represents an arbitrary distribution as a weighted sum of several concrete distribution types.""" _distribution_map = { "betasr": Beta, "exp": Exponential, "gamma": Gamma, "gumbel": Gumbel, "invgamma": InverseGamma, "invweibull": InverseWeibull, "llogis": LogLogistic, "lnorm": LogNormal, "mgamma": MirroredGamma, "mgumbel": MirroredGumbel, "norm": Normal, "weibull": Weibull, } def __init__( self, weights: Parameters, parameters: dict[str, Parameters] | None = None, mean: Parameter | None = None, sd: Parameter | None = None, computability_bound: float = 0.001, ) -> None: self.weights, self.parameters = self.get_parameters( weights, parameters, mean, sd, computability_bound )
[docs] @classmethod def get_parameters( cls, weights: Parameters, parameters: dict[str, Parameters] | None = None, mean: Parameter | None = None, sd: Parameter | None = None, computability_bound: float = 0.001, ) -> tuple[pd.DataFrame, dict[str, pd.DataFrame]]: expected_columns = list(cls._distribution_map.keys()) weights = cls.fill_missing_weights(weights, expected_columns) weights = format_data(weights, expected_columns, "weights") params: dict[str, pd.DataFrame] = {} for name, dist in cls._distribution_map.items(): try: param = parameters[name] if parameters else None params[name] = dist.get_parameters(param, mean, sd, computability_bound) except NonConvergenceError: if weights[name].max() < 0.05: weights.loc[name, :] = 0 else: raise NonConvergenceError( f"Divergent {name} distribution has " f"weights: {100 * weights[name]}%", name, ) # Rescale weights in case we floored any of them: non_zero_rows = weights[weights.sum(axis=1) != 0] # pandas-stubs' _LocIndexerFrame.__setitem__ accepts list/mask/slice keys but # not a bare Index; .loc[Index] = DataFrame is valid at runtime. weights is a # DataFrame here (from format_data above). weights.loc[non_zero_rows.index] = non_zero_rows.divide( # type: ignore[call-overload] non_zero_rows.sum(axis=1), axis=0 ) return weights, params
[docs] @classmethod def get_expected_parameters(cls, distribution_name: str) -> list[str]: """Get the expected parameters for a given distribution in the ensemble.""" return [ *cls._distribution_map[distribution_name].expected_parameters, "x_min", "x_max", ]
[docs] @staticmethod def fill_missing_weights(weights: Parameters, expected_columns: list[str]) -> Parameters: weights = copy.deepcopy(weights) # Get existing keys/columns/index based on weights type if isinstance(weights, dict): column_names = set(weights.keys()) elif isinstance(weights, pd.DataFrame): column_names = set(weights.columns) elif isinstance(weights, pd.Series): column_names = set(weights.index) else: column_names = None # For list, tuple, np.array, we can't fill missing columns # Add missing columns with 0.0 value. ``column_names`` is only populated in # the dict/DataFrame/Series branches above, so when it is truthy ``weights`` # is one of those indexable types; cast documents that for the assignment. if column_names and column_names < set(expected_columns): indexable = cast("dict[str, Any] | pd.DataFrame | pd.Series[Any]", weights) for col in expected_columns: if col not in column_names: indexable[col] = 0.0 return weights
[docs] def pdf(self, x: NumericInput) -> Numeric: single_val = isinstance(x, (float, int)) values_only = isinstance(x, np.ndarray) x, weights = format_call_data(x, self.weights) computable = weights[(weights.sum(axis=1) != 0) & ~np.isnan(x)].index p = pd.Series(np.nan, index=x.index) if not computable.empty: p.loc[computable] = 0 for name, parameters in self.parameters.items(): w = weights.loc[computable, name] params = parameters.loc[computable] if len(parameters) > 1 else parameters p += w * self._distribution_map[name](parameters=params).pdf( x.loc[computable] ) result: Numeric = p if single_val: result = p.iloc[0] if values_only: result = cast("npt.NDArray[Any]", p.values) return result
[docs] def ppf( self, q: NumericInput, q_dist: NumericInput, ) -> Numeric: """Quantile function using 2 propensities. Parameters --------- q : value propensity q_dist : propensity for picking the distribution Returns -------- Union[pandas.Series, numpy.ndarray, float] Sample from the ensembled distribution. """ single_val = isinstance(q, (float, int)) values_only = isinstance(q, np.ndarray) q, weights = format_call_data(q, self.weights) q_dist, _ = format_call_data(q_dist, self.weights) computable = weights[(weights.sum(axis=1) != 0) & ~np.isnan(q)].index x = pd.Series(np.nan, index=q.index) if not computable.empty: p_bins = np.cumsum(weights.loc[computable], axis=1) choice_index = (q_dist.loc[computable].values[np.newaxis].T > p_bins).sum(axis=1) x.loc[computable] = 0 idx_lookup = {v: i for i, v in enumerate(weights.columns)} for name, parameters in self.parameters.items(): idx = choice_index[choice_index == idx_lookup[name]].index if len(idx): params = ( parameters.loc[computable.intersection(idx)] if len(parameters) > 1 else parameters ) x[idx] = self._distribution_map[name](parameters=params).ppf(q[idx]) result: Numeric = x if single_val: result = x.iloc[0] if values_only: result = cast("npt.NDArray[Any]", x.values) return result
[docs] def cdf(self, x: NumericInput) -> Numeric: single_val = isinstance(x, (float, int)) values_only = isinstance(x, np.ndarray) x, weights = format_call_data(x, self.weights) computable = weights[(weights.sum(axis=1) != 0) & ~np.isnan(x)].index c = pd.Series(np.nan, index=x.index) c.loc[computable] = 0 if not computable.empty: for name, parameters in self.parameters.items(): w = weights.loc[computable, name] params = parameters.loc[computable] if len(parameters) > 1 else parameters c += w * self._distribution_map[name](parameters=params).cdf( x.loc[computable] ) result: Numeric = c if single_val: result = c.iloc[0] if values_only: result = cast("npt.NDArray[Any]", c.values) return result
[docs] class NonConvergenceError(Exception): """Raised when the optimization fails to converge""" def __init__(self, message: str, dist: str) -> None: super().__init__(message) self.dist = dist
def _get_optimization_result( mean: pd.Series[Any], sd: pd.Series[Any], func: Callable[[npt.NDArray[Any], float, float], float], initial_func: Callable[[float, float], npt.NDArray[Any]], ) -> tuple[Any, ...]: """Finds the shape parameters of distributions which generates mean/sd close to actual mean/sd. Parameters --------- mean : Series where each row has a mean for a single distribution, matches with sd. sd : Series where each row has a standard deviation for a single distribution, matches with mean. func: The optimization objective function. Takes arguments `initial guess`, `mean`, and `standard_deviation`. initial_func: Function to produce initial guess from a `mean` and `standard_deviation`. Returns -------- A tuple of the optimization results. """ mean_values, sd_values = mean.values, sd.values results = [] with np.errstate(all="warn"): for i in range(len(mean_values)): initial_guess = initial_func(mean_values[i], sd_values[i]) result = optimize.minimize( func, initial_guess, ( mean_values[i], sd_values[i], ), method="Nelder-Mead", options={"maxiter": 10000}, ) results.append(result) return tuple(results)