https://github.com/RadioAstronomySoftwareGroup/pyuvdata
Raw File
Tip revision: d1829efacb60da384f64a8f25a280441bfa9d68a authored by Bryna Hazelton on 24 May 2019, 01:18:43 UTC
increase version to 1.4.0
Tip revision: d1829ef
test_uvflag.py
# -*- mode: python; coding: utf-8 -*-
# Copyright (c) 2019 Radio Astronomy Software Group
# Licensed under the 2-clause BSD License

from __future__ import division

import pytest
import os
import numpy as np
import pyuvdata.tests as uvtest
from pyuvdata import UVData
from pyuvdata import UVCal
from pyuvdata.data import DATA_PATH
from pyuvdata import UVFlag
from pyuvdata.uvflag import lst_from_uv
from pyuvdata.uvflag import flags2waterfall
from pyuvdata.uvflag import and_rows_cols
from pyuvdata import version as uvversion
import shutil
import copy


# Paths used throughout this module: test_d_file is read with
# UVData.read_miriad, test_c_file with UVCal.read_calfits, test_f_file is read
# directly by UVFlag, and test_outfile is where the tests write their output.
test_d_file = os.path.join(DATA_PATH, 'zen.2457698.40355.xx.HH.uvcAA')
test_c_file = os.path.join(DATA_PATH, 'zen.2457555.42443.HH.uvcA.omni.calfits')
test_f_file = test_d_file + '.testuvflag.h5'
test_outfile = os.path.join(DATA_PATH, 'test', 'outtest_uvflag.h5')

# Version string that the tests look for inside UVFlag history attributes.
pyuvdata_version_str = uvversion.version + '.'
# Compare by value, not identity: ``is not ''`` only works by accident of
# string interning and emits a SyntaxWarning on Python >= 3.8.
if uvversion.git_hash != '':
    pyuvdata_version_str += ('  Git origin: ' + uvversion.git_origin
                             + '.  Git hash: ' + uvversion.git_hash
                             + '.  Git branch: ' + uvversion.git_branch
                             + '.  Git description: ' + uvversion.git_description + '.')


def test_init_UVData():
    """Construct a UVFlag from a UVData object and check the resulting attributes."""
    uvd = UVData()
    uvd.read_miriad(test_d_file)
    flag_obj = UVFlag(uvd, history='I made a UVFlag object', label='test')

    # Metric and weights arrays are expected to share the shape of the data flags.
    expected_shape = uvd.flag_array.shape
    assert flag_obj.metric_array.shape == expected_shape
    assert np.all(flag_obj.metric_array == 0)
    assert flag_obj.weights_array.shape == expected_shape
    assert np.all(flag_obj.weights_array == 1)
    assert flag_obj.type == 'baseline'
    assert flag_obj.mode == 'metric'

    # Metadata arrays are expected to match the input object element-wise.
    assert np.all(flag_obj.time_array == uvd.time_array)
    assert np.all(flag_obj.lst_array == uvd.lst_array)
    assert np.all(flag_obj.freq_array == uvd.freq_array[0])
    for attr in ('polarization_array', 'baseline_array', 'ant_1_array', 'ant_2_array'):
        assert np.all(getattr(flag_obj, attr) == getattr(uvd, attr))

    # History should carry the user text, the type note and the version string.
    for fragment in ('I made a UVFlag object', 'Flag object with type "baseline"',
                     pyuvdata_version_str):
        assert fragment in flag_obj.history
    assert flag_obj.label == 'test'


def test_init_UVData_copy_flags():
    """Construct a UVFlag from a UVData object with copy_flags=True (one warning expected)."""
    uvd = UVData()
    uvd.read_miriad(test_d_file)
    init_kwargs = {'copy_flags': True, 'mode': 'metric'}
    flag_obj = uvtest.checkWarnings(UVFlag, [uvd], init_kwargs, nwarnings=1,
                                    message='Copying flags to type=="baseline"')

    # Although mode='metric' was requested, copying flags should give flag mode.
    assert not hasattr(flag_obj, 'metric_array')
    assert np.array_equal(flag_obj.flag_array, uvd.flag_array)
    assert flag_obj.weights_array.shape == uvd.flag_array.shape
    assert np.all(flag_obj.weights_array == 1)
    assert flag_obj.type == 'baseline'
    assert flag_obj.mode == 'flag'

    # Metadata arrays are expected to match the input object element-wise.
    assert np.all(flag_obj.time_array == uvd.time_array)
    assert np.all(flag_obj.lst_array == uvd.lst_array)
    assert np.all(flag_obj.freq_array == uvd.freq_array[0])
    for attr in ('polarization_array', 'baseline_array', 'ant_1_array', 'ant_2_array'):
        assert np.all(getattr(flag_obj, attr) == getattr(uvd, attr))

    for fragment in ('Flag object with type "baseline"', pyuvdata_version_str):
        assert fragment in flag_obj.history


def test_init_UVCal():
    """Construct a UVFlag from a UVCal object and check the resulting attributes."""
    cal = UVCal()
    cal.read_calfits(test_c_file)
    flag_obj = UVFlag(cal)

    # Metric and weights arrays are expected to share the shape of the cal flags.
    expected_shape = cal.flag_array.shape
    assert flag_obj.metric_array.shape == expected_shape
    assert np.all(flag_obj.metric_array == 0)
    assert flag_obj.weights_array.shape == expected_shape
    assert np.all(flag_obj.weights_array == 1)
    assert flag_obj.type == 'antenna'
    assert flag_obj.mode == 'metric'

    assert np.all(flag_obj.time_array == cal.time_array)
    # The LSTs are compared against what lst_from_uv returns for the cal object.
    assert np.all(flag_obj.lst_array == lst_from_uv(cal))
    assert np.all(flag_obj.freq_array == cal.freq_array[0])
    assert np.all(flag_obj.polarization_array == cal.jones_array)
    assert np.all(flag_obj.ant_array == cal.ant_array)

    for fragment in ('Flag object with type "antenna"', pyuvdata_version_str):
        assert fragment in flag_obj.history


def test_init_cal_copy_flags():
    """Construct a UVFlag from a UVCal object with copy_flags=True (one warning expected)."""
    cal = UVCal()
    cal.read_calfits(test_c_file)
    init_kwargs = {'copy_flags': True, 'mode': 'metric'}
    flag_obj = uvtest.checkWarnings(UVFlag, [cal], init_kwargs, nwarnings=1,
                                    message='Copying flags to type=="antenna"')

    # Although mode='metric' was requested, copying flags should give flag mode.
    assert not hasattr(flag_obj, 'metric_array')
    assert np.array_equal(flag_obj.flag_array, cal.flag_array)
    assert flag_obj.weights_array.shape == cal.flag_array.shape
    assert flag_obj.type == 'antenna'
    assert flag_obj.mode == 'flag'

    unique_times = np.unique(cal.time_array)
    assert np.all(flag_obj.time_array == unique_times)
    assert np.all(flag_obj.freq_array == cal.freq_array[0])
    assert np.all(flag_obj.polarization_array == cal.jones_array)
    assert pyuvdata_version_str in flag_obj.history


def test_init_waterfall_uvd():
    """Construct a waterfall UVFlag from a UVData object and check its attributes."""
    uvd = UVData()
    uvd.read_miriad(test_d_file)
    flag_obj = UVFlag(uvd, waterfall=True)

    # Waterfall arrays are expected to be (Ntimes, Nfreqs, Npols).
    waterfall_shape = (uvd.Ntimes, uvd.Nfreqs, uvd.Npols)
    assert flag_obj.metric_array.shape == waterfall_shape
    assert np.all(flag_obj.metric_array == 0)
    assert flag_obj.weights_array.shape == waterfall_shape
    assert np.all(flag_obj.weights_array == 1)
    assert flag_obj.type == 'waterfall'
    assert flag_obj.mode == 'metric'

    # Time and LST arrays are compared against the unique values of the input.
    for attr in ('time_array', 'lst_array'):
        assert np.all(getattr(flag_obj, attr) == np.unique(getattr(uvd, attr)))
    assert np.all(flag_obj.freq_array == uvd.freq_array[0])
    assert np.all(flag_obj.polarization_array == uvd.polarization_array)

    for fragment in ('Flag object with type "waterfall"', pyuvdata_version_str):
        assert fragment in flag_obj.history


