https://github.com/RadioAstronomySoftwareGroup/pyuvdata
Raw File
Tip revision: 5e2a4940796f4bd063315dc5d614b3e0176c6df9 authored by Paul La Plante on 24 March 2020, 21:36:44 UTC
Update changelog for new version
Tip revision: 5e2a494
calfits.py
# -*- mode: python; coding: utf-8 -*-
# Copyright (c) 2018 Radio Astronomy Software Group
# Licensed under the 2-clause BSD License
"""Class for reading and writing calibration FITS files."""

from __future__ import absolute_import, division, print_function

import numpy as np
import warnings
from astropy.io import fits

from .uvcal import UVCal
from .. import utils as uvutils

__all__ = ["CALFITS"]


class CALFITS(UVCal):
    """
    Defines a calfits-specific class for reading and writing calfits files.

    This class should not be interacted with directly, instead use the read_calfits
    and write_calfits methods on the UVCal class.

    """

    def write_calfits(
        self,
        filename,
        run_check=True,
        check_extra=True,
        run_check_acceptability=True,
        clobber=False,
    ):
        """
        Write the data to a calfits file.

        Parameters
        ----------
        filename : str
            The calfits file to write to.
        run_check : bool
            Option to check for the existence and proper shapes of
            parameters before writing the file.
        check_extra : bool
            Option to check optional parameters as well as required ones.
        run_check_acceptability : bool
            Option to check acceptable range of the values of
            parameters before writing the file.
        clobber : bool
            Option to overwrite the filename if the file already exists.

        """
        if run_check:
            self.check(
                check_extra=check_extra, run_check_acceptability=run_check_acceptability
            )

        if self.Nfreqs > 1:
            freq_spacing = self.freq_array[0, 1:] - self.freq_array[0, :-1]
            if not np.isclose(
                np.min(freq_spacing),
                np.max(freq_spacing),
                rtol=self._freq_array.tols[0],
                atol=self._freq_array.tols[1],
            ):
                raise ValueError(
                    "The frequencies are not evenly spaced (probably "
                    "because of a select operation). The calfits format "
                    "does not support unevenly spaced frequencies."
                )
            if np.isclose(freq_spacing[0], self.channel_width):
                freq_spacing = self.channel_width
            else:
                rounded_spacing = np.around(
                    freq_spacing, int(np.ceil(np.log10(self._freq_array.tols[1]) * -1))
                )
                freq_spacing = rounded_spacing[0]
        else:
            freq_spacing = self.channel_width

        if self.Ntimes > 1:
            time_spacing = np.diff(self.time_array)
            if not np.isclose(
                np.min(time_spacing),
                np.max(time_spacing),
                rtol=self._time_array.tols[0],
                atol=self._time_array.tols[1],
            ):
                raise ValueError(
                    "The times are not evenly spaced (probably "
                    "because of a select operation). The calfits format "
                    "does not support unevenly spaced times."
                )
            if np.isclose(time_spacing[0], self.integration_time / (24.0 * 60.0 ** 2)):
                time_spacing = self.integration_time / (24.0 * 60.0 ** 2)
            else:
                rounded_spacing = np.around(
                    time_spacing,
                    int(
                        np.ceil(np.log10(self._time_array.tols[1] / self.Ntimes) * -1)
                        + 1
                    ),
                )
                time_spacing = rounded_spacing[0]
        else:
            time_spacing = self.integration_time / (24.0 * 60.0 ** 2)

        if self.Njones > 1:
            jones_spacing = np.diff(self.jones_array)
            if np.min(jones_spacing) < np.max(jones_spacing):
                raise ValueError(
                    "The jones values are not evenly spaced."
                    "The calibration fits file format does not"
                    " support unevenly spaced polarizations."
                )
            jones_spacing = jones_spacing[0]
        else:
            jones_spacing = -1

        prihdr = fits.Header()
        if self.total_quality_array is not None:
            totqualhdr = fits.Header()
            totqualhdr["EXTNAME"] = "TOTQLTY"
        if self.cal_type != "gain":
            sechdr = fits.Header()
            sechdr["EXTNAME"] = "FLAGS"
        # Conforming to fits format
        prihdr["SIMPLE"] = True
        prihdr["BITPIX"] = 32
        prihdr["TELESCOP"] = self.telescope_name
        prihdr["GNCONVEN"] = self.gain_convention
        prihdr["CALTYPE"] = self.cal_type
        prihdr["CALSTYLE"] = self.cal_style
        if self.sky_field is not None:
            prihdr["FIELD"] = self.sky_field
        if self.sky_catalog is not None:
            prihdr["CATALOG"] = self.sky_catalog
        if self.ref_antenna_name is not None:
            prihdr["REFANT"] = self.ref_antenna_name
        if self.Nsources is not None:
            prihdr["NSOURCES"] = self.Nsources
        if self.baseline_range is not None:
            prihdr["BL_RANGE"] = (
                "[" + ", ".join([str(b) for b in self.baseline_range]) + "]"
            )
        if self.diffuse_model is not None:
            prihdr["DIFFUSE"] = self.diffuse_model
        if self.gain_scale is not None:
            prihdr["GNSCALE"] = self.gain_scale
        prihdr["INTTIME"] = self.integration_time
        prihdr["CHWIDTH"] = self.channel_width
        prihdr["XORIENT"] = self.x_orientation
        if self.cal_type == "delay":
            prihdr["FRQRANGE"] = ",".join(map(str, self.freq_range))
        elif self.freq_range is not None:
            prihdr["FRQRANGE"] = ",".join(map(str, self.freq_range))
        prihdr["TMERANGE"] = ",".join(map(str, self.time_range))

        if self.observer:
            prihdr["OBSERVER"] = self.observer
        if self.git_origin_cal:
            prihdr["ORIGCAL"] = self.git_origin_cal
        if self.git_hash_cal:
            prihdr["HASHCAL"] = self.git_hash_cal

        if self.cal_type == "unknown":
            raise ValueError(
                "unknown calibration type. Do not know how to " "store parameters"
            )

        # Define primary header values
        # Arrays have (column-major) dimensions of
        # [Nimages, Njones, Ntimes, Nfreqs, Nspw, Nantennas]
        # For a "delay"-type calibration, Nfreqs is a shallow axis

        # set the axis for number of arrays
        prihdr["CTYPE1"] = ("Narrays", "Number of image arrays.")
        prihdr["CUNIT1"] = "Integer"
        prihdr["CDELT1"] = 1
        prihdr["CRPIX1"] = 1
        prihdr["CRVAL1"] = 1

        # Jones axis
        prihdr["CTYPE2"] = ("JONES", "Jones matrix array")
        prihdr["CUNIT2"] = ("Integer", "representative integer for polarization.")
        prihdr["CRPIX2"] = 1
        prihdr["CRVAL2"] = self.jones_array[0]  # always start with first jones.
        prihdr["CDELT2"] = jones_spacing

        # time axis
        prihdr["CTYPE3"] = ("TIME", "Time axis.")
        prihdr["CUNIT3"] = ("JD", "Time in julian date format")
        prihdr["CRPIX3"] = 1
        prihdr["CRVAL3"] = self.time_array[0]
        prihdr["CDELT3"] = time_spacing

        # freq axis
        prihdr["CTYPE4"] = ("FREQS", "Frequency.")
        prihdr["CUNIT4"] = "Hz"
        prihdr["CRPIX4"] = 1
        prihdr["CRVAL4"] = self.freq_array[0][0]
        prihdr["CDELT4"] = freq_spacing

        # spw axis: number of spectral windows
        prihdr["CTYPE5"] = ("IF", "Spectral window number.")
        prihdr["CUNIT5"] = "Integer"
        prihdr["CRPIX5"] = 1
        prihdr["CRVAL5"] = 1
        prihdr["CDELT5"] = 1

        # antenna axis
        prihdr["CTYPE6"] = ("ANTAXIS", "See ANTARR in ANTENNA extension for values.")
        prihdr["CUNIT6"] = "Integer"
        prihdr["CRPIX6"] = 1
        prihdr["CRVAL6"] = 1
        prihdr["CDELT6"] = -1

        # end standard keywords; begin user-defined keywords
        for key, value in self.extra_keywords.items():
            # header keywords have to be 8 characters or less
            if len(str(key)) > 8:
                warnings.warn(
                    "key {key} in extra_keywords is longer than 8 "
                    "characters. It will be truncated to 8 as required "
                    "by the calfits file format.".format(key=key)
                )
            keyword = key[:8].upper()
            if isinstance(value, (dict, list, np.ndarray)):
                raise TypeError(
                    "Extra keyword {keyword} is of {keytype}. "
                    "Only strings and numbers are "
                    "supported in calfits.".format(keyword=key, keytype=type(value))
                )

            if keyword == "COMMENT":
                for line in value.splitlines():
                    prihdr.add_comment(line)
            else:
                prihdr[keyword] = value

        for line in self.history.splitlines():
            prihdr.add_history(line)

        # define data section based on calibration type
        if self.cal_type == "gain":
            if self.input_flag_array is not None:
                pridata = np.concatenate(
                    [
                        self.gain_array.real[:, :, :, :, :, np.newaxis],
                        self.gain_array.imag[:, :, :, :, :, np.newaxis],
                        self.flag_array[:, :, :, :, :, np.newaxis],
                        self.input_flag_array[:, :, :, :, :, np.newaxis],
                        self.quality_array[:, :, :, :, :, np.newaxis],
                    ],
                    axis=-1,
                )
            else:
                pridata = np.concatenate(
                    [
                        self.gain_array.real[:, :, :, :, :, np.newaxis],
                        self.gain_array.imag[:, :, :, :, :, np.newaxis],
                        self.flag_array[:, :, :, :, :, np.newaxis],
                        self.quality_array[:, :, :, :, :, np.newaxis],
                    ],
                    axis=-1,
                )

        elif self.cal_type == "delay":
            pridata = np.concatenate(
                [
                    self.delay_array[:, :, :, :, :, np.newaxis],
                    self.quality_array[:, :, :, :, :, np.newaxis],
                ],
                axis=-1,
            )

            # Set headers for the second hdu containing the flags. Only in
            # cal_type=delay
            # Can't put in primary header because frequency axis is shallow there,
            # but not here
            # Header values are the same as the primary header
            sechdr["CTYPE1"] = ("Narrays", "Number of image arrays.")
            sechdr["CUNIT1"] = "Integer"
            sechdr["CRPIX1"] = 1
            sechdr["CRVAL1"] = 1
            sechdr["CDELT1"] = 1

            sechdr["CTYPE2"] = ("JONES", "Jones matrix array")
            sechdr["CUNIT2"] = ("Integer", "representative integer for polarization.")
            sechdr["CRPIX2"] = 1
            sechdr["CRVAL2"] = self.jones_array[0]  # always start with first jones.
            sechdr["CDELT2"] = jones_spacing

            sechdr["CTYPE3"] = ("TIME", "Time axis.")
            sechdr["CUNIT3"] = ("JD", "Time in julian date format")
            sechdr["CRPIX3"] = 1
            sechdr["CRVAL3"] = self.time_array[0]
            sechdr["CDELT3"] = time_spacing

            sechdr["CTYPE4"] = ("FREQS", "Valid frequencies to apply delay.")
            sechdr["CUNIT4"] = "Hz"
            sechdr["CRPIX4"] = 1
            sechdr["CRVAL4"] = self.freq_array[0][0]
            sechdr["CDELT4"] = freq_spacing

            sechdr["CTYPE5"] = ("IF", "Spectral window number.")
            sechdr["CUNIT5"] = "Integer"
            sechdr["CRPIX5"] = 1
            sechdr["CRVAL5"] = 1
            sechdr["CDELT5"] = 1

            sechdr["CTYPE6"] = (
                "ANTAXIS",
                "See ANTARR in ANTENNA extension for values.",
            )

            # convert from bool to int64; undone on read
            if self.input_flag_array is not None:
                secdata = np.concatenate(
                    [
                        self.flag_array.astype(np.int64)[:, :, :, :, :, np.newaxis],
                        self.input_flag_array.astype(np.int64)[
                            :, :, :, :, :, np.newaxis
                        ],
                    ],
                    axis=-1,
                )
            else:
                secdata = self.flag_array.astype(np.int64)[:, :, :, :, :, np.newaxis]

        if self.total_quality_array is not None:
            # Set headers for the hdu containing the total_quality_array
            # No antenna axis, so we have [Njones, Ntime, Nfreq, Nspws]
            totqualhdr["CTYPE1"] = ("JONES", "Jones matrix array")
            totqualhdr["CUNIT1"] = (
                "Integer",
                "representative integer for polarization.",
            )
            totqualhdr["CRPIX1"] = 1
            totqualhdr["CRVAL1"] = self.jones_array[0]  # always start with first jones.
            totqualhdr["CDELT1"] = jones_spacing

            totqualhdr["CTYPE2"] = ("TIME", "Time axis.")
            totqualhdr["CUNIT2"] = ("JD", "Time in julian date format")
            totqualhdr["CRPIX2"] = 1
            totqualhdr["CRVAL2"] = self.time_array[0]
            totqualhdr["CDELT2"] = time_spacing

            totqualhdr["CTYPE3"] = ("FREQS", "Valid frequencies to apply delay.")
            totqualhdr["CUNIT3"] = "Hz"
            totqualhdr["CRPIX3"] = 1
            totqualhdr["CRVAL3"] = self.freq_array[0][0]
            totqualhdr["CDELT3"] = freq_spacing

            # spws axis: number of spectral windows
            totqualhdr["CTYPE4"] = ("IF", "Spectral window number.")
            totqualhdr["CUNIT4"] = "Integer"
            totqualhdr["CRPIX4"] = 1
            totqualhdr["CRVAL4"] = 1
            totqualhdr["CDELT4"] = 1
            totqualdata = self.total_quality_array

        # make HDUs
        prihdu = fits.PrimaryHDU(data=pridata, header=prihdr)

        # ant HDU
        col1 = fits.Column(name="ANTNAME", format="8A", array=self.antenna_names)
        col2 = fits.Column(name="ANTINDEX", format="D", array=self.antenna_numbers)
        if self.Nants_data == self.Nants_telescope:
            col3 = fits.Column(name="ANTARR", format="D", array=self.ant_array)
        else:
            # ant_array is shorter than the other columns.
            # Pad the extra rows with -1s. Need to undo on read.
            nants_add = self.Nants_telescope - self.Nants_data
            ant_array_use = np.append(
                self.ant_array, np.zeros(nants_add, dtype=np.int) - 1
            )
            col3 = fits.Column(name="ANTARR", format="D", array=ant_array_use)
        cols = fits.ColDefs([col1, col2, col3])
        ant_hdu = fits.BinTableHDU.from_columns(cols)
        ant_hdu.header["EXTNAME"] = "ANTENNAS"

        hdulist = fits.HDUList([prihdu, ant_hdu])

        if self.cal_type != "gain":
            sechdu = fits.ImageHDU(data=secdata, header=sechdr)
            hdulist.append(sechdu)

        if self.total_quality_array is not None:
            totqualhdu = fits.ImageHDU(data=totqualdata, header=totqualhdr)
            hdulist.append(totqualhdu)

        hdulist.writeto(filename, overwrite=clobber)

    def read_calfits(
        self, filename, run_check=True, check_extra=True, run_check_acceptability=True
    ):
        """
        Read data from a calfits file.

        Parameters
        ----------
        filename : str
            The calfits file to read from.
        run_check : bool
            Option to check for the existence and proper shapes of
            parameters after reading in the file.
        check_extra : bool
            Option to check optional parameters as well as required ones.
        run_check_acceptability : bool
            Option to check acceptable range of the values of
            parameters after reading in the file.

        """
        with fits.open(filename) as fname:
            data = fname[0].data
            hdr = fname[0].header.copy()
            hdunames = uvutils._fits_indexhdus(fname)

            anthdu = fname[hdunames["ANTENNAS"]]
            self.Nants_telescope = anthdu.header["NAXIS2"]
            antdata = anthdu.data
            self.antenna_names = np.array(list(map(str, antdata["ANTNAME"])))
            self.antenna_numbers = np.array(list(map(int, antdata["ANTINDEX"])))
            self.ant_array = np.array(list(map(int, antdata["ANTARR"])))
            if np.min(self.ant_array) < 0:
                # ant_array was shorter than the other columns, so it was
                # padded with -1s.
                # Remove the padded entries.
                self.ant_array = self.ant_array[np.where(self.ant_array >= 0)[0]]

            self.channel_width = hdr.pop("CHWIDTH")
            self.integration_time = hdr.pop("INTTIME")
            self.telescope_name = hdr.pop("TELESCOP")
            self.history = str(hdr.get("HISTORY", ""))

            if not uvutils._check_history_version(
                self.history, self.pyuvdata_version_str
            ):
                if not self.history.endswith("\n"):
                    self.history += "\n"

                self.history += self.pyuvdata_version_str

            while "HISTORY" in hdr.keys():
                hdr.remove("HISTORY")
            self.time_range = list(map(float, hdr.pop("TMERANGE").split(",")))
            self.gain_convention = hdr.pop("GNCONVEN")
            self.gain_scale = hdr.pop("GNSCALE", None)
            self.x_orientation = hdr.pop("XORIENT")
            self.cal_type = hdr.pop("CALTYPE")
            if self.cal_type == "delay":
                self.freq_range = list(map(float, hdr.pop("FRQRANGE").split(",")))
            else:
                if "FRQRANGE" in hdr:
                    self.freq_range = list(map(float, hdr.pop("FRQRANGE").split(",")))

            self.cal_style = hdr.pop("CALSTYLE")
            self.sky_field = hdr.pop("FIELD", None)
            self.sky_catalog = hdr.pop("CATALOG", None)
            self.ref_antenna_name = hdr.pop("REFANT", None)
            self.Nsources = hdr.pop("NSOURCES", None)
            bl_range_string = hdr.pop("BL_RANGE", None)
            if bl_range_string is not None:
                self.baseline_range = [
                    float(b) for b in bl_range_string.strip("[").strip("]").split(",")
                ]
            self.diffuse_model = hdr.pop("DIFFUSE", None)

            self.observer = hdr.pop("OBSERVER", None)
            self.git_origin_cal = hdr.pop("ORIGCAL", None)
            self.git_hash_cal = hdr.pop("HASHCAL", None)

            # generate polarization and time array for either cal_type.
            self.Njones = hdr.pop("NAXIS2")
            self.jones_array = uvutils._fits_gethduaxis(fname[0], 2)
            self.Ntimes = hdr.pop("NAXIS3")
            self.time_array = uvutils._fits_gethduaxis(fname[0], 3)

            self.Nspws = hdr.pop("NAXIS5")
            # subtract 1 to be zero-indexed
            self.spw_array = uvutils._fits_gethduaxis(fname[0], 5) - 1

            # get data.
            if self.cal_type == "gain":
                self.set_gain()
                self.gain_array = data[:, :, :, :, :, 0] + 1j * data[:, :, :, :, :, 1]
                self.flag_array = data[:, :, :, :, :, 2].astype("bool")
                if hdr.pop("NAXIS1") == 5:
                    self.input_flag_array = data[:, :, :, :, :, 3].astype("bool")
                    self.quality_array = data[:, :, :, :, :, 4]
                else:
                    self.quality_array = data[:, :, :, :, :, 3]

                self.Nants_data = hdr.pop("NAXIS6")

                # generate frequency array from primary data unit.
                self.Nfreqs = hdr.pop("NAXIS4")
                self.freq_array = uvutils._fits_gethduaxis(fname[0], 4)
                self.freq_array.shape = (self.Nspws,) + self.freq_array.shape

            if self.cal_type == "delay":
                self.set_delay()
                self.Nants_data = hdr.pop("NAXIS6")

                self.delay_array = data[:, :, :, :, :, 0]
                self.quality_array = data[:, :, :, :, :, 1]

                sechdu = fname[hdunames["FLAGS"]]
                flag_data = sechdu.data
                if sechdu.header["NAXIS1"] == 2:
                    self.flag_array = flag_data[:, :, :, :, :, 0].astype("bool")
                    self.input_flag_array = flag_data[:, :, :, :, :, 1].astype("bool")
                else:
                    self.flag_array = flag_data[:, :, :, :, :, 0].astype("bool")

                # generate frequency array from flag data unit
                # (no freq axis in primary).
                self.Nfreqs = sechdu.header["NAXIS4"]
                self.freq_array = uvutils._fits_gethduaxis(sechdu, 4)
                self.freq_array.shape = (self.Nspws,) + self.freq_array.shape

                spw_array = uvutils._fits_gethduaxis(sechdu, 5) - 1

                if not np.allclose(spw_array, self.spw_array):
                    raise ValueError(
                        "Spectral window values are different in FLAGS HDU than"
                        " in primary HDU"
                    )

                time_array = uvutils._fits_gethduaxis(sechdu, 3)
                if not np.allclose(
                    time_array,
                    self.time_array,
                    rtol=self._time_array.tols[0],
                    atol=self._time_array.tols[0],
                ):
                    raise ValueError(
                        "Time values are different in FLAGS HDU than in primary HDU"
                    )

                jones_array = uvutils._fits_gethduaxis(sechdu, 2)
                if not np.allclose(
                    jones_array,
                    self.jones_array,
                    rtol=self._jones_array.tols[0],
                    atol=self._jones_array.tols[0],
                ):
                    raise ValueError(
                        "Jones values are different in FLAGS HDU than in primary HDU"
                    )

            # remove standard FITS header items that are still around
            std_fits_substrings = [
                "SIMPLE",
                "BITPIX",
                "EXTEND",
                "BLOCKED",
                "GROUPS",
                "PCOUNT",
                "BSCALE",
                "BZERO",
                "NAXIS",
                "PTYPE",
                "PSCAL",
                "PZERO",
                "CTYPE",
                "CRVAL",
                "CRPIX",
                "CDELT",
                "CROTA",
                "CUNIT",
            ]
            for key in list(hdr.keys()):
                for sub in std_fits_substrings:
                    if key.find(sub) > -1:
                        hdr.remove(key)

            # find all the remaining header items and keep them as extra_keywords
            for key in hdr:
                if key == "COMMENT":
                    self.extra_keywords[key] = str(hdr.get(key))
                elif key != "":
                    self.extra_keywords[key] = hdr.get(key)

            # get total quality array if present
            if "TOTQLTY" in hdunames:
                totqualhdu = fname[hdunames["TOTQLTY"]]
                self.total_quality_array = totqualhdu.data
                spw_array = uvutils._fits_gethduaxis(totqualhdu, 4) - 1
                if not np.allclose(spw_array, self.spw_array):
                    raise ValueError(
                        "Spectral window values are different in "
                        "TOTQLTY HDU than in primary HDU. primary HDU "
                        "has {pspw}, TOTQLTY has {tspw}".format(
                            pspw=self.spw_array, tspw=spw_array
                        )
                    )

                if self.cal_type != "delay":
                    # delay-type files won't have a freq_array
                    freq_array = uvutils._fits_gethduaxis(totqualhdu, 3)
                    freq_array.shape = (self.Nspws,) + freq_array.shape
                    if not np.allclose(
                        freq_array,
                        self.freq_array,
                        rtol=self._freq_array.tols[0],
                        atol=self._freq_array.tols[0],
                    ):
                        raise ValueError(
                            "Frequency values are different in TOTQLTY HDU than"
                            " in primary HDU"
                        )

                time_array = uvutils._fits_gethduaxis(totqualhdu, 2)
                if not np.allclose(
                    time_array,
                    self.time_array,
                    rtol=self._time_array.tols[0],
                    atol=self._time_array.tols[0],
                ):
                    raise ValueError(
                        "Time values are different in TOTQLTY HDU than in primary HDU"
                    )

                jones_array = uvutils._fits_gethduaxis(totqualhdu, 1)
                if not np.allclose(
                    jones_array,
                    self.jones_array,
                    rtol=self._jones_array.tols[0],
                    atol=self._jones_array.tols[0],
                ):
                    raise ValueError(
                        "Jones values are different in TOTQLTY HDU than in primary HDU"
                    )

            else:
                self.total_quality_array = None

        if run_check:
            self.check(
                check_extra=check_extra, run_check_acceptability=run_check_acceptability
            )
back to top