https://github.com/RadioAstronomySoftwareGroup/pyuvdata
Tip revision: 5e2a4940796f4bd063315dc5d614b3e0176c6df9 authored by Paul La Plante on 24 March 2020, 21:36:44 UTC
Update changelog for new version
Update changelog for new version
Tip revision: 5e2a494
calfits.py
# -*- mode: python; coding: utf-8 -*-
# Copyright (c) 2018 Radio Astronomy Software Group
# Licensed under the 2-clause BSD License
"""Class for reading and writing calibration FITS files."""
from __future__ import absolute_import, division, print_function
import numpy as np
import warnings
from astropy.io import fits
from .uvcal import UVCal
from .. import utils as uvutils
__all__ = ["CALFITS"]
class CALFITS(UVCal):
"""
Defines a calfits-specific class for reading and writing calfits files.
This class should not be interacted with directly, instead use the read_calfits
and write_calfits methods on the UVCal class.
"""
def write_calfits(
self,
filename,
run_check=True,
check_extra=True,
run_check_acceptability=True,
clobber=False,
):
"""
Write the data to a calfits file.
Parameters
----------
filename : str
The calfits file to write to.
run_check : bool
Option to check for the existence and proper shapes of
parameters before writing the file.
check_extra : bool
Option to check optional parameters as well as required ones.
run_check_acceptability : bool
Option to check acceptable range of the values of
parameters before writing the file.
clobber : bool
Option to overwrite the filename if the file already exists.
"""
if run_check:
self.check(
check_extra=check_extra, run_check_acceptability=run_check_acceptability
)
if self.Nfreqs > 1:
freq_spacing = self.freq_array[0, 1:] - self.freq_array[0, :-1]
if not np.isclose(
np.min(freq_spacing),
np.max(freq_spacing),
rtol=self._freq_array.tols[0],
atol=self._freq_array.tols[1],
):
raise ValueError(
"The frequencies are not evenly spaced (probably "
"because of a select operation). The calfits format "
"does not support unevenly spaced frequencies."
)
if np.isclose(freq_spacing[0], self.channel_width):
freq_spacing = self.channel_width
else:
rounded_spacing = np.around(
freq_spacing, int(np.ceil(np.log10(self._freq_array.tols[1]) * -1))
)
freq_spacing = rounded_spacing[0]
else:
freq_spacing = self.channel_width
if self.Ntimes > 1:
time_spacing = np.diff(self.time_array)
if not np.isclose(
np.min(time_spacing),
np.max(time_spacing),
rtol=self._time_array.tols[0],
atol=self._time_array.tols[1],
):
raise ValueError(
"The times are not evenly spaced (probably "
"because of a select operation). The calfits format "
"does not support unevenly spaced times."
)
if np.isclose(time_spacing[0], self.integration_time / (24.0 * 60.0 ** 2)):
time_spacing = self.integration_time / (24.0 * 60.0 ** 2)
else:
rounded_spacing = np.around(
time_spacing,
int(
np.ceil(np.log10(self._time_array.tols[1] / self.Ntimes) * -1)
+ 1
),
)
time_spacing = rounded_spacing[0]
else:
time_spacing = self.integration_time / (24.0 * 60.0 ** 2)
if self.Njones > 1:
jones_spacing = np.diff(self.jones_array)
if np.min(jones_spacing) < np.max(jones_spacing):
raise ValueError(
"The jones values are not evenly spaced."
"The calibration fits file format does not"
" support unevenly spaced polarizations."
)
jones_spacing = jones_spacing[0]
else:
jones_spacing = -1
prihdr = fits.Header()
if self.total_quality_array is not None:
totqualhdr = fits.Header()
totqualhdr["EXTNAME"] = "TOTQLTY"
if self.cal_type != "gain":
sechdr = fits.Header()
sechdr["EXTNAME"] = "FLAGS"
# Conforming to fits format
prihdr["SIMPLE"] = True
prihdr["BITPIX"] = 32
prihdr["TELESCOP"] = self.telescope_name
prihdr["GNCONVEN"] = self.gain_convention
prihdr["CALTYPE"] = self.cal_type
prihdr["CALSTYLE"] = self.cal_style
if self.sky_field is not None:
prihdr["FIELD"] = self.sky_field
if self.sky_catalog is not None:
prihdr["CATALOG"] = self.sky_catalog
if self.ref_antenna_name is not None:
prihdr["REFANT"] = self.ref_antenna_name
if self.Nsources is not None:
prihdr["NSOURCES"] = self.Nsources
if self.baseline_range is not None:
prihdr["BL_RANGE"] = (
"[" + ", ".join([str(b) for b in self.baseline_range]) + "]"
)
if self.diffuse_model is not None:
prihdr["DIFFUSE"] = self.diffuse_model
if self.gain_scale is not None:
prihdr["GNSCALE"] = self.gain_scale
prihdr["INTTIME"] = self.integration_time
prihdr["CHWIDTH"] = self.channel_width
prihdr["XORIENT"] = self.x_orientation
if self.cal_type == "delay":
prihdr["FRQRANGE"] = ",".join(map(str, self.freq_range))
elif self.freq_range is not None:
prihdr["FRQRANGE"] = ",".join(map(str, self.freq_range))
prihdr["TMERANGE"] = ",".join(map(str, self.time_range))
if self.observer:
prihdr["OBSERVER"] = self.observer
if self.git_origin_cal:
prihdr["ORIGCAL"] = self.git_origin_cal
if self.git_hash_cal:
prihdr["HASHCAL"] = self.git_hash_cal
if self.cal_type == "unknown":
raise ValueError(
"unknown calibration type. Do not know how to " "store parameters"
)
# Define primary header values
# Arrays have (column-major) dimensions of
# [Nimages, Njones, Ntimes, Nfreqs, Nspw, Nantennas]
# For a "delay"-type calibration, Nfreqs is a shallow axis
# set the axis for number of arrays
prihdr["CTYPE1"] = ("Narrays", "Number of image arrays.")
prihdr["CUNIT1"] = "Integer"
prihdr["CDELT1"] = 1
prihdr["CRPIX1"] = 1
prihdr["CRVAL1"] = 1
# Jones axis
prihdr["CTYPE2"] = ("JONES", "Jones matrix array")
prihdr["CUNIT2"] = ("Integer", "representative integer for polarization.")
prihdr["CRPIX2"] = 1
prihdr["CRVAL2"] = self.jones_array[0] # always start with first jones.
prihdr["CDELT2"] = jones_spacing
# time axis
prihdr["CTYPE3"] = ("TIME", "Time axis.")
prihdr["CUNIT3"] = ("JD", "Time in julian date format")
prihdr["CRPIX3"] = 1
prihdr["CRVAL3"] = self.time_array[0]
prihdr["CDELT3"] = time_spacing
# freq axis
prihdr["CTYPE4"] = ("FREQS", "Frequency.")
prihdr["CUNIT4"] = "Hz"
prihdr["CRPIX4"] = 1
prihdr["CRVAL4"] = self.freq_array[0][0]
prihdr["CDELT4"] = freq_spacing
# spw axis: number of spectral windows
prihdr["CTYPE5"] = ("IF", "Spectral window number.")
prihdr["CUNIT5"] = "Integer"
prihdr["CRPIX5"] = 1
prihdr["CRVAL5"] = 1
prihdr["CDELT5"] = 1
# antenna axis
prihdr["CTYPE6"] = ("ANTAXIS", "See ANTARR in ANTENNA extension for values.")
prihdr["CUNIT6"] = "Integer"
prihdr["CRPIX6"] = 1
prihdr["CRVAL6"] = 1
prihdr["CDELT6"] = -1
# end standard keywords; begin user-defined keywords
for key, value in self.extra_keywords.items():
# header keywords have to be 8 characters or less
if len(str(key)) > 8:
warnings.warn(
"key {key} in extra_keywords is longer than 8 "
"characters. It will be truncated to 8 as required "
"by the calfits file format.".format(key=key)
)
keyword = key[:8].upper()
if isinstance(value, (dict, list, np.ndarray)):
raise TypeError(
"Extra keyword {keyword} is of {keytype}. "
"Only strings and numbers are "
"supported in calfits.".format(keyword=key, keytype=type(value))
)
if keyword == "COMMENT":
for line in value.splitlines():
prihdr.add_comment(line)
else:
prihdr[keyword] = value
for line in self.history.splitlines():
prihdr.add_history(line)
# define data section based on calibration type
if self.cal_type == "gain":
if self.input_flag_array is not None:
pridata = np.concatenate(
[
self.gain_array.real[:, :, :, :, :, np.newaxis],
self.gain_array.imag[:, :, :, :, :, np.newaxis],
self.flag_array[:, :, :, :, :, np.newaxis],
self.input_flag_array[:, :, :, :, :, np.newaxis],
self.quality_array[:, :, :, :, :, np.newaxis],
],
axis=-1,
)
else:
pridata = np.concatenate(
[
self.gain_array.real[:, :, :, :, :, np.newaxis],
self.gain_array.imag[:, :, :, :, :, np.newaxis],
self.flag_array[:, :, :, :, :, np.newaxis],
self.quality_array[:, :, :, :, :, np.newaxis],
],
axis=-1,
)
elif self.cal_type == "delay":
pridata = np.concatenate(
[
self.delay_array[:, :, :, :, :, np.newaxis],
self.quality_array[:, :, :, :, :, np.newaxis],
],
axis=-1,
)
# Set headers for the second hdu containing the flags. Only in
# cal_type=delay
# Can't put in primary header because frequency axis is shallow there,
# but not here
# Header values are the same as the primary header
sechdr["CTYPE1"] = ("Narrays", "Number of image arrays.")
sechdr["CUNIT1"] = "Integer"
sechdr["CRPIX1"] = 1
sechdr["CRVAL1"] = 1
sechdr["CDELT1"] = 1
sechdr["CTYPE2"] = ("JONES", "Jones matrix array")
sechdr["CUNIT2"] = ("Integer", "representative integer for polarization.")
sechdr["CRPIX2"] = 1
sechdr["CRVAL2"] = self.jones_array[0] # always start with first jones.
sechdr["CDELT2"] = jones_spacing
sechdr["CTYPE3"] = ("TIME", "Time axis.")
sechdr["CUNIT3"] = ("JD", "Time in julian date format")
sechdr["CRPIX3"] = 1
sechdr["CRVAL3"] = self.time_array[0]
sechdr["CDELT3"] = time_spacing
sechdr["CTYPE4"] = ("FREQS", "Valid frequencies to apply delay.")
sechdr["CUNIT4"] = "Hz"
sechdr["CRPIX4"] = 1
sechdr["CRVAL4"] = self.freq_array[0][0]
sechdr["CDELT4"] = freq_spacing
sechdr["CTYPE5"] = ("IF", "Spectral window number.")
sechdr["CUNIT5"] = "Integer"
sechdr["CRPIX5"] = 1
sechdr["CRVAL5"] = 1
sechdr["CDELT5"] = 1
sechdr["CTYPE6"] = (
"ANTAXIS",
"See ANTARR in ANTENNA extension for values.",
)
# convert from bool to int64; undone on read
if self.input_flag_array is not None:
secdata = np.concatenate(
[
self.flag_array.astype(np.int64)[:, :, :, :, :, np.newaxis],
self.input_flag_array.astype(np.int64)[
:, :, :, :, :, np.newaxis
],
],
axis=-1,
)
else:
secdata = self.flag_array.astype(np.int64)[:, :, :, :, :, np.newaxis]
if self.total_quality_array is not None:
# Set headers for the hdu containing the total_quality_array
# No antenna axis, so we have [Njones, Ntime, Nfreq, Nspws]
totqualhdr["CTYPE1"] = ("JONES", "Jones matrix array")
totqualhdr["CUNIT1"] = (
"Integer",
"representative integer for polarization.",
)
totqualhdr["CRPIX1"] = 1
totqualhdr["CRVAL1"] = self.jones_array[0] # always start with first jones.
totqualhdr["CDELT1"] = jones_spacing
totqualhdr["CTYPE2"] = ("TIME", "Time axis.")
totqualhdr["CUNIT2"] = ("JD", "Time in julian date format")
totqualhdr["CRPIX2"] = 1
totqualhdr["CRVAL2"] = self.time_array[0]
totqualhdr["CDELT2"] = time_spacing
totqualhdr["CTYPE3"] = ("FREQS", "Valid frequencies to apply delay.")
totqualhdr["CUNIT3"] = "Hz"
totqualhdr["CRPIX3"] = 1
totqualhdr["CRVAL3"] = self.freq_array[0][0]
totqualhdr["CDELT3"] = freq_spacing
# spws axis: number of spectral windows
totqualhdr["CTYPE4"] = ("IF", "Spectral window number.")
totqualhdr["CUNIT4"] = "Integer"
totqualhdr["CRPIX4"] = 1
totqualhdr["CRVAL4"] = 1
totqualhdr["CDELT4"] = 1
totqualdata = self.total_quality_array
# make HDUs
prihdu = fits.PrimaryHDU(data=pridata, header=prihdr)
# ant HDU
col1 = fits.Column(name="ANTNAME", format="8A", array=self.antenna_names)
col2 = fits.Column(name="ANTINDEX", format="D", array=self.antenna_numbers)
if self.Nants_data == self.Nants_telescope:
col3 = fits.Column(name="ANTARR", format="D", array=self.ant_array)
else:
# ant_array is shorter than the other columns.
# Pad the extra rows with -1s. Need to undo on read.
nants_add = self.Nants_telescope - self.Nants_data
ant_array_use = np.append(
self.ant_array, np.zeros(nants_add, dtype=np.int) - 1
)
col3 = fits.Column(name="ANTARR", format="D", array=ant_array_use)
cols = fits.ColDefs([col1, col2, col3])
ant_hdu = fits.BinTableHDU.from_columns(cols)
ant_hdu.header["EXTNAME"] = "ANTENNAS"
hdulist = fits.HDUList([prihdu, ant_hdu])
if self.cal_type != "gain":
sechdu = fits.ImageHDU(data=secdata, header=sechdr)
hdulist.append(sechdu)
if self.total_quality_array is not None:
totqualhdu = fits.ImageHDU(data=totqualdata, header=totqualhdr)
hdulist.append(totqualhdu)
hdulist.writeto(filename, overwrite=clobber)
def read_calfits(
self, filename, run_check=True, check_extra=True, run_check_acceptability=True
):
"""
Read data from a calfits file.
Parameters
----------
filename : str
The calfits file to read from.
run_check : bool
Option to check for the existence and proper shapes of
parameters after reading in the file.
check_extra : bool
Option to check optional parameters as well as required ones.
run_check_acceptability : bool
Option to check acceptable range of the values of
parameters after reading in the file.
"""
with fits.open(filename) as fname:
data = fname[0].data
hdr = fname[0].header.copy()
hdunames = uvutils._fits_indexhdus(fname)
anthdu = fname[hdunames["ANTENNAS"]]
self.Nants_telescope = anthdu.header["NAXIS2"]
antdata = anthdu.data
self.antenna_names = np.array(list(map(str, antdata["ANTNAME"])))
self.antenna_numbers = np.array(list(map(int, antdata["ANTINDEX"])))
self.ant_array = np.array(list(map(int, antdata["ANTARR"])))
if np.min(self.ant_array) < 0:
# ant_array was shorter than the other columns, so it was
# padded with -1s.
# Remove the padded entries.
self.ant_array = self.ant_array[np.where(self.ant_array >= 0)[0]]
self.channel_width = hdr.pop("CHWIDTH")
self.integration_time = hdr.pop("INTTIME")
self.telescope_name = hdr.pop("TELESCOP")
self.history = str(hdr.get("HISTORY", ""))
if not uvutils._check_history_version(
self.history, self.pyuvdata_version_str
):
if not self.history.endswith("\n"):
self.history += "\n"
self.history += self.pyuvdata_version_str
while "HISTORY" in hdr.keys():
hdr.remove("HISTORY")
self.time_range = list(map(float, hdr.pop("TMERANGE").split(",")))
self.gain_convention = hdr.pop("GNCONVEN")
self.gain_scale = hdr.pop("GNSCALE", None)
self.x_orientation = hdr.pop("XORIENT")
self.cal_type = hdr.pop("CALTYPE")
if self.cal_type == "delay":
self.freq_range = list(map(float, hdr.pop("FRQRANGE").split(",")))
else:
if "FRQRANGE" in hdr:
self.freq_range = list(map(float, hdr.pop("FRQRANGE").split(",")))
self.cal_style = hdr.pop("CALSTYLE")
self.sky_field = hdr.pop("FIELD", None)
self.sky_catalog = hdr.pop("CATALOG", None)
self.ref_antenna_name = hdr.pop("REFANT", None)
self.Nsources = hdr.pop("NSOURCES", None)
bl_range_string = hdr.pop("BL_RANGE", None)
if bl_range_string is not None:
self.baseline_range = [
float(b) for b in bl_range_string.strip("[").strip("]").split(",")
]
self.diffuse_model = hdr.pop("DIFFUSE", None)
self.observer = hdr.pop("OBSERVER", None)
self.git_origin_cal = hdr.pop("ORIGCAL", None)
self.git_hash_cal = hdr.pop("HASHCAL", None)
# generate polarization and time array for either cal_type.
self.Njones = hdr.pop("NAXIS2")
self.jones_array = uvutils._fits_gethduaxis(fname[0], 2)
self.Ntimes = hdr.pop("NAXIS3")
self.time_array = uvutils._fits_gethduaxis(fname[0], 3)
self.Nspws = hdr.pop("NAXIS5")
# subtract 1 to be zero-indexed
self.spw_array = uvutils._fits_gethduaxis(fname[0], 5) - 1
# get data.
if self.cal_type == "gain":
self.set_gain()
self.gain_array = data[:, :, :, :, :, 0] + 1j * data[:, :, :, :, :, 1]
self.flag_array = data[:, :, :, :, :, 2].astype("bool")
if hdr.pop("NAXIS1") == 5:
self.input_flag_array = data[:, :, :, :, :, 3].astype("bool")
self.quality_array = data[:, :, :, :, :, 4]
else:
self.quality_array = data[:, :, :, :, :, 3]
self.Nants_data = hdr.pop("NAXIS6")
# generate frequency array from primary data unit.
self.Nfreqs = hdr.pop("NAXIS4")
self.freq_array = uvutils._fits_gethduaxis(fname[0], 4)
self.freq_array.shape = (self.Nspws,) + self.freq_array.shape
if self.cal_type == "delay":
self.set_delay()
self.Nants_data = hdr.pop("NAXIS6")
self.delay_array = data[:, :, :, :, :, 0]
self.quality_array = data[:, :, :, :, :, 1]
sechdu = fname[hdunames["FLAGS"]]
flag_data = sechdu.data
if sechdu.header["NAXIS1"] == 2:
self.flag_array = flag_data[:, :, :, :, :, 0].astype("bool")
self.input_flag_array = flag_data[:, :, :, :, :, 1].astype("bool")
else:
self.flag_array = flag_data[:, :, :, :, :, 0].astype("bool")
# generate frequency array from flag data unit
# (no freq axis in primary).
self.Nfreqs = sechdu.header["NAXIS4"]
self.freq_array = uvutils._fits_gethduaxis(sechdu, 4)
self.freq_array.shape = (self.Nspws,) + self.freq_array.shape
spw_array = uvutils._fits_gethduaxis(sechdu, 5) - 1
if not np.allclose(spw_array, self.spw_array):
raise ValueError(
"Spectral window values are different in FLAGS HDU than"
" in primary HDU"
)
time_array = uvutils._fits_gethduaxis(sechdu, 3)
if not np.allclose(
time_array,
self.time_array,
rtol=self._time_array.tols[0],
atol=self._time_array.tols[0],
):
raise ValueError(
"Time values are different in FLAGS HDU than in primary HDU"
)
jones_array = uvutils._fits_gethduaxis(sechdu, 2)
if not np.allclose(
jones_array,
self.jones_array,
rtol=self._jones_array.tols[0],
atol=self._jones_array.tols[0],
):
raise ValueError(
"Jones values are different in FLAGS HDU than in primary HDU"
)
# remove standard FITS header items that are still around
std_fits_substrings = [
"SIMPLE",
"BITPIX",
"EXTEND",
"BLOCKED",
"GROUPS",
"PCOUNT",
"BSCALE",
"BZERO",
"NAXIS",
"PTYPE",
"PSCAL",
"PZERO",
"CTYPE",
"CRVAL",
"CRPIX",
"CDELT",
"CROTA",
"CUNIT",
]
for key in list(hdr.keys()):
for sub in std_fits_substrings:
if key.find(sub) > -1:
hdr.remove(key)
# find all the remaining header items and keep them as extra_keywords
for key in hdr:
if key == "COMMENT":
self.extra_keywords[key] = str(hdr.get(key))
elif key != "":
self.extra_keywords[key] = hdr.get(key)
# get total quality array if present
if "TOTQLTY" in hdunames:
totqualhdu = fname[hdunames["TOTQLTY"]]
self.total_quality_array = totqualhdu.data
spw_array = uvutils._fits_gethduaxis(totqualhdu, 4) - 1
if not np.allclose(spw_array, self.spw_array):
raise ValueError(
"Spectral window values are different in "
"TOTQLTY HDU than in primary HDU. primary HDU "
"has {pspw}, TOTQLTY has {tspw}".format(
pspw=self.spw_array, tspw=spw_array
)
)
if self.cal_type != "delay":
# delay-type files won't have a freq_array
freq_array = uvutils._fits_gethduaxis(totqualhdu, 3)
freq_array.shape = (self.Nspws,) + freq_array.shape
if not np.allclose(
freq_array,
self.freq_array,
rtol=self._freq_array.tols[0],
atol=self._freq_array.tols[0],
):
raise ValueError(
"Frequency values are different in TOTQLTY HDU than"
" in primary HDU"
)
time_array = uvutils._fits_gethduaxis(totqualhdu, 2)
if not np.allclose(
time_array,
self.time_array,
rtol=self._time_array.tols[0],
atol=self._time_array.tols[0],
):
raise ValueError(
"Time values are different in TOTQLTY HDU than in primary HDU"
)
jones_array = uvutils._fits_gethduaxis(totqualhdu, 1)
if not np.allclose(
jones_array,
self.jones_array,
rtol=self._jones_array.tols[0],
atol=self._jones_array.tols[0],
):
raise ValueError(
"Jones values are different in TOTQLTY HDU than in primary HDU"
)
else:
self.total_quality_array = None
if run_check:
self.check(
check_extra=check_extra, run_check_acceptability=run_check_acceptability
)