def test_init_waterfall_uvc():
    """Construct a waterfall UVFlag from a UVCal object and check its attributes."""
    cal = UVCal()
    cal.read_calfits(test_c_file)
    flag_obj = UVFlag(cal, waterfall=True)

    # Waterfall arrays are expected to be (Ntimes, Nfreqs, Njones).
    waterfall_shape = (cal.Ntimes, cal.Nfreqs, cal.Njones)
    assert flag_obj.metric_array.shape == waterfall_shape
    assert np.all(flag_obj.metric_array == 0)
    assert flag_obj.weights_array.shape == waterfall_shape
    assert np.all(flag_obj.weights_array == 1)
    assert flag_obj.type == 'waterfall'
    assert flag_obj.mode == 'metric'

    unique_times = np.unique(cal.time_array)
    assert np.all(flag_obj.time_array == unique_times)
    assert np.all(flag_obj.freq_array == cal.freq_array[0])
    assert np.all(flag_obj.polarization_array == cal.jones_array)

    for fragment in ('Flag object with type "waterfall"', pyuvdata_version_str):
        assert fragment in flag_obj.history


def test_init_waterfall_flag():
    """Construct a flag-mode waterfall UVFlag from a UVCal object and check it."""
    cal = UVCal()
    cal.read_calfits(test_c_file)
    flag_obj = UVFlag(cal, waterfall=True, mode='flag')

    # Waterfall arrays are expected to be (Ntimes, Nfreqs, Njones), nothing flagged.
    waterfall_shape = (cal.Ntimes, cal.Nfreqs, cal.Njones)
    assert flag_obj.flag_array.shape == waterfall_shape
    assert not np.any(flag_obj.flag_array)
    assert flag_obj.weights_array.shape == waterfall_shape
    assert np.all(flag_obj.weights_array == 1)
    assert flag_obj.type == 'waterfall'
    assert flag_obj.mode == 'flag'

    unique_times = np.unique(cal.time_array)
    assert np.all(flag_obj.time_array == unique_times)
    assert np.all(flag_obj.freq_array == cal.freq_array[0])
    assert np.all(flag_obj.polarization_array == cal.jones_array)

    for fragment in ('Flag object with type "waterfall"', pyuvdata_version_str):
        assert fragment in flag_obj.history


def test_init_waterfall_copy_flags():
    """Construct a waterfall UVFlag with copy_flags=True (one warning expected)."""
    cal = UVCal()
    cal.read_calfits(test_c_file)
    init_kwargs = {'copy_flags': True, 'mode': 'flag', 'waterfall': True}
    flag_obj = uvtest.checkWarnings(UVFlag, [cal], init_kwargs, nwarnings=1,
                                    message='Copying flags into waterfall')

    # Although mode='flag' was requested, the result is expected in metric mode.
    assert not hasattr(flag_obj, 'flag_array')
    waterfall_shape = (cal.Ntimes, cal.Nfreqs, cal.Njones)
    assert flag_obj.metric_array.shape == waterfall_shape
    assert flag_obj.weights_array.shape == waterfall_shape
    assert flag_obj.type == 'waterfall'
    assert flag_obj.mode == 'metric'

    unique_times = np.unique(cal.time_array)
    assert np.all(flag_obj.time_array == unique_times)
    assert np.all(flag_obj.freq_array == cal.freq_array[0])
    assert np.all(flag_obj.polarization_array == cal.jones_array)

    for fragment in ('Flag object with type "waterfall"', pyuvdata_version_str):
        assert fragment in flag_obj.history


@uvtest.skipIf_no_h5py
def test_read_write_loop():
    """Write a baseline-type UVFlag to disk, read it back and compare, history included."""
    uvd = UVData()
    uvd.read_miriad(test_d_file)
    written = UVFlag(uvd, label='test')
    written.write(test_outfile, clobber=True)
    reloaded = UVFlag(test_outfile)
    # Append the history text that writing and reading are expected to add.
    io_history = 'Written by ' + pyuvdata_version_str + ' Read by ' + pyuvdata_version_str
    written.history += io_history
    assert written.__eq__(reloaded, check_history=True)


@uvtest.skipIf_no_h5py
def test_read_write_ant():
    """Write a flag-mode UVFlag built from a UVCal, read it back and compare."""
    cal = UVCal()
    cal.read_calfits(test_c_file)
    written = UVFlag(cal, mode='flag', label='test')
    written.write(test_outfile, clobber=True)
    reloaded = UVFlag(test_outfile)
    # Append the history text that writing and reading are expected to add.
    io_history = 'Written by ' + pyuvdata_version_str + ' Read by ' + pyuvdata_version_str
    written.history += io_history
    assert written.__eq__(reloaded, check_history=True)


@uvtest.skipIf_no_h5py
def test_read_write_nocompress():
    """Round-trip a metric-mode UVFlag through a file written with data_compression=None."""
    uvd = UVData()
    uvd.read_miriad(test_d_file)
    written = UVFlag(uvd, label='test')
    written.write(test_outfile, clobber=True, data_compression=None)
    reloaded = UVFlag(test_outfile)
    # Append the history text that writing and reading are expected to add.
    io_history = 'Written by ' + pyuvdata_version_str + ' Read by ' + pyuvdata_version_str
    written.history += io_history
    assert written.__eq__(reloaded, check_history=True)


@uvtest.skipIf_no_h5py
def test_read_write_nocompress_flag():
    """Round-trip a flag-mode UVFlag through a file written with data_compression=None."""
    uvd = UVData()
    uvd.read_miriad(test_d_file)
    written = UVFlag(uvd, mode='flag', label='test')
    written.write(test_outfile, clobber=True, data_compression=None)
    reloaded = UVFlag(test_outfile)
    # Append the history text that writing and reading are expected to add.
    io_history = 'Written by ' + pyuvdata_version_str + ' Read by ' + pyuvdata_version_str
    written.history += io_history
    assert written.__eq__(reloaded, check_history=True)


@uvtest.skipIf_no_h5py
def test_init_list():
    """Initialise a UVFlag from a list mixing a UVData object and a file path."""
    uvd = UVData()
    uvd.read_miriad(test_d_file)
    uvd.time_array -= 1
    combined = UVFlag([uvd, test_f_file])
    first = UVFlag(uvd)
    second = UVFlag(test_f_file)

    # Each of these arrays should be the two inputs stacked along the first axis.
    stacked_attrs = ('metric_array', 'weights_array', 'time_array',
                     'baseline_array', 'ant_1_array', 'ant_2_array')
    for attr in stacked_attrs:
        expected = np.concatenate((getattr(first, attr), getattr(second, attr)), axis=0)
        assert np.array_equal(expected, getattr(combined, attr))

    assert combined.mode == 'metric'
    assert np.all(combined.freq_array == uvd.freq_array[0])
    assert np.all(combined.polarization_array == uvd.polarization_array)


@uvtest.skipIf_no_h5py
def test_read_list():
    """Read a list of two files into an existing UVFlag and check the combination."""
    uvd = UVData()
    uvd.read_miriad(test_d_file)
    uvd.time_array -= 1
    combined = UVFlag(uvd)
    combined.write(test_outfile, clobber=True)
    combined.read([test_outfile, test_f_file])
    first = UVFlag(uvd)
    second = UVFlag(test_f_file)

    # Each of these arrays should be the two inputs stacked along the first axis.
    stacked_attrs = ('metric_array', 'weights_array', 'time_array',
                     'baseline_array', 'ant_1_array', 'ant_2_array')
    for attr in stacked_attrs:
        expected = np.concatenate((getattr(first, attr), getattr(second, attr)), axis=0)
        assert np.array_equal(expected, getattr(combined, attr))

    assert combined.mode == 'metric'
    assert np.all(combined.freq_array == uvd.freq_array[0])
    assert np.all(combined.polarization_array == uvd.polarization_array)


@uvtest.skipIf_no_h5py
def test_read_error():
    """Constructing a UVFlag from the string 'foo' should raise IOError."""
    with pytest.raises(IOError):
        UVFlag('foo')


@uvtest.skipIf_no_h5py
def test_read_change_type():
    """Reading a file of a different type into a UVFlag should swap its attributes.

    Starts from a UVFlag built from a UVCal object (which has ``ant_array``),
    reads test_f_file into it and expects the baseline attributes instead, then
    reads the originally written file back and expects ``ant_array`` again.
    """
    # The previous version also loaded a UVData object that was never used;
    # that dead read has been dropped.
    uvc = UVCal()
    uvc.read_calfits(test_c_file)
    uvf = UVFlag(uvc)
    uvf.write(test_outfile, clobber=True)
    assert hasattr(uvf, 'ant_array')
    uvf.read(test_f_file)
    assert not hasattr(uvf, 'ant_array')
    assert hasattr(uvf, 'baseline_array')
    assert hasattr(uvf, 'ant_1_array')
    assert hasattr(uvf, 'ant_2_array')
    uvf.read(test_outfile)
    assert hasattr(uvf, 'ant_array')
    assert not hasattr(uvf, 'baseline_array')
    assert not hasattr(uvf, 'ant_1_array')
    assert not hasattr(uvf, 'ant_2_array')


