https://github.com/RadioAstronomySoftwareGroup/pyuvdata
Tip revision: 983ae27b57a28550b6ee1a58e13cc9161bb89327 authored by Bryna Hazelton on 15 January 2020, 19:15:00 UTC
update release date
update release date
Tip revision: 983ae27
test_uvflag.py
# -*- mode: python; coding: utf-8 -*-
# Copyright (c) 2019 Radio Astronomy Software Group
# Licensed under the 2-clause BSD License
from __future__ import division
import pytest
import os
import numpy as np
import pyuvdata.tests as uvtest
from pyuvdata import UVData
from pyuvdata import UVCal
from pyuvdata import utils as uvutils
from pyuvdata.data import DATA_PATH
from pyuvdata import UVFlag
from pyuvdata.uvflag import lst_from_uv
from pyuvdata.uvflag import flags2waterfall
from pyuvdata.uvflag import and_rows_cols
from pyuvdata import version as uvversion
import shutil
import copy
import warnings
import six
import h5py
# The following three fixtures are used regularly
# to initialize UVFlag objects from standard files
# We need to define these here in order to set up
# some skips for developers who do not have `pytest-cases` installed
@pytest.fixture(scope='function')
def uvf_from_miriad():
    """Provide a UVFlag object initialized from the miriad test data set."""
    uvd = UVData()
    uvd.read_miriad(test_d_file)
    flag_obj = UVFlag()
    flag_obj.from_uvdata(uvd)

    # hand the object over to the test
    yield flag_obj

    # drop the references once the test has finished
    del flag_obj, uvd
@pytest.fixture(scope='function')
def uvf_from_uvcal():
    """Provide a UVFlag object initialized from the calfits test file."""
    cal = UVCal()
    cal.read_calfits(test_c_file)
    flag_obj = UVFlag()
    flag_obj.from_uvcal(cal)

    # hand the object over to the test
    yield flag_obj

    # drop the references once the test has finished
    del flag_obj, cal
@pytest.fixture(scope='function')
def uvf_from_waterfall():
    """Provide a UVFlag object built from the miriad data with waterfall=True."""
    uvd = UVData()
    uvd.read_miriad(test_d_file)
    flag_obj = UVFlag()
    flag_obj.from_uvdata(uvd, waterfall=True)

    # hand the object over to the test
    yield flag_obj

    # drop the references once the test has finished
    del flag_obj, uvd
# `pytest-cases` is an optional dependency. When it can be imported, build
# decorators that parametrize a test over the UVFlag fixtures defined above;
# when it cannot, fall back to decorators that skip those tests.
try:
    import pytest_cases

    cases_decorator = pytest_cases.pytest_parametrize_plus(
        "input_uvf",
        list(map(pytest_cases.fixture_ref,
                 (uvf_from_miriad, uvf_from_uvcal, uvf_from_waterfall))))

    cases_decorator_no_waterfall = pytest_cases.pytest_parametrize_plus(
        "input_uvf",
        list(map(pytest_cases.fixture_ref,
                 (uvf_from_miriad, uvf_from_uvcal))))

    # pytest_cases raises this warning because of a feature its developer
    # does not know how to handle yet; silence it for now.
    warnings.filterwarnings(
        "ignore",
        message="WARNING the new order is not taken into account !!",
        append=True)
except ImportError:
    cases_decorator = uvtest.skipIf_no_pytest_cases
    cases_decorator_no_waterfall = uvtest.skipIf_no_pytest_cases
# Input files used throughout the tests and the scratch output location
test_d_file = os.path.join(DATA_PATH, 'zen.2457698.40355.xx.HH.uvcAA')
test_c_file = os.path.join(DATA_PATH, 'zen.2457555.42443.HH.uvcA.omni.calfits')
test_f_file = ''.join((test_d_file, '.testuvflag.h5'))
test_outfile = os.path.join(DATA_PATH, 'test', 'outtest_uvflag.h5')

# Version string the tests look for in UVFlag histories; the git details
# are appended only when a git hash is available.
pyuvdata_version_str = ''.join((' Read/written with pyuvdata version: ',
                                uvversion.version, '.'))
if uvversion.git_hash != '':
    pyuvdata_version_str = ''.join((
        pyuvdata_version_str,
        ' Git origin: ', uvversion.git_origin,
        '. Git hash: ', uvversion.git_hash,
        '. Git branch: ', uvversion.git_branch,
        '. Git description: ', uvversion.git_description, '.'))
def test_init_bad_mode():
    """An unknown mode is rejected for both UVData and UVCal inputs."""
    sources = ((UVData, 'read_miriad', test_d_file),
               (UVCal, 'read_calfits', test_c_file))
    for klass, reader, path in sources:
        obj = klass()
        getattr(obj, reader)(path)
        with pytest.raises(ValueError) as err:
            UVFlag(obj, mode='bad_mode',
                   history='I made a UVFlag object', label='test')
        assert str(err.value).startswith('Input mode must be within acceptable')
def test_init_UVData():
    """Default initialization from UVData gives a baseline/metric object."""
    uvd = UVData()
    uvd.read_miriad(test_d_file)
    flags = UVFlag(uvd, history='I made a UVFlag object', label='test')

    # data-like arrays follow the shape of the UVData flag array
    expected_shape = uvd.flag_array.shape
    assert flags.metric_array.shape == expected_shape
    assert np.all(flags.metric_array == 0)
    assert flags.weights_array.shape == expected_shape
    assert np.all(flags.weights_array == 1)

    assert flags.type == 'baseline'
    assert flags.mode == 'metric'

    # metadata arrays are carried over from the UVData object
    for name in ('time_array', 'lst_array'):
        assert np.all(getattr(flags, name) == getattr(uvd, name))
    assert np.all(flags.freq_array == uvd.freq_array[0])
    for name in ('polarization_array', 'baseline_array',
                 'ant_1_array', 'ant_2_array'):
        assert np.all(getattr(flags, name) == getattr(uvd, name))

    for snippet in ('I made a UVFlag object',
                    'Flag object with type "baseline"',
                    pyuvdata_version_str):
        assert snippet in flags.history
    assert flags.label == 'test'
def test_init_UVData_x_orientation():
    """x_orientation set on the UVData object is passed on to the UVFlag."""
    uvd = UVData()
    uvd.read_miriad(test_d_file)
    uvd.x_orientation = 'east'
    flags = UVFlag(uvd, history='I made a UVFlag object', label='test')
    assert flags.x_orientation == uvd.x_orientation
def test_init_UVData_copy_flags():
    """copy_flags=True warns and produces a flag-mode copy of the UVData flags."""
    uvd = UVData()
    uvd.read_miriad(test_d_file)
    init_kwargs = {'copy_flags': True, 'mode': 'metric'}
    flags = uvtest.checkWarnings(UVFlag, [uvd], init_kwargs, nwarnings=1,
                                 message='Copying flags to type=="baseline"')

    # when flags are copied the metric and weights arrays are left as None
    assert hasattr(flags, 'metric_array')
    assert flags.metric_array is None
    assert np.array_equal(flags.flag_array, uvd.flag_array)
    assert flags.weights_array is None

    assert flags.type == 'baseline'
    assert flags.mode == 'flag'

    for name in ('time_array', 'lst_array'):
        assert np.all(getattr(flags, name) == getattr(uvd, name))
    assert np.all(flags.freq_array == uvd.freq_array[0])
    for name in ('polarization_array', 'baseline_array',
                 'ant_1_array', 'ant_2_array'):
        assert np.all(getattr(flags, name) == getattr(uvd, name))

    assert 'Flag object with type "baseline"' in flags.history
    assert pyuvdata_version_str in flags.history
def test_init_UVData_mode_flag():
    """from_uvdata with mode='flag' and copy_flags=False gives a flag-mode object."""
    uvd = UVData()
    uvd.read_miriad(test_d_file)
    flags = UVFlag()
    flags.from_uvdata(uvd, copy_flags=False, mode="flag")

    # in flag mode the metric and weights arrays are left as None
    assert hasattr(flags, 'metric_array')
    assert flags.metric_array is None
    assert np.array_equal(flags.flag_array, uvd.flag_array)
    assert flags.weights_array is None

    assert flags.type == 'baseline'
    assert flags.mode == 'flag'

    for name in ('time_array', 'lst_array'):
        assert np.all(getattr(flags, name) == getattr(uvd, name))
    assert np.all(flags.freq_array == uvd.freq_array[0])
    for name in ('polarization_array', 'baseline_array',
                 'ant_1_array', 'ant_2_array'):
        assert np.all(getattr(flags, name) == getattr(uvd, name))

    assert 'Flag object with type "baseline"' in flags.history
    assert pyuvdata_version_str in flags.history
def test_init_UVCal():
    """Default initialization from UVCal gives an antenna/metric object."""
    cal = UVCal()
    cal.read_calfits(test_c_file)
    flags = UVFlag(cal)

    expected_shape = cal.flag_array.shape
    assert flags.metric_array.shape == expected_shape
    assert np.all(flags.metric_array == 0)
    assert flags.weights_array.shape == expected_shape
    assert np.all(flags.weights_array == 1)

    assert flags.type == 'antenna'
    assert flags.mode == 'metric'

    assert np.all(flags.time_array == cal.time_array)
    assert flags.x_orientation == cal.x_orientation
    # the expected LSTs are computed with the module helper
    expected_lst = lst_from_uv(cal)
    assert np.all(flags.lst_array == expected_lst)
    assert np.all(flags.freq_array == cal.freq_array[0])
    assert np.all(flags.polarization_array == cal.jones_array)
    assert np.all(flags.ant_array == cal.ant_array)

    assert 'Flag object with type "antenna"' in flags.history
    assert pyuvdata_version_str in flags.history
def test_init_UVCal_mode_flag():
    """Initialization from UVCal with mode='flag' gives an antenna/flag object."""
    cal = UVCal()
    cal.read_calfits(test_c_file)
    flags = UVFlag(cal, copy_flags=False, mode='flag')

    # in flag mode the metric and weights arrays are left as None
    assert hasattr(flags, 'metric_array')
    assert flags.metric_array is None
    assert np.array_equal(flags.flag_array, cal.flag_array)
    assert flags.weights_array is None

    assert flags.type == 'antenna'
    assert flags.mode == 'flag'

    assert np.all(flags.time_array == cal.time_array)
    expected_lst = lst_from_uv(cal)
    assert np.all(flags.lst_array == expected_lst)
    assert np.all(flags.freq_array == cal.freq_array[0])
    assert np.all(flags.polarization_array == cal.jones_array)
    assert np.all(flags.ant_array == cal.ant_array)

    assert 'Flag object with type "antenna"' in flags.history
    assert pyuvdata_version_str in flags.history
def test_init_cal_copy_flags():
    """copy_flags=True on a UVCal warns and produces an antenna/flag object."""
    cal = UVCal()
    cal.read_calfits(test_c_file)
    init_kwargs = {'copy_flags': True, 'mode': 'metric'}
    flags = uvtest.checkWarnings(UVFlag, [cal], init_kwargs, nwarnings=1,
                                 message='Copying flags to type=="antenna"')

    # when flags are copied the metric array is left as None
    assert hasattr(flags, 'metric_array')
    assert flags.metric_array is None
    assert np.array_equal(flags.flag_array, cal.flag_array)

    assert flags.type == 'antenna'
    assert flags.mode == 'flag'

    assert np.all(flags.time_array == np.unique(cal.time_array))
    assert np.all(flags.freq_array == cal.freq_array[0])
    assert np.all(flags.polarization_array == cal.jones_array)
    assert pyuvdata_version_str in flags.history
def test_init_waterfall_uvd():
    """waterfall=True on UVData gives a (Ntimes, Nfreqs, Npols) metric object."""
    uvd = UVData()
    uvd.read_miriad(test_d_file)
    flags = UVFlag(uvd, waterfall=True)

    expected_shape = (uvd.Ntimes, uvd.Nfreqs, uvd.Npols)
    assert flags.metric_array.shape == expected_shape
    assert np.all(flags.metric_array == 0)
    assert flags.weights_array.shape == expected_shape
    assert np.all(flags.weights_array == 1)

    assert flags.type == 'waterfall'
    assert flags.mode == 'metric'

    # time-like axes hold the unique values from the UVData object
    for name in ('time_array', 'lst_array'):
        assert np.all(getattr(flags, name) == np.unique(getattr(uvd, name)))
    assert np.all(flags.freq_array == uvd.freq_array[0])
    assert np.all(flags.polarization_array == uvd.polarization_array)

    assert 'Flag object with type "waterfall"' in flags.history
    assert pyuvdata_version_str in flags.history
def test_init_waterfall_uvc():
    """waterfall=True on UVCal gives a (Ntimes, Nfreqs, Njones) metric object."""
    cal = UVCal()
    cal.read_calfits(test_c_file)
    flags = UVFlag(cal, waterfall=True, history='input history check')

    expected_shape = (cal.Ntimes, cal.Nfreqs, cal.Njones)
    assert flags.metric_array.shape == expected_shape
    assert np.all(flags.metric_array == 0)
    assert flags.weights_array.shape == expected_shape
    assert np.all(flags.weights_array == 1)

    assert flags.type == 'waterfall'
    assert flags.mode == 'metric'

    assert np.all(flags.time_array == np.unique(cal.time_array))
    assert np.all(flags.freq_array == cal.freq_array[0])
    assert np.all(flags.polarization_array == cal.jones_array)

    for snippet in ('Flag object with type "waterfall"',
                    'input history check',
                    pyuvdata_version_str):
        assert snippet in flags.history
def test_init_waterfall_flag_uvcal():
    """A flag-mode waterfall from UVCal starts unflagged and has no weights."""
    cal = UVCal()
    cal.read_calfits(test_c_file)
    flags = UVFlag(cal, waterfall=True, mode='flag')

    assert flags.flag_array.shape == (cal.Ntimes, cal.Nfreqs, cal.Njones)
    assert not np.any(flags.flag_array)
    assert flags.weights_array is None

    assert flags.type == 'waterfall'
    assert flags.mode == 'flag'

    assert np.all(flags.time_array == np.unique(cal.time_array))
    assert np.all(flags.freq_array == cal.freq_array[0])
    assert np.all(flags.polarization_array == cal.jones_array)

    assert 'Flag object with type "waterfall"' in flags.history
    assert pyuvdata_version_str in flags.history
