# https://github.com/RadioAstronomySoftwareGroup/pyuvdata
# Raw File
# Tip revision: 983ae27b57a28550b6ee1a58e13cc9161bb89327 authored by Bryna Hazelton on 15 January 2020, 19:15 UTC
# update release date
# Tip revision: 983ae27
# test_uvcal.py
# -*- mode: python; coding: utf-8 -*-
# Copyright (c) 2018 Radio Astronomy Software Group
# Licensed under the 2-clause BSD License

"""Tests for uvcal object.

"""
from __future__ import absolute_import, division, print_function

import pytest
import os
import numpy as np
import copy

from pyuvdata import UVCal
import pyuvdata.utils as uvutils
import pyuvdata.tests as uvtest
from pyuvdata.data import DATA_PATH


@pytest.fixture(scope='function')
def uvcal_data():
    """Provide an empty UVCal object and the attribute names expected on it.

    Yields a 6-tuple: the bare ``UVCal`` instance, the required parameter
    names, the required property names, the extra parameter names, the extra
    property names and the other expected property names. Each parameter
    name is the matching property name prefixed with an underscore.
    """
    required_properties = [
        'Nfreqs', 'Njones', 'Ntimes', 'Nspws',
        'Nants_data', 'Nants_telescope',
        'antenna_names', 'antenna_numbers',
        'ant_array',
        'telescope_name', 'freq_array',
        'channel_width', 'spw_array',
        'jones_array', 'time_array', 'time_range',
        'integration_time',
        'gain_convention', 'flag_array',
        'quality_array', 'cal_type', 'cal_style',
        'x_orientation', 'history',
    ]

    extra_properties = [
        'gain_array', 'delay_array', 'sky_field',
        'sky_catalog', 'ref_antenna_name', 'Nsources',
        'baseline_range', 'diffuse_model',
        'input_flag_array', 'freq_range',
        'observer', 'git_origin_cal',
        'git_hash_cal', 'total_quality_array',
        'extra_keywords', 'gain_scale',
    ]

    # parameter names are the property names with a leading underscore
    required_parameters = ['_' + name for name in required_properties]
    extra_parameters = ['_' + name for name in extra_properties]

    other_properties = ['pyuvdata_version_str']

    uv_cal_object = UVCal()

    # hand the data to the test; execution resumes below once the test is done
    yield (uv_cal_object, required_parameters, required_properties,
           extra_parameters, extra_properties, other_properties)

    # post-test cleanup of the local reference
    del uv_cal_object


def test_parameter_iter(uvcal_data):
    """Test that iterating over a UVCal object returns every expected parameter.

    Both the required and the extra parameter names supplied by the fixture
    must appear among the names produced by the object iterator.
    """
    (uv_cal_object, required_parameters, required_properties,
     extra_parameters, extra_properties, other_properties) = uvcal_data
    # collect the iterator output once; named so the builtin all() is not shadowed
    iterated_params = list(uv_cal_object)
    for a in required_parameters + extra_parameters:
        assert a in iterated_params, 'expected attribute ' + a + ' not returned in object iterator'


def test_required_parameter_iter(uvcal_data):
    """Check that the required() iterator returns each required parameter name."""
    uv_cal_object, required_parameters, _, _, _, _ = uvcal_data
    returned = list(uv_cal_object.required())
    for expected in required_parameters:
        assert expected in returned, 'expected attribute ' + expected + ' not returned in required iterator'


def test_unexpected_parameters(uvcal_data):
    """Check that every underscore-prefixed attribute on UVCal is an expected parameter."""
    uv_cal_object, required_parameters, _, extra_parameters, _, _ = uvcal_data
    expected_parameters = required_parameters + extra_parameters
    # only attributes whose name begins with an underscore are examined here
    found = [name for name in uv_cal_object.__dict__ if name.startswith('_')]
    for name in found:
        assert name in expected_parameters, 'unexpected parameter ' + name + ' found in UVCal'


def test_unexpected_attributes(uvcal_data):
    """Check that every non-underscore attribute on UVCal is an expected property."""
    (uv_cal_object, _, required_properties,
     _, extra_properties, other_properties) = uvcal_data
    expected_attributes = required_properties + extra_properties + other_properties
    # only attributes whose name does not begin with an underscore are examined here
    found = [name for name in uv_cal_object.__dict__ if not name.startswith('_')]
    for name in found:
        assert name in expected_attributes, 'unexpected attribute ' + name + ' found in UVCal'


def test_properties(uvcal_data):
    """Test that properties can be get and set properly.

    Each property is set to a random number, which must then be readable
    from the ``value`` of the parameter paired with it by the fixture.
    """
    (uv_cal_object, required_parameters, required_properties,
     extra_parameters, extra_properties, other_properties) = uvcal_data
    property_names = required_properties + extra_properties
    parameter_names = required_parameters + extra_parameters
    # the fixture lists properties and parameters in matching order
    for prop_name, param_name in zip(property_names, parameter_names):
        rand_num = np.random.rand()
        setattr(uv_cal_object, prop_name, rand_num)
        this_param = getattr(uv_cal_object, param_name)
        # the message identifies the offending property in the failure report
        assert rand_num == this_param.value, (
            'setting {prop_name} to a random number failed'.format(prop_name=prop_name))


@pytest.fixture(scope='function')
def gain_data():
    """Read the gain and delay calfits test files for the basic uvcal tests.

    Yields a holder with ``gain_object`` (read from the gain file),
    ``gain_object2`` (a deep copy of ``gain_object``) and ``delay_object``
    (read from the delay file).
    """
    class DataHolder(object):
        def __init__(self, gain_object, gain_object2, delay_object):
            self.gain_object = gain_object
            self.gain_object2 = gain_object2
            self.delay_object = delay_object

    gain_cal = UVCal()
    gain_cal.read_calfits(
        os.path.join(DATA_PATH, 'zen.2457698.40355.xx.gain.calfits'))
    gain_cal_copy = copy.deepcopy(gain_cal)

    delay_cal = UVCal()
    delay_cal.read_calfits(
        os.path.join(DATA_PATH, 'zen.2457698.40355.xx.delay.calfits'))

    holder = DataHolder(gain_cal, gain_cal_copy, delay_cal)
    yield holder

    # drop the local reference once the test has finished
    del holder


def test_equality(gain_data):
    """Basic equality test: an object equals itself and its deep copy."""
    assert gain_data.gain_object == gain_data.gain_object
    # gain_object2 is a deepcopy made by the fixture, so this compares two
    # distinct objects rather than one object with itself
    assert gain_data.gain_object == gain_data.gain_object2


def test_check(gain_data):
    """Verify that check() returns a truthy result on the gain object from the fixture."""
    cal = gain_data.gain_object
    check_result = cal.check()
    assert check_result


def test_nants_data_telescope_larger(gain_data):
    """Check that the object check passes after Nants_telescope is incremented.

    Makes sure it is okay for Nants_telescope to be strictly greater than
    Nants_data.
    """
    cal = gain_data.gain_object
    cal.Nants_telescope += 1
    # the "new antenna" needs dummy name and number entries to pass the check
    extended_names = np.concatenate((cal.antenna_names, ["dummy_ant"]))
    extended_numbers = np.concatenate((cal.antenna_numbers, [20]))
    cal.antenna_names = extended_names
    cal.antenna_numbers = extended_numbers
    assert cal.check()


def test_ant_array_not_in_antnums(gain_data):
    """Check that check() raises if antennas with data are not in antenna_numbers."""
    cal = gain_data.gain_object
    # remove the first entry from antenna_names & antenna_numbers by hand
    cal.antenna_names = cal.antenna_names[1:]
    cal.antenna_numbers = cal.antenna_numbers[1:]
    cal.Nants_telescope = cal.antenna_numbers.size
    with pytest.raises(ValueError) as cm:
        cal.check()
    error_text = str(cm.value)
    assert error_text.startswith('All antennas in ant_array must be in antenna_numbers')


def test_set_gain(gain_data):
    """Check the required flags and parameter forms after set_gain() on the delay object."""
    cal = gain_data.delay_object
    cal.set_gain()
    assert cal._gain_array.required
    assert not cal._delay_array.required
    # flag and quality arrays are both expected to share the gain array form
    gain_form = cal._gain_array.form
    assert gain_form == cal._flag_array.form
    assert gain_form == cal._quality_array.form


def test_set_delay(gain_data):
    """Check the required flags and parameter forms after set_delay() on the gain object."""
    cal = gain_data.gain_object
    cal.set_delay()
    assert cal._delay_array.required
    assert not cal._gain_array.required
    # the flag array is compared with the gain form, the quality array with the delay form
    assert cal._gain_array.form == cal._flag_array.form
    assert cal._delay_array.form == cal._quality_array.form


def test_set_unknown(gain_data):
    """Check the required flags and parameter forms after set_unknown_cal_type()."""
    cal = gain_data.gain_object
    cal.set_unknown_cal_type()
    # neither data array is required for an unknown cal type
    assert not cal._delay_array.required
    assert not cal._gain_array.required
    gain_form = cal._gain_array.form
    assert gain_form == cal._flag_array.form
    assert gain_form == cal._quality_array.form


