Skip to main content
  • Home
  • Development
  • Documentation
  • Donate
  • Operational login
  • Browse the archive

swh logo
SoftwareHeritage
Software
Heritage
Archive
Features
  • Search

  • Downloads

  • Save code now

  • Add forge now

  • Help

https://github.com/RadioAstronomySoftwareGroup/pyuvdata
09 March 2026, 20:19:55 UTC
  • Code
  • Branches (76)
  • Releases (1)
  • Visits
    • Branches
    • Releases
    • HEAD
    • refs/heads/add_bitshuffle
    • refs/heads/add_concat_use_axis
    • refs/heads/allow-extrap-in-beam
    • refs/heads/allow_exclude_ants
    • refs/heads/allow_power_units
    • refs/heads/bitshuffle_v2
    • refs/heads/combine_uvdata_func
    • refs/heads/double_prec_uvws
    • refs/heads/faster-interp
    • refs/heads/faster-uvh5-indexing
    • refs/heads/fhd_beam_decompose
    • refs/heads/fits_speedup
    • refs/heads/fix-cst-beam-read
    • refs/heads/frame_attr
    • refs/heads/freq_avg_mem_fix
    • refs/heads/indexIng_tools
    • refs/heads/main
    • refs/heads/mem_fixes
    • refs/heads/miriad_tweaks_v3
    • refs/heads/mwa_van_vleck_2
    • refs/heads/ovro-lwa
    • refs/heads/pre-commit-ci-update-config
    • refs/heads/py03
    • refs/heads/read_ms_caltable_multiple_times
    • refs/heads/simple-set-antdiam
    • refs/heads/sma_dev
    • refs/heads/snap_convert
    • refs/heads/use_gh_cache
    • refs/tags/V2.1.5
    • refs/tags/v1.1
    • refs/tags/v1.2
    • refs/tags/v1.3
    • refs/tags/v1.4
    • refs/tags/v1.5
    • refs/tags/v2.0.0
    • refs/tags/v2.0.1
    • refs/tags/v2.0.2
    • refs/tags/v2.1.0
    • refs/tags/v2.1.1
    • refs/tags/v2.1.2
    • refs/tags/v2.1.3
    • refs/tags/v2.1.4
    • refs/tags/v2.2.0
    • refs/tags/v2.2.1
    • refs/tags/v2.2.10
    • refs/tags/v2.2.11
    • refs/tags/v2.2.12
    • refs/tags/v2.2.2
    • refs/tags/v2.2.3
    • refs/tags/v2.2.4
    • refs/tags/v2.2.5
    • refs/tags/v2.2.6
    • refs/tags/v2.2.7
    • refs/tags/v2.2.8
    • refs/tags/v2.2.9
    • refs/tags/v2.3.0
    • refs/tags/v2.3.1
    • refs/tags/v2.3.2
    • refs/tags/v2.3.3
    • refs/tags/v2.4.0
    • refs/tags/v2.4.1
    • refs/tags/v2.4.2
    • refs/tags/v2.4.3
    • refs/tags/v2.4.4
    • refs/tags/v2.4.5
    • refs/tags/v3.0.0
    • refs/tags/v3.1.0
    • refs/tags/v3.1.1
    • refs/tags/v3.1.2
    • refs/tags/v3.1.3
    • refs/tags/v3.2.0
    • refs/tags/v3.2.1
    • refs/tags/v3.2.2
    • refs/tags/v3.2.3
    • refs/tags/v3.2.4
    • refs/tags/v3.2.5
    • v1.0
  • 2da9263
  • /
  • pyuvdata
  • /
  • uvdata
  • /
  • aipy_extracts.py
Raw File Download Save again
Take a new snapshot of a software origin

If the archived software origin currently browsed is not synchronized with its upstream version (for instance when new commits have been issued), you can explicitly request Software Heritage to take a new snapshot of it.

Use the form below to proceed. Once a request has been submitted and accepted, it will be processed as soon as possible. You can then check its processing state by visiting this dedicated page.
swh spinner

Processing "take a new snapshot" request ...