def test_init_waterfall_flag_uvdata():
    """A flag-mode waterfall from UVData starts unflagged and has no weights."""
    uvd = UVData()
    uvd.read_miriad(test_d_file)
    flags = UVFlag(uvd, waterfall=True, mode='flag')

    assert flags.flag_array.shape == (uvd.Ntimes, uvd.Nfreqs, uvd.Npols)
    assert not np.any(flags.flag_array)
    assert flags.weights_array is None

    assert flags.type == 'waterfall'
    assert flags.mode == 'flag'

    assert np.all(flags.time_array == np.unique(uvd.time_array))
    assert np.all(flags.freq_array == uvd.freq_array[0])
    assert np.all(flags.polarization_array == uvd.polarization_array)

    assert 'Flag object with type "waterfall"' in flags.history
    assert pyuvdata_version_str in flags.history
def test_init_waterfall_copy_flags():
    """Copying flags into a waterfall object is not implemented for either input."""
    sources = ((UVCal, 'read_calfits', test_c_file),
               (UVData, 'read_miriad', test_d_file))
    for klass, reader, path in sources:
        obj = klass()
        getattr(obj, reader)(path)
        with pytest.raises(NotImplementedError) as err:
            UVFlag(obj, copy_flags=True, mode='flag', waterfall=True)
        assert str(err.value).startswith('Cannot copy flags when initializing')
def test_init_invalid_input():
    """An input that is not UVData, UVCal, a path or a list/tuple is rejected."""
    not_a_valid_input = 14
    with pytest.raises(ValueError) as err:
        UVFlag(not_a_valid_input)
    assert str(err.value).startswith('input to UVFlag.__init__ must be one of:')
def test_from_uvcal_error():
    """from_uvcal raises a ValueError when handed a UVData object."""
    uvd = UVData()
    uvd.read_miriad(test_d_file)
    flags = UVFlag()

    with pytest.raises(ValueError) as err:
        flags.from_uvcal(uvd)
    expected_start = "from_uvcal can only initialize a UVFlag object"
    assert str(err.value).startswith(expected_start)
def test_from_udata_error():
    """from_uvdata raises a ValueError when handed a UVCal object."""
    cal = UVCal()
    cal.read_calfits(test_c_file)
    flags = UVFlag()

    with pytest.raises(ValueError) as err:
        flags.from_uvdata(cal)
    expected_start = "from_uvdata can only initialize a UVFlag object"
    assert str(err.value).startswith(expected_start)
def test_init_list_files_weights(tmpdir):
    """Weights survive a round trip when a list of files is read."""
    out_dir = tmpdir.strpath
    first_file = os.path.join(out_dir, 'test1.h5')
    second_file = os.path.join(out_dir, 'test2.h5')

    # write two files that differ only in their (random) weights
    flags = UVFlag(test_f_file)
    np.random.seed(0)
    wts1 = np.random.rand(*flags.weights_array.shape)
    flags.weights_array = wts1.copy()
    flags.write(first_file)
    wts2 = np.random.rand(*flags.weights_array.shape)
    flags.weights_array = wts2.copy()
    flags.write(second_file)

    combined = UVFlag([first_file, second_file])
    expected_weights = np.concatenate([wts1, wts2], axis=0)
    assert np.all(combined.weights_array == expected_weights)
def test_data_like_property_mode_tamper():
    """Iterating data_like_parameters fails once mode is set to an invalid value."""
    uvd = UVData()
    uvd.read_miriad(test_d_file)
    flags = UVFlag(uvd, label='test')
    flags.mode = 'test'

    with pytest.raises(ValueError) as err:
        list(flags.data_like_parameters)
    assert str(err.value).startswith('Invalid mode. Mode must be one of')
def test_read_write_loop():
    """A UVFlag written to disk and read back equals the original."""
    uvd = UVData()
    uvd.read_miriad(test_d_file)
    written = UVFlag(uvd, label='test')
    written.write(test_outfile, clobber=True)

    read_back = UVFlag(test_outfile)
    assert written.__eq__(read_back, check_history=True)
def test_read_write_loop_with_optional_x_orientation():
    """The write/read round trip also holds when x_orientation is set."""
    uvd = UVData()
    uvd.read_miriad(test_d_file)
    written = UVFlag(uvd, label='test')
    written.x_orientation = 'east'
    written.write(test_outfile, clobber=True)

    read_back = UVFlag(test_outfile)
    assert written.__eq__(read_back, check_history=True)
def test_read_write_loop_waterfal():
    """The write/read round trip holds after converting to a waterfall."""
    uvd = UVData()
    uvd.read_miriad(test_d_file)
    written = UVFlag(uvd, label='test')
    written.to_waterfall()
    written.write(test_outfile, clobber=True)

    read_back = UVFlag(test_outfile)
    assert written.__eq__(read_back, check_history=True)
def test_bad_mode_savefile():
    """Reading a file whose stored mode was overwritten with junk raises."""
    uvd = UVData()
    uvd.read_miriad(test_d_file)
    UVFlag(uvd, label='test').write(test_outfile, clobber=True)

    # reopen the file and overwrite the stored mode in place
    with h5py.File(test_outfile, 'a') as h5:
        h5['Header/mode'][...] = 'test'

    with pytest.raises(ValueError) as err:
        UVFlag(test_outfile)
    assert str(err.value).startswith('File cannot be read. Received mode')
def test_bad_type_savefile():
    """Reading a file whose stored type was overwritten with junk raises."""
    uvd = UVData()
    uvd.read_miriad(test_d_file)
    UVFlag(uvd, label='test').write(test_outfile, clobber=True)

    # reopen the file and overwrite the stored type in place
    with h5py.File(test_outfile, 'a') as h5:
        h5['Header/type'][...] = 'test'

    with pytest.raises(ValueError) as err:
        UVFlag(test_outfile)
    assert str(err.value).startswith('File cannot be read. Received type')
def test_write_add_version_str():
    """Writing adds the version string to the stored history when it is missing."""
    uvd = UVData()
    uvd.read_miriad(test_d_file)
    flags = UVFlag(uvd, label='test')

    # remove the version string from the in-memory history before writing
    flags.history = flags.history.replace(pyuvdata_version_str, '')
    assert pyuvdata_version_str not in flags.history

    flags.write(test_outfile, clobber=True)

    with h5py.File(test_outfile, 'r') as h5:
        stored_history = uvutils._bytes_to_str(h5['Header/history'][()])
    assert pyuvdata_version_str in stored_history
def test_read_add_version_str():
    """Reading a file whose stored history lacks the version string adds it."""
    uv = UVData()
    uv.read_miriad(test_d_file)
    uvf = UVFlag(uv, label='test')

    assert pyuvdata_version_str in uvf.history
    uvf.write(test_outfile, clobber=True)

    # Strip the version string from the history stored on disk. The previous
    # version of this test opened the file read-only and did `del hist` on a
    # local name, which leaves the file untouched, so the read below never
    # had anything to add.
    with h5py.File(test_outfile, 'a') as h5:
        stored = uvutils._bytes_to_str(h5['Header/history'][()])
        del h5['Header/history']
        h5['Header/history'] = np.bytes_(stored.replace(pyuvdata_version_str, ''))

    # confirm the file really is missing the version string before reading
    with h5py.File(test_outfile, 'r') as h5:
        stripped = uvutils._bytes_to_str(h5['Header/history'][()])
    assert pyuvdata_version_str not in stripped

    uvf2 = UVFlag(test_outfile)
    assert pyuvdata_version_str in uvf2.history
    assert uvf == uvf2
def test_read_write_ant():
    """The write/read round trip holds for a flag-mode object built from UVCal."""
    cal = UVCal()
    cal.read_calfits(test_c_file)
    written = UVFlag(cal, mode='flag', label='test')
    written.write(test_outfile, clobber=True)

    read_back = UVFlag(test_outfile)
    assert written.__eq__(read_back, check_history=True)
def test_read_missing_nants_data():
    """A file without Nants_data is read with a warning and a fallback value."""
    cal = UVCal()
    cal.read_calfits(test_c_file)
    written = UVFlag(cal, mode='flag', label='test')
    written.write(test_outfile, clobber=True)

    # remove the header entry from the file on disk
    with h5py.File(test_outfile, 'a') as h5:
        del h5['Header/Nants_data']

    read_back = uvtest.checkWarnings(
        UVFlag, [test_outfile], {}, nwarnings=1,
        message=['Nants_data not available in file,'],
        category=UserWarning)

    # the value read back matches the length of ant_array
    assert read_back.Nants_data == len(read_back.ant_array)

    # restore the original value and check nothing else was changed
    read_back.Nants_data = written.Nants_data
    assert written.__eq__(read_back, check_history=True)
def test_read_missing_nspws():
    """A file without Nspws in its header is read with Nspws equal to 1."""
    cal = UVCal()
    cal.read_calfits(test_c_file)
    written = UVFlag(cal, mode='flag', label='test')
    written.write(test_outfile, clobber=True)

    # remove the header entry from the file on disk
    with h5py.File(test_outfile, 'a') as h5:
        del h5['Header/Nspws']

    read_back = UVFlag(test_outfile)
    # Nspws is filled in on read
    assert read_back.Nspws == 1
    # and nothing else was changed
    assert written.__eq__(read_back, check_history=True)
def test_read_write_nocompress():
    """The write/read round trip holds with data_compression=None."""
    uvd = UVData()
    uvd.read_miriad(test_d_file)
    written = UVFlag(uvd, label='test')
    written.write(test_outfile, clobber=True, data_compression=None)

    read_back = UVFlag(test_outfile)
    assert written.__eq__(read_back, check_history=True)
def test_read_write_nocompress_flag():
    """The uncompressed write/read round trip holds for a flag-mode object."""
    uvd = UVData()
    uvd.read_miriad(test_d_file)
    written = UVFlag(uvd, mode='flag', label='test')
    written.write(test_outfile, clobber=True, data_compression=None)

    read_back = UVFlag(test_outfile)
    assert written.__eq__(read_back, check_history=True)
def test_init_list():
    """Initializing from a mixed list concatenates the inputs in order."""
    uvd = UVData()
    uvd.read_miriad(test_d_file)
    uvd.time_array -= 1

    combined = UVFlag([uvd, test_f_file])
    first = UVFlag(uvd)
    second = UVFlag(test_f_file)

    # data-like arrays are stacked along the first axis
    for name in ('metric_array', 'weights_array'):
        stacked = np.concatenate((getattr(first, name), getattr(second, name)),
                                 axis=0)
        assert np.array_equal(stacked, getattr(combined, name))

    # per-baseline-time metadata is concatenated as well
    for name in ('time_array', 'baseline_array', 'ant_1_array', 'ant_2_array'):
        joined = np.concatenate((getattr(first, name), getattr(second, name)))
        assert np.array_equal(joined, getattr(combined, name))

    assert combined.mode == 'metric'
    assert np.all(combined.freq_array == uvd.freq_array[0])
    assert np.all(combined.polarization_array == uvd.polarization_array)
def test_read_list():
    """Reading a list of files concatenates their contents in order."""
    uvd = UVData()
    uvd.read_miriad(test_d_file)
    uvd.time_array -= 1

    combined = UVFlag(uvd)
    combined.write(test_outfile, clobber=True)
    combined.read([test_outfile, test_f_file])
    first = UVFlag(uvd)
    second = UVFlag(test_f_file)

    # data-like arrays are stacked along the first axis
    for name in ('metric_array', 'weights_array'):
        stacked = np.concatenate((getattr(first, name), getattr(second, name)),
                                 axis=0)
        assert np.array_equal(stacked, getattr(combined, name))

    # per-baseline-time metadata is concatenated as well
    for name in ('time_array', 'baseline_array', 'ant_1_array', 'ant_2_array'):
        joined = np.concatenate((getattr(first, name), getattr(second, name)))
        assert np.array_equal(joined, getattr(combined, name))

    assert combined.mode == 'metric'
    assert np.all(combined.freq_array == uvd.freq_array[0])
    assert np.all(combined.polarization_array == uvd.polarization_array)
def test_read_error():
    """Initializing from a path that does not exist raises an IOError."""
    missing_path = 'foo'
    with pytest.raises(IOError) as err:
        UVFlag(missing_path)
    assert str(err.value).startswith('foo not found')
def test_read_change_type():
    """Reading a file of a different type resets the attributes of the old type.

    The previous version also read the miriad data set into a UVData object
    that was never used; that dead read has been dropped.
    """
    uvc = UVCal()
    uvc.read_calfits(test_c_file)
    uvf = UVFlag(uvc)

    uvf.write(test_outfile, clobber=True)
    assert hasattr(uvf, 'ant_array')

    # read test_f_file over the UVCal-derived object
    uvf.read(test_f_file)
    # the antenna attribute still exists but has been cleared to None
    assert hasattr(uvf, 'ant_array')
    assert uvf.ant_array is None
    assert hasattr(uvf, 'baseline_array')
    assert hasattr(uvf, 'ant_1_array')
    assert hasattr(uvf, 'ant_2_array')

    # read the UVCal-derived file back in: now the baseline attributes
    # are the ones cleared to None
    uvf.read(test_outfile)
    assert hasattr(uvf, 'ant_array')
    assert hasattr(uvf, 'baseline_array')
    assert uvf.baseline_array is None
    assert hasattr(uvf, 'ant_1_array')
    assert uvf.ant_1_array is None
    assert hasattr(uvf, 'ant_2_array')
    assert uvf.ant_2_array is None
def test_read_change_mode():
    """Reading a file of a different mode clears the array of the old mode."""
    uvd = UVData()
    uvd.read_miriad(test_d_file)
    flags = UVFlag(uvd, mode='flag')

    # freshly built in flag mode: metric_array exists but is None
    assert hasattr(flags, 'flag_array')
    assert hasattr(flags, 'metric_array')
    assert flags.metric_array is None
    flags.write(test_outfile, clobber=True)

    # after reading test_f_file the flag array is the one set to None
    flags.read(test_f_file)
    assert hasattr(flags, 'metric_array')
    assert hasattr(flags, 'flag_array')
    assert flags.flag_array is None

    # reading the flag-mode file back clears the metric array again
    flags.read(test_outfile)
    assert hasattr(flags, 'flag_array')
    assert hasattr(flags, 'metric_array')
    assert flags.metric_array is None