def test_convert_filetype(gain_data):
    """Check that _convert_to_filetype raises ValueError when given 'uvfits'."""
    cal = gain_data.gain_object
    with pytest.raises(ValueError):
        cal._convert_to_filetype('uvfits')


def _check_gains_from_delays(delay_object, new_object, conv):
    """Assert that new_object holds the gains expected from delay_object.

    Parameters
    ----------
    delay_object : UVCal
        The object whose delay_array, freq_array, quality_array and history
        are the reference values.
    new_object : UVCal
        A copy of delay_object on which convert_to_gain has been called.
    conv : int
        Sign applied to the expected phase: -1 for the 'minus' delay
        convention, 1 for 'plus'.
    """
    gain_rtol, gain_atol = new_object._gain_array.tols[0], new_object._gain_array.tols[1]

    # every gain amplitude must be 1: both the largest and the smallest
    assert np.isclose(np.max(np.absolute(new_object.gain_array)), 1.,
                      rtol=gain_rtol, atol=gain_atol)
    assert np.isclose(np.min(np.absolute(new_object.gain_array)), 1.,
                      rtol=gain_rtol, atol=gain_atol)

    # compare the gain phase at frequency index 10 with
    # conv * 2 * pi * delay * frequency, both wrapped to [0, 2 * pi)
    assert np.allclose(np.angle(new_object.gain_array[:, :, 10, :, :]) % (2 * np.pi),
                       (conv * 2 * np.pi * delay_object.delay_array[:, :, 0, :, :]
                        * delay_object.freq_array[0, 10]) % (2 * np.pi),
                       rtol=gain_rtol, atol=gain_atol)
    assert np.allclose(delay_object.quality_array,
                       new_object.quality_array[:, :, 10, :, :],
                       rtol=new_object._quality_array.tols[0],
                       atol=new_object._quality_array.tols[1])

    assert new_object.history == (delay_object.history
                                  + '  Converted from delays to gains using pyuvdata.')


def test_convert_to_gain(gain_data):
    """Test convert_to_gain for both delay conventions and its error cases.

    Covers the 'minus' and 'plus' conventions, an object carrying a
    total_quality_array, and the ValueError raised for a bad convention
    and for objects that are not delay-type.
    """
    for convention, conv in (('minus', -1), ('plus', 1)):
        new_object = copy.deepcopy(gain_data.delay_object)
        new_object.convert_to_gain(delay_convention=convention)
        _check_gains_from_delays(gain_data.delay_object, new_object, conv)

    # test a file with a total_quality_array
    new_object = copy.deepcopy(gain_data.delay_object)
    tqa_size = new_object.delay_array.shape[1:]
    new_object.total_quality_array = np.ones(tqa_size)
    new_object.convert_to_gain(delay_convention='minus')
    _check_gains_from_delays(gain_data.delay_object, new_object, -1)

    # error testing
    with pytest.raises(ValueError):
        gain_data.delay_object.convert_to_gain(delay_convention='bogus')
    with pytest.raises(ValueError):
        gain_data.gain_object.convert_to_gain()
    gain_data.gain_object.set_unknown_cal_type()
    with pytest.raises(ValueError):
        gain_data.gain_object.convert_to_gain()


def test_select_antennas(gain_data):
    """Test antenna selection by number and by name on the gain object.

    Also covers the selection error cases, a calfits write/read round trip
    of the selected object, and the handling of total_quality_array.
    """
    cal = gain_data.gain_object
    cal2 = gain_data.gain_object2
    old_history = cal.history
    ants_to_keep = np.array([65, 96, 9, 97, 89, 22, 20, 72])
    cal2.select(antenna_nums=ants_to_keep)

    assert len(ants_to_keep) == cal2.Nants_data
    assert all(ant in cal2.ant_array for ant in ants_to_keep)
    assert all(ant in ants_to_keep for ant in cal2.ant_array)

    expected_history = old_history + '  Downselected to specific antennas using pyuvdata.'
    assert uvutils._check_histories(expected_history, cal2.history)

    # now test using antenna_names to specify antennas to keep
    ants_to_keep = np.array(sorted(ants_to_keep))
    ant_names = [
        cal.antenna_names[np.where(cal.antenna_numbers == a)[0][0]]
        for a in ants_to_keep
    ]

    cal3 = cal.select(antenna_names=ant_names, inplace=False)
    gain_data.gain_object3 = cal3

    assert cal2 == cal3

    # check for errors associated with antennas not included in data,
    # bad names or providing numbers and names
    missing_ants = np.max(cal.ant_array) + np.arange(1, 3)
    with pytest.raises(ValueError):
        cal.select(antenna_nums=missing_ants)
    with pytest.raises(ValueError):
        cal.select(antenna_names='test1')
    with pytest.raises(ValueError):
        cal.select(antenna_nums=ants_to_keep, antenna_names=ant_names)

    # check that write_calfits works on the antenna-selected object
    write_file_calfits = os.path.join(DATA_PATH, 'test/select_test.calfits')
    cal2.write_calfits(write_file_calfits, clobber=True)

    # check that reading it back in works too
    new_gain_object = UVCal()
    new_gain_object.read_calfits(write_file_calfits)
    assert cal2 == new_gain_object

    # check that total_quality_array is handled properly when present
    tqa_shape = cal._total_quality_array.expected_shape(cal)
    cal.total_quality_array = np.zeros(tqa_shape)
    select_kwargs = {'antenna_names': ant_names, 'inplace': True}
    uvtest.checkWarnings(cal.select, [], select_kwargs,
                         message='Cannot preserve total_quality_array')
    assert cal.total_quality_array is None


def test_select_times(gain_data):
    """Test time selection on the gain object, including single and uneven selections."""
    cal = gain_data.gain_object
    cal2 = gain_data.gain_object2

    old_history = cal.history
    times_to_keep = cal.time_array[2:5]

    cal2.select(times=times_to_keep)

    assert len(times_to_keep) == cal2.Ntimes
    assert all(t in cal2.time_array for t in times_to_keep)
    assert all(t in times_to_keep for t in np.unique(cal2.time_array))

    expected_history = old_history + '  Downselected to specific times using pyuvdata.'
    assert uvutils._check_histories(expected_history, cal2.history)

    write_file_calfits = os.path.join(DATA_PATH, 'test/select_test.calfits')
    # test writing calfits with only one time
    cal2 = gain_data.gain_object2 = copy.deepcopy(cal)
    times_to_keep = cal.time_array[[1]]
    cal2.select(times=times_to_keep)
    cal2.write_calfits(write_file_calfits, clobber=True)

    # check for errors associated with times not included in data
    absent_time = np.min(cal.time_array) - cal.integration_time
    with pytest.raises(ValueError):
        cal.select(times=[absent_time])

    # check for warnings and errors associated with unevenly spaced times
    cal2 = gain_data.gain_object2 = copy.deepcopy(cal)
    uneven_kwargs = {'times': cal2.time_array[[0, 2, 3]]}
    uvtest.checkWarnings(cal2.select, [], uneven_kwargs,
                         message='Selected times are not evenly spaced')
    with pytest.raises(ValueError):
        cal2.write_calfits(write_file_calfits)


def test_select_frequencies(gain_data):
    """Test frequency selection on the gain object, including single and uneven selections."""
    cal = gain_data.gain_object
    cal2 = gain_data.gain_object2
    old_history = cal.history
    freqs_to_keep = cal.freq_array[0, np.arange(4, 8)]

    # add dummy total_quality_array to both objects
    cal.total_quality_array = np.zeros(cal._total_quality_array.expected_shape(cal))
    cal2.total_quality_array = np.zeros(cal2._total_quality_array.expected_shape(cal2))

    cal2.select(frequencies=freqs_to_keep)

    assert len(freqs_to_keep) == cal2.Nfreqs
    assert all(f in cal2.freq_array for f in freqs_to_keep)
    assert all(f in freqs_to_keep for f in np.unique(cal2.freq_array))

    expected_history = old_history + '  Downselected to specific frequencies using pyuvdata.'
    assert uvutils._check_histories(expected_history, cal2.history)

    write_file_calfits = os.path.join(DATA_PATH, 'test/select_test.calfits')
    # test writing calfits with only one frequency
    cal2 = gain_data.gain_object2 = copy.deepcopy(cal)
    freqs_to_keep = cal.freq_array[0, 5]
    cal2.select(frequencies=freqs_to_keep)
    cal2.write_calfits(write_file_calfits, clobber=True)

    # check for errors associated with frequencies not included in data
    absent_freq = np.max(cal.freq_array) + cal.channel_width
    with pytest.raises(ValueError):
        cal.select(frequencies=[absent_freq])

    # check for warnings and errors associated with unevenly spaced frequencies
    cal2 = gain_data.gain_object2 = copy.deepcopy(cal)
    uneven_kwargs = {'frequencies': cal2.freq_array[0, [0, 5, 6]]}
    uvtest.checkWarnings(cal2.select, [], uneven_kwargs,
                         message='Selected frequencies are not evenly spaced')
    with pytest.raises(ValueError):
        cal2.write_calfits(write_file_calfits)