To reference or cite the objects present in the Software Heritage archive, permalinks based on SoftWare Hash IDentifiers (SWHIDs) must be used.
Select below a type of object currently browsed in order to display its associated SWHID and permalink.

  • content
  • directory
  • revision
  • snapshot
origin badgecontent badge
swh:1:cnt:b3e4282c93dd5fece47fdfa1fe842b1168730cb2
origin badgedirectory badge
swh:1:dir:7186d38478254f4c3d1bdebf1efd8d12ec8a74d5
origin badgerevision badge
swh:1:rev:6694e4e7426f41d8493145e3d61b49eba9dca0a9
origin badgesnapshot badge
swh:1:snp:fd3d03f3cb2a4f8d18ade43d819a91744736fa09

This interface enables to generate software citations, provided that the root directory of browsed objects contains a citation.cff or codemeta.json file.
Select below a type of object currently browsed in order to generate citations for them.

  • content
  • directory
  • revision
  • snapshot
Generate software citation in BibTex format (requires biblatex-software package)
Generating citation ...
Generate software citation in BibTex format (requires biblatex-software package)
Generating citation ...
Generate software citation in BibTex format (requires biblatex-software package)
Generating citation ...
Generate software citation in BibTex format (requires biblatex-software package)
Generating citation ...
Tip revision: 6694e4e7426f41d8493145e3d61b49eba9dca0a9 authored by Bryna Hazelton on 21 July 2021, 15:54:00 UTC
update the changelog for v2.2.1
Tip revision: 6694e4e
aipy_extracts.py
# -*- mode: python; coding: utf-8 -*-
# Copyright 2018 the HERA Project
# Licensed under the 2-clause BSD License

"""Module for low-level interface to MIRIAD files.

This module extracts some Python code from AIPY used in our MIRIAD I/O
routines. It was copied from AIPY commit
6cb5a70876f33dccdd68d4063b076f8d42d9edae, then reformatted. The only items
used by pyuvdata are ``uv_selector`` and ``UV``.

"""
__all__ = ["uv_selector", "UV"]

import numpy as np
import re

try:
    from pyuvdata import _miriad
except ImportError as e:  # pragma: no cover
    raise ImportError(
        "The miriad extension is not built but is required for reading miriad "
        "files. Note that miriad is currently not supported on Windows."
    ) from e


str2pol = {
    "I": 1,  # Stokes Paremeters
    "Q": 2,
    "U": 3,
    "V": 4,
    "rr": -1,  # Circular Polarizations
    "ll": -2,
    "rl": -3,
    "lr": -4,
    "xx": -5,  # Linear Polarizations
    "yy": -6,
    "xy": -7,
    "yx": -8,
}