def test_write_no_clobber():
    """Writing over an existing file without clobber raises a ValueError."""
    flags = UVFlag(test_f_file)
    with pytest.raises(ValueError) as err:
        flags.write(test_f_file)
    expected_start = 'File ' + test_f_file + ' exists;'
    assert str(err.value).startswith(expected_start)
def test_lst_from_uv():
    """lst_from_uv reproduces the lst_array stored on the UVData object."""
    uvd = UVData()
    uvd.read_miriad(test_d_file)
    computed = lst_from_uv(uvd)
    assert np.allclose(uvd.lst_array, computed)
def test_lst_from_uv_error():
    """lst_from_uv raises a ValueError when given an integer."""
    with pytest.raises(ValueError) as err:
        lst_from_uv(4)
    expected_start = 'Function lst_from_uv can only operate on'
    assert str(err.value).startswith(expected_start)
def test_add():
    """Adding two objects with the default axis concatenates them in time."""
    first = UVFlag(test_f_file)
    second = copy.deepcopy(first)
    second.time_array += 1  # Add a day
    total = first + second

    # metadata along the concatenated axis
    for name in ('time_array', 'baseline_array', 'ant_1_array',
                 'ant_2_array', 'lst_array'):
        joined = np.concatenate((getattr(first, name), getattr(second, name)))
        assert np.array_equal(joined, getattr(total, name))

    # data-like arrays are stacked along the first axis
    for name in ('metric_array', 'weights_array'):
        stacked = np.concatenate((getattr(first, name), getattr(second, name)),
                                 axis=0)
        assert np.array_equal(stacked, getattr(total, name))

    assert np.array_equal(first.freq_array, total.freq_array)
    assert total.type == 'baseline'
    assert total.mode == 'metric'
    assert np.array_equal(first.polarization_array, total.polarization_array)
    assert 'Data combined along time axis. ' in total.history
def test_add_collapsed_pols():
    """Objects with collapsed polarizations can still be added in time."""
    base = UVFlag(test_f_file)
    base.weights_array = np.ones_like(base.weights_array)

    # build a multi-pol object by concatenating a copy with a different pol
    other_pol = base.copy()
    other_pol.polarization_array[0] = -4
    base.__add__(other_pol, inplace=True, axis='pol')
    base.collapse_pol()

    shifted = base.copy()
    shifted.time_array += 1  # increment the time array
    total = base + shifted

    assert total.Ntimes == 2 * base.Ntimes
    assert total.check()
def test_add_add_version_str():
    """Adding puts the version string into the history when it is missing."""
    first = UVFlag(test_f_file)
    first.history = first.history.replace(pyuvdata_version_str, '')
    assert pyuvdata_version_str not in first.history

    second = copy.deepcopy(first)
    second.time_array += 1  # Add a day
    total = first + second
    assert pyuvdata_version_str in total.history
def test_add_baseline():
    """Adding with axis='baseline' concatenates along the baseline axis."""
    first = UVFlag(test_f_file)
    second = copy.deepcopy(first)
    second.baseline_array += 100  # Arbitrary
    total = first.__add__(second, axis='baseline')

    # metadata along the concatenated axis
    for name in ('time_array', 'baseline_array', 'ant_1_array',
                 'ant_2_array', 'lst_array'):
        joined = np.concatenate((getattr(first, name), getattr(second, name)))
        assert np.array_equal(joined, getattr(total, name))

    # data-like arrays are stacked along the first axis
    for name in ('metric_array', 'weights_array'):
        stacked = np.concatenate((getattr(first, name), getattr(second, name)),
                                 axis=0)
        assert np.array_equal(stacked, getattr(total, name))

    assert np.array_equal(first.freq_array, total.freq_array)
    assert total.type == 'baseline'
    assert total.mode == 'metric'
    assert np.array_equal(first.polarization_array, total.polarization_array)
    assert 'Data combined along baseline axis. ' in total.history
def test_add_antenna():
    """Adding with axis='antenna' concatenates along the antenna axis."""
    cal = UVCal()
    cal.read_calfits(test_c_file)
    first = UVFlag(cal)
    second = copy.deepcopy(first)
    second.ant_array += 100  # Arbitrary
    total = first.__add__(second, axis='antenna')

    joined_ants = np.concatenate((first.ant_array, second.ant_array))
    assert np.array_equal(joined_ants, total.ant_array)

    # data-like arrays are stacked along the first axis
    for name in ('metric_array', 'weights_array'):
        stacked = np.concatenate((getattr(first, name), getattr(second, name)),
                                 axis=0)
        assert np.array_equal(stacked, getattr(total, name))

    # the other axes are unchanged
    for name in ('freq_array', 'time_array', 'lst_array'):
        assert np.array_equal(getattr(first, name), getattr(total, name))

    assert total.type == 'antenna'
    assert total.mode == 'metric'
    assert np.array_equal(first.polarization_array, total.polarization_array)
    assert 'Data combined along antenna axis. ' in total.history
def test_add_frequency():
    """Adding with axis='frequency' concatenates along the frequency axis."""
    first = UVFlag(test_f_file)
    second = copy.deepcopy(first)
    second.freq_array += 1e4  # Arbitrary
    total = first.__add__(second, axis='frequency')

    joined_freqs = np.concatenate((first.freq_array, second.freq_array), axis=-1)
    assert np.array_equal(joined_freqs, total.freq_array)

    # the other metadata is unchanged
    for name in ('time_array', 'baseline_array', 'ant_1_array',
                 'ant_2_array', 'lst_array'):
        assert np.array_equal(getattr(first, name), getattr(total, name))

    # data-like arrays are stacked along axis 2
    for name in ('metric_array', 'weights_array'):
        stacked = np.concatenate((getattr(first, name), getattr(second, name)),
                                 axis=2)
        assert np.array_equal(stacked, getattr(total, name))

    assert total.type == 'baseline'
    assert total.mode == 'metric'
    assert np.array_equal(first.polarization_array, total.polarization_array)
    assert 'Data combined along frequency axis. ' in total.history
def test_add_pol():
    """Adding with axis='polarization' concatenates along the pol axis."""
    first = UVFlag(test_f_file)
    second = copy.deepcopy(first)
    second.polarization_array += 1  # Arbitrary
    total = first.__add__(second, axis='polarization')

    # the other metadata is unchanged
    for name in ('freq_array', 'time_array', 'baseline_array',
                 'ant_1_array', 'ant_2_array', 'lst_array'):
        assert np.array_equal(getattr(first, name), getattr(total, name))

    # data-like arrays are stacked along axis 3
    for name in ('metric_array', 'weights_array'):
        stacked = np.concatenate((getattr(first, name), getattr(second, name)),
                                 axis=3)
        assert np.array_equal(stacked, getattr(total, name))

    assert total.type == 'baseline'
    assert total.mode == 'metric'
    joined_pols = np.concatenate((first.polarization_array,
                                  second.polarization_array))
    assert np.array_equal(joined_pols, total.polarization_array)
    assert 'Data combined along polarization axis. ' in total.history
def test_add_flag():
    """Adding two flag-mode objects concatenates the flag arrays in time."""
    uvd = UVData()
    uvd.read_miriad(test_d_file)
    first = UVFlag(uvd, mode='flag')
    second = copy.deepcopy(first)
    second.time_array += 1  # Add a day
    total = first + second

    # metadata along the concatenated axis
    for name in ('time_array', 'baseline_array', 'ant_1_array',
                 'ant_2_array', 'lst_array'):
        joined = np.concatenate((getattr(first, name), getattr(second, name)))
        assert np.array_equal(joined, getattr(total, name))

    stacked_flags = np.concatenate((first.flag_array, second.flag_array), axis=0)
    assert np.array_equal(stacked_flags, total.flag_array)

    assert np.array_equal(first.freq_array, total.freq_array)
    assert total.type == 'baseline'
    assert total.mode == 'flag'
    assert np.array_equal(first.polarization_array, total.polarization_array)
    assert 'Data combined along time axis. ' in total.history
def test_add_errors():
    """__add__ rejects mismatched classes, types, modes and invalid axes."""
    uvd = UVData()
    uvd.read_miriad(test_d_file)
    cal = UVCal()
    cal.read_calfits(test_c_file)
    baseline_metric = UVFlag(uvd)

    # adding something that is not a UVFlag
    with pytest.raises(ValueError) as err:
        baseline_metric.__add__(3)
    assert str(err.value).startswith(
        'Only UVFlag objects can be added to a UVFlag object')

    # adding an object of a different type
    antenna_metric = UVFlag(cal)
    with pytest.raises(ValueError) as err:
        baseline_metric.__add__(antenna_metric)
    assert str(err.value).startswith('UVFlag object of type ')

    # adding an object of a different mode
    baseline_flag = UVFlag(uvd, mode='flag')
    with pytest.raises(ValueError) as err:
        baseline_metric.__add__(baseline_flag)
    assert str(err.value).startswith('UVFlag object of mode ')

    # axes that do not apply to the object's type
    with pytest.raises(ValueError) as err:
        baseline_metric.__add__(baseline_metric, axis='antenna')
    assert str(err.value).endswith('concatenated along antenna axis.')

    with pytest.raises(ValueError) as err:
        antenna_metric.__add__(antenna_metric, axis='baseline')
    assert str(err.value).endswith('concatenated along baseline axis.')
def test_inplace_add():
    """The += operator gives the same result as the binary + operator."""
    accumulated = UVFlag(test_f_file)
    untouched = copy.deepcopy(accumulated)
    shifted = copy.deepcopy(accumulated)
    shifted.time_array += 1

    accumulated += shifted
    assert accumulated.__eq__(untouched + shifted)
def test_clear_unused_attributes():
    """clear_unused_attributes sets attributes that no longer apply to None.

    A leftover debug ``print`` of ``uv._baseline_array.required`` has been
    removed; it asserted nothing and only added noise to the test output.
    """
    uv = UVFlag(test_f_file)
    assert hasattr(uv, 'baseline_array')
    assert hasattr(uv, 'ant_1_array')
    assert hasattr(uv, 'ant_2_array')
    assert hasattr(uv, 'Nants_telescope')

    # switch to antenna type: the baseline attributes are kept but set to None
    uv._set_type_antenna()
    uv.clear_unused_attributes()
    assert hasattr(uv, 'baseline_array')
    assert uv.baseline_array is None
    assert hasattr(uv, 'ant_1_array')
    assert uv.ant_1_array is None
    assert hasattr(uv, 'ant_2_array')
    assert uv.ant_2_array is None
    assert hasattr(uv, 'Nants_telescope')
    assert uv.Nants_telescope is None

    # switch to flag mode: the metric array is kept but set to None
    uv._set_mode_flag()
    assert hasattr(uv, 'metric_array')
    uv.clear_unused_attributes()
    assert hasattr(uv, 'metric_array')
    assert uv.metric_array is None

    # Start over: values assigned to attributes that do not apply to the
    # object as read from test_f_file are cleared as well
    uv = UVFlag(test_f_file)
    uv.ant_array = np.array([4])
    uv.flag_array = np.array([5])
    uv.clear_unused_attributes()
    assert hasattr(uv, 'ant_array')
    assert uv.ant_array is None
    assert hasattr(uv, 'flag_array')
    assert uv.flag_array is None
def test_not_equal():
    """__eq__ returns a falsy result for each kind of difference."""
    reference = UVFlag(test_f_file)

    # compared against something that is not a UVFlag
    assert not reference.__eq__(5)

    # mode differs
    changed_mode = reference.copy()
    changed_mode.mode = 'flag'
    assert not reference.__eq__(changed_mode)

    # type differs
    changed_type = reference.copy()
    changed_type.type = 'antenna'
    assert not reference.__eq__(changed_type)

    # an array differs
    changed_freqs = reference.copy()
    changed_freqs.freq_array += 1
    assert not reference.__eq__(changed_freqs)

    # history differs, with the history check requested
    changed_history = reference.copy()
    changed_history.history += 'hello'
    assert not reference.__eq__(changed_history, check_history=True)
def test_to_waterfall_bl():
    """to_waterfall turns a baseline object into a (time, freq, pol) waterfall."""
    flags = UVFlag(test_f_file)
    flags.weights_array = np.ones_like(flags.weights_array)
    flags.to_waterfall()

    assert flags.type == 'waterfall'
    expected_shape = (len(flags.time_array), len(flags.freq_array),
                      len(flags.polarization_array))
    assert flags.metric_array.shape == expected_shape
    assert flags.weights_array.shape == flags.metric_array.shape
def test_to_waterfall_add_version_str():
    """to_waterfall puts the version string into the history when it is missing."""
    flags = UVFlag(test_f_file)
    flags.weights_array = np.ones_like(flags.weights_array)

    flags.history = flags.history.replace(pyuvdata_version_str, '')
    assert pyuvdata_version_str not in flags.history

    flags.to_waterfall()
    assert pyuvdata_version_str in flags.history
def test_to_waterfall_bl_multi_pol():
    """to_waterfall on a two-pol object, with and without keeping the pols."""
    uvf = UVFlag(test_f_file)
    uvf.weights_array = np.ones_like(uvf.weights_array)

    # Concatenate to form multi-pol object
    other_pol = uvf.copy()
    other_pol.polarization_array[0] = -4
    uvf.__add__(other_pol, inplace=True, axis='pol')

    # Keep a copy to run with keep_pol=False
    collapsed = uvf.copy()

    uvf.to_waterfall()
    assert uvf.type == 'waterfall'
    kept_shape = (len(uvf.time_array), len(uvf.freq_array),
                  len(uvf.polarization_array))
    assert uvf.metric_array.shape == kept_shape
    assert uvf.weights_array.shape == uvf.metric_array.shape
    assert len(uvf.polarization_array) == 2

    # Repeat with keep_pol=False
    collapsed.to_waterfall(keep_pol=False)
    assert collapsed.type == 'waterfall'
    collapsed_shape = (len(collapsed.time_array), len(uvf.freq_array), 1)
    assert collapsed.metric_array.shape == collapsed_shape
    assert collapsed.weights_array.shape == collapsed.metric_array.shape
    assert len(collapsed.polarization_array) == 1
    # the single remaining entry is the comma-joined list of the pols
    pol_label = np.str_(','.join(str(pol) for pol in uvf.polarization_array))
    assert collapsed.polarization_array[0] == pol_label