def test_select_freq_chans(gain_data):
    """Test selection by channel index, alone and combined with frequencies."""
    cal = gain_data.gain_object
    cal2 = gain_data.gain_object2
    old_history = cal.history
    chans_to_keep = np.arange(4, 8)

    # add dummy total_quality_array to both objects
    cal.total_quality_array = np.zeros(cal._total_quality_array.expected_shape(cal))
    cal2.total_quality_array = np.zeros(cal2._total_quality_array.expected_shape(cal2))

    cal2.select(freq_chans=chans_to_keep)

    assert len(chans_to_keep) == cal2.Nfreqs
    assert all(cal.freq_array[0, chan] in cal2.freq_array for chan in chans_to_keep)
    assert all(f in cal.freq_array[0, chans_to_keep]
               for f in np.unique(cal2.freq_array))

    expected_history = old_history + '  Downselected to specific frequencies using pyuvdata.'
    assert uvutils._check_histories(expected_history, cal2.history)

    # Test selecting both channels and frequencies
    freqs_to_keep = cal.freq_array[0, np.arange(7, 10)]  # Overlaps with chans
    all_chans_to_keep = np.arange(4, 10)

    cal2 = gain_data.gain_object2 = copy.deepcopy(cal)
    cal2.select(frequencies=freqs_to_keep, freq_chans=chans_to_keep)

    assert len(all_chans_to_keep) == cal2.Nfreqs
    assert all(cal.freq_array[0, chan] in cal2.freq_array for chan in all_chans_to_keep)
    assert all(f in cal.freq_array[0, all_chans_to_keep]
               for f in np.unique(cal2.freq_array))


def test_select_polarizations(gain_data):
    """Test jones selection on a gain object padded out to four jones terms."""
    cal = gain_data.gain_object

    # add more jones terms to allow for better testing of selections;
    # each new term reuses the data of the current last jones entry
    while cal.Njones < 4:
        new_jones = np.min(cal.jones_array) - 1
        cal.jones_array = np.append(cal.jones_array, new_jones)
        cal.Njones += 1
        last_flags = cal.flag_array[:, :, :, :, [-1]]
        cal.flag_array = np.concatenate((cal.flag_array, last_flags), axis=4)
        last_gains = cal.gain_array[:, :, :, :, [-1]]
        cal.gain_array = np.concatenate((cal.gain_array, last_gains), axis=4)
        last_quality = cal.quality_array[:, :, :, :, [-1]]
        cal.quality_array = np.concatenate((cal.quality_array, last_quality), axis=4)

    # add dummy total_quality_array
    cal.total_quality_array = np.zeros(cal._total_quality_array.expected_shape(cal))

    assert cal.check()
    cal2 = gain_data.gain_object2 = copy.deepcopy(cal)

    old_history = cal.history
    jones_to_keep = [-5, -6]

    cal2.select(jones=jones_to_keep)

    assert len(jones_to_keep) == cal2.Njones
    assert all(j in cal2.jones_array for j in jones_to_keep)
    assert all(j in jones_to_keep for j in np.unique(cal2.jones_array))

    expected_history = (old_history
                        + '  Downselected to specific jones polarization terms using pyuvdata.')
    assert uvutils._check_histories(expected_history, cal2.history)

    # check for errors associated with polarizations not included in data
    with pytest.raises(ValueError):
        cal2.select(jones=[-3, -4])

    # check for warnings and errors associated with unevenly spaced polarizations
    uneven_kwargs = {'jones': cal.jones_array[[0, 1, 3]]}
    uvtest.checkWarnings(cal.select, [], uneven_kwargs,
                         message='Selected jones polarization terms are not evenly spaced')
    write_file_calfits = os.path.join(DATA_PATH, 'test/select_test.calfits')
    with pytest.raises(ValueError):
        cal.write_calfits(write_file_calfits)


def test_select(gain_data):
    """Test selecting along the antenna, frequency, time and jones axes at once."""
    cal = gain_data.gain_object
    cal2 = gain_data.gain_object2
    old_history = cal.history

    ants_to_keep = np.array([10, 89, 43, 9, 80, 96, 64])
    freqs_to_keep = cal.freq_array[0, np.arange(2, 5)]
    times_to_keep = cal.time_array[[1, 2]]
    jones_to_keep = [-5]

    cal2.select(antenna_nums=ants_to_keep, frequencies=freqs_to_keep,
                times=times_to_keep, jones=jones_to_keep)

    # antennas
    assert len(ants_to_keep) == cal2.Nants_data
    assert all(ant in cal2.ant_array for ant in ants_to_keep)
    assert all(ant in ants_to_keep for ant in cal2.ant_array)

    # times
    assert len(times_to_keep) == cal2.Ntimes
    assert all(t in cal2.time_array for t in times_to_keep)
    assert all(t in times_to_keep for t in np.unique(cal2.time_array))

    # frequencies
    assert len(freqs_to_keep) == cal2.Nfreqs
    assert all(f in cal2.freq_array for f in freqs_to_keep)
    assert all(f in freqs_to_keep for f in np.unique(cal2.freq_array))

    # jones terms
    assert len(jones_to_keep) == cal2.Njones
    assert all(j in cal2.jones_array for j in jones_to_keep)
    assert all(j in jones_to_keep for j in np.unique(cal2.jones_array))

    expected_history = (old_history
                        + '  Downselected to specific antennas, times, '
                        + 'frequencies, jones polarization terms using pyuvdata.')
    assert uvutils._check_histories(expected_history, cal2.history)


@pytest.fixture(scope='function')
def delay_data():
    """Provide a delay object, read from a file carrying an input flag array.

    The delay calfits test file is read, given an all-False
    input_flag_array, written to a new file and read back. Yields a holder
    with ``delay_object`` (the object read back) and ``delay_object2``
    (a deep copy of it).
    """
    class DataHolder(object):
        def __init__(self, delay_object):
            self.delay_object = delay_object
            self.delay_object2 = copy.deepcopy(delay_object)

    source_file = os.path.join(DATA_PATH, 'zen.2457698.40355.xx.delay.calfits')
    flagged_file = os.path.join(DATA_PATH, 'test/outtest_input_flags.fits')

    # add an input flag array to the file to test for that.
    uv_in = UVCal()
    uv_in.read_calfits(source_file)
    flag_shape = uv_in._input_flag_array.expected_shape(uv_in)
    uv_in.input_flag_array = np.zeros(flag_shape, dtype=bool)
    uv_in.write_calfits(flagged_file, clobber=True)

    delay_cal = UVCal()
    delay_cal.read_calfits(flagged_file)

    holder = DataHolder(delay_cal)

    # yield the data for testing, then drop the local reference afterwards
    yield holder

    del holder


def test_select_antennas_delay(delay_data):
    """Test antenna selection by number and by name on the delay object.

    Also covers the selection error cases and the handling of
    total_quality_array.
    """
    cal = delay_data.delay_object
    cal2 = delay_data.delay_object2
    old_history = cal.history
    ants_to_keep = np.array([65, 96, 9, 97, 89, 22, 20, 72])
    cal2.select(antenna_nums=ants_to_keep)

    assert len(ants_to_keep) == cal2.Nants_data
    assert all(ant in cal2.ant_array for ant in ants_to_keep)
    assert all(ant in ants_to_keep for ant in cal2.ant_array)

    expected_history = old_history + '  Downselected to specific antennas using pyuvdata.'
    assert uvutils._check_histories(expected_history, cal2.history)

    # now test using antenna_names to specify antennas to keep
    cal3 = delay_data.delay_object3 = copy.deepcopy(cal)
    ants_to_keep = np.array(sorted(ants_to_keep))
    ant_names = [
        cal3.antenna_names[np.where(cal3.antenna_numbers == a)[0][0]]
        for a in ants_to_keep
    ]

    cal3.select(antenna_names=ant_names)

    assert cal2 == cal3

    # check for errors associated with antennas not included in data,
    # bad names or providing numbers and names
    missing_ants = np.max(cal.ant_array) + np.arange(1, 3)
    with pytest.raises(ValueError):
        cal.select(antenna_nums=missing_ants)
    with pytest.raises(ValueError):
        cal.select(antenna_names='test1')
    with pytest.raises(ValueError):
        cal.select(antenna_nums=ants_to_keep, antenna_names=ant_names)

    # check that total_quality_array is handled properly when present
    tqa_shape = cal._total_quality_array.expected_shape(cal)
    cal.total_quality_array = np.zeros(tqa_shape)
    select_kwargs = {'antenna_names': ant_names, 'inplace': True}
    uvtest.checkWarnings(cal.select, [], select_kwargs,
                         message='Cannot preserve total_quality_array')
    assert cal.total_quality_array is None


