https://github.com/RadioAstronomySoftwareGroup/pyuvdata
09 March 2026, 20:19:55 UTC
2da9263 / pyuvdata / uvdata / ms.py
Permalinks (SWHIDs):
  content:   swh:1:cnt:eb8bf6068ca224a73c357c78dd43bc44e2bdee4a
  directory: swh:1:dir:7186d38478254f4c3d1bdebf1efd8d12ec8a74d5
  revision:  swh:1:rev:6694e4e7426f41d8493145e3d61b49eba9dca0a9
  snapshot:  swh:1:snp:fd3d03f3cb2a4f8d18ade43d819a91744736fa09
Tip revision: 6694e4e7426f41d8493145e3d61b49eba9dca0a9, authored by Bryna Hazelton on 21 July 2021, 15:54:00 UTC: "update the changelog for v2.2.1"

ms.py
# -*- mode: python; coding: utf-8 -*-
# Copyright (c) 2018 Radio Astronomy Software Group
# Licensed under the 2-clause BSD License

"""
Class for reading and writing CASA measurement sets.

Requires casacore.
"""
import numpy as np
import os
import warnings
import astropy.time as time

from .uvdata import UVData
from .. import utils as uvutils

__all__ = ["MS"]


"""
This dictionary defines the mapping between CASA polarization numbers and
AIPS polarization numbers
"""
pol_dict = {
    1: 1,
    2: 2,
    3: 3,
    4: 4,
    5: -1,
    6: -3,
    7: -4,
    8: -2,
    9: -5,
    10: -7,
    11: -8,
    12: -6,
}
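# For example, the circular-feed codes map onto negative AIPS codes
# (pol_dict[5] == -1 is RR, pol_dict[8] == -2 is LL), and a linear-feed
# CORR_TYPE row of [9, 12, 10, 11] (XX, YY, XY, YX) maps to the
# AIPS-convention polarizations [-5, -6, -7, -8].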

# convert from casa polarization integers to pyuvdata


class MS(UVData):
    """
    Defines a class for reading and writing CASA measurement sets.

    This class should not be interacted with directly, instead use the read_ms
    method on the UVData class.

    Attributes
    ----------
    ms_required_extra : list of str
        Names of optional UVParameters that are required for MS
    """

    ms_required_extra = ["datacolumn", "antenna_positions"]

    def _ms_hist_to_string(self, history_table):
        """
        Convert a CASA history table into a string for the uvdata history parameter.

        Also returns the MESSAGE column on its own for consistency with other
        uvdata file types.

        Parameters
        ----------
        history_table : a casa table object
            CASA table with history information.

        Returns
        -------
        str
            String containing only the MESSAGE column (consistent with other
            UVData history strings).
        str
            String encoding the complete CASA history table, with a newline
            denoting rows and a ';' denoting column breaks.
        """
        # string to store just the usual uvdata history
        message_str = ""
        # string to store all the casa history info
        history_str = ""

        # Do not touch the history table if it has no information
        if history_table.nrows() > 0:
            history_str = (
                "APP_PARAMS;CLI_COMMAND;APPLICATION;MESSAGE;"
                "OBJECT_ID;OBSERVATION_ID;ORIGIN;PRIORITY;TIME\n"
            )

            app_params = history_table.getcol("APP_PARAMS")["array"]
            cli_command = history_table.getcol("CLI_COMMAND")["array"]
            application = history_table.getcol("APPLICATION")
            message = history_table.getcol("MESSAGE")
            obj_id = history_table.getcol("OBJECT_ID")
            obs_id = history_table.getcol("OBSERVATION_ID")
            origin = history_table.getcol("ORIGIN")
            priority = history_table.getcol("PRIORITY")
            times = history_table.getcol("TIME")
            # Now loop through columns and generate history string
            ntimes = len(times)
            tables = [
                app_params,
                cli_command,
                application,
                message,
                obj_id,
                obs_id,
                origin,
                priority,
                times,
            ]
            for tbrow in range(ntimes):
                message_str += str(message[tbrow])
                newline = ";".join([str(table[tbrow]) for table in tables]) + "\n"
                history_str += newline
                if tbrow < ntimes - 1:
                    message_str += "\n"

        return message_str, history_str

    # ms write functionality to be added later.
    def write_ms(self):
        """Write ms: Not yet supported."""

    def read_ms(
        self,
        filepath,
        data_column="DATA",
        pol_order="AIPS",
        background_lsts=True,
        run_check=True,
        check_extra=True,
        run_check_acceptability=True,
        strict_uvw_antpos_check=False,
        use_old_phase=False,
        fix_phase=False,
    ):
        """
        Read in a casa measurement set.

        Parameters
        ----------
        filepath : str
            The measurement set root directory to read from.
        data_column : str
            name of CASA data column to read into data_array. Options are:
            'DATA', 'MODEL', or 'CORRECTED_DATA'
        pol_order : str
            Option to specify polarizations order convention, options are
            'CASA' or 'AIPS'.
        background_lsts : bool
            When set to True, the lst_array is calculated in a background thread.
        run_check : bool
            Option to check for the existence and proper shapes of parameters
            after reading in the file (the default is True,
            meaning the check will be run).
        check_extra : bool
            Option to check optional parameters as well as required ones (the
            default is True, meaning the optional parameters will be checked).
        run_check_acceptability : bool
            Option to check acceptable range of the values of parameters after
            reading in the file (the default is True, meaning the acceptable
            range check will be done).
        strict_uvw_antpos_check : bool
            Option to raise an error rather than a warning if the check that
            uvws match antenna positions does not pass.

        Raises
        ------
        IOError
            If root file directory doesn't exist.
        ValueError
            If the `data_column` is not set to an allowed value.
            If the data have multiple subarrays, are multi-source, or have
            multiple spectral windows.
            If the data have multiple data description ID values.

        """
        try:
            import casacore.tables as tables
        except ImportError as e:  # pragma: no cover
            raise ImportError(
                "casacore is not installed but is required for "
                "measurement set functionality"
            ) from e

        # make sure user requests a valid data_column
        if data_column not in ["DATA", "CORRECTED_DATA", "MODEL"]:
            raise ValueError(
                "Invalid data_column value supplied. Use 'DATA', 'MODEL' or"
                " 'CORRECTED_DATA'"
            )
        if not os.path.exists(filepath):
            raise IOError(filepath + " not found")
        # set filename variable
        basename = filepath.rstrip("/")
        self.filename = [os.path.basename(basename)]
        self._filename.form = (1,)
        # set visibility units
        if data_column == "DATA":
            self.vis_units = "UNCALIB"
        elif data_column == "CORRECTED_DATA":
            self.vis_units = "JY"
        elif data_column == "MODEL":
            self.vis_units = "JY"
        # limit length of extra_keywords keys to 8 characters to match uvfits & miriad
        self.extra_keywords["DATA_COL"] = data_column
        # get frequency information from spectral window table
        tb_spws = tables.table(filepath + "/SPECTRAL_WINDOW", ack=False)
        spw_names = tb_spws.getcol("NAME")
        self.Nspws = len(spw_names)
        if self.Nspws > 1:
            raise ValueError(
                "Sorry. Files with more than one spectral window (spw) are "
                "not yet supported. A great project for the interested "
                "student!"
            )
        freqs = tb_spws.getcol("CHAN_FREQ")
        self.freq_array = freqs
        self.Nfreqs = int(freqs.shape[1])
        self.channel_width = float(tb_spws.getcol("CHAN_WIDTH")[0, 0])
        self.Nspws = int(freqs.shape[0])

        self.spw_array = np.arange(self.Nspws)
        tb_spws.close()
        # now get the data
        tb = tables.table(filepath, ack=False)
        # check for multiple subarrays. importuvfits does not appear to
        # preserve subarray information!
        subarray = np.unique(np.int32(tb.getcol("ARRAY_ID")) - 1)
        if len(set(subarray)) > 1:
            raise ValueError(
                "This file appears to have multiple subarray "
                "values; only files with one subarray are "
                "supported."
            )
        times_unique = time.Time(
            np.unique(tb.getcol("TIME") / (3600.0 * 24.0)), format="mjd"
        ).jd

        # check for multiple data description ids (combination of spw id & pol id)
        data_desc_id = np.unique(np.int32(tb.getcol("DATA_DESC_ID") - 1))
        if len(set(data_desc_id)) > 1:
            raise ValueError(
                "This file appears to have multiple data description ID "
                "values; only files with one data description ID are "
                "supported."
            )

        self.Ntimes = int(len(times_unique))
        # FITS uvw direction convention is opposite ours and Miriad's.
        # CASA's convention is unclear: the docs contradict themselves,
        # but empirically it appears to match uvfits
        # So conjugate the visibilities and flip the uvws:
        data_array = np.conj(tb.getcol(data_column))
        self.Nblts = int(data_array.shape[0])
        flag_array = tb.getcol("FLAG")
        # CASA stores data in complex array with dimension NbltsxNfreqsxNpols
        if len(data_array.shape) == 3:
            data_array = np.expand_dims(data_array, axis=1)
            flag_array = np.expand_dims(flag_array, axis=1)
        self.data_array = data_array
        self.flag_array = flag_array
        self.Npols = int(data_array.shape[-1])
        # FITS uvw direction convention is opposite ours and Miriad's.
        # CASA's convention is unclear: the docs contradict themselves,
        # but empirically it appears to match uvfits
        # So conjugate the visibilities and flip the uvws:
        self.uvw_array = -1 * tb.getcol("UVW")
        self.ant_1_array = tb.getcol("ANTENNA1").astype(np.int32)
        self.ant_2_array = tb.getcol("ANTENNA2").astype(np.int32)
        self.Nants_data = len(
            np.unique(
                np.concatenate(
                    (np.unique(self.ant_1_array), np.unique(self.ant_2_array))
                )
            )
        )
        self.baseline_array = self.antnums_to_baseline(
            self.ant_1_array, self.ant_2_array
        )
        self.Nbls = len(np.unique(self.baseline_array))
        # Get times. MS from cotter are modified Julian dates in seconds
        # (thanks to Danny Jacobs for figuring out the proper conversion)
        self.time_array = time.Time(
            tb.getcol("TIME") / (3600.0 * 24.0), format="mjd"
        ).jd
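        # For example, a TIME value of 4.88e9 (seconds of MJD) corresponds to
        # 4.88e9 / 86400 ≈ 56481.5 in MJD, which astropy converts to a JD of
        # about 2456482.0.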

        # Polarization array
        tb_pol = tables.table(filepath + "/POLARIZATION", ack=False)
        num_pols = tb_pol.getcol("NUM_CORR")
        # get pol setup ID from data description table
        tb_data_desc = tables.table(filepath + "/DATA_DESCRIPTION", ack=False)
        pol_id = tb_data_desc.getcol("POLARIZATION_ID")[0]
        # use an assert because I don't think it's possible to make a file
        # that fails this, but I'm not sure
        assert num_pols[pol_id] == self.Npols

        if np.unique(num_pols).size > 1:
            # use getvarcol method, which returns a dict
            pol_list = tb_pol.getvarcol("CORR_TYPE")["r" + str(pol_id + 1)][0].tolist()
        else:
            # list of lists, probably with each list corresponding to SPW.
            pol_list = tb_pol.getcol("CORR_TYPE")[pol_id]
        self.polarization_array = np.zeros(len(pol_list), dtype=np.int32)
        for polnum in range(len(pol_list)):
            self.polarization_array[polnum] = int(pol_dict[pol_list[polnum]])
        tb_pol.close()

        # Integration time
        # use first interval and assume rest are constant (though measurement
        # set has all integration times for each Nblt)
        # self.integration_time = tb.getcol('INTERVAL')[0]
        # for some reason, interval ends up larger than the difference between times...
        if len(times_unique) == 1:
            self.integration_time = np.ones_like(self.time_array, dtype=np.float64)
        else:
            # assume that all times in the file are the same size
            int_time = self._calc_single_integration_time()
            self.integration_time = (
                np.ones_like(self.time_array, dtype=np.float64) * int_time
            )
        # open table with antenna location information
        tb_ant = tables.table(filepath + "/ANTENNA", ack=False)
        tb_obs = tables.table(filepath + "/OBSERVATION", ack=False)
        self.telescope_name = tb_obs.getcol("TELESCOPE_NAME")[0]
        self.instrument = tb_obs.getcol("TELESCOPE_NAME")[0]
        tb_obs.close()
        # Use Telescopes.py dictionary to set array position
        full_antenna_positions = tb_ant.getcol("POSITION")
        xyz_telescope_frame = tb_ant.getcolkeyword("POSITION", "MEASINFO")["Ref"]
        ant_flags = np.empty(len(full_antenna_positions), dtype=bool)
        ant_flags[:] = False
        for antnum in range(len(ant_flags)):
            ant_flags[antnum] = np.all(full_antenna_positions[antnum, :] == 0)
        if xyz_telescope_frame == "ITRF":
            self.telescope_location = np.array(
                np.mean(full_antenna_positions[np.invert(ant_flags), :], axis=0)
            )
        if self.telescope_location is None:
            try:
                self.set_telescope_params()
            except ValueError:
                warnings.warn(
                    "Telescope frame is not ITRF and telescope is not "
                    "in known_telescopes, so telescope_location is not set."
                )

        # antenna names
        ant_names = tb_ant.getcol("STATION")
        ant_diams = tb_ant.getcol("DISH_DIAMETER")

        self.antenna_diameters = ant_diams[ant_diams > 0]

        self.Nants_telescope = len(ant_flags[np.invert(ant_flags)])
        test_name = ant_names[0]
        names_same = True
        for antnum in range(len(ant_names)):
            if not (ant_names[antnum] == test_name):
                names_same = False
        if not (names_same):
            # cotter measurement sets store antenna names in the NAMES column.
            self.antenna_names = ant_names
        else:
            # importuvfits measurement sets store antenna names in the STATION column.
            self.antenna_names = tb_ant.getcol("NAME")
        self.antenna_numbers = np.arange(len(self.antenna_names)).astype(int)
        ant_names = []
        for ant_num in range(len(self.antenna_names)):
            if not (ant_flags[ant_num]):
                ant_names.append(self.antenna_names[ant_num])
        self.antenna_names = ant_names
        self.antenna_numbers = self.antenna_numbers[np.invert(ant_flags)]

        relative_positions = full_antenna_positions - self.telescope_location.reshape(
            1, 3
        )
        self.antenna_positions = relative_positions[np.invert(ant_flags), :]

        tb_ant.close()
        tb_field = tables.table(filepath + "/FIELD", ack=False)

        # Error if the phase_dir has a polynomial term because we don't know
        # how to handle that
        message = (
            "PHASE_DIR is expressed as a polynomial. "
            "We do not currently support this mode, please make an issue."
        )
        assert tb_field.getcol("PHASE_DIR").shape[1] == 1, message

        self.phase_type = "phased"
        # MSv2.0 appears to assume J2000. Not sure how to specify otherwise
        epoch_string = tb.getcolkeyword("UVW", "MEASINFO")["Ref"]
        # for measurement sets made with COTTER, this keyword is ITRF
        # instead of the epoch
        if epoch_string == "ITRF":
            warnings.warn(
                "ITRF coordinate frame detected, although within cotter this is "
                "synonymous with J2000. Assuming J2000 coordinate frame."
            )
            self.phase_center_frame = "fk5"
            self.phase_center_epoch = 2000.0
        elif epoch_string == "J2000":
            # In CASA 'J2000' refers to a specific frame -- FK5 w/ an epoch of
            # J2000. We'll plug that in here directly, noting that CASA has an
            # explicit list of supported reference frames, located here:
            # casa.nrao.edu/casadocs/casa-5.0.0/reference-material/coordinate-frames
            self.phase_center_frame = "fk5"
            self.phase_center_epoch = 2000.0
        self.phase_center_ra = float(tb_field.getcol("PHASE_DIR")[0][0][0])
        self.phase_center_dec = float(tb_field.getcol("PHASE_DIR")[0][0][1])
        self._set_phased()

        # set LST array from times and itrf
        proc = self.set_lsts_from_time_array(background=background_lsts)

        # set the history parameter
        _, self.history = self._ms_hist_to_string(
            tables.table(filepath + "/HISTORY", ack=False)
        )
        # CASA weights column keeps track of number of data points averaged.

        if not uvutils._check_history_version(self.history, self.pyuvdata_version_str):
            self.history += self.pyuvdata_version_str
        # 'WEIGHT_SPECTRUM' is optional - some files may not have per-channel values
        if "WEIGHT_SPECTRUM" in tb.colnames():
            self.nsample_array = tb.getcol("WEIGHT_SPECTRUM")
        else:
            self.nsample_array = tb.getcol("WEIGHT")
            # Propagate the weights in frequency
            self.nsample_array = np.stack(
                [self.nsample_array for chan in range(self.Nfreqs)], axis=1
            )
        if len(self.nsample_array.shape) == 3:
            self.nsample_array = np.expand_dims(self.nsample_array, axis=1)
        self.object_name = tb_field.getcol("NAME")[0]
        tb_field.close()
        tb.close()

        if proc is not None:
            proc.join()
        # Fill in the apparent coordinates here
        self._set_app_coords_helper()

        # order polarizations
        self.reorder_pols(order=pol_order)
        if run_check:
            self.check(
                check_extra=check_extra,
                run_check_acceptability=run_check_acceptability,
                strict_uvw_antpos_check=strict_uvw_antpos_check,
            )
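The CASA-to-AIPS polarization conversion performed in `read_ms` can be sketched standalone. This is a minimal illustration, not part of pyuvdata: `corr_type` is a made-up CORR_TYPE row, and the dictionary repeats the `pol_dict` values defined at the top of the module:

```python
# CASA -> AIPS polarization code mapping (same values as pol_dict above)
pol_dict = {1: 1, 2: 2, 3: 3, 4: 4, 5: -1, 6: -3, 7: -4, 8: -2,
            9: -5, 10: -7, 11: -8, 12: -6}

# Hypothetical CORR_TYPE row for a linear-feed instrument: XX, YY, XY, YX
corr_type = [9, 12, 10, 11]

# The same per-element lookup read_ms performs to fill polarization_array
polarization_array = [pol_dict[p] for p in corr_type]
print(polarization_array)  # -> [-5, -6, -7, -8]
```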
