"""Tests for uvdata object."""
import nose.tools as nt
import os
import numpy as np
import copy
import ephem
from pyuvdata import UVData
import pyuvdata.utils as uvutils
import pyuvdata.tests as uvtest
from pyuvdata.data import DATA_PATH
class TestUVDataInit(object):
    """Tests of the parameters, properties and iterators of a bare UVData object."""

    def setUp(self):
        """Setup for basic parameter, property and iterator tests."""
        # Names of the underscore-prefixed parameter attributes expected on
        # the object and yielded by its ``required()`` iterator.
        self.required_parameters = ['_data_array', '_nsample_array',
                                    '_flag_array', '_Ntimes', '_Nbls',
                                    '_Nblts', '_Nfreqs', '_Npols', '_Nspws',
                                    '_uvw_array', '_time_array', '_ant_1_array',
                                    '_ant_2_array', '_lst_array',
                                    '_baseline_array', '_freq_array',
                                    '_polarization_array', '_spw_array',
                                    '_integration_time', '_channel_width',
                                    '_object_name', '_telescope_name',
                                    '_instrument', '_telescope_location',
                                    '_history', '_vis_units', '_Nants_data',
                                    '_Nants_telescope', '_antenna_names',
                                    '_antenna_numbers', '_phase_type']

        # Property names, listed in the same order as required_parameters
        # (test_properties pairs the two lists up with zip).
        self.required_properties = ['data_array', 'nsample_array',
                                    'flag_array', 'Ntimes', 'Nbls',
                                    'Nblts', 'Nfreqs', 'Npols', 'Nspws',
                                    'uvw_array', 'time_array', 'ant_1_array',
                                    'ant_2_array', 'lst_array',
                                    'baseline_array', 'freq_array',
                                    'polarization_array', 'spw_array',
                                    'integration_time', 'channel_width',
                                    'object_name', 'telescope_name',
                                    'instrument', 'telescope_location',
                                    'history', 'vis_units', 'Nants_data',
                                    'Nants_telescope', 'antenna_names',
                                    'antenna_numbers', 'phase_type']

        # Parameter attributes expected from the ``extra()`` iterator.
        self.extra_parameters = ['_extra_keywords', '_antenna_positions',
                                 '_x_orientation', '_antenna_diameters',
                                 '_gst0', '_rdate', '_earth_omega', '_dut1',
                                 '_timesys', '_uvplane_reference_time',
                                 '_phase_center_ra', '_phase_center_dec',
                                 '_phase_center_epoch',
                                 '_zenith_ra', '_zenith_dec']

        # Property names, in the same order as extra_parameters.
        self.extra_properties = ['extra_keywords', 'antenna_positions',
                                 'x_orientation', 'antenna_diameters', 'gst0',
                                 'rdate', 'earth_omega', 'dut1', 'timesys',
                                 'uvplane_reference_time',
                                 'phase_center_ra', 'phase_center_dec',
                                 'phase_center_epoch',
                                 'zenith_ra', 'zenith_dec']

        # Additional attribute names that test_unexpected_attributes accepts.
        self.other_properties = ['telescope_location_lat_lon_alt',
                                 'telescope_location_lat_lon_alt_degrees',
                                 'phase_center_ra_degrees', 'phase_center_dec_degrees',
                                 'zenith_ra_degrees', 'zenith_dec_degrees',
                                 'pyuvdata_version_str']

        self.uv_object = UVData()

    def teardown(self):
        """Test teardown: delete object."""
        del(self.uv_object)

    def test_order_pols(self):
        """Test switching the polarization ordering between AIPS and CASA."""
        test_uv1 = UVData()
        testfile = os.path.join(
            DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
        uvtest.checkWarnings(test_uv1.read_uvfits, [testfile],
                             message='Telescope EVLA is not')

        test_uv1.order_pols(order='AIPS')
        # check that we have aips ordering
        aips_pols = np.array([-1, -2, -3, -4]).astype(int)
        nt.assert_true(np.all(test_uv1.polarization_array == aips_pols))

        test_uv2 = copy.deepcopy(test_uv1)
        test_uv2.order_pols(order='CASA')
        # check that we have casa ordering, and that the data and flags
        # were permuted along the last axis in the same way
        casa_pols = np.array([-1, -3, -4, -2]).astype(int)
        nt.assert_true(np.all(test_uv2.polarization_array == casa_pols))
        order = np.array([0, 2, 3, 1])
        nt.assert_true(np.all(test_uv2.data_array == test_uv1.data_array[:, :, :, order]))
        nt.assert_true(np.all(test_uv2.flag_array == test_uv1.flag_array[:, :, :, order]))

        test_uv2.order_pols(order='AIPS')
        # check that we have aips ordering again
        nt.assert_equal(test_uv1, test_uv2)

        # an unrecognized order name is expected to produce a warning
        uvtest.checkWarnings(test_uv2.order_pols, ['unknown'],
                             message='Invalid order supplied')

    def test_parameter_iter(self):
        """Test expected parameters."""
        # Named ``all_params`` rather than ``all`` to avoid shadowing the builtin.
        all_params = list(self.uv_object)
        for a in self.required_parameters + self.extra_parameters:
            nt.assert_true(a in all_params, msg='expected attribute ' + a +
                           ' not returned in object iterator')

    def test_required_parameter_iter(self):
        """Test expected required parameters."""
        required = list(self.uv_object.required())
        for a in self.required_parameters:
            nt.assert_true(a in required, msg='expected attribute ' + a +
                           ' not returned in required iterator')

    def test_extra_parameter_iter(self):
        """Test expected optional parameters."""
        extra = list(self.uv_object.extra())
        for a in self.extra_parameters:
            nt.assert_true(a in extra, msg='expected attribute ' + a +
                           ' not returned in extra iterator')

    def test_unexpected_parameters(self):
        """Test for extra parameters."""
        expected_parameters = self.required_parameters + self.extra_parameters
        attributes = [i for i in self.uv_object.__dict__.keys() if i[0] == '_']
        for a in attributes:
            nt.assert_true(a in expected_parameters,
                           msg='unexpected parameter ' + a + ' found in UVData')

    def test_unexpected_attributes(self):
        """Test for extra attributes."""
        expected_attributes = self.required_properties + \
            self.extra_properties + self.other_properties
        attributes = [i for i in self.uv_object.__dict__.keys() if i[0] != '_']
        for a in attributes:
            nt.assert_true(a in expected_attributes,
                           msg='unexpected attribute ' + a + ' found in UVData')

    def test_properties(self):
        """Test that properties can be get and set properly."""
        prop_dict = dict(zip(self.required_properties + self.extra_properties,
                             self.required_parameters + self.extra_parameters))
        # ``items()`` works on both Python 2 and 3 (``iteritems`` is 2-only).
        for k, v in prop_dict.items():
            rand_num = np.random.rand()
            setattr(self.uv_object, k, rand_num)
            this_param = getattr(self.uv_object, v)
            try:
                nt.assert_equal(rand_num, this_param.value)
            except AssertionError:
                print('setting {prop_name} to a random number failed'.format(prop_name=k))
                # Bare raise keeps the original assertion message and traceback.
                raise
class TestUVDataBasicMethods(object):
    """Tests of basic UVData methods, run on an object read from a uvfits file."""

    def setUp(self):
        """Setup for tests of basic methods."""
        self.testfile = os.path.join(
            DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
        self.uv_object = UVData()
        uvtest.checkWarnings(self.uv_object.read_uvfits, [self.testfile],
                             message='Telescope EVLA is not')
        # untouched copy used as the reference in test_converttofiletype
        self.uv_object2 = copy.deepcopy(self.uv_object)

    def teardown(self):
        """Test teardown: delete objects."""
        del(self.uv_object)
        del(self.uv_object2)

    def test_equality(self):
        """Basic equality test."""
        nt.assert_equal(self.uv_object, self.uv_object)

    def test_check(self):
        """Test simple check function."""
        uv = self.uv_object
        nt.assert_true(uv.check())

        # Check variety of special cases: each count, when bumped by one,
        # must make check() raise; the count is restored afterwards.
        for count_name in ('Nants_data', 'Nbls', 'Ntimes'):
            good_value = getattr(uv, count_name)
            setattr(uv, count_name, good_value + 1)
            nt.assert_raises(ValueError, uv.check)
            setattr(uv, count_name, good_value)

        # Check case where all data is autocorrelations
        # Currently only test files that have autos are fhd files
        testdir = os.path.join(DATA_PATH, 'fhd_vis_data/')
        fhd_names = ['1061316296_flags.sav',
                     '1061316296_vis_XX.sav',
                     '1061316296_params.sav',
                     '1061316296_settings.txt']
        uv.read_fhd([testdir + name for name in fhd_names])

        auto_inds = np.where(uv.ant_1_array == uv.ant_2_array)[0]
        uv.select(blt_inds=auto_inds)
        nt.assert_true(uv.check())

    def test_nants_data_telescope(self):
        """Test check() against the relation between Nants_data and Nants_telescope."""
        uv = self.uv_object
        uv.Nants_data = uv.Nants_telescope - 1
        # NOTE(review): ``check`` is passed uncalled here, so this only asserts
        # that the bound method object is truthy -- confirm whether
        # ``check()`` was intended.
        nt.assert_true(uv.check)
        uv.Nants_data = uv.Nants_telescope + 1
        nt.assert_raises(ValueError, uv.check)

    def test_converttofiletype(self):
        """Test round-tripping through a file-type specific object."""
        as_fhd = self.uv_object._convert_to_filetype('fhd')
        self.uv_object._convert_from_filetype(as_fhd)
        nt.assert_equal(self.uv_object, self.uv_object2)

        # an unrecognized file type name must raise
        nt.assert_raises(
            ValueError, self.uv_object._convert_to_filetype, 'foo')
class TestBaselineAntnumMethods(object):
    """Tests on antnum, baseline conversion."""

    def setup(self):
        """Setup for tests on antnum, baseline conversion."""
        self.uv_object = UVData()
        self.uv_object.Nants_telescope = 128
        # second object with a larger antenna count; the tests below expect
        # the conversions to raise for it
        self.uv_object2 = UVData()
        self.uv_object2.Nants_telescope = 2049

    def teardown(self):
        """Test teardown: delete objects."""
        del(self.uv_object)
        del(self.uv_object2)

    def test_baseline_to_antnums(self):
        """Test baseline to antnum conversion for 256 & larger conventions."""
        nt.assert_equal(self.uv_object.baseline_to_antnums(67585), (0, 0))
        # ``Exception`` instead of ``StandardError``: the latter does not
        # exist on Python 3, and ``Exception`` is its base class on Python 2.
        nt.assert_raises(
            Exception, self.uv_object2.baseline_to_antnums, 67585)

        ant_pairs = [(10, 20), (280, 310)]
        for pair in ant_pairs:
            # the attempt256 round trip is only exercised for small antenna numbers
            if np.max(np.array(pair)) < 255:
                bl = self.uv_object.antnums_to_baseline(
                    pair[0], pair[1], attempt256=True)
                ant_pair_out = self.uv_object.baseline_to_antnums(bl)
                nt.assert_equal(pair, ant_pair_out)

            bl = self.uv_object.antnums_to_baseline(
                pair[0], pair[1], attempt256=False)
            ant_pair_out = self.uv_object.baseline_to_antnums(bl)
            nt.assert_equal(pair, ant_pair_out)

    def test_antnums_to_baselines(self):
        """Test antums to baseline conversion for 256 & larger conventions."""
        nt.assert_equal(self.uv_object.antnums_to_baseline(0, 0), 67585)
        nt.assert_equal(self.uv_object.antnums_to_baseline(257, 256), 594177)
        nt.assert_equal(self.uv_object.baseline_to_antnums(594177), (257, 256))

        # Check attempt256
        nt.assert_equal(self.uv_object.antnums_to_baseline(
            0, 0, attempt256=True), 257)
        uvtest.checkWarnings(self.uv_object.antnums_to_baseline, [257, 256],
                             {'attempt256': True}, message='found > 256 antennas')

        nt.assert_raises(
            Exception, self.uv_object2.antnums_to_baseline, 0, 0)
def test_known_telescopes():
    """Test known_telescopes method returns expected results."""
    uv_object = UVData()
    known_telescopes = ['PAPER', 'HERA', 'MWA']
    # ``list.sort()`` sorts in place and returns None, so comparing the
    # results of two ``.sort()`` calls always passes; compare sorted copies.
    nt.assert_equal(sorted(known_telescopes),
                    sorted(uv_object.known_telescopes()))
def test_HERA_diameters():
    """Test that set_telescope_params warns and fills antenna_diameters for HERA."""
    uv_in = UVData()
    miriad_path = os.path.join(DATA_PATH, 'zen.2456865.60537.xy.uvcRREAA')
    uvtest.checkWarnings(uv_in.read_miriad, [miriad_path],
                         known_warning='miriad')

    # rename the telescope, then let set_telescope_params fill in values
    uv_in.telescope_name = 'HERA'
    expected_warning = ('antenna_diameters is not set. '
                        'Using known values for HERA.')
    uvtest.checkWarnings(uv_in.set_telescope_params, message=expected_warning)

    nt.assert_equal(uv_in.telescope_name, 'HERA')
    nt.assert_false(uv_in.antenna_diameters is None)

    uv_in.check()
def test_phase_unphaseHERA():
    """
    Read in drift data, phase to an RA/DEC, unphase and check for object equality.

    Also check that phasing/unphasing raises ValueError when the object is in
    the wrong (or unknown) phase state.
    """
    testfile = os.path.join(DATA_PATH, 'hera_testfile')
    read_warning = ('Altitude is not present in file and latitude and '
                    'longitude values do not match')

    # Note the RA/DEC values in the raw file were calculated from the lat/long
    # in the file, which don't agree with our known_telescopes.
    # So for this test we use the lat/lon in the file.
    UV_raw = UVData()
    uvtest.checkWarnings(UV_raw.read_miriad, [testfile],
                         {'correct_lat_lon': False}, message=read_warning)
    UV_phase = UVData()
    uvtest.checkWarnings(UV_phase.read_miriad, [testfile],
                         {'correct_lat_lon': False}, message=read_warning)

    # phase/unphase round trip should give back the raw object
    UV_phase.phase(0., 0., ephem.J2000)
    UV_phase.unphase_to_drift()
    nt.assert_equal(UV_raw, UV_phase)

    # check errors when trying to unphase drift or unknown data
    nt.assert_raises(ValueError, UV_raw.unphase_to_drift)
    UV_raw.set_unknown_phase_type()
    nt.assert_raises(ValueError, UV_raw.unphase_to_drift)

    # check errors when trying to phase phased or unknown data
    UV_phase.phase(0., 0., ephem.J2000)
    first_time = UV_phase.time_array[0]
    nt.assert_raises(ValueError, UV_phase.phase, 0., 0., ephem.J2000)
    nt.assert_raises(ValueError, UV_phase.phase_to_time, first_time)

    UV_phase.set_unknown_phase_type()
    first_time = UV_phase.time_array[0]
    nt.assert_raises(ValueError, UV_phase.phase, 0., 0., ephem.J2000)
    nt.assert_raises(ValueError, UV_phase.phase_to_time, first_time)

    del(UV_phase)
    del(UV_raw)
def test_set_phase_unknown():
    """Test that set_unknown_phase_type marks the phase parameters as not required."""
    testfile = os.path.join(
        DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
    uv_object = UVData()
    uvtest.checkWarnings(uv_object.read_uvfits, [testfile],
                         message='Telescope EVLA is not')

    uv_object.set_unknown_phase_type()
    nt.assert_equal(uv_object.phase_type, 'unknown')

    # none of the zenith / phase center parameters may be required any more
    for param_name in ('_zenith_ra', '_zenith_dec', '_phase_center_epoch',
                       '_phase_center_ra', '_phase_center_dec'):
        nt.assert_false(getattr(uv_object, param_name).required)

    nt.assert_true(uv_object.check())
def test_select_blts():
    """Test selecting on explicit baseline-time indices."""
    testfile = os.path.join(DATA_PATH, 'zen.2456865.60537.xy.uvcRREAA')
    uv_object = UVData()
    uvtest.checkWarnings(uv_object.read_miriad, [testfile],
                         known_warning='miriad')
    old_history = uv_object.history

    blt_inds = np.array([
        172, 182, 132, 227, 144, 44, 16, 104, 385, 134, 326, 140, 116,
        218, 178, 391, 111, 276, 274, 308, 38, 64, 317, 76, 239, 246,
        34, 39, 83, 184, 208, 60, 374, 295, 118, 337, 261, 21, 375,
        396, 355, 187, 95, 122, 186, 113, 260, 264, 156, 13, 228, 291,
        302, 72, 137, 216, 299, 341, 207, 256, 223, 250, 268, 147, 73,
        32, 142, 383, 221, 203, 258, 286, 324, 265, 170, 236, 8, 275,
        304, 117, 29, 167, 15, 388, 171, 82, 322, 248, 160, 85, 66,
        46, 272, 328, 323, 152, 200, 119, 359, 23, 363, 56, 219, 257,
        11, 307, 336, 289, 136, 98, 37, 163, 158, 80, 125, 40, 298,
        75, 320, 74, 57, 346, 121, 129, 332, 238, 93, 18, 330, 339,
        381, 234, 176, 22, 379, 199, 266, 100, 90, 292, 205, 58, 222,
        350, 109, 273, 191, 368, 88, 101, 65, 155, 2, 296, 306, 398,
        369, 378, 254, 67, 249, 102, 348, 392, 20, 28, 169, 262, 269,
        287, 86, 300, 143, 177, 42, 290, 284, 123, 189, 175, 97, 340,
        242, 342, 331, 282, 235, 344, 63, 115, 78, 30, 226, 157, 133,
        71, 35, 212, 333])

    # the expected data is taken at the sorted indices
    sorted_inds = np.sort(blt_inds)
    selected_data = uv_object.data_array[sorted_inds, :, :, :]

    uv_object2 = copy.deepcopy(uv_object)
    uv_object2.select(blt_inds=blt_inds)
    nt.assert_equal(len(blt_inds), uv_object2.Nblts)

    # verify that histories are different
    nt.assert_false(uvutils.check_histories(old_history, uv_object2.history))

    expected_history = (old_history + ' Downselected to specific '
                        'baseline-times using pyuvdata.')
    nt.assert_true(uvutils.check_histories(expected_history,
                                           uv_object2.history))

    nt.assert_true(np.all(selected_data == uv_object2.data_array))

    # check for errors associated with out of bounds indices
    negative_inds = np.arange(-10, -5)
    too_large_inds = np.arange(uv_object.Nblts + 1, uv_object.Nblts + 10)
    nt.assert_raises(ValueError, uv_object.select, blt_inds=negative_inds)
    nt.assert_raises(ValueError, uv_object.select, blt_inds=too_large_inds)
def test_select_antennas():
    """Test selecting on antennas, by number and by name."""
    testfile = os.path.join(
        DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
    uv_object = UVData()
    uvtest.checkWarnings(uv_object.read_uvfits, [testfile],
                         message='Telescope EVLA is not')
    old_history = uv_object.history
    unique_ants = np.unique(
        uv_object.ant_1_array.tolist() + uv_object.ant_2_array.tolist())
    ants_to_keep = np.array([0, 19, 11, 24, 3, 23, 1, 20, 21])

    # independently count the blts whose two antennas are both kept
    blts_select = []
    for a1, a2 in zip(uv_object.ant_1_array, uv_object.ant_2_array):
        blts_select.append((a1 in ants_to_keep) & (a2 in ants_to_keep))
    Nblts_selected = np.sum(blts_select)

    uv_object2 = copy.deepcopy(uv_object)
    uv_object2.select(antenna_nums=ants_to_keep)

    nt.assert_equal(len(ants_to_keep), uv_object2.Nants_data)
    nt.assert_equal(Nblts_selected, uv_object2.Nblts)
    for ant in ants_to_keep:
        nt.assert_true(
            ant in uv_object2.ant_1_array or ant in uv_object2.ant_2_array)
    remaining_ants = np.unique(
        uv_object2.ant_1_array.tolist() + uv_object2.ant_2_array.tolist())
    for ant in remaining_ants:
        nt.assert_true(ant in ants_to_keep)

    expected_history = (old_history + ' Downselected to '
                        'specific antennas using pyuvdata.')
    nt.assert_true(uvutils.check_histories(expected_history,
                                           uv_object2.history))

    # now test using antenna_names to specify antennas to keep
    uv_object3 = copy.deepcopy(uv_object)
    ants_to_keep = np.array(sorted(list(ants_to_keep)))
    # look up the name stored at the position of each kept antenna number
    ant_names = [
        uv_object3.antenna_names[np.where(uv_object3.antenna_numbers == a)[0][0]]
        for a in ants_to_keep]

    uv_object3.select(antenna_names=ant_names)
    nt.assert_equal(uv_object2, uv_object3)

    # check for errors associated with antennas not included in data, bad names or providing numbers and names
    missing_ants = np.max(unique_ants) + np.arange(1, 3)
    nt.assert_raises(ValueError, uv_object.select, antenna_nums=missing_ants)
    nt.assert_raises(ValueError, uv_object.select, antenna_names='test1')
    nt.assert_raises(ValueError, uv_object.select,
                     antenna_nums=ants_to_keep, antenna_names=ant_names)
def test_select_ant_pairs():
    """Test selecting on antenna pairs, including numpy ints and a single pair."""
    uv_object = UVData()
    testfile = os.path.join(
        DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
    uvtest.checkWarnings(uv_object.read_uvfits, [testfile],
                         message='Telescope EVLA is not')
    old_history = uv_object.history
    first_ants = [6, 2, 7, 2, 21, 27, 8]
    second_ants = [0, 20, 8, 1, 2, 3, 22]
    new_unique_ants = np.unique(first_ants + second_ants)
    # Materialize the pairs: on Python 3 ``zip`` returns a one-shot iterator,
    # which the comprehension below would exhaust before ``select`` sees it.
    ant_pairs_to_keep = list(zip(first_ants, second_ants))
    sorted_pairs_to_keep = [tuple(sorted(p)) for p in ant_pairs_to_keep]

    # independently count the blts whose (order-insensitive) pair is kept
    blts_select = [tuple(sorted((a1, a2))) in sorted_pairs_to_keep for (a1, a2) in
                   zip(uv_object.ant_1_array, uv_object.ant_2_array)]
    Nblts_selected = np.sum(blts_select)

    def check_selection(uv_selected):
        """Assert that uv_selected holds exactly the requested antenna pairs."""
        sorted_pairs_selected = [tuple(sorted(p)) for p in zip(
            uv_selected.ant_1_array, uv_selected.ant_2_array)]

        nt.assert_equal(len(new_unique_ants), uv_selected.Nants_data)
        nt.assert_equal(Nblts_selected, uv_selected.Nblts)
        for ant in new_unique_ants:
            nt.assert_true(
                ant in uv_selected.ant_1_array or ant in uv_selected.ant_2_array)
        for ant in np.unique(uv_selected.ant_1_array.tolist() +
                             uv_selected.ant_2_array.tolist()):
            nt.assert_true(ant in new_unique_ants)
        for pair in sorted_pairs_to_keep:
            nt.assert_true(pair in sorted_pairs_selected)
        for pair in sorted_pairs_selected:
            nt.assert_true(pair in sorted_pairs_to_keep)

        nt.assert_true(uvutils.check_histories(old_history + ' Downselected to '
                                               'specific antenna pairs using pyuvdata.',
                                               uv_selected.history))

    uv_object2 = copy.deepcopy(uv_object)
    uv_object2.select(ant_pairs_nums=ant_pairs_to_keep)
    check_selection(uv_object2)

    # check that you can use numpy integers with out errors:
    first_ants = list(map(np.int32, [6, 2, 7, 2, 21, 27, 8]))
    second_ants = list(map(np.int32, [0, 20, 8, 1, 2, 3, 22]))
    ant_pairs_to_keep = list(zip(first_ants, second_ants))
    uv_object2 = uv_object.select(ant_pairs_nums=ant_pairs_to_keep, inplace=False)
    check_selection(uv_object2)

    # check that you can specify a single pair without errors
    uv_object2.select(ant_pairs_nums=(0, 6))
    sorted_pairs_object2 = [tuple(sorted(p)) for p in zip(
        uv_object2.ant_1_array, uv_object2.ant_2_array)]
    nt.assert_equal(list(set(sorted_pairs_object2)), [(0, 6)])

    # check for errors associated with antenna pairs not included in data and bad inputs
    # (the list() keeps the concatenation valid on Python 3)
    nt.assert_raises(ValueError, uv_object.select,
                     ant_pairs_nums=list(zip(first_ants, second_ants)) + [0, 6])
    nt.assert_raises(ValueError, uv_object.select,
                     ant_pairs_nums=[(uv_object.antenna_names[0], uv_object.antenna_names[1])])
    nt.assert_raises(ValueError, uv_object.select, ant_pairs_nums=(5, 1))
    nt.assert_raises(ValueError, uv_object.select, ant_pairs_nums=(0, 5))
    nt.assert_raises(ValueError, uv_object.select, ant_pairs_nums=(27, 27))
def test_select_times():
    """Test selecting on a subset of the times in the data."""
    testfile = os.path.join(
        DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
    uv_object = UVData()
    uvtest.checkWarnings(uv_object.read_uvfits, [testfile],
                         message='Telescope EVLA is not')
    old_history = uv_object.history

    unique_times = np.unique(uv_object.time_array)
    times_to_keep = unique_times[[0, 3, 5, 6, 7, 10, 14]]

    # independently count the blts that fall on one of the kept times
    kept_flags = [t in times_to_keep for t in uv_object.time_array]
    Nblts_selected = np.sum(kept_flags)

    uv_object2 = copy.deepcopy(uv_object)
    uv_object2.select(times=times_to_keep)

    nt.assert_equal(len(times_to_keep), uv_object2.Ntimes)
    nt.assert_equal(Nblts_selected, uv_object2.Nblts)
    for kept_time in times_to_keep:
        nt.assert_true(kept_time in uv_object2.time_array)
    for remaining_time in np.unique(uv_object2.time_array):
        nt.assert_true(remaining_time in times_to_keep)

    expected_history = (old_history + ' Downselected to '
                        'specific times using pyuvdata.')
    nt.assert_true(uvutils.check_histories(expected_history,
                                           uv_object2.history))

    # check for errors associated with times not included in data
    missing_time = np.min(unique_times) - uv_object.integration_time
    nt.assert_raises(ValueError, uv_object.select, times=[missing_time])
def test_select_frequencies():
    """Test selecting on frequencies, including the uneven / non-contiguous cases."""
    testfile = os.path.join(
        DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
    uv_object = UVData()
    uvtest.checkWarnings(uv_object.read_uvfits, [testfile],
                         message='Telescope EVLA is not')
    old_history = uv_object.history
    expected_history = (old_history + ' Downselected to '
                        'specific frequencies using pyuvdata.')

    freqs_to_keep = uv_object.freq_array[0, np.arange(12, 22)]

    uv_object2 = copy.deepcopy(uv_object)
    uv_object2.select(frequencies=freqs_to_keep)

    nt.assert_equal(len(freqs_to_keep), uv_object2.Nfreqs)
    for kept_freq in freqs_to_keep:
        nt.assert_true(kept_freq in uv_object2.freq_array)
    for remaining_freq in np.unique(uv_object2.freq_array):
        nt.assert_true(remaining_freq in freqs_to_keep)

    nt.assert_true(uvutils.check_histories(expected_history,
                                           uv_object2.history))

    # check that selecting one frequency works
    single_freq = freqs_to_keep[0]
    uv_object2 = copy.deepcopy(uv_object)
    uv_object2.select(frequencies=single_freq)
    nt.assert_equal(1, uv_object2.Nfreqs)
    nt.assert_true(single_freq in uv_object2.freq_array)
    for f in uv_object2.freq_array:
        nt.assert_true(f in [single_freq])

    nt.assert_true(uvutils.check_histories(expected_history,
                                           uv_object2.history))

    # check for errors associated with frequencies not included in data
    missing_freq = np.max(uv_object.freq_array) + uv_object.channel_width
    nt.assert_raises(ValueError, uv_object.select, frequencies=[missing_freq])

    # check for warnings and errors associated with unevenly spaced or non-contiguous frequencies
    write_file_uvfits = os.path.join(DATA_PATH, 'test/select_test.uvfits')
    write_file_miriad = os.path.join(DATA_PATH, 'test/select_test.uv')

    uv_object2 = copy.deepcopy(uv_object)
    uneven_freqs = uv_object2.freq_array[0, [0, 5, 6]]
    uvtest.checkWarnings(uv_object2.select, [], {'frequencies': uneven_freqs},
                         message='Selected frequencies are not evenly spaced')
    nt.assert_raises(ValueError, uv_object2.write_uvfits, write_file_uvfits)
    nt.assert_raises(ValueError, uv_object2.write_miriad, write_file_miriad)

    uv_object2 = copy.deepcopy(uv_object)
    gapped_freqs = uv_object2.freq_array[0, [0, 2, 4]]
    uvtest.checkWarnings(uv_object2.select, [], {'frequencies': gapped_freqs},
                         message='Selected frequencies are not contiguous')
    nt.assert_raises(ValueError, uv_object2.write_uvfits, write_file_uvfits)
    nt.assert_raises(ValueError, uv_object2.write_miriad, write_file_miriad)
def test_select_freq_chans():
    """Test selecting on frequency channel numbers, alone and with frequencies."""
    testfile = os.path.join(
        DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
    uv_object = UVData()
    uvtest.checkWarnings(uv_object.read_uvfits, [testfile],
                         message='Telescope EVLA is not')
    old_history = uv_object.history

    chans_to_keep = np.arange(12, 22)

    uv_object2 = copy.deepcopy(uv_object)
    uv_object2.select(freq_chans=chans_to_keep)

    nt.assert_equal(len(chans_to_keep), uv_object2.Nfreqs)
    for chan in chans_to_keep:
        nt.assert_true(uv_object.freq_array[0, chan] in uv_object2.freq_array)
    kept_freqs = uv_object.freq_array[0, chans_to_keep]
    for f in np.unique(uv_object2.freq_array):
        nt.assert_true(f in kept_freqs)

    expected_history = (old_history + ' Downselected to '
                        'specific frequencies using pyuvdata.')
    nt.assert_true(uvutils.check_histories(expected_history,
                                           uv_object2.history))

    # Test selecting both channels and frequencies
    # (the frequency range overlaps with the channel range)
    freqs_to_keep = uv_object.freq_array[0, np.arange(20, 30)]
    all_chans_to_keep = np.arange(12, 30)

    uv_object2 = copy.deepcopy(uv_object)
    uv_object2.select(frequencies=freqs_to_keep, freq_chans=chans_to_keep)

    nt.assert_equal(len(all_chans_to_keep), uv_object2.Nfreqs)
    for chan in all_chans_to_keep:
        nt.assert_true(uv_object.freq_array[0, chan] in uv_object2.freq_array)
    all_kept_freqs = uv_object.freq_array[0, all_chans_to_keep]
    for f in np.unique(uv_object2.freq_array):
        nt.assert_true(f in all_kept_freqs)
def test_select_polarizations():
    """Test selecting on polarizations, including the unevenly spaced case."""
    testfile = os.path.join(
        DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
    uv_object = UVData()
    uvtest.checkWarnings(uv_object.read_uvfits, [testfile],
                         message='Telescope EVLA is not')
    old_history = uv_object.history

    pols_to_keep = [-1, -2]

    uv_object2 = copy.deepcopy(uv_object)
    uv_object2.select(polarizations=pols_to_keep)

    nt.assert_equal(len(pols_to_keep), uv_object2.Npols)
    for kept_pol in pols_to_keep:
        nt.assert_true(kept_pol in uv_object2.polarization_array)
    for remaining_pol in np.unique(uv_object2.polarization_array):
        nt.assert_true(remaining_pol in pols_to_keep)

    expected_history = (old_history + ' Downselected to '
                        'specific polarizations using pyuvdata.')
    nt.assert_true(uvutils.check_histories(expected_history,
                                           uv_object2.history))

    # check for errors associated with polarizations not included in data
    nt.assert_raises(ValueError, uv_object2.select, polarizations=[-3, -4])

    # check for warnings and errors associated with unevenly spaced polarizations
    uneven_pols = uv_object.polarization_array[[0, 1, 3]]
    uvtest.checkWarnings(uv_object.select, [], {'polarizations': uneven_pols},
                         message='Selected polarization values are not evenly spaced')
    write_file_uvfits = os.path.join(DATA_PATH, 'test/select_test.uvfits')
    nt.assert_raises(ValueError, uv_object.write_uvfits, write_file_uvfits)
def test_select():
    """Test selecting along all axes at once."""
    # now test selecting along all axes at once
    testfile = os.path.join(
        DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
    uv_object = UVData()
    uvtest.checkWarnings(uv_object.read_uvfits, [testfile],
                         message='Telescope EVLA is not')
    old_history = uv_object.history

    blt_inds = np.array([
        1057, 461, 1090, 354, 528, 654, 882, 775, 369, 906, 748,
        875, 296, 773, 554, 395, 1003, 476, 762, 976, 1285, 874,
        717, 383, 1281, 924, 264, 1163, 297, 857, 1258, 1000, 180,
        1303, 1139, 393, 42, 135, 789, 713, 527, 1218, 576, 100,
        1311, 4, 653, 724, 591, 889, 36, 1033, 113, 479, 322,
        118, 898, 1263, 477, 96, 935, 238, 195, 531, 124, 198,
        992, 1131, 305, 154, 961, 6, 1175, 76, 663, 82, 637,
        288, 1152, 845, 1290, 379, 1225, 1240, 733, 1172, 937, 1325,
        817, 416, 261, 1316, 957, 723, 215, 237, 270, 1309, 208,
        17, 1028, 895, 574, 166, 784, 834, 732, 1022, 1068, 1207,
        356, 474, 313, 137, 172, 181, 925, 201, 190, 1277, 1044,
        1242, 702, 567, 557, 1032, 1352, 504, 545, 422, 179, 780,
        280, 890, 774, 884])

    unique_ants = np.unique(
        uv_object.ant_1_array.tolist() + uv_object.ant_2_array.tolist())
    ants_to_keep = np.array([11, 6, 20, 26, 2, 27, 3, 7, 14])

    ant_pairs_to_keep = [(2, 11), (20, 26), (6, 7), (3, 27), (14, 6)]
    sorted_pairs_to_keep = [tuple(sorted(p)) for p in ant_pairs_to_keep]

    freqs_to_keep = uv_object.freq_array[0, np.arange(31, 39)]

    unique_times = np.unique(uv_object.time_array)
    times_to_keep = unique_times[[0, 2, 6, 8, 10, 13, 14]]

    pols_to_keep = [-1, -3]

    # Independently count blts that should be selected
    ant_pairs_in_data = list(zip(uv_object.ant_1_array, uv_object.ant_2_array))
    blts_blt_select = [i in blt_inds for i in np.arange(uv_object.Nblts)]
    blts_ant_select = [(a1 in ants_to_keep) & (a2 in ants_to_keep)
                       for (a1, a2) in ant_pairs_in_data]
    blts_pair_select = [tuple(sorted((a1, a2))) in sorted_pairs_to_keep
                        for (a1, a2) in ant_pairs_in_data]
    blts_time_select = [t in times_to_keep for t in uv_object.time_array]
    # a blt survives only if every individual criterion keeps it
    Nblts_select = np.sum([bi & ai & pi & ti for (bi, ai, pi, ti) in
                           zip(blts_blt_select, blts_ant_select,
                               blts_pair_select, blts_time_select)])

    uv_object2 = copy.deepcopy(uv_object)
    uv_object2.select(blt_inds=blt_inds, antenna_nums=ants_to_keep,
                      ant_pairs_nums=ant_pairs_to_keep, frequencies=freqs_to_keep,
                      times=times_to_keep, polarizations=pols_to_keep)

    nt.assert_equal(Nblts_select, uv_object2.Nblts)
    remaining_ants = np.unique(
        uv_object2.ant_1_array.tolist() + uv_object2.ant_2_array.tolist())
    for ant in remaining_ants:
        nt.assert_true(ant in ants_to_keep)

    sorted_pairs_object2 = [tuple(sorted(p)) for p in zip(
        uv_object2.ant_1_array, uv_object2.ant_2_array)]
    for pair in sorted_pairs_object2:
        nt.assert_true(pair in sorted_pairs_to_keep)

    nt.assert_equal(len(freqs_to_keep), uv_object2.Nfreqs)
    for kept_freq in freqs_to_keep:
        nt.assert_true(kept_freq in uv_object2.freq_array)
    for remaining_freq in np.unique(uv_object2.freq_array):
        nt.assert_true(remaining_freq in freqs_to_keep)

    for remaining_time in np.unique(uv_object2.time_array):
        nt.assert_true(remaining_time in times_to_keep)

    nt.assert_equal(len(pols_to_keep), uv_object2.Npols)
    for kept_pol in pols_to_keep:
        nt.assert_true(kept_pol in uv_object2.polarization_array)
    for remaining_pol in np.unique(uv_object2.polarization_array):
        nt.assert_true(remaining_pol in pols_to_keep)

    expected_history = (old_history + ' Downselected to '
                        'specific baseline-times, antennas, '
                        'antenna pairs, times, frequencies, '
                        'polarizations using pyuvdata.')
    nt.assert_true(uvutils.check_histories(expected_history,
                                           uv_object2.history))

    # test that a ValueError is raised if the selection eliminates all blts
    nt.assert_raises(ValueError, uv_object.select,
                     times=unique_times[0], antenna_nums=1)
def test_select_not_inplace():
    """Test that two non-inplace frequency selections add back to the original."""
    # Test non-inplace select
    testfile = os.path.join(
        DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
    uv_object = UVData()
    uvtest.checkWarnings(uv_object.read_uvfits, [testfile],
                         message='Telescope EVLA is not')
    old_history = uv_object.history

    lower_chans = np.arange(32)
    upper_chans = np.arange(32, 64)
    uv1 = uv_object.select(freq_chans=lower_chans, inplace=False)
    uv1 += uv_object.select(freq_chans=upper_chans, inplace=False)

    expected_history = (old_history + ' Downselected to '
                        'specific frequencies using pyuvdata. '
                        'Combined data along frequency axis '
                        'using pyuvdata.')
    nt.assert_true(uvutils.check_histories(expected_history, uv1.history))

    # reset the history so the full-object comparison can succeed
    uv1.history = old_history
    nt.assert_equal(uv1, uv_object)
def test_reorder_pols():
    """Test reorder_pols against a manual reordering of the polarization axis."""
    # Test function to fix polarization order
    testfile = os.path.join(
        DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
    uv1 = UVData()
    uvtest.checkWarnings(
        uv1.read_uvfits, [testfile], message='Telescope EVLA is not')
    uv2 = copy.deepcopy(uv1)

    # reorder uv2 manually
    order = [1, 3, 2, 0]
    uv2.polarization_array = uv2.polarization_array[order]
    for array_name in ('data_array', 'nsample_array', 'flag_array'):
        current = getattr(uv2, array_name)
        setattr(uv2, array_name, current[:, :, :, order])

    uv1.reorder_pols(order=order)
    nt.assert_equal(uv1, uv2)

    # Restore original order
    uvtest.checkWarnings(
        uv1.read_uvfits, [testfile], message='Telescope EVLA is not')
    uv2.reorder_pols()
    nt.assert_equal(uv1, uv2)
def test_add():
    """Test combining UVData objects with ``+=`` and ``+``.

    Deep copies of the object read from the EVLA uvfits test file are split
    along the frequency, polarization, time and baseline-time axes (and
    along two axes at once), recombined, and compared against the full
    object, or against a reference copy in which the regions covered by
    neither half are zeroed and flagged.  The warnings for unevenly spaced
    or non-contiguous combinations and the merging of history strings are
    checked as well.
    """
    uv_full = UVData()
    testfile = os.path.join(
        DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
    uvtest.checkWarnings(uv_full.read_uvfits, [testfile],
                         message='Telescope EVLA is not')

    # uv_full itself is never modified below (only deep copies are), so the
    # time split is computed once.  Floor division keeps the split point an
    # integer on both Python 2 and Python 3.
    times = np.unique(uv_full.time_array)
    half = len(times) // 2
    early_times = times[:half]
    late_times = times[half:]
    # Baseline-time indices belonging to each half of the times; used to
    # build the reference objects for the two-axis cases.
    blt_early = np.array([ind for ind in range(uv_full.Nblts)
                          if uv_full.time_array[ind] in early_times])
    blt_late = np.array([ind for ind in range(uv_full.Nblts)
                         if uv_full.time_array[ind] in late_times])

    def fresh_copies(count=2):
        # Independent copies, so in-place selects never touch uv_full.
        return [copy.deepcopy(uv_full) for _ in range(count)]

    def check_history(uv, suffix):
        # The history must match the original history followed by
        # ``suffix``, as judged by uvutils.check_histories.
        nt.assert_true(uvutils.check_histories(uv_full.history + suffix,
                                               uv.history))

    def check_equal(uv, reference):
        # The history is expected to differ after select/add, so it is
        # reset before doing the full object comparison.
        uv.history = uv_full.history
        nt.assert_equal(uv, reference)

    # Add frequencies
    uv1, uv2 = fresh_copies()
    uv1.select(freq_chans=np.arange(0, 32))
    uv2.select(freq_chans=np.arange(32, 64))
    uv1 += uv2
    check_history(uv1, ' Downselected to specific frequencies using '
                       'pyuvdata. Combined data along frequency axis '
                       'using pyuvdata.')
    check_equal(uv1, uv_full)

    # Add polarizations
    uv1, uv2 = fresh_copies()
    uv1.select(polarizations=uv1.polarization_array[0:2])
    uv2.select(polarizations=uv2.polarization_array[2:4])
    uv1 += uv2
    check_history(uv1, ' Downselected to specific polarizations using '
                       'pyuvdata. Combined data along polarization axis '
                       'using pyuvdata.')
    check_equal(uv1, uv_full)

    # Add times
    uv1, uv2 = fresh_copies()
    uv1.select(times=early_times)
    uv2.select(times=late_times)
    uv1 += uv2
    check_history(uv1, ' Downselected to specific times using pyuvdata. '
                       'Combined data along baseline-time axis '
                       'using pyuvdata.')
    check_equal(uv1, uv_full)

    # Add baselines
    uv1, uv2 = fresh_copies()
    ant_list = list(range(15))  # Roughly half the antennas in the data
    # All blts where ant_1 is in list / not in list
    ind1 = [i for i in range(uv1.Nblts) if uv1.ant_1_array[i] in ant_list]
    ind2 = [i for i in range(uv1.Nblts)
            if uv1.ant_1_array[i] not in ant_list]
    uv1.select(blt_inds=ind1)
    uv2.select(blt_inds=ind2)
    uv1 += uv2
    check_history(uv1, ' Downselected to specific baseline-times using '
                       'pyuvdata. Combined data along baseline-time axis '
                       'using pyuvdata.')
    check_equal(uv1, uv_full)

    # Add baselines - out of order
    uv1, uv2, uv3 = fresh_copies(3)
    ants = uv_full.get_ants()
    ants1 = ants[0:6]
    ants2 = ants[6:12]
    ants3 = ants[12:]
    # All blts where ant_1 is in the respective list
    ind1 = [i for i in range(uv1.Nblts) if uv1.ant_1_array[i] in ants1]
    ind2 = [i for i in range(uv2.Nblts) if uv2.ant_1_array[i] in ants2]
    ind3 = [i for i in range(uv3.Nblts) if uv3.ant_1_array[i] in ants3]
    uv1.select(blt_inds=ind1)
    uv2.select(blt_inds=ind2)
    uv3.select(blt_inds=ind3)
    # Reverse every baseline-time-ordered array of uv3 along its first
    # axis, then add the pieces in the order 1, 3, 2.
    for attr in ('data_array', 'nsample_array', 'flag_array', 'uvw_array',
                 'time_array', 'lst_array', 'ant_1_array', 'ant_2_array',
                 'baseline_array'):
        setattr(uv3, attr, getattr(uv3, attr)[::-1])
    uv1 += uv3
    uv1 += uv2
    check_history(uv1, ' Downselected to specific baseline-times using '
                       'pyuvdata. Combined data along baseline-time axis '
                       'using pyuvdata. Combined data along '
                       'baseline-time axis using pyuvdata.')
    check_equal(uv1, uv_full)

    # Add multiple axes
    uv1, uv2, uv_ref = fresh_copies(3)
    uv1.select(times=early_times,
               polarizations=uv1.polarization_array[0:2])
    uv2.select(times=late_times,
               polarizations=uv2.polarization_array[2:4])
    uv1 += uv2
    check_history(uv1, ' Downselected to specific times, polarizations '
                       'using pyuvdata. Combined data along '
                       'baseline-time, polarization axis '
                       'using pyuvdata.')
    # Zero out missing data in reference object
    uv_ref.data_array[blt_early, :, :, 2:] = 0.0
    uv_ref.nsample_array[blt_early, :, :, 2:] = 0.0
    uv_ref.flag_array[blt_early, :, :, 2:] = True
    uv_ref.data_array[blt_late, :, :, 0:2] = 0.0
    uv_ref.nsample_array[blt_late, :, :, 0:2] = 0.0
    uv_ref.flag_array[blt_late, :, :, 0:2] = True
    check_equal(uv1, uv_ref)

    # Another combo
    uv1, uv2, uv_ref = fresh_copies(3)
    uv1.select(times=early_times, freq_chans=np.arange(0, 32))
    uv2.select(times=late_times, freq_chans=np.arange(32, 64))
    uv1 += uv2
    check_history(uv1, ' Downselected to specific times, frequencies '
                       'using pyuvdata. Combined data along '
                       'baseline-time, frequency axis using pyuvdata.')
    # Zero out missing data in reference object
    uv_ref.data_array[blt_early, :, 32:, :] = 0.0
    uv_ref.nsample_array[blt_early, :, 32:, :] = 0.0
    uv_ref.flag_array[blt_early, :, 32:, :] = True
    uv_ref.data_array[blt_late, :, 0:32, :] = 0.0
    uv_ref.nsample_array[blt_late, :, 0:32, :] = 0.0
    uv_ref.flag_array[blt_late, :, 0:32, :] = True
    check_equal(uv1, uv_ref)

    # Add without inplace
    uv1, uv2 = fresh_copies()
    uv1.select(times=early_times)
    uv2.select(times=late_times)
    uv1 = uv1 + uv2
    check_history(uv1, ' Downselected to specific times using pyuvdata. '
                       'Combined data along baseline-time axis '
                       'using pyuvdata.')
    check_equal(uv1, uv_full)

    # Check warnings
    uv1, uv2 = fresh_copies()
    uv1.select(freq_chans=np.arange(0, 32))
    uv2.select(freq_chans=np.arange(33, 64))  # channel 32 is left out
    uvtest.checkWarnings(uv1.__add__, [uv2],
                         message='Combined frequencies are not evenly spaced')

    uv1, uv2 = fresh_copies()
    uv1.select(freq_chans=[0])
    uv2.select(freq_chans=[3])
    uvtest.checkWarnings(uv1.__iadd__, [uv2],
                         message='Combined frequencies are not contiguous')

    uv1, uv2 = fresh_copies()
    uv1.select(polarizations=uv1.polarization_array[0:2])
    uv2.select(polarizations=uv2.polarization_array[3])
    uvtest.checkWarnings(uv1.__iadd__, [uv2],
                         message='Combined polarizations are not evenly spaced')

    # Combining histories: extra text is appended to uv2's history.  The
    # expected combined history includes ' testing the history.' but not
    # the 'AIPS WTSCAL = 1.0' part.
    uv1, uv2 = fresh_copies()
    uv1.select(polarizations=uv1.polarization_array[0:2])
    uv2.select(polarizations=uv2.polarization_array[2:4])
    uv2.history += ' testing the history. AIPS WTSCAL = 1.0'
    uv1 += uv2
    check_history(uv1, ' Downselected to specific polarizations using '
                       'pyuvdata. Combined data along polarization '
                       'axis using pyuvdata. testing the history.')
    check_equal(uv1, uv_full)
def test_add_drift():
    """Test combining UVData objects after ``unphase_to_drift``.

    The object read from the EVLA uvfits test file is unphased first.
    Deep copies are then split along the frequency, polarization, time
    and baseline-time axes (and along two axes at once), recombined with
    ``+=`` / ``+``, and compared against the full object, or against a
    reference copy in which the regions covered by neither half are zeroed
    and flagged.  Warnings for unevenly spaced or non-contiguous
    combinations and the merging of history strings are checked as well.
    """
    uv_full = UVData()
    testfile = os.path.join(
        DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
    uvtest.checkWarnings(uv_full.read_uvfits, [testfile],
                         message='Telescope EVLA is not')
    uv_full.unphase_to_drift()

    # uv_full is not modified past this point (only deep copies are), so
    # the time split is computed once.  Floor division keeps the split
    # point an integer on both Python 2 and Python 3.
    times = np.unique(uv_full.time_array)
    half = len(times) // 2
    early_times = times[:half]
    late_times = times[half:]
    # Baseline-time indices belonging to each half of the times; used to
    # build the reference objects for the two-axis cases.
    blt_early = np.array([ind for ind in range(uv_full.Nblts)
                          if uv_full.time_array[ind] in early_times])
    blt_late = np.array([ind for ind in range(uv_full.Nblts)
                         if uv_full.time_array[ind] in late_times])

    def fresh_copies(count=2):
        # Independent copies, so in-place selects never touch uv_full.
        return [copy.deepcopy(uv_full) for _ in range(count)]

    def check_history(uv, suffix):
        # The history must match the original history followed by
        # ``suffix``, as judged by uvutils.check_histories.
        nt.assert_true(uvutils.check_histories(uv_full.history + suffix,
                                               uv.history))

    def check_equal(uv, reference):
        # The history is expected to differ after select/add, so it is
        # reset before doing the full object comparison.
        uv.history = uv_full.history
        nt.assert_equal(uv, reference)

    # Add frequencies
    uv1, uv2 = fresh_copies()
    uv1.select(freq_chans=np.arange(0, 32))
    uv2.select(freq_chans=np.arange(32, 64))
    uv1 += uv2
    check_history(uv1, ' Downselected to specific frequencies using '
                       'pyuvdata. Combined data along frequency '
                       'axis using pyuvdata.')
    check_equal(uv1, uv_full)

    # Add polarizations
    uv1, uv2 = fresh_copies()
    uv1.select(polarizations=uv1.polarization_array[0:2])
    uv2.select(polarizations=uv2.polarization_array[2:4])
    uv1 += uv2
    check_history(uv1, ' Downselected to specific polarizations using '
                       'pyuvdata. Combined data along polarization '
                       'axis using pyuvdata.')
    check_equal(uv1, uv_full)

    # Add times
    uv1, uv2 = fresh_copies()
    uv1.select(times=early_times)
    uv2.select(times=late_times)
    uv1 += uv2
    check_history(uv1, ' Downselected to specific times using pyuvdata. '
                       'Combined data along baseline-time '
                       'axis using pyuvdata.')
    check_equal(uv1, uv_full)

    # Add baselines
    uv1, uv2 = fresh_copies()
    ant_list = list(range(15))  # Roughly half the antennas in the data
    # All blts where ant_1 is in list / not in list
    ind1 = [i for i in range(uv1.Nblts) if uv1.ant_1_array[i] in ant_list]
    ind2 = [i for i in range(uv1.Nblts)
            if uv1.ant_1_array[i] not in ant_list]
    uv1.select(blt_inds=ind1)
    uv2.select(blt_inds=ind2)
    uv1 += uv2
    check_history(uv1, ' Downselected to specific baseline-times using '
                       'pyuvdata. Combined data along baseline-time '
                       'axis using pyuvdata.')
    check_equal(uv1, uv_full)

    # Add multiple axes
    uv1, uv2, uv_ref = fresh_copies(3)
    uv1.select(times=early_times,
               polarizations=uv1.polarization_array[0:2])
    uv2.select(times=late_times,
               polarizations=uv2.polarization_array[2:4])
    uv1 += uv2
    check_history(uv1, ' Downselected to specific times, polarizations '
                       'using pyuvdata. Combined data along '
                       'baseline-time, polarization '
                       'axis using pyuvdata.')
    # Zero out missing data in reference object
    uv_ref.data_array[blt_early, :, :, 2:] = 0.0
    uv_ref.nsample_array[blt_early, :, :, 2:] = 0.0
    uv_ref.flag_array[blt_early, :, :, 2:] = True
    uv_ref.data_array[blt_late, :, :, 0:2] = 0.0
    uv_ref.nsample_array[blt_late, :, :, 0:2] = 0.0
    uv_ref.flag_array[blt_late, :, :, 0:2] = True
    check_equal(uv1, uv_ref)

    # Another combo
    uv1, uv2, uv_ref = fresh_copies(3)
    uv1.select(times=early_times, freq_chans=np.arange(0, 32))
    uv2.select(times=late_times, freq_chans=np.arange(32, 64))
    uv1 += uv2
    check_history(uv1, ' Downselected to specific times, frequencies '
                       'using pyuvdata. Combined data along '
                       'baseline-time, frequency '
                       'axis using pyuvdata.')
    # Zero out missing data in reference object
    uv_ref.data_array[blt_early, :, 32:, :] = 0.0
    uv_ref.nsample_array[blt_early, :, 32:, :] = 0.0
    uv_ref.flag_array[blt_early, :, 32:, :] = True
    uv_ref.data_array[blt_late, :, 0:32, :] = 0.0
    uv_ref.nsample_array[blt_late, :, 0:32, :] = 0.0
    uv_ref.flag_array[blt_late, :, 0:32, :] = True
    check_equal(uv1, uv_ref)

    # Add without inplace
    uv1, uv2 = fresh_copies()
    uv1.select(times=early_times)
    uv2.select(times=late_times)
    uv1 = uv1 + uv2
    check_history(uv1, ' Downselected to specific times using pyuvdata. '
                       'Combined data along baseline-time '
                       'axis using pyuvdata.')
    check_equal(uv1, uv_full)

    # Check warnings
    uv1, uv2 = fresh_copies()
    uv1.select(freq_chans=np.arange(0, 32))
    uv2.select(freq_chans=np.arange(33, 64))  # channel 32 is left out
    uvtest.checkWarnings(uv1.__add__, [uv2],
                         message='Combined frequencies are not evenly spaced')

    uv1, uv2 = fresh_copies()
    uv1.select(freq_chans=[0])
    uv2.select(freq_chans=[3])
    uvtest.checkWarnings(uv1.__iadd__, [uv2],
                         message='Combined frequencies are not contiguous')

    uv1, uv2 = fresh_copies()
    uv1.select(polarizations=uv1.polarization_array[0:2])
    uv2.select(polarizations=uv2.polarization_array[3])
    uvtest.checkWarnings(uv1.__iadd__, [uv2],
                         message='Combined polarizations are not evenly spaced')

    # Combining histories: extra text is appended to uv2's history.  The
    # expected combined history includes ' testing the history.' but not
    # the 'AIPS WTSCAL = 1.0' part.
    uv1, uv2 = fresh_copies()
    uv1.select(polarizations=uv1.polarization_array[0:2])
    uv2.select(polarizations=uv2.polarization_array[2:4])
    uv2.history += ' testing the history. AIPS WTSCAL = 1.0'
    uv1 += uv2
    check_history(uv1, ' Downselected to specific polarizations using '
                       'pyuvdata. Combined data along polarization '
                       'axis using pyuvdata. testing the history.')
    check_equal(uv1, uv_full)
def test_break_add():
    """Check that ``UVData.__iadd__`` raises ValueError for bad operands.

    Covered cases: a non-UVData operand, an operand that has been unphased
    while the target has not, an operand with different ``vis_units``, and
    an operand that is an unmodified copy of the same data.
    """
    uvfits_path = os.path.join(
        DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
    uv_full = UVData()
    uvtest.checkWarnings(uv_full.read_uvfits, [uvfits_path],
                         message='Telescope EVLA is not')
    target = copy.deepcopy(uv_full)

    # Wrong class
    nt.assert_raises(ValueError, target.__iadd__, np.zeros(5))

    # One phased, one not
    unphased = copy.deepcopy(uv_full)
    unphased.unphase_to_drift()
    nt.assert_raises(ValueError, target.__iadd__, unphased)

    # Different units
    other_units = copy.deepcopy(uv_full)
    other_units.vis_units = "Jy"
    nt.assert_raises(ValueError, target.__iadd__, other_units)

    # Overlapping data
    duplicate = copy.deepcopy(uv_full)
    nt.assert_raises(ValueError, target.__iadd__, duplicate)
def test_key2inds():
    """Check ``UVData._key2inds`` for the key forms exercised below.

    ``_key2inds`` returns three index collections, referred to here as
    (ind1, ind2, indp).  The key forms tried are: (ant1, ant2, pol) with
    the polarization as a number or a string, the reversed antenna pair,
    an antenna pair alone, a baseline number, a polarization number and a
    polarization string.  Invalid keys must raise KeyError.
    """
    uv = UVData()
    uvfits_path = os.path.join(
        DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
    uvtest.checkWarnings(uv.read_uvfits, [uvfits_path],
                         message='Telescope EVLA is not')

    # Get an antpair/pol combo
    ant1 = uv.ant_1_array[0]
    ant2 = uv.ant_2_array[0]
    pol = uv.polarization_array[0]
    bltind = np.where((uv.ant_1_array == ant1) & (uv.ant_2_array == ant2))[0]
    empty = np.array([])
    every_blt = np.arange(uv.Nblts)
    every_pol = np.arange(uv.Npols)

    def check(key, blt1=None, blt2=None, pols=None):
        # Call _key2inds once and compare each returned index set with
        # its expectation; None means "not checked for this key".
        got1, got2, gotp = uv._key2inds(key)
        for wanted, got in ((blt1, got1), (blt2, got2), (pols, gotp)):
            if wanted is not None:
                nt.assert_true(np.array_equal(wanted, got))

    # Any of these inputs can also be a tuple of a tuple, so need to be
    # checked twice.
    # NOTE(review): in Python, ``((a, b))`` is just ``(a, b)``, so the
    # doubled parentheses used throughout this test pass exactly the same
    # key as the first call.  A real tuple of a tuple needs a trailing
    # comma, e.g. ``((a, b),)``; confirm _key2inds accepts that before
    # changing these calls.
    check((ant1, ant2, pol), bltind, empty, [0])
    check(((ant1, ant2, pol)), bltind, empty, [0])

    # Combo with pol as string
    check((ant1, ant2, uvutils.polnum2str(pol)), pols=[0])
    check(((ant1, ant2, uvutils.polnum2str(pol))), pols=[0])

    # Check conjugation: the reversed pair is expected in the second set
    check((ant2, ant1, pol), empty, bltind, [0])

    # Antpair only
    check((ant1, ant2), bltind, empty, every_pol)
    check(((ant1, ant2)), bltind, empty, every_pol)

    # Baseline number only
    check(uv.antnums_to_baseline(ant1, ant2), bltind, empty, every_pol)
    check((uv.antnums_to_baseline(ant1, ant2)), bltind, empty, every_pol)

    # Pol number only
    check(pol, every_blt, empty, np.array([0]))
    check((pol), every_blt, empty, np.array([0]))

    # Pol string only
    check('LL', every_blt, empty, np.array([1]))
    check(('LL'), every_blt, empty, np.array([1]))

    # Test invalid keys
    invalid_keys = [
        'I',             # pol str not in data
        -8,              # pol num not in data
        6,               # bl num not in data
        (1, 1),          # ant pair not in data
        (1, 1, 'rr'),    # ant pair not in data
        (0, 1, 'xx'),    # pol not in data
    ]
    for bad_key in invalid_keys:
        nt.assert_raises(KeyError, uv._key2inds, bad_key)

    # Test autos are handled correctly: turn the first baseline-time into
    # an auto-correlation and expect it in the first index set only.
    uv.ant_2_array[0] = uv.ant_1_array[0]
    check((ant1, ant1, pol), blt1=[0], blt2=[])
def test_smart_slicing():
    """Check ``UVData._smart_slicing`` against plain numpy indexing.

    The result for (ind1, ind2, indp) is compared with ``data_array``
    indexed by ind1, appended with the conjugate of ``data_array`` indexed
    by ind2, restricted to polarizations indp and squeezed.  Regularly
    spaced and irregular index lists are tried for both the baseline-time
    and polarization axes.  For the ind1-only cases the test also checks
    the ``writeable`` flag of the result and whether a later write to
    ``data_array`` shows up in it.
    """
    uv = UVData()
    uvfits_path = os.path.join(
        DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
    uvtest.checkWarnings(uv.read_uvfits, [uvfits_path],
                         message='Telescope EVLA is not')

    def direct(blt_inds, pol_inds):
        # Reference result: index baseline-times first, then polarizations.
        return uv.data_array[blt_inds, :, :, :][:, :, :, pol_inds]

    # ind1 reg, ind2 empty, pol reg
    blts = 10 * np.arange(9)
    none = []
    pols = [0, 1]
    sliced = uv._smart_slicing(uv.data_array, blts, none, pols)
    nt.assert_true(np.all(sliced == np.squeeze(direct(blts, pols))))
    nt.assert_false(sliced.flags.writeable)
    # Ensure a view was returned: a write to data_array must show through
    uv.data_array[blts[1], 0, 0, pols[0]] = 5.43
    nt.assert_equal(sliced[1, 0, 0], uv.data_array[blts[1], 0, 0, pols[0]])

    # force copy (same indices as above)
    sliced = uv._smart_slicing(uv.data_array, blts, none, pols,
                               force_copy=True)
    nt.assert_true(np.all(sliced == np.squeeze(direct(blts, pols))))
    nt.assert_true(sliced.flags.writeable)
    # Ensure a copy was returned: a write to data_array must not show
    uv.data_array[blts[1], 0, 0, pols[0]] = 4.3
    nt.assert_not_equal(sliced[1, 0, 0],
                        uv.data_array[blts[1], 0, 0, pols[0]])

    # Remaining ind1-only cases; each is expected to be a read-only copy.
    # Columns: ind1, indp, value written to data_array afterwards.
    copy_cases = (
        (10 * np.arange(9), [0, 1, 3], 1.2),  # ind1 reg, pol not reg
        ([0, 4, 5], [0, 1], 8.2),             # ind1 not reg, pol reg
        ([0, 4, 5], [0, 1, 3], 3.4),          # ind1 not reg, pol not reg
    )
    for blts, pols, marker in copy_cases:
        sliced = uv._smart_slicing(uv.data_array, blts, [], pols)
        nt.assert_true(np.all(sliced == np.squeeze(direct(blts, pols))))
        nt.assert_false(sliced.flags.writeable)
        # Ensure a copy was returned
        uv.data_array[blts[1], 0, 0, pols[0]] = marker
        nt.assert_not_equal(sliced[1, 0, 0],
                            uv.data_array[blts[1], 0, 0, pols[0]])

    # ind1 empty, ind2 given: the result must be the conjugate.
    # Note conjugation test ensures the result is a copy, not a view.
    conj_cases = (
        (10 * np.arange(9), [0, 1]),     # ind2 reg, pol reg
        (10 * np.arange(9), [0, 1, 3]),  # ind2 reg, pol not reg
        ([1, 4, 5, 10], [0, 1]),         # ind2 not reg, pol reg
        ([1, 4, 5, 10], [0, 1, 3]),      # ind2 not reg, pol not reg
    )
    for blts, pols in conj_cases:
        sliced = uv._smart_slicing(uv.data_array, [], blts, pols)
        wanted = np.squeeze(np.conj(direct(blts, pols)))
        nt.assert_true(np.all(sliced == wanted))

    # ind1, ind2 not empty; pol reg, then pol not reg
    for pols in ([0, 1], [0, 1, 3]):
        plain_blts = np.arange(20)
        conj_blts = np.arange(30, 40)
        sliced = uv._smart_slicing(uv.data_array, plain_blts, conj_blts,
                                   pols)
        stacked = np.append(uv.data_array[plain_blts, :, :, :],
                            np.conj(uv.data_array[conj_blts, :, :, :]),
                            axis=0)
        nt.assert_true(np.all(sliced == np.squeeze(stacked[:, :, :, pols])))

    # test single element in ind1: only axis 1 is squeezed in the reference
    single = [45]
    pols = [0, 1]
    sliced = uv._smart_slicing(uv.data_array, single, [], pols)
    wanted_single = np.squeeze(direct(single, pols), axis=1)
    nt.assert_true(np.all(sliced == wanted_single))

    # test single element in ind2: conjugate of the previous reference
    sliced = uv._smart_slicing(uv.data_array, [], [45], [0, 1])
    nt.assert_true(np.all(sliced == np.conj(wanted_single)))

    # Full squeeze
    single = [45]
    pols = [0, 1]
    sliced = uv._smart_slicing(uv.data_array, single, [], pols,
                               squeeze='full')
    nt.assert_true(np.all(sliced == np.squeeze(direct(single, pols))))
def test_get_data():
    """Check ``UVData.get_data`` against direct ``data_array`` indexing.

    The antennas of the first baseline-time and the first polarization are
    used as the probe key.  Checked forms: (ant1, ant2, pol), the reversed
    pair (expected to be the complex conjugate), (ant1, ant2) alone and
    the polarization number alone.
    """
    uvfits_path = os.path.join(
        DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
    uv = UVData()
    uvtest.checkWarnings(uv.read_uvfits, [uvfits_path],
                         message='Telescope EVLA is not')

    # Get an antpair/pol combo
    first_ant = uv.ant_1_array[0]
    second_ant = uv.ant_2_array[0]
    first_pol = uv.polarization_array[0]
    on_baseline = (uv.ant_1_array == first_ant) & (uv.ant_2_array == second_ant)
    blt_rows = np.where(on_baseline)[0]

    # Antpair plus polarization (first_pol sits at polarization index 0)
    expected = np.squeeze(uv.data_array[blt_rows, :, :, 0])
    found = uv.get_data(first_ant, second_ant, first_pol)
    nt.assert_true(np.all(expected == found))

    # Check conjugation
    found = uv.get_data(second_ant, first_ant, first_pol)
    nt.assert_true(np.all(expected == np.conj(found)))

    # Antpair only
    expected = np.squeeze(uv.data_array[blt_rows, :, :, :])
    found = uv.get_data(first_ant, second_ant)
    nt.assert_true(np.all(expected == found))

    # Pol number only
    expected = np.squeeze(uv.data_array[:, :, :, 0])
    found = uv.get_data(first_pol)
    nt.assert_true(np.all(expected == found))
def test_get_flags():
    """Check ``UVData.get_flags`` against direct ``flag_array`` indexing.

    The antennas of the first baseline-time and the first polarization are
    used as the probe key.  Checked forms: (ant1, ant2, pol), the reversed
    pair (expected to give the same flags), (ant1, ant2) alone and the
    polarization number alone.
    """
    uvfits_path = os.path.join(
        DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
    uv = UVData()
    uvtest.checkWarnings(uv.read_uvfits, [uvfits_path],
                         message='Telescope EVLA is not')

    # Get an antpair/pol combo
    first_ant = uv.ant_1_array[0]
    second_ant = uv.ant_2_array[0]
    first_pol = uv.polarization_array[0]
    on_baseline = (uv.ant_1_array == first_ant) & (uv.ant_2_array == second_ant)
    blt_rows = np.where(on_baseline)[0]

    # Antpair plus polarization (first_pol sits at polarization index 0)
    expected = np.squeeze(uv.flag_array[blt_rows, :, :, 0])
    found = uv.get_flags(first_ant, second_ant, first_pol)
    nt.assert_true(np.all(expected == found))

    # Check conjugation: reversing the pair must not change the flags
    found = uv.get_flags(second_ant, first_ant, first_pol)
    nt.assert_true(np.all(expected == found))

    # Antpair only
    expected = np.squeeze(uv.flag_array[blt_rows, :, :, :])
    found = uv.get_flags(first_ant, second_ant)
    nt.assert_true(np.all(expected == found))

    # Pol number only
    expected = np.squeeze(uv.flag_array[:, :, :, 0])
    found = uv.get_flags(first_pol)
    nt.assert_true(np.all(expected == found))
def test_get_nsamples():
    """Check ``UVData.get_nsamples`` against direct ``nsample_array`` indexing.

    The antennas of the first baseline-time and the first polarization are
    used as the probe key.  Checked forms: (ant1, ant2, pol), the reversed
    pair (expected to give the same values), (ant1, ant2) alone and the
    polarization number alone.
    """
    uvfits_path = os.path.join(
        DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
    uv = UVData()
    uvtest.checkWarnings(uv.read_uvfits, [uvfits_path],
                         message='Telescope EVLA is not')

    # Get an antpair/pol combo
    first_ant = uv.ant_1_array[0]
    second_ant = uv.ant_2_array[0]
    first_pol = uv.polarization_array[0]
    on_baseline = (uv.ant_1_array == first_ant) & (uv.ant_2_array == second_ant)
    blt_rows = np.where(on_baseline)[0]

    # Antpair plus polarization (first_pol sits at polarization index 0)
    expected = np.squeeze(uv.nsample_array[blt_rows, :, :, 0])
    found = uv.get_nsamples(first_ant, second_ant, first_pol)
    nt.assert_true(np.all(expected == found))

    # Check conjugation: reversing the pair must not change the values
    found = uv.get_nsamples(second_ant, first_ant, first_pol)
    nt.assert_true(np.all(expected == found))

    # Antpair only
    expected = np.squeeze(uv.nsample_array[blt_rows, :, :, :])
    found = uv.get_nsamples(first_ant, second_ant)
    nt.assert_true(np.all(expected == found))

    # Pol number only
    expected = np.squeeze(uv.nsample_array[:, :, :, 0])
    found = uv.get_nsamples(first_pol)
    nt.assert_true(np.all(expected == found))
def test_get_times():
    """Check ``UVData.get_times``, the time counterpart of ``get_data``.

    For the antenna pair of the first baseline-time, the returned times
    must equal ``time_array`` at the matching baseline-time indices,
    whether or not a polarization is given and whatever the antenna order.
    With a polarization number alone, the whole ``time_array`` is expected.
    """
    uvfits_path = os.path.join(
        DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
    uv = UVData()
    uvtest.checkWarnings(uv.read_uvfits, [uvfits_path],
                         message='Telescope EVLA is not')

    # Get an antpair/pol combo (pol shouldn't actually affect result)
    first_ant = uv.ant_1_array[0]
    second_ant = uv.ant_2_array[0]
    first_pol = uv.polarization_array[0]
    on_baseline = (uv.ant_1_array == first_ant) & (uv.ant_2_array == second_ant)
    blt_rows = np.where(on_baseline)[0]
    expected = uv.time_array[blt_rows]

    found = uv.get_times(first_ant, second_ant, first_pol)
    nt.assert_true(np.all(expected == found))

    # Check conjugation
    found = uv.get_times(second_ant, first_ant, first_pol)
    nt.assert_true(np.all(expected == found))

    # Antpair only
    found = uv.get_times(first_ant, second_ant)
    nt.assert_true(np.all(expected == found))

    # Pol number only
    found = uv.get_times(first_pol)
    nt.assert_true(np.all(found == uv.time_array))
def test_antpairpol_iter():
    """Check the ``UVData.antpairpol_iter`` generator.

    Every yielded (key, data) pair is compared with ``data_array`` indexed
    directly, where key[0] and key[1] are antenna numbers and key[2] is a
    polarization string.  Afterwards the number of distinct baselines and
    polarizations seen must match ``get_baseline_nums()`` and ``Npols``.
    """
    uv = UVData()
    testfile = os.path.join(
        DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
    uvtest.checkWarnings(uv.read_uvfits, [testfile],
                         message='Telescope EVLA is not')
    # Map each polarization string to its index along the last data axis.
    pol_dict = {uvutils.polnum2str(uv.polarization_array[i]): i
                for i in range(uv.Npols)}
    # Distinct polarizations and baseline numbers seen during iteration.
    # (The original also accumulated every key into a list that was never
    # read; that dead bookkeeping has been dropped.)
    pols = set()
    bls = set()
    for key, d in uv.antpairpol_iter():
        bl = uv.antnums_to_baseline(key[0], key[1])
        blind = np.where(uv.baseline_array == bl)[0]
        bls.add(bl)
        pols.add(key[2])
        dcheck = np.squeeze(uv.data_array[blind, :, :, pol_dict[key[2]]])
        nt.assert_true(np.all(dcheck == d))
    nt.assert_equal(len(bls), len(uv.get_baseline_nums()))
    nt.assert_equal(len(pols), uv.Npols)
def test_get_ants():
    """Check that ``UVData.get_ants`` matches the antennas in the data.

    Every returned antenna must appear in ``ant_1_array`` or
    ``ant_2_array``, and every entry of those two arrays must appear in
    the returned collection.
    """
    uvfits_path = os.path.join(
        DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
    uv = UVData()
    uvtest.checkWarnings(uv.read_uvfits, [uvfits_path],
                         message='Telescope EVLA is not')
    reported = uv.get_ants()

    # Nothing reported that is absent from the data...
    for antenna in reported:
        nt.assert_true(any(antenna in side
                           for side in (uv.ant_1_array, uv.ant_2_array)))

    # ...and nothing in the data that is missing from the report.
    for side in (uv.ant_1_array, uv.ant_2_array):
        for antenna in side:
            nt.assert_true(antenna in reported)
def test_get_pols():
    """Check that ``UVData.get_pols`` returns the polarizations as strings.

    For the EVLA test file the expected result is 'RR', 'LL', 'LR' and
    'RL', in any order.
    """
    uv = UVData()
    testfile = os.path.join(
        DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
    uvtest.checkWarnings(uv.read_uvfits, [testfile],
                         message='Telescope EVLA is not')
    pols = uv.get_pols()
    pols_data = ['RR', 'LL', 'LR', 'RL']
    # Order-insensitive comparison.  nt.assert_items_equal only exists on
    # Python 2, so compare sorted lists, which works on Python 2 and 3.
    nt.assert_equal(sorted(pols), sorted(pols_data))
def test_get_feedpols():
    """Check that ``UVData.get_feedpols`` returns the feed polarizations.

    For the EVLA test file the expected result is 'R' and 'L', in any
    order.  Once a Stokes polarization is put into ``polarization_array``,
    the call must raise ValueError.
    """
    uv = UVData()
    testfile = os.path.join(
        DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
    uvtest.checkWarnings(uv.read_uvfits, [testfile],
                         message='Telescope EVLA is not')
    pols = uv.get_feedpols()
    pols_data = ['R', 'L']
    # Order-insensitive comparison.  nt.assert_items_equal only exists on
    # Python 2, so compare sorted lists, which works on Python 2 and 3.
    nt.assert_equal(sorted(pols), sorted(pols_data))

    # Test break when stokes visibilities are present
    uv.polarization_array[0] = 1  # Stokes I
    nt.assert_raises(ValueError, uv.get_feedpols)