# test_uvcal.py
# -*- mode: python; coding: utf-8 -*-
# Copyright (c) 2018 Radio Astronomy Software Group
# Licensed under the 2-clause BSD License
"""Tests for uvcal object.
"""
import pytest
import os
import copy
import numpy as np
from pyuvdata import UVCal
import pyuvdata.utils as uvutils
import pyuvdata.tests as uvtest
from pyuvdata.data import DATA_PATH
# Module-wide pytest mark: ignore the "Using known values" warnings about
# telescope_location and antenna_positions not being set.
pytestmark = pytest.mark.filterwarnings(
    "ignore:telescope_location is not set. Using known values",
    "ignore:antenna_positions is not set. Using known values",
)
@pytest.fixture(scope="function")
def uvcal_data():
    """Provide an empty UVCal object plus the attribute names expected on it.

    Yields a tuple ``(uv_cal_object, required_parameters,
    required_properties, extra_parameters, extra_properties,
    other_properties)``. Each ``*_parameters`` list holds the names of the
    matching ``*_properties`` list with a leading underscore, in the same
    order.
    """
    required_properties = [
        "Nfreqs",
        "Njones",
        "Ntimes",
        "Nspws",
        "Nants_data",
        "Nants_telescope",
        "antenna_names",
        "antenna_numbers",
        "ant_array",
        "telescope_name",
        "freq_array",
        "channel_width",
        "spw_array",
        "jones_array",
        "time_array",
        "integration_time",
        "gain_convention",
        "flag_array",
        "quality_array",
        "cal_type",
        "cal_style",
        "x_orientation",
        "history",
    ]
    extra_properties = [
        "telescope_location",
        "antenna_positions",
        "lst_array",
        "gain_array",
        "delay_array",
        "sky_field",
        "sky_catalog",
        "ref_antenna_name",
        "Nsources",
        "baseline_range",
        "diffuse_model",
        "input_flag_array",
        "time_range",
        "freq_range",
        "observer",
        "git_origin_cal",
        "git_hash_cal",
        "total_quality_array",
        "extra_keywords",
        "gain_scale",
    ]

    # each parameter name is the property name with a leading underscore
    required_parameters = ["_" + name for name in required_properties]
    extra_parameters = ["_" + name for name in extra_properties]

    other_properties = ["pyuvdata_version_str"]

    uv_cal_object = UVCal()

    # hand the data to the test; execution resumes below once the test is done
    yield (
        uv_cal_object,
        required_parameters,
        required_properties,
        extra_parameters,
        extra_properties,
        other_properties,
    )

    # some post-test object cleanup
    del uv_cal_object
@pytest.fixture(scope="function")
def gain_data():
    """Yield a UVCal object read from the gain calfits test file."""
    calfits_path = os.path.join(DATA_PATH, "zen.2457698.40355.xx.gain.calfits")
    cal = UVCal()
    cal.read_calfits(calfits_path)

    yield cal

    # post-test cleanup
    del cal
@pytest.fixture(scope="function")
def delay_data(tmp_path):
    """Yield a UVCal object read from a delay calfits file with input flags.

    The delay calfits test file is read, given an all-False
    input_flag_array, written to ``tmp_path`` and read back into the object
    that is yielded.
    """
    source_file = os.path.join(DATA_PATH, "zen.2457698.40355.xx.delay.calfits")
    flagged_file = str(tmp_path / "outtest_input_flags.fits")

    # add an input flag array to the file to test for that.
    seed = UVCal()
    seed.read_calfits(source_file)
    flag_shape = seed._input_flag_array.expected_shape(seed)
    seed.input_flag_array = np.zeros(flag_shape, dtype=bool)
    seed.write_calfits(flagged_file, clobber=True)

    cal = UVCal()
    cal.read_calfits(flagged_file)

    # yield the data for testing, then del after tests finish
    yield cal

    del cal
def test_parameter_iter(uvcal_data):
    """Check that iterating a UVCal object returns every expected parameter."""
    uv_cal_object, required_parameters, _, extra_parameters, _, _ = uvcal_data

    all_params = list(uv_cal_object)
    for name in required_parameters + extra_parameters:
        assert name in all_params, (
            "expected attribute " + name + " not returned in object iterator"
        )
def test_required_parameter_iter(uvcal_data):
    """Check that ``required()`` returns the expected required parameters."""
    uv_cal_object, required_parameters = uvcal_data[:2]

    # at first it's a metadata_only object, so need to modify required_parameters
    metadata_only_expected = [
        name
        for name in required_parameters
        if name not in ("_quality_array", "_flag_array")
    ]
    required = list(uv_cal_object.required())
    for name in metadata_only_expected:
        assert name in required, (
            "expected attribute " + name + " not returned in required iterator"
        )

    # once the data-like arrays are set, the full list is expected
    uv_cal_object.quality_array = 1
    uv_cal_object.flag_array = 1
    required = list(uv_cal_object.required())
    for name in required_parameters:
        assert name in required, (
            "expected attribute " + name + " not returned in required iterator"
        )
def test_unexpected_parameters(uvcal_data):
    """Check that UVCal has no underscore-prefixed attributes beyond those expected."""
    uv_cal_object, required_parameters, _, extra_parameters, _, _ = uvcal_data

    expected_parameters = required_parameters + extra_parameters
    for name in uv_cal_object.__dict__:
        # only the underscore-prefixed entries are checked here
        if name[0] != "_":
            continue
        assert name in expected_parameters, (
            "unexpected parameter " + name + " found in UVCal"
        )
def test_unexpected_attributes(uvcal_data):
    """Check that UVCal has no non-underscore attributes beyond those expected."""
    (
        uv_cal_object,
        _,
        required_properties,
        _,
        extra_properties,
        other_properties,
    ) = uvcal_data

    expected_attributes = required_properties + extra_properties + other_properties
    for name in uv_cal_object.__dict__:
        # underscore-prefixed entries are not checked here
        if name[0] == "_":
            continue
        assert name in expected_attributes, (
            "unexpected attribute " + name + " found in UVCal"
        )
def test_properties(uvcal_data):
    """Check that setting each property updates the value of its parameter."""
    (
        uv_cal_object,
        required_parameters,
        required_properties,
        extra_parameters,
        extra_properties,
        _,
    ) = uvcal_data

    property_names = required_properties + extra_properties
    parameter_names = required_parameters + extra_parameters
    for prop_name, param_name in zip(property_names, parameter_names):
        rand_num = np.random.rand()
        setattr(uv_cal_object, prop_name, rand_num)
        this_param = getattr(uv_cal_object, param_name)
        try:
            assert rand_num == this_param.value
        except AssertionError:
            # report which property failed before re-raising
            print(
                "setting {prop_name} to a random number failed".format(
                    prop_name=prop_name
                )
            )
            raise
def test_equality(gain_data):
    """Check that a UVCal object compares equal to itself."""
    cal = gain_data
    assert cal == cal
def test_check(gain_data):
    """Check that ``check()`` passes on the object read from the gain file."""
    check_result = gain_data.check()
    assert check_result
def test_check_warnings(gain_data):
    """Check the deprecation warnings ``check()`` gives for unset parameters."""
    for unset_name in ("telescope_location", "lst_array", "antenna_positions"):
        setattr(gain_data, unset_name, None)

    expected_messages = [
        "The telescope_location is not set. It will be a required "
        "parameter starting in pyuvdata version 2.3",
        "The antenna_positions parameter is not set. It will be a required "
        "parameter starting in pyuvdata version 2.3",
        "The lst_array is not set. It will be a required "
        "parameter starting in pyuvdata version 2.3",
    ]
    with uvtest.check_warnings(DeprecationWarning, match=expected_messages):
        assert gain_data.check()
def test_check_flag_array(gain_data):
    """Check that ``check()`` rejects a flag_array with an integer dtype."""
    int_flags = np.ones(gain_data.flag_array.shape, dtype=int)
    gain_data.flag_array = int_flags

    expected_error = "UVParameter _flag_array is not the appropriate type."
    with pytest.raises(ValueError, match=expected_error):
        gain_data.check()
