https://github.com/chill90/BoloCalc
Raw File
Tip revision: 5b5fe53b155ebb7d1750975b0baa64bc7c3a04fb authored by Charles Hill on 17 September 2020, 07:23:00 UTC
Handling for updated atmosphere files
Tip revision: 5b5fe53
parameter.py
# Built-in modules
import numpy as np
import copy as cp

# BoloCalc modules
import src.unit as un
import src.distribution as ds


class Parameter:
    """
    Parameter object contains attributes for input and output parameters
    and is at the core of parameter handling for BoloCalc.
    It can take either a StandardParam input or 'raw' inputs.

    Args:
    log (src.Log): parent Log object
    inp (str or src.Distribution): parameter value(s)
    std_param (src.StandardParam): parent StandardParameter object,
    whose attributes override all following parameters, except for 'band_ids'
    name (str): parameter name
    unit (src.Unit): parameter unit. Defaults to src.Unit('NA')
    min (float): minimum allowed value. Defaults to None
    max (float): maximum allowe value. Defaults to None
    inp_type (type): cast parameter data type. Defaults to numpy.float

    Attributes
    name (str): where the 'name' arg is stored
    unit (src.Unit): where the 'unit' arg is stored

    Parents:
    log (src.Log): Logging object
    std_param (src.StandardParam): StandardParameter object
    """

    def __init__(self, log, inp, std_param=None, name=None, unit=None,
                 min=None, max=None, inp_type=float, band_ids=None):
        # Store passed arguments
        self._log = log
        # If a StandardParam object is passed, use its attributes
        if std_param is not None:
            self.name = std_param.name
            self.caps_name = std_param.caps_name
            self.unit = std_param.unit
            if self.unit is not None:
                if std_param.min is not None:
                    self._min = self.unit.to_SI(std_param.min)
                else:
                    self._min = std_param.min
                if std_param.max is not None:
                    self._max = self.unit.to_SI(std_param.max)
                else:
                    self._max = std_param.max
            else:
                if std_param.unit is not None:
                    self._min = std_param.unit.to_SI(std_param.min)
                    self._max = std_param.unit.to_SI(std_param.max)
                else:
                    self._min = std_param.min
                    self._max = std_param.max
            self._type = std_param.type
        # Otherwise, store the passed attributes
        else:
            self.name = name
            self.caps_name = self.name.replace(" ", "").strip().upper()
            # Store passed unit, otherwise assume "NA"
            if unit is not None:
                self.unit = unit
            else:
                self.unit = un.Unit("NA")
            # Store min and convert it to SI units
            if min is not None:
                if self.unit is not None:
                    self._min = self.unit.to_SI(float(min))
                else:
                    self._min = float(min)
            else:
                self._min = None
            # Store max and convert it to SI units
            if max is not None:
                if self.unit is not None:
                    self._max = self.unit.to_SI(float(max))
                else:
                    self._max = float(max)
            else:
                self._max = None
            self._type = inp_type
        # For storing parameters of type [m1, m2, ...] +/- [s1, s2, ...]
        self._band_ids = band_ids

        # Spread delimiter
        self._spread_delim = '+/-'
        # Allowed parameter string values when input type is 'float'
        self._float_str_vals = ["NA", "BAND"]

        # Store the parameter value, mean, and standard deviation
        self._store_param(inp)

    # ***** Public Methods *****
    def fetch(self, band_ind=None):
        """
        Return (avg, med, std) given a band_id, or return (val)

        Args:
        band_id (str): band ID for multi-band parameters
        band_ind (int): band index for indexing over arrays
        of multi-band parameters
        """
        if self._val is not None and self._avg is None:
            return (self._val, self._val, 'NA')
        if self._mult_bands:
            if band_ind is not None:
                return (self._avg[band_ind],
                        self._med[band_ind],
                        self._std[band_ind])
            else:
                return (self._avg,
                        self._med,
                        self._std)
        else:
            return (self._avg,
                    self._med,
                    self._std)

    def change(self, new_avg, band_ind=None, num_bands=None):
        """
        Change self._avg to new_avg for band_id or band_ind

        Args:
        new_avg (int or list): new value to be set
        band_id (str): band ID for multi-band parameters
        band_ind (int): band index for indexing over arrays
        of multi-band parameters

        Return 'True' if avg value was altered, 'False' if not
        """
        # Check that multiple bands are defined if a band_id is passed
        if band_ind is not None and num_bands is not None:
            if band_ind > num_bands:
                self._log.err(
                    "Passed band index '%s' for changing parameter "
                    "'%s' not compatible with total number of bands '%s'"
                    % (str(band_ind), self.name, str(num_bands)))
        # Bool to return indicating whether or not parameter changed
        ret_bool = False
        # Set parameter to a new string
        if isinstance(new_avg, str) or isinstance(new_avg, np.string_):
            ret_bool = self._change_str(new_avg, band_ind, num_bands)
        # Set parameter to a new float
        elif isinstance(new_avg, float) or isinstance(new_avg, np.float_):
            ret_bool = self._change_float(new_avg, band_ind, num_bands)
        else:
            self._log.err(
                "Could not change parameter '%s' to value '%s' of type '%s'"
                % (str(self.name), str(new_avg), str(type(new_avg))))
        return ret_bool

    def get_val(self):
        """ Return the input value """
        return self._val

    def get_avg(self, band_ind=None):
        """
        Return average value for band_id or band_ind

        Args:
        band_ind (int): band index for indexing over arrays
        of multi-band parameters
        """
        return self.fetch(band_ind)[0]

    def get_med(self, band_ind=None):
        """
        Return average value for band_id

        Args:
        band_ind (int): band index for indexing over arrays
        of multi-band parameters
        """
        return self.fetch(band_ind)[1]

    def get_std(self, band_ind=None):
        """
        Return standard deviation for band_id

        Args:
        band_ind (int): band index for indexing over arrays
        of multi-band parameters
        """
        return self.fetch(band_ind)[2]

    def sample(self, band_ind=None, nsample=1,
               min=None, max=None, null=False):
        """
        Sample parameter distribution for band_id nsample times
        and return the sampled values in an array if nsample > 1
        or as a float if nsample = 1.

        Args:
        band_ind (int): band index for indexing over arrays
        of multi-band parameters
        nsample (int): number of samples to draw from distribution
        min (float): the minimum allowed value to be returned
        max (float): the maximum allowed value to be returned
        null (bool): whether to sample around zero
        """
        # If min and max not explicitly passed, use constructor values
        if min is None:
            min = self._min
        if max is None:
            max = self._max
        # If this parameter is a distribution, just sample it
        if isinstance(self._val, ds.Distribution):
            samp = self._float(self._val.sample(nsample=nsample))
            # Check that the sampled value doesn't surpasse the max or min
            if min is not None and samp < min:
                return min
            if max is not None and samp > max:
                return max
            return samp
        # Retrieve the mean, median, and std for this band
        vals = self.fetch(band_ind)
        avg = vals[0]
        std = vals[2]
        # If avg is 'NA' or 'BAND', return said string
        if str(avg).strip().upper() in self._float_str_vals and not null:
            return str(avg).strip().upper()
        # If std is 'NA' or 'BAND', return avg
        elif str(std).strip().upper() in self._float_str_vals:
            return avg
        # Otherwise, sample the Gaussian described by mean +/- std
        else:
            # If the user calls for a null sampling, set avg to zero
            if null:
                samp_avg = 0.
            else:
                samp_avg = avg
            # If std is zero (or less than), return the average value
            if str(std) == "NA" or np.any(std <= 0.):
                return samp_avg
            elif nsample == 1:
                samp = np.random.normal(samp_avg, std, nsample)[0]
            else:
                samp = np.random.normal(samp_avg, std, nsample)
            # Check that the sampled value doesn't surpasse the max or min
            if min is not None and samp < min:
                return min
            if max is not None and samp > max:
                return max
            return samp

    # ***** Helper Methods *****
    def _store_param(self, inp):
        """ Store input parameter """
        # Bools only passed from simulationInputs.txt
        if self._type is bool:
            self._store_bool(inp)
        # Ints passed for num_det, num_ot, etc.
        elif self._type is int:
            self._store_int(inp)
        # Floats passed for mean +/- std values
        # This is most common
        elif self._type is float:
            self._store_float(inp)
        # Strs passed for, for example, Site
        elif self._type is str:
            self._store_str(inp)
        # List passed for simulationInputs CL values
        elif self._type is list:
            self._store_list(inp)
        else:
            self._log.err(
                "Passed paramter '%s' not one of allowed data types: \
                bool, float, int, str, list" % (self.name))
        return True

    def _store_bool(self, inp):
        """ Store input boolean parameter """
        self._mult_bands = False
        val = inp.lower().capitalize().strip()
        if val != "True" and val != "False":
            self._log.err(
                "Failed to parse boolean input '%s'" % (inp))
        self._val = eval(val)
        self._avg = None
        self._med = None
        self._std = None
        return

    def _store_int(self, inp):
        """ Store input integer parameter """
        self._mult_bands = False
        try:
            self._val = None
            avg = int(inp)
            self._avg = int(self._check_range(avg))
            self._med = self._avg
            self._std = 0.
        except ValueError:
            self._log.err(
                    "Parameter '%s' with value '%s' cannot be type "
                    "casted to int" % (self.name, str(inp)))
        return

    def _store_float(self, inp):
        """ Store input float parameter """
        # If input is a string, then one of the following
        # '[m1, m2, ...] +/- [s1, s2, ...]' or 'm +/- s' or 'm'
        if isinstance(inp, str):
            self._store_float_str(inp)
        # If input distribution, then simply store it
        elif isinstance(inp, ds.Distribution):
            self._store_float_dist(inp)
        # If input tuple, we are dealing with an optic, which
        # may have distributions for any frequency channel
        # (param_str, dist_dict)
        # param_str is assumed to be identical to what's handled for a string
        # e.g. '[m1, m2, ...] +/- [s1, s2, ...]' or 'm +/- s' or 'm'
        # dist_dict is assumed to be a dictionary of possible dist objs
        # keyed by Band ID
        elif isinstance(inp, tuple):
            self._store_float_tuple(inp)
        # If none of the above, throw an error
        else:
            self._log.err(
                "Problem handling Paramter '%s' of input type 'float.' "
                "Input value not a string, src.Distribution, or tuple"
                % (self.name))
        return

    def _store_float_str(self, inp):
        """ Store input float parameter that is a string """
        # Check for the spread format
        # [m1, m2, ...] +/- [s1, s2, ...]
        # m +/- s
        if self._spread_delim in inp:
            self._val = None
            vals = inp.split(self._spread_delim)
            avg = self._float(vals[0])
            if isinstance(avg, np.ndarray):
                self._mult_bands = True
            else:
                self._mult_bands = False
            self._avg = self._check_range(avg)
            self._med = self._avg
            self._std = self._float(vals[1])
        # Otherwise, no spread, and therefore no std
        else:
            self._val = None
            avg = self._float(inp)
            if isinstance(avg, np.ndarray):
                self._mult_bands = True
            else:
                self._mult_bands = False
            self._avg = self._check_range(avg)
            self._med = self._avg
            if (isinstance(self._avg, str) or
               isinstance(self._avg, np.str_)):
                self._std = 0.
            else:
                self._std = self._zero(self._avg)
        return

    def _store_float_dist(self, inp):
        """ Store an input float parameter that is a Distribution object """
        self._mult_bands = False
        self._val = inp
        self._avg = self._float(inp.mean())
        self._med = self._float(inp.median())
        self._std = self._float(inp.std())
        return

    def _store_float_tuple(self, inp):
        """ Store an input float parameter that is a tuple """
        # Presumed format (param_str, dict_dist), which only comes about for
        # optics distributions
        if len(inp) != 2:
            self._log.err(
                "More or less than two elements in float_tuple '%s' in for "
                "Parameter '%s' "
                % (str(inp), self.name))
        param_str = inp[0]
        dist_dict = inp[1]
        # If there is no distribution dictionary passed, then process
        # the input string as normal
        if dist_dict is None:
            self._store_float_str(param_str)
            return
        # Otherwise, there is a PDF to be processed
        pdf_conv_dict = {'PDF': str('PDF')}  # dict for helping eval()
        # Split the input string using +/-
        if self._spread_delim in param_str:
            vals = param_str.split(self._spread_delim)
            # Each should evaluate to a list, float, or string
            mean_val = eval(vals[0], pdf_conv_dict)
            std_val = eval(vals[1], pdf_conv_dict)
        else:
            # Should evaluate to list, float, or string
            mean_val = eval(param_str, pdf_conv_dict)
            std_val = None
        # Figure out which bands want to use a PDF
        # Is the input a list of values? If so, then multiple bands
        if isinstance(mean_val, list):
            self._mult_bands = True
            # Std also needs to be a list
            if std_val is not None and not isinstance(std_val, list):
                self._log.err(
                    "Mean values '%s' a list for optic paramter "
                    "'%s' but std values '%s' not a list"
                    % (str(mean_val), self.name, str(std_val)))
            # The std list needs to be the same length as the mean list
            if std_val is not None and len(mean_val) != len(std_val):
                self._log.err(
                    "Mean value list '%s' and std value list '%s' "
                    "for optic parameter '%s' not the same length"
                    % (str(mean_val), str(std_val), self.name))
            # Channel values stored in channel dict order
            self._val = []
            self._avg = []
            self._med = []
            self._std = []
            # Loop over mean values, checking for distributions
            for i, mv in enumerate(mean_val):
                if std_val is not None:
                    sv = std_val[i]
                else:
                    sv = 0.
                # If mean value is 'PDF', find its distribution in the
                # passed distribution dictionary
                if 'PDF' in str(mv).upper():
                    if self._band_ids is None:
                        self._log.err(
                            "'PDF' found in Parameter '%s' but no band_ids "
                            "passed to Parameter() constructor" % (self.name))
                    # Idenfity the distribution using the Band ID
                    band_id = self._band_ids[i].upper()
                    try:
                        dist = dist_dict[band_id]
                    except KeyError:
                        self._log.err(
                            "Could not find Distribution for optic "
                            "paramter '%s' and Band ID '%s'"
                            % (self.name, band_id))
                    self._val.append(dist)
                    self._avg.append(dist.mean())
                    self._med.append(dist.median())
                    self._std.append(dist.std())
                else:
                    mv = self._check_range(float(mv))
                    self._val.append(None)
                    self._avg.append(float(mv))
                    self._med.append(float(mv))
                    self._std.append(float(sv))
        # If the mean value is a string, then the value covers all bands
        elif isinstance(mean_val, str):
            self._mult_bands = False
            # Check if the parameter is given by a PDF
            if 'PDF' in str(mean_val).upper():
                # Single value named 'PDF' is assumed to define
                # a parameter distribution for all bands
                try:
                    dist = dist_dict['ALL']
                except KeyError:
                    self._log.err(
                        "Could not find Distribution for optic "
                        "paramter '%s' and Band ID '%s'"
                        % (self.name, band_id))
                self._val = dist
                self._avg = dist.mean()
                self._med = dist.median()
                self._std = dist.std()
            else:
                self._log.err(
                    "Could not understand mean value '%s' for Parameter '%s'"
                    % (self.name))
        # If the mean value is a float or an int, then simply store the
        # values straight away
        elif isinstance(mean_val, float) or isinstance(mean_val, int):
            self._mult_bands = False
            self._val = None
            avg = self._check_range(mean_val)
            self._avg = avg
            self._med = avg
            if std_val is not None:
                if (not isinstance(std_val, float) or
                   not isinstance(std_val, int)):
                    self._log.err(
                        "Std value '%s' for Parameter '%s' is not a float "
                        "or int even though its mean value '%s' is"
                        % (str(std_val), self.name, str(mean_val)))
                self._std = std_val
            else:
                self._std = 0.
        else:
            self._log.err(
                "Could characterize mean value '%s' for Parameter '%s'"
                % (str(mean_val), self.name))
        return

    def _store_list(self, inp):
        """ Store an input list parameter """
        self._mult_bands = False
        self._val = eval(inp)
        if type(self._val) is not list:
            self._log.err(
                "Parameter '%s' with value '%s' cannot be type "
                "casted to list" % (self.name, str(inp)))
        self._avg = None
        self._med = None
        self._std = None
        return

    def _store_str(self, inp):
        """ Store an input string parameter """
        self._mult_bands = False
        if isinstance(inp, str):
            self._val = inp
        # For optic params, the input is a tuple. However, if this is a string
        # the tuple is presumed to be ('string', None)
        else:
            self._val = str(inp).split(',')[0].strip(" ()''")
        self._avg = None
        self._med = None
        self._std = None
        return

    def _float(self, val):
        """ Convert val to an array of or single float(s) """
        # If the passed value is None, return it right back
        if val is None:
            return None
        # Try to convert the value to a float. If successful,
        # convert to SI units and return
        try:
            float_val = float(val)
            return self.unit.to_SI(float_val)
        # If unable to convert to a float...
        except ValueError:
            # Try to convert to a numpy array of floats. If successful,
            # convert to an array of SI unit floats and return
            try:
                arr_val = np.array(eval(val)).astype(float)
                return self.unit.to_SI(arr_val)
            # If that fails, look for a special string
            except NameError:
                # The final option is to accpet either "BAND" or "NA"
                ret = str(val).strip().upper()
                if ret in self._float_str_vals:
                    return ret
                # Otherwise throw an error
                else:
                    self._log.err(
                        "Passed parameter '%s' with value '%s' cannot be type "
                        "casted to float" % (self.name, str(val)))
        return

    def _zero(self, val):
        """Convert val to an array of or single zero(s)"""
        if (isinstance(val, str) or isinstance(val, np.str_)):
            return 0.
        elif isinstance(val, list) or isinstance(val, np.ndarray):
            return np.zeros(len(val))
        else:
            return 0.

    def _check_range(self, val):
        if isinstance(val, list) or isinstance(val, np.ndarray):
            val = np.array(
                [self._check_range_val(v) for v in val])
        else:
            val = self._check_range_val(val)
        return val

    def _check_range_val(self, val):
        # Simply return the value if it is not a float
        try:
            val = float(val)
        except ValueError:
            return val
        # Return the minimum value if below it
        if self._min is not None and val < self._min:
            self._log.log(
                "Sampled/stored value for '%s' Parameter '%s' below minimum "
                "allowed value '%s'. Forcing to min"
                % (str(val), str(self.name), str(self._min)))
            return self._min
        # Return the maximum value if below it
        elif self._max is not None and val > self._max:
            self._log.log(
                "Sampled/stored value for '%s' Parameter '%s' above maximum "
                "allowed value '%s'. Forcing to max"
                % (str(val), str(self.name), str(self._max)))
            return self._max
        else:
            return val

    def _sig_figs(self, inp, sig):
        """ Return an input with a specified number of sig figs """
        if inp == 0:
            return inp
        else:
            return round(inp, sig-int(np.floor(np.log10(abs(inp))))-1)

    def _is_empty(self, band_id=None, band_ind=None):
        """ Check if a parameter average is defined """
        if self._mult_bands:
            if band_id is not None:
                band_ind = self._band_ids.index(str(band_id))
            if str(self._avg[band_ind]).upper() in self._float_str_vals:
                return True
            else:
                return False
        else:
            if str(self._avg).upper() in self._float_str_vals:
                return True
            else:
                return False

    def _change_str(self, new_avg, band_ind=None, num_bands=None):
        """ Change string parameter to a new value """
        avg_new = new_avg
        # If multiple bands are already set, just change the value
        if band_ind is not None and self._mult_bands:
            if self._avg[band_ind].upper() != avg_new.upper():
                self._avg[band_ind] = avg_new
                self._med[band_ind] = self._avg[band_ind]
                ret_bool = True
            else:
                ret_bool = False
        # If multiple bands aren't defined, we can define them now
        elif band_ind is not None and not self._mult_bands:
            avg_old = self._avg
            self._avg = [avg_new if (i == band_ind) else avg_old
                         for i in range(int(num_bands))]
            self._med = self._avg
            self._std = [self._std for i in range(int(num_bands))]
            self._mult_bands = True
            if avg_new.upper() == avg_old.upper():
                ret_bool = False
            else:
                ret_bool = True
        # Otherwise handle the scenario of a single value
        else:
            if self._avg.upper() != avg_new.upper():
                self._avg = avg_new
                ret_bool = True
            else:
                ret_bool = False
        return ret_bool

    def _change_float(self, new_avg, band_ind=None, num_bands=None):
        """ Change a float parmaeter to a new value """
        avg_new = self._float(new_avg)
        # If multiple bands are already set, just change the value
        if band_ind is not None and self._mult_bands:
            # If this value is empty, simply store it
            if self._is_empty(band_ind=band_ind):
                self._avg[band_ind] = avg_new
                self._med[band_ind] = self._avg[band_ind]
                ret_bool = True
            # If this value is a distribution, adjust it
            elif (self._val is not None and
                  isinstance(self._val[band_ind], ds.Distribution)):
                self._val[band_ind].change(avg_new)
                self._avg[band_ind] = self._val[band_ind].mean()
                self._med[band_ind] = self._val[band_ind].median()
                self._std[band_ind] = self._val[band_ind].std()
                ret_bool = True
            # Otherwise, simply set a new value
            elif (self._sig_figs(avg_new, 5) !=
                  self._sig_figs(self._avg[band_ind], 5)):
                self._avg[band_ind] = avg_new
                self._med[band_ind] = self._avg[band_ind]
                ret_bool = True
            else:
                ret_bool = False
        # If multiple bands aren't defined, we can define them now
        elif band_ind is not None and not self._mult_bands:
            if isinstance(self._val, ds.Distribution):
                old_val = self._val
                old_avg = old_val.mean()
                new_val = cp.deepcopy(self._val).change(avg_new)
                new_avg = new_val.mean()
            else:
                old_val = self._val
                old_avg = self._avg
                new_val = old_val   # val is always 'None' for floats
                new_avg = avg_new
            self._val = [new_val if i == band_ind else old_val
                         for i in range(int(num_bands))]
            self._avg = [new_avg if i == band_ind else old_avg
                         for i in range(int(num_bands))]
            self._med = self._avg
            self._std = [self._std for i in range(int(num_bands))]
            self._mult_bands = True
            # Check to see if anything has changed
            if str(old_avg).upper() in self._float_str_vals:
                ret_bool = True
            elif (self._sig_figs(new_avg, 5) !=
                  self._sig_figs(old_avg, 5)):
                ret_bool = True
            else:
                ret_bool = False
        # Otherwise handle the scenario of a single value
        else:
            if self._avg == "NA":
                self._avg = avg_new
                self._med = self._avg
                ret_bool = True
            elif isinstance(self._val, ds.Distribution):
                self._val.change(avg_new)
                self._avg = self._val.mean()
                self._med = self._val.median()
                self._std = self._val.std()
                ret_bool = True
            elif (self._sig_figs(avg_new, 5) !=
                  self._sig_figs(self._avg, 5)):
                self._avg = avg_new
                self._med = self._avg
                ret_bool = True
            else:
                ret_bool = False
        return ret_bool
back to top