@uvtest.skipIf_no_h5py
def test_read_change_mode():
    """Reading a file of a different mode into a UVFlag should swap flag/metric arrays."""
    uvd = UVData()
    uvd.read_miriad(test_d_file)
    flag_obj = UVFlag(uvd, mode='flag')

    def _has_only(present, absent):
        # True when `present` is an attribute of flag_obj and `absent` is not.
        return hasattr(flag_obj, present) and not hasattr(flag_obj, absent)

    assert _has_only('flag_array', 'metric_array')
    flag_obj.write(test_outfile, clobber=True)
    # After reading test_f_file the object is expected to hold metrics instead.
    flag_obj.read(test_f_file)
    assert _has_only('metric_array', 'flag_array')
    # Reading the flag-mode file back should restore the flag array.
    flag_obj.read(test_outfile)
    assert _has_only('flag_array', 'metric_array')


@uvtest.skipIf_no_h5py
def test_write_no_clobber():
    """Writing over an existing file without clobber should raise ValueError."""
    flag_obj = UVFlag(test_f_file)
    with pytest.raises(ValueError):
        flag_obj.write(test_f_file)


def test_lst_from_uv():
    """lst_from_uv on a UVData object should be close to the object's own lst_array."""
    uvd = UVData()
    uvd.read_miriad(test_d_file)
    computed_lsts = lst_from_uv(uvd)
    assert np.allclose(uvd.lst_array, computed_lsts)


def test_lst_from_uv_error():
    """lst_from_uv should raise ValueError when given the integer 4."""
    with pytest.raises(ValueError):
        lst_from_uv(4)


@uvtest.skipIf_no_h5py
def test_add():
    """Adding two UVFlag objects with ``+`` should stack them along the first axis."""
    left = UVFlag(test_f_file)
    right = copy.deepcopy(left)
    right.time_array += 1  # Add a day
    total = left + right

    # These arrays are expected to be the two operands concatenated on axis 0.
    stacked_attrs = ('time_array', 'baseline_array', 'ant_1_array', 'ant_2_array',
                     'lst_array', 'metric_array', 'weights_array')
    for attr in stacked_attrs:
        expected = np.concatenate((getattr(left, attr), getattr(right, attr)), axis=0)
        assert np.array_equal(expected, getattr(total, attr))

    assert np.array_equal(left.freq_array, total.freq_array)
    assert total.type == 'baseline'
    assert total.mode == 'metric'
    assert np.array_equal(left.polarization_array, total.polarization_array)
    expected_note = 'Data combined along time axis with ' + pyuvdata_version_str
    assert expected_note in total.history


@uvtest.skipIf_no_h5py
def test_add_baseline():
    """Adding with axis='baseline' should stack the operands along the first axis."""
    left = UVFlag(test_f_file)
    right = copy.deepcopy(left)
    right.baseline_array += 100  # Arbitrary
    total = left.__add__(right, axis='baseline')

    # These arrays are expected to be the two operands concatenated on axis 0.
    stacked_attrs = ('time_array', 'baseline_array', 'ant_1_array', 'ant_2_array',
                     'lst_array', 'metric_array', 'weights_array')
    for attr in stacked_attrs:
        expected = np.concatenate((getattr(left, attr), getattr(right, attr)), axis=0)
        assert np.array_equal(expected, getattr(total, attr))

    assert np.array_equal(left.freq_array, total.freq_array)
    assert total.type == 'baseline'
    assert total.mode == 'metric'
    assert np.array_equal(left.polarization_array, total.polarization_array)
    expected_note = 'Data combined along baseline axis with ' + pyuvdata_version_str
    assert expected_note in total.history


def test_add_antenna():
    """Adding antenna-type objects with axis='antenna' should stack along the first axis."""
    cal = UVCal()
    cal.read_calfits(test_c_file)
    left = UVFlag(cal)
    right = copy.deepcopy(left)
    right.ant_array += 100  # Arbitrary
    total = left.__add__(right, axis='antenna')

    # These arrays are expected to be the two operands concatenated on axis 0.
    for attr in ('ant_array', 'metric_array', 'weights_array'):
        expected = np.concatenate((getattr(left, attr), getattr(right, attr)), axis=0)
        assert np.array_equal(expected, getattr(total, attr))

    # These arrays are expected to be carried over unchanged.
    for attr in ('freq_array', 'time_array', 'lst_array'):
        assert np.array_equal(getattr(left, attr), getattr(total, attr))

    assert total.type == 'antenna'
    assert total.mode == 'metric'
    assert np.array_equal(left.polarization_array, total.polarization_array)
    expected_note = 'Data combined along antenna axis with ' + pyuvdata_version_str
    assert expected_note in total.history


@uvtest.skipIf_no_h5py
def test_add_frequency():
    """Adding with axis='frequency' should stack the data arrays along axis 2."""
    left = UVFlag(test_f_file)
    right = copy.deepcopy(left)
    right.freq_array += 1e4  # Arbitrary
    total = left.__add__(right, axis='frequency')

    expected_freqs = np.concatenate((left.freq_array, right.freq_array))
    assert np.array_equal(expected_freqs, total.freq_array)

    # These arrays are expected to be carried over unchanged.
    unchanged_attrs = ('time_array', 'baseline_array', 'ant_1_array',
                       'ant_2_array', 'lst_array')
    for attr in unchanged_attrs:
        assert np.array_equal(getattr(left, attr), getattr(total, attr))

    # The data arrays are expected to be concatenated on axis 2.
    for attr in ('metric_array', 'weights_array'):
        expected = np.concatenate((getattr(left, attr), getattr(right, attr)), axis=2)
        assert np.array_equal(expected, getattr(total, attr))

    assert total.type == 'baseline'
    assert total.mode == 'metric'
    assert np.array_equal(left.polarization_array, total.polarization_array)
    expected_note = 'Data combined along frequency axis with ' + pyuvdata_version_str
    assert expected_note in total.history


@uvtest.skipIf_no_h5py
def test_add_pol():
    """Adding with axis='polarization' should stack the data arrays along axis 3."""
    left = UVFlag(test_f_file)
    right = copy.deepcopy(left)
    right.polarization_array += 1  # Arbitrary
    total = left.__add__(right, axis='polarization')

    # These arrays are expected to be carried over unchanged.
    unchanged_attrs = ('freq_array', 'time_array', 'baseline_array',
                       'ant_1_array', 'ant_2_array', 'lst_array')
    for attr in unchanged_attrs:
        assert np.array_equal(getattr(left, attr), getattr(total, attr))

    # The data arrays are expected to be concatenated on axis 3.
    for attr in ('metric_array', 'weights_array'):
        expected = np.concatenate((getattr(left, attr), getattr(right, attr)), axis=3)
        assert np.array_equal(expected, getattr(total, attr))

    assert total.type == 'baseline'
    assert total.mode == 'metric'
    expected_pols = np.concatenate((left.polarization_array, right.polarization_array))
    assert np.array_equal(expected_pols, total.polarization_array)
    expected_note = 'Data combined along polarization axis with ' + pyuvdata_version_str
    assert expected_note in total.history


def test_add_flag():
    """Adding two flag-mode UVFlag objects with ``+`` should stack them along the first axis."""
    uvd = UVData()
    uvd.read_miriad(test_d_file)
    left = UVFlag(uvd, mode='flag')
    right = copy.deepcopy(left)
    right.time_array += 1  # Add a day
    total = left + right

    # These arrays are expected to be the two operands concatenated on axis 0.
    stacked_attrs = ('time_array', 'baseline_array', 'ant_1_array', 'ant_2_array',
                     'lst_array', 'flag_array', 'weights_array')
    for attr in stacked_attrs:
        expected = np.concatenate((getattr(left, attr), getattr(right, attr)), axis=0)
        assert np.array_equal(expected, getattr(total, attr))

    assert np.array_equal(left.freq_array, total.freq_array)
    assert total.type == 'baseline'
    assert total.mode == 'flag'
    assert np.array_equal(left.polarization_array, total.polarization_array)
    expected_note = 'Data combined along time axis with ' + pyuvdata_version_str
    assert expected_note in total.history


def test_add_errors():
    """Each unsupported combination passed to __add__ should raise ValueError."""
    uvd = UVData()
    uvd.read_miriad(test_d_file)
    cal = UVCal()
    cal.read_calfits(test_c_file)
    baseline_metric = UVFlag(uvd)

    # Mismatched classes
    with pytest.raises(ValueError):
        baseline_metric.__add__(3)

    # Mismatched types
    antenna_metric = UVFlag(cal)
    with pytest.raises(ValueError):
        baseline_metric.__add__(antenna_metric)

    # Mismatched modes
    baseline_flag = UVFlag(uvd, mode='flag')
    with pytest.raises(ValueError):
        baseline_metric.__add__(baseline_flag)

    # Invalid axes
    with pytest.raises(ValueError):
        baseline_metric.__add__(baseline_metric, axis='antenna')
    with pytest.raises(ValueError):
        antenna_metric.__add__(antenna_metric, axis='baseline')