def test_collapse_pol():
    """collapse_pol merges the pol axis and the result survives a round trip."""
    uvf = UVFlag(test_f_file)
    uvf.weights_array = np.ones_like(uvf.weights_array)

    # Concatenate to form multi-pol object
    other_pol = uvf.copy()
    other_pol.polarization_array[0] = -4
    uvf.__add__(other_pol, inplace=True, axis='pol')

    collapsed = uvf.copy()
    collapsed.collapse_pol()

    assert len(collapsed.polarization_array) == 1
    # the single remaining entry is the comma-joined list of the pols
    pol_label = np.str_(','.join(str(pol) for pol in uvf.polarization_array))
    assert collapsed.polarization_array[0] == pol_label
    assert collapsed.mode == 'metric'
    assert hasattr(collapsed, 'metric_array')
    assert hasattr(collapsed, 'flag_array')
    assert collapsed.flag_array is None

    # the collapsed object should still pass its own check
    assert collapsed.check()

    # write it out and read it back in to make sure polarization_array
    # comes back with the correct type
    collapsed.write(test_outfile, clobber=True)
    uvf = UVFlag(test_outfile)
    assert uvf._polarization_array.expected_type == six.string_types
    assert uvf._polarization_array.acceptable_vals is None
    assert uvf == collapsed
    os.remove(test_outfile)
def test_collapse_pol_add_pol_axis():
    """Adding along the pol axis raises NotImplementedError after collapse_pol."""
    base = UVFlag(test_f_file)
    base.weights_array = np.ones_like(base.weights_array)
    extra_pol = base.copy()
    extra_pol.polarization_array[0] = -4
    # Concatenate to form multi-pol object
    base.__add__(extra_pol, inplace=True, axis='pol')

    collapsed = base.copy()
    collapsed.collapse_pol()
    with pytest.raises(NotImplementedError) as cm:
        collapsed.__add__(collapsed, axis='pol')
    message = str(cm.value)
    assert message.startswith("Two UVFlag objects with their")
def test_collapse_pol_or():
    """Collapse a two-pol flag-mode object with method='or' and stay in flag mode."""
    flags = UVFlag(test_f_file)
    flags.to_flag()
    assert flags.weights_array is None
    extra_pol = flags.copy()
    extra_pol.polarization_array[0] = -4
    # Concatenate to form multi-pol object
    flags.__add__(extra_pol, inplace=True, axis='pol')

    collapsed = flags.copy()
    collapsed.collapse_pol(method='or')
    assert len(collapsed.polarization_array) == 1
    joined_pols = np.str_(','.join(map(str, flags.polarization_array)))
    assert collapsed.polarization_array[0] == joined_pols
    assert collapsed.mode == 'flag'
    assert hasattr(collapsed, 'flag_array')
    assert hasattr(collapsed, 'metric_array')
    assert collapsed.metric_array is None
def test_collapse_pol_add_version_str():
    """collapse_pol puts the pyuvdata version string back into the history."""
    flags = UVFlag(test_f_file)
    flags.to_flag()
    extra_pol = flags.copy()
    extra_pol.polarization_array[0] = -4
    # Concatenate to form multi-pol object
    flags.__add__(extra_pol, inplace=True, axis='pol')

    # Strip the version string so its reappearance can be detected
    flags.history = flags.history.replace(pyuvdata_version_str, '')
    assert pyuvdata_version_str not in flags.history

    collapsed = flags.copy()
    collapsed.collapse_pol(method='or')
    assert pyuvdata_version_str in collapsed.history
def test_collapse_single_pol():
    """collapse_pol on the unmodified test file warns and leaves the object unchanged."""
    single_pol = UVFlag(test_f_file)
    single_pol.weights_array = np.ones_like(single_pol.weights_array)
    reference = single_pol.copy()
    uvtest.checkWarnings(
        single_pol.collapse_pol, [], {},
        nwarnings=1, message='Cannot collapse polarization',
    )
    assert single_pol == reference
def test_collapse_pol_flag():
    """collapse_pol with its default method turns a flag-mode object into metric mode."""
    flags = UVFlag(test_f_file)
    flags.to_flag()
    assert flags.weights_array is None
    extra_pol = flags.copy()
    extra_pol.polarization_array[0] = -4
    # Concatenate to form multi-pol object
    flags.__add__(extra_pol, inplace=True, axis='pol')

    collapsed = flags.copy()
    collapsed.collapse_pol()
    assert len(collapsed.polarization_array) == 1
    joined_pols = np.str_(','.join(map(str, flags.polarization_array)))
    assert collapsed.polarization_array[0] == joined_pols
    assert collapsed.mode == 'metric'
    assert hasattr(collapsed, 'metric_array')
    assert hasattr(collapsed, 'flag_array')
    assert collapsed.flag_array is None
def test_to_waterfall_bl_flags():
    """to_waterfall with default arguments turns a flag-mode object into a metric waterfall."""
    flags = UVFlag(test_f_file)
    flags.to_flag()
    flags.to_waterfall()
    assert flags.type == 'waterfall'
    assert flags.mode == 'metric'
    expected_shape = (len(flags.time_array),
                      len(flags.freq_array),
                      len(flags.polarization_array))
    assert flags.metric_array.shape == expected_shape
    assert flags.weights_array.shape == flags.metric_array.shape
    assert len(flags.lst_array) == len(flags.time_array)
def test_to_waterfall_bl_flags_or():
    """to_waterfall(method='or') keeps a flag-mode object in flag mode."""
    flags = UVFlag(test_f_file)
    flags.to_flag()
    assert flags.weights_array is None
    flags.to_waterfall(method='or')
    assert flags.type == 'waterfall'
    assert flags.mode == 'flag'
    expected_shape = (len(flags.time_array),
                      len(flags.freq_array),
                      len(flags.polarization_array))
    assert flags.flag_array.shape == expected_shape
    assert len(flags.lst_array) == len(flags.time_array)

    # Repeat the same conversion on a freshly loaded object
    flags = UVFlag(test_f_file)
    flags.to_flag()
    flags.to_waterfall(method='or')
    assert flags.type == 'waterfall'
    assert flags.mode == 'flag'
    expected_shape = (len(flags.time_array),
                      len(flags.freq_array),
                      len(flags.polarization_array))
    assert flags.flag_array.shape == expected_shape
    assert len(flags.lst_array) == len(flags.time_array)
def test_to_waterfall_ant():
    """Convert a UVFlag built from a UVCal object into a waterfall."""
    cal = UVCal()
    cal.read_calfits(test_c_file)
    ant_flags = UVFlag(cal)
    ant_flags.weights_array = np.ones_like(ant_flags.weights_array)
    ant_flags.to_waterfall()
    assert ant_flags.type == 'waterfall'
    expected_shape = (len(ant_flags.time_array),
                      len(ant_flags.freq_array),
                      len(ant_flags.polarization_array))
    assert ant_flags.metric_array.shape == expected_shape
    assert ant_flags.weights_array.shape == ant_flags.metric_array.shape
    assert len(ant_flags.lst_array) == len(ant_flags.time_array)
def test_to_waterfall_waterfall():
    """Calling to_waterfall on an object that is already a waterfall issues one warning."""
    wf = UVFlag(test_f_file)
    wf.weights_array = np.ones_like(wf.weights_array)
    wf.to_waterfall()
    uvtest.checkWarnings(
        wf.to_waterfall, [], {},
        nwarnings=1, message='This object is already a waterfall',
    )
def test_to_baseline_flags():
    """Expand a flag-mode waterfall onto the baselines of a UVData object."""
    data = UVData()
    data.read_miriad(test_d_file)
    flags = UVFlag(data)
    flags.to_waterfall()
    flags.to_flag()
    # (time index, channel) pairs to flag: time0/chan10 and time1/chan15
    flagged_cells = ((0, 10), (1, 15))
    for t_ind, chan in flagged_cells:
        flags.flag_array[t_ind, chan, 0] = True

    flags.to_baseline(data)
    assert flags.type == 'baseline'
    assert np.all(flags.baseline_array == data.baseline_array)
    assert np.all(flags.time_array == data.time_array)

    unique_times = np.unique(flags.time_array)
    n_flagged = 0.0
    for t_ind, chan in flagged_cells:
        rows = np.where(flags.time_array == unique_times[t_ind])[0]
        n_flagged += len(rows)
        assert np.all(flags.flag_array[rows, 0, chan, 0])
    # Nothing other than the rows counted above should be flagged
    assert flags.flag_array.mean() == n_flagged / flags.flag_array.size
def test_to_baseline_metric():
    """Expand a metric-mode waterfall onto the baselines of a UVData object."""
    data = UVData()
    data.read_miriad(test_d_file)
    metrics = UVFlag(data)
    metrics.to_waterfall()
    metrics.metric_array[0, 10, 0] = 3.2  # Fill in time0, chan10
    metrics.metric_array[1, 15, 0] = 2.1  # Fill in time1, chan15

    metrics.to_baseline(data)
    assert np.all(metrics.baseline_array == data.baseline_array)
    assert np.all(metrics.time_array == data.time_array)

    unique_times = np.unique(metrics.time_array)
    rows_t0 = np.where(metrics.time_array == unique_times[0])[0]
    assert np.all(metrics.metric_array[rows_t0, 0, 10, 0] == 3.2)
    rows_t1 = np.where(metrics.time_array == unique_times[1])[0]
    assert np.all(metrics.metric_array[rows_t1, 0, 15, 0] == 2.1)

    nt0, nt1 = len(rows_t0), len(rows_t1)
    expected_mean = (3.2 * nt0 + 2.1 * nt1) / metrics.metric_array.size
    assert np.isclose(metrics.metric_array.mean(), expected_mean)
def test_to_baseline_add_version_str():
    """to_baseline puts the pyuvdata version string back into the history."""
    data = UVData()
    data.read_miriad(test_d_file)
    metrics = UVFlag(data)
    metrics.to_waterfall()
    metrics.metric_array[0, 10, 0] = 3.2  # Fill in time0, chan10
    metrics.metric_array[1, 15, 0] = 2.1  # Fill in time1, chan15

    # Strip the version string so its reappearance can be detected
    metrics.history = metrics.history.replace(pyuvdata_version_str, '')
    assert pyuvdata_version_str not in metrics.history

    metrics.to_baseline(data)
    assert pyuvdata_version_str in metrics.history
def test_baseline_to_baseline():
    """to_baseline leaves an object built directly from UVData equal to its copy."""
    data = UVData()
    data.read_miriad(test_d_file)
    bl_flags = UVFlag(data)
    reference = bl_flags.copy()
    bl_flags.to_baseline(data)
    assert bl_flags == reference
def test_to_baseline_metric_error(uvf_from_uvcal):
    """to_baseline raises NotImplementedError for the UVCal-derived fixture left in its default mode."""
    ant_metrics = uvf_from_uvcal
    ant_metrics.select(polarizations=ant_metrics.polarization_array[0])
    data = UVData()
    data.read_miriad(test_d_file)
    with pytest.raises(NotImplementedError) as cm:
        ant_metrics.to_baseline(data, force_pol=True)
    expected_start = ("Cannot currently convert from "
                      "antenna type, metric mode")
    assert str(cm.value).startswith(expected_start)
def test_to_baseline_from_antenna(uvf_from_uvcal):
    """Convert a UVCal-derived flag-mode object to baselines and check the flags.

    Two things are verified: the flags on the baseline formed by the first two
    entries of ``ant_array`` equal the OR of those two antennas' flags, and
    every baseline touching an antenna that is in the UVData object but not in
    ``ant_array`` is completely flagged.
    """
    uvf = uvf_from_uvcal
    uvf.select(polarizations=uvf.polarization_array[0])
    uvf.to_flag()
    uv = UVData()
    uv.read_miriad(test_d_file)

    # Antennas present in the data but absent from the UVFlag object
    ants_data = np.unique(uv.ant_1_array.tolist() + uv.ant_2_array.tolist())
    new_ants = np.setdiff1d(ants_data, uvf.ant_array)

    # Reference values captured before the conversion
    old_baseline = (uvf.ant_array[0], uvf.ant_array[1])
    old_times = np.unique(uvf.time_array)
    or_flags = np.logical_or(uvf.flag_array[0], uvf.flag_array[1])
    # Reorder axes so or_flags can be compared with the converted flag_array below
    or_flags = np.transpose(or_flags, [2, 0, 1, 3])

    uv2 = copy.deepcopy(uv)
    uvf2 = uvf.copy()

    # hack in the exact times so we can compare some values later
    uv2.select(bls=old_baseline)
    uv2.time_array[:uvf2.time_array.size] = uvf.time_array

    uvf.to_baseline(uv, force_pol=True)
    uvf2.to_baseline(uv2, force_pol=True)
    assert uvf.check()

    uvf2.select(bls=old_baseline, times=old_times)
    assert np.allclose(or_flags, uvf2.flag_array)

    # all new antenna should be completely flagged
    # checks auto correlations
    uvf_new = uvf.select(antenna_nums=new_ants, inplace=False)
    for bl in np.unique(uvf_new.baseline_array):
        uvf2 = uvf_new.select(bls=uv.baseline_to_antnums(bl), inplace=False)
        assert np.all(uvf2.flag_array)

    # check for baselines with one new antenna
    bls = [uvf.baseline_to_antnums(bl)
           for bl in uvf.baseline_array
           if np.intersect1d(new_ants, uvf.baseline_to_antnums(bl)).size > 0]
    uvf_new = uvf.select(bls=bls, inplace=False)
    for bl in np.unique(uvf_new.baseline_array):
        uvf2 = uvf_new.select(bls=uv.baseline_to_antnums(bl), inplace=False)
        assert np.all(uvf2.flag_array)
