https://github.com/RadioAstronomySoftwareGroup/pyuvdata
Raw File
Tip revision: fb2e721f0b0dfd0dab642e94c19dfac02fefd7c1 authored by Bryna Hazelton on 13 January 2022, 19:57 UTC
update tutorial and changelog.
Tip revision: fb2e721
aipy_extracts.py
# -*- mode: python; coding: utf-8 -*-
# Copyright 2018 the HERA Project
# Licensed under the 2-clause BSD License

"""Module for low-level interface to MIRIAD files.

This module extracts some Python code from AIPY used in our MIRIAD I/O
routines. It was copied from AIPY commit
6cb5a70876f33dccdd68d4063b076f8d42d9edae, then reformatted. The only items
used by pyuvdata are ``uv_selector`` and ``UV``.

"""
__all__ = ["uv_selector", "UV"]

import numpy as np
import re

try:
    from pyuvdata import _miriad
except ImportError as e:  # pragma: no cover
    raise ImportError(
        "The miriad extension is not built but is required for reading miriad "
        "files. Note that miriad is currently not supported on Windows."
    ) from e


str2pol = {
    "I": 1,  # Stokes Paremeters
    "Q": 2,
    "U": 3,
    "V": 4,
    "rr": -1,  # Circular Polarizations
    "ll": -2,
    "rl": -3,
    "lr": -4,
    "xx": -5,  # Linear Polarizations
    "yy": -6,
    "xy": -7,
    "yx": -8,
}


