We are hiring ! See our job offers.
https://github.com/RadioAstronomySoftwareGroup/pyuvdata
Raw File
Tip revision: 37c5370106db1465372a14b225e90fee12e0fe1b authored by pyxieloustar on 31 January 2022, 21:11:03 UTC
fix test messages
Tip revision: 37c5370
test_aipy_extracts.py
# -*- mode: python; coding: utf-8 -*-
# Copyright (c) 2018 Radio Astronomy Software Group
# Licensed under the 2-clause BSD License

"""Tests for aipy_extracts

"""
import os
import shutil
import pytest
import numpy as np

from pyuvdata.data import DATA_PATH


aipy_extracts = pytest.importorskip("pyuvdata.uvdata.aipy_extracts")


def test_bl2ij():
    """Test bl2ij function"""
    # small baseline number
    bl = 258
    assert aipy_extracts.bl2ij(bl)[0] == 0
    assert aipy_extracts.bl2ij(bl)[1] == 1

    # large baseline number
    bl = 67587
    assert aipy_extracts.bl2ij(bl)[0] == 0
    assert aipy_extracts.bl2ij(bl)[1] == 2
    return


def test_ij2bl():
    """Test ij2bl function"""
    # test < 256 antennas
    i = 1
    j = 2
    assert aipy_extracts.ij2bl(i, j) == 515

    # test > 256 antennas
    i = 2
    j = 257
    assert aipy_extracts.ij2bl(i, j) == 71938

    # test case where i > j
    i = 257
    j = 2
    assert aipy_extracts.ij2bl(i, j) == 71938
    return


def test_parse_ants():
    """Test parsing ant strings to tuples"""
    nants = 4
    cases = {
        "all": [],
        "auto": [("auto", 1)],
        "cross": [("auto", 0)],
        "-auto": [("auto", 0)],
        "-cross": [("auto", 1)],
        "all,cross": [("auto", 0)],  # Masks are and'd, should be equal to just cross
        "0_1": [(aipy_extracts.ij2bl(0, 1), 1)],
        "0_1,1_2": [(aipy_extracts.ij2bl(0, 1), 1), (aipy_extracts.ij2bl(1, 2), 1)],
        "0x_1x": [(aipy_extracts.ij2bl(0, 1), 1, "xx")],
        "0_1x": [
            (aipy_extracts.ij2bl(0, 1), 1, "xx"),
            (aipy_extracts.ij2bl(0, 1), 1, "yx"),
        ],
        "0y_1": [
            (aipy_extracts.ij2bl(0, 1), 1, "yx"),
            (aipy_extracts.ij2bl(0, 1), 1, "yy"),
        ],
        "(0x,0y)_1x": [
            (aipy_extracts.ij2bl(0, 1), 1, "xx"),
            (aipy_extracts.ij2bl(0, 1), 1, "yx"),
        ],
        "(0,1)_2": [(aipy_extracts.ij2bl(0, 2), 1), (aipy_extracts.ij2bl(1, 2), 1)],
        "0_(1,2)": [(aipy_extracts.ij2bl(0, 1), 1), (aipy_extracts.ij2bl(0, 2), 1)],
        "(0,1)_(2,3)": [
            (aipy_extracts.ij2bl(0, 2), 1),
            (aipy_extracts.ij2bl(0, 3), 1),
            (aipy_extracts.ij2bl(1, 2), 1),
            (aipy_extracts.ij2bl(1, 3), 1),
        ],
        "0_(1,-2)": [(aipy_extracts.ij2bl(0, 1), 1), (aipy_extracts.ij2bl(0, 2), 0)],
        "(-0,1)_(2,-3)": [
            (aipy_extracts.ij2bl(0, 2), 0),
            (aipy_extracts.ij2bl(0, 3), 0),
            (aipy_extracts.ij2bl(1, 2), 1),
            (aipy_extracts.ij2bl(1, 3), 0),
        ],
        "0,1,all": [],
    }
    for i in range(nants):
        cases[str(i)] = [(aipy_extracts.ij2bl(x, i), 1) for x in range(nants)]
        cases["-" + str(i)] = [(aipy_extracts.ij2bl(x, i), 0) for x in range(nants)]
    # inelegantly paste on the new pol parsing flag on the above tests
    # XXX really should add some new tests for the new pol parsing
    for k in cases:
        cases[k] = [(v + (-1,))[:3] for v in cases[k]]
    for ant_str in cases:
        assert aipy_extracts.parse_ants(ant_str, nants) == cases[ant_str]

    # check that malformed antstr raises and error
    pytest.raises(ValueError, aipy_extracts.parse_ants, "(0_1)_2", nants)
    return


