https://github.com/RadioAstronomySoftwareGroup/pyuvdata
Raw File
Tip revision: ec0c8b89ac731cdf4a3365f273b2e374376b06f2 authored by Bryna Hazelton on 07 October 2020, 15:24:19 UTC
Update changelog for new release
Tip revision: ec0c8b8
test_mir.py
# -*- mode: python; coding: utf-8 -*-
# Copyright (c) 2020 Radio Astronomy Software Group
# Licensed under the 2-clause BSD License

"""Tests for Mir class.

Performs a series of tests for the Mir class, which inherits from UVData. Note that
there is a separate test module for the MirParser class (mir_parser.py), which is
what is used to read the raw binary data into something that the Mir class can
manipulate into a UVData object.
"""
import os

import pytest
import numpy as np

from ... import UVData
from ...data import DATA_PATH
from ...uvdata.mir import mir_parser


@pytest.fixture
def mir_data_object():
    """Yield a MirParser built from the ``sma_test.mir`` test data set.

    The parser is constructed with ``load_vis``, ``load_raw`` and ``load_auto``
    all set to True.
    """
    parser = mir_parser.MirParser(
        os.path.join(DATA_PATH, "sma_test.mir"),
        load_vis=True,
        load_raw=True,
        load_auto=True,
    )

    yield parser

    # cleanup: drop the local reference once the test has finished
    del parser


@pytest.fixture
def uv_in_uvfits(tmp_path):
    """Yield the objects used by the Mir -> uvfits tests.

    Yields
    ------
    tuple
        ``(uv_in, uv_out, write_file)``: a UVData object read from
        ``sma_test.mir`` with ``pseudo_cont=True``, a freshly constructed UVData
        object, and the path of the uvfits file to write inside ``tmp_path``.
    """
    mir_path = os.path.join(DATA_PATH, "sma_test.mir")
    uvfits_path = os.path.join(tmp_path, "outtest_mir.uvfits")

    # Currently only one source is supported.
    mir_uv = UVData()
    mir_uv.read(mir_path, pseudo_cont=True)
    blank_uv = UVData()

    yield mir_uv, blank_uv, uvfits_path

    # cleanup
    del mir_uv, blank_uv


@pytest.fixture
def uv_in_uvh5(tmp_path):
    """Yield the objects used by the Mir -> uvh5 tests.

    Yields
    ------
    tuple
        ``(uv_in, uv_out, write_file)``: a UVData object read from
        ``sma_test.mir``, a freshly constructed UVData object, and the path of
        the uvh5 file to write inside ``tmp_path``.
    """
    mir_path = os.path.join(DATA_PATH, "sma_test.mir")
    uvh5_path = os.path.join(tmp_path, "outtest_mir.uvh5")

    # Currently only one source is supported.
    mir_uv = UVData()
    mir_uv.read(mir_path)
    blank_uv = UVData()

    yield mir_uv, blank_uv, uvh5_path

    # cleanup
    del mir_uv, blank_uv


@pytest.mark.filterwarnings("ignore:LST values stored in this file are not ")
def test_read_mir_write_uvfits(uv_in_uvfits):
    """
    Mir to uvfits loopback test.

    Write the Mir-derived object out as uvfits, read the file back in and check
    that the two objects are equal once the history difference is accounted for.
    """
    mir_uv, uvfits_uv, testfile = uv_in_uvfits

    mir_uv.write_uvfits(testfile, spoof_nonessential=True)
    uvfits_uv.read_uvfits(testfile)

    # The round-tripped history has to begin with the original history followed
    # by the pyuvdata version stamp.
    history_prefix = mir_uv.history + "  Read/written with pyuvdata version:"
    assert uvfits_uv.history.startswith(history_prefix)

    # Align the histories so that they do not break the equality check below.
    mir_uv.history = uvfits_uv.history
    assert mir_uv == uvfits_uv


@pytest.mark.filterwarnings("ignore:LST values stored ")
def test_read_mir_write_uvh5(uv_in_uvh5):
    """
    Mir to uvh5 loopback test.

    Write the Mir-derived object out as uvh5, read the file back in and check
    that the two objects are equal once the history difference is accounted for.
    """
    mir_uv, uvh5_uv, testfile = uv_in_uvh5

    mir_uv.write_uvh5(testfile)
    uvh5_uv.read_uvh5(testfile)

    # The round-tripped history has to begin with the original history followed
    # by the pyuvdata version stamp.
    history_prefix = mir_uv.history + "  Read/written with pyuvdata version:"
    assert uvh5_uv.history.startswith(history_prefix)

    # test fails because of updated history, so this is our workaround for now.
    mir_uv.history = uvh5_uv.history

    assert mir_uv == uvh5_uv