def bl2ij(bl):
    """
    Convert baseline number to antenna numbers.

    Parameters
    ----------
    bl : int
        baseline number

    Returns
    -------
    int
        first antenna number
    int
        second antenna number
    """
    bl = int(bl)

    if bl > 65536:
        bl -= 65536
        mant = 2048
    else:
        mant = 256

    return (bl // mant - 1, bl % mant - 1)


def ij2bl(i, j):
    """
    Convert antenna numbers to baseline number.

    Parameters
    ----------
    i : int
        first antenna number
    j : int
        second antenna number

    Returns
    -------
    int
        baseline number
    """
    if i > j:
        i, j = j, i

    if j + 1 < 256:
        return 256 * (i + 1) + (j + 1)

    return 2048 * (i + 1) + (j + 1) + 65536


ant_re = r"(\(((-?\d+[xy]?,?)+)\)|-?\d+[xy]?)"
bl_re = "(^(%s_%s|%s),?)" % (ant_re, ant_re, ant_re)


def parse_ants(ant_str, nants):
    """
    Parse ant string into a list of (baseline, include, pol) tuples.

    Generate list of (baseline, include, pol) tuples based on parsing of the
    string associated with the 'ants' command-line option.

    Parameters
    ----------
    ant_str : str
        string associated with the 'ants' command-line option
    nants : int
        number of antennas

    Returns
    -------
    list of tuples
        list of (baseline, include, pol) tuples
    """
    rv, cnt = [], 0

    while cnt < len(ant_str):
        m = re.search(bl_re, ant_str[cnt:])

        if m is None:
            if ant_str[cnt:].startswith("all"):
                rv = []
            elif ant_str[cnt:].startswith("auto") or ant_str[cnt:].startswith("-cross"):
                rv.append(("auto", 1, -1))
            elif ant_str[cnt:].startswith("cross") or ant_str[cnt:].startswith("-auto"):
                rv.append(("auto", 0, -1))
            else:
                raise ValueError('Unparsable ant argument "%s"' % ant_str)
            c = ant_str[cnt:].find(",")

            if c >= 0:
                cnt += c + 1
            else:
                cnt = len(ant_str)
        else:
            m = m.groups()
            cnt += len(m[0])

            if m[2] is None:
                ais = [m[8]]
                ajs = list(range(nants))
            else:
                if m[3] is None:
                    ais = [m[2]]
                else:
                    ais = m[3].split(",")

                if m[6] is None:
                    ajs = [m[5]]
                else:
                    ajs = m[6].split(",")

            for i in ais:
                if type(i) == str and i.startswith("-"):
                    i = i[1:]  # nibble the - off the string
                    include_i = 0
                else:
                    include_i = 1

                for j in ajs:
                    include = None

                    if type(j) == str and j.startswith("-"):
                        j = j[1:]
                        include_j = 0
                    else:
                        include_j = 1

                    include = int(include_i and include_j)
                    pol = None
                    i, j = str(i), str(j)

                    if not i.isdigit():
                        ai = re.search(r"(\d+)([x,y])", i).groups()
                    if not j.isdigit():
                        aj = re.search(r"(\d+)([x,y])", j).groups()

                    if i.isdigit() and not j.isdigit():
                        pol = ["x" + aj[1], "y" + aj[1]]
                        ai = [i, ""]
                    elif not i.isdigit() and j.isdigit():
                        pol = [ai[1] + "x", ai[1] + "y"]
                        aj = [j, ""]
                    elif not i.isdigit() and not j.isdigit():
                        pol = [ai[1] + aj[1]]

                    if pol is not None:
                        bl = ij2bl(abs(int(ai[0])), abs(int(aj[0])))
                        for p in pol:
                            rv.append((bl, include, p))
                    else:
                        bl = ij2bl(abs(int(i)), abs(int(j)))
                        rv.append((bl, include, -1))
    return rv


def uv_selector(uv, ants=-1, pol_str=-1):
    """
    Call select on a Miriad object with string arguments for antennas and polarizations.

    Parameters
    ----------
    uv : UV object
        Miriad data set object
    ants : str
        string to select antennas or baselines, e.g. 'all', 'auto', 'cross',
        '0,1,2', or '0_1,0_2'
    pol_str : str
        polarizations to select, e.g. 'xx', 'yy', 'xy', 'yx'

    Returns
    -------
    None
    """
    if ants != -1:
        if type(ants) == str:
            ants = parse_ants(ants, uv["nants"])

        for cnt, (bl, include, pol) in enumerate(ants):
            if cnt > 0:
                if include:
                    uv.select("or", -1, -1)
                else:
                    uv.select("and", -1, -1)

            if pol == -1:
                pol = pol_str  # default to explicit pol parameter

            if bl == "auto":
                uv.select("auto", 0, 0, include=include)
            else:
                i, j = bl2ij(bl)
                uv.select("antennae", i, j, include=include)

            if pol != -1:
                for p in pol.split(","):
                    polopt = str2pol[p]
                    uv.select("polarization", polopt, 0)
    elif pol_str != -1:
        for p in pol_str.split(","):
            polopt = str2pol[p]
            uv.select("polarization", polopt, 0)


itemtable = {
    "obstype": "a",
    "history": "a",
    "vartable": "a",
    "ngains": "i",
    "nfeeds": "i",
    "ntau": "i",
    "nsols": "i",
    "interval": "d",
    "leakage": "c",
    "freq0": "d",
    "freqs": "?",
    "bandpass": "c",
    "nspect0": "i",
    "nchan0": "i",
    "stopt": "d",
    "duration": "d",
}


def _uv_pipe_default_action(uv, p, d, f=None):
    if f is None:
        return p, d
    else:
        return p, d, f


class UV(_miriad.UV):
    """Top-level interface to a Miriad UV data set."""

    def __init__(self, filename, status="old", corrmode="r"):
        """
        Initialize from a miriad file.

        Parameters
        ----------
        filename : str
            filename to initialize from
        status : str
            options are: 'old', 'new', 'append'
        corrmode : str
            options are 'r' (float32 data storage) or 'j' (int16 with shared exponent)
        """
        assert status in ["old", "new", "append"]
        assert corrmode in ["r", "j"]
        # when reading mutliple files we may get a numpy array of file names
        # numpy casts arrays as np.str_ and cython does not like this
        _miriad.UV.__init__(self, str(filename), status, corrmode)

        self.status = status
        self.nchan = _miriad.MAXCHAN

        if status == "old":
            self.vartable = self._gen_vartable()
            self.read()
            self.rewind()  # Update variables for the user
            try:
                self.nchan = self["nchan"]
            except KeyError:  # pragma: no cover
                # Karto: it does not seem possible to end up in a situation where nchan
                # is missing from the vartable -- the basic uvwrite utility checks
                # that value so that knows if nchan has changes (and can update
                # accordingly), so you would have to use something _outside_ of the
                # MIRIAD tools to create such an error.
                pass
        else:
            self.vartable = {"corr": corrmode}

    def _gen_vartable(self):
        """
        Generate table of variables and types from the vartable header.

        Returns
        -------
        dict
            variables and types from the vartable header
        """
        vartable = {}
        for line in self._rdhd("vartable").split("\n"):
            try:
                var_type, name = line.split()
                vartable[name] = var_type
            except ValueError:
                pass

        return vartable

    def variables(self):
        """
        Get the list of available variables.

        Returns
        -------
        list of str
            list of available variables
        """
        return list(self.vartable.keys())

    def items(self):
        """
        Get the list of available header items.

        Returns
        -------
        list of str
            list of available header items
        """
        items = []

        for i in itemtable:
            try:
                _miriad.hdaccess(self.haccess(i, "read"))
                items.append(i)
            except IOError:
                pass
        return items

    def _rdhd(self, name):
        """
        Provide read access to header items via low-level calls.

        Parameters
        ----------
        name : str
            name of header item

        Returns
        -------
        str or int or float
            value of header item
        """
        itype = itemtable[name]

        if itype == "?":
            return self._rdhd_special(name)

        h = self.haccess(name, "read")
        rv = []

        if len(itype) == 1:
            if itype == "a":
                offset = 0
            else:
                t, offset = _miriad.hread_init(h)
                assert itype == t

            while True:
                try:
                    c, o = _miriad.hread(h, offset, itype)
                except IOError:
                    break

                if itype == "a":
                    # hread will always return a byte array for itype="a", which we will
                    # want to convert into a string.
                    c = str(c[:o], "utf-8")

                rv.append(c)
                offset += o

            if itype == "a":
                rv = "".join(rv)
        else:
            t, offset = _miriad.hread_init(h)
            assert t == "b"

            for t in itype:
                v, o = _miriad.hread(h, offset, t)
                rv.append(v)
                offset += o

        _miriad.hdaccess(h)

        if len(rv) == 1:
            return rv[0]
        elif type(rv) == str:
            return rv
        else:
            return np.array(rv)

    def _wrhd(self, name, val):
        """Provide write access to header items via low-level calls."""
        item_type = itemtable[name]

        if item_type == "?":
            return self._wrhd_special(name, val)

        h = self.haccess(name, "write")

        if len(item_type) == 1:
            try:
                len(val)
            except TypeError:
                val = [val]

            if item_type == "a":
                offset = 0
            else:
                offset = _miriad.hwrite_init(h, item_type)

            for v in val:
                offset += _miriad.hwrite(h, offset, v, item_type)
        else:
            offset = _miriad.hwrite_init(h, "b")
            for v, t in zip(val, item_type):
                offset += _miriad.hwrite(h, offset, v, t)

        _miriad.hdaccess(h)

    def _rdhd_special(self, name):
        """Provide read access to special header items of type '?' to _rdhd."""
        if name == "freqs":
            h = self.haccess(name, "read")
            c, o = _miriad.hread(h, 0, "i")
            rv = [c]
            offset = 8

            while True:
                try:
                    c, o = _miriad.hread(h, offset, "i")
                    rv.append(c)
                    offset += 8

                    c, o = _miriad.hread(h, offset, "d")
                    rv.append(c)
                    offset += 8

                    c, o = _miriad.hread(h, offset, "d")
                    rv.append(c)
                    offset += 8
                except IOError:
                    break

            _miriad.hdaccess(h)
            return rv
        else:
            raise ValueError("Unknown special header: " + name)

    def _wrhd_special(self, name, val):
        """Provide write access to special header items of type '?' to _wrhd."""
        if name == "freqs":
            h = self.haccess(name, "write")
            _miriad.hwrite(h, 0, val[0], "i")
            offset = 8

            for i, v in enumerate(val[1:]):
                if i % 3 == 0:
                    _miriad.hwrite(h, offset, v, "i")
                else:
                    _miriad.hwrite(h, offset, v, "d")
                offset += 8

            _miriad.hdaccess(h)
        else:
            raise ValueError("Unknown special header: " + name)

    def __getitem__(self, name):
        """Allow access to variables and header items via ``uv[name]``."""
        try:
            var_type = self.vartable[name]
            return self._rdvr(name, var_type)
        except KeyError:
            var_type = itemtable[name]
            return self._rdhd(name)

    def __setitem__(self, name, val):
        """Allow setting variables and header items via ``uv[name] = val``."""
        try:
            var_type = self.vartable[name]
            self._wrvr(name, var_type, val)
        except KeyError:
            self._wrhd(name, val)

    def select(self, name, n1, n2, include=True):
        """
        Choose which data are returned by read().

        Parameters
        ----------
        name : str
            This can be: 'decimate', 'time', 'antennae', 'visibility',
            'uvrange', 'pointing', 'amplitude', 'window', 'or', 'dra',
            'ddec', 'uvnrange', 'increment', 'ra', 'dec', 'and', 'clear',
            'on', 'polarization', 'shadow', 'auto', 'dazim', 'delev'
        n1, n2 : int
            Generally this is the range of values to select. For
            'antennae', this is the two antennae pair to select
            (indexed from 0); a -1 indicates 'all antennae'.
            For 'decimate', n1 is every Nth integration to use, and
            n2 is which integration within a block of N to use.
            For 'shadow', a zero indicates use 'antdiam' variable.
            For 'on', 'window', 'polarization', 'increment', 'shadow' only
            p1 is used.
            For 'and', 'or', 'clear', 'auto' p1 and p2 are ignored.
        include : bool
            If true, the data is selected. If false, the data is
            discarded. Ignored for 'and', 'or', 'clear'.
        """
        if name == "antennae":
            n1 += 1
            n2 += 1
        self._select(name, float(n1), float(n2), int(include))

    def read(self, raw=False):
        """
        Return the next data record.

        Calling this function causes variables to change to
        reflect the record which this function returns.

        Parameters
        ----------
        raw : bool
            if True data and flags are returned seperately

        Returns
        -------
        preamble : tuple
            (uvw, t, (i,j)), where uvw is an array of u,v,w, t is the
            Julian date, and (i,j) is an antenna pair
        data : ndarray or masked array
            ndarray if raw is True, otherwise a masked array
        flags : ndarray
            only returned if raw is True
        """
        preamble, data, flags, nread = self.raw_read(self.nchan)

        if nread == 0:
            raise IOError("No data read")

        flags = np.logical_not(flags)

        if raw:
            return preamble, data, flags
        return preamble, np.ma.array(data, mask=flags)

    def all_data(self, raw=False):
        """
        Provide an iterator over preamble, data.

        Allows constructs like: ``for preamble, data in uv.all_data(): ...``.

        Parameters
        ----------
        raw : bool
            If True data and flags are returned seperately.

        Returns
        -------
        preamble : tuple
            (uvw, t, (i,j)), where uvw is an array of u,v,w, t is the
            Julian date, and (i,j) is an antenna pair
        data : ndarray or masked array of complex
            ndarray if raw is True, otherwise a masked array
        flags : ndarray
            only returned if raw is True
        """
        while True:
            try:
                yield self.read(raw=raw)
            except IOError:
                break

    def write(self, preamble, data, flags=None):
        """
        Write the next data record.

        Parameters
        ----------
        preamble : tuple
            (uvw, t, (i,j)), where uvw is an array of u,v,w, t is the
            Julian date, and (i,j) is an antenna pair
        data : masked array of complex
            spectra for this record
        """
        if data is None:
            return
        if flags is not None:
            flags = np.logical_not(flags)
        elif len(data.mask.shape) == 0:
            flags = np.ones(data.shape)
            # Setting this to false will instantiate the mask, but keep all values
            # unmasked (as expected)
            data.mask = False
        else:
            flags = np.logical_not(data.mask)
            data = data.data

        self.raw_write(preamble, data.astype(np.complex64), flags.astype(np.int32))

    def init_from_uv(self, uv, override={}, exclude=[]):
        """
        Initialize header items and variables from another UV.

        Those in override will be overwritten by override[k], and tracking will
        be turned off (meaning they will not be updated in pipe()). Those in
        exclude are omitted completely.

        Parameters
        ----------
        uv : UV object
            Miriad data set object to initialize from
        override : dict
            variables with values to overwrite
        exclude : list
            list of variable to exclude
        """
        for k in uv.items():
            if k in exclude:
                continue
            elif k in override:
                self._wrhd(k, override[k])
            else:
                self._wrhd(k, uv[k])

        self.vartable = {}

        for k in uv.variables():
            if k in exclude:
                continue
            elif k == "corr":
                # I don't understand why reading 'corr' segfaults miriad,
                # but it does.  This is a cludgy work-around.
                continue
            elif k in override:
                self.vartable[k] = uv.vartable[k]
                self._wrvr(k, uv.vartable[k], override[k])
            else:
                self.vartable[k] = uv.vartable[k]
                self._wrvr(k, uv.vartable[k], uv[k])
                uv.trackvr(k, "c")  # Set to copy when copyvr() called

    def pipe(self, uv, mfunc=_uv_pipe_default_action, append2hist="", raw=False):
        """
        Pipe in data from another UV.

        Uses the function ``mfunc(uv, preamble, data)``, which should return
        ``(preamble, data)``. If mfunc is not provided, the dataset will just be
        cloned, and if the returned data is None, it will be omitted.

        Parameters
        ----------
        uv : UV object
            Miriad data set object to pipe from
        mfunc : function
            function that defines how the data are piped.
            ``mfunc(uv, preamble, data)`` should return ``(preamble, data)``.
            Default is ``_uv_pipe_default_action`` which just clones the dataset.
        append2hist : str
            string to append to history
        raw : bool
            if True data and flags are piped seperately
        """
        self._wrhd("history", self["history"] + append2hist)

        if raw:
            for p, d, f in uv.all_data(raw=raw):
                np, nd, nf = mfunc(uv, p, d, f)
                self.copyvr(uv)
                self.write(np, nd, nf)
        else:
            for p, d in uv.all_data():
                np, nd = mfunc(uv, p, d)
                self.copyvr(uv)
                self.write(np, nd)

    def add_var(self, name, var_type):
        """
        Add a variable of the specified type to a UV file.

        Parameters
        ----------
        name : str
            name of header item to add
        var_type : str
            string indicating the variable type (e.g. 'a', 'i', 'd')
        """
        self.vartable[name] = var_type
back to top