@uvtest.skipIf_no_h5py
def test_inplace_add():
    """``+=`` should give the same object as ``+`` on an untouched copy."""
    in_place = UVFlag(test_f_file)
    untouched = copy.deepcopy(in_place)
    shifted = copy.deepcopy(in_place)
    shifted.time_array += 1
    in_place += shifted
    out_of_place = untouched + shifted
    assert in_place.__eq__(out_of_place)


@uvtest.skipIf_no_h5py
def test_clear_unused_attributes():
    """clear_unused_attributes should drop attributes not matching the type and mode."""
    baseline_only = ('baseline_array', 'ant_1_array', 'ant_2_array', 'Nants_telescope')

    flag_obj = UVFlag(test_f_file)
    for attr in baseline_only:
        assert hasattr(flag_obj, attr)

    # Switching the type to 'antenna' should make the baseline attributes removable.
    flag_obj.type = 'antenna'
    flag_obj.clear_unused_attributes()
    for attr in baseline_only:
        assert not hasattr(flag_obj, attr)

    # Changing the mode alone leaves metric_array in place until the next clear.
    flag_obj.mode = 'flag'
    assert hasattr(flag_obj, 'metric_array')
    flag_obj.clear_unused_attributes()
    assert not hasattr(flag_obj, 'metric_array')

    # Start over
    flag_obj = UVFlag(test_f_file)
    flag_obj.ant_array = np.array([4])
    flag_obj.flag_array = np.array([5])
    flag_obj.clear_unused_attributes()
    for attr in ('ant_array', 'flag_array'):
        assert not hasattr(flag_obj, attr)


@uvtest.skipIf_no_h5py
def test_not_equal():
    """__eq__ should return a falsy value for each kind of difference tried here."""
    reference = UVFlag(test_f_file)

    # different class
    assert not reference.__eq__(5)

    # different mode
    altered = reference.copy()
    altered.mode = 'flag'
    assert not reference.__eq__(altered)

    # different type
    altered = reference.copy()
    altered.type = 'antenna'
    assert not reference.__eq__(altered)

    # array different
    altered = reference.copy()
    altered.freq_array += 1
    assert not reference.__eq__(altered)

    # history different
    altered = reference.copy()
    altered.history += 'hello'
    assert not reference.__eq__(altered, check_history=True)


@uvtest.skipIf_no_h5py
def test_to_waterfall_bl():
    """Convert a UVFlag read from test_f_file to a waterfall and check the array shapes."""
    flag_obj = UVFlag(test_f_file)
    flag_obj.weights_array = np.ones_like(flag_obj.weights_array)
    flag_obj.to_waterfall()
    assert flag_obj.type == 'waterfall'
    expected_shape = (len(flag_obj.time_array), len(flag_obj.freq_array),
                      len(flag_obj.polarization_array))
    assert flag_obj.metric_array.shape == expected_shape
    assert flag_obj.weights_array.shape == flag_obj.metric_array.shape


@uvtest.skipIf_no_h5py
def test_to_waterfall_bl_multi_pol():
    """Convert a two-polarization UVFlag to a waterfall, with and without keep_pol.

    With the default arguments both polarizations are expected to survive; with
    ``keep_pol=False`` a single entry is expected whose value is the
    comma-joined list of the original polarizations.
    """
    uvf = UVFlag(test_f_file)
    uvf.weights_array = np.ones_like(uvf.weights_array)
    uvf2 = uvf.copy()
    uvf2.polarization_array[0] = -4
    uvf.__add__(uvf2, inplace=True, axis='pol')  # Concatenate to form multi-pol object
    uvf2 = uvf.copy()  # Keep a copy to run with keep_pol=False
    uvf.to_waterfall()
    assert uvf.type == 'waterfall'
    assert uvf.metric_array.shape == (len(uvf.time_array), len(uvf.freq_array),
                                      len(uvf.polarization_array))
    assert uvf.weights_array.shape == uvf.metric_array.shape
    assert len(uvf.polarization_array) == 2
    # Repeat with keep_pol=False
    uvf2.to_waterfall(keep_pol=False)
    assert uvf2.type == 'waterfall'
    assert uvf2.metric_array.shape == (len(uvf2.time_array), len(uvf.freq_array), 1)
    assert uvf2.weights_array.shape == uvf2.metric_array.shape
    assert len(uvf2.polarization_array) == 1
    # np.bytes_ is the same type as the old np.string_ alias, which was
    # removed in NumPy 2.0.
    expected_pol = np.bytes_(','.join(map(str, uvf.polarization_array)))
    assert uvf2.polarization_array[0] == expected_pol


@uvtest.skipIf_no_h5py
def test_collapse_pol():
    """Collapse a two-polarization metric object and round-trip the result through a file.

    After ``collapse_pol`` a single polarization entry is expected, holding the
    comma-joined list of the original polarizations, with the object still in
    metric mode.
    """
    uvf = UVFlag(test_f_file)
    uvf.weights_array = np.ones_like(uvf.weights_array)
    uvf2 = uvf.copy()
    uvf2.polarization_array[0] = -4
    uvf.__add__(uvf2, inplace=True, axis='pol')  # Concatenate to form multi-pol object
    uvf2 = uvf.copy()
    uvf2.collapse_pol()
    assert len(uvf2.polarization_array) == 1
    # np.bytes_ is the same type as the old np.string_ alias, which was
    # removed in NumPy 2.0.
    expected_pol = np.bytes_(','.join(map(str, uvf.polarization_array)))
    assert uvf2.polarization_array[0] == expected_pol
    assert uvf2.mode == 'metric'
    assert hasattr(uvf2, 'metric_array')
    assert not hasattr(uvf2, 'flag_array')

    # test writing it out and reading in to make sure polarization_array has correct type
    try:
        uvf2.write(test_outfile, clobber=True)
        uvf = UVFlag(test_outfile)
        assert uvf == uvf2
    finally:
        # Remove the scratch file even when the round-trip check fails.
        if os.path.exists(test_outfile):
            os.remove(test_outfile)


@uvtest.skipIf_no_h5py
def test_collapse_pol_or():
    """Collapse a two-polarization flag-mode object with method='or'.

    A single polarization entry is expected, holding the comma-joined list of
    the original polarizations, with the object remaining in flag mode.
    """
    uvf = UVFlag(test_f_file)
    uvf.to_flag()
    uvf.weights_array = np.ones_like(uvf.weights_array)
    uvf2 = uvf.copy()
    uvf2.polarization_array[0] = -4
    uvf.__add__(uvf2, inplace=True, axis='pol')  # Concatenate to form multi-pol object
    uvf2 = uvf.copy()
    uvf2.collapse_pol(method='or')
    assert len(uvf2.polarization_array) == 1
    # np.bytes_ is the same type as the old np.string_ alias, which was
    # removed in NumPy 2.0.
    expected_pol = np.bytes_(','.join(map(str, uvf.polarization_array)))
    assert uvf2.polarization_array[0] == expected_pol
    assert uvf2.mode == 'flag'
    assert hasattr(uvf2, 'flag_array')
    assert not hasattr(uvf2, 'metric_array')


@uvtest.skipIf_no_h5py
def test_collapse_single_pol():
    """collapse_pol on this object should warn once and leave it equal to a prior copy."""
    flag_obj = UVFlag(test_f_file)
    flag_obj.weights_array = np.ones_like(flag_obj.weights_array)
    snapshot = flag_obj.copy()
    uvtest.checkWarnings(flag_obj.collapse_pol, [], {},
                         nwarnings=1, message='Cannot collapse polarization')
    assert flag_obj == snapshot


@uvtest.skipIf_no_h5py
def test_collapse_pol_flag():
    """Collapse a two-polarization flag-mode object with the default method.

    A single polarization entry is expected, holding the comma-joined list of
    the original polarizations, and the object is expected to end up in
    metric mode.
    """
    uvf = UVFlag(test_f_file)
    uvf.to_flag()
    uvf.weights_array = np.ones_like(uvf.weights_array)
    uvf2 = uvf.copy()
    uvf2.polarization_array[0] = -4
    uvf.__add__(uvf2, inplace=True, axis='pol')  # Concatenate to form multi-pol object
    uvf2 = uvf.copy()
    uvf2.collapse_pol()
    assert len(uvf2.polarization_array) == 1
    # np.bytes_ is the same type as the old np.string_ alias, which was
    # removed in NumPy 2.0.
    expected_pol = np.bytes_(','.join(map(str, uvf.polarization_array)))
    assert uvf2.polarization_array[0] == expected_pol
    assert uvf2.mode == 'metric'
    assert hasattr(uvf2, 'metric_array')
    assert not hasattr(uvf2, 'flag_array')


