https://github.com/RadioAstronomySoftwareGroup/pyuvdata
Tip revision: 37c5370106db1465372a14b225e90fee12e0fe1b authored by pyxieloustar on 31 January 2022, 21:11:03 UTC
fix test messages
fix test messages
Tip revision: 37c5370
mwa_corr_fits.py
# -*- mode: python; coding: utf-8 -*-
# Copyright (c) 2019 Radio Astronomy Software Group
# Licensed under the 2-clause BSD License
"""Class for reading MWA correlator FITS files."""
import os
import warnings
import itertools
import numpy as np
import h5py
from astropy.io import fits
from astropy.time import Time
from astropy import constants as const
from pyuvdata.data import DATA_PATH
from scipy.special import erf
from scipy.integrate import simps
from numpy.polynomial.chebyshev import chebval3d
from numpy.polynomial.polynomial import polyval2d, polyval
from .. import _corr_fits
from . import UVData
from .. import utils as uvutils
__all__ = ["input_output_mapping", "MWACorrFITS"]
def input_output_mapping():
"""Build a mapping dictionary from pfb input to output numbers."""
# the polyphase filter bank maps inputs to outputs, which the MWA
# correlator then records as the antenna indices.
# the following is taken from mwa_build_lfiles/mwac_utils.c
# inputs are mapped to outputs via pfb_mapper as follows
# (from mwa_build_lfiles/antenna_mapping.h):
# floor(index/4) + index%4 * 16 = input
# for the first 64 outputs, pfb_mapper[output] = input
return _corr_fits.input_output_mapping()
def sighat_vector(x):
"""
Generate quantized sigma using Van Vleck relation.
For an explanation of the Van Vleck relations used and their implementation
in this code, see the memos at
https://github.com/EoRImaging/Memos/blob/master/PDFs/007_Van_Vleck_A.pdf and
https://github.com/EoRImaging/Memos/blob/master/PDFs/008_Van_Vleck_B.pdf
Parameters
----------
x : numpy array
Array of sigma inputs.
Returns
-------
sighat : numpy array
Array of corresponding sigmas of quantized values.
"""
yy = np.arange(7)[:, np.newaxis]
z = (2 * yy + 1) * erf((yy + 0.5) / (x * np.sqrt(2)))
z = z.sum(axis=0)
sighat = np.sqrt(7 ** 2 - z)
return sighat
def sighat_vector_prime(x):
"""
Calculate the derivative of sighat_vector.
Parameters
----------
x : numpy array
Array of sigma inputs.
Returns
-------
sighat : numpy array
Array of corresponding derivatives with respect to sigma inputs.
"""
yy = np.arange(7)[:, np.newaxis]
z = (
(2 * yy + 1)
* (yy + 0.5)
* np.exp(-((yy + 0.5) ** 2) / (2 * (x ** 2)))
/ (np.sqrt(2 * np.pi) * (x ** 2))
)
sighat_prime = z.sum(axis=0)
sighat_prime /= sighat_vector(x)
return sighat_prime
def corrcorrect_simps(rho, sig1, sig2):
"""
Generate quantized kappa using the Van Vleck relation.
For an explanation of the Van Vleck relations used and their implementation
in this code, see the memos at
https://github.com/EoRImaging/Memos/blob/master/PDFs/007_Van_Vleck_A.pdf and
https://github.com/EoRImaging/Memos/blob/master/PDFs/008_Van_Vleck_B.pdf
Parameters
----------
rho : numpy array
Array of rho inputs.
sig1 : numpy array
Array of sigma inputs corresponding to antenna 1.
sig2: numpy array
Array of sigma inputs corresponding to antenna 2.
Returns
-------
integrated_khat : numpy array
Array of cross-correlations of quantized values.
"""
x = np.linspace(0, rho, 11, dtype=np.float64)
khat = np.zeros((11, rho.size), dtype=np.float64)
khat = _corr_fits.get_khat(x, sig1, sig2)
integrated_khat = simps(khat, x, axis=0)
return integrated_khat
def corrcorrect_vect_prime(rho, sig1, sig2):
"""
Calculate the derivative of corrcorrect_simps.
Parameters
----------
rho : numpy array
Array of rho inputs.
sig1 : numpy array
Array of sigma inputs corresponding to antenna 1.
sig2: numpy array
Array of sigma inputs corresponding to antenna 2.
"""
return _corr_fits.get_khat(rho, sig1, sig2)
def van_vleck_autos(sighat):
"""
Use Newton's method to solve the inverse of sighat_vector.
For an explanation of the Van Vleck corrections used and their implementation
in this code, see the memos at
https://github.com/EoRImaging/Memos/blob/master/PDFs/007_Van_Vleck_A.pdf and
https://github.com/EoRImaging/Memos/blob/master/PDFs/008_Van_Vleck_B.pdf
Parameters
----------
sighat_arr : numpy array
Array of quantized sigma to be corrected.
Returns
-------
sighat_arr : numpy array
Array of Van Vleck corrected scaled auto-correlations.
"""
guess = np.copy(sighat)
inds = np.where(np.abs(sighat_vector(guess) - sighat) > 1e-10)[0]
while len(inds) != 0:
guess[inds] -= (
sighat_vector(guess[inds]) - sighat[inds]
) / sighat_vector_prime(guess[inds])
inds = np.where(np.abs(sighat_vector(guess) - sighat) > 1e-10)[0]
return guess
def van_vleck_crosses_int(k_arr, sig1_arr, sig2_arr, cheby_approx):
"""
Use Newton's method to solve the inverse of corrcorrect_simps.
For an explanation of the Van Vleck corrections used and their implementation
in this code, see the memos at
https://github.com/EoRImaging/Memos/blob/master/PDFs/007_Van_Vleck_A.pdf and
https://github.com/EoRImaging/Memos/blob/master/PDFs/008_Van_Vleck_B.pdf
Parameters
----------
k_arr : numpy array
Array of quantized kappa to be corrected.
sig1_arr : numpy array
Array of sigma inputs corresponding to antenna 1.
sig2_arr: numpy array
Array of sigma inputs corresponding to antenna 2.
cheby_approx : bool
Flag to warn if chebyshev approximation is being used.
Returns
-------
k_arr : numpy array
Array of Van Vleck corrected scaled cross-correlations.
"""
real = k_arr.real
imag = k_arr.imag
warned = False
for kap in real, imag:
nonzero_mask = (kap != 0) & (sig1_arr != 0) & (sig2_arr != 0)
if len(np.nonzero(nonzero_mask)[0]) > 0.0:
if cheby_approx and not warned:
warnings.warn(
str(len(np.nonzero(nonzero_mask)[0]))
+ " values are being corrected with the van vleck integral"
)
warned = True
neg_inds = np.where(kap < 0.0)[0]
khat = np.abs(kap[nonzero_mask])
sig1 = sig1_arr[nonzero_mask]
sig2 = sig2_arr[nonzero_mask]
x0 = khat / (sig1 * sig2)
corr = corrcorrect_simps(x0, sig1, sig2) - khat
x0 -= corr / corrcorrect_vect_prime(x0, sig1, sig2)
inds = np.where(np.abs(corr) > 1e-8)[0]
while len(inds) != 0:
corr = corrcorrect_simps(x0[inds], sig1[inds], sig2[inds]) - khat[inds]
x0[inds] -= corr / corrcorrect_vect_prime(
x0[inds], sig1[inds], sig2[inds]
)
inds2 = np.where(np.abs(corr) > 1e-8)[0]
inds = inds[inds2]
kap[nonzero_mask] = x0 * sig1 * sig2
kap[neg_inds] = np.negative(kap[neg_inds])
def van_vleck_crosses_cheby(
khat,
sig1,
sig2,
range_mask,
rho_coeff,
sv_inds_right1,
sv_inds_right2,
ds1,
ds2,
cheby_approx,
):
"""
Compute a chebyshev approximation of corrcorrect_simps.
Uses a bilinear interpolation to find chebyshev coefficients. Assumes distance
between points of interpolation grid is 0.01. If sig1 or sig2 falls outside
the interpolation grid, the corresponding values are corrected using
van_vleck_crosses_int.
For an explanation of the Van Vleck corrections used and their implementation
in this code, see the memos at
https://github.com/EoRImaging/Memos/blob/master/PDFs/007_Van_Vleck_A.pdf and
https://github.com/EoRImaging/Memos/blob/master/PDFs/008_Van_Vleck_B.pdf
Parameters
----------
khat : numpy array
Array of quantized kappa to be corrected.
sig1 : numpy array
Array of sigma inputs corresponding to antenna 1.
sig2: numpy array
Array of sigma inputs corresponding to antenna 2.
range_mask : numpy array
Array indexing sigmas within the chebyshev approximation range.
rho_coeff : numpy array
Array of chebyshev polynomial coefficients.
sv_inds_right1 : numpy array
Array of right indices for sig1 for bilinear interpolation.
sv_inds_right2 : numpy array
Array of right indices for sig2 for bilinear interpolation.
ds1 : numpy array
Distance between sig1 and right-indexed value for bilinear interpolation.
ds2 : numpy array
Distance between sig2 and right-indexed value for bilinear interpolation.
cheby_approx : bool
Flag to warn if chebyshev approximation is being used.
Returns
-------
khat : numpy array
Array of Van Vleck corrected scaled cross-correlations.
"""
kap = np.array([khat[range_mask].real, khat[range_mask].imag])
_corr_fits.van_vleck_cheby(
kap, rho_coeff, sv_inds_right1, sv_inds_right2, ds1, ds2,
)
khat[range_mask] = (kap[0, :] + 1j * kap[1, :]) * (
sig1[range_mask] * sig2[range_mask]
)
khat[~range_mask] = van_vleck_crosses_int(
khat.real[~range_mask], sig1[~range_mask], sig2[~range_mask]
) + 1j * van_vleck_crosses_int(
khat.imag[~range_mask], sig1[~range_mask], sig2[~range_mask]
)
class MWACorrFITS(UVData):
"""
UVData subclass for reading MWA correlator fits files.
This class should not be interacted with directly; instead use the
read_mwa_corr_fits method on the UVData class.
"""
def correct_cable_length(self, cable_lens, ant_1_inds, ant_2_inds):
"""
Apply a cable length correction to the data array.
Parameters
----------
cable_lens : list of strings
A list of strings containing the cable lengths for each antenna.
ant_1_inds : array
An array of indices for antenna 1
ant_2_inds : array
An array of indices for antenna 2
"""
# as of version 0.29.X cython does not handle numpy arrays of strings
# particularly efficiently. Casting to bytes, then into this demonic
# form is a workaround found here: https://stackoverflow.com/a/28777163
cable_lens = np.asarray(cable_lens).astype(np.string_)
cable_lens = cable_lens.view("uint8").reshape(
cable_lens.size, cable_lens.dtype.itemsize
)
# from MWA_Tools/CONV2UVFITS/convutils.h
cable_len_diffs = _corr_fits.get_cable_len_diffs(
ant_1_inds, ant_2_inds, cable_lens,
)
self.data_array *= np.exp(
-1j
* 2
* np.pi
* cable_len_diffs.reshape(self.Nblts, 1)
/ const.c.to("m/s").value
* self.freq_array.reshape(1, self.Nfreqs)
)[:, :, None]
history_add_string = " Applied cable length correction."
self.history += history_add_string
def flag_init(
self,
num_fine_chan,
edge_width=80e3,
start_flag=2.0,
end_flag=0.0,
flag_dc_offset=True,
):
"""
Apply routine flagging to the MWA Correlator FITS file data.
Includes options to flag the coarse channel edges, beginning and end
of obs, as well as the center fine channel of each coarse channel.
Parameters
----------
edge_width: float
The width to flag on the edge of each coarse channel, in hz. Set to
0 for no edge flagging.
start_flag: float
The number of seconds to flag at the beginning of the observation.
Set to 0 for no flagging.
end_flag: floats
The number of seconds to flag at the end of the observation. Set to
0 for no flagging.
flag_dc_offset: bool
Set to True to flag the center fine channel of each coarse channel.
Raises
------
ValueError
If edge_width is not an integer multiple of the channel_width of
the data (0 also acceptable).
If start_flag is not an integer multiple of the integration time
(0 also acceptable).
If end_flag is not an integer multiple of the integration time
(0 also acceptable).
"""
if (edge_width % self.channel_width[0]) > 0:
raise ValueError(
"The edge_width must be an integer multiple of the "
"channel_width of the data or zero."
)
if (start_flag % self.integration_time[0]) > 0:
raise ValueError(
"The start_flag must be an integer multiple of the "
"integration_time of the data or zero."
)
if (end_flag % self.integration_time[0]) > 0:
raise ValueError(
"The end_flag must be an integer multiple of the "
"integration_time of the data or zero."
)
num_ch_flag = int(edge_width / self.channel_width[0])
num_start_flag = int(start_flag / self.integration_time[0])
num_end_flag = int(end_flag / self.integration_time[0])
shape = self.flag_array.shape
reshape = [self.Ntimes, self.Nbls, self.Nfreqs, self.Npols]
self.flag_array = (
self.flag_array
if (shape == reshape)
else np.reshape(self.flag_array, reshape,)
)
bad_chan_inds = []
if num_ch_flag > 0:
for ch_count in range(num_ch_flag):
# count up from the left
left_chans = list(range(ch_count, self.Nfreqs, num_fine_chan))
# count down from the right
right_chans = list(range(self.Nfreqs - 1 - ch_count, 0, -num_fine_chan))
bad_chan_inds += left_chans + right_chans
if flag_dc_offset:
bad_chan_inds += list(range(num_fine_chan // 2, self.Nfreqs, num_fine_chan))
if len(bad_chan_inds) != 0:
self.flag_array[:, :, bad_chan_inds, :] = True
if (num_start_flag > 0) or (num_end_flag > 0):
if num_start_flag > 0:
self.flag_array[:num_start_flag] = True
if num_end_flag > 0:
self.flag_array[-num_end_flag:] = True
self.flag_array = np.reshape(self.flag_array, shape)
self.flag_array = (
self.flag_array
if (shape == reshape)
else np.reshape(self.flag_array, shape,)
)
def _read_fits_file(
self,
filename,
time_array,
file_nums,
num_fine_chans,
int_time,
mwax,
map_inds,
conj,
pol_index_array,
):
"""
Read the fits file and populate into memory.
This is an internal function and should not regularly be called except
by read_mwa_corr_fits function.
It is designed to close the fits files, headers, and all associated pointers.
Without this read in a function, reading files has a large memory footprint.
Parameters
----------
filename : str
The mwa gpubox fits file to read
time_array : array of floats
The time_array object constructed during read_mwa_corr_fits call
file_nums : array
List of included file numbers ordered by coarse channel
num_fine_chans : int
Number of fine channels in each data file
int_time : float
The integration time of each observation.
map_inds : array
Indices for reordering data_array from weird correlator packing.
conj : array
Indices for conjugating data_array from weird correlator packing.
pol_index_array : array
Indices for reordering polarizations to the 'AIPS' convention
"""
# get the file number from the file name
if mwax:
file_num = int(filename.split("_")[-2][-3:])
else:
file_num = int(filename.split("_")[-2][-2:])
# map file number to frequency index
freq_ind = np.where(file_nums == file_num)[0][0] * num_fine_chans
# get a coarse channel index for flag array
coarse_ind = np.where(file_nums == file_num)[0][0]
# create an intermediate array for data
if mwax:
coarse_chan_data = np.zeros(
(self.Ntimes, self.Nbls, num_fine_chans * self.Npols),
dtype=np.complex64,
)
else:
coarse_chan_data = np.zeros(
(self.Ntimes, num_fine_chans, self.Nbls * self.Npols),
dtype=np.complex64,
)
with fits.open(filename, mode="denywrite") as hdu_list:
# if mwax, data is in every other hdu
if mwax:
hdu_list = hdu_list[1::2]
for hdu in hdu_list:
# entry 0 is a header, so we skip it.
if hdu.data is None:
continue
time = (
hdu.header["TIME"]
+ hdu.header["MILLITIM"] / 1000.0
+ int_time / 2.0
)
time_ind = np.where(time_array == time)[0][0]
# dump data into matrix
# and take data from real to complex numbers
coarse_chan_data[time_ind, :, :] = (
hdu.data[:, 0::2] + 1j * hdu.data[:, 1::2]
)
# fill nsample and flag arrays
# think about using the mwax weights array in the future
self.nsample_array[
time_ind, :, freq_ind : freq_ind + num_fine_chans, :
] = 1.0
self.flag_array[time_ind, :, coarse_ind, :] = False
if not mwax:
# do mapping and reshaping here to avoid copying whole data_array
np.take(coarse_chan_data, map_inds, axis=2, out=coarse_chan_data)
# conjugate data
coarse_chan_data[:, :, conj] = np.conj(coarse_chan_data[:, :, conj])
# reshape
if mwax:
coarse_chan_data = coarse_chan_data.reshape(
(self.Ntimes, self.Nbls, num_fine_chans, self.Npols)
)
else:
coarse_chan_data = coarse_chan_data.reshape(
(self.Ntimes, num_fine_chans, self.Nbls, self.Npols)
)
coarse_chan_data = np.swapaxes(coarse_chan_data, 1, 2)
coarse_chan_data = coarse_chan_data.reshape(
self.Nblts, num_fine_chans, self.Npols
)
# reorder pols here to avoid memory spike from self.reorder_pols
np.take(
coarse_chan_data, pol_index_array, axis=-1, out=coarse_chan_data,
)
# make a mask where data actually is so coarse channels that
# are split into two files don't overwrite eachother
data_mask = coarse_chan_data != 0
self.data_array[:, freq_ind : freq_ind + num_fine_chans, :][
data_mask
] = coarse_chan_data[data_mask]
return
def _read_flag_file(self, filename, file_nums, num_fine_chans):
"""
Read aoflagger flag file into flag_array.
Parameters
----------
filename : str
The aoflagger fits file to read.
file_nums : array
List of included file numbers ordered by coarse channel.
num_fine_chans : int
Number of fine channels in each data file.
"""
flag_num = int(filename.split("_")[-1][0:2])
# map file number to frequency index
freq_ind = np.where(file_nums == flag_num)[0][0] * num_fine_chans
with fits.open(filename, mode="denywrite") as aoflags:
flags = aoflags[1].data.field("FLAGS")
# some flag files are longer than data; crop the ends
flags = flags[: self.Nblts, :]
# some flag files are shorter than data; assume same end time
blt_ind = self.Nblts - len(flags)
flags = flags[:, :, np.newaxis]
self.flag_array[
blt_ind:, freq_ind : freq_ind + num_fine_chans, :
] = np.logical_or(
self.flag_array[blt_ind:, freq_ind : freq_ind + num_fine_chans, :], flags,
)
def _flag_small_auto_ants(
self, nsamples, flag_small_auto_ants, ant_1_inds, ant_2_inds, flagged_ant_inds
):
"""
Find and flag autocorrelations below a threshold.
Specifically, look for autocorrelations < 0.5 * channel_width * int_time,
as these have been found by the Van Vleck correction to indicate bad data.
If flag_small_auto_ants is True, then antennas with autos below the
threshold will be flagged completely. Otherwise, antennas will be flagged
at only the times and frequencies at which their autos are below the threshold.
Parameters
----------
nsamples : int
Twice the numkber of electric field samples in an autocorrelation; equal
to 2 * channel_width * int_time. The auto divided by nsamples is equal to
the expectation value of the electric field samples squared.
flag_small_auto_ants : bool
Keyword option to flag antenna entirely or only at specific times and
frequencies.
ant_1_inds : numpy array of type int
Indices of antenna 1 corresponding the the baseline-time axis.
ant_2_inds : numpy array of type int
Indices of antenna 2 corresponding the the baseline-time axis.
flagged_ant_inds : numpy array of type int
List of indices of flagged antennas.
Returns
-------
flagged_ant_inds : numpy array of type int
Updated list of indices of flagged antennas.
"""
# calculate threshold so that average cross multiply = 0.25
threshold = 0.25 * nsamples
# look for small autos and flag
auto_inds = self.ant_1_array == self.ant_2_array
autos = self.data_array.real[auto_inds, :, 0:2]
autos = autos.reshape(self.Ntimes, self.Nants_data, self.Nfreqs, 2)
# find autos below threshold
small_auto_flags = np.logical_and(autos != 0, autos <= threshold,)
if flag_small_auto_ants:
# find antenna indices for small sig ants and add to flagged_ant_inds
ant_inds = np.unique(np.nonzero(small_auto_flags)[1])
ant_inds = ant_inds[~np.in1d(ant_inds, flagged_ant_inds)]
if len(ant_inds) != 0:
self.history += (
" The following antennas were flagged by the Van Vleck \
correction: "
+ str(ant_inds)
+ "."
)
flagged_ant_inds = np.concatenate((flagged_ant_inds, ant_inds))
else:
# get flags for small auto ants and add to flag array
small_auto_flags = np.logical_or(
small_auto_flags[:, :, :, 0], small_auto_flags[:, :, :, 1]
)
# broadcast autos flags to corresponding crosses
small_auto_flags = np.logical_or(
small_auto_flags[:, ant_1_inds[: self.Nbls], :],
small_auto_flags[:, ant_2_inds[: self.Nbls], :],
)
small_auto_flags = small_auto_flags.reshape(self.Nblts, self.Nfreqs)
# update flag array
self.flag_array = np.logical_or(
self.flag_array, small_auto_flags[:, :, np.newaxis]
)
return flagged_ant_inds
def _autos_to_crosses(self, auto_array, sig1_inds, sig2_inds, pol1_inds, pol2_inds):
# check auto_array shape
if auto_array.shape[1] != self.Nants_data:
print("auto_array must have second dimension of length nants_data")
return
cross_array1 = np.take(auto_array, pol1_inds, axis=-1)
cross_array2 = np.take(auto_array, pol2_inds, axis=-1)
cross_array1 = np.take(cross_array1, sig1_inds, axis=1)
cross_array2 = np.take(cross_array2, sig2_inds, axis=1)
return cross_array1, cross_array2
def _vv1_crosses(
self,
sighats,
xy_auto_data,
cross_data,
cheby_approx,
vv1_sig_vec,
vv1_rho_coeff,
num_fine_chans,
sig1_inds,
sig2_inds,
pol1_inds,
pol2_inds,
):
# get shape of sighats with blts axis split to times, nants_data
auto_array_shape = (
self.Ntimes,
self.Nants_data,
num_fine_chans,
2,
)
if cheby_approx:
# so basically have three arrays: sighats, ds, sv_inds_right
# get mask for sigmas within interpolation range
in_mask = np.logical_and(sighats > 0.9, sighats <= 4.5)
# get indices and distances for bilinear interpolation
sv_inds_right = np.zeros(sighats.shape, dtype=np.int64)
ds = np.zeros(sighats.shape)
sv_inds_right[in_mask] = np.searchsorted(vv1_sig_vec, sighats[in_mask])
ds[in_mask] = vv1_sig_vec[sv_inds_right[in_mask]] - sighats[in_mask]
# correct xy autos
range_mask = np.logical_and(in_mask[:, :, 0], in_mask[:, :, 1])
kap = np.array(
[xy_auto_data.real[range_mask], xy_auto_data.imag[range_mask]]
)
sig1 = sighats[:, :, 0][range_mask]
sig2 = sighats[:, :, 1][range_mask]
ds1 = ds[:, :, 0][range_mask]
ds2 = ds[:, :, 1][range_mask]
sv_inds_right1 = sv_inds_right[:, :, 0][range_mask]
sv_inds_right2 = sv_inds_right[:, :, 1][range_mask]
# correct in range data with the cheby approximation
_corr_fits.van_vleck_cheby(
kap, vv1_rho_coeff, sv_inds_right1, sv_inds_right2, ds1, ds2,
)
kap *= sig1
kap *= sig2
xy_auto_data[range_mask] = 1j * kap[1, :]
xy_auto_data[range_mask] += kap[0, :]
# correct out of range data with the integral
if len(np.nonzero(~range_mask)[0]) != 0:
sig1 = sighats[:, :, 0][~range_mask]
sig2 = sighats[:, :, 1][~range_mask]
kap = xy_auto_data[~range_mask]
van_vleck_crosses_int(kap, sig1, sig2, cheby_approx)
xy_auto_data[~range_mask] = kap
# broadcast in_mask
in_mask = in_mask.reshape(auto_array_shape)
in_mask1, in_mask2 = self._autos_to_crosses(
in_mask, sig1_inds, sig2_inds, pol1_inds, pol2_inds
)
# get a flag array where both sig1 and sig2 are within cheby range
range_mask = np.logical_and(in_mask1, in_mask2)
# broadcast all the arrays to match autos with corresponding crosses
sv_inds_right = sv_inds_right.reshape(auto_array_shape)
sv_inds_right1, sv_inds_right2 = self._autos_to_crosses(
sv_inds_right, sig1_inds, sig2_inds, pol1_inds, pol2_inds
)
ds = ds.reshape(auto_array_shape)
ds1, ds2 = self._autos_to_crosses(
ds, sig1_inds, sig2_inds, pol1_inds, pol2_inds
)
sighats_shape = sighats.shape
sighats = sighats.reshape(auto_array_shape)
sig1, sig2 = self._autos_to_crosses(
sighats, sig1_inds, sig2_inds, pol1_inds, pol2_inds
)
# split complex numbers into 2d array for cython
kap = np.array([cross_data.real[range_mask], cross_data.imag[range_mask]])
_corr_fits.van_vleck_cheby(
kap,
vv1_rho_coeff,
sv_inds_right1[range_mask],
sv_inds_right2[range_mask],
ds1[range_mask],
ds2[range_mask],
)
kap *= sig1[range_mask]
kap *= sig2[range_mask]
cross_data[range_mask] = 1j * kap[1, :]
cross_data[range_mask] += kap[0, :]
# correct out of range values with integral
if len(np.nonzero(~range_mask)[0]) != 0:
kap = cross_data[~range_mask]
van_vleck_crosses_int(
kap, sig1[~range_mask], sig2[~range_mask], cheby_approx,
)
cross_data[~range_mask] = kap
else:
# do integral correction
# reshape data
sighats_shape = sighats.shape
sighats = sighats.reshape(auto_array_shape)
xy_auto_shape = xy_auto_data.shape
xy_auto_data = xy_auto_data.reshape(
self.Ntimes, self.Nants_data, num_fine_chans
)
# correct xy autos
for k in range(self.Nants_data):
for j in range(num_fine_chans):
# get xy auto data
sig1 = sighats[:, k, j, 0]
sig2 = sighats[:, k, j, 1]
khat = xy_auto_data[:, k, j]
van_vleck_crosses_int(
xy_auto_data[:, k, j], sig1, sig2, cheby_approx
)
# correct crosses
sighats1, sighats2 = self._autos_to_crosses(
sighats, sig1_inds, sig2_inds, pol1_inds, pol2_inds
)
for k in range(cross_data.shape[1]):
for j in range(num_fine_chans):
# get data
sig1 = sighats1[:, k, j, :].flatten()
sig2 = sighats2[:, k, j, :].flatten()
khat = cross_data[:, k, j, :].flatten()
van_vleck_crosses_int(khat, sig1, sig2, cheby_approx)
cross_data[:, k, j, :] = khat.reshape(self.Ntimes, self.Npols)
xy_auto_data = xy_auto_data.reshape(xy_auto_shape)
sighats = sighats.reshape(sighats_shape)
def _vv2_dc_chans(
self,
muhats,
xy_auto_data,
cross_data,
dc_ind,
avg_factor,
sig1_inds,
sig2_inds,
pol1_inds,
pol2_inds,
):
# don't subtract dc spike from missing data
nonzero_dc = xy_auto_data.real[:, dc_ind] > 0
# correct xy auto dc chans
xy_auto_data[:, dc_ind][nonzero_dc].real -= (
muhats[:, 0][nonzero_dc] * muhats[:, 1][nonzero_dc] / avg_factor
)
# expand muhats
muhats = muhats.reshape(self.Ntimes, self.Nants_data, 2)
muhats1, muhats2 = self._autos_to_crosses(
muhats, sig1_inds, sig2_inds, pol1_inds, pol2_inds
)
# subtract crosses dc chans
nonzero_dc = cross_data[:, :, dc_ind, :].real > 0
cross_data[:, :, dc_ind, :][nonzero_dc].real -= (
muhats1[nonzero_dc] * muhats2[nonzero_dc] / avg_factor
)
def _get_pfb_shape(self, avg_factor):
"""
Get pfb shape from file and apply appropriate averaging.
Parameters
----------
avg_factor : int
Factor by which frequency channels have been averaged.
Returns
-------
cb_array : numpy array of type float
Array corresponding to pfb shape for a coarse band.
"""
with h5py.File(
DATA_PATH + "/mwa_config_data/MWA_rev_cb_10khz_doubles.h5", "r"
) as f:
cb = f["coarse_band"][:]
cb_array = cb.reshape(int(128 / avg_factor), int(avg_factor))
cb_array = np.average(cb_array, axis=1)
return cb_array
def _vv2_crosses(self, data, sig_c_1, sig_c_2, vv2_cross_coeff):
range_mask = np.logical_and(np.abs(data) < 0.35, data != 0)
if len(np.nonzero(~range_mask)[0]) != 0:
print(np.max(data[~range_mask]))
warnings.warn(
str(len(np.nonzero(~range_mask)[0]))
+ " xy real values are above vv2 correction range"
)
# for now, correct data that is outside of range.
# This can be changed later
neg_mask = data < 0
data = chebval3d(np.abs(data), sig_c_1, sig_c_2, vv2_cross_coeff,)
data[neg_mask] = np.negative(data[neg_mask])
def _correct_coarse_band(
self,
cb_num,
ant_1_inds,
ant_2_inds,
cb_array,
dig_gains,
nsamples,
avg_factor,
vv2_mu_coeff,
vv2_auto_coeff,
vv2_cross_coeff,
vv1_sig_vec,
vv1_rho_coeff,
num_fine_chans,
cheby_approx,
correct_van_vleck,
remove_coarse_band,
remove_dig_gains,
):
"""
Apply pfb, digital gain, and Van Vleck corrections to a coarse band.
Parameters
----------
cb_num : int
Index of coarse band.
ant_1_inds : numpy array of type int
Indices of antenna 1 corresponding the the baseline-time axis.
ant_2_inds : numpy array of type int
Indices of antenna 2 corresponding the the baseline-time axis.
cb_array : numpy array of type float
Array corresponding to pfb shape for a coarse band.
dig_gains : numpy array of type float
Array corresponding to digital gains for each antenna and coarse band.
nsamples : int
Twice the numkber of electric field samples in an autocorrelation; equal
to 2 * channel_width * int_time. The auto divided by nsamples is equal to
the expectation value of the electric field sample squared.
num_fine_chans : int
Number of fine channels in each data file.
correct_van_vleck : bool
Option to apply Van Vleck correction to data.
remove_coarse_band : bool
Option to remove pfb coarse band shape from data.
remove_dig_gains : bool
Option to remove digital gains from data.
"""
# get coarse band data as np.complex128
cb_data = self.data_array[
:, cb_num * num_fine_chans : (cb_num + 1) * num_fine_chans, :
].astype(np.complex128)
# correct van vleck
if correct_van_vleck is not False:
# make sure pols are ordered [-5, -6, -7, -8]
# TODO: ask bryna about pols
# xx = np.where(self.polarization_array == -5)[0][0]
# yy = np.where(self.polarization_array == -6)[0][0]
# xy = np.where(self.polarization_array == -7)[0][0]
# yx = np.where(self.polarization_array == -8)[0][0]
# pols = np.array([yy, xx])
# scale data
cb_data /= nsamples
auto_mask = ant_1_inds == ant_2_inds
cross_mask = ant_1_inds != ant_2_inds
# get data for correcting
sighats = np.sqrt(cb_data.real[auto_mask, :, 0:2])
cross_data = cb_data[cross_mask, :, :]
xy_auto_data = cb_data[auto_mask, :, 2]
# 4-bit correction for autos
if correct_van_vleck is True or correct_van_vleck == "four_bit":
# autos that are too small will not converge
range_mask = sighats > 0.5
if len(sighats[range_mask]) != 0:
sighats[range_mask] = van_vleck_autos(sighats[range_mask])
# 8-bit correction for autos
if correct_van_vleck is True or correct_van_vleck == "eight_bit":
# calculate coarse band 'sigmas'
sigma_c = np.sqrt(np.mean(np.square(sighats), axis=1))
# broadcast coarse sigma to full band
sigma_c_full = np.repeat(
sigma_c[:, np.newaxis, :], num_fine_chans, axis=1
)
# calculate nonzero mean introduced by asymettric rounding
# this calculation assumes the fine pfb fft has 128 points
muhats = polyval(sigma_c, vv2_mu_coeff)
muhats = muhats.reshape(sigma_c.shape)
# muhats has shape (Nants * Ntimes, Ncoarse_bands, 2)
# square dc channel, subtract mu^2, take square root again
# get index for dc channel
dc_ind = int(num_fine_chans / 2)
# autos that are too small will go negative when subtracting mu^2
dc_mask = sighats[:, dc_ind, :] > 0.1
sighats[:, dc_ind, :][dc_mask] = np.sqrt(
np.square(sighats[:, dc_ind, :][dc_mask])
- (np.square(muhats[dc_mask]) / avg_factor)
)
# apply second stage correction
# get autos within range
range_mask = np.logical_and(sighats > 0.5, sighats < 4.5)
if len(np.nonzero(sighats > 4.5)[0]) != 0:
print(np.max(sighats))
print(
str(len(np.nonzero(sighats > 4.5)[0]))
+ " autos are above the vv2 range"
)
sighats[range_mask] = polyval2d(
sighats[range_mask], sigma_c_full[range_mask], vv2_auto_coeff,
)
# correct crosses
# get indices for broadcasting pols
pol1_inds = np.array([0, 1, 0, 1])
pol2_inds = np.array([0, 1, 1, 0])
# get indices for sigmas corresponding to crosses
cross_inds = np.where(
ant_1_inds[0 : self.Nbls] != ant_2_inds[0 : self.Nbls]
)[0]
sig1_inds = ant_1_inds[cross_inds]
sig2_inds = ant_2_inds[cross_inds]
# reshape cross data
cross_data_shape = cross_data.shape
cross_data = cross_data.reshape(
self.Ntimes, len(sig1_inds), self.Nfreqs, self.Npols
)
# 4-bit correction for crosses
if correct_van_vleck is True or correct_van_vleck == "four_bit":
self._vv1_crosses(
sighats,
xy_auto_data,
cross_data,
cheby_approx,
vv1_sig_vec,
vv1_rho_coeff,
num_fine_chans,
sig1_inds,
sig2_inds,
pol1_inds,
pol2_inds,
)
# 8-bit correction for crosses
if correct_van_vleck is True or correct_van_vleck == "eight_bit":
# correct dc channels
self._vv2_dc_chans(
muhats,
xy_auto_data,
cross_data,
dc_ind,
avg_factor,
sig1_inds,
sig2_inds,
pol1_inds,
pol2_inds,
)
# correct real xy autos
self._vv2_crosses(
xy_auto_data.real,
sigma_c_full[:, :, 0],
sigma_c_full[:, :, 1],
vv2_cross_coeff,
)
# correct imaginary xy autos
self._vv2_crosses(
xy_auto_data.imag,
sigma_c_full[:, :, 0],
sigma_c_full[:, :, 1],
vv2_cross_coeff,
)
# correct cross data
# extend sigma_c
sigma_c_full = sigma_c_full.reshape(
self.Ntimes, self.Nants_data, num_fine_chans, 2
)
sigma_c1, sigma_c2 = self._autos_to_crosses(
sigma_c_full, sig1_inds, sig2_inds, pol1_inds, pol2_inds
)
# correct real values
self._vv2_crosses(cross_data.real, sigma_c1, sigma_c2, vv2_cross_coeff)
# correct imag values
self._vv2_crosses(cross_data.imag, sigma_c1, sigma_c2, vv2_cross_coeff)
# put corrected data back into coarse band array
cb_data.real[auto_mask, :, 0:2] = np.square(sighats)
cb_data[cross_mask, :, :] = cross_data.reshape(cross_data_shape)
cb_data[auto_mask, :, 2] = xy_auto_data
cb_data[auto_mask, :, 3] = np.conj(xy_auto_data)
# rescale data
cb_data *= nsamples
# remove digital gains
if remove_dig_gains:
dig_gains1 = dig_gains[ant_1_inds, cb_num, np.newaxis, np.newaxis]
dig_gains2 = dig_gains[ant_2_inds, cb_num, np.newaxis, np.newaxis]
cb_data /= dig_gains1
cb_data /= dig_gains2
# remove coarse band
if remove_coarse_band:
cb_data /= cb_array[:num_fine_chans, np.newaxis]
# put corrected data back into data array
self.data_array[
:, cb_num * num_fine_chans : (cb_num + 1) * num_fine_chans, :
] = cb_data
def _apply_corrections(
self,
ant_1_inds,
ant_2_inds,
avg_factor,
dig_gains,
spw_inds,
num_fine_chans,
cheby_approx,
correct_van_vleck,
remove_coarse_band,
remove_dig_gains,
):
"""
Prepare and apply pfb, digital gain, and Van Vleck corrections.
Parameters
----------
ant_1_inds : numpy array of type int
Indices of antenna 1 corresponding the the baseline-time axis.
ant_2_inds : numpy array of type int
Indices of antenna 2 corresponding the the baseline-time axis.
avg_factor : int
Factor by which frequency channels have been averaged.
dig_gains : array
Array of digital gains with shape (Nants, Ncoarse_chans).
spw_inds : array of type int
Array of coarse band numbers.
num_fine_chans : int
Number of fine channels in each data file.
flagged_ant_inds : numpy array of type int
List of indices of flagged antennas.
cheby_approx : bool
Option to use chebyshev approximation for Van Vleck correction.
data_array_dtype : numpy dtype
Datatype to store the output data_array as.
flag_small_auto_ants : bool
Option to completely flag antennas found by _flag_small_auto_ants.
correct_van_vleck : bool
Option to apply Van Vleck correction to data.
remove_coarse_band : bool
Option to remove pfb coarse band shape from data.
remove_dig_gains : bool
Option to remove digital gains from data.
"""
# van vleck correction
if correct_van_vleck is not False:
# calculate number of samples going into real or imaginary part
# factor of two comes from variables being circularly-symmetric
nsamples = self.channel_width[0] * self.integration_time[0] * 2
else:
nsamples = None
if correct_van_vleck is True or correct_van_vleck == "four_bit":
self.history += " Applied four bit Van Vleck correction."
if cheby_approx:
self.history += " Used Van Vleck Chebychev approximation."
# load in interpolation files for four bit cheby correction
with h5py.File(
DATA_PATH + "/mwa_config_data/Chebychev_coeff.h5", "r"
) as f:
vv1_rho_coeff = f["rho_data"][:]
with h5py.File(DATA_PATH + "/mwa_config_data/sigma1.h5", "r") as f:
vv1_sig_vec = f["sig_data"][:]
else:
vv1_rho_coeff = None
vv1_sig_vec = None
else:
vv1_rho_coeff = None
vv1_sig_vec = None
if correct_van_vleck is True or correct_van_vleck == "eight_bit":
self.history += " Applied eight bit Van Vleck correction."
# get eight bit correction coefficients
with h5py.File(DATA_PATH + "/mwa_config_data/vv2_mu_coeffs.h5", "r") as f:
vv2_mu_coeff = f["mu_coeffs"][:]
with h5py.File(DATA_PATH + "/mwa_config_data/vv2_auto_coeffs.h5", "r") as f:
vv2_auto_coeff = f["coeffs"][:]
with h5py.File(
DATA_PATH + "/mwa_config_data/vv2_cross_coeffs.h5", "r"
) as f:
vv2_cross_coeff = f["coeffs"][:]
else:
vv2_mu_coeff = None
vv2_auto_coeff = None
vv2_cross_coeff = None
# get digital gains
if remove_dig_gains:
self.history += " Divided out digital gains."
# get gains for included coarse channels
# During commissioning a shift in the bit selection in the digital
# receiver was implemented which changed the data scaling by
# a factor of 64. To be compatible with the earlier scaling scheme,
# the digital gains are divided by a factor of 64 here.
# For a more detailed explanation, see PR #908.
dig_gains = dig_gains[:, spw_inds] / 64
else:
dig_gains = None
# get pfb response shape
if remove_coarse_band:
self.history += " Divided out pfb coarse channel bandpass."
cb_array = self._get_pfb_shape(avg_factor)
else:
cb_array = None
# apply corrections to each coarse band
for i in range(len(spw_inds)):
self._correct_coarse_band(
i,
ant_1_inds,
ant_2_inds,
cb_array,
dig_gains,
nsamples,
avg_factor,
vv2_mu_coeff,
vv2_auto_coeff,
vv2_cross_coeff,
vv1_sig_vec,
vv1_rho_coeff,
num_fine_chans,
cheby_approx,
correct_van_vleck,
remove_coarse_band,
remove_dig_gains,
)
def read_mwa_corr_fits(
self,
filelist,
use_aoflagger_flags=None,
remove_dig_gains=True,
remove_coarse_band=None,
correct_cable_len=None,
correct_van_vleck=False,
cheby_approx=True,
flag_small_auto_ants=True,
phase_to_pointing_center=False,
propagate_coarse_flags=True,
flag_init=True,
edge_width=80e3,
start_flag="goodtime",
end_flag=0.0,
flag_dc_offset=True,
remove_flagged_ants=True,
background_lsts=True,
read_data=True,
data_array_dtype=np.complex64,
nsample_array_dtype=np.float32,
run_check=True,
check_extra=True,
run_check_acceptability=True,
strict_uvw_antpos_check=False,
check_autos=True,
fix_autos=True,
):
"""
Read in MWA correlator gpu box files.
The default settings remove some of the instrumental effects in the bandpass
by dividing out the coarse band shape (for legacy data only) and the digital
gains, and applying a cable length correction.
If the desired output is raw correlator data, set remove_dig_gains=False,
remove_coarse_band=False, correct_cable_len=False, and
phase_to_pointing_center=False.
Parameters
----------
filelist : list of str
The list of MWA correlator files to read from. Must include at
least one fits file and only one metafits file per data set.
Can also be a list of lists to read multiple data sets.
axis : str
Axis to concatenate files along. This enables fast concatenation
along the specified axis without the normal checking that all other
metadata agrees. This method does not guarantee correct resulting
objects. Please see the docstring for fast_concat for details.
Allowed values are: 'blt', 'freq', 'polarization'. Only used if
multiple files are passed.
use_aoflagger_flags : bool
Option to use aoflagger mwaf flag files. Defaults to true if aoflagger
flag files are submitted.
remove_dig_gains : bool
Option to divide out digital gains.
remove_coarse_band : bool
Option to divide out coarse band shape.
correct_cable_len : bool
Option to apply a cable delay correction.
correct_van_vleck : bool or string
Option to apply a van vleck correction. Allowed options are True,
which applies both the four-bit and eight-bit corrections; False,
which applies neither correction; 'four_bit', which applies only the
four-bit correction; or 'eight-bit', which applies only the eight-bit
correction. The 'four_bit' and 'eight_bit' options are intended for
testing rather than for general use.
cheby_approx : bool
Only used if correct_van_vleck is True or 'four_bit'. Option to implement
the four-bit van vleck correction with a chebyshev polynomial approximation.
flag_small_auto_ants : bool
Only used if correct_van_vleck is True. Option to completely flag any
antenna for which the autocorrelation falls below a threshold found by
the Van Vleck correction to indicate bad data. Specifically, the
threshold used is 0.5 * integration_time * channel_width. If set to False,
only the times and frequencies at which the auto is below the
threshold will be flagged for the antenna.
phase_to_pointing_center : bool
Option to phase to the observation pointing center.
propagate_coarse_flags : bool
Option to propagate flags for missing coarse channel integrations
across frequency.
flag_init: bool
Set to True in order to do routine flagging of coarse channel edges,
start or end integrations, or the center fine channel of each coarse
channel. See associated keywords.
edge_width: float
Only used if flag_init is True. The width to flag on the edge of
each coarse channel, in hz. Errors if not equal to integer multiple
of channel_width. Set to 0 for no edge flagging.
start_flag: float or str
Only used if flag_init is True. The number of seconds to flag at the
beginning of the observation. Set to 0 for no flagging. Default is
'goodtime', which uses information in the metafits file to determine
the length of time that should be flagged. Errors if input is not a
float or 'goodtime'. Errors if float input is not equal to an
integer multiple of the integration time.
end_flag: floats
Only used if flag_init is True. The number of seconds to flag at the
end of the observation. Set to 0 for no flagging. Errors if not
equal to an integer multiple of the integration time.
flag_dc_offset: bool
Only used if flag_init is True. Set to True to flag the center fine
channel of each coarse channel.
remove_flagged_ants : bool
Option to perform a select to remove antennas flagged in the metafits
file. If correct_van_vleck and flag_small_auto_ants are both True then
antennas flagged by the Van Vleck correction are also removed.
background_lsts : bool
When set to True, the lst_array is calculated in a background thread.
read_data : bool
Read in the visibility, nsample and flag data. If set to False, only
the metadata will be read in. Setting read_data to False results in
a metadata only object.
data_array_dtype : numpy dtype
Datatype to store the output data_array as. Must be either
np.complex64 (single-precision real and imaginary) or np.complex128
(double-precision real and imaginary).
nsample_array_dtype : numpy dtype
Datatype to store the output nsample_array as. Must be either
np.float64 (double-precision), np.float32 (single-precision), or
np.float16 (half-precision). Half-precision is only recommended for
cases where no sampling or averaging of baselines will occur,
because round-off errors can be quite large (~1e-3).
run_check : bool
Option to check for the existence and proper shapes of parameters
after after reading in the file (the default is True,
meaning the check will be run).
check_extra : bool
Option to check optional parameters as well as required ones (the
default is True, meaning the optional parameters will be checked).
run_check_acceptability : bool
Option to check acceptable range of the values of parameters after
reading in the file (the default is True, meaning the acceptable
range check will be done).
strict_uvw_antpos_check : bool
Option to raise an error rather than a warning if the check that
uvws match antenna positions does not pass.
check_autos : bool
Check whether any auto-correlations have non-zero imaginary values in
data_array (which should not mathematically exist). Default is True.
fix_autos : bool
If auto-correlations with imaginary values are found, fix those values so
that they are real-only in data_array. Default is True.
Raises
------
ValueError
If required files are missing or multiple files metafits files are
included in filelist.
If files from different observations are included in filelist.
If files in fileslist have different fine channel widths
If file types other than fits, metafits, and mwaf files are included
in filelist.
"""
metafits_file = None
ppds_file = None
obs_id = None
file_dict = {}
start_time = 0.0
end_time = 0.0
included_file_nums = []
included_flag_nums = []
aoflagger_warning = False
num_fine_chans = 0
mwax = None
# do datatype checks
if data_array_dtype not in (np.complex64, np.complex128):
raise ValueError("data_array_dtype must be np.complex64 or np.complex128")
if nsample_array_dtype not in (np.float64, np.float32, np.float16):
raise ValueError(
"nsample_array_dtype must be one of: np.float64, np.float32, np.float16"
)
# do start_flag check
if not isinstance(start_flag, (int, float)):
if start_flag != "goodtime":
raise ValueError("start_flag must be int or float or 'goodtime'")
# do correct_van_vleck check
if not isinstance(correct_van_vleck, bool):
if correct_van_vleck not in ("four_bit", "eight_bit"):
raise ValueError(
"correct_van_vleck must be bool, 'four_bit', or 'eight_bit'"
)
# set future array shapes
self._set_future_array_shapes()
# iterate through files and organize
# create a list of included file numbers
# find the first and last times that have data
for filename in filelist:
# update filename attribute
basename = os.path.basename(filename)
self.filename = uvutils._combine_filenames(self.filename, [basename])
self._filename.form = (len(self.filename),)
if filename.lower().endswith(".metafits"):
# force only one metafits file
if metafits_file is not None:
raise ValueError("multiple metafits files in filelist")
metafits_file = filename
elif filename.lower().endswith(".fits"):
# check if ppds file
try:
fits.getheader(filename, extname="ppds")
ppds_file = filename
except Exception:
# check obsid
head0 = fits.getheader(filename, 0)
if obs_id is None:
obs_id = head0["OBSID"]
else:
if head0["OBSID"] != obs_id:
raise ValueError(
"files from different observations submitted "
"in same list"
)
# check if mwax
if mwax is None:
if "CORR_VER" in head0.keys():
mwax = True
# save mwax version #s into extra_keywords
self.extra_keywords["U2S_VER"] = head0["U2S_VER"]
self.extra_keywords["CBF_VER"] = head0["CBF_VER"]
self.extra_keywords["DB2F_VER"] = head0["DB2F_VER"]
else:
mwax = False
# check headers for first and last times containing data
headstart = fits.getheader(filename, 1)
headfin = fits.getheader(filename, -1)
first_time = headstart["TIME"] + headstart["MILLITIM"] / 1000.0
last_time = headfin["TIME"] + headfin["MILLITIM"] / 1000.0
if start_time == 0.0:
start_time = first_time
# check that files with a timing offset can be aligned
elif np.abs(start_time - first_time) % head0["INTTIME"] != 0.0:
raise ValueError(
"coarse channel start times are misaligned by an amount that is not \
an integer multiple of the integration time"
)
elif start_time > first_time:
start_time = first_time
if end_time < last_time:
end_time = last_time
# get number of fine channels in each coarse channel
if num_fine_chans == 0:
if mwax:
# number of fine channels is multiplied by 4 (pols)
# and by 2 (real and imaginary parts)
num_fine_chans = int(headstart["NAXIS1"] / 8)
else:
num_fine_chans = headstart["NAXIS2"]
else:
if mwax:
if num_fine_chans != int(headstart["NAXIS1"] / 8):
raise ValueError(
"files submitted have different numbers \
of fine channels"
)
else:
if num_fine_chans != headstart["NAXIS2"]:
raise ValueError(
"files submitted have different numbers \
of fine channels"
)
# get the file number from the file name;
# this will later be mapped to a coarse channel
if mwax:
file_num = int(filename.split("_")[-2][-3:])
else:
file_num = int(filename.split("_")[-2][-2:])
if file_num not in included_file_nums:
included_file_nums.append(file_num)
# organize files
if "data" not in file_dict.keys():
file_dict["data"] = [filename]
else:
file_dict["data"].append(filename)
# save bscale keyword
# look for bscale in the first hdu, as some data does not
# record it in the zeroth hdu
if not mwax:
if "SCALEFAC" not in self.extra_keywords.keys():
if "BSCALE" in headstart.keys():
self.extra_keywords["SCALEFAC"] = headstart["BSCALE"]
else:
# correlator did a divide by 4 before october 2014
self.extra_keywords["SCALEFAC"] = 0.25
# look for flag files
elif filename.lower().endswith(".mwaf"):
if use_aoflagger_flags is None:
use_aoflagger_flags = True
flag_num = int(filename.split("_")[-1][0:2])
included_flag_nums.append(flag_num)
if use_aoflagger_flags is False and aoflagger_warning is False:
warnings.warn("mwaf files submitted with use_aoflagger_flags=False")
aoflagger_warning = True
elif "flags" not in file_dict.keys():
file_dict["flags"] = [filename]
else:
file_dict["flags"].append(filename)
else:
raise ValueError("only fits, metafits, and mwaf files supported")
# checks:
if metafits_file is None and ppds_file is None:
raise ValueError("no metafits file submitted")
elif metafits_file is None:
metafits_file = ppds_file
elif ppds_file is not None:
ppds = fits.getheader(ppds_file, 0)
meta = fits.getheader(metafits_file, 0)
for key in ppds.keys():
if key not in meta.keys():
self.extra_keywords[key] = ppds[key]
if "data" not in file_dict.keys():
raise ValueError("no data files submitted")
if "flags" not in file_dict.keys() and use_aoflagger_flags:
raise ValueError(
"no flag files submitted. Rerun with flag files \
or use_aoflagger_flags=False"
)
# reorder file numbers
included_file_nums = sorted(included_file_nums)
included_flag_nums = sorted(included_flag_nums)
# first set parameters that are always true
self.Nspws = 1
self.spw_array = np.array([0])
self.phase_type = "drift"
self.vis_units = "uncalib"
self.Npols = 4
self.xorientation = "east"
# get information from metafits file
with fits.open(metafits_file, memmap=True) as meta:
meta_hdr = meta[0].header
# get a list of coarse channels
coarse_chans = meta_hdr["CHANNELS"].split(",")
coarse_chans = np.array(sorted(int(i) for i in coarse_chans))
# fine channel width
channel_width = float(meta_hdr.pop("FINECHAN") * 1000)
# number of fine channels in observation
obs_num_fine_chans = meta_hdr["NCHANS"]
# calculate number of fine channels per coarse channel
coarse_num_fine_chans = obs_num_fine_chans / len(coarse_chans)
# center frequency of first fine channel of center coarse channel in hertz
# For the legacy correlator, the metafits file includes the observation
# frequency center, which is the center frequency of the first fine
# channel of the center coarse channel. (If there are an even number of
# coarse channels, the center channel is to the right).
# For mwax, the center frequency of the first fine channel of a coarse
# channel is the leftmost edge of the coarse channel if the number of
# fine channels per coarse channel is even. Otherwise it is offset by
# half of the fine channel width.
if mwax:
# calculate coarse channel width in MHz
coarse_chan_width = meta_hdr["BANDWDTH"] / len(coarse_chans)
# coarse channel center freq is channel number * coarse channel width
center_coarse_chan_center = (
meta_hdr["CENTCHAN"] * coarse_chan_width * 1e6
)
# calculate center of first fine channel; this works if the number of
# fine channels is even or odd
obs_freq_center = (
center_coarse_chan_center
- int(coarse_num_fine_chans / 2) * channel_width
)
else:
obs_freq_center = meta_hdr["FREQCENT"] * 1e6
# frequency averaging factor
avg_factor = meta_hdr["NAV_FREQ"]
# integration time in seconds
int_time = meta_hdr["INTTIME"]
# pointing center in degrees
ra_deg = meta_hdr["RA"]
dec_deg = meta_hdr["DEC"]
ra_rad = np.pi * ra_deg / 180
dec_rad = np.pi * dec_deg / 180
# set start_flag with goodtime
if flag_init and start_flag == "goodtime":
# ppds file does not contain this key
try:
if meta_hdr["GOODTIME"] > start_time:
start_flag = meta_hdr["GOODTIME"] - start_time
# round start_flag up to nearest multiple of int_time
if start_flag % int_time > 0:
start_flag = (1 + int(start_flag / int_time)) * int_time
else:
start_flag = 0.0
except KeyError:
raise ValueError(
"To use start_flag='goodtime', a .metafits file must \
be submitted"
)
if "HISTORY" in meta_hdr:
self.history = str(meta_hdr["HISTORY"])
meta_hdr.remove("HISTORY", remove_all=True)
else:
self.history = ""
if not uvutils._check_history_version(
self.history, self.pyuvdata_version_str
):
self.history += self.pyuvdata_version_str
self.instrument = meta_hdr["TELESCOP"]
self.telescope_name = meta_hdr.pop("TELESCOP")
self.object_name = meta_hdr.pop("FILENAME")
self.Nants_telescope = int(meta_hdr.pop("INSTRUME")[0:3])
# get rid of keywords that uvfits.py gets rid of
bad_keys = ["SIMPLE", "EXTEND", "BITPIX", "NAXIS", "DATE-OBS"]
for key in bad_keys:
meta_hdr.remove(key, remove_all=True)
# if not mwax, remove mwax-specific keys
if not mwax:
for key in ["DELAYMOD", "DELDESC", "CABLEDEL", "GEODEL", "CALIBDEL"]:
if key in meta_hdr.keys():
meta_hdr.remove(key, remove_all=True)
# store remaining keys in extra keywords
for key in meta_hdr:
if key == "COMMENT":
self.extra_keywords[key] = str(meta_hdr.get(key))
elif key != "":
self.extra_keywords[key] = meta_hdr.get(key)
# get antenna data from metafits file table
meta_tbl = meta[1].data
# because of polarization, each antenna # is listed twice
antenna_inds = meta_tbl["Antenna"][1::2]
antenna_numbers = meta_tbl["Tile"][1::2]
antenna_names = meta_tbl["TileName"][1::2]
flagged_ant_inds = antenna_inds[meta_tbl["Flag"][1::2] == 1]
cable_lens = np.asarray(meta_tbl["Length"][1::2]).astype(np.str_)
dig_gains = meta_tbl["Gains"][1::2, :].astype(np.float64)
# get antenna postions in enu coordinates
antenna_positions = np.zeros((len(antenna_numbers), 3))
antenna_positions[:, 0] = meta_tbl["East"][1::2]
antenna_positions[:, 1] = meta_tbl["North"][1::2]
antenna_positions[:, 2] = meta_tbl["Height"][1::2]
# reorder antenna parameters from metafits ordering
reordered_inds = antenna_inds.argsort()
self.antenna_numbers = antenna_numbers[reordered_inds]
self.antenna_names = list(antenna_names[reordered_inds])
antenna_positions = antenna_positions[reordered_inds, :]
cable_lens = cable_lens[reordered_inds]
dig_gains = dig_gains[reordered_inds, :]
# set parameters from other parameters
self.Nants_data = len(self.antenna_numbers)
self.Nbls = int(
len(self.antenna_numbers) * (len(self.antenna_numbers) + 1) / 2.0
)
# get telescope parameters
self.set_telescope_params(warn=False)
# build time array of centers
time_array = np.arange(
start_time + int_time / 2.0, end_time + int_time / 2.0 + int_time, int_time
)
# convert to time to jd floats
float_time_array = Time(time_array, format="unix", scale="utc").jd.astype(float)
# build into time array
self.time_array = np.repeat(float_time_array, self.Nbls)
self.Ntimes = len(time_array)
self.Nblts = int(self.Nbls * self.Ntimes)
# convert times to lst
proc = self.set_lsts_from_time_array(background=background_lsts)
self.integration_time = np.full((self.Nblts), int_time)
# convert antenna positions from enu to ecef
# antenna positions are "relative to
# the centre of the array in local topocentric \"east\", \"north\",
# \"height\". Units are meters."
antenna_positions_ecef = uvutils.ECEF_from_ENU(
antenna_positions, *self.telescope_location_lat_lon_alt
)
# make antenna positions relative to telescope location
self.antenna_positions = antenna_positions_ecef - self.telescope_location
# make initial antenna arrays, where ant_1 <= ant_2
# itertools.combinations_with_replacement returns
# all pairs of self.antenna_numbers,
# including pairs with the same number (e.g. (0,0) auto-correlation).
# this is a little faster than having nested for-loops moving over the
# upper triangle of antenna-pair combinations matrix.
ant_1_array, ant_2_array = np.transpose(
list(itertools.combinations_with_replacement(self.antenna_numbers, 2))
)
self.ant_1_array = np.tile(np.array(ant_1_array), self.Ntimes)
self.ant_2_array = np.tile(np.array(ant_2_array), self.Ntimes)
self.baseline_array = self.antnums_to_baseline(
self.ant_1_array, self.ant_2_array
)
# make antenna index arrays
ant_1_inds, ant_2_inds = np.transpose(
list(itertools.combinations_with_replacement(np.arange(self.Nants_data), 2))
)
ant_1_inds = np.tile(np.array(ant_1_inds), self.Ntimes).astype(np.int_)
ant_2_inds = np.tile(np.array(ant_2_inds), self.Ntimes).astype(np.int_)
# create self.uvw_array
self.set_uvws_from_antenna_positions(allow_phasing=False)
if not mwax:
# coarse channel mapping for the legacy correlator:
# channels in group 0-128 are assigned to files in order;
# channels in group 129-155 are assigned in reverse order
# that is, if the lowest channel is 127, it will be assigned to the
# first file
# channel 128 will be assigned to the second file
# then the highest channel will be assigned to the third file
# and the next hightest channel assigned to the fourth file, and so on
mapped_coarse_chans = np.concatenate(
(
coarse_chans[coarse_chans <= 128],
np.flip(coarse_chans[coarse_chans > 128]),
)
)
ordered_file_nums = np.arange(len(coarse_chans))[
np.argsort(mapped_coarse_chans)
]
ordered_file_nums += 1
else:
# for mwax, the file numbers are the coarse channel numbers
ordered_file_nums = coarse_chans
file_mask = np.isin(ordered_file_nums, included_file_nums)
# get included file numbers in coarse band order
file_nums = ordered_file_nums[file_mask]
# check that coarse channels are contiguous.
spw_inds = np.nonzero(file_mask)[0]
if np.any(np.diff(spw_inds) > 1):
warnings.warn("coarse channels are not contiguous for this observation")
# add spectral windows
self._set_flex_spw()
self.Nspws = len(spw_inds)
self.spw_array = coarse_chans[spw_inds]
self.flex_spw_id_array = np.repeat(self.spw_array, num_fine_chans)
# warn user if not all coarse channels are included
if len(included_file_nums) != len(coarse_chans):
warnings.warn("some coarse channel files were not submitted")
# build frequency array
self.Nfreqs = len(included_file_nums) * num_fine_chans
self.freq_array = np.zeros(self.Nfreqs)
self.channel_width = np.full(self.Nfreqs, channel_width)
# Use the center frequency of the first fine channel of the center coarse
# channel to get the frequency range for each included coarse channel.
center_coarse_chan = int(len(coarse_chans) / 2)
for i in range(len(spw_inds)):
first_coarse_freq = (
obs_freq_center
+ (spw_inds[i] - center_coarse_chan)
* coarse_num_fine_chans
* channel_width
)
last_coarse_freq = first_coarse_freq + num_fine_chans * channel_width
self.freq_array[i * num_fine_chans : (i + 1) * num_fine_chans] = np.arange(
first_coarse_freq, last_coarse_freq, channel_width
)
# for mwax, polarizations are ordered xx, xy, yx, yy
if mwax:
self.polarization_array = np.array([-5, -7, -8, -6])
# otherwise, polarizations are ordered yy, yx, xy, xx
else:
self.polarization_array = np.array([-6, -8, -7, -5])
# get index array for AIPS reordering
pol_index_array = np.argsort(np.abs(self.polarization_array))
# reorder polarization_array here to avoid memory spike from self.reorder_pols
self.polarization_array = self.polarization_array[pol_index_array]
if read_data:
if not mwax:
# build mapper from antenna numbers and polarizations to pfb inputs
corr_ants_to_pfb_inputs = {}
for i in range(len(antenna_inds)):
for p in range(2):
corr_ants_to_pfb_inputs[(antenna_inds[i], p)] = 2 * i + p
# for mapping, start with a pair of antennas/polarizations
# this is the pair we want to find the data for
# map the pair to the corresponding coarse pfb input indices
# map the coarse pfb input indices to the fine pfb output indices
# these are the indices for the data corresponding to the initial
# antenna/pol pair
# These two 1D arrays will be both C and F contiguous
# but we are explicitly declaring C to be consistent with the rest
# of the python which interacts with the C/Cython code.
# generate a mapping index array
map_inds = np.zeros(
(self.Nbls * self.Npols), dtype=np.int32, order="C",
)
# generate a conjugation array
conj = np.full(
(self.Nbls * self.Npols), False, dtype=np.bool_, order="C",
)
_corr_fits.generate_map(corr_ants_to_pfb_inputs, map_inds, conj)
else:
map_inds = None
conj = None
# create arrays for data, nsamples, and flags
self.data_array = np.zeros(
(self.Nblts, self.Nfreqs, self.Npols), dtype=data_array_dtype,
)
self.nsample_array = np.zeros(
(self.Ntimes, self.Nbls, self.Nfreqs, self.Npols),
dtype=nsample_array_dtype,
)
self.flag_array = np.full(
(self.Ntimes, self.Nbls, len(spw_inds), self.Npols), True
)
# read data files
for filename in file_dict["data"]:
self._read_fits_file(
filename,
time_array,
file_nums,
num_fine_chans,
int_time,
mwax,
map_inds,
conj,
pol_index_array,
)
# propagate coarse flags so that if coarse band data is missing for
# a time stamp, the the entire time stamp is flagged
if propagate_coarse_flags:
self.flag_array = np.any(self.flag_array, axis=2)
self.flag_array = np.repeat(
self.flag_array[:, :, np.newaxis, :], self.Nfreqs, axis=2
)
else:
self.flag_array = np.repeat(self.flag_array, num_fine_chans, axis=2)
if flag_init:
self.flag_init(
num_fine_chans,
edge_width=edge_width,
start_flag=start_flag,
end_flag=end_flag,
flag_dc_offset=flag_dc_offset,
)
# look for bad ants indicated by small autos:
# TODO: think about if it makes sense to move this out of van_vleck
# TODO: check memory usage of this function
if correct_van_vleck is not False:
# look for bad ants indicated by small autos
nsamples = self.channel_width[0] * self.integration_time[0] * 2
flagged_ant_inds = self._flag_small_auto_ants(
nsamples,
flag_small_auto_ants,
ant_1_inds,
ant_2_inds,
flagged_ant_inds,
)
# flag bad ants
bad_ant_inds = np.logical_or(
np.isin(ant_1_inds[: self.Nbls], flagged_ant_inds),
np.isin(ant_2_inds[: self.Nbls], flagged_ant_inds),
)
self.flag_array[:, bad_ant_inds, :, :] = True
# reshape arrays
self.flag_array = self.flag_array.reshape(
(self.Nblts, self.Nfreqs, self.Npols)
)
self.nsample_array = self.nsample_array.reshape(
(self.Nblts, self.Nfreqs, self.Npols)
)
# When MWA data is cast to float for the correlator, the division
# by 127 introduces small errors that are mitigated when the data
# is cast back into integer.
# this needs to happen before the van vleck correction
if not mwax:
self.data_array /= self.extra_keywords["SCALEFAC"]
np.rint(self.data_array, out=self.data_array)
# TODO: check memory usage of this function
# apply corrections
# we don't yet have a coarse band shape for mwax
if remove_coarse_band is None:
if not mwax:
remove_coarse_band = True
else:
remove_coarse_band = False
if correct_van_vleck in ("four_bit", "eight_bit") or np.any(
[correct_van_vleck, remove_coarse_band, remove_dig_gains]
):
self._apply_corrections(
ant_1_inds,
ant_2_inds,
avg_factor,
dig_gains,
spw_inds,
num_fine_chans,
cheby_approx=cheby_approx,
correct_van_vleck=correct_van_vleck,
remove_coarse_band=remove_coarse_band,
remove_dig_gains=remove_dig_gains,
)
# rescale data
# this needs to happen after the van vleck correction
if not mwax:
self.data_array *= self.extra_keywords["SCALEFAC"]
# cable delay corrections
if correct_cable_len is None:
correct_cable_len = True
warnings.warn(
"cable length correction is now defaulted to True rather than False. \
To read in files without applying the correction set \
correct_cable_len=False. This warning will be removed in v2.4"
)
if correct_cable_len:
self.correct_cable_length(cable_lens, ant_1_inds, ant_2_inds)
# add aoflagger flags to flag_array
if use_aoflagger_flags:
# throw an error if matching files not submitted
if included_file_nums != included_flag_nums:
raise ValueError(
"flag file coarse bands do not match data file coarse bands"
)
warnings.warn(
"coarse channel, start time, and end time flagging will default \
to the more aggressive of flag_init and AOFlagger"
)
for filename in file_dict["flags"]:
self._read_flag_file(filename, file_nums, num_fine_chans)
# to account for discrepancies between file conventions, in order
# to be consistent with the uvw vector direction, all the data must
# be conjugated
np.conj(self.data_array, out=self.data_array)
# wait for LSTs if set in background
if proc is not None:
proc.join()
# remove bad antennas
# select must be called after lst thread is re-joined
# TODO: figure out what to do about self.select memory usage
if remove_flagged_ants:
good_ants = np.delete(np.array(self.antenna_numbers), flagged_ant_inds)
self.select(antenna_nums=good_ants, run_check=False)
# phasing
if phase_to_pointing_center:
self.phase(ra_rad, dec_rad)
# switch to current_array_shape
self.use_current_array_shapes()
# check if object is self-consistent
# uvws are calcuated using pyuvdata, so turn off the check for speed.
if run_check:
self.check(
check_extra=check_extra,
run_check_acceptability=run_check_acceptability,
strict_uvw_antpos_check=strict_uvw_antpos_check,
allow_flip_conj=True,
check_autos=check_autos,
fix_autos=fix_autos,
)