https://github.com/RadioAstronomySoftwareGroup/pyuvdata
Tip revision: 65953f8e68a84a821c75ff1ec8fe3bcc512573fc authored by Nicholas Kern on 24 July 2018, 02:16:45 UTC
check phsae_type == drift in miriad_to_uvfits.py before phasing
check phsae_type == drift in miriad_to_uvfits.py before phasing
Tip revision: 65953f8
test_uvdata.py
# -*- mode: python; coding: utf-8 -*
# Copyright (c) 2018 Radio Astronomy Software Group
# Licensed under the 2-clause BSD License
"""Tests for uvdata object.
"""
from __future__ import absolute_import, division, print_function
import nose.tools as nt
import os
import numpy as np
import copy
import six
from astropy.time import Time
from astropy.coordinates import Angle
from pyuvdata import UVData
import pyuvdata.utils as uvutils
import pyuvdata.tests as uvtest
from pyuvdata.data import DATA_PATH
if six.PY2:
nt.assert_count_equal = nt.assert_items_equal
class TestUVDataInit(object):
def setUp(self):
"""Setup for basic parameter, property and iterator tests."""
self.required_parameters = ['_data_array', '_nsample_array',
'_flag_array', '_Ntimes', '_Nbls',
'_Nblts', '_Nfreqs', '_Npols', '_Nspws',
'_uvw_array', '_time_array', '_ant_1_array',
'_ant_2_array', '_lst_array',
'_baseline_array', '_freq_array',
'_polarization_array', '_spw_array',
'_integration_time', '_channel_width',
'_object_name', '_telescope_name',
'_instrument', '_telescope_location',
'_history', '_vis_units', '_Nants_data',
'_Nants_telescope', '_antenna_names',
'_antenna_numbers', '_phase_type']
self.required_properties = ['data_array', 'nsample_array',
'flag_array', 'Ntimes', 'Nbls',
'Nblts', 'Nfreqs', 'Npols', 'Nspws',
'uvw_array', 'time_array', 'ant_1_array',
'ant_2_array', 'lst_array',
'baseline_array', 'freq_array',
'polarization_array', 'spw_array',
'integration_time', 'channel_width',
'object_name', 'telescope_name',
'instrument', 'telescope_location',
'history', 'vis_units', 'Nants_data',
'Nants_telescope', 'antenna_names',
'antenna_numbers', 'phase_type']
self.extra_parameters = ['_extra_keywords', '_antenna_positions',
'_x_orientation', '_antenna_diameters',
'_gst0', '_rdate', '_earth_omega', '_dut1',
'_timesys', '_uvplane_reference_time',
'_phase_center_ra', '_phase_center_dec',
'_phase_center_epoch', '_phase_center_frame']
self.extra_properties = ['extra_keywords', 'antenna_positions',
'x_orientation', 'antenna_diameters', 'gst0',
'rdate', 'earth_omega', 'dut1', 'timesys',
'uvplane_reference_time',
'phase_center_ra', 'phase_center_dec',
'phase_center_epoch', 'phase_center_frame']
self.other_properties = ['telescope_location_lat_lon_alt',
'telescope_location_lat_lon_alt_degrees',
'phase_center_ra_degrees', 'phase_center_dec_degrees',
'pyuvdata_version_str']
self.uv_object = UVData()
def teardown(self):
"""Test teardown: delete object."""
del(self.uv_object)
def test_order_pols(self):
test_uv1 = UVData()
testfile = os.path.join(
DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
uvtest.checkWarnings(test_uv1.read_uvfits, [testfile],
message='Telescope EVLA is not')
test_uv1.order_pols(order='AIPS')
# check that we have aips ordering
aips_pols = np.array([-1, -2, -3, -4]).astype(int)
nt.assert_true(np.all(test_uv1.polarization_array == aips_pols))
test_uv2 = copy.deepcopy(test_uv1)
test_uv2.order_pols(order='CASA')
casa_pols = np.array([-1, -3, -4, -2]).astype(int)
nt.assert_true(np.all(test_uv2.polarization_array == casa_pols))
order = np.array([0, 2, 3, 1])
nt.assert_true(np.all(test_uv2.data_array == test_uv1.data_array[:, :, :, order]))
nt.assert_true(np.all(test_uv2.flag_array == test_uv1.flag_array[:, :, :, order]))
# check that we have casa ordering
test_uv2.order_pols(order='AIPS')
# check that we have aips ordering again
nt.assert_equal(test_uv1, test_uv2)
uvtest.checkWarnings(test_uv2.order_pols, ['unknown'], message='Invalid order supplied')
del(test_uv1)
del(test_uv2)
def test_parameter_iter(self):
"Test expected parameters."
all = []
for prop in self.uv_object:
all.append(prop)
for a in self.required_parameters + self.extra_parameters:
nt.assert_true(a in all, msg='expected attribute ' + a
+ ' not returned in object iterator')
def test_required_parameter_iter(self):
"Test expected required parameters."
required = []
for prop in self.uv_object.required():
required.append(prop)
for a in self.required_parameters:
nt.assert_true(a in required, msg='expected attribute ' + a
+ ' not returned in required iterator')
def test_extra_parameter_iter(self):
"Test expected optional parameters."
extra = []
for prop in self.uv_object.extra():
extra.append(prop)
for a in self.extra_parameters:
nt.assert_true(a in extra, msg='expected attribute ' + a
+ ' not returned in extra iterator')
def test_unexpected_parameters(self):
"Test for extra parameters."
expected_parameters = self.required_parameters + self.extra_parameters
attributes = [i for i in self.uv_object.__dict__.keys() if i[0] == '_']
for a in attributes:
nt.assert_true(a in expected_parameters,
msg='unexpected parameter ' + a + ' found in UVData')
def test_unexpected_attributes(self):
"Test for extra attributes."
expected_attributes = self.required_properties + \
self.extra_properties + self.other_properties
attributes = [i for i in self.uv_object.__dict__.keys() if i[0] != '_']
for a in attributes:
nt.assert_true(a in expected_attributes,
msg='unexpected attribute ' + a + ' found in UVData')
def test_properties(self):
"Test that properties can be get and set properly."
prop_dict = dict(list(zip(self.required_properties + self.extra_properties,
self.required_parameters + self.extra_parameters)))
for k, v in prop_dict.items():
rand_num = np.random.rand()
setattr(self.uv_object, k, rand_num)
this_param = getattr(self.uv_object, v)
try:
nt.assert_equal(rand_num, this_param.value)
except(AssertionError):
print('setting {prop_name} to a random number failed'.format(prop_name=k))
raise(AssertionError)
class TestUVDataBasicMethods(object):
def setUp(self):
"""Setup for tests of basic methods."""
self.uv_object = UVData()
self.testfile = os.path.join(
DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
uvtest.checkWarnings(self.uv_object.read_uvfits, [self.testfile],
message='Telescope EVLA is not')
self.uv_object2 = copy.deepcopy(self.uv_object)
def teardown(self):
"""Test teardown: delete objects."""
del(self.uv_object)
del(self.uv_object2)
def test_equality(self):
"""Basic equality test."""
nt.assert_equal(self.uv_object, self.uv_object)
def test_check(self):
"""Test simple check function."""
nt.assert_true(self.uv_object.check())
# Check variety of special cases
self.uv_object.Nants_data += 1
nt.assert_raises(ValueError, self.uv_object.check)
self.uv_object.Nants_data -= 1
self.uv_object.Nbls += 1
nt.assert_raises(ValueError, self.uv_object.check)
self.uv_object.Nbls -= 1
self.uv_object.Ntimes += 1
nt.assert_raises(ValueError, self.uv_object.check)
self.uv_object.Ntimes -= 1
# Check case where all data is autocorrelations
# Currently only test files that have autos are fhd files
testdir = os.path.join(DATA_PATH, 'fhd_vis_data/')
file_list = [testdir + '1061316296_flags.sav',
testdir + '1061316296_vis_XX.sav',
testdir + '1061316296_params.sav',
testdir + '1061316296_settings.txt']
if not uvtest.scipy_warnings:
self.uv_object.read_fhd(file_list)
else:
# numpy 1.14 introduced a new deprecation warning
n_scipy_warnings, scipy_warn_list, scipy_category_list = uvtest.get_scipy_warnings(n_scipy_warnings=550)
uvtest.checkWarnings(self.uv_object.read_fhd, [file_list],
message=scipy_warn_list, category=scipy_category_list,
nwarnings=n_scipy_warnings)
self.uv_object.select(blt_inds=np.where(self.uv_object.ant_1_array
== self.uv_object.ant_2_array)[0])
nt.assert_true(self.uv_object.check())
# test auto and cross corr uvw_array
uvd = UVData()
uvd.read_miriad(os.path.join(DATA_PATH, "zen.2457698.40355.xx.HH.uvcA"))
autos = np.isclose(uvd.ant_1_array - uvd.ant_2_array, 0.0)
auto_inds = np.where(autos)[0]
cross_inds = np.where(~autos)[0]
# make auto have non-zero uvw coords, assert ValueError
uvd.uvw_array[auto_inds[0], 0] = 0.1
nt.assert_raises(ValueError, uvd.check)
# make cross have |uvw| zero, assert ValueError
uvd.read_miriad(os.path.join(DATA_PATH, "zen.2457698.40355.xx.HH.uvcA"))
uvd.uvw_array[cross_inds[0]][:] = 0.0
nt.assert_raises(ValueError, uvd.check)
def test_nants_data_telescope(self):
self.uv_object.Nants_data = self.uv_object.Nants_telescope - 1
nt.assert_true(self.uv_object.check)
self.uv_object.Nants_data = self.uv_object.Nants_telescope + 1
nt.assert_raises(ValueError, self.uv_object.check)
def test_converttofiletype(self):
fhd_obj = self.uv_object._convert_to_filetype('fhd')
self.uv_object._convert_from_filetype(fhd_obj)
nt.assert_equal(self.uv_object, self.uv_object2)
nt.assert_raises(
ValueError, self.uv_object._convert_to_filetype, 'foo')
class TestBaselineAntnumMethods(object):
"""Setup for tests on antnum, baseline conversion."""
def setup(self):
self.uv_object = UVData()
self.uv_object.Nants_telescope = 128
self.uv_object2 = UVData()
self.uv_object2.Nants_telescope = 2049
def teardown(self):
"""Test teardown: delete objects."""
del(self.uv_object)
del(self.uv_object2)
def test_baseline_to_antnums(self):
"""Test baseline to antnum conversion for 256 & larger conventions."""
nt.assert_equal(self.uv_object.baseline_to_antnums(67585), (0, 0))
nt.assert_raises(
Exception, self.uv_object2.baseline_to_antnums, 67585)
ant_pairs = [(10, 20), (280, 310)]
for pair in ant_pairs:
if np.max(np.array(pair)) < 255:
bl = self.uv_object.antnums_to_baseline(
pair[0], pair[1], attempt256=True)
ant_pair_out = self.uv_object.baseline_to_antnums(bl)
nt.assert_equal(pair, ant_pair_out)
bl = self.uv_object.antnums_to_baseline(
pair[0], pair[1], attempt256=False)
ant_pair_out = self.uv_object.baseline_to_antnums(bl)
nt.assert_equal(pair, ant_pair_out)
def test_antnums_to_baselines(self):
"""Test antums to baseline conversion for 256 & larger conventions."""
nt.assert_equal(self.uv_object.antnums_to_baseline(0, 0), 67585)
nt.assert_equal(self.uv_object.antnums_to_baseline(257, 256), 594177)
nt.assert_equal(self.uv_object.baseline_to_antnums(594177), (257, 256))
# Check attempt256
nt.assert_equal(self.uv_object.antnums_to_baseline(
0, 0, attempt256=True), 257)
nt.assert_equal(self.uv_object.antnums_to_baseline(257, 256), 594177)
uvtest.checkWarnings(self.uv_object.antnums_to_baseline, [257, 256],
{'attempt256': True}, message='found > 256 antennas')
nt.assert_raises(
Exception, self.uv_object2.antnums_to_baseline, 0, 0)
# check a len-1 array returns as an array
ant1 = np.array([1])
ant2 = np.array([2])
nt.assert_true(isinstance(self.uv_object.antnums_to_baseline(ant1, ant2), np.ndarray))
def test_known_telescopes():
"""Test known_telescopes method returns expected results."""
uv_object = UVData()
known_telescopes = ['PAPER', 'HERA', 'MWA']
nt.assert_equal(known_telescopes.sort(),
uv_object.known_telescopes().sort())
def test_HERA_diameters():
miriad_file = os.path.join(DATA_PATH, 'zen.2456865.60537.xy.uvcRREAA')
uv_in = UVData()
uvtest.checkWarnings(uv_in.read_miriad, [miriad_file],
known_warning='miriad')
uv_in.telescope_name = 'HERA'
uvtest.checkWarnings(uv_in.set_telescope_params, message='antenna_diameters '
'is not set. Using known values for HERA.')
nt.assert_equal(uv_in.telescope_name, 'HERA')
nt.assert_true(uv_in.antenna_diameters is not None)
uv_in.check()
def test_generic_read():
uv_in = UVData()
uvfits_file = os.path.join(DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
uvtest.checkWarnings(uv_in.read, [uvfits_file], {'read_data': False},
message='Telescope EVLA is not')
unique_times = np.unique(uv_in.time_array)
nt.assert_raises(ValueError, uv_in.read, uvfits_file, times=unique_times[0:2],
time_range=[unique_times[0], unique_times[1]])
nt.assert_raises(ValueError, uv_in.read, uvfits_file,
antenna_nums=uv_in.antenna_numbers[0],
antenna_names=uv_in.antenna_names[1])
nt.assert_raises(ValueError, uv_in.read, 'foo')
def test_phase_unphaseHERA():
"""
Read in drift data, phase to an RA/DEC, unphase and check for object equality.
"""
testfile = os.path.join(DATA_PATH, 'hera_testfile')
UV_raw = UVData()
# Note the RA/DEC values in the raw file were calculated from the lat/long
# in the file, which don't agree with our known_telescopes.
# So for this test we use the lat/lon in the file.
uvtest.checkWarnings(UV_raw.read_miriad, [testfile], {'correct_lat_lon': False},
message='Altitude is not present in file and latitude and '
'longitude values do not match')
UV_phase = UVData()
uvtest.checkWarnings(UV_phase.read_miriad, [testfile], {'correct_lat_lon': False},
message='Altitude is not present in file and '
'latitude and longitude values do not match')
UV_phase.phase(0., 0., epoch="J2000")
UV_phase.unphase_to_drift()
# check that phase + unphase gets back to raw
nt.assert_equal(UV_raw, UV_phase)
# check that phase + unphase work using gcrs
UV_phase.phase(Angle('5d').rad, Angle('30d').rad, phase_frame='gcrs')
UV_phase.unphase_to_drift()
nt.assert_equal(UV_raw, UV_phase)
# check that phase + unphase work using a different epoch
UV_phase.phase(Angle('180d').rad, Angle('90d'), epoch=Time('2010-01-01T00:00:00', format='isot', scale='utc'))
UV_phase.unphase_to_drift()
nt.assert_equal(UV_raw, UV_phase)
# check that phase + unphase work with one baseline
UV_raw_small = UV_raw.select(blt_inds=[0], inplace=False)
UV_phase_small = copy.deepcopy(UV_raw_small)
UV_phase_small.phase(Angle('23h').rad, Angle('15d').rad)
UV_phase_small.unphase_to_drift()
nt.assert_equal(UV_raw_small, UV_phase_small)
# check that they match if you phase & unphase using antenna locations
# first replace the uvws with the right values
antenna_enu = uvutils.ENU_from_ECEF((UV_raw.antenna_positions + UV_raw.telescope_location),
*UV_raw.telescope_location_lat_lon_alt)
uvw_calc = np.zeros_like(UV_raw.uvw_array)
unique_times, unique_inds = np.unique(UV_raw.time_array, return_index=True)
for ind, jd in enumerate(unique_times):
inds = np.where(UV_raw.time_array == jd)[0]
for bl_ind in inds:
ant1_index = np.where(UV_raw.antenna_numbers == UV_raw.ant_1_array[bl_ind])[0][0]
ant2_index = np.where(UV_raw.antenna_numbers == UV_raw.ant_2_array[bl_ind])[0][0]
uvw_calc[bl_ind, :] = antenna_enu[ant2_index, :] - antenna_enu[ant1_index, :]
UV_raw_new = copy.deepcopy(UV_raw)
UV_raw_new.uvw_array = uvw_calc
UV_phase.phase(0., 0., epoch="J2000", use_ant_pos=True)
UV_phase2 = copy.deepcopy(UV_raw_new)
UV_phase2.phase(0., 0., epoch="J2000")
# The uvw's only agree to ~1mm. should they be better?
nt.assert_true(np.allclose(UV_phase2.uvw_array, UV_phase.uvw_array, atol=1e-3))
# the data array are just multiplied by the w's for phasing, so a difference
# at the 1e-3 level makes the data array different at that level too.
# -> change the tolerance on data_array for this test
UV_phase2._data_array.tols = (0, 1e-3)
nt.assert_equal(UV_phase2, UV_phase)
# check that phase + unphase gets back to raw using antpos
UV_phase.unphase_to_drift(use_ant_pos=True)
nt.assert_equal(UV_raw_new, UV_phase)
# check that phasing to zenith with one timestamp has small changes
# (it won't be identical because of precession/nutation changing the coordinate axes)
# use gcrs rather than icrs to reduce differences (don't include abberation)
UV_raw_small = UV_raw.select(times=UV_raw.time_array[0], inplace=False)
UV_phase_simple_small = copy.deepcopy(UV_raw_small)
UV_phase_simple_small.phase_to_time(time=Time(UV_raw.time_array[0], format='jd'),
phase_frame='gcrs')
# it's unclear to me how close this should be...
nt.assert_true(np.allclose(UV_phase_simple_small.uvw_array, UV_raw_small.uvw_array, atol=1e-2))
# check error if not passing a Time object to phase_to_time
nt.assert_raises(TypeError, UV_raw.phase_to_time, UV_raw.time_array[0])
# check errors when trying to unphase drift or unknown data
nt.assert_raises(ValueError, UV_raw.unphase_to_drift)
UV_raw.set_unknown_phase_type()
nt.assert_raises(ValueError, UV_raw.unphase_to_drift)
UV_raw.set_drift()
# check errors when trying to phase phased or unknown data
UV_phase.phase(0., 0., epoch="J2000")
nt.assert_raises(ValueError, UV_phase.phase, 0., 0., epoch="J2000")
nt.assert_raises(ValueError, UV_phase.phase_to_time,
UV_phase.time_array[0])
UV_phase.set_unknown_phase_type()
nt.assert_raises(ValueError, UV_phase.phase, 0., 0., epoch="J2000")
nt.assert_raises(ValueError, UV_phase.phase_to_time,
UV_phase.time_array[0])
# check errors when trying to phase to an unsupported frame
UV_phase = copy.deepcopy(UV_raw)
nt.assert_raises(ValueError, UV_phase.phase, 0., 0., epoch="J2000", phase_frame='cirs')
del(UV_phase)
del(UV_raw)
def test_phasing():
""" Use MWA files phased to 2 different places to test phasing. """
file1 = os.path.join(DATA_PATH, '1133866760.uvfits')
file2 = os.path.join(DATA_PATH, '1133866760_rephase.uvfits')
uvd1 = UVData()
uvd2 = UVData()
uvd1.read_uvfits(file1)
uvd2.read_uvfits(file2)
uvd1_drift = copy.deepcopy(uvd1)
uvd1_drift.unphase_to_drift(phase_frame='gcrs')
uvd1_drift_antpos = copy.deepcopy(uvd1)
uvd1_drift_antpos.unphase_to_drift(phase_frame='gcrs', use_ant_pos=True)
uvd2_drift = copy.deepcopy(uvd2)
uvd2_drift.unphase_to_drift(phase_frame='gcrs')
uvd2_drift_antpos = copy.deepcopy(uvd2)
uvd2_drift_antpos.unphase_to_drift(phase_frame='gcrs', use_ant_pos=True)
# the tolerances here are empirical -- based on what was seen in the external
# phasing test. See the phasing memo in docs/references for details
nt.assert_true(np.allclose(uvd1_drift.uvw_array, uvd2_drift.uvw_array, atol=2e-2))
nt.assert_true(np.allclose(uvd1_drift_antpos.uvw_array, uvd2_drift_antpos.uvw_array))
uvd2_rephase = copy.deepcopy(uvd2_drift)
uvd2_rephase.phase(uvd1.phase_center_ra,
uvd1.phase_center_dec,
uvd1.phase_center_epoch,
phase_frame='gcrs')
uvd2_rephase_antpos = copy.deepcopy(uvd2_drift_antpos)
uvd2_rephase_antpos.phase(uvd1.phase_center_ra,
uvd1.phase_center_dec,
uvd1.phase_center_epoch,
phase_frame='gcrs',
use_ant_pos=True)
# the tolerances here are empirical -- based on what was seen in the external
# phasing test. See the phasing memo in docs/references for details
nt.assert_true(np.allclose(uvd1.uvw_array, uvd2_rephase.uvw_array, atol=2e-2))
nt.assert_true(np.allclose(uvd1.uvw_array, uvd2_rephase_antpos.uvw_array, atol=5e-3))
# rephase the drift objects to the original pointing and verify that they match
uvd1_drift.phase(uvd1.phase_center_ra, uvd1.phase_center_dec,
uvd1.phase_center_epoch, phase_frame='gcrs')
uvd1_drift_antpos.phase(uvd1.phase_center_ra, uvd1.phase_center_dec,
uvd1.phase_center_epoch, phase_frame='gcrs',
use_ant_pos=True)
# the tolerances here are empirical -- caused by one unphase/phase cycle.
# the antpos-based phasing differences are based on what was seen in the external
# phasing test. See the phasing memo in docs/references for details
nt.assert_true(np.allclose(uvd1.uvw_array, uvd1_drift.uvw_array, atol=1e-4))
nt.assert_true(np.allclose(uvd1.uvw_array, uvd1_drift_antpos.uvw_array, atol=5e-3))
uvd2_drift.phase(uvd2.phase_center_ra, uvd2.phase_center_dec,
uvd2.phase_center_epoch, phase_frame='gcrs')
uvd2_drift_antpos.phase(uvd2.phase_center_ra, uvd2.phase_center_dec,
uvd2.phase_center_epoch, phase_frame='gcrs',
use_ant_pos=True)
# the tolerances here are empirical -- caused by one unphase/phase cycle.
# the antpos-based phasing differences are based on what was seen in the external
# phasing test. See the phasing memo in docs/references for details
nt.assert_true(np.allclose(uvd2.uvw_array, uvd2_drift.uvw_array, atol=1e-4))
nt.assert_true(np.allclose(uvd2.uvw_array, uvd2_drift_antpos.uvw_array, atol=2e-2))
def test_set_phase_unknown():
uv_object = UVData()
testfile = os.path.join(
DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
uvtest.checkWarnings(uv_object.read_uvfits, [
testfile], message='Telescope EVLA is not')
uv_object.set_unknown_phase_type()
nt.assert_equal(uv_object.phase_type, 'unknown')
nt.assert_false(uv_object._phase_center_epoch.required)
nt.assert_false(uv_object._phase_center_ra.required)
nt.assert_false(uv_object._phase_center_dec.required)
nt.assert_true(uv_object.check())
def test_select_blts():
uv_object = UVData()
testfile = os.path.join(DATA_PATH, 'zen.2456865.60537.xy.uvcRREAA')
uvtest.checkWarnings(uv_object.read_miriad, [testfile],
known_warning='miriad')
old_history = uv_object.history
blt_inds = np.array([172, 182, 132, 227, 144, 44, 16, 104, 385, 134, 326, 140, 116,
218, 178, 391, 111, 276, 274, 308, 38, 64, 317, 76, 239, 246,
34, 39, 83, 184, 208, 60, 374, 295, 118, 337, 261, 21, 375,
396, 355, 187, 95, 122, 186, 113, 260, 264, 156, 13, 228, 291,
302, 72, 137, 216, 299, 341, 207, 256, 223, 250, 268, 147, 73,
32, 142, 383, 221, 203, 258, 286, 324, 265, 170, 236, 8, 275,
304, 117, 29, 167, 15, 388, 171, 82, 322, 248, 160, 85, 66,
46, 272, 328, 323, 152, 200, 119, 359, 23, 363, 56, 219, 257,
11, 307, 336, 289, 136, 98, 37, 163, 158, 80, 125, 40, 298,
75, 320, 74, 57, 346, 121, 129, 332, 238, 93, 18, 330, 339,
381, 234, 176, 22, 379, 199, 266, 100, 90, 292, 205, 58, 222,
350, 109, 273, 191, 368, 88, 101, 65, 155, 2, 296, 306, 398,
369, 378, 254, 67, 249, 102, 348, 392, 20, 28, 169, 262, 269,
287, 86, 300, 143, 177, 42, 290, 284, 123, 189, 175, 97, 340,
242, 342, 331, 282, 235, 344, 63, 115, 78, 30, 226, 157, 133,
71, 35, 212, 333])
selected_data = uv_object.data_array[np.sort(blt_inds), :, :, :]
uv_object2 = copy.deepcopy(uv_object)
uv_object2.select(blt_inds=blt_inds)
nt.assert_equal(len(blt_inds), uv_object2.Nblts)
# verify that histories are different
nt.assert_false(uvutils._check_histories(old_history, uv_object2.history))
nt.assert_true(uvutils._check_histories(old_history + ' Downselected to '
'specific baseline-times using pyuvdata.',
uv_object2.history))
nt.assert_true(np.all(selected_data == uv_object2.data_array))
# check that it also works with higher dimension array
uv_object2 = copy.deepcopy(uv_object)
uv_object2.select(blt_inds=blt_inds[np.newaxis, :])
nt.assert_equal(len(blt_inds), uv_object2.Nblts)
nt.assert_true(uvutils._check_histories(old_history + ' Downselected to '
'specific baseline-times using pyuvdata.',
uv_object2.history))
nt.assert_true(np.all(selected_data == uv_object2.data_array))
# check for errors associated with out of bounds indices
nt.assert_raises(ValueError, uv_object.select, blt_inds=np.arange(-10, -5))
nt.assert_raises(ValueError, uv_object.select, blt_inds=np.arange(
uv_object.Nblts + 1, uv_object.Nblts + 10))
def test_select_antennas():
uv_object = UVData()
testfile = os.path.join(
DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
uvtest.checkWarnings(uv_object.read_uvfits, [testfile],
message='Telescope EVLA is not')
old_history = uv_object.history
unique_ants = np.unique(
uv_object.ant_1_array.tolist() + uv_object.ant_2_array.tolist())
ants_to_keep = np.array([0, 19, 11, 24, 3, 23, 1, 20, 21])
blts_select = [(a1 in ants_to_keep) & (a2 in ants_to_keep) for (a1, a2) in
zip(uv_object.ant_1_array, uv_object.ant_2_array)]
Nblts_selected = np.sum(blts_select)
uv_object2 = copy.deepcopy(uv_object)
uv_object2.select(antenna_nums=ants_to_keep)
nt.assert_equal(len(ants_to_keep), uv_object2.Nants_data)
nt.assert_equal(Nblts_selected, uv_object2.Nblts)
for ant in ants_to_keep:
nt.assert_true(
ant in uv_object2.ant_1_array or ant in uv_object2.ant_2_array)
for ant in np.unique(uv_object2.ant_1_array.tolist() + uv_object2.ant_2_array.tolist()):
nt.assert_true(ant in ants_to_keep)
nt.assert_true(uvutils._check_histories(old_history + ' Downselected to '
'specific antennas using pyuvdata.',
uv_object2.history))
# check that it also works with higher dimension array
uv_object2 = copy.deepcopy(uv_object)
uv_object2.select(antenna_nums=ants_to_keep[np.newaxis, :])
nt.assert_equal(len(ants_to_keep), uv_object2.Nants_data)
nt.assert_equal(Nblts_selected, uv_object2.Nblts)
for ant in ants_to_keep:
nt.assert_true(
ant in uv_object2.ant_1_array or ant in uv_object2.ant_2_array)
for ant in np.unique(uv_object2.ant_1_array.tolist() + uv_object2.ant_2_array.tolist()):
nt.assert_true(ant in ants_to_keep)
nt.assert_true(uvutils._check_histories(old_history + ' Downselected to '
'specific antennas using pyuvdata.',
uv_object2.history))
# now test using antenna_names to specify antennas to keep
uv_object3 = copy.deepcopy(uv_object)
ants_to_keep = np.array(sorted(list(ants_to_keep)))
ant_names = []
for a in ants_to_keep:
ind = np.where(uv_object3.antenna_numbers == a)[0][0]
ant_names.append(uv_object3.antenna_names[ind])
uv_object3.select(antenna_names=ant_names)
nt.assert_equal(uv_object2, uv_object3)
# check that it also works with higher dimension array
uv_object3 = copy.deepcopy(uv_object)
ants_to_keep = np.array(sorted(list(ants_to_keep)))
ant_names = []
for a in ants_to_keep:
ind = np.where(uv_object3.antenna_numbers == a)[0][0]
ant_names.append(uv_object3.antenna_names[ind])
uv_object3.select(antenna_names=[ant_names])
nt.assert_equal(uv_object2, uv_object3)
# check for errors associated with antennas not included in data, bad names or providing numbers and names
nt.assert_raises(ValueError, uv_object.select,
antenna_nums=np.max(unique_ants) + np.arange(1, 3))
nt.assert_raises(ValueError, uv_object.select, antenna_names='test1')
nt.assert_raises(ValueError, uv_object.select,
antenna_nums=ants_to_keep, antenna_names=ant_names)
def sort_bl(p):
"""Sort a tuple that starts with a pair of antennas, and may have stuff after."""
if p[1] >= p[0]:
return p
return (p[1], p[0]) + p[2:]
def test_select_bls():
uv_object = UVData()
testfile = os.path.join(
DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
uvtest.checkWarnings(uv_object.read_uvfits, [testfile],
message='Telescope EVLA is not')
old_history = uv_object.history
first_ants = [6, 2, 7, 2, 21, 27, 8]
second_ants = [0, 20, 8, 1, 2, 3, 22]
new_unique_ants = np.unique(first_ants + second_ants)
ant_pairs_to_keep = list(zip(first_ants, second_ants))
sorted_pairs_to_keep = [sort_bl(p) for p in ant_pairs_to_keep]
sorted_pairs_object = [sort_bl(p) for p in zip(
uv_object.ant_1_array, uv_object.ant_2_array)]
blts_select = [sort_bl((a1, a2)) in sorted_pairs_to_keep for (a1, a2) in
zip(uv_object.ant_1_array, uv_object.ant_2_array)]
Nblts_selected = np.sum(blts_select)
uv_object2 = copy.deepcopy(uv_object)
uv_object2.select(bls=ant_pairs_to_keep)
sorted_pairs_object2 = [sort_bl(p) for p in zip(
uv_object2.ant_1_array, uv_object2.ant_2_array)]
nt.assert_equal(len(new_unique_ants), uv_object2.Nants_data)
nt.assert_equal(Nblts_selected, uv_object2.Nblts)
for ant in new_unique_ants:
nt.assert_true(
ant in uv_object2.ant_1_array or ant in uv_object2.ant_2_array)
for ant in np.unique(uv_object2.ant_1_array.tolist() + uv_object2.ant_2_array.tolist()):
nt.assert_true(ant in new_unique_ants)
for pair in sorted_pairs_to_keep:
nt.assert_true(pair in sorted_pairs_object2)
for pair in sorted_pairs_object2:
nt.assert_true(pair in sorted_pairs_to_keep)
nt.assert_true(uvutils._check_histories(old_history + ' Downselected to '
'specific baselines using pyuvdata.',
uv_object2.history))
# check select with polarizations
first_ants = [6, 2, 7, 2, 21, 27, 8]
second_ants = [0, 20, 8, 1, 2, 3, 22]
pols = ['RR', 'RR', 'RR', 'RR', 'RR', 'RR', 'RR']
new_unique_ants = np.unique(first_ants + second_ants)
bls_to_keep = list(zip(first_ants, second_ants, pols))
sorted_bls_to_keep = [sort_bl(p) for p in bls_to_keep]
sorted_pairs_object = [sort_bl(p) for p in zip(
uv_object.ant_1_array, uv_object.ant_2_array)]
blts_select = [sort_bl((a1, a2, 'RR')) in sorted_bls_to_keep for (a1, a2) in
zip(uv_object.ant_1_array, uv_object.ant_2_array)]
Nblts_selected = np.sum(blts_select)
uv_object2 = copy.deepcopy(uv_object)
uv_object2.select(bls=bls_to_keep)
sorted_pairs_object2 = [sort_bl(p) + ('RR',) for p in zip(
uv_object2.ant_1_array, uv_object2.ant_2_array)]
nt.assert_equal(len(new_unique_ants), uv_object2.Nants_data)
nt.assert_equal(Nblts_selected, uv_object2.Nblts)
for ant in new_unique_ants:
nt.assert_true(
ant in uv_object2.ant_1_array or ant in uv_object2.ant_2_array)
for ant in np.unique(uv_object2.ant_1_array.tolist() + uv_object2.ant_2_array.tolist()):
nt.assert_true(ant in new_unique_ants)
for bl in sorted_bls_to_keep:
nt.assert_true(bl in sorted_pairs_object2)
for bl in sorted_pairs_object2:
nt.assert_true(bl in sorted_bls_to_keep)
nt.assert_true(uvutils._check_histories(old_history + ' Downselected to '
'specific baselines, polarizations using pyuvdata.',
uv_object2.history))
# check that you can use numpy integers with out errors:
first_ants = list(map(np.int32, [6, 2, 7, 2, 21, 27, 8]))
second_ants = list(map(np.int32, [0, 20, 8, 1, 2, 3, 22]))
ant_pairs_to_keep = list(zip(first_ants, second_ants))
uv_object2 = uv_object.select(bls=ant_pairs_to_keep, inplace=False)
sorted_pairs_object2 = [sort_bl(p) for p in zip(
uv_object2.ant_1_array, uv_object2.ant_2_array)]
nt.assert_equal(len(new_unique_ants), uv_object2.Nants_data)
nt.assert_equal(Nblts_selected, uv_object2.Nblts)
for ant in new_unique_ants:
nt.assert_true(
ant in uv_object2.ant_1_array or ant in uv_object2.ant_2_array)
for ant in np.unique(uv_object2.ant_1_array.tolist() + uv_object2.ant_2_array.tolist()):
nt.assert_true(ant in new_unique_ants)
for pair in sorted_pairs_to_keep:
nt.assert_true(pair in sorted_pairs_object2)
for pair in sorted_pairs_object2:
nt.assert_true(pair in sorted_pairs_to_keep)
nt.assert_true(uvutils._check_histories(old_history + ' Downselected to '
'specific baselines using pyuvdata.',
uv_object2.history))
# check that you can specify a single pair without errors
uv_object2.select(bls=(0, 6))
sorted_pairs_object2 = [sort_bl(p) for p in zip(
uv_object2.ant_1_array, uv_object2.ant_2_array)]
nt.assert_equal(list(set(sorted_pairs_object2)), [(0, 6)])
# check for errors associated with antenna pairs not included in data and bad inputs
nt.assert_raises(ValueError, uv_object.select,
bls=list(zip(first_ants, second_ants)) + [0, 6])
nt.assert_raises(ValueError, uv_object.select,
bls=[(uv_object.antenna_names[0], uv_object.antenna_names[1])])
nt.assert_raises(ValueError, uv_object.select, bls=(5, 1))
nt.assert_raises(ValueError, uv_object.select, bls=(0, 5))
nt.assert_raises(ValueError, uv_object.select, bls=(27, 27))
nt.assert_raises(ValueError, uv_object.select, bls=(6, 0, 'RR'), polarizations='RR')
nt.assert_raises(ValueError, uv_object.select, bls=(6, 0, 8))
def test_select_times():
uv_object = UVData()
testfile = os.path.join(
DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
uvtest.checkWarnings(uv_object.read_uvfits, [testfile],
message='Telescope EVLA is not')
old_history = uv_object.history
unique_times = np.unique(uv_object.time_array)
times_to_keep = unique_times[[0, 3, 5, 6, 7, 10, 14]]
Nblts_selected = np.sum([t in times_to_keep for t in uv_object.time_array])
uv_object2 = copy.deepcopy(uv_object)
uv_object2.select(times=times_to_keep)
nt.assert_equal(len(times_to_keep), uv_object2.Ntimes)
nt.assert_equal(Nblts_selected, uv_object2.Nblts)
for t in times_to_keep:
nt.assert_true(t in uv_object2.time_array)
for t in np.unique(uv_object2.time_array):
nt.assert_true(t in times_to_keep)
nt.assert_true(uvutils._check_histories(old_history + ' Downselected to '
'specific times using pyuvdata.',
uv_object2.history))
# check that it also works with higher dimension array
uv_object2 = copy.deepcopy(uv_object)
uv_object2.select(times=times_to_keep[np.newaxis, :])
nt.assert_equal(len(times_to_keep), uv_object2.Ntimes)
nt.assert_equal(Nblts_selected, uv_object2.Nblts)
for t in times_to_keep:
nt.assert_true(t in uv_object2.time_array)
for t in np.unique(uv_object2.time_array):
nt.assert_true(t in times_to_keep)
nt.assert_true(uvutils._check_histories(old_history + ' Downselected to '
'specific times using pyuvdata.',
uv_object2.history))
# check for errors associated with times not included in data
nt.assert_raises(ValueError, uv_object.select, times=[
np.min(unique_times) - uv_object.integration_time])
def test_select_frequencies():
uv_object = UVData()
testfile = os.path.join(
DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
uvtest.checkWarnings(uv_object.read_uvfits, [testfile],
message='Telescope EVLA is not')
old_history = uv_object.history
freqs_to_keep = uv_object.freq_array[0, np.arange(12, 22)]
uv_object2 = copy.deepcopy(uv_object)
uv_object2.select(frequencies=freqs_to_keep)
nt.assert_equal(len(freqs_to_keep), uv_object2.Nfreqs)
for f in freqs_to_keep:
nt.assert_true(f in uv_object2.freq_array)
for f in np.unique(uv_object2.freq_array):
nt.assert_true(f in freqs_to_keep)
nt.assert_true(uvutils._check_histories(old_history + ' Downselected to '
'specific frequencies using pyuvdata.',
uv_object2.history))
# check that it also works with higher dimension array
uv_object2 = copy.deepcopy(uv_object)
uv_object2.select(frequencies=freqs_to_keep[np.newaxis, :])
nt.assert_equal(len(freqs_to_keep), uv_object2.Nfreqs)
for f in freqs_to_keep:
nt.assert_true(f in uv_object2.freq_array)
for f in np.unique(uv_object2.freq_array):
nt.assert_true(f in freqs_to_keep)
nt.assert_true(uvutils._check_histories(old_history + ' Downselected to '
'specific frequencies using pyuvdata.',
uv_object2.history))
# check that selecting one frequency works
uv_object2 = copy.deepcopy(uv_object)
uv_object2.select(frequencies=freqs_to_keep[0])
nt.assert_equal(1, uv_object2.Nfreqs)
nt.assert_true(freqs_to_keep[0] in uv_object2.freq_array)
for f in uv_object2.freq_array:
nt.assert_true(f in [freqs_to_keep[0]])
nt.assert_true(uvutils._check_histories(old_history + ' Downselected to '
'specific frequencies using pyuvdata.',
uv_object2.history))
# check for errors associated with frequencies not included in data
nt.assert_raises(ValueError, uv_object.select, frequencies=[
np.max(uv_object.freq_array) + uv_object.channel_width])
# check for warnings and errors associated with unevenly spaced or non-contiguous frequencies
uv_object2 = copy.deepcopy(uv_object)
uvtest.checkWarnings(uv_object2.select, [], {'frequencies': uv_object2.freq_array[0, [0, 5, 6]]},
message='Selected frequencies are not evenly spaced')
write_file_uvfits = os.path.join(DATA_PATH, 'test/select_test.uvfits')
write_file_miriad = os.path.join(DATA_PATH, 'test/select_test.uv')
nt.assert_raises(ValueError, uv_object2.write_uvfits, write_file_uvfits)
nt.assert_raises(ValueError, uv_object2.write_miriad, write_file_miriad)
uv_object2 = copy.deepcopy(uv_object)
uvtest.checkWarnings(uv_object2.select, [], {'frequencies': uv_object2.freq_array[0, [0, 2, 4]]},
message='Selected frequencies are not contiguous')
nt.assert_raises(ValueError, uv_object2.write_uvfits, write_file_uvfits)
nt.assert_raises(ValueError, uv_object2.write_miriad, write_file_miriad)
def test_select_freq_chans():
uv_object = UVData()
testfile = os.path.join(
DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
uvtest.checkWarnings(uv_object.read_uvfits, [testfile],
message='Telescope EVLA is not')
old_history = uv_object.history
chans_to_keep = np.arange(12, 22)
uv_object2 = copy.deepcopy(uv_object)
uv_object2.select(freq_chans=chans_to_keep)
nt.assert_equal(len(chans_to_keep), uv_object2.Nfreqs)
for chan in chans_to_keep:
nt.assert_true(uv_object.freq_array[0, chan] in uv_object2.freq_array)
for f in np.unique(uv_object2.freq_array):
nt.assert_true(f in uv_object.freq_array[0, chans_to_keep])
nt.assert_true(uvutils._check_histories(old_history + ' Downselected to '
'specific frequencies using pyuvdata.',
uv_object2.history))
# check that it also works with higher dimension array
uv_object2 = copy.deepcopy(uv_object)
uv_object2.select(freq_chans=chans_to_keep[np.newaxis, :])
nt.assert_equal(len(chans_to_keep), uv_object2.Nfreqs)
for chan in chans_to_keep:
nt.assert_true(uv_object.freq_array[0, chan] in uv_object2.freq_array)
for f in np.unique(uv_object2.freq_array):
nt.assert_true(f in uv_object.freq_array[0, chans_to_keep])
nt.assert_true(uvutils._check_histories(old_history + ' Downselected to '
'specific frequencies using pyuvdata.',
uv_object2.history))
# Test selecting both channels and frequencies
freqs_to_keep = uv_object.freq_array[0, np.arange(
20, 30)] # Overlaps with chans
all_chans_to_keep = np.arange(12, 30)
uv_object2 = copy.deepcopy(uv_object)
uv_object2.select(frequencies=freqs_to_keep, freq_chans=chans_to_keep)
nt.assert_equal(len(all_chans_to_keep), uv_object2.Nfreqs)
for chan in all_chans_to_keep:
nt.assert_true(uv_object.freq_array[0, chan] in uv_object2.freq_array)
for f in np.unique(uv_object2.freq_array):
nt.assert_true(f in uv_object.freq_array[0, all_chans_to_keep])
def test_select_polarizations():
uv_object = UVData()
testfile = os.path.join(
DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
uvtest.checkWarnings(uv_object.read_uvfits, [testfile],
message='Telescope EVLA is not')
old_history = uv_object.history
pols_to_keep = [-1, -2]
uv_object2 = copy.deepcopy(uv_object)
uv_object2.select(polarizations=pols_to_keep)
nt.assert_equal(len(pols_to_keep), uv_object2.Npols)
for p in pols_to_keep:
nt.assert_true(p in uv_object2.polarization_array)
for p in np.unique(uv_object2.polarization_array):
nt.assert_true(p in pols_to_keep)
nt.assert_true(uvutils._check_histories(old_history + ' Downselected to '
'specific polarizations using pyuvdata.',
uv_object2.history))
# check that it also works with higher dimension array
uv_object2 = copy.deepcopy(uv_object)
uv_object2.select(polarizations=[pols_to_keep])
nt.assert_equal(len(pols_to_keep), uv_object2.Npols)
for p in pols_to_keep:
nt.assert_true(p in uv_object2.polarization_array)
for p in np.unique(uv_object2.polarization_array):
nt.assert_true(p in pols_to_keep)
nt.assert_true(uvutils._check_histories(old_history + ' Downselected to '
'specific polarizations using pyuvdata.',
uv_object2.history))
# check for errors associated with polarizations not included in data
nt.assert_raises(ValueError, uv_object2.select, polarizations=[-3, -4])
# check for warnings and errors associated with unevenly spaced polarizations
uvtest.checkWarnings(uv_object.select, [], {'polarizations': uv_object.polarization_array[[0, 1, 3]]},
message='Selected polarization values are not evenly spaced')
write_file_uvfits = os.path.join(DATA_PATH, 'test/select_test.uvfits')
nt.assert_raises(ValueError, uv_object.write_uvfits, write_file_uvfits)
def test_select():
# now test selecting along all axes at once
uv_object = UVData()
testfile = os.path.join(
DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
uvtest.checkWarnings(uv_object.read_uvfits, [testfile],
message='Telescope EVLA is not')
old_history = uv_object.history
blt_inds = np.array([1057, 461, 1090, 354, 528, 654, 882, 775, 369, 906, 748,
875, 296, 773, 554, 395, 1003, 476, 762, 976, 1285, 874,
717, 383, 1281, 924, 264, 1163, 297, 857, 1258, 1000, 180,
1303, 1139, 393, 42, 135, 789, 713, 527, 1218, 576, 100,
1311, 4, 653, 724, 591, 889, 36, 1033, 113, 479, 322,
118, 898, 1263, 477, 96, 935, 238, 195, 531, 124, 198,
992, 1131, 305, 154, 961, 6, 1175, 76, 663, 82, 637,
288, 1152, 845, 1290, 379, 1225, 1240, 733, 1172, 937, 1325,
817, 416, 261, 1316, 957, 723, 215, 237, 270, 1309, 208,
17, 1028, 895, 574, 166, 784, 834, 732, 1022, 1068, 1207,
356, 474, 313, 137, 172, 181, 925, 201, 190, 1277, 1044,
1242, 702, 567, 557, 1032, 1352, 504, 545, 422, 179, 780,
280, 890, 774, 884])
unique_ants = np.unique(
uv_object.ant_1_array.tolist() + uv_object.ant_2_array.tolist())
ants_to_keep = np.array([11, 6, 20, 26, 2, 27, 3, 7, 14])
ant_pairs_to_keep = [(2, 11), (20, 26), (6, 7), (3, 27), (14, 6)]
sorted_pairs_to_keep = [sort_bl(p) for p in ant_pairs_to_keep]
freqs_to_keep = uv_object.freq_array[0, np.arange(31, 39)]
unique_times = np.unique(uv_object.time_array)
times_to_keep = unique_times[[0, 2, 6, 8, 10, 13, 14]]
pols_to_keep = [-1, -3]
# Independently count blts that should be selected
blts_blt_select = [i in blt_inds for i in np.arange(uv_object.Nblts)]
blts_ant_select = [(a1 in ants_to_keep) & (a2 in ants_to_keep) for (a1, a2) in
zip(uv_object.ant_1_array, uv_object.ant_2_array)]
blts_pair_select = [sort_bl((a1, a2)) in sorted_pairs_to_keep for (a1, a2) in
zip(uv_object.ant_1_array, uv_object.ant_2_array)]
blts_time_select = [t in times_to_keep for t in uv_object.time_array]
Nblts_select = np.sum([bi & (ai | pi) & ti for (bi, ai, pi, ti) in
zip(blts_blt_select, blts_ant_select, blts_pair_select,
blts_time_select)])
uv_object2 = copy.deepcopy(uv_object)
uv_object2.select(blt_inds=blt_inds, antenna_nums=ants_to_keep,
bls=ant_pairs_to_keep, frequencies=freqs_to_keep,
times=times_to_keep, polarizations=pols_to_keep)
nt.assert_equal(Nblts_select, uv_object2.Nblts)
for ant in np.unique(uv_object2.ant_1_array.tolist() + uv_object2.ant_2_array.tolist()):
nt.assert_true(ant in ants_to_keep)
nt.assert_equal(len(freqs_to_keep), uv_object2.Nfreqs)
for f in freqs_to_keep:
nt.assert_true(f in uv_object2.freq_array)
for f in np.unique(uv_object2.freq_array):
nt.assert_true(f in freqs_to_keep)
for t in np.unique(uv_object2.time_array):
nt.assert_true(t in times_to_keep)
nt.assert_equal(len(pols_to_keep), uv_object2.Npols)
for p in pols_to_keep:
nt.assert_true(p in uv_object2.polarization_array)
for p in np.unique(uv_object2.polarization_array):
nt.assert_true(p in pols_to_keep)
nt.assert_true(uvutils._check_histories(old_history + ' Downselected to '
'specific baseline-times, antennas, '
'baselines, times, frequencies, '
'polarizations using pyuvdata.',
uv_object2.history))
# test that a ValueError is raised if the selection eliminates all blts
nt.assert_raises(ValueError, uv_object.select,
times=unique_times[0], antenna_nums=1)
def test_select_not_inplace():
# Test non-inplace select
uv_object = UVData()
testfile = os.path.join(
DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
uvtest.checkWarnings(uv_object.read_uvfits, [testfile],
message='Telescope EVLA is not')
old_history = uv_object.history
uv1 = uv_object.select(freq_chans=np.arange(32), inplace=False)
uv1 += uv_object.select(freq_chans=np.arange(32, 64), inplace=False)
nt.assert_true(uvutils._check_histories(old_history + ' Downselected to '
'specific frequencies using pyuvdata. '
'Combined data along frequency axis '
'using pyuvdata.', uv1.history))
uv1.history = old_history
nt.assert_equal(uv1, uv_object)
def test_reorder_pols():
# Test function to fix polarization order
uv1 = UVData()
testfile = os.path.join(DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
uvtest.checkWarnings(uv1.read_uvfits, [testfile], message='Telescope EVLA is not')
uv2 = copy.deepcopy(uv1)
# reorder uv2 manually
order = [1, 3, 2, 0]
uv2.polarization_array = uv2.polarization_array[order]
uv2.data_array = uv2.data_array[:, :, :, order]
uv2.nsample_array = uv2.nsample_array[:, :, :, order]
uv2.flag_array = uv2.flag_array[:, :, :, order]
uv1.reorder_pols(order=order)
nt.assert_equal(uv1, uv2)
# Restore original order
uvtest.checkWarnings(uv1.read_uvfits, [testfile], message='Telescope EVLA is not')
uv2.reorder_pols()
nt.assert_equal(uv1, uv2)
def test_add():
uv_full = UVData()
testfile = os.path.join(DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
uvtest.checkWarnings(uv_full.read_uvfits, [testfile],
message='Telescope EVLA is not')
# Add frequencies
uv1 = copy.deepcopy(uv_full)
uv2 = copy.deepcopy(uv_full)
uv1.select(freq_chans=np.arange(0, 32))
uv2.select(freq_chans=np.arange(32, 64))
uv1 += uv2
# Check history is correct, before replacing and doing a full object check
nt.assert_true(uvutils._check_histories(uv_full.history + ' Downselected to '
'specific frequencies using pyuvdata. '
'Combined data along frequency axis '
'using pyuvdata.', uv1.history))
uv1.history = uv_full.history
nt.assert_equal(uv1, uv_full)
# Add frequencies - out of order
uv1 = copy.deepcopy(uv_full)
uv2 = copy.deepcopy(uv_full)
uv1.select(freq_chans=np.arange(0, 32))
uv2.select(freq_chans=np.arange(32, 64))
uv2 += uv1
uv2.history = uv_full.history
nt.assert_equal(uv2, uv_full)
# Add polarizations
uv1 = copy.deepcopy(uv_full)
uv2 = copy.deepcopy(uv_full)
uv1.select(polarizations=uv1.polarization_array[0:2])
uv2.select(polarizations=uv2.polarization_array[2:4])
uv1 += uv2
nt.assert_true(uvutils._check_histories(uv_full.history + ' Downselected to '
'specific polarizations using pyuvdata. '
'Combined data along polarization axis '
'using pyuvdata.', uv1.history))
uv1.history = uv_full.history
nt.assert_equal(uv1, uv_full)
# Add polarizations - out of order
uv1 = copy.deepcopy(uv_full)
uv2 = copy.deepcopy(uv_full)
uv1.select(polarizations=uv1.polarization_array[0:2])
uv2.select(polarizations=uv2.polarization_array[2:4])
uv2 += uv1
uv2.history = uv_full.history
nt.assert_equal(uv2, uv_full)
# Add times
uv1 = copy.deepcopy(uv_full)
uv2 = copy.deepcopy(uv_full)
times = np.unique(uv_full.time_array)
uv1.select(times=times[0:len(times) // 2])
uv2.select(times=times[len(times) // 2:])
uv1 += uv2
nt.assert_true(uvutils._check_histories(uv_full.history + ' Downselected to '
'specific times using pyuvdata. '
'Combined data along baseline-time axis '
'using pyuvdata.', uv1.history))
uv1.history = uv_full.history
nt.assert_equal(uv1, uv_full)
# Add baselines
uv1 = copy.deepcopy(uv_full)
uv2 = copy.deepcopy(uv_full)
ant_list = list(range(15)) # Roughly half the antennas in the data
# All blts where ant_1 is in list
ind1 = [i for i in range(uv1.Nblts) if uv1.ant_1_array[i] in ant_list]
ind2 = [i for i in range(uv1.Nblts) if uv1.ant_1_array[i] not in ant_list]
uv1.select(blt_inds=ind1)
uv2.select(blt_inds=ind2)
uv1 += uv2
nt.assert_true(uvutils._check_histories(uv_full.history + ' Downselected to '
'specific baseline-times using pyuvdata. '
'Combined data along baseline-time axis '
'using pyuvdata.', uv1.history))
uv1.history = uv_full.history
nt.assert_equal(uv1, uv_full)
# Add baselines - out of order
uv1 = copy.deepcopy(uv_full)
uv2 = copy.deepcopy(uv_full)
uv3 = copy.deepcopy(uv_full)
ants = uv_full.get_ants()
ants1 = ants[0:6]
ants2 = ants[6:12]
ants3 = ants[12:]
# All blts where ant_1 is in list
ind1 = [i for i in range(uv1.Nblts) if uv1.ant_1_array[i] in ants1]
ind2 = [i for i in range(uv2.Nblts) if uv2.ant_1_array[i] in ants2]
ind3 = [i for i in range(uv3.Nblts) if uv3.ant_1_array[i] in ants3]
uv1.select(blt_inds=ind1)
uv2.select(blt_inds=ind2)
uv3.select(blt_inds=ind3)
uv3.data_array = uv3.data_array[-1::-1, :, :, :]
uv3.nsample_array = uv3.nsample_array[-1::-1, :, :, :]
uv3.flag_array = uv3.flag_array[-1::-1, :, :, :]
uv3.uvw_array = uv3.uvw_array[-1::-1, :]
uv3.time_array = uv3.time_array[-1::-1]
uv3.lst_array = uv3.lst_array[-1::-1]
uv3.ant_1_array = uv3.ant_1_array[-1::-1]
uv3.ant_2_array = uv3.ant_2_array[-1::-1]
uv3.baseline_array = uv3.baseline_array[-1::-1]
uv1 += uv3
uv1 += uv2
nt.assert_true(uvutils._check_histories(uv_full.history + ' Downselected to '
'specific baseline-times using pyuvdata. '
'Combined data along baseline-time axis '
'using pyuvdata. Combined data along '
'baseline-time axis using pyuvdata.',
uv1.history))
uv1.history = uv_full.history
nt.assert_equal(uv1, uv_full)
# Add multiple axes
uv1 = copy.deepcopy(uv_full)
uv2 = copy.deepcopy(uv_full)
uv_ref = copy.deepcopy(uv_full)
times = np.unique(uv_full.time_array)
uv1.select(times=times[0:len(times) // 2],
polarizations=uv1.polarization_array[0:2])
uv2.select(times=times[len(times) // 2:],
polarizations=uv2.polarization_array[2:4])
uv1 += uv2
nt.assert_true(uvutils._check_histories(uv_full.history + ' Downselected to '
'specific times, polarizations using '
'pyuvdata. Combined data along '
'baseline-time, polarization axis '
'using pyuvdata.', uv1.history))
blt_ind1 = np.array([ind for ind in range(uv_full.Nblts) if
uv_full.time_array[ind] in times[0:len(times) // 2]])
blt_ind2 = np.array([ind for ind in range(uv_full.Nblts) if
uv_full.time_array[ind] in times[len(times) // 2:]])
# Zero out missing data in reference object
uv_ref.data_array[blt_ind1, :, :, 2:] = 0.0
uv_ref.nsample_array[blt_ind1, :, :, 2:] = 0.0
uv_ref.flag_array[blt_ind1, :, :, 2:] = True
uv_ref.data_array[blt_ind2, :, :, 0:2] = 0.0
uv_ref.nsample_array[blt_ind2, :, :, 0:2] = 0.0
uv_ref.flag_array[blt_ind2, :, :, 0:2] = True
uv1.history = uv_full.history
nt.assert_equal(uv1, uv_ref)
# Another combo
uv1 = copy.deepcopy(uv_full)
uv2 = copy.deepcopy(uv_full)
uv_ref = copy.deepcopy(uv_full)
times = np.unique(uv_full.time_array)
uv1.select(times=times[0:len(times) // 2], freq_chans=np.arange(0, 32))
uv2.select(times=times[len(times) // 2:], freq_chans=np.arange(32, 64))
uv1 += uv2
nt.assert_true(uvutils._check_histories(uv_full.history + ' Downselected to '
'specific times, frequencies using '
'pyuvdata. Combined data along '
'baseline-time, frequency axis using '
'pyuvdata.', uv1.history))
blt_ind1 = np.array([ind for ind in range(uv_full.Nblts) if
uv_full.time_array[ind] in times[0:len(times) // 2]])
blt_ind2 = np.array([ind for ind in range(uv_full.Nblts) if
uv_full.time_array[ind] in times[len(times) // 2:]])
# Zero out missing data in reference object
uv_ref.data_array[blt_ind1, :, 32:, :] = 0.0
uv_ref.nsample_array[blt_ind1, :, 32:, :] = 0.0
uv_ref.flag_array[blt_ind1, :, 32:, :] = True
uv_ref.data_array[blt_ind2, :, 0:32, :] = 0.0
uv_ref.nsample_array[blt_ind2, :, 0:32, :] = 0.0
uv_ref.flag_array[blt_ind2, :, 0:32, :] = True
uv1.history = uv_full.history
nt.assert_equal(uv1, uv_ref)
# Add without inplace
uv1 = copy.deepcopy(uv_full)
uv2 = copy.deepcopy(uv_full)
times = np.unique(uv_full.time_array)
uv1.select(times=times[0:len(times) // 2])
uv2.select(times=times[len(times) // 2:])
uv1 = uv1 + uv2
nt.assert_true(uvutils._check_histories(uv_full.history + ' Downselected to '
'specific times using pyuvdata. '
'Combined data along baseline-time '
'axis using pyuvdata.', uv1.history))
uv1.history = uv_full.history
nt.assert_equal(uv1, uv_full)
# Check warnings
uv1 = copy.deepcopy(uv_full)
uv2 = copy.deepcopy(uv_full)
uv1.select(freq_chans=np.arange(0, 32))
uv2.select(freq_chans=np.arange(33, 64))
uvtest.checkWarnings(uv1.__add__, [uv2],
message='Combined frequencies are not evenly spaced')
uv1 = copy.deepcopy(uv_full)
uv2 = copy.deepcopy(uv_full)
uv1.select(freq_chans=[0])
uv2.select(freq_chans=[3])
uvtest.checkWarnings(uv1.__iadd__, [uv2],
message='Combined frequencies are not contiguous')
uv1 = copy.deepcopy(uv_full)
uv2 = copy.deepcopy(uv_full)
uv1.select(polarizations=uv1.polarization_array[0:2])
uv2.select(polarizations=uv2.polarization_array[3])
uvtest.checkWarnings(uv1.__iadd__, [uv2],
message='Combined polarizations are not evenly spaced')
# Combining histories
uv1 = copy.deepcopy(uv_full)
uv2 = copy.deepcopy(uv_full)
uv1.select(polarizations=uv1.polarization_array[0:2])
uv2.select(polarizations=uv2.polarization_array[2:4])
uv2.history += ' testing the history. AIPS WTSCAL = 1.0'
uv1 += uv2
nt.assert_true(uvutils._check_histories(uv_full.history + ' Downselected to '
'specific polarizations using pyuvdata. '
'Combined data along polarization '
'axis using pyuvdata. testing the history.',
uv1.history))
uv1.history = uv_full.history
nt.assert_equal(uv1, uv_full)
# test add of autocorr-only and crosscorr-only objects
uv_full = UVData()
uv_full.read_miriad(os.path.join(DATA_PATH, 'zen.2457698.40355.xx.HH.uvcA'))
bls = uv_full.get_antpairs()
autos = [bl for bl in bls if bl[0] == bl[1]]
cross = sorted(set(bls) - set(autos))
uv_auto = uv_full.select(bls=autos, inplace=False)
uv_cross = uv_full.select(bls=cross, inplace=False)
uv1 = uv_auto + uv_cross
nt.assert_equal(uv1.Nbls, uv_auto.Nbls + uv_cross.Nbls)
uv2 = uv_cross + uv_auto
nt.assert_equal(uv2.Nbls, uv_auto.Nbls + uv_cross.Nbls)
def test_add_drift():
uv_full = UVData()
testfile = os.path.join(
DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
uvtest.checkWarnings(uv_full.read_uvfits, [testfile],
message='Telescope EVLA is not')
uvtest.checkWarnings(uv_full.unphase_to_drift, category=PendingDeprecationWarning,
message='The xyz array in ENU_from_ECEF is being '
'interpreted as (Npts, 3)')
# Add frequencies
uv1 = copy.deepcopy(uv_full)
uv2 = copy.deepcopy(uv_full)
uv1.select(freq_chans=np.arange(0, 32))
uv2.select(freq_chans=np.arange(32, 64))
uv1 += uv2
# Check history is correct, before replacing and doing a full object check
nt.assert_true(uvutils._check_histories(uv_full.history + ' Downselected to '
'specific frequencies using pyuvdata. '
'Combined data along frequency '
'axis using pyuvdata.', uv1.history))
uv1.history = uv_full.history
nt.assert_equal(uv1, uv_full)
# Add polarizations
uv1 = copy.deepcopy(uv_full)
uv2 = copy.deepcopy(uv_full)
uv1.select(polarizations=uv1.polarization_array[0:2])
uv2.select(polarizations=uv2.polarization_array[2:4])
uv1 += uv2
nt.assert_true(uvutils._check_histories(uv_full.history + ' Downselected to '
'specific polarizations using pyuvdata. '
'Combined data along polarization '
'axis using pyuvdata.', uv1.history))
uv1.history = uv_full.history
nt.assert_equal(uv1, uv_full)
# Add times
uv1 = copy.deepcopy(uv_full)
uv2 = copy.deepcopy(uv_full)
times = np.unique(uv_full.time_array)
uv1.select(times=times[0:len(times) // 2])
uv2.select(times=times[len(times) // 2:])
uv1 += uv2
nt.assert_true(uvutils._check_histories(uv_full.history + ' Downselected to '
'specific times using pyuvdata. '
'Combined data along baseline-time '
'axis using pyuvdata.', uv1.history))
uv1.history = uv_full.history
nt.assert_equal(uv1, uv_full)
# Add baselines
uv1 = copy.deepcopy(uv_full)
uv2 = copy.deepcopy(uv_full)
ant_list = list(range(15)) # Roughly half the antennas in the data
# All blts where ant_1 is in list
ind1 = [i for i in range(uv1.Nblts) if uv1.ant_1_array[i] in ant_list]
ind2 = [i for i in range(uv1.Nblts) if uv1.ant_1_array[i] not in ant_list]
uv1.select(blt_inds=ind1)
uv2.select(blt_inds=ind2)
uv1 += uv2
nt.assert_true(uvutils._check_histories(uv_full.history + ' Downselected to '
'specific baseline-times using pyuvdata. '
'Combined data along baseline-time '
'axis using pyuvdata.', uv1.history))
uv1.history = uv_full.history
nt.assert_equal(uv1, uv_full)
# Add multiple axes
uv1 = copy.deepcopy(uv_full)
uv2 = copy.deepcopy(uv_full)
uv_ref = copy.deepcopy(uv_full)
times = np.unique(uv_full.time_array)
uv1.select(times=times[0:len(times) // 2],
polarizations=uv1.polarization_array[0:2])
uv2.select(times=times[len(times) // 2:],
polarizations=uv2.polarization_array[2:4])
uv1 += uv2
nt.assert_true(uvutils._check_histories(uv_full.history + ' Downselected to '
'specific times, polarizations using '
'pyuvdata. Combined data along '
'baseline-time, polarization '
'axis using pyuvdata.', uv1.history))
blt_ind1 = np.array([ind for ind in range(uv_full.Nblts) if
uv_full.time_array[ind] in times[0:len(times) // 2]])
blt_ind2 = np.array([ind for ind in range(uv_full.Nblts) if
uv_full.time_array[ind] in times[len(times) // 2:]])
# Zero out missing data in reference object
uv_ref.data_array[blt_ind1, :, :, 2:] = 0.0
uv_ref.nsample_array[blt_ind1, :, :, 2:] = 0.0
uv_ref.flag_array[blt_ind1, :, :, 2:] = True
uv_ref.data_array[blt_ind2, :, :, 0:2] = 0.0
uv_ref.nsample_array[blt_ind2, :, :, 0:2] = 0.0
uv_ref.flag_array[blt_ind2, :, :, 0:2] = True
uv1.history = uv_full.history
nt.assert_equal(uv1, uv_ref)
# Another combo
uv1 = copy.deepcopy(uv_full)
uv2 = copy.deepcopy(uv_full)
uv_ref = copy.deepcopy(uv_full)
times = np.unique(uv_full.time_array)
uv1.select(times=times[0:len(times) // 2], freq_chans=np.arange(0, 32))
uv2.select(times=times[len(times) // 2:], freq_chans=np.arange(32, 64))
uv1 += uv2
nt.assert_true(uvutils._check_histories(uv_full.history + ' Downselected to '
'specific times, frequencies using '
'pyuvdata. Combined data along '
'baseline-time, frequency '
'axis using pyuvdata.', uv1.history))
blt_ind1 = np.array([ind for ind in range(uv_full.Nblts) if
uv_full.time_array[ind] in times[0:len(times) // 2]])
blt_ind2 = np.array([ind for ind in range(uv_full.Nblts) if
uv_full.time_array[ind] in times[len(times) // 2:]])
# Zero out missing data in reference object
uv_ref.data_array[blt_ind1, :, 32:, :] = 0.0
uv_ref.nsample_array[blt_ind1, :, 32:, :] = 0.0
uv_ref.flag_array[blt_ind1, :, 32:, :] = True
uv_ref.data_array[blt_ind2, :, 0:32, :] = 0.0
uv_ref.nsample_array[blt_ind2, :, 0:32, :] = 0.0
uv_ref.flag_array[blt_ind2, :, 0:32, :] = True
uv1.history = uv_full.history
nt.assert_equal(uv1, uv_ref)
# Add without inplace
uv1 = copy.deepcopy(uv_full)
uv2 = copy.deepcopy(uv_full)
times = np.unique(uv_full.time_array)
uv1.select(times=times[0:len(times) // 2])
uv2.select(times=times[len(times) // 2:])
uv1 = uv1 + uv2
nt.assert_true(uvutils._check_histories(uv_full.history + ' Downselected to '
'specific times using pyuvdata. '
'Combined data along baseline-time '
'axis using pyuvdata.', uv1.history))
uv1.history = uv_full.history
nt.assert_equal(uv1, uv_full)
# Check warnings
uv1 = copy.deepcopy(uv_full)
uv2 = copy.deepcopy(uv_full)
uv1.select(freq_chans=np.arange(0, 32))
uv2.select(freq_chans=np.arange(33, 64))
uvtest.checkWarnings(uv1.__add__, [uv2],
message='Combined frequencies are not evenly spaced')
uv1 = copy.deepcopy(uv_full)
uv2 = copy.deepcopy(uv_full)
uv1.select(freq_chans=[0])
uv2.select(freq_chans=[3])
uvtest.checkWarnings(uv1.__iadd__, [uv2],
message='Combined frequencies are not contiguous')
uv1 = copy.deepcopy(uv_full)
uv2 = copy.deepcopy(uv_full)
uv1.select(polarizations=uv1.polarization_array[0:2])
uv2.select(polarizations=uv2.polarization_array[3])
uvtest.checkWarnings(uv1.__iadd__, [uv2],
message='Combined polarizations are not evenly spaced')
# Combining histories
uv1 = copy.deepcopy(uv_full)
uv2 = copy.deepcopy(uv_full)
uv1.select(polarizations=uv1.polarization_array[0:2])
uv2.select(polarizations=uv2.polarization_array[2:4])
uv2.history += ' testing the history. AIPS WTSCAL = 1.0'
uv1 += uv2
nt.assert_true(uvutils._check_histories(uv_full.history + ' Downselected to '
'specific polarizations using pyuvdata. '
'Combined data along polarization '
'axis using pyuvdata. testing the history.',
uv1.history))
uv1.history = uv_full.history
nt.assert_equal(uv1, uv_full)
def test_break_add():
# Test failure modes of add function
uv_full = UVData()
testfile = os.path.join(
DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
uvtest.checkWarnings(uv_full.read_uvfits, [testfile],
message='Telescope EVLA is not')
# Wrong class
uv1 = copy.deepcopy(uv_full)
nt.assert_raises(ValueError, uv1.__iadd__, np.zeros(5))
# One phased, one not
uv2 = copy.deepcopy(uv_full)
uvtest.checkWarnings(uv2.unphase_to_drift, category=PendingDeprecationWarning,
message='The xyz array in ENU_from_ECEF is being '
'interpreted as (Npts, 3)')
nt.assert_raises(ValueError, uv1.__iadd__, uv2)
# Different units
uv2 = copy.deepcopy(uv_full)
uv2.vis_units = "Jy"
nt.assert_raises(ValueError, uv1.__iadd__, uv2)
# Overlapping data
uv2 = copy.deepcopy(uv_full)
nt.assert_raises(ValueError, uv1.__iadd__, uv2)
def test_key2inds():
# Test function to interpret key as antpair, pol
uv = UVData()
testfile = os.path.join(
DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
uvtest.checkWarnings(uv.read_uvfits, [testfile],
message='Telescope EVLA is not')
# Get an antpair/pol combo
ant1 = uv.ant_1_array[0]
ant2 = uv.ant_2_array[0]
pol = uv.polarization_array[0]
bltind = np.where((uv.ant_1_array == ant1) & (uv.ant_2_array == ant2))[0]
ind1, ind2, indp = uv._key2inds((ant1, ant2, pol))
nt.assert_true(np.array_equal(bltind, ind1))
nt.assert_true(np.array_equal(np.array([]), ind2))
nt.assert_true(np.array_equal([0], indp[0]))
# Any of these inputs can also be a tuple of a tuple, so need to be checked twice.
ind1, ind2, indp = uv._key2inds(((ant1, ant2, pol)))
nt.assert_true(np.array_equal(bltind, ind1))
nt.assert_true(np.array_equal(np.array([]), ind2))
nt.assert_true(np.array_equal([0], indp[0]))
# Combo with pol as string
ind1, ind2, indp = uv._key2inds((ant1, ant2, uvutils.polnum2str(pol)))
nt.assert_true(np.array_equal([0], indp[0]))
ind1, ind2, indp = uv._key2inds(((ant1, ant2, uvutils.polnum2str(pol))))
nt.assert_true(np.array_equal([0], indp[0]))
# Check conjugation
ind1, ind2, indp = uv._key2inds((ant2, ant1, pol))
nt.assert_true(np.array_equal(bltind, ind2))
nt.assert_true(np.array_equal(np.array([]), ind1))
nt.assert_true(np.array_equal([0], indp[1]))
# Conjugation with pol as string
ind1, ind2, indp = uv._key2inds((ant2, ant1, uvutils.polnum2str(pol)))
nt.assert_true(np.array_equal(bltind, ind2))
nt.assert_true(np.array_equal(np.array([]), ind1))
nt.assert_true(np.array_equal([0], indp[1]))
nt.assert_true(np.array_equal([], indp[0]))
# Antpair only
ind1, ind2, indp = uv._key2inds((ant1, ant2))
nt.assert_true(np.array_equal(bltind, ind1))
nt.assert_true(np.array_equal(np.array([]), ind2))
nt.assert_true(np.array_equal(np.arange(uv.Npols), indp[0]))
ind1, ind2, indp = uv._key2inds(((ant1, ant2)))
nt.assert_true(np.array_equal(bltind, ind1))
nt.assert_true(np.array_equal(np.array([]), ind2))
nt.assert_true(np.array_equal(np.arange(uv.Npols), indp[0]))
# Baseline number only
ind1, ind2, indp = uv._key2inds(uv.antnums_to_baseline(ant1, ant2))
nt.assert_true(np.array_equal(bltind, ind1))
nt.assert_true(np.array_equal(np.array([]), ind2))
nt.assert_true(np.array_equal(np.arange(uv.Npols), indp[0]))
ind1, ind2, indp = uv._key2inds((uv.antnums_to_baseline(ant1, ant2)))
nt.assert_true(np.array_equal(bltind, ind1))
nt.assert_true(np.array_equal(np.array([]), ind2))
nt.assert_true(np.array_equal(np.arange(uv.Npols), indp[0]))
# Pol number only
ind1, ind2, indp = uv._key2inds(pol)
nt.assert_true(np.array_equal(np.arange(uv.Nblts), ind1))
nt.assert_true(np.array_equal(np.array([]), ind2))
nt.assert_true(np.array_equal(np.array([0]), indp[0]))
ind1, ind2, indp = uv._key2inds((pol))
nt.assert_true(np.array_equal(np.arange(uv.Nblts), ind1))
nt.assert_true(np.array_equal(np.array([]), ind2))
nt.assert_true(np.array_equal(np.array([0]), indp[0]))
# Pol string only
ind1, ind2, indp = uv._key2inds('LL')
nt.assert_true(np.array_equal(np.arange(uv.Nblts), ind1))
nt.assert_true(np.array_equal(np.array([]), ind2))
nt.assert_true(np.array_equal(np.array([1]), indp[0]))
ind1, ind2, indp = uv._key2inds(('LL'))
nt.assert_true(np.array_equal(np.arange(uv.Nblts), ind1))
nt.assert_true(np.array_equal(np.array([]), ind2))
nt.assert_true(np.array_equal(np.array([1]), indp[0]))
# Test invalid keys
nt.assert_raises(KeyError, uv._key2inds, 'I') # pol str not in data
nt.assert_raises(KeyError, uv._key2inds, -8) # pol num not in data
nt.assert_raises(KeyError, uv._key2inds, 6) # bl num not in data
nt.assert_raises(KeyError, uv._key2inds, (1, 1)) # ant pair not in data
nt.assert_raises(KeyError, uv._key2inds, (1, 1, 'rr')) # ant pair not in data
nt.assert_raises(KeyError, uv._key2inds, (0, 1, 'xx')) # pol not in data
# Test autos are handled correctly
uv.ant_2_array[0] = uv.ant_1_array[0]
ind1, ind2, indp = uv._key2inds((ant1, ant1, pol))
nt.assert_true(np.array_equal(ind1, [0]))
nt.assert_true(np.array_equal(ind2, []))
def test_smart_slicing():
# Test function to slice data
uv = UVData()
testfile = os.path.join(
DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
uvtest.checkWarnings(uv.read_uvfits, [testfile],
message='Telescope EVLA is not')
# ind1 reg, ind2 empty, pol reg
ind1 = 10 * np.arange(9)
ind2 = []
indp = [0, 1]
d = uv._smart_slicing(uv.data_array, ind1, ind2, (indp, []))
dcheck = uv.data_array[ind1, :, :, :]
dcheck = np.squeeze(dcheck[:, :, :, indp])
nt.assert_true(np.all(d == dcheck))
nt.assert_false(d.flags.writeable)
# Ensure a view was returned
uv.data_array[ind1[1], 0, 0, indp[0]] = 5.43
nt.assert_equal(d[1, 0, 0], uv.data_array[ind1[1], 0, 0, indp[0]])
# force copy
d = uv._smart_slicing(uv.data_array, ind1, ind2, (indp, []), force_copy=True)
dcheck = uv.data_array[ind1, :, :, :]
dcheck = np.squeeze(dcheck[:, :, :, indp])
nt.assert_true(np.all(d == dcheck))
nt.assert_true(d.flags.writeable)
# Ensure a copy was returned
uv.data_array[ind1[1], 0, 0, indp[0]] = 4.3
nt.assert_not_equal(d[1, 0, 0], uv.data_array[ind1[1], 0, 0, indp[0]])
# ind1 reg, ind2 empty, pol not reg
ind1 = 10 * np.arange(9)
ind2 = []
indp = [0, 1, 3]
d = uv._smart_slicing(uv.data_array, ind1, ind2, (indp, []))
dcheck = uv.data_array[ind1, :, :, :]
dcheck = np.squeeze(dcheck[:, :, :, indp])
nt.assert_true(np.all(d == dcheck))
nt.assert_false(d.flags.writeable)
# Ensure a copy was returned
uv.data_array[ind1[1], 0, 0, indp[0]] = 1.2
nt.assert_not_equal(d[1, 0, 0], uv.data_array[ind1[1], 0, 0, indp[0]])
# ind1 not reg, ind2 empty, pol reg
ind1 = [0, 4, 5]
ind2 = []
indp = [0, 1]
d = uv._smart_slicing(uv.data_array, ind1, ind2, (indp, []))
dcheck = uv.data_array[ind1, :, :, :]
dcheck = np.squeeze(dcheck[:, :, :, indp])
nt.assert_true(np.all(d == dcheck))
nt.assert_false(d.flags.writeable)
# Ensure a copy was returned
uv.data_array[ind1[1], 0, 0, indp[0]] = 8.2
nt.assert_not_equal(d[1, 0, 0], uv.data_array[ind1[1], 0, 0, indp[0]])
# ind1 not reg, ind2 empty, pol not reg
ind1 = [0, 4, 5]
ind2 = []
indp = [0, 1, 3]
d = uv._smart_slicing(uv.data_array, ind1, ind2, (indp, []))
dcheck = uv.data_array[ind1, :, :, :]
dcheck = np.squeeze(dcheck[:, :, :, indp])
nt.assert_true(np.all(d == dcheck))
nt.assert_false(d.flags.writeable)
# Ensure a copy was returned
uv.data_array[ind1[1], 0, 0, indp[0]] = 3.4
nt.assert_not_equal(d[1, 0, 0], uv.data_array[ind1[1], 0, 0, indp[0]])
# ind1 empty, ind2 reg, pol reg
# Note conjugation test ensures the result is a copy, not a view.
ind1 = []
ind2 = 10 * np.arange(9)
indp = [0, 1]
d = uv._smart_slicing(uv.data_array, ind1, ind2, ([], indp))
dcheck = uv.data_array[ind2, :, :, :]
dcheck = np.squeeze(np.conj(dcheck[:, :, :, indp]))
nt.assert_true(np.all(d == dcheck))
# ind1 empty, ind2 reg, pol not reg
ind1 = []
ind2 = 10 * np.arange(9)
indp = [0, 1, 3]
d = uv._smart_slicing(uv.data_array, ind1, ind2, ([], indp))
dcheck = uv.data_array[ind2, :, :, :]
dcheck = np.squeeze(np.conj(dcheck[:, :, :, indp]))
nt.assert_true(np.all(d == dcheck))
# ind1 empty, ind2 not reg, pol reg
ind1 = []
ind2 = [1, 4, 5, 10]
indp = [0, 1]
d = uv._smart_slicing(uv.data_array, ind1, ind2, ([], indp))
dcheck = uv.data_array[ind2, :, :, :]
dcheck = np.squeeze(np.conj(dcheck[:, :, :, indp]))
nt.assert_true(np.all(d == dcheck))
# ind1 empty, ind2 not reg, pol not reg
ind1 = []
ind2 = [1, 4, 5, 10]
indp = [0, 1, 3]
d = uv._smart_slicing(uv.data_array, ind1, ind2, ([], indp))
dcheck = uv.data_array[ind2, :, :, :]
dcheck = np.squeeze(np.conj(dcheck[:, :, :, indp]))
nt.assert_true(np.all(d == dcheck))
# ind1, ind2 not empty, pol reg
ind1 = np.arange(20)
ind2 = np.arange(30, 40)
indp = [0, 1]
d = uv._smart_slicing(uv.data_array, ind1, ind2, (indp, indp))
dcheck = np.append(uv.data_array[ind1, :, :, :],
np.conj(uv.data_array[ind2, :, :, :]), axis=0)
dcheck = np.squeeze(dcheck[:, :, :, indp])
nt.assert_true(np.all(d == dcheck))
# ind1, ind2 not empty, pol not reg
ind1 = np.arange(20)
ind2 = np.arange(30, 40)
indp = [0, 1, 3]
d = uv._smart_slicing(uv.data_array, ind1, ind2, (indp, indp))
dcheck = np.append(uv.data_array[ind1, :, :, :],
np.conj(uv.data_array[ind2, :, :, :]), axis=0)
dcheck = np.squeeze(dcheck[:, :, :, indp])
nt.assert_true(np.all(d == dcheck))
# test single element
ind1 = [45]
ind2 = []
indp = [0, 1]
d = uv._smart_slicing(uv.data_array, ind1, ind2, (indp, []))
dcheck = uv.data_array[ind1, :, :, :]
dcheck = np.squeeze(dcheck[:, :, :, indp], axis=1)
nt.assert_true(np.all(d == dcheck))
# test single element
ind1 = []
ind2 = [45]
indp = [0, 1]
d = uv._smart_slicing(uv.data_array, ind1, ind2, ([], indp))
nt.assert_true(np.all(d == np.conj(dcheck)))
# Full squeeze
ind1 = [45]
ind2 = []
indp = [0, 1]
d = uv._smart_slicing(uv.data_array, ind1, ind2, (indp, []), squeeze='full')
dcheck = uv.data_array[ind1, :, :, :]
dcheck = np.squeeze(dcheck[:, :, :, indp])
nt.assert_true(np.all(d == dcheck))
# Test invalid squeeze
nt.assert_raises(ValueError, uv._smart_slicing, uv.data_array, ind1, ind2,
(indp, []), squeeze='notasqueeze')
def test_get_data():
# Test get_data function for easy access to data
uv = UVData()
testfile = os.path.join(
DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
uvtest.checkWarnings(uv.read_uvfits, [testfile],
message='Telescope EVLA is not')
# Get an antpair/pol combo
ant1 = uv.ant_1_array[0]
ant2 = uv.ant_2_array[0]
pol = uv.polarization_array[0]
bltind = np.where((uv.ant_1_array == ant1) & (uv.ant_2_array == ant2))[0]
dcheck = np.squeeze(uv.data_array[bltind, :, :, 0])
d = uv.get_data(ant1, ant2, pol)
nt.assert_true(np.all(dcheck == d))
# Check conjugation
d = uv.get_data(ant2, ant1, pol)
nt.assert_true(np.all(dcheck == np.conj(d)))
# Check cross pol conjugation
d = uv.get_data(ant2, ant1, uv.polarization_array[2])
d1 = uv.get_data(ant1, ant2, uv.polarization_array[3])
nt.assert_true(np.all(d == np.conj(d1)))
# Antpair only
dcheck = np.squeeze(uv.data_array[bltind, :, :, :])
d = uv.get_data(ant1, ant2)
nt.assert_true(np.all(dcheck == d))
# Pol number only
dcheck = np.squeeze(uv.data_array[:, :, :, 0])
d = uv.get_data(pol)
nt.assert_true(np.all(dcheck == d))
def test_get_flags():
# Test function for easy access to flags
uv = UVData()
testfile = os.path.join(
DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
uvtest.checkWarnings(uv.read_uvfits, [testfile],
message='Telescope EVLA is not')
# Get an antpair/pol combo
ant1 = uv.ant_1_array[0]
ant2 = uv.ant_2_array[0]
pol = uv.polarization_array[0]
bltind = np.where((uv.ant_1_array == ant1) & (uv.ant_2_array == ant2))[0]
dcheck = np.squeeze(uv.flag_array[bltind, :, :, 0])
d = uv.get_flags(ant1, ant2, pol)
nt.assert_true(np.all(dcheck == d))
# Check conjugation
d = uv.get_flags(ant2, ant1, pol)
nt.assert_true(np.all(dcheck == d))
# Antpair only
dcheck = np.squeeze(uv.flag_array[bltind, :, :, :])
d = uv.get_flags(ant1, ant2)
nt.assert_true(np.all(dcheck == d))
# Pol number only
dcheck = np.squeeze(uv.flag_array[:, :, :, 0])
d = uv.get_flags(pol)
nt.assert_true(np.all(dcheck == d))
def test_get_nsamples():
# Test function for easy access to nsample array
uv = UVData()
testfile = os.path.join(
DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
uvtest.checkWarnings(uv.read_uvfits, [testfile],
message='Telescope EVLA is not')
# Get an antpair/pol combo
ant1 = uv.ant_1_array[0]
ant2 = uv.ant_2_array[0]
pol = uv.polarization_array[0]
bltind = np.where((uv.ant_1_array == ant1) & (uv.ant_2_array == ant2))[0]
dcheck = np.squeeze(uv.nsample_array[bltind, :, :, 0])
d = uv.get_nsamples(ant1, ant2, pol)
nt.assert_true(np.all(dcheck == d))
# Check conjugation
d = uv.get_nsamples(ant2, ant1, pol)
nt.assert_true(np.all(dcheck == d))
# Antpair only
dcheck = np.squeeze(uv.nsample_array[bltind, :, :, :])
d = uv.get_nsamples(ant1, ant2)
nt.assert_true(np.all(dcheck == d))
# Pol number only
dcheck = np.squeeze(uv.nsample_array[:, :, :, 0])
d = uv.get_nsamples(pol)
nt.assert_true(np.all(dcheck == d))
def test_get_times():
# Test function for easy access to times, to work in conjunction with get_data
uv = UVData()
testfile = os.path.join(
DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
uvtest.checkWarnings(uv.read_uvfits, [testfile],
message='Telescope EVLA is not')
# Get an antpair/pol combo (pol shouldn't actually effect result)
ant1 = uv.ant_1_array[0]
ant2 = uv.ant_2_array[0]
pol = uv.polarization_array[0]
bltind = np.where((uv.ant_1_array == ant1) & (uv.ant_2_array == ant2))[0]
dcheck = uv.time_array[bltind]
d = uv.get_times(ant1, ant2, pol)
nt.assert_true(np.all(dcheck == d))
# Check conjugation
d = uv.get_times(ant2, ant1, pol)
nt.assert_true(np.all(dcheck == d))
# Antpair only
d = uv.get_times(ant1, ant2)
nt.assert_true(np.all(dcheck == d))
# Pol number only
d = uv.get_times(pol)
nt.assert_true(np.all(d == uv.time_array))
def test_antpairpol_iter():
# Test generator
uv = UVData()
testfile = os.path.join(
DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
uvtest.checkWarnings(uv.read_uvfits, [testfile],
message='Telescope EVLA is not')
pol_dict = {uvutils.polnum2str(uv.polarization_array[i]): i for i in range(uv.Npols)}
keys = []
pols = set()
bls = set()
for key, d in uv.antpairpol_iter():
keys += key
bl = uv.antnums_to_baseline(key[0], key[1])
blind = np.where(uv.baseline_array == bl)[0]
bls.add(bl)
pols.add(key[2])
dcheck = np.squeeze(uv.data_array[blind, :, :, pol_dict[key[2]]])
nt.assert_true(np.all(dcheck == d))
nt.assert_equal(len(bls), len(uv.get_baseline_nums()))
nt.assert_equal(len(pols), uv.Npols)
def test_get_ants():
# Test function to get unique antennas in data
uv = UVData()
testfile = os.path.join(
DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
uvtest.checkWarnings(uv.read_uvfits, [testfile],
message='Telescope EVLA is not')
ants = uv.get_ants()
for ant in ants:
nt.assert_true((ant in uv.ant_1_array) or (ant in uv.ant_2_array))
for ant in uv.ant_1_array:
nt.assert_true(ant in ants)
for ant in uv.ant_2_array:
nt.assert_true(ant in ants)
def test_get_ENU_antpos():
uvd = UVData()
uvd.read_miriad(os.path.join(DATA_PATH, "zen.2457698.40355.xx.HH.uvcA"))
# no center, no pick data ants
antpos, ants = uvd.get_ENU_antpos(center=False, pick_data_ants=False)
nt.assert_equal(len(ants), 113)
nt.assert_almost_equal(antpos[0, 0], 19.340211050751535)
nt.assert_equal(ants[0], 0)
# center
antpos, ants = uvd.get_ENU_antpos(center=True, pick_data_ants=False)
nt.assert_almost_equal(antpos[0, 0], 22.472442651767714)
# pick data ants
antpos, ants = uvd.get_ENU_antpos(center=True, pick_data_ants=True)
nt.assert_equal(ants[0], 9)
nt.assert_almost_equal(antpos[0, 0], -0.0026981323386223721)
def test_get_pols():
# Test function to get unique polarizations in string format
uv = UVData()
testfile = os.path.join(
DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
uvtest.checkWarnings(uv.read_uvfits, [testfile],
message='Telescope EVLA is not')
pols = uv.get_pols()
pols_data = ['RR', 'LL', 'LR', 'RL']
nt.assert_count_equal(pols, pols_data)
def test_get_feedpols():
# Test function to get unique antenna feed polarizations in data. String format.
uv = UVData()
testfile = os.path.join(
DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
uvtest.checkWarnings(uv.read_uvfits, [testfile],
message='Telescope EVLA is not')
pols = uv.get_feedpols()
pols_data = ['R', 'L']
nt.assert_count_equal(pols, pols_data)
# Test break when pseudo-Stokes visibilities are present
uv.polarization_array[0] = 1 # pseudo-Stokes I
nt.assert_raises(ValueError, uv.get_feedpols)
def test_parse_ants():
# Test function to get correct antenna pairs and polarizations
uv = UVData()
testfile = os.path.join(
DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
uvtest.checkWarnings(uv.read_uvfits, [testfile], message='Telescope EVLA is not')
# All baselines
ant_str = 'all'
ant_pairs_nums, polarizations = uv.parse_ants(ant_str)
nt.assert_is_instance(ant_pairs_nums, type(None))
nt.assert_is_instance(polarizations, type(None))
# Auto correlations
ant_str = 'auto'
ant_pairs_nums, polarizations = uv.parse_ants(ant_str)
nt.assert_count_equal(ant_pairs_nums, [])
nt.assert_is_instance(polarizations, type(None))
# Cross correlations
ant_str = 'cross'
ant_pairs_nums, polarizations = uv.parse_ants(ant_str)
nt.assert_count_equal(uv.get_antpairs(), ant_pairs_nums)
nt.assert_is_instance(polarizations, type(None))
# pseudo-Stokes params
ant_str = 'pI,pq,pU,pv'
ant_pairs_nums, polarizations = uv.parse_ants(ant_str)
pols_expected = [4, 3, 2, 1]
nt.assert_is_instance(ant_pairs_nums, type(None))
nt.assert_count_equal(polarizations, pols_expected)
# Unparsible string
ant_str = 'none'
nt.assert_raises(ValueError, uv.parse_ants, ant_str)
# Single antenna number
ant_str = '0'
ant_pairs_nums, polarizations = uv.parse_ants(ant_str)
ant_pairs_expected = [(0, 1), (0, 2), (0, 3), (0, 6), (0, 7), (0, 8),
(0, 11), (0, 14), (0, 18), (0, 19), (0, 20),
(0, 21), (0, 22), (0, 23), (0, 24), (0, 26),
(0, 27)]
nt.assert_count_equal(ant_pairs_nums, ant_pairs_expected)
nt.assert_is_instance(polarizations, type(None))
# Single antenna number not in the data
ant_str = '10'
ant_pairs_nums, polarizations = uvtest.checkWarnings(uv.parse_ants,
[ant_str], {},
nwarnings=1,
message='Warning: Antenna')
nt.assert_is_instance(ant_pairs_nums, type(None))
nt.assert_is_instance(polarizations, type(None))
# Multiple antenna numbers as list
ant_str = '22,26'
ant_pairs_nums, polarizations = uv.parse_ants(ant_str)
ant_pairs_expected = [(0, 22), (0, 26), (1, 22), (1, 26), (2, 22), (2, 26),
(3, 22), (3, 26), (6, 22), (6, 26), (7, 22),
(7, 26), (8, 22), (8, 26), (11, 22), (11, 26),
(14, 22), (14, 26), (18, 22), (18, 26),
(19, 22), (19, 26), (20, 22), (20, 26),
(21, 22), (21, 26), (22, 23), (22, 24),
(22, 26), (22, 27), (23, 26), (24, 26),
(26, 27)]
nt.assert_count_equal(ant_pairs_nums, ant_pairs_expected)
nt.assert_is_instance(polarizations, type(None))
# Single baseline
ant_str = '1_3'
ant_pairs_nums, polarizations = uv.parse_ants(ant_str)
ant_pairs_expected = [(1, 3)]
nt.assert_count_equal(ant_pairs_nums, ant_pairs_expected)
nt.assert_is_instance(polarizations, type(None))
# Single baseline with polarization
ant_str = '1l_3r'
ant_pairs_nums, polarizations = uv.parse_ants(ant_str)
ant_pairs_expected = [(1, 3)]
pols_expected = [-4]
nt.assert_count_equal(ant_pairs_nums, ant_pairs_expected)
nt.assert_count_equal(polarizations, pols_expected)
# Single baseline with single polarization in first entry
ant_str = '1l_3,2x_3'
ant_pairs_nums, polarizations = uvtest.checkWarnings(uv.parse_ants,
[ant_str], {},
nwarnings=1,
message='Warning: Polarization')
ant_pairs_expected = [(1, 3), (2, 3)]
pols_expected = [-2, -4]
nt.assert_count_equal(ant_pairs_nums, ant_pairs_expected)
nt.assert_count_equal(polarizations, pols_expected)
# Single baseline with single polarization in last entry
ant_str = '1_3l,2_3x'
ant_pairs_nums, polarizations = uvtest.checkWarnings(uv.parse_ants,
[ant_str], {},
nwarnings=1,
message='Warning: Polarization')
ant_pairs_expected = [(1, 3), (2, 3)]
pols_expected = [-2, -3]
nt.assert_count_equal(ant_pairs_nums, ant_pairs_expected)
nt.assert_count_equal(polarizations, pols_expected)
# Multiple baselines as list
ant_str = '1_2,1_3,1_11'
ant_pairs_nums, polarizations = uv.parse_ants(ant_str)
ant_pairs_expected = [(1, 2), (1, 3), (1, 11)]
nt.assert_count_equal(ant_pairs_nums, ant_pairs_expected)
nt.assert_is_instance(polarizations, type(None))
# Multiples baselines with polarizations as list
ant_str = '1r_2l,1l_3l,1r_11r'
ant_pairs_nums, polarizations = uv.parse_ants(ant_str)
ant_pairs_expected = [(1, 2), (1, 3), (1, 11)]
pols_expected = [-1, -2, -3]
nt.assert_count_equal(ant_pairs_nums, ant_pairs_expected)
nt.assert_count_equal(polarizations, pols_expected)
# Specific baselines with parenthesis
ant_str = '(1,3)_11'
ant_pairs_nums, polarizations = uv.parse_ants(ant_str)
ant_pairs_expected = [(1, 11), (3, 11)]
nt.assert_count_equal(ant_pairs_nums, ant_pairs_expected)
nt.assert_is_instance(polarizations, type(None))
# Specific baselines with parenthesis
ant_str = '1_(3,11)'
ant_pairs_nums, polarizations = uv.parse_ants(ant_str)
ant_pairs_expected = [(1, 3), (1, 11)]
nt.assert_count_equal(ant_pairs_nums, ant_pairs_expected)
nt.assert_is_instance(polarizations, type(None))
# Antenna numbers with polarizations
ant_str = '(1l,2r)_(3l,6r)'
ant_pairs_nums, polarizations = uv.parse_ants(ant_str)
ant_pairs_expected = [(1, 3), (1, 6), (2, 3), (2, 6)]
pols_expected = [-1, -2, -3, -4]
nt.assert_count_equal(ant_pairs_nums, ant_pairs_expected)
nt.assert_count_equal(polarizations, pols_expected)
# Antenna numbers with - for avoidance
ant_str = '1_(-3,11)'
ant_pairs_nums, polarizations = uv.parse_ants(ant_str)
ant_pairs_expected = [(1, 11)]
nt.assert_count_equal(ant_pairs_nums, ant_pairs_expected)
nt.assert_is_instance(polarizations, type(None))
# Remove specific antenna number
ant_str = '1,-3'
ant_pairs_nums, polarizations = uv.parse_ants(ant_str)
ant_pairs_expected = [(0, 1), (1, 2), (1, 6), (1, 7), (1, 8), (1, 11),
(1, 14), (1, 18), (1, 19), (1, 20), (1, 21),
(1, 22), (1, 23), (1, 24), (1, 26), (1, 27)]
nt.assert_count_equal(ant_pairs_nums, ant_pairs_expected)
nt.assert_is_instance(polarizations, type(None))
# Remove specific baseline (same expected antenna pairs as above example)
ant_str = '1,-1_3'
ant_pairs_nums, polarizations = uv.parse_ants(ant_str)
nt.assert_count_equal(ant_pairs_nums, ant_pairs_expected)
nt.assert_is_instance(polarizations, type(None))
# Antenna numbers with polarizations and - for avoidance
ant_str = '1l_(-3r,11l)'
ant_pairs_nums, polarizations = uv.parse_ants(ant_str)
ant_pairs_expected = [(1, 11)]
pols_expected = [-2]
nt.assert_count_equal(ant_pairs_nums, ant_pairs_expected)
nt.assert_count_equal(polarizations, pols_expected)
# Antenna numbers and pseudo-Stokes parameters
ant_str = '(1l,2r)_(3l,6r),pI,pq'
ant_pairs_nums, polarizations = uv.parse_ants(ant_str)
ant_pairs_expected = [(1, 3), (1, 6), (2, 3), (2, 6)]
pols_expected = [2, 1, -1, -2, -3, -4]
nt.assert_count_equal(ant_pairs_nums, ant_pairs_expected)
nt.assert_count_equal(polarizations, pols_expected)
# Test ant_str='auto' on file with auto correlations
uv = UVData()
testfile = os.path.join(DATA_PATH, 'hera_testfile')
uvtest.checkWarnings(uv.read_miriad, [testfile], nwarnings=1,
message='Altitude is not')
ant_str = 'auto'
ant_pairs_nums, polarizations = uv.parse_ants(ant_str)
ant_pairs_expected = [(9, 9), (10, 10), (20, 20)]
nt.assert_count_equal(ant_pairs_nums, ant_pairs_expected)
nt.assert_is_instance(polarizations, type(None))
# Test cross correlation extraction on data with auto + cross
ant_str = 'cross'
ant_pairs_nums, polarizations = uv.parse_ants(ant_str)
ant_pairs_expected = [(9, 10), (9, 20), (10, 20)]
nt.assert_count_equal(ant_pairs_nums, ant_pairs_expected)
nt.assert_is_instance(polarizations, type(None))
# Remove only polarization of single baseline
ant_str = 'all,-9x_10x'
ant_pairs_nums, polarizations = uv.parse_ants(ant_str)
ant_pairs_expected = [(9, 9), (9, 20), (10, 10), (10, 20), (20, 20)]
nt.assert_count_equal(ant_pairs_nums, ant_pairs_expected)
nt.assert_is_instance(polarizations, type(None))
# Test appending all to beginning of strings that start with -
ant_str = '-9'
ant_pairs_nums, polarizations = uv.parse_ants(ant_str)
ant_pairs_expected = [(10, 10), (10, 20), (20, 20)]
nt.assert_count_equal(ant_pairs_nums, ant_pairs_expected)
nt.assert_is_instance(polarizations, type(None))
def test_select_with_ant_str():
# Test select function with ant_str argument
uv = UVData()
testfile = os.path.join(
DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
uvtest.checkWarnings(uv.read_uvfits, [testfile], message='Telescope EVLA is not')
inplace = False
# Check error thrown if ant_str passed with antenna_nums,
# antenna_names, ant_pairs_nums, or polarizations
nt.assert_raises(ValueError, uv.select,
ant_str='',
antenna_nums=[],
inplace=inplace)
nt.assert_raises(ValueError, uv.select,
ant_str='',
antenna_nums=[],
inplace=inplace)
nt.assert_raises(ValueError, uv.select,
ant_str='',
antenna_nums=[],
inplace=inplace)
nt.assert_raises(ValueError, uv.select,
ant_str='',
antenna_nums=[],
inplace=inplace)
# All baselines
ant_str = 'all'
uv2 = uv.select(ant_str=ant_str, inplace=inplace)
nt.assert_count_equal(uv2.get_antpairs(), uv.get_antpairs())
nt.assert_count_equal(uv2.get_pols(), uv.get_pols())
# Auto correlations
ant_str = 'auto'
nt.assert_raises(ValueError, uv.select, ant_str=ant_str, inplace=inplace)
# No auto correlations in this data
# Cross correlations
ant_str = 'cross'
uv2 = uv.select(ant_str=ant_str, inplace=inplace)
nt.assert_count_equal(uv2.get_antpairs(), uv.get_antpairs())
nt.assert_count_equal(uv2.get_pols(), uv.get_pols())
# All baselines in data are cross correlations
# pseudo-Stokes params
ant_str = 'pI,pq,pU,pv'
nt.assert_raises(ValueError, uv.select, ant_str=ant_str, inplace=inplace)
# Unparsible string
ant_str = 'none'
nt.assert_raises(ValueError, uv.select, ant_str=ant_str, inplace=inplace)
# Single antenna number
ant_str = '0'
ant_pairs = [(0, 1), (0, 2), (0, 3), (0, 6), (0, 7), (0, 8), (0, 11),
(0, 14), (0, 18), (0, 19), (0, 20), (0, 21), (0, 22),
(0, 23), (0, 24), (0, 26), (0, 27)]
uv2 = uv.select(ant_str=ant_str, inplace=inplace)
nt.assert_count_equal(uv2.get_antpairs(), ant_pairs)
nt.assert_count_equal(uv2.get_pols(), uv.get_pols())
# Single antenna number not present in data
ant_str = '10'
uv2 = uvtest.checkWarnings(uv.select, [], {'ant_str': ant_str, 'inplace': inplace},
nwarnings=1, message='Warning: Antenna')
# Multiple antenna numbers as list
ant_str = '22,26'
ant_pairs = [(0, 22), (0, 26), (1, 22), (1, 26), (2, 22), (2, 26),
(3, 22), (3, 26), (6, 22), (6, 26), (7, 22),
(7, 26), (8, 22), (8, 26), (11, 22), (11, 26),
(14, 22), (14, 26), (18, 22), (18, 26), (19, 22),
(19, 26), (20, 22), (20, 26), (21, 22), (21, 26),
(22, 23), (22, 24), (22, 26), (22, 27), (23, 26),
(24, 26), (26, 27)]
uv2 = uv.select(ant_str=ant_str, inplace=inplace)
nt.assert_count_equal(uv2.get_antpairs(), ant_pairs)
nt.assert_count_equal(uv2.get_pols(), uv.get_pols())
# Single baseline
ant_str = '1_3'
ant_pairs = [(1, 3)]
uv2 = uv.select(ant_str=ant_str, inplace=inplace)
nt.assert_count_equal(uv2.get_antpairs(), ant_pairs)
nt.assert_count_equal(uv2.get_pols(), uv.get_pols())
# Single baseline with polarization
ant_str = '1l_3r'
ant_pairs = [(1, 3)]
pols = ['LR']
uv2 = uv.select(ant_str=ant_str, inplace=inplace)
nt.assert_count_equal(uv2.get_antpairs(), ant_pairs)
nt.assert_count_equal(uv2.get_pols(), pols)
# Single baseline with single polarization in first entry
ant_str = '1l_3,2x_3'
# x,y pols not present in data
uv2 = uvtest.checkWarnings(uv.select, [],
{'ant_str': ant_str, 'inplace': inplace},
nwarnings=1, message='Warning: Polarization')
# with polarizations in data
ant_str = '1l_3,2_3'
ant_pairs = [(1, 3), (2, 3)]
pols = ['LL', 'LR']
uv2 = uv.select(ant_str=ant_str, inplace=inplace)
nt.assert_count_equal(uv2.get_antpairs(), ant_pairs)
nt.assert_count_equal(uv2.get_pols(), pols)
# Single baseline with single polarization in last entry
ant_str = '1_3l,2_3x'
# x,y pols not present in data
uv2 = uvtest.checkWarnings(uv.select, [],
{'ant_str': ant_str, 'inplace': inplace},
nwarnings=1, message='Warning: Polarization')
# with polarizations in data
ant_str = '1_3l,2_3'
ant_pairs = [(1, 3), (2, 3)]
pols = ['LL', 'RL']
uv2 = uv.select(ant_str=ant_str, inplace=inplace)
nt.assert_count_equal(uv2.get_antpairs(), ant_pairs)
nt.assert_count_equal(uv2.get_pols(), pols)
# Multiple baselines as list
ant_str = '1_2,1_3,1_10'
# Antenna number 10 not in data
uv2 = uvtest.checkWarnings(uv.select, [],
{'ant_str': ant_str, 'inplace': inplace},
nwarnings=1, message='Warning: Antenna')
ant_pairs = [(1, 2), (1, 3)]
nt.assert_count_equal(uv2.get_antpairs(), ant_pairs)
nt.assert_count_equal(uv2.get_pols(), uv.get_pols())
# Multiples baselines with polarizations as list
ant_str = '1r_2l,1l_3l,1r_11r'
ant_pairs = [(1, 2), (1, 3), (1, 11)]
pols = ['RR', 'LL', 'RL']
uv2 = uv.select(ant_str=ant_str, inplace=inplace)
nt.assert_count_equal(uv2.get_antpairs(), ant_pairs)
nt.assert_count_equal(uv2.get_pols(), pols)
# Specific baselines with parenthesis
ant_str = '(1,3)_11'
ant_pairs = [(1, 11), (3, 11)]
uv2 = uv.select(ant_str=ant_str, inplace=inplace)
nt.assert_count_equal(uv2.get_antpairs(), ant_pairs)
nt.assert_count_equal(uv2.get_pols(), uv.get_pols())
# Specific baselines with parenthesis
ant_str = '1_(3,11)'
ant_pairs = [(1, 3), (1, 11)]
uv2 = uv.select(ant_str=ant_str, inplace=inplace)
nt.assert_count_equal(uv2.get_antpairs(), ant_pairs)
nt.assert_count_equal(uv2.get_pols(), uv.get_pols())
# Antenna numbers with polarizations
ant_str = '(1l,2r)_(3l,6r)'
ant_pairs = [(1, 3), (1, 6), (2, 3), (2, 6)]
pols = ['RR', 'LL', 'RL', 'LR']
uv2 = uv.select(ant_str=ant_str, inplace=inplace)
nt.assert_count_equal(uv2.get_antpairs(), ant_pairs)
nt.assert_count_equal(uv2.get_pols(), pols)
# Antenna numbers with - for avoidance
ant_str = '1_(-3,11)'
ant_pairs = [(1, 11)]
uv2 = uv.select(ant_str=ant_str, inplace=inplace)
nt.assert_count_equal(uv2.get_antpairs(), ant_pairs)
nt.assert_count_equal(uv2.get_pols(), uv.get_pols())
ant_str = '(-1,3)_11'
ant_pairs = [(3, 11)]
uv2 = uv.select(ant_str=ant_str, inplace=inplace)
nt.assert_count_equal(uv2.get_antpairs(), ant_pairs)
nt.assert_count_equal(uv2.get_pols(), uv.get_pols())
# Remove specific antenna number
ant_str = '1,-3'
ant_pairs = [(0, 1), (1, 2), (1, 6), (1, 7), (1, 8), (1, 11),
(1, 14), (1, 18), (1, 19), (1, 20), (1, 21),
(1, 22), (1, 23), (1, 24), (1, 26), (1, 27)]
uv2 = uv.select(ant_str=ant_str, inplace=inplace)
nt.assert_count_equal(uv2.get_antpairs(), ant_pairs)
nt.assert_count_equal(uv2.get_pols(), uv.get_pols())
# Remove specific baseline
ant_str = '1,-1_3'
ant_pairs = [(0, 1), (1, 2), (1, 6), (1, 7), (1, 8), (1, 11),
(1, 14), (1, 18), (1, 19), (1, 20), (1, 21),
(1, 22), (1, 23), (1, 24), (1, 26), (1, 27)]
uv2 = uv.select(ant_str=ant_str, inplace=inplace)
nt.assert_count_equal(uv2.get_antpairs(), ant_pairs)
nt.assert_count_equal(uv2.get_pols(), uv.get_pols())
# Antenna numbers with polarizations and - for avoidance
ant_str = '1l_(-3r,11l)'
ant_pairs = [(1, 11)]
pols = ['LL']
uv2 = uv.select(ant_str=ant_str, inplace=inplace)
nt.assert_count_equal(uv2.get_antpairs(), ant_pairs)
nt.assert_count_equal(uv2.get_pols(), pols)
# Test pseudo-Stokes params with select
ant_str = 'pi,pQ'
pols = ['pQ', 'pI']
uv.polarization_array = np.array([4, 3, 2, 1])
uv2 = uv.select(ant_str=ant_str, inplace=inplace)
nt.assert_count_equal(uv2.get_antpairs(), uv.get_antpairs())
nt.assert_count_equal(uv2.get_pols(), pols)
# Test ant_str = 'auto' on file with auto correlations
uv = UVData()
testfile = os.path.join(DATA_PATH, 'hera_testfile')
uvtest.checkWarnings(uv.read_miriad, [testfile], nwarnings=1,
message='Altitude is not')
ant_str = 'auto'
ant_pairs = [(9, 9), (10, 10), (20, 20)]
uv2 = uv.select(ant_str=ant_str, inplace=inplace)
nt.assert_count_equal(uv2.get_antpairs(), ant_pairs)
nt.assert_count_equal(uv2.get_pols(), uv.get_pols())
# Test cross correlation extraction on data with auto + cross
ant_str = 'cross'
ant_pairs = [(9, 10), (9, 20), (10, 20)]
uv2 = uv.select(ant_str=ant_str, inplace=inplace)
nt.assert_count_equal(uv2.get_antpairs(), ant_pairs)
nt.assert_count_equal(uv2.get_pols(), uv.get_pols())
# Remove only polarization of single baseline
ant_str = 'all,-9x_10x'
ant_pairs = [(9, 9), (9, 20), (10, 10), (10, 20), (20, 20)]
uv2 = uv.select(ant_str=ant_str, inplace=inplace)
nt.assert_count_equal(uv2.get_antpairs(), ant_pairs)
nt.assert_count_equal(uv2.get_pols(), uv.get_pols())
# Test appending all to beginning of strings that start with -
ant_str = '-9'
ant_pairs = [(10, 10), (10, 20), (20, 20)]
uv2 = uv.select(ant_str=ant_str, inplace=inplace)
nt.assert_count_equal(uv2.get_antpairs(), ant_pairs)
nt.assert_count_equal(uv2.get_pols(), uv.get_pols())
def test_set_uvws_from_antenna_pos():
# Test set_uvws_from_antenna_positions function with phased data
uv_object = UVData()
testfile = os.path.join(
DATA_PATH, '1133866760.uvfits')
uv_object.read_uvfits(testfile)
orig_uvw_array = np.copy(uv_object.uvw_array)
nt.assert_raises(ValueError, uv_object.set_uvws_from_antenna_positions)
uvtest.checkWarnings(
nt.assert_raises,
[ValueError, uv_object.set_uvws_from_antenna_positions, True, 'xyz'],
message='Warning: Data will be unphased'
)
uvtest.checkWarnings(
nt.assert_raises,
[ValueError, uv_object.set_uvws_from_antenna_positions, True, 'gcrs', 'xyz'],
message='Warning: Data will be unphased'
)
uvtest.checkWarnings(
uv_object.set_uvws_from_antenna_positions,
[True, 'gcrs', 'gcrs'],
message='Warning: Data will be unphased'
)
max_diff = np.amax(np.absolute(np.subtract(orig_uvw_array,
uv_object.uvw_array)))
nt.assert_almost_equal(max_diff, 0., 2)