@uvtest.skipIf_no_h5py
def test_to_waterfall_bl_flags():
    """Convert a flag-mode object to a waterfall with the default method."""
    flag_obj = UVFlag(test_f_file)
    flag_obj.to_flag()
    flag_obj.weights_array = np.ones_like(flag_obj.weights_array)
    flag_obj.to_waterfall()
    assert flag_obj.type == 'waterfall'
    # The default conversion is expected to leave the object in metric mode.
    assert flag_obj.mode == 'metric'
    expected_shape = (len(flag_obj.time_array), len(flag_obj.freq_array),
                      len(flag_obj.polarization_array))
    assert flag_obj.metric_array.shape == expected_shape
    assert flag_obj.weights_array.shape == flag_obj.metric_array.shape
    assert len(flag_obj.lst_array) == len(flag_obj.time_array)


@uvtest.skipIf_no_h5py
def test_to_waterfall_bl_flags_or():
    """Convert a flag-mode object to a waterfall with method='or'.

    The conversion is run twice: once with all-ones weights, and once with a
    single weight changed to 0.2, for which one warning is expected. In both
    cases the result should be a flag-mode waterfall with all-ones weights.
    """
    uvf = UVFlag(test_f_file)
    uvf.to_flag()
    uvf.weights_array = np.ones_like(uvf.weights_array)
    uvf.to_waterfall(method='or')
    assert uvf.type == 'waterfall'
    assert uvf.mode == 'flag'
    assert uvf.flag_array.shape == (len(uvf.time_array), len(uvf.freq_array),
                                    len(uvf.polarization_array))
    # Builtin float replaces the np.float alias, which was removed in NumPy 1.24.
    assert np.array_equal(uvf.weights_array, np.ones_like(uvf.flag_array, float))
    assert len(uvf.lst_array) == len(uvf.time_array)
    uvf = UVFlag(test_f_file)
    uvf.to_flag()
    uvf.weights_array = np.ones_like(uvf.weights_array)
    uvf.weights_array[0, 0, 0, 0] = 0.2
    uvtest.checkWarnings(uvf.to_waterfall, [], {'method': 'or'}, nwarnings=1,
                         message='Currently weights are')
    assert uvf.type == 'waterfall'
    assert uvf.mode == 'flag'
    assert uvf.flag_array.shape == (len(uvf.time_array), len(uvf.freq_array),
                                    len(uvf.polarization_array))
    assert np.array_equal(uvf.weights_array, np.ones_like(uvf.flag_array, float))
    assert len(uvf.lst_array) == len(uvf.time_array)


def test_to_waterfall_ant():
    """Convert a UVFlag built from a UVCal object to a waterfall and check shapes."""
    cal = UVCal()
    cal.read_calfits(test_c_file)
    flag_obj = UVFlag(cal)
    flag_obj.weights_array = np.ones_like(flag_obj.weights_array)
    flag_obj.to_waterfall()
    assert flag_obj.type == 'waterfall'
    expected_shape = (len(flag_obj.time_array), len(flag_obj.freq_array),
                      len(flag_obj.polarization_array))
    assert flag_obj.metric_array.shape == expected_shape
    assert flag_obj.weights_array.shape == flag_obj.metric_array.shape
    assert len(flag_obj.lst_array) == len(flag_obj.time_array)


@uvtest.skipIf_no_h5py
def test_to_waterfall_waterfall():
    """Calling to_waterfall a second time should issue exactly one warning."""
    flag_obj = UVFlag(test_f_file)
    flag_obj.weights_array = np.ones_like(flag_obj.weights_array)
    flag_obj.to_waterfall()
    uvtest.checkWarnings(flag_obj.to_waterfall, [], {},
                         nwarnings=1, message='This object is already a waterfall')


def test_to_baseline_flags():
    """Flag two waterfall samples, convert back to baseline type and check the flags."""
    uvd = UVData()
    uvd.read_miriad(test_d_file)
    flag_obj = UVFlag(uvd)
    flag_obj.to_waterfall()
    flag_obj.to_flag()
    flag_obj.flag_array[0, 10, 0] = True  # Flag time0, chan10
    flag_obj.flag_array[1, 15, 0] = True  # Flag time1, chan15
    flag_obj.to_baseline(uvd)
    assert flag_obj.type == 'baseline'
    assert np.all(flag_obj.baseline_array == uvd.baseline_array)
    assert np.all(flag_obj.time_array == uvd.time_array)

    unique_times = np.unique(flag_obj.time_array)
    # Rows belonging to the first time should all be flagged in channel 10.
    rows_t0 = np.nonzero(flag_obj.time_array == unique_times[0])[0]
    assert np.all(flag_obj.flag_array[rows_t0, 0, 10, 0])
    # Rows belonging to the second time should all be flagged in channel 15.
    rows_t1 = np.nonzero(flag_obj.time_array == unique_times[1])[0]
    assert np.all(flag_obj.flag_array[rows_t1, 0, 15, 0])
    # No other samples should be flagged, so the mean fixes the total count.
    n_flagged = 0.0 + len(rows_t0) + len(rows_t1)
    assert flag_obj.flag_array.mean() == n_flagged / flag_obj.flag_array.size


def test_to_baseline_metric():
    """Set two waterfall metric values, convert to baseline type and check them."""
    uvd = UVData()
    uvd.read_miriad(test_d_file)
    flag_obj = UVFlag(uvd)
    flag_obj.to_waterfall()
    flag_obj.metric_array[0, 10, 0] = 3.2  # Fill in time0, chan10
    flag_obj.metric_array[1, 15, 0] = 2.1  # Fill in time1, chan15
    flag_obj.to_baseline(uvd)
    assert np.all(flag_obj.baseline_array == uvd.baseline_array)
    assert np.all(flag_obj.time_array == uvd.time_array)

    unique_times = np.unique(flag_obj.time_array)
    # Rows belonging to the first time should all carry 3.2 in channel 10.
    rows_t0 = np.nonzero(flag_obj.time_array == unique_times[0])[0]
    assert np.all(flag_obj.metric_array[rows_t0, 0, 10, 0] == 3.2)
    # Rows belonging to the second time should all carry 2.1 in channel 15.
    rows_t1 = np.nonzero(flag_obj.time_array == unique_times[1])[0]
    assert np.all(flag_obj.metric_array[rows_t1, 0, 15, 0] == 2.1)
    # Everything else should be zero, so the mean is fixed by those two groups.
    expected_mean = (3.2 * len(rows_t0) + 2.1 * len(rows_t1)) / flag_obj.metric_array.size
    assert np.isclose(flag_obj.metric_array.mean(), expected_mean)


def test_baseline_to_baseline():
    """to_baseline on an object that is already baseline type should leave it equal to a copy."""
    uvd = UVData()
    uvd.read_miriad(test_d_file)
    flag_obj = UVFlag(uvd)
    snapshot = flag_obj.copy()
    flag_obj.to_baseline(uvd)
    assert flag_obj == snapshot


@uvtest.skipIf_no_h5py
def test_to_baseline_errors():
    """Each invalid use of to_baseline tried here should raise ValueError."""
    cal = UVCal()
    cal.read_calfits(test_c_file)
    uvd = UVData()
    uvd.read_miriad(test_d_file)

    flag_obj = UVFlag(test_f_file)
    flag_obj.to_waterfall()
    with pytest.raises(ValueError):
        flag_obj.to_baseline(7.3)  # invalid matching object

    flag_obj = UVFlag(cal)
    with pytest.raises(ValueError):
        flag_obj.to_baseline(uvd)  # Cannot pass in antenna type

    flag_obj = UVFlag(test_f_file)
    flag_obj.to_waterfall()
    original_pols = flag_obj.copy()
    flag_obj.polarization_array[0] = -4
    with pytest.raises(ValueError):
        flag_obj.to_baseline(uvd)  # Mismatched pols

    flag_obj.__iadd__(original_pols, axis='polarization')
    with pytest.raises(ValueError):
        flag_obj.to_baseline(uvd)  # Mismatched pols, can't be forced


def test_to_baseline_force_pol():
    """Convert waterfall flags to baseline type with ``force_pol=True``.

    The UVFlag polarization is changed so it no longer matches the UVData
    object; the test checks that the conversion still goes through and that
    the resulting polarization array equals the UVData one.
    """
    uvd = UVData()
    uvd.read_miriad(test_d_file)
    uvf = UVFlag(uvd)
    uvf.to_waterfall()
    uvf.to_flag()

    flagged_cells = ((0, 10), (1, 15))  # (time index, channel) pairs
    for t_idx, chan in flagged_cells:
        uvf.flag_array[t_idx, chan, 0] = True
    uvf.polarization_array[0] = -4  # Change pol, but force pol anyway
    uvf.to_baseline(uvd, force_pol=True)

    assert np.all(uvf.baseline_array == uvd.baseline_array)
    assert np.all(uvf.time_array == uvd.time_array)
    assert np.array_equal(uvf.polarization_array, uvd.polarization_array)

    unique_times = np.unique(uvf.time_array)
    n_flagged = 0.0
    for t_idx, chan in flagged_cells:
        rows = np.where(uvf.time_array == unique_times[t_idx])[0]
        n_flagged += len(rows)
        assert np.all(uvf.flag_array[rows, 0, chan, 0])
    assert uvf.flag_array.mean() == n_flagged / uvf.flag_array.size