def test_to_baseline_errors():
    """Check the ValueErrors raised by to_baseline for bad inputs.

    Covers an invalid matching object, a single mismatched polarization, and a
    two-pol object whose polarizations cannot be made to match.
    """
    uv = UVData()
    uv.read_miriad(test_d_file)

    uvf = UVFlag(test_f_file)
    uvf.to_waterfall()
    with pytest.raises(ValueError) as cm:
        uvf.to_baseline(7.3)  # invalid matching object
    assert str(cm.value).startswith('Must pass in UVData object or UVFlag object')

    uvf = UVFlag(test_f_file)
    uvf.to_waterfall()
    uvf2 = uvf.copy()
    uvf.polarization_array[0] = -4
    with pytest.raises(ValueError) as cm:
        uvf.to_baseline(uv)  # Mismatched pols
    assert str(cm.value).startswith('Polarizations do not match.')

    # Add the unmodified copy along the polarization axis before retrying
    uvf.__iadd__(uvf2, axis='polarization')
    with pytest.raises(ValueError) as cm:
        uvf.to_baseline(uv)  # Mismatched pols, can't be forced
    assert str(cm.value).startswith('Polarizations could not be made to match.')
def test_to_baseline_force_pol():
    """to_baseline(force_pol=True) succeeds for a flag waterfall with a changed pol."""
    data = UVData()
    data.read_miriad(test_d_file)
    flags = UVFlag(data)
    flags.to_waterfall()
    flags.to_flag()
    # (time index, channel) pairs to flag: time0/chan10 and time1/chan15
    flagged_cells = ((0, 10), (1, 15))
    for t_ind, chan in flagged_cells:
        flags.flag_array[t_ind, chan, 0] = True
    # Change pol, but force pol anyway
    flags.polarization_array[0] = -4

    flags.to_baseline(data, force_pol=True)
    assert np.all(flags.baseline_array == data.baseline_array)
    assert np.all(flags.time_array == data.time_array)
    assert np.array_equal(flags.polarization_array, data.polarization_array)

    unique_times = np.unique(flags.time_array)
    n_flagged = 0.0
    for t_ind, chan in flagged_cells:
        rows = np.where(flags.time_array == unique_times[t_ind])[0]
        n_flagged += len(rows)
        assert np.all(flags.flag_array[rows, 0, chan, 0])
    assert flags.flag_array.mean() == n_flagged / flags.flag_array.size
def test_to_baseline_force_pol_Npol_gt_1():
    """Force a waterfall built before a second pol was added onto the two-pol UVData object."""
    data = UVData()
    data.read_miriad(test_d_file)
    flags = UVFlag(data)
    flags.to_waterfall()
    flags.to_flag()
    flags.flag_array[0, 10, 0] = True  # Flag time0, chan10
    flags.flag_array[1, 15, 0] = True  # Flag time1, chan15

    # Give the data object a second polarization
    second_pol = copy.deepcopy(data)
    second_pol.polarization_array[0] = -6
    data += second_pol

    flags.to_baseline(data, force_pol=True)
    assert np.all(flags.baseline_array == data.baseline_array)
    assert np.all(flags.time_array == data.time_array)
    assert np.array_equal(flags.polarization_array, data.polarization_array)
    assert flags.Npols == len(flags.polarization_array)
def test_to_baseline_metric_force_pol():
    """to_baseline(force_pol=True) succeeds for a metric waterfall with a changed pol."""
    data = UVData()
    data.read_miriad(test_d_file)
    metrics = UVFlag(data)
    metrics.to_waterfall()
    metrics.metric_array[0, 10, 0] = 3.2  # Fill in time0, chan10
    metrics.metric_array[1, 15, 0] = 2.1  # Fill in time1, chan15
    metrics.polarization_array[0] = -4

    metrics.to_baseline(data, force_pol=True)
    assert np.all(metrics.baseline_array == data.baseline_array)
    assert np.all(metrics.time_array == data.time_array)
    assert np.array_equal(metrics.polarization_array, data.polarization_array)

    unique_times = np.unique(metrics.time_array)
    rows_t0 = np.where(metrics.time_array == unique_times[0])[0]
    assert np.all(metrics.metric_array[rows_t0, 0, 10, 0] == 3.2)
    rows_t1 = np.where(metrics.time_array == unique_times[1])[0]
    assert np.all(metrics.metric_array[rows_t1, 0, 15, 0] == 2.1)

    nt0, nt1 = len(rows_t0), len(rows_t1)
    expected_mean = (3.2 * nt0 + 2.1 * nt1) / metrics.metric_array.size
    assert np.isclose(metrics.metric_array.mean(), expected_mean)
def test_to_antenna_flags():
    """Expand a flag-mode waterfall onto the antennas of a UVCal object."""
    cal = UVCal()
    cal.read_calfits(test_c_file)
    flags = UVFlag(cal)
    flags.to_waterfall()
    flags.to_flag()
    flags.flag_array[0, 10, 0] = True  # Flag time0, chan10
    flags.flag_array[1, 15, 0] = True  # Flag time1, chan15

    flags.to_antenna(cal)
    assert flags.type == 'antenna'
    assert np.all(flags.ant_array == cal.ant_array)
    assert np.all(flags.time_array == cal.time_array)
    assert np.all(flags.flag_array[:, 0, 10, 0, 0])
    assert np.all(flags.flag_array[:, 0, 15, 1, 0])
    # Two flagged cells per antenna and nothing else
    expected_mean = 2. * cal.Nants_data / flags.flag_array.size
    assert flags.flag_array.mean() == expected_mean
def test_to_antenna_add_version_str():
    """to_antenna puts the pyuvdata version string back into the history."""
    cal = UVCal()
    cal.read_calfits(test_c_file)
    flags = UVFlag(cal)
    flags.to_waterfall()
    flags.to_flag()
    flags.flag_array[0, 10, 0] = True  # Flag time0, chan10
    flags.flag_array[1, 15, 0] = True  # Flag time1, chan15

    # Strip the version string so its reappearance can be detected
    flags.history = flags.history.replace(pyuvdata_version_str, '')
    assert pyuvdata_version_str not in flags.history

    flags.to_antenna(cal)
    assert pyuvdata_version_str in flags.history
def test_to_antenna_metric():
    """Expand a metric-mode waterfall onto the antennas of a UVCal object."""
    cal = UVCal()
    cal.read_calfits(test_c_file)
    metrics = UVFlag(cal)
    metrics.to_waterfall()
    metrics.metric_array[0, 10, 0] = 3.2  # Fill in time0, chan10
    metrics.metric_array[1, 15, 0] = 2.1  # Fill in time1, chan15

    metrics.to_antenna(cal)
    assert np.all(metrics.ant_array == cal.ant_array)
    assert np.all(metrics.time_array == cal.time_array)
    assert np.all(metrics.metric_array[:, 0, 10, 0, 0] == 3.2)
    assert np.all(metrics.metric_array[:, 0, 15, 1, 0] == 2.1)
    expected_mean = (3.2 + 2.1) * cal.Nants_data / metrics.metric_array.size
    assert np.isclose(metrics.metric_array.mean(), expected_mean)
def test_to_antenna_flags_match_uvflag():
    """to_antenna accepts another UVFlag object as the object to match."""
    cal = UVCal()
    cal.read_calfits(test_c_file)
    flags = UVFlag(cal)
    # Copy of the untouched object, used below as the matching target
    target = flags.copy()
    flags.to_waterfall()
    flags.to_flag()
    flags.flag_array[0, 10, 0] = True  # Flag time0, chan10
    flags.flag_array[1, 15, 0] = True  # Flag time1, chan15

    flags.to_antenna(target)
    assert np.all(flags.ant_array == cal.ant_array)
    assert np.all(flags.time_array == cal.time_array)
    assert np.all(flags.flag_array[:, 0, 10, 0, 0])
    assert np.all(flags.flag_array[:, 0, 15, 1, 0])
    expected_mean = 2. * cal.Nants_data / flags.flag_array.size
    assert flags.flag_array.mean() == expected_mean
def test_antenna_to_antenna():
    """to_antenna leaves an object built directly from UVCal equal to its copy."""
    cal = UVCal()
    cal.read_calfits(test_c_file)
    ant_flags = UVFlag(cal)
    reference = ant_flags.copy()
    ant_flags.to_antenna(cal)
    assert ant_flags == reference
def test_to_antenna_errors():
    """Check the ValueErrors raised by to_antenna for bad inputs."""
    cal = UVCal()
    cal.read_calfits(test_c_file)
    data = UVData()
    data.read_miriad(test_d_file)

    wf = UVFlag(test_f_file)
    wf.to_waterfall()
    with pytest.raises(ValueError) as cm:
        wf.to_antenna(7.3)  # invalid matching object
    assert str(cm.value).startswith('Must pass in UVCal object or UVFlag object ')

    bl_flags = UVFlag(data)
    with pytest.raises(ValueError) as cm:
        bl_flags.to_antenna(cal)  # Cannot pass in baseline type
    assert str(cm.value).startswith('Cannot convert from type "baseline" to "antenna".')

    wf = UVFlag(test_f_file)
    wf.to_waterfall()
    untouched = wf.copy()
    wf.polarization_array[0] = -4
    with pytest.raises(ValueError) as cm:
        wf.to_antenna(cal)  # Mismatched pols
    assert str(cm.value).startswith('Polarizations do not match. ')

    # Add the unmodified copy along the polarization axis before retrying
    wf.__iadd__(untouched, axis='polarization')
    with pytest.raises(ValueError) as cm:
        wf.to_antenna(cal)  # Mismatched pols, can't be forced
    assert str(cm.value).startswith('Polarizations could not be made to match.')
def test_to_antenna_force_pol():
    """to_antenna(force_pol=True) succeeds for a flag waterfall with a changed pol."""
    cal = UVCal()
    cal.read_calfits(test_c_file)
    cal.select(jones=-5)
    flags = UVFlag(cal)
    flags.to_waterfall()
    flags.to_flag()
    flags.flag_array[0, 10, 0] = True  # Flag time0, chan10
    flags.flag_array[1, 15, 0] = True  # Flag time1, chan15
    # Change pol, but force pol anyway
    flags.polarization_array[0] = -4

    flags.to_antenna(cal, force_pol=True)
    assert np.all(flags.ant_array == cal.ant_array)
    assert np.all(flags.time_array == cal.time_array)
    assert np.array_equal(flags.polarization_array, cal.jones_array)
    assert np.all(flags.flag_array[:, 0, 10, 0, 0])
    assert np.all(flags.flag_array[:, 0, 15, 1, 0])
    expected_mean = 2 * cal.Nants_data / flags.flag_array.size
    assert flags.flag_array.mean() == expected_mean
def test_to_antenna_metric_force_pol():
    """to_antenna(force_pol=True) succeeds for a metric waterfall with a changed pol."""
    cal = UVCal()
    cal.read_calfits(test_c_file)
    cal.select(jones=-5)
    metrics = UVFlag(cal)
    metrics.to_waterfall()
    metrics.metric_array[0, 10, 0] = 3.2  # Fill in time0, chan10
    metrics.metric_array[1, 15, 0] = 2.1  # Fill in time1, chan15
    metrics.polarization_array[0] = -4

    metrics.to_antenna(cal, force_pol=True)
    assert np.all(metrics.ant_array == cal.ant_array)
    assert np.all(metrics.time_array == cal.time_array)
    assert np.array_equal(metrics.polarization_array, cal.jones_array)
    assert np.all(metrics.metric_array[:, 0, 10, 0, 0] == 3.2)
    assert np.all(metrics.metric_array[:, 0, 15, 1, 0] == 2.1)
    expected_mean = (3.2 + 2.1) * cal.Nants_data / metrics.metric_array.size
    assert np.isclose(metrics.metric_array.mean(), expected_mean)
def test_copy():
    """copy() returns an equal object that is independent of the original."""
    original = UVFlag(test_f_file)
    duplicate = original.copy()
    assert original == duplicate
    # Make sure it's a copy and not just pointing to same object
    original.to_waterfall()
    assert original != duplicate
def test_or():
    """The | operator ORs the flag arrays of two flag-mode objects."""
    left = UVFlag(test_f_file)
    left.to_flag()
    right = left.copy()
    right.flag_array = np.ones_like(right.flag_array)
    # Index 0 is flagged only on the left; index 1 is unflagged on the right
    left.flag_array[0] = True
    right.flag_array[0] = False
    right.flag_array[1] = False

    combined = left | right
    assert np.all(combined.flag_array[0])
    assert not np.any(combined.flag_array[1])
    assert np.all(combined.flag_array[2:])
def test_or_add_version_str():
    """The | operator puts the pyuvdata version string into the result's history."""
    left = UVFlag(test_f_file)
    left.to_flag()
    # Strip the version string so its reappearance can be detected
    left.history = left.history.replace(pyuvdata_version_str, '')
    assert pyuvdata_version_str not in left.history

    right = left.copy()
    right.flag_array = np.ones_like(right.flag_array)
    left.flag_array[0] = True
    right.flag_array[0] = False
    right.flag_array[1] = False

    combined = left | right
    assert pyuvdata_version_str in combined.history
def test_or_error():
    """__or__ raises ValueError when the other object was never converted to flag mode."""
    flag_obj = UVFlag(test_f_file)
    # Copy is taken before to_flag(), so it is not converted
    unconverted = flag_obj.copy()
    flag_obj.to_flag()
    with pytest.raises(ValueError) as cm:
        flag_obj.__or__(unconverted)
    message = str(cm.value)
    assert message.startswith('UVFlag object must be in "flag" mode')
def test_or_add_history():
    """The | operator keeps both input histories and notes the OR operation."""
    left = UVFlag(test_f_file)
    left.to_flag()
    right = left.copy()
    right.history = 'Different history'

    combined = left | right
    for expected in (left.history, right.history, "Flags OR'd with:"):
        assert expected in combined.history
def test_ior():
    """The |= operator ORs another object's flags into this one in place."""
    target = UVFlag(test_f_file)
    target.to_flag()
    other = target.copy()
    other.flag_array = np.ones_like(other.flag_array)
    # Index 0 is flagged only on the target; index 1 is unflagged on the other
    target.flag_array[0] = True
    other.flag_array[0] = False
    other.flag_array[1] = False

    target |= other
    assert np.all(target.flag_array[0])
    assert not np.any(target.flag_array[1])
    assert np.all(target.flag_array[2:])
