test_uvbeam.py
"""Tests for uvbeam object."""
import nose.tools as nt
import os
import numpy as np
import copy
import healpy as hp
from pyuvdata import UVBeam
import pyuvdata.tests as uvtest
import pyuvdata.utils as uvutils
import pyuvdata.version as uvversion
from pyuvdata.data import DATA_PATH
filenames = ['HERA_NicCST_150MHz.txt', 'HERA_NicCST_123MHz.txt']
cst_files = [os.path.join(DATA_PATH, f) for f in filenames]
class TestUVBeamInit(object):
def setUp(self):
"""Setup for basic parameter, property and iterator tests."""
self.required_parameters = ['_beam_type', '_Nfreqs', '_Naxes_vec', '_Nspws',
'_pixel_coordinate_system',
'_freq_array', '_spw_array',
'_data_normalization',
'_data_array', '_bandpass_array',
'_telescope_name', '_feed_name',
'_feed_version', '_model_name',
'_model_version', '_history',
'_antenna_type']
self.required_properties = ['beam_type', 'Nfreqs', 'Naxes_vec', 'Nspws',
'pixel_coordinate_system',
'freq_array', 'spw_array',
'data_normalization',
'data_array', 'bandpass_array',
'telescope_name', 'feed_name',
'feed_version', 'model_name',
'model_version', 'history',
'antenna_type']
self.extra_parameters = ['_Naxes1', '_Naxes2', '_Npixels', '_Nfeeds', '_Npols',
'_Ncomponents_vec',
'_axis1_array', '_axis2_array', '_nside', '_ordering',
'_pixel_array', '_feed_array', '_polarization_array',
'_basis_vector_array',
'_extra_keywords', '_Nelements',
'_element_coordinate_system',
'_element_location_array', '_delay_array',
'_gain_array', '_coupling_matrix',
'_reference_input_impedance', '_reference_output_impedance',
'_receiver_temperature_array',
'_loss_array', '_mismatch_array',
'_s_parameters']
self.extra_properties = ['Naxes1', 'Naxes2', 'Npixels', 'Nfeeds', 'Npols',
'Ncomponents_vec',
'axis1_array', 'axis2_array', 'nside', 'ordering',
'pixel_array', 'feed_array', 'polarization_array',
'basis_vector_array', 'extra_keywords', 'Nelements',
'element_coordinate_system',
'element_location_array', 'delay_array',
'gain_array', 'coupling_matrix',
'reference_input_impedance', 'reference_output_impedance',
'receiver_temperature_array',
'loss_array', 'mismatch_array',
's_parameters']
self.other_properties = ['pyuvdata_version_str']
self.beam_obj = UVBeam()
def teardown(self):
"""Test teardown: delete object."""
del(self.beam_obj)
def test_parameter_iter(self):
"Test expected parameters."
all = []
for prop in self.beam_obj:
all.append(prop)
for a in self.required_parameters + self.extra_parameters:
nt.assert_true(a in all, msg='expected attribute ' + a
+ ' not returned in object iterator')
def test_required_parameter_iter(self):
"Test expected required parameters."
required = []
for prop in self.beam_obj.required():
required.append(prop)
for a in self.required_parameters:
nt.assert_true(a in required, msg='expected attribute ' + a
+ ' not returned in required iterator')
def test_extra_parameter_iter(self):
"Test expected optional parameters."
extra = []
for prop in self.beam_obj.extra():
extra.append(prop)
for a in self.extra_parameters:
nt.assert_true(a in extra, msg='expected attribute ' + a
+ ' not returned in extra iterator')
def test_unexpected_parameters(self):
"Test for extra parameters."
expected_parameters = self.required_parameters + self.extra_parameters
attributes = [i for i in self.beam_obj.__dict__.keys() if i[0] == '_']
for a in attributes:
nt.assert_true(a in expected_parameters,
msg='unexpected parameter ' + a + ' found in UVData')
def test_unexpected_attributes(self):
"Test for extra attributes."
expected_attributes = self.required_properties + \
self.extra_properties + self.other_properties
attributes = [i for i in self.beam_obj.__dict__.keys() if i[0] != '_']
for a in attributes:
nt.assert_true(a in expected_attributes,
msg='unexpected attribute ' + a + ' found in UVData')
def test_properties(self):
"Test that properties can be get and set properly."
prop_dict = dict(zip(self.required_properties + self.extra_properties,
self.required_parameters + self.extra_parameters))
for k, v in prop_dict.iteritems():
rand_num = np.random.rand()
setattr(self.beam_obj, k, rand_num)
this_param = getattr(self.beam_obj, v)
try:
nt.assert_equal(rand_num, this_param.value)
except(AssertionError):
print('setting {prop_name} to a random number failed'.format(prop_name=k))
raise(AssertionError)
def test_errors():
beam_obj = UVBeam()
nt.assert_raises(ValueError, beam_obj._convert_to_filetype, 'foo')
def test_peak_normalize():
efield_beam = UVBeam()
efield_beam.read_cst_beam(cst_files, beam_type='efield', frequency=[150e6, 123e6],
telescope_name='TEST', feed_name='bob',
feed_version='0.1', feed_pol=['x'],
model_name='E-field pattern - Rigging height 4.9m',
model_version='1.0')
orig_bandpass_array = copy.deepcopy(efield_beam.bandpass_array)
maxima = np.zeros(efield_beam.Nfreqs)
for freq_i in range(efield_beam.Nfreqs):
maxima[freq_i] = np.amax(abs(efield_beam.data_array[:, :, :, freq_i]))
efield_beam.peak_normalize()
nt.assert_equal(np.amax(abs(efield_beam.data_array)), 1)
nt.assert_equal(np.sum(abs(efield_beam.bandpass_array - orig_bandpass_array * maxima)), 0)
nt.assert_equal(efield_beam.data_normalization, 'peak')
power_beam = UVBeam()
power_beam.read_cst_beam(cst_files, beam_type='power', frequency=[150e6, 123e6],
telescope_name='TEST', feed_name='bob',
feed_version='0.1', feed_pol=['x'],
model_name='E-field pattern - Rigging height 4.9m',
model_version='1.0')
orig_bandpass_array = copy.deepcopy(power_beam.bandpass_array)
maxima = np.zeros(efield_beam.Nfreqs)
for freq_i in range(efield_beam.Nfreqs):
maxima[freq_i] = np.amax(power_beam.data_array[:, :, :, freq_i])
power_beam.peak_normalize()
nt.assert_equal(np.amax(abs(power_beam.data_array)), 1)
nt.assert_equal(np.sum(abs(power_beam.bandpass_array - orig_bandpass_array * maxima)), 0)
nt.assert_equal(power_beam.data_normalization, 'peak')
power_beam.data_normalization = 'solid_angle'
nt.assert_raises(NotImplementedError, power_beam.peak_normalize)
def test_efield_to_power():
efield_beam = UVBeam()
efield_beam.read_cst_beam(cst_files, beam_type='efield', frequency=[150e6, 123e6],
telescope_name='TEST', feed_name='bob',
feed_version='0.1', feed_pol=['x'],
model_name='E-field pattern - Rigging height 4.9m',
model_version='1.0')
power_beam = UVBeam()
power_beam.read_cst_beam(cst_files, beam_type='power', frequency=[150e6, 123e6],
telescope_name='TEST', feed_name='bob',
feed_version='0.1', feed_pol=['x'],
model_name='E-field pattern - Rigging height 4.9m',
model_version='1.0')
new_power_beam = efield_beam.efield_to_power(calc_cross_pols=False, inplace=False)
# The values in the beam file only have 4 sig figs, so they don't match precisely
diff = np.abs(new_power_beam.data_array - power_beam.data_array)
nt.assert_true(np.max(diff) < 2)
reldiff = diff / power_beam.data_array
nt.assert_true(np.max(reldiff) < 0.002)
# set data_array tolerances higher to test the rest of the object
# tols are (relative, absolute)
tols = [0.002, 0]
power_beam._data_array.tols = tols
# modify the history to match
power_beam.history += ' Converted from efield to power using pyuvdata.'
nt.assert_equal(power_beam, new_power_beam)
# test with non-orthogonal basis vectors
# first construct a beam with non-orthogonal basis vectors
new_basis_vecs = np.zeros_like(efield_beam.basis_vector_array)
new_basis_vecs[0, 0, :, :] = np.sqrt(0.5)
new_basis_vecs[0, 1, :, :] = np.sqrt(0.5)
new_basis_vecs[1, :, :, :] = efield_beam.basis_vector_array[1, :, :, :]
new_data = np.zeros_like(efield_beam.data_array)
new_data[0, :, :, :, :, :] = np.sqrt(2) * efield_beam.data_array[0, :, :, :, :, :]
new_data[1, :, :, :, :, :] = (efield_beam.data_array[1, :, :, :, :, :]
- efield_beam.data_array[0, :, :, :, :, :])
efield_beam2 = copy.deepcopy(efield_beam)
efield_beam2.basis_vector_array = new_basis_vecs
efield_beam2.data_array = new_data
efield_beam2.check()
# now convert to power. Should get the same result
new_power_beam2 = copy.deepcopy(efield_beam2)
new_power_beam2.efield_to_power(calc_cross_pols=False)
nt.assert_equal(new_power_beam, new_power_beam2)
# check that this raises an error if trying to convert to HEALPix:
nt.assert_raises(NotImplementedError, efield_beam2.az_za_to_healpix,
inplace=False)
# now try a different rotation to non-orthogonal basis vectors
new_basis_vecs = np.zeros_like(efield_beam.basis_vector_array)
new_basis_vecs[0, :, :, :] = efield_beam.basis_vector_array[0, :, :, :]
new_basis_vecs[1, 0, :, :] = np.sqrt(0.5)
new_basis_vecs[1, 1, :, :] = np.sqrt(0.5)
new_data = np.zeros_like(efield_beam.data_array)
new_data[0, :, :, :, :, :] = (efield_beam.data_array[0, :, :, :, :, :]
- efield_beam.data_array[1, :, :, :, :, :])
new_data[1, :, :, :, :, :] = np.sqrt(2) * efield_beam.data_array[1, :, :, :, :, :]
efield_beam2 = copy.deepcopy(efield_beam)
efield_beam2.basis_vector_array = new_basis_vecs
efield_beam2.data_array = new_data
efield_beam2.check()
# now convert to power. Should get the same result
new_power_beam2 = copy.deepcopy(efield_beam2)
new_power_beam2.efield_to_power(calc_cross_pols=False)
nt.assert_equal(new_power_beam, new_power_beam2)
# now construct a beam with orthogonal but rotated basis vectors
new_basis_vecs = np.zeros_like(efield_beam.basis_vector_array)
new_basis_vecs[0, 0, :, :] = np.sqrt(0.5)
new_basis_vecs[0, 1, :, :] = np.sqrt(0.5)
new_basis_vecs[1, 0, :, :] = -1 * np.sqrt(0.5)
new_basis_vecs[1, 1, :, :] = np.sqrt(0.5)
new_data = np.zeros_like(efield_beam.data_array)
new_data[0, :, :, :, :, :] = np.sqrt(0.5) * (efield_beam.data_array[0, :, :, :, :, :]
+ efield_beam.data_array[1, :, :, :, :, :])
new_data[1, :, :, :, :, :] = np.sqrt(0.5) * (-1 * efield_beam.data_array[0, :, :, :, :, :]
+ efield_beam.data_array[1, :, :, :, :, :])
efield_beam2 = copy.deepcopy(efield_beam)
efield_beam2.basis_vector_array = new_basis_vecs
efield_beam2.data_array = new_data
efield_beam2.check()
# now convert to power. Should get the same result
new_power_beam2 = copy.deepcopy(efield_beam2)
new_power_beam2.efield_to_power(calc_cross_pols=False)
nt.assert_equal(new_power_beam, new_power_beam2)
# test calculating cross pols
new_power_beam = efield_beam.efield_to_power(calc_cross_pols=True, inplace=False)
nt.assert_true(np.all(np.abs(new_power_beam.data_array[:, :, 0, :, :,
np.where(new_power_beam.axis1_array == 0)[0]])
> np.abs(new_power_beam.data_array[:, :, 2, :, :,
np.where(new_power_beam.axis1_array == 0)[0]])))
nt.assert_true(np.all(np.abs(new_power_beam.data_array[:, :, 0, :, :,
np.where(new_power_beam.axis1_array == np.pi / 2.)[0]])
> np.abs(new_power_beam.data_array[:, :, 2, :, :,
np.where(new_power_beam.axis1_array == np.pi / 2.)[0]])))
# test writing out & reading back in power files (with cross pols which are complex)
write_file = os.path.join(DATA_PATH, 'test/outtest_beam.fits')
new_power_beam.write_beamfits(write_file, clobber=True)
new_power_beam2 = UVBeam()
new_power_beam2.read_beamfits(write_file)
nt.assert_equal(new_power_beam, new_power_beam2)
# test keeping basis vectors
new_power_beam = efield_beam.efield_to_power(calc_cross_pols=False,
keep_basis_vector=True,
inplace=False)
nt.assert_true(np.allclose(new_power_beam.data_array, np.abs(efield_beam.data_array)**2))
# test raises error if beam is already a power beam
nt.assert_raises(ValueError, power_beam.efield_to_power)
# test raises error if input efield beam has Naxes_vec=3
efield_beam.Naxes_vec = 3
nt.assert_raises(ValueError, efield_beam.efield_to_power)
# TODO: add testing in healpix once we can convert efield beams to healpix
def test_az_za_to_healpix():
power_beam = UVBeam()
power_beam.read_cst_beam(cst_files[0], beam_type='power', frequency=150e6,
telescope_name='TEST', feed_name='bob',
feed_version='0.1', feed_pol=['x'],
model_name='E-field pattern - Rigging height 4.9m',
model_version='1.0')
power_beam.select(axis2_inds=np.where(power_beam.axis2_array <= np.pi / 2.)[0])
power_beam_healpix = power_beam.az_za_to_healpix(inplace=False)
# check that history is updated appropriately
nt.assert_equal(power_beam_healpix.history, power_beam.history
+ ' Converted from regularly gridded azimuth/zenith_angle '
'to HEALPix using pyuvdata.')
npix = hp.nside2npix(power_beam_healpix.nside)
nt.assert_true(power_beam_healpix.Npixels <= npix * 0.55)
# Test error if not az_za
power_beam.pixel_coordinate_system = 'sin_zenith'
nt.assert_raises(ValueError, power_beam.az_za_to_healpix)
# Now check Efield interpolation
efield_beam = UVBeam()
efield_beam.read_cst_beam(cst_files[0], beam_type='efield', frequency=150e6,
telescope_name='TEST', feed_name='bob',
feed_version='0.1', feed_pol=['x'],
model_name='E-field pattern - Rigging height 4.9m',
model_version='1.0')
interp_then_sq = efield_beam.az_za_to_healpix(inplace=False)
interp_then_sq.efield_to_power(calc_cross_pols=False)
# convert to power and then interpolate to compare.
# Don't use power read from file because it has rounding errors that will dominate this comparison
sq_then_interp = efield_beam.efield_to_power(calc_cross_pols=False, inplace=False)
sq_then_interp.az_za_to_healpix()
# square then interpolate is different from interpolate then square at a
# higher level than normally allowed in the equality.
# We can live with it for now, may need to improve it later
diff = np.abs(interp_then_sq.data_array - sq_then_interp.data_array)
nt.assert_true(np.max(diff) < 0.5)
reldiff = diff / sq_then_interp.data_array
nt.assert_true(np.max(reldiff) < 0.05)
# set data_array tolerances higher to test the rest of the object
# tols are (relative, absolute)
tols = [0.05, 0]
sq_then_interp._data_array.tols = tols
# check history changes
interp_history_add = ' Converted from regularly gridded azimuth/zenith_angle to HEALPix using pyuvdata.'
sq_history_add = ' Converted from efield to power using pyuvdata.'
nt.assert_equal(sq_then_interp.history,
efield_beam.history + sq_history_add + interp_history_add)
nt.assert_equal(interp_then_sq.history,
efield_beam.history + interp_history_add + sq_history_add)
# now change history on one so we can compare the rest of the object
sq_then_interp.history = efield_beam.history + interp_history_add + sq_history_add
nt.assert_equal(sq_then_interp, interp_then_sq)
def test_select_axis():
power_beam = UVBeam()
power_beam.read_cst_beam(cst_files[0], beam_type='power', frequency=150e6,
telescope_name='TEST', feed_name='bob',
feed_version='0.1', feed_pol=['x'],
model_name='E-field pattern - Rigging height 4.9m',
model_version='1.0')
# add optional parameters for testing purposes
power_beam.extra_keywords = {'KEY1': 'test_keyword'}
power_beam.reference_input_impedance = 340.
power_beam.reference_output_impedance = 50.
power_beam.receiver_temperature_array = np.random.normal(50.0, 5, size=(power_beam.Nspws, power_beam.Nfreqs))
power_beam.loss_array = np.random.normal(50.0, 5, size=(power_beam.Nspws, power_beam.Nfreqs))
power_beam.mismatch_array = np.random.normal(0.0, 1.0, size=(power_beam.Nspws, power_beam.Nfreqs))
power_beam.s_parameters = np.random.normal(0.0, 0.3, size=(4, power_beam.Nspws, power_beam.Nfreqs))
old_history = power_beam.history
# Test selecting on axis1
inds1_to_keep = np.arange(14, 63)
power_beam2 = power_beam.select(axis1_inds=inds1_to_keep, inplace=False)
nt.assert_equal(len(inds1_to_keep), power_beam2.Naxes1)
for i in inds1_to_keep:
nt.assert_true(power_beam.axis1_array[i] in power_beam2.axis1_array)
for i in np.unique(power_beam2.axis1_array):
nt.assert_true(i in power_beam.axis1_array)
nt.assert_true(uvutils.check_histories(old_history + ' Downselected to '
'specific parts of first image axis '
'using pyuvdata.', power_beam2.history))
write_file_beamfits = os.path.join(DATA_PATH, 'test/select_beam.fits')
# test writing beamfits with only one element in axis1
inds_to_keep = [len(inds1_to_keep) + 1]
power_beam2 = power_beam.select(axis1_inds=inds_to_keep, inplace=False)
power_beam2.write_beamfits(write_file_beamfits, clobber=True)
# check for errors associated with indices not included in data
nt.assert_raises(ValueError, power_beam2.select, axis1_inds=[power_beam.Naxes1 - 1])
# check for warnings and errors associated with unevenly spaced image pixels
power_beam2 = copy.deepcopy(power_beam)
uvtest.checkWarnings(power_beam2.select, [], {'axis1_inds': [0, 5, 6]},
message='Selected values along first image axis are not evenly spaced')
nt.assert_raises(ValueError, power_beam2.write_beamfits, write_file_beamfits)
# Test selecting on axis2
inds2_to_keep = np.arange(5, 14)
power_beam2 = power_beam.select(axis2_inds=inds2_to_keep, inplace=False)
nt.assert_equal(len(inds2_to_keep), power_beam2.Naxes2)
for i in inds2_to_keep:
nt.assert_true(power_beam.axis2_array[i] in power_beam2.axis2_array)
for i in np.unique(power_beam2.axis2_array):
nt.assert_true(i in power_beam.axis2_array)
nt.assert_true(uvutils.check_histories(old_history + ' Downselected to '
'specific parts of second image axis '
'using pyuvdata.', power_beam2.history))
write_file_beamfits = os.path.join(DATA_PATH, 'test/select_beam.fits')
# test writing beamfits with only one element in axis2
inds_to_keep = [len(inds2_to_keep) + 1]
power_beam2 = power_beam.select(axis2_inds=inds_to_keep, inplace=False)
power_beam2.write_beamfits(write_file_beamfits, clobber=True)
# check for errors associated with indices not included in data
nt.assert_raises(ValueError, power_beam2.select, axis2_inds=[power_beam.Naxes2 - 1])
# check for warnings and errors associated with unevenly spaced image pixels
power_beam2 = copy.deepcopy(power_beam)
uvtest.checkWarnings(power_beam2.select, [], {'axis2_inds': [0, 5, 6]},
message='Selected values along second image axis are not evenly spaced')
nt.assert_raises(ValueError, power_beam2.write_beamfits, write_file_beamfits)
def test_select_frequencies():
power_beam = UVBeam()
power_beam.read_cst_beam(cst_files[0], beam_type='power', frequency=150e6,
telescope_name='TEST', feed_name='bob',
feed_version='0.1', feed_pol=['x'],
model_name='E-field pattern - Rigging height 4.9m',
model_version='1.0')
# generate more frequencies for testing by copying and adding several times
while power_beam.Nfreqs < 8:
new_beam = copy.deepcopy(power_beam)
new_beam.freq_array = power_beam.freq_array + power_beam.Nfreqs * 1e6
power_beam += new_beam
# add optional parameters for testing purposes
power_beam.extra_keywords = {'KEY1': 'test_keyword'}
power_beam.reference_input_impedance = 340.
power_beam.reference_output_impedance = 50.
power_beam.receiver_temperature_array = np.random.normal(50.0, 5, size=(power_beam.Nspws, power_beam.Nfreqs))
power_beam.loss_array = np.random.normal(50.0, 5, size=(power_beam.Nspws, power_beam.Nfreqs))
power_beam.mismatch_array = np.random.normal(0.0, 1.0, size=(power_beam.Nspws, power_beam.Nfreqs))
power_beam.s_parameters = np.random.normal(0.0, 0.3, size=(4, power_beam.Nspws, power_beam.Nfreqs))
old_history = power_beam.history
freqs_to_keep = power_beam.freq_array[0, np.arange(2, 7)]
power_beam2 = power_beam.select(frequencies=freqs_to_keep, inplace=False)
nt.assert_equal(len(freqs_to_keep), power_beam2.Nfreqs)
for f in freqs_to_keep:
nt.assert_true(f in power_beam2.freq_array)
for f in np.unique(power_beam2.freq_array):
nt.assert_true(f in freqs_to_keep)
nt.assert_true(uvutils.check_histories(old_history + ' Downselected to '
'specific frequencies using pyuvdata.',
power_beam2.history))
write_file_beamfits = os.path.join(DATA_PATH, 'test/select_beam.fits')
# test writing beamfits with only one frequency
freqs_to_keep = power_beam.freq_array[0, 5]
power_beam2 = power_beam.select(frequencies=freqs_to_keep, inplace=False)
power_beam2.write_beamfits(write_file_beamfits, clobber=True)
# check for errors associated with frequencies not included in data
nt.assert_raises(ValueError, power_beam.select, frequencies=[np.max(power_beam.freq_array) + 10])
# check for warnings and errors associated with unevenly spaced frequencies
power_beam2 = copy.deepcopy(power_beam)
uvtest.checkWarnings(power_beam2.select, [],
{'frequencies': power_beam2.freq_array[0, [0, 5, 6]]},
message='Selected frequencies are not evenly spaced')
nt.assert_raises(ValueError, power_beam2.write_beamfits, write_file_beamfits)
# Test selecting on freq_chans
chans_to_keep = np.arange(2, 7)
power_beam2 = power_beam.select(freq_chans=chans_to_keep, inplace=False)
nt.assert_equal(len(chans_to_keep), power_beam2.Nfreqs)
for chan in chans_to_keep:
nt.assert_true(power_beam.freq_array[0, chan] in power_beam2.freq_array)
for f in np.unique(power_beam2.freq_array):
nt.assert_true(f in power_beam.freq_array[0, chans_to_keep])
nt.assert_true(uvutils.check_histories(old_history + ' Downselected to '
'specific frequencies using pyuvdata.',
power_beam2.history))
# Test selecting both channels and frequencies
freqs_to_keep = power_beam.freq_array[0, np.arange(6, 8)] # Overlaps with chans
all_chans_to_keep = np.arange(2, 8)
power_beam2 = power_beam.select(frequencies=freqs_to_keep,
freq_chans=chans_to_keep,
inplace=False)
nt.assert_equal(len(all_chans_to_keep), power_beam2.Nfreqs)
for chan in all_chans_to_keep:
nt.assert_true(power_beam.freq_array[0, chan] in power_beam2.freq_array)
for f in np.unique(power_beam2.freq_array):
nt.assert_true(f in power_beam.freq_array[0, all_chans_to_keep])
def test_select_feeds():
efield_beam = UVBeam()
efield_beam.read_cst_beam(cst_files[0], beam_type='efield', frequency=150e6,
telescope_name='TEST', feed_name='bob',
feed_version='0.1', feed_pol=['x'],
model_name='E-field pattern - Rigging height 4.9m',
model_version='1.0')
# add optional parameters for testing purposes
efield_beam.extra_keywords = {'KEY1': 'test_keyword'}
efield_beam.reference_input_impedance = 340.
efield_beam.reference_output_impedance = 50.
efield_beam.receiver_temperature_array = np.random.normal(50.0, 5, size=(efield_beam.Nspws, efield_beam.Nfreqs))
efield_beam.loss_array = np.random.normal(50.0, 5, size=(efield_beam.Nspws, efield_beam.Nfreqs))
efield_beam.mismatch_array = np.random.normal(0.0, 1.0, size=(efield_beam.Nspws, efield_beam.Nfreqs))
efield_beam.s_parameters = np.random.normal(0.0, 0.3, size=(4, efield_beam.Nspws, efield_beam.Nfreqs))
old_history = efield_beam.history
feeds_to_keep = ['x']
efield_beam2 = efield_beam.select(feeds=feeds_to_keep, inplace=False)
nt.assert_equal(len(feeds_to_keep), efield_beam2.Nfeeds)
for f in feeds_to_keep:
nt.assert_true(f in efield_beam2.feed_array)
for f in np.unique(efield_beam2.feed_array):
nt.assert_true(f in feeds_to_keep)
nt.assert_true(uvutils.check_histories(old_history + ' Downselected to '
'specific feeds using pyuvdata.',
efield_beam2.history))
# check for errors associated with feeds not included in data
nt.assert_raises(ValueError, efield_beam.select, feeds=['N'])
# check for error with selecting polarizations on efield beams
nt.assert_raises(ValueError, efield_beam.select, polarizations=[-5, -6])
# Test check basis vectors
efield_beam.basis_vector_array[0, 1, :, :] = 1.0
nt.assert_raises(ValueError, efield_beam.check)
efield_beam.basis_vector_array[0, 0, :, :] = np.sqrt(0.5)
efield_beam.basis_vector_array[0, 1, :, :] = np.sqrt(0.5)
nt.assert_true(efield_beam.check())
efield_beam.basis_vector_array = None
nt.assert_raises(ValueError, efield_beam.check)
def test_select_polarizations():
power_beam = UVBeam()
power_beam.read_cst_beam(cst_files[0], beam_type='power', frequency=150e6,
telescope_name='TEST', feed_name='bob',
feed_version='0.1', feed_pol='xx',
model_name='E-field pattern - Rigging height 4.9m',
model_version='1.0')
# generate more polarizations for testing by copying and adding several times
while power_beam.Npols < 4:
new_beam = copy.deepcopy(power_beam)
new_beam.polarization_array = power_beam.polarization_array - power_beam.Npols
power_beam += new_beam
# add optional parameters for testing purposes
power_beam.extra_keywords = {'KEY1': 'test_keyword'}
power_beam.reference_input_impedance = 340.
power_beam.reference_output_impedance = 50.
power_beam.receiver_temperature_array = np.random.normal(50.0, 5, size=(power_beam.Nspws, power_beam.Nfreqs))
power_beam.loss_array = np.random.normal(50.0, 5, size=(power_beam.Nspws, power_beam.Nfreqs))
power_beam.mismatch_array = np.random.normal(0.0, 1.0, size=(power_beam.Nspws, power_beam.Nfreqs))
power_beam.s_parameters = np.random.normal(0.0, 0.3, size=(4, power_beam.Nspws, power_beam.Nfreqs))
old_history = power_beam.history
pols_to_keep = [-5, -6]
power_beam2 = power_beam.select(polarizations=pols_to_keep,
inplace=False)
nt.assert_equal(len(pols_to_keep), power_beam2.Npols)
for p in pols_to_keep:
nt.assert_true(p in power_beam2.polarization_array)
for p in np.unique(power_beam2.polarization_array):
nt.assert_true(p in pols_to_keep)
nt.assert_true(uvutils.check_histories(old_history + ' Downselected to '
'specific polarizations using pyuvdata.',
power_beam2.history))
# check for errors associated with polarizations not included in data
nt.assert_raises(ValueError, power_beam.select, polarizations=[-3, -4])
# check for warnings and errors associated with unevenly spaced polarizations
uvtest.checkWarnings(power_beam.select, [], {'polarizations': power_beam.polarization_array[[0, 1, 3]]},
message='Selected polarizations are not evenly spaced')
write_file_beamfits = os.path.join(DATA_PATH, 'test/select_beam.fits')
nt.assert_raises(ValueError, power_beam.write_beamfits, write_file_beamfits)
# check for error with selecting on feeds on power beams
nt.assert_raises(ValueError, power_beam.select, feeds=['x'])
def test_select():
power_beam = UVBeam()
power_beam.read_cst_beam(cst_files[0], beam_type='power', frequency=150e6,
telescope_name='TEST', feed_name='bob',
feed_version='0.1', feed_pol=['x'],
model_name='E-field pattern - Rigging height 4.9m',
model_version='1.0')
# generate more frequencies for testing by copying and adding
new_beam = copy.deepcopy(power_beam)
new_beam.freq_array = power_beam.freq_array + power_beam.Nfreqs * 1e6
power_beam += new_beam
# add optional parameters for testing purposes
power_beam.extra_keywords = {'KEY1': 'test_keyword'}
power_beam.reference_input_impedance = 340.
power_beam.reference_output_impedance = 50.
power_beam.receiver_temperature_array = np.random.normal(50.0, 5, size=(power_beam.Nspws, power_beam.Nfreqs))
power_beam.loss_array = np.random.normal(50.0, 5, size=(power_beam.Nspws, power_beam.Nfreqs))
power_beam.mismatch_array = np.random.normal(0.0, 1.0, size=(power_beam.Nspws, power_beam.Nfreqs))
power_beam.s_parameters = np.random.normal(0.0, 0.3, size=(4, power_beam.Nspws, power_beam.Nfreqs))
# now test selecting along all axes at once
old_history = power_beam.history
inds1_to_keep = np.arange(14, 63)
inds2_to_keep = np.arange(5, 14)
freqs_to_keep = [power_beam.freq_array[0, 0]]
pols_to_keep = [-5]
power_beam2 = power_beam.select(axis1_inds=inds1_to_keep,
axis2_inds=inds2_to_keep,
frequencies=freqs_to_keep,
polarizations=pols_to_keep,
inplace=False)
nt.assert_equal(len(inds1_to_keep), power_beam2.Naxes1)
for i in inds1_to_keep:
nt.assert_true(power_beam.axis1_array[i] in power_beam2.axis1_array)
for i in np.unique(power_beam2.axis1_array):
nt.assert_true(i in power_beam.axis1_array)
nt.assert_equal(len(inds2_to_keep), power_beam2.Naxes2)
for i in inds2_to_keep:
nt.assert_true(power_beam.axis2_array[i] in power_beam2.axis2_array)
for i in np.unique(power_beam2.axis2_array):
nt.assert_true(i in power_beam.axis2_array)
nt.assert_equal(len(freqs_to_keep), power_beam2.Nfreqs)
for f in freqs_to_keep:
nt.assert_true(f in power_beam2.freq_array)
for f in np.unique(power_beam2.freq_array):
nt.assert_true(f in freqs_to_keep)
nt.assert_equal(len(pols_to_keep), power_beam2.Npols)
for p in pols_to_keep:
nt.assert_true(p in power_beam2.polarization_array)
for p in np.unique(power_beam2.polarization_array):
nt.assert_true(p in pols_to_keep)
nt.assert_true(uvutils.check_histories(old_history + ' Downselected to '
'specific parts of first image axis, '
'parts of second image axis, '
'frequencies, polarizations using pyuvdata.',
power_beam2.history))
# repeat for efield beam
efield_beam = UVBeam()
efield_beam.read_cst_beam(cst_files[0], beam_type='efield', frequency=150e6,
telescope_name='TEST', feed_name='bob',
feed_version='0.1', feed_pol=['x'],
model_name='E-field pattern - Rigging height 4.9m',
model_version='1.0')
# generate more frequencies for testing by copying and adding
new_beam = copy.deepcopy(efield_beam)
new_beam.freq_array = efield_beam.freq_array + efield_beam.Nfreqs * 1e6
efield_beam += new_beam
# add optional parameters for testing purposes
efield_beam.extra_keywords = {'KEY1': 'test_keyword'}
efield_beam.reference_input_impedance = 340.
efield_beam.reference_output_impedance = 50.
efield_beam.receiver_temperature_array = np.random.normal(50.0, 5, size=(efield_beam.Nspws, efield_beam.Nfreqs))
efield_beam.loss_array = np.random.normal(50.0, 5, size=(efield_beam.Nspws, efield_beam.Nfreqs))
efield_beam.mismatch_array = np.random.normal(0.0, 1.0, size=(efield_beam.Nspws, efield_beam.Nfreqs))
efield_beam.s_parameters = np.random.normal(0.0, 0.3, size=(4, efield_beam.Nspws, efield_beam.Nfreqs))
feeds_to_keep = ['x']
efield_beam2 = efield_beam.select(axis1_inds=inds1_to_keep,
axis2_inds=inds2_to_keep,
frequencies=freqs_to_keep,
feeds=feeds_to_keep,
inplace=False)
nt.assert_equal(len(inds1_to_keep), efield_beam2.Naxes1)
for i in inds1_to_keep:
nt.assert_true(efield_beam.axis1_array[i] in efield_beam2.axis1_array)
for i in np.unique(efield_beam2.axis1_array):
nt.assert_true(i in efield_beam.axis1_array)
nt.assert_equal(len(inds2_to_keep), efield_beam2.Naxes2)
for i in inds2_to_keep:
nt.assert_true(efield_beam.axis2_array[i] in efield_beam2.axis2_array)
for i in np.unique(efield_beam2.axis2_array):
nt.assert_true(i in efield_beam.axis2_array)
nt.assert_equal(len(freqs_to_keep), efield_beam2.Nfreqs)
for f in freqs_to_keep:
nt.assert_true(f in efield_beam2.freq_array)
for f in np.unique(efield_beam2.freq_array):
nt.assert_true(f in freqs_to_keep)
nt.assert_equal(len(feeds_to_keep), efield_beam2.Nfeeds)
for f in feeds_to_keep:
nt.assert_true(f in efield_beam2.feed_array)
for f in np.unique(efield_beam2.feed_array):
nt.assert_true(f in feeds_to_keep)
nt.assert_true(uvutils.check_histories(old_history + ' Downselected to '
'specific parts of first image axis, '
'parts of second image axis, '
'frequencies, feeds using pyuvdata.',
efield_beam2.history))
def test_add():
power_beam = UVBeam()
power_beam.read_cst_beam(cst_files[0], beam_type='power', frequency=150e6,
telescope_name='TEST', feed_name='bob',
feed_version='0.1', feed_pol=['x'],
model_name='E-field pattern - Rigging height 4.9m',
model_version='1.0')
# generate more frequencies for testing by copying and adding
new_beam = copy.deepcopy(power_beam)
new_beam.freq_array = power_beam.freq_array + power_beam.Nfreqs * 1e6
power_beam += new_beam
# add optional parameters for testing purposes
power_beam.extra_keywords = {'KEY1': 'test_keyword'}
power_beam.reference_input_impedance = 340.
power_beam.reference_output_impedance = 50.
power_beam.receiver_temperature_array = np.random.normal(50.0, 5, size=(power_beam.Nspws, power_beam.Nfreqs))
power_beam.loss_array = np.random.normal(50.0, 5, size=(power_beam.Nspws, power_beam.Nfreqs))
power_beam.mismatch_array = np.random.normal(0.0, 1.0, size=(power_beam.Nspws, power_beam.Nfreqs))
power_beam.s_parameters = np.random.normal(0.0, 0.3, size=(4, power_beam.Nspws, power_beam.Nfreqs))
# Add along first image axis
beam1 = power_beam.select(axis1_inds=np.arange(0, 180), inplace=False)
beam2 = power_beam.select(axis1_inds=np.arange(180, 360), inplace=False)
beam1 += beam2
# Check history is correct, before replacing and doing a full object check
nt.assert_true(uvutils.check_histories(power_beam.history
+ ' Downselected to specific parts of '
'first image axis using pyuvdata. '
'Combined data along first image axis '
'using pyuvdata.', beam1.history))
beam1.history = power_beam.history
nt.assert_equal(beam1, power_beam)
# Out of order - axis1
beam1 = power_beam.select(axis1_inds=np.arange(180, 360), inplace=False)
beam2 = power_beam.select(axis1_inds=np.arange(0, 180), inplace=False)
beam1 += beam2
beam1.history = power_beam.history
nt.assert_equal(beam1, power_beam)
# Add along second image axis
beam1 = power_beam.select(axis2_inds=np.arange(0, 90), inplace=False)
beam2 = power_beam.select(axis2_inds=np.arange(90, 181), inplace=False)
beam1 += beam2
# Check history is correct, before replacing and doing a full object check
nt.assert_true(uvutils.check_histories(power_beam.history
+ ' Downselected to specific parts of '
'second image axis using pyuvdata. '
'Combined data along second image axis '
'using pyuvdata.', beam1.history))
beam1.history = power_beam.history
nt.assert_equal(beam1, power_beam)
# Out of order - axis2
beam1 = power_beam.select(axis2_inds=np.arange(90, 181), inplace=False)
beam2 = power_beam.select(axis2_inds=np.arange(0, 90), inplace=False)
beam1 += beam2
beam1.history = power_beam.history
nt.assert_equal(beam1, power_beam)
# Add frequencies
beam1 = power_beam.select(freq_chans=0, inplace=False)
beam2 = power_beam.select(freq_chans=1, inplace=False)
beam1 += beam2
# Check history is correct, before replacing and doing a full object check
nt.assert_true(uvutils.check_histories(power_beam.history
+ ' Downselected to specific frequencies '
'using pyuvdata. Combined data along '
'frequency axis using pyuvdata.',
beam1.history))
beam1.history = power_beam.history
nt.assert_equal(beam1, power_beam)
# Out of order - freqs
beam1 = power_beam.select(freq_chans=1, inplace=False)
beam2 = power_beam.select(freq_chans=0, inplace=False)
beam1 += beam2
beam1.history = power_beam.history
nt.assert_equal(beam1, power_beam)
# Add polarizations
beam1 = power_beam.select(polarizations=-5, inplace=False)
beam2 = power_beam.select(polarizations=-6, inplace=False)
beam1 += beam2
nt.assert_true(uvutils.check_histories(power_beam.history
+ ' Downselected to specific polarizations '
'using pyuvdata. Combined data along '
'polarization axis using pyuvdata.',
beam1.history))
beam1.history = power_beam.history
nt.assert_equal(beam1, power_beam)
# Out of order - pols
beam1 = power_beam.select(polarizations=-6, inplace=False)
beam2 = power_beam.select(polarizations=-5, inplace=False)
beam1 += beam2
beam1.history = power_beam.history
nt.assert_equal(beam1, power_beam)
# Add feeds
efield_beam = UVBeam()
efield_beam.read_cst_beam(cst_files[0], beam_type='efield', frequency=150e6,
telescope_name='TEST', feed_name='bob',
feed_version='0.1', feed_pol=['x'],
model_name='E-field pattern - Rigging height 4.9m',
model_version='1.0')
# generate more frequencies for testing by copying and adding
new_beam = copy.deepcopy(efield_beam)
new_beam.freq_array = efield_beam.freq_array + efield_beam.Nfreqs * 1e6
efield_beam += new_beam
# add optional parameters for testing purposes
efield_beam.extra_keywords = {'KEY1': 'test_keyword'}
efield_beam.reference_input_impedance = 340.
efield_beam.reference_output_impedance = 50.
efield_beam.receiver_temperature_array = np.random.normal(50.0, 5, size=(efield_beam.Nspws, efield_beam.Nfreqs))
efield_beam.loss_array = np.random.normal(50.0, 5, size=(efield_beam.Nspws, efield_beam.Nfreqs))
efield_beam.mismatch_array = np.random.normal(0.0, 1.0, size=(efield_beam.Nspws, efield_beam.Nfreqs))
efield_beam.s_parameters = np.random.normal(0.0, 0.3, size=(4, efield_beam.Nspws, efield_beam.Nfreqs))
beam1 = efield_beam.select(feeds=efield_beam.feed_array[0], inplace=False)
beam2 = efield_beam.select(feeds=efield_beam.feed_array[1], inplace=False)
beam1 += beam2
nt.assert_true(uvutils.check_histories(efield_beam.history
+ ' Downselected to specific feeds '
'using pyuvdata. Combined data along '
'feed axis using pyuvdata.',
beam1.history))
beam1.history = efield_beam.history
nt.assert_equal(beam1, efield_beam)
# Out of order - feeds
beam1 = efield_beam.select(feeds=efield_beam.feed_array[1], inplace=False)
beam2 = efield_beam.select(feeds=efield_beam.feed_array[0], inplace=False)
beam1 += beam2
beam1.history = efield_beam.history
nt.assert_equal(beam1, efield_beam)
# Add multiple axes
beam_ref = copy.deepcopy(power_beam)
beam1 = power_beam.select(axis1_inds=np.arange(0, power_beam.Naxes1 / 2),
polarizations=power_beam.polarization_array[0],
inplace=False)
beam2 = power_beam.select(axis1_inds=np.arange(power_beam.Naxes1 / 2,
power_beam.Naxes1),
polarizations=power_beam.polarization_array[1],
inplace=False)
beam1 += beam2
nt.assert_true(uvutils.check_histories(power_beam.history
+ ' Downselected to specific parts of '
'first image axis, polarizations using '
'pyuvdata. Combined data along first '
'image, polarization axis using pyuvdata.',
beam1.history))
# Zero out missing data in reference object
beam_ref.data_array[:, :, 0, :, :, power_beam.Naxes1 / 2:] = 0.0
beam_ref.data_array[:, :, 1, :, :, :power_beam.Naxes1 / 2] = 0.0
beam1.history = power_beam.history
nt.assert_equal(beam1, beam_ref)
# Another combo with efield
beam_ref = copy.deepcopy(efield_beam)
beam1 = efield_beam.select(axis1_inds=np.arange(0, efield_beam.Naxes1 / 2),
axis2_inds=np.arange(0, efield_beam.Naxes2 / 2),
inplace=False)
beam2 = efield_beam.select(axis1_inds=np.arange(efield_beam.Naxes1 / 2,
efield_beam.Naxes1),
axis2_inds=np.arange(efield_beam.Naxes2 / 2,
efield_beam.Naxes2),
inplace=False)
beam1 += beam2
nt.assert_true(uvutils.check_histories(efield_beam.history
+ ' Downselected to specific parts of '
'first image axis, parts of second '
'image axis using pyuvdata. Combined '
'data along first image, second image '
'axis using pyuvdata.',
beam1.history))
# Zero out missing data in reference object
beam_ref.data_array[:, :, :, :, :efield_beam.Naxes2 / 2,
efield_beam.Naxes1 / 2:] = 0.0
beam_ref.data_array[:, :, :, :, efield_beam.Naxes2 / 2:,
:efield_beam.Naxes1 / 2] = 0.0
beam_ref.basis_vector_array[:, :, :efield_beam.Naxes2 / 2,
efield_beam.Naxes1 / 2:] = 0.0
beam_ref.basis_vector_array[:, :, efield_beam.Naxes2 / 2:,
:efield_beam.Naxes1 / 2] = 0.0
beam1.history = efield_beam.history
nt.assert_equal(beam1, beam_ref)
# Check warnings
# generate more frequencies for testing by copying and adding several times
while power_beam.Nfreqs < 8:
new_beam = copy.deepcopy(power_beam)
new_beam.freq_array = power_beam.freq_array + power_beam.Nfreqs * 1e6
power_beam += new_beam
beam1 = power_beam.select(freq_chans=np.arange(0, 4), inplace=False)
beam2 = power_beam.select(freq_chans=np.arange(5, 8), inplace=False)
uvtest.checkWarnings(beam1.__add__, [beam2],
message='Combined frequencies are not evenly spaced')
# generate more polarizations for testing by copying and adding several times
while power_beam.Npols < 4:
new_beam = copy.deepcopy(power_beam)
new_beam.polarization_array = power_beam.polarization_array - power_beam.Npols
power_beam += new_beam
power_beam.receiver_temperature_array = np.ones((1, 8))
beam1 = power_beam.select(polarizations=power_beam.polarization_array[0:2],
inplace=False)
beam2 = power_beam.select(polarizations=power_beam.polarization_array[3],
inplace=False)
uvtest.checkWarnings(beam1.__iadd__, [beam2],
message='Combined polarizations are not evenly spaced')
beam1 = power_beam.select(polarizations=power_beam.polarization_array[0:2],
inplace=False)
beam2 = power_beam.select(polarizations=power_beam.polarization_array[2:3],
inplace=False)
beam2.receiver_temperature_array = None
nt.assert_false(beam1.receiver_temperature_array is None)
uvtest.checkWarnings(beam1.__iadd__, [beam2],
message=['Only one of the UVBeam objects being combined '
'has optional parameter'])
nt.assert_true(beam1.receiver_temperature_array is None)
# Combining histories
beam1 = power_beam.select(polarizations=power_beam.polarization_array[0:2], inplace=False)
beam2 = power_beam.select(polarizations=power_beam.polarization_array[2:4], inplace=False)
beam2.history += ' testing the history. Read/written with pyuvdata'
beam1 += beam2
nt.assert_true(uvutils.check_histories(power_beam.history
+ ' Downselected to specific polarizations '
'using pyuvdata. Combined data along '
'polarization axis using pyuvdata. '
'testing the history.',
beam1.history))
beam1.history = power_beam.history
nt.assert_equal(beam1, power_beam)
# ------------------------
# Test failure modes of add function
# Wrong class
beam1 = copy.deepcopy(power_beam)
nt.assert_raises(ValueError, beam1.__iadd__, np.zeros(5))
params_to_change = {'beam_type': 'efield', 'data_normalization': 'solid_angle',
'telescope_name': 'foo', 'feed_name': 'foo',
'feed_version': 'v12', 'model_name': 'foo',
'model_version': 'v12', 'pixel_coordinate_system': 'sin_zenith',
'Naxes_vec': 3, 'nside': 16, 'ordering': 'nested'}
beam1 = power_beam.select(freq_chans=0, inplace=False)
for param, value in params_to_change.iteritems():
beam2 = power_beam.select(freq_chans=1, inplace=False)
setattr(beam2, param, value)
nt.assert_raises(ValueError, beam1.__iadd__, beam2)
# Overlapping data
beam2 = copy.deepcopy(power_beam)
nt.assert_raises(ValueError, beam1.__iadd__, beam2)
def test_healpix():
# put all the testing on healpix in this one function to minimize slow calls
# to uvbeam.az_za_to_healpix()
power_beam = UVBeam()
power_beam.read_cst_beam(cst_files[0], beam_type='power', frequency=150e6,
telescope_name='TEST', feed_name='bob',
feed_version='0.1', feed_pol=['x'],
model_name='E-field pattern - Rigging height 4.9m',
model_version='1.0')
# generate more frequencies for testing by copying and adding
new_beam = copy.deepcopy(power_beam)
new_beam.freq_array = power_beam.freq_array + power_beam.Nfreqs * 1e6
power_beam += new_beam
# add optional parameters for testing purposes
power_beam.extra_keywords = {'KEY1': 'test_keyword'}
power_beam.reference_input_impedance = 340.
power_beam.reference_output_impedance = 50.
power_beam.receiver_temperature_array = np.random.normal(50.0, 5, size=(power_beam.Nspws, power_beam.Nfreqs))
power_beam.loss_array = np.random.normal(50.0, 5, size=(power_beam.Nspws, power_beam.Nfreqs))
power_beam.mismatch_array = np.random.normal(0.0, 1.0, size=(power_beam.Nspws, power_beam.Nfreqs))
power_beam.s_parameters = np.random.normal(0.0, 0.3, size=(4, power_beam.Nspws, power_beam.Nfreqs))
power_beam_healpix = power_beam.az_za_to_healpix(inplace=False)
# test that Npixels make sense
n_max_pix = power_beam.Naxes1 * power_beam.Naxes2
nt.assert_true(power_beam_healpix.Npixels <= n_max_pix)
# -----------------------
# test selecting on pixels
old_history = power_beam_healpix.history
pixels_to_keep = np.arange(31, 184)
power_beam_healpix2 = power_beam_healpix.select(pixels=pixels_to_keep, inplace=False)
nt.assert_equal(len(pixels_to_keep), power_beam_healpix2.Npixels)
for pi in pixels_to_keep:
nt.assert_true(pi in power_beam_healpix2.pixel_array)
for pi in np.unique(power_beam_healpix2.pixel_array):
nt.assert_true(pi in pixels_to_keep)
nt.assert_true(uvutils.check_histories(old_history + ' Downselected to '
'specific healpix pixels using pyuvdata.',
power_beam_healpix2.history))
write_file_beamfits = os.path.join(DATA_PATH, 'test/select_beam.fits')
# test writing beamfits with only one pixel
pixels_to_keep = [43]
power_beam_healpix2 = power_beam_healpix.select(pixels=pixels_to_keep, inplace=False)
power_beam_healpix2.write_beamfits(write_file_beamfits, clobber=True)
# check for errors associated with pixels not included in data
nt.assert_raises(ValueError, power_beam_healpix.select,
pixels=[12 * power_beam_healpix.nside**2 + 10])
# test writing beamfits with non-contiguous pixels
pixels_to_keep = np.arange(2, 150, 4)
power_beam_healpix2 = power_beam_healpix.select(pixels=pixels_to_keep, inplace=False)
power_beam_healpix2.write_beamfits(write_file_beamfits, clobber=True)
# check for errors selecting pixels on non-healpix beams
nt.assert_raises(ValueError, power_beam.select, pixels=pixels_to_keep)
# -----------------
# check for errors selecting axis1_inds on healpix beams
inds1_to_keep = np.arange(14, 63)
nt.assert_raises(ValueError, power_beam_healpix.select, axis1_inds=inds1_to_keep)
# check for errors selecting axis2_inds on healpix beams
inds2_to_keep = np.arange(5, 14)
nt.assert_raises(ValueError, power_beam_healpix.select, axis2_inds=inds2_to_keep)
# ------------------------
# test selecting along all axes at once for healpix beams
freqs_to_keep = [power_beam_healpix.freq_array[0, 0]]
pols_to_keep = [-5]
power_beam_healpix2 = power_beam_healpix.select(pixels=pixels_to_keep,
frequencies=freqs_to_keep,
polarizations=pols_to_keep,
inplace=False)
nt.assert_equal(len(pixels_to_keep), power_beam_healpix2.Npixels)
for pi in pixels_to_keep:
nt.assert_true(pi in power_beam_healpix2.pixel_array)
for pi in np.unique(power_beam_healpix2.pixel_array):
nt.assert_true(pi in pixels_to_keep)
nt.assert_equal(len(freqs_to_keep), power_beam_healpix2.Nfreqs)
for f in freqs_to_keep:
nt.assert_true(f in power_beam_healpix2.freq_array)
for f in np.unique(power_beam_healpix2.freq_array):
nt.assert_true(f in freqs_to_keep)
nt.assert_equal(len(pols_to_keep), power_beam_healpix2.Npols)
for p in pols_to_keep:
nt.assert_true(p in power_beam_healpix2.polarization_array)
for p in np.unique(power_beam_healpix2.polarization_array):
nt.assert_true(p in pols_to_keep)
nt.assert_true(uvutils.check_histories(old_history + ' Downselected to '
'specific healpix pixels, frequencies, '
'polarizations using pyuvdata.',
power_beam_healpix2.history))
# repeat for efield beam
efield_beam = UVBeam()
efield_beam.read_cst_beam(cst_files[0], beam_type='efield', frequency=150e6,
telescope_name='TEST', feed_name='bob',
feed_version='0.1', feed_pol=['x'],
model_name='E-field pattern - Rigging height 4.9m',
model_version='1.0')
# generate more frequencies for testing by copying and adding
new_beam = copy.deepcopy(efield_beam)
new_beam.freq_array = efield_beam.freq_array + efield_beam.Nfreqs * 1e6
efield_beam += new_beam
efield_beam.az_za_to_healpix()
old_history = efield_beam.history
freqs_to_keep = np.array([efield_beam.freq_array[0, 0]])
feeds_to_keep = ['x']
efield_beam2 = efield_beam.select(pixels=pixels_to_keep,
frequencies=freqs_to_keep,
feeds=feeds_to_keep,
inplace=False)
nt.assert_equal(len(pixels_to_keep), efield_beam2.Npixels)
for pi in pixels_to_keep:
nt.assert_true(pi in efield_beam2.pixel_array)
for pi in np.unique(efield_beam2.pixel_array):
nt.assert_true(pi in pixels_to_keep)
nt.assert_equal(freqs_to_keep.size, efield_beam2.Nfreqs)
for f in freqs_to_keep:
nt.assert_true(f in efield_beam2.freq_array)
for f in np.unique(efield_beam2.freq_array):
nt.assert_true(f in freqs_to_keep)
nt.assert_equal(len(feeds_to_keep), efield_beam2.Nfeeds)
for f in feeds_to_keep:
nt.assert_true(f in efield_beam2.feed_array)
for f in np.unique(efield_beam2.feed_array):
nt.assert_true(f in feeds_to_keep)
nt.assert_true(uvutils.check_histories(old_history + ' Downselected to '
'specific healpix pixels, frequencies, '
'feeds using pyuvdata.',
efield_beam2.history))
# -------------------
# Test adding a different combo with healpix
beam_ref = copy.deepcopy(power_beam_healpix)
beam1 = power_beam_healpix.select(
pixels=power_beam_healpix.pixel_array[0:power_beam_healpix.Npixels / 2],
freq_chans=0, inplace=False)
beam2 = power_beam_healpix.select(
pixels=power_beam_healpix.pixel_array[power_beam_healpix.Npixels / 2:],
freq_chans=1, inplace=False)
beam1 += beam2
nt.assert_true(uvutils.check_histories(power_beam_healpix.history
+ ' Downselected to specific healpix '
'pixels, frequencies using pyuvdata. '
'Combined data along healpix pixel, '
'frequency axis using pyuvdata.',
beam1.history))
# Zero out missing data in reference object
beam_ref.data_array[:, :, :, 0, power_beam_healpix.Npixels / 2:] = 0.0
beam_ref.data_array[:, :, :, 1, :power_beam_healpix.Npixels / 2] = 0.0
beam1.history = power_beam_healpix.history
nt.assert_equal(beam1, beam_ref)
# Test adding another combo with efield
beam_ref = copy.deepcopy(efield_beam)
beam1 = efield_beam.select(freq_chans=0, feeds=efield_beam.feed_array[0],
inplace=False)
beam2 = efield_beam.select(freq_chans=1, feeds=efield_beam.feed_array[1],
inplace=False)
beam1 += beam2
nt.assert_true(uvutils.check_histories(efield_beam.history
+ ' Downselected to specific frequencies, '
'feeds using pyuvdata. Combined data '
'along frequency, feed axis using pyuvdata.',
beam1.history))
# Zero out missing data in reference object
beam_ref.data_array[:, :, 1, 0, :] = 0.0
beam_ref.data_array[:, :, 0, 1, :] = 0.0
beam1.history = efield_beam.history
nt.assert_equal(beam1, beam_ref)
# Add without inplace
beam1 = efield_beam.select(pixels=efield_beam.pixel_array[0:efield_beam.Npixels / 2],
inplace=False)
beam2 = efield_beam.select(pixels=efield_beam.pixel_array[efield_beam.Npixels / 2:],
inplace=False)
beam1 = beam1 + beam2
nt.assert_true(uvutils.check_histories(efield_beam.history
+ ' Downselected to specific healpix pixels '
'using pyuvdata. Combined data '
'along healpix pixel axis using pyuvdata.',
beam1.history))
beam1.history = efield_beam.history
nt.assert_equal(beam1, efield_beam)
# ---------------
# Test error: adding overlapping data with healpix
beam1 = copy.deepcopy(power_beam_healpix)
beam2 = copy.deepcopy(power_beam_healpix)
nt.assert_raises(ValueError, beam1.__iadd__, beam2)
# ---------------
# Test beam area methods
# Check that non-peak normalizations error
nt.assert_raises(ValueError, power_beam_healpix.get_beam_area)
nt.assert_raises(ValueError, power_beam_healpix.get_beam_sq_area)
healpix_norm = copy.deepcopy(power_beam_healpix)
healpix_norm.data_normalization = 'solid_angle'
nt.assert_raises(ValueError, healpix_norm.get_beam_area)
nt.assert_raises(ValueError, healpix_norm.get_beam_sq_area)
# change it back to 'physical'
healpix_norm.data_normalization = 'physical'
# change it to peak for rest of checks
healpix_norm.peak_normalize()
# Check sizes of output
numfreqs = healpix_norm.freq_array.shape[-1]
beam_int = healpix_norm.get_beam_area()
beam_sq_int = healpix_norm.get_beam_sq_area()
nt.assert_equal(beam_int.shape[0], numfreqs)
nt.assert_equal(beam_sq_int.shape[0], numfreqs)
# Check for the case of a uniform beam over the whole sky
dOmega = hp.nside2pixarea(healpix_norm.nside)
npix = healpix_norm.Npixels
healpix_norm.data_array = np.ones_like(healpix_norm.data_array)
nt.assert_almost_equal(np.sum(healpix_norm.get_beam_area()), numfreqs * npix * dOmega)
healpix_norm.data_array = 2. * np.ones_like(healpix_norm.data_array)
nt.assert_almost_equal(np.sum(healpix_norm.get_beam_sq_area()), numfreqs * 4. * npix * dOmega)
# check XX and YY beam areas work and match to within 5 sigfigs
XX_area = healpix_norm.get_beam_area('XX')
xx_area = healpix_norm.get_beam_area('xx')
nt.assert_true(np.allclose(xx_area, XX_area))
YY_area = healpix_norm.get_beam_area('YY')
nt.assert_true(np.allclose(YY_area / XX_area, np.ones(numfreqs)))
# nt.assert_almost_equal(YY_area / XX_area, 1.0, places=5)
XX_area = healpix_norm.get_beam_sq_area("XX")
YY_area = healpix_norm.get_beam_sq_area("YY")
nt.assert_true(np.allclose(YY_area / XX_area, np.ones(numfreqs)))
# nt.assert_almost_equal(YY_area / XX_area, 1.0, places=5)
# check backwards compatability with pstokes nomenclature and int polnum
I_area = healpix_norm.get_beam_area('I')
pI_area = healpix_norm.get_beam_area('pI')
area1 = healpix_norm.get_beam_area(1)
nt.assert_true(np.allclose(I_area, pI_area))
nt.assert_true(np.allclose(I_area, area1))
# Check that if pseudo-Stokes I (pI) is in the beam polarization_array, it just uses it
healpix_norm.polarization_array = [1, 2]
nt.assert_almost_equal(np.sum(healpix_norm.get_beam_area()), 2. * numfreqs * npix * dOmega)
nt.assert_almost_equal(np.sum(healpix_norm.get_beam_sq_area()), 4. * numfreqs * npix * dOmega)
# Check error if desired pol is allowed but isn't in the polarization_array
nt.assert_raises(ValueError, healpix_norm.get_beam_area, pol='xx')
nt.assert_raises(ValueError, healpix_norm.get_beam_sq_area, pol='xx')
# Check to make sure only pseudo-Stokes I is accepted
nt.assert_raises(NotImplementedError, healpix_norm.get_beam_area, pol='Q')
nt.assert_raises(NotImplementedError, healpix_norm.get_beam_sq_area, pol='Q')
# Check polarization error
healpix_norm.polarization_array = [9, 18, 27, -5]
nt.assert_raises(ValueError, healpix_norm.get_beam_area)
nt.assert_raises(ValueError, healpix_norm.get_beam_sq_area)
healpix_norm_fullpol = efield_beam.efield_to_power(inplace=False)
healpix_norm_fullpol.peak_normalize()
XX_area = healpix_norm_fullpol.get_beam_sq_area("XX")
YY_area = healpix_norm_fullpol.get_beam_sq_area("YY")
XY_area = healpix_norm_fullpol.get_beam_sq_area("XY")
YX_area = healpix_norm_fullpol.get_beam_sq_area("YX")
# check if XY beam area is equal to beam YX beam area
nt.assert_true(np.allclose(XY_area, YX_area))
# check if XY/YX beam area is less than XX/YY beam area
nt.assert_true(np.all(np.less(XY_area, XX_area)))
nt.assert_true(np.all(np.less(XY_area, YY_area)))
nt.assert_true(np.all(np.less(YX_area, XX_area)))
nt.assert_true(np.all(np.less(YX_area, YY_area)))
# Check if power is scalar
healpix_vec_norm = efield_beam.efield_to_power(keep_basis_vector=True,
calc_cross_pols=False,
inplace=False)
healpix_vec_norm.peak_normalize()
nt.assert_raises(ValueError, healpix_vec_norm.get_beam_area)
nt.assert_raises(ValueError, healpix_vec_norm.get_beam_sq_area)
# Check only power beams accepted
nt.assert_raises(ValueError, efield_beam.get_beam_area)
nt.assert_raises(ValueError, efield_beam.get_beam_sq_area)
def test_get_beam_functions():
power_beam = UVBeam()
power_beam.read_cst_beam(cst_files[0], beam_type='power', frequency=150e6,
telescope_name='TEST', feed_name='bob',
feed_version='0.1',
model_name='E-field pattern - Rigging height 4.9m',
model_version='1.0')
nt.assert_raises(AssertionError, power_beam._get_beam, 'I')
# Check only healpix accepted (HEALPix checks are in test_healpix)
# change data_normalization to peak for rest of checks
power_beam.peak_normalize()
nt.assert_raises(ValueError, power_beam.get_beam_area)
nt.assert_raises(ValueError, power_beam.get_beam_sq_area)