def test_to_baseline_metric_force_pol():
    """Convert waterfall metrics to baseline type with ``force_pol=True``.

    Same scenario as the flag variant, but in metric mode: the polarization
    is deliberately altered before the conversion and the metric values are
    checked afterwards.
    """
    uvd = UVData()
    uvd.read_miriad(test_d_file)
    uvf = UVFlag(uvd)
    uvf.to_waterfall()
    uvf.metric_array[0, 10, 0] = 3.2  # time0, chan10
    uvf.metric_array[1, 15, 0] = 2.1  # time1, chan15
    uvf.polarization_array[0] = -4
    uvf.to_baseline(uvd, force_pol=True)

    assert np.all(uvf.baseline_array == uvd.baseline_array)
    assert np.all(uvf.time_array == uvd.time_array)
    assert np.array_equal(uvf.polarization_array, uvd.polarization_array)

    unique_times = np.unique(uvf.time_array)
    rows_t0 = np.where(uvf.time_array == unique_times[0])[0]
    assert np.all(uvf.metric_array[rows_t0, 0, 10, 0] == 3.2)
    rows_t1 = np.where(uvf.time_array == unique_times[1])[0]
    assert np.all(uvf.metric_array[rows_t1, 0, 15, 0] == 2.1)

    expected_mean = ((3.2 * len(rows_t0) + 2.1 * len(rows_t1))
                     / uvf.metric_array.size)
    assert np.isclose(uvf.metric_array.mean(), expected_mean)


def test_to_antenna_flags():
    """Convert waterfall flags to antenna type using a UVCal object."""
    cal = UVCal()
    cal.read_calfits(test_c_file)
    uvf = UVFlag(cal)
    uvf.to_waterfall()
    uvf.to_flag()
    uvf.flag_array[0, 10, 0] = True  # Flag time0, chan10
    uvf.flag_array[1, 15, 0] = True  # Flag time1, chan15
    uvf.to_antenna(cal)

    assert uvf.type == 'antenna'
    assert np.all(uvf.ant_array == cal.ant_array)
    assert np.all(uvf.time_array == cal.time_array)
    assert np.all(uvf.flag_array[:, 0, 10, 0, 0])
    assert np.all(uvf.flag_array[:, 0, 15, 1, 0])
    # Two flagged waterfall cells, each expected once per antenna.
    expected_fraction = 2. * cal.Nants_data / uvf.flag_array.size
    assert uvf.flag_array.mean() == expected_fraction


def test_to_antenna_metric():
    """Convert waterfall metrics to antenna type using a UVCal object."""
    cal = UVCal()
    cal.read_calfits(test_c_file)
    uvf = UVFlag(cal)
    uvf.to_waterfall()
    uvf.metric_array[0, 10, 0] = 3.2  # Fill in time0, chan10
    uvf.metric_array[1, 15, 0] = 2.1  # Fill in time1, chan15
    uvf.to_antenna(cal)

    assert np.all(uvf.ant_array == cal.ant_array)
    assert np.all(uvf.time_array == cal.time_array)
    assert np.all(uvf.metric_array[:, 0, 10, 0, 0] == 3.2)
    assert np.all(uvf.metric_array[:, 0, 15, 1, 0] == 2.1)
    expected_mean = (3.2 + 2.1) * cal.Nants_data / uvf.metric_array.size
    assert np.isclose(uvf.metric_array.mean(), expected_mean)


def test_to_antenna_flags_match_uvflag():
    """Convert waterfall flags to antenna type, matching against another UVFlag.

    The matching object is a copy of the UVFlag taken before it was collapsed
    to a waterfall, rather than the UVCal object itself.
    """
    cal = UVCal()
    cal.read_calfits(test_c_file)
    uvf = UVFlag(cal)
    template = uvf.copy()
    uvf.to_waterfall()
    uvf.to_flag()
    uvf.flag_array[0, 10, 0] = True  # Flag time0, chan10
    uvf.flag_array[1, 15, 0] = True  # Flag time1, chan15
    uvf.to_antenna(template)

    assert np.all(uvf.ant_array == cal.ant_array)
    assert np.all(uvf.time_array == cal.time_array)
    assert np.all(uvf.flag_array[:, 0, 10, 0, 0])
    assert np.all(uvf.flag_array[:, 0, 15, 1, 0])
    expected_fraction = 2. * cal.Nants_data / uvf.flag_array.size
    assert uvf.flag_array.mean() == expected_fraction


def test_antenna_to_antenna():
    """``to_antenna`` on an object built directly from a UVCal leaves it equal to its copy."""
    cal = UVCal()
    cal.read_calfits(test_c_file)
    uvf = UVFlag(cal)
    snapshot = uvf.copy()
    uvf.to_antenna(cal)
    assert uvf == snapshot


@uvtest.skipIf_no_h5py
def test_to_antenna_errors():
    """Exercise the inputs for which ``to_antenna`` must raise ValueError."""
    uvc = UVCal()
    uvc.read_calfits(test_c_file)
    uv = UVData()
    uv.read_miriad(test_d_file)

    # Invalid matching object
    uvf = UVFlag(test_f_file)
    uvf.to_waterfall()
    with pytest.raises(ValueError):
        uvf.to_antenna(7.3)

    # Cannot pass in baseline type
    uvf = UVFlag(uv)
    with pytest.raises(ValueError):
        uvf.to_antenna(uvc)

    # Mismatched pols
    uvf = UVFlag(test_f_file)
    uvf.to_waterfall()
    uvf2 = uvf.copy()
    uvf.polarization_array[0] = -4
    with pytest.raises(ValueError):
        uvf.to_antenna(uvc)

    # Mismatched pols, can't be forced
    uvf.__iadd__(uvf2, axis='polarization')
    with pytest.raises(ValueError):
        uvf.to_antenna(uvc)


def test_to_antenna_force_pol():
    """Convert waterfall flags to antenna type with ``force_pol=True``.

    The UVCal object is first reduced with ``select(jones=-5)``; the UVFlag
    polarization is then changed so it disagrees, and the test checks that
    the forced conversion ends with the UVCal jones array.
    """
    cal = UVCal()
    cal.read_calfits(test_c_file)
    cal.select(jones=-5)
    uvf = UVFlag(cal)
    uvf.to_waterfall()
    uvf.to_flag()
    uvf.flag_array[0, 10, 0] = True  # Flag time0, chan10
    uvf.flag_array[1, 15, 0] = True  # Flag time1, chan15
    uvf.polarization_array[0] = -4  # Change pol, but force pol anyway
    uvf.to_antenna(cal, force_pol=True)

    assert np.all(uvf.ant_array == cal.ant_array)
    assert np.all(uvf.time_array == cal.time_array)
    assert np.array_equal(uvf.polarization_array, cal.jones_array)
    assert np.all(uvf.flag_array[:, 0, 10, 0, 0])
    assert np.all(uvf.flag_array[:, 0, 15, 1, 0])
    expected_fraction = 2 * cal.Nants_data / uvf.flag_array.size
    assert uvf.flag_array.mean() == expected_fraction


def test_to_antenna_metric_force_pol():
    """Convert waterfall metrics to antenna type with ``force_pol=True``."""
    cal = UVCal()
    cal.read_calfits(test_c_file)
    cal.select(jones=-5)
    uvf = UVFlag(cal)
    uvf.to_waterfall()
    uvf.metric_array[0, 10, 0] = 3.2  # Fill in time0, chan10
    uvf.metric_array[1, 15, 0] = 2.1  # Fill in time1, chan15
    uvf.polarization_array[0] = -4  # deliberately mismatched with the UVCal object
    uvf.to_antenna(cal, force_pol=True)

    assert np.all(uvf.ant_array == cal.ant_array)
    assert np.all(uvf.time_array == cal.time_array)
    assert np.array_equal(uvf.polarization_array, cal.jones_array)
    assert np.all(uvf.metric_array[:, 0, 10, 0, 0] == 3.2)
    assert np.all(uvf.metric_array[:, 0, 15, 1, 0] == 2.1)
    expected_mean = (3.2 + 2.1) * cal.Nants_data / uvf.metric_array.size
    assert np.isclose(uvf.metric_array.mean(), expected_mean)


@uvtest.skipIf_no_h5py
def test_copy():
    """``copy`` returns an equal object that is independent of the original."""
    original = UVFlag(test_f_file)
    duplicate = original.copy()
    assert original == duplicate
    # Mutating the original must not carry over to the copy.
    original.to_waterfall()
    assert original != duplicate


