Skip to main content
  • Home
  • Development
  • Documentation
  • Donate
  • Operational login
  • Browse the archive

swh logo
SoftwareHeritage
Software
Heritage
Archive
Features
  • Search

  • Downloads

  • Save code now

  • Add forge now

  • Help

Revision d745e74710ab581d489e815095d0dd4ee91e9c35 authored by Bryna Hazelton on 15 September 2025, 18:00:43 UTC, committed by Jonathan Pober on 15 September 2025, 18:31:58 UTC
remove macos-13 from our CI matrix because it is deprecated
1 parent ea19da5
  • Files
  • Changes
  • 0617af3
  • /
  • tests
  • /
  • utils
  • /
  • test_uvcalibrate.py
Raw File Download

To reference or cite the objects present in the Software Heritage archive, permalinks based on SoftWare Hash IDentifiers (SWHIDs) must be used.
Select below a type of object currently browsed in order to display its associated SWHID and permalink.

  • revision
  • directory
  • content
revision badge
swh:1:rev:d745e74710ab581d489e815095d0dd4ee91e9c35
directory badge
swh:1:dir:44d91f34b75d4752e8467339185a98db104ff524
content badge
swh:1:cnt:35beb589a0cee935c5d89e46aa72b5c9fe5a566b

This interface enables to generate software citations, provided that the root directory of browsed objects contains a citation.cff or codemeta.json file.
Select below a type of object currently browsed in order to generate citations for them.

  • revision
  • directory
  • content
Generate software citation in BibTex format (requires biblatex-software package)
Generating citation ...
Generate software citation in BibTex format (requires biblatex-software package)
Generating citation ...
Generate software citation in BibTex format (requires biblatex-software package)
Generating citation ...
test_uvcalibrate.py
# Copyright (c) 2024 Radio Astronomy Software Group
# Licensed under the 2-clause BSD License
"""Tests for uvcalibrate function."""

import re
from types import SimpleNamespace

import numpy as np
import pytest

from pyuvdata import UVCal, utils
from pyuvdata.datasets import fetch_data
from pyuvdata.testing import check_warnings
from pyuvdata.utils import uvcalibrate
from pyuvdata.utils.uvcalibrate import _get_pol_conventions


class TestGetPolConventions:
    def tets_nothing_specified(self):
        with check_warnings(
            UserWarning,
            match=[
                "pol_convention is not specified on the UVCal object",
                "Neither uvd_pol_convention not uvc_pol_convention are specified",
            ],
        ):
            uvc, uvd = _get_pol_conventions(
                uvdata=SimpleNamespace(pol_convention=None),
                uvcal=SimpleNamespace(pol_convention=None),
                undo=False,
                uvc_pol_convention=None,
                uvd_pol_convention=None,
            )
        assert uvc is None
        assert uvd is None

    def test_uvc_pol_convention_set(self):
        uvc, uvd = _get_pol_conventions(
            uvdata=SimpleNamespace(pol_convention=None),
            uvcal=SimpleNamespace(pol_convention="avg"),
            undo=False,
            uvc_pol_convention=None,
            uvd_pol_convention=None,
        )
        assert uvc == "avg"
        assert uvd == "avg"

    def test_uvc_uvcal_different(self):
        with pytest.raises(
            ValueError, match="uvc_pol_convention is set, and different"
        ):
            _get_pol_conventions(
                uvdata=SimpleNamespace(pol_convention=None),
                uvcal=SimpleNamespace(pol_convention="sum"),
                undo=False,
                uvc_pol_convention="avg",
                uvd_pol_convention=None,
            )

    def test_uvd_nor_uvdata_set(self):
        with pytest.warns(
            UserWarning, match="pol_convention is not specified on the UVData object"
        ):
            uvc, uvd = _get_pol_conventions(
                uvdata=SimpleNamespace(pol_convention=None),
                uvcal=SimpleNamespace(pol_convention="avg"),
                undo=True,
                uvc_pol_convention=None,
                uvd_pol_convention=None,
            )
        assert uvc == "avg"
        assert uvd == "avg"

    @pytest.mark.parametrize("undo", [True, False])
    def test_only_objects_set(self, undo):
        uvc, uvd = _get_pol_conventions(
            uvdata=SimpleNamespace(pol_convention="sum" if undo else None),
            uvcal=SimpleNamespace(pol_convention="avg"),
            undo=undo,
            uvc_pol_convention=None,
            uvd_pol_convention=None if undo else "sum",
        )
        assert uvc == "avg"
        assert uvd == "sum"

    def test_uvd_uvdata_different(self):
        with pytest.raises(
            ValueError, match="Both uvd_pol_convention and uvdata.pol_convention"
        ):
            _get_pol_conventions(
                uvdata=SimpleNamespace(pol_convention="sum"),
                uvcal=SimpleNamespace(pol_convention="avg"),
                undo=True,
                uvc_pol_convention="avg",
                uvd_pol_convention="avg",
            )

    def test_calibrate_already_calibrated(self):
        with pytest.raises(
            ValueError, match="You are trying to calibrate already-calibrated data"
        ):
            _get_pol_conventions(
                uvdata=SimpleNamespace(pol_convention="avg"),
                uvcal=SimpleNamespace(pol_convention="avg"),
                undo=False,
                uvc_pol_convention="avg",
                uvd_pol_convention="avg",
            )

    @pytest.mark.parametrize("which", ["uvc", "uvd"])
    def test_bad_convention(self, which):
        good = SimpleNamespace(pol_convention="avg")
        bad = SimpleNamespace(pol_convention="what a silly convention")

        with pytest.raises(
            ValueError, match=f"{which}_pol_convention must be 'sum' or 'avg'"
        ):
            _get_pol_conventions(
                uvdata=good if which == "uvc" else bad,
                uvcal=good if which == "uvd" else bad,
                undo=True,
                uvc_pol_convention=None,
                uvd_pol_convention=None,
            )