def test_write_mir(uv_in_uvfits, err_type=NotImplementedError):
    """
    Mir writer test.

    Make sure that an attempt to use the Mir writer raises ``err_type``
    (a 'not implemented' error by default).
    """
    # Only the Mir-derived object from the fixture is needed here.
    mir_uv, _, _ = uv_in_uvfits

    # Check and see if the correct error is raised
    with pytest.raises(err_type):
        mir_uv.write_mir("dummy.mir")


def test_flex_spw_read(tmp_path):
    """
    Mir test for flexible spws.

    Read in a Mir file using flexible spectral windows, all of the same nchan,
    then modify copies of the object one at a time and check that each
    modification triggers the expected error.
    """
    out_path = os.path.join(tmp_path, "dummy.mirtest.uvfits")
    mir_uv = UVData()
    mir_uv.read_mir(os.path.join(DATA_PATH, "sma_test.mir"))

    # Frequency averaging is expected to be refused for this object.
    broken = mir_uv.copy()
    with pytest.raises(NotImplementedError):
        broken.frequency_average(2)

    # Same copy as above: altering the first spw id should fail the
    # contiguity check.
    broken.flex_spw_id_array[0] = 1
    with pytest.raises(ValueError):
        broken._check_flex_spw_contiguous()

    # A single zeroed channel width should be reported as uneven spacing.
    broken = mir_uv.copy()
    broken.channel_width[0] = 0
    with pytest.raises(ValueError, match="The frequencies are not evenly spaced"):
        broken._check_freq_spacing()

    # Zeroing every channel width should hit the other spacing error.
    broken = mir_uv.copy()
    broken.channel_width[:] = 0
    with pytest.raises(ValueError, match="The frequencies are separated by more"):
        broken._check_freq_spacing()

    # Negative frequencies should be rejected by the uvfits writer.
    broken = mir_uv.copy()
    broken.freq_array *= -1
    with pytest.raises(ValueError, match="Frequency values must be > 0 for UVFITS!"):
        broken.write_uvfits(out_path, spoof_nonessential=True)

    # Constant frequencies with zero channel widths should also be rejected.
    broken = mir_uv.copy()
    broken.freq_array[:] = 1
    broken.channel_width *= 0
    with pytest.raises(ValueError, match="Something is wrong, frequency values not"):
        broken.write_uvfits(out_path, spoof_nonessential=True)


def test_multi_nchan_spw_read(tmp_path):
    """
    Mir to uvfits error test for spws of different sizes.

    Read in a Mir file selecting corrchunks 0 through 4, then check that trying
    to write the result out as uvfits raises an IndexError.
    """
    mir_uv = UVData()
    mir_uv.read_mir(
        os.path.join(DATA_PATH, "sma_test.mir"), corrchunk=[0, 1, 2, 3, 4]
    )

    out_path = os.path.join(tmp_path, "dummy.mirtest.uvfits")
    with pytest.raises(IndexError):
        mir_uv.write_uvfits(out_path, spoof_nonessential=True)


def test_read_mir_no_records():
    """
    Mir no-records check

    Make sure that mir correctly handles the case where no matching records are
    found, as well as empty or invalid sideband selections.
    """
    mir_path = os.path.join(DATA_PATH, "sma_test.mir")
    mir_uv = UVData()

    # (selection keywords, expected error message), tried in this order
    bad_selections = (
        ({"isource": -1}, "No valid records matching those selections!"),
        ({"isb": []}, "No valid sidebands selected!"),
        ({"isb": [-156]}, "isb values contain invalid entries"),
    )
    for select_kwargs, message in bad_selections:
        with pytest.raises(IndexError, match=message):
            mir_uv.read_mir(mir_path, **select_kwargs)