@uvtest.skipIf_no_h5py
def test_or():
    """``|`` on two flag-mode objects yields the element-wise OR of their flags."""
    lhs = UVFlag(test_f_file)
    lhs.to_flag()
    rhs = lhs.copy()
    rhs.flag_array = np.ones_like(rhs.flag_array)
    lhs.flag_array[0] = True
    # Row 0 is flagged only on the left, row 1 on neither side,
    # and the remaining rows only on the right.
    rhs.flag_array[0] = False
    rhs.flag_array[1] = False
    combined = lhs | rhs
    assert np.all(combined.flag_array[0])
    assert not np.any(combined.flag_array[1])
    assert np.all(combined.flag_array[2:])


@uvtest.skipIf_no_h5py
def test_or_error():
    """OR-ing a flag-mode object with one that was not converted raises ValueError."""
    uvf = UVFlag(test_f_file)
    unconverted = uvf.copy()
    uvf.to_flag()
    with pytest.raises(ValueError):
        uvf.__or__(unconverted)


@uvtest.skipIf_no_h5py
def test_or_add_history():
    """The result of ``|`` carries both input histories plus an OR marker."""
    lhs = UVFlag(test_f_file)
    lhs.to_flag()
    rhs = lhs.copy()
    rhs.history = 'Different history'
    combined = lhs | rhs
    for fragment in (lhs.history, rhs.history, "Flags OR'd with:"):
        assert fragment in combined.history


@uvtest.skipIf_no_h5py
def test_ior():
    """``|=`` ORs the other object's flags into the left-hand object."""
    target = UVFlag(test_f_file)
    target.to_flag()
    other = target.copy()
    other.flag_array = np.ones_like(other.flag_array)
    target.flag_array[0] = True
    # Row 0 is flagged only in ``target``, row 1 in neither,
    # and the remaining rows only in ``other``.
    other.flag_array[0] = False
    other.flag_array[1] = False
    target |= other
    assert np.all(target.flag_array[0])
    assert not np.any(target.flag_array[1])
    assert np.all(target.flag_array[2:])


@uvtest.skipIf_no_h5py
def test_to_flag():
    """``to_flag`` swaps ``metric_array`` for ``flag_array`` and records it in history."""
    uvf = UVFlag(test_f_file)
    uvf.to_flag()
    has_flags = hasattr(uvf, 'flag_array')
    has_metrics = hasattr(uvf, 'metric_array')
    assert has_flags
    assert not has_metrics
    assert uvf.mode == 'flag'
    assert 'Converted to mode "flag"' in uvf.history


@uvtest.skipIf_no_h5py
def test_to_flag_threshold():
    """``to_flag(threshold=...)`` flags only the sample whose metric exceeds it."""
    uvf = UVFlag(test_f_file)
    # Zero everywhere except one sample set above the threshold of 1.
    uvf.metric_array = np.zeros_like(uvf.metric_array)
    uvf.metric_array[0, 0, 4, 0] = 2.
    uvf.to_flag(threshold=1.)

    has_flags = hasattr(uvf, 'flag_array')
    has_metrics = hasattr(uvf, 'metric_array')
    assert has_flags
    assert not has_metrics
    assert uvf.mode == 'flag'
    assert uvf.flag_array[0, 0, 4, 0]
    assert np.sum(uvf.flag_array) == 1.
    assert 'Converted to mode "flag"' in uvf.history


@uvtest.skipIf_no_h5py
def test_flag_to_flag():
    """Calling ``to_flag`` on an object already in flag mode leaves it equal to the original."""
    reference = UVFlag(test_f_file)
    reference.to_flag()
    converted_twice = reference.copy()
    converted_twice.to_flag()
    assert reference == converted_twice


@uvtest.skipIf_no_h5py
def test_to_flag_unknown_mode():
    """``to_flag`` raises ValueError when ``mode`` holds an unrecognized value."""
    uvf = UVFlag(test_f_file)
    uvf.mode = 'foo'
    with pytest.raises(ValueError):
        uvf.to_flag()


@uvtest.skipIf_no_h5py
def test_to_metric_baseline():
    """``to_metric(convert_wgts=True)`` zeroes the weights of flagged samples.

    Runs on the file-backed UVFlag after a round trip through flag mode, and
    also checks the attribute/mode bookkeeping on both sides of the
    conversion.
    """
    uvf = UVFlag(test_f_file)
    uvf.to_flag()
    uvf.flag_array[:, :, 10] = True
    uvf.flag_array[1, :, :] = True

    # State before converting back to metric mode.
    assert hasattr(uvf, 'flag_array')
    assert not hasattr(uvf, 'metric_array')
    assert uvf.mode == 'flag'

    uvf.to_metric(convert_wgts=True)

    # State after the conversion.
    assert hasattr(uvf, 'metric_array')
    assert not hasattr(uvf, 'flag_array')
    assert uvf.mode == 'metric'
    assert 'Converted to mode "metric"' in uvf.history
    flagged_row_weights = uvf.weights_array[1]
    flagged_chan_weights = uvf.weights_array[:, :, 10]
    assert np.isclose(flagged_row_weights, 0.0).all()
    assert np.isclose(flagged_chan_weights, 0.0).all()


@uvtest.skipIf_no_h5py
def test_to_metric_waterfall():
    """``to_metric(convert_wgts=True)`` zeroes flagged weights for a waterfall object."""
    uvf = UVFlag(test_f_file)
    uvf.to_waterfall()
    uvf.to_flag()
    uvf.flag_array[:, 10] = True
    uvf.flag_array[1, :, :] = True
    uvf.to_metric(convert_wgts=True)
    flagged_row_weights = uvf.weights_array[1]
    flagged_chan_weights = uvf.weights_array[:, 10]
    assert np.isclose(flagged_row_weights, 0.0).all()
    assert np.isclose(flagged_chan_weights, 0.0).all()


def test_to_metric_antenna():
    """``to_metric(convert_wgts=True)`` zeroes flagged weights for a UVCal-derived object."""
    cal = UVCal()
    cal.read_calfits(test_c_file)
    uvf = UVFlag(cal, mode='flag')
    uvf.flag_array[10, :, :, 1, :] = True
    uvf.flag_array[15, :, 3, :, :] = True
    uvf.to_metric(convert_wgts=True)
    # The same two slices that were flagged must now have zero weight.
    first_slice = uvf.weights_array[10, :, :, 1, :]
    second_slice = uvf.weights_array[15, :, 3, :, :]
    assert np.isclose(first_slice, 0.0).all()
    assert np.isclose(second_slice, 0.0).all()


@uvtest.skipIf_no_h5py
def test_metric_to_metric():
    """Calling ``to_metric`` on the file-backed object leaves it equal to its copy."""
    uvf = UVFlag(test_f_file)
    snapshot = uvf.copy()
    uvf.to_metric()
    assert uvf == snapshot


@uvtest.skipIf_no_h5py
def test_to_metric_unknown_mode():
    """``to_metric`` raises ValueError when ``mode`` holds an unrecognized value."""
    uvf = UVFlag(test_f_file)
    uvf.mode = 'foo'
    with pytest.raises(ValueError):
        uvf.to_metric()


@uvtest.skipIf_no_h5py
def test_antpair2ind():
    """Indices returned by ``antpair2ind`` all select the requested antenna pair."""
    uvf = UVFlag(test_f_file)
    ant1 = uvf.ant_1_array[0]
    ant2 = uvf.ant_2_array[0]
    selection = uvf.antpair2ind(ant1, ant2)
    assert np.all(uvf.ant_1_array[selection] == ant1)
    assert np.all(uvf.ant_2_array[selection] == ant2)


@uvtest.skipIf_no_h5py
def test_antpair2ind_nonbaseline():
    """``antpair2ind`` raises ValueError once the object has been collapsed to a waterfall."""
    uvf = UVFlag(test_f_file)
    uvf.to_waterfall()
    with pytest.raises(ValueError):
        uvf.antpair2ind(0, 3)


@uvtest.skipIf_no_h5py
def test_baseline_to_antnums():
    """``baseline_to_antnums`` on the first baseline returns the first antenna pair."""
    uvf = UVFlag(test_f_file)
    first_baseline = uvf.baseline_array[0]
    ant1, ant2 = uvf.baseline_to_antnums(first_baseline)
    assert ant1 == uvf.ant_1_array[0]
    assert ant2 == uvf.ant_2_array[0]


@uvtest.skipIf_no_h5py
def test_get_baseline_nums():
    """``get_baseline_nums`` matches ``np.unique`` of ``baseline_array``."""
    uvf = UVFlag(test_f_file)
    expected = np.unique(uvf.baseline_array)
    assert np.array_equal(uvf.get_baseline_nums(), expected)


