https://github.com/RadioAstronomySoftwareGroup/pyuvdata
Tip revision: c04379a692c3771630240a6f5a4d8d10163fb1ed authored by Matthew Kolopanis on 07 June 2022, 21:14:45 UTC
add setup-conda option necessary for caching to work
add setup-conda option necessary for caching to work
Tip revision: c04379a
test_aipy_extracts.py
# -*- mode: python; coding: utf-8 -*-
# Copyright (c) 2018 Radio Astronomy Software Group
# Licensed under the 2-clause BSD License
"""Tests for aipy_extracts
"""
import os
import shutil
import pytest
import numpy as np
from pyuvdata.data import DATA_PATH
aipy_extracts = pytest.importorskip("pyuvdata.uvdata.aipy_extracts")
def test_bl2ij():
    """Check that bl2ij decodes baseline numbers into antenna index pairs."""
    # (baseline number, expected (i, j)) for one small and one large
    # baseline number.
    expected = (
        (258, (0, 1)),
        (67587, (0, 2)),
    )
    for bl_num, (ant_i, ant_j) in expected:
        decoded = aipy_extracts.bl2ij(bl_num)
        assert decoded[0] == ant_i
        assert decoded[1] == ant_j
def test_ij2bl():
    """Check that ij2bl encodes antenna index pairs into baseline numbers."""
    # antenna indices below 256
    assert aipy_extracts.ij2bl(1, 2) == 515

    # an antenna index above 256
    assert aipy_extracts.ij2bl(2, 257) == 71938

    # same pair given with i > j; the expected baseline number is unchanged
    assert aipy_extracts.ij2bl(257, 2) == 71938
def test_parse_ants():
    """Check that parse_ants turns antenna strings into the expected tuples."""
    nants = 4
    ij2bl = aipy_extracts.ij2bl

    # Map each antenna string onto the tuples it should parse to (before the
    # trailing polarization flag is appended further down).
    cases = {
        "all": [],
        "auto": [("auto", 1)],
        "cross": [("auto", 0)],
        "-auto": [("auto", 0)],
        "-cross": [("auto", 1)],
        # Masks are and'd, should be equal to just cross
        "all,cross": [("auto", 0)],
        "0_1": [(ij2bl(0, 1), 1)],
        "0_1,1_2": [(ij2bl(0, 1), 1), (ij2bl(1, 2), 1)],
        "0x_1x": [(ij2bl(0, 1), 1, "xx")],
        "0_1x": [(ij2bl(0, 1), 1, "xx"), (ij2bl(0, 1), 1, "yx")],
        "0y_1": [(ij2bl(0, 1), 1, "yx"), (ij2bl(0, 1), 1, "yy")],
        "(0x,0y)_1x": [(ij2bl(0, 1), 1, "xx"), (ij2bl(0, 1), 1, "yx")],
        "(0,1)_2": [(ij2bl(0, 2), 1), (ij2bl(1, 2), 1)],
        "0_(1,2)": [(ij2bl(0, 1), 1), (ij2bl(0, 2), 1)],
        "(0,1)_(2,3)": [
            (ij2bl(0, 2), 1),
            (ij2bl(0, 3), 1),
            (ij2bl(1, 2), 1),
            (ij2bl(1, 3), 1),
        ],
        "0_(1,-2)": [(ij2bl(0, 1), 1), (ij2bl(0, 2), 0)],
        "(-0,1)_(2,-3)": [
            (ij2bl(0, 2), 0),
            (ij2bl(0, 3), 0),
            (ij2bl(1, 2), 1),
            (ij2bl(1, 3), 0),
        ],
        "0,1,all": [],
    }

    # A bare antenna number (optionally negated) pairs that antenna with
    # every antenna in the array, with the include flag set to 1 (or 0).
    for ant in range(nants):
        ant_key = str(ant)
        cases[ant_key] = [(ij2bl(other, ant), 1) for other in range(nants)]
        cases["-" + ant_key] = [(ij2bl(other, ant), 0) for other in range(nants)]

    # inelegantly paste on the new pol parsing flag on the above tests:
    # 2-tuples gain a trailing -1, 3-tuples are left unchanged by the slice.
    # XXX really should add some new tests for the new pol parsing
    expected = {
        ant_str: [(entry + (-1,))[:3] for entry in entries]
        for ant_str, entries in cases.items()
    }

    for ant_str, parsed in expected.items():
        assert aipy_extracts.parse_ants(ant_str, nants) == parsed

    # check that a malformed antenna string raises an error
    with pytest.raises(ValueError):
        aipy_extracts.parse_ants("(0_1)_2", nants)
