# -*- mode: python; coding: utf-8 -*- # Copyright (c) 2019 Radio Astronomy Software Group # Licensed under the 2-clause BSD License from __future__ import division import pytest import os import numpy as np import pyuvdata.tests as uvtest from pyuvdata import UVData, UVCal, utils as uvutils from pyuvdata.data import DATA_PATH from pyuvdata import UVFlag from ..uvflag import lst_from_uv, flags2waterfall, and_rows_cols from pyuvdata import __version__ import shutil import copy import warnings import h5py # The following three fixtures are used regularly # to initizize UVFlag objects from standard files # We need to define these here in order to set up # some skips for developers who do not have `pytest-cases` installed @pytest.fixture(scope='function') def uvf_from_miriad(): uv = UVData() uv.read_miriad(test_d_file) uvf = UVFlag() uvf.from_uvdata(uv) # yield the object for the test yield uvf # do some cleanup del(uvf, uv) @pytest.fixture(scope='function') def uvf_from_uvcal(): uvc = UVCal() uvc.read_calfits(test_c_file) uvf = UVFlag() uvf.from_uvcal(uvc) # yield the object for the test yield uvf # do some cleanup del(uvf, uvc) @pytest.fixture(scope='function') def uvf_from_waterfall(): uv = UVData() uv.read_miriad(test_d_file) uvf = UVFlag() uvf.from_uvdata(uv, waterfall=True) # yield the object for the test yield uvf # do some cleanup del(uvf, uv) # Try to import `pytest-cases` and define decorators used to # iterate over the three main types of UVFlag objects # otherwise make the decorators skip the tests that use these iterators try: import pytest_cases cases_decorator = pytest_cases.pytest_parametrize_plus( "input_uvf", [pytest_cases.fixture_ref(uvf_from_miriad), pytest_cases.fixture_ref(uvf_from_uvcal), pytest_cases.fixture_ref(uvf_from_waterfall)]) cases_decorator_no_waterfall = pytest_cases.pytest_parametrize_plus( "input_uvf", [pytest_cases.fixture_ref(uvf_from_miriad), pytest_cases.fixture_ref(uvf_from_uvcal)]) # This warning is raised by pytest_cases # It is due to a feature the developer does # not know how to handle yet. ignore for now. warnings.filterwarnings("ignore", message="WARNING the new order is not" + " taken into account !!", append=True) except ImportError: cases_decorator = uvtest.skipIf_no_pytest_cases cases_decorator_no_waterfall = uvtest.skipIf_no_pytest_cases test_d_file = os.path.join(DATA_PATH, 'zen.2457698.40355.xx.HH.uvcAA') test_c_file = os.path.join(DATA_PATH, 'zen.2457555.42443.HH.uvcA.omni.calfits') test_f_file = test_d_file + '.testuvflag.h5' test_outfile = os.path.join(DATA_PATH, 'test', 'outtest_uvflag.h5') pyuvdata_version_str = (' Read/written with pyuvdata version: ' + __version__ + '.') def test_init_bad_mode(): uv = UVData() uv.read_miriad(test_d_file) with pytest.raises(ValueError) as cm: UVFlag(uv, mode='bad_mode', history='I made a UVFlag object', label='test') assert str(cm.value).startswith('Input mode must be within acceptable') uv = UVCal() uv.read_calfits(test_c_file) with pytest.raises(ValueError) as cm: UVFlag(uv, mode='bad_mode', history='I made a UVFlag object', label='test') assert str(cm.value).startswith('Input mode must be within acceptable') def test_init_UVData(): uv = UVData() uv.read_miriad(test_d_file) uvf = UVFlag(uv, history='I made a UVFlag object', label='test') assert uvf.metric_array.shape == uv.flag_array.shape assert np.all(uvf.metric_array == 0) assert uvf.weights_array.shape == uv.flag_array.shape assert np.all(uvf.weights_array == 1) assert uvf.type == 'baseline' assert uvf.mode == 'metric' assert np.all(uvf.time_array == uv.time_array) assert np.all(uvf.lst_array == uv.lst_array) assert np.all(uvf.freq_array == uv.freq_array[0]) assert np.all(uvf.polarization_array == uv.polarization_array) assert np.all(uvf.baseline_array == uv.baseline_array) assert np.all(uvf.ant_1_array == uv.ant_1_array) assert np.all(uvf.ant_2_array == uv.ant_2_array) assert 'I made a UVFlag object' in uvf.history assert 'Flag object with type "baseline"' in uvf.history assert pyuvdata_version_str in uvf.history assert uvf.label == 'test' def test_init_UVData_x_orientation(): uv = UVData() uv.read_miriad(test_d_file) uv.x_orientation = 'east' uvf = UVFlag(uv, history='I made a UVFlag object', label='test') assert uvf.x_orientation == uv.x_orientation def test_init_UVData_copy_flags(): uv = UVData() uv.read_miriad(test_d_file) uvf = uvtest.checkWarnings(UVFlag, [uv], {'copy_flags': True, 'mode': 'metric'}, nwarnings=1, message='Copying flags to type=="baseline"') # with copy flags uvf.metric_array should be none assert hasattr(uvf, 'metric_array') assert uvf.metric_array is None assert np.array_equal(uvf.flag_array, uv.flag_array) assert uvf.weights_array is None assert uvf.type == 'baseline' assert uvf.mode == 'flag' assert np.all(uvf.time_array == uv.time_array) assert np.all(uvf.lst_array == uv.lst_array) assert np.all(uvf.freq_array == uv.freq_array[0]) assert np.all(uvf.polarization_array == uv.polarization_array) assert np.all(uvf.baseline_array == uv.baseline_array) assert np.all(uvf.ant_1_array == uv.ant_1_array) assert np.all(uvf.ant_2_array == uv.ant_2_array) assert 'Flag object with type "baseline"' in uvf.history assert pyuvdata_version_str in uvf.history def test_init_UVData_mode_flag(): uv = UVData() uv.read_miriad(test_d_file) uvf = UVFlag() uvf.from_uvdata(uv, copy_flags=False, mode="flag") # with copy flags uvf.metric_array should be none assert hasattr(uvf, 'metric_array') assert uvf.metric_array is None assert np.array_equal(uvf.flag_array, uv.flag_array) assert uvf.weights_array is None assert uvf.type == 'baseline' assert uvf.mode == 'flag' assert np.all(uvf.time_array == uv.time_array) assert np.all(uvf.lst_array == uv.lst_array) assert np.all(uvf.freq_array == uv.freq_array[0]) assert np.all(uvf.polarization_array == uv.polarization_array) assert np.all(uvf.baseline_array == uv.baseline_array) assert np.all(uvf.ant_1_array == uv.ant_1_array) assert np.all(uvf.ant_2_array == uv.ant_2_array) assert 'Flag object with type "baseline"' in uvf.history assert pyuvdata_version_str in uvf.history def test_init_UVCal(): uvc = UVCal() uvc.read_calfits(test_c_file) uvf = UVFlag(uvc) assert uvf.metric_array.shape == uvc.flag_array.shape assert np.all(uvf.metric_array == 0) assert uvf.weights_array.shape == uvc.flag_array.shape assert np.all(uvf.weights_array == 1) assert uvf.type == 'antenna' assert uvf.mode == 'metric' assert np.all(uvf.time_array == uvc.time_array) assert uvf.x_orientation == uvc.x_orientation lst = lst_from_uv(uvc) assert np.all(uvf.lst_array == lst) assert np.all(uvf.freq_array == uvc.freq_array[0]) assert np.all(uvf.polarization_array == uvc.jones_array) assert np.all(uvf.ant_array == uvc.ant_array) assert 'Flag object with type "antenna"' in uvf.history assert pyuvdata_version_str in uvf.history def test_init_UVCal_mode_flag(): uvc = UVCal() uvc.read_calfits(test_c_file) uvf = UVFlag(uvc, copy_flags=False, mode='flag') assert hasattr(uvf, 'metric_array') assert uvf.metric_array is None assert np.array_equal(uvf.flag_array, uvc.flag_array) assert uvf.weights_array is None assert uvf.type == 'antenna' assert uvf.mode == 'flag' assert np.all(uvf.time_array == uvc.time_array) lst = lst_from_uv(uvc) assert np.all(uvf.lst_array == lst) assert np.all(uvf.freq_array == uvc.freq_array[0]) assert np.all(uvf.polarization_array == uvc.jones_array) assert np.all(uvf.ant_array == uvc.ant_array) assert 'Flag object with type "antenna"' in uvf.history assert pyuvdata_version_str in uvf.history def test_init_cal_copy_flags(): uv = UVCal() uv.read_calfits(test_c_file) uvf = uvtest.checkWarnings(UVFlag, [uv], {'copy_flags': True, 'mode': 'metric'}, nwarnings=1, message='Copying flags to type=="antenna"') # with copy flags uvf.metric_array should be none assert hasattr(uvf, 'metric_array') assert uvf.metric_array is None assert np.array_equal(uvf.flag_array, uv.flag_array) assert uvf.type == 'antenna' assert uvf.mode == 'flag' assert np.all(uvf.time_array == np.unique(uv.time_array)) assert np.all(uvf.freq_array == uv.freq_array[0]) assert np.all(uvf.polarization_array == uv.jones_array) assert pyuvdata_version_str in uvf.history def test_init_waterfall_uvd(): uv = UVData() uv.read_miriad(test_d_file) uvf = UVFlag(uv, waterfall=True) assert uvf.metric_array.shape == (uv.Ntimes, uv.Nfreqs, uv.Npols) assert np.all(uvf.metric_array == 0) assert uvf.weights_array.shape == (uv.Ntimes, uv.Nfreqs, uv.Npols) assert np.all(uvf.weights_array == 1) assert uvf.type == 'waterfall' assert uvf.mode == 'metric' assert np.all(uvf.time_array == np.unique(uv.time_array)) assert np.all(uvf.lst_array == np.unique(uv.lst_array)) assert np.all(uvf.freq_array == uv.freq_array[0]) assert np.all(uvf.polarization_array == uv.polarization_array) assert 'Flag object with type "waterfall"' in uvf.history assert pyuvdata_version_str in uvf.history def test_init_waterfall_uvc(): uv = UVCal() uv.read_calfits(test_c_file) uvf = UVFlag(uv, waterfall=True, history='input history check') assert uvf.metric_array.shape == (uv.Ntimes, uv.Nfreqs, uv.Njones) assert np.all(uvf.metric_array == 0) assert uvf.weights_array.shape == (uv.Ntimes, uv.Nfreqs, uv.Njones) assert np.all(uvf.weights_array == 1) assert uvf.type == 'waterfall' assert uvf.mode == 'metric' assert np.all(uvf.time_array == np.unique(uv.time_array)) assert np.all(uvf.freq_array == uv.freq_array[0]) assert np.all(uvf.polarization_array == uv.jones_array) assert 'Flag object with type "waterfall"' in uvf.history assert 'input history check' in uvf.history assert pyuvdata_version_str in uvf.history def test_init_waterfall_flag_uvcal(): uv = UVCal() uv.read_calfits(test_c_file) uvf = UVFlag(uv, waterfall=True, mode='flag') assert uvf.flag_array.shape == (uv.Ntimes, uv.Nfreqs, uv.Njones) assert not np.any(uvf.flag_array) assert uvf.weights_array is None assert uvf.type == 'waterfall' assert uvf.mode == 'flag' assert np.all(uvf.time_array == np.unique(uv.time_array)) assert np.all(uvf.freq_array == uv.freq_array[0]) assert np.all(uvf.polarization_array == uv.jones_array) assert 'Flag object with type "waterfall"' in uvf.history assert pyuvdata_version_str in uvf.history def test_init_waterfall_flag_uvdata(): uv = UVData() uv.read_miriad(test_d_file) uvf = UVFlag(uv, waterfall=True, mode='flag') assert uvf.flag_array.shape == (uv.Ntimes, uv.Nfreqs, uv.Npols) assert not np.any(uvf.flag_array) assert uvf.weights_array is None assert uvf.type == 'waterfall' assert uvf.mode == 'flag' assert np.all(uvf.time_array == np.unique(uv.time_array)) assert np.all(uvf.freq_array == uv.freq_array[0]) assert np.all(uvf.polarization_array == uv.polarization_array) assert 'Flag object with type "waterfall"' in uvf.history assert pyuvdata_version_str in uvf.history def test_init_waterfall_copy_flags(): uv = UVCal() uv.read_calfits(test_c_file) with pytest.raises(NotImplementedError) as cm: UVFlag(uv, copy_flags=True, mode='flag', waterfall=True) assert str(cm.value).startswith('Cannot copy flags when initializing') uv = UVData() uv.read_miriad(test_d_file) with pytest.raises(NotImplementedError) as cm: UVFlag(uv, copy_flags=True, mode='flag', waterfall=True) assert str(cm.value).startswith('Cannot copy flags when initializing') def test_init_invalid_input(): # input is not UVData, UVCal, path, or list/tuple with pytest.raises(ValueError) as cm: UVFlag(14) assert str(cm.value).startswith('input to UVFlag.__init__ must be one of:') def test_from_uvcal_error(): uv = UVData() uv.read_miriad(test_d_file) uvf = UVFlag() with pytest.raises(ValueError) as cm: uvf.from_uvcal(uv) assert str(cm.value).startswith("from_uvcal can only initialize a UVFlag object") def test_from_udata_error(): uv = UVCal() uv.read_calfits(test_c_file) uvf = UVFlag() with pytest.raises(ValueError) as cm: uvf.from_uvdata(uv) assert str(cm.value).startswith("from_uvdata can only initialize a UVFlag object") def test_init_list_files_weights(tmpdir): # Test that weights are preserved when reading list of files tmp_path = tmpdir.strpath # Create two files to read uvf = UVFlag(test_f_file) np.random.seed(0) wts1 = np.random.rand(*uvf.weights_array.shape) uvf.weights_array = wts1.copy() uvf.write(os.path.join(tmp_path, 'test1.h5')) wts2 = np.random.rand(*uvf.weights_array.shape) uvf.weights_array = wts2.copy() uvf.write(os.path.join(tmp_path, 'test2.h5')) uvf2 = UVFlag([os.path.join(tmp_path, 'test1.h5'), os.path.join(tmp_path, 'test2.h5')]) assert np.all(uvf2.weights_array == np.concatenate([wts1, wts2], axis=0)) def test_data_like_property_mode_tamper(): uv = UVData() uv.read_miriad(test_d_file) uvf = UVFlag(uv, label='test') uvf.mode = 'test' with pytest.raises(ValueError) as cm: list(uvf.data_like_parameters) assert str(cm.value).startswith('Invalid mode. Mode must be one of') def test_read_write_loop(): uv = UVData() uv.read_miriad(test_d_file) uvf = UVFlag(uv, label='test') uvf.write(test_outfile, clobber=True) uvf2 = UVFlag(test_outfile) assert uvf.__eq__(uvf2, check_history=True) def test_read_write_loop_with_optional_x_orientation(): uv = UVData() uv.read_miriad(test_d_file) uvf = UVFlag(uv, label='test') uvf.x_orientation = 'east' uvf.write(test_outfile, clobber=True) uvf2 = UVFlag(test_outfile) assert uvf.__eq__(uvf2, check_history=True) def test_read_write_loop_waterfal(): uv = UVData() uv.read_miriad(test_d_file) uvf = UVFlag(uv, label='test') uvf.to_waterfall() uvf.write(test_outfile, clobber=True) uvf2 = UVFlag(test_outfile) assert uvf.__eq__(uvf2, check_history=True) def test_bad_mode_savefile(): uv = UVData() uv.read_miriad(test_d_file) uvf = UVFlag(uv, label='test') uvf.write(test_outfile, clobber=True) # manually re-read and tamper with parameters with h5py.File(test_outfile, 'a') as h5: mode = h5['Header/mode'] mode[...] = 'test' with pytest.raises(ValueError) as cm: uvf = UVFlag(test_outfile) assert str(cm.value).startswith('File cannot be read. Received mode') def test_bad_type_savefile(): uv = UVData() uv.read_miriad(test_d_file) uvf = UVFlag(uv, label='test') uvf.write(test_outfile, clobber=True) # manually re-read and tamper with parameters with h5py.File(test_outfile, 'a') as h5: mode = h5['Header/type'] mode[...] = 'test' with pytest.raises(ValueError) as cm: uvf = UVFlag(test_outfile) assert str(cm.value).startswith('File cannot be read. Received type') def test_write_add_version_str(): uv = UVData() uv.read_miriad(test_d_file) uvf = UVFlag(uv, label='test') uvf.history = uvf.history.replace(pyuvdata_version_str, '') assert pyuvdata_version_str not in uvf.history uvf.write(test_outfile, clobber=True) with h5py.File(test_outfile, 'r') as h5: hist = h5['Header/history'][()].decode("utf8") assert pyuvdata_version_str in hist def test_read_add_version_str(): uv = UVData() uv.read_miriad(test_d_file) uvf = UVFlag(uv, label='test') assert pyuvdata_version_str in uvf.history uvf.write(test_outfile, clobber=True) with h5py.File(test_outfile, 'r') as h5: hist = h5['Header/history'] del hist uvf2 = UVFlag(test_outfile) assert pyuvdata_version_str in uvf2.history assert uvf == uvf2 def test_read_write_ant(): uv = UVCal() uv.read_calfits(test_c_file) uvf = UVFlag(uv, mode='flag', label='test') uvf.write(test_outfile, clobber=True) uvf2 = UVFlag(test_outfile) assert uvf.__eq__(uvf2, check_history=True) def test_read_missing_nants_data(): uv = UVCal() uv.read_calfits(test_c_file) uvf = UVFlag(uv, mode='flag', label='test') uvf.write(test_outfile, clobber=True) with h5py.File(test_outfile, 'a') as h5: del h5['Header/Nants_data'] uvf2 = uvtest.checkWarnings(UVFlag, [test_outfile], {}, nwarnings=1, message=['Nants_data not available in file,'], category=UserWarning) # make sure this was set to None assert uvf2.Nants_data == len(uvf2.ant_array) uvf2.Nants_data = uvf.Nants_data # verify no other elements were changed assert uvf.__eq__(uvf2, check_history=True) def test_read_missing_nspws(): uv = UVCal() uv.read_calfits(test_c_file) uvf = UVFlag(uv, mode='flag', label='test') uvf.write(test_outfile, clobber=True) with h5py.File(test_outfile, 'a') as h5: del h5['Header/Nspws'] uvf2 = UVFlag(test_outfile) # make sure Nspws was calculated assert uvf2.Nspws == 1 # verify no other elements were changed assert uvf.__eq__(uvf2, check_history=True) def test_read_write_nocompress(): uv = UVData() uv.read_miriad(test_d_file) uvf = UVFlag(uv, label='test') uvf.write(test_outfile, clobber=True, data_compression=None) uvf2 = UVFlag(test_outfile) assert uvf.__eq__(uvf2, check_history=True) def test_read_write_nocompress_flag(): uv = UVData() uv.read_miriad(test_d_file) uvf = UVFlag(uv, mode='flag', label='test') uvf.write(test_outfile, clobber=True, data_compression=None) uvf2 = UVFlag(test_outfile) assert uvf.__eq__(uvf2, check_history=True) def test_init_list(): uv = UVData() uv.read_miriad(test_d_file) uv.time_array -= 1 uvf = UVFlag([uv, test_f_file]) uvf1 = UVFlag(uv) uvf2 = UVFlag(test_f_file) assert np.array_equal(np.concatenate((uvf1.metric_array, uvf2.metric_array), axis=0), uvf.metric_array) assert np.array_equal(np.concatenate((uvf1.weights_array, uvf2.weights_array), axis=0), uvf.weights_array) assert np.array_equal(np.concatenate((uvf1.time_array, uvf2.time_array)), uvf.time_array) assert np.array_equal(np.concatenate((uvf1.baseline_array, uvf2.baseline_array)), uvf.baseline_array) assert np.array_equal(np.concatenate((uvf1.ant_1_array, uvf2.ant_1_array)), uvf.ant_1_array) assert np.array_equal(np.concatenate((uvf1.ant_2_array, uvf2.ant_2_array)), uvf.ant_2_array) assert uvf.mode == 'metric' assert np.all(uvf.freq_array == uv.freq_array[0]) assert np.all(uvf.polarization_array == uv.polarization_array) def test_read_list(): uv = UVData() uv.read_miriad(test_d_file) uv.time_array -= 1 uvf = UVFlag(uv) uvf.write(test_outfile, clobber=True) uvf.read([test_outfile, test_f_file]) uvf1 = UVFlag(uv) uvf2 = UVFlag(test_f_file) assert np.array_equal(np.concatenate((uvf1.metric_array, uvf2.metric_array), axis=0), uvf.metric_array) assert np.array_equal(np.concatenate((uvf1.weights_array, uvf2.weights_array), axis=0), uvf.weights_array) assert np.array_equal(np.concatenate((uvf1.time_array, uvf2.time_array)), uvf.time_array) assert np.array_equal(np.concatenate((uvf1.baseline_array, uvf2.baseline_array)), uvf.baseline_array) assert np.array_equal(np.concatenate((uvf1.ant_1_array, uvf2.ant_1_array)), uvf.ant_1_array) assert np.array_equal(np.concatenate((uvf1.ant_2_array, uvf2.ant_2_array)), uvf.ant_2_array) assert uvf.mode == 'metric' assert np.all(uvf.freq_array == uv.freq_array[0]) assert np.all(uvf.polarization_array == uv.polarization_array) def test_read_error(): with pytest.raises(IOError) as cm: UVFlag('foo') assert str(cm.value).startswith('foo not found') def test_read_change_type(): uv = UVData() uv.read_miriad(test_d_file) uvc = UVCal() uvc.read_calfits(test_c_file) uvf = UVFlag(uvc) uvf.write(test_outfile, clobber=True) assert hasattr(uvf, 'ant_array') uvf.read(test_f_file) # clear sets these to None now assert hasattr(uvf, 'ant_array') assert uvf.ant_array is None assert hasattr(uvf, 'baseline_array') assert hasattr(uvf, 'ant_1_array') assert hasattr(uvf, 'ant_2_array') uvf.read(test_outfile) assert hasattr(uvf, 'ant_array') assert hasattr(uvf, 'baseline_array') assert uvf.baseline_array is None assert hasattr(uvf, 'ant_1_array') assert uvf.ant_1_array is None assert hasattr(uvf, 'ant_2_array') assert uvf.ant_2_array is None def test_read_change_mode(): uv = UVData() uv.read_miriad(test_d_file) uvf = UVFlag(uv, mode='flag') assert hasattr(uvf, 'flag_array') assert hasattr(uvf, 'metric_array') assert uvf.metric_array is None uvf.write(test_outfile, clobber=True) uvf.read(test_f_file) assert hasattr(uvf, 'metric_array') assert hasattr(uvf, 'flag_array') assert uvf.flag_array is None uvf.read(test_outfile) assert hasattr(uvf, 'flag_array') assert hasattr(uvf, 'metric_array') assert uvf.metric_array is None def test_write_no_clobber(): uvf = UVFlag(test_f_file) with pytest.raises(ValueError) as cm: uvf.write(test_f_file) assert str(cm.value).startswith('File ' + test_f_file + ' exists;') def test_lst_from_uv(): uv = UVData() uv.read_miriad(test_d_file) lst_array = lst_from_uv(uv) assert np.allclose(uv.lst_array, lst_array) def test_lst_from_uv_error(): with pytest.raises(ValueError) as cm: lst_from_uv(4) assert str(cm.value).startswith('Function lst_from_uv can only operate on') def test_add(): uv1 = UVFlag(test_f_file) uv2 = copy.deepcopy(uv1) uv2.time_array += 1 # Add a day uv3 = uv1 + uv2 assert np.array_equal(np.concatenate((uv1.time_array, uv2.time_array)), uv3.time_array) assert np.array_equal(np.concatenate((uv1.baseline_array, uv2.baseline_array)), uv3.baseline_array) assert np.array_equal(np.concatenate((uv1.ant_1_array, uv2.ant_1_array)), uv3.ant_1_array) assert np.array_equal(np.concatenate((uv1.ant_2_array, uv2.ant_2_array)), uv3.ant_2_array) assert np.array_equal(np.concatenate((uv1.lst_array, uv2.lst_array)), uv3.lst_array) assert np.array_equal(np.concatenate((uv1.metric_array, uv2.metric_array), axis=0), uv3.metric_array) assert np.array_equal(np.concatenate((uv1.weights_array, uv2.weights_array), axis=0), uv3.weights_array) assert np.array_equal(uv1.freq_array, uv3.freq_array) assert uv3.type == 'baseline' assert uv3.mode == 'metric' assert np.array_equal(uv1.polarization_array, uv3.polarization_array) assert 'Data combined along time axis. ' in uv3.history def test_add_collapsed_pols(): uvf = UVFlag(test_f_file) uvf.weights_array = np.ones_like(uvf.weights_array) uvf2 = uvf.copy() uvf2.polarization_array[0] = -4 uvf.__add__(uvf2, inplace=True, axis='pol') # Concatenate to form multi-pol object uvf.collapse_pol() uvf3 = uvf.copy() uvf3.time_array += 1 # increment the time array uvf4 = uvf + uvf3 assert uvf4.Ntimes == 2 * uvf.Ntimes assert uvf4.check() def test_add_add_version_str(): uv1 = UVFlag(test_f_file) uv1.history = uv1.history.replace(pyuvdata_version_str, '') assert pyuvdata_version_str not in uv1.history uv2 = copy.deepcopy(uv1) uv2.time_array += 1 # Add a day uv3 = uv1 + uv2 assert pyuvdata_version_str in uv3.history def test_add_baseline(): uv1 = UVFlag(test_f_file) uv2 = copy.deepcopy(uv1) uv2.baseline_array += 100 # Arbitrary uv3 = uv1.__add__(uv2, axis='baseline') assert np.array_equal(np.concatenate((uv1.time_array, uv2.time_array)), uv3.time_array) assert np.array_equal(np.concatenate((uv1.baseline_array, uv2.baseline_array)), uv3.baseline_array) assert np.array_equal(np.concatenate((uv1.ant_1_array, uv2.ant_1_array)), uv3.ant_1_array) assert np.array_equal(np.concatenate((uv1.ant_2_array, uv2.ant_2_array)), uv3.ant_2_array) assert np.array_equal(np.concatenate((uv1.lst_array, uv2.lst_array)), uv3.lst_array) assert np.array_equal(np.concatenate((uv1.metric_array, uv2.metric_array), axis=0), uv3.metric_array) assert np.array_equal(np.concatenate((uv1.weights_array, uv2.weights_array), axis=0), uv3.weights_array) assert np.array_equal(uv1.freq_array, uv3.freq_array) assert uv3.type == 'baseline' assert uv3.mode == 'metric' assert np.array_equal(uv1.polarization_array, uv3.polarization_array) assert 'Data combined along baseline axis. ' in uv3.history def test_add_antenna(): uvc = UVCal() uvc.read_calfits(test_c_file) uv1 = UVFlag(uvc) uv2 = copy.deepcopy(uv1) uv2.ant_array += 100 # Arbitrary uv3 = uv1.__add__(uv2, axis='antenna') assert np.array_equal(np.concatenate((uv1.ant_array, uv2.ant_array)), uv3.ant_array) assert np.array_equal(np.concatenate((uv1.metric_array, uv2.metric_array), axis=0), uv3.metric_array) assert np.array_equal(np.concatenate((uv1.weights_array, uv2.weights_array), axis=0), uv3.weights_array) assert np.array_equal(uv1.freq_array, uv3.freq_array) assert np.array_equal(uv1.time_array, uv3.time_array) assert np.array_equal(uv1.lst_array, uv3.lst_array) assert uv3.type == 'antenna' assert uv3.mode == 'metric' assert np.array_equal(uv1.polarization_array, uv3.polarization_array) assert 'Data combined along antenna axis. ' in uv3.history def test_add_frequency(): uv1 = UVFlag(test_f_file) uv2 = copy.deepcopy(uv1) uv2.freq_array += 1e4 # Arbitrary uv3 = uv1.__add__(uv2, axis='frequency') assert np.array_equal(np.concatenate((uv1.freq_array, uv2.freq_array), axis=-1), uv3.freq_array) assert np.array_equal(uv1.time_array, uv3.time_array) assert np.array_equal(uv1.baseline_array, uv3.baseline_array) assert np.array_equal(uv1.ant_1_array, uv3.ant_1_array) assert np.array_equal(uv1.ant_2_array, uv3.ant_2_array) assert np.array_equal(uv1.lst_array, uv3.lst_array) assert np.array_equal(np.concatenate((uv1.metric_array, uv2.metric_array), axis=2), uv3.metric_array) assert np.array_equal(np.concatenate((uv1.weights_array, uv2.weights_array), axis=2), uv3.weights_array) assert uv3.type == 'baseline' assert uv3.mode == 'metric' assert np.array_equal(uv1.polarization_array, uv3.polarization_array) assert 'Data combined along frequency axis. ' in uv3.history def test_add_pol(): uv1 = UVFlag(test_f_file) uv2 = copy.deepcopy(uv1) uv2.polarization_array += 1 # Arbitrary uv3 = uv1.__add__(uv2, axis='polarization') assert np.array_equal(uv1.freq_array, uv3.freq_array) assert np.array_equal(uv1.time_array, uv3.time_array) assert np.array_equal(uv1.baseline_array, uv3.baseline_array) assert np.array_equal(uv1.ant_1_array, uv3.ant_1_array) assert np.array_equal(uv1.ant_2_array, uv3.ant_2_array) assert np.array_equal(uv1.lst_array, uv3.lst_array) assert np.array_equal(np.concatenate((uv1.metric_array, uv2.metric_array), axis=3), uv3.metric_array) assert np.array_equal(np.concatenate((uv1.weights_array, uv2.weights_array), axis=3), uv3.weights_array) assert uv3.type == 'baseline' assert uv3.mode == 'metric' assert np.array_equal(np.concatenate((uv1.polarization_array, uv2.polarization_array)), uv3.polarization_array) assert 'Data combined along polarization axis. ' in uv3.history def test_add_flag(): uv = UVData() uv.read_miriad(test_d_file) uv1 = UVFlag(uv, mode='flag') uv2 = copy.deepcopy(uv1) uv2.time_array += 1 # Add a day uv3 = uv1 + uv2 assert np.array_equal(np.concatenate((uv1.time_array, uv2.time_array)), uv3.time_array) assert np.array_equal(np.concatenate((uv1.baseline_array, uv2.baseline_array)), uv3.baseline_array) assert np.array_equal(np.concatenate((uv1.ant_1_array, uv2.ant_1_array)), uv3.ant_1_array) assert np.array_equal(np.concatenate((uv1.ant_2_array, uv2.ant_2_array)), uv3.ant_2_array) assert np.array_equal(np.concatenate((uv1.lst_array, uv2.lst_array)), uv3.lst_array) assert np.array_equal(np.concatenate((uv1.flag_array, uv2.flag_array), axis=0), uv3.flag_array) assert np.array_equal(uv1.freq_array, uv3.freq_array) assert uv3.type == 'baseline' assert uv3.mode == 'flag' assert np.array_equal(uv1.polarization_array, uv3.polarization_array) assert 'Data combined along time axis. ' in uv3.history def test_add_errors(): uv = UVData() uv.read_miriad(test_d_file) uvc = UVCal() uvc.read_calfits(test_c_file) uv1 = UVFlag(uv) # Mismatched classes with pytest.raises(ValueError) as cm: uv1.__add__(3) assert str(cm.value).startswith('Only UVFlag objects can be added to a UVFlag object') # Mismatched types uv2 = UVFlag(uvc) with pytest.raises(ValueError) as cm: uv1.__add__(uv2) assert str(cm.value).startswith('UVFlag object of type ') # Mismatched modes uv3 = UVFlag(uv, mode='flag') with pytest.raises(ValueError) as cm: uv1.__add__(uv3) assert str(cm.value).startswith('UVFlag object of mode ') # Invalid axes with pytest.raises(ValueError) as cm: uv1.__add__(uv1, axis='antenna') assert str(cm.value).endswith('concatenated along antenna axis.') with pytest.raises(ValueError) as cm: uv2.__add__(uv2, axis='baseline') assert str(cm.value).endswith('concatenated along baseline axis.') def test_inplace_add(): uv1a = UVFlag(test_f_file) uv1b = copy.deepcopy(uv1a) uv2 = copy.deepcopy(uv1a) uv2.time_array += 1 uv1a += uv2 assert uv1a.__eq__(uv1b + uv2) def test_clear_unused_attributes(): uv = UVFlag(test_f_file) assert hasattr(uv, 'baseline_array') assert hasattr(uv, 'ant_1_array') assert hasattr(uv, 'ant_2_array') assert hasattr(uv, 'Nants_telescope') uv._set_type_antenna() uv.clear_unused_attributes() # clear_unused_attributes now sets these to None print(uv._baseline_array.required) assert hasattr(uv, 'baseline_array') assert uv.baseline_array is None assert hasattr(uv, 'ant_1_array') assert uv.ant_1_array is None assert hasattr(uv, 'ant_2_array') assert uv.ant_2_array is None assert hasattr(uv, 'Nants_telescope') assert uv.Nants_telescope is None uv._set_mode_flag() assert hasattr(uv, 'metric_array') uv.clear_unused_attributes() assert hasattr(uv, 'metric_array') assert uv.metric_array is None # Start over uv = UVFlag(test_f_file) uv.ant_array = np.array([4]) uv.flag_array = np.array([5]) uv.clear_unused_attributes() assert hasattr(uv, 'ant_array') assert uv.ant_array is None assert hasattr(uv, 'flag_array') assert uv.flag_array is None def test_not_equal(): uvf1 = UVFlag(test_f_file) # different class assert not uvf1.__eq__(5) # different mode uvf2 = uvf1.copy() uvf2.mode = 'flag' assert not uvf1.__eq__(uvf2) # different type uvf2 = uvf1.copy() uvf2.type = 'antenna' assert not uvf1.__eq__(uvf2) # array different uvf2 = uvf1.copy() uvf2.freq_array += 1 assert not uvf1.__eq__(uvf2) # history different uvf2 = uvf1.copy() uvf2.history += 'hello' assert not uvf1.__eq__(uvf2, check_history=True) def test_to_waterfall_bl(): uvf = UVFlag(test_f_file) uvf.weights_array = np.ones_like(uvf.weights_array) uvf.to_waterfall() assert uvf.type == 'waterfall' assert uvf.metric_array.shape == (len(uvf.time_array), len(uvf.freq_array), len(uvf.polarization_array)) assert uvf.weights_array.shape == uvf.metric_array.shape def test_to_waterfall_add_version_str(): uvf = UVFlag(test_f_file) uvf.weights_array = np.ones_like(uvf.weights_array) uvf.history = uvf.history.replace(pyuvdata_version_str, '') assert pyuvdata_version_str not in uvf.history uvf.to_waterfall() assert pyuvdata_version_str in uvf.history def test_to_waterfall_bl_multi_pol(): uvf = UVFlag(test_f_file) uvf.weights_array = np.ones_like(uvf.weights_array) uvf2 = uvf.copy() uvf2.polarization_array[0] = -4 uvf.__add__(uvf2, inplace=True, axis='pol') # Concatenate to form multi-pol object uvf2 = uvf.copy() # Keep a copy to run with keep_pol=False uvf.to_waterfall() assert uvf.type == 'waterfall' assert uvf.metric_array.shape == (len(uvf.time_array), len(uvf.freq_array), len(uvf.polarization_array)) assert uvf.weights_array.shape == uvf.metric_array.shape assert len(uvf.polarization_array) == 2 # Repeat with keep_pol=False uvf2.to_waterfall(keep_pol=False) assert uvf2.type == 'waterfall' assert uvf2.metric_array.shape == (len(uvf2.time_array), len(uvf.freq_array), 1) assert uvf2.weights_array.shape == uvf2.metric_array.shape assert len(uvf2.polarization_array) == 1 assert uvf2.polarization_array[0] == np.str_(','.join(map(str, uvf.polarization_array))) def test_collapse_pol(): uvf = UVFlag(test_f_file) uvf.weights_array = np.ones_like(uvf.weights_array) uvf2 = uvf.copy() uvf2.polarization_array[0] = -4 uvf.__add__(uvf2, inplace=True, axis='pol') # Concatenate to form multi-pol object uvf2 = uvf.copy() uvf2.collapse_pol() assert len(uvf2.polarization_array) == 1 assert uvf2.polarization_array[0] == np.str_(','.join(map(str, uvf.polarization_array))) assert uvf2.mode == 'metric' assert hasattr(uvf2, 'metric_array') assert hasattr(uvf2, 'flag_array') assert uvf2.flag_array is None # test check passes just to be sure assert uvf2.check() # test writing it out and reading in to make sure polarization_array has correct type uvf2.write(test_outfile, clobber=True) uvf = UVFlag(test_outfile) assert uvf._polarization_array.expected_type == str assert uvf._polarization_array.acceptable_vals is None assert uvf == uvf2 os.remove(test_outfile) def test_collapse_pol_add_pol_axis(): uvf = UVFlag(test_f_file) uvf.weights_array = np.ones_like(uvf.weights_array) uvf2 = uvf.copy() uvf2.polarization_array[0] = -4 uvf.__add__(uvf2, inplace=True, axis='pol') # Concatenate to form multi-pol object uvf2 = uvf.copy() uvf2.collapse_pol() with pytest.raises(NotImplementedError) as cm: uvf2.__add__(uvf2, axis='pol') assert str(cm.value).startswith("Two UVFlag objects with their") def test_collapse_pol_or(): uvf = UVFlag(test_f_file) uvf.to_flag() assert uvf.weights_array is None uvf2 = uvf.copy() uvf2.polarization_array[0] = -4 uvf.__add__(uvf2, inplace=True, axis='pol') # Concatenate to form multi-pol object uvf2 = uvf.copy() uvf2.collapse_pol(method='or') assert len(uvf2.polarization_array) == 1 assert uvf2.polarization_array[0] == np.str_(','.join(map(str, uvf.polarization_array))) assert uvf2.mode == 'flag' assert hasattr(uvf2, 'flag_array') assert hasattr(uvf2, 'metric_array') assert uvf2.metric_array is None def test_collapse_pol_add_version_str(): uvf = UVFlag(test_f_file) uvf.to_flag() uvf2 = uvf.copy() uvf2.polarization_array[0] = -4 uvf.__add__(uvf2, inplace=True, axis='pol') # Concatenate to form multi-pol object uvf.history = uvf.history.replace(pyuvdata_version_str, '') assert pyuvdata_version_str not in uvf.history uvf2 = uvf.copy() uvf2.collapse_pol(method='or') assert pyuvdata_version_str in uvf2.history def test_collapse_single_pol(): uvf = UVFlag(test_f_file) uvf.weights_array = np.ones_like(uvf.weights_array) uvf2 = uvf.copy() uvtest.checkWarnings(uvf.collapse_pol, [], {}, nwarnings=1, message='Cannot collapse polarization') assert uvf == uvf2 def test_collapse_pol_flag(): uvf = UVFlag(test_f_file) uvf.to_flag() assert uvf.weights_array is None uvf2 = uvf.copy() uvf2.polarization_array[0] = -4 uvf.__add__(uvf2, inplace=True, axis='pol') # Concatenate to form multi-pol object uvf2 = uvf.copy() uvf2.collapse_pol() assert len(uvf2.polarization_array) == 1 assert uvf2.polarization_array[0] == np.str_(','.join(map(str, uvf.polarization_array))) assert uvf2.mode == 'metric' assert hasattr(uvf2, 'metric_array') assert hasattr(uvf2, 'flag_array') assert uvf2.flag_array is None def test_to_waterfall_bl_flags(): uvf = UVFlag(test_f_file) uvf.to_flag() uvf.to_waterfall() assert uvf.type == 'waterfall' assert uvf.mode == 'metric' assert uvf.metric_array.shape == (len(uvf.time_array), len(uvf.freq_array), len(uvf.polarization_array)) assert uvf.weights_array.shape == uvf.metric_array.shape assert len(uvf.lst_array) == len(uvf.time_array) def test_to_waterfall_bl_flags_or(): uvf = UVFlag(test_f_file) uvf.to_flag() assert uvf.weights_array is None uvf.to_waterfall(method='or') assert uvf.type == 'waterfall' assert uvf.mode == 'flag' assert uvf.flag_array.shape == (len(uvf.time_array), len(uvf.freq_array), len(uvf.polarization_array)) assert len(uvf.lst_array) == len(uvf.time_array) uvf = UVFlag(test_f_file) uvf.to_flag() uvf.to_waterfall(method='or') assert uvf.type == 'waterfall' assert uvf.mode == 'flag' assert uvf.flag_array.shape == (len(uvf.time_array), len(uvf.freq_array), len(uvf.polarization_array)) assert len(uvf.lst_array) == len(uvf.time_array) def test_to_waterfall_ant(): uvc = UVCal() uvc.read_calfits(test_c_file) uvf = UVFlag(uvc) uvf.weights_array = np.ones_like(uvf.weights_array) uvf.to_waterfall() assert uvf.type == 'waterfall' assert uvf.metric_array.shape == (len(uvf.time_array), len(uvf.freq_array), len(uvf.polarization_array)) assert uvf.weights_array.shape == uvf.metric_array.shape assert len(uvf.lst_array) == len(uvf.time_array) def test_to_waterfall_waterfall(): uvf = UVFlag(test_f_file) uvf.weights_array = np.ones_like(uvf.weights_array) uvf.to_waterfall() uvtest.checkWarnings(uvf.to_waterfall, [], {}, nwarnings=1, message='This object is already a waterfall') def test_to_baseline_flags(): uv = UVData() uv.read_miriad(test_d_file) uvf = UVFlag(uv) uvf.to_waterfall() uvf.to_flag() uvf.flag_array[0, 10, 0] = True # Flag time0, chan10 uvf.flag_array[1, 15, 0] = True # Flag time1, chan15 uvf.to_baseline(uv) assert uvf.type == 'baseline' assert np.all(uvf.baseline_array == uv.baseline_array) assert np.all(uvf.time_array == uv.time_array) times = np.unique(uvf.time_array) ntrue = 0.0 ind = np.where(uvf.time_array == times[0])[0] ntrue += len(ind) assert np.all(uvf.flag_array[ind, 0, 10, 0]) ind = np.where(uvf.time_array == times[1])[0] ntrue += len(ind) assert np.all(uvf.flag_array[ind, 0, 15, 0]) assert uvf.flag_array.mean() == ntrue / uvf.flag_array.size def test_to_baseline_metric(): uv = UVData() uv.read_miriad(test_d_file) uvf = UVFlag(uv) uvf.to_waterfall() uvf.metric_array[0, 10, 0] = 3.2 # Fill in time0, chan10 uvf.metric_array[1, 15, 0] = 2.1 # Fill in time1, chan15 uvf.to_baseline(uv) assert np.all(uvf.baseline_array == uv.baseline_array) assert np.all(uvf.time_array == uv.time_array) times = np.unique(uvf.time_array) ind = np.where(uvf.time_array == times[0])[0] nt0 = len(ind) assert np.all(uvf.metric_array[ind, 0, 10, 0] == 3.2) ind = np.where(uvf.time_array == times[1])[0] nt1 = len(ind) assert np.all(uvf.metric_array[ind, 0, 15, 0] == 2.1) assert np.isclose(uvf.metric_array.mean(), (3.2 * nt0 + 2.1 * nt1) / uvf.metric_array.size) def test_to_baseline_add_version_str(): uv = UVData() uv.read_miriad(test_d_file) uvf = UVFlag(uv) uvf.to_waterfall() uvf.metric_array[0, 10, 0] = 3.2 # Fill in time0, chan10 uvf.metric_array[1, 15, 0] = 2.1 # Fill in time1, chan15 uvf.history = uvf.history.replace(pyuvdata_version_str, '') assert pyuvdata_version_str not in uvf.history uvf.to_baseline(uv) assert pyuvdata_version_str in uvf.history def test_baseline_to_baseline(): uv = UVData() uv.read_miriad(test_d_file) uvf = UVFlag(uv) uvf2 = uvf.copy() uvf.to_baseline(uv) assert uvf == uvf2 def test_to_baseline_metric_error(uvf_from_uvcal): uvf = uvf_from_uvcal uvf.select(polarizations=uvf.polarization_array[0]) uv = UVData() uv.read_miriad(test_d_file) with pytest.raises(NotImplementedError) as cm: uvf.to_baseline(uv, force_pol=True) assert str(cm.value).startswith("Cannot currently convert from " "antenna type, metric mode") def test_to_baseline_from_antenna(uvf_from_uvcal): uvf = uvf_from_uvcal uvf.select(polarizations=uvf.polarization_array[0]) uvf.to_flag() uv = UVData() uv.read_miriad(test_d_file) ants_data = np.unique(uv.ant_1_array.tolist() + uv.ant_2_array.tolist()) new_ants = np.setdiff1d(ants_data, uvf.ant_array) old_baseline = (uvf.ant_array[0], uvf.ant_array[1]) old_times = np.unique(uvf.time_array) or_flags = np.logical_or(uvf.flag_array[0], uvf.flag_array[1]) or_flags = np.transpose(or_flags, [2, 0, 1, 3]) uv2 = copy.deepcopy(uv) uvf2 = uvf.copy() # hack in the exact times so we can compare some values later uv2.select(bls=old_baseline) uv2.time_array[:uvf2.time_array.size] = uvf.time_array uvf.to_baseline(uv, force_pol=True) uvf2.to_baseline(uv2, force_pol=True) assert uvf.check() uvf2.select(bls=old_baseline, times=old_times) assert np.allclose(or_flags, uvf2.flag_array) # all new antenna should be completely flagged # checks auto correlations uvf_new = uvf.select(antenna_nums=new_ants, inplace=False) for bl in np.unique(uvf_new.baseline_array): uvf2 = uvf_new.select(bls=uv.baseline_to_antnums(bl), inplace=False) assert np.all(uvf2.flag_array) # check for baselines with one new antenna bls = [uvf.baseline_to_antnums(bl) for bl in uvf.baseline_array if np.intersect1d(new_ants, uvf.baseline_to_antnums(bl)).size > 0] uvf_new = uvf.select(bls=bls, inplace=False) for bl in np.unique(uvf_new.baseline_array): uvf2 = uvf_new.select(bls=uv.baseline_to_antnums(bl), inplace=False) assert np.all(uvf2.flag_array) def test_to_baseline_errors(): uvc = UVCal() uvc.read_calfits(test_c_file) uv = UVData() uv.read_miriad(test_d_file) uvf = UVFlag(test_f_file) uvf.to_waterfall() with pytest.raises(ValueError) as cm: uvf.to_baseline(7.3) # invalid matching object assert str(cm.value).startswith('Must pass in UVData object or UVFlag object') uvf = UVFlag(test_f_file) uvf.to_waterfall() uvf2 = uvf.copy() uvf.polarization_array[0] = -4 with pytest.raises(ValueError) as cm: uvf.to_baseline(uv) # Mismatched pols assert str(cm.value).startswith('Polarizations do not match.') uvf.__iadd__(uvf2, axis='polarization') with pytest.raises(ValueError) as cm: uvf.to_baseline(uv) # Mismatched pols, can't be forced assert str(cm.value).startswith('Polarizations could not be made to match.') def test_to_baseline_force_pol(): uv = UVData() uv.read_miriad(test_d_file) uvf = UVFlag(uv) uvf.to_waterfall() uvf.to_flag() uvf.flag_array[0, 10, 0] = True # Flag time0, chan10 uvf.flag_array[1, 15, 0] = True # Flag time1, chan15 uvf.polarization_array[0] = -4 # Change pol, but force pol anyway uvf.to_baseline(uv, force_pol=True) assert np.all(uvf.baseline_array == uv.baseline_array) assert np.all(uvf.time_array == uv.time_array) assert np.array_equal(uvf.polarization_array, uv.polarization_array) times = np.unique(uvf.time_array) ntrue = 0.0 ind = np.where(uvf.time_array == times[0])[0] ntrue += len(ind) assert np.all(uvf.flag_array[ind, 0, 10, 0]) ind = np.where(uvf.time_array == times[1])[0] ntrue += len(ind) assert np.all(uvf.flag_array[ind, 0, 15, 0]) assert uvf.flag_array.mean() == ntrue / uvf.flag_array.size def test_to_baseline_force_pol_Npol_gt_1(): uv = UVData() uv.read_miriad(test_d_file) uvf = UVFlag(uv) uvf.to_waterfall() uvf.to_flag() uvf.flag_array[0, 10, 0] = True # Flag time0, chan10 uvf.flag_array[1, 15, 0] = True # Flag time1, chan15 uv2 = copy.deepcopy(uv) uv2.polarization_array[0] = -6 uv += uv2 uvf.to_baseline(uv, force_pol=True) assert np.all(uvf.baseline_array == uv.baseline_array) assert np.all(uvf.time_array == uv.time_array) assert np.array_equal(uvf.polarization_array, uv.polarization_array) assert uvf.Npols == len(uvf.polarization_array) def test_to_baseline_metric_force_pol(): uv = UVData() uv.read_miriad(test_d_file) uvf = UVFlag(uv) uvf.to_waterfall() uvf.metric_array[0, 10, 0] = 3.2 # Fill in time0, chan10 uvf.metric_array[1, 15, 0] = 2.1 # Fill in time1, chan15 uvf.polarization_array[0] = -4 uvf.to_baseline(uv, force_pol=True) assert np.all(uvf.baseline_array == uv.baseline_array) assert np.all(uvf.time_array == uv.time_array) assert np.array_equal(uvf.polarization_array, uv.polarization_array) times = np.unique(uvf.time_array) ind = np.where(uvf.time_array == times[0])[0] nt0 = len(ind) assert np.all(uvf.metric_array[ind, 0, 10, 0] == 3.2) ind = np.where(uvf.time_array == times[1])[0] nt1 = len(ind) assert np.all(uvf.metric_array[ind, 0, 15, 0] == 2.1) assert np.isclose(uvf.metric_array.mean(), (3.2 * nt0 + 2.1 * nt1) / uvf.metric_array.size) def test_to_antenna_flags(): uvc = UVCal() uvc.read_calfits(test_c_file) uvf = UVFlag(uvc) uvf.to_waterfall() uvf.to_flag() uvf.flag_array[0, 10, 0] = True # Flag time0, chan10 uvf.flag_array[1, 15, 0] = True # Flag time1, chan15 uvf.to_antenna(uvc) assert uvf.type == 'antenna' assert np.all(uvf.ant_array == uvc.ant_array) assert np.all(uvf.time_array == uvc.time_array) assert np.all(uvf.flag_array[:, 0, 10, 0, 0]) assert np.all(uvf.flag_array[:, 0, 15, 1, 0]) assert uvf.flag_array.mean() == 2. * uvc.Nants_data / uvf.flag_array.size def test_to_antenna_add_version_str(): uvc = UVCal() uvc.read_calfits(test_c_file) uvf = UVFlag(uvc) uvf.to_waterfall() uvf.to_flag() uvf.flag_array[0, 10, 0] = True # Flag time0, chan10 uvf.flag_array[1, 15, 0] = True # Flag time1, chan15 uvf.history = uvf.history.replace(pyuvdata_version_str, '') assert pyuvdata_version_str not in uvf.history uvf.to_antenna(uvc) assert pyuvdata_version_str in uvf.history def test_to_antenna_metric(): uvc = UVCal() uvc.read_calfits(test_c_file) uvf = UVFlag(uvc) uvf.to_waterfall() uvf.metric_array[0, 10, 0] = 3.2 # Fill in time0, chan10 uvf.metric_array[1, 15, 0] = 2.1 # Fill in time1, chan15 uvf.to_antenna(uvc) assert np.all(uvf.ant_array == uvc.ant_array) assert np.all(uvf.time_array == uvc.time_array) assert np.all(uvf.metric_array[:, 0, 10, 0, 0] == 3.2) assert np.all(uvf.metric_array[:, 0, 15, 1, 0] == 2.1) assert np.isclose(uvf.metric_array.mean(), (3.2 + 2.1) * uvc.Nants_data / uvf.metric_array.size) def test_to_antenna_flags_match_uvflag(): uvc = UVCal() uvc.read_calfits(test_c_file) uvf = UVFlag(uvc) uvf2 = uvf.copy() uvf.to_waterfall() uvf.to_flag() uvf.flag_array[0, 10, 0] = True # Flag time0, chan10 uvf.flag_array[1, 15, 0] = True # Flag time1, chan15 uvf.to_antenna(uvf2) assert np.all(uvf.ant_array == uvc.ant_array) assert np.all(uvf.time_array == uvc.time_array) assert np.all(uvf.flag_array[:, 0, 10, 0, 0]) assert np.all(uvf.flag_array[:, 0, 15, 1, 0]) assert uvf.flag_array.mean() == 2. * uvc.Nants_data / uvf.flag_array.size def test_antenna_to_antenna(): uvc = UVCal() uvc.read_calfits(test_c_file) uvf = UVFlag(uvc) uvf2 = uvf.copy() uvf.to_antenna(uvc) assert uvf == uvf2 def test_to_antenna_errors(): uvc = UVCal() uvc.read_calfits(test_c_file) uv = UVData() uv.read_miriad(test_d_file) uvf = UVFlag(test_f_file) uvf.to_waterfall() with pytest.raises(ValueError) as cm: uvf.to_antenna(7.3) # invalid matching object assert str(cm.value).startswith('Must pass in UVCal object or UVFlag object ') uvf = UVFlag(uv) with pytest.raises(ValueError) as cm: uvf.to_antenna(uvc) # Cannot pass in baseline type assert str(cm.value).startswith('Cannot convert from type "baseline" to "antenna".') uvf = UVFlag(test_f_file) uvf.to_waterfall() uvf2 = uvf.copy() uvf.polarization_array[0] = -4 with pytest.raises(ValueError) as cm: uvf.to_antenna(uvc) # Mismatched pols assert str(cm.value).startswith('Polarizations do not match. ') uvf.__iadd__(uvf2, axis='polarization') with pytest.raises(ValueError) as cm: uvf.to_antenna(uvc) # Mismatched pols, can't be forced assert str(cm.value).startswith('Polarizations could not be made to match.') def test_to_antenna_force_pol(): uvc = UVCal() uvc.read_calfits(test_c_file) uvc.select(jones=-5) uvf = UVFlag(uvc) uvf.to_waterfall() uvf.to_flag() uvf.flag_array[0, 10, 0] = True # Flag time0, chan10 uvf.flag_array[1, 15, 0] = True # Flag time1, chan15 uvf.polarization_array[0] = -4 # Change pol, but force pol anyway uvf.to_antenna(uvc, force_pol=True) assert np.all(uvf.ant_array == uvc.ant_array) assert np.all(uvf.time_array == uvc.time_array) assert np.array_equal(uvf.polarization_array, uvc.jones_array) assert np.all(uvf.flag_array[:, 0, 10, 0, 0]) assert np.all(uvf.flag_array[:, 0, 15, 1, 0]) assert uvf.flag_array.mean() == 2 * uvc.Nants_data / uvf.flag_array.size def test_to_antenna_metric_force_pol(): uvc = UVCal() uvc.read_calfits(test_c_file) uvc.select(jones=-5) uvf = UVFlag(uvc) uvf.to_waterfall() uvf.metric_array[0, 10, 0] = 3.2 # Fill in time0, chan10 uvf.metric_array[1, 15, 0] = 2.1 # Fill in time1, chan15 uvf.polarization_array[0] = -4 uvf.to_antenna(uvc, force_pol=True) assert np.all(uvf.ant_array == uvc.ant_array) assert np.all(uvf.time_array == uvc.time_array) assert np.array_equal(uvf.polarization_array, uvc.jones_array) assert np.all(uvf.metric_array[:, 0, 10, 0, 0] == 3.2) assert np.all(uvf.metric_array[:, 0, 15, 1, 0] == 2.1) assert np.isclose(uvf.metric_array.mean(), (3.2 + 2.1) * uvc.Nants_data / uvf.metric_array.size) def test_copy(): uvf = UVFlag(test_f_file) uvf2 = uvf.copy() assert uvf == uvf2 # Make sure it's a copy and not just pointing to same object uvf.to_waterfall() assert uvf != uvf2 def test_or(): uvf = UVFlag(test_f_file) uvf.to_flag() uvf2 = uvf.copy() uvf2.flag_array = np.ones_like(uvf2.flag_array) uvf.flag_array[0] = True uvf2.flag_array[0] = False uvf2.flag_array[1] = False uvf3 = uvf | uvf2 assert np.all(uvf3.flag_array[0]) assert not np.any(uvf3.flag_array[1]) assert np.all(uvf3.flag_array[2:]) def test_or_add_version_str(): uvf = UVFlag(test_f_file) uvf.to_flag() uvf.history = uvf.history.replace(pyuvdata_version_str, '') assert pyuvdata_version_str not in uvf.history uvf2 = uvf.copy() uvf2.flag_array = np.ones_like(uvf2.flag_array) uvf.flag_array[0] = True uvf2.flag_array[0] = False uvf2.flag_array[1] = False uvf3 = uvf | uvf2 assert pyuvdata_version_str in uvf3.history def test_or_error(): uvf = UVFlag(test_f_file) uvf2 = uvf.copy() uvf.to_flag() with pytest.raises(ValueError) as cm: uvf.__or__(uvf2) assert str(cm.value).startswith('UVFlag object must be in "flag" mode') def test_or_add_history(): uvf = UVFlag(test_f_file) uvf.to_flag() uvf2 = uvf.copy() uvf2.history = 'Different history' uvf3 = uvf | uvf2 assert uvf.history in uvf3.history assert uvf2.history in uvf3.history assert "Flags OR'd with:" in uvf3.history def test_ior(): uvf = UVFlag(test_f_file) uvf.to_flag() uvf2 = uvf.copy() uvf2.flag_array = np.ones_like(uvf2.flag_array) uvf.flag_array[0] = True uvf2.flag_array[0] = False uvf2.flag_array[1] = False uvf |= uvf2 assert np.all(uvf.flag_array[0]) assert not np.any(uvf.flag_array[1]) assert np.all(uvf.flag_array[2:]) def test_to_flag(): uvf = UVFlag(test_f_file) uvf.to_flag() assert hasattr(uvf, 'flag_array') assert hasattr(uvf, 'metric_array') assert uvf.metric_array is None assert uvf.mode == 'flag' assert 'Converted to mode "flag"' in uvf.history def test_to_flag_add_version_str(): uvf = UVFlag(test_f_file) uvf.history = uvf.history.replace(pyuvdata_version_str, '') assert pyuvdata_version_str not in uvf.history uvf.to_flag() assert pyuvdata_version_str in uvf.history def test_to_flag_threshold(): uvf = UVFlag(test_f_file) uvf.metric_array = np.zeros_like(uvf.metric_array) uvf.metric_array[0, 0, 4, 0] = 2. uvf.to_flag(threshold=1.) assert hasattr(uvf, 'flag_array') assert hasattr(uvf, 'metric_array') assert uvf.metric_array is None assert uvf.mode == 'flag' assert uvf.flag_array[0, 0, 4, 0] assert np.sum(uvf.flag_array) == 1. assert 'Converted to mode "flag"' in uvf.history def test_flag_to_flag(): uvf = UVFlag(test_f_file) uvf.to_flag() uvf2 = uvf.copy() uvf2.to_flag() assert uvf == uvf2 def test_to_flag_unknown_mode(): uvf = UVFlag(test_f_file) uvf.mode = 'foo' with pytest.raises(ValueError) as cm: uvf.to_flag() assert str(cm.value).startswith('Unknown UVFlag mode: foo') def test_to_metric_baseline(): uvf = UVFlag(test_f_file) uvf.to_flag() uvf.flag_array[:, :, 10] = True uvf.flag_array[1, :, :] = True assert hasattr(uvf, 'flag_array') assert hasattr(uvf, 'metric_array') assert uvf.metric_array is None assert uvf.mode == 'flag' uvf.to_metric(convert_wgts=True) assert hasattr(uvf, 'metric_array') assert hasattr(uvf, 'flag_array') assert uvf.flag_array is None assert uvf.mode == 'metric' assert 'Converted to mode "metric"' in uvf.history assert np.isclose(uvf.weights_array[1], 0.0).all() assert np.isclose(uvf.weights_array[:, :, 10], 0.0).all() def test_to_metric_add_version_str(): uvf = UVFlag(test_f_file) uvf.to_flag() uvf.flag_array[:, :, 10] = True uvf.flag_array[1, :, :] = True assert hasattr(uvf, 'flag_array') assert hasattr(uvf, 'metric_array') assert uvf.metric_array is None assert uvf.mode == 'flag' uvf.history = uvf.history.replace(pyuvdata_version_str, '') assert pyuvdata_version_str not in uvf.history uvf.to_metric(convert_wgts=True) assert pyuvdata_version_str in uvf.history def test_to_metric_waterfall(): uvf = UVFlag(test_f_file) uvf.to_waterfall() uvf.to_flag() uvf.flag_array[:, 10] = True uvf.flag_array[1, :, :] = True uvf.to_metric(convert_wgts=True) assert np.isclose(uvf.weights_array[1], 0.0).all() assert np.isclose(uvf.weights_array[:, 10], 0.0).all() def test_to_metric_antenna(): uvc = UVCal() uvc.read_calfits(test_c_file) uvf = UVFlag(uvc, mode='flag') uvf.flag_array[10, :, :, 1, :] = True uvf.flag_array[15, :, 3, :, :] = True uvf.to_metric(convert_wgts=True) assert np.isclose(uvf.weights_array[10, :, :, 1, :], 0.0).all() assert np.isclose(uvf.weights_array[15, :, 3, :, :], 0.0).all() def test_metric_to_metric(): uvf = UVFlag(test_f_file) uvf2 = uvf.copy() uvf.to_metric() assert uvf == uvf2 def test_to_metric_unknown_mode(): uvf = UVFlag(test_f_file) uvf.mode = 'foo' with pytest.raises(ValueError) as cm: uvf.to_metric() assert str(cm.value).startswith('Unknown UVFlag mode: foo') def test_antpair2ind(): uvf = UVFlag(test_f_file) ind = uvf.antpair2ind(uvf.ant_1_array[0], uvf.ant_2_array[0]) assert np.all(uvf.ant_1_array[ind] == uvf.ant_1_array[0]) assert np.all(uvf.ant_2_array[ind] == uvf.ant_2_array[0]) def test_antpair2ind_nonbaseline(): uvf = UVFlag(test_f_file) uvf.to_waterfall() with pytest.raises(ValueError) as cm: uvf.antpair2ind(0, 3) assert str(cm.value).startswith('UVFlag object of type ' + uvf.type + ' does not contain antenna ' + 'pairs to index.') def test_baseline_to_antnums(): uvf = UVFlag(test_f_file) a1, a2 = uvf.baseline_to_antnums(uvf.baseline_array[0]) assert a1 == uvf.ant_1_array[0] assert a2 == uvf.ant_2_array[0] def test_get_baseline_nums(): uvf = UVFlag(test_f_file) bls = uvf.get_baseline_nums() assert np.array_equal(bls, np.unique(uvf.baseline_array)) def test_get_antpairs(): uvf = UVFlag(test_f_file) antpairs = uvf.get_antpairs() for a1, a2 in antpairs: ind = np.where((uvf.ant_1_array == a1) & (uvf.ant_2_array == a2))[0] assert len(ind) > 0 for a1, a2 in zip(uvf.ant_1_array, uvf.ant_2_array): assert (a1, a2) in antpairs def test_missing_Nants_telescope(): testfile = os.path.join(DATA_PATH, 'test_missing_Nants.h5') shutil.copyfile(test_f_file, testfile) with h5py.File(testfile, 'r+') as f: del(f['/Header/Nants_telescope']) uvf = uvtest.checkWarnings(UVFlag, [testfile], {}, nwarnings=1, message=['Nants_telescope not available in file,']) uvf2 = UVFlag(test_f_file) uvf2.Nants_telescope = 2047 assert uvf == uvf2 os.remove(testfile) def test_combine_metrics_inplace(): uvc = UVCal() uvc.read_calfits(test_c_file) uvf = UVFlag(uvc) np.random.seed(44) uvf.metric_array = np.random.normal(size=uvf.metric_array.shape) uvf2 = uvf.copy() uvf2.metric_array *= 2 uvf3 = uvf.copy() uvf3.metric_array *= 3 uvf.combine_metrics([uvf2, uvf3]) factor = np.sqrt((1 + 4 + 9) / 3.) / 2. assert np.allclose(uvf.metric_array, np.abs(uvf2.metric_array) * factor) def test_combine_metrics_not_inplace(): uvc = UVCal() uvc.read_calfits(test_c_file) uvf = UVFlag(uvc) np.random.seed(44) uvf.metric_array = np.random.normal(size=uvf.metric_array.shape) uvf2 = uvf.copy() uvf2.metric_array *= 2 uvf3 = uvf.copy() uvf3.metric_array *= 3 uvf4 = uvf.combine_metrics([uvf2, uvf3], inplace=False) factor = np.sqrt((1 + 4 + 9) / 3.) assert np.allclose(uvf4.metric_array, np.abs(uvf.metric_array) * factor) def test_combine_metrics_not_uvflag(): uvc = UVCal() uvc.read_calfits(test_c_file) uvf = UVFlag(uvc) with pytest.raises(ValueError) as cm: uvf.combine_metrics('bubblegum') assert str(cm.value).startswith('"others" must be UVFlag or list of UVFlag objects') def test_combine_metrics_not_metric(): uvc = UVCal() uvc.read_calfits(test_c_file) uvf = UVFlag(uvc) np.random.seed(44) uvf.metric_array = np.random.normal(size=uvf.metric_array.shape) uvf2 = uvf.copy() uvf2.to_flag() with pytest.raises(ValueError) as cm: uvf.combine_metrics(uvf2) assert str(cm.value).startswith('UVFlag object and "others" must be in "metric"') def test_combine_metrics_wrong_shape(): uvc = UVCal() uvc.read_calfits(test_c_file) uvf = UVFlag(uvc) np.random.seed(44) uvf.metric_array = np.random.normal(size=uvf.metric_array.shape) uvf2 = uvf.copy() uvf2.to_waterfall() with pytest.raises(ValueError) as cm: uvf.combine_metrics(uvf2) assert str(cm.value).startswith('UVFlag metric array shapes do not match.') def test_combine_metrics_add_version_str(): uvc = UVCal() uvc.read_calfits(test_c_file) uvf = UVFlag(uvc) uvf.history = uvf.history.replace(pyuvdata_version_str, '') assert pyuvdata_version_str not in uvf.history np.random.seed(44) uvf.metric_array = np.random.normal(size=uvf.metric_array.shape) uvf2 = uvf.copy() uvf2.metric_array *= 2 uvf3 = uvf.copy() uvf3.metric_array *= 3 uvf4 = uvf.combine_metrics([uvf2, uvf3], inplace=False) assert pyuvdata_version_str in uvf4.history def test_super(): class test_class(UVFlag): def __init__(self, input, mode='metric', copy_flags=False, waterfall=False, history='', label='', property='prop'): super(test_class, self).__init__(input, mode=mode, copy_flags=copy_flags, waterfall=waterfall, history=history, label=label) self.property = property uv = UVData() uv.read_miriad(test_d_file) tc = test_class(uv, property='property') # UVFlag.__init__ is tested, so just see if it has a metric array assert hasattr(tc, 'metric_array') # Check that it has the property assert tc.property == 'property' def test_flags2waterfall(): uv = UVData() uv.read_miriad(test_d_file) np.random.seed(0) uv.flag_array = np.random.randint(0, 2, size=uv.flag_array.shape, dtype=bool) wf = flags2waterfall(uv) assert np.allclose(np.mean(wf), np.mean(uv.flag_array)) assert wf.shape == (uv.Ntimes, uv.Nfreqs) wf = flags2waterfall(uv, keep_pol=True) assert wf.shape == (uv.Ntimes, uv.Nfreqs, uv.Npols) # Test external flag_array uv.flag_array = np.zeros_like(uv.flag_array) f = np.random.randint(0, 2, size=uv.flag_array.shape, dtype=bool) wf = flags2waterfall(uv, flag_array=f) assert np.allclose(np.mean(wf), np.mean(f)) assert wf.shape == (uv.Ntimes, uv.Nfreqs) # UVCal version uvc = UVCal() uvc.read_calfits(test_c_file) uvc.flag_array = np.random.randint(0, 2, size=uvc.flag_array.shape, dtype=bool) wf = flags2waterfall(uvc) assert np.allclose(np.mean(wf), np.mean(uvc.flag_array)) assert wf.shape == (uvc.Ntimes, uvc.Nfreqs) wf = flags2waterfall(uvc, keep_pol=True) assert wf.shape == (uvc.Ntimes, uvc.Nfreqs, uvc.Njones) def test_flags2waterfall_errors(): # First argument must be UVData or UVCal object with pytest.raises(ValueError) as cm: flags2waterfall(5) assert str(cm.value).startswith('flags2waterfall() requires a UVData or ' + 'UVCal object') uv = UVData() uv.read_miriad(test_d_file) # Flag array must have same shape as uv.flag_array with pytest.raises(ValueError) as cm: flags2waterfall(uv, np.array([4, 5])) assert str(cm.value).startswith('Flag array must align with UVData or UVCal') def test_and_rows_cols(): d = np.zeros((10, 20), np.bool) d[1, :] = True d[:, 2] = True d[5, 10:20] = True d[5:8, 5] = True o = and_rows_cols(d) assert o[1, :].all() assert o[:, 2].all() assert not o[5, :].all() assert not o[:, 5].all() def test_select_waterfall_errors(uvf_from_waterfall): uvf = uvf_from_waterfall with pytest.raises(ValueError) as cm: uvf.select(antenna_nums=[0, 1, 2]) assert str(cm.value).startswith('Cannot select on antenna_nums with waterfall') with pytest.raises(ValueError) as cm: uvf.select(bls=[(0, 1), (0, 2)]) assert str(cm.value).startswith('Cannot select on bls with waterfall') @cases_decorator @pytest.mark.parametrize("uvf_mode", ["to_flag", "to_metric"]) def test_select_blt_inds(input_uvf, uvf_mode): uvf = input_uvf # used to set the mode depending on which input is given to uvf_mode getattr(uvf, uvf_mode)() np.random.seed(0) blt_inds = np.random.choice(uvf.Nblts, size=uvf.Nblts // 2, replace=False) new_nblts = uvf.Nblts // 2 uvf1 = uvf.select(blt_inds=blt_inds, inplace=False) # test the data was extracted correctly for each case for param_name, new_param in zip(uvf._data_params, uvf1.data_like_parameters): old_param = getattr(uvf, param_name) if uvf.type == "baseline": assert np.allclose(old_param[blt_inds], new_param) if uvf.type == "antenna": assert np.allclose(old_param[:, :, :, blt_inds], new_param) if uvf.type == "waterfall": assert np.allclose(old_param[blt_inds], new_param) assert uvf1.Nblts == new_nblts # verify that histories are different assert not uvutils._check_histories(uvf.history, uvf1.history) assert uvutils._check_histories(uvf.history + ' Downselected to ' 'specific baseline-times using pyuvdata.', uvf1.history) # test works with higher dimension arrays: uvf1 = uvf.select(blt_inds=np.atleast_3d(blt_inds), inplace=False) # test the data was extraced # assert np.allclose(uvf.metric_array[blt_inds], uvf1.metric_array) assert uvf1.Nblts == new_nblts assert 'baseline-times' in uvf1.history # verify that histories are different assert not uvutils._check_histories(uvf.history, uvf1.history) assert uvutils._check_histories(uvf.history + ' Downselected to ' 'specific baseline-times using pyuvdata.', uvf1.history) # test the error modes of blt_inds with pytest.raises(ValueError) as cm: uvf.select(blt_inds=[]) assert str(cm.value).startswith('No baseline-times were found') with pytest.raises(ValueError) as cm: uvf.select(blt_inds=[np.max(uvf.Nblts) + 1]) assert str(cm.value).startswith('blt_inds contains indices that are too large') with pytest.raises(ValueError) as cm: uvf.select(blt_inds=[-1]) assert str(cm.value).startswith('blt_inds contains indices that are negative') @cases_decorator_no_waterfall @pytest.mark.parametrize("uvf_mode", ["to_flag", "to_metric"]) def test_select_antenna_nums(input_uvf, uvf_mode): uvf = input_uvf # used to set the mode depending on which input is given to uvf_mode getattr(uvf, uvf_mode)() old_history = copy.deepcopy(uvf.history) np.random.seed(0) if uvf.type == "baseline": unique_ants = np.unique(uvf.ant_1_array.tolist() + uvf.ant_2_array.tolist()) ants_to_keep = np.random.choice(unique_ants, size=unique_ants.size // 2, replace=False) blts_select = [(a1 in ants_to_keep) & (a2 in ants_to_keep) for (a1, a2) in zip(uvf.ant_1_array, uvf.ant_2_array)] Nblts_selected = np.sum(blts_select) else: unique_ants = np.unique(uvf.ant_array) ants_to_keep = np.random.choice(unique_ants, size=unique_ants.size // 2, replace=False) uvf2 = copy.deepcopy(uvf) uvf2.select(antenna_nums=ants_to_keep) assert len(ants_to_keep) == uvf2.Nants_data if uvf2.type == "baseline": assert Nblts_selected == uvf2.Nblts for ant in ants_to_keep: assert ant in uvf2.ant_1_array or ant in uvf2.ant_2_array for ant in np.unique(uvf2.ant_1_array.tolist() + uvf2.ant_2_array.tolist()): assert ant in ants_to_keep else: for ant in ants_to_keep: assert ant in uvf2.ant_array for ant in np.unique(uvf2.ant_array): assert ant in ants_to_keep assert uvutils._check_histories(old_history + ' Downselected to ' 'specific antennas using pyuvdata.', uvf2.history) # check that it also works with higher dimension array uvf2 = copy.deepcopy(uvf) uvf2.select(antenna_nums=np.atleast_3d(ants_to_keep)) assert len(ants_to_keep) == uvf2.Nants_data assert len(ants_to_keep) == uvf2.Nants_data if uvf2.type == "baseline": assert Nblts_selected == uvf2.Nblts for ant in ants_to_keep: assert ant in uvf2.ant_1_array or ant in uvf2.ant_2_array for ant in np.unique(uvf2.ant_1_array.tolist() + uvf2.ant_2_array.tolist()): assert ant in ants_to_keep else: for ant in ants_to_keep: assert ant in uvf2.ant_array for ant in np.unique(uvf2.ant_array): assert ant in ants_to_keep assert uvutils._check_histories(old_history + ' Downselected to ' 'specific antennas using pyuvdata.', uvf2.history) # also test for error if antenna numbers not present in data with pytest.raises(ValueError) as cm: uvf.select(antenna_nums=np.max(unique_ants) + np.arange(1, 3)) assert str(cm.value).startswith('Antenna number ' '{a} is not present'.format(a=np.max(unique_ants) + 1)) def sort_bl(p): """Sort a tuple that starts with a pair of antennas, and may have stuff after.""" if p[1] >= p[0]: return p return (p[1], p[0]) + p[2:] @cases_decorator_no_waterfall @pytest.mark.parametrize("uvf_mode", ["to_flag", "to_metric"]) def test_select_bls(input_uvf, uvf_mode): uvf = input_uvf # used to set the mode depending on which input is given to uvf_mode getattr(uvf, uvf_mode)() np.random.seed(0) if uvf.type != "baseline": with pytest.raises(ValueError) as cm: uvf.select(bls=[(0, 1)]) assert str(cm.value).startswith('Only "baseline" mode UVFlag ' 'objects may select along the ' 'baseline axis') else: old_history = copy.deepcopy(uvf.history) bls_select = np.random.choice(uvf.baseline_array, size=uvf.Nbls // 2, replace=False) first_ants, second_ants = uvf.baseline_to_antnums(bls_select) # give the conjugate bls for a few baselines first_ants[5:8], second_ants[5:8] = copy.copy(second_ants[5:8]), copy.copy(first_ants[5:8]) new_unique_ants = np.unique(first_ants.tolist() + second_ants.tolist()) ant_pairs_to_keep = list(zip(first_ants, second_ants)) sorted_pairs_to_keep = [sort_bl(p) for p in ant_pairs_to_keep] blts_select = [sort_bl((a1, a2)) in sorted_pairs_to_keep for (a1, a2) in zip(uvf.ant_1_array, uvf.ant_2_array)] Nblts_selected = np.sum(blts_select) uvf2 = copy.deepcopy(uvf) uvf2.select(bls=ant_pairs_to_keep) sorted_pairs_object2 = [sort_bl(p) for p in zip( uvf2.ant_1_array, uvf2.ant_2_array)] assert len(new_unique_ants) == uvf2.Nants_data assert Nblts_selected == uvf2.Nblts for ant in new_unique_ants: assert ant in uvf2.ant_1_array or ant in uvf2.ant_2_array for ant in np.unique(uvf2.ant_1_array.tolist() + uvf2.ant_2_array.tolist()): assert ant in new_unique_ants for pair in sorted_pairs_to_keep: assert pair in sorted_pairs_object2 for pair in sorted_pairs_object2: assert pair in sorted_pairs_to_keep assert uvutils._check_histories(old_history + ' Downselected to ' 'specific baselines using pyuvdata.', uvf2.history) # Check with polarization too first_ants, second_ants = uvf.baseline_to_antnums(bls_select) # conjugate a few bls first_ants[5:8], second_ants[5:8] = copy.copy(second_ants[5:8]), copy.copy(first_ants[5:8]) pols = ['xx'] * len(first_ants) new_unique_ants = np.unique(first_ants.tolist() + second_ants.tolist()) ant_pairs_to_keep = list(zip(first_ants, second_ants, pols)) sorted_pairs_to_keep = [sort_bl(p) for p in ant_pairs_to_keep] blts_select = [sort_bl((a1, a2, 'xx')) in sorted_pairs_to_keep for (a1, a2) in zip(uvf.ant_1_array, uvf.ant_2_array)] Nblts_selected = np.sum(blts_select) uvf2 = copy.deepcopy(uvf) uvf2.select(bls=ant_pairs_to_keep) sorted_pairs_object2 = [sort_bl(p) + ('xx',) for p in zip( uvf2.ant_1_array, uvf2.ant_2_array)] assert len(new_unique_ants) == uvf2.Nants_data assert Nblts_selected == uvf2.Nblts for ant in new_unique_ants: assert ant in uvf2.ant_1_array or ant in uvf2.ant_2_array for ant in np.unique(uvf2.ant_1_array.tolist() + uvf2.ant_2_array.tolist()): assert ant in new_unique_ants for pair in sorted_pairs_to_keep: assert pair in sorted_pairs_object2 for pair in sorted_pairs_object2: assert pair in sorted_pairs_to_keep assert uvutils._check_histories(old_history + ' Downselected to ' 'specific baselines, polarizations using pyuvdata.', uvf2.history) # check that you can specify a single pair without errors assert isinstance(ant_pairs_to_keep[0], tuple) uvf2.select(bls=ant_pairs_to_keep[0]) sorted_pairs_object2 = [sort_bl(p) + ('xx', ) for p in zip( uvf2.ant_1_array, uvf2.ant_2_array)] assert list(set(sorted_pairs_object2)) == [ant_pairs_to_keep[0]] # test some error modes with pytest.raises(ValueError) as cm: uvf.select(bls=[3]) assert str(cm.value).startswith('bls must be a list of tuples') # must be integers with pytest.raises(ValueError) as cm: uvf.select(bls=[(np.pi, 2 * np.pi)]) assert str(cm.value).startswith('bls must be a list of tuples of integer') with pytest.raises(ValueError) as cm: uvf.select(bls=(0, 1, 'xx'), polarizations=[-5]) assert str(cm.value).startswith('Cannot provide length-3 tuples and also specify polarizations.') with pytest.raises(ValueError) as cm: uvf.select(bls=(0, 1, 5)) assert str(cm.value).startswith('The third element in each bl must be a polarization string') with pytest.raises(ValueError) as cm: uvf.select(bls=(455, 456)) assert str(cm.value).startswith('Antenna number 455 is not present') with pytest.raises(ValueError) as cm: uvf.select(bls=(first_ants[0], 456)) assert str(cm.value).startswith('Antenna number 456 is not present') uvf2 = copy.deepcopy(uvf) uvf2.select(bls=[(97, 104), (97, 105), (88, 97)]) with pytest.raises(ValueError) as cm: uvf2.select(bls=(97, 97)) assert str(cm.value).startswith("Antenna pair (97, 97) does not have any") @cases_decorator @pytest.mark.parametrize("uvf_mode", ["to_flag", "to_metric"]) def test_select_times(input_uvf, uvf_mode): uvf = input_uvf # used to set the mode depending on which input is given to uvf_mode getattr(uvf, uvf_mode)() np.random.seed(0) old_history = uvf.history unique_times = np.unique(uvf.time_array) times_to_keep = np.random.choice(unique_times, size=unique_times.size // 2, replace=False) Nblts_selected = np.sum([t in times_to_keep for t in uvf.time_array]) uvf2 = copy.deepcopy(uvf) uvf2.select(times=times_to_keep) assert len(times_to_keep) == uvf2.Ntimes assert Nblts_selected == uvf2.Nblts for t in times_to_keep: assert t in uvf2.time_array for t in np.unique(uvf2.time_array): assert t in times_to_keep assert uvutils._check_histories(old_history + ' Downselected to ' 'specific times using pyuvdata.', uvf2.history) # check that it also works with higher dimension array uvf2 = copy.deepcopy(uvf) uvf2.select(times=times_to_keep[np.newaxis, :]) assert len(times_to_keep) == uvf2.Ntimes assert Nblts_selected == uvf2.Nblts for t in times_to_keep: assert t in uvf2.time_array for t in np.unique(uvf2.time_array): assert t in times_to_keep assert uvutils._check_histories(old_history + ' Downselected to ' 'specific times using pyuvdata.', uvf2.history) # check for errors associated with times not included in data with pytest.raises(ValueError) as cm: bad_time = [np.min(unique_times) - .005] uvf.select(times=bad_time) assert str(cm.value).startswith('Time {t} is not present in' ' the time_array'.format(t=bad_time[0])) @cases_decorator @pytest.mark.parametrize("uvf_mode", ["to_flag", "to_metric"]) def test_select_frequencies(input_uvf, uvf_mode): uvf = input_uvf # used to set the mode depending on which input is given to uvf_mode getattr(uvf, uvf_mode)() np.random.seed(0) old_history = uvf.history freqs_to_keep = np.random.choice(uvf.freq_array.squeeze(), size=uvf.Nfreqs // 10, replace=False) uvf2 = copy.deepcopy(uvf) uvf2.select(frequencies=freqs_to_keep) assert len(freqs_to_keep) == uvf2.Nfreqs for f in freqs_to_keep: assert f in uvf2.freq_array for f in np.unique(uvf2.freq_array): assert f in freqs_to_keep assert uvutils._check_histories(old_history + ' Downselected to ' 'specific frequencies using pyuvdata.', uvf2.history) # check that it also works with higher dimension array uvf2 = copy.deepcopy(uvf) uvf2.select(frequencies=freqs_to_keep[np.newaxis, :]) assert len(freqs_to_keep) == uvf2.Nfreqs for f in freqs_to_keep: assert f in uvf2.freq_array for f in np.unique(uvf2.freq_array): assert f in freqs_to_keep assert uvutils._check_histories(old_history + ' Downselected to ' 'specific frequencies using pyuvdata.', uvf2.history) # check that selecting one frequency works uvf2 = copy.deepcopy(uvf) uvf2.select(frequencies=freqs_to_keep[0]) assert 1 == uvf2.Nfreqs assert freqs_to_keep[0] in uvf2.freq_array for f in uvf2.freq_array: assert f in [freqs_to_keep[0]] assert uvutils._check_histories(old_history + ' Downselected to ' 'specific frequencies using pyuvdata.', uvf2.history) # check for errors associated with frequencies not included in data with pytest.raises(ValueError) as cm: bad_freq = [np.max(uvf.freq_array) + 100] uvf.select(frequencies=bad_freq) assert str(cm.value).startswith('Frequency {f} is not present in the freq_array'.format(f=bad_freq[0])) @cases_decorator @pytest.mark.parametrize("uvf_mode", ["to_flag", "to_metric"]) def test_select_freq_chans(input_uvf, uvf_mode): uvf = input_uvf # used to set the mode depending on which input is given to uvf_mode getattr(uvf, uvf_mode)() np.random.seed(0) old_history = uvf.history old_history = uvf.history chans = np.random.choice(uvf.Nfreqs, 2) c1, c2 = np.sort(chans) chans_to_keep = np.arange(c1, c2) uvf2 = copy.deepcopy(uvf) uvf2.select(freq_chans=chans_to_keep) assert len(chans_to_keep) == uvf2.Nfreqs for chan in chans_to_keep: if uvf2.type != "waterfall": assert uvf.freq_array[0, chan] in uvf2.freq_array else: assert uvf.freq_array[chan] in uvf2.freq_array for f in np.unique(uvf2.freq_array): if uvf2.type != "waterfall": assert f in uvf.freq_array[0, chans_to_keep] else: assert f in uvf.freq_array[chans_to_keep] assert uvutils._check_histories(old_history + ' Downselected to ' 'specific frequencies using pyuvdata.', uvf2.history) # check that it also works with higher dimension array uvf2 = copy.deepcopy(uvf) uvf2.select(freq_chans=chans_to_keep[np.newaxis, :]) assert len(chans_to_keep) == uvf2.Nfreqs for chan in chans_to_keep: if uvf2.type != "waterfall": assert uvf.freq_array[0, chan] in uvf2.freq_array else: assert uvf.freq_array[chan] in uvf2.freq_array for f in np.unique(uvf2.freq_array): if uvf2.type != "waterfall": assert f in uvf.freq_array[0, chans_to_keep] else: assert f in uvf.freq_array[chans_to_keep] assert uvutils._check_histories(old_history + ' Downselected to ' 'specific frequencies using pyuvdata.', uvf2.history) # Test selecting both channels and frequencies chans = np.random.choice(uvf.Nfreqs, 2) c1, c2 = np.sort(chans) chans_to_keep = np.arange(c1, c2) if uvf.type != "waterfall": freqs_to_keep = uvf.freq_array[0, np.arange(c1 + 1, c2)] # Overlaps with chans else: freqs_to_keep = uvf.freq_array[np.arange(c1 + 1, c2)] # Overlaps with chans all_chans_to_keep = np.arange(c1, c2) uvf2 = copy.deepcopy(uvf) uvf2.select(frequencies=freqs_to_keep, freq_chans=chans_to_keep) assert len(all_chans_to_keep) == uvf2.Nfreqs for chan in chans_to_keep: if uvf2.type != "waterfall": assert uvf.freq_array[0, chan] in uvf2.freq_array else: assert uvf.freq_array[chan] in uvf2.freq_array for f in np.unique(uvf2.freq_array): if uvf2.type != "waterfall": assert f in uvf.freq_array[0, chans_to_keep] else: assert f in uvf.freq_array[chans_to_keep] @cases_decorator @pytest.mark.parametrize("uvf_mode", ["to_flag", "to_metric"]) def test_select_polarizations(input_uvf, uvf_mode): uvf = input_uvf # used to set the mode depending on which input is given to uvf_mode getattr(uvf, uvf_mode)() np.random.seed(0) old_history = uvf.history pols_to_keep = [-5] uvf2 = copy.deepcopy(uvf) uvf2.select(polarizations=pols_to_keep) assert len(pols_to_keep) == uvf2.Npols for p in pols_to_keep: assert p in uvf2.polarization_array for p in np.unique(uvf2.polarization_array): assert p in pols_to_keep assert uvutils._check_histories(old_history + ' Downselected to ' 'specific polarizations using pyuvdata.', uvf2.history) # check that it also works with higher dimension array uvf2 = copy.deepcopy(uvf) uvf2.select(polarizations=[pols_to_keep]) assert len(pols_to_keep) == uvf2.Npols for p in pols_to_keep: assert p in uvf2.polarization_array for p in np.unique(uvf2.polarization_array): assert p in pols_to_keep assert uvutils._check_histories(old_history + ' Downselected to ' 'specific polarizations using pyuvdata.', uvf2.history) # check for errors associated with polarizations not included in data with pytest.raises(ValueError) as cm: uvf2.select(polarizations=[-3]) assert str(cm.value).startswith('Polarization {p} is not present in the polarization_array'.format(p=-3)) @cases_decorator @pytest.mark.parametrize("uvf_mode", ["to_flag", "to_metric"]) def test_select(input_uvf, uvf_mode): uvf = input_uvf # used to set the mode depending on which input is given to uvf_mode getattr(uvf, uvf_mode)() np.random.seed(0) old_history = uvf.history # make new blts blt_inds = np.arange(uvf.Nblts - 1) # new freqs freqs_to_keep = np.random.choice(uvf.freq_array.squeeze(), size=uvf.Nfreqs - 1, replace=False) # new ants if uvf.type == "baseline": unique_ants = np.unique(uvf.ant_1_array.tolist() + uvf.ant_2_array.tolist()) ants_to_keep = np.random.choice(unique_ants, size=unique_ants.size - 1, replace=False) elif uvf.type == "antenna": unique_ants = np.unique(uvf.ant_array) ants_to_keep = np.random.choice(unique_ants, size=unique_ants.size - 1, replace=False) else: ants_to_keep = None if uvf.type == "baseline": # new bls bls_select = np.random.choice(uvf.baseline_array, size=uvf.Nbls - 1, replace=False) first_ants, second_ants = uvf.baseline_to_antnums(bls_select) # give the conjugate bls for a few baselines first_ants[2:4], second_ants[2:4] = second_ants[2:4], first_ants[2:4] ant_pairs_to_keep = list(zip(first_ants, second_ants)) sorted_pairs_to_keep = [sort_bl(p) for p in ant_pairs_to_keep] else: ant_pairs_to_keep = None # new times unique_times = np.unique(uvf.time_array) times_to_keep = np.random.choice(unique_times, size=unique_times.size - 1, replace=False) # new pols pols_to_keep = [-5] # Independently count blts that should be selected if uvf.type == "baseline": blts_blt_select = [i in blt_inds for i in np.arange(uvf.Nblts)] blts_ant_select = [(a1 in ants_to_keep) & (a2 in ants_to_keep) for (a1, a2) in zip(uvf.ant_1_array, uvf.ant_2_array)] blts_pair_select = [sort_bl((a1, a2)) in sorted_pairs_to_keep for (a1, a2) in zip(uvf.ant_1_array, uvf.ant_2_array)] blts_time_select = [t in times_to_keep for t in uvf.time_array] Nblts_select = np.sum([bi & (ai & pi) & ti for (bi, ai, pi, ti) in zip(blts_blt_select, blts_ant_select, blts_pair_select, blts_time_select)]) else: blts_blt_select = [i in blt_inds for i in np.arange(uvf.Nblts)] blts_time_select = [t in times_to_keep for t in uvf.time_array] Nblts_select = np.sum([bi & ti for (bi, ti) in zip(blts_blt_select, blts_time_select)]) uvf2 = copy.deepcopy(uvf) uvf2.select(blt_inds=blt_inds, antenna_nums=ants_to_keep, bls=ant_pairs_to_keep, frequencies=freqs_to_keep, times=times_to_keep, polarizations=pols_to_keep) assert Nblts_select == uvf2.Nblts if uvf.type == "baseline": for ant in np.unique(uvf2.ant_1_array.tolist() + uvf2.ant_2_array.tolist()): assert ant in ants_to_keep elif uvf.type == "antenna": for ant in np.unique(uvf2.ant_array): assert ant in ants_to_keep assert len(freqs_to_keep) == uvf2.Nfreqs for f in freqs_to_keep: assert f in uvf2.freq_array for f in np.unique(uvf2.freq_array): assert f in freqs_to_keep for t in np.unique(uvf2.time_array): assert t in times_to_keep assert len(pols_to_keep) == uvf2.Npols for p in pols_to_keep: assert p in uvf2.polarization_array for p in np.unique(uvf2.polarization_array): assert p in pols_to_keep if uvf.type == "baseline": assert uvutils._check_histories(old_history + ' Downselected to ' 'specific baseline-times, antennas, ' 'baselines, times, frequencies, ' 'polarizations using pyuvdata.', uvf2.history) elif uvf.type == "antenna": assert uvutils._check_histories(old_history + ' Downselected to ' 'specific baseline-times, antennas, ' 'times, frequencies, ' 'polarizations using pyuvdata.', uvf2.history) else: assert uvutils._check_histories(old_history + ' Downselected to ' 'specific baseline-times, ' 'times, frequencies, ' 'polarizations using pyuvdata.', uvf2.history) def test_equality_no_history(uvf_from_miriad): uvf = uvf_from_miriad uvf2 = uvf.copy() assert uvf.__eq__(uvf2, check_history=True) uvf2.history += "different text" assert uvf.__eq__(uvf2, check_history=False) def test_inequality_different_classes(uvf_from_miriad): uvf = uvf_from_miriad class test_class(object): def __init__(self): pass other_class = test_class() assert uvf.__ne__(other_class, check_history=False) def test_to_antenna_collapsed_pols(uvf_from_uvcal): uvf = uvf_from_uvcal assert not uvf.pol_collapsed uvc = UVCal() uvc.read_calfits(test_c_file) uvf.collapse_pol() assert uvf.pol_collapsed assert uvf.check() uvf.to_waterfall() uvf.to_antenna(uvc, force_pol=True) assert not uvf.pol_collapsed assert uvf.check()