test_ms.py
# -*- mode: python; coding: utf-8 -*-
# Copyright (c) 2018 Radio Astronomy Software Group
# Licensed under the 2-clause BSD License
"""Tests for MS object.
"""
import pytest
import os
import shutil
import numpy as np
from pyuvdata import UVData
from pyuvdata.uvdata.ms import MS
from pyuvdata.data import DATA_PATH
import pyuvdata.tests as uvtest
from ..uvfits import UVFITS
pytest.importorskip("casacore")
@pytest.fixture(scope="session")
def nrao_uv_main():
uvobj = UVData()
testfile = os.path.join(DATA_PATH, "day2_TDEM0003_10s_norx_1src_1spw.ms")
uvobj.read(testfile)
yield uvobj
del uvobj
@pytest.fixture(scope="function")
def nrao_uv(nrao_uv_main):
"""Make function level NRAO ms object."""
nrao_ms = nrao_uv_main.copy()
yield nrao_ms
# clean up when done
del nrao_ms
return
@pytest.fixture(scope="session")
def mir_uv_main():
mir_uv = UVData()
testfile = os.path.join(DATA_PATH, "sma_test.mir")
mir_uv.read(testfile)
yield mir_uv
@pytest.fixture(scope="function")
def mir_uv(mir_uv_main):
"""Make a function level copy of a MIR data object"""
mir_uv = mir_uv_main.copy()
yield mir_uv
@pytest.mark.filterwarnings("ignore:ITRF coordinate frame detected,")
@pytest.mark.filterwarnings("ignore:UVW orientation appears to be flipped,")
@pytest.mark.filterwarnings("ignore:telescope_location is not set")
def test_cotter_ms():
"""Test reading in an ms made from MWA data with cotter (no dysco compression)"""
uvobj = UVData()
testfile = os.path.join(DATA_PATH, "1102865728_small.ms/")
uvobj.read(testfile)
# check that a select on read works
uvobj2 = UVData()
with uvtest.check_warnings(
UserWarning,
match=[
"Warning: select on read keyword set",
"telescope_location is not set. Using known values for MWA.",
"UVW orientation appears to be flipped,",
],
):
uvobj2.read(testfile, freq_chans=np.arange(2))
uvobj.select(freq_chans=np.arange(2))
assert uvobj == uvobj2
del uvobj
@pytest.mark.filterwarnings("ignore:ITRF coordinate frame detected,")
@pytest.mark.filterwarnings("ignore:The uvw_array does not match the expected values")
def test_read_nrao_loopback(tmp_path, nrao_uv):
"""Test reading in a CASA tutorial ms file and looping it through write_ms."""
uvobj = nrao_uv
expected_extra_keywords = ["DATA_COL", "observer"]
assert sorted(expected_extra_keywords) == sorted(uvobj.extra_keywords.keys())
testfile = os.path.join(tmp_path, "ms_testfile.ms")
uvobj.write_ms(testfile)
uvobj2 = UVData()
uvobj2.read_ms(testfile)
# also update filenames
assert uvobj.filename == ["day2_TDEM0003_10s_norx_1src_1spw.ms"]
assert uvobj2.filename == ["ms_testfile.ms"]
uvobj.filename = uvobj2.filename
assert uvobj == uvobj2
@pytest.mark.filterwarnings("ignore:ITRF coordinate frame detected,")
@pytest.mark.filterwarnings("ignore:The uvw_array does not match the expected values")
def test_read_lwa(tmp_path):
"""Test reading in an LWA ms file."""
uvobj = UVData()
testfile = os.path.join(DATA_PATH, "lwasv_cor_58342_05_00_14.ms.tar.gz")
expected_extra_keywords = ["DATA_COL", "observer"]
import tarfile
with tarfile.open(testfile) as tf:
new_filename = os.path.join(tmp_path, tf.getnames()[0])
tf.extractall(path=tmp_path)
uvobj.read(new_filename, file_type="ms")
assert sorted(expected_extra_keywords) == sorted(uvobj.extra_keywords.keys())
assert uvobj.history == uvobj.pyuvdata_version_str
# delete the untarred folder
shutil.rmtree(new_filename)
@pytest.mark.filterwarnings("ignore:The uvw_array does not match the expected values")
@pytest.mark.filterwarnings("ignore:telescope_location is not set")
def test_no_spw():
"""Test reading in a PAPER ms converted by CASA from a uvfits with no spw axis."""
uvobj = UVData()
testfile_no_spw = os.path.join(DATA_PATH, "zen.2456865.60537.xy.uvcRREAAM.ms")
uvobj.read(testfile_no_spw)
del uvobj
@pytest.mark.filterwarnings("ignore:The uvw_array does not match the expected values")
def test_spwsupported():
"""Test reading in an ms file with multiple spws."""
uvobj = UVData()
testfile = os.path.join(DATA_PATH, "day2_TDEM0003_10s_norx_1scan.ms")
uvobj.read(testfile)
assert uvobj.Nspws == 2
@pytest.mark.filterwarnings("ignore:Coordinate reference frame not detected,")
@pytest.mark.filterwarnings("ignore:UVW orientation appears to be flipped,")
@pytest.mark.filterwarnings("ignore:telescope_location is not set")
def test_extra_pol_setup(tmp_path):
"""Test reading in an ms file with extra polarization setups (not used in data)."""
uvobj = UVData()
testfile = os.path.join(
DATA_PATH, "X5707_1spw_1scan_10chan_1time_1bl_noatm.ms.tar.gz"
)
import tarfile
with tarfile.open(testfile) as tf:
new_filename = os.path.join(tmp_path, tf.getnames()[0])
tf.extractall(path=tmp_path)
uvobj.read(new_filename, file_type="ms")
# delete the untarred folder
shutil.rmtree(new_filename)
@pytest.mark.filterwarnings("ignore:Telescope EVLA is not in known_telescopes.")
@pytest.mark.filterwarnings("ignore:The uvw_array does not match the expected values")
def test_read_ms_read_uvfits(nrao_uv, casa_uvfits):
"""
Test that a uvdata object instantiated from an ms file created with CASA's
importuvfits is equal to a uvdata object instantiated from the original
uvfits file (tests equivalence with importuvfits in uvdata up to issues around the
direction of the uvw array).
Since the history is missing from the ms file, this test sets both uvdata
histories to identical empty strings before comparing them.
"""
ms_uv = nrao_uv.copy()
uvfits_uv = casa_uvfits.copy()
# set histories to identical blank strings since we do not expect
# them to be the same anyways.
ms_uv.history = ""
uvfits_uv.history = ""
# the objects won't be equal because uvfits adds some optional parameters
# and the ms sets default antenna diameters even though the uvfits file
# doesn't have them
assert uvfits_uv != ms_uv
uvfits_uv.integration_time = ms_uv.integration_time
# they are equal if only required parameters are checked:
assert uvfits_uv.__eq__(ms_uv, check_extra=False)
# set those parameters to none to check that the rest of the objects match
ms_uv.antenna_diameters = None
for p in uvfits_uv.extra():
fits_param = getattr(uvfits_uv, p)
ms_param = getattr(ms_uv, p)
if fits_param.name in UVFITS.uvfits_required_extra and ms_param.value is None:
fits_param.value = None
setattr(uvfits_uv, p, fits_param)
# extra keywords are also different, set both to empty dicts
uvfits_uv.extra_keywords = {}
ms_uv.extra_keywords = {}
# also update filenames
assert uvfits_uv.filename == ["day2_TDEM0003_10s_norx_1src_1spw.uvfits"]
assert ms_uv.filename == ["day2_TDEM0003_10s_norx_1src_1spw.ms"]
uvfits_uv.filename = ms_uv.filename
assert uvfits_uv == ms_uv
@pytest.mark.filterwarnings("ignore:Telescope EVLA is not in known_telescopes.")
@pytest.mark.filterwarnings("ignore:The uvw_array does not match the expected values")
def test_read_ms_write_uvfits(nrao_uv, tmp_path):
"""
read ms, write uvfits test.
Read in ms file, write out as uvfits, read back in and check for
object equality.
"""
ms_uv = nrao_uv
uvfits_uv = UVData()
testfile = os.path.join(tmp_path, "outtest.uvfits")
ms_uv.write_uvfits(testfile, spoof_nonessential=True)
uvfits_uv.read(testfile)
# make sure filenames are what we expect
assert uvfits_uv.filename == ["outtest.uvfits"]
assert ms_uv.filename == ["day2_TDEM0003_10s_norx_1src_1spw.ms"]
uvfits_uv.filename = ms_uv.filename
assert uvfits_uv == ms_uv
del ms_uv
del uvfits_uv
@pytest.mark.filterwarnings("ignore:Telescope EVLA is not in known_telescopes.")
@pytest.mark.filterwarnings("ignore:The uvw_array does not match the expected values")
def test_read_ms_write_miriad(nrao_uv, tmp_path):
"""
read ms, write miriad test.
Read in ms file, write out as miriad, read back in and check for
object equality.
"""
pytest.importorskip("pyuvdata._miriad")
ms_uv = nrao_uv
miriad_uv = UVData()
testfile = os.path.join(tmp_path, "outtest_miriad")
ms_uv.write_miriad(testfile, clobber=True)
miriad_uv.read(testfile)
# make sure filenames are what we expect
assert miriad_uv.filename == ["outtest_miriad"]
assert ms_uv.filename == ["day2_TDEM0003_10s_norx_1src_1spw.ms"]
miriad_uv.filename = ms_uv.filename
assert miriad_uv == ms_uv
@pytest.mark.filterwarnings("ignore:Telescope EVLA is not in known_telescopes.")
@pytest.mark.filterwarnings("ignore:The uvw_array does not match the expected values")
@pytest.mark.parametrize("axis", [None, "freq"])
def test_multi_files(casa_uvfits, axis):
"""
Reading multiple files at once.
"""
uv_full = casa_uvfits.copy()
uv_multi = UVData()
testfile1 = os.path.join(DATA_PATH, "multi_1.ms")
testfile2 = os.path.join(DATA_PATH, "multi_2.ms")
# It seems that these two files were made using the CASA importuvfits task, but the
# history tables are missing, so we can't infer that from the history. This means
# that the uvws are not flipped/data is not conjugated as they should be. Fix that.
filesread = [testfile1, testfile2]
# test once as list and once as an array
if axis is None:
filesread = np.array(filesread)
uv_multi.read(filesread, axis=axis)
# histories are different because of combining along freq. axis
# replace the history
uv_multi.history = uv_full.history
# the objects won't be equal because uvfits adds some optional parameters
# and the ms sets default antenna diameters even though the uvfits file
# doesn't have them
assert uv_multi != uv_full
# they are equal if only required parameters are checked:
assert uv_multi.__eq__(uv_full, check_extra=False)
# set those parameters to none to check that the rest of the objects match
uv_multi.antenna_diameters = None
for p in uv_full.extra():
fits_param = getattr(uv_full, p)
ms_param = getattr(uv_multi, p)
if fits_param.name in UVFITS.uvfits_required_extra and ms_param.value is None:
fits_param.value = None
setattr(uv_full, p, fits_param)
# extra keywords are also different, set both to empty dicts
uv_full.extra_keywords = {}
uv_multi.extra_keywords = {}
# make sure filenames are what we expect
assert set(uv_multi.filename) == {"multi_1.ms", "multi_2.ms"}
assert uv_full.filename == ["day2_TDEM0003_10s_norx_1src_1spw.uvfits"]
uv_multi.filename = uv_full.filename
uv_multi._filename.form = (1,)
assert uv_multi == uv_full
del uv_full
del uv_multi
def test_bad_col_name():
"""
Test error with invalid column name.
"""
uvobj = UVData()
testfile = os.path.join(DATA_PATH, "day2_TDEM0003_10s_norx_1src_1spw.ms")
with pytest.raises(ValueError, match="Invalid data_column value supplied"):
uvobj.read_ms(testfile, data_column="FOO")
@pytest.mark.parametrize("check_warning", [True, False])
@pytest.mark.parametrize(
"frame,errtype,msg",
(
["JNAT", NotImplementedError, "Support for the JNAT frame is not yet"],
["AZEL", NotImplementedError, "Support for the AZEL frame is not yet"],
["GALACTIC", NotImplementedError, "Support for the GALACTIC frame is not yet"],
["ABC", ValueError, "The coordinate frame ABC is not one of the supported"],
["123", ValueError, "The coordinate frame 123 is not one of the supported"],
),
)
def test_parse_casa_frame_ref_errors(check_warning, frame, errtype, msg):
"""
Test errors with matching CASA frames to astropy frame/epochs
"""
uvobj = MS()
if check_warning:
with uvtest.check_warnings(UserWarning, match=msg):
uvobj._parse_casa_frame_ref(frame, raise_error=False)
else:
with pytest.raises(errtype) as cm:
uvobj._parse_casa_frame_ref(frame)
assert str(cm.value).startswith(msg)
@pytest.mark.parametrize("check_warning", [True, False])
@pytest.mark.parametrize(
"frame,epoch,msg",
(
["fk5", 1991.1, "Frame fk5 (epoch 1991.1) does not have a corresponding match"],
["fk4", 1991.1, "Frame fk4 (epoch 1991.1) does not have a corresponding match"],
["icrs", 2021.0, "Frame icrs (epoch 2021) does not have a corresponding"],
),
)
def test_parse_pyuvdata_frame_ref_errors(check_warning, frame, epoch, msg):
"""
Test errors with matching CASA frames to astropy frame/epochs
"""
uvobj = MS()
if check_warning:
with uvtest.check_warnings(UserWarning, match=msg):
uvobj._parse_pyuvdata_frame_ref(frame, epoch, raise_error=False)
else:
with pytest.raises(ValueError) as cm:
uvobj._parse_pyuvdata_frame_ref(frame, epoch)
assert str(cm.value).startswith(msg)
def test_ms_history_lesson(mir_uv, tmp_path):
"""
Test that the MS reader/writer can parse complex history
"""
from casacore import tables
ms_uv = UVData()
testfile = os.path.join(tmp_path, "out_ms_history_lesson.ms")
mir_uv.history = (
"Line 1.\nBegin measurement set history\nAPP_PARAMS;CLI_COMMAND;"
"APPLICATION;MESSAGE;OBJECT_ID;OBSERVATION_ID;ORIGIN;PRIORITY;TIME\n"
"End measurement set history.\nLine 2.\n"
)
mir_uv.write_ms(testfile, clobber=True)
tb_hist = tables.table(testfile + "/HISTORY", readonly=False, ack=False)
tb_hist.addrows()
for col in tb_hist.colnames():
tb_hist.putcell(col, tb_hist.nrows() - 1, tb_hist.getcell(col, 0))
tb_hist.putcell("ORIGIN", 1, "DUMMY")
tb_hist.putcell("APPLICATION", 1, "DUMMY")
tb_hist.putcell("TIME", 1, 0.0)
tb_hist.putcell("MESSAGE", 2, "Line 3.")
tb_hist.close()
ms_uv.read(testfile)
assert ms_uv.history.startswith(
"Line 1.\nBegin measurement set history\nAPP_PARAMS;CLI_COMMAND;APPLICATION;"
"MESSAGE;OBJECT_ID;OBSERVATION_ID;ORIGIN;PRIORITY;TIME\n;;DUMMY;Line 2.;0;-1;"
"DUMMY;INFO;0.0\nEnd measurement set history.\nLine 3.\n Read/written with "
"pyuvdata version:"
)
tb_hist = tables.table(os.path.join(testfile, "HISTORY"), ack=False, readonly=False)
tb_hist.rename(os.path.join(testfile, "FORGOTTEN"))
tb_hist.close()
ms_uv.read(testfile)
assert ms_uv.history.startswith(" Read/written with pyuvdata version:")
def test_ms_no_ref_dir_source(mir_uv, tmp_path):
"""
Test that the MS writer/reader appropriately reads in a single-source data set
as non-multi-phase if it can be, even if the original data set was.
"""
ms_uv = UVData()
testfile = os.path.join(tmp_path, "out_ms_no_ref_dir_source.ms")
mir_uv.phase_center_frame = "fk5"
mir_uv._set_app_coords_helper()
mir_uv.write_ms(testfile, clobber=True)
ms_uv.read(testfile)
assert ms_uv.multi_phase_center is False
ms_uv._set_multi_phase_center(preserve_phase_center_info=True)
ms_uv._update_phase_center_id("3c84", 1)
ms_uv.phase_center_catalog["3c84"]["info_source"] = "file"
assert ms_uv.phase_center_catalog == mir_uv.phase_center_catalog
def test_ms_multi_spw_data_variation(mir_uv, tmp_path):
"""
Test that the MS writer/reader appropriately reads in a single-source data set
as non-multi-phase if it can be, even if the original data set was.
"""
from casacore import tables
ms_uv = UVData()
testfile = os.path.join(tmp_path, "out_ms_multi_spw_data_variation.ms")
mir_uv.write_ms(testfile, clobber=True)
tb_main = tables.table(testfile, readonly=False, ack=False)
tb_main.putcol("EXPOSURE", np.arange(mir_uv.Nblts * mir_uv.Nspws) + 1.0)
tb_main.close()
with pytest.raises(ValueError) as cm:
ms_uv.read_ms(testfile)
assert str(cm.value).startswith("Column EXPOSURE appears to vary on between")
with uvtest.check_warnings(
UserWarning, match="Column EXPOSURE appears to vary on between windows, ",
):
ms_uv.read_ms(testfile, raise_error=False)
# Check that the values do indeed match the first entry in the catalog
assert np.all(ms_uv.integration_time == np.array([1.0]))
@pytest.mark.filterwarnings("ignore:The uvw_array does not match the expected values")
def test_ms_phasing(mir_uv, tmp_path):
"""
Test that the MS writer can appropriately handle unphased data sets.
"""
ms_uv = UVData()
testfile = os.path.join(tmp_path, "out_ms_phasing.ms")
mir_uv.unphase_to_drift()
with pytest.raises(ValueError) as cm:
mir_uv.write_ms(testfile, clobber=True)
assert str(cm.value).startswith("The data are in drift mode.")
mir_uv.write_ms(testfile, clobber=True, force_phase=True)
ms_uv.read(testfile)
assert np.allclose(ms_uv.phase_center_app_ra, ms_uv.lst_array)
assert np.allclose(
ms_uv.phase_center_app_dec, ms_uv.telescope_location_lat_lon_alt[0]
)
def test_ms_single_chan(mir_uv, tmp_path):
"""
Make sure that single channel writing/reading work as expected
"""
ms_uv = UVData()
testfile = os.path.join(tmp_path, "out_ms_single_chan.ms")
mir_uv.select(freq_chans=0)
mir_uv.write_ms(testfile, clobber=True)
mir_uv.set_lsts_from_time_array()
mir_uv._set_app_coords_helper()
with pytest.raises(ValueError) as cm:
ms_uv.read_ms(testfile)
assert str(cm.value).startswith("No valid data available in the MS file.")
ms_uv.read_ms(testfile, ignore_single_chan=False, read_weights=False)
# Easiest way to check that everything worked is to just check for equality, but
# the MS file is single-spw, single-field, so we have a few things we need to fix
# First, make the date multi-phase-ctr
ms_uv._set_multi_phase_center(preserve_phase_center_info=True)
ms_uv._update_phase_center_id("3c84", 1)
# Next, turn on flex-spw
ms_uv._set_flex_spw()
ms_uv.channel_width = np.array([ms_uv.channel_width])
ms_uv.flex_spw_id_array = ms_uv.spw_array.copy()
# Finally, take care of the odds and ends
ms_uv.extra_keywords = {}
ms_uv.history = mir_uv.history
ms_uv.filename = mir_uv.filename
ms_uv.instrument = mir_uv.instrument
ms_uv.reorder_blts()
assert ms_uv == mir_uv
def test_ms_extra_data_descrip(mir_uv, tmp_path):
"""
Make sure that data sets can be read even if the main table doesn't have data
for a particular listed spectral window in the DATA_DESCRIPTION table.
"""
from casacore import tables
ms_uv = UVData()
testfile = os.path.join(tmp_path, "out_ms_extra_data_descrip.ms")
mir_uv.write_ms(testfile, clobber=True)
tb_dd = tables.table(
os.path.join(testfile, "DATA_DESCRIPTION"), ack=False, readonly=False
)
tb_dd.addrows()
for col in tb_dd.colnames():
tb_dd.putcell(col, tb_dd.nrows() - 1, tb_dd.getcell(col, 0))
tb_dd.close()
ms_uv.read_ms(testfile, ignore_single_chan=False, read_weights=False)
# There are some minor differences between the values stored by MIR and that
# calculated by UVData. Since MS format requires these to be calculated on the fly,
# we calculate them here just to verify that everything is looking okay.
mir_uv.set_lsts_from_time_array()
mir_uv._set_app_coords_helper()
# These reorderings just make sure that data from the two formats are lined up
# correctly.
mir_uv.reorder_freqs(spw_order="number")
ms_uv.reorder_blts()
# Fix the remaining differences between the two objects, all of which are expected
mir_uv.instrument = mir_uv.telescope_name
ms_uv.history = mir_uv.history
mir_uv.extra_keywords = ms_uv.extra_keywords
mir_uv.filename = ms_uv.filename = None
# Finally, with all exceptions handled, check for equality.
assert ms_uv == mir_uv
def test_ms_weights(mir_uv, tmp_path):
"""
Test that the MS writer/reader appropriately handles data when the
WEIGHT_SPECTRUM column is missing or bypassed.
"""
from casacore import tables
ms_uv = UVData()
testfile = os.path.join(tmp_path, "out_ms_weights.ms")
mir_uv.nsample_array[0, 0, :, 0] = np.tile(
np.arange(mir_uv.Nfreqs / mir_uv.Nspws), mir_uv.Nspws,
)
mir_uv.write_ms(testfile, clobber=True)
tb_main = tables.table(testfile, readonly=False, ack=False)
tb_main.removecols("WEIGHT_SPECTRUM")
tb_main.close()
ms_uv.read_ms(testfile)
# Check that the values do indeed match expected (median) value
assert np.all(ms_uv.nsample_array == np.median(mir_uv.nsample_array))
ms_uv.read_ms(testfile, read_weights=False)
# Check that the values do indeed match expected (median) value
assert np.all(ms_uv.nsample_array == 1.0)
@pytest.mark.parametrize(
"badcol,badval,errtype,msg",
(
[None, None, IOError, "Thisisnofile.ms not found"],
["DATA_DESC_ID", [1000] * 8, ValueError, "No valid data available in the MS"],
["ARRAY_ID", np.arange(8), ValueError, "This file appears to have multiple"],
["DATA_COL", None, ValueError, "Invalid data_column value supplied."],
["TEL_LOC", None, ValueError, "Telescope frame is not ITRF and telescope is"],
),
)
def test_ms_reader_errs(mir_uv, tmp_path, badcol, badval, errtype, msg):
"""
Test whether the reader throws an appripropriate errors on read.
"""
from casacore import tables
ms_uv = UVData()
testfile = os.path.join(tmp_path, "out_ms_reader_errs.ms")
mir_uv.write_ms(testfile, clobber=True)
data_col = "DATA"
if badcol is None:
testfile = "Thisisnofile.ms"
elif badcol == "DATA_COL":
data_col = badval
elif badcol == "TEL_LOC":
tb_obs = tables.table(
os.path.join(testfile, "OBSERVATION"), ack=False, readonly=False
)
tb_obs.removecols("TELESCOPE_LOCATION")
tb_obs.putcol("TELESCOPE_NAME", "ABC")
tb_obs.close()
tb_ant = tables.table(
os.path.join(testfile, "ANTENNA"), ack=False, readonly=False
)
tb_ant.putcolkeyword("POSITION", "MEASINFO", {"type": "position", "Ref": "ABC"})
tb_ant.close()
else:
tb_main = tables.table(testfile, ack=False, readonly=False)
tb_main.putcol(badcol, badval)
tb_main.close()
with pytest.raises(errtype) as cm:
ms_uv.read_ms(testfile, data_column=data_col)
assert str(cm.value).startswith(msg)