https://github.com/RadioAstronomySoftwareGroup/pyuvdata
Raw File
Tip revision: 0b1b091eeec941c6081327cfd1d65f689feafd8c authored by Paul La Plante on 08 August 2020, 00:16 UTC
Merge branch 'Smithsonian-submillimeter_array' into master
Tip revision: 0b1b091
fhd_cal.py
# -*- mode: python; coding: utf-8 -*-
# Copyright (c) 2018 Radio Astronomy Software Group
# Licensed under the 2-clause BSD License
"""Class for reading FHD calibration save files."""

import os
import numpy as np
import warnings
from scipy.io.idl import readsav

from .uvcal import UVCal
from .. import utils as uvutils
from ..uvdata.fhd import get_fhd_history

__all__ = ["FHDCal"]


class FHDCal(UVCal):
    """
    Defines a FHD-specific subclass of UVCal for reading FHD calibration save files.

    This class should not be interacted with directly, instead use the read_fhd_cal
    method on the UVCal class.

    """

    def read_fhd_cal(
        self,
        cal_file,
        obs_file,
        settings_file=None,
        raw=True,
        extra_history=None,
        run_check=True,
        check_extra=True,
        run_check_acceptability=True,
    ):
        """
        Read data from an FHD cal.sav file.

        Parameters
        ----------
        cal_file : str
            The cal.sav file to read from.
        obs_file : str
            The obs.sav file to read from.
        settings_file : str, optional
            The settings_file to read from. Optional, but very useful for provenance.
        raw : bool
            Option to use the raw (per antenna, per frequency) solution or
            to use the fitted (polynomial over phase/amplitude) solution.
            Default is True (meaning use the raw solutions).
        extra_history : str or list of str, optional
            String(s) to add to the object's history parameter.
        run_check : bool
            Option to check for the existence and proper shapes of
            parameters after reading in the file.
        check_extra : bool
            Option to check optional parameters as well as required ones.
        run_check_acceptability : bool
            Option to check acceptable range of the values of
            parameters after reading in the file.

        """
        this_dict = readsav(cal_file, python_dict=True)
        cal_data = this_dict["cal"]

        this_dict = readsav(obs_file, python_dict=True)
        obs_data = this_dict["obs"]

        self.Nspws = 1
        self.spw_array = np.array([0])

        self.Nfreqs = int(cal_data["n_freq"][0])
        self.freq_array = np.zeros(
            (self.Nspws, len(cal_data["freq"][0])), dtype=np.float_
        )
        self.freq_array[0, :] = cal_data["freq"][0]
        self.channel_width = float(np.mean(np.diff(self.freq_array)))

        # FHD only calculates one calibration over all the times.
        # cal_data.n_times gives the number of times that goes into that one
        # calibration, UVCal.Ntimes gives the number of separate calibrations
        # along the time axis.
        self.Ntimes = 1
        time_array = obs_data["baseline_info"][0]["jdate"][0]
        self.integration_time = np.round(np.mean(np.diff(time_array)) * 24 * 3600, 2)
        self.time_array = np.array([np.mean(time_array)])

        self.Njones = int(cal_data["n_pol"][0])
        # FHD only has the diagonal elements (jxx, jyy) and if there's only one
        # present it must be jxx
        if self.Njones == 1:
            self.jones_array = np.array([-5])
        else:
            self.jones_array = np.array([-5, -6])

        self.telescope_name = obs_data["instrument"][0].decode("utf8")

        self.Nants_data = int(cal_data["n_tile"][0])
        self.Nants_telescope = int(cal_data["n_tile"][0])
        self.antenna_names = np.array(
            [n.decode("utf8") for n in cal_data["tile_names"][0].tolist()]
        )
        self.antenna_numbers = np.arange(self.Nants_telescope)
        self.ant_array = np.arange(self.Nants_data)

        self._set_sky()
        self.sky_field = "phase center (RA, Dec): ({ra}, {dec})".format(
            ra=obs_data["orig_phasera"][0], dec=obs_data["orig_phasedec"][0]
        )
        self.sky_catalog = cal_data["skymodel"][0]["catalog_name"][0].decode("utf8")
        self.ref_antenna_name = cal_data["ref_antenna_name"][0].decode("utf8")
        self.Nsources = int(cal_data["skymodel"][0]["n_sources"][0])
        self.baseline_range = [
            float(cal_data["min_cal_baseline"][0]),
            float(cal_data["max_cal_baseline"][0]),
        ]

        galaxy_model = cal_data["skymodel"][0]["galaxy_model"][0]
        # In Python 3, we sometimes get Unicode, sometimes bytes
        if isinstance(galaxy_model, bytes):
            galaxy_model = galaxy_model.decode("utf8")
        if galaxy_model == 0:
            galaxy_model = None
        else:
            galaxy_model = "gsm"

        diffuse_model = cal_data["skymodel"][0]["diffuse_model"][0]
        if isinstance(diffuse_model, bytes):
            diffuse_model = diffuse_model.decode("utf8")
        if diffuse_model == "":
            diffuse_model = None
        else:
            diffuse_model = os.path.basename(diffuse_model)

        if galaxy_model is not None:
            if diffuse_model is not None:
                self.diffuse_model = galaxy_model + " + " + diffuse_model
            else:
                self.diffuse_model = galaxy_model
        elif diffuse_model is not None:
            self.diffuse_model = diffuse_model

        self.gain_convention = "divide"
        self.x_orientation = "east"

        self._set_gain()
        fit_gain_array_in = cal_data["gain"][0]
        fit_gain_array = np.zeros(
            self._gain_array.expected_shape(self), dtype=np.complex_
        )
        for jones_i, arr in enumerate(fit_gain_array_in):
            fit_gain_array[:, 0, :, 0, jones_i] = arr
        if raw:
            res_gain_array_in = cal_data["gain_residual"][0]
            res_gain_array = np.zeros(
                self._gain_array.expected_shape(self), dtype=np.complex_
            )
            for jones_i, arr in enumerate(res_gain_array_in):
                res_gain_array[:, 0, :, 0, jones_i] = arr
            self.gain_array = fit_gain_array + res_gain_array
        else:
            self.gain_array = fit_gain_array

        # FHD doesn't really have a chi^2 measure. What is has is a convergence measure.
        # The solution converged well if this is less than the convergence
        # threshold ('conv_thresh' in extra_keywords).
        self.quality_array = np.zeros_like(self.gain_array, dtype=np.float)
        convergence = cal_data["convergence"][0]
        for jones_i, arr in enumerate(convergence):
            self.quality_array[:, 0, :, 0, jones_i] = arr

        # array of used frequencies (1: used, 0: flagged)
        freq_use = obs_data["baseline_info"][0]["freq_use"][0]
        # array of used antennas (1: used, 0: flagged)
        ant_use = obs_data["baseline_info"][0]["tile_use"][0]
        # array of used times (1: used, 0: flagged)
        time_use = obs_data["baseline_info"][0]["time_use"][0]

        time_array_use = time_array[np.where(time_use > 0)]
        self.time_range = [np.min(time_array_use), np.max(time_array_use)]

        # Currently this can't include the times because the flag array
        # dimensions has to match the gain array dimensions.
        # This is somewhat artificial...
        self.flag_array = np.zeros_like(self.gain_array, dtype=np.bool)
        flagged_ants = np.where(ant_use == 0)[0]
        for ant in flagged_ants:
            self.flag_array[ant, :] = 1
        flagged_freqs = np.where(freq_use == 0)[0]
        for freq in flagged_freqs:
            self.flag_array[:, :, freq] = 1

        # currently don't have branch info. may change in future.
        self.git_origin_cal = "https://github.com/EoRImaging/FHD"
        self.git_hash_cal = obs_data["code_version"][0].decode("utf8")

        self.extra_keywords["autoscal"] = (
            "[" + ", ".join(str(d) for d in cal_data["auto_scale"][0]) + "]"
        )
        self.extra_keywords["nvis_cal"] = cal_data["n_vis_cal"][0]
        self.extra_keywords["time_avg"] = cal_data["time_avg"][0]
        self.extra_keywords["cvgthres"] = cal_data["conv_thresh"][0]
        if "DELAYS" in obs_data.dtype.names:
            if obs_data["delays"][0] is not None:
                self.extra_keywords["delays"] = (
                    "[" + ", ".join(str(int(d)) for d in obs_data["delays"][0]) + "]"
                )

        if not raw:
            self.extra_keywords["polyfit"] = cal_data["polyfit"][0]
            self.extra_keywords["bandpass"] = cal_data["bandpass"][0]
            self.extra_keywords["mode_fit"] = cal_data["mode_fit"][0]
            self.extra_keywords["amp_deg"] = cal_data["amp_degree"][0]
            self.extra_keywords["phse_deg"] = cal_data["phase_degree"][0]

        if settings_file is not None:
            self.history, self.observer = get_fhd_history(
                settings_file, return_user=True
            )
        else:
            warnings.warn("No settings file, history will be incomplete")
            self.history = ""

        if extra_history is not None:
            if isinstance(extra_history, (list, tuple)):
                self.history += "\n" + "\n".join(extra_history)
            else:
                self.history += "\n" + extra_history

        if not uvutils._check_history_version(self.history, self.pyuvdata_version_str):
            if self.history.endswith("\n"):
                self.history += self.pyuvdata_version_str
            else:
                self.history += "\n" + self.pyuvdata_version_str

        if run_check:
            self.check(
                check_extra=check_extra, run_check_acceptability=run_check_acceptability
            )
back to top