def test_unknown_telescopes(gain_data, tmp_path):
    """Check the warnings from a calfits round trip with an unknown telescope."""
    calobj = gain_data
    calobj.telescope_name = "foo"
    for unset_name in ("telescope_location", "lst_array", "antenna_positions"):
        setattr(calobj, unset_name, None)

    write_file_calfits = str(tmp_path / "test.calfits")
    deprecation_messages = [
        "The telescope_location is not set. It will be a required "
        "parameter starting in pyuvdata version 2.3",
        "The antenna_positions parameter is not set. It will be a required "
        "parameter starting in pyuvdata version 2.3",
        "The lst_array is not set. It will be a required "
        "parameter starting in pyuvdata version 2.3",
    ]

    with uvtest.check_warnings(DeprecationWarning, match=deprecation_messages):
        calobj.write_calfits(write_file_calfits, clobber=True)

    # reading back additionally warns about the unknown telescope name
    read_warning_types = [
        UserWarning,
        DeprecationWarning,
        DeprecationWarning,
        DeprecationWarning,
    ]
    read_messages = [
        "Telescope foo is not in known_telescopes",
        *deprecation_messages,
    ]
    calobj2 = UVCal()
    with uvtest.check_warnings(read_warning_types, match=read_messages):
        calobj2.read_calfits(write_file_calfits)
def test_nants_data_telescope_larger(gain_data):
    """Check that Nants_telescope may be strictly greater than Nants_data."""
    cal = gain_data
    cal.Nants_telescope = cal.Nants_telescope + 1

    # add dummy information for "new antenna" to pass object check
    dummy_position = np.zeros((1, 3), dtype=float)
    cal.antenna_names = np.concatenate((cal.antenna_names, ["dummy_ant"]))
    cal.antenna_numbers = np.concatenate((cal.antenna_numbers, [20]))
    cal.antenna_positions = np.concatenate((cal.antenna_positions, dummy_position))

    assert cal.check()
def test_ant_array_not_in_antnums(gain_data):
    """Check that ``check()`` errors if a data antenna is not in antenna_numbers."""
    # remove antennas from antenna_names & antenna_numbers by hand
    drop_first = slice(1, None)
    gain_data.antenna_names = gain_data.antenna_names[drop_first]
    gain_data.antenna_numbers = gain_data.antenna_numbers[drop_first]
    gain_data.antenna_positions = gain_data.antenna_positions[drop_first, :]
    gain_data.Nants_telescope = gain_data.antenna_numbers.size

    expected_prefix = "All antennas in ant_array must be in antenna_numbers"
    with pytest.raises(ValueError) as excinfo:
        gain_data.check()
    assert str(excinfo.value).startswith(expected_prefix)
def test_set_gain(gain_data, delay_data):
    """Check the parameter settings after ``_set_gain`` on the delay object."""
    delay_data._set_gain()

    gain_param = delay_data._gain_array
    assert gain_param.required
    assert not delay_data._delay_array.required
    assert gain_param.form == delay_data._flag_array.form
    assert gain_param.form == delay_data._quality_array.form
def test_set_delay(gain_data):
    """Check the parameter settings after ``_set_delay`` on the gain object."""
    gain_data._set_delay()

    gain_param = gain_data._gain_array
    delay_param = gain_data._delay_array
    assert delay_param.required
    assert not gain_param.required
    assert gain_param.form == gain_data._flag_array.form
    assert delay_param.form == gain_data._quality_array.form
def test_set_unknown(gain_data):
    """Check the parameter settings after ``_set_unknown_cal_type``."""
    gain_data._set_unknown_cal_type()

    gain_param = gain_data._gain_array
    assert not gain_data._delay_array.required
    assert not gain_param.required
    assert gain_param.form == gain_data._flag_array.form
    assert gain_param.form == gain_data._quality_array.form
def test_set_sky(gain_data):
    """Check that ``_set_sky`` makes the sky-related parameters required."""
    cal = gain_data
    cal._set_sky()

    assert cal._sky_field.required
    assert cal._sky_catalog.required
    assert cal._ref_antenna_name.required
def test_set_redundant(gain_data):
    """Check that ``_set_redundant`` makes the sky-related parameters optional."""
    cal = gain_data
    cal._set_redundant()

    assert not cal._sky_field.required
    assert not cal._sky_catalog.required
    assert not cal._ref_antenna_name.required
def test_convert_filetype(gain_data):
    """Check that converting to a filetype other than calfits raises an error."""
    # error testing
    unsupported_filetype = "uvfits"
    with pytest.raises(ValueError, match="filetype must be calfits."):
        gain_data._convert_to_filetype(unsupported_filetype)
def _check_delay_to_gain_conversion(converted, delay_data, sign):
    """Assert that ``converted`` matches the gains implied by ``delay_data``.

    Parameters
    ----------
    converted : UVCal
        A copy of ``delay_data`` on which ``convert_to_gain`` has been called.
    delay_data : UVCal
        The unconverted object the copy was made from.
    sign : int
        Sign applied to the expected phase: -1 for the "minus" delay
        convention, 1 for "plus".
    """
    gain_rtol = converted._gain_array.tols[0]
    gain_atol = converted._gain_array.tols[1]

    # all gains are expected to have amplitude 1
    gain_amplitude = np.absolute(converted.gain_array)
    assert np.isclose(np.max(gain_amplitude), 1.0, rtol=gain_rtol, atol=gain_atol)
    assert np.isclose(np.min(gain_amplitude), 1.0, rtol=gain_rtol, atol=gain_atol)

    # compare the gain phase at index 10 of the third axis with the phase
    # computed from the delays and freq_array[0, 10]
    expected_phase = (
        sign
        * 2
        * np.pi
        * delay_data.delay_array[:, :, 0, :, :]
        * delay_data.freq_array[0, 10]
    )
    assert np.allclose(
        np.angle(converted.gain_array[:, :, 10, :, :]) % (2 * np.pi),
        expected_phase % (2 * np.pi),
        rtol=gain_rtol,
        atol=gain_atol,
    )

    assert np.allclose(
        delay_data.quality_array,
        converted.quality_array[:, :, 10, :, :],
        rtol=converted._quality_array.tols[0],
        atol=converted._quality_array.tols[1],
    )
    assert converted.history == (
        delay_data.history + " Converted from delays to gains using pyuvdata."
    )


def test_convert_to_gain(gain_data, delay_data):
    """Check ``convert_to_gain`` for both delay conventions and its error cases.

    Copies of the delay object are converted with the "minus" and "plus"
    conventions, and once more with a total_quality_array attached. A bogus
    convention, an object that is already a gain cal_type, and an object of
    unknown cal_type must each raise ValueError.
    """
    # the converted copies are held in a local variable rather than being
    # attached to the gain_data fixture object as an ad-hoc attribute
    for convention, sign in (("minus", -1), ("plus", 1)):
        converted = delay_data.copy()
        converted.convert_to_gain(delay_convention=convention)
        _check_delay_to_gain_conversion(converted, delay_data, sign)

    # test a file with a total_quality_array
    converted = delay_data.copy()
    tqa_size = converted.delay_array.shape[1:]
    converted.total_quality_array = np.ones(tqa_size)
    converted.convert_to_gain(delay_convention="minus")
    _check_delay_to_gain_conversion(converted, delay_data, -1)

    # error testing
    with pytest.raises(ValueError):
        delay_data.convert_to_gain(delay_convention="bogus")
    with pytest.raises(ValueError):
        gain_data.convert_to_gain()
    gain_data._set_unknown_cal_type()
    with pytest.raises(ValueError):
        gain_data.convert_to_gain()
