https://github.com/RadioAstronomySoftwareGroup/pyuvdata
Tip revision: 0b1b091eeec941c6081327cfd1d65f689feafd8c authored by Paul La Plante on 08 August 2020, 00:16:05 UTC
Merge branch 'Smithsonian-submillimeter_array' into master
Merge branch 'Smithsonian-submillimeter_array' into master
Tip revision: 0b1b091
test_uvcal.py
# -*- mode: python; coding: utf-8 -*-
# Copyright (c) 2018 Radio Astronomy Software Group
# Licensed under the 2-clause BSD License
"""Tests for uvcal object.
"""
import pytest
import os
import numpy as np
from pyuvdata import UVCal
import pyuvdata.utils as uvutils
import pyuvdata.tests as uvtest
from pyuvdata.data import DATA_PATH
@pytest.fixture(scope="function")
def uvcal_data():
"""Set up some uvcal iter tests."""
required_parameters = [
"_Nfreqs",
"_Njones",
"_Ntimes",
"_Nspws",
"_Nants_data",
"_Nants_telescope",
"_antenna_names",
"_antenna_numbers",
"_ant_array",
"_telescope_name",
"_freq_array",
"_channel_width",
"_spw_array",
"_jones_array",
"_time_array",
"_integration_time",
"_gain_convention",
"_flag_array",
"_quality_array",
"_cal_type",
"_cal_style",
"_x_orientation",
"_history",
]
required_properties = [
"Nfreqs",
"Njones",
"Ntimes",
"Nspws",
"Nants_data",
"Nants_telescope",
"antenna_names",
"antenna_numbers",
"ant_array",
"telescope_name",
"freq_array",
"channel_width",
"spw_array",
"jones_array",
"time_array",
"integration_time",
"gain_convention",
"flag_array",
"quality_array",
"cal_type",
"cal_style",
"x_orientation",
"history",
]
extra_parameters = [
"_gain_array",
"_delay_array",
"_sky_field",
"_sky_catalog",
"_ref_antenna_name",
"_Nsources",
"_baseline_range",
"_diffuse_model",
"_input_flag_array",
"_time_range",
"_freq_range",
"_observer",
"_git_origin_cal",
"_git_hash_cal",
"_total_quality_array",
"_extra_keywords",
"_gain_scale",
]
extra_properties = [
"gain_array",
"delay_array",
"sky_field",
"sky_catalog",
"ref_antenna_name",
"Nsources",
"baseline_range",
"diffuse_model",
"input_flag_array",
"time_range",
"freq_range",
"observer",
"git_origin_cal",
"git_hash_cal",
"total_quality_array",
"extra_keywords",
"gain_scale",
]
other_properties = ["pyuvdata_version_str"]
uv_cal_object = UVCal()
# yields the data we need but will continue to the del call after tests
yield (
uv_cal_object,
required_parameters,
required_properties,
extra_parameters,
extra_properties,
other_properties,
)
# some post-test object cleanup
del uv_cal_object
return
def test_parameter_iter(uvcal_data):
"""Test expected parameters."""
(
uv_cal_object,
required_parameters,
required_properties,
extra_parameters,
extra_properties,
other_properties,
) = uvcal_data
all_params = []
for prop in uv_cal_object:
all_params.append(prop)
for a in required_parameters + extra_parameters:
assert a in all_params, (
"expected attribute " + a + " not returned in object iterator"
)
def test_required_parameter_iter(uvcal_data):
"""Test expected required parameters."""
(
uv_cal_object,
required_parameters,
required_properties,
extra_parameters,
extra_properties,
other_properties,
) = uvcal_data
required = []
for prop in uv_cal_object.required():
required.append(prop)
for a in required_parameters:
assert a in required, (
"expected attribute " + a + " not returned in required iterator"
)
def test_unexpected_parameters(uvcal_data):
"""Test for extra parameters."""
(
uv_cal_object,
required_parameters,
required_properties,
extra_parameters,
extra_properties,
other_properties,
) = uvcal_data
expected_parameters = required_parameters + extra_parameters
attributes = [i for i in uv_cal_object.__dict__.keys() if i[0] == "_"]
for a in attributes:
assert a in expected_parameters, "unexpected parameter " + a + " found in UVCal"
def test_unexpected_attributes(uvcal_data):
"""Test for extra attributes."""
(
uv_cal_object,
required_parameters,
required_properties,
extra_parameters,
extra_properties,
other_properties,
) = uvcal_data
expected_attributes = required_properties + extra_properties + other_properties
attributes = [i for i in uv_cal_object.__dict__.keys() if i[0] != "_"]
for a in attributes:
assert a in expected_attributes, "unexpected attribute " + a + " found in UVCal"
def test_properties(uvcal_data):
"""Test that properties can be get and set properly."""
(
uv_cal_object,
required_parameters,
required_properties,
extra_parameters,
extra_properties,
other_properties,
) = uvcal_data
prop_dict = dict(
list(
zip(
required_properties + extra_properties,
required_parameters + extra_parameters,
)
)
)
for k, v in prop_dict.items():
rand_num = np.random.rand()
setattr(uv_cal_object, k, rand_num)
this_param = getattr(uv_cal_object, v)
try:
assert rand_num == this_param.value
except AssertionError:
print("setting {prop_name} to a random number failed".format(prop_name=k))
raise
@pytest.fixture(scope="function")
def gain_data():
"""Initialize for some basic uvcal tests."""
gain_object = UVCal()
gainfile = os.path.join(DATA_PATH, "zen.2457698.40355.xx.gain.calfits")
gain_object.read_calfits(gainfile)
gain_object2 = gain_object.copy()
delay_object = UVCal()
delayfile = os.path.join(DATA_PATH, "zen.2457698.40355.xx.delay.calfits")
delay_object.read_calfits(delayfile)
class DataHolder(object):
def __init__(self, gain_object, gain_object2, delay_object):
self.gain_object = gain_object
self.gain_object2 = gain_object2
self.delay_object = delay_object
gain_data = DataHolder(gain_object, gain_object2, delay_object)
yield gain_data
del gain_data
def test_equality(gain_data):
"""Basic equality test"""
assert gain_data.gain_object == gain_data.gain_object
def test_check(gain_data):
"""Test that parameter checks run properly"""
assert gain_data.gain_object.check()
def test_nants_data_telescope_larger(gain_data):
# make sure it's okay for Nants_telescope to be strictly greater than Nants_data
gain_data.gain_object.Nants_telescope += 1
# add dummy information for "new antenna" to pass object check
gain_data.gain_object.antenna_names = np.concatenate(
(gain_data.gain_object.antenna_names, ["dummy_ant"])
)
gain_data.gain_object.antenna_numbers = np.concatenate(
(gain_data.gain_object.antenna_numbers, [20])
)
assert gain_data.gain_object.check()
def test_ant_array_not_in_antnums(gain_data):
# make sure an error is raised if antennas with data not in antenna_numbers
# remove antennas from antenna_names & antenna_numbers by hand
gain_data.gain_object.antenna_names = gain_data.gain_object.antenna_names[1:]
gain_data.gain_object.antenna_numbers = gain_data.gain_object.antenna_numbers[1:]
gain_data.gain_object.Nants_telescope = gain_data.gain_object.antenna_numbers.size
with pytest.raises(ValueError) as cm:
gain_data.gain_object.check()
assert str(cm.value).startswith(
"All antennas in ant_array must be in antenna_numbers"
)
def test_set_gain(gain_data):
gain_data.delay_object._set_gain()
assert gain_data.delay_object._gain_array.required
assert not gain_data.delay_object._delay_array.required
assert (
gain_data.delay_object._gain_array.form
== gain_data.delay_object._flag_array.form
)
assert (
gain_data.delay_object._gain_array.form
== gain_data.delay_object._quality_array.form
)
with uvtest.check_warnings(
DeprecationWarning,
match="`set_gain` is deprecated, and will be removed in "
"pyuvdata version 2.2. Use `_set_gain` instead.",
):
gain_data.gain_object.set_gain()
def test_set_delay(gain_data):
gain_data.gain_object._set_delay()
assert gain_data.gain_object._delay_array.required
assert not gain_data.gain_object._gain_array.required
assert (
gain_data.gain_object._gain_array.form == gain_data.gain_object._flag_array.form
)
assert (
gain_data.gain_object._delay_array.form
== gain_data.gain_object._quality_array.form
)
with uvtest.check_warnings(
DeprecationWarning,
match="`set_delay` is deprecated, and will be removed in "
"pyuvdata version 2.2. Use `_set_delay` instead.",
):
gain_data.gain_object.set_delay()
def test_set_unknown(gain_data):
gain_data.gain_object._set_unknown_cal_type()
assert not gain_data.gain_object._delay_array.required
assert not gain_data.gain_object._gain_array.required
assert (
gain_data.gain_object._gain_array.form == gain_data.gain_object._flag_array.form
)
assert (
gain_data.gain_object._gain_array.form
== gain_data.gain_object._quality_array.form
)
with uvtest.check_warnings(
DeprecationWarning,
match="`set_unknown_cal_type` is deprecated, and will be removed in "
"pyuvdata version 2.2. Use `_set_unknown_cal_type` instead.",
):
gain_data.gain_object.set_unknown_cal_type()
def test_set_sky(gain_data):
gain_data.gain_object._set_sky()
assert gain_data.gain_object._sky_field.required
assert gain_data.gain_object._sky_catalog.required
assert gain_data.gain_object._ref_antenna_name.required
with uvtest.check_warnings(
DeprecationWarning,
match="`set_sky` is deprecated, and will be removed in "
"pyuvdata version 2.2. Use `_set_sky` instead.",
):
gain_data.gain_object.set_sky()
def test_set_redundant(gain_data):
gain_data.gain_object._set_redundant()
assert not gain_data.gain_object._sky_field.required
assert not gain_data.gain_object._sky_catalog.required
assert not gain_data.gain_object._ref_antenna_name.required
with uvtest.check_warnings(
DeprecationWarning,
match="`set_redundant` is deprecated, and will be removed in "
"pyuvdata version 2.2. Use `_set_redundant` instead.",
):
gain_data.gain_object.set_redundant()
def test_convert_filetype(gain_data):
# error testing
pytest.raises(ValueError, gain_data.gain_object._convert_to_filetype, "uvfits")
def test_convert_to_gain(gain_data):
conventions = ["minus", "plus"]
for c in conventions:
gain_data.new_object = gain_data.delay_object.copy()
gain_data.new_object.convert_to_gain(delay_convention=c)
assert np.isclose(
np.max(np.absolute(gain_data.new_object.gain_array)),
1.0,
rtol=gain_data.new_object._gain_array.tols[0],
atol=gain_data.new_object._gain_array.tols[1],
)
assert np.isclose(
np.min(np.absolute(gain_data.new_object.gain_array)),
1.0,
rtol=gain_data.new_object._gain_array.tols[0],
atol=gain_data.new_object._gain_array.tols[1],
)
if c == "minus":
conv = -1
else:
conv = 1
assert np.allclose(
np.angle(gain_data.new_object.gain_array[:, :, 10, :, :]) % (2 * np.pi),
(
conv
* 2
* np.pi
* gain_data.delay_object.delay_array[:, :, 0, :, :]
* gain_data.delay_object.freq_array[0, 10]
)
% (2 * np.pi),
rtol=gain_data.new_object._gain_array.tols[0],
atol=gain_data.new_object._gain_array.tols[1],
)
assert np.allclose(
gain_data.delay_object.quality_array,
gain_data.new_object.quality_array[:, :, 10, :, :],
rtol=gain_data.new_object._quality_array.tols[0],
atol=gain_data.new_object._quality_array.tols[1],
)
assert gain_data.new_object.history == (
gain_data.delay_object.history
+ " Converted from delays to gains using pyuvdata."
)
# test a file with a total_quality_array
gain_data.new_object = gain_data.delay_object.copy()
tqa_size = gain_data.new_object.delay_array.shape[1:]
gain_data.new_object.total_quality_array = np.ones(tqa_size)
gain_data.new_object.convert_to_gain(delay_convention="minus")
assert np.isclose(
np.max(np.absolute(gain_data.new_object.gain_array)),
1.0,
rtol=gain_data.new_object._gain_array.tols[0],
atol=gain_data.new_object._gain_array.tols[1],
)
assert np.isclose(
np.min(np.absolute(gain_data.new_object.gain_array)),
1.0,
rtol=gain_data.new_object._gain_array.tols[0],
atol=gain_data.new_object._gain_array.tols[1],
)
assert np.allclose(
np.angle(gain_data.new_object.gain_array[:, :, 10, :, :]) % (2 * np.pi),
(
-1
* 2
* np.pi
* gain_data.delay_object.delay_array[:, :, 0, :, :]
* gain_data.delay_object.freq_array[0, 10]
)
% (2 * np.pi),
rtol=gain_data.new_object._gain_array.tols[0],
atol=gain_data.new_object._gain_array.tols[1],
)
assert np.allclose(
gain_data.delay_object.quality_array,
gain_data.new_object.quality_array[:, :, 10, :, :],
rtol=gain_data.new_object._quality_array.tols[0],
atol=gain_data.new_object._quality_array.tols[1],
)
assert gain_data.new_object.history == (
gain_data.delay_object.history
+ " Converted from delays to gains using pyuvdata."
)
# error testing
pytest.raises(
ValueError, gain_data.delay_object.convert_to_gain, delay_convention="bogus"
)
pytest.raises(ValueError, gain_data.gain_object.convert_to_gain)
gain_data.gain_object._set_unknown_cal_type()
pytest.raises(ValueError, gain_data.gain_object.convert_to_gain)
def test_select_antennas(gain_data, tmp_path):
old_history = gain_data.gain_object.history
ants_to_keep = np.array([65, 96, 9, 97, 89, 22, 20, 72])
gain_data.gain_object2.select(antenna_nums=ants_to_keep)
assert len(ants_to_keep) == gain_data.gain_object2.Nants_data
for ant in ants_to_keep:
assert ant in gain_data.gain_object2.ant_array
for ant in gain_data.gain_object2.ant_array:
assert ant in ants_to_keep
assert uvutils._check_histories(
old_history + " Downselected to " "specific antennas using pyuvdata.",
gain_data.gain_object2.history,
)
# now test using antenna_names to specify antennas to keep
ants_to_keep = np.array(sorted(ants_to_keep))
ant_names = []
for a in ants_to_keep:
ind = np.where(gain_data.gain_object.antenna_numbers == a)[0][0]
ant_names.append(gain_data.gain_object.antenna_names[ind])
gain_data.gain_object3 = gain_data.gain_object.select(
antenna_names=ant_names, inplace=False
)
assert gain_data.gain_object2 == gain_data.gain_object3
# check for errors associated with antennas not included in data, bad names
# or providing numbers and names
pytest.raises(
ValueError,
gain_data.gain_object.select,
antenna_nums=np.max(gain_data.gain_object.ant_array) + np.arange(1, 3),
)
pytest.raises(ValueError, gain_data.gain_object.select, antenna_names="test1")
pytest.raises(
ValueError,
gain_data.gain_object.select,
antenna_nums=ants_to_keep,
antenna_names=ant_names,
)
# check that write_calfits works with Nants_data < Nants_telescope
write_file_calfits = str(tmp_path / "select_test.calfits")
gain_data.gain_object2.write_calfits(write_file_calfits, clobber=True)
# check that reading it back in works too
new_gain_object = UVCal()
new_gain_object.read_calfits(write_file_calfits)
assert gain_data.gain_object2 == new_gain_object
# check that total_quality_array is handled properly when present
gain_data.gain_object.total_quality_array = np.zeros(
gain_data.gain_object._total_quality_array.expected_shape(gain_data.gain_object)
)
with uvtest.check_warnings(UserWarning, "Cannot preserve total_quality_array"):
gain_data.gain_object.select(antenna_names=ant_names, inplace=True)
assert gain_data.gain_object.total_quality_array is None
def test_select_times(gain_data, tmp_path):
old_history = gain_data.gain_object.history
times_to_keep = gain_data.gain_object.time_array[2:5]
gain_data.gain_object2.select(times=times_to_keep)
assert len(times_to_keep) == gain_data.gain_object2.Ntimes
for t in times_to_keep:
assert t in gain_data.gain_object2.time_array
for t in np.unique(gain_data.gain_object2.time_array):
assert t in times_to_keep
assert uvutils._check_histories(
old_history + " Downselected to " "specific times using pyuvdata.",
gain_data.gain_object2.history,
)
write_file_calfits = str(tmp_path / "select_test.calfits")
# test writing calfits with only one time
gain_data.gain_object2 = gain_data.gain_object.copy()
times_to_keep = gain_data.gain_object.time_array[[1]]
gain_data.gain_object2.select(times=times_to_keep)
gain_data.gain_object2.write_calfits(write_file_calfits, clobber=True)
# check for errors associated with times not included in data
pytest.raises(
ValueError,
gain_data.gain_object.select,
times=[
np.min(gain_data.gain_object.time_array)
- gain_data.gain_object.integration_time
],
)
# check for warnings and errors associated with unevenly spaced times
gain_data.gain_object2 = gain_data.gain_object.copy()
with uvtest.check_warnings(UserWarning, "Selected times are not evenly spaced"):
gain_data.gain_object2.select(
times=gain_data.gain_object2.time_array[[0, 2, 3]]
)
pytest.raises(ValueError, gain_data.gain_object2.write_calfits, write_file_calfits)
def test_select_frequencies(gain_data, tmp_path):
old_history = gain_data.gain_object.history
freqs_to_keep = gain_data.gain_object.freq_array[0, np.arange(4, 8)]
# add dummy total_quality_array
gain_data.gain_object.total_quality_array = np.zeros(
gain_data.gain_object._total_quality_array.expected_shape(gain_data.gain_object)
)
gain_data.gain_object2.total_quality_array = np.zeros(
gain_data.gain_object2._total_quality_array.expected_shape(
gain_data.gain_object2
)
)
gain_data.gain_object2.select(frequencies=freqs_to_keep)
assert len(freqs_to_keep) == gain_data.gain_object2.Nfreqs
for f in freqs_to_keep:
assert f in gain_data.gain_object2.freq_array
for f in np.unique(gain_data.gain_object2.freq_array):
assert f in freqs_to_keep
assert uvutils._check_histories(
old_history + " Downselected to " "specific frequencies using pyuvdata.",
gain_data.gain_object2.history,
)
write_file_calfits = str(tmp_path / "select_test.calfits")
# test writing calfits with only one frequency
gain_data.gain_object2 = gain_data.gain_object.copy()
freqs_to_keep = gain_data.gain_object.freq_array[0, 5]
gain_data.gain_object2.select(frequencies=freqs_to_keep)
gain_data.gain_object2.write_calfits(write_file_calfits, clobber=True)
# check for errors associated with frequencies not included in data
pytest.raises(
ValueError,
gain_data.gain_object.select,
frequencies=[
np.max(gain_data.gain_object.freq_array)
+ gain_data.gain_object.channel_width
],
)
# check for warnings and errors associated with unevenly spaced frequencies
gain_data.gain_object2 = gain_data.gain_object.copy()
with uvtest.check_warnings(
UserWarning, "Selected frequencies are not evenly spaced"
):
gain_data.gain_object2.select(
frequencies=gain_data.gain_object2.freq_array[0, [0, 5, 6]]
)
pytest.raises(ValueError, gain_data.gain_object2.write_calfits, write_file_calfits)
def test_select_freq_chans(gain_data):
old_history = gain_data.gain_object.history
chans_to_keep = np.arange(4, 8)
# add dummy total_quality_array
gain_data.gain_object.total_quality_array = np.zeros(
gain_data.gain_object._total_quality_array.expected_shape(gain_data.gain_object)
)
gain_data.gain_object2.total_quality_array = np.zeros(
gain_data.gain_object2._total_quality_array.expected_shape(
gain_data.gain_object2
)
)
gain_data.gain_object2.select(freq_chans=chans_to_keep)
assert len(chans_to_keep) == gain_data.gain_object2.Nfreqs
for chan in chans_to_keep:
assert (
gain_data.gain_object.freq_array[0, chan]
in gain_data.gain_object2.freq_array
)
for f in np.unique(gain_data.gain_object2.freq_array):
assert f in gain_data.gain_object.freq_array[0, chans_to_keep]
assert uvutils._check_histories(
old_history + " Downselected to " "specific frequencies using pyuvdata.",
gain_data.gain_object2.history,
)
# Test selecting both channels and frequencies
freqs_to_keep = gain_data.gain_object.freq_array[
0, np.arange(7, 10)
] # Overlaps with chans
all_chans_to_keep = np.arange(4, 10)
gain_data.gain_object2 = gain_data.gain_object.copy()
gain_data.gain_object2.select(frequencies=freqs_to_keep, freq_chans=chans_to_keep)
assert len(all_chans_to_keep) == gain_data.gain_object2.Nfreqs
for chan in all_chans_to_keep:
assert (
gain_data.gain_object.freq_array[0, chan]
in gain_data.gain_object2.freq_array
)
for f in np.unique(gain_data.gain_object2.freq_array):
assert f in gain_data.gain_object.freq_array[0, all_chans_to_keep]
def test_select_polarizations(gain_data):
# add more jones terms to allow for better testing of selections
while gain_data.gain_object.Njones < 4:
new_jones = np.min(gain_data.gain_object.jones_array) - 1
gain_data.gain_object.jones_array = np.append(
gain_data.gain_object.jones_array, new_jones
)
gain_data.gain_object.Njones += 1
gain_data.gain_object.flag_array = np.concatenate(
(
gain_data.gain_object.flag_array,
gain_data.gain_object.flag_array[:, :, :, :, [-1]],
),
axis=4,
)
gain_data.gain_object.gain_array = np.concatenate(
(
gain_data.gain_object.gain_array,
gain_data.gain_object.gain_array[:, :, :, :, [-1]],
),
axis=4,
)
gain_data.gain_object.quality_array = np.concatenate(
(
gain_data.gain_object.quality_array,
gain_data.gain_object.quality_array[:, :, :, :, [-1]],
),
axis=4,
)
# add dummy total_quality_array
gain_data.gain_object.total_quality_array = np.zeros(
gain_data.gain_object._total_quality_array.expected_shape(gain_data.gain_object)
)
assert gain_data.gain_object.check()
gain_data.gain_object2 = gain_data.gain_object.copy()
old_history = gain_data.gain_object.history
jones_to_keep = [-5, -6]
gain_data.gain_object2.select(jones=jones_to_keep)
assert len(jones_to_keep) == gain_data.gain_object2.Njones
for j in jones_to_keep:
assert j in gain_data.gain_object2.jones_array
for j in np.unique(gain_data.gain_object2.jones_array):
assert j in jones_to_keep
assert uvutils._check_histories(
old_history + " Downselected to "
"specific jones polarization terms "
"using pyuvdata.",
gain_data.gain_object2.history,
)
# check for errors associated with polarizations not included in data
pytest.raises(ValueError, gain_data.gain_object2.select, jones=[-3, -4])
# check for warnings and errors associated with unevenly spaced polarizations
with uvtest.check_warnings(
UserWarning, "Selected jones polarization terms are not evenly spaced"
):
gain_data.gain_object.select(jones=gain_data.gain_object.jones_array[[0, 1, 3]])
write_file_calfits = os.path.join(DATA_PATH, "test/select_test.calfits")
pytest.raises(ValueError, gain_data.gain_object.write_calfits, write_file_calfits)
def test_select(gain_data):
# now test selecting along all axes at once
old_history = gain_data.gain_object.history
ants_to_keep = np.array([10, 89, 43, 9, 80, 96, 64])
freqs_to_keep = gain_data.gain_object.freq_array[0, np.arange(2, 5)]
times_to_keep = gain_data.gain_object.time_array[[1, 2]]
jones_to_keep = [-5]
gain_data.gain_object2.select(
antenna_nums=ants_to_keep,
frequencies=freqs_to_keep,
times=times_to_keep,
jones=jones_to_keep,
)
assert len(ants_to_keep) == gain_data.gain_object2.Nants_data
for ant in ants_to_keep:
assert ant in gain_data.gain_object2.ant_array
for ant in gain_data.gain_object2.ant_array:
assert ant in ants_to_keep
assert len(times_to_keep) == gain_data.gain_object2.Ntimes
for t in times_to_keep:
assert t in gain_data.gain_object2.time_array
for t in np.unique(gain_data.gain_object2.time_array):
assert t in times_to_keep
assert len(freqs_to_keep) == gain_data.gain_object2.Nfreqs
for f in freqs_to_keep:
assert f in gain_data.gain_object2.freq_array
for f in np.unique(gain_data.gain_object2.freq_array):
assert f in freqs_to_keep
assert len(jones_to_keep) == gain_data.gain_object2.Njones
for j in jones_to_keep:
assert j in gain_data.gain_object2.jones_array
for j in np.unique(gain_data.gain_object2.jones_array):
assert j in jones_to_keep
assert uvutils._check_histories(
old_history + " Downselected to "
"specific antennas, times, "
"frequencies, jones polarization terms "
"using pyuvdata.",
gain_data.gain_object2.history,
)
@pytest.fixture(scope="function")
def delay_data(tmp_path):
"""Initialization for some basic uvcal tests."""
delay_object = UVCal()
delayfile = os.path.join(DATA_PATH, "zen.2457698.40355.xx.delay.calfits")
# add an input flag array to the file to test for that.
write_file = str(tmp_path / "outtest_input_flags.fits")
uv_in = UVCal()
uv_in.read_calfits(delayfile)
uv_in.input_flag_array = np.zeros(
uv_in._input_flag_array.expected_shape(uv_in), dtype=bool
)
uv_in.write_calfits(write_file, clobber=True)
delay_object.read_calfits(write_file)
class DataHolder(object):
def __init__(self, delay_object):
self.delay_object = delay_object
self.delay_object2 = delay_object.copy()
delay_data = DataHolder(delay_object)
# yield the data for testing, then del after tests finish
yield delay_data
del delay_data
def test_select_antennas_delay(delay_data):
old_history = delay_data.delay_object.history
ants_to_keep = np.array([65, 96, 9, 97, 89, 22, 20, 72])
delay_data.delay_object2.select(antenna_nums=ants_to_keep)
assert len(ants_to_keep) == delay_data.delay_object2.Nants_data
for ant in ants_to_keep:
assert ant in delay_data.delay_object2.ant_array
for ant in delay_data.delay_object2.ant_array:
assert ant in ants_to_keep
assert uvutils._check_histories(
old_history + " Downselected to " "specific antennas using pyuvdata.",
delay_data.delay_object2.history,
)
# now test using antenna_names to specify antennas to keep
delay_data.delay_object3 = delay_data.delay_object.copy()
ants_to_keep = np.array(sorted(ants_to_keep))
ant_names = []
for a in ants_to_keep:
ind = np.where(delay_data.delay_object3.antenna_numbers == a)[0][0]
ant_names.append(delay_data.delay_object3.antenna_names[ind])
delay_data.delay_object3.select(antenna_names=ant_names)
assert delay_data.delay_object2 == delay_data.delay_object3
# check for errors associated with antennas not included in data, bad names
# or providing numbers and names
pytest.raises(
ValueError,
delay_data.delay_object.select,
antenna_nums=np.max(delay_data.delay_object.ant_array) + np.arange(1, 3),
)
pytest.raises(ValueError, delay_data.delay_object.select, antenna_names="test1")
pytest.raises(
ValueError,
delay_data.delay_object.select,
antenna_nums=ants_to_keep,
antenna_names=ant_names,
)
# check that total_quality_array is handled properly when present
delay_data.delay_object.total_quality_array = np.zeros(
delay_data.delay_object._total_quality_array.expected_shape(
delay_data.delay_object
)
)
with uvtest.check_warnings(UserWarning, "Cannot preserve total_quality_array"):
delay_data.delay_object.select(antenna_names=ant_names, inplace=True)
assert delay_data.delay_object.total_quality_array is None
def test_select_times_delay(delay_data):
old_history = delay_data.delay_object.history
times_to_keep = delay_data.delay_object.time_array[2:5]
delay_data.delay_object2.select(times=times_to_keep)
assert len(times_to_keep) == delay_data.delay_object2.Ntimes
for t in times_to_keep:
assert t in delay_data.delay_object2.time_array
for t in np.unique(delay_data.delay_object2.time_array):
assert t in times_to_keep
assert uvutils._check_histories(
old_history + " Downselected to " "specific times using pyuvdata.",
delay_data.delay_object2.history,
)
# check for errors associated with times not included in data
pytest.raises(
ValueError,
delay_data.delay_object.select,
times=[
np.min(delay_data.delay_object.time_array)
- delay_data.delay_object.integration_time
],
)
# check for warnings and errors associated with unevenly spaced times
delay_data.delay_object2 = delay_data.delay_object.copy()
with uvtest.check_warnings(UserWarning, "Selected times are not evenly spaced"):
delay_data.delay_object2.select(
times=delay_data.delay_object2.time_array[[0, 2, 3]]
)
write_file_calfits = os.path.join(DATA_PATH, "test/select_test.calfits")
pytest.raises(
ValueError, delay_data.delay_object2.write_calfits, write_file_calfits
)
def test_select_frequencies_delay(delay_data, tmp_path):
old_history = delay_data.delay_object.history
freqs_to_keep = delay_data.delay_object.freq_array[0, np.arange(73, 944)]
# add dummy total_quality_array
delay_data.delay_object.total_quality_array = np.zeros(
delay_data.delay_object._total_quality_array.expected_shape(
delay_data.delay_object
)
)
delay_data.delay_object2.total_quality_array = np.zeros(
delay_data.delay_object2._total_quality_array.expected_shape(
delay_data.delay_object2
)
)
delay_data.delay_object2.select(frequencies=freqs_to_keep)
assert len(freqs_to_keep) == delay_data.delay_object2.Nfreqs
for f in freqs_to_keep:
assert f in delay_data.delay_object2.freq_array
for f in np.unique(delay_data.delay_object2.freq_array):
assert f in freqs_to_keep
assert uvutils._check_histories(
old_history + " Downselected to " "specific frequencies using pyuvdata.",
delay_data.delay_object2.history,
)
# check for errors associated with frequencies not included in data
pytest.raises(
ValueError,
delay_data.delay_object.select,
frequencies=[
np.max(delay_data.delay_object.freq_array)
+ delay_data.delay_object.channel_width
],
)
# check for warnings and errors associated with unevenly spaced frequencies
delay_data.delay_object2 = delay_data.delay_object.copy()
with uvtest.check_warnings(
UserWarning, "Selected frequencies are not evenly spaced"
):
delay_data.delay_object2.select(
frequencies=delay_data.delay_object2.freq_array[0, [0, 5, 6]]
)
write_file_calfits = str(tmp_path / "select_test.calfits")
pytest.raises(
ValueError, delay_data.delay_object2.write_calfits, write_file_calfits
)
def test_select_freq_chans_delay(delay_data):
old_history = delay_data.delay_object.history
chans_to_keep = np.arange(73, 944)
# add dummy total_quality_array
delay_data.delay_object.total_quality_array = np.zeros(
delay_data.delay_object._total_quality_array.expected_shape(
delay_data.delay_object
)
)
delay_data.delay_object2.total_quality_array = np.zeros(
delay_data.delay_object2._total_quality_array.expected_shape(
delay_data.delay_object2
)
)
delay_data.delay_object2.select(freq_chans=chans_to_keep)
assert len(chans_to_keep) == delay_data.delay_object2.Nfreqs
for chan in chans_to_keep:
assert (
delay_data.delay_object.freq_array[0, chan]
in delay_data.delay_object2.freq_array
)
for f in np.unique(delay_data.delay_object2.freq_array):
assert f in delay_data.delay_object.freq_array[0, chans_to_keep]
assert uvutils._check_histories(
old_history + " Downselected to " "specific frequencies using pyuvdata.",
delay_data.delay_object2.history,
)
# Test selecting both channels and frequencies
freqs_to_keep = delay_data.delay_object.freq_array[
0, np.arange(930, 1000)
] # Overlaps with chans
all_chans_to_keep = np.arange(73, 1000)
delay_data.delay_object2 = delay_data.delay_object.copy()
delay_data.delay_object2.select(frequencies=freqs_to_keep, freq_chans=chans_to_keep)
assert len(all_chans_to_keep) == delay_data.delay_object2.Nfreqs
for chan in all_chans_to_keep:
assert (
delay_data.delay_object.freq_array[0, chan]
in delay_data.delay_object2.freq_array
)
for f in np.unique(delay_data.delay_object2.freq_array):
assert f in delay_data.delay_object.freq_array[0, all_chans_to_keep]
def test_select_polarizations_delay(delay_data, tmp_path):
# add more jones terms to allow for better testing of selections
while delay_data.delay_object.Njones < 4:
new_jones = np.min(delay_data.delay_object.jones_array) - 1
delay_data.delay_object.jones_array = np.append(
delay_data.delay_object.jones_array, new_jones
)
delay_data.delay_object.Njones += 1
delay_data.delay_object.flag_array = np.concatenate(
(
delay_data.delay_object.flag_array,
delay_data.delay_object.flag_array[:, :, :, :, [-1]],
),
axis=4,
)
delay_data.delay_object.input_flag_array = np.concatenate(
(
delay_data.delay_object.input_flag_array,
delay_data.delay_object.input_flag_array[:, :, :, :, [-1]],
),
axis=4,
)
delay_data.delay_object.delay_array = np.concatenate(
(
delay_data.delay_object.delay_array,
delay_data.delay_object.delay_array[:, :, :, :, [-1]],
),
axis=4,
)
delay_data.delay_object.quality_array = np.concatenate(
(
delay_data.delay_object.quality_array,
delay_data.delay_object.quality_array[:, :, :, :, [-1]],
),
axis=4,
)
# add dummy total_quality_array
delay_data.delay_object.total_quality_array = np.zeros(
delay_data.delay_object._total_quality_array.expected_shape(
delay_data.delay_object
)
)
assert delay_data.delay_object.check()
delay_data.delay_object2 = delay_data.delay_object.copy()
old_history = delay_data.delay_object.history
jones_to_keep = [-5, -6]
delay_data.delay_object2.select(jones=jones_to_keep)
assert len(jones_to_keep) == delay_data.delay_object2.Njones
for j in jones_to_keep:
assert j in delay_data.delay_object2.jones_array
for j in np.unique(delay_data.delay_object2.jones_array):
assert j in jones_to_keep
assert uvutils._check_histories(
old_history + " Downselected to "
"specific jones polarization terms "
"using pyuvdata.",
delay_data.delay_object2.history,
)
# check for errors associated with polarizations not included in data
pytest.raises(ValueError, delay_data.delay_object2.select, jones=[-3, -4])
# check for warnings and errors associated with unevenly spaced polarizations
with uvtest.check_warnings(
UserWarning, "Selected jones polarization terms are not evenly spaced"
):
delay_data.delay_object.select(
jones=delay_data.delay_object.jones_array[[0, 1, 3]]
)
write_file_calfits = str(tmp_path / "select_test.calfits")
pytest.raises(ValueError, delay_data.delay_object.write_calfits, write_file_calfits)
def test_select_delay(delay_data):
# now test selecting along all axes at once
old_history = delay_data.delay_object.history
ants_to_keep = np.array([10, 89, 43, 9, 80, 96, 64])
freqs_to_keep = delay_data.delay_object.freq_array[0, np.arange(31, 56)]
times_to_keep = delay_data.delay_object.time_array[[1, 2]]
jones_to_keep = [-5]
delay_data.delay_object2.select(
antenna_nums=ants_to_keep,
frequencies=freqs_to_keep,
times=times_to_keep,
jones=jones_to_keep,
)
assert len(ants_to_keep) == delay_data.delay_object2.Nants_data
for ant in ants_to_keep:
assert ant in delay_data.delay_object2.ant_array
for ant in delay_data.delay_object2.ant_array:
assert ant in ants_to_keep
assert len(times_to_keep) == delay_data.delay_object2.Ntimes
for t in times_to_keep:
assert t in delay_data.delay_object2.time_array
for t in np.unique(delay_data.delay_object2.time_array):
assert t in times_to_keep
assert len(freqs_to_keep) == delay_data.delay_object2.Nfreqs
for f in freqs_to_keep:
assert f in delay_data.delay_object2.freq_array
for f in np.unique(delay_data.delay_object2.freq_array):
assert f in freqs_to_keep
assert len(jones_to_keep) == delay_data.delay_object2.Njones
for j in jones_to_keep:
assert j in delay_data.delay_object2.jones_array
for j in np.unique(delay_data.delay_object2.jones_array):
assert j in jones_to_keep
assert uvutils._check_histories(
old_history + " Downselected to "
"specific antennas, times, "
"frequencies, jones polarization terms "
"using pyuvdata.",
delay_data.delay_object2.history,
)
def test_add_antennas(gain_data):
"""Test adding antennas between two UVCal objects"""
gain_object_full = gain_data.gain_object.copy()
ants1 = np.array([9, 10, 20, 22, 31, 43, 53, 64, 65, 72])
ants2 = np.array([80, 81, 88, 89, 96, 97, 104, 105, 112])
gain_data.gain_object.select(antenna_nums=ants1)
gain_data.gain_object2.select(antenna_nums=ants2)
gain_data.gain_object += gain_data.gain_object2
# Check history is correct, before replacing and doing a full object check
assert uvutils._check_histories(
gain_object_full.history + " Downselected to specific "
"antennas using pyuvdata. Combined "
"data along antenna axis using pyuvdata.",
gain_data.gain_object.history,
)
gain_data.gain_object.history = gain_object_full.history
assert gain_data.gain_object == gain_object_full
# test for when total_quality_array is present
gain_data.gain_object.select(antenna_nums=ants1)
gain_data.gain_object.total_quality_array = np.zeros(
gain_data.gain_object._total_quality_array.expected_shape(gain_data.gain_object)
)
with uvtest.check_warnings(UserWarning, "Total quality array detected"):
gain_data.gain_object.__iadd__(gain_data.gain_object2)
assert gain_data.gain_object.total_quality_array is None
def test_add_frequencies(gain_data):
"""Test adding frequencies between two UVCal objects"""
gain_object_full = gain_data.gain_object.copy()
freqs1 = gain_data.gain_object.freq_array[0, np.arange(0, 5)]
freqs2 = gain_data.gain_object2.freq_array[0, np.arange(5, 10)]
gain_data.gain_object.select(frequencies=freqs1)
gain_data.gain_object2.select(frequencies=freqs2)
gain_data.gain_object += gain_data.gain_object2
# Check history is correct, before replacing and doing a full object check
assert uvutils._check_histories(
gain_object_full.history + " Downselected to specific "
"frequencies using pyuvdata. Combined "
"data along frequency axis using pyuvdata.",
gain_data.gain_object.history,
)
gain_data.gain_object.history = gain_object_full.history
assert gain_data.gain_object == gain_object_full
# test for when total_quality_array is present in first file but not second
gain_data.gain_object.select(frequencies=freqs1)
tqa = np.ones(
gain_data.gain_object._total_quality_array.expected_shape(gain_data.gain_object)
)
tqa2 = np.zeros(
gain_data.gain_object2._total_quality_array.expected_shape(
gain_data.gain_object2
)
)
tot_tqa = np.concatenate([tqa, tqa2], axis=1)
gain_data.gain_object.total_quality_array = tqa
gain_data.gain_object += gain_data.gain_object2
assert np.allclose(
gain_data.gain_object.total_quality_array,
tot_tqa,
rtol=gain_data.gain_object._total_quality_array.tols[0],
atol=gain_data.gain_object._total_quality_array.tols[1],
)
# test for when total_quality_array is present in second file but not first
gain_data.gain_object.select(frequencies=freqs1)
tqa = np.zeros(
gain_data.gain_object._total_quality_array.expected_shape(gain_data.gain_object)
)
tqa2 = np.ones(
gain_data.gain_object2._total_quality_array.expected_shape(
gain_data.gain_object2
)
)
tot_tqa = np.concatenate([tqa, tqa2], axis=1)
gain_data.gain_object.total_quality_array = None
gain_data.gain_object2.total_quality_array = tqa2
gain_data.gain_object += gain_data.gain_object2
assert np.allclose(
gain_data.gain_object.total_quality_array,
tot_tqa,
rtol=gain_data.gain_object._total_quality_array.tols[0],
atol=gain_data.gain_object._total_quality_array.tols[1],
)
# test for when total_quality_array is present in both
gain_data.gain_object.select(frequencies=freqs1)
tqa = np.ones(
gain_data.gain_object._total_quality_array.expected_shape(gain_data.gain_object)
)
tqa2 = np.ones(
gain_data.gain_object2._total_quality_array.expected_shape(
gain_data.gain_object2
)
)
tqa *= 2
tot_tqa = np.concatenate([tqa, tqa2], axis=1)
gain_data.gain_object.total_quality_array = tqa
gain_data.gain_object2.total_quality_array = tqa2
gain_data.gain_object += gain_data.gain_object2
assert np.allclose(
gain_data.gain_object.total_quality_array,
tot_tqa,
rtol=gain_data.gain_object._total_quality_array.tols[0],
atol=gain_data.gain_object._total_quality_array.tols[1],
)
# Out of order - freqs
gain_data.gain_object = gain_object_full.copy()
gain_data.gain_object2 = gain_object_full.copy()
gain_data.gain_object.select(frequencies=freqs2)
gain_data.gain_object2.select(frequencies=freqs1)
gain_data.gain_object += gain_data.gain_object2
gain_data.gain_object.history = gain_object_full.history
assert gain_data.gain_object == gain_object_full
def test_add_times(gain_data):
"""Test adding times between two UVCal objects"""
gain_object_full = gain_data.gain_object.copy()
n_times2 = gain_data.gain_object.Ntimes // 2
times1 = gain_data.gain_object.time_array[:n_times2]
times2 = gain_data.gain_object.time_array[n_times2:]
gain_data.gain_object.select(times=times1)
gain_data.gain_object2.select(times=times2)
gain_data.gain_object += gain_data.gain_object2
# Check history is correct, before replacing and doing a full object check
assert uvutils._check_histories(
gain_object_full.history + " Downselected to specific "
"times using pyuvdata. Combined "
"data along time axis using pyuvdata.",
gain_data.gain_object.history,
)
gain_data.gain_object.history = gain_object_full.history
assert gain_data.gain_object == gain_object_full
# test for when total_quality_array is present in first file but not second
gain_data.gain_object.select(times=times1)
tqa = np.ones(
gain_data.gain_object._total_quality_array.expected_shape(gain_data.gain_object)
)
tqa2 = np.zeros(
gain_data.gain_object2._total_quality_array.expected_shape(
gain_data.gain_object2
)
)
tot_tqa = np.concatenate([tqa, tqa2], axis=2)
gain_data.gain_object.total_quality_array = tqa
gain_data.gain_object += gain_data.gain_object2
assert np.allclose(
gain_data.gain_object.total_quality_array,
tot_tqa,
rtol=gain_data.gain_object._total_quality_array.tols[0],
atol=gain_data.gain_object._total_quality_array.tols[1],
)
# test for when total_quality_array is present in second file but not first
gain_data.gain_object.select(times=times1)
tqa = np.zeros(
gain_data.gain_object._total_quality_array.expected_shape(gain_data.gain_object)
)
tqa2 = np.ones(
gain_data.gain_object2._total_quality_array.expected_shape(
gain_data.gain_object2
)
)
tot_tqa = np.concatenate([tqa, tqa2], axis=2)
gain_data.gain_object.total_quality_array = None
gain_data.gain_object2.total_quality_array = tqa2
gain_data.gain_object += gain_data.gain_object2
assert np.allclose(
gain_data.gain_object.total_quality_array,
tot_tqa,
rtol=gain_data.gain_object._total_quality_array.tols[0],
atol=gain_data.gain_object._total_quality_array.tols[1],
)
# test for when total_quality_array is present in both
gain_data.gain_object.select(times=times1)
tqa = np.ones(
gain_data.gain_object._total_quality_array.expected_shape(gain_data.gain_object)
)
tqa2 = np.ones(
gain_data.gain_object2._total_quality_array.expected_shape(
gain_data.gain_object2
)
)
tqa *= 2
tot_tqa = np.concatenate([tqa, tqa2], axis=2)
gain_data.gain_object.total_quality_array = tqa
gain_data.gain_object2.total_quality_array = tqa2
gain_data.gain_object += gain_data.gain_object2
assert np.allclose(
gain_data.gain_object.total_quality_array,
tot_tqa,
rtol=gain_data.gain_object._total_quality_array.tols[0],
atol=gain_data.gain_object._total_quality_array.tols[1],
)
def test_add_jones(gain_data):
"""Test adding Jones axes between two UVCal objects"""
gain_object_original = gain_data.gain_object.copy()
# artificially change the Jones value to permit addition
gain_data.gain_object2.jones_array[0] = -6
gain_data.gain_object += gain_data.gain_object2
# check dimensionality of resulting object
assert gain_data.gain_object.gain_array.shape[-1] == 2
assert sorted(gain_data.gain_object.jones_array) == [-6, -5]
# test for when total_quality_array is present in first file but not second
gain_data.gain_object = gain_object_original.copy()
tqa = np.ones(
gain_data.gain_object._total_quality_array.expected_shape(gain_data.gain_object)
)
tqa2 = np.zeros(
gain_data.gain_object2._total_quality_array.expected_shape(
gain_data.gain_object2
)
)
tot_tqa = np.concatenate([tqa, tqa2], axis=3)
gain_data.gain_object.total_quality_array = tqa
gain_data.gain_object += gain_data.gain_object2
assert np.allclose(
gain_data.gain_object.total_quality_array,
tot_tqa,
rtol=gain_data.gain_object._total_quality_array.tols[0],
atol=gain_data.gain_object._total_quality_array.tols[1],
)
# test for when total_quality_array is present in second file but not first
gain_data.gain_object = gain_object_original.copy()
tqa = np.zeros(
gain_data.gain_object._total_quality_array.expected_shape(gain_data.gain_object)
)
tqa2 = np.ones(
gain_data.gain_object2._total_quality_array.expected_shape(
gain_data.gain_object2
)
)
tot_tqa = np.concatenate([tqa, tqa2], axis=3)
gain_data.gain_object2.total_quality_array = tqa2
gain_data.gain_object += gain_data.gain_object2
assert np.allclose(
gain_data.gain_object.total_quality_array,
tot_tqa,
rtol=gain_data.gain_object._total_quality_array.tols[0],
atol=gain_data.gain_object._total_quality_array.tols[1],
)
# test for when total_quality_array is present in both
gain_data.gain_object = gain_object_original.copy()
tqa = np.ones(
gain_data.gain_object._total_quality_array.expected_shape(gain_data.gain_object)
)
tqa2 = np.ones(
gain_data.gain_object2._total_quality_array.expected_shape(
gain_data.gain_object2
)
)
tqa *= 2
tot_tqa = np.concatenate([tqa, tqa2], axis=3)
gain_data.gain_object.total_quality_array = tqa
gain_data.gain_object2.total_quality_array = tqa2
gain_data.gain_object += gain_data.gain_object2
assert np.allclose(
gain_data.gain_object.total_quality_array,
tot_tqa,
rtol=gain_data.gain_object._total_quality_array.tols[0],
atol=gain_data.gain_object._total_quality_array.tols[1],
)
def test_add(gain_data):
"""Test miscellaneous aspects of add method"""
# test not-in-place addition
gain_object = gain_data.gain_object.copy()
ants1 = np.array([9, 10, 20, 22, 31, 43, 53, 64, 65, 72])
ants2 = np.array([80, 81, 88, 89, 96, 97, 104, 105, 112])
gain_data.gain_object.select(antenna_nums=ants1)
gain_data.gain_object2.select(antenna_nums=ants2)
gain_object_add = gain_data.gain_object + gain_data.gain_object2
# Check history is correct, before replacing and doing a full object check
assert uvutils._check_histories(
gain_object.history + " Downselected to specific "
"antennas using pyuvdata. Combined "
"data along antenna axis using pyuvdata.",
gain_object_add.history,
)
gain_object_add.history = gain_object.history
assert gain_object_add == gain_object
# test history concatenation
gain_data.gain_object.history = gain_object.history
gain_data.gain_object2.history = "Some random history string OMNI_RUN:"
gain_data.gain_object += gain_data.gain_object2
assert uvutils._check_histories(
gain_object.history + " Combined data along antenna axis "
"using pyuvdata. Some random "
"history string",
gain_data.gain_object.history,
)
def test_add_multiple_axes(gain_data):
"""Test addition along multiple axes"""
ants1 = np.array([9, 10, 20, 22, 31, 43, 53, 64, 65, 72])
ants2 = np.array([80, 81, 88, 89, 96, 97, 104, 105, 112])
freqs1 = gain_data.gain_object.freq_array[0, np.arange(0, 5)]
freqs2 = gain_data.gain_object2.freq_array[0, np.arange(5, 10)]
n_times2 = gain_data.gain_object.Ntimes // 2
times1 = gain_data.gain_object.time_array[:n_times2]
times2 = gain_data.gain_object.time_array[n_times2:]
# artificially change the Jones value to permit addition
gain_data.gain_object2.jones_array[0] = -6
# perform select
gain_data.gain_object.select(antenna_nums=ants1, frequencies=freqs1, times=times1)
gain_data.gain_object2.select(antenna_nums=ants2, frequencies=freqs2, times=times2)
gain_data.gain_object += gain_data.gain_object2
# check resulting dimensionality
assert len(gain_data.gain_object.ant_array) == 19
assert len(gain_data.gain_object.freq_array[0, :]) == 10
assert len(gain_data.gain_object.time_array) == gain_data.gain_object.Ntimes
assert len(gain_data.gain_object.jones_array) == 2
def test_add_errors(gain_data):
"""Test behavior that will raise errors"""
# test addition of two identical objects
pytest.raises(ValueError, gain_data.gain_object.__add__, gain_data.gain_object2)
# test addition of UVCal and non-UVCal object (empty list)
pytest.raises(ValueError, gain_data.gain_object.__add__, [])
# test compatibility param mismatch
gain_data.gain_object2.telescope_name = "PAPER"
pytest.raises(ValueError, gain_data.gain_object.__add__, gain_data.gain_object2)
def test_jones_warning(gain_data):
"""Test having non-contiguous Jones elements"""
gain_data.gain_object2.jones_array[0] = -6
gain_data.gain_object += gain_data.gain_object2
gain_data.gain_object2.jones_array[0] = -8
with uvtest.check_warnings(UserWarning, "Combined Jones elements"):
gain_data.gain_object.__iadd__(gain_data.gain_object2)
assert sorted(gain_data.gain_object.jones_array) == [-8, -6, -5]
def test_frequency_warnings(gain_data):
"""Test having uneven or non-contiguous frequencies"""
# test having unevenly spaced frequency separations
go1 = gain_data.gain_object.copy()
go2 = gain_data.gain_object2.copy()
freqs1 = gain_data.gain_object.freq_array[0, np.arange(0, 5)]
freqs2 = gain_data.gain_object2.freq_array[0, np.arange(5, 10)]
gain_data.gain_object.select(frequencies=freqs1)
gain_data.gain_object2.select(frequencies=freqs2)
# change the last frequency bin to be smaller than the others
df = (
gain_data.gain_object2.freq_array[0, -1]
- gain_data.gain_object2.freq_array[0, -2]
)
gain_data.gain_object2.freq_array[0, -1] = (
gain_data.gain_object2.freq_array[0, -2] + df / 2
)
with uvtest.check_warnings(
UserWarning, "Combined frequencies are not evenly spaced"
):
gain_data.gain_object.__iadd__(gain_data.gain_object2)
assert len(gain_data.gain_object.freq_array[0, :]) == gain_data.gain_object.Nfreqs
# now check having "non-contiguous" frequencies
gain_data.gain_object = go1.copy()
gain_data.gain_object2 = go2.copy()
freqs1 = gain_data.gain_object.freq_array[0, np.arange(0, 5)]
freqs2 = gain_data.gain_object2.freq_array[0, np.arange(5, 10)]
gain_data.gain_object.select(frequencies=freqs1)
gain_data.gain_object2.select(frequencies=freqs2)
# artificially space out frequencies
gain_data.gain_object.freq_array[0, :] *= 10
gain_data.gain_object2.freq_array[0, :] *= 10
with uvtest.check_warnings(UserWarning, "Combined frequencies are not contiguous"):
gain_data.gain_object.__iadd__(gain_data.gain_object2)
freqs1 *= 10
freqs2 *= 10
freqs = np.concatenate([freqs1, freqs2])
assert np.allclose(
gain_data.gain_object.freq_array[0, :],
freqs,
rtol=gain_data.gain_object._freq_array.tols[0],
atol=gain_data.gain_object._freq_array.tols[1],
)
def test_parameter_warnings(gain_data):
"""Test changing a parameter that will raise a warning"""
# change observer and select frequencies
gain_data.gain_object2.observer = "mystery_person"
freqs1 = gain_data.gain_object.freq_array[0, np.arange(0, 5)]
freqs2 = gain_data.gain_object2.freq_array[0, np.arange(5, 10)]
gain_data.gain_object.select(frequencies=freqs1)
gain_data.gain_object2.select(frequencies=freqs2)
with uvtest.check_warnings(UserWarning, "UVParameter observer does not match"):
gain_data.gain_object.__iadd__(gain_data.gain_object2)
freqs = np.concatenate([freqs1, freqs2])
assert np.allclose(
gain_data.gain_object.freq_array,
freqs,
rtol=gain_data.gain_object._freq_array.tols[0],
atol=gain_data.gain_object._freq_array.tols[1],
)
def test_multi_files(gain_data, tmp_path):
"""Test read function when multiple files are included"""
gain_object_full = gain_data.gain_object.copy()
n_times2 = gain_data.gain_object.Ntimes // 2
# Break up delay object into two objects, divided in time
times1 = gain_data.gain_object.time_array[:n_times2]
times2 = gain_data.gain_object.time_array[n_times2:]
gain_data.gain_object.select(times=times1)
gain_data.gain_object2.select(times=times2)
# Write those objects to files
f1 = str(tmp_path / "read_multi1.calfits")
f2 = str(tmp_path / "read_multi2.calfits")
gain_data.gain_object.write_calfits(f1, clobber=True)
gain_data.gain_object2.write_calfits(f2, clobber=True)
# Read both files together
gain_data.gain_object.read_calfits([f1, f2])
assert uvutils._check_histories(
gain_object_full.history + " Downselected to specific times"
" using pyuvdata. Combined data "
"along time axis using pyuvdata.",
gain_data.gain_object.history,
)
gain_data.gain_object.history = gain_object_full.history
assert gain_data.gain_object == gain_object_full
def test_add_antennas_delay(delay_data):
"""Test adding antennas between two UVCal objects"""
delay_object_full = delay_data.delay_object.copy()
ants1 = np.array([9, 10, 20, 22, 31, 43, 53, 64, 65, 72])
ants2 = np.array([80, 81, 88, 89, 96, 97, 104, 105, 112])
delay_data.delay_object.select(antenna_nums=ants1)
delay_data.delay_object2.select(antenna_nums=ants2)
delay_data.delay_object += delay_data.delay_object2
# Check history is correct, before replacing and doing a full object check
assert uvutils._check_histories(
delay_object_full.history + " Downselected to specific "
"antennas using pyuvdata. Combined "
"data along antenna axis using pyuvdata.",
delay_data.delay_object.history,
)
delay_data.delay_object.history = delay_object_full.history
assert delay_data.delay_object == delay_object_full
# test for when total_quality_array is present
delay_data.delay_object.select(antenna_nums=ants1)
delay_data.delay_object.total_quality_array = np.zeros(
delay_data.delay_object._total_quality_array.expected_shape(
delay_data.delay_object
)
)
with uvtest.check_warnings(UserWarning, "Total quality array detected"):
delay_data.delay_object.__iadd__(delay_data.delay_object2)
assert delay_data.delay_object.total_quality_array is None
# test for when input_flag_array is present in first file but not second
delay_data.delay_object.select(antenna_nums=ants1)
ifa = np.zeros(
delay_data.delay_object._input_flag_array.expected_shape(
delay_data.delay_object
)
).astype(np.bool)
ifa2 = np.ones(
delay_data.delay_object2._input_flag_array.expected_shape(
delay_data.delay_object2
)
).astype(np.bool)
tot_ifa = np.concatenate([ifa, ifa2], axis=0)
delay_data.delay_object.input_flag_array = ifa
delay_data.delay_object2.input_flag_array = None
delay_data.delay_object += delay_data.delay_object2
assert np.allclose(delay_data.delay_object.input_flag_array, tot_ifa)
# test for when input_flag_array is present in second file but not first
delay_data.delay_object.select(antenna_nums=ants1)
ifa = np.ones(
delay_data.delay_object._input_flag_array.expected_shape(
delay_data.delay_object
)
).astype(np.bool)
ifa2 = np.zeros(
delay_data.delay_object2._input_flag_array.expected_shape(
delay_data.delay_object2
)
).astype(np.bool)
tot_ifa = np.concatenate([ifa, ifa2], axis=0)
delay_data.delay_object.input_flag_array = None
delay_data.delay_object2.input_flag_array = ifa2
delay_data.delay_object += delay_data.delay_object2
assert np.allclose(delay_data.delay_object.input_flag_array, tot_ifa)
# Out of order - antennas
delay_data.delay_object = delay_object_full.copy()
delay_data.delay_object2 = delay_data.delay_object.copy()
delay_data.delay_object.select(antenna_nums=ants2)
delay_data.delay_object2.select(antenna_nums=ants1)
delay_data.delay_object += delay_data.delay_object2
delay_data.delay_object.history = delay_object_full.history
assert delay_data.delay_object == delay_object_full
def test_add_times_delay(delay_data):
"""Test adding times between two UVCal objects"""
delay_object_full = delay_data.delay_object.copy()
n_times2 = delay_data.delay_object.Ntimes // 2
times1 = delay_data.delay_object.time_array[:n_times2]
times2 = delay_data.delay_object.time_array[n_times2:]
delay_data.delay_object.select(times=times1)
delay_data.delay_object2.select(times=times2)
delay_data.delay_object += delay_data.delay_object2
# Check history is correct, before replacing and doing a full object check
assert uvutils._check_histories(
delay_object_full.history + " Downselected to specific "
"times using pyuvdata. Combined "
"data along time axis using pyuvdata.",
delay_data.delay_object.history,
)
delay_data.delay_object.history = delay_object_full.history
assert delay_data.delay_object == delay_object_full
# test for when total_quality_array is present in first file but not second
delay_data.delay_object.select(times=times1)
tqa = np.ones(
delay_data.delay_object._total_quality_array.expected_shape(
delay_data.delay_object
)
)
tqa2 = np.zeros(
delay_data.delay_object2._total_quality_array.expected_shape(
delay_data.delay_object2
)
)
tot_tqa = np.concatenate([tqa, tqa2], axis=2)
delay_data.delay_object.total_quality_array = tqa
delay_data.delay_object += delay_data.delay_object2
assert np.allclose(
delay_data.delay_object.total_quality_array,
tot_tqa,
rtol=delay_data.delay_object._total_quality_array.tols[0],
atol=delay_data.delay_object._total_quality_array.tols[1],
)
# test for when total_quality_array is present in second file but not first
delay_data.delay_object.select(times=times1)
tqa = np.zeros(
delay_data.delay_object._total_quality_array.expected_shape(
delay_data.delay_object
)
)
tqa2 = np.ones(
delay_data.delay_object2._total_quality_array.expected_shape(
delay_data.delay_object2
)
)
tot_tqa = np.concatenate([tqa, tqa2], axis=2)
delay_data.delay_object.total_quality_array = None
delay_data.delay_object2.total_quality_array = tqa2
delay_data.delay_object += delay_data.delay_object2
assert np.allclose(
delay_data.delay_object.total_quality_array,
tot_tqa,
rtol=delay_data.delay_object._total_quality_array.tols[0],
atol=delay_data.delay_object._total_quality_array.tols[1],
)
# test for when total_quality_array is present in both
delay_data.delay_object.select(times=times1)
tqa = np.ones(
delay_data.delay_object._total_quality_array.expected_shape(
delay_data.delay_object
)
)
tqa2 = np.ones(
delay_data.delay_object2._total_quality_array.expected_shape(
delay_data.delay_object2
)
)
tqa *= 2
tot_tqa = np.concatenate([tqa, tqa2], axis=2)
delay_data.delay_object.total_quality_array = tqa
delay_data.delay_object2.total_quality_array = tqa2
delay_data.delay_object += delay_data.delay_object2
assert np.allclose(
delay_data.delay_object.total_quality_array,
tot_tqa,
rtol=delay_data.delay_object._total_quality_array.tols[0],
atol=delay_data.delay_object._total_quality_array.tols[1],
)
# test for when input_flag_array is present in first file but not second
delay_data.delay_object.select(times=times1)
ifa = np.zeros(
delay_data.delay_object._input_flag_array.expected_shape(
delay_data.delay_object
)
).astype(np.bool)
ifa2 = np.ones(
delay_data.delay_object2._input_flag_array.expected_shape(
delay_data.delay_object2
)
).astype(np.bool)
tot_ifa = np.concatenate([ifa, ifa2], axis=3)
delay_data.delay_object.input_flag_array = ifa
delay_data.delay_object2.input_flag_array = None
delay_data.delay_object += delay_data.delay_object2
assert np.allclose(delay_data.delay_object.input_flag_array, tot_ifa)
# test for when input_flag_array is present in second file but not first
delay_data.delay_object.select(times=times1)
ifa = np.ones(
delay_data.delay_object._input_flag_array.expected_shape(
delay_data.delay_object
)
).astype(np.bool)
ifa2 = np.zeros(
delay_data.delay_object2._input_flag_array.expected_shape(
delay_data.delay_object2
)
).astype(np.bool)
tot_ifa = np.concatenate([ifa, ifa2], axis=3)
delay_data.delay_object.input_flag_array = None
delay_data.delay_object2.input_flag_array = ifa2
delay_data.delay_object += delay_data.delay_object2
assert np.allclose(delay_data.delay_object.input_flag_array, tot_ifa)
# Out of order - times
delay_data.delay_object = delay_object_full.copy()
delay_data.delay_object2 = delay_data.delay_object.copy()
delay_data.delay_object.select(times=times2)
delay_data.delay_object2.select(times=times1)
delay_data.delay_object += delay_data.delay_object2
delay_data.delay_object.history = delay_object_full.history
assert delay_data.delay_object == delay_object_full
def test_add_jones_delay(delay_data):
"""Test adding Jones axes between two UVCal objects"""
delay_object_original = delay_data.delay_object.copy()
# artificially change the Jones value to permit addition
delay_data.delay_object2.jones_array[0] = -6
delay_data.delay_object += delay_data.delay_object2
# check dimensionality of resulting object
assert delay_data.delay_object.delay_array.shape[-1] == 2
assert sorted(delay_data.delay_object.jones_array) == [-6, -5]
# test for when total_quality_array is present in first file but not second
delay_data.delay_object = delay_object_original.copy()
tqa = np.ones(
delay_data.delay_object._total_quality_array.expected_shape(
delay_data.delay_object
)
)
tqa2 = np.zeros(
delay_data.delay_object2._total_quality_array.expected_shape(
delay_data.delay_object2
)
)
tot_tqa = np.concatenate([tqa, tqa2], axis=3)
delay_data.delay_object.total_quality_array = tqa
delay_data.delay_object += delay_data.delay_object2
assert np.allclose(
delay_data.delay_object.total_quality_array,
tot_tqa,
rtol=delay_data.delay_object._total_quality_array.tols[0],
atol=delay_data.delay_object._total_quality_array.tols[1],
)
# test for when total_quality_array is present in second file but not first
delay_data.delay_object = delay_object_original.copy()
tqa = np.zeros(
delay_data.delay_object._total_quality_array.expected_shape(
delay_data.delay_object
)
)
tqa2 = np.ones(
delay_data.delay_object2._total_quality_array.expected_shape(
delay_data.delay_object2
)
)
tot_tqa = np.concatenate([tqa, tqa2], axis=3)
delay_data.delay_object2.total_quality_array = tqa2
delay_data.delay_object += delay_data.delay_object2
assert np.allclose(
delay_data.delay_object.total_quality_array,
tot_tqa,
rtol=delay_data.delay_object._total_quality_array.tols[0],
atol=delay_data.delay_object._total_quality_array.tols[1],
)
# test for when total_quality_array is present in both
delay_data.delay_object = delay_object_original.copy()
tqa = np.ones(
delay_data.delay_object._total_quality_array.expected_shape(
delay_data.delay_object
)
)
tqa2 = np.ones(
delay_data.delay_object2._total_quality_array.expected_shape(
delay_data.delay_object2
)
)
tqa *= 2
tot_tqa = np.concatenate([tqa, tqa2], axis=3)
delay_data.delay_object.total_quality_array = tqa
delay_data.delay_object2.total_quality_array = tqa2
delay_data.delay_object += delay_data.delay_object2
assert np.allclose(
delay_data.delay_object.total_quality_array,
tot_tqa,
rtol=delay_data.delay_object._total_quality_array.tols[0],
atol=delay_data.delay_object._total_quality_array.tols[1],
)
# test for when input_flag_array is present in first file but not second
delay_data.delay_object = delay_object_original.copy()
ifa = np.zeros(
delay_data.delay_object._input_flag_array.expected_shape(
delay_data.delay_object
)
).astype(np.bool)
ifa2 = np.ones(
delay_data.delay_object2._input_flag_array.expected_shape(
delay_data.delay_object2
)
).astype(np.bool)
tot_ifa = np.concatenate([ifa, ifa2], axis=4)
delay_data.delay_object.input_flag_array = ifa
delay_data.delay_object2.input_flag_array = None
delay_data.delay_object += delay_data.delay_object2
assert np.allclose(delay_data.delay_object.input_flag_array, tot_ifa)
# test for when input_flag_array is present in second file but not first
delay_data.delay_object = delay_object_original.copy()
ifa = np.ones(
delay_data.delay_object._input_flag_array.expected_shape(
delay_data.delay_object
)
).astype(np.bool)
ifa2 = np.zeros(
delay_data.delay_object2._input_flag_array.expected_shape(
delay_data.delay_object2
)
).astype(np.bool)
tot_ifa = np.concatenate([ifa, ifa2], axis=4)
delay_data.delay_object.input_flag_array = None
delay_data.delay_object2.input_flag_array = ifa2
delay_data.delay_object += delay_data.delay_object2
assert np.allclose(delay_data.delay_object.input_flag_array, tot_ifa)
# Out of order - jones
delay_data.delay_object = delay_object_original.copy()
delay_data.delay_object2 = delay_object_original.copy()
delay_data.delay_object.jones_array[0] = -6
delay_data.delay_object += delay_data.delay_object2
delay_data.delay_object2 = delay_data.delay_object.copy()
delay_data.delay_object.select(jones=-5)
delay_data.delay_object.history = delay_object_original.history
assert delay_data.delay_object == delay_object_original
delay_data.delay_object2.select(jones=-6)
delay_data.delay_object2.jones_array[:] = -5
delay_data.delay_object2.history = delay_object_original.history
assert delay_data.delay_object2 == delay_object_original
def test_add_errors_delay(delay_data):
"""Test behavior that will raise errors"""
# test addition of two identical objects
pytest.raises(ValueError, delay_data.delay_object.__add__, delay_data.delay_object2)
def test_multi_files_delay(delay_data, tmp_path):
"""Test read function when multiple files are included"""
delay_object_full = delay_data.delay_object.copy()
n_times2 = delay_data.delay_object.Ntimes // 2
# Break up delay object into two objects, divided in time
times1 = delay_data.delay_object.time_array[:n_times2]
times2 = delay_data.delay_object.time_array[n_times2:]
delay_data.delay_object.select(times=times1)
delay_data.delay_object2.select(times=times2)
# Write those objects to files
f1 = str(tmp_path / "read_multi1.calfits")
f2 = str(tmp_path / "read_multi2.calfits")
delay_data.delay_object.write_calfits(f1, clobber=True)
delay_data.delay_object2.write_calfits(f2, clobber=True)
# Read both files together
delay_data.delay_object.read_calfits([f1, f2])
assert uvutils._check_histories(
delay_object_full.history + " Downselected to specific times"
" using pyuvdata. Combined data "
"along time axis using pyuvdata.",
delay_data.delay_object.history,
)
delay_data.delay_object.history = delay_object_full.history
assert delay_data.delay_object == delay_object_full
def test_uvcal_get_methods():
# load data
uvc = UVCal()
uvc.read_calfits(os.path.join(DATA_PATH, "zen.2457698.40355.xx.gain.calfits"))
# test get methods: add in a known value and make sure it is returned
key = (10, "Jee")
uvc.gain_array[1] = 0.0
d, f, q = uvc.get_gains(key), uvc.get_flags(key), uvc.get_quality(key)
# test shapes
assert np.all(np.isclose(d, 0.0))
assert d.shape == (uvc.Nfreqs, uvc.Ntimes)
assert f.shape == (uvc.Nfreqs, uvc.Ntimes)
assert q.shape == (uvc.Nfreqs, uvc.Ntimes)
# test against by-hand indexing
np.testing.assert_array_almost_equal(
d,
uvc.gain_array[
uvc.ant_array.tolist().index(10),
0,
:,
:,
uvc.jones_array.tolist().index(-5),
],
)
# test variable key input
d2 = uvc.get_gains(*key)
np.testing.assert_array_almost_equal(d, d2)
d2 = uvc.get_gains(key[0])
np.testing.assert_array_almost_equal(d, d2)
d2 = uvc.get_gains(key[:1])
np.testing.assert_array_almost_equal(d, d2)
d2 = uvc.get_gains(10, -5)
np.testing.assert_array_almost_equal(d, d2)
d2 = uvc.get_gains(10, "x")
np.testing.assert_array_almost_equal(d, d2)
# check has_key
assert uvc._has_key(antnum=10)
assert uvc._has_key(jpol="Jee")
assert uvc._has_key(antnum=10, jpol="Jee")
assert not uvc._has_key(antnum=10, jpol="Jnn")
assert not uvc._has_key(antnum=101, jpol="Jee")
# test exceptions
pytest.raises(ValueError, uvc.get_gains, 1)
pytest.raises(ValueError, uvc.get_gains, (10, "Jnn"))
uvc.cal_type = "delay"
pytest.raises(ValueError, uvc.get_gains, 10)
def test_write_read_optional_attrs(tmp_path):
# read a test file
cal_in = UVCal()
testfile = os.path.join(DATA_PATH, "zen.2457698.40355.xx.gain.calfits")
cal_in.read_calfits(testfile)
# set some optional parameters
cal_in.gain_scale = "Jy"
cal_in.sky_field = "GLEAM"
# write
write_file_calfits = str(tmp_path / "test.calfits")
cal_in.write_calfits(write_file_calfits, clobber=True)
# read and compare
cal_in2 = UVCal()
cal_in2.read_calfits(write_file_calfits)
assert cal_in == cal_in2