# -*- mode: python; coding: utf-8 -*-
# Copyright (c) 2020 Radio Astronomy Software Group
# Licensed under the 2-clause BSD License

"""Class for reading and writing Mir files."""
import os
import warnings

import numpy as np
from astropy.coordinates import angular_separation
from astropy.time import Time

from .. import get_telescope
from .. import utils as uvutils
from . import mir_parser
from .uvdata import UVData

__all__ = ["Mir"]


class Mir(UVData):
    """
    A class for Mir file objects.

    This class defines a Mir-specific subclass of UVData for reading and
    writing Mir files. This class should not be interacted with directly;
    instead, use the read_mir and write_mir methods on the UVData class.
    """

    def read_mir(
        self,
        filepath,
        antenna_nums=None,
        antenna_names=None,
        bls=None,
        time_range=None,
        lst_range=None,
        polarizations=None,
        catalog_names=None,
        corrchunk=None,
        receivers=None,
        sidebands=None,
        select_where=None,
        apply_tsys=True,
        apply_flags=True,
        apply_dedoppler=False,
        pseudo_cont=False,
        rechunk=None,
        run_check=True,
        check_extra=True,
        run_check_acceptability=True,
        strict_uvw_antpos_check=False,
        allow_flex_pol=True,
        check_autos=True,
        fix_autos=False,
    ):
        """
        Read in data from an SMA MIR file, and map to a UVData object.

        Note that, with the exception of filepath, most of the remaining parameters
        are used to sub-select a range of data.

        Parameters
        ----------
        filepath : str
            The file path to the MIR folder to read from.
        antenna_nums : array_like of int, optional
            The antenna numbers to include when reading data into the object
            (antenna positions and names for the removed antennas will be retained
            unless `keep_all_metadata` is False). This cannot be provided if
            `antenna_names` is also provided.
        antenna_names : array_like of str, optional
            The antenna names to include when reading data into the object
            (antenna positions and names for the removed antennas will be retained
            unless `keep_all_metadata` is False). This cannot be provided if
            `antenna_nums` is also provided.
        bls : list of tuple, optional
            A list of antenna number tuples (e.g. [(0, 1), (3, 2)]) specifying baselines
            to include when reading data in to the object.
        time_range : array_like of float, optional
            The time range in Julian Date to include when reading data into
            the object, must be length 2. Some of the times in the file should
            fall between the first and last elements.
        lst_range : array_like of float, optional
            The local sidereal time (LST) range in radians to keep in the
            object, must be of length 2. Some of the LSTs in the object should
            fall between the first and last elements. If the second value is
            smaller than the first, the LSTs are treated as having phase-wrapped
            around LST = 2*pi = 0, and the LSTs kept on the object will run from
            the larger value, through 0, and end at the smaller value.
        polarizations : array_like of int, optional
            The polarization numbers to include when reading data into the
            object, each value passed here should exist in the polarization_array.
        catalog_names : str or array-like of str, optional
            The names of the phase centers (sources) to include when reading data into
            the object, which should match exactly in spelling and capitalization.
        corrchunk : int or array-like of int, optional
            Correlator "chunk" (spectral window) to include when reading data into the
            object, where 0 corresponds to the pseudo-continuum channel.
        receivers : str or array-like of str, optional
            The names of the receivers ("230", "240", "345", "400") to include when
            reading data into the object.
        sidebands : str or array-like of str, optional
            The names of the sidebands ("l" for lower, "u" for upper) to include when
            reading data into the object.
        select_where : tuple or list of tuples, optional
            Argument to pass to the `MirParser.select` method, which will downselect
            which data is read into the object (see the Examples section below).
        apply_flags : bool
            If set to True, apply "wideband" flags to the visibilities, which are
            recorded by the realtime system to denote when data are expected to be bad
            (e.g., antennas not on source, dewar warm). Default is True.
        apply_tsys : bool
            If set to False, data are returned as correlation coefficients (normalized
            by the auto-correlations). Default is True, which instead scales the raw
            visibilities by the system temperatures and forward gain of the antenna
            to produce values in (uncalibrated) Jy.
        apply_dedoppler : bool
            If set to True, data will be corrected for any doppler-tracking performed
            during observations, and brought into the topocentric rest frame (default
            for UVData objects). Default is False.
        pseudo_cont : bool
            Read in only pseudo-continuum values. Default is False.
        rechunk : int
            Number of channels to average over when reading in the dataset. Optional
            argument, typically required to be a power of 2.
        run_check : bool
            Option to check for the existence and proper shapes of parameters
            after reading in the file.
        check_extra : bool
            Option to check optional parameters as well as required ones.
        run_check_acceptability : bool
            Option to check acceptable range of the values of parameters after
            reading in the file.
        strict_uvw_antpos_check : bool
            Option to raise an error rather than a warning if the check that
            uvw coordinates match antenna positions does not pass.
        allow_flex_pol : bool
            If only one polarization per spectral window is read (and the polarization
            differs from window to window), allow the `UVData` object to use
            "flexible polarization", which compresses the polarization axis of various
            attributes to length 1 and sets the `flex_spw_polarization_array`
            attribute to define the polarization per spectral window. Default is True.
        check_autos : bool
            Check whether any auto-correlations have non-zero imaginary values in
            data_array (which should not mathematically exist). Default is True.
        fix_autos : bool
            If auto-correlations with imaginary values are found, fix those values so
            that they are real-only in data_array. Default is False.
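
        Examples
        --------
        A minimal usage sketch. The folder path below is hypothetical, and the
        keyword values are only meant to illustrate the selection options
        described above::

            from pyuvdata import UVData

            uvd = UVData()

            # Read just the lower sideband of the "230" receiver, averaging the
            # data down by a factor of 8 in frequency.
            uvd.read_mir(
                "/path/to/mir_data_folder",
                receivers="230",
                sidebands="l",
                rechunk=8,
            )

            # Equivalently, selection criteria can be passed straight through to
            # MirParser.select via select_where (note that when select_where is
            # given, the other selection keywords are ignored).
            uvd.read_mir(
                "/path/to/mir_data_folder",
                select_where=[("rec", "eq", "230"), ("sb", "eq", "l")],
            )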
        """
        # Use the mir_parser to read in metadata, which can be used to select data.
        mir_data = mir_parser.MirParser(filepath)

        if select_where is None:
            select_where = []

            if antenna_nums is not None:
                select_where += [("ant", "eq", antenna_nums)]
            if antenna_names is not None:
                select_where += [("tel1", "eq", antenna_names)]
                select_where += [("tel2", "eq", antenna_names)]
            if bls is not None:
                select_where += [
                    ("blcd", "eq", ["%i-%i" % (tup[0], tup[1]) for tup in bls])
                ]
            if time_range is not None:
                # Have to convert times from UTC JD -> TT MJD for MIR
                select_where += [
                    (
                        "mjd",
                        "between",
                        Time(time_range, format="jd", scale="utc").tt.mjd,
                    )
                ]
            if lst_range is not None:
                select_where += [("lst_range", "between", lst_range)]
            if polarizations is not None:
                select_where += [("pol", "eq", polarizations)]
            if catalog_names is not None:
                select_where += [("source", "eq", catalog_names)]
            if receivers is not None:
                select_where += [("rec", "eq", receivers)]
            if sidebands is not None:
                select_where += [("sb", "eq", sidebands)]

            if corrchunk is not None:
                select_where += [("corrchunk", "eq", corrchunk)]
            elif not pseudo_cont:
                select_where += [("corrchunk", "ne", 0)]

        if select_where:
            mir_data.select(where=select_where)

        if rechunk is not None:
            mir_data.rechunk(rechunk)

        self._init_from_mir_parser(
            mir_data,
            allow_flex_pol=allow_flex_pol,
            apply_flags=apply_flags,
            apply_tsys=apply_tsys,
            apply_dedoppler=apply_dedoppler,
        )

        if run_check:
            self.check(
                check_extra=check_extra,
                run_check_acceptability=run_check_acceptability,
                strict_uvw_antpos_check=strict_uvw_antpos_check,
                allow_flip_conj=True,
                check_autos=check_autos,
                fix_autos=fix_autos,
            )

    def _prep_and_insert_data(
        self,
        mir_data: mir_parser.MirParser,
        sphid_dict,
        spdx_dict,
        blhid_blt_order,
        apply_flags=True,
        apply_tsys=True,
        apply_dedoppler=False,
    ):
        """
        Load and prep data for import into UVData object.

        This is a helper function, not meant for users to call. It is used by the read
        method to fill in the data and flag arrays of a UVData object based on the
        records stored within the MIR dataset.

        Parameters
        ----------
        mir_data : MirParser object
            Object from which to plug data into the data arrays.
        sphid_dict : dict
            Map between MIR spectral record and position in the UVData array, as
            dictated by blt-index, pol-index, and slice corresponding to the range
            in frequencies.
        spdx_dict : dict
            Map between MIR sideband, receiver/polarization, and corrchunk to
            UVData-oriented blt-index, pol-index, and slice corresponding to the range
            in frequencies.
        blhid_blt_order : dict
            Map from MIR blhid to blt-index in the UVData object.
        apply_flags : bool
            If set to True, apply "wideband" flags to the visibilities, which are
            recorded by the realtime system to denote when data are expected to be bad
            (e.g., antennas not on source, dewar warm). Default is True.
        apply_tsys : bool
            If set to False, data are returned as correlation coefficients (normalized
            by the auto-correlations). Default is True, which instead scales the raw
            visibilities by the system temperatures and forward gain of the antenna
            to produce values in (uncalibrated) Jy.
        apply_dedoppler : bool
            If set to True, data will be corrected for any doppler-tracking performed
            during observations, and brought into the topocentric rest frame (default
            for UVData objects). Default is False.
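
        Notes
        -----
        These mappings are constructed in `_init_from_mir_parser`. As a purely
        illustrative sketch of their expected shapes: `blhid_blt_order` maps each
        blhid to an integer blt-index (e.g., ``{1: 0, 2: 0, 3: 1}``), `sphid_dict`
        maps each sphid to a ``(corrchunk, sideband, pol)`` tuple, and `spdx_dict`
        maps that same tuple to a dict whose ``"pol_idx"`` and ``"ch_slice"``
        entries are used to index into the data and flag arrays.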
        """
        if mir_data.vis_data is None:
            mir_data.load_data(load_cross=True, apply_tsys=apply_tsys)
            if apply_flags:
                mir_data.apply_flags()
            if apply_dedoppler:
                mir_data.redoppler_data()

        if not np.all(
            np.isin(list(mir_data.vis_data.keys()), mir_data.sp_data.get_header_keys())
        ):
            raise KeyError(
                "Mismatch between keys in vis_data and sphid in sp_data, which "
                "should not happen. Please file an issue in our GitHub issue log "
                "so that we can fix it."
            )

        # Go through the data record-by-record to fill things in
        for sp_rec, sphid in zip(mir_data.sp_data, mir_data.sp_data.get_header_keys()):
            # sphid_dict maps each spectral record's header key (sphid) to a
            # 3-element tuple containing the corrchunk, sideband, and
            # receiver/polarization code for that record.
            window = sphid_dict[sphid]
            vis_rec = mir_data.vis_data[sphid]  # Visibilities + flags
            blt_idx = blhid_blt_order[sp_rec["blhid"]]  # Blt index to load data into
            pol_idx = spdx_dict[window]["pol_idx"]  # Polarization to load data into
            ch_slice = spdx_dict[window]["ch_slice"]  # Channel range to load data into

            # Now populate the fields with the relevant data from the object
            self.data_array[blt_idx, pol_idx, ch_slice] = np.conj(vis_rec["data"])
            self.flag_array[blt_idx, pol_idx, ch_slice] = vis_rec["flags"]

        # Drop the data from the MirParser object once we have it loaded up.
        mir_data.unload_data()

    def _init_from_mir_parser(
        self,
        mir_data: mir_parser.MirParser,
        allow_flex_pol=True,
        apply_tsys=True,
        apply_flags=True,
        apply_dedoppler=False,
    ):
        """
        Convert a MirParser object into a UVData object.

        Parameters
        ----------
        mir_data : MirParser object
            MIR dataset to be converted into a UVData object.
        allow_flex_pol : bool
            If only one polarization per spectral window is read (and the polarization
            differs from window to window), allow the `UVData` object to use
            "flexible polarization", which compresses the polarization axis of various
            attributes to length 1 and sets the `flex_spw_polarization_array`
            attribute to define the polarization per spectral window. Default is True.
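        apply_tsys : bool
            If set to False, data are returned as correlation coefficients
            (normalized by the auto-correlations). Default is True, which instead
            scales the raw visibilities by the system temperatures and forward gain
            of the antenna to produce values in (uncalibrated) Jy.
        apply_flags : bool
            If set to True, apply "wideband" flags to the visibilities, which are
            recorded by the realtime system to denote when data are expected to be
            bad (e.g., antennas not on source, dewar warm). Default is True.
        apply_dedoppler : bool
            If set to True, data will be corrected for any doppler-tracking
            performed during observations, and brought into the topocentric rest
            frame (default for UVData objects). Default is False.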
        """
        # By default, we will want to assume that MIR datasets are multi-spw.
        # At present, there is no advantage to allowing this
        # not to be true on read-in, particularly as in the long-term, this setting
        # will hopefully become the default for all data sets.
        self._set_flex_spw()

        # Also set future array shapes here
        self._set_future_array_shapes()

        # Create a simple list for broadcasting values stored on a
        # per-integration basis in MIR into the (tasty) per-blt records in UVData.
        bl_in_idx = mir_data.in_data._index_query(header_key=mir_data.bl_data["inhid"])

        # Create a simple array/list for broadcasting values stored on a
        # per-blt basis into per-spw records, and per-time into per-blt records
        sp_bl_idx = mir_data.bl_data._index_query(header_key=mir_data.sp_data["blhid"])

        ant1_rxa_mask = mir_data.bl_data.get_value("ant1rx", index=sp_bl_idx) == 0
        ant2_rxa_mask = mir_data.bl_data.get_value("ant2rx", index=sp_bl_idx) == 0

        if len(np.unique(mir_data.bl_data["ipol"])) == 1 and (
            len(mir_data.codes_data["pol"]) == (2 * 4)
        ):
            # If only one pol is found, and the polarization dictionary has only four
            # codes + four unique index codes, then we actually need to verify this is
            # a single pol observation, since the current system has a quirk that it
            # marks both X- and Y-pol receivers as the same polarization.
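            # The mapping constructed below is: 0 -> both antennas on receiver A,
            # 1 -> both antennas on receiver B, 2 -> ant1 on A with ant2 on B,
            # and 3 -> ant1 on B with ant2 on A.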
            pol_arr = np.zeros_like(sp_bl_idx)

            pol_arr[np.logical_and(ant1_rxa_mask, ant2_rxa_mask)] = 0
            pol_arr[np.logical_and(~ant1_rxa_mask, ~ant2_rxa_mask)] = 1
            pol_arr[np.logical_and(ant1_rxa_mask, ~ant2_rxa_mask)] = 2
            pol_arr[np.logical_and(~ant1_rxa_mask, ant2_rxa_mask)] = 3
        else:
            # If this has multiple ipol codes, then we don't need to worry about the
            # single-code ambiguity.
            pol_arr = mir_data.bl_data.get_value("ipol", index=sp_bl_idx)

        # Construct an indexing list, that we'll use later to figure out what data
        # goes where, based on spw, sideband, and pol code.
        spdx_list = [
            (winid, sbid, polid)
            for winid, sbid, polid in zip(
                mir_data.sp_data["corrchunk"],
                mir_data.bl_data.get_value("isb", index=sp_bl_idx),
                pol_arr,
            )
        ]

        # We'll also use this later
        sphid_dict = dict(zip(mir_data.sp_data.get_header_keys(), spdx_list))

        # Create a dict with the ordering of the pols
        pol_dict = {key: idx for idx, key in enumerate(np.unique(pol_arr))}

        pol_split_tuning = False
        if len(pol_dict) == 2:
            # If dual-pol, then we need to check if the tunings are split, because
            # the two polarizations will effectively be concat'd across the freq
            # axis instead of the pol axis.
            rxa_mask = ant1_rxa_mask & ant2_rxa_mask
            rxb_mask = ~(ant1_rxa_mask | ant2_rxa_mask)

            if np.any(rxa_mask) and np.any(rxb_mask):
                # If we have both VV and HH data, check to see that the tunings of the
                # two receivers match.
                loa_freq = np.median(mir_data.sp_data["gunnLO"][rxa_mask])
                lob_freq = np.median(mir_data.sp_data["gunnLO"][rxb_mask])
                pol_split_tuning = not np.isclose(loa_freq, lob_freq)

        # Map MIR pol code to pyuvdata/AIPS polarization number
        pol_code_dict = {}
        icode_dict = mir_data.codes_data["pol"]
        for code in mir_data.codes_data.get_codes("pol", return_dict=False):
            # There are pol modes/codes that are supported in MIR but not in AIPS
            # or CASA. They are rarely used, so we can skip over translating them
            # in the try/except block here (if present in the data, it will throw
            # an error further downstream).
            try:
                pol_code_dict[icode_dict[code]] = uvutils.POL_STR2NUM_DICT[code.lower()]
            except KeyError:
                pass
        if pol_split_tuning and allow_flex_pol:
            # If we have a split tuning, that means we can take advantage of
            # the flex-pol feature within UVData.
            Npols = 1
            polarization_array = np.array([0])
        else:
            # Otherwise, calculate dimensions and arrays for the polarization axis
            # like normal.
            Npols = len(set(pol_dict.values()))
            polarization_array = np.zeros(Npols, dtype=int)

            for key in pol_dict.keys():
                polarization_array[pol_dict[key]] = pol_code_dict[key]

        # Create a list of baseline-time combinations in the data
        blt_list = [
            (inhid, ant1, ant2)
            for inhid, ant1, ant2 in zip(*mir_data.bl_data[["inhid", "iant1", "iant2"]])
        ]

        # Use the list above to create a dict that maps baseline-time combo
        # to position along the mouthwatering blt-axis.
        blt_dict = {
            blt_tuple: idx
            for idx, blt_tuple in enumerate(
                sorted(set(blt_list), key=lambda x: (x[0], x[1], x[2]))
            )
        }

        # Match blhid in MIR to blt position in UVData
        blhid_blt_order = {
            key: blt_dict[value]
            for key, value in zip(mir_data.bl_data["blhid"], blt_list)
        }

        # Record the total number of baseline-times in the data set.
        Nblts = len(blt_dict)

        # Here we need to do a little fancy footwork in order to map spectral windows
        # to ranges along the freq-axis, and calculate some values that will eventually
        # populate arrays related to this axis (e.g., freq_array, chan_width).
        spdx_dict = {}
        spw_dict = {}
        for spdx in set(spdx_list):
            data_mask = np.array([spdx == item for item in spdx_list])

            # Grab values, get them into appropriate types
            spw_fsky = np.unique(mir_data.sp_data["fsky"][data_mask])
            spw_fres = np.unique(mir_data.sp_data["fres"][data_mask])
            spw_nchan = np.unique(mir_data.sp_data["nch"][data_mask])

            # Make sure that something weird hasn't happened with the metadata (this
            # really should never happen, only one value should exist per window).
            assert len(spw_fsky) == 1
            assert len(spw_fres) == 1
            assert len(spw_nchan) == 1

            # Get the data in the right units and dtype
            spw_fsky = float(spw_fsky[0] * 1e9)  # GHz -> Hz
            spw_fres = float(spw_fres[0] * 1e6)  # MHz -> Hz
            spw_nchan = int(spw_nchan[0])

            # We need to do some extra handling here, because a single correlator
            # can produce multiple spectral windows (e.g., LSB/USB). The scheme below
            # will negate the corr band number if LSB, will set the corr band number to
            # 255 if the values arise from the pseudo-wideband values, and will add 512
            # if the pols are split-tuned. This scheme, while a little funky, guarantees
            # that each unique freq range has its own spectral window number.
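            # As a worked example: corrchunk 3 maps to spectral window -3 in the
            # lower sideband and +3 in the upper sideband, the pseudo-continuum
            # channel maps to -255/+255, and in the split-tuned case the second
            # polarization's windows are offset by a further +512 (e.g., -3 -> 509).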
            spw_id = 255 if (spdx[0] == 0) else spdx[0]
            spw_id *= (-1) ** (1 + spdx[1])
            spw_id += 512 if (pol_split_tuning and spdx[2] == 1) else 0

            # Populate the channel width array
            channel_width = abs(spw_fres) + np.zeros(spw_nchan, dtype=np.float64)

            # Populate the spw_id_array
            spw_id_array = spw_id + np.zeros(spw_nchan, dtype=np.int64)

            # So the freq array here is a little weird, because the current fsky
            # refers to the point between the nch/2 and nch/2 + 1 channel in the
            # raw (unaveraged) spectrum. This was done for the sake of some
            # convenience, at the cost of clarity. In some future format of the
            # data, we expect to be able to drop this seemingly random offset.
            freq_array = (
                spw_fsky
                - (np.sign(spw_fres) * 139648.4375)
                + (spw_fres * (np.arange(spw_nchan) + 0.5 - (spw_nchan / 2)))
            )

            # Note here that if we're using flex-pol, then the polarization axis of
            # the UVData object has length 1, which is why pol_idx is forced to be 0.
            spdx_dict[spdx] = {
                "spw_id": spw_id,
                "pol_idx": (
                    0 if (pol_split_tuning and allow_flex_pol) else pol_dict[spdx[2]]
                ),
            }

            # Stuff this dictionary full of the per-spw metadata
            spw_dict[spw_id] = {
                "nchan": spw_nchan,
                "freqs": spw_fres,
                "fsky": spw_fsky,
                "channel_width": channel_width,
                "spw_id_array": spw_id_array,
                "freq_array": freq_array,
                "pol_state": pol_code_dict[spdx[2]],
            }

        spw_array = sorted(spw_dict.keys())
        Nspws = len(spw_array)
        Nfreqs = 0
        for key in spw_array:
            spw_dict[key]["ch_slice"] = slice(Nfreqs, Nfreqs + spw_dict[key]["nchan"])
            Nfreqs += spw_dict[key]["nchan"]

        # Initialize some arrays that we'll be appending to
        flex_spw_id_array = np.zeros(Nfreqs, dtype=int)
        channel_width = np.zeros(Nfreqs, dtype=np.float64)
        freq_array = np.zeros(Nfreqs, dtype=np.float64)
        flex_pol = np.zeros(Nspws, dtype=int)
        for idx, key in enumerate(spw_array):
            flex_spw_id_array[spw_dict[key]["ch_slice"]] = spw_dict[key]["spw_id_array"]
            channel_width[spw_dict[key]["ch_slice"]] = spw_dict[key]["channel_width"]
            freq_array[spw_dict[key]["ch_slice"]] = spw_dict[key]["freq_array"]
            flex_pol[idx] = spw_dict[key]["pol_state"]

        if not pol_split_tuning:
            flex_pol = None

        for key in spdx_dict:
            spdx_dict[key]["ch_slice"] = spw_dict[spdx_dict[key]["spw_id"]]["ch_slice"]

        # Create arrays to plug visibilities and flags into. The array is shaped this
        # way since when reading in a MIR file, we scan through the blt-axis the
        # slowest and the freq-axis the fastest (i.e., the data is roughly ordered by
        # blt, pol, freq).
        self.data_array = np.zeros((Nblts, Npols, Nfreqs), dtype=np.complex64)
        self.flag_array = np.ones((Nblts, Npols, Nfreqs), dtype=bool)

        # Get a list of the current inhids for later
        inhid_list = mir_data.in_data["inhid"].copy()

        # Store a backup of the selection masks
        backup_masks = {}
        for item, obj in mir_data._metadata_attrs.items():
            backup_masks[item] = obj._mask.copy()

        # If no data is loaded, we want to load subsets of data to start rather than
        # the whole block in one go, since this will save us 2x in memory.
        inhid_step = len(inhid_list)
        if (mir_data.vis_data is None) and (mir_data.auto_data is None):
            inhid_step = (inhid_step // 4) + 1

        for start in range(0, len(inhid_list), inhid_step):
            # If no data is loaded, load up a quarter of the data at a time. This
            # keeps the "extra" memory usage below that for the nsample_array attr,
            # which is generated and filled _after_ this step (thus no extra memory
            # should be required)
            if (mir_data.vis_data is None) and (mir_data.auto_data is None):
                # Note that the masks are combined via an "and" operation.
                mir_data.select(
                    where=("inhid", "eq", inhid_list[start : start + inhid_step])
                )

            # Call this convenience function in case we need to run the data filling
            # multiple times (if we are loading up subsets of data)
            self._prep_and_insert_data(
                mir_data,
                sphid_dict,
                spdx_dict,
                blhid_blt_order,
                apply_flags=apply_flags,
                apply_tsys=apply_tsys,
                apply_dedoppler=apply_dedoppler,
            )

            for item, obj in mir_data._metadata_attrs.items():
                # Because the select operation messes with the masks, we want to restore
                # those in case we mucked with them earlier (so that subsequent selects
                # behave as expected).
                obj._mask = backup_masks[item].copy()

        # Now handle the weights
        self.nsample_array = np.zeros((Nblts, Npols, Nfreqs), dtype=np.float32)
        for sp_rec, window in zip(mir_data.sp_data, spdx_list):
            blt_idx = blhid_blt_order[sp_rec["blhid"]]
            ch_slice = spdx_dict[window]["ch_slice"]
            pol_idx = spdx_dict[window]["pol_idx"]
            # The "wt" column is calculated as (integ time)/(T_DSB ** 2), but we want
            # units of Jy**-2. To do this, we just need to multiply by one of the
            # forward gain of the antenna (130 Jy/K for SMA) squared and the channel
            # width. The factor of 2**2 (4) arises because we need to convert T_DSB**2
            # to T_SSB**2.
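            # As a unit check: wt [s / K**2] * (1 / (2 * 130 Jy/K))**2 [K**2 / Jy**2]
            # * |fres| [Hz] works out to Jy**-2.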
            self.nsample_array[blt_idx, pol_idx, ch_slice] = (
                ((130.0 * 2.0) ** (-2.0)) * sp_rec["wt"] * np.abs(1e6 * sp_rec["fres"])
            )

        # Now assign our flexible arrays to the object itself
        self.freq_array = freq_array
        self.Nfreqs = Nfreqs
        self.Nspws = Nspws
        self.channel_width = channel_width
        self.flex_spw_id_array = flex_spw_id_array

        # Derive Nants_data from baselines.
        self.Nants_data = len(
            np.unique(
                np.concatenate((mir_data.bl_data["iant1"], mir_data.bl_data["iant2"]))
            )
        )
        self.Nants_telescope = 8
        self.Nbls = int(self.Nants_data * (self.Nants_data - 1) / 2)
        self.Nblts = Nblts
        self.Npols = Npols
        self.Ntimes = len(mir_data.in_data)
        self.antenna_names = ["Ant%i" % idx for idx in range(1, 9)]

        self.antenna_numbers = np.arange(1, 9)

        # Prepare the XYZ coordinates of the antenna positions.
        antXYZ = np.zeros([self.Nants_telescope, 3])
        for idx in range(self.Nants_telescope):
            if (idx + 1) in mir_data.antpos_data["antenna"]:
                antXYZ[idx] = mir_data.antpos_data["xyz_pos"][
                    mir_data.antpos_data["antenna"] == (idx + 1)
                ]

        # Get the coordinates from the entry in telescope.py
        lat, lon, alt = get_telescope("SMA")._telescope_location.lat_lon_alt()
        self.telescope_location_lat_lon_alt = (lat, lon, alt)

        # Calculate antenna positions in ECEF frame. Note that since both
        # coordinate systems are in relative units, no subtraction from the
        # telescope geocentric position is required, i.e., we are going from
        # relRotECEF -> relECEF.
        self.antenna_positions = uvutils.ECEF_from_rotECEF(antXYZ, lon)

        self.history = "Raw Data"
        self.instrument = "SWARM"

        # Before proceeding, we want to check that information that's stored on a
        # per-spectral record basis (sphid) is consistent across a given baseline-time
        # (n.b., blhid changes on sideband and polarization, so multiple blhids
        # correspond to a single baseline-time). We need to do this check here since
        # pyuvdata handles this metadata on a per-baseline-time basis (and there's no
        # good reason it should vary on a per-sphid basis).
        sp_to_blt = ["igq", "ipq", "vradcat"]
        sp_temp_dict = {}
        suppress_warning = False
        for sp_rec in mir_data.sp_data:
            # Evaluate each spectral record's metadata individually, and create a simple
            # dict that contains the metadata we want to check.
            temp_dict = {item: sp_rec[item] for item in sp_to_blt}
            try:
                # If we have already captured metadata about this baseline-time, check
                # to make sure that it agrees with the previous entries.
                if sp_temp_dict[blhid_blt_order[sp_rec["blhid"]]] != temp_dict:
                    # If the entry does NOT agree with what was previously given,
                    # warn the user, and update the record (we only want to warn once
                    # to avoid flooding the user with error messages).
                    if not suppress_warning:
                        warnings.warn(
                            "Per-spectral window metadata differ. Defaulting to using "
                            "the last value in the data set."
                        )
                        suppress_warning = True
                    sp_temp_dict[blhid_blt_order[sp_rec["blhid"]]] = temp_dict
            except KeyError:
                # If we get a key error, it means this is the first record to be
                # evaluated for this given baseline-time, so create a new dict
                # entry so that subsequent sphid records can be evaluated.
                sp_temp_dict[blhid_blt_order[sp_rec["blhid"]]] = temp_dict

        # Next step: we want to check that information that's stored on a per-baseline
        # record basis (blhid) is consistent across a given baseline-time (n.b., again,
        # blhid changes on sideband and polarization, so multiple blhids correspond to
        # a single baseline-time). We need to do this check here since pyuvdata handles
        # this metadata on a per-baseline-time basis.
        bl_to_blt = ["u", "v", "w", "iant1", "iant2"]

        # Note that these are entries that vary per-integration record (inhid), but
        # we include them here so that we can easily expand the per-integration data
        # to the per-baseline-time length arrays that UVData expects.
        in_to_blt = ["lst", "mjd", "ara", "adec", "isource", "rinteg"]
        blt_temp_dict = {}
        suppress_warning = False
        for idx, bl_rec in enumerate(mir_data.bl_data):
            # Evaluate each baseline record's metadata individually, and create a simple
            # dict that contains the metadata we want to check.
            temp_dict = {item: bl_rec[item] for item in bl_to_blt}
            in_rec = mir_data.in_data._data[bl_in_idx[idx]]

            # Update the dict with the per-inhid data as well.
            temp_dict.update({item: in_rec[item] for item in in_to_blt})

            # Finally, fold in the originally per-sphid data that we checked above.
            temp_dict.update(sp_temp_dict[blhid_blt_order[bl_rec["blhid"]]])
            try:
                # If we have already captured metadata about this baseline-time, check
                # to make sure that it agrees with the previous entries.
                if blt_temp_dict[blhid_blt_order[bl_rec["blhid"]]] != temp_dict:
                    # Again, if we reach this point, only raise a warning one time
                    if not suppress_warning:
                        warnings.warn(
                            "Per-baseline metadata differ. Defaulting to using "
                            "the last value in the data set."
                        )
                        suppress_warning = True
                    blt_temp_dict[blhid_blt_order[bl_rec["blhid"]]] = temp_dict
            except KeyError:
                # If we get a key error, it means this is the first record to be
                # evaluated for this given baseline-time, so create a new dict
                # entry so that subsequent blhid records can be evaluated.
                blt_temp_dict[blhid_blt_order[bl_rec["blhid"]]] = temp_dict

        # Initialize the metadata arrays.
        integration_time = np.zeros(Nblts, dtype=float)
        lst_array = np.zeros(Nblts, dtype=float)
        mjd_array = np.zeros(Nblts, dtype=float)
        ant_1_array = np.zeros(Nblts, dtype=int)
        ant_2_array = np.zeros(Nblts, dtype=int)
        uvw_array = np.zeros((Nblts, 3), dtype=float)
        phase_center_id_array = np.zeros(Nblts, dtype=int)
        app_ra = np.zeros(Nblts, dtype=float)
        app_dec = np.zeros(Nblts, dtype=float)

        # Use the per-blt dict that we constructed above to populate the various
        # metadata arrays that we need for constructing the UVData object.
        for blt_key in blt_temp_dict.keys():
            temp_dict = blt_temp_dict[blt_key]
            integration_time[blt_key] = temp_dict["rinteg"]
            # TODO: Using MIR V3 convention for lst, will need to make it V2 compatible.
            lst_array[blt_key] = temp_dict["lst"] * (np.pi / 12.0)  # Hours -> rad
            mjd_array[blt_key] = temp_dict["mjd"]
            ant_1_array[blt_key] = temp_dict["iant1"]
            ant_2_array[blt_key] = temp_dict["iant2"]
            uvw_array[blt_key] = np.array(
                [temp_dict["u"], temp_dict["v"], temp_dict["w"]]
            )
            phase_center_id_array[blt_key] = temp_dict["isource"]
            app_ra[blt_key] = temp_dict["ara"]
            app_dec[blt_key] = temp_dict["adec"]

        # Finally, assign arrays to attributes
        self.ant_1_array = ant_1_array
        self.ant_2_array = ant_2_array
        self.baseline_array = self.antnums_to_baseline(
            self.ant_1_array, self.ant_2_array, attempt256=False
        )
        self.integration_time = integration_time
        self.lst_array = lst_array
        self.time_array = Time(mjd_array, scale="tt", format="mjd").utc.jd

        self.polarization_array = polarization_array
        self.flex_spw_polarization_array = flex_pol
        self.spw_array = np.array(spw_array, dtype=int)
        self.telescope_name = "SMA"

        # Need to flip the sign convention here on uvw, since we use a1-a2 versus the
        # standard a2-a1 that uvdata expects
        self.uvw_array = (-1.0) * uvw_array

        self.vis_units = "Jy"

        isource = np.unique(mir_data.in_data["isource"])
        for sou_id in isource:
            source_mask = mir_data.in_data["isource"] == sou_id
            source_ra = mir_data.in_data["rar"][source_mask].astype(float)
            source_dec = mir_data.in_data["decr"][source_mask].astype(float)
            source_epoch = np.mean(mir_data.in_data["epoch"][source_mask]).astype(float)
            source_name = mir_data.codes_data["source"][sou_id]
            if source_epoch != 2000.0:
                # When fed a non-J2000 coordinate, we want to convert that so that it
                # can easily be written into CASA MS format. In this case, we take the
                # median apparent position and translate that to ICRS
                time_arr = Time(
                    mir_data.in_data["mjd"][source_mask], scale="tt", format="mjd"
                ).utc.jd
                source_ra, source_dec = uvutils.transform_app_to_icrs(
                    time_arr,
                    mir_data.in_data["ara"][source_mask],
                    mir_data.in_data["adec"][source_mask],
                    self.telescope_location_lat_lon_alt,
                )
            self._add_phase_center(
                source_name,
                cat_type="sidereal",
                cat_lon=np.median(source_ra),
                cat_lat=np.median(source_dec),
                cat_epoch=None if (source_epoch != 2000.0) else source_epoch,
                cat_frame="icrs" if (source_epoch != 2000.0) else "fk5",
                info_source="file",
                cat_id=int(sou_id),
            )

            # See if the ra/dec positions change by more than an arcmin, and if so,
            # raise a warning to the user since this isn't common.
            dist_check = angular_separation(
                source_ra[0], source_dec[0], source_ra, source_dec
            )

            if any(dist_check > ((np.pi / 180.0) / 60)):
                warnings.warn(
                    "Position for %s changes by more than an arcminute." % source_name
                )

        # Regenerate the sou_id_array that's native to MIR into a zero-indexed per-blt
        # entry for UVData, then grab ra/dec/position data.
        self.phase_center_id_array = phase_center_id_array
        for val in np.unique(self.phase_center_id_array):
            assert val in self.phase_center_catalog.keys()

        # Fill in the apparent coord calculations
        self.phase_center_app_ra = app_ra
        self.phase_center_app_dec = app_dec

        # For MIR, uvw coords are always calculated in the "apparent" position. We can
        # adjust this by calculating the position angle with our preferred coordinate
        # frame (ICRS) and applying the rotation below (via `calc_uvw`).
        self._set_app_coords_helper(pa_only=True)

        self.uvw_array = uvutils.calc_uvw(
            uvw_array=self.uvw_array,
            old_frame_pa=0.0,
            frame_pa=self.phase_center_frame_pa,
            use_ant_pos=False,
        )

        self.antenna_diameters = np.zeros(self.Nants_telescope) + 6.0
        self.blt_order = ("time", "baseline")

        # set filename attribute
        basename = mir_data.filepath.rstrip("/")
        self.filename = [os.path.basename(basename)]
        self._filename.form = (1,)

        # We call transpose here since the arrays above have shape (Nblts, Npols,
        # Nfreqs), and we need them in (Nblts, Nfreqs, Npols) to match what UVData
        # expects.
        self.data_array = np.transpose(self.data_array, (0, 2, 1))
        self.flag_array = np.transpose(self.flag_array, (0, 2, 1))
        self.nsample_array = np.transpose(self.nsample_array, (0, 2, 1))

    def write_mir(self, filename):
        """
        Write data to an SMA MIR dataset on disk (not yet implemented).

        Parameters
        ----------
        filename : str
            The path to the folder on disk to write data to.
        """
        raise NotImplementedError