def test_to_flag():
    """to_flag switches to flag mode, clears metric_array and records it in history."""
    obj = UVFlag(test_f_file)
    obj.to_flag()
    for attr in ('flag_array', 'metric_array'):
        assert hasattr(obj, attr)
    assert obj.metric_array is None
    assert obj.mode == 'flag'
    assert 'Converted to mode "flag"' in obj.history
def test_to_flag_add_version_str():
    """to_flag puts the pyuvdata version string back into the history."""
    obj = UVFlag(test_f_file)
    # Strip the version string so its reappearance can be detected
    stripped_history = obj.history.replace(pyuvdata_version_str, '')
    obj.history = stripped_history
    assert pyuvdata_version_str not in obj.history

    obj.to_flag()
    assert pyuvdata_version_str in obj.history
def test_to_flag_threshold():
    """to_flag(threshold=1.) flags only the single sample set to 2."""
    obj = UVFlag(test_f_file)
    obj.metric_array = np.zeros_like(obj.metric_array)
    # Single sample above the threshold
    obj.metric_array[0, 0, 4, 0] = 2.
    obj.to_flag(threshold=1.)

    for attr in ('flag_array', 'metric_array'):
        assert hasattr(obj, attr)
    assert obj.metric_array is None
    assert obj.mode == 'flag'
    assert obj.flag_array[0, 0, 4, 0]
    assert np.sum(obj.flag_array) == 1.
    assert 'Converted to mode "flag"' in obj.history
def test_flag_to_flag():
    """Calling to_flag on an object already in flag mode leaves it unchanged."""
    reference = UVFlag(test_f_file)
    reference.to_flag()
    converted_twice = reference.copy()
    converted_twice.to_flag()
    assert reference == converted_twice
def test_to_flag_unknown_mode():
    """to_flag raises ValueError when the mode attribute is not recognized."""
    obj = UVFlag(test_f_file)
    obj.mode = 'foo'
    with pytest.raises(ValueError) as cm:
        obj.to_flag()
    message = str(cm.value)
    assert message.startswith('Unknown UVFlag mode: foo')
def test_to_metric_baseline():
    """to_metric(convert_wgts=True) on a baseline object gives zero weights where flagged."""
    obj = UVFlag(test_f_file)
    obj.to_flag()
    obj.flag_array[:, :, 10] = True
    obj.flag_array[1, :, :] = True
    # State before the conversion
    for attr in ('flag_array', 'metric_array'):
        assert hasattr(obj, attr)
    assert obj.metric_array is None
    assert obj.mode == 'flag'

    obj.to_metric(convert_wgts=True)
    # State after the conversion
    for attr in ('metric_array', 'flag_array'):
        assert hasattr(obj, attr)
    assert obj.flag_array is None
    assert obj.mode == 'metric'
    assert 'Converted to mode "metric"' in obj.history
    assert np.isclose(obj.weights_array[1], 0.0).all()
    assert np.isclose(obj.weights_array[:, :, 10], 0.0).all()
def test_to_metric_add_version_str():
    """to_metric puts the pyuvdata version string back into the history."""
    obj = UVFlag(test_f_file)
    obj.to_flag()
    obj.flag_array[:, :, 10] = True
    obj.flag_array[1, :, :] = True
    for attr in ('flag_array', 'metric_array'):
        assert hasattr(obj, attr)
    assert obj.metric_array is None
    assert obj.mode == 'flag'

    # Strip the version string so its reappearance can be detected
    obj.history = obj.history.replace(pyuvdata_version_str, '')
    assert pyuvdata_version_str not in obj.history

    obj.to_metric(convert_wgts=True)
    assert pyuvdata_version_str in obj.history
def test_to_metric_waterfall():
    """to_metric(convert_wgts=True) on a waterfall gives zero weights where flagged."""
    wf = UVFlag(test_f_file)
    wf.to_waterfall()
    wf.to_flag()
    wf.flag_array[:, 10] = True
    wf.flag_array[1, :, :] = True

    wf.to_metric(convert_wgts=True)
    weights = wf.weights_array
    assert np.isclose(weights[1], 0.0).all()
    assert np.isclose(weights[:, 10], 0.0).all()
def test_to_metric_antenna():
    """to_metric(convert_wgts=True) on a UVCal-derived object gives zero weights where flagged."""
    cal = UVCal()
    cal.read_calfits(test_c_file)
    ant_flags = UVFlag(cal, mode='flag')
    ant_flags.flag_array[10, :, :, 1, :] = True
    ant_flags.flag_array[15, :, 3, :, :] = True

    ant_flags.to_metric(convert_wgts=True)
    weights = ant_flags.weights_array
    assert np.isclose(weights[10, :, :, 1, :], 0.0).all()
    assert np.isclose(weights[15, :, 3, :, :], 0.0).all()
def test_metric_to_metric():
    """Calling to_metric on the object as loaded from the test file leaves it unchanged."""
    converted = UVFlag(test_f_file)
    reference = converted.copy()
    converted.to_metric()
    assert converted == reference
def test_to_metric_unknown_mode():
    """to_metric raises ValueError when the mode attribute is not recognized."""
    obj = UVFlag(test_f_file)
    obj.mode = 'foo'
    with pytest.raises(ValueError) as cm:
        obj.to_metric()
    message = str(cm.value)
    assert message.startswith('Unknown UVFlag mode: foo')
def test_antpair2ind():
    """antpair2ind returns indices whose antenna pairs match the requested pair."""
    obj = UVFlag(test_f_file)
    ant1, ant2 = obj.ant_1_array[0], obj.ant_2_array[0]
    inds = obj.antpair2ind(ant1, ant2)
    assert np.all(obj.ant_1_array[inds] == ant1)
    assert np.all(obj.ant_2_array[inds] == ant2)
def test_antpair2ind_nonbaseline():
    """antpair2ind raises ValueError on a waterfall object."""
    wf = UVFlag(test_f_file)
    wf.to_waterfall()
    with pytest.raises(ValueError) as cm:
        wf.antpair2ind(0, 3)
    expected_start = ('UVFlag object of type ' + wf.type
                      + ' does not contain antenna '
                      + 'pairs to index.')
    assert str(cm.value).startswith(expected_start)
def test_baseline_to_antnums():
    """baseline_to_antnums of the first baseline returns the first antenna pair."""
    obj = UVFlag(test_f_file)
    first_bl = obj.baseline_array[0]
    ant1, ant2 = obj.baseline_to_antnums(first_bl)
    assert ant1 == obj.ant_1_array[0]
    assert ant2 == obj.ant_2_array[0]
def test_get_baseline_nums():
    """get_baseline_nums returns the unique entries of baseline_array."""
    obj = UVFlag(test_f_file)
    expected = np.unique(obj.baseline_array)
    assert np.array_equal(obj.get_baseline_nums(), expected)
def test_get_antpairs():
    """get_antpairs returns exactly the antenna pairs present in the object."""
    obj = UVFlag(test_f_file)
    pairs = obj.get_antpairs()
    # Every returned pair occurs in the data ...
    for first, second in pairs:
        matches = np.where((obj.ant_1_array == first) & (obj.ant_2_array == second))[0]
        assert len(matches) > 0
    # ... and every pair in the data is returned
    for first, second in zip(obj.ant_1_array, obj.ant_2_array):
        assert (first, second) in pairs
def test_missing_Nants_telescope():
    """Reading a file without Nants_telescope warns and yields the value 2047.

    A copy of the test file is written under DATA_PATH with the header entry
    deleted. The copy is removed in a ``finally`` clause so that a failing
    assertion does not leave it behind.
    """
    testfile = os.path.join(DATA_PATH, 'test_missing_Nants.h5')
    shutil.copyfile(test_f_file, testfile)
    try:
        with h5py.File(testfile, 'r+') as f:
            del(f['/Header/Nants_telescope'])
        uvf = uvtest.checkWarnings(UVFlag, [testfile], {}, nwarnings=1,
                                   message=['Nants_telescope not available in file,'])
        uvf2 = UVFlag(test_f_file)
        # Value the comparison below expects after reading the stripped file
        uvf2.Nants_telescope = 2047
        assert uvf == uvf2
    finally:
        os.remove(testfile)
def test_combine_metrics_inplace():
    """combine_metrics updates the calling object's metric_array in place."""
    cal = UVCal()
    cal.read_calfits(test_c_file)
    base = UVFlag(cal)
    np.random.seed(44)
    base.metric_array = np.random.normal(size=base.metric_array.shape)
    doubled = base.copy()
    doubled.metric_array *= 2
    tripled = base.copy()
    tripled.metric_array *= 3

    base.combine_metrics([doubled, tripled])
    # base was overwritten, so the expectation is expressed via the doubled copy
    factor = np.sqrt((1 + 4 + 9) / 3.) / 2.
    expected = np.abs(doubled.metric_array) * factor
    assert np.allclose(base.metric_array, expected)
def test_combine_metrics_not_inplace():
    """combine_metrics(inplace=False) returns the combined object."""
    cal = UVCal()
    cal.read_calfits(test_c_file)
    base = UVFlag(cal)
    np.random.seed(44)
    base.metric_array = np.random.normal(size=base.metric_array.shape)
    doubled = base.copy()
    doubled.metric_array *= 2
    tripled = base.copy()
    tripled.metric_array *= 3

    combined = base.combine_metrics([doubled, tripled], inplace=False)
    factor = np.sqrt((1 + 4 + 9) / 3.)
    expected = np.abs(base.metric_array) * factor
    assert np.allclose(combined.metric_array, expected)
def test_combine_metrics_not_uvflag():
    """combine_metrics raises ValueError when given a plain string."""
    cal = UVCal()
    cal.read_calfits(test_c_file)
    base = UVFlag(cal)
    with pytest.raises(ValueError) as cm:
        base.combine_metrics('bubblegum')
    message = str(cm.value)
    assert message.startswith('"others" must be UVFlag or list of UVFlag objects')
def test_combine_metrics_not_metric():
    """combine_metrics raises ValueError when the other object is in flag mode."""
    cal = UVCal()
    cal.read_calfits(test_c_file)
    base = UVFlag(cal)
    np.random.seed(44)
    base.metric_array = np.random.normal(size=base.metric_array.shape)
    flag_mode = base.copy()
    flag_mode.to_flag()
    with pytest.raises(ValueError) as cm:
        base.combine_metrics(flag_mode)
    message = str(cm.value)
    assert message.startswith('UVFlag object and "others" must be in "metric"')
def test_combine_metrics_wrong_shape():
    """combine_metrics raises ValueError when the other object is a waterfall."""
    cal = UVCal()
    cal.read_calfits(test_c_file)
    base = UVFlag(cal)
    np.random.seed(44)
    base.metric_array = np.random.normal(size=base.metric_array.shape)
    waterfall = base.copy()
    waterfall.to_waterfall()
    with pytest.raises(ValueError) as cm:
        base.combine_metrics(waterfall)
    message = str(cm.value)
    assert message.startswith('UVFlag metric array shapes do not match.')
def test_combine_metrics_add_version_str():
    """combine_metrics puts the pyuvdata version string into the result's history."""
    cal = UVCal()
    cal.read_calfits(test_c_file)
    base = UVFlag(cal)
    # Strip the version string so its reappearance can be detected
    base.history = base.history.replace(pyuvdata_version_str, '')
    assert pyuvdata_version_str not in base.history

    np.random.seed(44)
    base.metric_array = np.random.normal(size=base.metric_array.shape)
    doubled = base.copy()
    doubled.metric_array *= 2
    tripled = base.copy()
    tripled.metric_array *= 3

    combined = base.combine_metrics([doubled, tripled], inplace=False)
    assert pyuvdata_version_str in combined.history
def test_super():
    """A UVFlag subclass can forward its arguments through super().__init__."""

    class FlagWithProperty(UVFlag):
        """UVFlag subclass that stores one extra attribute."""

        def __init__(self, input, mode='metric', copy_flags=False,
                     waterfall=False, history='', label='', property='prop'):
            super(FlagWithProperty, self).__init__(
                input,
                mode=mode,
                copy_flags=copy_flags,
                waterfall=waterfall,
                history=history,
                label=label,
            )
            self.property = property

    data = UVData()
    data.read_miriad(test_d_file)
    instance = FlagWithProperty(data, property='property')

    # UVFlag.__init__ is tested, so just see if it has a metric array
    assert hasattr(instance, 'metric_array')
    # Check that it has the property
    assert instance.property == 'property'
def test_flags2waterfall():
    """Check flags2waterfall output shapes and means for UVData and UVCal inputs."""
    data = UVData()
    data.read_miriad(test_d_file)
    np.random.seed(0)
    data.flag_array = np.random.randint(0, 2, size=data.flag_array.shape, dtype=bool)

    waterfall = flags2waterfall(data)
    assert np.allclose(np.mean(waterfall), np.mean(data.flag_array))
    assert waterfall.shape == (data.Ntimes, data.Nfreqs)

    waterfall = flags2waterfall(data, keep_pol=True)
    assert waterfall.shape == (data.Ntimes, data.Nfreqs, data.Npols)

    # Test external flag_array
    data.flag_array = np.zeros_like(data.flag_array)
    external_flags = np.random.randint(0, 2, size=data.flag_array.shape, dtype=bool)
    waterfall = flags2waterfall(data, flag_array=external_flags)
    assert np.allclose(np.mean(waterfall), np.mean(external_flags))
    assert waterfall.shape == (data.Ntimes, data.Nfreqs)

    # UVCal version
    cal = UVCal()
    cal.read_calfits(test_c_file)
    cal.flag_array = np.random.randint(0, 2, size=cal.flag_array.shape, dtype=bool)
    waterfall = flags2waterfall(cal)
    assert np.allclose(np.mean(waterfall), np.mean(cal.flag_array))
    assert waterfall.shape == (cal.Ntimes, cal.Nfreqs)

    waterfall = flags2waterfall(cal, keep_pol=True)
    assert waterfall.shape == (cal.Ntimes, cal.Nfreqs, cal.Njones)