@pytest.mark.parametrize("caltype", ["gain", "delay"])
def test_select_antennas(caltype, gain_data, delay_data, tmp_path):
    """Check selecting on antennas by number and by name, plus error cases."""
    calobj = gain_data if caltype == "gain" else delay_data
    calobj2 = calobj.copy()

    old_history = calobj.history
    ants_to_keep = np.array([65, 96, 9, 97, 89, 22, 20, 72])
    calobj2.select(antenna_nums=ants_to_keep)

    assert len(ants_to_keep) == calobj2.Nants_data
    for kept_ant in ants_to_keep:
        assert kept_ant in calobj2.ant_array
    for data_ant in calobj2.ant_array:
        assert data_ant in ants_to_keep

    expected_history = old_history + " Downselected to specific antennas using pyuvdata."
    assert uvutils._check_histories(expected_history, calobj2.history)

    # now test using antenna_names to specify antennas to keep
    ants_to_keep = np.array(sorted(ants_to_keep))
    ant_names = [
        calobj.antenna_names[np.where(calobj.antenna_numbers == ant_num)[0][0]]
        for ant_num in ants_to_keep
    ]

    calobj3 = calobj.select(antenna_names=ant_names, inplace=False)

    assert calobj2 == calobj3

    # check for errors associated with antennas not included in data, bad names
    # or providing numbers and names
    with pytest.raises(
        ValueError,
        match=f"Antenna number {np.max(calobj.ant_array)+1} "
        "is not present in the array",
    ):
        calobj.select(antenna_nums=np.max(calobj.ant_array) + np.arange(1, 3))

    with pytest.raises(
        ValueError, match="Antenna name test1 is not present in the antenna_names array"
    ):
        calobj.select(antenna_names=["test1"])

    with pytest.raises(
        ValueError, match="Only one of antenna_nums and antenna_names can be provided."
    ):
        calobj.select(antenna_nums=ants_to_keep, antenna_names=ant_names)

    # check that write_calfits works with Nants_data < Nants_telescope
    write_file_calfits = str(tmp_path / "select_test.calfits")
    calobj2.write_calfits(write_file_calfits, clobber=True)

    # check that reading it back in works too
    reread = UVCal()
    reread.read_calfits(write_file_calfits)
    assert calobj2 == reread

    # check that total_quality_array is handled properly when present
    tqa_shape = calobj._total_quality_array.expected_shape(calobj)
    calobj.total_quality_array = np.zeros(tqa_shape)
    with uvtest.check_warnings(
        UserWarning, match="Cannot preserve total_quality_array"
    ):
        calobj.select(antenna_names=ant_names, inplace=True)
    assert calobj.total_quality_array is None
@pytest.mark.parametrize("caltype", ["gain", "delay"])
def test_select_times(caltype, gain_data, delay_data, tmp_path):
    """Check selecting on times, single-time writes and uneven spacing."""
    calobj = gain_data if caltype == "gain" else delay_data
    calobj2 = calobj.copy()

    old_history = calobj.history
    times_to_keep = calobj.time_array[2:5]

    calobj2.select(times=times_to_keep)

    assert len(times_to_keep) == calobj2.Ntimes
    for kept_time in times_to_keep:
        assert kept_time in calobj2.time_array
    for data_time in np.unique(calobj2.time_array):
        assert data_time in times_to_keep

    expected_history = old_history + " Downselected to specific times using pyuvdata."
    assert uvutils._check_histories(expected_history, calobj2.history)

    write_file_calfits = str(tmp_path / "select_test.calfits")

    # test writing calfits with only one time
    calobj2 = calobj.copy()
    times_to_keep = calobj.time_array[[1]]
    calobj2.select(times=times_to_keep)
    calobj2.write_calfits(write_file_calfits, clobber=True)

    # check for errors associated with times not included in data
    missing_time = [np.min(calobj.time_array) - calobj.integration_time]
    with pytest.raises(ValueError):
        calobj.select(times=missing_time)

    # check for warnings and errors associated with unevenly spaced times
    calobj2 = calobj.copy()
    with uvtest.check_warnings(
        UserWarning, match="Selected times are not evenly spaced"
    ):
        calobj2.select(times=calobj2.time_array[[0, 2, 3]])
    with pytest.raises(ValueError):
        calobj2.write_calfits(write_file_calfits)
@pytest.mark.parametrize("caltype", ["gain", "delay"])
def test_select_frequencies(caltype, gain_data, delay_data, tmp_path):
    """Check selecting on frequencies, single-frequency writes and uneven spacing."""
    calobj = gain_data if caltype == "gain" else delay_data
    calobj2 = calobj.copy()

    old_history = calobj.history
    freqs_to_keep = calobj.freq_array[0, np.arange(4, 8)]

    # add dummy total_quality_array
    for cal in (calobj, calobj2):
        cal.total_quality_array = np.zeros(
            cal._total_quality_array.expected_shape(cal)
        )

    calobj2.select(frequencies=freqs_to_keep)

    assert len(freqs_to_keep) == calobj2.Nfreqs
    for kept_freq in freqs_to_keep:
        assert kept_freq in calobj2.freq_array
    for data_freq in np.unique(calobj2.freq_array):
        assert data_freq in freqs_to_keep

    expected_history = (
        old_history + " Downselected to specific frequencies using pyuvdata."
    )
    assert uvutils._check_histories(expected_history, calobj2.history)

    write_file_calfits = str(tmp_path / "select_test.calfits")

    # test writing calfits with only one frequency
    calobj2 = calobj.copy()
    freqs_to_keep = calobj.freq_array[0, 5]
    calobj2.select(frequencies=freqs_to_keep)
    calobj2.write_calfits(write_file_calfits, clobber=True)

    # check for errors associated with frequencies not included in data
    missing_freq = [np.max(calobj.freq_array) + calobj.channel_width]
    with pytest.raises(ValueError):
        calobj.select(frequencies=missing_freq)

    # check for warnings and errors associated with unevenly spaced frequencies
    calobj2 = calobj.copy()
    with uvtest.check_warnings(
        UserWarning, match="Selected frequencies are not evenly spaced"
    ):
        calobj2.select(frequencies=calobj2.freq_array[0, [0, 5, 6]])
    with pytest.raises(ValueError):
        calobj2.write_calfits(write_file_calfits)
@pytest.mark.parametrize("caltype", ["gain", "delay"])
def test_select_freq_chans(caltype, gain_data, delay_data):
    """Check selecting on channel indices, alone and combined with frequencies."""
    calobj = gain_data if caltype == "gain" else delay_data
    calobj2 = calobj.copy()

    old_history = calobj.history
    chans_to_keep = np.arange(4, 8)

    # add dummy total_quality_array
    for cal in (calobj, calobj2):
        cal.total_quality_array = np.zeros(
            cal._total_quality_array.expected_shape(cal)
        )

    calobj2.select(freq_chans=chans_to_keep)

    assert len(chans_to_keep) == calobj2.Nfreqs
    for chan in chans_to_keep:
        assert calobj.freq_array[0, chan] in calobj2.freq_array
    kept_freqs = calobj.freq_array[0, chans_to_keep]
    for data_freq in np.unique(calobj2.freq_array):
        assert data_freq in kept_freqs

    expected_history = (
        old_history + " Downselected to specific frequencies using pyuvdata."
    )
    assert uvutils._check_histories(expected_history, calobj2.history)

    # Test selecting both channels and frequencies
    freqs_to_keep = calobj.freq_array[0, np.arange(7, 10)]  # Overlaps with chans
    all_chans_to_keep = np.arange(4, 10)

    calobj2 = calobj.copy()
    calobj2.select(frequencies=freqs_to_keep, freq_chans=chans_to_keep)

    assert len(all_chans_to_keep) == calobj2.Nfreqs
    for chan in all_chans_to_keep:
        assert calobj.freq_array[0, chan] in calobj2.freq_array
    all_kept_freqs = calobj.freq_array[0, all_chans_to_keep]
    for data_freq in np.unique(calobj2.freq_array):
        assert data_freq in all_kept_freqs
