https://github.com/RadioAstronomySoftwareGroup/pyuvdata
Tip revision: 4b5dfe6ed6e3067d14f2e762d768a82229681b18 authored by EXTERNAL-Ewall-Wice on 02 November 2018, 05:08:50 UTC
now output separate file per polarization.
now output separate file per polarization.
Tip revision: 4b5dfe6
test_miriad.py
# -*- mode: python; coding: utf-8 -*
# Copyright (c) 2018 Radio Astronomy Software Group
# Licensed under the 2-clause BSD License
"""Tests for Miriad object.
"""
from __future__ import absolute_import, division, print_function
import os
import shutil
import copy
import numpy as np
import nose.tools as nt
from astropy.time import Time, TimeDelta
from astropy import constants as const
from pyuvdata import UVData
from pyuvdata.miriad import Miriad
import pyuvdata.utils as uvutils
import pyuvdata.tests as uvtest
from pyuvdata.data import DATA_PATH
from .. import aipy_extracts
def test_ReadWriteReadATCA():
uv_in = UVData()
uv_out = UVData()
atca_file = os.path.join(DATA_PATH, 'atca_miriad')
testfile = os.path.join(DATA_PATH, 'test/outtest_atca_miriad.uv')
uvtest.checkWarnings(uv_in.read, [atca_file],
nwarnings=4, category=[UserWarning, UserWarning, UserWarning, UserWarning],
message=['Altitude is not present in Miriad file, and '
'telescope ATCA is not in known_telescopes. ',
'Altitude is not present',
'Telescope location is set at sealevel at the file lat/lon '
'coordinates. Antenna positions are present, but the mean antenna '
'position does not give a telescope_location on the surface of the '
'earth. Antenna positions do not appear to be on the surface of the '
'earth and will be treated as relative.',
'Telescope ATCA is not in known_telescopes.'])
uv_in.write_miriad(testfile, clobber=True)
uvtest.checkWarnings(uv_out.read, [testfile],
message='Telescope ATCA is not in known_telescopes.')
nt.assert_equal(uv_in, uv_out)
def test_ReadNRAOWriteMiriadReadMiriad():
"""Test reading in a CASA tutorial uvfits file, writing and reading as miriad"""
uvfits_uv = UVData()
miriad_uv = UVData()
testfile = os.path.join(DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
writefile = os.path.join(DATA_PATH, 'test/outtest_miriad.uv')
expected_extra_keywords = ['OBSERVER', 'SORTORD', 'SPECSYS',
'RESTFREQ', 'ORIGIN']
uvtest.checkWarnings(uvfits_uv.read_uvfits, [testfile], message='Telescope EVLA is not')
uvfits_uv.write_miriad(writefile, clobber=True)
uvtest.checkWarnings(miriad_uv.read, [writefile], message='Telescope EVLA is not')
nt.assert_equal(uvfits_uv, miriad_uv)
del(uvfits_uv)
del(miriad_uv)
def test_ReadMiriadWriteUVFits():
"""
Miriad to uvfits loopback test.
Read in Miriad files, write out as uvfits, read back in and check for
object equality.
"""
miriad_uv = UVData()
uvfits_uv = UVData()
miriad_file = os.path.join(DATA_PATH, 'zen.2456865.60537.xy.uvcRREAA')
testfile = os.path.join(DATA_PATH, 'test/outtest_miriad.uvfits')
uvtest.checkWarnings(miriad_uv.read, [miriad_file],
known_warning='miriad')
miriad_uv.write_uvfits(testfile, spoof_nonessential=True, force_phase=True)
uvfits_uv.read_uvfits(testfile)
nt.assert_equal(miriad_uv, uvfits_uv)
# check error if phase_type is wrong and force_phase not set
uvtest.checkWarnings(miriad_uv.read, [miriad_file],
known_warning='miriad')
nt.assert_raises(ValueError, miriad_uv.write_uvfits, testfile, spoof_nonessential=True)
miriad_uv.set_unknown_phase_type()
nt.assert_raises(ValueError, miriad_uv.write_uvfits, testfile, spoof_nonessential=True)
# check error if spoof_nonessential not set
uvtest.checkWarnings(miriad_uv.read, [miriad_file],
known_warning='miriad')
nt.assert_raises(ValueError, miriad_uv.write_uvfits, testfile, force_phase=True)
# check warning when correct_lat_lon is set to False
uvtest.checkWarnings(miriad_uv.read, [miriad_file],
{'correct_lat_lon': False}, nwarnings=1,
message=['Altitude is not present in Miriad file, using known location altitude value for PAPER and lat/lon from file.'])
# check that setting the phase_type to something wrong errors
nt.assert_raises(ValueError, uvtest.checkWarnings, miriad_uv.read,
[miriad_file], {'phase_type': 'phased'})
nt.assert_raises(ValueError, uvtest.checkWarnings, miriad_uv.read,
[miriad_file], {'phase_type': 'foo'})
del(miriad_uv)
del(uvfits_uv)
def test_wronglatlon():
"""
Check for appropriate warnings with incorrect lat/lon values or missing telescope
To test this, we needed files without altitudes and with wrong lat, lon or telescope values.
These test files were made commenting out the line in miriad.py that adds altitude
to the file and running the following code:
import os
import numpy as np
from pyuvdata import UVData
from pyuvdata.data import DATA_PATH
uv_in = UVData()
uv_out = UVData()
miriad_file = os.path.join(DATA_PATH, 'zen.2456865.60537.xy.uvcRREAA')
latfile = os.path.join(DATA_PATH, 'zen.2456865.60537_wronglat.xy.uvcRREAA')
lonfile = os.path.join(DATA_PATH, 'zen.2456865.60537_wronglon.xy.uvcRREAA')
telescopefile = os.path.join(DATA_PATH, 'zen.2456865.60537_wrongtelecope.xy.uvcRREAA')
uv_in.read(miriad_file)
uv_in.select(times=uv_in.time_array[0])
uv_in.select(freq_chans=[0])
lat, lon, alt = uv_in.telescope_location_lat_lon_alt
lat_wrong = lat + 10 * np.pi / 180.
uv_in.telescope_location_lat_lon_alt = (lat_wrong, lon, alt)
uv_in.write_miriad(latfile)
uv_out.read(latfile)
lon_wrong = lon + 10 * np.pi / 180.
uv_in.telescope_location_lat_lon_alt = (lat, lon_wrong, alt)
uv_in.write_miriad(lonfile)
uv_out.read(lonfile)
uv_in.telescope_location_lat_lon_alt = (lat, lon, alt)
uv_in.telescope_name = 'foo'
uv_in.write_miriad(telescopefile)
uv_out.read(telescopefile, run_check=False)
"""
uv_in = UVData()
latfile = os.path.join(DATA_PATH, 'zen.2456865.60537_wronglat.xy.uvcRREAA')
lonfile = os.path.join(DATA_PATH, 'zen.2456865.60537_wronglon.xy.uvcRREAA')
telescopefile = os.path.join(DATA_PATH, 'zen.2456865.60537_wrongtelecope.xy.uvcRREAA')
uvtest.checkWarnings(uv_in.read, [latfile], nwarnings=3,
message=['Altitude is not present in file and latitude value does not match',
'This file was written with an old version of pyuvdata',
'This file was written with an old version of pyuvdata'])
uvtest.checkWarnings(uv_in.read, [lonfile], nwarnings=3,
message=['Altitude is not present in file and longitude value does not match',
'This file was written with an old version of pyuvdata',
'This file was written with an old version of pyuvdata'])
uvtest.checkWarnings(uv_in.read, [telescopefile], {'run_check': False},
nwarnings=6,
message=['Altitude is not present in Miriad file, and telescope',
'Altitude is not present in Miriad file, and telescope',
'Telescope location is set at sealevel at the '
'file lat/lon coordinates. Antenna positions '
'are present, but the mean antenna position',
'This file was written with an old version of pyuvdata',
'This file was written with an old version of pyuvdata',
'Telescope foo is not in known_telescopes.'])
def test_miriad_location_handling():
uv_in = UVData()
uv_out = UVData()
miriad_file = os.path.join(DATA_PATH, 'zen.2456865.60537.xy.uvcRREAA')
testdir = os.path.join(DATA_PATH, 'test/')
testfile = os.path.join(DATA_PATH, 'test/outtest_miriad.uv')
aipy_uv = aipy_extracts.UV(miriad_file)
if os.path.exists(testfile):
shutil.rmtree(testfile)
# Test for using antenna positions to get telescope position
uvtest.checkWarnings(uv_in.read, [miriad_file],
message=['Altitude is not present in Miriad file, using known '
'location values for PAPER.'])
# extract antenna positions and rotate them for miriad
nants = aipy_uv['nants']
rel_ecef_antpos = np.zeros((nants, 3), dtype=uv_in.antenna_positions.dtype)
for ai, num in enumerate(uv_in.antenna_numbers):
rel_ecef_antpos[num, :] = uv_in.antenna_positions[ai, :]
# find zeros so antpos can be zeroed there too
antpos_length = np.sqrt(np.sum(np.abs(rel_ecef_antpos)**2, axis=1))
ecef_antpos = rel_ecef_antpos + uv_in.telescope_location
longitude = uv_in.telescope_location_lat_lon_alt[1]
antpos = uvutils.rotECEF_from_ECEF(ecef_antpos, longitude)
# zero out bad locations (these are checked on read)
antpos[np.where(antpos_length == 0), :] = [0, 0, 0]
antpos = antpos.T.flatten() / const.c.to('m/ns').value
# make new file
aipy_uv2 = aipy_extracts.UV(testfile, status='new')
# initialize headers from old file
# change telescope name (so the position isn't set from known_telescopes)
# and use absolute antenna positions
aipy_uv2.init_from_uv(aipy_uv, override={'telescop': 'foo', 'antpos': antpos})
# copy data from old file
aipy_uv2.pipe(aipy_uv)
# close file properly
del(aipy_uv2)
uvtest.checkWarnings(uv_out.read, [testfile], nwarnings=4,
message=['Altitude is not present in Miriad file, and '
'telescope foo is not in known_telescopes. '
'Telescope location will be set using antenna positions.',
'Altitude is not present ',
'Telescope location is not set, but antenna '
'positions are present. Mean antenna latitude '
'and longitude values match file values, so '
'telescope_position will be set using the mean '
'of the antenna altitudes',
'Telescope foo is not in known_telescopes.'])
# Test for handling when antenna positions have a different mean latitude than the file latitude
# make new file
if os.path.exists(testfile):
shutil.rmtree(testfile)
aipy_uv = aipy_extracts.UV(miriad_file)
aipy_uv2 = aipy_extracts.UV(testfile, status='new')
# initialize headers from old file
# change telescope name (so the position isn't set from known_telescopes)
# and use absolute antenna positions, change file latitude
new_lat = aipy_uv['latitud'] * 1.5
aipy_uv2.init_from_uv(aipy_uv, override={'telescop': 'foo', 'antpos': antpos,
'latitud': new_lat})
# copy data from old file
aipy_uv2.pipe(aipy_uv)
# close file properly
del(aipy_uv2)
uvtest.checkWarnings(uv_out.read, [testfile], nwarnings=5,
message=['Altitude is not present in Miriad file, and '
'telescope foo is not in known_telescopes. '
'Telescope location will be set using antenna positions.',
'Altitude is not present in Miriad file, and '
'telescope foo is not in known_telescopes. '
'Telescope location will be set using antenna positions.',
'Telescope location is set at sealevel at the '
'file lat/lon coordinates. Antenna positions '
'are present, but the mean antenna latitude '
'value does not match',
'drift RA, Dec is off from lst, latitude by more than 1.0 deg',
'Telescope foo is not in known_telescopes.'])
# Test for handling when antenna positions have a different mean longitude than the file longitude
# this is harder because of the rotation that's done on the antenna positions
# make new file
if os.path.exists(testfile):
shutil.rmtree(testfile)
aipy_uv = aipy_extracts.UV(miriad_file)
aipy_uv2 = aipy_extracts.UV(testfile, status='new')
# initialize headers from old file
# change telescope name (so the position isn't set from known_telescopes)
# and use absolute antenna positions, change file longitude
new_lon = aipy_uv['longitu'] + np.pi
aipy_uv2.init_from_uv(aipy_uv, override={'telescop': 'foo', 'antpos': antpos,
'longitu': new_lon})
# copy data from old file
aipy_uv2.pipe(aipy_uv)
# close file properly
del(aipy_uv2)
uvtest.checkWarnings(uv_out.read, [testfile], nwarnings=5,
message=['Altitude is not present in Miriad file, and '
'telescope foo is not in known_telescopes. '
'Telescope location will be set using antenna positions.',
'Altitude is not present in Miriad file, and '
'telescope foo is not in known_telescopes. '
'Telescope location will be set using antenna positions.',
'Telescope location is set at sealevel at the '
'file lat/lon coordinates. Antenna positions '
'are present, but the mean antenna longitude '
'value does not match',
'drift RA, Dec is off from lst, latitude by more than 1.0 deg',
'Telescope foo is not in known_telescopes.'])
# Test for handling when antenna positions have a different mean longitude &
# latitude than the file longitude
# make new file
if os.path.exists(testfile):
shutil.rmtree(testfile)
aipy_uv = aipy_extracts.UV(miriad_file)
aipy_uv2 = aipy_extracts.UV(testfile, status='new')
# initialize headers from old file
# change telescope name (so the position isn't set from known_telescopes)
# and use absolute antenna positions, change file latitude and longitude
aipy_uv2.init_from_uv(aipy_uv, override={'telescop': 'foo', 'antpos': antpos,
'latitud': new_lat, 'longitu': new_lon})
# copy data from old file
aipy_uv2.pipe(aipy_uv)
# close file properly
del(aipy_uv2)
uvtest.checkWarnings(uv_out.read, [testfile], nwarnings=5,
message=['Altitude is not present in Miriad file, and '
'telescope foo is not in known_telescopes. '
'Telescope location will be set using antenna positions.',
'Altitude is not present in Miriad file, and '
'telescope foo is not in known_telescopes. '
'Telescope location will be set using antenna positions.',
'Telescope location is set at sealevel at the '
'file lat/lon coordinates. Antenna positions '
'are present, but the mean antenna latitude and '
'longitude values do not match',
'drift RA, Dec is off from lst, latitude by more than 1.0 deg',
'Telescope foo is not in known_telescopes.'])
# Test for handling when antenna positions are far enough apart to make the
# mean position inside the earth
good_antpos = np.where(antpos_length > 0)[0]
rot_ants = good_antpos[:len(good_antpos) // 2]
rot_antpos = uvutils.rotECEF_from_ECEF(ecef_antpos[rot_ants, :], longitude + np.pi)
modified_antpos = uvutils.rotECEF_from_ECEF(ecef_antpos, longitude)
modified_antpos[rot_ants, :] = rot_antpos
# zero out bad locations (these are checked on read)
modified_antpos[np.where(antpos_length == 0), :] = [0, 0, 0]
modified_antpos = modified_antpos.T.flatten() / const.c.to('m/ns').value
# make new file
if os.path.exists(testfile):
shutil.rmtree(testfile)
aipy_uv = aipy_extracts.UV(miriad_file)
aipy_uv2 = aipy_extracts.UV(testfile, status='new')
# initialize headers from old file
# change telescope name (so the position isn't set from known_telescopes)
# and use modified absolute antenna positions
aipy_uv2.init_from_uv(aipy_uv, override={'telescop': 'foo', 'antpos': modified_antpos})
# copy data from old file
aipy_uv2.pipe(aipy_uv)
# close file properly
del(aipy_uv2)
uvtest.checkWarnings(uv_out.read, [testfile], nwarnings=4,
message=['Altitude is not present in Miriad file, and '
'telescope foo is not in known_telescopes. '
'Telescope location will be set using antenna positions.',
'Altitude is not present ',
'Telescope location is set at sealevel at the '
'file lat/lon coordinates. Antenna positions '
'are present, but the mean antenna position '
'does not give a telescope_location on the '
'surface of the earth.',
'Telescope foo is not in known_telescopes.'])
def test_singletimeselect_drift():
"""
Check behavior with writing & reading after selecting a single time from a drift file.
"""
uv_in = UVData()
uv_out = UVData()
miriad_file = os.path.join(DATA_PATH, 'zen.2456865.60537.xy.uvcRREAA')
testfile = os.path.join(DATA_PATH, 'test/outtest_miriad.uv')
uvtest.checkWarnings(uv_in.read, [miriad_file],
known_warning='miriad')
uv_in.select(times=uv_in.time_array[0])
uv_in.write_miriad(testfile, clobber=True)
uv_out.read(testfile)
nt.assert_equal(uv_in, uv_out)
# check that setting the phase_type works
uv_out.read(testfile, phase_type='drift')
nt.assert_equal(uv_in, uv_out)
# check again with more than one time but only 1 unflagged time
uvtest.checkWarnings(uv_in.read, [miriad_file],
known_warning='miriad')
time_gt0_array = np.where(uv_in.time_array > uv_in.time_array[0])[0]
uv_in.flag_array[time_gt0_array, :, :, :] = True
# get unflagged blts
blt_good = np.where(~np.all(uv_in.flag_array, axis=(1, 2, 3)))
nt.assert_true(np.isclose(np.mean(np.diff(uv_in.time_array[blt_good])), 0.))
uv_in.write_miriad(testfile, clobber=True)
uv_out.read(testfile)
nt.assert_equal(uv_in, uv_out)
# check that setting the phase_type works
uv_out.read(testfile, phase_type='drift')
nt.assert_equal(uv_in, uv_out)
def test_poltoind():
miriad_uv = UVData()
miriad_file = os.path.join(DATA_PATH, 'zen.2456865.60537.xy.uvcRREAA')
uvtest.checkWarnings(miriad_uv.read, [miriad_file], known_warning='miriad')
pol_arr = miriad_uv.polarization_array
miriad = miriad_uv._convert_to_filetype('miriad')
miriad.polarization_array = None
nt.assert_raises(ValueError, miriad._pol_to_ind, pol_arr[0])
miriad.polarization_array = [pol_arr[0], pol_arr[0]]
nt.assert_raises(ValueError, miriad._pol_to_ind, pol_arr[0])
def test_miriad_extra_keywords():
uv_in = UVData()
uv_out = UVData()
miriad_file = os.path.join(DATA_PATH, 'zen.2456865.60537.xy.uvcRREAA')
testfile = os.path.join(DATA_PATH, 'test/outtest_miriad.uv')
uvtest.checkWarnings(uv_in.read, [miriad_file],
known_warning='miriad')
# check for warnings & errors with extra_keywords that are dicts, lists or arrays
uv_in.extra_keywords['testdict'] = {'testkey': 23}
uvtest.checkWarnings(uv_in.check, message=['testdict in extra_keywords is a '
'list, array or dict'])
nt.assert_raises(TypeError, uv_in.write_miriad, testfile, clobber=True,
run_check=False)
uv_in.extra_keywords.pop('testdict')
uv_in.extra_keywords['testlist'] = [12, 14, 90]
uvtest.checkWarnings(uv_in.check, message=['testlist in extra_keywords is a '
'list, array or dict'])
nt.assert_raises(TypeError, uv_in.write_miriad, testfile, clobber=True,
run_check=False)
uv_in.extra_keywords.pop('testlist')
uv_in.extra_keywords['testarr'] = np.array([12, 14, 90])
uvtest.checkWarnings(uv_in.check, message=['testarr in extra_keywords is a '
'list, array or dict'])
nt.assert_raises(TypeError, uv_in.write_miriad, testfile, clobber=True,
run_check=False)
uv_in.extra_keywords.pop('testarr')
# check for warnings with extra_keywords keys that are too long
uv_in.extra_keywords['test_long_key'] = True
uvtest.checkWarnings(uv_in.check, message=['key test_long_key in extra_keywords '
'is longer than 8 characters'])
uvtest.checkWarnings(uv_in.write_miriad, [testfile], {'clobber': True, 'run_check': False},
message=['key test_long_key in extra_keywords is longer than 8 characters'])
uv_in.extra_keywords.pop('test_long_key')
# check handling of boolean keywords
uv_in.extra_keywords['bool'] = True
uv_in.extra_keywords['bool2'] = False
uv_in.write_miriad(testfile, clobber=True)
uv_out.read(testfile)
nt.assert_equal(uv_in, uv_out)
uv_in.extra_keywords.pop('bool')
uv_in.extra_keywords.pop('bool2')
# check handling of int-like keywords
uv_in.extra_keywords['int1'] = np.int(5)
uv_in.extra_keywords['int2'] = 7
uv_in.write_miriad(testfile, clobber=True)
uv_out.read(testfile)
nt.assert_equal(uv_in, uv_out)
uv_in.extra_keywords.pop('int1')
uv_in.extra_keywords.pop('int2')
# check handling of float-like keywords
uv_in.extra_keywords['float1'] = np.int64(5.3)
uv_in.extra_keywords['float2'] = 6.9
uv_in.write_miriad(testfile, clobber=True)
uv_out.read(testfile)
nt.assert_equal(uv_in, uv_out)
uv_in.extra_keywords.pop('float1')
uv_in.extra_keywords.pop('float2')
# check handling of complex-like keywords
# currently they are NOT supported
uv_in.extra_keywords['complex1'] = np.complex64(5.3 + 1.2j)
uv_in.extra_keywords['complex2'] = 6.9 + 4.6j
nt.assert_raises(TypeError, uv_in.write_miriad, testfile, clobber=True)
def test_breakReadMiriad():
"""Test Miriad file checking."""
uv_in = UVData()
uv_out = UVData()
nt.assert_raises(IOError, uv_in.read, 'foo', file_type='miriad')
miriad_file = os.path.join(DATA_PATH, 'zen.2456865.60537.xy.uvcRREAA')
testfile = os.path.join(DATA_PATH, 'test/outtest_miriad.uv')
uvtest.checkWarnings(uv_in.read, [miriad_file],
known_warning='miriad')
uvtest.checkWarnings(uv_in.read, [miriad_file],
known_warning='miriad')
uv_in.Nblts += 10
uv_in.write_miriad(testfile, clobber=True, run_check=False)
uvtest.checkWarnings(uv_out.read, [testfile], {'run_check': False},
message=['Nblts does not match the number of unique blts in the data'])
uvtest.checkWarnings(uv_in.read, [miriad_file],
known_warning='miriad')
uv_in.Nbls += 10
uv_in.write_miriad(testfile, clobber=True, run_check=False)
uvtest.checkWarnings(uv_out.read, [testfile], {'run_check': False},
message=['Nbls does not match the number of unique baselines in the data'])
uvtest.checkWarnings(uv_in.read, [miriad_file],
known_warning='miriad')
uv_in.Ntimes += 10
uv_in.write_miriad(testfile, clobber=True, run_check=False)
uvtest.checkWarnings(uv_out.read, [testfile], {'run_check': False},
message=['Ntimes does not match the number of unique times in the data'])
def test_readWriteReadMiriad():
"""
PAPER file Miriad loopback test.
Read in Miriad PAPER file, write out as new Miriad file, read back in and
check for object equality.
"""
uv_in = UVData()
uv_out = UVData()
testfile = os.path.join(DATA_PATH, 'zen.2456865.60537.xy.uvcRREAA')
write_file = os.path.join(DATA_PATH, 'test/outtest_miriad.uv')
write_file2 = os.path.join(DATA_PATH, 'test/outtest_miriad2.uv')
uvtest.checkWarnings(uv_in.read, [testfile], known_warning='miriad')
uv_in.write_miriad(write_file, clobber=True)
uv_out.read(write_file)
nt.assert_equal(uv_in, uv_out)
# check that we can read & write phased data
uv_in2 = copy.deepcopy(uv_in)
uv_in2.phase_to_time(Time(np.mean(uv_in2.time_array), format='jd'))
uv_in2.write_miriad(write_file, clobber=True)
uv_out.read(write_file)
nt.assert_equal(uv_in2, uv_out)
# check that trying to overwrite without clobber raises an error
nt.assert_raises(ValueError, uv_in.write_miriad, write_file)
# check that if x_orientation is set, it's read back out properly
uv_in.x_orientation = 'east'
uv_in.write_miriad(write_file, clobber=True)
uv_out.read(write_file)
nt.assert_equal(uv_in, uv_out)
# check that if antenna_diameters is set, it's read back out properly
uvtest.checkWarnings(uv_in.read, [testfile], known_warning='miriad')
uv_in.antenna_diameters = np.zeros((uv_in.Nants_telescope,), dtype=np.float) + 14.0
uv_in.write_miriad(write_file, clobber=True)
uv_out.read(write_file)
nt.assert_equal(uv_in, uv_out)
# check that antenna diameters get written if not exactly float
uv_in.antenna_diameters = np.zeros((uv_in.Nants_telescope,), dtype=np.float32) + 14.0
uv_in.write_miriad(write_file, clobber=True)
uv_out.read(write_file)
nt.assert_equal(uv_in, uv_out)
# check that trying to write a file with unknown phasing raises an error
uv_in.set_unknown_phase_type()
nt.assert_raises(ValueError, uv_in.write_miriad, write_file, clobber=True)
# check for backwards compatibility with old keyword 'diameter' for antenna diameters
testfile_diameters = os.path.join(DATA_PATH, 'zen.2457698.40355.xx.HH.uvcA')
uv_in.read(testfile_diameters)
uv_in.write_miriad(write_file, clobber=True)
uv_out.read(write_file)
nt.assert_equal(uv_in, uv_out)
# check that variables 'ischan' and 'nschan' were written to new file
# need to use aipy, since pyuvdata is not currently capturing these variables
uv_in.read(write_file)
uv_aipy = aipy_extracts.UV(write_file) # on enterprise, this line makes it so you cant delete the file
nfreqs = uv_in.Nfreqs
nschan = uv_aipy['nschan']
ischan = uv_aipy['ischan']
nt.assert_equal(nschan, nfreqs)
nt.assert_equal(ischan, 1)
del(uv_aipy) # close the file so it can be used later
# check partial IO selections
full = UVData()
uvtest.checkWarnings(full.read, [testfile], known_warning='miriad')
full.write_miriad(write_file, clobber=True)
uv_in = UVData()
# test only specified bls were read, and that flipped antpair is loaded too
uv_in.read(write_file, bls=[(0, 0), (0, 1), (4, 2)])
nt.assert_equal(uv_in.get_antpairs(), [(0, 0), (0, 1), (2, 4)])
exp_uv = full.select(bls=[(0, 0), (0, 1), (4, 2)], inplace=False)
nt.assert_equal(uv_in, exp_uv)
# test all bls w/ 0 are loaded
uv_in.read(write_file, antenna_nums=[0])
diff = set(full.get_antpairs()) - set(uv_in.get_antpairs())
nt.assert_true(0 not in np.unique(diff))
exp_uv = full.select(antenna_nums=[0], inplace=False)
nt.assert_true(np.max(exp_uv.ant_1_array) == 0)
nt.assert_true(np.max(exp_uv.ant_2_array) == 0)
nt.assert_equal(uv_in, exp_uv)
uv_in.read(write_file, antenna_nums=[0], bls=[(2, 4)])
nt.assert_true(np.array([bl in uv_in.get_antpairs() for bl in [(0, 0), (2, 4)]]).all())
exp_uv = full.select(antenna_nums=[0], bls=[(2, 4)], inplace=False)
nt.assert_equal(uv_in, exp_uv)
uv_in.read(write_file, bls=[(2, 4, 'xy')])
nt.assert_true(np.array([bl in uv_in.get_antpairs() for bl in [(2, 4)]]).all())
exp_uv = full.select(bls=[(2, 4, 'xy')], inplace=False)
nt.assert_equal(uv_in, exp_uv)
uv_in.read(write_file, bls=[(4, 2, 'yx')])
nt.assert_true(np.array([bl in uv_in.get_antpairs() for bl in [(2, 4)]]).all())
exp_uv = full.select(bls=[(4, 2, 'yx')], inplace=False)
nt.assert_equal(uv_in, exp_uv)
uv_in.read(write_file, bls=(4, 2, 'yx'))
nt.assert_true(np.array([bl in uv_in.get_antpairs() for bl in [(2, 4)]]).all())
exp_uv = full.select(bls=[(4, 2, 'yx')], inplace=False)
nt.assert_equal(uv_in, exp_uv)
# test time loading
uv_in.read(write_file, time_range=[2456865.607, 2456865.609])
full_times = np.unique(full.time_array[(full.time_array > 2456865.607) & (full.time_array < 2456865.609)])
nt.assert_true(np.isclose(np.unique(uv_in.time_array), full_times).all())
exp_uv = full.select(times=full_times, inplace=False)
nt.assert_equal(uv_in, exp_uv)
# test polarization loading
uv_in.read(write_file, polarizations=['xy'])
nt.assert_equal(full.polarization_array, uv_in.polarization_array)
exp_uv = full.select(polarizations=['xy'], inplace=False)
nt.assert_equal(uv_in, exp_uv)
uv_in.read(write_file, polarizations=[-7])
nt.assert_equal(full.polarization_array, uv_in.polarization_array)
exp_uv = full.select(polarizations=[-7], inplace=False)
nt.assert_equal(uv_in, exp_uv)
# test ant_str
uv_in.read(write_file, ant_str='auto')
nt.assert_true(np.array([blp[0] == blp[1] for blp in uv_in.get_antpairs()]).all())
exp_uv = full.select(ant_str='auto', inplace=False)
nt.assert_equal(uv_in, exp_uv)
uv_in.read(write_file, ant_str='cross')
nt.assert_true(np.array([blp[0] != blp[1] for blp in uv_in.get_antpairs()]).all())
exp_uv = full.select(ant_str='cross', inplace=False)
nt.assert_equal(uv_in, exp_uv)
uv_in.read(write_file, ant_str='all')
nt.assert_equal(uv_in, full)
nt.assert_raises(AssertionError, uv_in.read, write_file, ant_str='auto', antenna_nums=[0, 1])
# assert exceptions
nt.assert_raises(ValueError, uv_in.read, write_file, bls='foo')
nt.assert_raises(ValueError, uv_in.read, write_file, bls=[[0, 1]])
nt.assert_raises(ValueError, uv_in.read, write_file, bls=[('foo', 'bar')])
nt.assert_raises(ValueError, uv_in.read, write_file, bls=[('foo', )])
nt.assert_raises(ValueError, uv_in.read, write_file, bls=[(1, 2), (2, 3, 'xx')])
nt.assert_raises(ValueError, uv_in.read, write_file, bls=[(2, 4, 0)])
nt.assert_raises(ValueError, uv_in.read, write_file, bls=[(2, 4, 'xy')], polarizations=['xy'])
nt.assert_raises(AssertionError, uv_in.read, write_file, antenna_nums=np.array([(0, 10)]))
nt.assert_raises(AssertionError, uv_in.read, write_file, polarizations='xx')
nt.assert_raises((AssertionError, ValueError), uv_in.read, write_file, polarizations=[1.0])
nt.assert_raises(ValueError, uv_in.read, write_file, polarizations=['yy'])
nt.assert_raises(AssertionError, uv_in.read, write_file, time_range='foo')
nt.assert_raises(AssertionError, uv_in.read, write_file, time_range=[1, 2, 3])
nt.assert_raises(AssertionError, uv_in.read, write_file, time_range=['foo', 'bar'])
nt.assert_raises(ValueError, uv_in.read, write_file, time_range=[10.1, 10.2])
nt.assert_raises(AssertionError, uv_in.read, write_file, ant_str=0)
# assert partial-read and select are same
uv_in.read(write_file, polarizations=[-7], bls=[(4, 4)])
exp_uv = full.select(polarizations=[-7], bls=[(4, 4)], inplace=False)
nt.assert_equal(uv_in, exp_uv)
# assert partial-read and select are same
uv_in.read(write_file, bls=[(4, 4, 'xy')])
exp_uv = full.select(bls=[(4, 4, 'xy')], inplace=False)
nt.assert_equal(uv_in, exp_uv)
# assert partial-read and select are same
unique_times = np.unique(full.time_array)
time_range = [2456865.607, 2456865.609]
times_to_keep = unique_times[((unique_times > 2456865.607)
& (unique_times < 2456865.609))]
uv_in.read(write_file, antenna_nums=[0], time_range=time_range)
exp_uv = full.select(antenna_nums=[0], times=times_to_keep, inplace=False)
nt.assert_equal(uv_in, exp_uv)
# assert partial-read and select are same
uv_in.read(write_file, polarizations=[-7], time_range=time_range)
exp_uv = full.select(polarizations=[-7], times=times_to_keep, inplace=False)
nt.assert_equal(uv_in, exp_uv)
# check handling for generic read selections unsupported by read_miriad
uvtest.checkWarnings(uv_in.read, [write_file], {'times': times_to_keep},
message=['Warning: a select on read keyword is set'])
exp_uv = full.select(times=times_to_keep, inplace=False)
nt.assert_equal(uv_in, exp_uv)
# check handling for generic read selections unsupported by read_miriad
blts_select = np.where(full.time_array == unique_times[0])[0]
ants_keep = [0, 2, 4]
uvtest.checkWarnings(uv_in.read, [write_file], {'blt_inds': blts_select, 'antenna_nums': ants_keep},
message=['Warning: blt_inds is set along with select on read'])
exp_uv = full.select(blt_inds=blts_select, antenna_nums=ants_keep, inplace=False)
nt.assert_not_equal(uv_in, exp_uv)
del(uv_in)
del(uv_out)
del(full)
# try metadata only read
uv_in = UVData()
uvtest.checkWarnings(uv_in.read, [testfile], {'read_data': False}, known_warning='miriad')
nt.assert_equal(uv_in.time_array, None)
nt.assert_equal(uv_in.data_array, None)
nt.assert_equal(uv_in.integration_time, None)
metadata = ['antenna_positions', 'antenna_names', 'antenna_positions', 'channel_width',
'history', 'vis_units', 'telescope_location']
for m in metadata:
nt.assert_true(getattr(uv_in, m) is not None)
# test exceptions
# multiple file read-in
uv_in = UVData()
uvtest.checkWarnings(uv_in.read, [testfile], known_warning='miriad')
new_uv = uv_in.select(freq_chans=np.arange(5), inplace=False)
new_uv.write_miriad(write_file, clobber=True)
new_uv = uv_in.select(freq_chans=np.arange(5) + 5, inplace=False)
new_uv.write_miriad(write_file2, clobber=True)
nt.assert_raises(ValueError, uv_in.read, [write_file, write_file2], read_data=False)
# read-in when data already exists
uv_in = UVData()
uvtest.checkWarnings(uv_in.read, [testfile], known_warning='miriad')
nt.assert_raises(ValueError, uv_in.read, testfile, read_data=False)
# test load_telescope_coords w/ blank Miriad
uv_in = Miriad()
uv = aipy_extracts.UV(testfile)
uvtest.checkWarnings(uv_in._load_telescope_coords, [uv], known_warning='miriad')
nt.assert_true(uv_in.telescope_location_lat_lon_alt is not None)
# test load_antpos w/ blank Miriad
uv_in = Miriad()
uv = aipy_extracts.UV(testfile)
uvtest.checkWarnings(uv_in._load_antpos, [uv], known_warning='miriad')
nt.assert_true(uv_in.antenna_positions is not None)
# test that changing precision of integraiton_time is okay
# tolerance of integration_time (1e-3) is larger than floating point type conversions
uv_in = UVData()
uvtest.checkWarnings(uv_in.read, [testfile], known_warning='miriad')
uv_in.integration_time = uv_in.integration_time.astype(np.float32)
uv_in.write_miriad(write_file, clobber=True)
new_uv = UVData()
new_uv.read(write_file)
nt.assert_equal(uv_in, new_uv)
@uvtest.skipIf_no_casa
def test_readMSWriteMiriad_CASAHistory():
"""
read in .ms file.
Write to a miriad file, read back in and check for history parameter
"""
ms_uv = UVData()
miriad_uv = UVData()
ms_file = os.path.join(DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.ms')
testfile = os.path.join(DATA_PATH, 'test/outtest_miriad')
uvtest.checkWarnings(ms_uv.read_ms, [ms_file],
message='Telescope EVLA is not',
nwarnings=0)
ms_uv.write_miriad(testfile, clobber=True)
uvtest.checkWarnings(miriad_uv.read, [testfile],
message='Telescope EVLA is not')
nt.assert_equal(miriad_uv, ms_uv)
def test_rwrMiriad_antpos_issues():
"""
test warnings and errors associated with antenna position issues in Miriad files
Read in Miriad PAPER file, mess with various antpos issues and write out as
a new Miriad file, read back in and check for appropriate behavior.
"""
uv_in = UVData()
uv_out = UVData()
testfile = os.path.join(DATA_PATH, 'zen.2456865.60537.xy.uvcRREAA')
write_file = os.path.join(DATA_PATH, 'test/outtest_miriad.uv')
uvtest.checkWarnings(uv_in.read, [testfile], known_warning='miriad')
uv_in.antenna_positions = None
uvtest.checkWarnings(uv_in.write_miriad, [write_file], {'clobber': True},
message=['antenna_positions are not defined.'],
category=PendingDeprecationWarning)
uvtest.checkWarnings(uv_out.read, [write_file], nwarnings=3,
message=['Antenna positions are not present in the file.',
'Antenna positions are not present in the file.',
'antenna_positions are not defined.'],
category=[UserWarning, UserWarning, PendingDeprecationWarning])
nt.assert_equal(uv_in, uv_out)
uvtest.checkWarnings(uv_in.read, [testfile], known_warning='miriad')
ants_with_data = list(set(uv_in.ant_1_array).union(uv_in.ant_2_array))
ant_ind = np.where(uv_in.antenna_numbers == ants_with_data[0])[0]
uv_in.antenna_positions[ant_ind, :] = [0, 0, 0]
uv_in.write_miriad(write_file, clobber=True, no_antnums=True)
uvtest.checkWarnings(uv_out.read, [write_file], message=['antenna number'])
nt.assert_equal(uv_in, uv_out)
uvtest.checkWarnings(uv_in.read, [testfile], known_warning='miriad')
uv_in.antenna_positions = None
ants_with_data = sorted(list(set(uv_in.ant_1_array).union(uv_in.ant_2_array)))
new_nums = []
new_names = []
for a in ants_with_data:
new_nums.append(a)
ind = np.where(uv_in.antenna_numbers == a)[0][0]
new_names.append(uv_in.antenna_names[ind])
uv_in.antenna_numbers = np.array(new_nums)
uv_in.antenna_names = new_names
uv_in.Nants_telescope = len(uv_in.antenna_numbers)
uvtest.checkWarnings(uv_in.write_miriad, [write_file], {'clobber': True, 'no_antnums': True},
message=['antenna_positions are not defined.'],
category=PendingDeprecationWarning)
uvtest.checkWarnings(uv_out.read, [write_file], nwarnings=3,
message=['Antenna positions are not present in the file.',
'Antenna positions are not present in the file.',
'antenna_positions are not defined.'],
category=[UserWarning, UserWarning, PendingDeprecationWarning])
nt.assert_equal(uv_in, uv_out)
def test_multi_files():
"""
Reading multiple files at once.
"""
uv_full = UVData()
uvfits_file = os.path.join(DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
testfile1 = os.path.join(DATA_PATH, 'test/uv1')
testfile2 = os.path.join(DATA_PATH, 'test/uv2')
uvtest.checkWarnings(uv_full.read_uvfits, [uvfits_file], message='Telescope EVLA is not')
uvtest.checkWarnings(uv_full.unphase_to_drift, category=PendingDeprecationWarning,
message='The xyz array in ENU_from_ECEF is being '
'interpreted as (Npts, 3)')
uv1 = copy.deepcopy(uv_full)
uv2 = copy.deepcopy(uv_full)
uv1.select(freq_chans=np.arange(0, 32))
uv2.select(freq_chans=np.arange(32, 64))
uv1.write_miriad(testfile1, clobber=True)
uv2.write_miriad(testfile2, clobber=True)
uvtest.checkWarnings(uv1.read, [[testfile1, testfile2]], nwarnings=2,
message=['Telescope EVLA is not', 'Telescope EVLA is not'])
# Check history is correct, before replacing and doing a full object check
nt.assert_true(uvutils._check_histories(uv_full.history + ' Downselected to '
'specific frequencies using pyuvdata. '
'Combined data along frequency axis using'
' pyuvdata.', uv1.history))
uv1.history = uv_full.history
nt.assert_equal(uv1, uv_full)
def test_antpos_units():
"""
Read uvfits, write miriad. Check written antpos are in ns.
"""
uv = UVData()
uvfits_file = os.path.join(DATA_PATH, 'day2_TDEM0003_10s_norx_1src_1spw.uvfits')
testfile = os.path.join(DATA_PATH, 'test/uv_antpos_units')
uvtest.checkWarnings(uv.read_uvfits, [uvfits_file], message='Telescope EVLA is not')
uv.write_miriad(testfile, clobber=True)
auv = aipy_extracts.UV(testfile)
aantpos = auv['antpos'].reshape(3, -1).T * const.c.to('m/ns').value
aantpos = aantpos[uv.antenna_numbers, :]
aantpos = (uvutils.ECEF_from_rotECEF(aantpos, uv.telescope_location_lat_lon_alt[1])
- uv.telescope_location)
nt.assert_true(np.allclose(aantpos, uv.antenna_positions))
def test_readMiriadwriteMiraid_check_time_format():
"""
test time_array is converted properly from Miriad format
"""
# test read-in
fname = os.path.join(DATA_PATH, 'zen.2457698.40355.xx.HH.uvcA')
uvd = UVData()
uvd.read(fname)
uvd_t = uvd.time_array.min()
uvd_l = uvd.lst_array.min()
uv = aipy_extracts.UV(fname)
uv_t = uv['time'] + uv['inttime'] / (24 * 3600.) / 2
lat, lon, alt = uvd.telescope_location_lat_lon_alt
t1 = Time(uv['time'], format='jd', location=(lon, lat))
dt = TimeDelta(uv['inttime'] / 2, format='sec')
t2 = t1 + dt
delta_lst = t2.sidereal_time('apparent').radian - t1.sidereal_time('apparent').radian
uv_l = uv['lst'] + delta_lst
# assert starting time array and lst array are shifted by half integration
nt.assert_almost_equal(uvd_t, uv_t)
nt.assert_almost_equal(uvd_l, uv_l, delta=1e-8)
# test write-out
fout = os.path.join(DATA_PATH, 'ex_miriad')
uvd.write_miriad(fout, clobber=True)
# assert equal to original miriad time
uv2 = aipy_extracts.UV(fout)
nt.assert_almost_equal(uv['time'], uv2['time'])
nt.assert_almost_equal(uv['lst'], uv2['lst'])
if os.path.exists(fout):
shutil.rmtree(fout)