Revision 8b22682704c00bd278c44dae1686f726d261b718 authored by Steven Murray on 09 January 2023, 21:13:32 UTC, committed by Steven Murray on 09 January 2023, 21:13:32 UTC
1 parent e377413
corr_fits.pyx
# -*- mode: python; coding: utf-8 -*-
# Copyright (c) 2020 Radio Astronomy Software Group
# Licensed under the 2-clause BSD License
# distutils: language = c
# cython: linetrace=True
# cython imports
cimport cython
cimport numpy
from libc.stdlib cimport strtod
from libc.string cimport memcpy, strncmp, strtok
from cython.parallel import parallel, prange
from libc.math cimport exp, pi, sqrt
# This initializes the numpy 1.7 c-api.
# cython 3.0 will do this by default.
# We may be able to just remove this then.
numpy.import_array()
ctypedef fused int_like:
numpy.int_t
int
cdef inline int_like pfb_mapper(int_like index):
# the polyphase filter bank maps inputs to outputs, which the MWA
# correlator then records as the antenna indices.
# the following is taken from mwa_build_lfiles/mwac_utils.c
# inputs are mapped to outputs via pfb_mapper as follows
# (from mwa_build_lfiles/antenna_mapping.h):
# floor(index/4) + index%4 * 16 = input
# for the first 64 outputs, pfb_mapper[output] = input
return index // 4 + index % 4 * 16
cpdef dict input_output_mapping():
"""Build a mapping dictionary from pfb input to output numbers."""
cdef int p, i
cdef dict pfb_inputs_to_outputs = {}
# build a mapper for all 256 inputs
for p in range(4):
for i in range(64):
pfb_inputs_to_outputs[pfb_mapper(i) + p * 64] = p * 64 + i
return pfb_inputs_to_outputs
@cython.boundscheck(False)
@cython.wraparound(False)
cpdef void generate_map(
dict ants_to_pf,
numpy.int32_t[::1] map_inds,
numpy.npy_bool[::1] conj,
):
"""Compute the map between pfb inputs and antenna numbersself.
This function operates on input `map_inds` and `conj` arrays inplace.
Parameters
----------
map_inds : 1D numpy array of type int32
The array into which mapping indices will be populated.
conj : 1D numpy array of type np.bool_
The array into which indices of baselines to conjugate will be populated.
"""
cdef int ant1, ant2, p1, p2, pol_ind, bls_ind, out_ant1, out_ant2
cdef int out_p1, out_p2, ind1_1, ind1_2, ind2_1, ind2_2, data_index
cdef dict in_to_out = input_output_mapping()
for ant1 in range(128):
for ant2 in range(ant1, 128):
for p1 in range(2):
for p2 in range(2):
# generate the indices in self.data_array for this combination
# baselines are ordered (0,0),(0,1),...,(0,127),(1,1),.....
# polarizion of 0 (1) corresponds to y (x)
pol_ind = int(2 * p1 + p2)
bls_ind = int(128 * ant1 - ant1 * (ant1 + 1) / 2 + ant2)
# find the pfb input indices for this combination
ind1_1 = ants_to_pf[(ant1, p1)]
ind1_2 = ants_to_pf[(ant2, p2)]
# find the pfb output indices
ind2_1 = in_to_out[ind1_1]
ind2_2 = in_to_out[ind1_2]
out_ant1 = int(ind2_1 / 2)
out_ant2 = int(ind2_2 / 2)
out_p1 = ind2_1 % 2
out_p2 = ind2_2 % 2
# the correlator has ind2_2 <= ind2_1 except for
# redundant data. The redundant data is not perfectly
# redundant; sometimes the values of redundant data
# are off by one in the imaginary part.
# For consistency, we are ignoring the redundant values
# that have ind2_2 > ind2_1
if ind2_2 > ind2_1:
# get the index for the data
data_index = int(
2 * out_ant2 * (out_ant2 + 1)
+ 4 * out_ant1
+ 2 * out_p2
+ out_p1
)
# need to take the complex conjugate of the data
map_inds[bls_ind * 4 + pol_ind] = data_index
conj[bls_ind * 4 + pol_ind] = True
else:
data_index = int(
2 * out_ant1 * (out_ant1 + 1)
+ 4 * out_ant2
+ 2 * out_p1
+ out_p2
)
map_inds[bls_ind * 4 + pol_ind] = data_index
return
@cython.boundscheck(False)
@cython.wraparound(False)
cdef inline void _make_length_array(
const int max_length,
char[:, ::1] cable_lens,
numpy.float64_t[::1] cable_array
):
cdef char * token
cdef char clen[30]
# "the velocity factor of electic fields in RG-6 like coax"
# from MWA_Tools/CONV2UVFITS/convutils.h
cdef float v_factor = 1.204
cdef int n_cables = cable_lens.shape[0]
# check if the cable length already has the velocity factor applied
for i in range(n_cables):
# copy the location in memory to our character array
memcpy(clen, &cable_lens[i, 0], max_length)
# attempt to split on the character "_"
token = strtok(clen, b"_")
if strncmp(token, b"EL", 2) == 0:
# has already had the velocity factor applied
# grab the next bit of the string after EL
token = strtok(NULL, b"_")
cable_array[i] = strtod(token, NULL)
else:
cable_array[i] = strtod(token, NULL) * v_factor
return
@cython.boundscheck(False)
@cython.wraparound(False)
cpdef numpy.ndarray[ndim=1, dtype=numpy.float64_t] get_cable_len_diffs(
numpy.int_t[::1] ant1_array,
numpy.int_t[::1] ant2_array,
char[:, ::1] cable_lens,
):
"""Computer the difference in cable lengths for each baseline.
The inputs are one dimensional and will be both C and F contiguous but
we want to declare the memory layout in a consistent way with the rest of the code.
Parameters
----------
ant1_array : numpy array of type int_t
Array of antenna 1 numbers for each baseline.
ant2_array : numpy array of type int_t
Array of antenna 2 numbers for each baseline.
cable_lens : numpy array
Array of strings of the length of the cable for each antenna.
However it is cast to uint8 and reshaped as (cable_lens.size, cable_lens.dytype.itemsize)
see more about this approach here: https://stackoverflow.com/a/28777163
Returns
-------
cable_diffs : numpy array of type float64
Array of length Nblts with the difference of cable lengths for each baseline.
"""
cdef Py_ssize_t i
cdef int Nblts = ant1_array.shape[0]
cdef int n_cables = cable_lens.shape[0]
cdef int max_length = cable_lens.shape[1]
cdef numpy.npy_intp * dims_cables = [n_cables]
cdef numpy.npy_intp * dims_diffs = [Nblts]
cdef numpy.float64_t[::1] cable_array = numpy.PyArray_ZEROS(1, dims_cables, numpy.NPY_FLOAT64, 0)
cdef numpy.ndarray[dtype=numpy.float64_t, ndim=1] cable_diffs = numpy.PyArray_ZEROS(1, dims_diffs, numpy.NPY_FLOAT64, 0)
cdef numpy.float64_t[::1] _cable_diffs = cable_diffs
# fill out array of cable lengths
_make_length_array(max_length, cable_lens, cable_array)
for i in range(Nblts):
_cable_diffs[i] = cable_array[ant2_array[i]] - cable_array[ant1_array[i]]
return cable_diffs
@cython.wraparound(False)
@cython.boundscheck(False)
@cython.cdivision(True)
cdef numpy.ndarray[ndim=2, dtype=numpy.float64_t] _compute_khat(
numpy.float64_t[:, ::1] x,
numpy.float64_t[::1] sig1,
numpy.float64_t[::1] sig2,
):
cdef int ndim = 2
cdef numpy.npy_intp * dims = [x.shape[0], x.shape[1]]
cdef numpy.ndarray[ndim=2, dtype=numpy.float64_t] khat = numpy.PyArray_ZEROS(ndim, dims, numpy.NPY_FLOAT64, 0)
cdef numpy.float64_t[:, ::1] _khat = khat
ind1 = numpy.PyArray_Reshape(
numpy.PyArray_Arange(0.5, 7.5, 1, numpy.NPY_FLOAT64),
(7, 1),
)
cdef numpy.float64_t[:, ::1] j_ind = ind1 / sig1
cdef numpy.float64_t[:, ::1] k_ind = ind1 / sig2
cdef Py_ssize_t i, j, k, l
if x.size > 800: # pragma: nocover
with nogil, parallel():
for j in prange(x.shape[1]):
for i in range(x.shape[0]):
for l in range(k_ind.shape[0]):
for k in range(j_ind.shape[0]):
_khat[i, j] += (
1./ (pi * sqrt( 1 - x[i, j] ** 2)) * (
exp(-1. / (2 * (1 - x[i,j] ** 2)) * (j_ind[k, j] ** 2 + k_ind[l, j] ** 2 - 2 * x[i, j] * j_ind[k, j] * k_ind[l,j]))
+ exp(-1. / (2 * (1 - x[i,j] ** 2)) * (j_ind[k, j] ** 2 + k_ind[l, j] ** 2 + 2 * x[i, j] * j_ind[k, j] * k_ind[l,j]))
)
)
else:
for i in range(x.shape[0]):
for j in range(x.shape[1]):
for l in range(k_ind.shape[0]):
for k in range(j_ind.shape[0]):
_khat[i, j] += (
1./ (pi * sqrt( 1 - x[i, j] ** 2)) * (
exp(-1. / (2 * (1 - x[i,j] ** 2)) * (j_ind[k, j] ** 2 + k_ind[l, j] ** 2 - 2 * x[i, j] * j_ind[k, j] * k_ind[l,j]))
+ exp(-1. / (2 * (1 - x[i,j] ** 2)) * (j_ind[k, j] ** 2 + k_ind[l, j] ** 2 + 2 * x[i, j] * j_ind[k, j] * k_ind[l,j]))
)
)
return khat
cpdef numpy.ndarray[dtype=numpy.float64_t] get_khat(rho, sig1, sig2):
"""Compute generalized k-hat matrix for van vleck correction.
Generalized for 1 or two dimenional rho inputs.
Parameters
----------
rho : numpy array
Array of rho inputs.
sig1 : array_like
Array of sigma inputs corresponding to antenna 1.
sig2: array_like
Array of sigma inputs corresponding to antenna 2.
"""
# NPY_ARRAY_OUT_ARRAY is C-contiguous, writeable, aligned versions of the inputs
rho = numpy.PyArray_FROMANY(rho, numpy.NPY_FLOAT64, 0, 2, numpy.NPY_ARRAY_OUT_ARRAY)
sig1 = numpy.PyArray_FROMANY(sig1, numpy.NPY_FLOAT64, 1, 1, numpy.NPY_ARRAY_OUT_ARRAY)
sig2 = numpy.PyArray_FROMANY(sig2, numpy.NPY_FLOAT64, 1, 1, numpy.NPY_ARRAY_OUT_ARRAY)
cdef int ndim = numpy.PyArray_NDIM(rho)
cdef bint squeeze = False
if ndim == 1:
squeeze = True
rho = numpy.PyArray_Reshape(rho, (1, -1))
cdef numpy.ndarray[ndim=2, dtype=numpy.float64_t] khat = _compute_khat(rho, sig1, sig2)
if squeeze:
return numpy.PyArray_Squeeze(khat)
return khat
@cython.wraparound(False)
@cython.boundscheck(False)
# this function could be reworked to return a C array
cdef numpy.ndarray[ndim=2, dtype=numpy.float64_t[:, ::1]] _get_cheby_coeff(
numpy.float64_t[:, :, ::1] rho_coeff,
numpy.int64_t[::1] sv_inds_right1,
numpy.int64_t[::1] sv_inds_right2,
numpy.float64_t[::1] ds1,
numpy.float64_t[::1] ds2
):
"""
Perform a bilinear interpolation to get Chebyshev coefficients.
Explicitly assumes the grid spacing is 0.01.
Parameters
----------
rho_coeff : numpy array of type float64_t
Array of Chebyeshev coefficients to interpolate over.
sv_inds_right1 : numpy array of type int64_t
Array of right indices nearest to sigmas for antenna 1.
sv_inds_right2 : numpy array of type int64_t
Array of right indices nearest to sigmas for antenna 2.
ds1 : numpy array of type float64_t
Array of differences between sigmas for antenna 1 and nearest sigmas at
sv_inds_right_1.
ds1 : numpy array of type float64_t
Array of differences between sigmas for antenna 2 and nearest sigmas at
sv_inds_right_2.
Returns
-------
t : numpy array of type float64_t
Array of coefficients for the first three odd Chebyshev polynomials for each
pair of sigmas.
"""
cdef int i
cdef int j
cdef int n = ds1.shape[0]
cdef int ndim = 2
cdef numpy.npy_intp * dims = [n, 3]
cdef numpy.ndarray[ndim=2, dtype=numpy.float64_t] t = numpy.PyArray_ZEROS(ndim, dims, numpy.NPY_FLOAT64, 0)
for i in prange(n, nogil=True):
for j in range(3):
t[i, j] = 1e4 * (
rho_coeff[(sv_inds_right1[i] - 1), (sv_inds_right2[i] - 1), j] * ds1[i] * ds2[i]
+ rho_coeff[(sv_inds_right1[i] - 1), sv_inds_right2[i], j] * ds1[i] * (0.01 - ds2[i])
+ rho_coeff[sv_inds_right1[i], (sv_inds_right2[i] - 1), j] * (0.01 - ds1[i]) * ds2[i]
+ rho_coeff[sv_inds_right1[i], sv_inds_right2[i], j] * (0.01 - ds1[i]) * (0.01 - ds2[i])
)
return t
@cython.boundscheck(False)
@cython.wraparound(False)
cpdef void van_vleck_cheby(
numpy.float64_t[:, ::1] kap,
numpy.float64_t[:, :, ::1] rho_coeff,
numpy.int64_t[::1] sv_inds_right1,
numpy.int64_t[::1] sv_inds_right2,
numpy.float64_t[::1] ds1,
numpy.float64_t[::1] ds2
):
"""
Compute Van Vleck corrected cross-correlations using Chebyshev polynomials.
This function operates on input `kap` array inplace.
Parameters
----------
kap : numpy array of type float64_t
Array of values to correct.
rho_coeff : numpy array of type float64_t
Array of Chebyeshev coefficients to interpolate over.
sv_inds_right1 : numpy array of type int64_t
Array of right indices nearest to sigmas for antenna 1.
sv_inds_right2 : numpy array of type int64_t
Array of right indices nearest to sigmas for antenna 2.
ds1 : numpy array of type float64_t
Array of differences between sigmas for antenna 1 and nearest sigmas at
sv_inds_right_1.
ds1 : numpy array of type float64_t
Array of differences between sigmas for antenna 2 and nearest sigmas at
sv_inds_right_2.
"""
cdef numpy.float64_t[:, ::1] t = _get_cheby_coeff(rho_coeff, sv_inds_right1, sv_inds_right2, ds1, ds2)
cdef int n = kap.shape[1]
cdef int i
cdef int j
for i in prange(n, nogil=True):
kap[0, i] = (
kap[0, i] * (t[i, 0] - 3 * t[i, 1] + 5 * t[i, 2])
+ kap[0, i] ** 3 * (4 * t[i, 1] - 20 * t[i, 2])
+ kap[0, i] ** 5 * (16 * t[i, 2])
)
kap[1, i] = (
kap[1, i] * (t[i, 0] - 3 * t[i, 1] + 5 * t[i, 2])
+ kap[1, i] ** 3 * (4 * t[i, 1] - 20 * t[i, 2])
+ kap[1, i] ** 5 * (16 * t[i, 2])
)
return
Computing file changes ...