def test_mir_auto_read(
    err_type=IndexError, err_msg="Could not determine auto-correlation record size!"
):
    """
    Mir read tester

    Make sure that Mir autocorrelations are read correctly: scanning with a bad
    ``nchunks`` must raise ``err_type``, and the auto data read directly from
    the file must agree with what ``load_data`` stores on the parser.
    """
    mir_path = os.path.join(DATA_PATH, "sma_test.mir")
    parser = mir_parser.MirParser(mir_path)

    # Scanning with nchunks=999 is expected to fail.
    with pytest.raises(err_type, match=err_msg):
        parser.scan_auto_data(mir_path, nchunks=999)

    scanned = parser.scan_auto_data(mir_path)
    assert np.all(scanned["nchunks"] == 8)

    parser.load_data(load_vis=False, load_auto=True)

    # Select the relevant auto records, which should be for spwin 0-3
    direct = parser.read_auto_data(mir_path, scanned)[:, :4, :, :]
    loaded = parser.auto_data

    # Entries agree when they are equal or when both are NaN.
    both_nan = np.isnan(direct) & np.isnan(loaded)
    assert np.all((direct == loaded) | both_nan)
    parser.unload_data()


# Below are a series of checks designed to make sure that the MirParser class is
# able to produce consistent values from an engineering data set (originally stored
# in /data/engineering/mir_data/200724_16:35:14), so that we know we haven't broken
# the ability of the reader to handle the data. Since this file is the basis for
# the above checks, we've put this here rather than in test_mir_parser.py


def test_mir_remember_me_record_lengths(mir_data_object):
    """
    Mir record length checker

    Make sure the test file contains the right number of records.
    """
    # (attribute on the parser, expected number of records)
    expected_lengths = (
        ("ac_read", 2),
        ("bl_read", 4),
        ("codes_read", 99),
        ("eng_read", 2),
        ("in_read", 1),
        ("raw_data", 20),
        ("raw_scale_fac", 20),
        ("sp_read", 20),
        ("vis_data", 20),
        ("we_read", 1),
    )

    # Check to make sure we've got the right number of records everywhere
    for attr_name, n_records in expected_lengths:
        assert len(getattr(mir_data_object, attr_name)) == n_records


def test_mir_remember_me_codes_read(mir_data_object):
    """
    Mir codes_read checker.

    Make sure that certain values in the codes_read file of the test data set match
    what we know to be 'true' at the time of observations.
    """
    codes = mir_data_object.codes_read

    # (record index, position within the record, expected value)
    expected_entries = (
        (0, 0, b"filever"),
        (0, 2, b"3"),
        (90, 0, b"ref_time"),
        (90, 1, 0),
        (90, 2, b"Jul 24, 2020"),
        (90, 3, 0),
        (91, 0, b"ut"),
        (91, 1, 1),
        (91, 2, b"Jul 24 2020  4:34:39.00PM"),
        (91, 3, 0),
        (93, 0, b"source"),
        (93, 2, b"3c84"),
        (97, 0, b"ra"),
        (97, 2, b"03:19:48.15"),
        (98, 0, b"dec"),
        (98, 2, b"+41:30:42.1"),
    )
    for record, position, value in expected_entries:
        assert codes[record][position] == value


def test_mir_remember_me_in_read(mir_data_object):
    """
    Mir in_read checker.

    Make sure that certain values in the in_read file of the test data set match what
    we know to be 'true' at the time of observations, including that spare values are
    stored as zero.
    """
    in_read = mir_data_object.in_read

    # (field name, value every record must hold)
    expected_fields = (
        ("traid", 484),
        ("proid", 484),
        ("inhid", 1),
        ("ints", 1),
        ("souid", 1),
        ("isource", 1),
        ("ivrad", 1),
        ("ira", 1),
        ("idec", 1),
        ("epoch", 2000.0),
        ("tile", 0),
        ("obsflag", 0),
        ("obsmode", 0),
        ("spareshort", 0),
        ("spareint6", 0),
    )

    # Check to make sure that things seem right in in_read
    for field, value in expected_fields:
        assert np.all(in_read[field] == value)

    # The MJD is only compared after rounding to the nearest whole day.
    assert np.all(np.round(in_read["mjd"]) == 59055)


def test_mir_remember_me_bl_read(mir_data_object):
    """
    Mir bl_read checker.

    Make sure that certain values in the bl_read file of the test data set match what
    we know to be 'true' at the time of observations, including that spare values are
    stored as zero.
    """
    bl_read = mir_data_object.bl_read

    # (field name, expected value); scalars must match every record, sequences
    # are compared record by record
    expected_fields = (
        ("blhid", np.arange(1, 5)),
        ("isb", [0, 0, 1, 1]),
        ("ipol", [0, 0, 0, 0]),
        ("ant1rx", [0, 1, 0, 1]),
        ("ant2rx", [0, 1, 0, 1]),
        ("pointing", 0),
        ("irec", [0, 3, 0, 3]),
        ("iant1", 1),
        ("iant2", 4),
        ("iblcd", 2),
        ("spareint1", 0),
        ("spareint2", 0),
        ("spareint3", 0),
        ("spareint4", 0),
        ("spareint5", 0),
        ("spareint6", 0),
        ("sparedbl3", 0.0),
        ("sparedbl4", 0.0),
        ("sparedbl5", 0.0),
        ("sparedbl6", 0.0),
    )

    # Now check bl_read
    for field, value in expected_fields:
        assert np.all(bl_read[field] == value)


