https://github.com/RadioAstronomySoftwareGroup/pyuvdata
Tip revision: 983ae27b57a28550b6ee1a58e13cc9161bb89327 authored by Bryna Hazelton on 15 January 2020, 19:15:00 UTC
update release date
update release date
Tip revision: 983ae27
test_beamfits.py
# -*- mode: python; coding: utf-8 -*-
# Copyright (c) 2018 Radio Astronomy Software Group
# Licensed under the 2-clause BSD License
"""Tests for BeamFits object.
"""
from __future__ import absolute_import, division, print_function
import pytest
import os
import numpy as np
from astropy.io import fits
from pyuvdata import UVBeam
import pyuvdata.tests as uvtest
from pyuvdata.data import DATA_PATH
import pyuvdata.utils as uvutils
filenames = ['HERA_NicCST_150MHz.txt', 'HERA_NicCST_123MHz.txt']
cst_folder = 'NicCSTbeams'
cst_files = [os.path.join(DATA_PATH, cst_folder, f) for f in filenames]
def test_readCST_writereadFITS():
beam_in = UVBeam()
beam_out = UVBeam()
beam_in.read_cst_beam(cst_files[0], beam_type='efield', frequency=150e6,
telescope_name='TEST', feed_name='bob',
feed_version='0.1', feed_pol=['x'],
model_name='E-field pattern - Rigging height 4.9m',
model_version='1.0')
# add optional parameters for testing purposes
beam_in.extra_keywords = {'KEY1': 'test_keyword'}
beam_in.x_orientation = 'east'
beam_in.reference_impedance = 340.
beam_in.receiver_temperature_array = np.random.normal(50.0, 5, size=(beam_in.Nspws, beam_in.Nfreqs))
beam_in.loss_array = np.random.normal(50.0, 5, size=(beam_in.Nspws, beam_in.Nfreqs))
beam_in.mismatch_array = np.random.normal(0.0, 1.0, size=(beam_in.Nspws, beam_in.Nfreqs))
beam_in.s_parameters = np.random.normal(0.0, 0.3, size=(4, beam_in.Nspws, beam_in.Nfreqs))
beam_in.interpolation_function = 'az_za_simple'
beam_in.freq_interp_kind = 'linear'
write_file = os.path.join(DATA_PATH, 'test/outtest_beam.fits')
beam_in.write_beamfits(write_file, clobber=True)
beam_out.read_beamfits(write_file)
assert beam_in == beam_out
# redo for power beam
del(beam_in)
beam_in = UVBeam()
# read in efield and convert to power to test cross-pols
beam_in.read_cst_beam(cst_files[0], beam_type='efield', frequency=150e6,
telescope_name='TEST', feed_name='bob',
feed_version='0.1', feed_pol=['x'],
model_name='E-field pattern - Rigging height 4.9m',
model_version='1.0')
beam_in.efield_to_power()
# add optional parameters for testing purposes
beam_in.extra_keywords = {'KEY1': 'test_keyword'}
beam_in.x_orientation = 'east'
beam_in.reference_impedance = 340.
beam_in.receiver_temperature_array = np.random.normal(50.0, 5, size=(beam_in.Nspws, beam_in.Nfreqs))
beam_in.loss_array = np.random.normal(50.0, 5, size=(beam_in.Nspws, beam_in.Nfreqs))
beam_in.mismatch_array = np.random.normal(0.0, 1.0, size=(beam_in.Nspws, beam_in.Nfreqs))
beam_in.s_parameters = np.random.normal(0.0, 0.3, size=(4, beam_in.Nspws, beam_in.Nfreqs))
beam_in.write_beamfits(write_file, clobber=True)
beam_out.read_beamfits(write_file)
assert beam_in == beam_out
# now replace 'power' with 'intensity' for btype
F = fits.open(write_file)
data = F[0].data
primary_hdr = F[0].header
primary_hdr['BTYPE'] = 'Intensity'
hdunames = uvutils._fits_indexhdus(F)
bandpass_hdu = F[hdunames['BANDPARM']]
prihdu = fits.PrimaryHDU(data=data, header=primary_hdr)
hdulist = fits.HDUList([prihdu, bandpass_hdu])
hdulist.writeto(write_file, overwrite=True)
beam_out.read_beamfits(write_file)
assert beam_in == beam_out
# now remove coordsys but leave ctypes 1 & 2
F = fits.open(write_file)
data = F[0].data
primary_hdr = F[0].header
primary_hdr.pop('COORDSYS')
hdunames = uvutils._fits_indexhdus(F)
bandpass_hdu = F[hdunames['BANDPARM']]
prihdu = fits.PrimaryHDU(data=data, header=primary_hdr)
hdulist = fits.HDUList([prihdu, bandpass_hdu])
hdulist.writeto(write_file, overwrite=True)
beam_out.read_beamfits(write_file)
assert beam_in == beam_out
# now change frequency units
F = fits.open(write_file)
data = F[0].data
primary_hdr = F[0].header
primary_hdr['CUNIT3'] = 'MHz'
primary_hdr['CRVAL3'] = primary_hdr['CRVAL3'] / 1e6
primary_hdr['CDELT3'] = primary_hdr['CRVAL3'] / 1e6
hdunames = uvutils._fits_indexhdus(F)
bandpass_hdu = F[hdunames['BANDPARM']]
prihdu = fits.PrimaryHDU(data=data, header=primary_hdr)
hdulist = fits.HDUList([prihdu, bandpass_hdu])
hdulist.writeto(write_file, overwrite=True)
beam_out.read_beamfits(write_file)
assert beam_in == beam_out
@uvtest.skipIf_no_healpix
def test_writeread_healpix():
beam_in = UVBeam()
beam_out = UVBeam()
# fill UVBeam object with dummy data for now for testing purposes
beam_in.read_cst_beam(cst_files[0], beam_type='efield', frequency=150e6,
telescope_name='TEST', feed_name='bob',
feed_version='0.1', feed_pol=['x'],
model_name='E-field pattern - Rigging height 4.9m',
model_version='1.0')
beam_in.interpolation_function = 'az_za_simple'
beam_in.to_healpix()
write_file = os.path.join(DATA_PATH, 'test/outtest_beam_hpx.fits')
beam_in.write_beamfits(write_file, clobber=True)
beam_out.read_beamfits(write_file)
assert beam_in == beam_out
# redo for power beam
del(beam_in)
beam_in = UVBeam()
# read in efield and convert to power to test cross-pols
beam_in.read_cst_beam(cst_files[0], beam_type='efield', frequency=150e6,
telescope_name='TEST', feed_name='bob',
feed_version='0.1', feed_pol=['x'],
model_name='E-field pattern - Rigging height 4.9m',
model_version='1.0')
beam_in.efield_to_power()
# add optional parameters for testing purposes
beam_in.extra_keywords = {'KEY1': 'test_keyword'}
beam_in.x_orientation = 'east'
beam_in.reference_impedance = 340.
beam_in.receiver_temperature_array = np.random.normal(50.0, 5, size=(beam_in.Nspws, beam_in.Nfreqs))
beam_in.loss_array = np.random.normal(50.0, 5, size=(beam_in.Nspws, beam_in.Nfreqs))
beam_in.mismatch_array = np.random.normal(0.0, 1.0, size=(beam_in.Nspws, beam_in.Nfreqs))
beam_in.s_parameters = np.random.normal(0.0, 0.3, size=(4, beam_in.Nspws, beam_in.Nfreqs))
# check that data_array is complex
assert np.iscomplexobj(np.real_if_close(beam_in.data_array, tol=10))
beam_in.interpolation_function = 'az_za_simple'
beam_in.to_healpix()
# check that data_array is complex after interpolation
assert np.iscomplexobj(np.real_if_close(beam_in.data_array, tol=10))
beam_in.write_beamfits(write_file, clobber=True)
beam_out.read_beamfits(write_file)
assert beam_in == beam_out
# now remove coordsys but leave ctype 1
F = fits.open(write_file)
data = F[0].data
primary_hdr = F[0].header
primary_hdr.pop('COORDSYS')
hdunames = uvutils._fits_indexhdus(F)
hpx_hdu = F[hdunames['HPX_INDS']]
bandpass_hdu = F[hdunames['BANDPARM']]
prihdu = fits.PrimaryHDU(data=data, header=primary_hdr)
hdulist = fits.HDUList([prihdu, hpx_hdu, bandpass_hdu])
hdulist.writeto(write_file, overwrite=True)
beam_out.read_beamfits(write_file)
assert beam_in == beam_out
def test_errors():
beam_in = UVBeam()
beam_out = UVBeam()
beam_in.read_cst_beam(cst_files[0], beam_type='efield', frequency=150e6,
telescope_name='TEST', feed_name='bob',
feed_version='0.1', feed_pol=['x'],
model_name='E-field pattern - Rigging height 4.9m',
model_version='1.0')
beam_in.beam_type = 'foo'
write_file = os.path.join(DATA_PATH, 'test/outtest_beam.fits')
pytest.raises(ValueError, beam_in.write_beamfits, write_file, clobber=True)
pytest.raises(ValueError, beam_in.write_beamfits, write_file,
clobber=True, run_check=False)
beam_in.beam_type = 'efield'
beam_in.antenna_type = 'phased_array'
write_file = os.path.join(DATA_PATH, 'test/outtest_beam.fits')
pytest.raises(ValueError, beam_in.write_beamfits, write_file, clobber=True)
# now change values for various items in primary hdu to test errors
beam_in.antenna_type = 'simple'
header_vals_to_change = [{'BTYPE': 'foo'}, {'COORDSYS': 'orthoslant_zenith'},
{'NAXIS': ''}, {'CUNIT1': 'foo'}, {'CUNIT2': 'foo'},
{'CUNIT3': 'foo'}]
for i, hdr_dict in enumerate(header_vals_to_change):
beam_in.write_beamfits(write_file, clobber=True)
keyword = list(hdr_dict.keys())[0]
new_val = hdr_dict[keyword]
F = fits.open(write_file)
data = F[0].data
primary_hdr = F[0].header
hdunames = uvutils._fits_indexhdus(F)
basisvec_hdu = F[hdunames['BASISVEC']]
bandpass_hdu = F[hdunames['BANDPARM']]
if 'NAXIS' in keyword:
ax_num = keyword.split('NAXIS')[1]
if ax_num != '':
ax_num = int(ax_num)
ax_use = len(data.shape) - ax_num
new_arrays = np.split(data, primary_hdr[keyword], axis=ax_use)
data = new_arrays[0]
else:
data = np.squeeze(np.split(data, primary_hdr['NAXIS1'],
axis=len(data.shape) - 1)[0])
else:
primary_hdr[keyword] = new_val
prihdu = fits.PrimaryHDU(data=data, header=primary_hdr)
hdulist = fits.HDUList([prihdu, basisvec_hdu, bandpass_hdu])
hdulist.writeto(write_file, overwrite=True)
pytest.raises(ValueError, beam_out.read_beamfits, write_file)
# now change values for various items in basisvec hdu to not match primary hdu
header_vals_to_change = [{'COORDSYS': 'foo'}, {'CTYPE1': 'foo'},
{'CTYPE2': 'foo'},
{'CDELT1': np.diff(beam_in.axis1_array)[0] * 2},
{'CDELT2': np.diff(beam_in.axis2_array)[0] * 2},
{'NAXIS4': ''}, {'CUNIT1': 'foo'}, {'CUNIT2': 'foo'}]
for i, hdr_dict in enumerate(header_vals_to_change):
beam_in.write_beamfits(write_file, clobber=True)
keyword = list(hdr_dict.keys())[0]
new_val = hdr_dict[keyword]
F = fits.open(write_file)
data = F[0].data
primary_hdr = F[0].header
hdunames = uvutils._fits_indexhdus(F)
basisvec_hdu = F[hdunames['BASISVEC']]
basisvec_hdr = basisvec_hdu.header
basisvec_data = basisvec_hdu.data
bandpass_hdu = F[hdunames['BANDPARM']]
if 'NAXIS' in keyword:
ax_num = keyword.split('NAXIS')[1]
if ax_num != '':
ax_num = int(ax_num)
ax_use = len(basisvec_data.shape) - ax_num
new_arrays = np.split(basisvec_data, basisvec_hdr[keyword], axis=ax_use)
basisvec_data = new_arrays[0]
else:
basisvec_data = np.split(basisvec_data, basisvec_hdr['NAXIS1'],
axis=len(basisvec_data.shape) - 1)[0]
else:
basisvec_hdr[keyword] = new_val
prihdu = fits.PrimaryHDU(data=data, header=primary_hdr)
basisvec_hdu = fits.ImageHDU(data=basisvec_data, header=basisvec_hdr)
hdulist = fits.HDUList([prihdu, basisvec_hdu, bandpass_hdu])
hdulist.writeto(write_file, overwrite=True)
pytest.raises(ValueError, beam_out.read_beamfits, write_file)
@uvtest.skipIf_no_healpix
def test_healpix_errors():
beam_in = UVBeam()
beam_out = UVBeam()
write_file = os.path.join(DATA_PATH, 'test/outtest_beam_hpx.fits')
# now change values for various items in primary hdu to test errors
beam_in.read_cst_beam(cst_files[0], beam_type='efield', frequency=150e6,
telescope_name='TEST', feed_name='bob',
feed_version='0.1', feed_pol=['x'],
model_name='E-field pattern - Rigging height 4.9m',
model_version='1.0')
beam_in.interpolation_function = 'az_za_simple'
beam_in.to_healpix()
header_vals_to_change = [{'CTYPE1': 'foo'}, {'NAXIS1': ''}]
for i, hdr_dict in enumerate(header_vals_to_change):
beam_in.write_beamfits(write_file, clobber=True)
keyword = list(hdr_dict.keys())[0]
new_val = hdr_dict[keyword]
F = fits.open(write_file)
data = F[0].data
primary_hdr = F[0].header
hdunames = uvutils._fits_indexhdus(F)
basisvec_hdu = F[hdunames['BASISVEC']]
hpx_hdu = F[hdunames['HPX_INDS']]
bandpass_hdu = F[hdunames['BANDPARM']]
if 'NAXIS' in keyword:
ax_num = keyword.split('NAXIS')[1]
if ax_num != '':
ax_num = int(ax_num)
ax_use = len(data.shape) - ax_num
new_arrays = np.split(data, primary_hdr[keyword], axis=ax_use)
data = new_arrays[0]
else:
data = np.squeeze(np.split(data, primary_hdr['NAXIS1'],
axis=len(data.shape) - 1)[0])
else:
primary_hdr[keyword] = new_val
prihdu = fits.PrimaryHDU(data=data, header=primary_hdr)
hdulist = fits.HDUList([prihdu, basisvec_hdu, hpx_hdu, bandpass_hdu])
hdulist.writeto(write_file, overwrite=True)
pytest.raises(ValueError, beam_out.read_beamfits, write_file)
# now change values for various items in basisvec hdu to not match primary hdu
beam_in.read_cst_beam(cst_files[0], beam_type='efield', frequency=150e6,
telescope_name='TEST', feed_name='bob',
feed_version='0.1', feed_pol=['x'],
model_name='E-field pattern - Rigging height 4.9m',
model_version='1.0')
beam_in.interpolation_function = 'az_za_simple'
beam_in.to_healpix()
header_vals_to_change = [{'CTYPE1': 'foo'}, {'NAXIS1': ''}]
for i, hdr_dict in enumerate(header_vals_to_change):
beam_in.write_beamfits(write_file, clobber=True)
keyword = list(hdr_dict.keys())[0]
new_val = hdr_dict[keyword]
F = fits.open(write_file)
data = F[0].data
primary_hdr = F[0].header
hdunames = uvutils._fits_indexhdus(F)
basisvec_hdu = F[hdunames['BASISVEC']]
basisvec_hdr = basisvec_hdu.header
basisvec_data = basisvec_hdu.data
hpx_hdu = F[hdunames['HPX_INDS']]
bandpass_hdu = F[hdunames['BANDPARM']]
if 'NAXIS' in keyword:
ax_num = keyword.split('NAXIS')[1]
if ax_num != '':
ax_num = int(ax_num)
ax_use = len(basisvec_data.shape) - ax_num
new_arrays = np.split(basisvec_data, basisvec_hdr[keyword], axis=ax_use)
basisvec_data = new_arrays[0]
else:
basisvec_data = np.split(basisvec_data, basisvec_hdr['NAXIS1'],
axis=len(basisvec_data.shape) - 1)[0]
else:
basisvec_hdr[keyword] = new_val
prihdu = fits.PrimaryHDU(data=data, header=primary_hdr)
basisvec_hdu = fits.ImageHDU(data=basisvec_data, header=basisvec_hdr)
hdulist = fits.HDUList([prihdu, basisvec_hdu, hpx_hdu, bandpass_hdu])
hdulist.writeto(write_file, overwrite=True)
pytest.raises(ValueError, beam_out.read_beamfits, write_file)
def test_casa_beam():
# test reading in CASA power beam. Some header items are missing...
beam_in = UVBeam()
beam_out = UVBeam()
casa_file = os.path.join(DATA_PATH, 'HERABEAM.FITS')
write_file = os.path.join(DATA_PATH, 'test/outtest_beam.fits')
beam_in.read_beamfits(casa_file, run_check=False)
# fill in missing parameters
beam_in.data_normalization = 'peak'
beam_in.feed_name = 'casa_ideal'
beam_in.feed_version = 'v0'
beam_in.model_name = 'casa_airy'
beam_in.model_version = 'v0'
# this file is actually in an orthoslant projection RA/DEC at zenith at a particular time.
# For now pretend it's in a zenith orthoslant projection
beam_in.pixel_coordinate_system = 'orthoslant_zenith'
expected_extra_keywords = ['OBSERVER', 'OBSDEC', 'DATAMIN', 'OBJECT',
'INSTRUME', 'DATAMAX', 'OBSRA', 'ORIGIN',
'DATE-MAP', 'DATE', 'EQUINOX', 'DATE-OBS',
'COMMENT']
assert expected_extra_keywords.sort() == list(beam_in.extra_keywords.keys()).sort()
beam_in.write_beamfits(write_file, clobber=True)
beam_out.read_beamfits(write_file)
assert beam_in == beam_out
def test_extra_keywords():
beam_in = UVBeam()
beam_out = UVBeam()
casa_file = os.path.join(DATA_PATH, 'HERABEAM.FITS')
testfile = os.path.join(DATA_PATH, 'test/outtest_beam.fits')
beam_in.read_beamfits(casa_file, run_check=False)
# fill in missing parameters
beam_in.data_normalization = 'peak'
beam_in.feed_name = 'casa_ideal'
beam_in.feed_version = 'v0'
beam_in.model_name = 'casa_airy'
beam_in.model_version = 'v0'
# this file is actually in an orthoslant projection RA/DEC at zenith at a particular time.
# For now pretend it's in a zenith orthoslant projection
beam_in.pixel_coordinate_system = 'orthoslant_zenith'
# check for warnings & errors with extra_keywords that are dicts, lists or arrays
beam_in.extra_keywords['testdict'] = {'testkey': 23}
uvtest.checkWarnings(beam_in.check, message=['testdict in extra_keywords is a '
'list, array or dict'])
pytest.raises(TypeError, beam_in.write_beamfits, testfile, run_check=False)
beam_in.extra_keywords.pop('testdict')
beam_in.extra_keywords['testlist'] = [12, 14, 90]
uvtest.checkWarnings(beam_in.check, message=['testlist in extra_keywords is a '
'list, array or dict'])
pytest.raises(TypeError, beam_in.write_beamfits, testfile, run_check=False)
beam_in.extra_keywords.pop('testlist')
beam_in.extra_keywords['testarr'] = np.array([12, 14, 90])
uvtest.checkWarnings(beam_in.check, message=['testarr in extra_keywords is a '
'list, array or dict'])
pytest.raises(TypeError, beam_in.write_beamfits, testfile, run_check=False)
beam_in.extra_keywords.pop('testarr')
# check for warnings with extra_keywords keys that are too long
beam_in.extra_keywords['test_long_key'] = True
uvtest.checkWarnings(beam_in.check, message=['key test_long_key in extra_keywords '
'is longer than 8 characters'])
uvtest.checkWarnings(beam_in.write_beamfits, [testfile], {'run_check': False,
'clobber': True},
message=['key test_long_key in extra_keywords is longer than 8 characters'])
beam_in.extra_keywords.pop('test_long_key')
# check handling of boolean keywords
beam_in.extra_keywords['bool'] = True
beam_in.extra_keywords['bool2'] = False
beam_in.write_beamfits(testfile, clobber=True)
beam_out.read_beamfits(testfile, run_check=False)
assert beam_in == beam_out
beam_in.extra_keywords.pop('bool')
beam_in.extra_keywords.pop('bool2')
# check handling of int-like keywords
beam_in.extra_keywords['int1'] = np.int(5)
beam_in.extra_keywords['int2'] = 7
beam_in.write_beamfits(testfile, clobber=True)
beam_out.read_beamfits(testfile, run_check=False)
assert beam_in == beam_out
beam_in.extra_keywords.pop('int1')
beam_in.extra_keywords.pop('int2')
# check handling of float-like keywords
beam_in.extra_keywords['float1'] = np.int64(5.3)
beam_in.extra_keywords['float2'] = 6.9
beam_in.write_beamfits(testfile, clobber=True)
beam_out.read_beamfits(testfile, run_check=False)
assert beam_in == beam_out
beam_in.extra_keywords.pop('float1')
beam_in.extra_keywords.pop('float2')
# check handling of complex-like keywords
beam_in.extra_keywords['complex1'] = np.complex64(5.3 + 1.2j)
beam_in.extra_keywords['complex2'] = 6.9 + 4.6j
beam_in.write_beamfits(testfile, clobber=True)
beam_out.read_beamfits(testfile, run_check=False)
assert beam_in == beam_out
def test_multi_files():
"""
Reading multiple files at once.
"""
beam_full = UVBeam()
beam_full.read_cst_beam(cst_files, beam_type='efield', frequency=[150e6, 123e6],
telescope_name='TEST', feed_name='bob',
feed_version='0.1', feed_pol=['x'],
model_name='E-field pattern - Rigging height 4.9m',
model_version='1.0')
# add optional parameters for testing purposes
beam_full.extra_keywords = {'KEY1': 'test_keyword'}
beam_full.x_orientation = 'east'
beam_full.reference_impedance = 340.
beam_full.receiver_temperature_array = np.random.normal(50.0, 5, size=(beam_full.Nspws, beam_full.Nfreqs))
beam_full.loss_array = np.random.normal(50.0, 5, size=(beam_full.Nspws, beam_full.Nfreqs))
beam_full.mismatch_array = np.random.normal(0.0, 1.0, size=(beam_full.Nspws, beam_full.Nfreqs))
beam_full.s_parameters = np.random.normal(0.0, 0.3, size=(4, beam_full.Nspws, beam_full.Nfreqs))
testfile1 = os.path.join(DATA_PATH, 'test/outtest_beam1.fits')
testfile2 = os.path.join(DATA_PATH, 'test/outtest_beam2.fits')
beam1 = beam_full.select(freq_chans=0, inplace=False)
beam2 = beam_full.select(freq_chans=1, inplace=False)
beam1.write_beamfits(testfile1, clobber=True)
beam2.write_beamfits(testfile2, clobber=True)
beam1.read_beamfits([testfile1, testfile2])
# Check history is correct, before replacing and doing a full object check
assert uvutils._check_histories(beam_full.history + ' Downselected '
'to specific frequencies using pyuvdata. '
'Combined data along frequency axis using'
' pyuvdata.', beam1.history)
beam1.history = beam_full.history
assert beam1 == beam_full