@pytest.mark.filterwarnings("ignore:Fixing auto-correlations to be be real-only,")
@pytest.mark.filterwarnings("ignore:The uvw_array does not match the expected values")
@pytest.mark.filterwarnings("ignore:telescope_location, antenna_positions")
def test_uvcalibrate_apply_gains_oldfiles(uvcalibrate_uvdata_oldfiles):
    # read data
    uvd = uvcalibrate_uvdata_oldfiles

    # give it an x_orientation
    uvd.telescope.set_feeds_from_x_orientation(
        x_orientation="east",
        polarization_array=uvd.polarization_array,
        flex_polarization_array=uvd.flex_spw_polarization_array,
    )

    uvc = UVCal()
    uvc.read_calfits(fetch_data("hera_omnical2"))
    # downselect to match each other in shape (but not in actual values!)
    uvd.select(frequencies=uvd.freq_array[:10])
    uvc.select(times=uvc.time_array[:3])
    uvc.gain_scale = "Jy"
    uvc.pol_convention = "avg"

    with pytest.raises(
        ValueError,
        match=re.escape(
            "All antenna names with data on UVData are missing "
            "on UVCal. To continue with calibration "
            "(and flag all the data), set ant_check=False."
        ),
    ):
        uvcalibrate(uvd, uvc, prop_flags=True, ant_check=True, inplace=False)

    ants_expected = [
        "The uvw_array does not match the expected values",
        "All antenna names with data on UVData are missing "
        "on UVCal. Since ant_check is False, calibration will "
        "proceed but all data will be flagged.",
    ]
    missing_times = [2457698.4036761867, 2457698.4038004624]

    time_expected = f"Time {missing_times[0]} exists on UVData but not on UVCal."

    freq_expected = f"Frequency {uvd.freq_array[0]} exists on UVData but not on UVCal."

    with (
        check_warnings(UserWarning, match=ants_expected),
        pytest.raises(ValueError, match=time_expected),
    ):
        uvcalibrate(uvd, uvc, prop_flags=True, ant_check=False, inplace=False)

    uvc.select(times=uvc.time_array[0])

    time_expected = [
        "Times do not match between UVData and UVCal but time_check is False, so "
        "calibration will be applied anyway."
    ]

    with (
        check_warnings(UserWarning, match=ants_expected + time_expected),
        pytest.raises(ValueError, match=freq_expected),
    ):
        uvcalibrate(uvd, uvc, prop_flags=True, ant_check=False, time_check=False)


