https://github.com/RadioAstronomySoftwareGroup/pyuvdata
Tip revision: 5e2a4940796f4bd063315dc5d614b3e0176c6df9 authored by Paul La Plante on 24 March 2020, 21:36:44 UTC
Update changelog for new version
Update changelog for new version
Tip revision: 5e2a494
test_beamfits.py
# -*- mode: python; coding: utf-8 -*-
# Copyright (c) 2018 Radio Astronomy Software Group
# Licensed under the 2-clause BSD License
"""Tests for BeamFits object.
"""
from __future__ import absolute_import, division, print_function
import pytest
import os
import numpy as np
from astropy.io import fits
from pyuvdata import UVBeam
import pyuvdata.tests as uvtest
from pyuvdata.data import DATA_PATH
import pyuvdata.utils as uvutils
filenames = ["HERA_NicCST_150MHz.txt", "HERA_NicCST_123MHz.txt"]
cst_folder = "NicCSTbeams"
cst_files = [os.path.join(DATA_PATH, cst_folder, f) for f in filenames]
def test_read_cst_write_read_fits():
beam_in = UVBeam()
beam_out = UVBeam()
beam_in.read_cst_beam(
cst_files[0],
beam_type="efield",
frequency=150e6,
telescope_name="TEST",
feed_name="bob",
feed_version="0.1",
feed_pol=["x"],
model_name="E-field pattern - Rigging height 4.9m",
model_version="1.0",
)
# add optional parameters for testing purposes
beam_in.extra_keywords = {"KEY1": "test_keyword"}
beam_in.x_orientation = "east"
beam_in.reference_impedance = 340.0
beam_in.receiver_temperature_array = np.random.normal(
50.0, 5, size=(beam_in.Nspws, beam_in.Nfreqs)
)
beam_in.loss_array = np.random.normal(50.0, 5, size=(beam_in.Nspws, beam_in.Nfreqs))
beam_in.mismatch_array = np.random.normal(
0.0, 1.0, size=(beam_in.Nspws, beam_in.Nfreqs)
)
beam_in.s_parameters = np.random.normal(
0.0, 0.3, size=(4, beam_in.Nspws, beam_in.Nfreqs)
)
beam_in.interpolation_function = "az_za_simple"
beam_in.freq_interp_kind = "linear"
write_file = os.path.join(DATA_PATH, "test/outtest_beam.fits")
beam_in.write_beamfits(write_file, clobber=True)
beam_out.read_beamfits(write_file)
assert beam_in == beam_out
# redo for power beam
del beam_in
beam_in = UVBeam()
# read in efield and convert to power to test cross-pols
beam_in.read_cst_beam(
cst_files[0],
beam_type="efield",
frequency=150e6,
telescope_name="TEST",
feed_name="bob",
feed_version="0.1",
feed_pol=["x"],
model_name="E-field pattern - Rigging height 4.9m",
model_version="1.0",
)
beam_in.efield_to_power()
# add optional parameters for testing purposes
beam_in.extra_keywords = {"KEY1": "test_keyword"}
beam_in.x_orientation = "east"
beam_in.reference_impedance = 340.0
beam_in.receiver_temperature_array = np.random.normal(
50.0, 5, size=(beam_in.Nspws, beam_in.Nfreqs)
)
beam_in.loss_array = np.random.normal(50.0, 5, size=(beam_in.Nspws, beam_in.Nfreqs))
beam_in.mismatch_array = np.random.normal(
0.0, 1.0, size=(beam_in.Nspws, beam_in.Nfreqs)
)
beam_in.s_parameters = np.random.normal(
0.0, 0.3, size=(4, beam_in.Nspws, beam_in.Nfreqs)
)
beam_in.write_beamfits(write_file, clobber=True)
beam_out.read_beamfits(write_file)
assert beam_in == beam_out
# now replace 'power' with 'intensity' for btype
fname = fits.open(write_file)
data = fname[0].data
primary_hdr = fname[0].header
primary_hdr["BTYPE"] = "Intensity"
hdunames = uvutils._fits_indexhdus(fname)
bandpass_hdu = fname[hdunames["BANDPARM"]]
prihdu = fits.PrimaryHDU(data=data, header=primary_hdr)
hdulist = fits.HDUList([prihdu, bandpass_hdu])
hdulist.writeto(write_file, overwrite=True)
beam_out.read_beamfits(write_file)
assert beam_in == beam_out
# now remove coordsys but leave ctypes 1 & 2
fname = fits.open(write_file)
data = fname[0].data
primary_hdr = fname[0].header
primary_hdr.pop("COORDSYS")
hdunames = uvutils._fits_indexhdus(fname)
bandpass_hdu = fname[hdunames["BANDPARM"]]
prihdu = fits.PrimaryHDU(data=data, header=primary_hdr)
hdulist = fits.HDUList([prihdu, bandpass_hdu])
hdulist.writeto(write_file, overwrite=True)
beam_out.read_beamfits(write_file)
assert beam_in == beam_out
# now change frequency units
fname = fits.open(write_file)
data = fname[0].data
primary_hdr = fname[0].header
primary_hdr["CUNIT3"] = "MHz"
primary_hdr["CRVAL3"] = primary_hdr["CRVAL3"] / 1e6
primary_hdr["CDELT3"] = primary_hdr["CRVAL3"] / 1e6
hdunames = uvutils._fits_indexhdus(fname)
bandpass_hdu = fname[hdunames["BANDPARM"]]
prihdu = fits.PrimaryHDU(data=data, header=primary_hdr)
hdulist = fits.HDUList([prihdu, bandpass_hdu])
hdulist.writeto(write_file, overwrite=True)
beam_out.read_beamfits(write_file)
assert beam_in == beam_out
def test_writeread_healpix():
pytest.importorskip("astropy_healpix")
beam_in = UVBeam()
beam_out = UVBeam()
# fill UVBeam object with dummy data for now for testing purposes
beam_in.read_cst_beam(
cst_files[0],
beam_type="efield",
frequency=150e6,
telescope_name="TEST",
feed_name="bob",
feed_version="0.1",
feed_pol=["x"],
model_name="E-field pattern - Rigging height 4.9m",
model_version="1.0",
)
beam_in.interpolation_function = "az_za_simple"
beam_in.to_healpix()
write_file = os.path.join(DATA_PATH, "test/outtest_beam_hpx.fits")
beam_in.write_beamfits(write_file, clobber=True)
beam_out.read_beamfits(write_file)
assert beam_in == beam_out
# redo for power beam
del beam_in
beam_in = UVBeam()
# read in efield and convert to power to test cross-pols
beam_in.read_cst_beam(
cst_files[0],
beam_type="efield",
frequency=150e6,
telescope_name="TEST",
feed_name="bob",
feed_version="0.1",
feed_pol=["x"],
model_name="E-field pattern - Rigging height 4.9m",
model_version="1.0",
)
beam_in.efield_to_power()
# add optional parameters for testing purposes
beam_in.extra_keywords = {"KEY1": "test_keyword"}
beam_in.x_orientation = "east"
beam_in.reference_impedance = 340.0
beam_in.receiver_temperature_array = np.random.normal(
50.0, 5, size=(beam_in.Nspws, beam_in.Nfreqs)
)
beam_in.loss_array = np.random.normal(50.0, 5, size=(beam_in.Nspws, beam_in.Nfreqs))
beam_in.mismatch_array = np.random.normal(
0.0, 1.0, size=(beam_in.Nspws, beam_in.Nfreqs)
)
beam_in.s_parameters = np.random.normal(
0.0, 0.3, size=(4, beam_in.Nspws, beam_in.Nfreqs)
)
# check that data_array is complex
assert np.iscomplexobj(np.real_if_close(beam_in.data_array, tol=10))
beam_in.interpolation_function = "az_za_simple"
beam_in.to_healpix()
# check that data_array is complex after interpolation
assert np.iscomplexobj(np.real_if_close(beam_in.data_array, tol=10))
beam_in.write_beamfits(write_file, clobber=True)
beam_out.read_beamfits(write_file)
assert beam_in == beam_out
# now remove coordsys but leave ctype 1
fname = fits.open(write_file)
data = fname[0].data
primary_hdr = fname[0].header
primary_hdr.pop("COORDSYS")
hdunames = uvutils._fits_indexhdus(fname)
hpx_hdu = fname[hdunames["HPX_INDS"]]
bandpass_hdu = fname[hdunames["BANDPARM"]]
prihdu = fits.PrimaryHDU(data=data, header=primary_hdr)
hdulist = fits.HDUList([prihdu, hpx_hdu, bandpass_hdu])
hdulist.writeto(write_file, overwrite=True)
beam_out.read_beamfits(write_file)
assert beam_in == beam_out
def test_errors():
beam_in = UVBeam()
beam_out = UVBeam()
beam_in.read_cst_beam(
cst_files[0],
beam_type="efield",
frequency=150e6,
telescope_name="TEST",
feed_name="bob",
feed_version="0.1",
feed_pol=["x"],
model_name="E-field pattern - Rigging height 4.9m",
model_version="1.0",
)
beam_in.beam_type = "foo"
write_file = os.path.join(DATA_PATH, "test/outtest_beam.fits")
pytest.raises(ValueError, beam_in.write_beamfits, write_file, clobber=True)
pytest.raises(
ValueError, beam_in.write_beamfits, write_file, clobber=True, run_check=False
)
beam_in.beam_type = "efield"
beam_in.antenna_type = "phased_array"
write_file = os.path.join(DATA_PATH, "test/outtest_beam.fits")
pytest.raises(ValueError, beam_in.write_beamfits, write_file, clobber=True)
# now change values for various items in primary hdu to test errors
beam_in.antenna_type = "simple"
header_vals_to_change = [
{"BTYPE": "foo"},
{"COORDSYS": "orthoslant_zenith"},
{"NAXIS": ""},
{"CUNIT1": "foo"},
{"CUNIT2": "foo"},
{"CUNIT3": "foo"},
]
for i, hdr_dict in enumerate(header_vals_to_change):
beam_in.write_beamfits(write_file, clobber=True)
keyword = list(hdr_dict.keys())[0]
new_val = hdr_dict[keyword]
fname = fits.open(write_file)
data = fname[0].data
primary_hdr = fname[0].header
hdunames = uvutils._fits_indexhdus(fname)
basisvec_hdu = fname[hdunames["BASISVEC"]]
bandpass_hdu = fname[hdunames["BANDPARM"]]
if "NAXIS" in keyword:
ax_num = keyword.split("NAXIS")[1]
if ax_num != "":
ax_num = int(ax_num)
ax_use = len(data.shape) - ax_num
new_arrays = np.split(data, primary_hdr[keyword], axis=ax_use)
data = new_arrays[0]
else:
data = np.squeeze(
np.split(data, primary_hdr["NAXIS1"], axis=len(data.shape) - 1)[0]
)
else:
primary_hdr[keyword] = new_val
prihdu = fits.PrimaryHDU(data=data, header=primary_hdr)
hdulist = fits.HDUList([prihdu, basisvec_hdu, bandpass_hdu])
hdulist.writeto(write_file, overwrite=True)
pytest.raises(ValueError, beam_out.read_beamfits, write_file)
# now change values for various items in basisvec hdu to not match primary hdu
header_vals_to_change = [
{"COORDSYS": "foo"},
{"CTYPE1": "foo"},
{"CTYPE2": "foo"},
{"CDELT1": np.diff(beam_in.axis1_array)[0] * 2},
{"CDELT2": np.diff(beam_in.axis2_array)[0] * 2},
{"NAXIS4": ""},
{"CUNIT1": "foo"},
{"CUNIT2": "foo"},
]
for i, hdr_dict in enumerate(header_vals_to_change):
beam_in.write_beamfits(write_file, clobber=True)
keyword = list(hdr_dict.keys())[0]
new_val = hdr_dict[keyword]
fname = fits.open(write_file)
data = fname[0].data
primary_hdr = fname[0].header
hdunames = uvutils._fits_indexhdus(fname)
basisvec_hdu = fname[hdunames["BASISVEC"]]
basisvec_hdr = basisvec_hdu.header
basisvec_data = basisvec_hdu.data
bandpass_hdu = fname[hdunames["BANDPARM"]]
if "NAXIS" in keyword:
ax_num = keyword.split("NAXIS")[1]
if ax_num != "":
ax_num = int(ax_num)
ax_use = len(basisvec_data.shape) - ax_num
new_arrays = np.split(basisvec_data, basisvec_hdr[keyword], axis=ax_use)
basisvec_data = new_arrays[0]
else:
basisvec_data = np.split(
basisvec_data,
basisvec_hdr["NAXIS1"],
axis=len(basisvec_data.shape) - 1,
)[0]
else:
basisvec_hdr[keyword] = new_val
prihdu = fits.PrimaryHDU(data=data, header=primary_hdr)
basisvec_hdu = fits.ImageHDU(data=basisvec_data, header=basisvec_hdr)
hdulist = fits.HDUList([prihdu, basisvec_hdu, bandpass_hdu])
hdulist.writeto(write_file, overwrite=True)
pytest.raises(ValueError, beam_out.read_beamfits, write_file)
def test_healpix_errors():
pytest.importorskip("astropy_healpix")
beam_in = UVBeam()
beam_out = UVBeam()
write_file = os.path.join(DATA_PATH, "test/outtest_beam_hpx.fits")
# now change values for various items in primary hdu to test errors
beam_in.read_cst_beam(
cst_files[0],
beam_type="efield",
frequency=150e6,
telescope_name="TEST",
feed_name="bob",
feed_version="0.1",
feed_pol=["x"],
model_name="E-field pattern - Rigging height 4.9m",
model_version="1.0",
)
beam_in.interpolation_function = "az_za_simple"
beam_in.to_healpix()
header_vals_to_change = [{"CTYPE1": "foo"}, {"NAXIS1": ""}]
for i, hdr_dict in enumerate(header_vals_to_change):
beam_in.write_beamfits(write_file, clobber=True)
keyword = list(hdr_dict.keys())[0]
new_val = hdr_dict[keyword]
fname = fits.open(write_file)
data = fname[0].data
primary_hdr = fname[0].header
hdunames = uvutils._fits_indexhdus(fname)
basisvec_hdu = fname[hdunames["BASISVEC"]]
hpx_hdu = fname[hdunames["HPX_INDS"]]
bandpass_hdu = fname[hdunames["BANDPARM"]]
if "NAXIS" in keyword:
ax_num = keyword.split("NAXIS")[1]
if ax_num != "":
ax_num = int(ax_num)
ax_use = len(data.shape) - ax_num
new_arrays = np.split(data, primary_hdr[keyword], axis=ax_use)
data = new_arrays[0]
else:
data = np.squeeze(
np.split(data, primary_hdr["NAXIS1"], axis=len(data.shape) - 1)[0]
)
else:
primary_hdr[keyword] = new_val
prihdu = fits.PrimaryHDU(data=data, header=primary_hdr)
hdulist = fits.HDUList([prihdu, basisvec_hdu, hpx_hdu, bandpass_hdu])
hdulist.writeto(write_file, overwrite=True)
pytest.raises(ValueError, beam_out.read_beamfits, write_file)
# now change values for various items in basisvec hdu to not match primary hdu
beam_in.read_cst_beam(
cst_files[0],
beam_type="efield",
frequency=150e6,
telescope_name="TEST",
feed_name="bob",
feed_version="0.1",
feed_pol=["x"],
model_name="E-field pattern - Rigging height 4.9m",
model_version="1.0",
)
beam_in.interpolation_function = "az_za_simple"
beam_in.to_healpix()
header_vals_to_change = [{"CTYPE1": "foo"}, {"NAXIS1": ""}]
for i, hdr_dict in enumerate(header_vals_to_change):
beam_in.write_beamfits(write_file, clobber=True)
keyword = list(hdr_dict.keys())[0]
new_val = hdr_dict[keyword]
fname = fits.open(write_file)
data = fname[0].data
primary_hdr = fname[0].header
hdunames = uvutils._fits_indexhdus(fname)
basisvec_hdu = fname[hdunames["BASISVEC"]]
basisvec_hdr = basisvec_hdu.header
basisvec_data = basisvec_hdu.data
hpx_hdu = fname[hdunames["HPX_INDS"]]
bandpass_hdu = fname[hdunames["BANDPARM"]]
if "NAXIS" in keyword:
ax_num = keyword.split("NAXIS")[1]
if ax_num != "":
ax_num = int(ax_num)
ax_use = len(basisvec_data.shape) - ax_num
new_arrays = np.split(basisvec_data, basisvec_hdr[keyword], axis=ax_use)
basisvec_data = new_arrays[0]
else:
basisvec_data = np.split(
basisvec_data,
basisvec_hdr["NAXIS1"],
axis=len(basisvec_data.shape) - 1,
)[0]
else:
basisvec_hdr[keyword] = new_val
prihdu = fits.PrimaryHDU(data=data, header=primary_hdr)
basisvec_hdu = fits.ImageHDU(data=basisvec_data, header=basisvec_hdr)
hdulist = fits.HDUList([prihdu, basisvec_hdu, hpx_hdu, bandpass_hdu])
hdulist.writeto(write_file, overwrite=True)
pytest.raises(ValueError, beam_out.read_beamfits, write_file)
def test_casa_beam():
# test reading in CASA power beam. Some header items are missing...
beam_in = UVBeam()
beam_out = UVBeam()
casa_file = os.path.join(DATA_PATH, "HERABEAM.FITS")
write_file = os.path.join(DATA_PATH, "test/outtest_beam.fits")
beam_in.read_beamfits(casa_file, run_check=False)
# fill in missing parameters
beam_in.data_normalization = "peak"
beam_in.feed_name = "casa_ideal"
beam_in.feed_version = "v0"
beam_in.model_name = "casa_airy"
beam_in.model_version = "v0"
# this file is actually in an orthoslant projection RA/DEC at zenith at a
# particular time.
# For now pretend it's in a zenith orthoslant projection
beam_in.pixel_coordinate_system = "orthoslant_zenith"
expected_extra_keywords = [
"OBSERVER",
"OBSDEC",
"DATAMIN",
"OBJECT",
"INSTRUME",
"DATAMAX",
"OBSRA",
"ORIGIN",
"DATE-MAP",
"DATE",
"EQUINOX",
"DATE-OBS",
"COMMENT",
]
assert expected_extra_keywords.sort() == list(beam_in.extra_keywords.keys()).sort()
beam_in.write_beamfits(write_file, clobber=True)
beam_out.read_beamfits(write_file)
assert beam_in == beam_out
def test_extra_keywords():
beam_in = UVBeam()
beam_out = UVBeam()
casa_file = os.path.join(DATA_PATH, "HERABEAM.FITS")
testfile = os.path.join(DATA_PATH, "test/outtest_beam.fits")
beam_in.read_beamfits(casa_file, run_check=False)
# fill in missing parameters
beam_in.data_normalization = "peak"
beam_in.feed_name = "casa_ideal"
beam_in.feed_version = "v0"
beam_in.model_name = "casa_airy"
beam_in.model_version = "v0"
# this file is actually in an orthoslant projection RA/DEC at zenith at a
# particular time.
# For now pretend it's in a zenith orthoslant projection
beam_in.pixel_coordinate_system = "orthoslant_zenith"
# check for warnings & errors with extra_keywords that are dicts, lists or arrays
beam_in.extra_keywords["testdict"] = {"testkey": 23}
uvtest.checkWarnings(
beam_in.check,
message=["testdict in extra_keywords is a " "list, array or dict"],
)
pytest.raises(TypeError, beam_in.write_beamfits, testfile, run_check=False)
beam_in.extra_keywords.pop("testdict")
beam_in.extra_keywords["testlist"] = [12, 14, 90]
uvtest.checkWarnings(
beam_in.check,
message=["testlist in extra_keywords is a " "list, array or dict"],
)
pytest.raises(TypeError, beam_in.write_beamfits, testfile, run_check=False)
beam_in.extra_keywords.pop("testlist")
beam_in.extra_keywords["testarr"] = np.array([12, 14, 90])
uvtest.checkWarnings(
beam_in.check, message=["testarr in extra_keywords is a " "list, array or dict"]
)
pytest.raises(TypeError, beam_in.write_beamfits, testfile, run_check=False)
beam_in.extra_keywords.pop("testarr")
# check for warnings with extra_keywords keys that are too long
beam_in.extra_keywords["test_long_key"] = True
uvtest.checkWarnings(
beam_in.check,
message=["key test_long_key in extra_keywords " "is longer than 8 characters"],
)
uvtest.checkWarnings(
beam_in.write_beamfits,
[testfile],
{"run_check": False, "clobber": True},
message=["key test_long_key in extra_keywords is longer than 8 characters"],
)
beam_in.extra_keywords.pop("test_long_key")
# check handling of boolean keywords
beam_in.extra_keywords["bool"] = True
beam_in.extra_keywords["bool2"] = False
beam_in.write_beamfits(testfile, clobber=True)
beam_out.read_beamfits(testfile, run_check=False)
assert beam_in == beam_out
beam_in.extra_keywords.pop("bool")
beam_in.extra_keywords.pop("bool2")
# check handling of int-like keywords
beam_in.extra_keywords["int1"] = np.int(5)
beam_in.extra_keywords["int2"] = 7
beam_in.write_beamfits(testfile, clobber=True)
beam_out.read_beamfits(testfile, run_check=False)
assert beam_in == beam_out
beam_in.extra_keywords.pop("int1")
beam_in.extra_keywords.pop("int2")
# check handling of float-like keywords
beam_in.extra_keywords["float1"] = np.int64(5.3)
beam_in.extra_keywords["float2"] = 6.9
beam_in.write_beamfits(testfile, clobber=True)
beam_out.read_beamfits(testfile, run_check=False)
assert beam_in == beam_out
beam_in.extra_keywords.pop("float1")
beam_in.extra_keywords.pop("float2")
# check handling of complex-like keywords
beam_in.extra_keywords["complex1"] = np.complex64(5.3 + 1.2j)
beam_in.extra_keywords["complex2"] = 6.9 + 4.6j
beam_in.write_beamfits(testfile, clobber=True)
beam_out.read_beamfits(testfile, run_check=False)
assert beam_in == beam_out
def test_multi_files():
"""
Reading multiple files at once.
"""
beam_full = UVBeam()
beam_full.read_cst_beam(
cst_files,
beam_type="efield",
frequency=[150e6, 123e6],
telescope_name="TEST",
feed_name="bob",
feed_version="0.1",
feed_pol=["x"],
model_name="E-field pattern - Rigging height 4.9m",
model_version="1.0",
)
# add optional parameters for testing purposes
beam_full.extra_keywords = {"KEY1": "test_keyword"}
beam_full.x_orientation = "east"
beam_full.reference_impedance = 340.0
beam_full.receiver_temperature_array = np.random.normal(
50.0, 5, size=(beam_full.Nspws, beam_full.Nfreqs)
)
beam_full.loss_array = np.random.normal(
50.0, 5, size=(beam_full.Nspws, beam_full.Nfreqs)
)
beam_full.mismatch_array = np.random.normal(
0.0, 1.0, size=(beam_full.Nspws, beam_full.Nfreqs)
)
beam_full.s_parameters = np.random.normal(
0.0, 0.3, size=(4, beam_full.Nspws, beam_full.Nfreqs)
)
testfile1 = os.path.join(DATA_PATH, "test/outtest_beam1.fits")
testfile2 = os.path.join(DATA_PATH, "test/outtest_beam2.fits")
beam1 = beam_full.select(freq_chans=0, inplace=False)
beam2 = beam_full.select(freq_chans=1, inplace=False)
beam1.write_beamfits(testfile1, clobber=True)
beam2.write_beamfits(testfile2, clobber=True)
beam1.read_beamfits([testfile1, testfile2])
# Check history is correct, before replacing and doing a full object check
assert uvutils._check_histories(
beam_full.history + " Downselected "
"to specific frequencies using pyuvdata. "
"Combined data along frequency axis using"
" pyuvdata.",
beam1.history,
)
beam1.history = beam_full.history
assert beam1 == beam_full