def test_uv_wrhd(tmp_path):
    """Test the _wrhd method on a UV object."""
    out_path = str(tmp_path / "miriad_test.uv")
    uv_new = aipy_extracts.UV(out_path, status="new", corrmode="r")

    # write the "freqs" item first, then an ordinary scalar item
    uv_new._wrhd("freqs", [3, 1, 0.1, 0.2, 2, 0.2, 0.3, 3, 0.3, 0.4])
    uv_new._wrhd("nchan0", 1024)

    # drop the object, then confirm that something landed on disk
    del uv_new
    assert os.path.isdir(out_path)

    # clean up
    shutil.rmtree(out_path)
def test_uv_wrhd_special(tmp_path):
    """Test the _wrhd_special method on a UV object."""
    out_path = str(tmp_path / "miriad_test.uv")
    uv_new = aipy_extracts.UV(out_path, status="new", corrmode="r")

    uv_new._wrhd_special("freqs", [3, 1, 0.1, 0.2, 2, 0.2, 0.3, 3, 0.3, 0.4])

    # something should now exist on disk
    assert os.path.isdir(out_path)

    # any item name other than 'freqs' must be rejected
    with pytest.raises(ValueError):
        uv_new._wrhd_special("foo", 12)

    # clean up after ourselves
    del uv_new
    shutil.rmtree(out_path)
def test_uv_rdhd_special(tmp_path):
    """Test the _rdhd_special method on a UV object.

    A new dataset is created from an existing one, a "freqs" item is written
    with ``_wrhd_special``, and the dataset is re-opened to confirm that both
    ``_rdhd_special`` and ``_rdhd`` return the values that were written.
    """
    infile = os.path.join(DATA_PATH, "zen.2456865.60537.xy.uvcRREAA")
    test_file = str(tmp_path / "miriad_test.uv")
    if os.path.exists(test_file):
        shutil.rmtree(test_file)

    # make a new file using an old one as a template
    uv1 = aipy_extracts.UV(infile)
    uv2 = aipy_extracts.UV(test_file, status="new", corrmode="r")
    uv2.init_from_uv(uv1)

    # define freqs to write
    freqs = [3, 1, 0.1, 0.2, 2, 0.2, 0.3, 3, 0.3, 0.4]
    uv2._wrhd_special("freqs", freqs)

    # add a single record; otherwise, opening the file fails
    preamble, data = uv1.read()
    uv2.write(preamble, data)
    del uv1
    del uv2

    # open a new file and check that freqs match the written ones
    uv3 = aipy_extracts.UV(test_file)
    freqs2 = uv3._rdhd_special("freqs")
    assert freqs == freqs2

    # _rdhd should correctly handle "special" cases as well -- verify that it
    # does so
    freqs2 = uv3._rdhd("freqs")
    assert freqs == freqs2

    # check that anything besides 'freqs' raises an error
    with pytest.raises(ValueError):
        uv3._rdhd_special("foo")

    # Release the reader before removing the dataset it refers to, matching
    # how uv1/uv2 are released above and how the sibling tests clean up.
    del uv3
    shutil.rmtree(test_file)
@pytest.mark.parametrize("raw", [False, True])
@pytest.mark.parametrize("insert_blank", [False, True])
def test_pipe(tmp_path, raw, insert_blank):
    """Test that UV.pipe copies the records of one UV object into another."""
    src_path = os.path.join(DATA_PATH, "zen.2456865.60537.xy.uvcRREAA")
    dst_path = os.path.join(tmp_path, "miriad_test.uv")

    uv_src = aipy_extracts.UV(src_path)
    uv_dst = aipy_extracts.UV(dst_path, status="new")
    uv_dst.init_from_uv(uv_src)

    if insert_blank:
        # A blank record _should_ get skipped on write, so the piped file is
        # still expected to come out identical to the source.
        blank_preamble = (np.zeros(3), 0.0, (0, 0))
        uv_dst.write(blank_preamble, None)

    uv_dst.pipe(uv_src, raw=raw)
    uv_dst.close()

    # Re-open the copy and step through both files record by record. The loop
    # has no exit of its own; it ends when a read raises "No data read".
    uv_dst = aipy_extracts.UV(dst_path)
    uv_src.rewind()
    with pytest.raises(OSError, match="No data read"):
        while True:
            (uvw_src, time_src, bl_src), vis_src = uv_src.read()
            (uvw_dst, time_dst, bl_dst), vis_dst = uv_dst.read()
            assert np.all(uvw_dst == uvw_src)
            assert time_dst == time_src
            assert bl_dst == bl_src
            assert np.ma.allequal(vis_src, vis_dst)

    uv_src.close()
    uv_dst.close()
