https://github.com/RadioAstronomySoftwareGroup/pyuvdata
Tip revision: c2cd6c296ccb7aa31f3333b27193baf5f547682a authored by Nicholas Kern on 04 June 2018, 15:34:20 UTC
moved combine_uvdata to uvdata.py and updated tests
moved combine_uvdata to uvdata.py and updated tests
Tip revision: c2cd6c2
uvh5.py
# -*- mode: python; coding: utf-8 -*-
# Copyright (c) 2018 The HERA Collaboration
# Licensed under the 2-clause BSD License
"""Class for reading and writing HDF5 files."""
import numpy as np
import os
from uvdata import UVData
import utils as uvutils
class UVH5(UVData):
"""
Defines an HDF5-specific subclass of UVData for reading and writing uvh5 files.
This class should not be interacted with directly, instead use the read_uvh5
and write_uvh5 methods on the UVData class.
"""
def read_uvh5(self, filename, run_check=True,
check_extra=True, run_check_acceptability=True):
"""
Read in data from a UVH5 file.
Args:
filename: The file name to read.
run_check: Option to check for the existence and proper shapes of
parameters after reading in the file. Default is True.
check_extra: Option to check optional parameters as well as required
ones. Default is True.
run_check_acceptability: Option to check acceptable range of the values of
parameters after reading in the file. Default is True.
Returns:
None
"""
import h5py
if not os.path.exists(filename):
raise(IOError, filename + ' not found')
# open hdf5 file for reading
f = h5py.File(filename, 'r')
# extract header information
header = f['/Header']
# get telescope information
latitude = header['latitude'].value
longitude = header['longitude'].value
altitude = header['altitude'].value
self.telescope_location_lat_lon_alt = (latitude, longitude, altitude)
self.instrument = header['instrument'].value
# get source information
self.object_name = header['object_name'].value
# set history appropriately
self.history = header['history'].value
if not uvutils.check_history_version(self.history, self.pyuvdata_version_str):
self.history += self.pyuvdata_version_str
# check for vis_units
if 'vis_units' in header:
self.vis_units = header['vis_units'].value
else:
# default to uncalibrated data
self.vis_units = 'UNCALIB'
# check for optional values
if 'dut1' in header:
self.dut1 = float(header['dut1'].value)
if 'earth_omega' in header:
self.earth_omega = float(header['earth_omega'].value)
if 'gst0' in header:
self.gst0 = float(header['gst0'].value)
if 'rdate' in header:
self.rdate = header['rdate'].value
if 'timesys' in header:
self.timesys = header['timesys'].value
if 'x_orientation' in header:
self.x_orientation = header['x_orientation'].value
if 'telescope_name' in header:
self.telescope_name = header['telescope_name'].value
if 'antenna_positions' in header:
self.antenna_positions = header['antenna_positions'].value
if 'antenna_diameters' in header:
self.antenna_diameters = header['antenna_diameters'].value
if 'uvplane_reference_time' in header:
self.uvplane_reference_time = int(header['uvplane_reference_time'].value)
# check for phasing information
self.phase_type = header['phase_type'].value
if self.phase_type == 'phased':
self.set_phased()
self.phase_center_ra = float(header['phase_center_ra'].value)
self.phase_center_dec = float(header['phase_center_dec'].value)
self.phase_center_epoch = float(header['phase_center_epoch'].value)
elif self.phase_type == 'drift':
self.set_drift()
self.zenith_dec = header['zenith_dec'].value
self.zenith_ra = header['zenith_ra'].value
else:
self.set_unknown_phase_type()
# get antenna arrays
# cast to native python int type
self.Nants_data = int(header['Nants_data'].value)
self.Nants_telescope = int(header['Nants_telescope'].value)
self.ant_1_array = header['ant_1_array'].value
self.ant_2_array = header['ant_2_array'].value
self.antenna_names = list(header['antenna_names'].value)
self.antenna_numbers = header['antenna_numbers'].value
# get baseline array
self.baseline_array = self.antnums_to_baseline(self.ant_1_array,
self.ant_2_array)
self.Nbls = len(np.unique(self.baseline_array))
# get uvw array
self.uvw_array = header['uvw_array'].value
# get time information
self.time_array = header['time_array'].value
self.integration_time = float(header['integration_time'].value)
self.lst_array = header['lst_array'].value
# get frequency information
self.freq_array = header['freq_array'].value
self.channel_width = float(header['channel_width'].value)
self.spw_array = header['spw_array'].value
# get polarization information
self.polarization_array = header['polarization_array'].value
# get data shapes
self.Nfreqs = int(header['Nfreqs'].value)
self.Npols = int(header['Npols'].value)
self.Ntimes = int(header['Ntimes'].value)
self.Nblts = int(header['Nblts'].value)
self.Nspws = int(header['Nspws'].value)
# get extra_keywords
if "extra_keywords" in header:
self.extra_keywords = {}
for key in header["extra_keywords"].keys():
self.extra_keywords[key] = header["extra_keywords"][key].value
# read data array
dgrp = f['/Data']
self.data_array = dgrp['visdata'].value
# read the flag array
self.flag_array = dgrp['flags'].value
# get sample information
self.nsample_array = dgrp['nsample_array'].value
# check if the object has all required UVParameters
if run_check:
self.check(check_extra=check_extra,
run_check_acceptability=run_check_acceptability)
return
def write_uvh5(self, filename, run_check=True, check_extra=True,
run_check_acceptability=True, clobber=False):
"""
Write a UVData object to a UVH5 file.
Args:
filename: The UVH5 file to write to.
run_check: Option to check for the existence and proper shapes of
parameters before writing the file. Default is True.
check_extra: Option to check optional parameters as well as required
ones. Default is True.
run_check_acceptability: Option to check acceptable range of the values of
parameters before writing the file. Default is True.
clobber: Option to overwrite the file if it already exists. Default is False.
Returns:
None
"""
import h5py
if run_check:
self.check(check_extra=check_extra,
run_check_acceptability=run_check_acceptability)
if os.path.exists(filename):
if clobber:
print "File exists; clobbering"
else:
raise ValueError("File exists; skipping")
f = h5py.File(filename, 'w')
header = f.create_group("Header")
# write out telescope and source information
header['latitude'] = self.telescope_location_lat_lon_alt[0]
header['longitude'] = self.telescope_location_lat_lon_alt[1]
header['altitude'] = self.telescope_location_lat_lon_alt[2]
header['telescope_name'] = self.telescope_name
header['instrument'] = self.instrument
header['object_name'] = self.object_name
# write out required UVParameters
header['Nants_data'] = self.Nants_data
header['Nants_telescope'] = self.Nants_telescope
header['Nbls'] = self.Nbls
header['Nblts'] = self.Nblts
header['Nfreqs'] = self.Nfreqs
header['Npols'] = self.Npols
header['Nspws'] = self.Nspws
header['Ntimes'] = self.Ntimes
header['antenna_names'] = self.antenna_names
header['antenna_numbers'] = self.antenna_numbers
header['uvw_array'] = self.uvw_array
header['vis_units'] = self.vis_units
header['channel_width'] = self.channel_width
header['time_array'] = self.time_array
header['freq_array'] = self.freq_array
header['integration_time'] = self.integration_time
header['lst_array'] = self.lst_array
header['polarization_array'] = self.polarization_array
header['spw_array'] = self.spw_array
header['ant_1_array'] = self.ant_1_array
header['ant_2_array'] = self.ant_2_array
# write out phasing information
header['phase_type'] = self.phase_type
if self.zenith_dec is not None:
header['zenith_dec'] = self.zenith_dec
if self.zenith_ra is not None:
header['zenith_ra'] = self.zenith_ra
if self.phase_center_ra is not None:
header['phase_center_ra'] = self.phase_center_ra
if self.phase_center_dec is not None:
header['phase_center_dec'] = self.phase_center_dec
if self.phase_center_epoch is not None:
header['phase_center_epoch'] = self.phase_center_epoch
# write out optional parameters
if self.antenna_positions is not None:
header['antenna_positions'] = self.antenna_positions
if self.dut1 is not None:
header['dut1'] = self.dut1
if self.earth_omega is not None:
header['earth_omega'] = self.earth_omega
if self.gst0 is not None:
header['gst0'] = self.gst0
if self.rdate is not None:
header['rdate'] = self.rdate
if self.timesys is not None:
header['timesys'] = self.timesys
if self.x_orientation is not None:
header['x_orientation'] = self.x_orientation
if self.antenna_diameters is not None:
header['antenna_diameters'] = self.antenna_diameters
if self.uvplane_reference_time is not None:
header['uvplane_reference_time'] = self.uvplane_reference_time
# write out extra keywords if it exists and has elements
if self.extra_keywords:
extra_keywords = header.create_group("extra_keywords")
for k in self.extra_keywords.keys():
extra_keywords[k] = self.extra_keywords[k]
# write out history
header['history'] = self.history
# write out data and flags
# TODO: add filter options for visdata (bitshuffle), flags, and
# nsample_array (lzf compression)
dgrp = f.create_group("Data")
visdata = dgrp.create_dataset("visdata", chunks=True,
data=self.data_array.astype(np.complex64))
flags = dgrp.create_dataset("flags", chunks=True,
data=self.flag_array)
nsample_array = dgrp.create_dataset("nsample_array", chunks=True,
data=self.nsample_array.astype(np.float32))
return