@uvtest.skipIf_no_h5py
def test_get_antpairs():
    """``get_antpairs`` returns exactly the antenna pairs present in the data.

    Checked in both directions: every returned pair occurs in the antenna
    arrays, and every pair in the antenna arrays is among those returned.
    """
    uvf = UVFlag(test_f_file)
    antpairs = uvf.get_antpairs()
    for ant1, ant2 in antpairs:
        matches = (uvf.ant_1_array == ant1) & (uvf.ant_2_array == ant2)
        assert np.count_nonzero(matches) > 0
    for pair in zip(uvf.ant_1_array, uvf.ant_2_array):
        assert pair in antpairs


@uvtest.skipIf_no_h5py
def test_missing_Nants_telescope():
    """Read a file whose header lacks ``Nants_telescope``.

    A copy of the reference file has the ``/Header/Nants_telescope`` dataset
    removed. Reading it must emit exactly one warning, and the result must
    equal the reference object with ``Nants_telescope`` set to None.
    """
    import h5py

    testfile = os.path.join(DATA_PATH, 'test_missing_Nants.h5')
    shutil.copyfile(test_f_file, testfile)
    # The scratch copy lives in the data directory, so remove it even when a
    # read or assertion below fails; otherwise it would be left behind.
    try:
        with h5py.File(testfile, 'r+') as f:
            del f['/Header/Nants_telescope']
        uvf = uvtest.checkWarnings(UVFlag, [testfile], {}, nwarnings=1,
                                   message='Nants_telescope not availabe')
        uvf2 = UVFlag(test_f_file)
        uvf2.Nants_telescope = None
        assert uvf == uvf2
    finally:
        os.remove(testfile)


def test_combine_metrics_inplace():
    """``combine_metrics`` in place with x2 and x3 scaled copies of the same metric.

    The expected result is ``sqrt((1 + 4 + 9) / 3)`` times the absolute value
    of the original metric. It is written in terms of the x2 copy (hence the
    extra ``/ 2``) because the original array is overwritten by the call.
    """
    cal = UVCal()
    cal.read_calfits(test_c_file)
    uvf = UVFlag(cal)
    np.random.seed(44)
    uvf.metric_array = np.random.normal(size=uvf.metric_array.shape)
    doubled = uvf.copy()
    doubled.metric_array *= 2
    tripled = uvf.copy()
    tripled.metric_array *= 3
    uvf.combine_metrics([doubled, tripled])
    factor = np.sqrt((1 + 4 + 9) / 3.) / 2.
    expected = np.abs(doubled.metric_array) * factor
    assert np.allclose(uvf.metric_array, expected)


def test_combine_metrics_not_inplace():
    """``combine_metrics(..., inplace=False)`` returns the combined object.

    The inputs are x2 and x3 scaled copies of the same metric, so the result
    is compared with ``sqrt((1 + 4 + 9) / 3)`` times the absolute value of the
    original metric array.
    """
    cal = UVCal()
    cal.read_calfits(test_c_file)
    uvf = UVFlag(cal)
    np.random.seed(44)
    uvf.metric_array = np.random.normal(size=uvf.metric_array.shape)
    doubled = uvf.copy()
    doubled.metric_array *= 2
    tripled = uvf.copy()
    tripled.metric_array *= 3
    combined = uvf.combine_metrics([doubled, tripled], inplace=False)
    factor = np.sqrt((1 + 4 + 9) / 3.)
    expected = np.abs(uvf.metric_array) * factor
    assert np.allclose(combined.metric_array, expected)


def test_combine_metrics_not_uvflag():
    """``combine_metrics`` raises ValueError when handed a plain string."""
    cal = UVCal()
    cal.read_calfits(test_c_file)
    uvf = UVFlag(cal)
    with pytest.raises(ValueError):
        uvf.combine_metrics('bubblegum')


def test_combine_metrics_not_metric():
    """``combine_metrics`` raises ValueError when the other object is in flag mode."""
    cal = UVCal()
    cal.read_calfits(test_c_file)
    uvf = UVFlag(cal)
    np.random.seed(44)
    uvf.metric_array = np.random.normal(size=uvf.metric_array.shape)
    flag_mode_copy = uvf.copy()
    flag_mode_copy.to_flag()
    with pytest.raises(ValueError):
        uvf.combine_metrics(flag_mode_copy)


def test_combine_metrics_wrong_shape():
    """``combine_metrics`` raises ValueError when the other object is a waterfall."""
    cal = UVCal()
    cal.read_calfits(test_c_file)
    uvf = UVFlag(cal)
    np.random.seed(44)
    uvf.metric_array = np.random.normal(size=uvf.metric_array.shape)
    waterfall_copy = uvf.copy()
    waterfall_copy.to_waterfall()
    with pytest.raises(ValueError):
        uvf.combine_metrics(waterfall_copy)


def test_super():
    """A UVFlag subclass can extend ``__init__`` and delegate to the parent."""

    class SubFlag(UVFlag):
        """Minimal subclass that stores one extra attribute."""

        def __init__(self, input, mode='metric', copy_flags=False,
                     waterfall=False, history='', label='', property='prop'):
            # Forward the parent's arguments unchanged; only ``property``
            # is handled by the subclass itself.
            parent_kwargs = dict(mode=mode, copy_flags=copy_flags,
                                 waterfall=waterfall, history=history,
                                 label=label)
            super(SubFlag, self).__init__(input, **parent_kwargs)
            self.property = property

    uvd = UVData()
    uvd.read_miriad(test_d_file)

    instance = SubFlag(uvd, property='property')

    # UVFlag.__init__ is tested, so just see if it has a metric array
    assert hasattr(instance, 'metric_array')
    # Check that it has the property
    assert instance.property == 'property'


def test_flags2waterfall():
    """Check ``flags2waterfall`` output shape and mean for UVData and UVCal inputs.

    Covers the object's own flag array, an externally supplied flag array,
    and the ``keep_pol=True`` variant for both object types.
    """
    uvd = UVData()
    uvd.read_miriad(test_d_file)

    np.random.seed(0)
    uvd.flag_array = np.random.randint(0, 2, size=uvd.flag_array.shape, dtype=bool)
    waterfall = flags2waterfall(uvd)
    assert np.allclose(np.mean(waterfall), np.mean(uvd.flag_array))
    assert waterfall.shape == (uvd.Ntimes, uvd.Nfreqs)

    waterfall = flags2waterfall(uvd, keep_pol=True)
    assert waterfall.shape == (uvd.Ntimes, uvd.Nfreqs, uvd.Npols)

    # Test external flag_array: the object's own flags are cleared so that a
    # matching mean can only come from the array passed in.
    uvd.flag_array = np.zeros_like(uvd.flag_array)
    external_flags = np.random.randint(0, 2, size=uvd.flag_array.shape, dtype=bool)
    waterfall = flags2waterfall(uvd, flag_array=external_flags)
    assert np.allclose(np.mean(waterfall), np.mean(external_flags))
    assert waterfall.shape == (uvd.Ntimes, uvd.Nfreqs)

    # UVCal version
    cal = UVCal()
    cal.read_calfits(test_c_file)

    cal.flag_array = np.random.randint(0, 2, size=cal.flag_array.shape, dtype=bool)
    waterfall = flags2waterfall(cal)
    assert np.allclose(np.mean(waterfall), np.mean(cal.flag_array))
    assert waterfall.shape == (cal.Ntimes, cal.Nfreqs)

    waterfall = flags2waterfall(cal, keep_pol=True)
    assert waterfall.shape == (cal.Ntimes, cal.Nfreqs, cal.Njones)


def test_flags2waterfall_errors():
    """Exercise the inputs for which ``flags2waterfall`` must raise ValueError."""
    # First argument must be UVData or UVCal object
    with pytest.raises(ValueError):
        flags2waterfall(5)

    uvd = UVData()
    uvd.read_miriad(test_d_file)
    # Flag array must have same shape as uv.flag_array
    wrong_shape_flags = np.array([4, 5])
    with pytest.raises(ValueError):
        flags2waterfall(uvd, wrong_shape_flags)


def test_and_rows_cols():
    """Check ``and_rows_cols`` on a small boolean grid.

    The input has one fully set row (1), one fully set column (2), and a
    partially set row (5) and column (5). The test asserts that the fully set
    row and column are fully set in the output and that the partial ones are
    not.
    """
    # Builtin ``bool`` instead of the ``np.bool`` alias, which NumPy
    # deprecated in 1.20 and removed in 1.24.
    d = np.zeros((10, 20), dtype=bool)
    d[1, :] = True
    d[:, 2] = True
    d[5, 10:20] = True
    d[5:8, 5] = True

    o = and_rows_cols(d)
    assert o[1, :].all()
    assert o[:, 2].all()
    assert not o[5, :].all()
    assert not o[:, 5].all()
back to top