@pytest.mark.filterwarnings("ignore:Fixing auto-correlations to be be real-only,")
@pytest.mark.filterwarnings("ignore:The uvw_array does not match the expected values")
@pytest.mark.filterwarnings("ignore:telescope_location, antenna_positions")
def test_uvcalibrate_delay_oldfiles(uvcalibrate_uvdata_oldfiles):
    uvd = uvcalibrate_uvdata_oldfiles

    uvc = UVCal()
    uvc.read_calfits(fetch_data("hera_firstcal_delay"))
    # downselect to match
    uvc.select(times=uvc.time_array[3])
    uvc.gain_convention = "multiply"
    uvc.pol_convention = "avg"
    uvc.gain_scale = "Jy"

    freq_array_use = np.squeeze(uvd.freq_array)
    chan_with_use = uvd.channel_width

    ant_expected = [
        "The uvw_array does not match the expected values",
        "All antenna names with data on UVData are missing "
        "on UVCal. Since ant_check is False, calibration will "
        "proceed but all data will be flagged.",
        "Times do not match between UVData and UVCal but time_check is False, so "
        "calibration will be applied anyway.",
        r"UVData object does not have `x_orientation` specified but UVCal does",
    ]
    with check_warnings(UserWarning, match=ant_expected):
        uvdcal = uvcalibrate(
            uvd, uvc, prop_flags=False, ant_check=False, time_check=False, inplace=False
        )

    uvc.convert_to_gain(freq_array=freq_array_use, channel_width=chan_with_use)
    with check_warnings(UserWarning, match=ant_expected):
        uvdcal2 = uvcalibrate(
            uvd, uvc, prop_flags=False, ant_check=False, time_check=False, inplace=False
        )

    assert uvdcal == uvdcal2


@pytest.mark.filterwarnings("ignore:Fixing auto-correlations to be be real-only,")
@pytest.mark.parametrize("flip_gain_conj", [False, True])
@pytest.mark.parametrize("gain_convention", ["divide", "multiply"])
@pytest.mark.parametrize("time_range", [None, "Ntimes", 3])
def test_uvcalibrate(uvcalibrate_data, flip_gain_conj, gain_convention, time_range):
    uvd, uvc = uvcalibrate_data

    if time_range is not None:
        tstarts = uvc.time_array - uvc.integration_time / (86400 * 2)
        tends = uvc.time_array + uvc.integration_time / (86400 * 2)
        if time_range == "Ntimes":
            uvc.time_range = np.stack((tstarts, tends), axis=1)
        else:
            nt_per_range = int(np.ceil(uvc.Ntimes / time_range))
            tstart_inds = np.array(np.arange(time_range) * nt_per_range)
            tstarts_use = tstarts[tstart_inds]
            tend_inds = np.array((np.arange(time_range) + 1) * nt_per_range - 1)
            tend_inds[-1] = -1
            tends_use = tends[tend_inds]
            uvc.select(times=uvc.time_array[0:time_range])
            uvc.time_range = np.stack((tstarts_use, tends_use), axis=1)
        uvc.time_array = None
        uvc.lst_array = None
        uvc.set_lsts_from_time_array()

    uvc.gain_convention = gain_convention

    if gain_convention == "divide":
        # set the gain_scale to None to test handling
        uvc.gain_scale = None
        cal_warn_msg = [
            "gain_scale is not set, so there is no way to know",
            "gain_scale should be set if pol_convention is set",
        ]
        cal_warn_type = UserWarning
        undo_warn_msg = [
            "pol_convention is not specified on the UVData object",
            "gain_scale is not set, so there is no way to know",
            "gain_scale should be set if pol_convention is set",
        ]
        undo_warn_type = [UserWarning, UserWarning, UserWarning]
    else:
        cal_warn_msg = ""
        cal_warn_type = None
        undo_warn_msg = ""
        undo_warn_type = None

    with check_warnings(cal_warn_type, match=cal_warn_msg):
        uvdcal = uvcalibrate(uvd, uvc, inplace=False, flip_gain_conj=flip_gain_conj)
    if gain_convention == "divide":
        assert uvdcal.vis_units == "uncalib"
    else:
        assert uvdcal.vis_units == "Jy"

    key = (1, 13, "xx")
    ant1 = (1, "Jxx")
    ant2 = (13, "Jxx")

    if flip_gain_conj:
        gain_product = (uvc.get_gains(ant1).conj() * uvc.get_gains(ant2)).T
    else:
        gain_product = (uvc.get_gains(ant1) * uvc.get_gains(ant2).conj()).T

    if time_range is not None and time_range != "Ntimes":
        gain_product = gain_product[:, np.newaxis]
        gain_product = np.repeat(gain_product, nt_per_range, axis=1)
        current_shape = gain_product.shape
        new_shape = (current_shape[0] * current_shape[1], current_shape[-1])
        gain_product = gain_product.reshape(new_shape)
        gain_product = gain_product[: uvd.Ntimes]

    if gain_convention == "divide":
        np.testing.assert_array_almost_equal(
            uvdcal.get_data(key), uvd.get_data(key) / gain_product
        )
    else:
        np.testing.assert_array_almost_equal(
            uvdcal.get_data(key), uvd.get_data(key) * gain_product
        )

    # test undo
    with check_warnings(undo_warn_type, match=undo_warn_msg):
        uvdcal = uvcalibrate(
            uvdcal,
            uvc,
            prop_flags=True,
            ant_check=False,
            inplace=False,
            undo=True,
            flip_gain_conj=flip_gain_conj,
        )

    np.testing.assert_array_almost_equal(uvd.get_data(key), uvdcal.get_data(key))
    assert uvdcal.vis_units == "uncalib"