def test_uv_wrhd(tmp_path):
    """Test _wrdh method on UV object"""
    test_file = str(tmp_path / "miriad_test.uv")
    uv = aipy_extracts.UV(test_file, status="new", corrmode="r")

    # test writing freqs
    freqs = [3, 1, 0.1, 0.2, 2, 0.2, 0.3, 3, 0.3, 0.4]
    uv._wrhd("freqs", freqs)

    # test writing other values
    uv._wrhd("nchan0", 1024)

    # test that we wrote something
    del uv
    assert os.path.isdir(test_file)

    # clean up
    shutil.rmtree(test_file)
    return


def test_uv_wrhd_special(tmp_path):
    """Test _wrhd_special method on UV object"""
    test_file = str(tmp_path / "miriad_test.uv")
    uv = aipy_extracts.UV(test_file, status="new", corrmode="r")
    freqs = [3, 1, 0.1, 0.2, 2, 0.2, 0.3, 3, 0.3, 0.4]
    uv._wrhd_special("freqs", freqs)

    # check that we wrote something to disk
    assert os.path.isdir(test_file)

    # check that anything besides 'freqs' raises an error
    pytest.raises(ValueError, uv._wrhd_special, "foo", 12)

    # clean up after ourselves
    del uv
    shutil.rmtree(test_file)
    return


def test_uv_rdhd_special(tmp_path):
    """Test _rdhd_special method on UV object"""
    infile = os.path.join(DATA_PATH, "zen.2456865.60537.xy.uvcRREAA")
    test_file = str(tmp_path / "miriad_test.uv")
    if os.path.exists(test_file):
        shutil.rmtree(test_file)
    # make a new file using an old one as a template
    uv1 = aipy_extracts.UV(infile)
    uv2 = aipy_extracts.UV(test_file, status="new", corrmode="r")
    uv2.init_from_uv(uv1)

    # define freqs to write
    freqs = [3, 1, 0.1, 0.2, 2, 0.2, 0.3, 3, 0.3, 0.4]
    uv2._wrhd_special("freqs", freqs)

    # add a single record; otherwise, opening the file fails
    preamble, data = uv1.read()
    uv2.write(preamble, data)
    del uv1
    del uv2

    # open a new file and check that freqs match the written ones
    uv3 = aipy_extracts.UV(test_file)
    freqs2 = uv3._rdhd_special("freqs")
    assert freqs == freqs2

    # rdhr should correctly handle "special" cases as well -- verify that it does so
    freqs2 = uv3._rdhd("freqs")
    assert freqs == freqs2

    # check that anything besides 'freqs' raises an error
    pytest.raises(ValueError, uv3._rdhd_special, "foo")

    # cleean up after ourselves
    shutil.rmtree(test_file)
    return