def test_select_times_delay(delay_data):
    """Test time selection on the delay object, including an uneven selection."""
    cal = delay_data.delay_object
    cal2 = delay_data.delay_object2
    old_history = cal.history
    times_to_keep = cal.time_array[2:5]

    cal2.select(times=times_to_keep)

    assert len(times_to_keep) == cal2.Ntimes
    assert all(t in cal2.time_array for t in times_to_keep)
    assert all(t in times_to_keep for t in np.unique(cal2.time_array))

    expected_history = old_history + '  Downselected to specific times using pyuvdata.'
    assert uvutils._check_histories(expected_history, cal2.history)

    # check for errors associated with times not included in data
    absent_time = np.min(cal.time_array) - cal.integration_time
    with pytest.raises(ValueError):
        cal.select(times=[absent_time])

    # check for warnings and errors associated with unevenly spaced times
    cal2 = delay_data.delay_object2 = copy.deepcopy(cal)
    uneven_kwargs = {'times': cal2.time_array[[0, 2, 3]]}
    uvtest.checkWarnings(cal2.select, [], uneven_kwargs,
                         message='Selected times are not evenly spaced')
    write_file_calfits = os.path.join(DATA_PATH, 'test/select_test.calfits')
    with pytest.raises(ValueError):
        cal2.write_calfits(write_file_calfits)


def test_select_frequencies_delay(delay_data):
    """Test frequency selection on the delay object, including an uneven selection."""
    cal = delay_data.delay_object
    cal2 = delay_data.delay_object2
    old_history = cal.history
    freqs_to_keep = cal.freq_array[0, np.arange(73, 944)]

    # add dummy total_quality_array to both objects
    cal.total_quality_array = np.zeros(cal._total_quality_array.expected_shape(cal))
    cal2.total_quality_array = np.zeros(cal2._total_quality_array.expected_shape(cal2))

    cal2.select(frequencies=freqs_to_keep)

    assert len(freqs_to_keep) == cal2.Nfreqs
    assert all(f in cal2.freq_array for f in freqs_to_keep)
    assert all(f in freqs_to_keep for f in np.unique(cal2.freq_array))

    expected_history = old_history + '  Downselected to specific frequencies using pyuvdata.'
    assert uvutils._check_histories(expected_history, cal2.history)

    # check for errors associated with frequencies not included in data
    absent_freq = np.max(cal.freq_array) + cal.channel_width
    with pytest.raises(ValueError):
        cal.select(frequencies=[absent_freq])

    # check for warnings and errors associated with unevenly spaced frequencies
    cal2 = delay_data.delay_object2 = copy.deepcopy(cal)
    uneven_kwargs = {'frequencies': cal2.freq_array[0, [0, 5, 6]]}
    uvtest.checkWarnings(cal2.select, [], uneven_kwargs,
                         message='Selected frequencies are not evenly spaced')
    write_file_calfits = os.path.join(DATA_PATH, 'test/select_test.calfits')
    with pytest.raises(ValueError):
        cal2.write_calfits(write_file_calfits)


def test_select_freq_chans_delay(delay_data):
    """Test selecting by channel index, alone and together with frequencies."""
    cal_full = delay_data.delay_object
    cal_sel = delay_data.delay_object2
    history_before = cal_full.history
    chans_to_keep = np.arange(73, 944)

    # give both objects a placeholder total_quality_array
    for cal in (cal_full, cal_sel):
        cal.total_quality_array = np.zeros(
            cal._total_quality_array.expected_shape(cal))

    cal_sel.select(freq_chans=chans_to_keep)

    # the selected object must hold exactly the frequencies of those channels
    assert cal_sel.Nfreqs == len(chans_to_keep)
    assert all(cal_full.freq_array[0, chan] in cal_sel.freq_array
               for chan in chans_to_keep)
    kept_freqs = cal_full.freq_array[0, chans_to_keep]
    assert all(freq in kept_freqs for freq in np.unique(cal_sel.freq_array))

    expected_history = (history_before + '  Downselected to '
                        'specific frequencies using pyuvdata.')
    assert uvutils._check_histories(expected_history, cal_sel.history)

    # channels and frequencies passed together; the frequency list overlaps
    # the channel list and the result is checked against channels 73-999
    freqs_to_keep = cal_full.freq_array[0, np.arange(930, 1000)]
    all_chans_to_keep = np.arange(73, 1000)

    delay_data.delay_object2 = cal_sel = copy.deepcopy(cal_full)
    cal_sel.select(frequencies=freqs_to_keep, freq_chans=chans_to_keep)

    assert cal_sel.Nfreqs == len(all_chans_to_keep)
    assert all(cal_full.freq_array[0, chan] in cal_sel.freq_array
               for chan in all_chans_to_keep)
    all_kept_freqs = cal_full.freq_array[0, all_chans_to_keep]
    assert all(freq in all_kept_freqs
               for freq in np.unique(cal_sel.freq_array))


def test_select_polarizations_delay(delay_data):
    """Test selecting Jones polarization terms on a delay-type UVCal object."""
    cal = delay_data.delay_object

    # pad the object out to four Jones terms so the selections are
    # non-trivial; each new term reuses the data of the current last term
    jones_axis_arrays = ('flag_array', 'input_flag_array',
                         'delay_array', 'quality_array')
    while cal.Njones < 4:
        next_jones = np.min(cal.jones_array) - 1
        cal.jones_array = np.append(cal.jones_array, next_jones)
        cal.Njones += 1
        for attr_name in jones_axis_arrays:
            current = getattr(cal, attr_name)
            padded = np.concatenate((current, current[:, :, :, :, [-1]]),
                                    axis=4)
            setattr(cal, attr_name, padded)

    # placeholder total_quality_array matching the padded shape
    cal.total_quality_array = np.zeros(
        cal._total_quality_array.expected_shape(cal))
    assert cal.check()
    delay_data.delay_object2 = cal_sel = copy.deepcopy(cal)

    history_before = cal.history
    jones_to_keep = [-5, -6]

    cal_sel.select(jones=jones_to_keep)

    # exactly the requested Jones terms must survive the select
    assert cal_sel.Njones == len(jones_to_keep)
    assert all(jones in cal_sel.jones_array for jones in jones_to_keep)
    assert all(jones in jones_to_keep
               for jones in np.unique(cal_sel.jones_array))

    expected_history = (history_before + '  Downselected to '
                        'specific jones polarization terms '
                        'using pyuvdata.')
    assert uvutils._check_histories(expected_history, cal_sel.history)

    # Jones terms that are no longer in the selected object must raise
    pytest.raises(ValueError, cal_sel.select, jones=[-3, -4])

    # unevenly spaced Jones terms: the select is checked for its warning
    # message and the following calfits write must raise
    uneven_jones = cal.jones_array[[0, 1, 3]]
    uvtest.checkWarnings(cal.select, [], {'jones': uneven_jones},
                         message='Selected jones polarization terms are not evenly spaced')
    outfile = os.path.join(DATA_PATH, 'test/select_test.calfits')
    pytest.raises(ValueError, cal.write_calfits, outfile)


def test_select_delay(delay_data):
    """Test selecting along all axes of a delay-type UVCal object at once."""
    cal_full = delay_data.delay_object
    cal_sel = delay_data.delay_object2
    history_before = cal_full.history

    ants_to_keep = np.array([10, 89, 43, 9, 80, 96, 64])
    freqs_to_keep = cal_full.freq_array[0, np.arange(31, 56)]
    times_to_keep = cal_full.time_array[[1, 2]]
    jones_to_keep = [-5]

    cal_sel.select(antenna_nums=ants_to_keep, frequencies=freqs_to_keep,
                   times=times_to_keep, jones=jones_to_keep)

    # antennas
    assert cal_sel.Nants_data == len(ants_to_keep)
    assert all(ant in cal_sel.ant_array for ant in ants_to_keep)
    assert all(ant in ants_to_keep for ant in cal_sel.ant_array)

    # times
    assert cal_sel.Ntimes == len(times_to_keep)
    assert all(time in cal_sel.time_array for time in times_to_keep)
    assert all(time in times_to_keep
               for time in np.unique(cal_sel.time_array))

    # frequencies
    assert cal_sel.Nfreqs == len(freqs_to_keep)
    assert all(freq in cal_sel.freq_array for freq in freqs_to_keep)
    assert all(freq in freqs_to_keep
               for freq in np.unique(cal_sel.freq_array))

    # Jones terms
    assert cal_sel.Njones == len(jones_to_keep)
    assert all(jones in cal_sel.jones_array for jones in jones_to_keep)
    assert all(jones in jones_to_keep
               for jones in np.unique(cal_sel.jones_array))

    expected_history = (history_before + '  Downselected to '
                        'specific antennas, times, '
                        'frequencies, jones polarization terms '
                        'using pyuvdata.')
    assert uvutils._check_histories(expected_history, cal_sel.history)


