https://github.com/RadioAstronomySoftwareGroup/pyuvdata
Tip revision: 0d99c94d2bc0558e519d1d2302673c213f18273b authored by Bryna Hazelton on 14 April 2017, 20:15:04 UTC
increase version number after addition of uvcal module
increase version number after addition of uvcal module
Tip revision: 0d99c94
calfits.py
import astropy
from astropy.io import fits
import numpy as np
from uvcal import UVCal
import datetime
class CALFITS(UVCal):
"""
Defines a calfits-specific class for reading and writing uvfits files.
"""
def _indexhdus(self, hdulist):
# input a list of hdus
# return a dictionary of table names
tablenames = {}
for i in range(len(hdulist)):
try:
tablenames[hdulist[i].header['EXTNAME']] = i
except(KeyError):
continue
return tablenames
def write_calfits(self, filename, spoof_nonessential=False,
run_check=True, run_check_acceptability=True, clobber=False):
"""
Write the data to a uvfits file.
Args:
filename: The uvfits file to write to.
spoof_nonessential: Option to spoof the values of optional
UVParameters that are not set but are required for uvfits files.
Default is False.
run_check: Option to check for the existence and proper shapes of
required parameters before writing the file. Default is True.
run_check_acceptability: Option to check acceptability of the values of
required parameters before writing the file. Default is True.
"""
if run_check:
self.check(run_check_acceptability=run_check_acceptability)
today = datetime.date.today().strftime("Date: %d, %b %Y")
prihdr = fits.Header()
if self.cal_type != 'gain':
sechdr = fits.Header()
sechdr['EXTNAME'] = 'FLAGS'
# Conforming to fits format
prihdr['SIMPLE'] = True
prihdr['BITPIX'] = 32
prihdr['NAXIS'] = 5
prihdr['TELESCOP'] = self.telescope_name
prihdr['GNCONVEN'] = self.gain_convention
prihdr['NTIMES'] = self.Ntimes
prihdr['NFREQS'] = self.Nfreqs
prihdr['NANTSDAT'] = self.Nants_data
prihdr['NJONES'] = self.Njones
prihdr['CALTYPE'] = self.cal_type
prihdr['INTTIME'] = self.integration_time
prihdr['CHWIDTH'] = self.channel_width
prihdr['NANTSTEL'] = self.Nants_telescope
prihdr['NSPWS'] = self.Nspws
prihdr['XORIENT'] = self.x_orientation
prihdr['FRQRANGE'] = ','.join(map(str, self.freq_range))
prihdr['TMERANGE'] = ','.join(map(str, self.time_range))
for line in self.history.splitlines():
prihdr.add_history(line)
for p in self.extra():
ep = getattr(self, p)
if ep.form is 'str':
prihdr['{0}'.format(p.upper().replace('_', '')[:8])] = ep.value
else:
continue
if self.observer:
prihdr['OBSERVER'] = self.observer
if self.git_origin_cal:
prihdr['ORIGCAL'] = self.git_origin_cal
if self.git_hash_cal:
prihdr['HASHCAL'] = self.git_hash_cal
if self.cal_type == 'unknown':
raise ValueError("unknown calibration type. Do not know how to"
"store parameters")
if self.cal_type == 'gain':
# Set header variable for gain.
prihdr['CTYPE4'] = ('FREQS', 'Frequency.')
prihdr['CUNIT4'] = ('Hz', 'Units of frequecy.')
prihdr['CRVAL4'] = self.freq_array[0][0]
prihdr['CDELT4'] = self.channel_width
# set the last axis for number of arrays.
prihdr['CTYPE1'] = ('Narrays', 'Number of image arrays.')
prihdr['CUNIT1'] = ('Integer', 'Number of image arrays. Increment.')
prihdr['CDELT1'] = 1
if self.input_flag_array is not None:
prihdr['CRVAL1'] = (5, 'Number of image arrays.')
pridata = np.concatenate([self.gain_array.real[:, :, :, :, np.newaxis],
self.gain_array.imag[:, :, :, :, np.newaxis],
self.flag_array[:, :, :, :, np.newaxis],
self.input_flag_array[:, :, :, :, np.newaxis],
self.quality_array[:, :, :, :, np.newaxis]],
axis=-1)
else:
prihdr['CRVAL1'] = (4, 'Number of image arrays.')
pridata = np.concatenate([self.gain_array.real[:, :, :, :, np.newaxis],
self.gain_array.imag[:, :, :, :, np.newaxis],
self.flag_array[:, :, :, :, np.newaxis],
self.quality_array[:, :, :, :, np.newaxis]],
axis=-1)
if self.cal_type == 'delay':
# Set header variable for gain.
# set the last axis for number of arrays.
prihdr['CTYPE1'] = ('Narrays', 'Number of image arrays.')
prihdr['CUNIT1'] = ('Integer', 'Number of image arrays. Value.')
prihdr['CRVAL1'] = (2, 'Number of image arrays.')
prihdr['CDELT1'] = 1
prihdr['CTYPE4'] = ('FREQS', 'Valid frequencies to apply delay.')
prihdr['CUNIT4'] = ('Hz', 'Units of frequecy.')
prihdr['CRVAL4'] = self.freq_array[0][0]
prihdr['CDELT4'] = self.channel_width
pridata = np.concatenate([self.delay_array[:, :, :, :, np.newaxis],
self.quality_array[:, :, :, :, np.newaxis]],
axis=-1)
# Set headers for the second hdu containing the flags. Only in cal_type=delay.
if self.Njones > 1:
jones_spacing = np.diff(self.jones_array)
if np.min(jones_spacing) < np.max(jones_spacing):
raise ValueError('The jones values are not evenly spaced.'
'The calibration fits file format does not'
' support unevenly spaced polarizations.')
sechdr['CTYPE2'] = ('JONES', 'Jones matrix array')
sechdr['CUNIT2'] = ('Integer', 'representative integer for polarization.')
sechdr['CRVAL2'] = self.jones_array[0] # always start with first jones.
if self.Njones > 1:
sechdr['CDELT2'] = jones_spacing[0]
else:
sechdr['CDELT2'] = -1
sechdr['CTYPE3'] = ('TIME', 'Time axis.')
sechdr['CUNIT3'] = ('JD', 'Time in julian date format')
sechdr['CRVAL3'] = self.time_array[0]
sechdr['CDELT3'] = self.integration_time
sechdr['CTYPE4'] = ('FREQS', 'Valid frequencies to apply delay.')
sechdr['CUNIT4'] = ('Hz', 'Units of frequecy.')
sechdr['CRVAL4'] = self.freq_array[0][0]
sechdr['CDELT4'] = self.channel_width
sechdr['CTYPE5'] = ('ANTAXIS', 'See antenna_numbers/names for values.')
if self.input_flag_array is not None:
secdata = np.concatenate([self.flag_array.astype(np.int64)[:, :, :, :, np.newaxis],
self.input_flag_array.astype(np.int64)[:, :, :, :, np.newaxis]],
axis=-1)
sechdr['CTYPE1'] = ('Narrays', 'Number of image arrays.')
sechdr['CUNIT1'] = ('Integer', 'Number of image arrays. Value.')
sechdr['CRVAL1'] = (2, 'Number of image arrays.')
sechdr['CDELT1'] = 1
else:
secdata = self.flag_array.astype(np.int64)[:, :, :, :, np.newaxis] # Can't be bool
sechdr['CTYPE1'] = ('Narrays', 'Number of image arrays.')
sechdr['CUNIT1'] = ('Integer', 'Number of image arrays. Value.')
sechdr['CRVAL1'] = (1, 'Number of image arrays.')
sechdr['CDELT1'] = 1
# primary header ctypes for NAXIS [ for both gain and delay cal_type.]
# Check polarizations.
if self.Njones > 1:
jones_spacing = np.diff(self.jones_array)
if np.min(jones_spacing) < np.max(jones_spacing):
raise ValueError('The jones values are not evenly spaced.'
'The calibration fits file format does not'
' support unevenly spaced polarizations.')
prihdr['CTYPE2'] = ('JONES', 'Jones matrix array')
prihdr['CUNIT2'] = ('Integer', 'representative integer for polarization.')
prihdr['CRVAL2'] = self.jones_array[0] # always start with first jones.
if self.Njones > 1:
prihdr['CDELT2'] = jones_spacing[0]
else:
prihdr['CDELT2'] = -1
prihdr['CTYPE3'] = ('TIME', 'Time axis.')
prihdr['CUNIT3'] = ('JD', 'Time in julian date format')
prihdr['CRVAL3'] = self.time_array[0]
prihdr['CDELT3'] = self.integration_time
prihdr['CTYPE5'] = ('ANTAXIS', 'See antenna_numbers/names for values.')
prihdr['CUNIT5'] = 'Integer'
prihdr['CRVAL5'] = 0
prihdr['CDELT5'] = -1
prihdu = fits.PrimaryHDU(data=pridata, header=prihdr)
col1 = fits.Column(name='ANTNAME', format='8A',
array=self.antenna_names)
col2 = fits.Column(name='ANTINDEX', format='D',
array=self.antenna_numbers)
cols = fits.ColDefs([col1, col2])
ant_hdu = fits.BinTableHDU.from_columns(cols)
ant_hdu.header['EXTNAME'] = 'ANTENNAS'
if self.cal_type != 'gain':
prihdu = fits.PrimaryHDU(data=pridata, header=prihdr)
sechdu = fits.ImageHDU(data=secdata, header=sechdr)
hdulist = fits.HDUList([prihdu, ant_hdu, sechdu])
else:
prihdu = fits.PrimaryHDU(data=pridata, header=prihdr)
hdulist = fits.HDUList([prihdu, ant_hdu])
if float(astropy.__version__[0:3]) < 1.3:
hdulist.writeto(filename, clobber=clobber)
else:
hdulist.writeto(filename, overwrite=clobber)
def read_calfits(self, filename, run_check=True, run_check_acceptability=True):
F = fits.open(filename)
data = F[0].data
hdr = F[0].header.copy()
hdunames = self._indexhdus(F)
anthdu = F[hdunames['ANTENNAS']]
antdata = anthdu.data
self.antenna_names = map(str, antdata['ANTNAME'])
self.antenna_numbers = map(int, antdata['ANTINDEX'])
self.Nfreqs = hdr['NFREQS']
self.Njones = hdr['NJONES']
self.Ntimes = hdr['NTIMES']
self.channel_width = hdr['CHWIDTH']
self.integration_time = hdr['INTTIME']
self.telescope_name = hdr['TELESCOP']
self.history = str(hdr.get('HISTORY', ''))
if self.pyuvdata_version_str not in self.history.replace('\n', ''):
if self.history.endswith('\n'):
self.history += self.pyuvdata_version_str
else:
self.history += '\n' + self.pyuvdata_version_str
while 'HISTORY' in hdr.keys():
hdr.remove('HISTORY')
self.freq_range = map(float, hdr['FRQRANGE'].split(','))
self.time_range = map(float, hdr['TMERANGE'].split(','))
self.Nspws = hdr['NSPWS']
self.Nants_data = hdr['NANTSDAT']
self.Nants_telescope = hdr['NANTSTEL']
self.gain_convention = hdr['GNCONVEN']
self.x_orientation = hdr['XORIENT']
self.cal_type = hdr['CALTYPE']
try:
self.observer = hdr['OBSERVER']
except:
pass
try:
self.git_origin_cal = hdr['ORIGCAL']
except:
pass
try:
self.git_hash_cal = hdr['HASHCAL']
except:
pass
# get data.
if self.cal_type == 'gain':
self.set_gain()
self.gain_array = data[:, :, :, :, 0] + 1j * data[:, :, :, :, 1]
self.flag_array = data[:, :, :, :, 2].astype('bool')
if hdr['CTYPE1'] == 5:
self.input_flag_array = data[:, :, :, :, 3].astype('bool')
self.quality_array = data[:, :, :, :, 4]
else:
self.quality_array = data[:, :, :, :, 3]
if self.cal_type == 'delay':
self.set_delay()
self.delay_array = data[:, :, :, :, 0]
self.quality_array = data[:, :, :, :, 1]
sechdu = F[hdunames['FLAGS']]
flag_data = sechdu.data
if sechdu.header['CTYPE1'] == 2:
self.flag_array = sechdu.data[:, :, :, :, 0].astype('bool')
self.input_flag_array = sechdu.data[:, :, :, :, 1]
else:
self.flag_array = sechdu.data[:, :, :, :, 0].astype('bool')
# generate frequency, polarization, and time array.
self.freq_array = np.arange(self.Nfreqs).reshape(1, -1) * hdr['CDELT4'] + hdr['CRVAL4']
self.jones_array = np.arange(self.Njones) * hdr['CDELT2'] + hdr['CRVAL2']
self.time_array = np.arange(self.Ntimes) * hdr['CDELT3'] + hdr['CRVAL3']
if run_check:
self.check(run_check_acceptability=run_check_acceptability)