Raw File
Tip revision: 617c8e77d30037c1e1fb3a2ab460cb0aaa11eca1 authored by Paul La Plante on 29 June 2019, 20:31:12 UTC
Add support for bitshuffle on visdata
Tip revision: 617c8e7
# -*- mode: python; coding: utf-8 -*
# Copyright (c) 2018 Radio Astronomy Software Group
# Licensed under the 2-clause BSD License

"""Tests for BeamFits object.

from __future__ import absolute_import, division, print_function

import pytest
import os
import numpy as np
from import fits

from pyuvdata import UVBeam
import pyuvdata.tests as uvtest
from import DATA_PATH
import pyuvdata.utils as uvutils

filenames = ['HERA_NicCST_150MHz.txt', 'HERA_NicCST_123MHz.txt']
cst_folder = 'NicCSTbeams'
cst_files = [os.path.join(DATA_PATH, cst_folder, f) for f in filenames]

def test_readCST_writereadFITS():
    beam_in = UVBeam()
    beam_out = UVBeam()
    beam_in.read_cst_beam(cst_files[0], beam_type='efield', frequency=150e6,
                          telescope_name='TEST', feed_name='bob',
                          feed_version='0.1', feed_pol=['x'],
                          model_name='E-field pattern - Rigging height 4.9m',

    # add optional parameters for testing purposes
    beam_in.extra_keywords = {'KEY1': 'test_keyword'}
    beam_in.x_orientation = 'east'
    beam_in.reference_impedance = 340.
    beam_in.receiver_temperature_array = np.random.normal(50.0, 5, size=(beam_in.Nspws, beam_in.Nfreqs))
    beam_in.loss_array = np.random.normal(50.0, 5, size=(beam_in.Nspws, beam_in.Nfreqs))
    beam_in.mismatch_array = np.random.normal(0.0, 1.0, size=(beam_in.Nspws, beam_in.Nfreqs))
    beam_in.s_parameters = np.random.normal(0.0, 0.3, size=(4, beam_in.Nspws, beam_in.Nfreqs))
    beam_in.interpolation_function = 'az_za_simple'
    beam_in.freq_interp_kind = 'linear'

    write_file = os.path.join(DATA_PATH, 'test/outtest_beam.fits')

    beam_in.write_beamfits(write_file, clobber=True)

    assert beam_in == beam_out

    # redo for power beam
    beam_in = UVBeam()
    # read in efield and convert to power to test cross-pols
    beam_in.read_cst_beam(cst_files[0], beam_type='efield', frequency=150e6,
                          telescope_name='TEST', feed_name='bob',
                          feed_version='0.1', feed_pol=['x'],
                          model_name='E-field pattern - Rigging height 4.9m',

    # add optional parameters for testing purposes
    beam_in.extra_keywords = {'KEY1': 'test_keyword'}
    beam_in.x_orientation = 'east'
    beam_in.reference_impedance = 340.
    beam_in.receiver_temperature_array = np.random.normal(50.0, 5, size=(beam_in.Nspws, beam_in.Nfreqs))
    beam_in.loss_array = np.random.normal(50.0, 5, size=(beam_in.Nspws, beam_in.Nfreqs))
    beam_in.mismatch_array = np.random.normal(0.0, 1.0, size=(beam_in.Nspws, beam_in.Nfreqs))
    beam_in.s_parameters = np.random.normal(0.0, 0.3, size=(4, beam_in.Nspws, beam_in.Nfreqs))

    beam_in.write_beamfits(write_file, clobber=True)
    assert beam_in == beam_out

    # now replace 'power' with 'intensity' for btype
    F =
    data = F[0].data
    primary_hdr = F[0].header
    primary_hdr['BTYPE'] = 'Intensity'
    hdunames = uvutils._fits_indexhdus(F)
    bandpass_hdu = F[hdunames['BANDPARM']]

    prihdu = fits.PrimaryHDU(data=data, header=primary_hdr)
    hdulist = fits.HDUList([prihdu, bandpass_hdu])

    hdulist.writeto(write_file, overwrite=True)

    assert beam_in == beam_out

    # now remove coordsys but leave ctypes 1 & 2
    F =
    data = F[0].data
    primary_hdr = F[0].header
    hdunames = uvutils._fits_indexhdus(F)
    bandpass_hdu = F[hdunames['BANDPARM']]

    prihdu = fits.PrimaryHDU(data=data, header=primary_hdr)
    hdulist = fits.HDUList([prihdu, bandpass_hdu])

    hdulist.writeto(write_file, overwrite=True)

    assert beam_in == beam_out

    # now change frequency units
    F =
    data = F[0].data
    primary_hdr = F[0].header
    primary_hdr['CUNIT3'] = 'MHz'
    primary_hdr['CRVAL3'] = primary_hdr['CRVAL3'] / 1e6
    primary_hdr['CDELT3'] = primary_hdr['CRVAL3'] / 1e6
    hdunames = uvutils._fits_indexhdus(F)
    bandpass_hdu = F[hdunames['BANDPARM']]

    prihdu = fits.PrimaryHDU(data=data, header=primary_hdr)
    hdulist = fits.HDUList([prihdu, bandpass_hdu])

    hdulist.writeto(write_file, overwrite=True)

    assert beam_in == beam_out

def test_writeread_healpix():
    beam_in = UVBeam()
    beam_out = UVBeam()
    # fill UVBeam object with dummy data for now for testing purposes
    beam_in.read_cst_beam(cst_files[0], beam_type='efield', frequency=150e6,
                          telescope_name='TEST', feed_name='bob',
                          feed_version='0.1', feed_pol=['x'],
                          model_name='E-field pattern - Rigging height 4.9m',
    beam_in.interpolation_function = 'az_za_simple'

    write_file = os.path.join(DATA_PATH, 'test/outtest_beam_hpx.fits')

    beam_in.write_beamfits(write_file, clobber=True)

    assert beam_in == beam_out

    # redo for power beam
    beam_in = UVBeam()
    # read in efield and convert to power to test cross-pols
    beam_in.read_cst_beam(cst_files[0], beam_type='efield', frequency=150e6,
                          telescope_name='TEST', feed_name='bob',
                          feed_version='0.1', feed_pol=['x'],
                          model_name='E-field pattern - Rigging height 4.9m',

    # add optional parameters for testing purposes
    beam_in.extra_keywords = {'KEY1': 'test_keyword'}
    beam_in.x_orientation = 'east'
    beam_in.reference_impedance = 340.
    beam_in.receiver_temperature_array = np.random.normal(50.0, 5, size=(beam_in.Nspws, beam_in.Nfreqs))
    beam_in.loss_array = np.random.normal(50.0, 5, size=(beam_in.Nspws, beam_in.Nfreqs))
    beam_in.mismatch_array = np.random.normal(0.0, 1.0, size=(beam_in.Nspws, beam_in.Nfreqs))
    beam_in.s_parameters = np.random.normal(0.0, 0.3, size=(4, beam_in.Nspws, beam_in.Nfreqs))

    # check that data_array is complex
    assert np.iscomplexobj(np.real_if_close(beam_in.data_array, tol=10))

    beam_in.interpolation_function = 'az_za_simple'
    # check that data_array is complex after interpolation
    assert np.iscomplexobj(np.real_if_close(beam_in.data_array, tol=10))

    beam_in.write_beamfits(write_file, clobber=True)

    assert beam_in == beam_out

    # now remove coordsys but leave ctype 1
    F =
    data = F[0].data
    primary_hdr = F[0].header
    hdunames = uvutils._fits_indexhdus(F)
    hpx_hdu = F[hdunames['HPX_INDS']]
    bandpass_hdu = F[hdunames['BANDPARM']]

    prihdu = fits.PrimaryHDU(data=data, header=primary_hdr)
    hdulist = fits.HDUList([prihdu, hpx_hdu, bandpass_hdu])

    hdulist.writeto(write_file, overwrite=True)

    assert beam_in == beam_out

def test_errors():
    beam_in = UVBeam()
    beam_out = UVBeam()
    beam_in.read_cst_beam(cst_files[0], beam_type='efield', frequency=150e6,
                          telescope_name='TEST', feed_name='bob',
                          feed_version='0.1', feed_pol=['x'],
                          model_name='E-field pattern - Rigging height 4.9m',
    beam_in.beam_type = 'foo'

    write_file = os.path.join(DATA_PATH, 'test/outtest_beam.fits')
    pytest.raises(ValueError, beam_in.write_beamfits, write_file, clobber=True)
    pytest.raises(ValueError, beam_in.write_beamfits, write_file,
                  clobber=True, run_check=False)

    beam_in.beam_type = 'efield'
    beam_in.antenna_type = 'phased_array'
    write_file = os.path.join(DATA_PATH, 'test/outtest_beam.fits')
    pytest.raises(ValueError, beam_in.write_beamfits, write_file, clobber=True)

    # now change values for various items in primary hdu to test errors
    beam_in.antenna_type = 'simple'

    header_vals_to_change = [{'BTYPE': 'foo'}, {'COORDSYS': 'orthoslant_zenith'},
                             {'NAXIS': ''}, {'CUNIT1': 'foo'}, {'CUNIT2': 'foo'},
                             {'CUNIT3': 'foo'}]

    for i, hdr_dict in enumerate(header_vals_to_change):
        beam_in.write_beamfits(write_file, clobber=True)

        keyword = list(hdr_dict.keys())[0]
        new_val = hdr_dict[keyword]
        F =
        data = F[0].data
        primary_hdr = F[0].header
        hdunames = uvutils._fits_indexhdus(F)
        basisvec_hdu = F[hdunames['BASISVEC']]
        bandpass_hdu = F[hdunames['BANDPARM']]

        if 'NAXIS' in keyword:
            ax_num = keyword.split('NAXIS')[1]
            if ax_num != '':
                ax_num = int(ax_num)
                ax_use = len(data.shape) - ax_num
                new_arrays = np.split(data, primary_hdr[keyword], axis=ax_use)
                data = new_arrays[0]
                data = np.squeeze(np.split(data, primary_hdr['NAXIS1'],
                                  axis=len(data.shape) - 1)[0])
            primary_hdr[keyword] = new_val

        prihdu = fits.PrimaryHDU(data=data, header=primary_hdr)
        hdulist = fits.HDUList([prihdu, basisvec_hdu, bandpass_hdu])

        hdulist.writeto(write_file, overwrite=True)

        pytest.raises(ValueError, beam_out.read_beamfits, write_file)

    # now change values for various items in basisvec hdu to not match primary hdu
    header_vals_to_change = [{'COORDSYS': 'foo'}, {'CTYPE1': 'foo'},
                             {'CTYPE2': 'foo'},
                             {'CDELT1': np.diff(beam_in.axis1_array)[0] * 2},
                             {'CDELT2': np.diff(beam_in.axis2_array)[0] * 2},
                             {'NAXIS4': ''}, {'CUNIT1': 'foo'}, {'CUNIT2': 'foo'}]

    for i, hdr_dict in enumerate(header_vals_to_change):
        beam_in.write_beamfits(write_file, clobber=True)

        keyword = list(hdr_dict.keys())[0]
        new_val = hdr_dict[keyword]
        F =
        data = F[0].data
        primary_hdr = F[0].header
        hdunames = uvutils._fits_indexhdus(F)
        basisvec_hdu = F[hdunames['BASISVEC']]
        basisvec_hdr = basisvec_hdu.header
        basisvec_data =
        bandpass_hdu = F[hdunames['BANDPARM']]

        if 'NAXIS' in keyword:
            ax_num = keyword.split('NAXIS')[1]
            if ax_num != '':
                ax_num = int(ax_num)
                ax_use = len(basisvec_data.shape) - ax_num
                new_arrays = np.split(basisvec_data, basisvec_hdr[keyword], axis=ax_use)
                basisvec_data = new_arrays[0]
                basisvec_data = np.split(basisvec_data, basisvec_hdr['NAXIS1'],
                                         axis=len(basisvec_data.shape) - 1)[0]
            basisvec_hdr[keyword] = new_val

        prihdu = fits.PrimaryHDU(data=data, header=primary_hdr)
        basisvec_hdu = fits.ImageHDU(data=basisvec_data, header=basisvec_hdr)
        hdulist = fits.HDUList([prihdu, basisvec_hdu, bandpass_hdu])

        hdulist.writeto(write_file, overwrite=True)

        pytest.raises(ValueError, beam_out.read_beamfits, write_file)

def test_healpix_errors():
    beam_in = UVBeam()
    beam_out = UVBeam()
    write_file = os.path.join(DATA_PATH, 'test/outtest_beam_hpx.fits')

    # now change values for various items in primary hdu to test errors
    beam_in.read_cst_beam(cst_files[0], beam_type='efield', frequency=150e6,
                          telescope_name='TEST', feed_name='bob',
                          feed_version='0.1', feed_pol=['x'],
                          model_name='E-field pattern - Rigging height 4.9m',
    beam_in.interpolation_function = 'az_za_simple'

    header_vals_to_change = [{'CTYPE1': 'foo'}, {'NAXIS1': ''}]

    for i, hdr_dict in enumerate(header_vals_to_change):
        beam_in.write_beamfits(write_file, clobber=True)

        keyword = list(hdr_dict.keys())[0]
        new_val = hdr_dict[keyword]
        F =
        data = F[0].data
        primary_hdr = F[0].header
        hdunames = uvutils._fits_indexhdus(F)
        basisvec_hdu = F[hdunames['BASISVEC']]
        hpx_hdu = F[hdunames['HPX_INDS']]
        bandpass_hdu = F[hdunames['BANDPARM']]

        if 'NAXIS' in keyword:
            ax_num = keyword.split('NAXIS')[1]
            if ax_num != '':
                ax_num = int(ax_num)
                ax_use = len(data.shape) - ax_num
                new_arrays = np.split(data, primary_hdr[keyword], axis=ax_use)
                data = new_arrays[0]
                data = np.squeeze(np.split(data, primary_hdr['NAXIS1'],
                                  axis=len(data.shape) - 1)[0])
            primary_hdr[keyword] = new_val

        prihdu = fits.PrimaryHDU(data=data, header=primary_hdr)
        hdulist = fits.HDUList([prihdu, basisvec_hdu, hpx_hdu, bandpass_hdu])

        hdulist.writeto(write_file, overwrite=True)

        pytest.raises(ValueError, beam_out.read_beamfits, write_file)

    # now change values for various items in basisvec hdu to not match primary hdu
    beam_in.read_cst_beam(cst_files[0], beam_type='efield', frequency=150e6,
                          telescope_name='TEST', feed_name='bob',
                          feed_version='0.1', feed_pol=['x'],
                          model_name='E-field pattern - Rigging height 4.9m',
    beam_in.interpolation_function = 'az_za_simple'

    header_vals_to_change = [{'CTYPE1': 'foo'}, {'NAXIS1': ''}]

    for i, hdr_dict in enumerate(header_vals_to_change):
        beam_in.write_beamfits(write_file, clobber=True)

        keyword = list(hdr_dict.keys())[0]
        new_val = hdr_dict[keyword]
        F =
        data = F[0].data
        primary_hdr = F[0].header
        hdunames = uvutils._fits_indexhdus(F)
        basisvec_hdu = F[hdunames['BASISVEC']]
        basisvec_hdr = basisvec_hdu.header
        basisvec_data =
        hpx_hdu = F[hdunames['HPX_INDS']]
        bandpass_hdu = F[hdunames['BANDPARM']]

        if 'NAXIS' in keyword:
            ax_num = keyword.split('NAXIS')[1]
            if ax_num != '':
                ax_num = int(ax_num)
                ax_use = len(basisvec_data.shape) - ax_num
                new_arrays = np.split(basisvec_data, basisvec_hdr[keyword], axis=ax_use)
                basisvec_data = new_arrays[0]
                basisvec_data = np.split(basisvec_data, basisvec_hdr['NAXIS1'],
                                         axis=len(basisvec_data.shape) - 1)[0]
            basisvec_hdr[keyword] = new_val

        prihdu = fits.PrimaryHDU(data=data, header=primary_hdr)
        basisvec_hdu = fits.ImageHDU(data=basisvec_data, header=basisvec_hdr)
        hdulist = fits.HDUList([prihdu, basisvec_hdu, hpx_hdu, bandpass_hdu])

        hdulist.writeto(write_file, overwrite=True)

        pytest.raises(ValueError, beam_out.read_beamfits, write_file)

def test_casa_beam():
    # test reading in CASA power beam. Some header items are missing...
    beam_in = UVBeam()
    beam_out = UVBeam()
    casa_file = os.path.join(DATA_PATH, 'HERABEAM.FITS')
    write_file = os.path.join(DATA_PATH, 'test/outtest_beam.fits')
    beam_in.read_beamfits(casa_file, run_check=False)

    # fill in missing parameters
    beam_in.data_normalization = 'peak'
    beam_in.feed_name = 'casa_ideal'
    beam_in.feed_version = 'v0'
    beam_in.model_name = 'casa_airy'
    beam_in.model_version = 'v0'

    # this file is actually in an orthoslant projection RA/DEC at zenith at a particular time.
    # For now pretend it's in a zenith orthoslant projection
    beam_in.pixel_coordinate_system = 'orthoslant_zenith'

    expected_extra_keywords = ['OBSERVER', 'OBSDEC', 'DATAMIN', 'OBJECT',
                               'INSTRUME', 'DATAMAX', 'OBSRA', 'ORIGIN',
                               'DATE-MAP', 'DATE', 'EQUINOX', 'DATE-OBS',
    assert expected_extra_keywords.sort() == list(beam_in.extra_keywords.keys()).sort()

    beam_in.write_beamfits(write_file, clobber=True)

    assert beam_in == beam_out

def test_extra_keywords():
    beam_in = UVBeam()
    beam_out = UVBeam()
    casa_file = os.path.join(DATA_PATH, 'HERABEAM.FITS')
    testfile = os.path.join(DATA_PATH, 'test/outtest_beam.fits')
    beam_in.read_beamfits(casa_file, run_check=False)

    # fill in missing parameters
    beam_in.data_normalization = 'peak'
    beam_in.feed_name = 'casa_ideal'
    beam_in.feed_version = 'v0'
    beam_in.model_name = 'casa_airy'
    beam_in.model_version = 'v0'

    # this file is actually in an orthoslant projection RA/DEC at zenith at a particular time.
    # For now pretend it's in a zenith orthoslant projection
    beam_in.pixel_coordinate_system = 'orthoslant_zenith'

    # check for warnings & errors with extra_keywords that are dicts, lists or arrays
    beam_in.extra_keywords['testdict'] = {'testkey': 23}
    uvtest.checkWarnings(beam_in.check, message=['testdict in extra_keywords is a '
                                                 'list, array or dict'])
    pytest.raises(TypeError, beam_in.write_beamfits, testfile, run_check=False)

    beam_in.extra_keywords['testlist'] = [12, 14, 90]
    uvtest.checkWarnings(beam_in.check, message=['testlist in extra_keywords is a '
                                                 'list, array or dict'])
    pytest.raises(TypeError, beam_in.write_beamfits, testfile, run_check=False)

    beam_in.extra_keywords['testarr'] = np.array([12, 14, 90])
    uvtest.checkWarnings(beam_in.check, message=['testarr in extra_keywords is a '
                                                 'list, array or dict'])
    pytest.raises(TypeError, beam_in.write_beamfits, testfile, run_check=False)

    # check for warnings with extra_keywords keys that are too long
    beam_in.extra_keywords['test_long_key'] = True
    uvtest.checkWarnings(beam_in.check, message=['key test_long_key in extra_keywords '
                                                 'is longer than 8 characters'])
    uvtest.checkWarnings(beam_in.write_beamfits, [testfile], {'run_check': False,
                                                              'clobber': True},
                         message=['key test_long_key in extra_keywords is longer than 8 characters'])

    # check handling of boolean keywords
    beam_in.extra_keywords['bool'] = True
    beam_in.extra_keywords['bool2'] = False
    beam_in.write_beamfits(testfile, clobber=True)
    beam_out.read_beamfits(testfile, run_check=False)

    assert beam_in == beam_out

    # check handling of int-like keywords
    beam_in.extra_keywords['int1'] =
    beam_in.extra_keywords['int2'] = 7
    beam_in.write_beamfits(testfile, clobber=True)
    beam_out.read_beamfits(testfile, run_check=False)

    assert beam_in == beam_out

    # check handling of float-like keywords
    beam_in.extra_keywords['float1'] = np.int64(5.3)
    beam_in.extra_keywords['float2'] = 6.9
    beam_in.write_beamfits(testfile, clobber=True)
    beam_out.read_beamfits(testfile, run_check=False)

    assert beam_in == beam_out

    # check handling of complex-like keywords
    beam_in.extra_keywords['complex1'] = np.complex64(5.3 + 1.2j)
    beam_in.extra_keywords['complex2'] = 6.9 + 4.6j
    beam_in.write_beamfits(testfile, clobber=True)
    beam_out.read_beamfits(testfile, run_check=False)

    assert beam_in == beam_out

def test_multi_files():
    Reading multiple files at once.
    beam_full = UVBeam()
    beam_full.read_cst_beam(cst_files, beam_type='efield', frequency=[150e6, 123e6],
                            telescope_name='TEST', feed_name='bob',
                            feed_version='0.1', feed_pol=['x'],
                            model_name='E-field pattern - Rigging height 4.9m',

    # add optional parameters for testing purposes
    beam_full.extra_keywords = {'KEY1': 'test_keyword'}
    beam_full.x_orientation = 'east'
    beam_full.reference_impedance = 340.
    beam_full.receiver_temperature_array = np.random.normal(50.0, 5, size=(beam_full.Nspws, beam_full.Nfreqs))
    beam_full.loss_array = np.random.normal(50.0, 5, size=(beam_full.Nspws, beam_full.Nfreqs))
    beam_full.mismatch_array = np.random.normal(0.0, 1.0, size=(beam_full.Nspws, beam_full.Nfreqs))
    beam_full.s_parameters = np.random.normal(0.0, 0.3, size=(4, beam_full.Nspws, beam_full.Nfreqs))

    testfile1 = os.path.join(DATA_PATH, 'test/outtest_beam1.fits')
    testfile2 = os.path.join(DATA_PATH, 'test/outtest_beam2.fits')

    beam1 =, inplace=False)
    beam2 =, inplace=False)
    beam1.write_beamfits(testfile1, clobber=True)
    beam2.write_beamfits(testfile2, clobber=True)
    beam1.read_beamfits([testfile1, testfile2])
    # Check history is correct, before replacing and doing a full object check
    assert uvutils._check_histories(beam_full.history + '  Downselected '
                                    'to specific frequencies using pyuvdata. '
                                    'Combined data along frequency axis using'
                                    ' pyuvdata.', beam1.history)

    beam1.history = beam_full.history
    assert beam1 == beam_full
back to top