def test_flags2waterfall_errors():
    """flags2waterfall raises ValueError for a bad object or a misshapen flag array."""
    # First argument must be UVData or UVCal object
    with pytest.raises(ValueError) as cm:
        flags2waterfall(5)
    expected_start = ('flags2waterfall() requires a UVData or '
                      + 'UVCal object')
    assert str(cm.value).startswith(expected_start)

    data = UVData()
    data.read_miriad(test_d_file)
    # Flag array must have same shape as uv.flag_array
    bad_flags = np.array([4, 5])
    with pytest.raises(ValueError) as cm:
        flags2waterfall(data, bad_flags)
    assert str(cm.value).startswith('Flag array must align with UVData or UVCal')
def test_and_rows_cols():
    """and_rows_cols keeps fully-set rows and columns and drops partial ones.

    The input array is created with the builtin ``bool`` dtype; the
    ``np.bool`` alias is deprecated and absent from recent NumPy releases.
    """
    d = np.zeros((10, 20), bool)
    d[1, :] = True  # fully set row
    d[:, 2] = True  # fully set column
    d[5, 10:20] = True  # partially set row
    d[5:8, 5] = True  # partially set column
    o = and_rows_cols(d)
    assert o[1, :].all()
    assert o[:, 2].all()
    assert not o[5, :].all()
    assert not o[:, 5].all()
def test_select_waterfall_errors(uvf_from_waterfall):
    """select raises ValueError for antenna_nums or bls on a waterfall object."""
    wf = uvf_from_waterfall

    with pytest.raises(ValueError) as cm:
        wf.select(antenna_nums=[0, 1, 2])
    message = str(cm.value)
    assert message.startswith('Cannot select on antenna_nums with waterfall')

    with pytest.raises(ValueError) as cm:
        wf.select(bls=[(0, 1), (0, 2)])
    message = str(cm.value)
    assert message.startswith('Cannot select on bls with waterfall')