def test_add_antennas(gain_data):
    """Test combining two UVCal objects that hold different antennas."""
    cal = gain_data.gain_object
    other = gain_data.gain_object2
    cal_full = copy.deepcopy(cal)
    first_ants = np.array([9, 10, 20, 22, 31, 43, 53, 64, 65, 72])
    second_ants = np.array([80, 81, 88, 89, 96, 97, 104, 105, 112])
    cal.select(antenna_nums=first_ants)
    other.select(antenna_nums=second_ants)
    cal += other

    # compare histories first; the history is then overwritten so that the
    # full-object equality check can pass
    expected_history = (cal_full.history
                        + '  Downselected to specific '
                        'antennas using pyuvdata. Combined '
                        'data along antenna axis using pyuvdata.')
    assert uvutils._check_histories(expected_history, cal.history)
    cal.history = cal_full.history
    assert cal == cal_full

    # with a total_quality_array on the first object the add is checked for
    # its warning message, and the array must be None afterwards
    cal.select(antenna_nums=first_ants)
    cal.total_quality_array = np.zeros(
        cal._total_quality_array.expected_shape(cal))
    uvtest.checkWarnings(cal.__iadd__, [other],
                         message='Total quality array detected')
    assert cal.total_quality_array is None


def test_add_frequencies(gain_data):
    """Test combining two UVCal objects that cover different frequencies."""
    def tqa_like(cal_obj, maker):
        # array built by `maker` in the shape cal_obj's total_quality_array
        # parameter currently expects
        return maker(cal_obj._total_quality_array.expected_shape(cal_obj))

    def tqa_is_close(cal_obj, expected):
        # compare with the tolerances stored on the parameter itself
        param = cal_obj._total_quality_array
        return np.allclose(cal_obj.total_quality_array, expected,
                           rtol=param.tols[0], atol=param.tols[1])

    cal = gain_data.gain_object
    other = gain_data.gain_object2
    cal_full = copy.deepcopy(cal)
    freqs1 = cal.freq_array[0, np.arange(0, 5)]
    freqs2 = other.freq_array[0, np.arange(5, 10)]
    cal.select(frequencies=freqs1)
    other.select(frequencies=freqs2)
    cal += other

    # compare histories first; the history is then overwritten so that the
    # full-object equality check can pass
    expected_history = (cal_full.history
                        + '  Downselected to specific '
                        'frequencies using pyuvdata. Combined '
                        'data along frequency axis using pyuvdata.')
    assert uvutils._check_histories(expected_history, cal.history)
    cal.history = cal_full.history
    assert cal == cal_full

    # total_quality_array only on the first object
    cal.select(frequencies=freqs1)
    tqa = tqa_like(cal, np.ones)
    tqa2 = tqa_like(other, np.zeros)
    expected_tqa = np.concatenate([tqa, tqa2], axis=1)
    cal.total_quality_array = tqa
    cal += other
    assert tqa_is_close(cal, expected_tqa)

    # total_quality_array only on the second object
    cal.select(frequencies=freqs1)
    tqa = tqa_like(cal, np.zeros)
    tqa2 = tqa_like(other, np.ones)
    expected_tqa = np.concatenate([tqa, tqa2], axis=1)
    cal.total_quality_array = None
    other.total_quality_array = tqa2
    cal += other
    assert tqa_is_close(cal, expected_tqa)

    # total_quality_array on both objects, with distinguishable values
    cal.select(frequencies=freqs1)
    tqa = 2 * tqa_like(cal, np.ones)
    tqa2 = tqa_like(other, np.ones)
    expected_tqa = np.concatenate([tqa, tqa2], axis=1)
    cal.total_quality_array = tqa
    other.total_quality_array = tqa2
    cal += other
    assert tqa_is_close(cal, expected_tqa)

    # frequency subsets supplied out of order
    gain_data.gain_object = cal = copy.deepcopy(cal_full)
    gain_data.gain_object2 = other = copy.deepcopy(cal_full)
    cal.select(frequencies=freqs2)
    other.select(frequencies=freqs1)
    cal += other
    cal.history = cal_full.history
    assert cal == cal_full


def test_add_times(gain_data):
    """Test combining two UVCal objects that cover different times."""
    def tqa_like(cal_obj, maker):
        # array built by `maker` in the shape cal_obj's total_quality_array
        # parameter currently expects
        return maker(cal_obj._total_quality_array.expected_shape(cal_obj))

    def tqa_is_close(cal_obj, expected):
        # compare with the tolerances stored on the parameter itself
        param = cal_obj._total_quality_array
        return np.allclose(cal_obj.total_quality_array, expected,
                           rtol=param.tols[0], atol=param.tols[1])

    cal = gain_data.gain_object
    other = gain_data.gain_object2
    cal_full = copy.deepcopy(cal)
    half = cal.Ntimes // 2
    # both halves are taken before either object is modified
    times1 = cal.time_array[:half]
    times2 = cal.time_array[half:]
    cal.select(times=times1)
    other.select(times=times2)
    cal += other

    # compare histories first; the history is then overwritten so that the
    # full-object equality check can pass
    expected_history = (cal_full.history
                        + '  Downselected to specific '
                        'times using pyuvdata. Combined '
                        'data along time axis using pyuvdata.')
    assert uvutils._check_histories(expected_history, cal.history)
    cal.history = cal_full.history
    assert cal == cal_full

    # total_quality_array only on the first object
    cal.select(times=times1)
    tqa = tqa_like(cal, np.ones)
    tqa2 = tqa_like(other, np.zeros)
    expected_tqa = np.concatenate([tqa, tqa2], axis=2)
    cal.total_quality_array = tqa
    cal += other
    assert tqa_is_close(cal, expected_tqa)

    # total_quality_array only on the second object
    cal.select(times=times1)
    tqa = tqa_like(cal, np.zeros)
    tqa2 = tqa_like(other, np.ones)
    expected_tqa = np.concatenate([tqa, tqa2], axis=2)
    cal.total_quality_array = None
    other.total_quality_array = tqa2
    cal += other
    assert tqa_is_close(cal, expected_tqa)

    # total_quality_array on both objects, with distinguishable values
    cal.select(times=times1)
    tqa = 2 * tqa_like(cal, np.ones)
    tqa2 = tqa_like(other, np.ones)
    expected_tqa = np.concatenate([tqa, tqa2], axis=2)
    cal.total_quality_array = tqa
    other.total_quality_array = tqa2
    cal += other
    assert tqa_is_close(cal, expected_tqa)


def test_add_jones(gain_data):
    """Test combining two UVCal objects that hold different Jones terms."""
    def tqa_like(cal_obj, maker):
        # array built by `maker` in the shape cal_obj's total_quality_array
        # parameter currently expects
        return maker(cal_obj._total_quality_array.expected_shape(cal_obj))

    def tqa_is_close(cal_obj, expected):
        # compare with the tolerances stored on the parameter itself
        param = cal_obj._total_quality_array
        return np.allclose(cal_obj.total_quality_array, expected,
                           rtol=param.tols[0], atol=param.tols[1])

    cal = gain_data.gain_object
    other = gain_data.gain_object2
    cal_original = copy.deepcopy(cal)
    # give the second object a different Jones value so the add is allowed
    other.jones_array[0] = -6
    cal += other

    # the combined object must carry both Jones terms
    assert cal.gain_array.shape[-1] == 2
    assert sorted(cal.jones_array) == [-6, -5]

    # total_quality_array only on the first object
    gain_data.gain_object = cal = copy.deepcopy(cal_original)
    tqa = tqa_like(cal, np.ones)
    tqa2 = tqa_like(other, np.zeros)
    expected_tqa = np.concatenate([tqa, tqa2], axis=3)
    cal.total_quality_array = tqa
    cal += other
    assert tqa_is_close(cal, expected_tqa)

    # total_quality_array only on the second object
    gain_data.gain_object = cal = copy.deepcopy(cal_original)
    tqa = tqa_like(cal, np.zeros)
    tqa2 = tqa_like(other, np.ones)
    expected_tqa = np.concatenate([tqa, tqa2], axis=3)
    other.total_quality_array = tqa2
    cal += other
    assert tqa_is_close(cal, expected_tqa)

    # total_quality_array on both objects, with distinguishable values
    gain_data.gain_object = cal = copy.deepcopy(cal_original)
    tqa = 2 * tqa_like(cal, np.ones)
    tqa2 = tqa_like(other, np.ones)
    expected_tqa = np.concatenate([tqa, tqa2], axis=3)
    cal.total_quality_array = tqa
    other.total_quality_array = tqa2
    cal += other
    assert tqa_is_close(cal, expected_tqa)


def test_add(gain_data):
    """Test the not-in-place add and how histories are merged."""
    cal = gain_data.gain_object
    other = gain_data.gain_object2
    cal_full = copy.deepcopy(cal)
    first_ants = np.array([9, 10, 20, 22, 31, 43, 53, 64, 65, 72])
    second_ants = np.array([80, 81, 88, 89, 96, 97, 104, 105, 112])
    cal.select(antenna_nums=first_ants)
    other.select(antenna_nums=second_ants)

    # not-in-place addition returns the combined object
    combined = cal + other
    # compare histories first; the history is then overwritten so that the
    # full-object equality check can pass
    expected_history = (cal_full.history
                        + '  Downselected to specific '
                        'antennas using pyuvdata. Combined '
                        'data along antenna axis using pyuvdata.')
    assert uvutils._check_histories(expected_history, combined.history)
    combined.history = cal_full.history
    assert combined == cal_full

    # history concatenation when the two objects have different histories
    cal.history = cal_full.history
    other.history = 'Some random history string OMNI_RUN:'
    cal += other
    expected_history = (cal_full.history
                        + ' Combined data along antenna axis '
                        'using pyuvdata. Some random '
                        'history string')
    assert uvutils._check_histories(expected_history, cal.history)