def test_mir_remember_me_eng_read(mir_data_object):
    """
    Mir eng_read checker.

    Make sure that certain values in the eng_read file of the test data set match what
    we know to be 'true' at the time of observations.
    """
    eng_read = mir_data_object.eng_read

    # (field name, expected value)
    expected_fields = (
        ("antennaNumber", [1, 4]),
        ("padNumber", [5, 8]),
        ("trackStatus", 1),
        ("commStatus", 1),
        ("inhid", 1),
    )

    # Now check eng_read
    for field, value in expected_fields:
        assert np.all(eng_read[field] == value)


def test_mir_remember_me_ac_read(mir_data_object):
    """
    Mir ac_read checker.

    Make sure that certain values in the autoCorrelations file of the test data set
    match what we know to be 'true' at the time of observations. Two we_read
    fields are checked here as well.
    """
    ac_read = mir_data_object.ac_read
    we_read = mir_data_object.we_read

    # (field name, expected value) for ac_read
    expected_ac = (
        ("inhid", 1),
        ("achid", np.arange(1, 3)),
        ("antenna", [1, 4]),
        ("nchunks", 8),
        ("datasize", 1048596),
    )
    # (field name, expected value) for we_read
    expected_we = (
        ("scanNumber", 1),
        ("flags", 0),
    )

    # Now check ac_read
    for field, value in expected_ac:
        assert np.all(ac_read[field] == value)

    for field, value in expected_we:
        assert np.all(we_read[field] == value)


def test_mir_remember_me_sp_read(mir_data_object):
    """
    Mir sp_read checker.

    Make sure that certain values in the sp_read file of the test data set match what
    we know to be 'true' at the time of observations, including that spare values are
    stored as zero.
    """
    sp_read = mir_data_object.sp_read

    # (field name, expected value); scalars must match every record, sequences
    # are compared record by record. Each field is listed exactly once: the
    # previous version asserted "sphid" and "igq" twice with identical values.
    expected_fields = (
        ("sphid", np.arange(1, 21)),
        ("igq", 0),
        ("ipq", 1),
        ("iband", [0, 1, 2, 3, 4] * 4),
        ("ipstate", 0),
        ("tau0", 0.0),
        ("cabinLO", 0.0),
        ("corrLO1", 0.0),
        ("vradcat", 0.0),
        ("nch", [4, 16384, 16384, 16384, 16384] * 4),
        ("corrblock", [0, 1, 1, 1, 1] * 4),
        ("corrchunk", [0, 1, 2, 3, 4] * 4),
        ("correlator", 1),
        ("spareint2", 0),
        ("spareint3", 0),
        ("spareint4", 0),
        ("spareint5", 0),
        ("spareint6", 0),
        ("sparedbl1", 0.0),
        ("sparedbl2", 0.0),
        ("sparedbl3", 0.0),
        ("sparedbl4", 0.0),
        ("sparedbl5", 0.0),
        ("sparedbl6", 0.0),
    )

    # Now check sp_read
    for field, value in expected_fields:
        assert np.all(sp_read[field] == value)


def test_mir_remember_me_sch_read(mir_data_object):
    """
    Mir sch_read checker.

    Make sure that certain values in the sch_read file of the test data set match what
    we know to be 'true' at the time of observations.
    """
    # Now check sch_read related values. Thanks to a glitch in the data recorder,
    # all of the pseudo-cont values are the same
    scale_factors = mir_data_object.raw_scale_fac[::5]
    assert np.all(scale_factors == [-26] * 4)

    # Every fifth raw record, flattened into one plain list of values.
    raw_values = np.array(mir_data_object.raw_data[::5]).flatten().tolist()
    expected_values = [-4302, -20291, -5261, -21128, -4192, -19634, -4999, -16346] * 4
    assert raw_values == expected_values
back to top