Raw File
# -*- mode: python; coding: utf-8 -*-
# Copyright (c) 2018 Radio Astronomy Software Group
# Licensed under the 2-clause BSD License

"""Tests for MS object.

"""
from __future__ import absolute_import, division, print_function

import pytest
import os
import shutil
import numpy as np

from pyuvdata import UVData
from pyuvdata.data import DATA_PATH
import pyuvdata.tests as uvtest
from pyuvdata.uvfits import UVFITS


@uvtest.skipIf_no_casa
def test_cotter_ms():
    """Test reading in an ms made from MWA data with cotter (no dysco compression)"""
    UV = UVData()
    testfile = os.path.join(DATA_PATH, '1102865728_small.ms')
    UV.read(testfile)

    # check that a select on read works
    UV2 = UVData()
    uvtest.checkWarnings(UV2.read, [testfile], {'freq_chans': np.arange(2)},
                         message='Warning: select on read keyword set')
    UV.select(freq_chans=np.arange(2))
    assert UV == UV2
    del(UV)


@uvtest.skipIf_no_casa
def test_readNRAO():
    """Test reading in a CASA tutorial ms file."""
    UV = UVData()
    testfile = os.path.join(DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.ms')
    expected_extra_keywords = ['DATA_COL']

    UV.read(testfile)
    assert sorted(expected_extra_keywords) == sorted(list(UV.extra_keywords.keys()))


@uvtest.skipIf_no_casa
def test_read_LWA():
    """Test reading in an LWA ms file."""
    UV = UVData()
    testfile = os.path.join(DATA_PATH, 'lwasv_cor_58342_05_00_14.ms.tar.gz')
    expected_extra_keywords = ['DATA_COL']

    import tarfile
    with tarfile.open(testfile) as tf:
        new_filename = os.path.join(DATA_PATH, tf.getnames()[0])
        tf.extractall(path=DATA_PATH)

    UV.read(new_filename, file_type='ms')
    assert sorted(expected_extra_keywords) == sorted(list(UV.extra_keywords.keys()))

    assert UV.history == UV.pyuvdata_version_str

    # delete the untarred folder
    shutil.rmtree(new_filename)


@uvtest.skipIf_no_casa
def test_noSPW():
    """Test reading in a PAPER ms converted by CASA from a uvfits with no spw axis."""
    UV = UVData()
    testfile_no_spw = os.path.join(
        DATA_PATH, 'zen.2456865.60537.xy.uvcRREAAM.ms')
    UV.read(testfile_no_spw)
    del(UV)


@uvtest.skipIf_no_casa
def test_spwnotsupported():
    """Test errors on reading in an ms file with multiple spws."""
    UV = UVData()
    testfile = os.path.join(DATA_PATH, 'day2_TDEM0003_10s_norx_1scan.ms')
    pytest.raises(ValueError, UV.read, testfile)
    del(UV)


@uvtest.skipIf_no_casa
def test_multi_len_spw():
    """Test errors on reading in an ms file with multiple spws with variable lenghth."""
    UV = UVData()
    testfile = os.path.join(DATA_PATH, 'multi_len_spw.ms')
    with pytest.raises(ValueError) as cm:
        UV.read(testfile)
    assert str(cm.value).startswith('Sorry.  Files with more than one spectral')


@uvtest.skipIf_no_casa
def test_extra_pol_setup():
    """Test reading in an ms file with extra polarization setups (not used in data)."""
    UV = UVData()
    testfile = os.path.join(DATA_PATH, 'X5707_1spw_1scan_10chan_1time_1bl_noatm.ms.tar.gz')

    import tarfile
    with tarfile.open(testfile) as tf:
        new_filename = os.path.join(DATA_PATH, tf.getnames()[0])
        tf.extractall(path=DATA_PATH)

    UV.read(new_filename, file_type='ms')

    # delete the untarred folder
    shutil.rmtree(new_filename)


@uvtest.skipIf_no_casa
def test_readMSreadUVFITS():
    """
    Test that a uvdata object instantiated from an ms file created with CASA's
    importuvfits is equal to a uvdata object instantiated from the original
    uvfits file (tests equivalence with importuvfits in uvdata).
    Since the histories are different, this test sets both uvdata
    histories to identical empty strings before comparing them.
    """
    ms_uv = UVData()
    uvfits_uv = UVData()
    ms_file = os.path.join(DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.ms')
    uvfits_file = os.path.join(
        DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
    uvtest.checkWarnings(uvfits_uv.read, [uvfits_file],
                         message='Telescope EVLA is not')
    ms_uv.read(ms_file)
    # set histories to identical blank strings since we do not expect
    # them to be the same anyways.
    ms_uv.history = ""
    uvfits_uv.history = ""

    # the objects won't be equal because uvfits adds some optional parameters
    # and the ms sets default antenna diameters even thoug the uvfits file doesn't have them
    assert uvfits_uv != ms_uv
    # they are equal if only required parameters are checked:
    assert uvfits_uv.__eq__(ms_uv, check_extra=False)

    # set those parameters to none to check that the rest of the objects match
    ms_uv.antenna_diameters = None

    for p in uvfits_uv.extra():
        fits_param = getattr(uvfits_uv, p)
        ms_param = getattr(ms_uv, p)
        if fits_param.name in UVFITS.uvfits_required_extra and ms_param.value is None:
            fits_param.value = None
            setattr(uvfits_uv, p, fits_param)

    # extra keywords are also different, set both to empty dicts
    uvfits_uv.extra_keywords = {}
    ms_uv.extra_keywords = {}

    assert uvfits_uv == ms_uv
    del(ms_uv)
    del(uvfits_uv)


@uvtest.skipIf_no_casa
def test_readMSWriteUVFITS():
    """
    read ms, write uvfits test.
    Read in ms file, write out as uvfits, read back in and check for
    object equality.
    """
    ms_uv = UVData()
    uvfits_uv = UVData()
    ms_file = os.path.join(DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.ms')
    testfile = os.path.join(DATA_PATH, 'test/outtest.uvfits')
    ms_uv.read(ms_file)
    ms_uv.write_uvfits(testfile, spoof_nonessential=True)
    uvtest.checkWarnings(uvfits_uv.read, [testfile],
                         message='Telescope EVLA is not')

    assert uvfits_uv == ms_uv
    del(ms_uv)
    del(uvfits_uv)


@uvtest.skipIf_no_casa
def test_readMSWriteMiriad():
    """
    read ms, write miriad test.
    Read in ms file, write out as miriad, read back in and check for
    object equality.
    """
    ms_uv = UVData()
    miriad_uv = UVData()
    ms_file = os.path.join(DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.ms')
    testfile = os.path.join(DATA_PATH, 'test/outtest_miriad')
    ms_uv.read(ms_file)
    ms_uv.write_miriad(testfile, clobber=True)
    uvtest.checkWarnings(miriad_uv.read, [testfile],
                         message='Telescope EVLA is not')

    assert miriad_uv == ms_uv


@uvtest.skipIf_no_casa
def test_multi_files():
    """
    Reading multiple files at once.
    """
    uv_full = UVData()
    uv_multi = UVData()
    uvfits_file = os.path.join(
        DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
    uvtest.checkWarnings(uv_full.read, [uvfits_file], message='Telescope EVLA is not')
    testfile1 = os.path.join(DATA_PATH, 'multi_1.ms')
    testfile2 = os.path.join(DATA_PATH, 'multi_2.ms')
    uvtest.checkWarnings(
        uv_multi.read_ms, func_args=[np.array([testfile1, testfile2])],
        message=['Please use the generic'], category=DeprecationWarning)
    # Casa scrambles the history parameter. Replace for now.
    uv_multi.history = uv_full.history

    # the objects won't be equal because uvfits adds some optional parameters
    # and the ms sets default antenna diameters even thoug the uvfits file doesn't have them
    assert uv_multi != uv_full
    # they are equal if only required parameters are checked:
    assert uv_multi.__eq__(uv_full, check_extra=False)

    # set those parameters to none to check that the rest of the objects match
    uv_multi.antenna_diameters = None

    for p in uv_full.extra():
        fits_param = getattr(uv_full, p)
        ms_param = getattr(uv_multi, p)
        if fits_param.name in UVFITS.uvfits_required_extra and ms_param.value is None:
            fits_param.value = None
            setattr(uv_full, p, fits_param)

    # extra keywords are also different, set both to empty dicts
    uv_full.extra_keywords = {}
    uv_multi.extra_keywords = {}

    assert uv_multi == uv_full
    del(uv_full)
    del(uv_multi)


@uvtest.skipIf_no_casa
def test_multi_files_axis():
    """
    Reading multiple files at once, setting axis keyword
    """
    uv_full = UVData()
    uv_multi = UVData()
    uvfits_file = os.path.join(
        DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
    uvtest.checkWarnings(uv_full.read, [uvfits_file], message='Telescope EVLA is not')
    testfile1 = os.path.join(DATA_PATH, 'multi_1.ms')
    testfile2 = os.path.join(DATA_PATH, 'multi_2.ms')
    uv_multi.read([testfile1, testfile2], axis='freq')
    # Casa scrambles the history parameter. Replace for now.
    uv_multi.history = uv_full.history

    # the objects won't be equal because uvfits adds some optional parameters
    # and the ms sets default antenna diameters even thoug the uvfits file doesn't have them
    assert uv_multi != uv_full
    # they are equal if only required parameters are checked:
    assert uv_multi.__eq__(uv_full, check_extra=False)

    # set those parameters to none to check that the rest of the objects match
    uv_multi.antenna_diameters = None

    for p in uv_full.extra():
        fits_param = getattr(uv_full, p)
        ms_param = getattr(uv_multi, p)
        if fits_param.name in UVFITS.uvfits_required_extra and ms_param.value is None:
            fits_param.value = None
            setattr(uv_full, p, fits_param)

    # extra keywords are also different, set both to empty dicts
    uv_full.extra_keywords = {}
    uv_multi.extra_keywords = {}

    assert uv_multi == uv_full


@uvtest.skipIf_no_casa
def test_bad_col_name():
    """
    Test error with invalid column name.
    """
    UV = UVData()
    testfile = os.path.join(DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.ms')

    with pytest.raises(ValueError) as cm:
        UV.read_ms(testfile, data_column='FOO')
    assert str(cm.value).startswith('Invalid data_column value supplied')
back to top