def _repeat_last_jones(array):
    """Return ``array`` with its last entry along axis 4 appended once more."""
    return np.concatenate((array, array[:, :, :, :, [-1]]), axis=4)


@pytest.mark.parametrize("caltype", ["gain", "delay"])
@pytest.mark.parametrize(
    "jones_to_keep", ([-5, -6], ["xx", "yy"], ["nn", "ee"], [[-5, -6]])
)
def test_select_polarizations(caltype, jones_to_keep, gain_data, delay_data, tmp_path):
    """Check selecting on jones terms given as numbers, strings or a nested list.

    The object is first padded to four jones terms by repeating the last
    entry of each array along axis 4. The selection result, the history
    string, the error for jones terms that are not present, and the
    warning/error pair for unevenly spaced jones terms are then checked.
    """
    calobj = gain_data if caltype == "gain" else delay_data

    # add more jones terms to allow for better testing of selections
    while calobj.Njones < 4:
        new_jones = np.min(calobj.jones_array) - 1
        calobj.jones_array = np.append(calobj.jones_array, new_jones)
        calobj.Njones += 1
        calobj.flag_array = _repeat_last_jones(calobj.flag_array)
        if calobj.input_flag_array is not None:
            calobj.input_flag_array = _repeat_last_jones(calobj.input_flag_array)
        if caltype == "gain":
            calobj.gain_array = _repeat_last_jones(calobj.gain_array)
        else:
            calobj.delay_array = _repeat_last_jones(calobj.delay_array)
        calobj.quality_array = _repeat_last_jones(calobj.quality_array)

    # add dummy total_quality_array
    calobj.total_quality_array = np.zeros(
        calobj._total_quality_array.expected_shape(calobj)
    )

    assert calobj.check()
    calobj2 = calobj.copy()

    old_history = calobj.history

    calobj2.select(jones=jones_to_keep)

    # the nested-list case is flattened for the comparisons below
    if isinstance(jones_to_keep[0], list):
        jones_to_keep = jones_to_keep[0]
    assert len(jones_to_keep) == calobj2.Njones
    for j in jones_to_keep:
        if isinstance(j, int):
            assert j in calobj2.jones_array
        else:
            assert (
                uvutils.jstr2num(j, x_orientation=calobj2.x_orientation)
                in calobj2.jones_array
            )
    for j in np.unique(calobj2.jones_array):
        if isinstance(jones_to_keep[0], int):
            assert j in jones_to_keep
        else:
            assert j in uvutils.jstr2num(
                jones_to_keep, x_orientation=calobj2.x_orientation
            )

    assert uvutils._check_histories(
        old_history + " Downselected to "
        "specific jones polarization terms "
        "using pyuvdata.",
        calobj2.history,
    )

    # check for errors associated with polarizations not included in data
    with pytest.raises(ValueError):
        calobj2.select(jones=[-3, -4])

    # check for warnings and errors associated with unevenly spaced polarizations
    with uvtest.check_warnings(
        UserWarning, match="Selected jones polarization terms are not evenly spaced",
    ):
        calobj.select(jones=calobj.jones_array[[0, 1, 3]])
    # target a temporary directory so that nothing can be written into the
    # package data directory if write_calfits does not raise
    write_file_calfits = str(tmp_path / "select_test.calfits")
    with pytest.raises(ValueError):
        calobj.write_calfits(write_file_calfits)
@pytest.mark.parametrize("caltype", ["gain", "delay"])
def test_select(caltype, gain_data, delay_data):
    """Check selecting on antennas, frequencies, times and jones at once."""
    # now test selecting along all axes at once
    calobj = gain_data if caltype == "gain" else delay_data
    calobj2 = calobj.copy()

    old_history = calobj.history

    ants_to_keep = np.array([10, 89, 43, 9, 80, 96, 64])
    freqs_to_keep = calobj.freq_array[0, np.arange(2, 5)]
    times_to_keep = calobj.time_array[[1, 2]]
    jones_to_keep = [-5]

    calobj2.select(
        antenna_nums=ants_to_keep,
        frequencies=freqs_to_keep,
        times=times_to_keep,
        jones=jones_to_keep,
    )

    # antennas
    assert len(ants_to_keep) == calobj2.Nants_data
    for kept_ant in ants_to_keep:
        assert kept_ant in calobj2.ant_array
    for data_ant in calobj2.ant_array:
        assert data_ant in ants_to_keep

    # times
    assert len(times_to_keep) == calobj2.Ntimes
    for kept_time in times_to_keep:
        assert kept_time in calobj2.time_array
    for data_time in np.unique(calobj2.time_array):
        assert data_time in times_to_keep

    # frequencies
    assert len(freqs_to_keep) == calobj2.Nfreqs
    for kept_freq in freqs_to_keep:
        assert kept_freq in calobj2.freq_array
    for data_freq in np.unique(calobj2.freq_array):
        assert data_freq in freqs_to_keep

    # jones terms
    assert len(jones_to_keep) == calobj2.Njones
    for kept_jones in jones_to_keep:
        assert kept_jones in calobj2.jones_array
    for data_jones in np.unique(calobj2.jones_array):
        assert data_jones in jones_to_keep

    expected_history = (
        old_history + " Downselected to "
        "specific antennas, times, "
        "frequencies, jones polarization terms "
        "using pyuvdata."
    )
    assert uvutils._check_histories(expected_history, calobj2.history)
@pytest.mark.parametrize("caltype", ["gain", "delay"])
def test_add_antennas(caltype, gain_data, delay_data):
    """Test adding antennas between two UVCal objects.

    The object is split into two antenna subsets with ``select`` and
    recombined with ``+=``; the result is compared against an untouched
    copy. The handling of total_quality_array and, for the delay case,
    of input_flag_array during the addition is checked as well.
    """
    if caltype == "gain":
        calobj = gain_data
        calobj2 = calobj.copy()
    else:
        calobj = delay_data
        calobj2 = calobj.copy()

    # untouched reference copy for the comparisons below
    calobj_full = calobj.copy()
    # two antenna sets with no antenna number in common
    ants1 = np.array([9, 10, 20, 22, 31, 43, 53, 64, 65, 72])
    ants2 = np.array([80, 81, 88, 89, 96, 97, 104, 105, 112])
    calobj.select(antenna_nums=ants1)
    calobj2.select(antenna_nums=ants2)
    calobj += calobj2
    # Check history is correct, before replacing and doing a full object check
    assert uvutils._check_histories(
        calobj_full.history + " Downselected to specific "
        "antennas using pyuvdata. Combined "
        "data along antenna axis using pyuvdata.",
        calobj.history,
    )
    calobj.history = calobj_full.history
    assert calobj == calobj_full

    # test for when total_quality_array is present
    calobj.select(antenna_nums=ants1)
    calobj.total_quality_array = np.zeros(
        calobj._total_quality_array.expected_shape(calobj)
    )
    with uvtest.check_warnings(UserWarning, match="Total quality array detected"):
        calobj.__iadd__(calobj2)
    assert calobj.total_quality_array is None

    if caltype == "delay":
        # test for when input_flag_array is present in first file but not second
        calobj.select(antenna_nums=ants1)
        ifa = np.zeros(calobj._input_flag_array.expected_shape(calobj)).astype(np.bool_)
        ifa2 = np.ones(calobj2._input_flag_array.expected_shape(calobj2)).astype(
            np.bool_
        )
        # expected result: the part for the object without an
        # input_flag_array is all True
        tot_ifa = np.concatenate([ifa, ifa2], axis=0)
        calobj.input_flag_array = ifa
        calobj2.input_flag_array = None
        calobj += calobj2
        assert np.allclose(calobj.input_flag_array, tot_ifa)

        # test for when input_flag_array is present in second file but not first
        calobj.select(antenna_nums=ants1)
        ifa = np.ones(calobj._input_flag_array.expected_shape(calobj)).astype(np.bool_)
        ifa2 = np.zeros(calobj2._input_flag_array.expected_shape(calobj2)).astype(
            np.bool_
        )
        tot_ifa = np.concatenate([ifa, ifa2], axis=0)
        calobj.input_flag_array = None
        calobj2.input_flag_array = ifa2
        calobj += calobj2
        assert np.allclose(calobj.input_flag_array, tot_ifa)

    # Out of order - antennas
    # (here the object holding ants2 is on the left-hand side of the add)
    calobj = calobj_full.copy()
    calobj2 = calobj.copy()
    calobj.select(antenna_nums=ants2)
    calobj2.select(antenna_nums=ants1)
    calobj += calobj2
    calobj.history = calobj_full.history
    assert calobj == calobj_full