@pytest.mark.filterwarnings("ignore:Combined frequencies are separated by more than")
def test_uvcalibrate_dterm_handling(uvcalibrate_data):
    uvd, uvc = uvcalibrate_data

    # test d-term exception
    with pytest.raises(
        ValueError, match="Cannot apply D-term calibration without -7 or -8"
    ):
        uvcalibrate(uvd, uvc, d_term_cal=True)

    # d-term not implemented error
    uvcDterm = uvc.copy()
    uvcDterm.jones_array = np.array([-7, -8])
    uvcDterm = uvc + uvcDterm
    with pytest.raises(
        NotImplementedError, match="D-term calibration is not yet implemented."
    ):
        uvcalibrate(uvd, uvcDterm, d_term_cal=True)


@pytest.mark.filterwarnings("ignore:Changing number of antennas, but preserving")
def test_uvcalibrate_flag_propagation(uvcalibrate_data):
    uvd, uvc = uvcalibrate_data

    # test flag propagation
    uvc.flag_array[0] = True
    uvc.gain_array[1] = 0.0
    uvdcal = uvcalibrate(uvd, uvc, prop_flags=True, ant_check=False, inplace=False)

    assert np.all(uvdcal.get_flags(1, 13, "xx"))  # assert completely flagged
    assert np.all(uvdcal.get_flags(0, 12, "xx"))  # assert completely flagged
    np.testing.assert_array_almost_equal(
        uvd.get_data(1, 13, "xx"), uvdcal.get_data(1, 13, "xx")
    )
    np.testing.assert_array_almost_equal(
        uvd.get_data(0, 12, "xx"), uvdcal.get_data(0, 12, "xx")
    )

    uvc_sub = uvc.select(antenna_nums=[1, 12], inplace=False)

    uvdata_unique_nums = np.unique(np.append(uvd.ant_1_array, uvd.ant_2_array))
    uvd.telescope.antenna_names = np.array(uvd.telescope.antenna_names)
    missing_ants = uvdata_unique_nums.tolist()
    missing_ants.remove(1)
    missing_ants.remove(12)
    missing_ant_names = [
        uvd.telescope.antenna_names[
            np.where(uvd.telescope.antenna_numbers == antnum)[0][0]
        ]
        for antnum in missing_ants
    ]

    exp_err = (
        f"Antennas {missing_ant_names} have data on UVData but "
        "are missing on UVCal. To continue calibration and "
        "flag the data from missing antennas, set ant_check=False."
    )

    with pytest.raises(ValueError) as errinfo:
        uvdcal = uvcalibrate(
            uvd, uvc_sub, prop_flags=True, ant_check=True, inplace=False
        )

    assert exp_err == str(errinfo.value)

    uvc_sub.gain_scale = "Jy"
    with pytest.warns(UserWarning) as warninfo:
        uvdcal = uvcalibrate(
            uvd, uvc_sub, prop_flags=True, ant_check=False, inplace=False
        )
    warns = {warn.message.args[0] for warn in warninfo}
    ant_expected = {
        f"Antennas {missing_ant_names} have data on UVData but are missing "
        "on UVCal. Since ant_check is False, calibration will "
        "proceed and the data for these antennas will be flagged."
    }

    assert warns == ant_expected
    assert np.all(uvdcal.get_flags(13, 24, "xx"))  # assert completely flagged


@pytest.mark.filterwarnings("ignore:Cannot preserve total_quality_array")
def test_uvcalibrate_flag_propagation_name_mismatch(uvcalibrate_init_data):
    uvd, uvc = uvcalibrate_init_data
    uvc.gain_scale = "Jy"

    # test flag propagation
    uvc.flag_array[0] = True
    uvc.gain_array[1] = 0.0
    with pytest.raises(
        ValueError,
        match=re.escape(
            "All antenna names with data on UVData are missing "
            "on UVCal. To continue with calibration "
            "(and flag all the data), set ant_check=False."
        ),
    ):
        uvdcal = uvcalibrate(uvd, uvc, prop_flags=True, ant_check=True, inplace=False)

    with check_warnings(
        UserWarning,
        match="All antenna names with data on UVData are missing "
        "on UVCal. Since ant_check is False, calibration will "
        "proceed but all data will be flagged.",
    ):
        uvdcal = uvcalibrate(uvd, uvc, prop_flags=True, ant_check=False, inplace=False)

    assert np.all(uvdcal.get_flags(1, 13, "xx"))  # assert completely flagged
    assert np.all(uvdcal.get_flags(0, 12, "xx"))  # assert completely flagged
    np.testing.assert_array_almost_equal(
        uvd.get_data(1, 13, "xx"), uvdcal.get_data(1, 13, "xx")
    )
    np.testing.assert_array_almost_equal(
        uvd.get_data(0, 12, "xx"), uvdcal.get_data(0, 12, "xx")
    )


