https://github.com/RadioAstronomySoftwareGroup/pyuvdata
Raw File
Tip revision: a791f16817631a29e28371ea8d41c17dec5ba721 authored by Bryna Hazelton on 08 February 2024, 19:32:24 UTC
fix warnings filtering
Tip revision: a791f16
fhd.py
# -*- mode: python; coding: utf-8 -*-
# Copyright (c) 2018 Radio Astronomy Software Group
# Licensed under the 2-clause BSD License

"""Class for reading FHD save files."""
import os
import warnings

import numpy as np
from astropy import constants as const
from docstring_parser import DocstringStyle
from scipy.io import readsav

from .. import telescopes as uvtel
from .. import utils as uvutils
from ..docstrings import copy_replace_short_description
from .uvdata import UVData, _future_array_shapes_warning

__all__ = ["get_fhd_history", "get_fhd_layout_info", "FHD"]


def get_fhd_history(settings_file, return_user=False):
    """
    Small function to get the important history from an FHD settings text file.

    Includes information about the command line call, the user, machine name and date

    Parameters
    ----------
    settings_file : str
        FHD settings file name
    return_user : bool
        optionally return the username who ran FHD

    Returns
    -------
    history : str
        string of history extracted from the settings file
    user : str
        Only returned if return_user is True

    """
    with open(settings_file, "r") as f:
        settings_lines = f.readlines()
    main_loc = None
    command_loc = None
    obs_loc = None
    user_line = None
    for ind, line in enumerate(settings_lines):
        if line.startswith("##MAIN"):
            main_loc = ind
        if line.startswith("##COMMAND_LINE"):
            command_loc = ind
        if line.startswith("##OBS"):
            obs_loc = ind
        if line.startswith("User"):
            user_line = ind
        if (
            main_loc is not None
            and command_loc is not None
            and obs_loc is not None
            and user_line is not None
        ):
            break

    main_lines = settings_lines[main_loc + 1 : command_loc]
    command_lines = settings_lines[command_loc + 1 : obs_loc]
    history_lines = ["FHD history\n"] + main_lines + command_lines
    for ind, line in enumerate(history_lines):
        history_lines[ind] = line.rstrip().replace("\t", " ")
    history = "\n".join(history_lines)
    user = settings_lines[user_line].split()[1]

    if return_user:
        return history, user
    else:
        return history


def _xyz_close(xyz1, xyz2, loc_tols):
    return np.allclose(xyz1, xyz2, rtol=loc_tols[0], atol=loc_tols[1])


def _latlonalt_close(latlonalt1, latlonalt2, radian_tol, loc_tols):
    latlon_close = np.allclose(
        np.array(latlonalt1[0:2]), np.array(latlonalt2[0:2]), rtol=0, atol=radian_tol
    )
    alt_close = np.isclose(
        latlonalt1[2], latlonalt2[2], rtol=loc_tols[0], atol=loc_tols[1]
    )
    return latlon_close and alt_close