@pytest.mark.parametrize("exclude", [[], "telescop", "history", "dummy"])
def test_init_from_uv_exclude(tmp_path, exclude):
    """Test the ``exclude`` argument of UV.init_from_uv."""
    src_path = os.path.join(DATA_PATH, "zen.2456865.60537.xy.uvcRREAA")
    dst_path = os.path.join(tmp_path, "miriad_test.uv")

    uv_src = aipy_extracts.UV(src_path)
    preamble, vis = uv_src.read()
    uvw, time, (idx, jdx) = preamble

    uv_dst = aipy_extracts.UV(dst_path, status="new")
    # Manually add the variable to the object to make sure the exclude works
    # even if the value has already been initiated. Skip this when exclude is
    # just an empty list.
    if exclude:
        uv_dst.add_var(exclude, "d")

    uv_dst.init_from_uv(uv_src, exclude=exclude)
    uv_dst.write((uvw, time, (idx, jdx)), vis)
    uv_dst.close()

    uv_dst = aipy_extracts.UV(dst_path)

    # For a non-empty exclude, the excluded variable must not appear in the
    # variable table of the written data set.
    if exclude:
        assert exclude not in uv_dst.vartable

    # Every other variable should agree between source and copy.
    for var_name in uv_src.variables():
        if var_name == exclude:
            continue
        assert np.all(uv_src[var_name] == uv_dst[var_name])

    uv_src.close()
    uv_dst.close()
@pytest.mark.parametrize(
    "var_dict",
    [{}, {"latitud": 0.0}, {"longitu": 0.0}, {"history": "abc"}],
)
def test_init_from_uv_override(tmp_path, var_dict):
    """Test the ``override`` argument of UV.init_from_uv."""
    src_path = os.path.join(DATA_PATH, "zen.2456865.60537.xy.uvcRREAA")
    dst_path = os.path.join(tmp_path, "miriad_test.uv")

    uv_src = aipy_extracts.UV(src_path)
    preamble, vis = uv_src.read()
    uvw, time, (idx, jdx) = preamble

    uv_dst = aipy_extracts.UV(dst_path, status="new")
    uv_dst.init_from_uv(uv_src, override=var_dict)
    uv_dst.write((uvw, time, (idx, jdx)), vis)
    uv_dst.close()

    # Re-open the written file: overridden variables must carry the override
    # value, everything else must match the source.
    uv_dst = aipy_extracts.UV(dst_path)
    for var_name in uv_src.variables():
        if var_name not in var_dict:
            assert np.all(uv_src[var_name] == uv_dst[var_name])
        else:
            assert uv_dst[var_name] == var_dict[var_name]

    uv_src.close()
    uv_dst.close()
def test_write_no_flags(tmp_path):
    """
    Verify that if flags are deleted, write will recreate them appropriately
    """
    src_path = os.path.join(DATA_PATH, "zen.2456865.60537.xy.uvcRREAA")
    dst_path = os.path.join(tmp_path, "miriad_test.uv")

    uv_src = aipy_extracts.UV(src_path)
    preamble, vis = uv_src.read()
    uvw, time, (idx, jdx) = preamble

    uv_dst = aipy_extracts.UV(dst_path, status="new")
    uv_dst.init_from_uv(uv_src)

    # First record: the mask is removed (all set to False). Write is then
    # expected to create a mask based on where the data have been zeroed out
    # (i.e., default MIRIAD behavior).
    vis.mask = 0.0
    uv_dst.write((uvw, time, (idx, jdx)), vis)

    # Second record: clear the mask entirely by rebuilding the masked array
    # from the bare data, which will also set all the flags to zero.
    vis = np.ma.array(vis.data, fill_value=vis.fill_value)
    uv_dst.write((uvw, time, (idx, jdx)), vis)
    uv_dst.close()

    uv_dst = aipy_extracts.UV(dst_path)

    # First record read back: check both the data and the mask.
    _, vis_back = uv_dst.read()
    assert np.all(vis.data == vis_back.data)
    assert np.all(vis.mask == vis_back.mask)

    # Second record read back: check the mask.
    _, vis_back = uv_dst.read()
    assert np.all(vis.mask == vis_back.mask)

    uv_src.close()
    uv_dst.close()