def test_add_frequencies(gain_data):
    """Test adding frequencies between two UVCal objects.

    The gain object is split into its first and second half of frequency
    channels with ``select`` and recombined with ``+=``; the result is
    compared against an untouched copy. The handling of
    total_quality_array and input_flag_array when only one or both of the
    objects carry them is checked as well.
    """
    # don't test on delays because there's no freq axis for the delay array
    calobj = gain_data
    calobj2 = calobj.copy()

    # untouched reference copy for the comparisons below
    calobj_full = calobj.copy()
    freqs1 = calobj.freq_array[0, np.arange(0, calobj.Nfreqs // 2)]
    freqs2 = calobj.freq_array[0, np.arange(calobj.Nfreqs // 2, calobj.Nfreqs)]
    calobj.select(frequencies=freqs1)
    calobj2.select(frequencies=freqs2)
    calobj += calobj2
    # Check history is correct, before replacing and doing a full object check
    assert uvutils._check_histories(
        calobj_full.history + " Downselected to specific "
        "frequencies using pyuvdata. Combined "
        "data along frequency axis using pyuvdata.",
        calobj.history,
    )
    calobj.history = calobj_full.history
    assert calobj == calobj_full

    # test for when total_quality_array is present in first file but not second
    calobj.select(frequencies=freqs1)
    tqa = np.ones(calobj._total_quality_array.expected_shape(calobj))
    tqa2 = np.zeros(calobj2._total_quality_array.expected_shape(calobj2))
    # expected result: zeros for the part whose object has no
    # total_quality_array, joined along axis 1
    tot_tqa = np.concatenate([tqa, tqa2], axis=1)
    calobj.total_quality_array = tqa
    calobj += calobj2
    assert np.allclose(
        calobj.total_quality_array,
        tot_tqa,
        rtol=calobj._total_quality_array.tols[0],
        atol=calobj._total_quality_array.tols[1],
    )

    # test for when total_quality_array is present in second file but not first
    calobj.select(frequencies=freqs1)
    tqa = np.zeros(calobj._total_quality_array.expected_shape(calobj))
    tqa2 = np.ones(calobj2._total_quality_array.expected_shape(calobj2))
    tot_tqa = np.concatenate([tqa, tqa2], axis=1)
    calobj.total_quality_array = None
    calobj2.total_quality_array = tqa2
    calobj += calobj2
    assert np.allclose(
        calobj.total_quality_array,
        tot_tqa,
        rtol=calobj._total_quality_array.tols[0],
        atol=calobj._total_quality_array.tols[1],
    )

    # test for when total_quality_array is present in both
    calobj.select(frequencies=freqs1)
    tqa = np.ones(calobj._total_quality_array.expected_shape(calobj))
    tqa2 = np.ones(calobj2._total_quality_array.expected_shape(calobj2))
    # give the first half a different value so the two halves are distinguishable
    tqa *= 2
    tot_tqa = np.concatenate([tqa, tqa2], axis=1)
    calobj.total_quality_array = tqa
    calobj2.total_quality_array = tqa2
    calobj += calobj2
    assert np.allclose(
        calobj.total_quality_array,
        tot_tqa,
        rtol=calobj._total_quality_array.tols[0],
        atol=calobj._total_quality_array.tols[1],
    )

    # test for when input_flag_array is present in first file but not second
    calobj = calobj_full.copy()
    calobj.input_flag_array = np.zeros(
        calobj._input_flag_array.expected_shape(calobj), dtype=bool
    )
    calobj2 = calobj.copy()
    calobj.select(frequencies=freqs1)
    calobj2.select(frequencies=freqs2)
    ifa = np.zeros(calobj._input_flag_array.expected_shape(calobj)).astype(np.bool_)
    ifa2 = np.ones(calobj2._input_flag_array.expected_shape(calobj2)).astype(np.bool_)
    # expected result: all True for the part whose object has no
    # input_flag_array, joined along axis 2
    tot_ifa = np.concatenate([ifa, ifa2], axis=2)
    calobj.input_flag_array = ifa
    calobj2.input_flag_array = None
    calobj += calobj2
    assert np.allclose(calobj.input_flag_array, tot_ifa)

    # test for when input_flag_array is present in second file but not first
    calobj.select(frequencies=freqs1)
    ifa = np.ones(calobj._input_flag_array.expected_shape(calobj)).astype(np.bool_)
    ifa2 = np.zeros(calobj2._input_flag_array.expected_shape(calobj2)).astype(np.bool_)
    tot_ifa = np.concatenate([ifa, ifa2], axis=2)
    calobj.input_flag_array = None
    calobj2.input_flag_array = ifa2
    calobj += calobj2
    assert np.allclose(calobj.input_flag_array, tot_ifa)

    # Out of order - freqs
    # (here the object holding freqs2 is on the left-hand side of the add)
    calobj = calobj_full.copy()
    calobj2 = calobj_full.copy()
    calobj.select(frequencies=freqs2)
    calobj2.select(frequencies=freqs1)
    calobj += calobj2
    calobj.history = calobj_full.history
    assert calobj == calobj_full
@pytest.mark.parametrize("caltype", ["gain", "delay"])
def test_add_times(caltype, gain_data, delay_data):
    """Test adding times between two UVCal objects.

    The object is split into its first and second half of times with
    ``select`` and recombined with ``+=``; the result is compared against
    an untouched copy. The handling of total_quality_array and, for the
    delay case, of input_flag_array during the addition is checked as well.
    """
    if caltype == "gain":
        calobj = gain_data
        calobj2 = calobj.copy()
    else:
        calobj = delay_data
        calobj2 = calobj.copy()

    # untouched reference copy for the comparisons below
    calobj_full = calobj.copy()
    n_times2 = calobj.Ntimes // 2
    times1 = calobj.time_array[:n_times2]
    times2 = calobj.time_array[n_times2:]
    calobj.select(times=times1)
    calobj2.select(times=times2)
    calobj += calobj2
    # Check history is correct, before replacing and doing a full object check
    assert uvutils._check_histories(
        calobj_full.history + " Downselected to specific "
        "times using pyuvdata. Combined "
        "data along time axis using pyuvdata.",
        calobj.history,
    )
    calobj.history = calobj_full.history
    assert calobj == calobj_full

    # test for when total_quality_array is present in first file but not second
    calobj.select(times=times1)
    tqa = np.ones(calobj._total_quality_array.expected_shape(calobj))
    tqa2 = np.zeros(calobj2._total_quality_array.expected_shape(calobj2))
    # expected result: zeros for the part whose object has no
    # total_quality_array, joined along axis 2
    tot_tqa = np.concatenate([tqa, tqa2], axis=2)
    calobj.total_quality_array = tqa
    calobj += calobj2
    assert np.allclose(
        calobj.total_quality_array,
        tot_tqa,
        rtol=calobj._total_quality_array.tols[0],
        atol=calobj._total_quality_array.tols[1],
    )

    # test for when total_quality_array is present in second file but not first
    calobj.select(times=times1)
    tqa = np.zeros(calobj._total_quality_array.expected_shape(calobj))
    tqa2 = np.ones(calobj2._total_quality_array.expected_shape(calobj2))
    tot_tqa = np.concatenate([tqa, tqa2], axis=2)
    calobj.total_quality_array = None
    calobj2.total_quality_array = tqa2
    calobj += calobj2
    assert np.allclose(
        calobj.total_quality_array,
        tot_tqa,
        rtol=calobj._total_quality_array.tols[0],
        atol=calobj._total_quality_array.tols[1],
    )

    # test for when total_quality_array is present in both
    calobj.select(times=times1)
    tqa = np.ones(calobj._total_quality_array.expected_shape(calobj))
    tqa2 = np.ones(calobj2._total_quality_array.expected_shape(calobj2))
    # give the first half a different value so the two halves are distinguishable
    tqa *= 2
    tot_tqa = np.concatenate([tqa, tqa2], axis=2)
    calobj.total_quality_array = tqa
    calobj2.total_quality_array = tqa2
    calobj += calobj2
    assert np.allclose(
        calobj.total_quality_array,
        tot_tqa,
        rtol=calobj._total_quality_array.tols[0],
        atol=calobj._total_quality_array.tols[1],
    )

    if caltype == "delay":
        # test for when input_flag_array is present in first file but not second
        calobj.select(times=times1)
        ifa = np.zeros(calobj._input_flag_array.expected_shape(calobj)).astype(np.bool_)
        ifa2 = np.ones(calobj2._input_flag_array.expected_shape(calobj2)).astype(
            np.bool_
        )
        # expected result: all True for the part whose object has no
        # input_flag_array, joined along axis 3
        tot_ifa = np.concatenate([ifa, ifa2], axis=3)
        calobj.input_flag_array = ifa
        calobj2.input_flag_array = None
        calobj += calobj2
        assert np.allclose(calobj.input_flag_array, tot_ifa)

        # test for when input_flag_array is present in second file but not first
        calobj.select(times=times1)
        ifa = np.ones(calobj._input_flag_array.expected_shape(calobj)).astype(np.bool_)
        ifa2 = np.zeros(calobj2._input_flag_array.expected_shape(calobj2)).astype(
            np.bool_
        )
        tot_ifa = np.concatenate([ifa, ifa2], axis=3)
        calobj.input_flag_array = None
        calobj2.input_flag_array = ifa2
        calobj += calobj2
        assert np.allclose(calobj.input_flag_array, tot_ifa)

    # Out of order - times
    # (here the object holding times2 is on the left-hand side of the add)
    calobj = calobj_full.copy()
    calobj2 = calobj.copy()
    calobj.select(times=times2)
    calobj2.select(times=times1)
    calobj += calobj2
    calobj.history = calobj_full.history
    assert calobj == calobj_full
@pytest.mark.parametrize("caltype", ["gain", "delay"])
def test_add_jones(caltype, gain_data, delay_data):
    """Check combining two UVCal objects along the Jones axis.

    Runs once for the gain fixture and once for the delay fixture. Covers the
    plain combination, the expected contents of ``total_quality_array`` (and
    of ``input_flag_array`` for the delay case) when it is set on only one or
    on both operands, and recovery of the original object from a combination
    whose Jones values were supplied out of order.
    """
    first = gain_data if caltype == "gain" else delay_data
    second = first.copy()
    pristine = first.copy()

    def assert_total_quality(cal, expected):
        # compare with the tolerances stored on the parameter itself
        tols = cal._total_quality_array.tols
        assert np.allclose(
            cal.total_quality_array, expected, rtol=tols[0], atol=tols[1]
        )

    # give the second object a different Jones value so the two can be combined
    second.jones_array[0] = -6
    first += second

    # the data array of the combined object must have two Jones entries
    combined_data = first.gain_array if caltype == "gain" else first.delay_array
    assert combined_data.shape[-1] == 2
    assert sorted(first.jones_array) == [-6, -5]

    # total_quality_array set on the first object only
    first = pristine.copy()
    first_tqa = np.ones(first._total_quality_array.expected_shape(first))
    second_tqa = np.zeros(second._total_quality_array.expected_shape(second))
    expected_tqa = np.concatenate([first_tqa, second_tqa], axis=3)
    first.total_quality_array = first_tqa
    first += second
    assert_total_quality(first, expected_tqa)

    # total_quality_array set on the second object only
    first = pristine.copy()
    first_tqa = np.zeros(first._total_quality_array.expected_shape(first))
    second_tqa = np.ones(second._total_quality_array.expected_shape(second))
    expected_tqa = np.concatenate([first_tqa, second_tqa], axis=3)
    second.total_quality_array = second_tqa
    first += second
    assert_total_quality(first, expected_tqa)

    # total_quality_array set on both objects (with distinguishable values)
    first = pristine.copy()
    first_tqa = np.full(first._total_quality_array.expected_shape(first), 2.0)
    second_tqa = np.ones(second._total_quality_array.expected_shape(second))
    expected_tqa = np.concatenate([first_tqa, second_tqa], axis=3)
    first.total_quality_array = first_tqa
    second.total_quality_array = second_tqa
    first += second
    assert_total_quality(first, expected_tqa)

    if caltype == "delay":
        # input_flag_array set on the first object only: the half contributed
        # by the second object is expected to come back fully flagged
        first = pristine.copy()
        first_ifa = np.zeros(
            first._input_flag_array.expected_shape(first), dtype=np.bool_
        )
        second_ifa = np.ones(
            second._input_flag_array.expected_shape(second), dtype=np.bool_
        )
        expected_ifa = np.concatenate([first_ifa, second_ifa], axis=4)
        first.input_flag_array = first_ifa
        second.input_flag_array = None
        first += second
        assert np.allclose(first.input_flag_array, expected_ifa)

        # input_flag_array set on the second object only: now the half
        # contributed by the first object is expected to be fully flagged
        first = pristine.copy()
        first_ifa = np.ones(
            first._input_flag_array.expected_shape(first), dtype=np.bool_
        )
        second_ifa = np.zeros(
            second._input_flag_array.expected_shape(second), dtype=np.bool_
        )
        expected_ifa = np.concatenate([first_ifa, second_ifa], axis=4)
        first.input_flag_array = None
        second.input_flag_array = second_ifa
        first += second
        assert np.allclose(first.input_flag_array, expected_ifa)

    # Out of order - jones: the object on the left carries the -6 value
    first = pristine.copy()
    second = pristine.copy()
    first.jones_array[0] = -6
    first += second
    second = first.copy()

    # selecting the original Jones value must give back the original object
    first.select(jones=-5)
    first.history = pristine.history
    assert first == pristine

    # the relabelled half matches too once its Jones value is restored
    second.select(jones=-6)
    second.jones_array[:] = -5
    second.history = pristine.history
    assert second == pristine
@pytest.mark.parametrize("caltype", ["gain", "delay"])
def test_add(caltype, gain_data, delay_data):
    """Check not-in-place addition and the history strings it produces.

    Two antenna sub-selections of the same object are added with ``+``; the
    sum must equal the original once the history is reset. The history of the
    sum is then checked for both the default and the ``verbose_history``
    forms of the combination.
    """
    left = gain_data if caltype == "gain" else delay_data
    right = left.copy()

    # not-in-place addition of two disjoint antenna selections
    reference = left.copy()
    left.select(antenna_nums=np.array([9, 10, 20, 22, 31, 43, 53, 64, 65, 72]))
    right.select(antenna_nums=np.array([80, 81, 88, 89, 96, 97, 104, 105, 112]))
    summed = left + right

    # check the history first, then overwrite it for the full-object comparison
    expected_history = (
        reference.history
        + " Downselected to specific antennas using pyuvdata."
        + " Combined data along antenna axis using pyuvdata."
    )
    assert uvutils._check_histories(expected_history, summed.history)
    summed.history = reference.history
    assert summed == reference

    # history concatenation: the right operand gets its own history string
    left.history = reference.history
    right.history = (
        "Some random history string OMNI_RUN:"
        if caltype == "gain"
        else "Some random history string firstcal.py"
    )

    summed = left + right
    unique_part = "Some random history string"
    expected_history = (
        reference.history
        + " Combined data along antenna axis using pyuvdata."
        + " Unique part of next object history follows. "
        + unique_part
    )
    assert uvutils._check_histories(expected_history, summed.history)

    # verbose_history is only reachable through the explicit method call
    summed = left.__add__(right, verbose_history=True)
    expected_history = (
        reference.history
        + " Combined data along antenna axis using pyuvdata."
        + " Next object history follows. "
        + right.history
    )
    assert uvutils._check_histories(expected_history, summed.history)
def test_add_multiple_axes(gain_data):
    """Test addition along multiple axes.

    The two operands are disjoint in antennas, frequencies, times and Jones
    value at once; after the in-place add every one of those axes must have
    the combined length.
    """
    calobj = gain_data
    calobj2 = calobj.copy()

    # remember the full time-axis length before down-selecting, so the
    # combined object is checked against an independent value
    n_times_full = calobj.Ntimes

    ants1 = np.array([9, 10, 20, 22, 31, 43, 53, 64, 65, 72])
    ants2 = np.array([80, 81, 88, 89, 96, 97, 104, 105, 112])
    freqs1 = calobj.freq_array[0, np.arange(0, 5)]
    freqs2 = calobj.freq_array[0, np.arange(5, 10)]
    n_times2 = calobj.Ntimes // 2
    times1 = calobj.time_array[:n_times2]
    times2 = calobj.time_array[n_times2:]

    # artificially change the Jones value to permit addition
    calobj2.jones_array[0] = -6

    # perform select
    calobj.select(antenna_nums=ants1, frequencies=freqs1, times=times1)
    calobj2.select(antenna_nums=ants2, frequencies=freqs2, times=times2)

    calobj += calobj2

    # check resulting dimensionality
    assert len(calobj.ant_array) == 19
    assert len(calobj.freq_array[0, :]) == 10
    # the two time selections partition the original axis, so the combined
    # object must hold every original time
    assert calobj.Ntimes == n_times_full
    assert len(calobj.time_array) == n_times_full
    assert len(calobj.jones_array) == 2
@pytest.mark.parametrize("caltype", ["gain", "delay"])
def test_add_errors(caltype, gain_data, delay_data):
    """Check the ValueErrors raised by invalid additions.

    Three cases are exercised: adding an object to an identical copy, adding
    something that is not a UVCal object, and adding objects whose
    ``telescope_name`` differs.
    """
    original = gain_data if caltype == "gain" else delay_data
    duplicate = original.copy()

    # identical objects hold overlapping data
    overlap_msg = "These objects have overlapping data and cannot be combined."
    with pytest.raises(ValueError, match=overlap_msg):
        original.__add__(duplicate)

    # the other operand is an empty list rather than a UVCal object
    with pytest.raises(ValueError, match="Only UVCal "):
        original.__add__([])

    # mismatch in a parameter that has to agree between the operands
    duplicate.telescope_name = "PAPER"
    mismatch_msg = "Parameter telescope_name does not match"
    with pytest.raises(ValueError, match=mismatch_msg):
        original.__add__(duplicate)
def test_jones_warning(gain_data):
    """Check the warning emitted when the combined Jones values have a gap.

    The object starts with Jones value -5, gains -6 from a first addition and
    then -8 from a second one; the second addition is expected to warn.
    """
    combined = gain_data
    addition = combined.copy()

    # first addition: -6 sits next to the existing -5
    addition.jones_array[0] = -6
    combined += addition

    # second addition: -8 skips -7
    addition.jones_array[0] = -8
    with uvtest.check_warnings(UserWarning, match="Combined Jones elements"):
        combined += addition

    assert sorted(combined.jones_array) == [-8, -6, -5]
def test_frequency_warnings(gain_data):
    """Check the warnings for unevenly spaced and non-contiguous frequencies.

    Each half of the test splits the object into its first five and next five
    frequency channels, perturbs the frequencies, recombines the halves and
    expects the corresponding UserWarning.
    """
    lower = gain_data
    upper = lower.copy()
    # untouched copies to restart from in the second half of the test
    lower_backup = lower.copy()
    upper_backup = upper.copy()

    # --- unevenly spaced frequencies ---
    lower_freqs = lower.freq_array[0, np.arange(0, 5)]
    upper_freqs = upper.freq_array[0, np.arange(5, 10)]
    lower.select(frequencies=lower_freqs)
    upper.select(frequencies=upper_freqs)

    # shrink the last frequency bin to half the width of the others
    last_step = upper.freq_array[0, -1] - upper.freq_array[0, -2]
    upper.freq_array[0, -1] = upper.freq_array[0, -2] + last_step / 2
    uneven_msg = "Combined frequencies are not evenly spaced"
    with uvtest.check_warnings(UserWarning, match=uneven_msg):
        lower += upper

    assert len(lower.freq_array[0, :]) == lower.Nfreqs

    # --- "non-contiguous" frequencies ---
    lower = lower_backup.copy()
    upper = upper_backup.copy()
    lower_freqs = lower.freq_array[0, np.arange(0, 5)]
    upper_freqs = upper.freq_array[0, np.arange(5, 10)]
    lower.select(frequencies=lower_freqs)
    upper.select(frequencies=upper_freqs)

    # artificially space out the frequencies of both halves
    lower.freq_array[0, :] *= 10
    upper.freq_array[0, :] *= 10
    gap_msg = "Combined frequencies are not contiguous"
    with uvtest.check_warnings(UserWarning, match=gap_msg):
        lower += upper

    # the selections were taken before the scaling, so scale them to compare
    expected_freqs = np.concatenate([lower_freqs * 10, upper_freqs * 10])
    freq_tols = lower._freq_array.tols
    assert np.allclose(
        lower.freq_array[0, :], expected_freqs, rtol=freq_tols[0], atol=freq_tols[1]
    )
def test_parameter_warnings(gain_data):
    """Check that a mismatched ``observer`` warns but still lets objects combine.

    The copy gets a different observer, the two objects are restricted to
    disjoint frequency selections, and the in-place add is expected to emit a
    UserWarning while producing the concatenated frequency axis.
    """
    lower = gain_data
    upper = lower.copy()
    upper.observer = "mystery_person"

    # disjoint frequency selections so that the two objects can be combined
    lower_freqs = lower.freq_array[0, np.arange(0, 5)]
    upper_freqs = upper.freq_array[0, np.arange(5, 10)]
    lower.select(frequencies=lower_freqs)
    upper.select(frequencies=upper_freqs)

    observer_msg = "UVParameter observer does not match"
    with uvtest.check_warnings(UserWarning, match=observer_msg):
        lower += upper

    expected_freqs = np.concatenate([lower_freqs, upper_freqs])
    freq_tols = lower._freq_array.tols
    assert np.allclose(
        lower.freq_array, expected_freqs, rtol=freq_tols[0], atol=freq_tols[1]
    )
@pytest.mark.parametrize("caltype", ["gain", "delay"])
def test_multi_files(caltype, gain_data, delay_data, tmp_path):
    """Check reading a list of calfits files into a single object.

    The fixture object is split in two along the time axis, each half is
    written to its own file under ``tmp_path``, and reading both files
    together must reproduce the original object, both with data and with
    ``read_data=False``.
    """
    early = gain_data if caltype == "gain" else delay_data
    late = early.copy()
    full_cal = early.copy()

    # split the calibration object in two along the time axis
    split_at = early.Ntimes // 2
    early_times = early.time_array[:split_at]
    late_times = early.time_array[split_at:]
    early.select(times=early_times)
    late.select(times=late_times)

    # write each half to its own file
    early_file = str(tmp_path / "read_multi1.calfits")
    late_file = str(tmp_path / "read_multi2.calfits")
    early.write_calfits(early_file, clobber=True)
    late.write_calfits(late_file, clobber=True)

    # read both files together and check the history before resetting it
    early.read_calfits([early_file, late_file])
    expected_history = (
        full_cal.history
        + " Downselected to specific times using pyuvdata."
        + " Combined data along time axis using pyuvdata."
    )
    assert uvutils._check_histories(expected_history, early.history)
    early.history = full_cal.history
    assert early == full_cal

    # same again, reading the metadata only
    early.read_calfits([early_file, late_file], read_data=False)
    full_metadata_only = full_cal.copy(metadata_only=True)
    early.history = full_metadata_only.history
    assert early == full_metadata_only
def test_uvcal_get_methods():
    """Check ``get_gains``, ``get_flags``, ``get_quality`` and ``_has_key``.

    A gain calfits file is read, a known value is written into the gain
    array, and the getters are checked for value, shape, agreement with
    direct indexing, the accepted key forms, and the errors raised for
    invalid keys or a non-gain ``cal_type``.
    """
    gain_file = os.path.join(DATA_PATH, "zen.2457698.40355.xx.gain.calfits")
    uvc = UVCal()
    uvc.read_calfits(gain_file)

    # write a known value and make sure the getter returns it
    # NOTE(review): index 1 is presumably the slot of antenna 10 - the zero
    # check below relies on that
    key = (10, "Jee")
    uvc.gain_array[1] = 0.0
    gains = uvc.get_gains(key)
    flags = uvc.get_flags(key)
    quality = uvc.get_quality(key)

    # value and shapes
    assert np.all(np.isclose(gains, 0.0))
    expected_shape = (uvc.Nfreqs, uvc.Ntimes)
    assert gains.shape == expected_shape
    assert flags.shape == expected_shape
    assert quality.shape == expected_shape

    # compare against by-hand indexing
    ant_index = uvc.ant_array.tolist().index(10)
    jones_index = uvc.jones_array.tolist().index(-5)
    np.testing.assert_array_almost_equal(
        gains, uvc.gain_array[ant_index, 0, :, :, jones_index]
    )

    # every accepted way of spelling the key must return the same gains:
    # unpacked tuple, bare antenna number, one-element tuple, Jones number,
    # and Jones string "x"
    key_variants = (key, (key[0],), (key[:1],), (10, -5), (10, "x"))
    for call_args in key_variants:
        np.testing.assert_array_almost_equal(gains, uvc.get_gains(*call_args))

    # check has_key
    assert uvc._has_key(antnum=10)
    assert uvc._has_key(jpol="Jee")
    assert uvc._has_key(antnum=10, jpol="Jee")
    assert not uvc._has_key(antnum=10, jpol="Jnn")
    assert not uvc._has_key(antnum=101, jpol="Jee")

    # invalid keys raise
    with pytest.raises(ValueError):
        uvc.get_gains(1)
    with pytest.raises(ValueError):
        uvc.get_gains((10, "Jnn"))

    # so does asking for gains once the object is marked as a delay-type cal
    uvc.cal_type = "delay"
    with pytest.raises(ValueError):
        uvc.get_gains(10)
def test_write_read_optional_attrs(tmp_path):
    """Check that optional attributes survive a calfits write/read round trip."""
    source_file = os.path.join(DATA_PATH, "zen.2457698.40355.xx.gain.calfits")
    output_file = str(tmp_path / "test.calfits")

    # read the test file and set some optional parameters on it
    written = UVCal()
    written.read_calfits(source_file)
    written.gain_scale = "Jy"
    written.sky_field = "GLEAM"

    written.write_calfits(output_file, clobber=True)

    # read the file back into a fresh object and compare
    read_back = UVCal()
    read_back.read_calfits(output_file)
    assert written == read_back
@pytest.mark.parametrize("caltype", ["gain", "delay", "unknown", None])
def test_copy(gain_data, delay_data, caltype):
    """Check ``copy`` for full and metadata-only copies of each cal type.

    For the "unknown" and ``None`` cases the gain fixture is switched to an
    unknown cal type and its ``cal_type`` attribute is set to the
    parametrized value.
    """
    uv_object = delay_data if caltype == "delay" else gain_data
    if caltype not in ("gain", "delay"):
        uv_object._set_unknown_cal_type()
        uv_object.cal_type = caltype

    # a full copy equals its source
    duplicate = uv_object.copy()
    assert duplicate == uv_object

    # a metadata-only copy equals the source once its data params are removed
    duplicate = uv_object.copy(metadata_only=True)
    assert duplicate.metadata_only
    for param_name in uv_object._data_params:
        setattr(uv_object, param_name, None)
    assert duplicate == uv_object

    # copying an object that no longer holds data works as well
    duplicate = uv_object.copy()
    assert duplicate == uv_object
@pytest.mark.parametrize("antnamefix", ["all", "partial"])
def test_match_antpos_antname(gain_data, antnamefix, tmp_path):
    """Test that antenna positions are filled in on read when they are missing.

    The antenna names are renamed from the "ant" prefix to the "HH" prefix,
    either for all antennas or only for the first half of them. A copy
    without ``antenna_positions`` is written to a calfits file and read back;
    the read must warn that known values are used, set the positions, and
    give an object equal to the one that kept its positions.
    """
    # fix the antenna names in the uvcal object to match telescope object
    new_names = np.array(
        [name.replace("ant", "HH") for name in gain_data.antenna_names]
    )
    if antnamefix == "all":
        gain_data.antenna_names = new_names
    else:
        # only rename the first half of the antennas
        gain_data.antenna_names[0 : gain_data.Nants_telescope // 2] = new_names[
            0 : gain_data.Nants_telescope // 2
        ]

    # remove the antenna_positions to test matching them on read
    gain_data2 = gain_data.copy()
    gain_data2.antenna_positions = None

    write_file_calfits = str(tmp_path / "test.calfits")
    # writing without positions is expected to emit a deprecation warning
    with uvtest.check_warnings(
        DeprecationWarning,
        match="The antenna_positions parameter is not set. It will be a required "
        "parameter starting in pyuvdata version 2.3",
    ):
        gain_data2.write_calfits(write_file_calfits)

    # pass the pattern by keyword, like every other check_warnings call here
    with uvtest.check_warnings(
        UserWarning,
        match="antenna_positions is not set. Using known values for HERA.",
    ):
        gain_data2.read_calfits(write_file_calfits)

    assert gain_data2.antenna_positions is not None
    assert gain_data == gain_data2
@pytest.mark.parametrize("modtype", ["rename", "swap"])
def test_set_antpos_from_telescope_errors(gain_data, modtype, tmp_path):
    """Test that setting antenna positions doesn't happen if ants don't match."""
    positions_deprecation = (
        "The antenna_positions parameter is not set. It will be a required "
        "parameter starting in pyuvdata version 2.3"
    )

    # switch the antenna names to the "HH" prefix to match the telescope object
    gain_data.antenna_names = np.array(
        [name.replace("ant", "HH") for name in gain_data.antenna_names]
    )

    # both variants rename the first antenna; they differ in its number
    first_ant_num = gain_data.antenna_numbers[0]
    gain_data.antenna_names[0] = "HH400"
    if modtype == "rename":
        # give the antenna a new number as well, and keep ant_array in step
        gain_data.antenna_numbers[0] = 400
        gain_data.ant_array[np.where(gain_data.ant_array == first_ant_num)[0]] = 400
    else:
        # swap the numbers of the first two antennas
        gain_data.antenna_numbers[0] = gain_data.antenna_numbers[1]
        gain_data.antenna_numbers[1] = first_ant_num

    # drop the antenna positions so that the read has to try to match them
    stripped = gain_data.copy()
    stripped.antenna_positions = None

    calfits_path = str(tmp_path / "test.calfits")
    with uvtest.check_warnings(DeprecationWarning, match=positions_deprecation):
        stripped.write_calfits(calfits_path)

    # the read must warn that positions are not being set, then warn again
    # that they are missing
    expected_messages = [
        "Not all antennas have positions in the telescope object. "
        "Not setting antenna_positions.",
        positions_deprecation,
    ]
    with uvtest.check_warnings(
        [UserWarning, DeprecationWarning], match=expected_messages
    ):
        stripped.read_calfits(calfits_path)

    assert stripped.antenna_positions is None