swh:1:snp:03e8cdabc80bf6f4ef54d117858809e20088601c
Tip revision: 07089bf1da861d8bcee0596d1da1cdec77798950 authored by Bryna Hazelton on 08 July 2020, 16:03:43 UTC
Update the changelog for the new version
Update the changelog for the new version
Tip revision: 07089bf
utils.pyx
# -*- mode: python; coding: utf-8 -*-
# Copyright (c) 2020 Radio Astronomy Software Group
# Licensed under the 2-clause BSD License
# distutils: language = c
# cython: linetrace=True
# distutils: define_macros=CYTHON_TRACE_NOGIL=1
# python imports
import numpy as np
import warnings
# cython imports
cimport numpy
cimport cython
from libc.math cimport sin, cos, sqrt, atan2
# in order to not have circular dependencies
# define transformation parameters here
# parameters for transforming between xyz & lat/lon/alt
gps_b = 6356752.31424518
gps_a = 6378137
e_squared = 6.69437999014e-3
e_prime_squared = 6.73949674228e-3
# make c-viewed versions of these variables
cdef numpy.float64_t _gps_a = gps_a
cdef numpy.float64_t _gps_b = gps_b
cdef numpy.float64_t _e2 = e_squared
cdef numpy.float64_t _ep2 = e_prime_squared
@cython.boundscheck(False)
@cython.wraparound(False)
cpdef tuple baseline_to_antnums(numpy.ndarray[ndim=1, dtype=numpy.int64_t] baseline):
cdef unsigned long n = baseline.size
cdef numpy.ndarray[ndim=1, dtype=numpy.int64_t] ant1 = np.empty(n, dtype=np.int64)
cdef numpy.ndarray[ndim=1, dtype=numpy.int64_t] ant2 = np.empty(n, dtype=np.int64)
cdef long _min = baseline.min()
cdef int i
# make views as c-contiguous arrays of a known dtype
# effectivly turns the numpy array into a c-array
cdef numpy.int64_t[::1] _a1 = ant1
cdef numpy.int64_t[::1] _a2 = ant2
cdef numpy.int64_t[::1] _bl = baseline
with nogil:
for i in range(n):
if _min > 2 ** 16:
_a2[i] = (_bl[i] - 2 ** 16) % 2048 - 1
_a1[i] = (_bl[i] - 2 ** 16 - (_a2[i] + 1)) // 2048 - 1
else:
_a2[i] = (_bl[i]) % 256 - 1
_a1[i] = (_bl[i] - (_a2[i] + 1)) // 256 - 1
return ant1, ant2
@cython.boundscheck(False)
@cython.wraparound(False)
cdef numpy.ndarray[dtype=numpy.int64_t] _antnum_to_bl_2048(
numpy.ndarray[ndim=1, dtype=numpy.int64_t] ant1,
numpy.ndarray[ndim=1, dtype=numpy.int64_t] ant2,
):
cdef unsigned long n = ant1.size
cdef unsigned int i
cdef numpy.ndarray[ndim=1, dtype=numpy.int64_t] baselines = np.empty(n, dtype=np.int64)
# make views as c-contiguous arrays of a known dtype
# effectivly turns the numpy array into a c-array
cdef numpy.int64_t[::1] _bl = baselines
cdef numpy.int64_t[::1] _a1 = ant1
cdef numpy.int64_t[::1] _a2 = ant2
with nogil:
for i in range(n):
_bl[i] = 2048 * (_a1[i] + 1) + (_a2[i] + 1) + 2 ** 16
return baselines
@cython.boundscheck(False)
@cython.wraparound(False)
cdef numpy.ndarray[dtype=numpy.int64_t] _antnum_to_bl_256(
numpy.ndarray[ndim=1, dtype=numpy.int64_t] ant1,
numpy.ndarray[ndim=1, dtype=numpy.int64_t] ant2,
):
cdef unsigned long n = ant1.size
cdef unsigned int i
cdef numpy.ndarray[dtype=numpy.int64_t, ndim=1] baselines = np.empty(n, dtype=np.int64)
# make views as c-contiguous arrays of a known dtype
# effectivly turns the numpy array into a c-array
cdef numpy.int64_t[::1] _bl = baselines
cdef numpy.int64_t[::1] _a1 = ant1
cdef numpy.int64_t[::1] _a2 = ant2
with nogil:
for i in range(n):
_bl[i] = 256 * (_a1[i] + 1) + (_a2[i] + 1)
return baselines
cpdef numpy.ndarray[dtype=numpy.int64_t] antnums_to_baseline(
numpy.ndarray[dtype=numpy.int64_t, ndim=1] ant1,
numpy.ndarray[dtype=numpy.int64_t, ndim=1] ant2,
bint attempt256=False
):
if attempt256 and np.max([ant1, ant2]) < 255:
baseline = _antnum_to_bl_256(ant1, ant2)
elif attempt256 and np.max([ant1, ant2]) >= 255:
message = (
"antnums_to_baseline: found > 256 antennas, using "
"2048 baseline indexing. Beware compatibility "
"with CASA etc"
)
warnings.warn(message)
baseline = _antnum_to_bl_2048(ant1, ant2)
else:
baseline = _antnum_to_bl_2048(ant1, ant2)
return baseline
@cython.boundscheck(False)
@cython.wraparound(False)
@cython.cdivision(True)
cpdef _latlonalt_from_xyz(numpy.float64_t[:, ::1] xyz):
cdef int n = xyz.shape[0]
cdef int i
cdef numpy.ndarray[dtype=numpy.float64_t, ndim=1] latitude = np.empty(n, dtype=np.float64)
cdef numpy.ndarray[dtype=numpy.float64_t, ndim=1] longitude = np.empty(n, dtype=np.float64)
cdef numpy.ndarray[dtype=numpy.float64_t, ndim=1] altitude = np.empty(n, dtype=np.float64)
# create some memoryviews
cdef numpy.float64_t[::1] _lat = latitude
cdef numpy.float64_t[::1] _lon = longitude
cdef numpy.float64_t[::1] _alt = altitude
# see wikipedia geodetic_datum and Datum transformations of
# GPS positions PDF in docs/references folder
cdef numpy.float64_t gps_p, gps_theta, gps_n
for i in range(n):
gps_p = sqrt(xyz[i, 0] ** 2 + xyz[i, 1] ** 2)
gps_theta = atan2(xyz[i, 2] * _gps_a, gps_p * _gps_b)
_lat[i] = atan2(
xyz[i, 2] + _ep2 * _gps_b * sin(gps_theta) ** 3,
gps_p - _e2 * _gps_a * cos(gps_theta) ** 3,
)
_lon[i] = atan2(xyz[i, 1], xyz[i, 0])
gps_n = _gps_a / sqrt(1.0 - _e2 * sin(_lat[i]) ** 2)
_alt[i] = (gps_p / cos(_lat[i])) - gps_n
return latitude, longitude, altitude
@cython.boundscheck(False)
@cython.wraparound(False)
@cython.cdivision(True)
cpdef numpy.ndarray[dtype=numpy.float64_t] _xyz_from_latlonalt(
numpy.float64_t[::1] _lat,
numpy.float64_t[::1] _lon,
numpy.float64_t[::1] _alt,
):
cdef int n_pts = len(_lat)
cdef numpy.ndarray[dtype=numpy.float64_t, ndim=2] xyz = np.empty((n_pts, 3))
cdef numpy.float64_t _gps_a = gps_a
cdef numpy.float64_t _gps_b = gps_b
cdef numpy.float64_t _e2 = e_squared
# create a memoryview
cdef numpy.float64_t[:, ::1] _xyz = xyz
cdef numpy.float64_t gps_n
with nogil:
for i in range(n_pts):
gps_n = _gps_a / sqrt(1.0 - _e2 * sin(_lat[i]) ** 2)
_xyz[i, 0] = (gps_n + _alt[i]) * cos(_lat[i]) * cos(_lon[i])
_xyz[i, 1] = (gps_n + _alt[i]) * cos(_lat[i]) * sin(_lon[i])
_xyz[i, 2] = (_gps_b ** 2 / _gps_a ** 2 * gps_n + _alt[i]) * sin(_lat[i])
return xyz.squeeze()
# this function takes memoryviews as inputs
# that is why _lat, _lon, and _alt are indexed below to get the 0th entry
@cython.boundscheck(False)
@cython.wraparound(False)
cpdef numpy.ndarray[dtype=numpy.float64_t] _ENU_from_ECEF(
numpy.float64_t[:, ::1] xyz,
numpy.float64_t[::1] _lat,
numpy.float64_t[::1] _lon,
numpy.float64_t[::1] _alt,
):
cdef int i
cdef int nblts = xyz.shape[0]
cdef numpy.ndarray[dtype=numpy.float64_t, ndim=2] enu = np.empty((nblts, 3), dtype=np.float64)
cdef numpy.ndarray[dtype=numpy.float64_t, ndim=1] xyz_center = _xyz_from_latlonalt(_lat, _lon, _alt)
cdef numpy.ndarray[dtype=numpy.float64_t, ndim=1] xyz_use = np.empty(3, dtype=np.float64)
# make a memoryview for the numpy array in c
cdef numpy.float64_t[:, ::1] _enu = enu
with nogil:
for i in range(nblts):
xyz_use[0] = xyz[i, 0] - xyz_center[0]
xyz_use[1] = xyz[i, 1] - xyz_center[1]
xyz_use[2] = xyz[i, 2] - xyz_center[2]
_enu[i, 0] = -sin(_lon[0]) * xyz_use[0] + cos(_lon[0]) * xyz_use[1]
_enu[i, 1] = (
- sin(_lat[0]) * cos(_lon[0]) * xyz_use[0]
- sin(_lat[0]) * sin(_lon[0]) * xyz_use[1]
+ cos(_lat[0]) * xyz_use[2]
)
_enu[i, 2] = (
cos(_lat[0]) * cos(_lon[0]) * xyz_use[0]
+ cos(_lat[0]) * sin(_lon[0]) * xyz_use[1]
+ sin(_lat[0]) * xyz_use[2]
)
return enu
# this function takes memoryviews as inputs
# that is why _lat, _lon, and _alt are indexed below to get the 0th entry
@cython.boundscheck(False)
@cython.wraparound(False)
cpdef numpy.ndarray[dtype=numpy.float64_t] _ECEF_FROM_ENU(
numpy.float64_t[:, ::1] enu,
numpy.float64_t[::1] _lat,
numpy.float64_t[::1] _lon,
numpy.float64_t[::1] _alt,
):
cdef int i
cdef int nblts = enu.shape[0]
cdef numpy.ndarray[dtype=numpy.float64_t, ndim=2] xyz = np.empty((nblts, 3), dtype=np.float64)
cdef numpy.ndarray[dtype=numpy.float64_t, ndim=1] xyz_center = _xyz_from_latlonalt(_lat, _lon, _alt)
# make a memoryview for the numpy array in c
cdef numpy.float64_t[:, ::1] _xyz = xyz
with nogil:
for i in range(nblts):
_xyz[i, 0] = (
- sin(_lat[0]) * cos(_lon[0]) * enu[i, 1]
- sin(_lon[0]) * enu[i, 0]
+ cos(_lat[0]) * cos(_lon[0]) * enu[i, 2]
+ xyz_center[0]
)
_xyz[i, 1] = (
- sin(_lat[0]) * sin(_lon[0]) * enu[i, 1]
+ cos(_lon[0]) * enu[i, 0]
+ cos(_lat[0]) * sin(_lon[0]) * enu[i, 2]
+ xyz_center[1]
)
_xyz[i, 2] = cos(_lat[0]) * enu[i, 1] + sin(_lat[0]) * enu[i, 2] + xyz_center[2]
return xyz
# inital_uvw is a memoryviewed array as an input
@cython.boundscheck(False)
@cython.wraparound(False)
cpdef numpy.ndarray[dtype=numpy.float64_t] _phase_uvw(
numpy.float64_t ra,
numpy.float64_t dec,
numpy.float64_t[:, ::1] initial_uvw
):
cdef int i
cdef int nuvw = initial_uvw.shape[0]
cdef numpy.ndarray[dtype=numpy.float64_t, ndim=2] uvw = np.empty((nuvw, 3), dtype=np.float64)
# make a memoryview for the numpy array in c
cdef numpy.float64_t[:, ::1] _uvw = uvw
with nogil:
for i in range(nuvw):
_uvw[i, 0] = - sin(ra) * initial_uvw[i, 0] + cos(ra) * initial_uvw[i, 1]
_uvw[i, 1] = (
- sin(dec) * cos(ra) * initial_uvw[i, 0]
- sin(dec) * sin(ra) * initial_uvw[i, 1]
+ cos(dec) * initial_uvw[i, 2]
)
_uvw[i, 2] = (
cos(dec) * cos(ra) * initial_uvw[i, 0]
+ cos(dec) * sin(ra) * initial_uvw[i, 1]
+ sin(dec) * initial_uvw[i, 2]
)
return uvw
# uvw is a memoryviewed array as an input
@cython.boundscheck(False)
@cython.wraparound(False)
cpdef numpy.ndarray[dtype=numpy.float64_t] _unphase_uvw(
numpy.float64_t ra,
numpy.float64_t dec,
numpy.float64_t[:, ::1] uvw
):
cdef int i
cdef int nuvw = uvw.shape[0]
cdef numpy.ndarray[dtype=numpy.float64_t, ndim=2] unphased_uvw = np.empty((nuvw, 3), dtype=np.float64)
# make a memoryview for the numpy array in c
cdef numpy.float64_t[:, ::1] _u_uvw = unphased_uvw
with nogil:
for i in range(nuvw):
_u_uvw[i, 0] = (
- sin(ra) * uvw[i, 0]
- sin(dec) * cos(ra) * uvw[i, 1]
+ cos(dec) * cos(ra) * uvw[i, 2]
)
_u_uvw[i, 1] = (
cos(ra) * uvw[i, 0]
- sin(dec) * sin(ra) * uvw[i, 1]
+ cos(dec) * sin(ra) * uvw[i, 2]
)
_u_uvw[i, 2] = cos(dec) * uvw[i, 1] + sin(dec) * uvw[i, 2]
return unphased_uvw