def test_uvcalibrate_extra_cal_antennas(uvcalibrate_data):
    uvd, uvc = uvcalibrate_data

    # remove some antennas from the data
    uvd.select(antenna_nums=[0, 1, 12, 13])

    uvdcal = uvcalibrate(uvd, uvc, inplace=False)

    key = (1, 13, "xx")
    ant1 = (1, "Jxx")
    ant2 = (13, "Jxx")

    np.testing.assert_array_almost_equal(
        uvdcal.get_data(key),
        uvd.get_data(key) / (uvc.get_gains(ant1) * uvc.get_gains(ant2).conj()).T,
    )


def test_uvcalibrate_antenna_names_mismatch(uvcalibrate_init_data):
    uvd, uvc = uvcalibrate_init_data
    uvc.gain_scale = "Jy"

    with pytest.raises(
        ValueError,
        match=re.escape(
            "All antenna names with data on UVData are missing "
            "on UVCal. To continue with calibration "
            "(and flag all the data), set ant_check=False."
        ),
    ):
        uvcalibrate(uvd, uvc, inplace=False)

    # now test that they're all flagged if ant_check is False
    with check_warnings(
        UserWarning,
        match="All antenna names with data on UVData are missing "
        "on UVCal. Since ant_check is False, calibration will "
        "proceed but all data will be flagged.",
    ):
        uvdcal = uvcalibrate(uvd, uvc, ant_check=False, inplace=False)

    assert np.all(uvdcal.flag_array)  # assert completely flagged


@pytest.mark.parametrize("time_range", [True, False])
def test_uvcalibrate_time_mismatch(uvcalibrate_data, time_range):
    uvd, uvc = uvcalibrate_data
    uvc.gain_scale = "Jy"
    if time_range:
        tstarts = uvc.time_array - uvc.integration_time / (86400 * 2)
        tends = uvc.time_array + uvc.integration_time / (86400 * 2)
        original_time_range = np.stack((tstarts, tends), axis=1)
        uvc.time_range = original_time_range
        uvc.time_array = None
        uvc.lst_array = None
        uvc.set_lsts_from_time_array()

    # change times to get warnings
    if time_range:
        uvc.time_range = uvc.time_range + 1
        uvc.set_lsts_from_time_array()
        expected_err = "Time_ranges on UVCal do not cover all UVData times."
        with pytest.raises(ValueError, match=expected_err):
            uvcalibrate(uvd, uvc, inplace=False)
    else:
        uvc.time_array = uvc.time_array + 1
        uvc.set_lsts_from_time_array()
        expected_err = {
            f"Time {this_time} exists on UVData but not on UVCal."
            for this_time in np.unique(uvd.time_array)
        }

        with pytest.raises(ValueError) as errinfo:
            uvcalibrate(uvd, uvc, inplace=False)
        assert str(errinfo.value) in expected_err

    # for time_range, make the time ranges not cover some UVData times
    if time_range:
        uvc.time_range = original_time_range
        uvc.time_range[0, 1] = uvc.time_range[0, 0] + uvc.integration_time[0] / (
            86400 * 4
        )
        uvc.set_lsts_from_time_array()
        with pytest.raises(ValueError, match=expected_err):
            uvcalibrate(uvd, uvc, inplace=False)

        uvc.phase_center_id_array = np.arange(uvc.Ntimes)
        uvc.phase_center_catalog = {0: None}
        uvc.select(phase_center_ids=0)
        with check_warnings(
            UserWarning, match="Time_range on UVCal does not cover all UVData times"
        ):
            _ = uvcalibrate(uvd, uvc, inplace=False, time_check=False)


def test_uvcalibrate_time_wrong_size(uvcalibrate_data):
    uvd, uvc = uvcalibrate_data

    # downselect by one time to get error
    uvc.select(times=uvc.time_array[1:])
    with pytest.raises(
        ValueError,
        match="The uvcal object has more than one time but fewer than the "
        "number of unique times on the uvdata object.",
    ):
        uvcalibrate(uvd, uvc, inplace=False)


