Tip revision: fb2e721
aipy_extracts.py
``````# -*- mode: python; coding: utf-8 -*-
# Copyright 2018 the HERA Project
# Licensed under the 2-clause BSD License

"""Module for low-level interface to MIRIAD files.

This module extracts some Python code from AIPY used in our MIRIAD I/O
routines. It was copied from AIPY commit
6cb5a70876f33dccdd68d4063b076f8d42d9edae, then reformatted. The only items
used by pyuvdata are ``uv_selector`` and ``UV``.

"""
__all__ = ["uv_selector", "UV"]

import numpy as np
import re

try:
from pyuvdata import _miriad
except ImportError as e:  # pragma: no cover
raise ImportError(
"The miriad extension is not built but is required for reading miriad "
"files. Note that miriad is currently not supported on Windows."
) from e

str2pol = {
"I": 1,  # Stokes Paremeters
"Q": 2,
"U": 3,
"V": 4,
"rr": -1,  # Circular Polarizations
"ll": -2,
"rl": -3,
"lr": -4,
"xx": -5,  # Linear Polarizations
"yy": -6,
"xy": -7,
"yx": -8,
}

def bl2ij(bl):
"""
Convert baseline number to antenna numbers.

Parameters
----------
bl : int
baseline number

Returns
-------
int
first antenna number
int
second antenna number
"""
bl = int(bl)

if bl > 65536:
bl -= 65536
mant = 2048
else:
mant = 256

