https://github.com/RadioAstronomySoftwareGroup/pyuvdata
Tip revision: 0d9d742e366731f8d46229dd81ae4032938a3f09 authored by Bryna Hazelton on 08 November 2017, 18:24:28 UTC
increase version number
increase version number
Tip revision: 0d9d742
test_uvbeam.py
"""Tests for uvbeam object."""
import nose.tools as nt
import os
import numpy as np
import copy
import healpy as hp
from pyuvdata import UVBeam
import pyuvdata.tests as uvtest
import pyuvdata.utils as uvutils
import pyuvdata.version as uvversion
from pyuvdata.data import DATA_PATH
filenames = ['HERA_NicCST_150MHz.txt', 'HERA_NicCST_123MHz.txt']
cst_files = [os.path.join(DATA_PATH, f) for f in filenames]
def fill_dummy_beam(beam_obj, beam_type, pixel_coordinate_system):
beam_obj.set_simple()
beam_obj.telescope_name = 'testscope'
beam_obj.feed_name = 'testfeed'
beam_obj.feed_version = '0.1'
beam_obj.model_name = 'testmodel'
beam_obj.model_version = '1.0'
beam_obj.history = 'random data for test'
pyuvdata_version_str = (' Read/written with pyuvdata version: ' +
uvversion.version + '.')
if uvversion.git_hash is not '':
pyuvdata_version_str += (' Git origin: ' + uvversion.git_origin +
'. Git hash: ' + uvversion.git_hash +
'. Git branch: ' + uvversion.git_branch +
'. Git description: ' + uvversion.git_description + '.')
beam_obj.history += pyuvdata_version_str
beam_obj.pixel_coordinate_system = pixel_coordinate_system
if pixel_coordinate_system == 'healpix':
beam_obj.nside = 512
beam_obj.ordering = 'ring'
beam_obj.pixel_array = np.arange(0, 6 * (beam_obj.nside) ^ 2)
beam_obj.Npixels = len(beam_obj.pixel_array)
elif pixel_coordinate_system == 'az_za':
beam_obj.axis1_array = np.radians(np.arange(-180.0, 180.0, 5.0))
beam_obj.Naxes1 = len(beam_obj.axis1_array)
beam_obj.axis2_array = np.radians(np.arange(0, 90.0, 5.0))
beam_obj.Naxes2 = len(beam_obj.axis2_array)
beam_obj.freq_array = np.arange(150e6, 160e6, 1e5)
beam_obj.freq_array = beam_obj.freq_array[np.newaxis, :]
beam_obj.Nfreqs = beam_obj.freq_array.shape[1]
beam_obj.spw_array = np.array([0])
beam_obj.Nspws = len(beam_obj.spw_array)
beam_obj.data_normalization = 'peak'
beam_obj.bandpass_array = np.zeros((beam_obj.Nspws, beam_obj.Nfreqs)) + 1.
if beam_type == 'power':
beam_obj.set_power()
beam_obj.polarization_array = np.array([-5, -6, -7, -8])
beam_obj.Npols = len(beam_obj.polarization_array)
beam_obj.Naxes_vec = 1
if pixel_coordinate_system == 'healpix':
data_size_tuple = (beam_obj.Naxes_vec, beam_obj.Nspws, beam_obj.Npols,
beam_obj.Nfreqs, beam_obj.Npixels)
else:
data_size_tuple = (beam_obj.Naxes_vec, beam_obj.Nspws, beam_obj.Npols,
beam_obj.Nfreqs, beam_obj.Naxes2, beam_obj.Naxes1)
beam_obj.data_array = np.square(np.random.normal(0.0, 0.2, size=data_size_tuple))
else:
beam_obj.set_efield()
beam_obj.feed_array = np.array(['x', 'y'])
beam_obj.Nfeeds = len(beam_obj.feed_array)
beam_obj.Naxes_vec = 2
if pixel_coordinate_system == 'healpix':
beam_obj.basis_vector_array = np.zeros((beam_obj.Naxes_vec, 2,
beam_obj.Npixels))
beam_obj.basis_vector_array[0, 0, :] = 1.0
beam_obj.basis_vector_array[1, 1, :] = 1.0
data_size_tuple = (beam_obj.Naxes_vec, beam_obj.Nspws, beam_obj.Nfeeds,
beam_obj.Nfreqs, beam_obj.Npixels)
else:
beam_obj.basis_vector_array = np.zeros((beam_obj.Naxes_vec, 2,
beam_obj.Naxes2, beam_obj.Naxes1))
beam_obj.basis_vector_array[0, 0, :, :] = 1.0
beam_obj.basis_vector_array[1, 1, :, :] = 1.0
data_size_tuple = (beam_obj.Naxes_vec, beam_obj.Nspws, beam_obj.Nfeeds,
beam_obj.Nfreqs, beam_obj.Naxes2, beam_obj.Naxes1)
beam_obj.data_array = (np.random.normal(0.0, 0.2, size=data_size_tuple) +
1j * np.random.normal(0.0, 0.2, size=data_size_tuple))
# add optional parameters for testing purposes
beam_obj.extra_keywords = {'KEY1': 'test_keyword'}
beam_obj.reference_input_impedance = 340.
beam_obj.reference_output_impedance = 50.
beam_obj.receiver_temperature_array = np.random.normal(50.0, 5, size=(beam_obj.Nspws, beam_obj.Nfreqs))
beam_obj.loss_array = np.random.normal(50.0, 5, size=(beam_obj.Nspws, beam_obj.Nfreqs))
beam_obj.mismatch_array = np.random.normal(0.0, 1.0, size=(beam_obj.Nspws, beam_obj.Nfreqs))
beam_obj.s_parameters = np.random.normal(0.0, 0.3, size=(4, beam_obj.Nspws, beam_obj.Nfreqs))
return beam_obj
class TestUVBeamInit(object):
def setUp(self):
"""Setup for basic parameter, property and iterator tests."""
self.required_parameters = ['_beam_type', '_Nfreqs', '_Naxes_vec', '_Nspws',
'_pixel_coordinate_system',
'_freq_array', '_spw_array',
'_data_normalization',
'_data_array', '_bandpass_array',
'_telescope_name', '_feed_name',
'_feed_version', '_model_name',
'_model_version', '_history',
'_antenna_type']
self.required_properties = ['beam_type', 'Nfreqs', 'Naxes_vec', 'Nspws',
'pixel_coordinate_system',
'freq_array', 'spw_array',
'data_normalization',
'data_array', 'bandpass_array',
'telescope_name', 'feed_name',
'feed_version', 'model_name',
'model_version', 'history',
'antenna_type']
self.extra_parameters = ['_Naxes1', '_Naxes2', '_Npixels', '_Nfeeds', '_Npols',
'_axis1_array', '_axis2_array', '_nside', '_ordering',
'_pixel_array', '_feed_array', '_polarization_array',
'_basis_vector_array',
'_extra_keywords', '_Nelements',
'_element_coordinate_system',
'_element_location_array', '_delay_array',
'_gain_array', '_coupling_matrix',
'_reference_input_impedance', '_reference_output_impedance',
'_receiver_temperature_array',
'_loss_array', '_mismatch_array',
'_s_parameters']
self.extra_properties = ['Naxes1', 'Naxes2', 'Npixels', 'Nfeeds', 'Npols',
'axis1_array', 'axis2_array', 'nside', 'ordering',
'pixel_array', 'feed_array', 'polarization_array',
'basis_vector_array', 'extra_keywords', 'Nelements',
'element_coordinate_system',
'element_location_array', 'delay_array',
'gain_array', 'coupling_matrix',
'reference_input_impedance', 'reference_output_impedance',
'receiver_temperature_array',
'loss_array', 'mismatch_array',
's_parameters']
self.other_properties = ['pyuvdata_version_str']
self.beam_obj = UVBeam()
def teardown(self):
"""Test teardown: delete object."""
del(self.beam_obj)
def test_parameter_iter(self):
"Test expected parameters."
all = []
for prop in self.beam_obj:
all.append(prop)
for a in self.required_parameters + self.extra_parameters:
nt.assert_true(a in all, msg='expected attribute ' + a +
' not returned in object iterator')
def test_required_parameter_iter(self):
"Test expected required parameters."
required = []
for prop in self.beam_obj.required():
required.append(prop)
for a in self.required_parameters:
nt.assert_true(a in required, msg='expected attribute ' + a +
' not returned in required iterator')
def test_extra_parameter_iter(self):
"Test expected optional parameters."
extra = []
for prop in self.beam_obj.extra():
extra.append(prop)
for a in self.extra_parameters:
nt.assert_true(a in extra, msg='expected attribute ' + a +
' not returned in extra iterator')
def test_unexpected_parameters(self):
"Test for extra parameters."
expected_parameters = self.required_parameters + self.extra_parameters
attributes = [i for i in self.beam_obj.__dict__.keys() if i[0] == '_']
for a in attributes:
nt.assert_true(a in expected_parameters,
msg='unexpected parameter ' + a + ' found in UVData')
def test_unexpected_attributes(self):
"Test for extra attributes."
expected_attributes = self.required_properties + \
self.extra_properties + self.other_properties
attributes = [i for i in self.beam_obj.__dict__.keys() if i[0] != '_']
for a in attributes:
nt.assert_true(a in expected_attributes,
msg='unexpected attribute ' + a + ' found in UVData')
def test_properties(self):
"Test that properties can be get and set properly."
prop_dict = dict(zip(self.required_properties + self.extra_properties,
self.required_parameters + self.extra_parameters))
for k, v in prop_dict.iteritems():
rand_num = np.random.rand()
setattr(self.beam_obj, k, rand_num)
this_param = getattr(self.beam_obj, v)
try:
nt.assert_equal(rand_num, this_param.value)
except(AssertionError):
print('setting {prop_name} to a random number failed'.format(prop_name=k))
raise(AssertionError)
def test_errors():
beam_obj = UVBeam()
nt.assert_raises(ValueError, beam_obj._convert_to_filetype, 'foo')
def test_az_za_to_healpix():
power_beam = UVBeam()
power_beam.read_cst_beam(cst_files, beam_type='power', frequency=[150e6, 123e6],
telescope_name='TEST', feed_name='bob',
feed_version='0.1', feed_pol=['x'],
model_name='E-field pattern - Rigging height 4.9m',
model_version='1.0')
n_max_pix = power_beam.Naxes1 * power_beam.Naxes2
power_beam.az_za_to_healpix()
nt.assert_true(power_beam.Npixels <= n_max_pix)
power_beam.read_cst_beam(cst_files[0], beam_type='power', frequency=[150e6],
telescope_name='TEST', rotate_pol=False,
feed_name='bob', feed_version='0.1', feed_pol=['x'],
model_name='E-field pattern - Rigging height 4.9m',
model_version='1.0')
power_beam.select(axis2_inds=np.where(power_beam.axis2_array <= np.pi / 2.)[0])
power_beam.az_za_to_healpix()
npix = hp.nside2npix(power_beam.nside)
nt.assert_true(power_beam.Npixels <= npix * 0.55)
def test_select_pixels():
power_beam = UVBeam()
power_beam = fill_dummy_beam(power_beam, 'power', 'healpix')
old_history = power_beam.history
pixels_to_keep = np.arange(31, 184)
power_beam2 = power_beam.select(pixels=pixels_to_keep, inplace=False)
nt.assert_equal(len(pixels_to_keep), power_beam2.Npixels)
for pi in pixels_to_keep:
nt.assert_true(pi in power_beam2.pixel_array)
for pi in np.unique(power_beam2.pixel_array):
nt.assert_true(pi in pixels_to_keep)
nt.assert_true(uvutils.check_histories(old_history + ' Downselected to '
'specific healpix pixels using pyuvdata.',
power_beam2.history))
write_file_beamfits = os.path.join(DATA_PATH, 'test/select_beam.fits')
# test writing beamfits with only one pixel
pixels_to_keep = [43]
power_beam2 = power_beam.select(pixels=pixels_to_keep, inplace=False)
power_beam2.write_beamfits(write_file_beamfits, clobber=True)
# check for errors associated with pixels not included in data
nt.assert_raises(ValueError, power_beam.select,
pixels=[6 * power_beam.nside ^ 2 + 10])
# test writing beamfits with non-contiguous pixels
pixels_to_keep = np.arange(2, 150, 4)
power_beam2 = power_beam.select(pixels=pixels_to_keep, inplace=False)
power_beam2.write_beamfits(write_file_beamfits, clobber=True)
# check for errors selecting pixels on non-healpix beams
power_beam = fill_dummy_beam(power_beam, 'power', 'az_za')
nt.assert_raises(ValueError, power_beam.select, pixels=pixels_to_keep)
def test_select_axis1():
power_beam = UVBeam()
power_beam = fill_dummy_beam(power_beam, 'power', 'az_za')
old_history = power_beam.history
inds1_to_keep = np.arange(14, 63)
power_beam2 = power_beam.select(axis1_inds=inds1_to_keep, inplace=False)
nt.assert_equal(len(inds1_to_keep), power_beam2.Naxes1)
for i in inds1_to_keep:
nt.assert_true(power_beam.axis1_array[i] in power_beam2.axis1_array)
for i in np.unique(power_beam2.axis1_array):
nt.assert_true(i in power_beam.axis1_array)
nt.assert_true(uvutils.check_histories(old_history + ' Downselected to '
'specific parts of first image axis '
'using pyuvdata.', power_beam2.history))
write_file_beamfits = os.path.join(DATA_PATH, 'test/select_beam.fits')
# test writing beamfits with only one element in axis1
inds_to_keep = [len(inds1_to_keep) + 1]
power_beam2 = power_beam.select(axis1_inds=inds_to_keep, inplace=False)
power_beam2.write_beamfits(write_file_beamfits, clobber=True)
# check for errors associated with indices not included in data
nt.assert_raises(ValueError, power_beam2.select, axis1_inds=[power_beam.Naxes1 - 1])
# check for warnings and errors associated with unevenly spaced image pixels
power_beam2 = copy.deepcopy(power_beam)
uvtest.checkWarnings(power_beam2.select, [], {'axis1_inds': [0, 5, 6]},
message='Selected values along first image axis are not evenly spaced')
nt.assert_raises(ValueError, power_beam2.write_beamfits, write_file_beamfits)
# check for errors selecting axis1_inds on healpix beams
power_beam = fill_dummy_beam(power_beam, 'power', 'healpix')
nt.assert_raises(ValueError, power_beam.select, axis1_inds=inds1_to_keep)
def test_select_axis2():
power_beam = UVBeam()
power_beam = fill_dummy_beam(power_beam, 'power', 'az_za')
old_history = power_beam.history
inds2_to_keep = np.arange(5, 14)
power_beam2 = power_beam.select(axis2_inds=inds2_to_keep, inplace=False)
nt.assert_equal(len(inds2_to_keep), power_beam2.Naxes2)
for i in inds2_to_keep:
nt.assert_true(power_beam.axis2_array[i] in power_beam2.axis2_array)
for i in np.unique(power_beam2.axis2_array):
nt.assert_true(i in power_beam.axis2_array)
nt.assert_true(uvutils.check_histories(old_history + ' Downselected to '
'specific parts of second image axis '
'using pyuvdata.', power_beam2.history))
write_file_beamfits = os.path.join(DATA_PATH, 'test/select_beam.fits')
# test writing beamfits with only one element in axis2
inds_to_keep = [len(inds2_to_keep) + 1]
power_beam2 = power_beam.select(axis2_inds=inds_to_keep, inplace=False)
power_beam2.write_beamfits(write_file_beamfits, clobber=True)
# check for errors associated with indices not included in data
nt.assert_raises(ValueError, power_beam2.select, axis2_inds=[power_beam.Naxes2 - 1])
# check for warnings and errors associated with unevenly spaced image pixels
power_beam2 = copy.deepcopy(power_beam)
uvtest.checkWarnings(power_beam2.select, [], {'axis2_inds': [0, 5, 6]},
message='Selected values along second image axis are not evenly spaced')
nt.assert_raises(ValueError, power_beam2.write_beamfits, write_file_beamfits)
# check for errors selecting axis2_inds on healpix beams
power_beam = fill_dummy_beam(power_beam, 'power', 'healpix')
nt.assert_raises(ValueError, power_beam.select, axis2_inds=inds2_to_keep)
def test_select_frequencies():
power_beam = UVBeam()
power_beam = fill_dummy_beam(power_beam, 'power', 'az_za')
old_history = power_beam.history
freqs_to_keep = power_beam.freq_array[0, np.arange(7, 94)]
power_beam2 = power_beam.select(frequencies=freqs_to_keep, inplace=False)
nt.assert_equal(len(freqs_to_keep), power_beam2.Nfreqs)
for f in freqs_to_keep:
nt.assert_true(f in power_beam2.freq_array)
for f in np.unique(power_beam2.freq_array):
nt.assert_true(f in freqs_to_keep)
nt.assert_true(uvutils.check_histories(old_history + ' Downselected to '
'specific frequencies using pyuvdata.',
power_beam2.history))
write_file_beamfits = os.path.join(DATA_PATH, 'test/select_beam.fits')
# test writing beamfits with only one frequency
freqs_to_keep = power_beam.freq_array[0, 51]
power_beam2 = power_beam.select(frequencies=freqs_to_keep, inplace=False)
power_beam2.write_beamfits(write_file_beamfits, clobber=True)
# check for errors associated with frequencies not included in data
nt.assert_raises(ValueError, power_beam.select, frequencies=[np.max(power_beam.freq_array) + 10])
# check for warnings and errors associated with unevenly spaced frequencies
power_beam2 = copy.deepcopy(power_beam)
uvtest.checkWarnings(power_beam2.select, [],
{'frequencies': power_beam2.freq_array[0, [0, 5, 6]]},
message='Selected frequencies are not evenly spaced')
nt.assert_raises(ValueError, power_beam2.write_beamfits, write_file_beamfits)
def test_select_freq_chans():
power_beam = UVBeam()
power_beam = fill_dummy_beam(power_beam, 'power', 'az_za')
old_history = power_beam.history
chans_to_keep = np.arange(7, 94)
power_beam2 = power_beam.select(freq_chans=chans_to_keep, inplace=False)
nt.assert_equal(len(chans_to_keep), power_beam2.Nfreqs)
for chan in chans_to_keep:
nt.assert_true(power_beam.freq_array[0, chan] in power_beam2.freq_array)
for f in np.unique(power_beam2.freq_array):
nt.assert_true(f in power_beam.freq_array[0, chans_to_keep])
nt.assert_true(uvutils.check_histories(old_history + ' Downselected to '
'specific frequencies using pyuvdata.',
power_beam2.history))
# Test selecting both channels and frequencies
freqs_to_keep = power_beam.freq_array[0, np.arange(93, 100)] # Overlaps with chans
all_chans_to_keep = np.arange(7, 100)
power_beam2 = power_beam.select(frequencies=freqs_to_keep,
freq_chans=chans_to_keep,
inplace=False)
nt.assert_equal(len(all_chans_to_keep), power_beam2.Nfreqs)
for chan in all_chans_to_keep:
nt.assert_true(power_beam.freq_array[0, chan] in power_beam2.freq_array)
for f in np.unique(power_beam2.freq_array):
nt.assert_true(f in power_beam.freq_array[0, all_chans_to_keep])
def test_select_feeds():
efield_beam = UVBeam()
efield_beam = fill_dummy_beam(efield_beam, 'efield', 'az_za')
old_history = efield_beam.history
feeds_to_keep = ['x']
efield_beam2 = efield_beam.select(feeds=feeds_to_keep, inplace=False)
nt.assert_equal(len(feeds_to_keep), efield_beam2.Nfeeds)
for f in feeds_to_keep:
nt.assert_true(f in efield_beam2.feed_array)
for f in np.unique(efield_beam2.feed_array):
nt.assert_true(f in feeds_to_keep)
nt.assert_true(uvutils.check_histories(old_history + ' Downselected to '
'specific feeds using pyuvdata.',
efield_beam2.history))
# check for errors associated with feeds not included in data
nt.assert_raises(ValueError, efield_beam.select, feeds=['N'])
# check for error with feeds on power beams
power_beam = UVBeam()
power_beam = fill_dummy_beam(power_beam, 'power', 'az_za')
nt.assert_raises(ValueError, power_beam.select, feeds=['x'])
def test_select_polarizations():
power_beam = UVBeam()
power_beam = fill_dummy_beam(power_beam, 'power', 'az_za')
old_history = power_beam.history
pols_to_keep = [-5, -6]
power_beam2 = power_beam.select(polarizations=pols_to_keep,
inplace=False)
nt.assert_equal(len(pols_to_keep), power_beam2.Npols)
for p in pols_to_keep:
nt.assert_true(p in power_beam2.polarization_array)
for p in np.unique(power_beam2.polarization_array):
nt.assert_true(p in pols_to_keep)
nt.assert_true(uvutils.check_histories(old_history + ' Downselected to '
'specific polarizations using pyuvdata.',
power_beam2.history))
# check for errors associated with polarizations not included in data
nt.assert_raises(ValueError, power_beam.select, polarizations=[-3, -4])
# check for warnings and errors associated with unevenly spaced polarizations
uvtest.checkWarnings(power_beam.select, [], {'polarizations': power_beam.polarization_array[[0, 1, 3]]},
message='Selected polarizations are not evenly spaced')
write_file_beamfits = os.path.join(DATA_PATH, 'test/select_beam.fits')
nt.assert_raises(ValueError, power_beam.write_beamfits, write_file_beamfits)
# check for error with polarizations on efield beams
efield_beam = UVBeam()
efield_beam = fill_dummy_beam(efield_beam, 'efield', 'az_za')
nt.assert_raises(ValueError, efield_beam.select, polarizations=[-5, -6])
def test_select():
power_beam = UVBeam()
power_beam = fill_dummy_beam(power_beam, 'power', 'az_za')
# now test selecting along all axes at once
old_history = power_beam.history
inds1_to_keep = np.arange(14, 63)
inds2_to_keep = np.arange(5, 14)
freqs_to_keep = power_beam.freq_array[0, np.arange(31, 56)]
pols_to_keep = [-5]
power_beam2 = power_beam.select(axis1_inds=inds1_to_keep,
axis2_inds=inds2_to_keep,
frequencies=freqs_to_keep,
polarizations=pols_to_keep,
inplace=False)
nt.assert_equal(len(inds1_to_keep), power_beam2.Naxes1)
for i in inds1_to_keep:
nt.assert_true(power_beam.axis1_array[i] in power_beam2.axis1_array)
for i in np.unique(power_beam2.axis1_array):
nt.assert_true(i in power_beam.axis1_array)
nt.assert_equal(len(inds2_to_keep), power_beam2.Naxes2)
for i in inds2_to_keep:
nt.assert_true(power_beam.axis2_array[i] in power_beam2.axis2_array)
for i in np.unique(power_beam2.axis2_array):
nt.assert_true(i in power_beam.axis2_array)
nt.assert_equal(len(freqs_to_keep), power_beam2.Nfreqs)
for f in freqs_to_keep:
nt.assert_true(f in power_beam2.freq_array)
for f in np.unique(power_beam2.freq_array):
nt.assert_true(f in freqs_to_keep)
nt.assert_equal(len(pols_to_keep), power_beam2.Npols)
for p in pols_to_keep:
nt.assert_true(p in power_beam2.polarization_array)
for p in np.unique(power_beam2.polarization_array):
nt.assert_true(p in pols_to_keep)
nt.assert_true(uvutils.check_histories(old_history + ' Downselected to '
'specific parts of first image axis, '
'parts of second image axis, '
'frequencies, polarizations using pyuvdata.',
power_beam2.history))
# repeat for efield beam
efield_beam = UVBeam()
efield_beam = fill_dummy_beam(efield_beam, 'efield', 'az_za')
feeds_to_keep = ['x']
efield_beam2 = efield_beam.select(axis1_inds=inds1_to_keep,
axis2_inds=inds2_to_keep,
frequencies=freqs_to_keep,
feeds=feeds_to_keep,
inplace=False)
nt.assert_equal(len(inds1_to_keep), efield_beam2.Naxes1)
for i in inds1_to_keep:
nt.assert_true(efield_beam.axis1_array[i] in efield_beam2.axis1_array)
for i in np.unique(efield_beam2.axis1_array):
nt.assert_true(i in efield_beam.axis1_array)
nt.assert_equal(len(inds2_to_keep), efield_beam2.Naxes2)
for i in inds2_to_keep:
nt.assert_true(efield_beam.axis2_array[i] in efield_beam2.axis2_array)
for i in np.unique(efield_beam2.axis2_array):
nt.assert_true(i in efield_beam.axis2_array)
nt.assert_equal(len(freqs_to_keep), efield_beam2.Nfreqs)
for f in freqs_to_keep:
nt.assert_true(f in efield_beam2.freq_array)
for f in np.unique(efield_beam2.freq_array):
nt.assert_true(f in freqs_to_keep)
nt.assert_equal(len(feeds_to_keep), efield_beam2.Nfeeds)
for f in feeds_to_keep:
nt.assert_true(f in efield_beam2.feed_array)
for f in np.unique(efield_beam2.feed_array):
nt.assert_true(f in feeds_to_keep)
nt.assert_true(uvutils.check_histories(old_history + ' Downselected to '
'specific parts of first image axis, '
'parts of second image axis, '
'frequencies, feeds using pyuvdata.',
efield_beam2.history))
def test_select_healpix():
# test selecting along all axes at once for healpix beams
power_beam = UVBeam()
power_beam = fill_dummy_beam(power_beam, 'power', 'healpix')
old_history = power_beam.history
pixels_to_keep = np.arange(31, 184)
freqs_to_keep = power_beam.freq_array[0, np.arange(31, 56)]
pols_to_keep = [-5]
power_beam2 = power_beam.select(pixels=pixels_to_keep,
frequencies=freqs_to_keep,
polarizations=pols_to_keep,
inplace=False)
nt.assert_equal(len(pixels_to_keep), power_beam2.Npixels)
for pi in pixels_to_keep:
nt.assert_true(pi in power_beam2.pixel_array)
for pi in np.unique(power_beam2.pixel_array):
nt.assert_true(pi in pixels_to_keep)
nt.assert_equal(len(freqs_to_keep), power_beam2.Nfreqs)
for f in freqs_to_keep:
nt.assert_true(f in power_beam2.freq_array)
for f in np.unique(power_beam2.freq_array):
nt.assert_true(f in freqs_to_keep)
nt.assert_equal(len(pols_to_keep), power_beam2.Npols)
for p in pols_to_keep:
nt.assert_true(p in power_beam2.polarization_array)
for p in np.unique(power_beam2.polarization_array):
nt.assert_true(p in pols_to_keep)
nt.assert_true(uvutils.check_histories(old_history + ' Downselected to '
'specific healpix pixels, frequencies, '
'polarizations using pyuvdata.',
power_beam2.history))
# repeat for efield beam
efield_beam = UVBeam()
efield_beam = fill_dummy_beam(efield_beam, 'efield', 'healpix')
feeds_to_keep = ['x']
efield_beam2 = efield_beam.select(pixels=pixels_to_keep,
frequencies=freqs_to_keep,
feeds=feeds_to_keep,
inplace=False)
nt.assert_equal(len(pixels_to_keep), power_beam2.Npixels)
for pi in pixels_to_keep:
nt.assert_true(pi in power_beam2.pixel_array)
for pi in np.unique(power_beam2.pixel_array):
nt.assert_true(pi in pixels_to_keep)
nt.assert_equal(len(freqs_to_keep), efield_beam2.Nfreqs)
for f in freqs_to_keep:
nt.assert_true(f in efield_beam2.freq_array)
for f in np.unique(efield_beam2.freq_array):
nt.assert_true(f in freqs_to_keep)
nt.assert_equal(len(feeds_to_keep), efield_beam2.Nfeeds)
for f in feeds_to_keep:
nt.assert_true(f in efield_beam2.feed_array)
for f in np.unique(efield_beam2.feed_array):
nt.assert_true(f in feeds_to_keep)
nt.assert_true(uvutils.check_histories(old_history + ' Downselected to '
'specific healpix pixels, frequencies, '
'feeds using pyuvdata.',
efield_beam2.history))
def test_add():
power_beam_full = UVBeam()
power_beam_full = fill_dummy_beam(power_beam_full, 'power', 'az_za')
# Add along first image axis
beam1 = power_beam_full.select(axis1_inds=np.arange(0, 36), inplace=False)
beam2 = power_beam_full.select(axis1_inds=np.arange(36, 72), inplace=False)
beam1 += beam2
# Check history is correct, before replacing and doing a full object check
nt.assert_true(uvutils.check_histories(power_beam_full.history +
' Downselected to specific parts of '
'first image axis using pyuvdata. '
'Combined data along first image axis '
'using pyuvdata.', beam1.history))
beam1.history = power_beam_full.history
nt.assert_equal(beam1, power_beam_full)
# Add along second image axis
beam1 = power_beam_full.select(axis2_inds=np.arange(0, 9), inplace=False)
beam2 = power_beam_full.select(axis2_inds=np.arange(9, 18), inplace=False)
beam1 += beam2
# Check history is correct, before replacing and doing a full object check
nt.assert_true(uvutils.check_histories(power_beam_full.history +
' Downselected to specific parts of '
'second image axis using pyuvdata. '
'Combined data along second image axis '
'using pyuvdata.', beam1.history))
beam1.history = power_beam_full.history
nt.assert_equal(beam1, power_beam_full)
# Add frequencies
beam1 = power_beam_full.select(freq_chans=np.arange(0, 50), inplace=False)
beam2 = power_beam_full.select(freq_chans=np.arange(50, 100), inplace=False)
beam1 += beam2
# Check history is correct, before replacing and doing a full object check
nt.assert_true(uvutils.check_histories(power_beam_full.history +
' Downselected to specific frequencies '
'using pyuvdata. Combined data along '
'frequency axis using pyuvdata.',
beam1.history))
beam1.history = power_beam_full.history
nt.assert_equal(beam1, power_beam_full)
# Add polarizations
beam1 = power_beam_full.select(polarizations=power_beam_full.polarization_array[0:2], inplace=False)
beam2 = power_beam_full.select(polarizations=power_beam_full.polarization_array[2:4], inplace=False)
beam1 += beam2
nt.assert_true(uvutils.check_histories(power_beam_full.history +
' Downselected to specific polarizations '
'using pyuvdata. Combined data along '
'polarization axis using pyuvdata.',
beam1.history))
beam1.history = power_beam_full.history
nt.assert_equal(beam1, power_beam_full)
# Add feeds
efield_beam_full = UVBeam()
efield_beam_full = fill_dummy_beam(efield_beam_full, 'efield', 'az_za')
beam1 = efield_beam_full.select(feeds=efield_beam_full.feed_array[0], inplace=False)
beam2 = efield_beam_full.select(feeds=efield_beam_full.feed_array[1], inplace=False)
beam1 += beam2
nt.assert_true(uvutils.check_histories(efield_beam_full.history +
' Downselected to specific feeds '
'using pyuvdata. Combined data along '
'feed axis using pyuvdata.',
beam1.history))
beam1.history = efield_beam_full.history
nt.assert_equal(beam1, efield_beam_full)
# Add multiple axes
beam_ref = copy.deepcopy(power_beam_full)
beam1 = power_beam_full.select(axis1_inds=np.arange(0, power_beam_full.Naxes1 / 2),
polarizations=power_beam_full.polarization_array[0:2],
inplace=False)
beam2 = power_beam_full.select(axis1_inds=np.arange(power_beam_full.Naxes1 / 2,
power_beam_full.Naxes1),
polarizations=power_beam_full.polarization_array[2:4],
inplace=False)
beam1 += beam2
nt.assert_true(uvutils.check_histories(power_beam_full.history +
' Downselected to specific parts of '
'first image axis, polarizations using '
'pyuvdata. Combined data along first '
'image, polarization axis using pyuvdata.',
beam1.history))
# Zero out missing data in reference object
beam_ref.data_array[:, :, :2, :, :, power_beam_full.Naxes1 / 2:] = 0.0
beam_ref.data_array[:, :, 2:, :, :, :power_beam_full.Naxes1 / 2] = 0.0
beam1.history = power_beam_full.history
nt.assert_equal(beam1, beam_ref)
# Another combo with healpix
power_beam_full = fill_dummy_beam(power_beam_full, 'power', 'healpix')
beam_ref = copy.deepcopy(power_beam_full)
beam1 = power_beam_full.select(pixels=power_beam_full.pixel_array[0:power_beam_full.Npixels / 2],
freq_chans=np.arange(0, 50),
inplace=False)
beam2 = power_beam_full.select(pixels=power_beam_full.pixel_array[power_beam_full.Npixels / 2:],
freq_chans=np.arange(50, 100),
inplace=False)
beam1 += beam2
nt.assert_true(uvutils.check_histories(power_beam_full.history +
' Downselected to specific healpix '
'pixels, frequencies using pyuvdata. '
'Combined data along healpix pixel, '
'frequency axis using pyuvdata.',
beam1.history))
# Zero out missing data in reference object
beam_ref.data_array[:, :, :, :50, power_beam_full.Npixels / 2:] = 0.0
beam_ref.data_array[:, :, :, 50:, :power_beam_full.Npixels / 2] = 0.0
beam1.history = power_beam_full.history
nt.assert_equal(beam1, beam_ref)
# Another combo with efield
efield_beam_full = fill_dummy_beam(efield_beam_full, 'efield', 'az_za')
beam_ref = copy.deepcopy(efield_beam_full)
beam1 = efield_beam_full.select(axis1_inds=np.arange(0, efield_beam_full.Naxes1 / 2),
axis2_inds=np.arange(0, efield_beam_full.Naxes2 / 2),
inplace=False)
beam2 = efield_beam_full.select(axis1_inds=np.arange(efield_beam_full.Naxes1 / 2,
efield_beam_full.Naxes1),
axis2_inds=np.arange(efield_beam_full.Naxes2 / 2,
efield_beam_full.Naxes2),
inplace=False)
beam1 += beam2
nt.assert_true(uvutils.check_histories(efield_beam_full.history +
' Downselected to specific parts of '
'first image axis, parts of second '
'image axis using pyuvdata. Combined '
'data along first image, second image '
'axis using pyuvdata.',
beam1.history))
# Zero out missing data in reference object
beam_ref.data_array[:, :, :, :, :efield_beam_full.Naxes2 / 2,
efield_beam_full.Naxes1 / 2:] = 0.0
beam_ref.data_array[:, :, :, :, efield_beam_full.Naxes2 / 2:,
:efield_beam_full.Naxes1 / 2] = 0.0
beam_ref.basis_vector_array[:, :, :efield_beam_full.Naxes2 / 2,
efield_beam_full.Naxes1 / 2:] = 0.0
beam_ref.basis_vector_array[:, :, efield_beam_full.Naxes2 / 2:,
:efield_beam_full.Naxes1 / 2] = 0.0
beam1.history = efield_beam_full.history
nt.assert_equal(beam1, beam_ref)
# Another combo with healpix efield
efield_beam_full = fill_dummy_beam(efield_beam_full, 'efield', 'healpix')
beam_ref = copy.deepcopy(efield_beam_full)
beam1 = efield_beam_full.select(freq_chans=np.arange(0, 50),
feeds=efield_beam_full.feed_array[0],
inplace=False)
beam2 = efield_beam_full.select(freq_chans=np.arange(50, 100),
feeds=efield_beam_full.feed_array[1],
inplace=False)
beam1 += beam2
nt.assert_true(uvutils.check_histories(efield_beam_full.history +
' Downselected to specific frequencies, '
'feeds using pyuvdata. Combined data '
'along frequency, feed axis using pyuvdata.',
beam1.history))
# Zero out missing data in reference object
beam_ref.data_array[:, :, 1, :50, :] = 0.0
beam_ref.data_array[:, :, 0, 50:, :] = 0.0
beam1.history = efield_beam_full.history
nt.assert_equal(beam1, beam_ref)
# Add without inplace
efield_beam_full = fill_dummy_beam(efield_beam_full, 'efield', 'healpix')
beam1 = efield_beam_full.select(pixels=efield_beam_full.pixel_array[0:efield_beam_full.Npixels / 2],
inplace=False)
beam2 = efield_beam_full.select(pixels=efield_beam_full.pixel_array[efield_beam_full.Npixels / 2:],
inplace=False)
beam1 = beam1 + beam2
nt.assert_true(uvutils.check_histories(efield_beam_full.history +
' Downselected to specific healpix pixels '
'using pyuvdata. Combined data '
'along healpix pixel axis using pyuvdata.',
beam1.history))
beam1.history = efield_beam_full.history
nt.assert_equal(beam1, efield_beam_full)
# Check warnings
power_beam_full = fill_dummy_beam(power_beam_full, 'power', 'healpix')
beam1 = power_beam_full.select(freq_chans=np.arange(0, 32), inplace=False)
beam2 = power_beam_full.select(freq_chans=np.arange(33, 64), inplace=False)
uvtest.checkWarnings(beam1.__add__, [beam2],
message='Combined frequencies are not evenly spaced')
beam1 = power_beam_full.select(polarizations=power_beam_full.polarization_array[0:2],
inplace=False)
beam2 = power_beam_full.select(polarizations=power_beam_full.polarization_array[3],
inplace=False)
uvtest.checkWarnings(beam1.__iadd__, [beam2],
message='Combined polarizations are not evenly spaced')
beam1 = power_beam_full.select(polarizations=power_beam_full.polarization_array[0:2],
inplace=False)
beam2 = power_beam_full.select(polarizations=power_beam_full.polarization_array[2:3],
inplace=False)
beam2.receiver_temperature_array = None
nt.assert_false(beam1.receiver_temperature_array is None)
uvtest.checkWarnings(beam1.__iadd__, [beam2],
message=['Only one of the UVBeam objects being combined '
'has optional parameter'])
nt.assert_true(beam1.receiver_temperature_array is None)
# Combining histories
beam1 = power_beam_full.select(polarizations=power_beam_full.polarization_array[0:2], inplace=False)
beam2 = power_beam_full.select(polarizations=power_beam_full.polarization_array[2:4], inplace=False)
beam2.history += ' testing the history. random data for test'
beam1 += beam2
nt.assert_true(uvutils.check_histories(power_beam_full.history +
' Downselected to specific polarizations '
'using pyuvdata. Combined data along '
'polarization axis using pyuvdata. '
'testing the history.',
beam1.history))
beam1.history = power_beam_full.history
nt.assert_equal(beam1, power_beam_full)
def test_break_add():
# Test failure modes of add function
power_beam_full = UVBeam()
power_beam_full = fill_dummy_beam(power_beam_full, 'power', 'healpix')
# Wrong class
beam1 = copy.deepcopy(power_beam_full)
nt.assert_raises(ValueError, beam1.__iadd__, np.zeros(5))
params_to_change = {'beam_type': 'efield', 'data_normalization': 'solid_angle',
'telescope_name': 'foo', 'feed_name': 'foo',
'feed_version': 'v12', 'model_name': 'foo',
'model_version': 'v12', 'pixel_coordinate_system': 'sin_zenith',
'Naxes_vec': 3, 'nside': 16, 'ordering': 'nested'}
beam1 = power_beam_full.select(freq_chans=np.arange(0, 50), inplace=False)
for param, value in params_to_change.iteritems():
beam2 = power_beam_full.select(freq_chans=np.arange(50, 100), inplace=False)
setattr(beam2, param, value)
nt.assert_raises(ValueError, beam1.__iadd__, beam2)
# Overlapping data
beam2 = copy.deepcopy(power_beam_full)
nt.assert_raises(ValueError, beam1.__iadd__, beam2)
# Overlapping data with non-healpix
power_beam_full = fill_dummy_beam(power_beam_full, 'power', 'az_za')
beam1 = copy.deepcopy(power_beam_full)
beam2 = copy.deepcopy(power_beam_full)
nt.assert_raises(ValueError, beam1.__iadd__, beam2)