def get_fhd_layout_info(
    layout_file,
    telescope_name,
    latitude,
    longitude,
    altitude,
    radian_tol,
    loc_tols,
    obs_tile_names,
    run_check_acceptability=True,
):
    """
    Get the telescope and antenna positions from an FHD layout file.

    Parameters
    ----------
    layout_file : str
        FHD layout file name
    telescope_name : str
        Telescope name
    latitude : float
        telescope latitude in radians
    longitude : float
        telescope longitude in radians
    altitude : float
        telescope altitude in meters
    loc_tols : float
        telescope_location tolerance in meters.
    radian_tols : float
        lat/lon tolerance in radians.
    obs_tile_names : array-like of str
        Tile names from the bl_info structure inside the obs structure.
        Only used if telescope_name is "mwa".
    run_check_acceptability : bool
        Option to check acceptable range of the telescope locations.

    Returns
    -------
    dict
        A dictionary of parameters from the layout file to assign to the object. The
        keys are:

        * telescope_xyz : Telescope location in ECEF, shape (3,) (float)
        * Nants_telescope : Number of antennas in the telescope (int)
        * antenna_postions : Antenna positions in relative ECEF,
            shape (Nants_telescope, 3) (float)
        * antenna_names : Antenna names, length Nants_telescope (list of str)
        * antenna_numbers : Antenna numbers, shape (Nants_telescope,) (array of int)
        * gst0 : Greenwich sidereal time at midnight on reference date. (float)
        * earth_omega : Earth's rotation rate in degrees per day. (float)
        * dut1 : DUT1 (google it) AIPS 117 calls it UT1UTC. (float)
        * timesys : Time system (should only ever be UTC). (str)
        * diameters : Antenna diameters in meters. shape (Nants_telescope,) (float)
        * extra_keywords : Dictionary of extra keywords to preserve on the object.

    """
    layout_dict = readsav(layout_file, python_dict=True)
    layout = layout_dict["layout"]

    layout_fields = [name.lower() for name in layout.dtype.names]
    # Try to get the telescope location from the layout file &
    # compare it to the position from the obs structure.
    arr_center = layout["array_center"][0]
    layout_fields.remove("array_center")

    xyz_telescope_frame = layout["coordinate_frame"][0].decode("utf8").lower()
    layout_fields.remove("coordinate_frame")

    if xyz_telescope_frame.strip() == "itrf":
        # compare to lat/lon/alt
        location_latlonalt = uvutils.XYZ_from_LatLonAlt(latitude, longitude, altitude)
        latlonalt_arr_center = uvutils.LatLonAlt_from_XYZ(
            arr_center, check_acceptability=run_check_acceptability
        )

        # check both lat/lon/alt and xyz because of subtle differences
        # in tolerances
        if _xyz_close(location_latlonalt, arr_center, loc_tols) or _latlonalt_close(
            (latitude, longitude, altitude), latlonalt_arr_center, radian_tol, loc_tols
        ):
            telescope_location = arr_center
        else:
            # values do not agree with each other to within the tolerances.
            # this is a known issue with FHD runs on cotter uvfits
            # files for the MWA
            # compare with the known_telescopes values
            telescope_obj = uvtel.get_telescope(telescope_name)
            # start warning message
            message = (
                "Telescope location derived from obs lat/lon/alt "
                "values does not match the location in the layout file."
            )

            if telescope_obj is not False:
                message += " Using the value from known_telescopes."
                telescope_location = telescope_obj.telescope_location
            else:
                message += (
                    " Telescope is not in known_telescopes. "
                    "Defaulting to using the obs derived values."
                )
                telescope_location = location_latlonalt
            # issue warning
            warnings.warn(message)
    else:
        telescope_location = uvutils.XYZ_from_LatLonAlt(latitude, longitude, altitude)

    # The FHD positions derive directly from uvfits, so they are in the rotated
    # ECEF frame and must be converted to ECEF
    rot_ecef_positions = layout["antenna_coords"][0]
    layout_fields.remove("antenna_coords")
    # use the longitude from the layout file because that's how the antenna
    # positions were calculated
    latitude, longitude, altitude = uvutils.LatLonAlt_from_XYZ(
        arr_center, check_acceptability=run_check_acceptability
    )
    antenna_positions = uvutils.ECEF_from_rotECEF(rot_ecef_positions, longitude)

    antenna_names = [ant.decode("utf8") for ant in layout["antenna_names"][0].tolist()]
    layout_fields.remove("antenna_names")

    # make these 0-indexed (rather than one indexed)
    antenna_numbers = layout["antenna_numbers"][0]
    layout_fields.remove("antenna_numbers")

    Nants_telescope = int(layout["n_antenna"][0])
    layout_fields.remove("n_antenna")

    if telescope_name.lower() == "mwa":
        # check that obs.baseline_info.tile_names match the antenna names
        # (accounting for possible differences in white space)
        # this only applies for MWA because the tile_names come from
        # metafits files. layout["antenna_names"] comes from the antenna table
        # in the uvfits file and will be used if no metafits was submitted
        if [ant.strip() for ant in obs_tile_names] != [
            ant.strip() for ant in antenna_names
        ]:
            warnings.warn(
                "tile_names from obs structure does not match "
                "antenna_names from layout"
            )

    gst0 = float(layout["gst0"][0])
    layout_fields.remove("gst0")

    if layout["ref_date"][0] != "":
        rdate = layout["ref_date"][0].decode("utf8").lower()
    else:
        rdate = None
    layout_fields.remove("ref_date")

    earth_omega = float(layout["earth_degpd"][0])
    layout_fields.remove("earth_degpd")

    dut1 = float(layout["dut1"][0])
    layout_fields.remove("dut1")

    timesys = layout["time_system"][0].decode("utf8").upper().strip()
    layout_fields.remove("time_system")

    if "diameters" in layout_fields:
        diameters = np.asarray(layout["diameters"])
        layout_fields.remove("diameters")
    else:
        diameters = None

    extra_keywords = {}
    # ignore some fields, put everything else in extra_keywords
    layout_fields_ignore = [
        "diff_utc",
        "pol_type",
        "n_pol_cal_params",
        "mount_type",
        "axis_offset",
        "pola",
        "pola_orientation",
        "pola_cal_params",
        "polb",
        "polb_orientation",
        "polb_cal_params",
        "beam_fwhm",
    ]
    for field in layout_fields_ignore:
        if field in layout_fields:
            layout_fields.remove(field)
    for field in layout_fields:
        keyword = field
        if len(keyword) > 8:
            keyword = field.replace("_", "")

        value = layout[field][0]
        if isinstance(value, bytes):
            value = value.decode("utf8")

        extra_keywords[keyword.upper()] = value

    layout_param_dict = {
        "telescope_location": telescope_location,
        "Nants_telescope": Nants_telescope,
        "antenna_positions": antenna_positions,
        "antenna_names": antenna_names,
        "antenna_numbers": antenna_numbers,
        "gst0": gst0,
        "rdate": rdate,
        "earth_omega": earth_omega,
        "dut1": dut1,
        "timesys": timesys,
        "diameters": diameters,
        "extra_keywords": extra_keywords,
    }

    return layout_param_dict