@cases_decorator
@pytest.mark.parametrize("uvf_mode", ["to_flag", "to_metric"])
def test_select_blt_inds(input_uvf, uvf_mode):
    """Select a random half of the blt indices and check data, counts and history.

    Also checks that a higher-dimensional index array is accepted and that
    empty, too-large and negative indices raise ValueError.

    Parameters
    ----------
    input_uvf : UVFlag
        Object under test, supplied through ``cases_decorator``.
    uvf_mode : str
        Name of the mode-conversion method called before selecting.
    """
    uvf = input_uvf
    # used to set the mode depending on which input is given to uvf_mode
    getattr(uvf, uvf_mode)()
    np.random.seed(0)
    blt_inds = np.random.choice(uvf.Nblts, size=uvf.Nblts // 2, replace=False)
    new_nblts = uvf.Nblts // 2

    uvf1 = uvf.select(blt_inds=blt_inds, inplace=False)

    # test the data was extracted correctly for each case
    for param_name, new_param in zip(uvf._data_params, uvf1.data_like_parameters):
        old_param = getattr(uvf, param_name)
        # The selected axis is indexed differently depending on the object type
        if uvf.type == "baseline":
            assert np.allclose(old_param[blt_inds], new_param)
        if uvf.type == "antenna":
            assert np.allclose(old_param[:, :, :, blt_inds], new_param)
        if uvf.type == "waterfall":
            assert np.allclose(old_param[blt_inds], new_param)

    assert uvf1.Nblts == new_nblts

    # verify that histories are different
    assert not uvutils._check_histories(uvf.history, uvf1.history)
    assert uvutils._check_histories(uvf.history + ' Downselected to '
                                    'specific baseline-times using pyuvdata.',
                                    uvf1.history)

    # test works with higher dimension arrays:
    uvf1 = uvf.select(blt_inds=np.atleast_3d(blt_inds), inplace=False)
    # test the data was extracted
    # assert np.allclose(uvf.metric_array[blt_inds], uvf1.metric_array)
    assert uvf1.Nblts == new_nblts
    assert 'baseline-times' in uvf1.history

    # verify that histories are different
    assert not uvutils._check_histories(uvf.history, uvf1.history)
    assert uvutils._check_histories(uvf.history + ' Downselected to '
                                    'specific baseline-times using pyuvdata.',
                                    uvf1.history)

    # test the error modes of blt_inds
    with pytest.raises(ValueError) as cm:
        uvf.select(blt_inds=[])
    assert str(cm.value).startswith('No baseline-times were found')

    with pytest.raises(ValueError) as cm:
        uvf.select(blt_inds=[np.max(uvf.Nblts) + 1])
    assert str(cm.value).startswith('blt_inds contains indices that are too large')

    with pytest.raises(ValueError) as cm:
        uvf.select(blt_inds=[-1])
    assert str(cm.value).startswith('blt_inds contains indices that are negative')
@cases_decorator_no_waterfall
@pytest.mark.parametrize("uvf_mode", ["to_flag", "to_metric"])
def test_select_antenna_nums(input_uvf, uvf_mode):
    """Select a random half of the antennas and check the resulting object.

    The selection is run twice, once with a flat array of antenna numbers and
    once with a higher-dimensional array, and both results go through the same
    checks. Selecting antenna numbers that are not present must raise
    ValueError.

    Parameters
    ----------
    input_uvf : UVFlag
        Object under test, supplied through ``cases_decorator_no_waterfall``.
    uvf_mode : str
        Name of the mode-conversion method called before selecting.
    """
    uvf = input_uvf
    # used to set the mode depending on which input is given to uvf_mode
    getattr(uvf, uvf_mode)()
    old_history = copy.deepcopy(uvf.history)
    np.random.seed(0)
    if uvf.type == "baseline":
        unique_ants = np.unique(uvf.ant_1_array.tolist()
                                + uvf.ant_2_array.tolist())
        ants_to_keep = np.random.choice(unique_ants,
                                        size=unique_ants.size // 2,
                                        replace=False)

        # A baseline-time survives only if both of its antennas are kept
        blts_select = [(a1 in ants_to_keep) & (a2 in ants_to_keep) for (a1, a2) in
                       zip(uvf.ant_1_array, uvf.ant_2_array)]
        Nblts_selected = np.sum(blts_select)
    else:
        unique_ants = np.unique(uvf.ant_array)
        ants_to_keep = np.random.choice(unique_ants,
                                        size=unique_ants.size // 2,
                                        replace=False)
        # Only compared for baseline-type objects
        Nblts_selected = None

    def check_selection(selected):
        """Assert that ``selected`` contains exactly the kept antennas."""
        assert len(ants_to_keep) == selected.Nants_data
        if selected.type == "baseline":
            assert Nblts_selected == selected.Nblts
            for ant in ants_to_keep:
                assert ant in selected.ant_1_array or ant in selected.ant_2_array
            for ant in np.unique(selected.ant_1_array.tolist()
                                 + selected.ant_2_array.tolist()):
                assert ant in ants_to_keep
        else:
            for ant in ants_to_keep:
                assert ant in selected.ant_array
            for ant in np.unique(selected.ant_array):
                assert ant in ants_to_keep

        assert uvutils._check_histories(old_history + ' Downselected to '
                                        'specific antennas using pyuvdata.',
                                        selected.history)

    uvf2 = copy.deepcopy(uvf)
    uvf2.select(antenna_nums=ants_to_keep)
    check_selection(uvf2)

    # check that it also works with higher dimension array
    uvf2 = copy.deepcopy(uvf)
    uvf2.select(antenna_nums=np.atleast_3d(ants_to_keep))
    check_selection(uvf2)

    # also test for error if antenna numbers not present in data
    with pytest.raises(ValueError) as cm:
        uvf.select(antenna_nums=np.max(unique_ants) + np.arange(1, 3))
    assert str(cm.value).startswith('Antenna number '
                                    '{a} is not present'.format(a=np.max(unique_ants) + 1))
def sort_bl(p):
    """Return ``p`` with its first two entries in ascending order.

    ``p`` is a tuple that starts with a pair of antennas; any entries after
    the pair are kept as they are.
    """
    return p if p[1] >= p[0] else (p[1], p[0]) + p[2:]
@cases_decorator_no_waterfall
@pytest.mark.parametrize("uvf_mode", ["to_flag", "to_metric"])
def test_select_bls(input_uvf, uvf_mode):
    """Exercise ``select(bls=...)`` with antenna pairs, pair+pol tuples and bad input."""
    uvf = input_uvf
    # put the object into the mode named by the ``uvf_mode`` parameter
    getattr(uvf, uvf_mode)()
    np.random.seed(0)

    # anything that is not a "baseline" type object must reject a baseline selection
    if uvf.type != "baseline":
        with pytest.raises(ValueError) as cm:
            uvf.select(bls=[(0, 1)])
        assert str(cm.value).startswith('Only "baseline" mode UVFlag '
                                        'objects may select along the '
                                        'baseline axis')
        return

    history_before = copy.deepcopy(uvf.history)
    chosen_bls = np.random.choice(uvf.baseline_array,
                                  size=uvf.Nbls // 2,
                                  replace=False)

    # --- select with plain (ant1, ant2) pairs ---
    ants_a, ants_b = uvf.baseline_to_antnums(chosen_bls)
    # request a few baselines by their conjugate (swapped) antenna order
    ants_a[5:8], ants_b[5:8] = copy.copy(ants_b[5:8]), copy.copy(ants_a[5:8])
    expected_ants = np.unique(ants_a.tolist() + ants_b.tolist())
    requested = list(zip(ants_a, ants_b))
    requested_sorted = [sort_bl(pair) for pair in requested]
    # independent count of the baseline-times that should survive
    expected_nblts = np.sum([
        sort_bl((a1, a2)) in requested_sorted
        for (a1, a2) in zip(uvf.ant_1_array, uvf.ant_2_array)
    ])

    uvf2 = copy.deepcopy(uvf)
    uvf2.select(bls=requested)
    found_sorted = [sort_bl(pair)
                    for pair in zip(uvf2.ant_1_array, uvf2.ant_2_array)]

    assert len(expected_ants) == uvf2.Nants_data
    assert expected_nblts == uvf2.Nblts
    assert all(ant in uvf2.ant_1_array or ant in uvf2.ant_2_array
               for ant in expected_ants)
    found_ants = np.unique(uvf2.ant_1_array.tolist() + uvf2.ant_2_array.tolist())
    assert all(ant in expected_ants for ant in found_ants)
    assert all(pair in found_sorted for pair in requested_sorted)
    assert all(pair in requested_sorted for pair in found_sorted)
    assert uvutils._check_histories(history_before + ' Downselected to '
                                    'specific baselines using pyuvdata.',
                                    uvf2.history)

    # --- same selection, with a polarization string appended to every pair ---
    ants_a, ants_b = uvf.baseline_to_antnums(chosen_bls)
    # conjugate a few baselines again on the freshly computed arrays
    ants_a[5:8], ants_b[5:8] = copy.copy(ants_b[5:8]), copy.copy(ants_a[5:8])
    pol_labels = ['xx'] * len(ants_a)
    expected_ants = np.unique(ants_a.tolist() + ants_b.tolist())
    requested = list(zip(ants_a, ants_b, pol_labels))
    requested_sorted = [sort_bl(triple) for triple in requested]
    expected_nblts = np.sum([
        sort_bl((a1, a2, 'xx')) in requested_sorted
        for (a1, a2) in zip(uvf.ant_1_array, uvf.ant_2_array)
    ])

    uvf2 = copy.deepcopy(uvf)
    uvf2.select(bls=requested)
    found_sorted = [sort_bl(pair) + ('xx',)
                    for pair in zip(uvf2.ant_1_array, uvf2.ant_2_array)]

    assert len(expected_ants) == uvf2.Nants_data
    assert expected_nblts == uvf2.Nblts
    assert all(ant in uvf2.ant_1_array or ant in uvf2.ant_2_array
               for ant in expected_ants)
    found_ants = np.unique(uvf2.ant_1_array.tolist() + uvf2.ant_2_array.tolist())
    assert all(ant in expected_ants for ant in found_ants)
    assert all(triple in found_sorted for triple in requested_sorted)
    assert all(triple in requested_sorted for triple in found_sorted)
    assert uvutils._check_histories(history_before + ' Downselected to '
                                    'specific baselines, polarizations using pyuvdata.',
                                    uvf2.history)

    # a single tuple, not wrapped in a list, must be accepted too
    assert isinstance(requested[0], tuple)
    uvf2.select(bls=requested[0])
    found_sorted = [sort_bl(pair) + ('xx', )
                    for pair in zip(uvf2.ant_1_array, uvf2.ant_2_array)]
    assert list(set(found_sorted)) == [requested[0]]

    # --- error modes: each entry is (select keyword arguments, start of the message) ---
    failure_cases = [
        ({'bls': [3]}, 'bls must be a list of tuples'),
        # antenna numbers must be integers
        ({'bls': [(np.pi, 2 * np.pi)]}, 'bls must be a list of tuples of integer'),
        ({'bls': (0, 1, 'xx'), 'polarizations': [-5]},
         'Cannot provide length-3 tuples and also specify polarizations.'),
        ({'bls': (0, 1, 5)},
         'The third element in each bl must be a polarization string'),
        ({'bls': (455, 456)}, 'Antenna number 455 is not present'),
        ({'bls': (ants_a[0], 456)}, 'Antenna number 456 is not present'),
    ]
    for select_kwargs, message in failure_cases:
        with pytest.raises(ValueError) as cm:
            uvf.select(**select_kwargs)
        assert str(cm.value).startswith(message)

    # after narrowing to three baselines, asking for the (97, 97) pair must fail
    uvf2 = copy.deepcopy(uvf)
    uvf2.select(bls=[(97, 104), (97, 105), (88, 97)])
    with pytest.raises(ValueError) as cm:
        uvf2.select(bls=(97, 97))
    assert str(cm.value).startswith("Antenna pair (97, 97) does not have any")
@cases_decorator
@pytest.mark.parametrize("uvf_mode", ["to_flag", "to_metric"])
def test_select_times(input_uvf, uvf_mode):
    """Select a random half of the unique times and verify the outcome."""
    uvf = input_uvf
    # put the object into the mode named by the ``uvf_mode`` parameter
    getattr(uvf, uvf_mode)()
    np.random.seed(0)
    history_before = uvf.history

    all_times = np.unique(uvf.time_array)
    times_to_keep = np.random.choice(all_times, size=all_times.size // 2,
                                     replace=False)
    # independent count of the entries of time_array that should survive
    expected_nblts = np.sum([t in times_to_keep for t in uvf.time_array])

    # the flat array first, then the same values with an extra leading axis
    for times_arg in (times_to_keep, times_to_keep[np.newaxis, :]):
        uvf2 = copy.deepcopy(uvf)
        uvf2.select(times=times_arg)

        assert len(times_to_keep) == uvf2.Ntimes
        assert expected_nblts == uvf2.Nblts
        assert all(t in uvf2.time_array for t in times_to_keep)
        assert all(t in times_to_keep for t in np.unique(uvf2.time_array))
        assert uvutils._check_histories(history_before + ' Downselected to '
                                        'specific times using pyuvdata.',
                                        uvf2.history)

    # a time just below the earliest one is not in the data and must be rejected
    bad_time = [np.min(all_times) - .005]
    with pytest.raises(ValueError) as cm:
        uvf.select(times=bad_time)
    expected_start = 'Time {t} is not present in the time_array'.format(t=bad_time[0])
    assert str(cm.value).startswith(expected_start)
@cases_decorator
@pytest.mark.parametrize("uvf_mode", ["to_flag", "to_metric"])
def test_select_frequencies(input_uvf, uvf_mode):
    """Select a random tenth of the frequencies, by value, and verify the outcome."""
    uvf = input_uvf
    # put the object into the mode named by the ``uvf_mode`` parameter
    getattr(uvf, uvf_mode)()
    np.random.seed(0)
    history_before = uvf.history
    expected_history = (history_before + ' Downselected to '
                        'specific frequencies using pyuvdata.')

    freqs_to_keep = np.random.choice(uvf.freq_array.squeeze(), size=uvf.Nfreqs // 10,
                                     replace=False)

    # the flat array first, then the same values with an extra leading axis
    for freqs_arg in (freqs_to_keep, freqs_to_keep[np.newaxis, :]):
        uvf2 = copy.deepcopy(uvf)
        uvf2.select(frequencies=freqs_arg)

        assert len(freqs_to_keep) == uvf2.Nfreqs
        assert all(f in uvf2.freq_array for f in freqs_to_keep)
        assert all(f in freqs_to_keep for f in np.unique(uvf2.freq_array))
        assert uvutils._check_histories(expected_history, uvf2.history)

    # a single scalar frequency must work as well
    single_freq = freqs_to_keep[0]
    uvf2 = copy.deepcopy(uvf)
    uvf2.select(frequencies=single_freq)
    assert 1 == uvf2.Nfreqs
    assert single_freq in uvf2.freq_array
    assert all(f in [single_freq] for f in uvf2.freq_array)
    assert uvutils._check_histories(expected_history, uvf2.history)

    # a frequency above the highest one is not in the data and must be rejected
    bad_freq = [np.max(uvf.freq_array) + 100]
    with pytest.raises(ValueError) as cm:
        uvf.select(frequencies=bad_freq)
    expected_start = 'Frequency {f} is not present in the freq_array'.format(f=bad_freq[0])
    assert str(cm.value).startswith(expected_start)
@cases_decorator
@pytest.mark.parametrize("uvf_mode", ["to_flag", "to_metric"])
def test_select_freq_chans(input_uvf, uvf_mode):
    """Select frequencies by channel index, alone and combined with values.

    Three selections are checked against the original frequencies: a
    contiguous channel range, the same range passed with an extra leading
    axis, and a channel range combined with an overlapping list of
    frequency values.
    """
    uvf = input_uvf
    # used to set the mode depending on which input is given to uvf_mode
    getattr(uvf, uvf_mode)()
    np.random.seed(0)
    old_history = uvf.history

    # The test indexes freq_array as [0, chan] for every type except
    # "waterfall", where it is indexed as [chan]. Take the 1-D view once
    # instead of branching on the type inside every loop below.
    if uvf.type != "waterfall":
        orig_freqs = uvf.freq_array[0]
    else:
        orig_freqs = uvf.freq_array

    chans = np.random.choice(uvf.Nfreqs, 2)
    c1, c2 = np.sort(chans)
    chans_to_keep = np.arange(c1, c2)

    # the flat index array first, then the same indices with an extra leading axis
    for chans_arg in (chans_to_keep, chans_to_keep[np.newaxis, :]):
        uvf2 = copy.deepcopy(uvf)
        uvf2.select(freq_chans=chans_arg)

        assert len(chans_to_keep) == uvf2.Nfreqs
        for chan in chans_to_keep:
            assert orig_freqs[chan] in uvf2.freq_array
        for f in np.unique(uvf2.freq_array):
            assert f in orig_freqs[chans_to_keep]

        assert uvutils._check_histories(old_history + ' Downselected to '
                                        'specific frequencies using pyuvdata.',
                                        uvf2.history)

    # Test selecting both channels and frequencies
    chans = np.random.choice(uvf.Nfreqs, 2)
    c1, c2 = np.sort(chans)
    chans_to_keep = np.arange(c1, c2)
    # These frequencies are channels c1 + 1 .. c2 - 1, a subset of
    # chans_to_keep, so the combined selection is chans_to_keep itself.
    freqs_to_keep = orig_freqs[np.arange(c1 + 1, c2)]

    uvf2 = copy.deepcopy(uvf)
    uvf2.select(frequencies=freqs_to_keep, freq_chans=chans_to_keep)

    assert len(chans_to_keep) == uvf2.Nfreqs
    for chan in chans_to_keep:
        assert orig_freqs[chan] in uvf2.freq_array
    for f in np.unique(uvf2.freq_array):
        assert f in orig_freqs[chans_to_keep]
@cases_decorator
@pytest.mark.parametrize("uvf_mode", ["to_flag", "to_metric"])
def test_select_polarizations(input_uvf, uvf_mode):
    """Select polarization -5 and verify the outcome, plus the missing-pol error."""
    uvf = input_uvf
    # put the object into the mode named by the ``uvf_mode`` parameter
    getattr(uvf, uvf_mode)()
    np.random.seed(0)
    history_before = uvf.history

    pols_to_keep = [-5]

    # the flat list first, then the same list nested one level deeper
    for pols_arg in (pols_to_keep, [pols_to_keep]):
        uvf2 = copy.deepcopy(uvf)
        uvf2.select(polarizations=pols_arg)

        assert len(pols_to_keep) == uvf2.Npols
        assert all(p in uvf2.polarization_array for p in pols_to_keep)
        assert all(p in pols_to_keep for p in np.unique(uvf2.polarization_array))
        assert uvutils._check_histories(history_before + ' Downselected to '
                                        'specific polarizations using pyuvdata.',
                                        uvf2.history)

    # the already down-selected object must reject a polarization it does not hold
    with pytest.raises(ValueError) as cm:
        uvf2.select(polarizations=[-3])
    expected_start = 'Polarization {p} is not present in the polarization_array'.format(p=-3)
    assert str(cm.value).startswith(expected_start)
@cases_decorator
@pytest.mark.parametrize("uvf_mode", ["to_flag", "to_metric"])
def test_select(input_uvf, uvf_mode):
    """Select along every supported axis in a single call and check the result.

    Each axis drops exactly one entry (one blt index, one frequency, one
    antenna, one baseline, one time) and polarization -5 is kept. The number
    of surviving baseline-times is counted independently of ``select`` and
    compared with the selected object, together with the remaining
    antennas, frequencies, times, polarizations and the history string.
    Antenna selection only applies to "baseline" and "antenna" type objects
    and baseline selection only to "baseline" type objects; ``None`` is
    passed for those keywords otherwise.
    """
    uvf = input_uvf
    # used to set the mode depending on which input is given to uvf_mode
    getattr(uvf, uvf_mode)()
    np.random.seed(0)
    old_history = uvf.history

    # make new blts
    blt_inds = np.arange(uvf.Nblts - 1)

    # new freqs
    freqs_to_keep = np.random.choice(uvf.freq_array.squeeze(),
                                     size=uvf.Nfreqs - 1,
                                     replace=False)

    # new ants
    if uvf.type == "baseline":
        unique_ants = np.unique(uvf.ant_1_array.tolist()
                                + uvf.ant_2_array.tolist())
        ants_to_keep = np.random.choice(unique_ants,
                                        size=unique_ants.size - 1,
                                        replace=False)
    elif uvf.type == "antenna":
        unique_ants = np.unique(uvf.ant_array)
        ants_to_keep = np.random.choice(unique_ants,
                                        size=unique_ants.size - 1,
                                        replace=False)
    else:
        ants_to_keep = None

    if uvf.type == "baseline":
        # new bls
        bls_select = np.random.choice(uvf.baseline_array,
                                      size=uvf.Nbls - 1,
                                      replace=False)
        first_ants, second_ants = uvf.baseline_to_antnums(bls_select)

        # Give the conjugate bls for a few baselines. The slices must be
        # copied before swapping: NumPy slices are views, so an uncopied
        # swap would leave second_ants[2:4] in both arrays and turn the
        # pairs into (b, b) instead of (b, a). This assumes
        # baseline_to_antnums returns NumPy arrays, as the .tolist() calls
        # in test_select_bls suggest; that test uses the same copy idiom.
        first_ants[2:4], second_ants[2:4] = (copy.copy(second_ants[2:4]),
                                             copy.copy(first_ants[2:4]))
        ant_pairs_to_keep = list(zip(first_ants, second_ants))
        sorted_pairs_to_keep = [sort_bl(p) for p in ant_pairs_to_keep]
    else:
        ant_pairs_to_keep = None

    # new times
    unique_times = np.unique(uvf.time_array)
    times_to_keep = np.random.choice(unique_times,
                                     size=unique_times.size - 1,
                                     replace=False)

    # new pols
    pols_to_keep = [-5]

    # Independently count blts that should be selected
    blts_blt_select = [i in blt_inds for i in np.arange(uvf.Nblts)]
    blts_time_select = [t in times_to_keep for t in uvf.time_array]
    if uvf.type == "baseline":
        blts_ant_select = [(a1 in ants_to_keep) & (a2 in ants_to_keep) for (a1, a2) in
                           zip(uvf.ant_1_array, uvf.ant_2_array)]
        blts_pair_select = [sort_bl((a1, a2)) in sorted_pairs_to_keep for (a1, a2) in
                            zip(uvf.ant_1_array, uvf.ant_2_array)]
        Nblts_select = np.sum([bi & (ai & pi) & ti for (bi, ai, pi, ti) in
                               zip(blts_blt_select, blts_ant_select, blts_pair_select,
                                   blts_time_select)])
    else:
        Nblts_select = np.sum([bi & ti for (bi, ti) in
                               zip(blts_blt_select, blts_time_select)])

    uvf2 = copy.deepcopy(uvf)
    uvf2.select(blt_inds=blt_inds, antenna_nums=ants_to_keep,
                bls=ant_pairs_to_keep, frequencies=freqs_to_keep,
                times=times_to_keep, polarizations=pols_to_keep)

    assert Nblts_select == uvf2.Nblts

    # every antenna left in the selected object must be one that was asked for
    if uvf.type == "baseline":
        for ant in np.unique(uvf2.ant_1_array.tolist()
                             + uvf2.ant_2_array.tolist()):
            assert ant in ants_to_keep
    elif uvf.type == "antenna":
        for ant in np.unique(uvf2.ant_array):
            assert ant in ants_to_keep

    assert len(freqs_to_keep) == uvf2.Nfreqs
    for f in freqs_to_keep:
        assert f in uvf2.freq_array
    for f in np.unique(uvf2.freq_array):
        assert f in freqs_to_keep

    for t in np.unique(uvf2.time_array):
        assert t in times_to_keep

    assert len(pols_to_keep) == uvf2.Npols
    for p in pols_to_keep:
        assert p in uvf2.polarization_array
    for p in np.unique(uvf2.polarization_array):
        assert p in pols_to_keep

    # the history lists only the axes that apply to this object type
    if uvf.type == "baseline":
        assert uvutils._check_histories(old_history + ' Downselected to '
                                        'specific baseline-times, antennas, '
                                        'baselines, times, frequencies, '
                                        'polarizations using pyuvdata.',
                                        uvf2.history)
    elif uvf.type == "antenna":
        assert uvutils._check_histories(old_history + ' Downselected to '
                                        'specific baseline-times, antennas, '
                                        'times, frequencies, '
                                        'polarizations using pyuvdata.',
                                        uvf2.history)
    else:
        assert uvutils._check_histories(old_history + ' Downselected to '
                                        'specific baseline-times, '
                                        'times, frequencies, '
                                        'polarizations using pyuvdata.',
                                        uvf2.history)
def test_equality_no_history(uvf_from_miriad):
    """Objects differing only in history compare equal when history is not checked."""
    uvf = uvf_from_miriad
    twin = uvf.copy()
    # a fresh copy is equal even when the history is part of the comparison
    assert uvf.__eq__(twin, check_history=True)

    # change only the history, then compare with the history check switched off
    twin.history = twin.history + "different text"
    assert uvf.__eq__(twin, check_history=False)
def test_inequality_different_classes(uvf_from_miriad):
    """The fixture object reports itself unequal to an instance of an unrelated class."""

    class _Unrelated(object):
        """Empty stand-in type with no attributes of its own."""

    assert uvf_from_miriad.__ne__(_Unrelated(), check_history=False)
def test_to_antenna_collapsed_pols(uvf_from_uvcal):
    """Collapse pols, go to waterfall, and convert back to antenna type with force_pol."""
    flags = uvf_from_uvcal
    assert not flags.pol_collapsed

    # calibration object handed to to_antenna below
    cal = UVCal()
    cal.read_calfits(test_c_file)

    flags.collapse_pol()
    assert flags.pol_collapsed
    assert flags.check()

    flags.to_waterfall()
    flags.to_antenna(cal, force_pol=True)
    # after the forced conversion the object no longer reports collapsed pols
    assert not flags.pol_collapsed
    assert flags.check()