@pytest.mark.parametrize("raw", [False, True])
@pytest.mark.parametrize("insert_blank", [False, True])
def test_pipe(tmp_path, raw, insert_blank):
    """Test pipe method on UV object"""
    infile = os.path.join(DATA_PATH, "zen.2456865.60537.xy.uvcRREAA")
    test_file = os.path.join(tmp_path, "miriad_test.uv")

    aipy_uv = aipy_extracts.UV(infile)
    aipy_uv2 = aipy_extracts.UV(test_file, status="new")
    aipy_uv2.init_from_uv(aipy_uv)
    if insert_blank:
        # Insert a blank record, which _should_ get skipped on write
        # such that pipe will still produce an identical file
        aipy_uv2.write((np.zeros(3), 0.0, (0, 0)), None)
    aipy_uv2.pipe(aipy_uv, raw=raw)
    aipy_uv2.close()
    aipy_uv2 = aipy_extracts.UV(test_file)
    aipy_uv.rewind()
    with pytest.raises(OSError, match="No data read"):
        while True:
            rec_orig = aipy_uv.read()
            rec_new = aipy_uv2.read()
            assert np.all(rec_new[0][0] == rec_orig[0][0])
            assert rec_new[0][1] == rec_orig[0][1]
            assert rec_new[0][2] == rec_orig[0][2]
            assert np.ma.allequal(rec_orig[1], rec_new[1])
    aipy_uv.close()
    aipy_uv2.close()


@pytest.mark.parametrize("exclude", [[], "telescop", "history", "dummy"])
def test_init_from_uv_exclude(tmp_path, exclude):
    infile = os.path.join(DATA_PATH, "zen.2456865.60537.xy.uvcRREAA")
    test_file = os.path.join(tmp_path, "miriad_test.uv")

    aipy_uv = aipy_extracts.UV(infile)
    ((uvw, time, (idx, jdx)), data) = aipy_uv.read()

    aipy_uv2 = aipy_extracts.UV(test_file, status="new")
    # Manually add the variable to the object to make sure the exclude works
    # even if the value has already been initiated. Check to make sure we aren't
    # adding an empty list up front.
    if exclude:
        aipy_uv2.add_var(exclude, "d")
    aipy_uv2.init_from_uv(aipy_uv, exclude=exclude)
    aipy_uv2.write((uvw, time, (idx, jdx)), data)
    aipy_uv2.close()

    aipy_uv2 = aipy_extracts.UV(test_file)

    # Again check that exclude isn't just an empty list, and then make sure that
    # the excluded variable isn't actually in the data set.
    if exclude:
        assert exclude not in aipy_uv2.vartable

    for item in aipy_uv.variables():
        if item != exclude:
            assert np.all(aipy_uv[item] == aipy_uv2[item])

    aipy_uv.close()
    aipy_uv2.close()


@pytest.mark.parametrize(
    "var_dict",
    [{}, {"latitud": 0.0}, {"longitu": 0.0}, {"history": "abc"}],
)
def test_init_from_uv_override(tmp_path, var_dict):
    infile = os.path.join(DATA_PATH, "zen.2456865.60537.xy.uvcRREAA")
    test_file = os.path.join(tmp_path, "miriad_test.uv")

    aipy_uv = aipy_extracts.UV(infile)
    ((uvw, time, (idx, jdx)), data) = aipy_uv.read()

    aipy_uv2 = aipy_extracts.UV(test_file, status="new")
    aipy_uv2.init_from_uv(aipy_uv, override=var_dict)
    aipy_uv2.write((uvw, time, (idx, jdx)), data)
    aipy_uv2.close()

    aipy_uv2 = aipy_extracts.UV(test_file)

    for item in aipy_uv.variables():
        if item in var_dict.keys():
            assert aipy_uv2[item] == var_dict[item]
        else:
            assert np.all(aipy_uv[item] == aipy_uv2[item])

    aipy_uv.close()
    aipy_uv2.close()


