Revision 8b22682704c00bd278c44dae1686f726d261b718 authored by Steven Murray on 09 January 2023, 21:13:32 UTC, committed by Steven Murray on 09 January 2023, 21:13:32 UTC
1 parent e377413
mir.py
# -*- mode: python; coding: utf-8 -*-
# Copyright (c) 2020 Radio Astronomy Software Group
# Licensed under the 2-clause BSD License
"""Class for reading and writing Mir files."""
import os
import warnings
import numpy as np
from astropy.time import Time
from .. import get_telescope
from .. import utils as uvutils
from . import mir_parser
from .uvdata import UVData
__all__ = ["Mir"]
class Mir(UVData):
"""
A class for Mir file objects.
This class defines a Mir-specific subclass of UVData for reading and
writing Mir files. This class should not be interacted with directly;
instead, use the read_mir and write_mir methods on the UVData class.
"""
def read_mir(
self,
filepath,
isource=None,
irec=None,
isb=None,
corrchunk=None,
pseudo_cont=False,
run_check=True,
check_extra=True,
run_check_acceptability=True,
strict_uvw_antpos_check=False,
allow_flex_pol=True,
check_autos=True,
fix_autos=False,
rechunk=None,
):
"""
Read in data from an SMA MIR file, and map to the UVData model.
Note that, with the exception of filepath, the remaining parameters are
used to sub-select a range of data that matches the limitations of the
current instantiation of pyuvdata -- namely 1 source. This restriction could
be dropped in the future as pyuvdata's capabilities grow.
Parameters
----------
filepath : str
The file path to the MIR folder to read from.
isource : list of int
Source code(s) for MIR dataset
irec : array-like of int
Receiver code for MIR dataset
isb : array-like of int
Sideband codes for MIR dataset (0 = LSB, 1 = USB). Default is both sidebands.
corrchunk : array-like of int
Correlator chunk codes for MIR dataset
pseudo_cont : bool
Read in only pseudo-continuum values. Default is False.
run_check : bool
Option to check for the existence and proper shapes of parameters
before writing the file.
check_extra : bool
Option to check optional parameters as well as required ones.
run_check_acceptability : bool
Option to check acceptable range of the values of parameters before
writing the file.
strict_uvw_antpos_check : bool
Option to raise an error rather than a warning if the check that
uvws match antenna positions does not pass.
allow_flex_pol : bool
If only one polarization per spectral window is read (and the polarization
differs from window to window), allow for the `UVData` object to use
"flexible polarization", which compresses the polarization-axis of various
attributes to be of length 1, sets the `flex_spw_polarization_array`
attribute to define the polarization per spectral window. Default is True.
check_autos : bool
Check whether any auto-correlations have non-zero imaginary values in
data_array (which should not mathematically exist). Default is True.
fix_autos : bool
If auto-correlations with imaginary values are found, fix those values so
that they are real-only in data_array. Default is False.
rechunk : int
Number of channels to average over when reading in the dataset. Optional
argument, typically required to be a power of 2.
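
Examples
--------
A minimal usage sketch (the file path below is hypothetical):

>>> from pyuvdata import UVData
>>> uv = UVData()
>>> uv.read_mir("/path/to/sma_data.mir", isb=[0, 1], rechunk=8)  # doctest: +SKIP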
"""
# Use the mir_parser to read in metadata, which can be used to select data.
mir_data = mir_parser.MirParser(filepath)
if isource is not None:
mir_data.select(where=("isource", "eq", isource))
if not np.any(mir_data.in_data.get_mask()):
raise ValueError("No valid sources selected!")
if irec is not None:
mir_data.select(where=("irec", "eq", irec))
if not np.any(mir_data.in_data.get_mask()):
raise ValueError("No valid receivers selected!")
if isb is not None:
mir_data.select(where=("isb", "eq", isb))
if not np.any(mir_data.in_data.get_mask()):
raise ValueError("No valid sidebands selected!")
if corrchunk is not None:
mir_data.select(where=("corrchunk", "eq", corrchunk))
if not np.any(mir_data.in_data.get_mask()):
raise ValueError("No valid spectral bands selected!")
elif not pseudo_cont:
mir_data.select(where=("corrchunk", "ne", 0))
if rechunk is not None:
mir_data.rechunk(rechunk)
self._init_from_mir_parser(mir_data, allow_flex_pol=allow_flex_pol)
if run_check:
self.check(
check_extra=check_extra,
run_check_acceptability=run_check_acceptability,
strict_uvw_antpos_check=strict_uvw_antpos_check,
allow_flip_conj=True,
check_autos=check_autos,
fix_autos=fix_autos,
)
def _init_from_mir_parser(
self, mir_data: mir_parser.MirParser, allow_flex_pol=True
):
"""
Convert a MirParser object into a UVData object.
Parameters
----------
mir_data : MirParser object
MIR dataset to be converted into a UVData object.
allow_flex_pol : bool
If only one polarization per spectral window is read (and the polarization
differs from window to window), allow for the `UVData` object to use
"flexible polarization", which compresses the polarization-axis of various
attributes to be of length 1, sets the `flex_spw_polarization_array`
attribute to define the polarization per spectral window. Default is True.
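
Notes
-----
This method is normally invoked via `read_mir`. A rough sketch of direct use
(the import paths and file path here are illustrative, not guaranteed)::

    from pyuvdata.uvdata.mir import Mir
    from pyuvdata.uvdata.mir_parser import MirParser

    mir_data = MirParser("/path/to/sma_data.mir")
    uv_obj = Mir()
    uv_obj._init_from_mir_parser(mir_data)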
"""
# By default, we assume that MIR datasets are multi-spw. At present, there is
# no advantage to allowing otherwise on read-in, particularly as this setting
# will hopefully become the default for all data sets in the long term.
self._set_flex_spw()
# Create a simple list for broadcasting values stored on a
# per-integration basis in MIR into the per-blt records in UVData.
bl_in_idx = mir_data.in_data._index_query(header_key=mir_data.bl_data["inhid"])
# Create a simple array/list for broadcasting values stored on a per-baseline
# (blhid) basis in MIR into the per-spectral-record (sphid) ordering.
sp_bl_idx = mir_data.bl_data._index_query(header_key=mir_data.sp_data["blhid"])
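# The ant1rx/ant2rx fields record which receiver each antenna used for a given
# baseline record (0 appears to correspond to RxA); the masks below pick out
# the RxA entries, broadcast to per-spectral-record shape.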
ant1_rxa_mask = mir_data.bl_data.get_value("ant1rx", index=sp_bl_idx) == 0
ant2_rxa_mask = mir_data.bl_data.get_value("ant2rx", index=sp_bl_idx) == 0
if len(np.unique(mir_data.bl_data["ipol"])) == 1 and (
len(mir_data.codes_data["pol"]) == (2 * 4)
):
# If only one pol is found, and the polarization dictionary has only four
# codes + four unique index codes, then we actually need to verify this is
# a single pol observation, since the current system has a quirk that it
# marks both X- and Y-pol receivers as the same polarization.
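# Encode the receiver combination per spectral record: 0 = both antennas on
# RxA, 1 = both on RxB, 2 and 3 = the two cross-receiver combinations.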
pol_arr = np.zeros_like(sp_bl_idx)
pol_arr[np.logical_and(ant1_rxa_mask, ant2_rxa_mask)] = 0
pol_arr[np.logical_and(~ant1_rxa_mask, ~ant2_rxa_mask)] = 1
pol_arr[np.logical_and(ant1_rxa_mask, ~ant2_rxa_mask)] = 2
pol_arr[np.logical_and(~ant1_rxa_mask, ant2_rxa_mask)] = 3
else:
# If this has multiple ipol codes, then we don't need to worry about the
# single-code ambiguity.
pol_arr = mir_data.bl_data.get_value("ipol", index=sp_bl_idx)
# Construct an indexing list that we'll use later to figure out what data
# goes where, based on spw, sideband, and pol code.
spdx_list = [
(winid, sbid, polid)
for winid, sbid, polid in zip(
mir_data.sp_data["corrchunk"],
mir_data.bl_data.get_value("isb", index=sp_bl_idx),
pol_arr,
)
]
# Create a dict with the ordering of the pols
pol_dict = {key: idx for idx, key in enumerate(np.unique(pol_arr))}
pol_split_tuning = False
if len(pol_dict) == 2:
# If dual-pol, then we need to check if the tunings are split, because
# the two polarizations will effectively be concat'd across the freq
# axis instead of the pol axis.
rxa_mask = ant1_rxa_mask & ant2_rxa_mask
rxb_mask = ~(ant1_rxa_mask | ant2_rxa_mask)
if np.any(rxa_mask) and np.any(rxb_mask):
# If we have both VV and HH data, check to see that the tunings of the
# two receivers match.
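# gunnLO holds the (Gunn) local-oscillator tuning for each spectral record;
# differing medians between the RxA and RxB groups indicate a split tuning.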
loa_freq = np.median(mir_data.sp_data["gunnLO"][rxa_mask])
lob_freq = np.median(mir_data.sp_data["gunnLO"][rxb_mask])
pol_split_tuning = not np.isclose(loa_freq, lob_freq)
# Map MIR pol code to pyuvdata/AIPS polarization number
pol_code_dict = {}
icode_dict = mir_data.codes_data["pol"]
for code in mir_data.codes_data.get_codes("pol", return_dict=False):
# There are pol modes/codes supported in MIR that are not in AIPS or CASA.
# They are rarely used, so we can skip over translating them in the
# try/except block here (if present in the data, an error will be thrown
# further downstream).
try:
pol_code_dict[icode_dict[code]] = uvutils.POL_STR2NUM_DICT[code.lower()]
except KeyError:
pass
if pol_split_tuning and allow_flex_pol:
# If we have a split tuning, then we can take advantage of the flex-pol
# feature within UVData.
Npols = 1
polarization_array = np.array([0])
else:
# Otherwise, calculate dimensions and arrays for the polarization axis
# like normal.
Npols = len(set(pol_dict.values()))
polarization_array = np.zeros(Npols, dtype=int)
for key in pol_dict.keys():
polarization_array[pol_dict[key]] = pol_code_dict[key]
# Create a list of baseline-time combinations in the data
blt_list = [
(intid, ant1, ant2)
for intid, ant1, ant2 in zip(*mir_data.bl_data[["inhid", "iant1", "iant2"]])
]
# Use the list above to create a dict that maps baseline-time combo
# to position along the blt-axis.
blt_dict = {
blt_tuple: idx
for idx, blt_tuple in enumerate(
sorted(set(blt_list), key=lambda x: (x[0], x[1], x[2]))
)
}
# Match blhid in MIR to blt position in UVData
blhid_blt_order = {
key: blt_dict[value]
for key, value in zip(mir_data.bl_data["blhid"], blt_list)
}
# Record the total number of baseline-times in the data.
Nblts = len(blt_dict)
# Here we need to do a little fancy footwork in order to map spectral windows
# to ranges along the freq-axis, and calculate some values that will eventually
# populate arrays related to this axis (e.g., freq_array, chan_width).
spdx_dict = {}
spw_dict = {}
for spdx in set(spdx_list):
data_mask = np.array([spdx == item for item in spdx_list])
# Grab values, get them into appropriate types
spw_fsky = np.unique(mir_data.sp_data["fsky"][data_mask])
spw_fres = np.unique(mir_data.sp_data["fres"][data_mask])
spw_nchan = np.unique(mir_data.sp_data["nch"][data_mask])
# Make sure that something weird hasn't happened with the metadata (this
# really should never happen, only one value should exist per window).
assert len(spw_fsky) == 1
assert len(spw_fres) == 1
assert len(spw_nchan) == 1
# Get the data in the right units and dtype
spw_fsky = float(spw_fsky * 1e9) # GHz -> Hz
spw_fres = float(spw_fres * 1e6) # MHz -> Hz
spw_nchan = int(spw_nchan)
# We need to do some extra handling here, because a single correlator
# can produce multiple spectral windows (e.g., LSB/USB). The scheme below
# will negate the corr band number if LSB, will set the corr band number to
# 255 if the values arise from the pseudo-wideband values, and will add 512
# if the pols are split-tuned. This scheme, while a little funky, guarantees
# that each unique freq range has its own spectral window number.
spw_id = 255 if (spdx[0] == 0) else spdx[0]
spw_id *= (-1) ** (1 + spdx[1])
spw_id += 512 if (pol_split_tuning and spdx[2] == 1) else 0
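# For example: corr chunk 1 maps to -1 (LSB) or +1 (USB), the pseudo-wideband
# chunk (0) maps to -255/+255, and a split-tuned RxB copy of USB chunk 1 maps
# to 1 + 512 = 513.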
# Populate the channel width array
channel_width = abs(spw_fres) + np.zeros(spw_nchan, dtype=np.float64)
# Populate the spw_id_array
spw_id_array = spw_id + np.zeros(spw_nchan, dtype=np.int64)
# So the freq array here is a little weird, because the current fsky
# refers to the point between the nch/2 and nch/2 + 1 channel in the
# raw (unaveraged) spectrum. This was done for the sake of some
# convenience, at the cost of clarity. In some future format of the
# data, we expect to be able to drop this seemingly random offset.
freq_array = (
spw_fsky
- (np.sign(spw_fres) * 139648.4375)
+ (spw_fres * (np.arange(spw_nchan) + 0.5 - (spw_nchan / 2)))
)
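# (As an aside, 139648.4375 Hz equals 2.288 GHz / 16384, which appears to be
# the width of a single raw, unaveraged SWARM channel.)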
# Note here that if we're using flex-pol, then the polarization axis of
# the UVData object is 1, hence why pol_idx is forced to be 0 here.
spdx_dict[spdx] = {
"spw_id": spw_id,
"pol_idx": (
0 if (pol_split_tuning and allow_flex_pol) else pol_dict[spdx[2]]
),
}
# Stuff this dictionary full of the per-spw metadata
spw_dict[spw_id] = {
"nchan": spw_nchan,
"freqs": spw_fres,
"fsky": spw_fsky,
"channel_width": channel_width,
"spw_id_array": spw_id_array,
"freq_array": freq_array,
"pol_state": pol_code_dict[spdx[2]],
}
spw_array = sorted(spw_dict.keys())
Nspws = len(spw_array)
Nfreqs = 0
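# Assign each spectral window a contiguous slice along the frequency axis,
# concatenating windows in ascending spw_id order.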
for key in spw_array:
spw_dict[key]["ch_slice"] = slice(Nfreqs, Nfreqs + spw_dict[key]["nchan"])
Nfreqs += spw_dict[key]["nchan"]
# Initialize some arrays that we'll be appending to
flex_spw_id_array = np.zeros(Nfreqs, dtype=int)
channel_width = np.zeros(Nfreqs, dtype=np.float64)
freq_array = np.zeros(Nfreqs, dtype=np.float64)
flex_pol = np.zeros(Nspws, dtype=int)
for idx, key in enumerate(spw_array):
flex_spw_id_array[spw_dict[key]["ch_slice"]] = spw_dict[key]["spw_id_array"]
channel_width[spw_dict[key]["ch_slice"]] = spw_dict[key]["channel_width"]
freq_array[spw_dict[key]["ch_slice"]] = spw_dict[key]["freq_array"]
flex_pol[idx] = spw_dict[key]["pol_state"]
if not pol_split_tuning:
flex_pol = None
for key in spdx_dict:
spdx_dict[key]["ch_slice"] = spw_dict[spdx_dict[key]["spw_id"]]["ch_slice"]
# Load up the visibilities into the MirParser object. The array is shaped this
# way since when reading in a MIR file, we scan through the blt-axis the
# slowest and the freq-axis the fastest (i.e., the data is roughly ordered by
# blt, pol, freq).
vis_data = np.zeros((Nblts, Npols, Nfreqs), dtype=np.complex64)
vis_flags = np.ones((Nblts, Npols, Nfreqs), dtype=bool)
vis_weights = np.zeros((Nblts, Npols, Nfreqs), dtype=np.float32)
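# If the cross-correlation data have not already been loaded into the
# MirParser object, load them now (with the system-temperature calibration
# and flags applied).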
if mir_data.vis_data is None:
mir_data.load_data(load_cross=True, apply_tsys=True)
mir_data.apply_flags()
if not np.all(
np.isin(list(mir_data.vis_data.keys()), mir_data.sp_data["sphid"])
):
raise KeyError(
"Mismatch between keys in vis_data and sphid in sp_data, which should "
"not happen. Please file an issue in our GitHub issue log so that we "
"can fix it."
)
for sp_rec, window, vis_rec in zip(
mir_data.sp_data, spdx_list, mir_data.vis_data.values()
):
blt_idx = blhid_blt_order[sp_rec["blhid"]]
ch_slice = spdx_dict[window]["ch_slice"]
pol_idx = spdx_dict[window]["pol_idx"]
vis_data[blt_idx, pol_idx, ch_slice] = np.conj(vis_rec["data"])
vis_flags[blt_idx, pol_idx, ch_slice] = vis_rec["flags"]
# The "wt" column is calculated as (T_DSB ** 2)/(integ time), but we want
# units of Jy**-2. To do this, we just need to multiply by the forward gain
# of the antenna squared and the channel width. The factor of 2**2 (4)
# arises because we need to convert T_DSB**2 to T_SSB**2.
vis_weights[blt_idx, pol_idx, ch_slice] = (
((130.0 * 2.0) ** 2.0) * sp_rec["wt"] * np.abs(sp_rec["fres"])
)
# Drop the data from the MirParser object once we have it loaded up.
mir_data.unload_data()
# Now assign our flexible arrays to the object itself
# TODO: Spw axis to be collapsed in future release
self.freq_array = freq_array[np.newaxis, :]
self.Nfreqs = Nfreqs
self.Nspws = Nspws
self.channel_width = channel_width
self.flex_spw_id_array = flex_spw_id_array
# Derive Nants_data from baselines.
self.Nants_data = len(
np.unique(
np.concatenate((mir_data.bl_data["iant1"], mir_data.bl_data["iant2"]))
)
)
self.Nants_telescope = 8
self.Nbls = int(self.Nants_data * (self.Nants_data - 1) / 2)
self.Nblts = Nblts
self.Npols = Npols
self.Ntimes = len(mir_data.in_data)
self.antenna_names = [
"Ant 1",
"Ant 2",
"Ant 3",
"Ant 4",
"Ant 5",
"Ant 6",
"Ant 7",
"Ant 8",
]
self.antenna_numbers = np.arange(1, 9)
# Prepare the XYZ coordinates of the antenna positions.
antXYZ = np.zeros([self.Nants_telescope, 3])
for idx in range(self.Nants_telescope):
if (idx + 1) in mir_data.antpos_data["antenna"]:
antXYZ[idx] = mir_data.antpos_data["xyz_pos"][
mir_data.antpos_data["antenna"] == (idx + 1)
]
# Get the coordinates from the entry in telescope.py
lat, lon, alt = get_telescope("SMA")._telescope_location.lat_lon_alt()
self.telescope_location_lat_lon_alt = (lat, lon, alt)
# Calculate antenna positions in the ECEF frame. Note that since both
# coordinate systems are in relative units, no subtraction of the telescope's
# geocentric position is required, i.e., we are going from relRotECEF to
# relECEF.
self.antenna_positions = uvutils.ECEF_from_rotECEF(antXYZ, lon)
self.history = "Raw Data"
self.instrument = "SWARM"
# Before proceeding, we want to check that information that's stored on a
# per-spectral record basis (sphid) is consistent across a given baseline-time
# (n.b., blhid changes on sideband and polarization, so multiple blhids
# correspond to a single baseline-time). We need to do this check here since
# pyuvdata handles this metadata on a per-baseline-time basis (and there's no
# good reason it should vary on a per-sphid basis).
sp_to_blt = ["igq", "ipq", "vradcat"]
sp_temp_dict = {}
suppress_warning = False
for sp_rec in mir_data.sp_data:
# Evaluate each spectral record's metadata individually, and create a simple
# dict that contains the metadata we want to check.
temp_dict = {item: sp_rec[item] for item in sp_to_blt}
try:
# If we have already captured metadata about this baseline-time, check
# to make sure that it agrees with the previous entries.
if sp_temp_dict[blhid_blt_order[sp_rec["blhid"]]] != temp_dict:
# If the entry does NOT agree with what was previously given,
# warn the user, and update the record (we only want to warn once
# to avoid flooding the user with warning messages).
if not suppress_warning:
warnings.warn(
"Per-spectral window metadata differ. Defaulting to using "
"the last value in the data set."
)
suppress_warning = True
sp_temp_dict[blhid_blt_order[sp_rec["blhid"]]] = temp_dict
except KeyError:
# If we get a key error, it means this is the first record to be
# evaluated for this given baseline-time, so create a new dict
# entry so that subsequent sphid records can be evaluated.
sp_temp_dict[blhid_blt_order[sp_rec["blhid"]]] = temp_dict
# Next step: we want to check that information that's stored on a per-baseline
# record basis (blhid) is consistent across a given baseline-time (n.b., again,
# blhid changes on sideband and polarization, so multiple blhids correspond to
# a single baseline-time). We need to do this check here since pyuvdata handles
# this metadata on a per-baseline-time basis.
bl_to_blt = ["u", "v", "w", "iant1", "iant2"]
# Note that these are entries that vary per-integration record (inhid), but
# we include them here so that we can easily expand the per-integration data
# to the per-baseline-time length arrays that UVData expects.
in_to_blt = ["lst", "mjd", "ara", "adec", "isource", "rinteg"]
blt_temp_dict = {}
suppress_warning = False
for idx, bl_rec in enumerate(mir_data.bl_data):
# Evaluate each baseline record's metadata individually, and create a simple
# dict that contains the metadata we want to check.
temp_dict = {item: bl_rec[item] for item in bl_to_blt}
in_rec = mir_data.in_data._data[bl_in_idx[idx]]
# Update the dict with the per-inhid data as well.
temp_dict.update({item: in_rec[item] for item in in_to_blt})
# Finally, fold in the originally per-sphid data that we checked above.
temp_dict.update(sp_temp_dict[blhid_blt_order[bl_rec["blhid"]]])
try:
# If we have already captured metadata about this baseline-time, check
# to make sure that it agrees with the previous entries.
if blt_temp_dict[blhid_blt_order[bl_rec["blhid"]]] != temp_dict:
# Again, if we reach this point, only raise a warning one time
if not suppress_warning:
warnings.warn(
"Per-baseline metadata differ. Defaulting to using "
"the last value in the data set."
)
suppress_warning = True
blt_temp_dict[blhid_blt_order[bl_rec["blhid"]]] = temp_dict
except KeyError:
# If we get a key error, it means this is the first record to be
# evaluated for this given baseline-time, so create a new dict
# entry so that subsequent blhid records can be evaluated.
blt_temp_dict[blhid_blt_order[bl_rec["blhid"]]] = temp_dict
# Initialize the metadata arrays.
integration_time = np.zeros(Nblts, dtype=float)
lst_array = np.zeros(Nblts, dtype=float)
mjd_array = np.zeros(Nblts, dtype=float)
ant_1_array = np.zeros(Nblts, dtype=int)
ant_2_array = np.zeros(Nblts, dtype=int)
uvw_array = np.zeros((Nblts, 3), dtype=float)
phase_center_id_array = np.zeros(Nblts, dtype=int)
app_ra = np.zeros(Nblts, dtype=float)
app_dec = np.zeros(Nblts, dtype=float)
# Use the per-blt dict that we constructed above to populate the various
# metadata arrays that we need for constructing the UVData object.
for blt_key in blt_temp_dict.keys():
temp_dict = blt_temp_dict[blt_key]
integration_time[blt_key] = temp_dict["rinteg"]
# TODO: Using MIR V3 convention for lst; will need to make it V2 compatible.
lst_array[blt_key] = temp_dict["lst"] * (np.pi / 12.0) # Hours -> rad
mjd_array[blt_key] = temp_dict["mjd"]
ant_1_array[blt_key] = temp_dict["iant1"]
ant_2_array[blt_key] = temp_dict["iant2"]
uvw_array[blt_key] = np.array(
[temp_dict["u"], temp_dict["v"], temp_dict["w"]]
)
phase_center_id_array[blt_key] = temp_dict["isource"]
app_ra[blt_key] = temp_dict["ara"]
app_dec[blt_key] = temp_dict["adec"]
# Finally, assign arrays to their attributes
self.ant_1_array = ant_1_array
self.ant_2_array = ant_2_array
self.baseline_array = self.antnums_to_baseline(
self.ant_1_array, self.ant_2_array, attempt256=False
)
self.integration_time = integration_time
self.lst_array = lst_array
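# The MIR mjd values are recorded in the TT time scale; convert them to JD in
# UTC, which is what UVData expects for time_array.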
self.time_array = Time(mjd_array, scale="tt", format="mjd").utc.jd
self.polarization_array = polarization_array
self.flex_spw_polarization_array = flex_pol
self.spw_array = np.array(spw_array, dtype=int)
self.telescope_name = "SMA"
# Need to flip the sign convention here on uvw, since we use a1-a2 versus the
# standard a2-a1 that uvdata expects
self.uvw_array = (-1.0) * uvw_array
self.vis_units = "Jy"
isource = np.unique(mir_data.in_data["isource"])
for sou_id in isource:
source_mask = mir_data.in_data["isource"] == sou_id
source_ra = np.mean(mir_data.in_data["rar"][source_mask]).astype(float)
source_dec = np.mean(mir_data.in_data["decr"][source_mask]).astype(float)
source_epoch = np.mean(mir_data.in_data["epoch"][source_mask]).astype(float)
self._add_phase_center(
mir_data.codes_data["source"][sou_id],
cat_type="sidereal",
cat_lon=source_ra,
cat_lat=source_dec,
cat_epoch=source_epoch,
cat_frame="fk5",
info_source="file",
cat_id=int(sou_id),
)
# Regenerate the sou_id_array that's native to MIR into a zero-indexed per-blt
# entry for UVData, then grab ra/dec/position data.
self.phase_center_id_array = phase_center_id_array
for val in np.unique(self.phase_center_id_array):
assert val in self.phase_center_catalog.keys()
# Fill in the apparent coord calculations
self.phase_center_app_ra = app_ra
self.phase_center_app_dec = app_dec
# For MIR, uvws are always calculated in the "apparent" position. We can adjust
# this by calculating the position angle with our preferred coordinate frame
# (ICRS) and applying the rotation below (via `calc_uvw`).
self._set_app_coords_helper(pa_only=True)
self.uvw_array = uvutils.calc_uvw(
uvw_array=self.uvw_array,
old_frame_pa=0.0,
frame_pa=self.phase_center_frame_pa,
use_ant_pos=False,
)
self.antenna_diameters = np.zeros(self.Nants_telescope) + 6.0
self.blt_order = ("time", "baseline")
# set filename attribute
basename = mir_data.filepath.rstrip("/")
self.filename = [os.path.basename(basename)]
self._filename.form = (1,)
# We call transpose here since vis_data above is shape (Nblts, Npols, Nfreqs),
# and we need to get it to (Nblts, 1, Nfreqs, Npols) to match what UVData
# expects.
self.data_array = np.transpose(vis_data, (0, 2, 1))[:, np.newaxis, :, :]
self.flag_array = np.transpose(vis_flags, (0, 2, 1))[:, np.newaxis, :, :]
self.nsample_array = np.transpose(vis_weights, (0, 2, 1))[:, np.newaxis, :, :]
def write_mir(self, filename):
"""
Write out the SMA MIR files.
Parameters
----------
filename : str
The path to the folder on disk to write data to.
"""
raise NotImplementedError