https://github.com/RadioAstronomySoftwareGroup/pyuvdata
Raw File
Tip revision: ae3e84eb61ccca323172c656490b018073c95293 authored by Bryna Hazelton on 25 May 2023, 16:32:56 UTC
update changelog for v2.3.3
Tip revision: ae3e84e
fhd_cal.py
# -*- mode: python; coding: utf-8 -*-
# Copyright (c) 2018 Radio Astronomy Software Group
# Licensed under the 2-clause BSD License
"""Class for reading FHD calibration save files."""

import os
import warnings

import numpy as np
from scipy.io import readsav

from .. import utils as uvutils
from ..uvdata.fhd import get_fhd_history, get_fhd_layout_info
from .uvcal import UVCal, _future_array_shapes_warning

__all__ = ["FHDCal"]


class FHDCal(UVCal):
    """
    Defines a FHD-specific subclass of UVCal for reading FHD calibration save files.

    This class should not be interacted with directly, instead use the read_fhd_cal
    method on the UVCal class.

    """

    def read_fhd_cal(
        self,
        cal_file,
        obs_file,
        layout_file=None,
        settings_file=None,
        raw=True,
        read_data=True,
        background_lsts=True,
        extra_history=None,
        run_check=True,
        check_extra=True,
        run_check_acceptability=True,
        use_future_array_shapes=False,
    ):
        """
        Read data from an FHD cal.sav file.

        Parameters
        ----------
        cal_file : str
            The cal.sav file to read from.
        obs_file : str
            The obs.sav file to read from.
        layout_file : str
            The FHD layout file. Required for antenna_positions to be set.
        settings_file : str, optional
            The settings_file to read from. Optional, but very useful for provenance.
        raw : bool
            Option to use the raw (per antenna, per frequency) solution or
            to use the fitted (polynomial over phase/amplitude) solution.
            Default is True (meaning use the raw solutions).
        read_data : bool
            Read in the gains, quality array and flag data. If set to False, only
            the metadata will be read in. Setting read_data to False results in
            a metadata only object. If read_data is False, a settings file must be
            provided.
        background_lsts : bool
            When set to True, the lst_array is calculated in a background thread.
        extra_history : str or list of str, optional
            String(s) to add to the object's history parameter.
        run_check : bool
            Option to check for the existence and proper shapes of
            parameters after reading in the file.
        check_extra : bool
            Option to check optional parameters as well as required ones.
        run_check_acceptability : bool
            Option to check acceptable range of the values of
            parameters after reading in the file.
        use_future_array_shapes : bool
            Option to convert to the future planned array shapes before the changes go
            into effect by removing the spectral window axis.

        """
        if not read_data and settings_file is None:
            raise ValueError("A settings_file must be provided if read_data is False.")

        filelist = [cal_file, obs_file]
        if layout_file is not None:
            filelist.append(layout_file)
        if settings_file is not None:
            filelist.append(settings_file)
        for filename in filelist:
            # update filelist
            basename = os.path.basename(filename)
            self.filename = uvutils._combine_filenames(self.filename, [basename])
            self._filename.form = (len(self.filename),)

        this_dict = readsav(obs_file, python_dict=True)
        obs_data = this_dict["obs"]
        bl_info = obs_data["BASELINE_INFO"][0]

        self.Nspws = 1
        self.spw_array = np.array([0])

        self.Nfreqs = int(obs_data["N_FREQ"][0])
        self.freq_array = np.zeros((1, len(bl_info["FREQ"][0])), dtype=np.float64)
        self.freq_array[0, :] = bl_info["FREQ"][0]
        self.channel_width = float(obs_data["FREQ_RES"][0])

        self.flex_spw_id_array = np.zeros(self.Nfreqs, dtype=int)

        # FHD only calculates one calibration over all the times.
        # obs_data.n_time /cal_data.n_times gives the number of times that goes into
        # that one calibration, UVCal.Ntimes gives the number of separate calibrations
        # along the time axis.
        self.Ntimes = 1
        time_array = bl_info["jdate"][0]

        self.time_array = np.array([np.mean(time_array)])

        # this is generated in FHD by subtracting the JD of neighboring
        # integrations. This can have limited accuracy, so it can be slightly
        # off the actual value.
        # (e.g. 1.999426... rather than 2)
        time_res = obs_data["TIME_RES"][0]
        # time_res is constrained to be a scalar currently
        self.integration_time = np.float64(time_res)

        # array of used frequencies (1: used, 0: flagged)
        freq_use = bl_info["freq_use"][0]
        # array of used antennas (1: used, 0: flagged)
        ant_use = bl_info["tile_use"][0]
        # array of used times (1: used, 0: flagged)
        time_use = bl_info["time_use"][0]

        time_array_use = time_array[np.where(time_use > 0)]
        self.time_range = np.asarray([np.min(time_array_use), np.max(time_array_use)])

        self.telescope_name = obs_data["instrument"][0].decode("utf8")
        latitude = np.deg2rad(float(obs_data["LAT"][0]))
        longitude = np.deg2rad(float(obs_data["LON"][0]))
        altitude = float(obs_data["ALT"][0])

        # get the stuff FHD read from the antenna table (in layout file)
        if layout_file is not None:
            obs_tile_names = [
                ant.decode("utf8") for ant in bl_info["TILE_NAMES"][0].tolist()
            ]
            if self.telescope_name.lower() == "mwa":
                obs_tile_names = [
                    "Tile" + "0" * (3 - len(ant.strip())) + ant.strip()
                    for ant in obs_tile_names
                ]

            layout_param_dict = get_fhd_layout_info(
                layout_file,
                self.telescope_name,
                latitude,
                longitude,
                altitude,
                self._lst_array.tols,
                self._telescope_location.tols,
                obs_tile_names,
                run_check_acceptability=True,
            )

            layout_params_to_ignore = [
                "gst0",
                "rdate",
                "earth_omega",
                "dut1",
                "timesys",
                "diameters",
            ]
            for key, value in layout_param_dict.items():
                if key not in layout_params_to_ignore:
                    setattr(self, key, value)

        else:
            warnings.warn(
                "No layout file, antenna_postions will not be defined "
                "and antenna_names might be incorrect."
            )

            self.telescope_location_lat_lon_alt = (latitude, longitude, altitude)
            # FHD stores antenna numbers, not names, in the "TILE_NAMES" field
            self.antenna_names = [
                ant.decode("utf8") for ant in bl_info["TILE_NAMES"][0].tolist()
            ]
            self.antenna_numbers = np.array([int(ant) for ant in self.antenna_names])
            if self.telescope_name.lower() == "mwa":
                self.antenna_names = [
                    "Tile" + "0" * (3 - len(ant.strip())) + ant.strip()
                    for ant in self.antenna_names
                ]
            self.Nants_telescope = len(self.antenna_names)

        self.antenna_names = np.asarray(self.antenna_names)

        try:
            self.set_telescope_params()
        except ValueError as ve:
            warnings.warn(str(ve))

        # need to make sure telescope location is defined properly before this call
        if self.telescope_location is not None:
            proc = self.set_lsts_from_time_array(background=background_lsts)

        self._set_sky()
        self.sky_field = "phase center (RA, Dec): ({ra}, {dec})".format(
            ra=obs_data["orig_phasera"][0], dec=obs_data["orig_phasedec"][0]
        )
        self.gain_convention = "divide"
        self.x_orientation = "east"

        self._set_gain()

        # currently don't have branch info. may change in future.
        self.git_origin_cal = "https://github.com/EoRImaging/FHD"
        self.git_hash_cal = obs_data["code_version"][0].decode("utf8")

        if "DELAYS" in obs_data.dtype.names:
            if obs_data["delays"][0] is not None:
                self.extra_keywords["delays"] = (
                    "[" + ", ".join(str(int(d)) for d in obs_data["delays"][0]) + "]"
                )
        if settings_file is not None:
            self.history, self.observer = get_fhd_history(
                settings_file, return_user=True
            )
        else:
            warnings.warn("No settings file, history will be incomplete")
            self.history = ""

        if extra_history is not None:
            if isinstance(extra_history, (list, tuple)):
                self.history += "\n" + "\n".join(extra_history)
            else:
                self.history += "\n" + extra_history

        if not uvutils._check_history_version(self.history, self.pyuvdata_version_str):
            if self.history.endswith("\n"):
                self.history += self.pyuvdata_version_str
            else:
                self.history += "\n" + self.pyuvdata_version_str

        if not read_data:
            n_pols = int(obs_data["N_POL"])
            # FHD only has the diagonal elements (jxx, jyy), so limit to 2
            self.Njones = int(np.min([n_pols, 2]))

            # for calibration FHD includes all antennas in the antenna table,
            # regardless of whether or not they have data
            self.Nants_data = len(self.antenna_names)

            # get details from settings file
            keywords = [
                "ref_antenna_name",
                "catalog_name",
                "n_sources",
                "min_cal_baseline",
                "max_cal_baseline",
                "galaxy_model",
                "diffuse_model",
                "auto_scale",
                "n_vis_cal",
                "time_avg",
                "conv_thresh",
            ]
            if not raw:
                keywords += [
                    "polyfit",
                    "bandpass",
                    "mode_fit",
                    "amp_degree",
                    "phase_degree",
                ]

            settings_lines = {}
            with open(settings_file, "r") as read_obj:
                cal_start = False
                for line in read_obj:
                    if not cal_start:
                        if line.startswith("##CAL"):
                            cal_start = True
                    else:
                        if line.startswith("##"):
                            break
                        # in cal structure section
                        for kw in keywords:
                            if line.strip().startswith(kw.upper()):
                                settings_lines[kw] = line.split()[1:]
            self.ref_antenna_name = settings_lines["ref_antenna_name"][0]
            self.Nsources = int(settings_lines["n_sources"][0])
            self.sky_catalog = settings_lines["catalog_name"][0]
            self.baseline_range = [
                float(settings_lines["min_cal_baseline"][0]),
                float(settings_lines["max_cal_baseline"][0]),
            ]
            galaxy_model = int(settings_lines["galaxy_model"][0])
            if len(settings_lines["diffuse_model"]) > 0:
                diffuse_model = settings_lines["diffuse_model"][0]
            else:
                diffuse_model = ""
            auto_scale = settings_lines["auto_scale"]
            n_vis_cal = np.int64(settings_lines["n_vis_cal"][0])
            time_avg = int(settings_lines["time_avg"][0])
            conv_thresh = float(settings_lines["conv_thresh"][0])

            if not raw:
                polyfit = int(settings_lines["polyfit"][0])
                bandpass = int(settings_lines["bandpass"][0])
                mode_fit = settings_lines["mode_fit"]
                # for some reason, it's a float if it's one value,
                # and integers otherwise
                if len(mode_fit) == 1:
                    mode_fit = float(mode_fit[0])
                else:
                    mode_fit = np.array(mode_fit, dtype=np.int64)

                amp_degree = int(settings_lines["amp_degree"][0])
                phase_degree = int(settings_lines["phase_degree"][0])

        else:
            this_dict = readsav(cal_file, python_dict=True)
            cal_data = this_dict["cal"]
            self.Njones = int(cal_data["n_pol"][0])
            self.Nants_data = int(cal_data["n_tile"][0])

            self.sky_catalog = cal_data["skymodel"][0]["catalog_name"][0].decode("utf8")
            self.ref_antenna_name = (
                cal_data["ref_antenna_name"][0].decode("utf8").strip()
            )
            self.Nsources = int(cal_data["skymodel"][0]["n_sources"][0])
            self.baseline_range = [
                float(cal_data["min_cal_baseline"][0]),
                float(cal_data["max_cal_baseline"][0]),
            ]

            galaxy_model = cal_data["skymodel"][0]["galaxy_model"][0]
            diffuse_model = cal_data["skymodel"][0]["diffuse_model"][0]

            auto_scale = cal_data["auto_scale"][0]
            n_vis_cal = cal_data["n_vis_cal"][0]
            time_avg = cal_data["time_avg"][0]
            conv_thresh = cal_data["conv_thresh"][0]

            if not raw:
                polyfit = cal_data["polyfit"][0]
                bandpass = cal_data["bandpass"][0]
                mode_fit = cal_data["mode_fit"][0]
                amp_degree = cal_data["amp_degree"][0]
                phase_degree = cal_data["phase_degree"][0]

            # Now read data like arrays
            fit_gain_array_in = cal_data["gain"][0]
            fit_gain_array = np.zeros(
                self._gain_array.expected_shape(self), dtype=np.complex128
            )
            for jones_i, arr in enumerate(fit_gain_array_in):
                fit_gain_array[:, 0, :, 0, jones_i] = arr
            if raw:
                res_gain_array_in = cal_data["gain_residual"][0]
                res_gain_array = np.zeros(
                    self._gain_array.expected_shape(self), dtype=np.complex128
                )
                for jones_i, arr in enumerate(res_gain_array_in):
                    res_gain_array[:, 0, :, 0, jones_i] = arr
                self.gain_array = fit_gain_array + res_gain_array
            else:
                self.gain_array = fit_gain_array

            # FHD doesn't really have a chi^2 measure. What is has is a convergence
            # measure. The solution converged well if this is less than the convergence
            # threshold ('conv_thresh' in extra_keywords).
            self.quality_array = np.zeros_like(self.gain_array, dtype=np.float64)
            convergence = cal_data["convergence"][0]
            for jones_i, arr in enumerate(convergence):
                self.quality_array[:, 0, :, 0, jones_i] = arr

            # Currently this can't include the times because the flag array
            # dimensions has to match the gain array dimensions.
            # This is somewhat artificial...
            self.flag_array = np.zeros_like(self.gain_array, dtype=np.bool_)
            flagged_ants = np.where(ant_use == 0)[0]
            for ant in flagged_ants:
                self.flag_array[ant, :] = 1
            flagged_freqs = np.where(freq_use == 0)[0]
            for freq in flagged_freqs:
                self.flag_array[:, :, freq] = 1

        if self.telescope_name.lower() == "mwa":
            self.ref_antenna_name = (
                "Tile" + "0" * (3 - len(self.ref_antenna_name)) + self.ref_antenna_name
            )
        # In Python 3, we sometimes get Unicode, sometimes bytes
        # doesn't reliably show up in tests though, so excluding it from coverage
        if isinstance(galaxy_model, bytes):  # pragma: nocover
            galaxy_model = galaxy_model.decode("utf8")
        if galaxy_model == 0:
            galaxy_model = None
        else:
            galaxy_model = "gsm"

        if isinstance(diffuse_model, bytes):
            diffuse_model = diffuse_model.decode("utf8")
        if diffuse_model == "":
            diffuse_model = None
        else:
            diffuse_model = os.path.basename(diffuse_model)

        if galaxy_model is not None:
            if diffuse_model is not None:
                self.diffuse_model = galaxy_model + " + " + diffuse_model
            else:
                self.diffuse_model = galaxy_model
        elif diffuse_model is not None:
            self.diffuse_model = diffuse_model

        # FHD only has the diagonal elements (jxx, jyy) and if there's only one
        # present it must be jxx
        if self.Njones == 1:
            self.jones_array = np.array([-5])
        else:
            self.jones_array = np.array([-5, -6])

        # for calibration FHD creates gain array of shape (Nfreqs, Nants_telescope)
        # rather than (Nfreqs, Nants_data). This means the antenna array will
        # contain all antennas in the antenna table instead of only those
        # which had data in the original uvfits file
        self.ant_array = self.antenna_numbers

        self.extra_keywords["autoscal".upper()] = (
            "[" + ", ".join(str(d) for d in auto_scale) + "]"
        )
        self.extra_keywords["nvis_cal".upper()] = n_vis_cal
        self.extra_keywords["time_avg".upper()] = time_avg
        self.extra_keywords["cvgthres".upper()] = conv_thresh

        if not raw:
            self.extra_keywords["polyfit".upper()] = polyfit
            self.extra_keywords["bandpass".upper()] = bandpass
            if isinstance(mode_fit, (list, tuple, np.ndarray)):
                self.extra_keywords["mode_fit".upper()] = (
                    "[" + ", ".join(str(m) for m in mode_fit) + "]"
                )
            else:
                self.extra_keywords["mode_fit".upper()] = mode_fit
            self.extra_keywords["amp_deg".upper()] = amp_degree
            self.extra_keywords["phse_deg".upper()] = phase_degree

        # wait for LSTs if set in background
        if proc is not None:
            proc.join()

        if use_future_array_shapes:
            self.use_future_array_shapes()
        else:
            warnings.warn(_future_array_shapes_warning, DeprecationWarning)

        if run_check:
            self.check(
                check_extra=check_extra, run_check_acceptability=run_check_acceptability
            )
back to top