return (bl // mant - 1, bl % mant - 1)

def ij2bl(i, j):
"""
Convert antenna numbers to baseline number.

Parameters
----------
i : int
first antenna number
j : int
second antenna number

Returns
-------
int
baseline number
"""
if i > j:
i, j = j, i

if j + 1 < 256:
return 256 * (i + 1) + (j + 1)

return 2048 * (i + 1) + (j + 1) + 65536

ant_re = r"(\(((-?\d+[xy]?,?)+)\)|-?\d+[xy]?)"
bl_re = "(^(%s_%s|%s),?)" % (ant_re, ant_re, ant_re)

def parse_ants(ant_str, nants):
"""
Parse ant string into a list of (baseline, include, pol) tuples.

Generate list of (baseline, include, pol) tuples based on parsing of the
string associated with the 'ants' command-line option.

Parameters
----------
ant_str : str
string associated with the 'ants' command-line option
nants : int
number of antennas

Returns
-------
list of tuples
list of (baseline, include, pol) tuples
"""
rv, cnt = [], 0

while cnt < len(ant_str):
m = re.search(bl_re, ant_str[cnt:])

if m is None:
if ant_str[cnt:].startswith("all"):
rv = []
elif ant_str[cnt:].startswith("auto") or ant_str[cnt:].startswith("-cross"):
rv.append(("auto", 1, -1))
elif ant_str[cnt:].startswith("cross") or ant_str[cnt:].startswith("-auto"):
rv.append(("auto", 0, -1))
else:
raise ValueError('Unparsable ant argument "%s"' % ant_str)
c = ant_str[cnt:].find(",")

if c >= 0:
cnt += c + 1
else:
cnt = len(ant_str)
else:
m = m.groups()
cnt += len(m[0])

if m[2] is None:
ais = [m[8]]
ajs = list(range(nants))
else:
if m[3] is None:
ais = [m[2]]
else:
ais = m[3].split(",")

if m[6] is None:
ajs = [m[5]]
else:
ajs = m[6].split(",")

for i in ais:
if type(i) == str and i.startswith("-"):
i = i[1:]  # nibble the - off the string
include_i = 0
else:
include_i = 1

for j in ajs:
include = None

if type(j) == str and j.startswith("-"):
j = j[1:]
include_j = 0
else:
include_j = 1

include = int(include_i and include_j)
pol = None
i, j = str(i), str(j)

if not i.isdigit():
ai = re.search(r"(\d+)([x,y])", i).groups()
if not j.isdigit():
aj = re.search(r"(\d+)([x,y])", j).groups()

if i.isdigit() and not j.isdigit():
pol = ["x" + aj[1], "y" + aj[1]]
ai = [i, ""]
elif not i.isdigit() and j.isdigit():
pol = [ai[1] + "x", ai[1] + "y"]
aj = [j, ""]
elif not i.isdigit() and not j.isdigit():
pol = [ai[1] + aj[1]]

if pol is not None:
bl = ij2bl(abs(int(ai[0])), abs(int(aj[0])))
for p in pol:
rv.append((bl, include, p))
else:
bl = ij2bl(abs(int(i)), abs(int(j)))
rv.append((bl, include, -1))
return rv

def uv_selector(uv, ants=-1, pol_str=-1):
"""
Call select on a Miriad object with string arguments for antennas and polarizations.

Parameters
----------
uv : UV object
Miriad data set object
ants : str
string to select antennas or baselines, e.g. 'all', 'auto', 'cross',
'0,1,2', or '0_1,0_2'
pol_str : str
polarizations to select, e.g. 'xx', 'yy', 'xy', 'yx'

Returns
-------
None
"""
if ants != -1:
if type(ants) == str:
ants = parse_ants(ants, uv["nants"])

for cnt, (bl, include, pol) in enumerate(ants):
if cnt > 0:
if include:
uv.select("or", -1, -1)
else:
uv.select("and", -1, -1)

if pol == -1:
pol = pol_str  # default to explicit pol parameter

if bl == "auto":
uv.select("auto", 0, 0, include=include)
else:
i, j = bl2ij(bl)
uv.select("antennae", i, j, include=include)

if pol != -1:
for p in pol.split(","):
polopt = str2pol[p]
uv.select("polarization", polopt, 0)
elif pol_str != -1:
for p in pol_str.split(","):
polopt = str2pol[p]
uv.select("polarization", polopt, 0)

itemtable = {
"obstype": "a",
"history": "a",
"vartable": "a",
"ngains": "i",
"nfeeds": "i",
"ntau": "i",
"nsols": "i",
"interval": "d",
"leakage": "c",
"freq0": "d",
"freqs": "?",
"bandpass": "c",
"nspect0": "i",
"nchan0": "i",
"stopt": "d",
"duration": "d",
}

def _uv_pipe_default_action(uv, p, d, f=None):
if f is None:
return p, d
else:
return p, d, f

"""Top-level interface to a Miriad UV data set."""

def __init__(self, filename, status="old", corrmode="r"):
"""
Initialize from a miriad file.

Parameters
----------
filename : str
filename to initialize from
status : str
options are: 'old', 'new', 'append'
corrmode : str
options are 'r' (float32 data storage) or 'j' (int16 with shared exponent)
"""
assert status in ["old", "new", "append"]
assert corrmode in ["r", "j"]
# when reading mutliple files we may get a numpy array of file names
# numpy casts arrays as np.str_ and cython does not like this
_miriad.UV.__init__(self, str(filename), status, corrmode)

self.status = status

if status == "old":
self.vartable = self._gen_vartable()
self.rewind()  # Update variables for the user
try:
self.nchan = self["nchan"]
except KeyError:  # pragma: no cover
# Karto: it does not seem possible to end up in a situation where nchan
# is missing from the vartable -- the basic uvwrite utility checks
# that value so that knows if nchan has changes (and can update
# accordingly), so you would have to use something _outside_ of the
# MIRIAD tools to create such an error.
pass
else:
self.vartable = {"corr": corrmode}

def _gen_vartable(self):
"""
Generate table of variables and types from the vartable header.

Returns
-------
dict
variables and types from the vartable header
"""
vartable = {}
for line in self._rdhd("vartable").split("\n"):
try:
var_type, name = line.split()
vartable[name] = var_type
except ValueError:
pass

return vartable

def variables(self):
"""
Get the list of available variables.

Returns
-------
list of str
list of available variables
"""
return list(self.vartable.keys())

def items(self):
"""
Get the list of available header items.

Returns
-------
list of str
list of available header items
"""
items = []

for i in itemtable:
try:
items.append(i)
except IOError:
pass
return items

def _rdhd(self, name):
"""

Parameters
----------
name : str
name of header item

Returns
-------
str or int or float
value of header item
"""
itype = itemtable[name]

if itype == "?":
return self._rdhd_special(name)

h = self.haccess(name, "read")
rv = []

if len(itype) == 1:
if itype == "a":
offset = 0
else:
assert itype == t

while True:
try:
except IOError:
break

if itype == "a":
# hread will always return a byte array for itype="a", which we will
# want to convert into a string.
c = str(c[:o], "utf-8")

rv.append(c)
offset += o

if itype == "a":
rv = "".join(rv)
else:
assert t == "b"

for t in itype:
rv.append(v)
offset += o

if len(rv) == 1:
return rv[0]
elif type(rv) == str:
return rv
else:
return np.array(rv)

def _wrhd(self, name, val):
item_type = itemtable[name]

if item_type == "?":
return self._wrhd_special(name, val)

h = self.haccess(name, "write")

if len(item_type) == 1:
try:
len(val)
except TypeError:
val = [val]

if item_type == "a":
offset = 0
else:
offset = _miriad.hwrite_init(h, item_type)

for v in val:
offset += _miriad.hwrite(h, offset, v, item_type)
else:
offset = _miriad.hwrite_init(h, "b")
for v, t in zip(val, item_type):
offset += _miriad.hwrite(h, offset, v, t)

def _rdhd_special(self, name):
if name == "freqs":
h = self.haccess(name, "read")
rv = [c]
offset = 8

while True:
try:
rv.append(c)
offset += 8

rv.append(c)
offset += 8

rv.append(c)
offset += 8
except IOError:
break

return rv
else:
raise ValueError("Unknown special header: " + name)

def _wrhd_special(self, name, val):
"""Provide write access to special header items of type '?' to _wrhd."""
if name == "freqs":
h = self.haccess(name, "write")
_miriad.hwrite(h, 0, val[0], "i")
offset = 8

for i, v in enumerate(val[1:]):
if i % 3 == 0:
_miriad.hwrite(h, offset, v, "i")
else:
_miriad.hwrite(h, offset, v, "d")
offset += 8

else:
raise ValueError("Unknown special header: " + name)

def __getitem__(self, name):
try:
var_type = self.vartable[name]
return self._rdvr(name, var_type)
except KeyError:
var_type = itemtable[name]
return self._rdhd(name)

def __setitem__(self, name, val):
"""Allow setting variables and header items via ``uv[name] = val``."""
try:
var_type = self.vartable[name]
self._wrvr(name, var_type, val)
except KeyError:
self._wrhd(name, val)

def select(self, name, n1, n2, include=True):
"""
Choose which data are returned by read().

Parameters
----------
name : str
This can be: 'decimate', 'time', 'antennae', 'visibility',
'uvrange', 'pointing', 'amplitude', 'window', 'or', 'dra',
'ddec', 'uvnrange', 'increment', 'ra', 'dec', 'and', 'clear',
'on', 'polarization', 'shadow', 'auto', 'dazim', 'delev'
n1, n2 : int
Generally this is the range of values to select. For
'antennae', this is the two antennae pair to select
(indexed from 0); a -1 indicates 'all antennae'.
For 'decimate', n1 is every Nth integration to use, and
n2 is which integration within a block of N to use.
For 'shadow', a zero indicates use 'antdiam' variable.
For 'on', 'window', 'polarization', 'increment', 'shadow' only
p1 is used.
For 'and', 'or', 'clear', 'auto' p1 and p2 are ignored.
include : bool
If true, the data is selected. If false, the data is
discarded. Ignored for 'and', 'or', 'clear'.
"""
if name == "antennae":
n1 += 1
n2 += 1
self._select(name, float(n1), float(n2), int(include))

"""
Return the next data record.

Calling this function causes variables to change to
reflect the record which this function returns.

Parameters
----------
raw : bool
if True data and flags are returned seperately

Returns
-------
preamble : tuple
(uvw, t, (i,j)), where uvw is an array of u,v,w, t is the
Julian date, and (i,j) is an antenna pair
data : ndarray or masked array
ndarray if raw is True, otherwise a masked array
flags : ndarray
only returned if raw is True
"""

if nread == 0:
raise IOError("No data read")

flags = np.logical_not(flags)

if raw:
return preamble, data, flags
return preamble, np.ma.array(data, mask=flags)

def all_data(self, raw=False):
"""
Provide an iterator over preamble, data.

Allows constructs like: ``for preamble, data in uv.all_data(): ...``.

Parameters
----------
raw : bool
If True data and flags are returned seperately.

Returns
-------
preamble : tuple
(uvw, t, (i,j)), where uvw is an array of u,v,w, t is the
Julian date, and (i,j) is an antenna pair
data : ndarray or masked array of complex
ndarray if raw is True, otherwise a masked array
flags : ndarray
only returned if raw is True
"""
while True:
try:
except IOError:
break

def write(self, preamble, data, flags=None):
"""
Write the next data record.

Parameters
----------
preamble : tuple
(uvw, t, (i,j)), where uvw is an array of u,v,w, t is the
Julian date, and (i,j) is an antenna pair
data : masked array of complex
spectra for this record
"""
if data is None:
return
if flags is not None:
flags = np.logical_not(flags)
elif len(data.mask.shape) == 0:
flags = np.ones(data.shape)
# Setting this to false will instantiate the mask, but keep all values
# unmasked (as expected)
else:
data = data.data

self.raw_write(preamble, data.astype(np.complex64), flags.astype(np.int32))

def init_from_uv(self, uv, override={}, exclude=[]):
"""
Initialize header items and variables from another UV.

Those in override will be overwritten by override[k], and tracking will
be turned off (meaning they will not be updated in pipe()). Those in
exclude are omitted completely.

Parameters
----------
uv : UV object
Miriad data set object to initialize from
override : dict
variables with values to overwrite
exclude : list
list of variable to exclude
"""
for k in uv.items():
if k in exclude:
continue
elif k in override:
self._wrhd(k, override[k])
else:
self._wrhd(k, uv[k])

self.vartable = {}

for k in uv.variables():
if k in exclude:
continue
elif k == "corr":
# I don't understand why reading 'corr' segfaults miriad,
# but it does.  This is a cludgy work-around.
continue
elif k in override:
self.vartable[k] = uv.vartable[k]
self._wrvr(k, uv.vartable[k], override[k])
else:
self.vartable[k] = uv.vartable[k]
self._wrvr(k, uv.vartable[k], uv[k])
uv.trackvr(k, "c")  # Set to copy when copyvr() called

def pipe(self, uv, mfunc=_uv_pipe_default_action, append2hist="", raw=False):
"""
Pipe in data from another UV.

Uses the function ``mfunc(uv, preamble, data)``, which should return
``(preamble, data)``. If mfunc is not provided, the dataset will just be
cloned, and if the returned data is None, it will be omitted.

Parameters
----------
uv : UV object
Miriad data set object to pipe from
mfunc : function
function that defines how the data are piped.
``mfunc(uv, preamble, data)`` should return ``(preamble, data)``.
Default is ``_uv_pipe_default_action`` which just clones the dataset.
append2hist : str
string to append to history
raw : bool
if True data and flags are piped seperately
"""
self._wrhd("history", self["history"] + append2hist)

if raw:
for p, d, f in uv.all_data(raw=raw):
np, nd, nf = mfunc(uv, p, d, f)
self.copyvr(uv)
self.write(np, nd, nf)
else:
for p, d in uv.all_data():
np, nd = mfunc(uv, p, d)
self.copyvr(uv)
self.write(np, nd)

def add_var(self, name, var_type):
"""
Add a variable of the specified type to a UV file.

Parameters
----------
name : str
var_type : str
string indicating the variable type (e.g. 'a', 'i', 'd')
"""
self.vartable[name] = var_type
``````