def test_add_multiple_axes(gain_data):
    """Test addition along multiple axes.

    The two fixture objects are cut down to different antennas, frequencies
    and times (and the second one is given a different Jones value) before
    being combined in place, so the result has to grow along every axis.
    """
    ants1 = np.array([9, 10, 20, 22, 31, 43, 53, 64, 65, 72])
    ants2 = np.array([80, 81, 88, 89, 96, 97, 104, 105, 112])
    freqs1 = gain_data.gain_object.freq_array[0, np.arange(0, 5)]
    freqs2 = gain_data.gain_object2.freq_array[0, np.arange(5, 10)]
    # remember the size of the full time axis before select() shrinks it
    n_times_full = gain_data.gain_object.Ntimes
    Nt2 = n_times_full // 2
    times1 = gain_data.gain_object.time_array[:Nt2]
    times2 = gain_data.gain_object.time_array[Nt2:]
    # artificially change the Jones value to permit addition
    gain_data.gain_object2.jones_array[0] = -6

    # perform select
    gain_data.gain_object.select(antenna_nums=ants1, frequencies=freqs1,
                                 times=times1)
    gain_data.gain_object2.select(antenna_nums=ants2, frequencies=freqs2,
                                  times=times2)

    gain_data.gain_object += gain_data.gain_object2

    # check resulting dimensionality
    assert len(gain_data.gain_object.ant_array) == 19
    assert len(gain_data.gain_object.freq_array[0, :]) == 10
    # times1 and times2 together cover the original time axis, so the combined
    # object must have all of it back; comparing only against the result's own
    # Ntimes would also pass if times were lost
    assert gain_data.gain_object.Ntimes == n_times_full
    assert len(gain_data.gain_object.time_array) == gain_data.gain_object.Ntimes
    assert len(gain_data.gain_object.jones_array) == 2


def test_add_errors(gain_data):
    """Test the add calls that are expected to raise ValueError."""
    cal = gain_data.gain_object
    other = gain_data.gain_object2

    # adding the two unmodified fixture objects
    with pytest.raises(ValueError):
        cal.__add__(other)

    # adding something that is not a UVCal object (an empty list)
    with pytest.raises(ValueError):
        cal.__add__([])

    # adding an object whose telescope_name differs
    other.telescope_name = "PAPER"
    with pytest.raises(ValueError):
        cal.__add__(other)


def test_jones_warning(gain_data):
    """Test the warning check when combined Jones values skip a term."""
    cal = gain_data.gain_object
    other = gain_data.gain_object2

    # first add a second Jones term (-6) to the object
    other.jones_array[0] = -6
    cal += other

    # then add -8, leaving a gap in the combined Jones values
    other.jones_array[0] = -8
    uvtest.checkWarnings(cal.__iadd__, [other],
                         message='Combined Jones elements')
    assert sorted(cal.jones_array) == [-8, -6, -5]


def test_frequency_warnings(gain_data):
    """Test the warning checks for uneven and non-contiguous frequencies."""
    cal = gain_data.gain_object
    other = gain_data.gain_object2
    # pristine copies to restore from for the second scenario
    cal_pristine = copy.deepcopy(cal)
    other_pristine = copy.deepcopy(other)

    # scenario 1: unevenly spaced frequencies
    freqs1 = cal.freq_array[0, np.arange(0, 5)]
    freqs2 = other.freq_array[0, np.arange(5, 10)]
    cal.select(frequencies=freqs1)
    other.select(frequencies=freqs2)

    # move the last channel to half the spacing from its neighbour
    spacing = other.freq_array[0, -1] - other.freq_array[0, -2]
    other.freq_array[0, -1] = other.freq_array[0, -2] + spacing / 2
    uvtest.checkWarnings(cal.__iadd__, [other],
                         message='Combined frequencies are not evenly spaced')
    assert len(cal.freq_array[0, :]) == cal.Nfreqs

    # scenario 2: "non-contiguous" frequencies
    gain_data.gain_object = cal = copy.deepcopy(cal_pristine)
    gain_data.gain_object2 = other = copy.deepcopy(other_pristine)
    freqs1 = cal.freq_array[0, np.arange(0, 5)]
    freqs2 = other.freq_array[0, np.arange(5, 10)]
    cal.select(frequencies=freqs1)
    other.select(frequencies=freqs2)

    # scale the frequencies by ten on both objects to spread them out
    cal.freq_array[0, :] *= 10
    other.freq_array[0, :] *= 10
    uvtest.checkWarnings(cal.__iadd__, [other],
                         message='Combined frequencies are not contiguous')
    expected_freqs = np.concatenate([freqs1 * 10, freqs2 * 10])
    freq_param = cal._freq_array
    assert np.allclose(cal.freq_array[0, :], expected_freqs,
                       rtol=freq_param.tols[0], atol=freq_param.tols[1])


def test_parameter_warnings(gain_data):
    """Test the warning check when a parameter differs between the objects."""
    cal = gain_data.gain_object
    other = gain_data.gain_object2

    # make the observer differ, then split the objects in frequency
    other.observer = 'mystery_person'
    freqs1 = cal.freq_array[0, np.arange(0, 5)]
    freqs2 = other.freq_array[0, np.arange(5, 10)]
    cal.select(frequencies=freqs1)
    other.select(frequencies=freqs2)
    uvtest.checkWarnings(cal.__iadd__, [other],
                         message='UVParameter observer does not match')

    # the add must still have combined the frequency axis
    expected_freqs = np.concatenate([freqs1, freqs2])
    freq_param = cal._freq_array
    assert np.allclose(cal.freq_array, expected_freqs,
                       rtol=freq_param.tols[0], atol=freq_param.tols[1])


def test_multi_files(gain_data):
    """Test read_calfits when it is given a list of files."""
    cal = gain_data.gain_object
    other = gain_data.gain_object2
    cal_full = copy.deepcopy(cal)
    half = cal.Ntimes // 2

    # split the gain object in two along the time axis; both halves are
    # taken before either object is modified
    times1 = cal.time_array[:half]
    times2 = cal.time_array[half:]
    cal.select(times=times1)
    other.select(times=times2)

    # write each half to its own calfits file
    first_file = os.path.join(DATA_PATH, 'test/read_multi1.calfits')
    second_file = os.path.join(DATA_PATH, 'test/read_multi2.calfits')
    cal.write_calfits(first_file, clobber=True)
    other.write_calfits(second_file, clobber=True)

    # read the two files back in a single call
    cal.read_calfits([first_file, second_file])
    expected_history = (cal_full.history
                        + '  Downselected to specific times'
                        ' using pyuvdata. Combined data '
                        'along time axis using pyuvdata.')
    assert uvutils._check_histories(expected_history, cal.history)
    # overwrite the history so the full-object equality check can pass
    cal.history = cal_full.history
    assert cal == cal_full


