# -*- mode: python; coding: utf-8 -*- # Copyright (c) 2018 Radio Astronomy Software Group # Licensed under the 2-clause BSD License """Tests for HDF5 object """ import os import numpy as np import pytest from astropy.time import Time import h5py from pyuvdata import UVData from .. import uvh5 import pyuvdata.utils as uvutils from pyuvdata.data import DATA_PATH import pyuvdata.tests as uvtest # ignore common file-read warnings pytestmark = [ pytest.mark.filterwarnings("ignore:Altitude is not present in Miriad"), pytest.mark.filterwarnings("ignore:Telescope EVLA is not"), ] @pytest.fixture(scope="function") def uv_miriad(): # read in a miriad test file uv_miriad = UVData() miriad_filename = os.path.join(DATA_PATH, "zen.2456865.60537.xy.uvcRREAA") uv_miriad.read_miriad(miriad_filename) yield uv_miriad # clean up when done del uv_miriad return @pytest.fixture(scope="function") def uv_uvfits(): # read in a uvfits test file uv_uvfits = UVData() uvfits_filename = os.path.join(DATA_PATH, "day2_TDEM0003_10s_norx_1src_1spw.uvfits") uv_uvfits.read_uvfits(uvfits_filename) yield uv_uvfits # clean up when done del uv_uvfits return @pytest.fixture(scope="function") def uv_uvh5(): # read in a uvh5 test file uv_uvh5 = UVData() uvh5_filename = os.path.join(DATA_PATH, "zen.2458432.34569.uvh5") uv_uvh5.read_uvh5(uvh5_filename) yield uv_uvh5 # clean up when done del uv_uvh5 return @pytest.fixture(scope="function") def uv_partial_write(tmp_path): # convert a uvfits file to uvh5, cutting down the amount of data uv_uvfits = UVData() uvfits_filename = os.path.join(DATA_PATH, "day2_TDEM0003_10s_norx_1src_1spw.uvfits") uv_uvfits.read_uvfits(uvfits_filename) uv_uvfits.select(antenna_nums=[3, 7, 24]) uv_uvfits.lst_array = uvutils.get_lst_for_time( uv_uvfits.time_array, *uv_uvfits.telescope_location_lat_lon_alt_degrees ) testfile = str(tmp_path / "outtest.uvh5") uv_uvfits.write_uvh5(testfile) uv_uvh5 = UVData() uv_uvh5.read(testfile) yield uv_uvh5 # clean up when done del uv_uvh5 os.remove(testfile) return def initialize_with_zeros(uvd, filename): """ Make a uvh5 file with all zero values for data-sized arrays. This function is a helper function used for tests of partial writing. """ uvd.initialize_uvh5_file(filename, clobber=True) data_shape = (uvd.Nblts, 1, uvd.Nfreqs, uvd.Npols) data = np.zeros(data_shape, dtype=np.complex64) flags = np.zeros(data_shape, dtype=np.bool) nsamples = np.zeros(data_shape, dtype=np.float32) with h5py.File(filename, "r+") as h5f: dgrp = h5f["/Data"] data_dset = dgrp["visdata"] flags_dset = dgrp["flags"] nsample_dset = dgrp["nsamples"] data_dset = data # noqa flags_dset = flags # noqa nsample_dset = nsamples # noqa return def initialize_with_zeros_ints(uvd, filename): """ Make a uvh5 file with all zeros for data-sized arrays. This function is a helper function used for tests of partial writing with integer data types. """ uvd.initialize_uvh5_file( filename, clobber=True, data_write_dtype=uvh5._hera_corr_dtype ) data_shape = (uvd.Nblts, 1, uvd.Nfreqs, uvd.Npols) data = np.zeros(data_shape, dtype=np.complex64) flags = np.zeros(data_shape, dtype=np.bool) nsamples = np.zeros(data_shape, dtype=np.float32) with h5py.File(filename, "r+") as h5f: dgrp = h5f["/Data"] data_dset = dgrp["visdata"] flags_dset = dgrp["flags"] nsample_dset = dgrp["nsamples"] with data_dset.astype(uvh5._hera_corr_dtype): data_dset[:, :, :, :, "r"] = data.real data_dset[:, :, :, :, "i"] = data.imag flags_dset = flags # noqa nsample_dset = nsamples # noqa return def test_read_miriad_write_uvh5_read_uvh5(uv_miriad, tmp_path): """ Test a miriad file round trip. """ uv_in = uv_miriad uv_out = UVData() testfile = str(tmp_path / "outtest_miriad.uvh5") # create the file so the clobber gets tested with h5py.File(testfile, "w") as h5file: h5file.create_dataset("Test", list(range(10))) uv_in.write_uvh5(testfile, clobber=True) uv_out.read(testfile) assert uv_in == uv_out # also test round-tripping phased data uv_in.phase_to_time(Time(np.mean(uv_in.time_array), format="jd")) uv_in.write_uvh5(testfile, clobber=True) uv_out.read_uvh5(testfile) assert uv_in == uv_out # clean up os.remove(testfile) return def test_read_uvfits_write_uvh5_read_uvh5(uv_uvfits, tmp_path): """ Test a uvfits file round trip. """ uv_in = uv_uvfits uv_out = UVData() testfile = str(tmp_path / "outtest_uvfits.uvh5") uv_in.write_uvh5(testfile, clobber=True) uv_out.read(testfile) assert uv_in == uv_out # also test writing double-precision data_array uv_in.data_array = uv_in.data_array.astype(np.complex128) uv_in.write_uvh5(testfile, clobber=True) uv_out.read(testfile) assert uv_in == uv_out # clean up os.remove(testfile) return def test_read_uvh5_errors(): """ Test raising errors in read function. """ uv_in = UVData() fake_file = os.path.join(DATA_PATH, "fake_file.uvh5") with pytest.raises(IOError) as cm: uv_in.read_uvh5(fake_file) assert str(cm.value).startswith("{} not found".format(fake_file)) return def test_write_uvh5_errors(uv_uvfits, tmp_path): """ Test raising errors in write_uvh5 function. """ uv_in = uv_uvfits uv_out = UVData() testfile = str(tmp_path / "outtest_uvfits.uvh5") with open(testfile, "a"): os.utime(testfile, None) # assert IOError if file exists with pytest.raises(IOError) as cm: uv_in.write_uvh5(testfile, clobber=False) assert str(cm.value).startswith("File exists; skipping") # use clobber=True to write out anyway uv_in.write_uvh5(testfile, clobber=True) uv_out.read(testfile) assert uv_in == uv_out # clean up os.remove(testfile) return def test_uvh5_optional_parameters(uv_uvfits, tmp_path): """ Test reading and writing optional parameters not in sample files. """ uv_in = uv_uvfits uv_out = UVData() testfile = str(tmp_path / "outtest_uvfits.uvh5") # set optional parameters uv_in.x_orientation = "east" uv_in.antenna_diameters = np.ones_like(uv_in.antenna_numbers) * 1.0 uv_in.uvplane_reference_time = 0 # reorder_blts uv_in.reorder_blts() # write out and read back in uv_in.write_uvh5(testfile, clobber=True) uv_out.read(testfile) assert uv_in == uv_out # test with blt_order = bda as well (single entry in tuple) uv_in.reorder_blts(order="bda") uv_in.write_uvh5(testfile, clobber=True) uv_out.read(testfile) assert uv_in == uv_out # clean up os.remove(testfile) return def test_uvh5_compression_options(uv_uvfits, tmp_path): """ Test writing data with compression filters. """ import h5py uv_in = uv_uvfits uv_out = UVData() testfile = str(tmp_path / "outtest_uvfits_compression.uvh5") # write out and read back in uv_in.write_uvh5( testfile, clobber=True, chunks=True, data_compression="lzf", flags_compression=None, nsample_compression=None, ) uv_out.read(testfile) assert uv_in == uv_out # test with non-auto chunking chunks = (680, 1, 16, 1) uv_in.write_uvh5( testfile, clobber=True, chunks=chunks, data_compression="lzf", flags_compression=None, nsample_compression=None, ) uv_out.read(testfile) assert uv_in == uv_out # check that chunks match with h5py.File(testfile, "r") as f: assert f["Data"]["visdata"].chunks == chunks # clean up os.remove(testfile) return def test_uvh5_read_multiple_files(uv_uvfits, tmp_path): """ Test reading multiple uvh5 files. """ uv_in = uv_uvfits testfile1 = str(tmp_path / "uv1.uvh5") testfile2 = str(tmp_path / "uv2.uvh5") uv1 = uv_in.copy() uv2 = uv_in.copy() uv1.select(freq_chans=np.arange(0, 32)) uv2.select(freq_chans=np.arange(32, 64)) uv1.write_uvh5(testfile1, clobber=True) uv2.write_uvh5(testfile2, clobber=True) uvtest.checkWarnings( uv1.read, func_args=[np.array([testfile1, testfile2])], func_kwargs={"file_type": "uvh5"}, message=2 * ["Telescope EVLA is not"], category=2 * [UserWarning], nwarnings=2, ) # Check history is correct, before replacing and doing a full object check assert uvutils._check_histories( uv_in.history + " Downselected to " "specific frequencies using pyuvdata. " "Combined data along frequency axis using" " pyuvdata.", uv1.history, ) uv1.history = uv_in.history assert uv1 == uv_in # clean up os.remove(testfile1) os.remove(testfile2) return def test_uvh5_read_multiple_files_metadata_only(uv_uvfits, tmp_path): """ Test reading multiple uvh5 files with metadata only. """ uv_in = uv_uvfits testfile1 = str(tmp_path / "uv1.uvh5") testfile2 = str(tmp_path / "uv2.uvh5") uv1 = uv_in.copy() uv2 = uv_in.copy() uv1.select(freq_chans=np.arange(0, 32)) uv2.select(freq_chans=np.arange(32, 64)) uv1.write_uvh5(testfile1, clobber=True) uv2.write_uvh5(testfile2, clobber=True) uvfits_filename = os.path.join(DATA_PATH, "day2_TDEM0003_10s_norx_1src_1spw.uvfits") uv_full = UVData() uv_full.read_uvfits(uvfits_filename, read_data=False) uv1.read([testfile1, testfile2], read_data=False) # Check history is correct, before replacing and doing a full object check assert uvutils._check_histories( uv_full.history + " Downselected to " "specific frequencies using pyuvdata. " "Combined data along frequency axis using" " pyuvdata.", uv1.history, ) uv1.history = uv_full.history assert uv1 == uv_full # clean up os.remove(testfile1) os.remove(testfile2) return def test_uvh5_rea_multiple_files_axis(uv_uvfits, tmp_path): """ Test reading multiple uvh5 files with setting axis. """ uv_in = uv_uvfits testfile1 = str(tmp_path / "uv1.uvh5") testfile2 = str(tmp_path / "uv2.uvh5") uv1 = uv_in.copy() uv2 = uv_in.copy() uv1.select(freq_chans=np.arange(0, 32)) uv2.select(freq_chans=np.arange(32, 64)) uv1.write_uvh5(testfile1, clobber=True) uv2.write_uvh5(testfile2, clobber=True) uv1.read([testfile1, testfile2], axis="freq") # Check history is correct, before replacing and doing a full object check assert uvutils._check_histories( uv_in.history + " Downselected to " "specific frequencies using pyuvdata. " "Combined data along frequency axis using" " pyuvdata.", uv1.history, ) uv1.history = uv_in.history assert uv1 == uv_in # clean up os.remove(testfile1) os.remove(testfile2) return def test_uvh5_partial_read_antennas(uv_uvfits, tmp_path): """ Test reading in only certain antennas from disk. """ uv_in = uv_uvfits uvh5_uv = UVData() uvh5_uv2 = UVData() testfile = str(tmp_path / "outtest.uvh5") # change telescope name to avoid errors uv_in.telescope_name = "PAPER" uv_in.write_uvh5(testfile, clobber=True) uvh5_uv.read(testfile) # select on antennas ants_to_keep = np.array([0, 19, 11, 24, 3, 23, 1, 20, 21]) uvh5_uv.read(testfile, antenna_nums=ants_to_keep) uvh5_uv2.read(testfile) uvh5_uv2.select(antenna_nums=ants_to_keep) assert uvh5_uv == uvh5_uv2 # clean up os.remove(testfile) return def test_uvh5_partial_read_freqs(uv_uvfits, tmp_path): """ Test reading in only certain frequencies from disk. """ uv_in = uv_uvfits uvh5_uv = UVData() uvh5_uv2 = UVData() testfile = str(tmp_path / "outtest.uvh5") # change telescope name to avoid errors uv_in.telescope_name = "PAPER" uv_in.write_uvh5(testfile, clobber=True) uvh5_uv.read(testfile) # select on frequency channels chans_to_keep = np.arange(12, 22) uvh5_uv.read(testfile, freq_chans=chans_to_keep) uvh5_uv2.read(testfile) uvh5_uv2.select(freq_chans=chans_to_keep) assert uvh5_uv == uvh5_uv2 # clean up os.remove(testfile) return @pytest.mark.filterwarnings("ignore:Selected polarization values are not evenly spaced") def test_uvh5_partial_read_pols(uv_uvfits, tmp_path): """ Test reading in only certain polarizations from disk. """ uv_in = uv_uvfits uvh5_uv = UVData() uvh5_uv2 = UVData() testfile = str(tmp_path / "outtest.uvh5") # change telescope name to avoid errors uv_in.telescope_name = "PAPER" uv_in.write_uvh5(testfile, clobber=True) uvh5_uv.read(testfile) # select on pols pols_to_keep = [-1, -2] uvh5_uv.read(testfile, polarizations=pols_to_keep) uvh5_uv2.read(testfile) uvh5_uv2.select(polarizations=pols_to_keep) assert uvh5_uv == uvh5_uv2 # select on pols (non contiguous in file) # and check consistent results with and without multidim_index pols_to_keep = [-1, -2, -4] uvh5_uv.read(testfile, polarizations=pols_to_keep, multidim_index=True) uvh5_uv2.read(testfile, multidim_index=False) uvh5_uv2.select(polarizations=pols_to_keep) assert uvh5_uv == uvh5_uv2 return def test_uvh5_partial_read_times(uv_uvfits, tmp_path): """ Test reading in only certain times from disk. """ uv_in = uv_uvfits uvh5_uv = UVData() uvh5_uv2 = UVData() testfile = str(tmp_path / "outtest.uvh5") # change telescope name to avoid errors uv_in.telescope_name = "PAPER" uv_in.write_uvh5(testfile, clobber=True) uvh5_uv.read(testfile) # select on read using time_range unique_times = np.unique(uvh5_uv.time_array) uvh5_uv.read(testfile, time_range=[unique_times[0], unique_times[1]]) uvh5_uv2.read(testfile) uvh5_uv2.select(time_range=[unique_times[0], unique_times[1]]) assert uvh5_uv == uvh5_uv2 # clean up os.remove(testfile) return @pytest.mark.filterwarnings("ignore:Selected polarization values are not evenly spaced") def test_uvh5_partial_read_multi1(uv_uvfits, tmp_path): """ Test select-on-read for multiple axes, frequencies being smallest fraction. """ uv_in = uv_uvfits uvh5_uv = UVData() uvh5_uv2 = UVData() testfile = str(tmp_path / "outtest.uvh5") # change telescope name to avoid errors uv_in.telescope_name = "PAPER" uv_in.write_uvh5(testfile, clobber=True) uvh5_uv.read(testfile) # now test selecting on multiple axes # read frequencies first ants_to_keep = np.array([0, 19, 11, 24, 3, 23, 1, 20, 21]) chans_to_keep = np.arange(12, 22) pols_to_keep = [-1, -2] uvh5_uv.read( testfile, antenna_nums=ants_to_keep, freq_chans=chans_to_keep, polarizations=pols_to_keep, ) uvh5_uv2.read(testfile) uvh5_uv2.select( antenna_nums=ants_to_keep, freq_chans=chans_to_keep, polarizations=pols_to_keep ) assert uvh5_uv == uvh5_uv2 # now check with multidim_index # use different versions of blt_inds and pol_inds # ones that are and are not sliceable by UVH5 standards chans_to_keep = np.arange(20, 35) uvh5_uv3 = UVData() uvh5_uv4 = UVData() random_blts = np.random.choice(np.arange(uv_in.Nblts), size=1000, replace=False) for blts_to_keep in [random_blts, np.arange(1000)]: for pols_to_keep in [[-1, -2], [-1, -2, -4]]: uvh5_uv3.read( testfile, blt_inds=blts_to_keep, polarizations=pols_to_keep, freq_chans=chans_to_keep, multidim_index=True, ) uvh5_uv4.read(testfile) uvh5_uv4.select( blt_inds=blts_to_keep, polarizations=pols_to_keep, freq_chans=chans_to_keep, ) assert uvh5_uv3 == uvh5_uv4 # clean up os.remove(testfile) return @pytest.mark.filterwarnings("ignore:Selected polarization values are not evenly spaced") @pytest.mark.filterwarnings("ignore:Selected frequencies are not evenly spaced") def test_uvh5_partial_read_multi2(uv_uvfits, tmp_path): """ Test select-on-read for multiple axes, baselines being smallest fraction. """ uv_in = uv_uvfits uvh5_uv = UVData() uvh5_uv2 = UVData() testfile = str(tmp_path / "outtest.uvh5") # change telescope name to avoid errors uv_in.telescope_name = "PAPER" uv_in.write_uvh5(testfile, clobber=True) uvh5_uv.read(testfile) # now test selecting on multiple axes # read baselines first ants_to_keep = np.array([0, 1]) chans_to_keep = np.arange(12, 22) pols_to_keep = [-1, -2] uvh5_uv.read( testfile, antenna_nums=ants_to_keep, freq_chans=chans_to_keep, polarizations=pols_to_keep, ) uvh5_uv2.read(testfile) uvh5_uv2.select( antenna_nums=ants_to_keep, freq_chans=chans_to_keep, polarizations=pols_to_keep ) assert uvh5_uv == uvh5_uv2 # now check with multidim_index # use different versions of freq_inds and pol_inds # ones that are and are not sliceable by UVH5 standards blts_to_keep = np.arange(100) uvh5_uv3 = UVData() uvh5_uv4 = UVData() random_freqs = np.random.choice(np.arange(uv_in.Nfreqs), size=50, replace=False) for chans_to_keep in [random_freqs, np.arange(50)]: for pols_to_keep in [[-1, -2], [-1, -2, -4]]: uvh5_uv3.read( testfile, blt_inds=blts_to_keep, freq_chans=chans_to_keep, polarizations=pols_to_keep, multidim_index=True, ) uvh5_uv4.read(testfile) uvh5_uv4.select( blt_inds=blts_to_keep, freq_chans=chans_to_keep, polarizations=pols_to_keep, ) assert uvh5_uv3 == uvh5_uv4 # clean up os.remove(testfile) return @pytest.mark.filterwarnings("ignore:Selected frequencies are not evenly spaced") def test_uvh5_partial_read_multi3(uv_uvfits, tmp_path): """ Test select-on-read for multiple axes, polarizations being smallest fraction. """ uv_in = uv_uvfits uvh5_uv = UVData() uvh5_uv2 = UVData() testfile = str(tmp_path / "outtest.uvh5") # change telescope name to avoid errors uv_in.telescope_name = "PAPER" uv_in.write_uvh5(testfile, clobber=True) uvh5_uv.read(testfile) # now test selecting on multiple axes # read polarizations first ants_to_keep = np.array([0, 1, 2, 3, 6, 7, 8, 11, 14, 18, 19, 20, 21, 22]) chans_to_keep = np.arange(12, 64) pols_to_keep = [-1, -2] uvh5_uv.read( testfile, antenna_nums=ants_to_keep, freq_chans=chans_to_keep, polarizations=pols_to_keep, ) uvh5_uv2.read(testfile) uvh5_uv2.select( antenna_nums=ants_to_keep, freq_chans=chans_to_keep, polarizations=pols_to_keep ) assert uvh5_uv == uvh5_uv2 # now check with multidim_index # use different versions of blt_inds and freq_inds # ones that are and are not sliceable by UVH5 standards pols_to_keep = [-1, -2] uvh5_uv3 = UVData() uvh5_uv4 = UVData() random_blts = np.random.choice(np.arange(uv_in.Nblts), size=1000, replace=False) random_freqs = np.random.choice(np.arange(uv_in.Nfreqs), size=50, replace=False) for blts_to_keep in [random_blts, np.arange(1000)]: for chans_to_keep in [random_freqs, np.arange(50)]: uvh5_uv3.read( testfile, blt_inds=blts_to_keep, freq_chans=chans_to_keep, polarizations=pols_to_keep, multidim_index=True, ) uvh5_uv4.read(testfile) uvh5_uv4.select( blt_inds=blts_to_keep, freq_chans=chans_to_keep, polarizations=pols_to_keep, ) assert uvh5_uv3 == uvh5_uv4 # clean up os.remove(testfile) return @pytest.mark.filterwarnings("ignore:Selected frequencies are not evenly spaced") def test_uvh5_read_multdim_index(tmp_path, uv_uvfits): """ Test some odd cases for UVH5 multdim indexing """ uv_in = uv_uvfits testfile = str(tmp_path / "outtest.uvh5") # change telescope name to avoid errors uv_in.telescope_name = "PAPER" uv_in.write_uvh5(testfile, clobber=True) uvh5_uv = UVData() uvh5_uv.read(testfile) # check that non sliceable multidim index is caught # and does not fail ants_to_keep = np.array([0, 19, 11, 24, 3, 23, 1, 20, 21]) chans_to_keep = [15, 17, 20] uvh5_uv = UVData() uvh5_uv.read( testfile, antenna_nums=ants_to_keep, freq_chans=chans_to_keep, ) # clean up os.remove(testfile) return def test_uvh5_partial_write_antpairs(uv_partial_write, tmp_path): """ Test writing an entire UVH5 file in pieces by antpairs. """ full_uvh5 = uv_partial_write partial_uvh5 = UVData() # delete data arrays in partial file partial_uvh5 = full_uvh5.copy() partial_uvh5.data_array = None partial_uvh5.flag_array = None partial_uvh5.nsample_array = None # initialize file on disk partial_testfile = str(tmp_path / "outtest_partial.uvh5") partial_uvh5.initialize_uvh5_file(partial_testfile, clobber=True) # write to file by iterating over antpairpol antpairpols = full_uvh5.get_antpairpols() for key in antpairpols: data = full_uvh5.get_data(key, squeeze="none") flags = full_uvh5.get_flags(key, squeeze="none") nsamples = full_uvh5.get_nsamples(key, squeeze="none") partial_uvh5.write_uvh5_part(partial_testfile, data, flags, nsamples, bls=key) # now read in the full file and make sure that it matches the original partial_uvh5.read(partial_testfile) assert full_uvh5 == partial_uvh5 # test add_to_history key = antpairpols[0] data = full_uvh5.get_data(key, squeeze="none") flags = full_uvh5.get_flags(key, squeeze="none") nsamples = full_uvh5.get_nsamples(key, squeeze="none") partial_uvh5.write_uvh5_part( partial_testfile, data, flags, nsamples, bls=key, add_to_history="foo" ) partial_uvh5.read(partial_testfile, read_data=False) assert "foo" in partial_uvh5.history # clean up os.remove(partial_testfile) return def test_uvh5_partial_write_frequencies(uv_partial_write, tmp_path): """ Test writing an entire UVH5 file in pieces by frequencies. """ full_uvh5 = uv_partial_write partial_uvh5 = UVData() # start over, and write frequencies partial_uvh5 = full_uvh5.copy() partial_uvh5.data_array = None partial_uvh5.flag_array = None partial_uvh5.nsample_array = None # initialize file on disk partial_testfile = str(tmp_path / "outtest_partial.uvh5") partial_uvh5.initialize_uvh5_file(partial_testfile, clobber=True) Nfreqs = full_uvh5.Nfreqs Hfreqs = Nfreqs // 2 freqs1 = np.arange(Hfreqs) freqs2 = np.arange(Hfreqs, Nfreqs) data = full_uvh5.data_array[:, :, freqs1, :] flags = full_uvh5.flag_array[:, :, freqs1, :] nsamples = full_uvh5.nsample_array[:, :, freqs1, :] partial_uvh5.write_uvh5_part( partial_testfile, data, flags, nsamples, freq_chans=freqs1 ) data = full_uvh5.data_array[:, :, freqs2, :] flags = full_uvh5.flag_array[:, :, freqs2, :] nsamples = full_uvh5.nsample_array[:, :, freqs2, :] partial_uvh5.write_uvh5_part( partial_testfile, data, flags, nsamples, freq_chans=freqs2 ) # read in the full file and make sure it matches partial_uvh5.read(partial_testfile) assert full_uvh5 == partial_uvh5 # clean up os.remove(partial_testfile) return def test_uvh5_partial_write_blts(uv_partial_write, tmp_path): """ Test writing an entire UVH5 file in pieces by blt. """ full_uvh5 = uv_partial_write partial_uvh5 = UVData() # start over, write chunks of blts partial_uvh5 = full_uvh5.copy() partial_uvh5.data_array = None partial_uvh5.flag_array = None partial_uvh5.nsample_array = None # initialize file on disk partial_testfile = str(tmp_path / "outtest_partial.uvh5") partial_uvh5.initialize_uvh5_file(partial_testfile, clobber=True) Nblts = full_uvh5.Nblts Hblts = Nblts // 2 blts1 = np.arange(Hblts) blts2 = np.arange(Hblts, Nblts) data = full_uvh5.data_array[blts1, :, :, :] flags = full_uvh5.flag_array[blts1, :, :, :] nsamples = full_uvh5.nsample_array[blts1, :, :, :] partial_uvh5.write_uvh5_part( partial_testfile, data, flags, nsamples, blt_inds=blts1 ) data = full_uvh5.data_array[blts2, :, :, :] flags = full_uvh5.flag_array[blts2, :, :, :] nsamples = full_uvh5.nsample_array[blts2, :, :, :] partial_uvh5.write_uvh5_part( partial_testfile, data, flags, nsamples, blt_inds=blts2 ) # read in the full file and make sure it matches partial_uvh5.read(partial_testfile) assert full_uvh5 == partial_uvh5 # clean up os.remove(partial_testfile) return def test_uvh5_partial_write_pols(uv_partial_write, tmp_path): """ Test writing an entire UVH5 file in pieces by pol. """ full_uvh5 = uv_partial_write partial_uvh5 = UVData() # start over, write groups of pols partial_uvh5 = full_uvh5.copy() partial_uvh5.data_array = None partial_uvh5.flag_array = None partial_uvh5.nsample_array = None # initialize file on disk partial_testfile = str(tmp_path / "outtest_partial.uvh5") partial_uvh5.initialize_uvh5_file(partial_testfile, clobber=True) Npols = full_uvh5.Npols Hpols = Npols // 2 pols1 = np.arange(Hpols) pols2 = np.arange(Hpols, Npols) data = full_uvh5.data_array[:, :, :, pols1] flags = full_uvh5.flag_array[:, :, :, pols1] nsamples = full_uvh5.nsample_array[:, :, :, pols1] partial_uvh5.write_uvh5_part( partial_testfile, data, flags, nsamples, polarizations=full_uvh5.polarization_array[:Hpols], ) data = full_uvh5.data_array[:, :, :, pols2] flags = full_uvh5.flag_array[:, :, :, pols2] nsamples = full_uvh5.nsample_array[:, :, :, pols2] partial_uvh5.write_uvh5_part( partial_testfile, data, flags, nsamples, polarizations=full_uvh5.polarization_array[Hpols:], ) # read in the full file and make sure it matches partial_uvh5.read(partial_testfile) assert full_uvh5 == partial_uvh5 # clean up os.remove(partial_testfile) return def test_uvh5_partial_write_irregular_blt(uv_partial_write, tmp_path): """ Test writing a uvh5 file using irregular intervals for single blt. """ full_uvh5 = uv_partial_write partial_uvh5 = UVData() # delete data arrays in partial file partial_uvh5 = full_uvh5.copy() partial_uvh5.data_array = None partial_uvh5.flag_array = None partial_uvh5.nsample_array = None # initialize file on disk partial_testfile = str(tmp_path / "outtest_partial.uvh5") initialize_with_zeros(partial_uvh5, partial_testfile) # make a mostly empty object in memory to match what we'll write to disk partial_uvh5.data_array = np.zeros_like(full_uvh5.data_array, dtype=np.complex64) partial_uvh5.flag_array = np.zeros_like(full_uvh5.flag_array, dtype=np.bool) partial_uvh5.nsample_array = np.zeros_like( full_uvh5.nsample_array, dtype=np.float32 ) # write a single blt to file blt_inds = np.arange(1) data = full_uvh5.data_array[blt_inds, :, :, :] flags = full_uvh5.flag_array[blt_inds, :, :, :] nsamples = full_uvh5.nsample_array[blt_inds, :, :, :] partial_uvh5.write_uvh5_part( partial_testfile, data, flags, nsamples, blt_inds=blt_inds ) # also write the arrays to the partial object partial_uvh5.data_array[blt_inds, :, :, :] = data partial_uvh5.flag_array[blt_inds, :, :, :] = flags partial_uvh5.nsample_array[blt_inds, :, :, :] = nsamples # read in the file and make sure it matches partial_uvh5_file = UVData() partial_uvh5_file.read(partial_testfile) assert partial_uvh5_file == partial_uvh5 # clean up os.remove(partial_testfile) return def test_uvh5_partial_write_irregular_freq(uv_partial_write, tmp_path): """ Test writing a uvh5 file using irregular intervals for single frequency. """ full_uvh5 = uv_partial_write partial_uvh5 = UVData() # delete data arrays in partial file partial_uvh5 = full_uvh5.copy() partial_uvh5.data_array = None partial_uvh5.flag_array = None partial_uvh5.nsample_array = None # initialize file on disk partial_testfile = str(tmp_path / "outtest_partial.uvh5") initialize_with_zeros(partial_uvh5, partial_testfile) # make a mostly empty object in memory to match what we'll write to disk partial_uvh5.data_array = np.zeros_like(full_uvh5.data_array, dtype=np.complex64) partial_uvh5.flag_array = np.zeros_like(full_uvh5.flag_array, dtype=np.bool) partial_uvh5.nsample_array = np.zeros_like( full_uvh5.nsample_array, dtype=np.float32 ) # write a single freq to file freq_inds = np.arange(1) data = full_uvh5.data_array[:, :, freq_inds, :] flags = full_uvh5.flag_array[:, :, freq_inds, :] nsamples = full_uvh5.nsample_array[:, :, freq_inds, :] partial_uvh5.write_uvh5_part( partial_testfile, data, flags, nsamples, freq_chans=freq_inds ) # also write the arrays to the partial object partial_uvh5.data_array[:, :, freq_inds, :] = data partial_uvh5.flag_array[:, :, freq_inds, :] = flags partial_uvh5.nsample_array[:, :, freq_inds, :] = nsamples # read in the file and make sure it matches partial_uvh5_file = UVData() partial_uvh5_file.read(partial_testfile) assert partial_uvh5_file == partial_uvh5 # clean up os.remove(partial_testfile) return def test_uvh5_partial_write_irregular_pol(uv_partial_write, tmp_path): """ Test writing a uvh5 file using irregular intervals for single polarization. """ full_uvh5 = uv_partial_write partial_uvh5 = UVData() # delete data arrays in partial file partial_uvh5 = full_uvh5.copy() partial_uvh5.data_array = None partial_uvh5.flag_array = None partial_uvh5.nsample_array = None # initialize file on disk partial_testfile = str(tmp_path / "outtest_partial.uvh5") initialize_with_zeros(partial_uvh5, partial_testfile) # make a mostly empty object in memory to match what we'll write to disk partial_uvh5.data_array = np.zeros_like(full_uvh5.data_array, dtype=np.complex64) partial_uvh5.flag_array = np.zeros_like(full_uvh5.flag_array, dtype=np.bool) partial_uvh5.nsample_array = np.zeros_like( full_uvh5.nsample_array, dtype=np.float32 ) # write a single pol to file pol_inds = np.arange(1) data = full_uvh5.data_array[:, :, :, pol_inds] flags = full_uvh5.flag_array[:, :, :, pol_inds] nsamples = full_uvh5.nsample_array[:, :, :, pol_inds] partial_uvh5.write_uvh5_part( partial_testfile, data, flags, nsamples, polarizations=partial_uvh5.polarization_array[pol_inds], ) # also write the arrays to the partial object partial_uvh5.data_array[:, :, :, pol_inds] = data partial_uvh5.flag_array[:, :, :, pol_inds] = flags partial_uvh5.nsample_array[:, :, :, pol_inds] = nsamples # read in the file and make sure it matches partial_uvh5_file = UVData() partial_uvh5_file.read(partial_testfile) assert partial_uvh5_file == partial_uvh5 # clean up os.remove(partial_testfile) return def test_uvh5_partial_write_irregular_multi1(uv_partial_write, tmp_path): """ Test writing a uvh5 file using irregular intervals for blts and freqs. """ full_uvh5 = uv_partial_write full_uvh5.telescope_name = "PAPER" partial_uvh5 = UVData() # delete data arrays in partial file partial_uvh5 = full_uvh5.copy() partial_uvh5.data_array = None partial_uvh5.flag_array = None partial_uvh5.nsample_array = None # initialize file on disk partial_testfile = str(tmp_path / "outtest_partial.uvh5") initialize_with_zeros(partial_uvh5, partial_testfile) # make a mostly empty object in memory to match what we'll write to disk partial_uvh5.data_array = np.zeros_like(full_uvh5.data_array, dtype=np.complex64) partial_uvh5.flag_array = np.zeros_like(full_uvh5.flag_array, dtype=np.bool) partial_uvh5.nsample_array = np.zeros_like( full_uvh5.nsample_array, dtype=np.float32 ) # define blts and freqs blt_inds = [0, 1, 2, 7] freq_inds = [0, 2, 3, 4] data_shape = (len(blt_inds), 1, len(freq_inds), full_uvh5.Npols) data = np.zeros(data_shape, dtype=np.complex64) flags = np.zeros(data_shape, dtype=np.bool) nsamples = np.zeros(data_shape, dtype=np.float32) for iblt, blt_idx in enumerate(blt_inds): for ifreq, freq_idx in enumerate(freq_inds): data[iblt, :, ifreq, :] = full_uvh5.data_array[blt_idx, :, freq_idx, :] flags[iblt, :, ifreq, :] = full_uvh5.flag_array[blt_idx, :, freq_idx, :] nsamples[iblt, :, ifreq, :] = full_uvh5.nsample_array[ blt_idx, :, freq_idx, : ] uvtest.checkWarnings( partial_uvh5.write_uvh5_part, [partial_testfile, data, flags, nsamples], {"blt_inds": blt_inds, "freq_chans": freq_inds}, message="Selected frequencies are not evenly spaced", ) # also write the arrays to the partial object for iblt, blt_idx in enumerate(blt_inds): for ifreq, freq_idx in enumerate(freq_inds): partial_uvh5.data_array[blt_idx, :, freq_idx, :] = data[iblt, :, ifreq, :] partial_uvh5.flag_array[blt_idx, :, freq_idx, :] = flags[iblt, :, ifreq, :] partial_uvh5.nsample_array[blt_idx, :, freq_idx, :] = nsamples[ iblt, :, ifreq, : ] # read in the file and make sure it matches partial_uvh5_file = UVData() partial_uvh5_file.read(partial_testfile) assert partial_uvh5_file == partial_uvh5 # clean up os.remove(partial_testfile) return def test_uvh5_partial_write_irregular_multi2(uv_partial_write, tmp_path): """ Test writing a uvh5 file using irregular intervals for freqs and pols. """ full_uvh5 = uv_partial_write full_uvh5.telescope_name = "PAPER" partial_uvh5 = UVData() # delete data arrays in partial file partial_uvh5 = full_uvh5.copy() partial_uvh5.data_array = None partial_uvh5.flag_array = None partial_uvh5.nsample_array = None # initialize file on disk partial_testfile = str(tmp_path / "outtest_partial.uvh5") initialize_with_zeros(partial_uvh5, partial_testfile) # make a mostly empty object in memory to match what we'll write to disk partial_uvh5.data_array = np.zeros_like(full_uvh5.data_array, dtype=np.complex64) partial_uvh5.flag_array = np.zeros_like(full_uvh5.flag_array, dtype=np.bool) partial_uvh5.nsample_array = np.zeros_like( full_uvh5.nsample_array, dtype=np.float32 ) # define freqs and pols freq_inds = [0, 1, 2, 7] pol_inds = [0, 1, 3] data_shape = (full_uvh5.Nblts, 1, len(freq_inds), len(pol_inds)) data = np.zeros(data_shape, dtype=np.complex64) flags = np.zeros(data_shape, dtype=np.bool) nsamples = np.zeros(data_shape, dtype=np.float32) for ifreq, freq_idx in enumerate(freq_inds): for ipol, pol_idx in enumerate(pol_inds): data[:, :, ifreq, ipol] = full_uvh5.data_array[:, :, freq_idx, pol_idx] flags[:, :, ifreq, ipol] = full_uvh5.flag_array[:, :, freq_idx, pol_idx] nsamples[:, :, ifreq, ipol] = full_uvh5.nsample_array[ :, :, freq_idx, pol_idx ] uvtest.checkWarnings( partial_uvh5.write_uvh5_part, [partial_testfile, data, flags, nsamples], { "freq_chans": freq_inds, "polarizations": full_uvh5.polarization_array[pol_inds], }, nwarnings=2, message=[ "Selected frequencies are not evenly spaced", "Selected polarization values are not evenly spaced", ], ) # also write the arrays to the partial object for ifreq, freq_idx in enumerate(freq_inds): for ipol, pol_idx in enumerate(pol_inds): partial_uvh5.data_array[:, :, freq_idx, pol_idx] = data[:, :, ifreq, ipol] partial_uvh5.flag_array[:, :, freq_idx, pol_idx] = flags[:, :, ifreq, ipol] partial_uvh5.nsample_array[:, :, freq_idx, pol_idx] = nsamples[ :, :, ifreq, ipol ] # read in the file and make sure it matches partial_uvh5_file = UVData() partial_uvh5_file.read(partial_testfile) assert partial_uvh5_file == partial_uvh5 # clean up os.remove(partial_testfile) return def test_uvh5_partial_write_irregular_multi3(uv_partial_write, tmp_path): """ Test writing a uvh5 file using irregular intervals for blts and pols. """ full_uvh5 = uv_partial_write full_uvh5.telescope_name = "PAPER" partial_uvh5 = UVData() # delete data arrays in partial file partial_uvh5 = full_uvh5.copy() partial_uvh5.data_array = None partial_uvh5.flag_array = None partial_uvh5.nsample_array = None # initialize file on disk partial_testfile = str(tmp_path / "outtest_partial.uvh5") initialize_with_zeros(partial_uvh5, partial_testfile) # make a mostly empty object in memory to match what we'll write to disk partial_uvh5.data_array = np.zeros_like(full_uvh5.data_array, dtype=np.complex64) partial_uvh5.flag_array = np.zeros_like(full_uvh5.flag_array, dtype=np.bool) partial_uvh5.nsample_array = np.zeros_like( full_uvh5.nsample_array, dtype=np.float32 ) # define blts and freqs blt_inds = [0, 1, 2, 7] pol_inds = [0, 1, 3] data_shape = (len(blt_inds), 1, full_uvh5.Nfreqs, len(pol_inds)) data = np.zeros(data_shape, dtype=np.complex64) flags = np.zeros(data_shape, dtype=np.bool) nsamples = np.zeros(data_shape, dtype=np.float32) for iblt, blt_idx in enumerate(blt_inds): for ipol, pol_idx in enumerate(pol_inds): data[iblt, :, :, ipol] = full_uvh5.data_array[blt_idx, :, :, pol_idx] flags[iblt, :, :, ipol] = full_uvh5.flag_array[blt_idx, :, :, pol_idx] nsamples[iblt, :, :, ipol] = full_uvh5.nsample_array[blt_idx, :, :, pol_idx] uvtest.checkWarnings( partial_uvh5.write_uvh5_part, [partial_testfile, data, flags, nsamples], {"blt_inds": blt_inds, "polarizations": full_uvh5.polarization_array[pol_inds]}, message="Selected polarization values are not evenly spaced", ) # also write the arrays to the partial object for iblt, blt_idx in enumerate(blt_inds): for ipol, pol_idx in enumerate(pol_inds): partial_uvh5.data_array[blt_idx, :, :, pol_idx] = data[iblt, :, :, ipol] partial_uvh5.flag_array[blt_idx, :, :, pol_idx] = flags[iblt, :, :, ipol] partial_uvh5.nsample_array[blt_idx, :, :, pol_idx] = nsamples[ iblt, :, :, ipol ] # read in the file and make sure it matches partial_uvh5_file = UVData() partial_uvh5_file.read(partial_testfile) assert partial_uvh5_file == partial_uvh5 return def test_uvh5_partial_write_irregular_multi4(uv_partial_write, tmp_path): """ Test writing a uvh5 file using irregular intervals for all axes. """ full_uvh5 = uv_partial_write full_uvh5.telescope_name = "PAPER" partial_uvh5 = UVData() # delete data arrays in partial file partial_uvh5 = full_uvh5.copy() partial_uvh5.data_array = None partial_uvh5.flag_array = None partial_uvh5.nsample_array = None # initialize file on disk partial_testfile = str(tmp_path / "outtest_partial.uvh5") initialize_with_zeros(partial_uvh5, partial_testfile) # make a mostly empty object in memory to match what we'll write to disk partial_uvh5.data_array = np.zeros_like(full_uvh5.data_array, dtype=np.complex64) partial_uvh5.flag_array = np.zeros_like(full_uvh5.flag_array, dtype=np.bool) partial_uvh5.nsample_array = np.zeros_like( full_uvh5.nsample_array, dtype=np.float32 ) # define blts and freqs blt_inds = [0, 1, 2, 7] freq_inds = [0, 2, 3, 4] pol_inds = [0, 1, 3] data_shape = (len(blt_inds), 1, len(freq_inds), len(pol_inds)) data = np.zeros(data_shape, dtype=np.complex64) flags = np.zeros(data_shape, dtype=np.bool) nsamples = np.zeros(data_shape, dtype=np.float32) for iblt, blt_idx in enumerate(blt_inds): for ifreq, freq_idx in enumerate(freq_inds): for ipol, pol_idx in enumerate(pol_inds): data[iblt, :, ifreq, ipol] = full_uvh5.data_array[ blt_idx, :, freq_idx, pol_idx ] flags[iblt, :, ifreq, ipol] = full_uvh5.flag_array[ blt_idx, :, freq_idx, pol_idx ] nsamples[iblt, :, ifreq, ipol] = full_uvh5.nsample_array[ blt_idx, :, freq_idx, pol_idx ] uvtest.checkWarnings( partial_uvh5.write_uvh5_part, [partial_testfile, data, flags, nsamples], { "blt_inds": blt_inds, "freq_chans": freq_inds, "polarizations": full_uvh5.polarization_array[pol_inds], }, nwarnings=2, message=[ "Selected frequencies are not evenly spaced", "Selected polarization values are not evenly spaced", ], ) # also write the arrays to the partial object for iblt, blt_idx in enumerate(blt_inds): for ifreq, freq_idx in enumerate(freq_inds): for ipol, pol_idx in enumerate(pol_inds): partial_uvh5.data_array[blt_idx, :, freq_idx, pol_idx] = data[ iblt, :, ifreq, ipol ] partial_uvh5.flag_array[blt_idx, :, freq_idx, pol_idx] = flags[ iblt, :, ifreq, ipol ] partial_uvh5.nsample_array[blt_idx, :, freq_idx, pol_idx] = nsamples[ iblt, :, ifreq, ipol ] # read in the file and make sure it matches partial_uvh5_file = UVData() partial_uvh5_file.read(partial_testfile) assert partial_uvh5_file == partial_uvh5 # clean up os.remove(partial_testfile) return def test_uvh5_partial_write_errors(uv_partial_write, tmp_path): """ Test errors in uvh5_write_part method. """ full_uvh5 = uv_partial_write partial_uvh5 = UVData() # get a waterfall antpairpols = full_uvh5.get_antpairpols() key = antpairpols[0] data = full_uvh5.get_data(key, squeeze="none") flags = full_uvh5.get_data(key, squeeze="none") nsamples = full_uvh5.get_data(key, squeeze="none") # delete data arrays in partial file partial_uvh5 = full_uvh5.copy() partial_uvh5.data_array = None partial_uvh5.flag_array = None partial_uvh5.nsample_array = None # try to write to a file that doesn't exists partial_testfile = str(tmp_path / "outtest_partial.uvh5") if os.path.exists(partial_testfile): os.remove(partial_testfile) with pytest.raises(AssertionError) as cm: partial_uvh5.write_uvh5_part(partial_testfile, data, flags, nsamples, bls=key) assert str(cm.value).startswith("{} does not exist".format(partial_testfile)) # initialize file on disk partial_uvh5.initialize_uvh5_file(partial_testfile, clobber=True) # pass in arrays that are different sizes with pytest.raises(AssertionError) as cm: partial_uvh5.write_uvh5_part( partial_testfile, data, flags[:, :, :, 0], nsamples, bls=key ) assert str(cm.value).startswith( "data_array and flag_array must have the same shape" ) with pytest.raises(AssertionError) as cm: partial_uvh5.write_uvh5_part( partial_testfile, data, flags, nsamples[:, :, :, 0], bls=key ) assert str(cm.value).startswith( "data_array and nsample_array must have the same shape" ) # pass in arrays that are the same size, but don't match expected shape with pytest.raises(AssertionError) as cm: partial_uvh5.write_uvh5_part( partial_testfile, data[:, :, :, 0], flags[:, :, :, 0], nsamples[:, :, :, 0] ) assert str(cm.value).startswith("data_array has shape") # initialize a file on disk, and pass in a different object so check_header fails empty_uvd = UVData() with pytest.raises(AssertionError) as cm: empty_uvd.write_uvh5_part(partial_testfile, data, flags, nsamples, bls=key) assert str(cm.value).startswith( "The object metadata in memory and metadata on disk are different" ) # clean up os.remove(partial_testfile) return def test_initialize_uvh5_file(uv_partial_write, tmp_path): """ Test initializing a UVH5 file on disk. """ full_uvh5 = uv_partial_write full_uvh5.data_array = None full_uvh5.flag_array = None full_uvh5.nsample_array = None # initialize file partial_uvh5 = full_uvh5.copy() partial_testfile = str(tmp_path / "outtest_partial.uvh5") partial_uvh5.initialize_uvh5_file(partial_testfile, clobber=True) # read it in and make sure that the metadata matches the original partial_uvh5.read(partial_testfile, read_data=False) assert partial_uvh5 == full_uvh5 # clean up os.remove(partial_testfile) return def test_initialize_uvh5_file_errors(uv_partial_write, tmp_path, capsys): """ Test errors in initializing a UVH5 file on disk. """ full_uvh5 = uv_partial_write full_uvh5.data_array = None full_uvh5.flag_array = None full_uvh5.nsample_array = None # initialize file partial_uvh5 = full_uvh5.copy() partial_testfile = str(tmp_path / "outtest_partial.uvh5") partial_uvh5.initialize_uvh5_file(partial_testfile, clobber=True) # check that IOError is raised then when clobber == False with pytest.raises(IOError) as cm: partial_uvh5.initialize_uvh5_file(partial_testfile, clobber=False) assert str(cm.value).startswith("File exists; skipping") # check we can write to it anyway partial_uvh5.initialize_uvh5_file(partial_testfile, clobber=True) captured = capsys.readouterr() assert captured.out.startswith("File exists; clobbering") # clean up os.remove(partial_testfile) return def test_initialize_uvh5_file_compression_opts(uv_partial_write, tmp_path): """ Test initializing a uvh5 file with compression options. """ full_uvh5 = uv_partial_write full_uvh5.data_array = None full_uvh5.flag_array = None full_uvh5.nsample_array = None # add options for compression partial_uvh5 = full_uvh5.copy() partial_testfile = str(tmp_path / "outtest_partial.uvh5") partial_uvh5.initialize_uvh5_file( partial_testfile, clobber=True, data_compression="lzf", flags_compression=None, nsample_compression=None, ) partial_uvh5.read(partial_testfile, read_data=False) assert partial_uvh5 == full_uvh5 # clean up os.remove(partial_testfile) return def test_uvh5_lst_array(uv_uvfits, tmp_path): """ Test different cases of the lst_array. """ uv_in = uv_uvfits uv_out = UVData() testfile = str(tmp_path / "outtest_uvfits.uvh5") uv_in.telescope_name = "PAPER" uv_in.write_uvh5(testfile, clobber=True) # remove lst_array from file; check that it's correctly computed on read with h5py.File(testfile, "r+") as h5f: del h5f["/Header/lst_array"] uv_out.read_uvh5(testfile) assert uv_in == uv_out # now change what's in the file and make sure a warning is raised uv_in.write_uvh5(testfile, clobber=True) with h5py.File(testfile, "r+") as h5f: lst_array = h5f["/Header/lst_array"][:] del h5f["/Header/lst_array"] h5f["/Header/lst_array"] = 2 * lst_array uvtest.checkWarnings( uv_out.read_uvh5, [testfile], message="LST values stored in outtest_uvfits.uvh5 are not self-consistent", ) uv_out.lst_array = lst_array assert uv_in == uv_out # clean up os.remove(testfile) return def test_uvh5_read_header_special_cases(uv_uvfits, tmp_path): """ Test special cases values when reading files. """ uv_in = uv_uvfits uv_out = UVData() testfile = str(tmp_path / "outtest_uvfits.uvh5") uv_in.telescope_name = "PAPER" uv_in.write_uvh5(testfile, clobber=True) # change some of the metadata to trip certain if/else clauses with h5py.File(testfile, "r+") as h5f: del h5f["Header/history"] del h5f["Header/vis_units"] del h5f["Header/phase_type"] h5f["Header/history"] = np.string_("blank history") h5f["Header/phase_type"] = np.string_("blah") uv_out.read_uvh5(testfile) # make input and output values match now uv_in.history = uv_out.history uv_in._set_unknown_phase_type() uv_in.phase_center_ra = None uv_in.phase_center_dec = None uv_in.phase_center_epoch = None uv_in.vis_units = "UNCALIB" assert uv_in == uv_out # clean up os.remove(testfile) return def test_uvh5_read_ints(uv_uvh5, tmp_path): """ Test reading visibility data saved as integers. """ uv_in = uv_uvh5 uv_out = UVData() testfile = str(tmp_path / "outtest.uvh5") uv_in.write_uvh5(testfile, clobber=True) # read it back in to make sure data is the same uv_out.read(testfile) assert uv_in == uv_out # now read in as np.complex128 uvh5_filename = os.path.join(DATA_PATH, "zen.2458432.34569.uvh5") uv_in.read_uvh5(uvh5_filename, data_array_dtype=np.complex128) assert uv_in == uv_out assert uv_in.data_array.dtype == np.dtype(np.complex128) # clean up os.remove(testfile) return def test_uvh5_read_ints_error(): """ Test raising an error for passing in an unsupported data_array dtype. """ uv_in = UVData() uvh5_filename = os.path.join(DATA_PATH, "zen.2458432.34569.uvh5") # raise error for bogus data_array_dtype with pytest.raises(ValueError) as cm: uv_in.read_uvh5(uvh5_filename, data_array_dtype=np.int32) assert str(cm.value).startswith( "data_array_dtype must be np.complex64 or np.complex128" ) return def test_uvh5_write_ints(uv_uvh5, tmp_path): """ Test writing visibility data as integers. """ uv_in = uv_uvh5 uv_out = UVData() testfile = str(tmp_path / "outtest.uvh5") uv_in.write_uvh5(testfile, clobber=True, data_write_dtype=uvh5._hera_corr_dtype) # read it back in to make sure data is the same uv_out.read(testfile) assert uv_in == uv_out # also check that the datatype on disk is the right type with h5py.File(testfile, "r") as h5f: visdata_dtype = h5f["Data/visdata"].dtype assert "r" in visdata_dtype.names assert "i" in visdata_dtype.names assert visdata_dtype["r"].kind == "i" assert visdata_dtype["i"].kind == "i" # clean up os.remove(testfile) return def test_uvh5_partial_read_ints_antennas(): """ Test reading in only some antennas from disk with integer data type. """ uvh5_uv = UVData() uvh5_uv2 = UVData() uvh5_file = os.path.join(DATA_PATH, "zen.2458432.34569.uvh5") # select on antennas ants_to_keep = np.array([0, 1]) uvh5_uv.read(uvh5_file, antenna_nums=ants_to_keep) uvh5_uv2.read(uvh5_file) uvh5_uv2.select(antenna_nums=ants_to_keep) assert uvh5_uv == uvh5_uv2 return def test_uvh5_partial_read_ints_freqs(): """ Test reading in only some frequencies from disk with integer data type. """ uvh5_uv = UVData() uvh5_uv2 = UVData() uvh5_file = os.path.join(DATA_PATH, "zen.2458432.34569.uvh5") # select on frequency channels chans_to_keep = np.arange(12, 22) uvh5_uv.read(uvh5_file, freq_chans=chans_to_keep) uvh5_uv2.read(uvh5_file) uvh5_uv2.select(freq_chans=chans_to_keep) assert uvh5_uv == uvh5_uv2 return def test_uvh5_partial_read_ints_pols(): """ Test reading in only some polarizations from disk with integer data type. """ uvh5_uv = UVData() uvh5_uv2 = UVData() uvh5_file = os.path.join(DATA_PATH, "zen.2458432.34569.uvh5") # select on pols pols_to_keep = [-5, -6] uvh5_uv.read(uvh5_file, polarizations=pols_to_keep) uvh5_uv2.read(uvh5_file) uvh5_uv2.select(polarizations=pols_to_keep) assert uvh5_uv == uvh5_uv2 return def test_uvh5_partial_read_ints_times(): """ Test reading in only some times from disk with integer data type. """ uvh5_uv = UVData() uvh5_uv2 = UVData() uvh5_file = os.path.join(DATA_PATH, "zen.2458432.34569.uvh5") # select on read using time_range uvh5_uv.read_uvh5(uvh5_file, read_data=False) unique_times = np.unique(uvh5_uv.time_array) uvh5_uv.read(uvh5_file, time_range=[unique_times[0], unique_times[1]]) uvh5_uv2.read(uvh5_file) uvh5_uv2.select(times=unique_times[0:2]) assert uvh5_uv == uvh5_uv2 return def test_uvh5_partial_read_ints_multi1(): """ Test select-on-read for multiple axes, frequencies being smallest fraction. """ uvh5_uv = UVData() uvh5_uv2 = UVData() uvh5_file = os.path.join(DATA_PATH, "zen.2458432.34569.uvh5") # read frequencies first ants_to_keep = np.array([0, 1]) chans_to_keep = np.arange(12, 22) pols_to_keep = [-5, -6] uvh5_uv.read( uvh5_file, antenna_nums=ants_to_keep, freq_chans=chans_to_keep, polarizations=pols_to_keep, ) uvh5_uv2.read(uvh5_file) uvh5_uv2.select( antenna_nums=ants_to_keep, freq_chans=chans_to_keep, polarizations=pols_to_keep ) assert uvh5_uv == uvh5_uv2 return def test_uvh5_partial_read_ints_multi2(): """ Test select-on-read for multiple axes, baselines being smallest fraction. """ uvh5_uv = UVData() uvh5_uv2 = UVData() uvh5_file = os.path.join(DATA_PATH, "zen.2458432.34569.uvh5") # read baselines first ants_to_keep = np.array([0, 1]) chans_to_keep = np.arange(12, 22) pols_to_keep = [-5, -6, -7] uvh5_uv.read( uvh5_file, antenna_nums=ants_to_keep, freq_chans=chans_to_keep, polarizations=pols_to_keep, ) uvh5_uv2.read(uvh5_file) uvh5_uv2.select( antenna_nums=ants_to_keep, freq_chans=chans_to_keep, polarizations=pols_to_keep, ) assert uvh5_uv == uvh5_uv2 return def test_uvh5_partial_read_ints_multi3(): """ Test select-on-read for multiple axes, polarizations being smallest fraction. """ uvh5_uv = UVData() uvh5_uv2 = UVData() uvh5_file = os.path.join(DATA_PATH, "zen.2458432.34569.uvh5") # read polarizations first ants_to_keep = np.array([0, 1, 12]) chans_to_keep = np.arange(12, 64) pols_to_keep = [-5, -6] uvh5_uv.read( uvh5_file, antenna_nums=ants_to_keep, freq_chans=chans_to_keep, polarizations=pols_to_keep, ) uvh5_uv2.read(uvh5_file) uvh5_uv2.select( antenna_nums=ants_to_keep, freq_chans=chans_to_keep, polarizations=pols_to_keep ) assert uvh5_uv == uvh5_uv2 return def test_uvh5_partial_write_ints_antapirs(uv_uvh5, tmp_path): """ Test writing an entire UVH5 file in pieces by antpairs using ints. """ full_uvh5 = uv_uvh5 # delete data arrays in partial file partial_uvh5 = full_uvh5.copy() partial_uvh5.data_array = None partial_uvh5.flag_array = None partial_uvh5.nsample_array = None # initialize file on disk partial_testfile = str(tmp_path / "outtest_partial.uvh5") partial_uvh5.initialize_uvh5_file( partial_testfile, clobber=True, data_write_dtype=uvh5._hera_corr_dtype, ) # write to file by iterating over antpairpol antpairpols = full_uvh5.get_antpairpols() for key in antpairpols: data = full_uvh5.get_data(key, squeeze="none") flags = full_uvh5.get_flags(key, squeeze="none") nsamples = full_uvh5.get_nsamples(key, squeeze="none") partial_uvh5.write_uvh5_part(partial_testfile, data, flags, nsamples, bls=key) # now read in the full file and make sure that it matches the original partial_uvh5.read(partial_testfile) assert full_uvh5 == partial_uvh5 # clean up os.remove(partial_testfile) return def test_uvh5_partial_write_ints_frequencies(uv_uvh5, tmp_path): """ Test writing an entire UVH5 file in pieces by frequency using ints. """ full_uvh5 = uv_uvh5 # delete data arrays in partial file partial_uvh5 = full_uvh5.copy() partial_uvh5.data_array = None partial_uvh5.flag_array = None partial_uvh5.nsample_array = None # initialize file on disk partial_testfile = str(tmp_path / "outtest_partial.uvh5") partial_uvh5.initialize_uvh5_file( partial_testfile, clobber=True, data_write_dtype=uvh5._hera_corr_dtype ) # only write certain frequencies Nfreqs = full_uvh5.Nfreqs Hfreqs = Nfreqs // 2 freqs1 = np.arange(Hfreqs) freqs2 = np.arange(Hfreqs, Nfreqs) data = full_uvh5.data_array[:, :, freqs1, :] flags = full_uvh5.flag_array[:, :, freqs1, :] nsamples = full_uvh5.nsample_array[:, :, freqs1, :] partial_uvh5.write_uvh5_part( partial_testfile, data, flags, nsamples, freq_chans=freqs1 ) data = full_uvh5.data_array[:, :, freqs2, :] flags = full_uvh5.flag_array[:, :, freqs2, :] nsamples = full_uvh5.nsample_array[:, :, freqs2, :] partial_uvh5.write_uvh5_part( partial_testfile, data, flags, nsamples, freq_chans=freqs2 ) # read in the full file and make sure it matches partial_uvh5.read(partial_testfile) assert full_uvh5 == partial_uvh5 # clean up os.remove(partial_testfile) return def test_uvh5_partial_write_ints_blts(uv_uvh5, tmp_path): """ Test writing an entire UVH5 file in pieces by blt using ints. """ full_uvh5 = uv_uvh5 # delete data arrays in partial file partial_uvh5 = full_uvh5.copy() partial_uvh5.data_array = None partial_uvh5.flag_array = None partial_uvh5.nsample_array = None # initialize file on disk partial_testfile = str(tmp_path / "outtest_partial.uvh5") partial_uvh5.initialize_uvh5_file( partial_testfile, clobber=True, data_write_dtype=uvh5._hera_corr_dtype ) # only write certain blts Nblts = full_uvh5.Nblts Hblts = Nblts // 2 blts1 = np.arange(Hblts) blts2 = np.arange(Hblts, Nblts) data = full_uvh5.data_array[blts1, :, :, :] flags = full_uvh5.flag_array[blts1, :, :, :] nsamples = full_uvh5.nsample_array[blts1, :, :, :] partial_uvh5.write_uvh5_part( partial_testfile, data, flags, nsamples, blt_inds=blts1 ) data = full_uvh5.data_array[blts2, :, :, :] flags = full_uvh5.flag_array[blts2, :, :, :] nsamples = full_uvh5.nsample_array[blts2, :, :, :] partial_uvh5.write_uvh5_part( partial_testfile, data, flags, nsamples, blt_inds=blts2 ) # read in the full file and make sure it matches partial_uvh5.read(partial_testfile) assert full_uvh5 == partial_uvh5 # clean up os.remove(partial_testfile) return def test_uvh5_partial_write_ints_pols(uv_uvh5, tmp_path): """ Test writing an entire UVH5 file in pieces by polarization using ints. """ full_uvh5 = uv_uvh5 # delete data arrays in partial file partial_uvh5 = full_uvh5.copy() partial_uvh5.data_array = None partial_uvh5.flag_array = None partial_uvh5.nsample_array = None # initialize file on disk partial_testfile = str(tmp_path / "outtest_partial.uvh5") partial_uvh5.initialize_uvh5_file( partial_testfile, clobber=True, data_write_dtype=uvh5._hera_corr_dtype ) # only write certain polarizations Npols = full_uvh5.Npols Hpols = Npols // 2 pols1 = np.arange(Hpols) pols2 = np.arange(Hpols, Npols) data = full_uvh5.data_array[:, :, :, pols1] flags = full_uvh5.flag_array[:, :, :, pols1] nsamples = full_uvh5.nsample_array[:, :, :, pols1] partial_uvh5.write_uvh5_part( partial_testfile, data, flags, nsamples, polarizations=full_uvh5.polarization_array[:Hpols], ) data = full_uvh5.data_array[:, :, :, pols2] flags = full_uvh5.flag_array[:, :, :, pols2] nsamples = full_uvh5.nsample_array[:, :, :, pols2] partial_uvh5.write_uvh5_part( partial_testfile, data, flags, nsamples, polarizations=full_uvh5.polarization_array[Hpols:], ) # read in the full file and make sure it matches partial_uvh5.read(partial_testfile) assert full_uvh5 == partial_uvh5 # clean up os.remove(partial_testfile) return def test_read_complex_astype(tmp_path): # make a testfile with a test dataset test_file = str(tmp_path / "test_file.h5") test_data_shape = (2, 3, 4, 5) test_data = np.zeros(test_data_shape, dtype=np.complex64) test_data.real = 1.0 test_data.imag = 2.0 with h5py.File(test_file, "w") as h5f: dgrp = h5f.create_group("Data") dset = dgrp.create_dataset( "testdata", test_data_shape, dtype=uvh5._hera_corr_dtype ) with dset.astype(uvh5._hera_corr_dtype): dset[:, :, :, :, "r"] = test_data.real dset[:, :, :, :, "i"] = test_data.imag # test that reading the data back in works as expected indices = (np.s_[:], np.s_[:], np.s_[:], np.s_[:]) with h5py.File(test_file, "r") as h5f: dset = h5f["Data/testdata"] file_data = uvh5._read_complex_astype(dset, indices, np.complex64) assert np.allclose(file_data, test_data) # clean up os.remove(test_file) return def test_read_complex_astype_errors(tmp_path): # make a testfile with a test dataset test_file = str(tmp_path / "test_file.h5") test_data_shape = (2, 3, 4, 5) test_data = np.zeros(test_data_shape, dtype=np.complex64) test_data.real = 1.0 test_data.imag = 2.0 with h5py.File(test_file, "w") as h5f: dgrp = h5f.create_group("Data") dset = dgrp.create_dataset( "testdata", test_data_shape, dtype=uvh5._hera_corr_dtype ) with dset.astype(uvh5._hera_corr_dtype): dset[:, :, :, :, "r"] = test_data.real dset[:, :, :, :, "i"] = test_data.imag # test passing in a forbidden output datatype indices = (np.s_[:], np.s_[:], np.s_[:], np.s_[:]) with h5py.File(test_file, "r") as h5f: dset = h5f["Data/testdata"] with pytest.raises(ValueError) as cm: uvh5._read_complex_astype(dset, indices, np.int32) assert str(cm.value).startswith("output datatype must be one of (complex") # clean up os.remove(test_file) return def test_write_complex_astype(tmp_path): # make sure we can write data out test_file = str(tmp_path / "test_file.h5") test_data_shape = (2, 3, 4, 5) test_data = np.zeros(test_data_shape, dtype=np.complex64) test_data.real = 1.0 test_data.imag = 2.0 with h5py.File(test_file, "w") as h5f: dgrp = h5f.create_group("Data") dset = dgrp.create_dataset( "testdata", test_data_shape, dtype=uvh5._hera_corr_dtype ) inds = (np.s_[:], np.s_[:], np.s_[:], np.s_[:]) uvh5._write_complex_astype(test_data, dset, inds) # read the data back in to confirm it's right with h5py.File(test_file, "r") as h5f: dset = h5f["Data/testdata"] file_data = np.zeros(test_data_shape, dtype=np.complex64) with dset.astype(uvh5._hera_corr_dtype): file_data.real = dset["r"][:, :, :, :] file_data.imag = dset["i"][:, :, :, :] assert np.allclose(file_data, test_data) return def test_check_uvh5_dtype_errors(): # test passing in something that's not a dtype with pytest.raises(ValueError) as cm: uvh5._check_uvh5_dtype("hi") assert str(cm.value).startswith("dtype in a uvh5 file must be a numpy dtype") # test using a dtype with bad field names dtype = np.dtype([("a", "