def test_add_to_header(tmp_path):
    """
    Verify that a header item set on a new UV object ends up in the file.

    A fake "xyz" item is registered in ``aipy_extracts.itemtable``, assigned on
    a new UV object before ``init_from_uv`` and ``pipe`` are called, and then
    read back after re-opening the written file.
    """
    infile = os.path.join(DATA_PATH, "zen.2456865.60537.xy.uvcRREAA")
    test_file = os.path.join(tmp_path, "miriad_test.uv")

    # Create a fake entry in the item table so that we can run this test -- the
    # double values means an array of length 2 for aipy_extracts. The table is
    # module-level state shared by every test, so remember what was there and
    # put it back afterwards rather than leaking the fake entry.
    had_xyz = "xyz" in aipy_extracts.itemtable
    prior_xyz = aipy_extracts.itemtable["xyz"] if had_xyz else None
    aipy_extracts.itemtable["xyz"] = "dd"
    try:
        aipy_uv = aipy_extracts.UV(infile)
        aipy_uv2 = aipy_extracts.UV(test_file, status="new")
        aipy_uv2["xyz"] = (2.0, 2.0)
        aipy_uv2.init_from_uv(aipy_uv)
        aipy_uv2.pipe(aipy_uv)
        aipy_uv2.close()

        # Re-open the written file and confirm the item is present.
        aipy_uv2 = aipy_extracts.UV(test_file)
        assert np.all(aipy_uv2["xyz"] == np.array([2.0, 2.0]))

        aipy_uv.close()
        aipy_uv2.close()
    finally:
        if had_xyz:
            aipy_extracts.itemtable["xyz"] = prior_xyz
        else:
            del aipy_extracts.itemtable["xyz"]
@pytest.mark.parametrize(
    "polstr,antstr,ants_exp,nrec_exp",
    [
        [-1, "all", (0, 1, 2, 3, 4, 5), 399],
        [-1, "cross", (0, 1, 2, 3, 4, 5), 285],
        [-1, "auto", (0, 1, 2, 3, 4, 5), 114],
        [-1, "(0,1)_(2,3)", (0, 1, 2, 3), 76],
        [-1, "(0,-1)_(-2,3)", (0, 1, 2, 3, 4, 5), 342],
        [-1, "(0x,1x)_(2y,3y)", (0, 1, 2, 3), 76],
        [-1, "4,5,", (0, 1, 2, 3, 4, 5), 209],
        ["xx", -1, (), 0],
        ["xy", -1, (0, 1, 2, 3, 4, 5), 399],
        ["yx", -1, (), 0],
        ["yy", -1, (), 0],
        ["xy", "(0,1)_(2,3)", (0, 1, 2, 3), 76],
        ["xy", "4,5,", (0, 1, 2, 3, 4, 5), 209],
        ["xy", "0,1,(2)_(3)", (0, 1, 2, 3, 4, 5), 228],
    ],
)
def test_uv_selector(polstr, antstr, ants_exp, nrec_exp):
    """Test uv_selector with antenna-string and polarization-string selections."""
    src_path = os.path.join(DATA_PATH, "zen.2456865.60537.xy.uvcRREAA")
    uv_obj = aipy_extracts.UV(src_path)
    aipy_extracts.uv_selector(uv_obj, ants=antstr, pol_str=polstr)

    # The polarization is only checked when a polarization string was given
    # (the parametrization passes -1 otherwise).
    check_pol = isinstance(polstr, str)

    # Read until the reader raises "No data read", counting records as we go.
    n_records = 0
    with pytest.raises(OSError, match="No data read"):
        while True:
            preamble, _ = uv_obj.read()
            _, _, bl_ants = preamble
            n_records += 1
            # Make sure we only have ants we expect
            assert np.all(np.isin(bl_ants, ants_exp))
            if check_pol:
                assert uv_obj["pol"] == aipy_extracts.str2pol[polstr]

    assert n_records == nrec_exp
    uv_obj.close()