def test_write_no_flags(tmp_path):
    """
    Verify that if flags are deleted, write will recreate them appropriately
    """
    infile = os.path.join(DATA_PATH, "zen.2456865.60537.xy.uvcRREAA")
    test_file = os.path.join(tmp_path, "miriad_test.uv")

    aipy_uv = aipy_extracts.UV(infile)
    ((uvw, time, (idx, jdx)), data) = aipy_uv.read()

    aipy_uv2 = aipy_extracts.UV(test_file, status="new")
    aipy_uv2.init_from_uv(aipy_uv)

    # If the mask is remove (all set to False), then write write will create
    # a mask based on where the data have been zeroed out (i.e., default MIRIAD
    # behavior).
    data.mask = 0.0
    aipy_uv2.write((uvw, time, (idx, jdx)), data)
    # Clear the mask entirely, which will also set all the flags to zero.
    data = np.ma.array(data.data, fill_value=data.fill_value)
    aipy_uv2.write((uvw, time, (idx, jdx)), data)
    aipy_uv2.close()

    aipy_uv2 = aipy_extracts.UV(test_file)
    (_, data2) = aipy_uv2.read()
    # Check for equality w/ both masks and data
    assert np.all(data.data == data2.data)
    # Next, check that the masks all look correct
    assert np.all(data.mask == data2.mask)
    (_, data2) = aipy_uv2.read()
    assert np.all(data.mask == data2.mask)
    aipy_uv.close()
    aipy_uv2.close()


def test_add_to_header(tmp_path):
    """
    Verify that if flags are deleted, write will create a mask based on where
    the data have been zeroed out (i.e., default MIRIAD behavior).
    """
    infile = os.path.join(DATA_PATH, "zen.2456865.60537.xy.uvcRREAA")
    test_file = os.path.join(tmp_path, "miriad_test.uv")

    # Create a fake entry in the item table so that we can run this test -- the double
    # values means an array of length 2 for aipy_extracts.
    aipy_extracts.itemtable["xyz"] = "dd"

    aipy_uv = aipy_extracts.UV(infile)
    aipy_uv2 = aipy_extracts.UV(test_file, status="new")

    aipy_uv2["xyz"] = (2.0, 2.0)
    aipy_uv2.init_from_uv(aipy_uv)
    aipy_uv2.pipe(aipy_uv)
    aipy_uv2.close()

    aipy_uv2 = aipy_extracts.UV(test_file)
    assert np.all(aipy_uv2["xyz"] == np.array([2.0, 2.0]))
    aipy_uv.close()
    aipy_uv2.close()


@pytest.mark.parametrize(
    "polstr,antstr,ants_exp,nrec_exp",
    [
        [-1, "all", (0, 1, 2, 3, 4, 5), 399],
        [-1, "cross", (0, 1, 2, 3, 4, 5), 285],
        [-1, "auto", (0, 1, 2, 3, 4, 5), 114],
        [-1, "(0,1)_(2,3)", (0, 1, 2, 3), 76],
        [-1, "(0,-1)_(-2,3)", (0, 1, 2, 3, 4, 5), 342],
        [-1, "(0x,1x)_(2y,3y)", (0, 1, 2, 3), 76],
        [-1, "4,5,", (0, 1, 2, 3, 4, 5), 209],
        ["xx", -1, (), 0],
        ["xy", -1, (0, 1, 2, 3, 4, 5), 399],
        ["yx", -1, (), 0],
        ["yy", -1, (), 0],
        ["xy", "(0,1)_(2,3)", (0, 1, 2, 3), 76],
        ["xy", "4,5,", (0, 1, 2, 3, 4, 5), 209],
        ["xy", "0,1,(2)_(3)", (0, 1, 2, 3, 4, 5), 228],
    ],
)
def test_uv_selector(polstr, antstr, ants_exp, nrec_exp):
    infile = os.path.join(DATA_PATH, "zen.2456865.60537.xy.uvcRREAA")
    aipy_uv = aipy_extracts.UV(infile)

    aipy_extracts.uv_selector(aipy_uv, ants=antstr, pol_str=polstr)
    nrec = 0
    with pytest.raises(OSError, match="No data read"):
        while True:
            (_, _, bl_ants), _ = aipy_uv.read()
            nrec += 1
            # Make sure we only have ants we expect
            assert np.all(np.isin(bl_ants, ants_exp))
            if isinstance(polstr, str):
                assert aipy_uv["pol"] == aipy_extracts.str2pol[polstr]

    assert nrec == nrec_exp
    aipy_uv.close()
back to top