https://github.com/RadioAstronomySoftwareGroup/pyuvdata
Raw File
Tip revision: e194908e541bcdc6b28e0e898ad5c6a8341d3572 authored by Bryna Hazelton on 21 March 2024, 21:20:46 UTC
got select on read working on the baseline axis
Tip revision: e194908
fhd_cal.py
# -*- mode: python; coding: utf-8 -*-
# Copyright (c) 2018 Radio Astronomy Software Group
# Licensed under the 2-clause BSD License
"""Class for reading FHD calibration save files."""

import os
import warnings

import numpy as np
from docstring_parser import DocstringStyle
from scipy.io import readsav

from .. import utils as uvutils
from ..docstrings import copy_replace_short_description
from ..uvdata.fhd import get_fhd_history, get_fhd_layout_info
from .uvcal import UVCal, _future_array_shapes_warning

__all__ = ["FHDCal"]


class FHDCal(UVCal):
    """
    Defines a FHD-specific subclass of UVCal for reading FHD calibration save files.

    This class should not be interacted with directly, instead use the read_fhd_cal
    method on the UVCal class.

    """

    @copy_replace_short_description(UVCal.read_fhd_cal, style=DocstringStyle.NUMPYDOC)
    def read_fhd_cal(
        self,
        cal_file,
        obs_file,
        layout_file=None,
        settings_file=None,
        raw=True,
        read_data=True,
        background_lsts=True,
        extra_history=None,
        run_check=True,
        check_extra=True,
        run_check_acceptability=True,
        use_future_array_shapes=False,
        astrometry_library=None,
    ):
        """Read data from an FHD cal.sav file."""
        if not read_data and settings_file is None:
            raise ValueError("A settings_file must be provided if read_data is False.")

        filelist = [cal_file, obs_file]
        if layout_file is not None:
            filelist.append(layout_file)
        if settings_file is not None:
            filelist.append(settings_file)
        for filename in filelist:
            # update filelist
            basename = os.path.basename(filename)
            self.filename = uvutils._combine_filenames(self.filename, [basename])
            self._filename.form = (len(self.filename),)

        this_dict = readsav(obs_file, python_dict=True)
        obs_data = this_dict["obs"]
        bl_info = obs_data["BASELINE_INFO"][0]

        self.Nspws = 1
        self.spw_array = np.array([0])

        self.Nfreqs = int(obs_data["N_FREQ"][0])
        self.freq_array = np.zeros((1, len(bl_info["FREQ"][0])), dtype=np.float64)
        self.freq_array[0, :] = bl_info["FREQ"][0]
        self.channel_width = float(obs_data["FREQ_RES"][0])

        self.flex_spw_id_array = np.zeros(self.Nfreqs, dtype=int)

        # FHD only calculates one calibration over all the times.
        # obs_data.n_time /cal_data.n_times gives the number of times that goes into
        # that one calibration, UVCal.Ntimes gives the number of separate calibrations
        # along the time axis.
        self.Ntimes = 1
        time_array = bl_info["jdate"][0]

        self.time_array = np.array([np.mean(time_array)])

        # this is generated in FHD by subtracting the JD of neighboring
        # integrations. This can have limited accuracy, so it can be slightly
        # off the actual value.
        # (e.g. 1.999426... rather than 2)
        time_res = obs_data["TIME_RES"][0]
        # time_res is constrained to be a scalar currently
        self.integration_time = np.float64(time_res)

        # array of used frequencies (1: used, 0: flagged)
        freq_use = bl_info["freq_use"][0]
        # array of used antennas (1: used, 0: flagged)
        ant_use = bl_info["tile_use"][0]
        # array of used times (1: used, 0: flagged)
        time_use = bl_info["time_use"][0]

        time_array_use = time_array[np.where(time_use > 0)]
        self.time_range = np.asarray([np.min(time_array_use), np.max(time_array_use)])

        self.telescope_name = obs_data["instrument"][0].decode("utf8")
        latitude = np.deg2rad(float(obs_data["LAT"][0]))
        longitude = np.deg2rad(float(obs_data["LON"][0]))
        altitude = float(obs_data["ALT"][0])

        # get the stuff FHD read from the antenna table (in layout file)
        if layout_file is not None:
            obs_tile_names = [
                ant.decode("utf8") for ant in bl_info["TILE_NAMES"][0].tolist()
            ]
            if self.telescope_name.lower() == "mwa":
                obs_tile_names = [
                    "Tile" + "0" * (3 - len(ant.strip())) + ant.strip()
                    for ant in obs_tile_names
                ]

            layout_param_dict = get_fhd_layout_info(
                layout_file,
                self.telescope_name,
                latitude,
                longitude,
                altitude,
                self._lst_array.tols,
                self._telescope_location.tols,
                obs_tile_names,
                run_check_acceptability=True,
            )

            layout_params_to_ignore = [
                "gst0",
                "rdate",
                "earth_omega",
                "dut1",
                "timesys",
                "diameters",
            ]
            for key, value in layout_param_dict.items():
                if key not in layout_params_to_ignore:
                    setattr(self, key, value)

        else:
            warnings.warn(
                "No layout file, antenna_postions will not be defined "
                "and antenna_names might be incorrect."
            )

            self.telescope_location_lat_lon_alt = (latitude, longitude, altitude)
            # FHD stores antenna numbers, not names, in the "TILE_NAMES" field
            self.antenna_names = [
                ant.decode("utf8") for ant in bl_info["TILE_NAMES"][0].tolist()
            ]
            self.antenna_numbers = np.array([int(ant) for ant in self.antenna_names])
            if self.telescope_name.lower() == "mwa":
                self.antenna_names = [
                    "Tile" + "0" * (3 - len(ant.strip())) + ant.strip()
                    for ant in self.antenna_names
                ]
            self.Nants_telescope = len(self.antenna_names)

        self.antenna_names = np.asarray(self.antenna_names)

        try:
            self.set_telescope_params()
        except ValueError as ve:
            warnings.warn(str(ve))

        # need to make sure telescope location is defined properly before this call
        if self.telescope_location is not None:
            proc = self.set_lsts_from_time_array(
                background=background_lsts, astrometry_library=astrometry_library
            )

        self._set_sky()
        self.gain_convention = "divide"
        self.x_orientation = "east"

        self._set_gain()

        # currently don't have branch info. may change in future.
        self.git_origin_cal = "https://github.com/EoRImaging/FHD"
        self.git_hash_cal = obs_data["code_version"][0].decode("utf8")

        if "DELAYS" in obs_data.dtype.names:
            if obs_data["delays"][0] is not None:
                self.extra_keywords["delays"] = (
                    "[" + ", ".join(str(int(d)) for d in obs_data["delays"][0]) + "]"
                )
        if settings_file is not None:
            self.history, self.observer = get_fhd_history(
                settings_file, return_user=True
            )
        else:
            warnings.warn("No settings file, history will be incomplete")
            self.history = ""

        if extra_history is not None:
            if isinstance(extra_history, (list, tuple)):
                self.history += "\n" + "\n".join(extra_history)
            else:
                self.history += "\n" + extra_history

        if not uvutils._check_history_version(self.history, self.pyuvdata_version_str):
            if self.history.endswith("\n"):
                self.history += self.pyuvdata_version_str
            else:
                self.history += "\n" + self.pyuvdata_version_str

        if not read_data:
            n_pols = int(obs_data["N_POL"][0])
            # FHD only has the diagonal elements (jxx, jyy), so limit to 2
            self.Njones = int(np.min([n_pols, 2]))

            # for calibration FHD includes all antennas in the antenna table,
            # regardless of whether or not they have data
            self.Nants_data = len(self.antenna_names)

            # get details from settings file
            keywords = [
                "ref_antenna_name",
                "catalog_name",
                "n_sources",
                "min_cal_baseline",
                "max_cal_baseline",
                "galaxy_model",
                "diffuse_model",
                "auto_scale",
                "n_vis_cal",
                "time_avg",
                "conv_thresh",
            ]
            if not raw:
                keywords += [
                    "polyfit",
                    "bandpass",
                    "mode_fit",
                    "amp_degree",
                    "phase_degree",
                ]

            settings_lines = {}
            with open(settings_file, "r") as read_obj:
                cal_start = False
                for line in read_obj:
                    if not cal_start:
                        if line.startswith("##CAL"):
                            cal_start = True
                    else:
                        if line.startswith("##"):
                            break
                        # in cal structure section
                        for kw in keywords:
                            if line.strip().startswith(kw.upper()):
                                settings_lines[kw] = line.split()[1:]
            self.ref_antenna_name = settings_lines["ref_antenna_name"][0]
            self.Nsources = int(settings_lines["n_sources"][0])
            self.sky_catalog = settings_lines["catalog_name"][0]
            self.baseline_range = [
                float(settings_lines["min_cal_baseline"][0]),
                float(settings_lines["max_cal_baseline"][0]),
            ]
            galaxy_model = int(settings_lines["galaxy_model"][0])
            if len(settings_lines["diffuse_model"]) > 0:
                diffuse_model = settings_lines["diffuse_model"][0]
            else:
                diffuse_model = ""
            auto_scale = settings_lines["auto_scale"]
            n_vis_cal = np.int64(settings_lines["n_vis_cal"][0])
            time_avg = int(settings_lines["time_avg"][0])
            conv_thresh = float(settings_lines["conv_thresh"][0])

            if not raw:
                polyfit = int(settings_lines["polyfit"][0])
                bandpass = int(settings_lines["bandpass"][0])
                mode_fit = settings_lines["mode_fit"]
                # for some reason, it's a float if it's one value,
                # and integers otherwise
                if len(mode_fit) == 1:
                    mode_fit = float(mode_fit[0])
                else:
                    mode_fit = np.array(mode_fit, dtype=np.int64)

                amp_degree = int(settings_lines["amp_degree"][0])
                phase_degree = int(settings_lines["phase_degree"][0])

        else:
            this_dict = readsav(cal_file, python_dict=True)
            cal_data = this_dict["cal"]
            self.Njones = int(cal_data["n_pol"][0])
            self.Nants_data = int(cal_data["n_tile"][0])

            self.sky_catalog = cal_data["skymodel"][0]["catalog_name"][0].decode("utf8")
            self.ref_antenna_name = (
                cal_data["ref_antenna_name"][0].decode("utf8").strip()
            )
            self.Nsources = int(cal_data["skymodel"][0]["n_sources"][0])
            self.baseline_range = [
                float(cal_data["min_cal_baseline"][0]),
                float(cal_data["max_cal_baseline"][0]),
            ]

            galaxy_model = cal_data["skymodel"][0]["galaxy_model"][0]
            diffuse_model = cal_data["skymodel"][0]["diffuse_model"][0]

            auto_scale = cal_data["auto_scale"][0]
            n_vis_cal = cal_data["n_vis_cal"][0]
            time_avg = cal_data["time_avg"][0]
            conv_thresh = cal_data["conv_thresh"][0]

            if not raw:
                polyfit = cal_data["polyfit"][0]
                bandpass = cal_data["bandpass"][0]
                mode_fit = cal_data["mode_fit"][0]
                amp_degree = cal_data["amp_degree"][0]
                phase_degree = cal_data["phase_degree"][0]

            # Now read data like arrays
            fit_gain_array_in = cal_data["gain"][0]
            fit_gain_array = np.zeros(
                self._gain_array.expected_shape(self), dtype=np.complex128
            )
            for jones_i, arr in enumerate(fit_gain_array_in):
                fit_gain_array[:, 0, :, 0, jones_i] = arr
            if raw:
                res_gain_array_in = cal_data["gain_residual"][0]
                res_gain_array = np.zeros(
                    self._gain_array.expected_shape(self), dtype=np.complex128
                )
                for jones_i, arr in enumerate(res_gain_array_in):
                    res_gain_array[:, 0, :, 0, jones_i] = arr
                self.gain_array = fit_gain_array + res_gain_array
            else:
                self.gain_array = fit_gain_array

            # FHD doesn't really have a chi^2 measure. What is has is a convergence
            # measure. The solution converged well if this is less than the convergence
            # threshold ('conv_thresh' in extra_keywords).
            self.quality_array = np.zeros_like(self.gain_array, dtype=np.float64)
            convergence = cal_data["convergence"][0]
            for jones_i, arr in enumerate(convergence):
                self.quality_array[:, 0, :, 0, jones_i] = arr

            # Currently this can't include the times because the flag array
            # dimensions has to match the gain array dimensions.
            # This is somewhat artificial...
            self.flag_array = np.zeros_like(self.gain_array, dtype=np.bool_)
            flagged_ants = np.where(ant_use == 0)[0]
            for ant in flagged_ants:
                self.flag_array[ant, :] = 1
            flagged_freqs = np.where(freq_use == 0)[0]
            for freq in flagged_freqs:
                self.flag_array[:, :, freq] = 1

        if self.telescope_name.lower() == "mwa":
            self.ref_antenna_name = (
                "Tile" + "0" * (3 - len(self.ref_antenna_name)) + self.ref_antenna_name
            )
        # In Python 3, we sometimes get Unicode, sometimes bytes
        # doesn't reliably show up in tests though, so excluding it from coverage
        if isinstance(galaxy_model, bytes):  # pragma: nocover
            galaxy_model = galaxy_model.decode("utf8")
        if galaxy_model == 0:
            galaxy_model = None
        else:
            galaxy_model = "gsm"

        if isinstance(diffuse_model, bytes):
            diffuse_model = diffuse_model.decode("utf8")
        if diffuse_model == "":
            diffuse_model = None
        else:
            diffuse_model = os.path.basename(diffuse_model)

        if galaxy_model is not None:
            if diffuse_model is not None:
                self.diffuse_model = galaxy_model + " + " + diffuse_model
            else:
                self.diffuse_model = galaxy_model
        elif diffuse_model is not None:
            self.diffuse_model = diffuse_model

        # FHD only has the diagonal elements (jxx, jyy) and if there's only one
        # present it must be jxx
        if self.Njones == 1:
            self.jones_array = np.array([-5])
        else:
            self.jones_array = np.array([-5, -6])

        # for calibration FHD creates gain array of shape (Nfreqs, Nants_telescope)
        # rather than (Nfreqs, Nants_data). This means the antenna array will
        # contain all antennas in the antenna table instead of only those
        # which had data in the original uvfits file
        self.ant_array = self.antenna_numbers

        self.extra_keywords["autoscal".upper()] = (
            "[" + ", ".join(str(d) for d in auto_scale) + "]"
        )
        self.extra_keywords["nvis_cal".upper()] = n_vis_cal
        self.extra_keywords["time_avg".upper()] = time_avg
        self.extra_keywords["cvgthres".upper()] = conv_thresh

        if not raw:
            self.extra_keywords["polyfit".upper()] = polyfit
            self.extra_keywords["bandpass".upper()] = bandpass
            if isinstance(mode_fit, (list, tuple, np.ndarray)):
                self.extra_keywords["mode_fit".upper()] = (
                    "[" + ", ".join(str(m) for m in mode_fit) + "]"
                )
            else:
                self.extra_keywords["mode_fit".upper()] = mode_fit
            self.extra_keywords["amp_deg".upper()] = amp_degree
            self.extra_keywords["phse_deg".upper()] = phase_degree

        # wait for LSTs if set in background
        if proc is not None:
            proc.join()

        if use_future_array_shapes:
            self.use_future_array_shapes()
        else:
            warnings.warn(_future_array_shapes_warning, DeprecationWarning)

        if run_check:
            self.check(
                check_extra=check_extra, run_check_acceptability=run_check_acceptability
            )
back to top