https://github.com/RadioAstronomySoftwareGroup/pyuvdata
Tip revision: 983ae27b57a28550b6ee1a58e13cc9161bb89327 authored by Bryna Hazelton on 15 January 2020, 19:15:00 UTC
update release date
update release date
Tip revision: 983ae27
calfits.py
# -*- mode: python; coding: utf-8 -*-
# Copyright (c) 2018 Radio Astronomy Software Group
# Licensed under the 2-clause BSD License
from __future__ import absolute_import, division, print_function
import numpy as np
import warnings
from astropy.io import fits
from . import UVCal
from . import utils as uvutils
class CALFITS(UVCal):
"""
Defines a calfits-specific class for reading and writing calfits files.
"""
def write_calfits(self, filename, run_check=True, check_extra=True,
run_check_acceptability=True, clobber=False):
"""
Write the data to a calfits file.
Args:
filename: The calfits file to write to.
run_check: Option to check for the existence and proper shapes of
parameters before writing the file. Default is True.
check_extra: Option to check optional parameters as well as required
ones. Default is True.
run_check_acceptability: Option to check acceptable range of the values of
parameters before writing the file. Default is True.
clobber: Option to overwrite the filename if the file already exists.
Default is False.
"""
if run_check:
self.check(check_extra=check_extra,
run_check_acceptability=run_check_acceptability)
if self.Nfreqs > 1:
freq_spacing = self.freq_array[0, 1:] - self.freq_array[0, :-1]
if not np.isclose(np.min(freq_spacing), np.max(freq_spacing),
rtol=self._freq_array.tols[0], atol=self._freq_array.tols[1]):
raise ValueError('The frequencies are not evenly spaced (probably '
'because of a select operation). The calfits format '
'does not support unevenly spaced frequencies.')
if np.isclose(freq_spacing[0], self.channel_width):
freq_spacing = self.channel_width
else:
rounded_spacing = np.around(freq_spacing, int(np.ceil(np.log10(self._freq_array.tols[1]) * -1)))
freq_spacing = rounded_spacing[0]
else:
freq_spacing = self.channel_width
if self.Ntimes > 1:
time_spacing = np.diff(self.time_array)
if not np.isclose(np.min(time_spacing), np.max(time_spacing),
rtol=self._time_array.tols[0], atol=self._time_array.tols[1]):
raise ValueError('The times are not evenly spaced (probably '
'because of a select operation). The calfits format '
'does not support unevenly spaced times.')
if np.isclose(time_spacing[0], self.integration_time / (24. * 60.**2)):
time_spacing = self.integration_time / (24. * 60.**2)
else:
rounded_spacing = np.around(time_spacing,
int(np.ceil(np.log10(self._time_array.tols[1]
/ self.Ntimes) * -1) + 1))
time_spacing = rounded_spacing[0]
else:
time_spacing = self.integration_time / (24. * 60.**2)
if self.Njones > 1:
jones_spacing = np.diff(self.jones_array)
if np.min(jones_spacing) < np.max(jones_spacing):
raise ValueError('The jones values are not evenly spaced.'
'The calibration fits file format does not'
' support unevenly spaced polarizations.')
jones_spacing = jones_spacing[0]
else:
jones_spacing = -1
prihdr = fits.Header()
if self.total_quality_array is not None:
totqualhdr = fits.Header()
totqualhdr['EXTNAME'] = 'TOTQLTY'
if self.cal_type != 'gain':
sechdr = fits.Header()
sechdr['EXTNAME'] = 'FLAGS'
# Conforming to fits format
prihdr['SIMPLE'] = True
prihdr['BITPIX'] = 32
prihdr['TELESCOP'] = self.telescope_name
prihdr['GNCONVEN'] = self.gain_convention
prihdr['CALTYPE'] = self.cal_type
prihdr['CALSTYLE'] = self.cal_style
if self.sky_field is not None:
prihdr['FIELD'] = self.sky_field
if self.sky_catalog is not None:
prihdr['CATALOG'] = self.sky_catalog
if self.ref_antenna_name is not None:
prihdr['REFANT'] = self.ref_antenna_name
if self.Nsources is not None:
prihdr['NSOURCES'] = self.Nsources
if self.baseline_range is not None:
prihdr['BL_RANGE'] = '[' + ', '.join([str(b) for b in self.baseline_range]) + ']'
if self.diffuse_model is not None:
prihdr['DIFFUSE'] = self.diffuse_model
if self.gain_scale is not None:
prihdr['GNSCALE'] = self.gain_scale
prihdr['INTTIME'] = self.integration_time
prihdr['CHWIDTH'] = self.channel_width
prihdr['XORIENT'] = self.x_orientation
if self.cal_type == 'delay':
prihdr['FRQRANGE'] = ','.join(map(str, self.freq_range))
elif self.freq_range is not None:
prihdr['FRQRANGE'] = ','.join(map(str, self.freq_range))
prihdr['TMERANGE'] = ','.join(map(str, self.time_range))
if self.observer:
prihdr['OBSERVER'] = self.observer
if self.git_origin_cal:
prihdr['ORIGCAL'] = self.git_origin_cal
if self.git_hash_cal:
prihdr['HASHCAL'] = self.git_hash_cal
if self.cal_type == 'unknown':
raise ValueError("unknown calibration type. Do not know how to "
"store parameters")
# Define primary header values
# Arrays have (column-major) dimensions of [Nimages, Njones, Ntimes, Nfreqs, Nspw, Nantennas]
# For a "delay"-type calibration, Nfreqs is a shallow axis
# set the axis for number of arrays
prihdr['CTYPE1'] = ('Narrays', 'Number of image arrays.')
prihdr['CUNIT1'] = 'Integer'
prihdr['CDELT1'] = 1
prihdr['CRPIX1'] = 1
prihdr['CRVAL1'] = 1
# Jones axis
prihdr['CTYPE2'] = ('JONES', 'Jones matrix array')
prihdr['CUNIT2'] = ('Integer', 'representative integer for polarization.')
prihdr['CRPIX2'] = 1
prihdr['CRVAL2'] = self.jones_array[0] # always start with first jones.
prihdr['CDELT2'] = jones_spacing
# time axis
prihdr['CTYPE3'] = ('TIME', 'Time axis.')
prihdr['CUNIT3'] = ('JD', 'Time in julian date format')
prihdr['CRPIX3'] = 1
prihdr['CRVAL3'] = self.time_array[0]
prihdr['CDELT3'] = time_spacing
# freq axis
prihdr['CTYPE4'] = ('FREQS', 'Frequency.')
prihdr['CUNIT4'] = 'Hz'
prihdr['CRPIX4'] = 1
prihdr['CRVAL4'] = self.freq_array[0][0]
prihdr['CDELT4'] = freq_spacing
# spw axis: number of spectral windows
prihdr['CTYPE5'] = ('IF', 'Spectral window number.')
prihdr['CUNIT5'] = 'Integer'
prihdr['CRPIX5'] = 1
prihdr['CRVAL5'] = 1
prihdr['CDELT5'] = 1
# antenna axis
prihdr['CTYPE6'] = ('ANTAXIS', 'See ANTARR in ANTENNA extension for values.')
prihdr['CUNIT6'] = 'Integer'
prihdr['CRPIX6'] = 1
prihdr['CRVAL6'] = 1
prihdr['CDELT6'] = -1
# end standard keywords; begin user-defined keywords
for key, value in self.extra_keywords.items():
# header keywords have to be 8 characters or less
if len(str(key)) > 8:
warnings.warn('key {key} in extra_keywords is longer than 8 '
'characters. It will be truncated to 8 as required '
'by the calfits file format.'.format(key=key))
keyword = key[:8].upper()
if isinstance(value, (dict, list, np.ndarray)):
raise TypeError('Extra keyword {keyword} is of {keytype}. '
'Only strings and numbers are '
'supported in calfits.'.format(keyword=key,
keytype=type(value)))
if keyword == 'COMMENT':
for line in value.splitlines():
prihdr.add_comment(line)
else:
prihdr[keyword] = value
for line in self.history.splitlines():
prihdr.add_history(line)
# define data section based on calibration type
if self.cal_type == 'gain':
if self.input_flag_array is not None:
pridata = np.concatenate([self.gain_array.real[:, :, :, :, :, np.newaxis],
self.gain_array.imag[:, :, :, :, :, np.newaxis],
self.flag_array[:, :, :, :, :, np.newaxis],
self.input_flag_array[:, :, :, :, :, np.newaxis],
self.quality_array[:, :, :, :, :, np.newaxis]],
axis=-1)
else:
pridata = np.concatenate([self.gain_array.real[:, :, :, :, :, np.newaxis],
self.gain_array.imag[:, :, :, :, :, np.newaxis],
self.flag_array[:, :, :, :, :, np.newaxis],
self.quality_array[:, :, :, :, :, np.newaxis]],
axis=-1)
elif self.cal_type == 'delay':
pridata = np.concatenate([self.delay_array[:, :, :, :, :, np.newaxis],
self.quality_array[:, :, :, :, :, np.newaxis]],
axis=-1)
# Set headers for the second hdu containing the flags. Only in cal_type=delay
# Can't put in pridata because frequency axis is shallow there, but not here
# Header values are the same as the primary header
sechdr['CTYPE1'] = ('Narrays', 'Number of image arrays.')
sechdr['CUNIT1'] = 'Integer'
sechdr['CRPIX1'] = 1
sechdr['CRVAL1'] = 1
sechdr['CDELT1'] = 1
sechdr['CTYPE2'] = ('JONES', 'Jones matrix array')
sechdr['CUNIT2'] = ('Integer', 'representative integer for polarization.')
sechdr['CRPIX2'] = 1
sechdr['CRVAL2'] = self.jones_array[0] # always start with first jones.
sechdr['CDELT2'] = jones_spacing
sechdr['CTYPE3'] = ('TIME', 'Time axis.')
sechdr['CUNIT3'] = ('JD', 'Time in julian date format')
sechdr['CRPIX3'] = 1
sechdr['CRVAL3'] = self.time_array[0]
sechdr['CDELT3'] = time_spacing
sechdr['CTYPE4'] = ('FREQS', 'Valid frequencies to apply delay.')
sechdr['CUNIT4'] = 'Hz'
sechdr['CRPIX4'] = 1
sechdr['CRVAL4'] = self.freq_array[0][0]
sechdr['CDELT4'] = freq_spacing
sechdr['CTYPE5'] = ('IF', 'Spectral window number.')
sechdr['CUNIT5'] = 'Integer'
sechdr['CRPIX5'] = 1
sechdr['CRVAL5'] = 1
sechdr['CDELT5'] = 1
sechdr['CTYPE6'] = ('ANTAXIS', 'See ANTARR in ANTENNA extension for values.')
# convert from bool to int64; undone on read
if self.input_flag_array is not None:
secdata = np.concatenate([self.flag_array.astype(np.int64)[:, :, :, :, :, np.newaxis],
self.input_flag_array.astype(np.int64)[:, :, :, :, :, np.newaxis]],
axis=-1)
else:
secdata = self.flag_array.astype(np.int64)[:, :, :, :, :, np.newaxis]
if self.total_quality_array is not None:
# Set headers for the hdu containing the total_quality_array
# No antenna axis, so we have [Njones, Ntime, Nfreq, Nspws]
totqualhdr['CTYPE1'] = ('JONES', 'Jones matrix array')
totqualhdr['CUNIT1'] = ('Integer', 'representative integer for polarization.')
totqualhdr['CRPIX1'] = 1
totqualhdr['CRVAL1'] = self.jones_array[0] # always start with first jones.
totqualhdr['CDELT1'] = jones_spacing
totqualhdr['CTYPE2'] = ('TIME', 'Time axis.')
totqualhdr['CUNIT2'] = ('JD', 'Time in julian date format')
totqualhdr['CRPIX2'] = 1
totqualhdr['CRVAL2'] = self.time_array[0]
totqualhdr['CDELT2'] = time_spacing
totqualhdr['CTYPE3'] = ('FREQS', 'Valid frequencies to apply delay.')
totqualhdr['CUNIT3'] = 'Hz'
totqualhdr['CRPIX3'] = 1
totqualhdr['CRVAL3'] = self.freq_array[0][0]
totqualhdr['CDELT3'] = freq_spacing
# spws axis: number of spectral windows
totqualhdr['CTYPE4'] = ('IF', 'Spectral window number.')
totqualhdr['CUNIT4'] = 'Integer'
totqualhdr['CRPIX4'] = 1
totqualhdr['CRVAL4'] = 1
totqualhdr['CDELT4'] = 1
totqualdata = self.total_quality_array
# make HDUs
prihdu = fits.PrimaryHDU(data=pridata, header=prihdr)
# ant HDU
col1 = fits.Column(name='ANTNAME', format='8A',
array=self.antenna_names)
col2 = fits.Column(name='ANTINDEX', format='D',
array=self.antenna_numbers)
if self.Nants_data == self.Nants_telescope:
col3 = fits.Column(name='ANTARR', format='D',
array=self.ant_array)
else:
# ant_array is shorter than the other columns.
# Pad the extra rows with -1s. Need to undo on read.
nants_add = self.Nants_telescope - self.Nants_data
ant_array_use = np.append(self.ant_array,
np.zeros(nants_add, dtype=np.int) - 1)
col3 = fits.Column(name='ANTARR', format='D',
array=ant_array_use)
cols = fits.ColDefs([col1, col2, col3])
ant_hdu = fits.BinTableHDU.from_columns(cols)
ant_hdu.header['EXTNAME'] = 'ANTENNAS'
hdulist = fits.HDUList([prihdu, ant_hdu])
if self.cal_type != 'gain':
sechdu = fits.ImageHDU(data=secdata, header=sechdr)
hdulist.append(sechdu)
if self.total_quality_array is not None:
totqualhdu = fits.ImageHDU(data=totqualdata, header=totqualhdr)
hdulist.append(totqualhdu)
hdulist.writeto(filename, overwrite=clobber)
def read_calfits(self, filename, run_check=True, check_extra=True,
run_check_acceptability=True):
"""
Read data from a calfits file.
Args:
filename: The calfits file to read to.
run_check: Option to check for the existence and proper shapes of
parameters after reading in the file. Default is True.
check_extra: Option to check optional parameters as well as required
ones. Default is True.
run_check_acceptability: Option to check acceptable range of the values of
parameters after reading in the file. Default is True.
"""
with fits.open(filename) as F:
data = F[0].data
hdr = F[0].header.copy()
hdunames = uvutils._fits_indexhdus(F)
anthdu = F[hdunames['ANTENNAS']]
self.Nants_telescope = anthdu.header['NAXIS2']
antdata = anthdu.data
self.antenna_names = np.array(list(map(str, antdata['ANTNAME'])))
self.antenna_numbers = np.array(list(map(int, antdata['ANTINDEX'])))
self.ant_array = np.array(list(map(int, antdata['ANTARR'])))
if np.min(self.ant_array) < 0:
# ant_array was shorter than the other columns, so it was padded with -1s.
# Remove the padded entries.
self.ant_array = self.ant_array[np.where(self.ant_array >= 0)[0]]
self.channel_width = hdr.pop('CHWIDTH')
self.integration_time = hdr.pop('INTTIME')
self.telescope_name = hdr.pop('TELESCOP')
self.history = str(hdr.get('HISTORY', ''))
if not uvutils._check_history_version(self.history, self.pyuvdata_version_str):
if not self.history.endswith('\n'):
self.history += '\n'
self.history += self.pyuvdata_version_str
while 'HISTORY' in hdr.keys():
hdr.remove('HISTORY')
self.time_range = list(map(float, hdr.pop('TMERANGE').split(',')))
self.gain_convention = hdr.pop('GNCONVEN')
self.gain_scale = hdr.pop("GNSCALE", None)
self.x_orientation = hdr.pop('XORIENT')
self.cal_type = hdr.pop('CALTYPE')
if self.cal_type == 'delay':
self.freq_range = list(map(float, hdr.pop('FRQRANGE').split(',')))
else:
if 'FRQRANGE' in hdr:
self.freq_range = list(map(float, hdr.pop('FRQRANGE').split(',')))
self.cal_style = hdr.pop('CALSTYLE')
self.sky_field = hdr.pop('FIELD', None)
self.sky_catalog = hdr.pop('CATALOG', None)
self.ref_antenna_name = hdr.pop('REFANT', None)
self.Nsources = hdr.pop('NSOURCES', None)
bl_range_string = hdr.pop('BL_RANGE', None)
if bl_range_string is not None:
self.baseline_range = [float(b) for b in bl_range_string.strip('[').strip(']').split(',')]
self.diffuse_model = hdr.pop('DIFFUSE', None)
self.observer = hdr.pop('OBSERVER', None)
self.git_origin_cal = hdr.pop('ORIGCAL', None)
self.git_hash_cal = hdr.pop('HASHCAL', None)
# generate polarization and time array for either cal_type.
self.Njones = hdr.pop('NAXIS2')
self.jones_array = uvutils._fits_gethduaxis(F[0], 2)
self.Ntimes = hdr.pop('NAXIS3')
self.time_array = uvutils._fits_gethduaxis(F[0], 3)
self.Nspws = hdr.pop('NAXIS5')
# subtract 1 to be zero-indexed
self.spw_array = uvutils._fits_gethduaxis(F[0], 5) - 1
# get data.
if self.cal_type == 'gain':
self.set_gain()
self.gain_array = data[:, :, :, :, :, 0] + 1j * data[:, :, :, :, :, 1]
self.flag_array = data[:, :, :, :, :, 2].astype('bool')
if hdr.pop('NAXIS1') == 5:
self.input_flag_array = data[:, :, :, :, :, 3].astype('bool')
self.quality_array = data[:, :, :, :, :, 4]
else:
self.quality_array = data[:, :, :, :, :, 3]
self.Nants_data = hdr.pop('NAXIS6')
# generate frequency array from primary data unit.
self.Nfreqs = hdr.pop('NAXIS4')
self.freq_array = uvutils._fits_gethduaxis(F[0], 4)
self.freq_array.shape = (self.Nspws,) + self.freq_array.shape
if self.cal_type == 'delay':
self.set_delay()
self.Nants_data = hdr.pop('NAXIS6')
self.delay_array = data[:, :, :, :, :, 0]
self.quality_array = data[:, :, :, :, :, 1]
sechdu = F[hdunames['FLAGS']]
flag_data = sechdu.data
if sechdu.header['NAXIS1'] == 2:
self.flag_array = flag_data[:, :, :, :, :, 0].astype('bool')
self.input_flag_array = flag_data[:, :, :, :, :, 1].astype('bool')
else:
self.flag_array = flag_data[:, :, :, :, :, 0].astype('bool')
# generate frequency array from flag data unit (no freq axis in primary).
self.Nfreqs = sechdu.header['NAXIS4']
self.freq_array = uvutils._fits_gethduaxis(sechdu, 4)
self.freq_array.shape = (self.Nspws,) + self.freq_array.shape
spw_array = uvutils._fits_gethduaxis(sechdu, 5) - 1
if not np.allclose(spw_array, self.spw_array):
raise ValueError('Spectral window values are different in FLAGS HDU than in primary HDU')
time_array = uvutils._fits_gethduaxis(sechdu, 3)
if not np.allclose(time_array, self.time_array,
rtol=self._time_array.tols[0],
atol=self._time_array.tols[0]):
raise ValueError('Time values are different in FLAGS HDU than in primary HDU')
jones_array = uvutils._fits_gethduaxis(sechdu, 2)
if not np.allclose(jones_array, self.jones_array,
rtol=self._jones_array.tols[0],
atol=self._jones_array.tols[0]):
raise ValueError('Jones values are different in FLAGS HDU than in primary HDU')
# remove standard FITS header items that are still around
std_fits_substrings = ['SIMPLE', 'BITPIX', 'EXTEND', 'BLOCKED',
'GROUPS', 'PCOUNT', 'BSCALE', 'BZERO', 'NAXIS',
'PTYPE', 'PSCAL', 'PZERO', 'CTYPE', 'CRVAL',
'CRPIX', 'CDELT', 'CROTA', 'CUNIT']
for key in list(hdr.keys()):
for sub in std_fits_substrings:
if key.find(sub) > -1:
hdr.remove(key)
# find all the remaining header items and keep them as extra_keywords
for key in hdr:
if key == 'COMMENT':
self.extra_keywords[key] = str(hdr.get(key))
elif key != '':
self.extra_keywords[key] = hdr.get(key)
# get total quality array if present
if 'TOTQLTY' in hdunames:
totqualhdu = F[hdunames['TOTQLTY']]
self.total_quality_array = totqualhdu.data
spw_array = uvutils._fits_gethduaxis(totqualhdu, 4) - 1
if not np.allclose(spw_array, self.spw_array):
raise ValueError('Spectral window values are different in '
'TOTQLTY HDU than in primary HDU. primary HDU '
'has {pspw}, TOTQLTY has {tspw}'
.format(pspw=self.spw_array, tspw=spw_array))
if self.cal_type != 'delay':
# delay-type files won't have a freq_array
freq_array = uvutils._fits_gethduaxis(totqualhdu, 3)
freq_array.shape = (self.Nspws,) + freq_array.shape
if not np.allclose(freq_array, self.freq_array,
rtol=self._freq_array.tols[0],
atol=self._freq_array.tols[0]):
raise ValueError('Frequency values are different in TOTQLTY HDU than in primary HDU')
time_array = uvutils._fits_gethduaxis(totqualhdu, 2)
if not np.allclose(time_array, self.time_array,
rtol=self._time_array.tols[0],
atol=self._time_array.tols[0]):
raise ValueError('Time values are different in TOTQLTY HDU than in primary HDU')
jones_array = uvutils._fits_gethduaxis(totqualhdu, 1)
if not np.allclose(jones_array, self.jones_array,
rtol=self._jones_array.tols[0],
atol=self._jones_array.tols[0]):
raise ValueError('Jones values are different in TOTQLTY HDU than in primary HDU')
else:
self.total_quality_array = None
if run_check:
self.check(check_extra=check_extra,
run_check_acceptability=run_check_acceptability)