def test_add_antennas_delay(delay_data):
    """Test adding antennas between two delay-type UVCal objects.

    Covers the plain combination, the case where a total_quality_array is
    present on the first object, an input_flag_array that is present on only
    one of the two objects, and antenna subsets supplied out of order.
    """
    delay_object_full = copy.deepcopy(delay_data.delay_object)
    ants1 = np.array([9, 10, 20, 22, 31, 43, 53, 64, 65, 72])
    ants2 = np.array([80, 81, 88, 89, 96, 97, 104, 105, 112])
    delay_data.delay_object.select(antenna_nums=ants1)
    delay_data.delay_object2.select(antenna_nums=ants2)
    delay_data.delay_object += delay_data.delay_object2
    # Check history is correct, before replacing and doing a full object check
    assert uvutils._check_histories(delay_object_full.history
                                    + '  Downselected to specific '
                                    'antennas using pyuvdata. Combined '
                                    'data along antenna axis using pyuvdata.',
                                    delay_data.delay_object.history)
    delay_data.delay_object.history = delay_object_full.history
    assert delay_data.delay_object == delay_object_full

    # test for when total_quality_array is present
    delay_data.delay_object.select(antenna_nums=ants1)
    delay_data.delay_object.total_quality_array = np.zeros(
        delay_data.delay_object._total_quality_array.expected_shape(delay_data.delay_object))
    uvtest.checkWarnings(delay_data.delay_object.__iadd__, [delay_data.delay_object2],
                         message='Total quality array detected')
    assert delay_data.delay_object.total_quality_array is None

    # test for when input_flag_array is present in first file but not second
    # (the builtin bool is used as dtype: the np.bool alias was deprecated in
    # NumPy 1.20 and removed in 1.24). The expected result has all-True flags
    # for the antennas of the object that had no input_flag_array.
    delay_data.delay_object.select(antenna_nums=ants1)
    ifa = np.zeros(
        delay_data.delay_object._input_flag_array.expected_shape(delay_data.delay_object),
        dtype=bool)
    ifa2 = np.ones(
        delay_data.delay_object2._input_flag_array.expected_shape(delay_data.delay_object2),
        dtype=bool)
    tot_ifa = np.concatenate([ifa, ifa2], axis=0)
    delay_data.delay_object.input_flag_array = ifa
    delay_data.delay_object2.input_flag_array = None
    delay_data.delay_object += delay_data.delay_object2
    assert np.allclose(delay_data.delay_object.input_flag_array, tot_ifa)

    # test for when input_flag_array is present in second file but not first
    delay_data.delay_object.select(antenna_nums=ants1)
    ifa = np.ones(
        delay_data.delay_object._input_flag_array.expected_shape(delay_data.delay_object),
        dtype=bool)
    ifa2 = np.zeros(
        delay_data.delay_object2._input_flag_array.expected_shape(delay_data.delay_object2),
        dtype=bool)
    tot_ifa = np.concatenate([ifa, ifa2], axis=0)
    delay_data.delay_object.input_flag_array = None
    delay_data.delay_object2.input_flag_array = ifa2
    delay_data.delay_object += delay_data.delay_object2
    assert np.allclose(delay_data.delay_object.input_flag_array, tot_ifa)

    # Out of order - antennas
    delay_data.delay_object = copy.deepcopy(delay_object_full)
    delay_data.delay_object2 = copy.deepcopy(delay_data.delay_object)
    delay_data.delay_object.select(antenna_nums=ants2)
    delay_data.delay_object2.select(antenna_nums=ants1)
    delay_data.delay_object += delay_data.delay_object2
    # overwrite the history so the full-object equality check can pass
    delay_data.delay_object.history = delay_object_full.history
    assert delay_data.delay_object == delay_object_full


def test_add_times_delay(delay_data):
    """Test adding times between two delay-type UVCal objects.

    Covers the plain combination, a total_quality_array present on the first
    object only, the second only, or both, an input_flag_array present on
    only one of the two objects, and time subsets supplied out of order.
    """
    delay_object_full = copy.deepcopy(delay_data.delay_object)
    Nt2 = delay_data.delay_object.Ntimes // 2
    # both halves are taken before either object is modified
    times1 = delay_data.delay_object.time_array[:Nt2]
    times2 = delay_data.delay_object.time_array[Nt2:]
    delay_data.delay_object.select(times=times1)
    delay_data.delay_object2.select(times=times2)
    delay_data.delay_object += delay_data.delay_object2
    # Check history is correct, before replacing and doing a full object check
    assert uvutils._check_histories(delay_object_full.history
                                    + '  Downselected to specific '
                                    'times using pyuvdata. Combined '
                                    'data along time axis using pyuvdata.',
                                    delay_data.delay_object.history)
    delay_data.delay_object.history = delay_object_full.history
    assert delay_data.delay_object == delay_object_full

    # test for when total_quality_array is present in first file but not second
    delay_data.delay_object.select(times=times1)
    tqa = np.ones(
        delay_data.delay_object._total_quality_array.expected_shape(delay_data.delay_object))
    tqa2 = np.zeros(
        delay_data.delay_object2._total_quality_array.expected_shape(delay_data.delay_object2))
    tot_tqa = np.concatenate([tqa, tqa2], axis=2)
    delay_data.delay_object.total_quality_array = tqa
    delay_data.delay_object += delay_data.delay_object2
    assert np.allclose(delay_data.delay_object.total_quality_array, tot_tqa,
                       rtol=delay_data.delay_object._total_quality_array.tols[0],
                       atol=delay_data.delay_object._total_quality_array.tols[1])

    # test for when total_quality_array is present in second file but not first
    delay_data.delay_object.select(times=times1)
    tqa = np.zeros(
        delay_data.delay_object._total_quality_array.expected_shape(delay_data.delay_object))
    tqa2 = np.ones(
        delay_data.delay_object2._total_quality_array.expected_shape(delay_data.delay_object2))
    tot_tqa = np.concatenate([tqa, tqa2], axis=2)
    delay_data.delay_object.total_quality_array = None
    delay_data.delay_object2.total_quality_array = tqa2
    delay_data.delay_object += delay_data.delay_object2
    assert np.allclose(delay_data.delay_object.total_quality_array, tot_tqa,
                       rtol=delay_data.delay_object._total_quality_array.tols[0],
                       atol=delay_data.delay_object._total_quality_array.tols[1])

    # test for when total_quality_array is present in both
    delay_data.delay_object.select(times=times1)
    tqa = np.ones(
        delay_data.delay_object._total_quality_array.expected_shape(delay_data.delay_object))
    tqa2 = np.ones(
        delay_data.delay_object2._total_quality_array.expected_shape(delay_data.delay_object2))
    # make the first half distinguishable from the second
    tqa *= 2
    tot_tqa = np.concatenate([tqa, tqa2], axis=2)
    delay_data.delay_object.total_quality_array = tqa
    delay_data.delay_object2.total_quality_array = tqa2
    delay_data.delay_object += delay_data.delay_object2
    assert np.allclose(delay_data.delay_object.total_quality_array, tot_tqa,
                       rtol=delay_data.delay_object._total_quality_array.tols[0],
                       atol=delay_data.delay_object._total_quality_array.tols[1])

    # test for when input_flag_array is present in first file but not second
    # (the builtin bool is used as dtype: the np.bool alias was deprecated in
    # NumPy 1.20 and removed in 1.24). The expected result has all-True flags
    # for the times of the object that had no input_flag_array.
    delay_data.delay_object.select(times=times1)
    ifa = np.zeros(
        delay_data.delay_object._input_flag_array.expected_shape(delay_data.delay_object),
        dtype=bool)
    ifa2 = np.ones(
        delay_data.delay_object2._input_flag_array.expected_shape(delay_data.delay_object2),
        dtype=bool)
    tot_ifa = np.concatenate([ifa, ifa2], axis=3)
    delay_data.delay_object.input_flag_array = ifa
    delay_data.delay_object2.input_flag_array = None
    delay_data.delay_object += delay_data.delay_object2
    assert np.allclose(delay_data.delay_object.input_flag_array, tot_ifa)

    # test for when input_flag_array is present in second file but not first
    delay_data.delay_object.select(times=times1)
    ifa = np.ones(
        delay_data.delay_object._input_flag_array.expected_shape(delay_data.delay_object),
        dtype=bool)
    ifa2 = np.zeros(
        delay_data.delay_object2._input_flag_array.expected_shape(delay_data.delay_object2),
        dtype=bool)
    tot_ifa = np.concatenate([ifa, ifa2], axis=3)
    delay_data.delay_object.input_flag_array = None
    delay_data.delay_object2.input_flag_array = ifa2
    delay_data.delay_object += delay_data.delay_object2
    assert np.allclose(delay_data.delay_object.input_flag_array, tot_ifa)

    # Out of order - times
    delay_data.delay_object = copy.deepcopy(delay_object_full)
    delay_data.delay_object2 = copy.deepcopy(delay_data.delay_object)
    delay_data.delay_object.select(times=times2)
    delay_data.delay_object2.select(times=times1)
    delay_data.delay_object += delay_data.delay_object2
    # overwrite the history so the full-object equality check can pass
    delay_data.delay_object.history = delay_object_full.history
    assert delay_data.delay_object == delay_object_full