def bl2ij(bl):
    """
    Convert baseline number to antenna numbers.

    Parameters
    ----------
    bl : int
        baseline number

    Returns
    -------
    int
        first antenna number
    int
        second antenna number
    """
    bl = int(bl)

    if bl > 65536:
        bl -= 65536
        mant = 2048
    else:
        mant = 256

    return (bl // mant - 1, bl % mant - 1)


def ij2bl(i, j):
    """
    Convert antenna numbers to baseline number.

    Parameters
    ----------
    i : int
        first antenna number
    j : int
        second antenna number

    Returns
    -------
    int
        baseline number
    """
    if i > j:
        i, j = j, i

    if j + 1 < 256:
        return 256 * (i + 1) + (j + 1)

    return 2048 * (i + 1) + (j + 1) + 65536


ant_re = r"(\(((-?\d+[xy]?,?)+)\)|-?\d+[xy]?)"
bl_re = "(^(%s_%s|%s),?)" % (ant_re, ant_re, ant_re)


def parse_ants(ant_str, nants):
    """
    Parse ant string into a list of (baseline, include, pol) tuples.

    Generate list of (baseline, include, pol) tuples based on parsing of the
    string associated with the 'ants' command-line option.

    Parameters
    ----------
    ant_str : str
        string associated with the 'ants' command-line option
    nants : int
        number of antennas

    Returns
    -------
    list of tuples
        list of (baseline, include, pol) tuples
    """
    rv, cnt = [], 0

    while cnt < len(ant_str):
        m = re.search(bl_re, ant_str[cnt:])

        if m is None:
            if ant_str[cnt:].startswith("all"):
                rv = []
            elif ant_str[cnt:].startswith("auto") or ant_str[cnt:].startswith("-cross"):
                rv.append(("auto", 1, -1))
            elif ant_str[cnt:].startswith("cross") or ant_str[cnt:].startswith("-auto"):
                rv.append(("auto", 0, -1))
            else:
                raise ValueError('Unparsable ant argument "%s"' % ant_str)
            c = ant_str[cnt:].find(",")

            if c >= 0:
                cnt += c + 1
            else:
                cnt = len(ant_str)
        else:
            m = m.groups()
            cnt += len(m[0])

            if m[2] is None:
                ais = [m[8]]
                ajs = list(range(nants))
            else:
                if m[3] is None:
                    ais = [m[2]]
                else:
                    ais = m[3].split(",")

                if m[6] is None:
                    ajs = [m[5]]
                else:
                    ajs = m[6].split(",")

            for i in ais:
                if type(i) == str and i.startswith("-"):
                    i = i[1:]  # nibble the - off the string
                    include_i = 0
                else:
                    include_i = 1

                for j in ajs:
                    include = None

                    if type(j) == str and j.startswith("-"):
                        j = j[1:]
                        include_j = 0
                    else:
                        include_j = 1

                    include = int(include_i and include_j)
                    pol = None
                    i, j = str(i), str(j)

                    if not i.isdigit():
                        ai = re.search(r"(\d+)([x,y])", i).groups()
                    if not j.isdigit():
                        aj = re.search(r"(\d+)([x,y])", j).groups()

                    if i.isdigit() and not j.isdigit():
                        pol = ["x" + aj[1], "y" + aj[1]]
                        ai = [i, ""]
                    elif not i.isdigit() and j.isdigit():
                        pol = [ai[1] + "x", ai[1] + "y"]
                        aj = [j, ""]
                    elif not i.isdigit() and not j.isdigit():
                        pol = [ai[1] + aj[1]]

                    if pol is not None:
                        bl = ij2bl(abs(int(ai[0])), abs(int(aj[0])))
                        for p in pol:
                            rv.append((bl, include, p))
                    else:
                        bl = ij2bl(abs(int(i)), abs(int(j)))
                        rv.append((bl, include, -1))
    return rv


def uv_selector(uv, ants=-1, pol_str=-1):
    """
    Call select on a Miriad object with string arguments for antennas and polarizations.

    Parameters
    ----------
    uv : UV object
        Miriad data set object
    ants : str
        string to select antennas or baselines, e.g. 'all', 'auto', 'cross',
        '0,1,2', or '0_1,0_2'
    pol_str : str
        polarizations to select, e.g. 'xx', 'yy', 'xy', 'yx'

    Returns
    -------
    None
    """
    if ants != -1:
        if type(ants) == str:
            ants = parse_ants(ants, uv["nants"])

        for cnt, (bl, include, pol) in enumerate(ants):
            if cnt > 0:
                if include:
                    uv.select("or", -1, -1)
                else:
                    uv.select("and", -1, -1)

            if pol == -1:
                pol = pol_str  # default to explicit pol parameter

            if bl == "auto":
                uv.select("auto", 0, 0, include=include)
            else:
                i, j = bl2ij(bl)
                uv.select("antennae", i, j, include=include)

            if pol != -1:
                for p in pol.split(","):
                    polopt = str2pol[p]
                    uv.select("polarization", polopt, 0)
    elif pol_str != -1:
        for p in pol_str.split(","):
            polopt = str2pol[p]
            uv.select("polarization", polopt, 0)


itemtable = {
    "obstype": "a",
    "history": "a",
    "vartable": "a",
    "ngains": "i",
    "nfeeds": "i",
    "ntau": "i",
    "nsols": "i",
    "interval": "d",
    "leakage": "c",
    "freq0": "d",
    "freqs": "?",
    "bandpass": "c",
    "nspect0": "i",
    "nchan0": "i",
    "stopt": "d",
    "duration": "d",
}


def _uv_pipe_default_action(uv, p, d, f=None):
    if f is None:
        return p, d
    else:
        return p, d, f


class UV(_miriad.UV):
    """Top-level interface to a Miriad UV data set."""

    def __init__(self, filename, status="old", corrmode="r"):
        """
        Initialize from a miriad file.

        Parameters
        ----------
        filename : str
            filename to initialize from
        status : str
            options are: 'old', 'new', 'append'
        corrmode : str
            options are 'r' (float32 data storage) or 'j' (int16 with shared exponent)
        """
        assert status in ["old", "new", "append"]
        assert corrmode in ["r", "j"]
        # when reading mutliple files we may get a numpy array of file names
        # numpy casts arrays as np.str_ and cython does not like this
        _miriad.UV.__init__(self, str(filename), status, corrmode)

        self.status = status
        self.nchan = _miriad.MAXCHAN

        if status == "old":
            self.vartable = self._gen_vartable()
            self.read()
            self.rewind()  # Update variables for the user
            try:
                self.nchan = self["nchan"]
            except KeyError:  # pragma: no cover
                # Karto: it does not seem possible to end up in a situation where nchan
                # is missing from the vartable -- the basic uvwrite utility checks
                # that value so that knows if nchan has changes (and can update
                # accordingly), so you would have to use something _outside_ of the
                # MIRIAD tools to create such an error.
                pass
        else:
            self.vartable = {"corr": corrmode}

    def _gen_vartable(self):
        """
        Generate table of variables and types from the vartable header.

        Returns
        -------
        dict
            variables and types from the vartable header
        """
        vartable = {}
        for line in self._rdhd("vartable").split("\n"):
            try:
                var_type, name = line.split()
                vartable[name] = var_type
            except ValueError:
                pass

        return vartable

    def variables(self):
        """
        Get the list of available variables.

        Returns
        -------
        list of str
            list of available variables
        """
        return list(self.vartable.keys())

    def items(self):
        """
        Get the list of available header items.

        Returns
        -------
        list of str
            list of available header items
        """
        items = []

        for i in itemtable:
            try:
                _miriad.hdaccess(self.haccess(i, "read"))
                items.append(i)
            except IOError:
                pass
        return items

    def _rdhd(self, name):
        """
        Provide read access to header items via low-level calls.

        Parameters
        ----------
        name : str
            name of header item

        Returns
        -------
        str or int or float
            value of header item
        """
        itype = itemtable[name]

        if itype == "?":
            return self._rdhd_special(name)

        h = self.haccess(name, "read")
        rv = []

        if len(itype) == 1:
            if itype == "a":
                offset = 0
            else:
                t, offset = _miriad.hread_init(h)
                assert itype == t

            while True:
                try:
                    c, o = _miriad.hread(h, offset, itype)
                except IOError:
                    break

                if itype == "a":
                    # hread will always return a byte array for itype="a", which we will
                    # want to convert into a string.
                    c = str(c[:o], "utf-8")

                rv.append(c)
                offset += o

            if itype == "a":
                rv = "".join(rv)
        else:
            t, offset = _miriad.hread_init(h)
            assert t == "b"

            for t in itype:
                v, o = _miriad.hread(h, offset, t)
                rv.append(v)
                offset += o

        _miriad.hdaccess(h)

        if len(rv) == 1:
            return rv[0]
        elif type(rv) == str:
            return rv
        else:
            return np.array(rv)

    def _wrhd(self, name, val):
        """Provide write access to header items via low-level calls."""
        item_type = itemtable[name]

        if item_type == "?":
            return self._wrhd_special(name, val)

        h = self.haccess(name, "write")

        if len(item_type) == 1:
            try:
                len(val)
            except TypeError:
                val = [val]

            if item_type == "a":
                offset = 0
            else:
                offset = _miriad.hwrite_init(h, item_type)

            for v in val:
                offset += _miriad.hwrite(h, offset, v, item_type)
        else:
            offset = _miriad.hwrite_init(h, "b")
            for v, t in zip(val, item_type):
                offset += _miriad.hwrite(h, offset, v, t)

        _miriad.hdaccess(h)

    def _rdhd_special(self, name):
        """Provide read access to special header items of type '?' to _rdhd."""
        if name == "freqs":
            h = self.haccess(name, "read")
            c, o = _miriad.hread(h, 0, "i")
            rv = [c]
            offset = 8

            while True:
                try:
                    c, o = _miriad.hread(h, offset, "i")
                    rv.append(c)
                    offset += 8

                    c, o = _miriad.hread(h, offset, "d")
                    rv.append(c)
                    offset += 8

                    c, o = _miriad.hread(h, offset, "d")
                    rv.append(c)
                    offset += 8
                except IOError:
                    break

            _miriad.hdaccess(h)
            return rv
        else:
            raise ValueError("Unknown special header: " + name)

    def _wrhd_special(self, name, val):
        """Provide write access to special header items of type '?' to _wrhd."""
        if name == "freqs":
            h = self.haccess(name, "write")
            _miriad.hwrite(h, 0, val[0], "i")
            offset = 8

            for i, v in enumerate(val[1:]):
                if i % 3 == 0:
                    _miriad.hwrite(h, offset, v, "i")
                else:
                    _miriad.hwrite(h, offset, v, "d")
                offset += 8

            _miriad.hdaccess(h)
        else:
            raise ValueError("Unknown special header: " + name)

    def __getitem__(self, name):
        """Allow access to variables and header items via ``uv[name]``."""
        try:
            var_type = self.vartable[name]
            return self._rdvr(name, var_type)
        except KeyError:
            var_type = itemtable[name]
            return self._rdhd(name)

    def __setitem__(self, name, val):
        """Allow setting variables and header items via ``uv[name] = val``."""
        try:
            var_type = self.vartable[name]
            self._wrvr(name, var_type, val)
        except KeyError:
            self._wrhd(name, val)

    def select(self, name, n1, n2, include=True):
        """
        Choose which data are returned by read().

        Parameters
        ----------
        name : str
            This can be: 'decimate', 'time', 'antennae', 'visibility',
            'uvrange', 'pointing', 'amplitude', 'window', 'or', 'dra',
            'ddec', 'uvnrange', 'increment', 'ra', 'dec', 'and', 'clear',
            'on', 'polarization', 'shadow', 'auto', 'dazim', 'delev'
        n1, n2 : int
            Generally this is the range of values to select. For
            'antennae', this is the two antennae pair to select
            (indexed from 0); a -1 indicates 'all antennae'.
            For 'decimate', n1 is every Nth integration to use, and
            n2 is which integration within a block of N to use.
            For 'shadow', a zero indicates use 'antdiam' variable.
            For 'on', 'window', 'polarization', 'increment', 'shadow' only
            p1 is used.
            For 'and', 'or', 'clear', 'auto' p1 and p2 are ignored.
        include : bool
            If true, the data is selected. If false, the data is
            discarded. Ignored for 'and', 'or', 'clear'.
        """
        if name == "antennae":
            n1 += 1
            n2 += 1
        self._select(name, float(n1), float(n2), int(include))

    def read(self, raw=False):
        """
        Return the next data record.

        Calling this function causes variables to change to
        reflect the record which this function returns.

        Parameters
        ----------
        raw : bool
            if True data and flags are returned seperately

        Returns
        -------
        preamble : tuple
            (uvw, t, (i,j)), where uvw is an array of u,v,w, t is the
            Julian date, and (i,j) is an antenna pair
        data : ndarray or masked array
            ndarray if raw is True, otherwise a masked array
        flags : ndarray
            only returned if raw is True
        """
        preamble, data, flags, nread = self.raw_read(self.nchan)

        if nread == 0:
            raise IOError("No data read")

        flags = np.logical_not(flags)

        if raw:
            return preamble, data, flags
        return preamble, np.ma.array(data, mask=flags)

    def all_data(self, raw=False):
        """
        Provide an iterator over preamble, data.

        Allows constructs like: ``for preamble, data in uv.all_data(): ...``.

        Parameters
        ----------
        raw : bool
            If True data and flags are returned seperately.

        Returns
        -------
        preamble : tuple
            (uvw, t, (i,j)), where uvw is an array of u,v,w, t is the
            Julian date, and (i,j) is an antenna pair
        data : ndarray or masked array of complex
            ndarray if raw is True, otherwise a masked array
        flags : ndarray
            only returned if raw is True
        """
        while True:
            try:
                yield self.read(raw=raw)
            except IOError:
                break

    def write(self, preamble, data, flags=None):
        """
        Write the next data record.

        Parameters
        ----------
        preamble : tuple
            (uvw, t, (i,j)), where uvw is an array of u,v,w, t is the
            Julian date, and (i,j) is an antenna pair
        data : masked array of complex
            spectra for this record
        """
        if data is None:
            return
        if flags is not None:
            flags = np.logical_not(flags)
        elif len(data.mask.shape) == 0:
            flags = np.ones(data.shape)
            # Setting this to false will instantiate the mask, but keep all values
            # unmasked (as expected)
            data.mask = False
        else:
            flags = np.logical_not(data.mask)
            data = data.data

        self.raw_write(preamble, data.astype(np.complex64), flags.astype(np.int32))

    def init_from_uv(self, uv, override={}, exclude=[]):
        """
        Initialize header items and variables from another UV.

        Those in override will be overwritten by override[k], and tracking will
        be turned off (meaning they will not be updated in pipe()). Those in
        exclude are omitted completely.

        Parameters
        ----------
        uv : UV object
            Miriad data set object to initialize from
        override : dict
            variables with values to overwrite
        exclude : list
            list of variable to exclude
        """
        for k in uv.items():
            if k in exclude:
                continue
            elif k in override:
                self._wrhd(k, override[k])
            else:
                self._wrhd(k, uv[k])

        self.vartable = {}

        for k in uv.variables():
            if k in exclude:
                continue
            elif k == "corr":
                # I don't understand why reading 'corr' segfaults miriad,
                # but it does.  This is a cludgy work-around.
                continue
            elif k in override:
                self.vartable[k] = uv.vartable[k]
                self._wrvr(k, uv.vartable[k], override[k])
            else:
                self.vartable[k] = uv.vartable[k]
                self._wrvr(k, uv.vartable[k], uv[k])
                uv.trackvr(k, "c")  # Set to copy when copyvr() called

    def pipe(self, uv, mfunc=_uv_pipe_default_action, append2hist="", raw=False):
        """
        Pipe in data from another UV.

        Uses the function ``mfunc(uv, preamble, data)``, which should return
        ``(preamble, data)``. If mfunc is not provided, the dataset will just be
        cloned, and if the returned data is None, it will be omitted.

        Parameters
        ----------
        uv : UV object
            Miriad data set object to pipe from
        mfunc : function
            function that defines how the data are piped.
            ``mfunc(uv, preamble, data)`` should return ``(preamble, data)``.
            Default is ``_uv_pipe_default_action`` which just clones the dataset.
        append2hist : str
            string to append to history
        raw : bool
            if True data and flags are piped seperately
        """
        self._wrhd("history", self["history"] + append2hist)

        if raw:
            for p, d, f in uv.all_data(raw=raw):
                np, nd, nf = mfunc(uv, p, d, f)
                self.copyvr(uv)
                self.write(np, nd, nf)
        else:
            for p, d in uv.all_data():
                np, nd = mfunc(uv, p, d)
                self.copyvr(uv)
                self.write(np, nd)

    def add_var(self, name, var_type):
        """
        Add a variable of the specified type to a UV file.

        Parameters
        ----------
        name : str
            name of header item to add
        var_type : str
            string indicating the variable type (e.g. 'a', 'i', 'd')
        """
        self.vartable[name] = var_type

back to top

Software Heritage — Copyright (C) 2015–2026, The Software Heritage developers. License: GNU AGPLv3+.
The source code of Software Heritage itself is available on our development forge.
The source code files archived by Software Heritage are available under their own copyright and licenses.
Terms of use: Archive access, API— Content policy— Contact— JavaScript license information— Web API