test_uvcal.py
# -*- mode: python; coding: utf-8 -*-
# Copyright (c) 2018 Radio Astronomy Software Group
# Licensed under the 2-clause BSD License
"""Tests for uvcal object.
"""
from __future__ import absolute_import, division, print_function
import pytest
import os
import numpy as np
import copy
from pyuvdata import UVCal
import pyuvdata.utils as uvutils
import pyuvdata.tests as uvtest
from pyuvdata.data import DATA_PATH
@pytest.fixture(scope='function')
def uvcal_data():
"""Set up some uvcal iter tests."""
required_parameters = ['_Nfreqs', '_Njones', '_Ntimes', '_Nspws',
'_Nants_data', '_Nants_telescope',
'_antenna_names', '_antenna_numbers',
'_ant_array',
'_telescope_name', '_freq_array',
'_channel_width', '_spw_array',
'_jones_array', '_time_array', '_time_range',
'_integration_time',
'_gain_convention', '_flag_array',
'_quality_array', '_cal_type', '_cal_style',
'_x_orientation', '_history']
required_properties = ['Nfreqs', 'Njones', 'Ntimes', 'Nspws',
'Nants_data', 'Nants_telescope',
'antenna_names', 'antenna_numbers',
'ant_array',
'telescope_name', 'freq_array',
'channel_width', 'spw_array',
'jones_array', 'time_array', 'time_range',
'integration_time',
'gain_convention', 'flag_array',
'quality_array', 'cal_type', 'cal_style',
'x_orientation', 'history']
extra_parameters = ['_gain_array', '_delay_array', '_sky_field',
'_sky_catalog', '_ref_antenna_name', '_Nsources',
'_baseline_range', '_diffuse_model',
'_input_flag_array', '_freq_range',
'_observer', '_git_origin_cal',
'_git_hash_cal', '_total_quality_array',
'_extra_keywords', '_gain_scale']
extra_properties = ['gain_array', 'delay_array', 'sky_field',
'sky_catalog', 'ref_antenna_name', 'Nsources',
'baseline_range', 'diffuse_model',
'input_flag_array', 'freq_range',
'observer', 'git_origin_cal',
'git_hash_cal', 'total_quality_array',
'extra_keywords', 'gain_scale']
other_properties = ['pyuvdata_version_str']
uv_cal_object = UVCal()
# yields the data we need but will continue to the del call after tests
yield uv_cal_object, required_parameters, required_properties, extra_parameters, extra_properties, other_properties
# some post-test object cleanup
del(uv_cal_object)
return
def test_parameter_iter(uvcal_data):
"""Test expected parameters."""
(uv_cal_object, required_parameters, required_properties,
extra_parameters, extra_properties, other_properties) = uvcal_data
all = []
for prop in uv_cal_object:
all.append(prop)
for a in required_parameters + extra_parameters:
assert a in all, 'expected attribute ' + a + ' not returned in object iterator'
def test_required_parameter_iter(uvcal_data):
"""Test expected required parameters."""
(uv_cal_object, required_parameters, required_properties,
extra_parameters, extra_properties, other_properties) = uvcal_data
required = []
for prop in uv_cal_object.required():
required.append(prop)
for a in required_parameters:
assert a in required, 'expected attribute ' + a + ' not returned in required iterator'
def test_unexpected_parameters(uvcal_data):
"""Test for extra parameters."""
(uv_cal_object, required_parameters, required_properties,
extra_parameters, extra_properties, other_properties) = uvcal_data
expected_parameters = required_parameters + extra_parameters
attributes = [i for i in uv_cal_object.__dict__.keys() if i[0] == '_']
for a in attributes:
assert a in expected_parameters, 'unexpected parameter ' + a + ' found in UVCal'
def test_unexpected_attributes(uvcal_data):
"""Test for extra attributes."""
(uv_cal_object, required_parameters, required_properties,
extra_parameters, extra_properties, other_properties) = uvcal_data
expected_attributes = required_properties + \
extra_properties + other_properties
attributes = [i for i in uv_cal_object.__dict__.keys() if i[0] != '_']
for a in attributes:
assert a in expected_attributes, 'unexpected attribute ' + a + ' found in UVCal'
def test_properties(uvcal_data):
"""Test that properties can be get and set properly."""
(uv_cal_object, required_parameters, required_properties,
extra_parameters, extra_properties, other_properties) = uvcal_data
prop_dict = dict(list(zip(required_properties + extra_properties,
required_parameters + extra_parameters)))
for k, v in prop_dict.items():
rand_num = np.random.rand()
setattr(uv_cal_object, k, rand_num)
this_param = getattr(uv_cal_object, v)
try:
assert rand_num == this_param.value
except AssertionError:
print('setting {prop_name} to a random number failed'.format(prop_name=k))
raise
@pytest.fixture(scope='function')
def gain_data():
"""Initialize for some basic uvcal tests."""
gain_object = UVCal()
gainfile = os.path.join(DATA_PATH, 'zen.2457698.40355.xx.gain.calfits')
gain_object.read_calfits(gainfile)
gain_object2 = copy.deepcopy(gain_object)
delay_object = UVCal()
delayfile = os.path.join(DATA_PATH, 'zen.2457698.40355.xx.delay.calfits')
delay_object.read_calfits(delayfile)
class DataHolder(object):
def __init__(self, gain_object, gain_object2, delay_object):
self.gain_object = gain_object
self.gain_object2 = gain_object2
self.delay_object = delay_object
gain_data = DataHolder(gain_object, gain_object2, delay_object)
yield gain_data
del(gain_data)
def test_equality(gain_data):
"""Basic equality test"""
assert gain_data.gain_object == gain_data.gain_object
def test_check(gain_data):
"""Test that parameter checks run properly"""
assert gain_data.gain_object.check()
def test_nants_data_telescope_larger(gain_data):
# make sure it's okay for Nants_telescope to be strictly greater than Nants_data
gain_data.gain_object.Nants_telescope += 1
# add dummy information for "new antenna" to pass object check
gain_data.gain_object.antenna_names = np.concatenate(
(gain_data.gain_object.antenna_names, ["dummy_ant"]))
gain_data.gain_object.antenna_numbers = np.concatenate(
(gain_data.gain_object.antenna_numbers, [20]))
assert gain_data.gain_object.check()
def test_ant_array_not_in_antnums(gain_data):
# make sure an error is raised if antennas with data not in antenna_numbers
# remove antennas from antenna_names & antenna_numbers by hand
gain_data.gain_object.antenna_names = gain_data.gain_object.antenna_names[1:]
gain_data.gain_object.antenna_numbers = gain_data.gain_object.antenna_numbers[1:]
gain_data.gain_object.Nants_telescope = gain_data.gain_object.antenna_numbers.size
with pytest.raises(ValueError) as cm:
gain_data.gain_object.check()
assert str(cm.value).startswith('All antennas in ant_array must be in antenna_numbers')
def test_set_gain(gain_data):
gain_data.delay_object.set_gain()
assert gain_data.delay_object._gain_array.required
assert not gain_data.delay_object._delay_array.required
assert gain_data.delay_object._gain_array.form == gain_data.delay_object._flag_array.form
assert gain_data.delay_object._gain_array.form == gain_data.delay_object._quality_array.form
def test_set_delay(gain_data):
gain_data.gain_object.set_delay()
assert gain_data.gain_object._delay_array.required
assert not gain_data.gain_object._gain_array.required
assert gain_data.gain_object._gain_array.form == gain_data.gain_object._flag_array.form
assert gain_data.gain_object._delay_array.form == gain_data.gain_object._quality_array.form
def test_set_unknown(gain_data):
gain_data.gain_object.set_unknown_cal_type()
assert not gain_data.gain_object._delay_array.required
assert not gain_data.gain_object._gain_array.required
assert gain_data.gain_object._gain_array.form == gain_data.gain_object._flag_array.form
assert gain_data.gain_object._gain_array.form == gain_data.gain_object._quality_array.form
def test_convert_filetype(gain_data):
# error testing
pytest.raises(ValueError, gain_data.gain_object._convert_to_filetype, 'uvfits')
def test_convert_to_gain(gain_data):
conventions = ['minus', 'plus']
for c in conventions:
gain_data.new_object = copy.deepcopy(gain_data.delay_object)
gain_data.new_object.convert_to_gain(delay_convention=c)
assert np.isclose(np.max(np.absolute(gain_data.new_object.gain_array)), 1.,
rtol=gain_data.new_object._gain_array.tols[0],
atol=gain_data.new_object._gain_array.tols[1])
assert np.isclose(np.min(np.absolute(gain_data.new_object.gain_array)), 1.,
rtol=gain_data.new_object._gain_array.tols[0],
atol=gain_data.new_object._gain_array.tols[1])
if c == 'minus':
conv = -1
else:
conv = 1
assert np.allclose(np.angle(gain_data.new_object.gain_array[:, :, 10, :, :]) % (2 * np.pi),
(conv * 2 * np.pi * gain_data.delay_object.delay_array[:, :, 0, :, :]
* gain_data.delay_object.freq_array[0, 10]) % (2 * np.pi),
rtol=gain_data.new_object._gain_array.tols[0],
atol=gain_data.new_object._gain_array.tols[1])
assert np.allclose(gain_data.delay_object.quality_array,
gain_data.new_object.quality_array[:, :, 10, :, :],
rtol=gain_data.new_object._quality_array.tols[0],
atol=gain_data.new_object._quality_array.tols[1])
assert gain_data.new_object.history == (gain_data.delay_object.history
+ ' Converted from delays to gains using pyuvdata.')
# test a file with a total_quality_array
gain_data.new_object = copy.deepcopy(gain_data.delay_object)
tqa_size = gain_data.new_object.delay_array.shape[1:]
gain_data.new_object.total_quality_array = np.ones(tqa_size)
gain_data.new_object.convert_to_gain(delay_convention='minus')
assert np.isclose(np.max(np.absolute(gain_data.new_object.gain_array)), 1.,
rtol=gain_data.new_object._gain_array.tols[0],
atol=gain_data.new_object._gain_array.tols[1])
assert np.isclose(np.min(np.absolute(gain_data.new_object.gain_array)), 1.,
rtol=gain_data.new_object._gain_array.tols[0],
atol=gain_data.new_object._gain_array.tols[1])
assert np.allclose(np.angle(gain_data.new_object.gain_array[:, :, 10, :, :]) % (2 * np.pi),
(-1 * 2 * np.pi * gain_data.delay_object.delay_array[:, :, 0, :, :]
* gain_data.delay_object.freq_array[0, 10]) % (2 * np.pi),
rtol=gain_data.new_object._gain_array.tols[0],
atol=gain_data.new_object._gain_array.tols[1])
assert np.allclose(gain_data.delay_object.quality_array,
gain_data.new_object.quality_array[:, :, 10, :, :],
rtol=gain_data.new_object._quality_array.tols[0],
atol=gain_data.new_object._quality_array.tols[1])
assert gain_data.new_object.history == (gain_data.delay_object.history
+ ' Converted from delays to gains using pyuvdata.')
# error testing
pytest.raises(ValueError, gain_data.delay_object.convert_to_gain, delay_convention='bogus')
pytest.raises(ValueError, gain_data.gain_object.convert_to_gain)
gain_data.gain_object.set_unknown_cal_type()
pytest.raises(ValueError, gain_data.gain_object.convert_to_gain)
def test_select_antennas(gain_data):
old_history = gain_data.gain_object.history
ants_to_keep = np.array([65, 96, 9, 97, 89, 22, 20, 72])
gain_data.gain_object2.select(antenna_nums=ants_to_keep)
assert len(ants_to_keep) == gain_data.gain_object2.Nants_data
for ant in ants_to_keep:
assert ant in gain_data.gain_object2.ant_array
for ant in gain_data.gain_object2.ant_array:
assert ant in ants_to_keep
assert uvutils._check_histories(old_history + ' Downselected to '
'specific antennas using pyuvdata.',
gain_data.gain_object2.history)
# now test using antenna_names to specify antennas to keep
ants_to_keep = np.array(sorted(list(ants_to_keep)))
ant_names = []
for a in ants_to_keep:
ind = np.where(gain_data.gain_object.antenna_numbers == a)[0][0]
ant_names.append(gain_data.gain_object.antenna_names[ind])
gain_data.gain_object3 = gain_data.gain_object.select(antenna_names=ant_names, inplace=False)
assert gain_data.gain_object2 == gain_data.gain_object3
# check for errors associated with antennas not included in data, bad names or providing numbers and names
pytest.raises(ValueError, gain_data.gain_object.select,
antenna_nums=np.max(gain_data.gain_object.ant_array) + np.arange(1, 3))
pytest.raises(ValueError, gain_data.gain_object.select, antenna_names='test1')
pytest.raises(ValueError, gain_data.gain_object.select,
antenna_nums=ants_to_keep, antenna_names=ant_names)
# check that write_calfits works with Nants_data < Nants_telescope
write_file_calfits = os.path.join(DATA_PATH, 'test/select_test.calfits')
gain_data.gain_object2.write_calfits(write_file_calfits, clobber=True)
# check that reading it back in works too
new_gain_object = UVCal()
new_gain_object.read_calfits(write_file_calfits)
assert gain_data.gain_object2 == new_gain_object
# check that total_quality_array is handled properly when present
gain_data.gain_object.total_quality_array = np.zeros(
gain_data.gain_object._total_quality_array.expected_shape(gain_data.gain_object))
uvtest.checkWarnings(gain_data.gain_object.select, [], {'antenna_names': ant_names,
'inplace': True},
message='Cannot preserve total_quality_array')
assert gain_data.gain_object.total_quality_array is None
def test_select_times(gain_data):
# self.gain_object2 = copy.deepcopy(self.gain_object)
old_history = gain_data.gain_object.history
times_to_keep = gain_data.gain_object.time_array[2:5]
gain_data.gain_object2.select(times=times_to_keep)
assert len(times_to_keep) == gain_data.gain_object2.Ntimes
for t in times_to_keep:
assert t in gain_data.gain_object2.time_array
for t in np.unique(gain_data.gain_object2.time_array):
assert t in times_to_keep
assert uvutils._check_histories(old_history + ' Downselected to '
'specific times using pyuvdata.',
gain_data.gain_object2.history)
write_file_calfits = os.path.join(DATA_PATH, 'test/select_test.calfits')
# test writing calfits with only one time
gain_data.gain_object2 = copy.deepcopy(gain_data.gain_object)
times_to_keep = gain_data.gain_object.time_array[[1]]
gain_data.gain_object2.select(times=times_to_keep)
gain_data.gain_object2.write_calfits(write_file_calfits, clobber=True)
# check for errors associated with times not included in data
pytest.raises(ValueError, gain_data.gain_object.select,
times=[np.min(gain_data.gain_object.time_array) - gain_data.gain_object.integration_time])
# check for warnings and errors associated with unevenly spaced times
gain_data.gain_object2 = copy.deepcopy(gain_data.gain_object)
uvtest.checkWarnings(gain_data.gain_object2.select, [], {'times': gain_data.gain_object2.time_array[[0, 2, 3]]},
message='Selected times are not evenly spaced')
pytest.raises(ValueError, gain_data.gain_object2.write_calfits, write_file_calfits)
def test_select_frequencies(gain_data):
old_history = gain_data.gain_object.history
freqs_to_keep = gain_data.gain_object.freq_array[0, np.arange(4, 8)]
# add dummy total_quality_array
gain_data.gain_object.total_quality_array = np.zeros(
gain_data.gain_object._total_quality_array.expected_shape(gain_data.gain_object))
gain_data.gain_object2.total_quality_array = np.zeros(
gain_data.gain_object2._total_quality_array.expected_shape(gain_data.gain_object2))
gain_data.gain_object2.select(frequencies=freqs_to_keep)
assert len(freqs_to_keep) == gain_data.gain_object2.Nfreqs
for f in freqs_to_keep:
assert f in gain_data.gain_object2.freq_array
for f in np.unique(gain_data.gain_object2.freq_array):
assert f in freqs_to_keep
assert uvutils._check_histories(old_history + ' Downselected to '
'specific frequencies using pyuvdata.',
gain_data.gain_object2.history)
write_file_calfits = os.path.join(DATA_PATH, 'test/select_test.calfits')
# test writing calfits with only one frequency
gain_data.gain_object2 = copy.deepcopy(gain_data.gain_object)
freqs_to_keep = gain_data.gain_object.freq_array[0, 5]
gain_data.gain_object2.select(frequencies=freqs_to_keep)
gain_data.gain_object2.write_calfits(write_file_calfits, clobber=True)
# check for errors associated with frequencies not included in data
pytest.raises(ValueError, gain_data.gain_object.select,
frequencies=[np.max(gain_data.gain_object.freq_array) + gain_data.gain_object.channel_width])
# check for warnings and errors associated with unevenly spaced frequencies
gain_data.gain_object2 = copy.deepcopy(gain_data.gain_object)
uvtest.checkWarnings(gain_data.gain_object2.select, [], {'frequencies': gain_data.gain_object2.freq_array[0, [0, 5, 6]]},
message='Selected frequencies are not evenly spaced')
pytest.raises(ValueError, gain_data.gain_object2.write_calfits, write_file_calfits)
def test_select_freq_chans(gain_data):
old_history = gain_data.gain_object.history
chans_to_keep = np.arange(4, 8)
# add dummy total_quality_array
gain_data.gain_object.total_quality_array = np.zeros(
gain_data.gain_object._total_quality_array.expected_shape(gain_data.gain_object))
gain_data.gain_object2.total_quality_array = np.zeros(
gain_data.gain_object2._total_quality_array.expected_shape(gain_data.gain_object2))
gain_data.gain_object2.select(freq_chans=chans_to_keep)
assert len(chans_to_keep) == gain_data.gain_object2.Nfreqs
for chan in chans_to_keep:
assert gain_data.gain_object.freq_array[0, chan] in gain_data.gain_object2.freq_array
for f in np.unique(gain_data.gain_object2.freq_array):
assert f in gain_data.gain_object.freq_array[0, chans_to_keep]
assert uvutils._check_histories(old_history + ' Downselected to '
'specific frequencies using pyuvdata.',
gain_data.gain_object2.history)
# Test selecting both channels and frequencies
freqs_to_keep = gain_data.gain_object.freq_array[0, np.arange(7, 10)] # Overlaps with chans
all_chans_to_keep = np.arange(4, 10)
gain_data.gain_object2 = copy.deepcopy(gain_data.gain_object)
gain_data.gain_object2.select(frequencies=freqs_to_keep, freq_chans=chans_to_keep)
assert len(all_chans_to_keep) == gain_data.gain_object2.Nfreqs
for chan in all_chans_to_keep:
assert gain_data.gain_object.freq_array[0, chan] in gain_data.gain_object2.freq_array
for f in np.unique(gain_data.gain_object2.freq_array):
assert f in gain_data.gain_object.freq_array[0, all_chans_to_keep]
def test_select_polarizations(gain_data):
# add more jones terms to allow for better testing of selections
while gain_data.gain_object.Njones < 4:
new_jones = np.min(gain_data.gain_object.jones_array) - 1
gain_data.gain_object.jones_array = np.append(gain_data.gain_object.jones_array, new_jones)
gain_data.gain_object.Njones += 1
gain_data.gain_object.flag_array = np.concatenate((gain_data.gain_object.flag_array,
gain_data.gain_object.flag_array[:, :, :, :, [-1]]),
axis=4)
gain_data.gain_object.gain_array = np.concatenate((gain_data.gain_object.gain_array,
gain_data.gain_object.gain_array[:, :, :, :, [-1]]),
axis=4)
gain_data.gain_object.quality_array = np.concatenate((gain_data.gain_object.quality_array,
gain_data.gain_object.quality_array[:, :, :, :, [-1]]),
axis=4)
# add dummy total_quality_array
gain_data.gain_object.total_quality_array = np.zeros(
gain_data.gain_object._total_quality_array.expected_shape(gain_data.gain_object))
assert gain_data.gain_object.check()
gain_data.gain_object2 = copy.deepcopy(gain_data.gain_object)
old_history = gain_data.gain_object.history
jones_to_keep = [-5, -6]
gain_data.gain_object2.select(jones=jones_to_keep)
assert len(jones_to_keep) == gain_data.gain_object2.Njones
for j in jones_to_keep:
assert j in gain_data.gain_object2.jones_array
for j in np.unique(gain_data.gain_object2.jones_array):
assert j in jones_to_keep
assert uvutils._check_histories(old_history + ' Downselected to '
'specific jones polarization terms '
'using pyuvdata.',
gain_data.gain_object2.history)
# check for errors associated with polarizations not included in data
pytest.raises(ValueError, gain_data.gain_object2.select, jones=[-3, -4])
# check for warnings and errors associated with unevenly spaced polarizations
uvtest.checkWarnings(gain_data.gain_object.select, [], {'jones': gain_data.gain_object.jones_array[[0, 1, 3]]},
message='Selected jones polarization terms are not evenly spaced')
write_file_calfits = os.path.join(DATA_PATH, 'test/select_test.calfits')
pytest.raises(ValueError, gain_data.gain_object.write_calfits, write_file_calfits)
def test_select(gain_data):
# now test selecting along all axes at once
old_history = gain_data.gain_object.history
ants_to_keep = np.array([10, 89, 43, 9, 80, 96, 64])
freqs_to_keep = gain_data.gain_object.freq_array[0, np.arange(2, 5)]
times_to_keep = gain_data.gain_object.time_array[[1, 2]]
jones_to_keep = [-5]
gain_data.gain_object2.select(antenna_nums=ants_to_keep, frequencies=freqs_to_keep,
times=times_to_keep, jones=jones_to_keep)
assert len(ants_to_keep) == gain_data.gain_object2.Nants_data
for ant in ants_to_keep:
assert ant in gain_data.gain_object2.ant_array
for ant in gain_data.gain_object2.ant_array:
assert ant in ants_to_keep
assert len(times_to_keep) == gain_data.gain_object2.Ntimes
for t in times_to_keep:
assert t in gain_data.gain_object2.time_array
for t in np.unique(gain_data.gain_object2.time_array):
assert t in times_to_keep
assert len(freqs_to_keep) == gain_data.gain_object2.Nfreqs
for f in freqs_to_keep:
assert f in gain_data.gain_object2.freq_array
for f in np.unique(gain_data.gain_object2.freq_array):
assert f in freqs_to_keep
assert len(jones_to_keep) == gain_data.gain_object2.Njones
for j in jones_to_keep:
assert j in gain_data.gain_object2.jones_array
for j in np.unique(gain_data.gain_object2.jones_array):
assert j in jones_to_keep
assert uvutils._check_histories(old_history + ' Downselected to '
'specific antennas, times, '
'frequencies, jones polarization terms '
'using pyuvdata.',
gain_data.gain_object2.history)
@pytest.fixture(scope='function')
def delay_data():
"""Initialization for some basic uvcal tests."""
delay_object = UVCal()
delayfile = os.path.join(DATA_PATH, 'zen.2457698.40355.xx.delay.calfits')
# add an input flag array to the file to test for that.
write_file = os.path.join(DATA_PATH, 'test/outtest_input_flags.fits')
uv_in = UVCal()
uv_in.read_calfits(delayfile)
uv_in.input_flag_array = np.zeros(uv_in._input_flag_array.expected_shape(uv_in), dtype=bool)
uv_in.write_calfits(write_file, clobber=True)
delay_object.read_calfits(write_file)
class DataHolder(object):
def __init__(self, delay_object):
self.delay_object = delay_object
self.delay_object2 = copy.deepcopy(delay_object)
delay_data = DataHolder(delay_object)
# yield the data for testing, then del after tests finish
yield delay_data
del(delay_data)
def test_select_antennas_delay(delay_data):
old_history = delay_data.delay_object.history
ants_to_keep = np.array([65, 96, 9, 97, 89, 22, 20, 72])
delay_data.delay_object2.select(antenna_nums=ants_to_keep)
assert len(ants_to_keep) == delay_data.delay_object2.Nants_data
for ant in ants_to_keep:
assert ant in delay_data.delay_object2.ant_array
for ant in delay_data.delay_object2.ant_array:
assert ant in ants_to_keep
assert uvutils._check_histories(old_history + ' Downselected to '
'specific antennas using pyuvdata.',
delay_data.delay_object2.history)
# now test using antenna_names to specify antennas to keep
delay_data.delay_object3 = copy.deepcopy(delay_data.delay_object)
ants_to_keep = np.array(sorted(list(ants_to_keep)))
ant_names = []
for a in ants_to_keep:
ind = np.where(delay_data.delay_object3.antenna_numbers == a)[0][0]
ant_names.append(delay_data.delay_object3.antenna_names[ind])
delay_data.delay_object3.select(antenna_names=ant_names)
assert delay_data.delay_object2 == delay_data.delay_object3
# check for errors associated with antennas not included in data, bad names or providing numbers and names
pytest.raises(ValueError, delay_data.delay_object.select,
antenna_nums=np.max(delay_data.delay_object.ant_array) + np.arange(1, 3))
pytest.raises(ValueError, delay_data.delay_object.select, antenna_names='test1')
pytest.raises(ValueError, delay_data.delay_object.select,
antenna_nums=ants_to_keep, antenna_names=ant_names)
# check that total_quality_array is handled properly when present
delay_data.delay_object.total_quality_array = np.zeros(
delay_data.delay_object._total_quality_array.expected_shape(delay_data.delay_object))
uvtest.checkWarnings(delay_data.delay_object.select, [],
{'antenna_names': ant_names, 'inplace': True},
message='Cannot preserve total_quality_array')
assert delay_data.delay_object.total_quality_array is None
def test_select_times_delay(delay_data):
old_history = delay_data.delay_object.history
times_to_keep = delay_data.delay_object.time_array[2:5]
delay_data.delay_object2.select(times=times_to_keep)
assert len(times_to_keep) == delay_data.delay_object2.Ntimes
for t in times_to_keep:
assert t in delay_data.delay_object2.time_array
for t in np.unique(delay_data.delay_object2.time_array):
assert t in times_to_keep
assert uvutils._check_histories(old_history + ' Downselected to '
'specific times using pyuvdata.',
delay_data.delay_object2.history)
# check for errors associated with times not included in data
pytest.raises(ValueError, delay_data.delay_object.select,
times=[np.min(delay_data.delay_object.time_array) - delay_data.delay_object.integration_time])
# check for warnings and errors associated with unevenly spaced times
delay_data.delay_object2 = copy.deepcopy(delay_data.delay_object)
uvtest.checkWarnings(delay_data.delay_object2.select, [], {'times': delay_data.delay_object2.time_array[[0, 2, 3]]},
message='Selected times are not evenly spaced')
write_file_calfits = os.path.join(DATA_PATH, 'test/select_test.calfits')
pytest.raises(ValueError, delay_data.delay_object2.write_calfits, write_file_calfits)
def test_select_frequencies_delay(delay_data):
old_history = delay_data.delay_object.history
freqs_to_keep = delay_data.delay_object.freq_array[0, np.arange(73, 944)]
# add dummy total_quality_array
delay_data.delay_object.total_quality_array = np.zeros(
delay_data.delay_object._total_quality_array.expected_shape(delay_data.delay_object))
delay_data.delay_object2.total_quality_array = np.zeros(
delay_data.delay_object2._total_quality_array.expected_shape(delay_data.delay_object2))
delay_data.delay_object2.select(frequencies=freqs_to_keep)
assert len(freqs_to_keep) == delay_data.delay_object2.Nfreqs
for f in freqs_to_keep:
assert f in delay_data.delay_object2.freq_array
for f in np.unique(delay_data.delay_object2.freq_array):
assert f in freqs_to_keep
assert uvutils._check_histories(old_history + ' Downselected to '
'specific frequencies using pyuvdata.',
delay_data.delay_object2.history)
# check for errors associated with frequencies not included in data
pytest.raises(ValueError, delay_data.delay_object.select,
frequencies=[np.max(delay_data.delay_object.freq_array) + delay_data.delay_object.channel_width])
# check for warnings and errors associated with unevenly spaced frequencies
delay_data.delay_object2 = copy.deepcopy(delay_data.delay_object)
uvtest.checkWarnings(delay_data.delay_object2.select, [], {'frequencies': delay_data.delay_object2.freq_array[0, [0, 5, 6]]},
message='Selected frequencies are not evenly spaced')
write_file_calfits = os.path.join(DATA_PATH, 'test/select_test.calfits')
pytest.raises(ValueError, delay_data.delay_object2.write_calfits, write_file_calfits)
def test_select_freq_chans_delay(delay_data):
old_history = delay_data.delay_object.history
chans_to_keep = np.arange(73, 944)
# add dummy total_quality_array
delay_data.delay_object.total_quality_array = np.zeros(
delay_data.delay_object._total_quality_array.expected_shape(delay_data.delay_object))
delay_data.delay_object2.total_quality_array = np.zeros(
delay_data.delay_object2._total_quality_array.expected_shape(delay_data.delay_object2))
delay_data.delay_object2.select(freq_chans=chans_to_keep)
assert len(chans_to_keep) == delay_data.delay_object2.Nfreqs
for chan in chans_to_keep:
assert delay_data.delay_object.freq_array[0, chan] in delay_data.delay_object2.freq_array
for f in np.unique(delay_data.delay_object2.freq_array):
assert f in delay_data.delay_object.freq_array[0, chans_to_keep]
assert uvutils._check_histories(old_history + ' Downselected to '
'specific frequencies using pyuvdata.',
delay_data.delay_object2.history)
# Test selecting both channels and frequencies
freqs_to_keep = delay_data.delay_object.freq_array[0, np.arange(930, 1000)] # Overlaps with chans
all_chans_to_keep = np.arange(73, 1000)
delay_data.delay_object2 = copy.deepcopy(delay_data.delay_object)
delay_data.delay_object2.select(frequencies=freqs_to_keep, freq_chans=chans_to_keep)
assert len(all_chans_to_keep) == delay_data.delay_object2.Nfreqs
for chan in all_chans_to_keep:
assert delay_data.delay_object.freq_array[0, chan] in delay_data.delay_object2.freq_array
for f in np.unique(delay_data.delay_object2.freq_array):
assert f in delay_data.delay_object.freq_array[0, all_chans_to_keep]
def test_select_polarizations_delay(delay_data):
# add more jones terms to allow for better testing of selections
while delay_data.delay_object.Njones < 4:
new_jones = np.min(delay_data.delay_object.jones_array) - 1
delay_data.delay_object.jones_array = np.append(delay_data.delay_object.jones_array, new_jones)
delay_data.delay_object.Njones += 1
delay_data.delay_object.flag_array = np.concatenate((delay_data.delay_object.flag_array,
delay_data.delay_object.flag_array[:, :, :, :, [-1]]),
axis=4)
delay_data.delay_object.input_flag_array = np.concatenate((delay_data.delay_object.input_flag_array,
delay_data.delay_object.input_flag_array[:, :, :, :, [-1]]),
axis=4)
delay_data.delay_object.delay_array = np.concatenate((delay_data.delay_object.delay_array,
delay_data.delay_object.delay_array[:, :, :, :, [-1]]),
axis=4)
delay_data.delay_object.quality_array = np.concatenate((delay_data.delay_object.quality_array,
delay_data.delay_object.quality_array[:, :, :, :, [-1]]),
axis=4)
# add dummy total_quality_array
delay_data.delay_object.total_quality_array = np.zeros(
delay_data.delay_object._total_quality_array.expected_shape(delay_data.delay_object))
assert delay_data.delay_object.check()
delay_data.delay_object2 = copy.deepcopy(delay_data.delay_object)
old_history = delay_data.delay_object.history
jones_to_keep = [-5, -6]
delay_data.delay_object2.select(jones=jones_to_keep)
assert len(jones_to_keep) == delay_data.delay_object2.Njones
for j in jones_to_keep:
assert j in delay_data.delay_object2.jones_array
for j in np.unique(delay_data.delay_object2.jones_array):
assert j in jones_to_keep
assert uvutils._check_histories(old_history + ' Downselected to '
'specific jones polarization terms '
'using pyuvdata.',
delay_data.delay_object2.history)
# check for errors associated with polarizations not included in data
pytest.raises(ValueError, delay_data.delay_object2.select, jones=[-3, -4])
# check for warnings and errors associated with unevenly spaced polarizations
uvtest.checkWarnings(delay_data.delay_object.select, [], {'jones': delay_data.delay_object.jones_array[[0, 1, 3]]},
message='Selected jones polarization terms are not evenly spaced')
write_file_calfits = os.path.join(DATA_PATH, 'test/select_test.calfits')
pytest.raises(ValueError, delay_data.delay_object.write_calfits, write_file_calfits)
def test_select_delay(delay_data):
# now test selecting along all axes at once
old_history = delay_data.delay_object.history
ants_to_keep = np.array([10, 89, 43, 9, 80, 96, 64])
freqs_to_keep = delay_data.delay_object.freq_array[0, np.arange(31, 56)]
times_to_keep = delay_data.delay_object.time_array[[1, 2]]
jones_to_keep = [-5]
delay_data.delay_object2.select(antenna_nums=ants_to_keep, frequencies=freqs_to_keep,
times=times_to_keep, jones=jones_to_keep)
assert len(ants_to_keep) == delay_data.delay_object2.Nants_data
for ant in ants_to_keep:
assert ant in delay_data.delay_object2.ant_array
for ant in delay_data.delay_object2.ant_array:
assert ant in ants_to_keep
assert len(times_to_keep) == delay_data.delay_object2.Ntimes
for t in times_to_keep:
assert t in delay_data.delay_object2.time_array
for t in np.unique(delay_data.delay_object2.time_array):
assert t in times_to_keep
assert len(freqs_to_keep) == delay_data.delay_object2.Nfreqs
for f in freqs_to_keep:
assert f in delay_data.delay_object2.freq_array
for f in np.unique(delay_data.delay_object2.freq_array):
assert f in freqs_to_keep
assert len(jones_to_keep) == delay_data.delay_object2.Njones
for j in jones_to_keep:
assert j in delay_data.delay_object2.jones_array
for j in np.unique(delay_data.delay_object2.jones_array):
assert j in jones_to_keep
assert uvutils._check_histories(old_history + ' Downselected to '
'specific antennas, times, '
'frequencies, jones polarization terms '
'using pyuvdata.',
delay_data.delay_object2.history)
def test_add_antennas(gain_data):
"""Test adding antennas between two UVCal objects"""
gain_object_full = copy.deepcopy(gain_data.gain_object)
ants1 = np.array([9, 10, 20, 22, 31, 43, 53, 64, 65, 72])
ants2 = np.array([80, 81, 88, 89, 96, 97, 104, 105, 112])
gain_data.gain_object.select(antenna_nums=ants1)
gain_data.gain_object2.select(antenna_nums=ants2)
gain_data.gain_object += gain_data.gain_object2
# Check history is correct, before replacing and doing a full object check
assert uvutils._check_histories(gain_object_full.history
+ ' Downselected to specific '
'antennas using pyuvdata. Combined '
'data along antenna axis using pyuvdata.',
gain_data.gain_object.history)
gain_data.gain_object.history = gain_object_full.history
assert gain_data.gain_object == gain_object_full
# test for when total_quality_array is present
gain_data.gain_object.select(antenna_nums=ants1)
gain_data.gain_object.total_quality_array = np.zeros(
gain_data.gain_object._total_quality_array.expected_shape(gain_data.gain_object))
uvtest.checkWarnings(gain_data.gain_object.__iadd__, [gain_data.gain_object2],
message='Total quality array detected')
assert gain_data.gain_object.total_quality_array is None
def test_add_frequencies(gain_data):
"""Test adding frequencies between two UVCal objects"""
gain_object_full = copy.deepcopy(gain_data.gain_object)
freqs1 = gain_data.gain_object.freq_array[0, np.arange(0, 5)]
freqs2 = gain_data.gain_object2.freq_array[0, np.arange(5, 10)]
gain_data.gain_object.select(frequencies=freqs1)
gain_data.gain_object2.select(frequencies=freqs2)
gain_data.gain_object += gain_data.gain_object2
# Check history is correct, before replacing and doing a full object check
assert uvutils._check_histories(gain_object_full.history
+ ' Downselected to specific '
'frequencies using pyuvdata. Combined '
'data along frequency axis using pyuvdata.',
gain_data.gain_object.history)
gain_data.gain_object.history = gain_object_full.history
assert gain_data.gain_object == gain_object_full
# test for when total_quality_array is present in first file but not second
gain_data.gain_object.select(frequencies=freqs1)
tqa = np.ones(
gain_data.gain_object._total_quality_array.expected_shape(gain_data.gain_object))
tqa2 = np.zeros(
gain_data.gain_object2._total_quality_array.expected_shape(gain_data.gain_object2))
tot_tqa = np.concatenate([tqa, tqa2], axis=1)
gain_data.gain_object.total_quality_array = tqa
gain_data.gain_object += gain_data.gain_object2
assert np.allclose(gain_data.gain_object.total_quality_array, tot_tqa,
rtol=gain_data.gain_object._total_quality_array.tols[0],
atol=gain_data.gain_object._total_quality_array.tols[1])
# test for when total_quality_array is present in second file but not first
gain_data.gain_object.select(frequencies=freqs1)
tqa = np.zeros(
gain_data.gain_object._total_quality_array.expected_shape(gain_data.gain_object))
tqa2 = np.ones(
gain_data.gain_object2._total_quality_array.expected_shape(gain_data.gain_object2))
tot_tqa = np.concatenate([tqa, tqa2], axis=1)
gain_data.gain_object.total_quality_array = None
gain_data.gain_object2.total_quality_array = tqa2
gain_data.gain_object += gain_data.gain_object2
assert np.allclose(gain_data.gain_object.total_quality_array, tot_tqa,
rtol=gain_data.gain_object._total_quality_array.tols[0],
atol=gain_data.gain_object._total_quality_array.tols[1])
# test for when total_quality_array is present in both
gain_data.gain_object.select(frequencies=freqs1)
tqa = np.ones(
gain_data.gain_object._total_quality_array.expected_shape(gain_data.gain_object))
tqa2 = np.ones(
gain_data.gain_object2._total_quality_array.expected_shape(gain_data.gain_object2))
tqa *= 2
tot_tqa = np.concatenate([tqa, tqa2], axis=1)
gain_data.gain_object.total_quality_array = tqa
gain_data.gain_object2.total_quality_array = tqa2
gain_data.gain_object += gain_data.gain_object2
assert np.allclose(gain_data.gain_object.total_quality_array, tot_tqa,
rtol=gain_data.gain_object._total_quality_array.tols[0],
atol=gain_data.gain_object._total_quality_array.tols[1])
# Out of order - freqs
gain_data.gain_object = copy.deepcopy(gain_object_full)
gain_data.gain_object2 = copy.deepcopy(gain_object_full)
gain_data.gain_object.select(frequencies=freqs2)
gain_data.gain_object2.select(frequencies=freqs1)
gain_data.gain_object += gain_data.gain_object2
gain_data.gain_object.history = gain_object_full.history
assert gain_data.gain_object == gain_object_full
def test_add_times(gain_data):
"""Test adding times between two UVCal objects"""
gain_object_full = copy.deepcopy(gain_data.gain_object)
Nt2 = gain_data.gain_object.Ntimes // 2
times1 = gain_data.gain_object.time_array[:Nt2]
times2 = gain_data.gain_object.time_array[Nt2:]
gain_data.gain_object.select(times=times1)
gain_data.gain_object2.select(times=times2)
gain_data.gain_object += gain_data.gain_object2
# Check history is correct, before replacing and doing a full object check
assert uvutils._check_histories(gain_object_full.history
+ ' Downselected to specific '
'times using pyuvdata. Combined '
'data along time axis using pyuvdata.',
gain_data.gain_object.history)
gain_data.gain_object.history = gain_object_full.history
assert gain_data.gain_object == gain_object_full
# test for when total_quality_array is present in first file but not second
gain_data.gain_object.select(times=times1)
tqa = np.ones(
gain_data.gain_object._total_quality_array.expected_shape(gain_data.gain_object))
tqa2 = np.zeros(
gain_data.gain_object2._total_quality_array.expected_shape(gain_data.gain_object2))
tot_tqa = np.concatenate([tqa, tqa2], axis=2)
gain_data.gain_object.total_quality_array = tqa
gain_data.gain_object += gain_data.gain_object2
assert np.allclose(gain_data.gain_object.total_quality_array, tot_tqa,
rtol=gain_data.gain_object._total_quality_array.tols[0],
atol=gain_data.gain_object._total_quality_array.tols[1])
# test for when total_quality_array is present in second file but not first
gain_data.gain_object.select(times=times1)
tqa = np.zeros(
gain_data.gain_object._total_quality_array.expected_shape(gain_data.gain_object))
tqa2 = np.ones(
gain_data.gain_object2._total_quality_array.expected_shape(gain_data.gain_object2))
tot_tqa = np.concatenate([tqa, tqa2], axis=2)
gain_data.gain_object.total_quality_array = None
gain_data.gain_object2.total_quality_array = tqa2
gain_data.gain_object += gain_data.gain_object2
assert np.allclose(gain_data.gain_object.total_quality_array, tot_tqa,
rtol=gain_data.gain_object._total_quality_array.tols[0],
atol=gain_data.gain_object._total_quality_array.tols[1])
# test for when total_quality_array is present in both
gain_data.gain_object.select(times=times1)
tqa = np.ones(
gain_data.gain_object._total_quality_array.expected_shape(gain_data.gain_object))
tqa2 = np.ones(
gain_data.gain_object2._total_quality_array.expected_shape(gain_data.gain_object2))
tqa *= 2
tot_tqa = np.concatenate([tqa, tqa2], axis=2)
gain_data.gain_object.total_quality_array = tqa
gain_data.gain_object2.total_quality_array = tqa2
gain_data.gain_object += gain_data.gain_object2
assert np.allclose(gain_data.gain_object.total_quality_array, tot_tqa,
rtol=gain_data.gain_object._total_quality_array.tols[0],
atol=gain_data.gain_object._total_quality_array.tols[1])
def test_add_jones(gain_data):
"""Test adding Jones axes between two UVCal objects"""
gain_object_original = copy.deepcopy(gain_data.gain_object)
# artificially change the Jones value to permit addition
gain_data.gain_object2.jones_array[0] = -6
gain_data.gain_object += gain_data.gain_object2
# check dimensionality of resulting object
assert gain_data.gain_object.gain_array.shape[-1] == 2
assert sorted(gain_data.gain_object.jones_array) == [-6, -5]
# test for when total_quality_array is present in first file but not second
gain_data.gain_object = copy.deepcopy(gain_object_original)
tqa = np.ones(
gain_data.gain_object._total_quality_array.expected_shape(gain_data.gain_object))
tqa2 = np.zeros(
gain_data.gain_object2._total_quality_array.expected_shape(gain_data.gain_object2))
tot_tqa = np.concatenate([tqa, tqa2], axis=3)
gain_data.gain_object.total_quality_array = tqa
gain_data.gain_object += gain_data.gain_object2
assert np.allclose(gain_data.gain_object.total_quality_array, tot_tqa,
rtol=gain_data.gain_object._total_quality_array.tols[0],
atol=gain_data.gain_object._total_quality_array.tols[1])
# test for when total_quality_array is present in second file but not first
gain_data.gain_object = copy.deepcopy(gain_object_original)
tqa = np.zeros(
gain_data.gain_object._total_quality_array.expected_shape(gain_data.gain_object))
tqa2 = np.ones(
gain_data.gain_object2._total_quality_array.expected_shape(gain_data.gain_object2))
tot_tqa = np.concatenate([tqa, tqa2], axis=3)
gain_data.gain_object2.total_quality_array = tqa2
gain_data.gain_object += gain_data.gain_object2
assert np.allclose(gain_data.gain_object.total_quality_array, tot_tqa,
rtol=gain_data.gain_object._total_quality_array.tols[0],
atol=gain_data.gain_object._total_quality_array.tols[1])
# test for when total_quality_array is present in both
gain_data.gain_object = copy.deepcopy(gain_object_original)
tqa = np.ones(
gain_data.gain_object._total_quality_array.expected_shape(gain_data.gain_object))
tqa2 = np.ones(
gain_data.gain_object2._total_quality_array.expected_shape(gain_data.gain_object2))
tqa *= 2
tot_tqa = np.concatenate([tqa, tqa2], axis=3)
gain_data.gain_object.total_quality_array = tqa
gain_data.gain_object2.total_quality_array = tqa2
gain_data.gain_object += gain_data.gain_object2
assert np.allclose(gain_data.gain_object.total_quality_array, tot_tqa,
rtol=gain_data.gain_object._total_quality_array.tols[0],
atol=gain_data.gain_object._total_quality_array.tols[1])
def test_add(gain_data):
"""Test miscellaneous aspects of add method"""
# test not-in-place addition
gain_object = copy.deepcopy(gain_data.gain_object)
ants1 = np.array([9, 10, 20, 22, 31, 43, 53, 64, 65, 72])
ants2 = np.array([80, 81, 88, 89, 96, 97, 104, 105, 112])
gain_data.gain_object.select(antenna_nums=ants1)
gain_data.gain_object2.select(antenna_nums=ants2)
gain_object_add = gain_data.gain_object + gain_data.gain_object2
# Check history is correct, before replacing and doing a full object check
assert uvutils._check_histories(gain_object.history
+ ' Downselected to specific '
'antennas using pyuvdata. Combined '
'data along antenna axis using pyuvdata.',
gain_object_add.history)
gain_object_add.history = gain_object.history
assert gain_object_add == gain_object
# test history concatenation
gain_data.gain_object.history = gain_object.history
gain_data.gain_object2.history = 'Some random history string OMNI_RUN:'
gain_data.gain_object += gain_data.gain_object2
assert uvutils._check_histories(gain_object.history
+ ' Combined data along antenna axis '
'using pyuvdata. Some random '
'history string',
gain_data.gain_object.history)
def test_add_multiple_axes(gain_data):
"""Test addition along multiple axes"""
ants1 = np.array([9, 10, 20, 22, 31, 43, 53, 64, 65, 72])
ants2 = np.array([80, 81, 88, 89, 96, 97, 104, 105, 112])
freqs1 = gain_data.gain_object.freq_array[0, np.arange(0, 5)]
freqs2 = gain_data.gain_object2.freq_array[0, np.arange(5, 10)]
Nt2 = gain_data.gain_object.Ntimes // 2
times1 = gain_data.gain_object.time_array[:Nt2]
times2 = gain_data.gain_object.time_array[Nt2:]
# artificially change the Jones value to permit addition
gain_data.gain_object2.jones_array[0] = -6
# perform select
gain_data.gain_object.select(antenna_nums=ants1, frequencies=freqs1,
times=times1)
gain_data.gain_object2.select(antenna_nums=ants2, frequencies=freqs2,
times=times2)
gain_data.gain_object += gain_data.gain_object2
# check resulting dimensionality
assert len(gain_data.gain_object.ant_array) == 19
assert len(gain_data.gain_object.freq_array[0, :]) == 10
assert len(gain_data.gain_object.time_array) == gain_data.gain_object.Ntimes
assert len(gain_data.gain_object.jones_array) == 2
def test_add_errors(gain_data):
"""Test behavior that will raise errors"""
# test addition of two identical objects
pytest.raises(ValueError, gain_data.gain_object.__add__, gain_data.gain_object2)
# test addition of UVCal and non-UVCal object (empty list)
pytest.raises(ValueError, gain_data.gain_object.__add__, [])
# test compatibility param mismatch
gain_data.gain_object2.telescope_name = "PAPER"
pytest.raises(ValueError, gain_data.gain_object.__add__, gain_data.gain_object2)
def test_jones_warning(gain_data):
"""Test having non-contiguous Jones elements"""
gain_data.gain_object2.jones_array[0] = -6
gain_data.gain_object += gain_data.gain_object2
gain_data.gain_object2.jones_array[0] = -8
uvtest.checkWarnings(gain_data.gain_object.__iadd__, [gain_data.gain_object2],
message='Combined Jones elements')
assert sorted(gain_data.gain_object.jones_array) == [-8, -6, -5]
def test_frequency_warnings(gain_data):
"""Test having uneven or non-contiguous frequencies"""
# test having unevenly spaced frequency separations
go1 = copy.deepcopy(gain_data.gain_object)
go2 = copy.deepcopy(gain_data.gain_object2)
freqs1 = gain_data.gain_object.freq_array[0, np.arange(0, 5)]
freqs2 = gain_data.gain_object2.freq_array[0, np.arange(5, 10)]
gain_data.gain_object.select(frequencies=freqs1)
gain_data.gain_object2.select(frequencies=freqs2)
# change the last frequency bin to be smaller than the others
df = gain_data.gain_object2.freq_array[0, -1] - gain_data.gain_object2.freq_array[0, -2]
gain_data.gain_object2.freq_array[0, -1] = gain_data.gain_object2.freq_array[0, -2] + df / 2
uvtest.checkWarnings(gain_data.gain_object.__iadd__, [gain_data.gain_object2],
message='Combined frequencies are not evenly spaced')
assert len(gain_data.gain_object.freq_array[0, :]) == gain_data.gain_object.Nfreqs
# now check having "non-contiguous" frequencies
gain_data.gain_object = copy.deepcopy(go1)
gain_data.gain_object2 = copy.deepcopy(go2)
freqs1 = gain_data.gain_object.freq_array[0, np.arange(0, 5)]
freqs2 = gain_data.gain_object2.freq_array[0, np.arange(5, 10)]
gain_data.gain_object.select(frequencies=freqs1)
gain_data.gain_object2.select(frequencies=freqs2)
# artificially space out frequencies
gain_data.gain_object.freq_array[0, :] *= 10
gain_data.gain_object2.freq_array[0, :] *= 10
uvtest.checkWarnings(gain_data.gain_object.__iadd__, [gain_data.gain_object2],
message='Combined frequencies are not contiguous')
freqs1 *= 10
freqs2 *= 10
freqs = np.concatenate([freqs1, freqs2])
assert np.allclose(gain_data.gain_object.freq_array[0, :], freqs,
rtol=gain_data.gain_object._freq_array.tols[0],
atol=gain_data.gain_object._freq_array.tols[1])
def test_parameter_warnings(gain_data):
"""Test changing a parameter that will raise a warning"""
# change observer and select frequencies
gain_data.gain_object2.observer = 'mystery_person'
freqs1 = gain_data.gain_object.freq_array[0, np.arange(0, 5)]
freqs2 = gain_data.gain_object2.freq_array[0, np.arange(5, 10)]
gain_data.gain_object.select(frequencies=freqs1)
gain_data.gain_object2.select(frequencies=freqs2)
uvtest.checkWarnings(gain_data.gain_object.__iadd__, [gain_data.gain_object2],
message='UVParameter observer does not match')
freqs = np.concatenate([freqs1, freqs2])
assert np.allclose(gain_data.gain_object.freq_array, freqs,
rtol=gain_data.gain_object._freq_array.tols[0],
atol=gain_data.gain_object._freq_array.tols[1])
def test_multi_files(gain_data):
"""Test read function when multiple files are included"""
gain_object_full = copy.deepcopy(gain_data.gain_object)
Nt2 = gain_data.gain_object.Ntimes // 2
# Break up delay object into two objects, divided in time
times1 = gain_data.gain_object.time_array[:Nt2]
times2 = gain_data.gain_object.time_array[Nt2:]
gain_data.gain_object.select(times=times1)
gain_data.gain_object2.select(times=times2)
# Write those objects to files
f1 = os.path.join(DATA_PATH, 'test/read_multi1.calfits')
f2 = os.path.join(DATA_PATH, 'test/read_multi2.calfits')
gain_data.gain_object.write_calfits(f1, clobber=True)
gain_data.gain_object2.write_calfits(f2, clobber=True)
# Read both files together
gain_data.gain_object.read_calfits([f1, f2])
assert uvutils._check_histories(gain_object_full.history
+ ' Downselected to specific times'
' using pyuvdata. Combined data '
'along time axis using pyuvdata.',
gain_data.gain_object.history)
gain_data.gain_object.history = gain_object_full.history
assert gain_data.gain_object == gain_object_full
def test_add_antennas_delay(delay_data):
"""Test adding antennas between two UVCal objects"""
delay_object_full = copy.deepcopy(delay_data.delay_object)
ants1 = np.array([9, 10, 20, 22, 31, 43, 53, 64, 65, 72])
ants2 = np.array([80, 81, 88, 89, 96, 97, 104, 105, 112])
delay_data.delay_object.select(antenna_nums=ants1)
delay_data.delay_object2.select(antenna_nums=ants2)
delay_data.delay_object += delay_data.delay_object2
# Check history is correct, before replacing and doing a full object check
assert uvutils._check_histories(delay_object_full.history
+ ' Downselected to specific '
'antennas using pyuvdata. Combined '
'data along antenna axis using pyuvdata.',
delay_data.delay_object.history)
delay_data.delay_object.history = delay_object_full.history
assert delay_data.delay_object == delay_object_full
# test for when total_quality_array is present
delay_data.delay_object.select(antenna_nums=ants1)
delay_data.delay_object.total_quality_array = np.zeros(
delay_data.delay_object._total_quality_array.expected_shape(delay_data.delay_object))
uvtest.checkWarnings(delay_data.delay_object.__iadd__, [delay_data.delay_object2],
message='Total quality array detected')
assert delay_data.delay_object.total_quality_array is None
# test for when input_flag_array is present in first file but not second
delay_data.delay_object.select(antenna_nums=ants1)
ifa = np.zeros(
delay_data.delay_object._input_flag_array.expected_shape(delay_data.delay_object)).astype(np.bool)
ifa2 = np.ones(
delay_data.delay_object2._input_flag_array.expected_shape(delay_data.delay_object2)).astype(np.bool)
tot_ifa = np.concatenate([ifa, ifa2], axis=0)
delay_data.delay_object.input_flag_array = ifa
delay_data.delay_object2.input_flag_array = None
delay_data.delay_object += delay_data.delay_object2
assert np.allclose(delay_data.delay_object.input_flag_array, tot_ifa)
# test for when input_flag_array is present in second file but not first
delay_data.delay_object.select(antenna_nums=ants1)
ifa = np.ones(
delay_data.delay_object._input_flag_array.expected_shape(delay_data.delay_object)).astype(np.bool)
ifa2 = np.zeros(
delay_data.delay_object2._input_flag_array.expected_shape(delay_data.delay_object2)).astype(np.bool)
tot_ifa = np.concatenate([ifa, ifa2], axis=0)
delay_data.delay_object.input_flag_array = None
delay_data.delay_object2.input_flag_array = ifa2
delay_data.delay_object += delay_data.delay_object2
assert np.allclose(delay_data.delay_object.input_flag_array, tot_ifa)
# Out of order - antennas
delay_data.delay_object = copy.deepcopy(delay_object_full)
delay_data.delay_object2 = copy.deepcopy(delay_data.delay_object)
delay_data.delay_object.select(antenna_nums=ants2)
delay_data.delay_object2.select(antenna_nums=ants1)
delay_data.delay_object += delay_data.delay_object2
delay_data.delay_object.history = delay_object_full.history
assert delay_data.delay_object == delay_object_full
def test_add_times_delay(delay_data):
"""Test adding times between two UVCal objects"""
delay_object_full = copy.deepcopy(delay_data.delay_object)
Nt2 = delay_data.delay_object.Ntimes // 2
times1 = delay_data.delay_object.time_array[:Nt2]
times2 = delay_data.delay_object.time_array[Nt2:]
delay_data.delay_object.select(times=times1)
delay_data.delay_object2.select(times=times2)
delay_data.delay_object += delay_data.delay_object2
# Check history is correct, before replacing and doing a full object check
assert uvutils._check_histories(delay_object_full.history
+ ' Downselected to specific '
'times using pyuvdata. Combined '
'data along time axis using pyuvdata.',
delay_data.delay_object.history)
delay_data.delay_object.history = delay_object_full.history
assert delay_data.delay_object == delay_object_full
# test for when total_quality_array is present in first file but not second
delay_data.delay_object.select(times=times1)
tqa = np.ones(
delay_data.delay_object._total_quality_array.expected_shape(delay_data.delay_object))
tqa2 = np.zeros(
delay_data.delay_object2._total_quality_array.expected_shape(delay_data.delay_object2))
tot_tqa = np.concatenate([tqa, tqa2], axis=2)
delay_data.delay_object.total_quality_array = tqa
delay_data.delay_object += delay_data.delay_object2
assert np.allclose(delay_data.delay_object.total_quality_array, tot_tqa,
rtol=delay_data.delay_object._total_quality_array.tols[0],
atol=delay_data.delay_object._total_quality_array.tols[1])
# test for when total_quality_array is present in second file but not first
delay_data.delay_object.select(times=times1)
tqa = np.zeros(
delay_data.delay_object._total_quality_array.expected_shape(delay_data.delay_object))
tqa2 = np.ones(
delay_data.delay_object2._total_quality_array.expected_shape(delay_data.delay_object2))
tot_tqa = np.concatenate([tqa, tqa2], axis=2)
delay_data.delay_object.total_quality_array = None
delay_data.delay_object2.total_quality_array = tqa2
delay_data.delay_object += delay_data.delay_object2
assert np.allclose(delay_data.delay_object.total_quality_array, tot_tqa,
rtol=delay_data.delay_object._total_quality_array.tols[0],
atol=delay_data.delay_object._total_quality_array.tols[1])
# test for when total_quality_array is present in both
delay_data.delay_object.select(times=times1)
tqa = np.ones(
delay_data.delay_object._total_quality_array.expected_shape(delay_data.delay_object))
tqa2 = np.ones(
delay_data.delay_object2._total_quality_array.expected_shape(delay_data.delay_object2))
tqa *= 2
tot_tqa = np.concatenate([tqa, tqa2], axis=2)
delay_data.delay_object.total_quality_array = tqa
delay_data.delay_object2.total_quality_array = tqa2
delay_data.delay_object += delay_data.delay_object2
assert np.allclose(delay_data.delay_object.total_quality_array, tot_tqa,
rtol=delay_data.delay_object._total_quality_array.tols[0],
atol=delay_data.delay_object._total_quality_array.tols[1])
# test for when input_flag_array is present in first file but not second
delay_data.delay_object.select(times=times1)
ifa = np.zeros(
delay_data.delay_object._input_flag_array.expected_shape(delay_data.delay_object)).astype(np.bool)
ifa2 = np.ones(
delay_data.delay_object2._input_flag_array.expected_shape(delay_data.delay_object2)).astype(np.bool)
tot_ifa = np.concatenate([ifa, ifa2], axis=3)
delay_data.delay_object.input_flag_array = ifa
delay_data.delay_object2.input_flag_array = None
delay_data.delay_object += delay_data.delay_object2
assert np.allclose(delay_data.delay_object.input_flag_array, tot_ifa)
# test for when input_flag_array is present in second file but not first
delay_data.delay_object.select(times=times1)
ifa = np.ones(
delay_data.delay_object._input_flag_array.expected_shape(delay_data.delay_object)).astype(np.bool)
ifa2 = np.zeros(
delay_data.delay_object2._input_flag_array.expected_shape(delay_data.delay_object2)).astype(np.bool)
tot_ifa = np.concatenate([ifa, ifa2], axis=3)
delay_data.delay_object.input_flag_array = None
delay_data.delay_object2.input_flag_array = ifa2
delay_data.delay_object += delay_data.delay_object2
assert np.allclose(delay_data.delay_object.input_flag_array, tot_ifa)
# Out of order - times
delay_data.delay_object = copy.deepcopy(delay_object_full)
delay_data.delay_object2 = copy.deepcopy(delay_data.delay_object)
delay_data.delay_object.select(times=times2)
delay_data.delay_object2.select(times=times1)
delay_data.delay_object += delay_data.delay_object2
delay_data.delay_object.history = delay_object_full.history
assert delay_data.delay_object == delay_object_full
def test_add_jones_delay(delay_data):
"""Test adding Jones axes between two UVCal objects"""
delay_object_original = copy.deepcopy(delay_data.delay_object)
# artificially change the Jones value to permit addition
delay_data.delay_object2.jones_array[0] = -6
delay_data.delay_object += delay_data.delay_object2
# check dimensionality of resulting object
assert delay_data.delay_object.delay_array.shape[-1] == 2
assert sorted(delay_data.delay_object.jones_array) == [-6, -5]
# test for when total_quality_array is present in first file but not second
delay_data.delay_object = copy.deepcopy(delay_object_original)
tqa = np.ones(
delay_data.delay_object._total_quality_array.expected_shape(delay_data.delay_object))
tqa2 = np.zeros(
delay_data.delay_object2._total_quality_array.expected_shape(delay_data.delay_object2))
tot_tqa = np.concatenate([tqa, tqa2], axis=3)
delay_data.delay_object.total_quality_array = tqa
delay_data.delay_object += delay_data.delay_object2
assert np.allclose(delay_data.delay_object.total_quality_array, tot_tqa,
rtol=delay_data.delay_object._total_quality_array.tols[0],
atol=delay_data.delay_object._total_quality_array.tols[1])
# test for when total_quality_array is present in second file but not first
delay_data.delay_object = copy.deepcopy(delay_object_original)
tqa = np.zeros(
delay_data.delay_object._total_quality_array.expected_shape(delay_data.delay_object))
tqa2 = np.ones(
delay_data.delay_object2._total_quality_array.expected_shape(delay_data.delay_object2))
tot_tqa = np.concatenate([tqa, tqa2], axis=3)
delay_data.delay_object2.total_quality_array = tqa2
delay_data.delay_object += delay_data.delay_object2
assert np.allclose(delay_data.delay_object.total_quality_array, tot_tqa,
rtol=delay_data.delay_object._total_quality_array.tols[0],
atol=delay_data.delay_object._total_quality_array.tols[1])
# test for when total_quality_array is present in both
delay_data.delay_object = copy.deepcopy(delay_object_original)
tqa = np.ones(
delay_data.delay_object._total_quality_array.expected_shape(delay_data.delay_object))
tqa2 = np.ones(
delay_data.delay_object2._total_quality_array.expected_shape(delay_data.delay_object2))
tqa *= 2
tot_tqa = np.concatenate([tqa, tqa2], axis=3)
delay_data.delay_object.total_quality_array = tqa
delay_data.delay_object2.total_quality_array = tqa2
delay_data.delay_object += delay_data.delay_object2
assert np.allclose(delay_data.delay_object.total_quality_array, tot_tqa,
rtol=delay_data.delay_object._total_quality_array.tols[0],
atol=delay_data.delay_object._total_quality_array.tols[1])
# test for when input_flag_array is present in first file but not second
delay_data.delay_object = copy.deepcopy(delay_object_original)
ifa = np.zeros(
delay_data.delay_object._input_flag_array.expected_shape(delay_data.delay_object)).astype(np.bool)
ifa2 = np.ones(
delay_data.delay_object2._input_flag_array.expected_shape(delay_data.delay_object2)).astype(np.bool)
tot_ifa = np.concatenate([ifa, ifa2], axis=4)
delay_data.delay_object.input_flag_array = ifa
delay_data.delay_object2.input_flag_array = None
delay_data.delay_object += delay_data.delay_object2
assert np.allclose(delay_data.delay_object.input_flag_array, tot_ifa)
# test for when input_flag_array is present in second file but not first
delay_data.delay_object = copy.deepcopy(delay_object_original)
ifa = np.ones(
delay_data.delay_object._input_flag_array.expected_shape(delay_data.delay_object)).astype(np.bool)
ifa2 = np.zeros(
delay_data.delay_object2._input_flag_array.expected_shape(delay_data.delay_object2)).astype(np.bool)
tot_ifa = np.concatenate([ifa, ifa2], axis=4)
delay_data.delay_object.input_flag_array = None
delay_data.delay_object2.input_flag_array = ifa2
delay_data.delay_object += delay_data.delay_object2
assert np.allclose(delay_data.delay_object.input_flag_array, tot_ifa)
# Out of order - jones
delay_data.delay_object = copy.deepcopy(delay_object_original)
delay_data.delay_object2 = copy.deepcopy(delay_object_original)
delay_data.delay_object.jones_array[0] = -6
delay_data.delay_object += delay_data.delay_object2
delay_data.delay_object2 = copy.deepcopy(delay_data.delay_object)
delay_data.delay_object.select(jones=-5)
delay_data.delay_object.history = delay_object_original.history
assert delay_data.delay_object == delay_object_original
delay_data.delay_object2.select(jones=-6)
delay_data.delay_object2.jones_array[:] = -5
delay_data.delay_object2.history = delay_object_original.history
assert delay_data.delay_object2 == delay_object_original
def test_add_errors_delay(delay_data):
"""Test behavior that will raise errors"""
# test addition of two identical objects
pytest.raises(ValueError, delay_data.delay_object.__add__, delay_data.delay_object2)
def test_multi_files_delay(delay_data):
"""Test read function when multiple files are included"""
delay_object_full = copy.deepcopy(delay_data.delay_object)
Nt2 = delay_data.delay_object.Ntimes // 2
# Break up delay object into two objects, divided in time
times1 = delay_data.delay_object.time_array[:Nt2]
times2 = delay_data.delay_object.time_array[Nt2:]
delay_data.delay_object.select(times=times1)
delay_data.delay_object2.select(times=times2)
# Write those objects to files
f1 = os.path.join(DATA_PATH, 'test/read_multi1.calfits')
f2 = os.path.join(DATA_PATH, 'test/read_multi2.calfits')
delay_data.delay_object.write_calfits(f1, clobber=True)
delay_data.delay_object2.write_calfits(f2, clobber=True)
# Read both files together
delay_data.delay_object.read_calfits([f1, f2])
assert uvutils._check_histories(delay_object_full.history
+ ' Downselected to specific times'
' using pyuvdata. Combined data '
'along time axis using pyuvdata.',
delay_data.delay_object.history)
delay_data.delay_object.history = delay_object_full.history
assert delay_data.delay_object == delay_object_full
def test_uvcal_get_methods():
# load data
uvc = UVCal()
uvc.read_calfits(os.path.join(DATA_PATH, 'zen.2457698.40355.xx.gain.calfits'))
# test get methods: add in a known value and make sure it is returned
key = (10, 'Jee')
uvc.gain_array[1] = 0.0
d, f, q = uvc.get_gains(key), uvc.get_flags(key), uvc.get_quality(key)
# test shapes
assert np.all(np.isclose(d, 0.0))
assert d.shape == (uvc.Nfreqs, uvc.Ntimes)
assert f.shape == (uvc.Nfreqs, uvc.Ntimes)
assert q.shape == (uvc.Nfreqs, uvc.Ntimes)
# test against by-hand indexing
np.testing.assert_array_almost_equal(d, uvc.gain_array[uvc.ant_array.tolist().index(10), 0, :, :, uvc.jones_array.tolist().index(-5)])
# test variable key input
d2 = uvc.get_gains(*key)
np.testing.assert_array_almost_equal(d, d2)
d2 = uvc.get_gains(key[0])
np.testing.assert_array_almost_equal(d, d2)
d2 = uvc.get_gains(key[:1])
np.testing.assert_array_almost_equal(d, d2)
d2 = uvc.get_gains(10, -5)
np.testing.assert_array_almost_equal(d, d2)
d2 = uvc.get_gains(10, 'x')
np.testing.assert_array_almost_equal(d, d2)
# check has_key
assert uvc._has_key(antnum=10)
assert uvc._has_key(jpol='Jee')
assert uvc._has_key(antnum=10, jpol='Jee')
assert not uvc._has_key(antnum=10, jpol='Jnn')
assert not uvc._has_key(antnum=101, jpol='Jee')
# test exceptions
pytest.raises(ValueError, uvc.get_gains, 1)
pytest.raises(ValueError, uvc.get_gains, (10, 'Jnn'))
uvc.cal_type = 'delay'
pytest.raises(ValueError, uvc.get_gains, 10)
def test_write_read_optional_attrs():
# read a test file
cal_in = UVCal()
testfile = os.path.join(DATA_PATH, 'zen.2457698.40355.xx.gain.calfits')
cal_in.read_calfits(testfile)
# set some optional parameters
cal_in.gain_scale = 'Jy'
cal_in.sky_field = 'GLEAM'
# write
write_file_calfits = os.path.join(DATA_PATH, 'test/test.calfits')
cal_in.write_calfits(write_file_calfits, clobber=True)
# read and compare
cal_in2 = UVCal()
cal_in2.read_calfits(write_file_calfits)
assert cal_in == cal_in2
os.remove(write_file_calfits)