def test_add_jones_delay(delay_data):
    """Test adding Jones axes between two UVCal objects.

    The second object's Jones value is changed to -6 so that the two objects
    no longer overlap and can be combined.  The test then checks the combined
    ``total_quality_array`` and ``input_flag_array`` when the optional array
    is set on only one operand (or on both), and finally checks that
    combining in the opposite Jones order can be undone with ``select``.

    The expected arrays encode what this test requires of the combination:
    an operand without ``total_quality_array`` contributes zeros, and an
    operand without ``input_flag_array`` contributes ``True`` entries.
    """
    def _tqa_shape(cal):
        # Shape the object itself expects for its total_quality_array.
        return cal._total_quality_array.expected_shape(cal)

    def _ifa_shape(cal):
        # Shape the object itself expects for its input_flag_array.
        return cal._input_flag_array.expected_shape(cal)

    delay_object_original = copy.deepcopy(delay_data.delay_object)
    # artificially change the Jones value to permit addition
    delay_data.delay_object2.jones_array[0] = -6
    delay_data.delay_object += delay_data.delay_object2

    # check dimensionality of resulting object
    assert delay_data.delay_object.delay_array.shape[-1] == 2
    assert sorted(delay_data.delay_object.jones_array) == [-6, -5]

    # test for when total_quality_array is present in first file but not second
    delay_data.delay_object = copy.deepcopy(delay_object_original)
    tqa = np.ones(_tqa_shape(delay_data.delay_object))
    tqa2 = np.zeros(_tqa_shape(delay_data.delay_object2))
    tot_tqa = np.concatenate([tqa, tqa2], axis=3)
    delay_data.delay_object.total_quality_array = tqa
    delay_data.delay_object += delay_data.delay_object2
    assert np.allclose(delay_data.delay_object.total_quality_array, tot_tqa,
                       rtol=delay_data.delay_object._total_quality_array.tols[0],
                       atol=delay_data.delay_object._total_quality_array.tols[1])

    # test for when total_quality_array is present in second file but not first
    delay_data.delay_object = copy.deepcopy(delay_object_original)
    tqa = np.zeros(_tqa_shape(delay_data.delay_object))
    tqa2 = np.ones(_tqa_shape(delay_data.delay_object2))
    tot_tqa = np.concatenate([tqa, tqa2], axis=3)
    delay_data.delay_object2.total_quality_array = tqa2
    delay_data.delay_object += delay_data.delay_object2
    assert np.allclose(delay_data.delay_object.total_quality_array, tot_tqa,
                       rtol=delay_data.delay_object._total_quality_array.tols[0],
                       atol=delay_data.delay_object._total_quality_array.tols[1])

    # test for when total_quality_array is present in both
    delay_data.delay_object = copy.deepcopy(delay_object_original)
    tqa = np.ones(_tqa_shape(delay_data.delay_object))
    tqa2 = np.ones(_tqa_shape(delay_data.delay_object2))
    # make the two halves distinguishable in the combined array
    tqa *= 2
    tot_tqa = np.concatenate([tqa, tqa2], axis=3)
    delay_data.delay_object.total_quality_array = tqa
    delay_data.delay_object2.total_quality_array = tqa2
    delay_data.delay_object += delay_data.delay_object2
    assert np.allclose(delay_data.delay_object.total_quality_array, tot_tqa,
                       rtol=delay_data.delay_object._total_quality_array.tols[0],
                       atol=delay_data.delay_object._total_quality_array.tols[1])

    # test for when input_flag_array is present in first file but not second
    # (builtin ``bool`` is used because the ``np.bool`` alias was removed
    # from NumPy)
    delay_data.delay_object = copy.deepcopy(delay_object_original)
    ifa = np.zeros(_ifa_shape(delay_data.delay_object), dtype=bool)
    ifa2 = np.ones(_ifa_shape(delay_data.delay_object2), dtype=bool)
    tot_ifa = np.concatenate([ifa, ifa2], axis=4)
    delay_data.delay_object.input_flag_array = ifa
    delay_data.delay_object2.input_flag_array = None
    delay_data.delay_object += delay_data.delay_object2
    assert np.allclose(delay_data.delay_object.input_flag_array, tot_ifa)

    # test for when input_flag_array is present in second file but not first
    delay_data.delay_object = copy.deepcopy(delay_object_original)
    ifa = np.ones(_ifa_shape(delay_data.delay_object), dtype=bool)
    ifa2 = np.zeros(_ifa_shape(delay_data.delay_object2), dtype=bool)
    tot_ifa = np.concatenate([ifa, ifa2], axis=4)
    delay_data.delay_object.input_flag_array = None
    delay_data.delay_object2.input_flag_array = ifa2
    delay_data.delay_object += delay_data.delay_object2
    assert np.allclose(delay_data.delay_object.input_flag_array, tot_ifa)

    # Out of order - jones
    delay_data.delay_object = copy.deepcopy(delay_object_original)
    delay_data.delay_object2 = copy.deepcopy(delay_object_original)
    delay_data.delay_object.jones_array[0] = -6
    delay_data.delay_object += delay_data.delay_object2
    delay_data.delay_object2 = copy.deepcopy(delay_data.delay_object)
    # selecting the -5 term must give back the original object
    delay_data.delay_object.select(jones=-5)
    delay_data.delay_object.history = delay_object_original.history
    assert delay_data.delay_object == delay_object_original
    # selecting the relabelled -6 term and restoring its label must as well
    delay_data.delay_object2.select(jones=-6)
    delay_data.delay_object2.jones_array[:] = -5
    delay_data.delay_object2.history = delay_object_original.history
    assert delay_data.delay_object2 == delay_object_original


def test_add_errors_delay(delay_data):
    """Check that invalid additions raise errors."""
    # adding two identical objects must be rejected with a ValueError
    with pytest.raises(ValueError):
        delay_data.delay_object.__add__(delay_data.delay_object2)


def test_multi_files_delay(delay_data):
    """Test read function when multiple files are included.

    The delay object is split in two along the time axis, each half is
    written to its own calfits file, and both files are read back in a
    single ``read_calfits`` call.  The result must equal the original
    object once the history is accounted for.  The temporary files are
    removed afterwards, whether or not the write/read step succeeds.
    """
    delay_object_full = copy.deepcopy(delay_data.delay_object)
    Nt2 = delay_data.delay_object.Ntimes // 2
    # Break up delay object into two objects, divided in time
    times1 = delay_data.delay_object.time_array[:Nt2]
    times2 = delay_data.delay_object.time_array[Nt2:]
    delay_data.delay_object.select(times=times1)
    delay_data.delay_object2.select(times=times2)
    f1 = os.path.join(DATA_PATH, 'test/read_multi1.calfits')
    f2 = os.path.join(DATA_PATH, 'test/read_multi2.calfits')
    try:
        # Write those objects to files
        delay_data.delay_object.write_calfits(f1, clobber=True)
        delay_data.delay_object2.write_calfits(f2, clobber=True)
        # Read both files together
        delay_data.delay_object.read_calfits([f1, f2])
    finally:
        # Do not leave test artifacts behind; a file may be missing if an
        # earlier step failed, so only remove what actually exists.
        for path in (f1, f2):
            if os.path.exists(path):
                os.remove(path)
    assert uvutils._check_histories(delay_object_full.history
                                    + '  Downselected to specific times'
                                    ' using pyuvdata. Combined data '
                                    'along time axis using pyuvdata.',
                                    delay_data.delay_object.history)
    # the histories legitimately differ, so align them before comparing
    delay_data.delay_object.history = delay_object_full.history
    assert delay_data.delay_object == delay_object_full


def test_uvcal_get_methods():
    """Exercise get_gains, get_flags, get_quality and _has_key on a gain file."""
    # load data
    gain_file = os.path.join(DATA_PATH, 'zen.2457698.40355.xx.gain.calfits')
    uvc = UVCal()
    uvc.read_calfits(gain_file)

    # put a known value into the gains; the zero check below relies on this
    # entry being the one returned for the key
    key = (10, 'Jee')
    uvc.gain_array[1] = 0.0
    gains = uvc.get_gains(key)
    flags = uvc.get_flags(key)
    quality = uvc.get_quality(key)

    # the known value comes back, and every getter returns (Nfreqs, Ntimes)
    assert np.allclose(gains, 0.0)
    for returned in (gains, flags, quality):
        assert returned.shape == (uvc.Nfreqs, uvc.Ntimes)

    # compare against indexing the gain array by hand
    ant_index = uvc.ant_array.tolist().index(10)
    jones_index = uvc.jones_array.tolist().index(-5)
    np.testing.assert_array_almost_equal(
        gains, uvc.gain_array[ant_index, 0, :, :, jones_index])

    # every accepted way of spelling the key must return the same data
    key_variants = (
        key,           # antenna number and Jones string as two arguments
        (key[0],),     # bare antenna number
        (key[:1],),    # one-element tuple holding the antenna number
        (10, -5),      # Jones given as an integer
        (10, 'x'),     # Jones given as a short string
    )
    for call_args in key_variants:
        np.testing.assert_array_almost_equal(gains, uvc.get_gains(*call_args))

    # check has_key
    present = ({'antnum': 10}, {'jpol': 'Jee'}, {'antnum': 10, 'jpol': 'Jee'})
    for query in present:
        assert uvc._has_key(**query)
    absent = ({'antnum': 10, 'jpol': 'Jnn'}, {'antnum': 101, 'jpol': 'Jee'})
    for query in absent:
        assert not uvc._has_key(**query)

    # test exceptions
    with pytest.raises(ValueError):
        uvc.get_gains(1)
    with pytest.raises(ValueError):
        uvc.get_gains((10, 'Jnn'))
    # get_gains must also be rejected once the object is a delay-type cal
    uvc.cal_type = 'delay'
    with pytest.raises(ValueError):
        uvc.get_gains(10)


def test_write_read_optional_attrs():
    """Round-trip a calfits file with optional attributes set."""
    source_path = os.path.join(DATA_PATH, 'zen.2457698.40355.xx.gain.calfits')
    output_path = os.path.join(DATA_PATH, 'test/test.calfits')

    # load the reference file and fill in a couple of optional parameters
    original = UVCal()
    original.read_calfits(source_path)
    original.gain_scale = 'Jy'
    original.sky_field = 'GLEAM'

    # write it out, overwriting any earlier output
    original.write_calfits(output_path, clobber=True)

    # read it back into a fresh object; the two must compare equal
    roundtrip = UVCal()
    roundtrip.read_calfits(output_path)
    assert original == roundtrip
    os.remove(output_path)
back to top