"""Tests for uvcal object.""" import nose.tools as nt import os import numpy as np import copy import ephem from pyuvdata.uvcal import UVCal import pyuvdata.utils as uvutils import pyuvdata.tests as uvtest from pyuvdata.data import DATA_PATH class TestUVCalInit(object): def setUp(self): """Setup for basic parameter, property and iterator tests.""" self.required_parameters = ['_Nfreqs', '_Njones', '_Ntimes', '_Nspws', '_Nants_data', '_Nants_telescope', '_antenna_names', '_antenna_numbers', '_ant_array', '_telescope_name', '_freq_array', '_channel_width', '_spw_array', '_jones_array', '_time_array', '_time_range', '_integration_time', '_gain_convention', '_flag_array', '_quality_array', '_cal_type', '_x_orientation', '_history'] self.required_properties = ['Nfreqs', 'Njones', 'Ntimes', 'Nspws', 'Nants_data', 'Nants_telescope', 'antenna_names', 'antenna_numbers', 'ant_array', 'telescope_name', 'freq_array', 'channel_width', 'spw_array', 'jones_array', 'time_array', 'time_range', 'integration_time', 'gain_convention', 'flag_array', 'quality_array', 'cal_type', 'x_orientation', 'history'] self.extra_parameters = ['_gain_array', '_delay_array', '_input_flag_array', '_freq_range', '_observer', '_git_origin_cal', '_git_hash_cal', '_total_quality_array'] self.extra_properties = ['gain_array', 'delay_array', 'input_flag_array', 'freq_range', 'observer', 'git_origin_cal', 'git_hash_cal', 'total_quality_array'] self.other_properties = ['pyuvdata_version_str'] self.uv_cal_object = UVCal() def teardown(self): """Test teardown: delete object.""" del(self.uv_cal_object) def test_parameter_iter(self): "Test expected parameters." all = [] for prop in self.uv_cal_object: all.append(prop) for a in self.required_parameters + self.extra_parameters: nt.assert_true(a in all, msg='expected attribute ' + a + ' not returned in object iterator') def test_required_parameter_iter(self): "Test expected required parameters." required = [] for prop in self.uv_cal_object.required(): required.append(prop) for a in self.required_parameters: nt.assert_true(a in required, msg='expected attribute ' + a + ' not returned in required iterator') def test_unexpected_parameters(self): "Test for extra parameters." expected_parameters = self.required_parameters + self.extra_parameters attributes = [i for i in self.uv_cal_object.__dict__.keys() if i[0] == '_'] for a in attributes: nt.assert_true(a in expected_parameters, msg='unexpected parameter ' + a + ' found in UVCal') def test_unexpected_attributes(self): "Test for extra attributes." expected_attributes = self.required_properties + \ self.extra_properties + self.other_properties attributes = [i for i in self.uv_cal_object.__dict__.keys() if i[0] != '_'] for a in attributes: nt.assert_true(a in expected_attributes, msg='unexpected attribute ' + a + ' found in UVCal') def test_properties(self): "Test that properties can be get and set properly." prop_dict = dict(zip(self.required_properties + self.extra_properties, self.required_parameters + self.extra_parameters)) for k, v in prop_dict.iteritems(): rand_num = np.random.rand() setattr(self.uv_cal_object, k, rand_num) this_param = getattr(self.uv_cal_object, v) try: nt.assert_equal(rand_num, this_param.value) except(AssertionError): print('setting {prop_name} to a random number failed'.format(prop_name=k)) raise(AssertionError) class TestUVCalBasicMethods(object): def setUp(self): """Set up test""" self.gain_object = UVCal() gainfile = os.path.join(DATA_PATH, 'zen.2457698.40355.xx.fitsA') self.gain_object.read_calfits(gainfile) self.gain_object2 = copy.deepcopy(self.gain_object) self.delay_object = UVCal() delayfile = os.path.join(DATA_PATH, 'zen.2457698.40355.xx.HH.uvc.fits') message = delayfile + ' appears to be an old calfits format' uvtest.checkWarnings(self.delay_object.read_calfits, [delayfile], message=message) def teardown(self): """Tear down test""" del(self.gain_object) del(self.gain_object2) def test_equality(self): """Basic equality test""" nt.assert_equal(self.gain_object, self.gain_object) def test_check(self): """Test that parameter checks run properly""" nt.assert_true(self.gain_object.check()) def test_nants_data_telescope(self): self.gain_object.Nants_data = self.gain_object.Nants_telescope - 1 nt.assert_true(self.gain_object.check) self.gain_object.Nants_data = self.gain_object.Nants_telescope + 1 nt.assert_raises(ValueError, self.gain_object.check) def test_set_gain(self): self.delay_object.set_gain() nt.assert_true(self.delay_object._gain_array.required) nt.assert_false(self.delay_object._delay_array.required) nt.assert_equal(self.delay_object._gain_array.form, self.delay_object._flag_array.form) nt.assert_equal(self.delay_object._gain_array.form, self.delay_object._quality_array.form) def test_set_delay(self): self.gain_object.set_delay() nt.assert_true(self.gain_object._delay_array.required) nt.assert_false(self.gain_object._gain_array.required) nt.assert_equal(self.gain_object._gain_array.form, self.gain_object._flag_array.form) nt.assert_equal(self.gain_object._delay_array.form, self.gain_object._quality_array.form) def test_set_unknown(self): self.gain_object.set_unknown_cal_type() nt.assert_false(self.gain_object._delay_array.required) nt.assert_false(self.gain_object._gain_array.required) nt.assert_equal(self.gain_object._gain_array.form, self.gain_object._flag_array.form) nt.assert_equal(self.gain_object._gain_array.form, self.gain_object._quality_array.form) def test_convert_filetype(self): # error testing nt.assert_raises(ValueError, self.gain_object._convert_to_filetype, 'uvfits') def test_convert_to_gain(self): conventions = ['minus', 'plus'] for c in conventions: self.new_object = copy.deepcopy(self.delay_object) self.new_object.convert_to_gain(delay_convention=c) nt.assert_true(np.isclose(np.max(np.absolute(self.new_object.gain_array)), 1., rtol=self.new_object._gain_array.tols[0], atol=self.new_object._gain_array.tols[1])) nt.assert_true(np.isclose(np.min(np.absolute(self.new_object.gain_array)), 1., rtol=self.new_object._gain_array.tols[0], atol=self.new_object._gain_array.tols[1])) if c == 'minus': conv = -1 else: conv = 1 nt.assert_true(np.allclose(np.angle(self.new_object.gain_array[:, :, 10, :, :]) % (2 * np.pi), (conv * 2 * np.pi * self.delay_object.delay_array[:, :, 0, :, :] * self.delay_object.freq_array[0, 10]) % (2 * np.pi), rtol=self.new_object._gain_array.tols[0], atol=self.new_object._gain_array.tols[1])) nt.assert_true(np.allclose(self.delay_object.quality_array, self.new_object.quality_array[:, :, 10, :, :], rtol=self.new_object._quality_array.tols[0], atol=self.new_object._quality_array.tols[1])) # test a file with a total_quality_array self.new_object = copy.deepcopy(self.delay_object) tqa_size = self.new_object.delay_array.shape[1:] self.new_object.total_quality_array = np.ones(tqa_size) self.new_object.convert_to_gain(delay_convention='minus') nt.assert_true(np.isclose(np.max(np.absolute(self.new_object.gain_array)), 1., rtol=self.new_object._gain_array.tols[0], atol=self.new_object._gain_array.tols[1])) nt.assert_true(np.isclose(np.min(np.absolute(self.new_object.gain_array)), 1., rtol=self.new_object._gain_array.tols[0], atol=self.new_object._gain_array.tols[1])) nt.assert_true(np.allclose(np.angle(self.new_object.gain_array[:, :, 10, :, :]) % (2 * np.pi), (-1 * 2 * np.pi * self.delay_object.delay_array[:, :, 0, :, :] * self.delay_object.freq_array[0, 10]) % (2 * np.pi), rtol=self.new_object._gain_array.tols[0], atol=self.new_object._gain_array.tols[1])) nt.assert_true(np.allclose(self.delay_object.quality_array, self.new_object.quality_array[:, :, 10, :, :], rtol=self.new_object._quality_array.tols[0], atol=self.new_object._quality_array.tols[1])) # error testing nt.assert_raises(ValueError, self.delay_object.convert_to_gain, delay_convention='bogus') nt.assert_raises(ValueError, self.gain_object.convert_to_gain) self.gain_object.set_unknown_cal_type() nt.assert_raises(ValueError, self.gain_object.convert_to_gain) class TestUVCalSelectGain(object): def setUp(self): """Set up test""" self.gain_object = UVCal() gainfile = os.path.join(DATA_PATH, 'zen.2457698.40355.xx.fitsA') self.gain_object.read_calfits(gainfile) self.gain_object2 = copy.deepcopy(self.gain_object) def teardown(self): """Tear down test""" del(self.gain_object) del(self.gain_object2) def test_select_antennas(self): old_history = self.gain_object.history ants_to_keep = np.array([65, 96, 9, 97, 89, 22, 20, 72]) self.gain_object2.select(antenna_nums=ants_to_keep) nt.assert_equal(len(ants_to_keep), self.gain_object2.Nants_data) for ant in ants_to_keep: nt.assert_true(ant in self.gain_object2.ant_array) for ant in self.gain_object2.ant_array: nt.assert_true(ant in ants_to_keep) nt.assert_true(uvutils.check_histories(old_history + ' Downselected to ' 'specific antennas using pyuvdata.', self.gain_object2.history)) # now test using antenna_names to specify antennas to keep ants_to_keep = np.array(sorted(list(ants_to_keep))) ant_names = [] for a in ants_to_keep: ind = np.where(self.gain_object.antenna_numbers == a)[0][0] ant_names.append(self.gain_object.antenna_names[ind]) self.gain_object3 = self.gain_object.select(antenna_names=ant_names, inplace=False) nt.assert_equal(self.gain_object2, self.gain_object3) # check for errors associated with antennas not included in data, bad names or providing numbers and names nt.assert_raises(ValueError, self.gain_object.select, antenna_nums=np.max(self.gain_object.ant_array) + np.arange(1, 3)) nt.assert_raises(ValueError, self.gain_object.select, antenna_names='test1') nt.assert_raises(ValueError, self.gain_object.select, antenna_nums=ants_to_keep, antenna_names=ant_names) # check that write_calfits works with Nants_data < Nants_telescope write_file_calfits = os.path.join(DATA_PATH, 'test/select_test.calfits') status = self.gain_object2.write_calfits(write_file_calfits, clobber=True) # check that total_quality_array is handled properly when present self.gain_object.total_quality_array = np.zeros( self.gain_object._total_quality_array.expected_shape(self.gain_object)) uvtest.checkWarnings(self.gain_object.select, [], {'antenna_names': ant_names, 'inplace': True}, message='Cannot preserve total_quality_array') nt.assert_equal(self.gain_object.total_quality_array, None) def test_select_times(self): # add another time to allow for better testing of selections new_time = np.max(self.gain_object.time_array) + self.gain_object.integration_time self.gain_object.time_array = np.append(self.gain_object.time_array, new_time) self.gain_object.Ntimes += 1 self.gain_object.flag_array = np.concatenate((self.gain_object.flag_array, self.gain_object.flag_array[:, :, :, [-1], :]), axis=3) self.gain_object.gain_array = np.concatenate((self.gain_object.gain_array, self.gain_object.gain_array[:, :, :, [-1], :]), axis=3) self.gain_object.quality_array = np.concatenate((self.gain_object.quality_array, self.gain_object.quality_array[:, :, :, [-1], :]), axis=3) self.gain_object.total_quality_array = np.zeros( self.gain_object._total_quality_array.expected_shape(self.gain_object)) nt.assert_true(self.gain_object.check()) self.gain_object2 = copy.deepcopy(self.gain_object) old_history = self.gain_object.history times_to_keep = self.gain_object.time_array[[2, 0]] self.gain_object2.select(times=times_to_keep) nt.assert_equal(len(times_to_keep), self.gain_object2.Ntimes) for t in times_to_keep: nt.assert_true(t in self.gain_object2.time_array) for t in np.unique(self.gain_object2.time_array): nt.assert_true(t in times_to_keep) nt.assert_true(uvutils.check_histories(old_history + ' Downselected to ' 'specific times using pyuvdata.', self.gain_object2.history)) write_file_calfits = os.path.join(DATA_PATH, 'test/select_test.calfits') # test writing calfits with only one time self.gain_object2 = copy.deepcopy(self.gain_object) times_to_keep = self.gain_object.time_array[[1]] self.gain_object2.select(times=times_to_keep) self.gain_object2.write_calfits(write_file_calfits, clobber=True) # check for errors associated with times not included in data nt.assert_raises(ValueError, self.gain_object.select, times=[np.min(self.gain_object.time_array) - self.gain_object.integration_time]) # check for warnings and errors associated with unevenly spaced times self.gain_object2 = copy.deepcopy(self.gain_object) uvtest.checkWarnings(self.gain_object2.select, [], {'times': self.gain_object2.time_array[[0, 2, 3]]}, message='Selected times are not evenly spaced') nt.assert_raises(ValueError, self.gain_object2.write_calfits, write_file_calfits) def test_select_frequencies(self): old_history = self.gain_object.history freqs_to_keep = self.gain_object.freq_array[0, np.arange(73, 944)] # add dummy total_quality_array self.gain_object.total_quality_array = np.zeros( self.gain_object._total_quality_array.expected_shape(self.gain_object)) self.gain_object2.total_quality_array = np.zeros( self.gain_object2._total_quality_array.expected_shape(self.gain_object2)) self.gain_object2.select(frequencies=freqs_to_keep) nt.assert_equal(len(freqs_to_keep), self.gain_object2.Nfreqs) for f in freqs_to_keep: nt.assert_true(f in self.gain_object2.freq_array) for f in np.unique(self.gain_object2.freq_array): nt.assert_true(f in freqs_to_keep) nt.assert_true(uvutils.check_histories(old_history + ' Downselected to ' 'specific frequencies using pyuvdata.', self.gain_object2.history)) write_file_calfits = os.path.join(DATA_PATH, 'test/select_test.calfits') # test writing calfits with only one frequency self.gain_object2 = copy.deepcopy(self.gain_object) freqs_to_keep = self.gain_object.freq_array[0, 51] self.gain_object2.select(frequencies=freqs_to_keep) self.gain_object2.write_calfits(write_file_calfits, clobber=True) # check for errors associated with frequencies not included in data nt.assert_raises(ValueError, self.gain_object.select, frequencies=[np.max(self.gain_object.freq_array) + self.gain_object.channel_width]) # check for warnings and errors associated with unevenly spaced frequencies self.gain_object2 = copy.deepcopy(self.gain_object) uvtest.checkWarnings(self.gain_object2.select, [], {'frequencies': self.gain_object2.freq_array[0, [0, 5, 6]]}, message='Selected frequencies are not evenly spaced') nt.assert_raises(ValueError, self.gain_object2.write_calfits, write_file_calfits) def test_select_freq_chans(self): old_history = self.gain_object.history chans_to_keep = np.arange(73, 944) # add dummy total_quality_array self.gain_object.total_quality_array = np.zeros( self.gain_object._total_quality_array.expected_shape(self.gain_object)) self.gain_object2.total_quality_array = np.zeros( self.gain_object2._total_quality_array.expected_shape(self.gain_object2)) self.gain_object2.select(freq_chans=chans_to_keep) nt.assert_equal(len(chans_to_keep), self.gain_object2.Nfreqs) for chan in chans_to_keep: nt.assert_true(self.gain_object.freq_array[0, chan] in self.gain_object2.freq_array) for f in np.unique(self.gain_object2.freq_array): nt.assert_true(f in self.gain_object.freq_array[0, chans_to_keep]) nt.assert_true(uvutils.check_histories(old_history + ' Downselected to ' 'specific frequencies using pyuvdata.', self.gain_object2.history)) # Test selecting both channels and frequencies freqs_to_keep = self.gain_object.freq_array[0, np.arange(930, 1000)] # Overlaps with chans all_chans_to_keep = np.arange(73, 1000) self.gain_object2 = copy.deepcopy(self.gain_object) self.gain_object2.select(frequencies=freqs_to_keep, freq_chans=chans_to_keep) nt.assert_equal(len(all_chans_to_keep), self.gain_object2.Nfreqs) for chan in all_chans_to_keep: nt.assert_true(self.gain_object.freq_array[0, chan] in self.gain_object2.freq_array) for f in np.unique(self.gain_object2.freq_array): nt.assert_true(f in self.gain_object.freq_array[0, all_chans_to_keep]) def test_select_polarizations(self): # add more jones terms to allow for better testing of selections while self.gain_object.Njones < 4: new_jones = np.min(self.gain_object.jones_array) - 1 self.gain_object.jones_array = np.append(self.gain_object.jones_array, new_jones) self.gain_object.Njones += 1 self.gain_object.flag_array = np.concatenate((self.gain_object.flag_array, self.gain_object.flag_array[:, :, :, :, [-1]]), axis=4) self.gain_object.gain_array = np.concatenate((self.gain_object.gain_array, self.gain_object.gain_array[:, :, :, :, [-1]]), axis=4) self.gain_object.quality_array = np.concatenate((self.gain_object.quality_array, self.gain_object.quality_array[:, :, :, :, [-1]]), axis=4) # add dummy total_quality_array self.gain_object.total_quality_array = np.zeros( self.gain_object._total_quality_array.expected_shape(self.gain_object)) nt.assert_true(self.gain_object.check()) self.gain_object2 = copy.deepcopy(self.gain_object) old_history = self.gain_object.history jones_to_keep = [-5, -6] self.gain_object2.select(jones=jones_to_keep) nt.assert_equal(len(jones_to_keep), self.gain_object2.Njones) for j in jones_to_keep: nt.assert_true(j in self.gain_object2.jones_array) for j in np.unique(self.gain_object2.jones_array): nt.assert_true(j in jones_to_keep) nt.assert_true(uvutils.check_histories(old_history + ' Downselected to ' 'specific jones polarization terms ' 'using pyuvdata.', self.gain_object2.history)) # check for errors associated with polarizations not included in data nt.assert_raises(ValueError, self.gain_object2.select, jones=[-3, -4]) # check for warnings and errors associated with unevenly spaced polarizations uvtest.checkWarnings(self.gain_object.select, [], {'jones': self.gain_object.jones_array[[0, 1, 3]]}, message='Selected jones polarization terms are not evenly spaced') write_file_calfits = os.path.join(DATA_PATH, 'test/select_test.calfits') nt.assert_raises(ValueError, self.gain_object.write_calfits, write_file_calfits) def test_select(self): # now test selecting along all axes at once old_history = self.gain_object.history ants_to_keep = np.array([10, 89, 43, 9, 80, 96, 64]) freqs_to_keep = self.gain_object.freq_array[0, np.arange(31, 56)] times_to_keep = self.gain_object.time_array[[1, 2]] jones_to_keep = [-5] self.gain_object2.select(antenna_nums=ants_to_keep, frequencies=freqs_to_keep, times=times_to_keep, jones=jones_to_keep) nt.assert_equal(len(ants_to_keep), self.gain_object2.Nants_data) for ant in ants_to_keep: nt.assert_true(ant in self.gain_object2.ant_array) for ant in self.gain_object2.ant_array: nt.assert_true(ant in ants_to_keep) nt.assert_equal(len(times_to_keep), self.gain_object2.Ntimes) for t in times_to_keep: nt.assert_true(t in self.gain_object2.time_array) for t in np.unique(self.gain_object2.time_array): nt.assert_true(t in times_to_keep) nt.assert_equal(len(freqs_to_keep), self.gain_object2.Nfreqs) for f in freqs_to_keep: nt.assert_true(f in self.gain_object2.freq_array) for f in np.unique(self.gain_object2.freq_array): nt.assert_true(f in freqs_to_keep) nt.assert_equal(len(jones_to_keep), self.gain_object2.Njones) for j in jones_to_keep: nt.assert_true(j in self.gain_object2.jones_array) for j in np.unique(self.gain_object2.jones_array): nt.assert_true(j in jones_to_keep) nt.assert_true(uvutils.check_histories(old_history + ' Downselected to ' 'specific antennas, times, ' 'frequencies, jones polarization terms ' 'using pyuvdata.', self.gain_object2.history)) class TestUVCalSelectDelay(object): def setUp(self): """Set up test""" self.delay_object = UVCal() delayfile = os.path.join(DATA_PATH, 'zen.2457698.40355.xx.HH.uvc.fits') # add an input flag array to the file to test for that. write_file = os.path.join(DATA_PATH, 'test/outtest_input_flags.fits') uv_in = UVCal() message = delayfile + ' appears to be an old calfits format' uvtest.checkWarnings(uv_in.read_calfits, [delayfile], message=message) uv_in.input_flag_array = np.zeros(uv_in._input_flag_array.expected_shape(uv_in), dtype=bool) uv_in.write_calfits(write_file, clobber=True) self.delay_object.read_calfits(write_file) self.delay_object2 = copy.deepcopy(self.delay_object) def teardown(self): """Tear down test""" del(self.delay_object) del(self.delay_object2) def test_select_antennas(self): old_history = self.delay_object.history ants_to_keep = np.array([65, 96, 9, 97, 89, 22, 20, 72]) self.delay_object2.select(antenna_nums=ants_to_keep) nt.assert_equal(len(ants_to_keep), self.delay_object2.Nants_data) for ant in ants_to_keep: nt.assert_true(ant in self.delay_object2.ant_array) for ant in self.delay_object2.ant_array: nt.assert_true(ant in ants_to_keep) nt.assert_true(uvutils.check_histories(old_history + ' Downselected to ' 'specific antennas using pyuvdata.', self.delay_object2.history)) # now test using antenna_names to specify antennas to keep self.delay_object3 = copy.deepcopy(self.delay_object) ants_to_keep = np.array(sorted(list(ants_to_keep))) ant_names = [] for a in ants_to_keep: ind = np.where(self.delay_object3.antenna_numbers == a)[0][0] ant_names.append(self.delay_object3.antenna_names[ind]) self.delay_object3.select(antenna_names=ant_names) nt.assert_equal(self.delay_object2, self.delay_object3) # check for errors associated with antennas not included in data, bad names or providing numbers and names nt.assert_raises(ValueError, self.delay_object.select, antenna_nums=np.max(self.delay_object.ant_array) + np.arange(1, 3)) nt.assert_raises(ValueError, self.delay_object.select, antenna_names='test1') nt.assert_raises(ValueError, self.delay_object.select, antenna_nums=ants_to_keep, antenna_names=ant_names) # check that total_quality_array is handled properly when present self.delay_object.total_quality_array = np.zeros( self.delay_object._total_quality_array.expected_shape(self.delay_object)) uvtest.checkWarnings(self.delay_object.select, [], {'antenna_names': ant_names, 'inplace': True}, message='Cannot preserve total_quality_array') nt.assert_equal(self.delay_object.total_quality_array, None) def test_select_times(self): # add another time to allow for better testing of selections new_time = np.max(self.delay_object.time_array) + self.delay_object.integration_time self.delay_object.time_array = np.append(self.delay_object.time_array, new_time) self.delay_object.Ntimes += 1 self.delay_object.flag_array = np.concatenate((self.delay_object.flag_array, self.delay_object.flag_array[:, :, :, [-1], :]), axis=3) self.delay_object.input_flag_array = np.concatenate((self.delay_object.input_flag_array, self.delay_object.input_flag_array[:, :, :, [-1], :]), axis=3) self.delay_object.delay_array = np.concatenate((self.delay_object.delay_array, self.delay_object.delay_array[:, :, :, [-1], :]), axis=3) self.delay_object.quality_array = np.concatenate((self.delay_object.quality_array, self.delay_object.quality_array[:, :, :, [-1], :]), axis=3) self.delay_object.total_quality_array = np.zeros( self.delay_object._total_quality_array.expected_shape(self.delay_object)) nt.assert_true(self.delay_object.check()) self.delay_object2 = copy.deepcopy(self.delay_object) old_history = self.delay_object.history times_to_keep = self.delay_object.time_array[[2, 0]] self.delay_object2.select(times=times_to_keep) nt.assert_equal(len(times_to_keep), self.delay_object2.Ntimes) for t in times_to_keep: nt.assert_true(t in self.delay_object2.time_array) for t in np.unique(self.delay_object2.time_array): nt.assert_true(t in times_to_keep) nt.assert_true(uvutils.check_histories(old_history + ' Downselected to ' 'specific times using pyuvdata.', self.delay_object2.history)) # check for errors associated with times not included in data nt.assert_raises(ValueError, self.delay_object.select, times=[np.min(self.delay_object.time_array) - self.delay_object.integration_time]) # check for warnings and errors associated with unevenly spaced times self.delay_object2 = copy.deepcopy(self.delay_object) uvtest.checkWarnings(self.delay_object2.select, [], {'times': self.delay_object2.time_array[[0, 2, 3]]}, message='Selected times are not evenly spaced') write_file_calfits = os.path.join(DATA_PATH, 'test/select_test.calfits') nt.assert_raises(ValueError, self.delay_object2.write_calfits, write_file_calfits) def test_select_frequencies(self): old_history = self.delay_object.history freqs_to_keep = self.delay_object.freq_array[0, np.arange(73, 944)] # add dummy total_quality_array self.delay_object.total_quality_array = np.zeros( self.delay_object._total_quality_array.expected_shape(self.delay_object)) self.delay_object2.total_quality_array = np.zeros( self.delay_object2._total_quality_array.expected_shape(self.delay_object2)) self.delay_object2.select(frequencies=freqs_to_keep) nt.assert_equal(len(freqs_to_keep), self.delay_object2.Nfreqs) for f in freqs_to_keep: nt.assert_true(f in self.delay_object2.freq_array) for f in np.unique(self.delay_object2.freq_array): nt.assert_true(f in freqs_to_keep) nt.assert_true(uvutils.check_histories(old_history + ' Downselected to ' 'specific frequencies using pyuvdata.', self.delay_object2.history)) # check for errors associated with frequencies not included in data nt.assert_raises(ValueError, self.delay_object.select, frequencies=[np.max(self.delay_object.freq_array) + self.delay_object.channel_width]) # check for warnings and errors associated with unevenly spaced frequencies self.delay_object2 = copy.deepcopy(self.delay_object) uvtest.checkWarnings(self.delay_object2.select, [], {'frequencies': self.delay_object2.freq_array[0, [0, 5, 6]]}, message='Selected frequencies are not evenly spaced') write_file_calfits = os.path.join(DATA_PATH, 'test/select_test.calfits') nt.assert_raises(ValueError, self.delay_object2.write_calfits, write_file_calfits) def test_select_freq_chans(self): old_history = self.delay_object.history chans_to_keep = np.arange(73, 944) # add dummy total_quality_array self.delay_object.total_quality_array = np.zeros( self.delay_object._total_quality_array.expected_shape(self.delay_object)) self.delay_object2.total_quality_array = np.zeros( self.delay_object2._total_quality_array.expected_shape(self.delay_object2)) self.delay_object2.select(freq_chans=chans_to_keep) nt.assert_equal(len(chans_to_keep), self.delay_object2.Nfreqs) for chan in chans_to_keep: nt.assert_true(self.delay_object.freq_array[0, chan] in self.delay_object2.freq_array) for f in np.unique(self.delay_object2.freq_array): nt.assert_true(f in self.delay_object.freq_array[0, chans_to_keep]) nt.assert_true(uvutils.check_histories(old_history + ' Downselected to ' 'specific frequencies using pyuvdata.', self.delay_object2.history)) # Test selecting both channels and frequencies freqs_to_keep = self.delay_object.freq_array[0, np.arange(930, 1000)] # Overlaps with chans all_chans_to_keep = np.arange(73, 1000) self.delay_object2 = copy.deepcopy(self.delay_object) self.delay_object2.select(frequencies=freqs_to_keep, freq_chans=chans_to_keep) nt.assert_equal(len(all_chans_to_keep), self.delay_object2.Nfreqs) for chan in all_chans_to_keep: nt.assert_true(self.delay_object.freq_array[0, chan] in self.delay_object2.freq_array) for f in np.unique(self.delay_object2.freq_array): nt.assert_true(f in self.delay_object.freq_array[0, all_chans_to_keep]) def test_select_polarizations(self): # add more jones terms to allow for better testing of selections while self.delay_object.Njones < 4: new_jones = np.min(self.delay_object.jones_array) - 1 self.delay_object.jones_array = np.append(self.delay_object.jones_array, new_jones) self.delay_object.Njones += 1 self.delay_object.flag_array = np.concatenate((self.delay_object.flag_array, self.delay_object.flag_array[:, :, :, :, [-1]]), axis=4) self.delay_object.input_flag_array = np.concatenate((self.delay_object.input_flag_array, self.delay_object.input_flag_array[:, :, :, :, [-1]]), axis=4) self.delay_object.delay_array = np.concatenate((self.delay_object.delay_array, self.delay_object.delay_array[:, :, :, :, [-1]]), axis=4) self.delay_object.quality_array = np.concatenate((self.delay_object.quality_array, self.delay_object.quality_array[:, :, :, :, [-1]]), axis=4) # add dummy total_quality_array self.delay_object.total_quality_array = np.zeros( self.delay_object._total_quality_array.expected_shape(self.delay_object)) nt.assert_true(self.delay_object.check()) self.delay_object2 = copy.deepcopy(self.delay_object) old_history = self.delay_object.history jones_to_keep = [-5, -6] self.delay_object2.select(jones=jones_to_keep) nt.assert_equal(len(jones_to_keep), self.delay_object2.Njones) for j in jones_to_keep: nt.assert_true(j in self.delay_object2.jones_array) for j in np.unique(self.delay_object2.jones_array): nt.assert_true(j in jones_to_keep) nt.assert_true(uvutils.check_histories(old_history + ' Downselected to ' 'specific jones polarization terms ' 'using pyuvdata.', self.delay_object2.history)) # check for errors associated with polarizations not included in data nt.assert_raises(ValueError, self.delay_object2.select, jones=[-3, -4]) # check for warnings and errors associated with unevenly spaced polarizations uvtest.checkWarnings(self.delay_object.select, [], {'jones': self.delay_object.jones_array[[0, 1, 3]]}, message='Selected jones polarization terms are not evenly spaced') write_file_calfits = os.path.join(DATA_PATH, 'test/select_test.calfits') nt.assert_raises(ValueError, self.delay_object.write_calfits, write_file_calfits) def test_select(self): # now test selecting along all axes at once old_history = self.delay_object.history ants_to_keep = np.array([10, 89, 43, 9, 80, 96, 64]) freqs_to_keep = self.delay_object.freq_array[0, np.arange(31, 56)] times_to_keep = self.delay_object.time_array[[1, 2]] jones_to_keep = [-5] self.delay_object2.select(antenna_nums=ants_to_keep, frequencies=freqs_to_keep, times=times_to_keep, jones=jones_to_keep) nt.assert_equal(len(ants_to_keep), self.delay_object2.Nants_data) for ant in ants_to_keep: nt.assert_true(ant in self.delay_object2.ant_array) for ant in self.delay_object2.ant_array: nt.assert_true(ant in ants_to_keep) nt.assert_equal(len(times_to_keep), self.delay_object2.Ntimes) for t in times_to_keep: nt.assert_true(t in self.delay_object2.time_array) for t in np.unique(self.delay_object2.time_array): nt.assert_true(t in times_to_keep) nt.assert_equal(len(freqs_to_keep), self.delay_object2.Nfreqs) for f in freqs_to_keep: nt.assert_true(f in self.delay_object2.freq_array) for f in np.unique(self.delay_object2.freq_array): nt.assert_true(f in freqs_to_keep) nt.assert_equal(len(jones_to_keep), self.delay_object2.Njones) for j in jones_to_keep: nt.assert_true(j in self.delay_object2.jones_array) for j in np.unique(self.delay_object2.jones_array): nt.assert_true(j in jones_to_keep) nt.assert_true(uvutils.check_histories(old_history + ' Downselected to ' 'specific antennas, times, ' 'frequencies, jones polarization terms ' 'using pyuvdata.', self.delay_object2.history)) class TestUVCalAddGain(object): def setUp(self): """Set up test""" self.gain_object = UVCal() gainfile = os.path.join(DATA_PATH, 'zen.2457698.40355.xx.fitsA') self.gain_object.read_calfits(gainfile) self.gain_object2 = copy.deepcopy(self.gain_object) def teardown(self): """Tear down test""" del(self.gain_object) del(self.gain_object2) def test_add_antennas(self): """Test adding antennas between two UVCal objects""" gain_object_full = copy.deepcopy(self.gain_object) ants1 = np.array([9, 10, 20, 22, 31, 43, 53, 64, 65, 72]) ants2 = np.array([80, 81, 88, 89, 96, 97, 104, 105, 112]) self.gain_object.select(antenna_nums=ants1) self.gain_object2.select(antenna_nums=ants2) self.gain_object += self.gain_object2 # Check history is correct, before replacing and doing a full object check nt.assert_true(uvutils.check_histories(gain_object_full.history + ' Downselected to specific ' 'antennas using pyuvdata. Combined ' 'data along antenna axis using pyuvdata.', self.gain_object.history)) self.gain_object.history = gain_object_full.history nt.assert_equal(self.gain_object, gain_object_full) # test for when total_quality_array is present self.gain_object.select(antenna_nums=ants1) self.gain_object.total_quality_array = np.zeros( self.gain_object._total_quality_array.expected_shape(self.gain_object)) uvtest.checkWarnings(self.gain_object.__iadd__, [self.gain_object2], message='Total quality array detected') nt.assert_equal(self.gain_object.total_quality_array, None) def test_add_frequencies(self): """Test adding frequencies between two UVCal objects""" gain_object_full = copy.deepcopy(self.gain_object) freqs1 = self.gain_object.freq_array[0, np.arange(0, 512)] freqs2 = self.gain_object2.freq_array[0, np.arange(512, 1024)] self.gain_object.select(frequencies=freqs1) self.gain_object2.select(frequencies=freqs2) self.gain_object += self.gain_object2 # Check history is correct, before replacing and doing a full object check nt.assert_true(uvutils.check_histories(gain_object_full.history + ' Downselected to specific ' 'frequencies using pyuvdata. Combined ' 'data along frequency axis using pyuvdata.', self.gain_object.history)) self.gain_object.history = gain_object_full.history nt.assert_equal(self.gain_object, gain_object_full) # test for when total_quality_array is present in first file but not second self.gain_object.select(frequencies=freqs1) tqa = np.ones( self.gain_object._total_quality_array.expected_shape(self.gain_object)) tqa2 = np.zeros( self.gain_object2._total_quality_array.expected_shape(self.gain_object2)) tot_tqa = np.concatenate([tqa, tqa2], axis=1) self.gain_object.total_quality_array = tqa self.gain_object += self.gain_object2 nt.assert_true(np.allclose(self.gain_object.total_quality_array, tot_tqa, rtol=self.gain_object._total_quality_array.tols[0], atol=self.gain_object._total_quality_array.tols[1])) # test for when total_quality_array is present in second file but not first self.gain_object.select(frequencies=freqs1) tqa = np.zeros( self.gain_object._total_quality_array.expected_shape(self.gain_object)) tqa2 = np.ones( self.gain_object2._total_quality_array.expected_shape(self.gain_object2)) tot_tqa = np.concatenate([tqa, tqa2], axis=1) self.gain_object.total_quality_array = None self.gain_object2.total_quality_array = tqa2 self.gain_object += self.gain_object2 nt.assert_true(np.allclose(self.gain_object.total_quality_array, tot_tqa, rtol=self.gain_object._total_quality_array.tols[0], atol=self.gain_object._total_quality_array.tols[1])) # test for when total_quality_array is present in both self.gain_object.select(frequencies=freqs1) tqa = np.ones( self.gain_object._total_quality_array.expected_shape(self.gain_object)) tqa2 = np.ones( self.gain_object2._total_quality_array.expected_shape(self.gain_object2)) tqa *= 2 tot_tqa = np.concatenate([tqa, tqa2], axis=1) self.gain_object.total_quality_array = tqa self.gain_object2.total_quality_array = tqa2 self.gain_object += self.gain_object2 nt.assert_true(np.allclose(self.gain_object.total_quality_array, tot_tqa, rtol=self.gain_object._total_quality_array.tols[0], atol=self.gain_object._total_quality_array.tols[1])) def test_add_times(self): """Test adding times between two UVCal objects""" gain_object_full = copy.deepcopy(self.gain_object) Nt2 = self.gain_object.Ntimes / 2 times1 = self.gain_object.time_array[:Nt2] times2 = self.gain_object.time_array[Nt2:] self.gain_object.select(times=times1) self.gain_object2.select(times=times2) self.gain_object += self.gain_object2 # Check history is correct, before replacing and doing a full object check nt.assert_true(uvutils.check_histories(gain_object_full.history + ' Downselected to specific ' 'times using pyuvdata. Combined ' 'data along time axis using pyuvdata.', self.gain_object.history)) self.gain_object.history = gain_object_full.history nt.assert_equal(self.gain_object, gain_object_full) # test for when total_quality_array is present in first file but not second self.gain_object.select(times=times1) tqa = np.ones( self.gain_object._total_quality_array.expected_shape(self.gain_object)) tqa2 = np.zeros( self.gain_object2._total_quality_array.expected_shape(self.gain_object2)) tot_tqa = np.concatenate([tqa, tqa2], axis=2) self.gain_object.total_quality_array = tqa self.gain_object += self.gain_object2 nt.assert_true(np.allclose(self.gain_object.total_quality_array, tot_tqa, rtol=self.gain_object._total_quality_array.tols[0], atol=self.gain_object._total_quality_array.tols[1])) # test for when total_quality_array is present in second file but not first self.gain_object.select(times=times1) tqa = np.zeros( self.gain_object._total_quality_array.expected_shape(self.gain_object)) tqa2 = np.ones( self.gain_object2._total_quality_array.expected_shape(self.gain_object2)) tot_tqa = np.concatenate([tqa, tqa2], axis=2) self.gain_object.total_quality_array = None self.gain_object2.total_quality_array = tqa2 self.gain_object += self.gain_object2 nt.assert_true(np.allclose(self.gain_object.total_quality_array, tot_tqa, rtol=self.gain_object._total_quality_array.tols[0], atol=self.gain_object._total_quality_array.tols[1])) # test for when total_quality_array is present in both self.gain_object.select(times=times1) tqa = np.ones( self.gain_object._total_quality_array.expected_shape(self.gain_object)) tqa2 = np.ones( self.gain_object2._total_quality_array.expected_shape(self.gain_object2)) tqa *= 2 tot_tqa = np.concatenate([tqa, tqa2], axis=2) self.gain_object.total_quality_array = tqa self.gain_object2.total_quality_array = tqa2 self.gain_object += self.gain_object2 nt.assert_true(np.allclose(self.gain_object.total_quality_array, tot_tqa, rtol=self.gain_object._total_quality_array.tols[0], atol=self.gain_object._total_quality_array.tols[1])) def test_add_jones(self): """Test adding Jones axes between two UVCal objects""" gain_object_original = copy.deepcopy(self.gain_object) # artificially change the Jones value to permit addition self.gain_object2.jones_array[0] = -6 self.gain_object += self.gain_object2 # check dimensionality of resulting object nt.assert_equal(self.gain_object.gain_array.shape[-1], 2) nt.assert_equal(sorted(self.gain_object.jones_array), [-6, -5]) # test for when total_quality_array is present in first file but not second self.gain_object = copy.deepcopy(gain_object_original) tqa = np.ones( self.gain_object._total_quality_array.expected_shape(self.gain_object)) tqa2 = np.zeros( self.gain_object2._total_quality_array.expected_shape(self.gain_object2)) tot_tqa = np.concatenate([tqa, tqa2], axis=3) self.gain_object.total_quality_array = tqa self.gain_object += self.gain_object2 nt.assert_true(np.allclose(self.gain_object.total_quality_array, tot_tqa, rtol=self.gain_object._total_quality_array.tols[0], atol=self.gain_object._total_quality_array.tols[1])) # test for when total_quality_array is present in second file but not first self.gain_object = copy.deepcopy(gain_object_original) tqa = np.zeros( self.gain_object._total_quality_array.expected_shape(self.gain_object)) tqa2 = np.ones( self.gain_object2._total_quality_array.expected_shape(self.gain_object2)) tot_tqa = np.concatenate([tqa, tqa2], axis=3) self.gain_object2.total_quality_array = tqa2 self.gain_object += self.gain_object2 nt.assert_true(np.allclose(self.gain_object.total_quality_array, tot_tqa, rtol=self.gain_object._total_quality_array.tols[0], atol=self.gain_object._total_quality_array.tols[1])) # test for when total_quality_array is present in both self.gain_object = copy.deepcopy(gain_object_original) tqa = np.ones( self.gain_object._total_quality_array.expected_shape(self.gain_object)) tqa2 = np.ones( self.gain_object2._total_quality_array.expected_shape(self.gain_object2)) tqa *= 2 tot_tqa = np.concatenate([tqa, tqa2], axis=3) self.gain_object.total_quality_array = tqa self.gain_object2.total_quality_array = tqa2 self.gain_object += self.gain_object2 nt.assert_true(np.allclose(self.gain_object.total_quality_array, tot_tqa, rtol=self.gain_object._total_quality_array.tols[0], atol=self.gain_object._total_quality_array.tols[1])) def test_add(self): """Test miscellaneous aspects of add method""" # test not-in-place addition gain_object = copy.deepcopy(self.gain_object) ants1 = np.array([9, 10, 20, 22, 31, 43, 53, 64, 65, 72]) ants2 = np.array([80, 81, 88, 89, 96, 97, 104, 105, 112]) self.gain_object.select(antenna_nums=ants1) self.gain_object2.select(antenna_nums=ants2) gain_object_add = self.gain_object + self.gain_object2 # Check history is correct, before replacing and doing a full object check nt.assert_true(uvutils.check_histories(gain_object.history + ' Downselected to specific ' 'antennas using pyuvdata. Combined ' 'data along antenna axis using pyuvdata.', gain_object_add.history)) gain_object_add.history = gain_object.history nt.assert_equal(gain_object_add, gain_object) # test history concatenation self.gain_object.history = gain_object.history self.gain_object2.history = 'Some random history string OMNI_RUN' self.gain_object += self.gain_object2 nt.assert_true(uvutils.check_histories(gain_object.history + ' Combined data along antenna axis ' 'using pyuvdata. Some random ' 'history string', self.gain_object.history)) def test_add_multiple_axes(self): """Test addition along multiple axes""" ants1 = np.array([9, 10, 20, 22, 31, 43, 53, 64, 65, 72]) ants2 = np.array([80, 81, 88, 89, 96, 97, 104, 105, 112]) freqs1 = self.gain_object.freq_array[0, np.arange(0, 512)] freqs2 = self.gain_object2.freq_array[0, np.arange(512, 1024)] Nt2 = self.gain_object.Ntimes // 2 times1 = self.gain_object.time_array[:Nt2] times2 = self.gain_object.time_array[Nt2:] # artificially change the Jones value to permit addition self.gain_object2.jones_array[0] = -6 # perform select self.gain_object.select(antenna_nums=ants1, frequencies=freqs1, times=times1) self.gain_object2.select(antenna_nums=ants2, frequencies=freqs2, times=times2) self.gain_object += self.gain_object2 # check resulting dimensionality nt.assert_equal(len(self.gain_object.ant_array), 19) nt.assert_equal(len(self.gain_object.freq_array[0, :]), 1024) nt.assert_equal(len(self.gain_object.time_array), Nt2 * 2) nt.assert_equal(len(self.gain_object.jones_array), 2) def test_add_errors(self): """Test behavior that will raise errors""" # test addition of two identical objects nt.assert_raises(ValueError, self.gain_object.__add__, self.gain_object2) # test addition of UVCal and non-UVCal object (empty list) nt.assert_raises(ValueError, self.gain_object.__add__, []) # test compatibility param mismatch telescope_name = self.gain_object2.telescope_name self.gain_object2.telescope_name = "PAPER" nt.assert_raises(ValueError, self.gain_object.__add__, self.gain_object2) def test_jones_warning(self): """Test having non-contiguous Jones elements""" self.gain_object2.jones_array[0] = -6 self.gain_object += self.gain_object2 self.gain_object2.jones_array[0] = -8 uvtest.checkWarnings(self.gain_object.__iadd__, [self.gain_object2], message='Combined Jones elements') nt.assert_equal(sorted(self.gain_object.jones_array), [-8, -6, -5]) def test_frequency_warnings(self): """Test having uneven or non-contiguous frequencies""" # test having unevenly spaced frequency separations go1 = copy.deepcopy(self.gain_object) go2 = copy.deepcopy(self.gain_object2) freqs1 = self.gain_object.freq_array[0, np.arange(0, 512)] freqs2 = self.gain_object2.freq_array[0, np.arange(512, 1024)] self.gain_object.select(frequencies=freqs1) self.gain_object2.select(frequencies=freqs2) # change the last frequency bin to be smaller than the others df = self.gain_object2.freq_array[0, -1] - self.gain_object2.freq_array[0, -2] self.gain_object2.freq_array[0, -1] = self.gain_object2.freq_array[0, -2] + df / 2 uvtest.checkWarnings(self.gain_object.__iadd__, [self.gain_object2], message='Combined frequencies are not evenly spaced') nt.assert_equal(len(self.gain_object.freq_array[0, :]), self.gain_object.Nfreqs) # now check having "non-contiguous" frequencies self.gain_object = copy.deepcopy(go1) self.gain_object2 = copy.deepcopy(go2) freqs1 = self.gain_object.freq_array[0, np.arange(0, 512)] freqs2 = self.gain_object2.freq_array[0, np.arange(512, 1024)] self.gain_object.select(frequencies=freqs1) self.gain_object2.select(frequencies=freqs2) # artificially space out frequencies self.gain_object.freq_array[0, :] *= 10 self.gain_object2.freq_array[0, :] *= 10 uvtest.checkWarnings(self.gain_object.__iadd__, [self.gain_object2], message='Combined frequencies are not contiguous') freqs1 *= 10 freqs2 *= 10 freqs = np.concatenate([freqs1, freqs2]) nt.assert_true(np.allclose(self.gain_object.freq_array[0, :], freqs, rtol=self.gain_object._freq_array.tols[0], atol=self.gain_object._freq_array.tols[1])) def test_parameter_warnings(self): """Test changing a parameter that will raise a warning""" # change observer and select frequencies self.gain_object2.observer = 'mystery_person' freqs1 = self.gain_object.freq_array[0, np.arange(0, 512)] freqs2 = self.gain_object2.freq_array[0, np.arange(512, 1024)] self.gain_object.select(frequencies=freqs1) self.gain_object2.select(frequencies=freqs2) uvtest.checkWarnings(self.gain_object.__iadd__, [self.gain_object2], message='UVParameter observer does not match') freqs = np.concatenate([freqs1, freqs2]) nt.assert_true(np.allclose(self.gain_object.freq_array, freqs, rtol=self.gain_object._freq_array.tols[0], atol=self.gain_object._freq_array.tols[1])) def test_multi_files(self): """Test read function when multiple files are included""" gain_object_full = copy.deepcopy(self.gain_object) Nt2 = self.gain_object.Ntimes / 2 # Break up delay object into two objects, divided in time times1 = self.gain_object.time_array[:Nt2] times2 = self.gain_object.time_array[Nt2:] self.gain_object.select(times=times1) self.gain_object2.select(times=times2) # Write those objects to files f1 = os.path.join(DATA_PATH, 'test/read_multi1.calfits') f2 = os.path.join(DATA_PATH, 'test/read_multi2.calfits') self.gain_object.write_calfits(f1, clobber=True) self.gain_object2.write_calfits(f2, clobber=True) # Read both files together self.gain_object.read_calfits([f1, f2]) nt.assert_true(uvutils.check_histories(gain_object_full.history + ' Downselected to specific times' ' using pyuvdata. Combined data ' 'along time axis using pyuvdata.', self.gain_object.history)) self.gain_object.history = gain_object_full.history nt.assert_equal(self.gain_object, gain_object_full) class TestUVCalAddDelay(object): def setUp(self): """Set up test""" self.delay_object = UVCal() delayfile = os.path.join(DATA_PATH, 'zen.2457698.40355.xx.HH.uvc.fits') # add an input flag array to the file to test for that. write_file = os.path.join(DATA_PATH, 'test/outtest_input_flags.fits') uv_in = UVCal() message = delayfile + ' appears to be an old calfits format' uvtest.checkWarnings(uv_in.read_calfits, [delayfile], message=message) uv_in.input_flag_array = np.zeros(uv_in._input_flag_array.expected_shape(uv_in), dtype=bool) uv_in.write_calfits(write_file, clobber=True) self.delay_object.read_calfits(write_file) self.delay_object2 = copy.deepcopy(self.delay_object) def teardown(self): """Tear down test""" del(self.delay_object) del(self.delay_object2) def test_add_antennas(self): """Test adding antennas between two UVCal objects""" delay_object_full = copy.deepcopy(self.delay_object) ants1 = np.array([9, 10, 20, 22, 31, 43, 53, 64, 65, 72]) ants2 = np.array([80, 81, 88, 89, 96, 97, 104, 105, 112]) self.delay_object.select(antenna_nums=ants1) self.delay_object2.select(antenna_nums=ants2) self.delay_object += self.delay_object2 # Check history is correct, before replacing and doing a full object check nt.assert_true(uvutils.check_histories(delay_object_full.history + ' Downselected to specific ' 'antennas using pyuvdata. Combined ' 'data along antenna axis using pyuvdata.', self.delay_object.history)) self.delay_object.history = delay_object_full.history nt.assert_equal(self.delay_object, delay_object_full) # test for when total_quality_array is present self.delay_object.select(antenna_nums=ants1) self.delay_object.total_quality_array = np.zeros( self.delay_object._total_quality_array.expected_shape(self.delay_object)) uvtest.checkWarnings(self.delay_object.__iadd__, [self.delay_object2], message='Total quality array detected') nt.assert_equal(self.delay_object.total_quality_array, None) # test for when input_flag_array is present in first file but not second self.delay_object.select(antenna_nums=ants1) ifa = np.zeros( self.delay_object._input_flag_array.expected_shape(self.delay_object)).astype(np.bool) ifa2 = np.ones( self.delay_object2._input_flag_array.expected_shape(self.delay_object2)).astype(np.bool) tot_ifa = np.concatenate([ifa, ifa2], axis=0) self.delay_object.input_flag_array = ifa self.delay_object2.input_flag_array = None self.delay_object += self.delay_object2 nt.assert_true(np.allclose(self.delay_object.input_flag_array, tot_ifa)) # test for when input_flag_array is present in second file but not first self.delay_object.select(antenna_nums=ants1) ifa = np.ones( self.delay_object._input_flag_array.expected_shape(self.delay_object)).astype(np.bool) ifa2 = np.zeros( self.delay_object2._input_flag_array.expected_shape(self.delay_object2)).astype(np.bool) tot_ifa = np.concatenate([ifa, ifa2], axis=0) self.delay_object.input_flag_array = None self.delay_object2.input_flag_array = ifa2 self.delay_object += self.delay_object2 nt.assert_true(np.allclose(self.delay_object.input_flag_array, tot_ifa)) def test_add_times(self): """Test adding times between two UVCal objects""" delay_object_full = copy.deepcopy(self.delay_object) Nt2 = self.delay_object.Ntimes / 2 times1 = self.delay_object.time_array[:Nt2] times2 = self.delay_object.time_array[Nt2:] self.delay_object.select(times=times1) self.delay_object2.select(times=times2) self.delay_object += self.delay_object2 # Check history is correct, before replacing and doing a full object check nt.assert_true(uvutils.check_histories(delay_object_full.history + ' Downselected to specific ' 'times using pyuvdata. Combined ' 'data along time axis using pyuvdata.', self.delay_object.history)) self.delay_object.history = delay_object_full.history nt.assert_equal(self.delay_object, delay_object_full) # test for when total_quality_array is present in first file but not second self.delay_object.select(times=times1) tqa = np.ones( self.delay_object._total_quality_array.expected_shape(self.delay_object)) tqa2 = np.zeros( self.delay_object2._total_quality_array.expected_shape(self.delay_object2)) tot_tqa = np.concatenate([tqa, tqa2], axis=2) self.delay_object.total_quality_array = tqa self.delay_object += self.delay_object2 nt.assert_true(np.allclose(self.delay_object.total_quality_array, tot_tqa, rtol=self.delay_object._total_quality_array.tols[0], atol=self.delay_object._total_quality_array.tols[1])) # test for when total_quality_array is present in second file but not first self.delay_object.select(times=times1) tqa = np.zeros( self.delay_object._total_quality_array.expected_shape(self.delay_object)) tqa2 = np.ones( self.delay_object2._total_quality_array.expected_shape(self.delay_object2)) tot_tqa = np.concatenate([tqa, tqa2], axis=2) self.delay_object.total_quality_array = None self.delay_object2.total_quality_array = tqa2 self.delay_object += self.delay_object2 nt.assert_true(np.allclose(self.delay_object.total_quality_array, tot_tqa, rtol=self.delay_object._total_quality_array.tols[0], atol=self.delay_object._total_quality_array.tols[1])) # test for when total_quality_array is present in both self.delay_object.select(times=times1) tqa = np.ones( self.delay_object._total_quality_array.expected_shape(self.delay_object)) tqa2 = np.ones( self.delay_object2._total_quality_array.expected_shape(self.delay_object2)) tqa *= 2 tot_tqa = np.concatenate([tqa, tqa2], axis=2) self.delay_object.total_quality_array = tqa self.delay_object2.total_quality_array = tqa2 self.delay_object += self.delay_object2 nt.assert_true(np.allclose(self.delay_object.total_quality_array, tot_tqa, rtol=self.delay_object._total_quality_array.tols[0], atol=self.delay_object._total_quality_array.tols[1])) # test for when input_flag_array is present in first file but not second self.delay_object.select(times=times1) ifa = np.zeros( self.delay_object._input_flag_array.expected_shape(self.delay_object)).astype(np.bool) ifa2 = np.ones( self.delay_object2._input_flag_array.expected_shape(self.delay_object2)).astype(np.bool) tot_ifa = np.concatenate([ifa, ifa2], axis=3) self.delay_object.input_flag_array = ifa self.delay_object2.input_flag_array = None self.delay_object += self.delay_object2 nt.assert_true(np.allclose(self.delay_object.input_flag_array, tot_ifa)) # test for when input_flag_array is present in second file but not first self.delay_object.select(times=times1) ifa = np.ones( self.delay_object._input_flag_array.expected_shape(self.delay_object)).astype(np.bool) ifa2 = np.zeros( self.delay_object2._input_flag_array.expected_shape(self.delay_object2)).astype(np.bool) tot_ifa = np.concatenate([ifa, ifa2], axis=3) self.delay_object.input_flag_array = None self.delay_object2.input_flag_array = ifa2 self.delay_object += self.delay_object2 nt.assert_true(np.allclose(self.delay_object.input_flag_array, tot_ifa)) def test_add_jones(self): """Test adding Jones axes between two UVCal objects""" delay_object_original = copy.deepcopy(self.delay_object) # artificially change the Jones value to permit addition self.delay_object2.jones_array[0] = -6 self.delay_object += self.delay_object2 # check dimensionality of resulting object nt.assert_equal(self.delay_object.delay_array.shape[-1], 2) nt.assert_equal(sorted(self.delay_object.jones_array), [-6, -5]) # test for when total_quality_array is present in first file but not second self.delay_object = copy.deepcopy(delay_object_original) tqa = np.ones( self.delay_object._total_quality_array.expected_shape(self.delay_object)) tqa2 = np.zeros( self.delay_object2._total_quality_array.expected_shape(self.delay_object2)) tot_tqa = np.concatenate([tqa, tqa2], axis=3) self.delay_object.total_quality_array = tqa self.delay_object += self.delay_object2 nt.assert_true(np.allclose(self.delay_object.total_quality_array, tot_tqa, rtol=self.delay_object._total_quality_array.tols[0], atol=self.delay_object._total_quality_array.tols[1])) # test for when total_quality_array is present in second file but not first self.delay_object = copy.deepcopy(delay_object_original) tqa = np.zeros( self.delay_object._total_quality_array.expected_shape(self.delay_object)) tqa2 = np.ones( self.delay_object2._total_quality_array.expected_shape(self.delay_object2)) tot_tqa = np.concatenate([tqa, tqa2], axis=3) self.delay_object2.total_quality_array = tqa2 self.delay_object += self.delay_object2 nt.assert_true(np.allclose(self.delay_object.total_quality_array, tot_tqa, rtol=self.delay_object._total_quality_array.tols[0], atol=self.delay_object._total_quality_array.tols[1])) # test for when total_quality_array is present in both self.delay_object = copy.deepcopy(delay_object_original) tqa = np.ones( self.delay_object._total_quality_array.expected_shape(self.delay_object)) tqa2 = np.ones( self.delay_object2._total_quality_array.expected_shape(self.delay_object2)) tqa *= 2 tot_tqa = np.concatenate([tqa, tqa2], axis=3) self.delay_object.total_quality_array = tqa self.delay_object2.total_quality_array = tqa2 self.delay_object += self.delay_object2 nt.assert_true(np.allclose(self.delay_object.total_quality_array, tot_tqa, rtol=self.delay_object._total_quality_array.tols[0], atol=self.delay_object._total_quality_array.tols[1])) # test for when input_flag_array is present in first file but not second self.delay_object = copy.deepcopy(delay_object_original) ifa = np.zeros( self.delay_object._input_flag_array.expected_shape(self.delay_object)).astype(np.bool) ifa2 = np.ones( self.delay_object2._input_flag_array.expected_shape(self.delay_object2)).astype(np.bool) tot_ifa = np.concatenate([ifa, ifa2], axis=4) self.delay_object.input_flag_array = ifa self.delay_object2.input_flag_array = None self.delay_object += self.delay_object2 nt.assert_true(np.allclose(self.delay_object.input_flag_array, tot_ifa)) # test for when input_flag_array is present in second file but not first self.delay_object = copy.deepcopy(delay_object_original) ifa = np.ones( self.delay_object._input_flag_array.expected_shape(self.delay_object)).astype(np.bool) ifa2 = np.zeros( self.delay_object2._input_flag_array.expected_shape(self.delay_object2)).astype(np.bool) tot_ifa = np.concatenate([ifa, ifa2], axis=4) self.delay_object.input_flag_array = None self.delay_object2.input_flag_array = ifa2 self.delay_object += self.delay_object2 nt.assert_true(np.allclose(self.delay_object.input_flag_array, tot_ifa)) def test_add_errors(self): """Test behavior that will raise errors""" # test addition of two identical objects nt.assert_raises(ValueError, self.delay_object.__add__, self.delay_object2) def test_multi_files(self): """Test read function when multiple files are included""" delay_object_full = copy.deepcopy(self.delay_object) Nt2 = self.delay_object.Ntimes / 2 # Break up delay object into two objects, divided in time times1 = self.delay_object.time_array[:Nt2] times2 = self.delay_object.time_array[Nt2:] self.delay_object.select(times=times1) self.delay_object2.select(times=times2) # Write those objects to files f1 = os.path.join(DATA_PATH, 'test/read_multi1.calfits') f2 = os.path.join(DATA_PATH, 'test/read_multi2.calfits') self.delay_object.write_calfits(f1, clobber=True) self.delay_object2.write_calfits(f2, clobber=True) # Read both files together self.delay_object.read_calfits([f1, f2]) nt.assert_true(uvutils.check_histories(delay_object_full.history + ' Downselected to specific times' ' using pyuvdata. Combined data ' 'along time axis using pyuvdata.', self.delay_object.history)) self.delay_object.history = delay_object_full.history nt.assert_equal(self.delay_object, delay_object_full)