class FHD(UVData):
    """
    Defines a FHD-specific subclass of UVData for reading FHD save files.

    This class should not be interacted with directly, instead use the read_fhd
    method on the UVData class.
    """

    @copy_replace_short_description(UVData.read_fhd, style=DocstringStyle.NUMPYDOC)
    def read_fhd(
        self,
        filelist,
        use_model=False,
        background_lsts=True,
        read_data=True,
        run_check=True,
        check_extra=True,
        run_check_acceptability=True,
        strict_uvw_antpos_check=False,
        check_autos=True,
        fix_autos=True,
        use_future_array_shapes=False,
        astrometry_library=None,
    ):
        """Read in data from a list of FHD files."""
        datafiles = {}
        params_file = None
        obs_file = None
        flags_file = None
        layout_file = None
        settings_file = None
        if use_model:
            data_name = "_vis_model_"
        else:
            data_name = "_vis_"
        for filename in filelist:
            # update filelist
            basename = os.path.basename(filename)
            self.filename = uvutils._combine_filenames(self.filename, [basename])
            self._filename.form = (len(self.filename),)
            if filename.lower().endswith(data_name + "xx.sav"):
                if "xx" in list(datafiles.keys()):
                    raise ValueError("multiple xx datafiles in filelist")
                datafiles["xx"] = filename
            elif filename.lower().endswith(data_name + "yy.sav"):
                if "yy" in list(datafiles.keys()):
                    raise ValueError("multiple yy datafiles in filelist")
                datafiles["yy"] = filename
            elif filename.lower().endswith(data_name + "xy.sav"):
                if "xy" in list(datafiles.keys()):
                    raise ValueError("multiple xy datafiles in filelist")
                datafiles["xy"] = filename
            elif filename.lower().endswith(data_name + "yx.sav"):
                if "yx" in list(datafiles.keys()):
                    raise ValueError("multiple yx datafiles in filelist")
                datafiles["yx"] = filename
            elif filename.lower().endswith("_params.sav"):
                if params_file is not None:
                    raise ValueError("multiple params files in filelist")
                params_file = filename
            elif filename.lower().endswith("_obs.sav"):
                if obs_file is not None:
                    raise ValueError("multiple obs files in filelist")
                obs_file = filename
            elif filename.lower().endswith("_flags.sav"):
                if flags_file is not None:
                    raise ValueError("multiple flags files in filelist")
                flags_file = filename
            elif filename.lower().endswith("_layout.sav"):
                if layout_file is not None:
                    raise ValueError("multiple layout files in filelist")
                layout_file = filename
            elif filename.lower().endswith("_settings.txt"):
                if settings_file is not None:
                    raise ValueError("multiple settings files in filelist")
                settings_file = filename
            else:
                # this is reached in tests but marked as uncovered because
                # CPython's peephole optimizer replaces a jump to a continue
                # with a jump to the top of the loop
                continue  # pragma: no cover

        if len(datafiles) < 1 and read_data is True:
            raise ValueError(
                "No data files included in file list and read_data is True."
            )
        if obs_file is None and read_data is False:
            raise ValueError(
                "No obs file included in file list and read_data is False."
            )
        if params_file is None:
            raise ValueError("No params file included in file list")
        if flags_file is None:
            raise ValueError("No flags file included in file list")
        if layout_file is None:
            warnings.warn(
                "No layout file included in file list, "
                "antenna_postions will not be defined "
                "and antenna names and numbers might be incorrect."
            )
        if settings_file is None:
            warnings.warn("No settings file included in file list")

        if not read_data:
            obs_dict = readsav(obs_file, python_dict=True)
            this_obs = obs_dict["obs"]
            self.Npols = int(this_obs[0]["N_POL"])
        else:
            # TODO: add checking to make sure params, flags and datafiles are
            # consistent with each other
            vis_data = {}
            for pol, file in datafiles.items():
                this_dict = readsav(file, python_dict=True)
                if use_model:
                    vis_data[pol] = this_dict["vis_model_ptr"]
                else:
                    vis_data[pol] = this_dict["vis_ptr"]
                this_obs = this_dict["obs"]
            self.Npols = len(list(vis_data.keys()))

        obs = this_obs
        bl_info = obs["BASELINE_INFO"][0]
        astrometry = obs["ASTR"][0]
        fhd_pol_list = []
        for pol in obs["POL_NAMES"][0]:
            fhd_pol_list.append(pol.decode("utf8").lower())

        params_dict = readsav(params_file, python_dict=True)
        params = params_dict["params"]

        if read_data:
            flag_file_dict = readsav(flags_file, python_dict=True)
            # The name for this variable changed recently (July 2016). Test for both.
            vis_weights_data = {}
            if "flag_arr" in flag_file_dict:
                weights_key = "flag_arr"
            elif "vis_weights" in flag_file_dict:
                weights_key = "vis_weights"
            else:
                raise ValueError(
                    "No recognized key for visibility weights in flags_file."
                )
            for index, w in enumerate(flag_file_dict[weights_key]):
                vis_weights_data[fhd_pol_list[index]] = w

        self.Ntimes = int(obs["N_TIME"][0])
        self.Nbls = int(obs["NBASELINES"][0])
        self.Nblts = params["UU"][0].size
        self.Nfreqs = int(obs["N_FREQ"][0])
        self.Nspws = 1
        self.spw_array = np.array([0])

        # Future proof: set the flex_spw_id_array.
        self.flex_spw_id_array = np.zeros(self.Nfreqs, dtype=int)

        self.vis_units = "Jy"

        # bl_info.JDATE (a vector of length Ntimes) is the only safe date/time
        # to use in FHD files.
        # (obs.JD0 (float) and params.TIME (vector of length Nblts) are
        #   context dependent and are not safe
        #   because they depend on the phasing of the visibilities)
        # the values in bl_info.JDATE are the JD for each integration.
        # We need to expand up to Nblts.
        int_times = list(uvutils._get_iterable(bl_info["JDATE"][0]))
        bin_offset = bl_info["BIN_OFFSET"][0]
        if self.Ntimes != len(int_times):
            warnings.warn(
                "Ntimes does not match the number of unique times in the data"
            )
        self.time_array = np.zeros(self.Nblts)
        if self.Ntimes == 1:
            self.time_array.fill(int_times[0])
        else:
            for ii in range(0, len(int_times)):
                if ii < (len(int_times) - 1):
                    self.time_array[bin_offset[ii] : bin_offset[ii + 1]] = int_times[ii]
                else:
                    self.time_array[bin_offset[ii] :] = int_times[ii]

        # this is generated in FHD by subtracting the JD of neighboring
        # integrations. This can have limited accuracy, so it can be slightly
        # off the actual value.
        # (e.g. 1.999426... rather than 2)
        time_res = obs["TIME_RES"]
        # time_res is constrained to be a scalar currently
        self.integration_time = (
            np.ones_like(self.time_array, dtype=np.float64) * time_res[0]
        )
        # # --- observation information ---
        self.telescope_name = obs["INSTRUMENT"][0].decode("utf8")

        # This is a bit of a kludge because nothing like a phase center name exists
        # in FHD files.
        # At least for the MWA, obs.ORIG_PHASERA and obs.ORIG_PHASEDEC specify
        # the field the telescope was nominally pointing at
        # (May need to be revisited, but probably isn't too important)
        cat_name = (
            "Field RA(deg): "
            + str(obs["ORIG_PHASERA"][0])
            + ", Dec:"
            + str(obs["ORIG_PHASEDEC"][0])
        )
        # For the MWA, this can sometimes be converted to EoR fields
        if self.telescope_name.lower() == "mwa":
            if np.isclose(obs["ORIG_PHASERA"][0], 0) and np.isclose(
                obs["ORIG_PHASEDEC"][0], -27
            ):
                cat_name = "EoR 0 Field"

        self.instrument = self.telescope_name
        latitude = np.deg2rad(float(obs["LAT"][0]))
        longitude = np.deg2rad(float(obs["LON"][0]))
        altitude = float(obs["ALT"][0])

        # get the stuff FHD read from the antenna table (in layout file)
        if layout_file is not None:
            # in older FHD versions, incorrect tile names for the mwa
            # might have been stored in bl_info, so pull the obs_tile_names
            # and check them against the layout file
            obs_tile_names = [
                ant.decode("utf8") for ant in bl_info["TILE_NAMES"][0].tolist()
            ]
            if self.telescope_name.lower() == "mwa" and obs_tile_names[0][0] != "T":
                obs_tile_names = [
                    "Tile" + "0" * (3 - len(ant.strip())) + ant.strip()
                    for ant in obs_tile_names
                ]
            layout_param_dict = get_fhd_layout_info(
                layout_file,
                self.telescope_name,
                latitude,
                longitude,
                altitude,
                uvutils.RADIAN_TOL,
                self._telescope_location.tols,
                obs_tile_names,
                run_check_acceptability=True,
            )

            for key, value in layout_param_dict.items():
                setattr(self, key, value)

        else:
            self.telescope_location_lat_lon_alt = (latitude, longitude, altitude)

            # we don't have layout info, so go ahead and set the antenna_names,
            # antenna_numbers and Nants_telescope from the baseline info struct.
            self.antenna_names = [
                ant.decode("utf8") for ant in bl_info["TILE_NAMES"][0].tolist()
            ]
            self.antenna_numbers = np.array([int(ant) for ant in self.antenna_names])
            if self.telescope_name.lower() == "mwa":
                self.antenna_names = [
                    "Tile" + "0" * (3 - len(ant.strip())) + ant.strip()
                    for ant in self.antenna_names
                ]
            self.Nants_telescope = len(self.antenna_names)

        # need to make sure telescope location is defined properly before this call
        proc = self.set_lsts_from_time_array(
            background=background_lsts, astrometry_library=astrometry_library
        )

        if not np.isclose(obs["OBSRA"][0], obs["PHASERA"][0]) or not np.isclose(
            obs["OBSDEC"][0], obs["PHASEDEC"][0]
        ):
            warnings.warn(
                "These visibilities may have been phased "
                "improperly -- without changing the uvw locations"
            )

        cat_id = self._add_phase_center(
            cat_name=cat_name,
            cat_type="sidereal",
            cat_lon=np.deg2rad(float(obs["OBSRA"][0])),
            cat_lat=np.deg2rad(float(obs["OBSDEC"][0])),
            cat_frame=astrometry["RADECSYS"][0].decode().lower(),
            cat_epoch=astrometry["EQUINOX"][0],
        )
        self.phase_center_id_array = np.zeros(self.Nblts, dtype=int) + cat_id

        # Note that FHD antenna arrays are 1-indexed so we subtract 1
        # to get 0-indexed arrays
        ind_1_array = bl_info["TILE_A"][0] - 1
        ind_2_array = bl_info["TILE_B"][0] - 1
        self.ant_1_array = self.antenna_numbers[ind_1_array]
        self.ant_2_array = self.antenna_numbers[ind_2_array]
        self.Nants_data = int(np.union1d(self.ant_1_array, self.ant_2_array).size)

        self.baseline_array = self.antnums_to_baseline(
            self.ant_1_array, self.ant_2_array
        )
        if self.Nbls != len(np.unique(self.baseline_array)):
            warnings.warn(
                "Nbls does not match the number of unique baselines in the data"
            )

        # TODO: Spw axis to be collapsed in future release
        self.freq_array = np.zeros((1, len(bl_info["FREQ"][0])), dtype=np.float64)
        self.freq_array[0, :] = bl_info["FREQ"][0]

        self.channel_width = float(obs["FREQ_RES"][0])

        # In FHD, uvws are in seconds not meters.
        # FHD follows the FITS uvw direction convention, which is opposite
        # ours and Miriad's.
        # So conjugate the visibilities and flip the uvws:
        self.uvw_array = np.zeros((self.Nblts, 3))
        self.uvw_array[:, 0] = (-1) * params["UU"][0] * const.c.to("m/s").value
        self.uvw_array[:, 1] = (-1) * params["VV"][0] * const.c.to("m/s").value
        self.uvw_array[:, 2] = (-1) * params["WW"][0] * const.c.to("m/s").value

        lin_pol_order = ["xx", "yy", "xy", "yx"]
        linear_pol_dict = dict(zip(lin_pol_order, np.arange(5, 9) * -1))
        pol_list = []
        if read_data:
            for pol in lin_pol_order:
                if pol in vis_data:
                    pol_list.append(linear_pol_dict[pol])
            self.polarization_array = np.asarray(pol_list)
        else:
            # Use Npols because for FHD, npol fully specifies which pols to use
            pol_strings = lin_pol_order[: self.Npols]
            self.polarization_array = np.asarray(
                [linear_pol_dict[pol] for pol in pol_strings]
            )

        # history: add the first few lines from the settings file
        if settings_file is not None:
            self.history = get_fhd_history(settings_file)
        else:
            self.history = ""

        if not uvutils._check_history_version(self.history, self.pyuvdata_version_str):
            self.history += self.pyuvdata_version_str

        if read_data:
            # TODO: Spw axis to be collapsed in future release
            self.data_array = np.zeros(
                (self.Nblts, 1, self.Nfreqs, self.Npols), dtype=np.complex_
            )
            # TODO: Spw axis to be collapsed in future release
            self.nsample_array = np.zeros(
                (self.Nblts, 1, self.Nfreqs, self.Npols), dtype=np.float64
            )
            # TODO: Spw axis to be collapsed in future release
            self.flag_array = np.zeros(
                (self.Nblts, 1, self.Nfreqs, self.Npols), dtype=np.bool_
            )
            for pol, vis in vis_data.items():
                pol_i = pol_list.index(linear_pol_dict[pol])
                # FHD follows the FITS uvw direction convention, which is opposite
                # ours and Miriad's.
                # So conjugate the visibilities and flip the uvws:
                self.data_array[:, 0, :, pol_i] = np.conj(vis)
                self.flag_array[:, 0, :, pol_i] = vis_weights_data[pol] <= 0
                self.nsample_array[:, 0, :, pol_i] = np.abs(vis_weights_data[pol])

        # wait for LSTs if set in background
        if proc is not None:
            proc.join()

        self._set_app_coords_helper()

        if use_future_array_shapes:
            self.use_future_array_shapes()
        else:
            warnings.warn(_future_array_shapes_warning, DeprecationWarning)

        # check if object has all required uv_properties set
        if run_check:
            self.check(
                check_extra=check_extra,
                run_check_acceptability=run_check_acceptability,
                strict_uvw_antpos_check=strict_uvw_antpos_check,
                allow_flip_conj=True,
                check_autos=check_autos,
                fix_autos=fix_autos,
            )
back to top