@pytest.mark.filterwarnings("ignore:The time_array and time_range attributes")
@pytest.mark.filterwarnings("ignore:The lst_array and lst_range attributes")
@pytest.mark.parametrize("time_range", [True, False])
def test_uvcalibrate_single_time_types(uvcalibrate_data, time_range):
    uvd, uvc = uvcalibrate_data

    # only one time
    uvc.select(times=uvc.time_array[0])
    if time_range:
        # check cal runs fine with a good time range
        uvc.time_range = np.reshape(
            np.array([np.min(uvd.time_array), np.max(uvd.time_array)]), (1, 2)
        )
        uvc.set_lsts_from_time_array()
        with pytest.raises(
            ValueError, match="The time_array and time_range attributes are both set"
        ):
            uvdcal = uvcalibrate(uvd, uvc, inplace=False, time_check=False)
        uvc.time_array = uvc.lst_array = None
        uvdcal = uvcalibrate(uvd, uvc, inplace=False)

        key = (1, 13, "xx")
        ant1 = (1, "Jxx")
        ant2 = (13, "Jxx")

        np.testing.assert_array_almost_equal(
            uvdcal.get_data(key),
            uvd.get_data(key) / (uvc.get_gains(ant1) * uvc.get_gains(ant2).conj()).T,
        )

        # then change time_range to get warnings
        uvc.time_range = np.array(uvc.time_range) + 1
        uvc.set_lsts_from_time_array()

    if time_range:
        msg_start = "Time_range on UVCal does not cover all UVData times"
    else:
        msg_start = "Times do not match between UVData and UVCal"
    err_msg = msg_start + ". Set time_check=False to apply calibration anyway."
    warn_msg = [
        msg_start + " but time_check is False, so calibration will be applied anyway."
    ]

    with pytest.raises(ValueError, match=err_msg):
        uvcalibrate(uvd, uvc, inplace=False)

    if not time_range:
        with check_warnings(UserWarning, match=warn_msg):
            uvdcal = uvcalibrate(uvd, uvc, inplace=False, time_check=False)

        key = (1, 13, "xx")
        ant1 = (1, "Jxx")
        ant2 = (13, "Jxx")

        np.testing.assert_array_almost_equal(
            uvdcal.get_data(key),
            uvd.get_data(key) / (uvc.get_gains(ant1) * uvc.get_gains(ant2).conj()).T,
        )


@pytest.mark.filterwarnings("ignore:Combined frequencies are separated by more than")
def test_uvcalibrate_extra_cal_times(uvcalibrate_data):
    uvd, uvc = uvcalibrate_data

    uvc2 = uvc.copy()
    uvc2.time_array = uvc.time_array + 1
    uvc2.set_lsts_from_time_array()
    uvc_use = uvc + uvc2

    uvdcal = uvcalibrate(uvd, uvc_use, inplace=False)

    key = (1, 13, "xx")
    ant1 = (1, "Jxx")
    ant2 = (13, "Jxx")

    np.testing.assert_array_almost_equal(
        uvdcal.get_data(key),
        uvd.get_data(key) / (uvc.get_gains(ant1) * uvc.get_gains(ant2).conj()).T,
    )


def test_uvcalibrate_freq_mismatch(uvcalibrate_data):
    uvd, uvc = uvcalibrate_data

    # change some frequencies to get warnings
    maxf = np.max(uvc.freq_array)
    uvc.freq_array[uvc.Nfreqs // 2 :] = uvc.freq_array[uvc.Nfreqs // 2 :] + maxf
    expected_err = {
        f"Frequency {this_freq} exists on UVData but not on UVCal."
        for this_freq in uvd.freq_array[uvd.Nfreqs // 2 :]
    }
    # structured this way rather than using the match parameter because expected_err
    # is a set.
    with pytest.raises(ValueError) as errinfo:
        uvcalibrate(uvd, uvc, inplace=False)
    assert str(errinfo.value) in expected_err


@pytest.mark.filterwarnings("ignore:Combined frequencies are not evenly spaced.")
@pytest.mark.filterwarnings("ignore:Selected frequencies are not contiguous.")
def test_uvcalibrate_extra_cal_freqs(uvcalibrate_data):
    uvd, uvc = uvcalibrate_data

    uvc2 = uvc.copy()
    uvc2.freq_array = uvc.freq_array + np.max(uvc.freq_array)
    uvc_use = uvc + uvc2

    uvdcal = uvcalibrate(uvd, uvc_use, inplace=False)

    key = (1, 13, "xx")
    ant1 = (1, "Jxx")
    ant2 = (13, "Jxx")

    np.testing.assert_array_almost_equal(
        uvdcal.get_data(key),
        uvd.get_data(key) / (uvc.get_gains(ant1) * uvc.get_gains(ant2).conj()).T,
    )


def test_uvcalibrate_feedpol_mismatch(uvcalibrate_data):
    uvd, uvc = uvcalibrate_data

    # downselect the feed polarization to get warnings
    uvc.select(
        jones=utils.jstr2num(
            "Jnn", x_orientation=uvc.telescope.get_x_orientation_from_feeds()
        )
    )
    with pytest.raises(
        ValueError, match=("Feed polarization e exists on UVData but not on UVCal.")
    ):
        uvcalibrate(uvd, uvc, inplace=False)


def test_uvcalibrate_x_orientation_mismatch(uvcalibrate_data):
    uvd, uvc = uvcalibrate_data

    # next check None uvd_x
    uvd.telescope.set_feeds_from_x_orientation(None)
    uvc.telescope.set_feeds_from_x_orientation(
        "east", polarization_array=uvc.jones_array
    )
    with pytest.warns(
        UserWarning,
        match=r"UVData object does not have `x_orientation` specified but UVCal does",
    ):
        uvcalibrate(uvd, uvc, inplace=False)


def test_uvcalibrate_wideband_gain(uvcalibrate_data):
    uvd, uvc = uvcalibrate_data

    uvc.flex_spw_id_array = None
    uvc._set_wide_band()
    uvc.spw_array = np.array([1, 2, 3])
    uvc.Nspws = 3
    uvc.gain_array = uvc.gain_array[:, 0:3, :, :]
    uvc.flag_array = uvc.flag_array[:, 0:3, :, :]
    uvc.quality_array = uvc.quality_array[:, 0:3, :, :]
    uvc.total_quality_array = uvc.total_quality_array[0:3, :, :]

    uvc.freq_range = np.zeros((uvc.Nspws, 2), dtype=uvc.freq_array.dtype)
    uvc.freq_range[0, :] = uvc.freq_array[[0, 2]]
    uvc.freq_range[1, :] = uvc.freq_array[[2, 4]]
    uvc.freq_range[2, :] = uvc.freq_array[[4, 6]]

    uvc.channel_width = None
    uvc.freq_array = None
    uvc.Nfreqs = 1

    uvc.check()
    with pytest.raises(
        ValueError,
        match="uvcalibrate currently does not support wide-band calibrations",
    ):
        uvcalibrate(uvd, uvc, inplace=False)


@pytest.mark.filterwarnings("ignore:Fixing auto-correlations to be be real-only")
@pytest.mark.filterwarnings("ignore:The uvw_array does not match the expected values")
@pytest.mark.filterwarnings("ignore:Nfreqs will be required to be 1 for wide_band cals")
@pytest.mark.filterwarnings("ignore:telescope_location, antenna_positions")
def test_uvcalibrate_delay_multispw(uvcalibrate_uvdata_oldfiles):
    uvd = uvcalibrate_uvdata_oldfiles

    uvc = UVCal()
    uvc.read_calfits(fetch_data("hera_firstcal_delay"))
    # downselect to match
    uvc.select(times=uvc.time_array[3])
    uvc.gain_convention = "multiply"

    uvc.Nspws = 3
    uvc.spw_array = np.array([1, 2, 3])

    # copy the delay array to the second SPW
    uvc.delay_array = np.repeat(uvc.delay_array, uvc.Nspws, axis=1)
    uvc.flag_array = np.repeat(uvc.flag_array, uvc.Nspws, axis=1)
    uvc.quality_array = np.repeat(uvc.quality_array, uvc.Nspws, axis=1)

    uvc.freq_range = np.repeat(uvc.freq_range, uvc.Nspws, axis=0)
    # Make the second & third SPWs be contiguous with a 10 MHz range
    uvc.freq_range[1, 0] = uvc.freq_range[0, 1]
    uvc.freq_range[1, 1] = uvc.freq_range[1, 0] + 10e6
    uvc.freq_range[2, 0] = uvc.freq_range[1, 1]
    uvc.freq_range[2, 1] = uvc.freq_range[1, 1] + 10e6

    uvc.check()
    with pytest.raises(
        ValueError,
        match="uvcalibrate currently does not support multi spectral window delay "
        "calibrations",
    ):
        uvcalibrate(uvd, uvc, inplace=False)


@pytest.mark.filterwarnings(
    "ignore:pol_convention is not specified on the UVCal object"
)
@pytest.mark.filterwarnings(
    "ignore:pol_convention is not specified on the UVData object"
)
@pytest.mark.filterwarnings(
    "ignore:Neither uvd_pol_convention nor uvc_pol_convention are specified"
)
@pytest.mark.parametrize("convention_on_object", [True, False])
@pytest.mark.parametrize("uvc_pol_convention", ["sum", "avg", None])
@pytest.mark.parametrize("uvd_pol_convention", ["sum", "avg", None])
@pytest.mark.parametrize("polkind", ["linear", "circular"])  # stokes not possible yet
def test_uvcalibrate_pol_conventions(
    uvcalibrate_data,
    convention_on_object,
    uvc_pol_convention,
    uvd_pol_convention,
    polkind,
):
    uvd, uvc = uvcalibrate_data

    # Set defaults
    uvd.pol_convention = None
    uvc.pol_convention = None
    uvc.gain_array[:] = 1.0
    uvd.data_array[:] = 1.0

    # if polkind=='stokes':
    #     uvd.polarization_array = np.array([1,2])
    #     uvc.jones_array = -np.array([5,6])
    if polkind == "circular":
        uvd.polarization_array = -np.array([1, 2])
        uvc.jones_array = -np.array([1, 2])
    else:
        uvd.polarization_array = -np.array([5, 6])
        uvc.jones_array = -np.array([5, 6])

    uvdpol = uvd_pol_convention

    if convention_on_object:
        uvc.pol_convention = uvc_pol_convention
        uvcpol = None
    else:
        uvcpol = uvc_pol_convention

    # go forwards and back
    calib = uvcalibrate(
        uvd,
        uvc,
        uvd_pol_convention=uvdpol,
        uvc_pol_convention=uvcpol,
        undo=False,
        inplace=False,
    )
    roundtrip = uvcalibrate(
        calib,
        uvc,
        uvd_pol_convention=uvdpol,
        uvc_pol_convention=uvcpol,
        undo=True,
        inplace=False,
    )

    assert calib.pol_convention == uvd_pol_convention or uvc_pol_convention
    assert roundtrip.pol_convention is None

    # Check we went around the loop properly.
    np.testing.assert_allclose(
        roundtrip.data_array,
        uvd.data_array,
        rtol=uvd._data_array.tols[0],
        atol=uvd._data_array.tols[1],
    )

    if (
        uvc_pol_convention == uvd_pol_convention
        or uvc_pol_convention is None
        or uvd_pol_convention is None
    ):
        np.testing.assert_almost_equal(calib.data_array, 1.0)
    else:
        if uvc_pol_convention == "sum":
            # Then uvd pol convention is 'avg', so it has I = (XX+YY)/2, i.e. XX ~ I,
            # but the cal intrinsically assumed that I = (XX+YY), i.e. XX ~ I/2.
            # Therefore, the result should be 2.0
            np.testing.assert_allclose(
                calib.data_array,
                2.0,
                rtol=uvd._data_array.tols[0],
                atol=uvd._data_array.tols[1],
            )
        else:
            # the opposite
            np.testing.assert_allclose(
                calib.data_array,
                0.5,
                rtol=uvd._data_array.tols[0],
                atol=uvd._data_array.tols[1],
            )


def test_gain_scale_wrong(uvcalibrate_data):
    uvd, uvc = uvcalibrate_data

    uvc.gain_scale = "mK"
    uvd.vis_units = "Jy"

    with pytest.raises(
        ValueError, match="Cannot undo calibration if gain_scale is not the same"
    ):
        uvcalibrate(uvd, uvc, undo=True)


def test_uvdata_pol_array_in_stokes(uvcalibrate_data):
    uvd, uvc = uvcalibrate_data

    # Set polarization_array to be in Stokes I, Q, U, V
    uvd.polarization_array = np.array([1, 2, 3, 4])

    with pytest.raises(
        NotImplementedError,
        match=(
            "It is currently not possible to calibrate or de-calibrate data with "
            "stokes polarizations"
        ),
    ):
        uvcalibrate(uvd, uvc)
The diff you're trying to view is too large. Only the first 1000 changed files have been loaded.
Showing with 0 additions and 0 deletions (0 / 0 diffs computed)
swh spinner

Computing file changes ...

back to top

Software Heritage — Copyright (C) 2015–2026, The Software Heritage developers. License: GNU AGPLv3+.
The source code of Software Heritage itself is available on our development forge.
The source code files archived by Software Heritage are available under their own copyright and licenses.
Terms of use: Archive access, API— Content policy— Contact— JavaScript license information— Web API