# -*- mode: python; coding: utf-8 -*- # Copyright (c) 2018 Radio Astronomy Software Group # Licensed under the 2-clause BSD License """Class for reading and writing UVH5 files.""" import numpy as np import os import warnings import h5py from ast import literal_eval from .uvdata import UVData from .. import utils as uvutils __all__ = ["UVH5"] # define HDF5 type for interpreting HERA correlator outputs (integers) as # complex numbers _hera_corr_dtype = np.dtype([("r", "= 2: if custom_dtype: indices = (blt_inds, freq_inds, pol_inds) _write_complex_astype(data_array, visdata_dset, indices) else: if self.future_array_shapes: visdata_dset[blt_inds, freq_inds, pol_inds] = data_array else: visdata_dset[blt_inds, :, freq_inds, pol_inds] = data_array if self.future_array_shapes: flags_dset[blt_inds, freq_inds, pol_inds] = flag_array nsamples_dset[blt_inds, freq_inds, pol_inds] = nsample_array else: flags_dset[blt_inds, :, freq_inds, pol_inds] = flag_array nsamples_dset[blt_inds, :, freq_inds, pol_inds] = nsample_array elif n_reg_spaced == 1: # figure out which axis is regularly spaced if blt_reg_spaced: for ifreq, freq_idx in enumerate(freq_inds): for ipol, pol_idx in enumerate(pol_inds): if custom_dtype: indices = (blt_inds, freq_idx, pol_idx) if self.future_array_shapes: _write_complex_astype( data_array[:, ifreq, ipol], visdata_dset, indices, ) else: _write_complex_astype( data_array[:, :, ifreq, ipol], visdata_dset, indices, ) else: if self.future_array_shapes: visdata_dset[ blt_inds, freq_idx, pol_idx ] = data_array[:, ifreq, ipol] else: visdata_dset[ blt_inds, :, freq_idx, pol_idx ] = data_array[:, :, ifreq, ipol] if self.future_array_shapes: flags_dset[blt_inds, freq_idx, pol_idx] = flag_array[ :, ifreq, ipol ] nsamples_dset[ blt_inds, freq_idx, pol_idx ] = nsample_array[:, ifreq, ipol] else: flags_dset[blt_inds, :, freq_idx, pol_idx] = flag_array[ :, :, ifreq, ipol ] nsamples_dset[ blt_inds, :, freq_idx, pol_idx ] = nsample_array[:, :, ifreq, ipol] elif freq_reg_spaced: for iblt, blt_idx in enumerate(blt_inds): for ipol, pol_idx in enumerate(pol_inds): if custom_dtype: indices = (blt_idx, freq_inds, pol_idx) if self.future_array_shapes: _write_complex_astype( data_array[iblt, :, ipol], visdata_dset, indices ) else: _write_complex_astype( data_array[iblt, :, :, ipol], visdata_dset, indices, ) else: if self.future_array_shapes: visdata_dset[ blt_idx, freq_inds, pol_idx ] = data_array[iblt, :, ipol] else: visdata_dset[ blt_idx, :, freq_inds, pol_idx ] = data_array[iblt, :, :, ipol] if self.future_array_shapes: flags_dset[blt_idx, freq_inds, pol_idx] = flag_array[ iblt, :, ipol ] nsamples_dset[ blt_idx, freq_inds, pol_idx ] = nsample_array[iblt, :, ipol] else: flags_dset[blt_idx, :, freq_inds, pol_idx] = flag_array[ iblt, :, :, ipol ] nsamples_dset[ blt_idx, :, freq_inds, pol_idx ] = nsample_array[iblt, :, :, ipol] else: # pol_reg_spaced for iblt, blt_idx in enumerate(blt_inds): for ifreq, freq_idx in enumerate(freq_inds): if custom_dtype: indices = (blt_idx, freq_idx, pol_inds) if self.future_array_shapes: _write_complex_astype( data_array[iblt, ifreq, :], visdata_dset, indices, ) else: _write_complex_astype( data_array[iblt, :, ifreq, :], visdata_dset, indices, ) else: if self.future_array_shapes: visdata_dset[ blt_idx, freq_idx, pol_inds ] = data_array[iblt, ifreq, :] else: visdata_dset[ blt_idx, :, freq_idx, pol_inds ] = data_array[iblt, :, ifreq, :] if self.future_array_shapes: flags_dset[blt_idx, freq_idx, pol_inds] = flag_array[ iblt, ifreq, : ] nsamples_dset[ blt_idx, freq_idx, pol_inds ] = nsample_array[iblt, ifreq, :] else: flags_dset[blt_idx, :, freq_idx, pol_inds] = flag_array[ iblt, :, ifreq, : ] nsamples_dset[ blt_idx, :, freq_idx, pol_inds ] = nsample_array[iblt, :, ifreq, :] else: # all axes irregularly spaced # perform a triple loop -- probably very slow! for iblt, blt_idx in enumerate(blt_inds): for ifreq, freq_idx in enumerate(freq_inds): for ipol, pol_idx in enumerate(pol_inds): if custom_dtype: indices = (blt_idx, freq_idx, pol_idx) if self.future_array_shapes: _write_complex_astype( data_array[iblt, ifreq, ipol], visdata_dset, indices, ) else: _write_complex_astype( data_array[iblt, :, ifreq, ipol], visdata_dset, indices, ) else: if self.future_array_shapes: visdata_dset[ blt_idx, freq_idx, pol_idx ] = data_array[iblt, ifreq, ipol] else: visdata_dset[ blt_idx, :, freq_idx, pol_idx ] = data_array[iblt, :, ifreq, ipol] if self.future_array_shapes: flags_dset[blt_idx, freq_idx, pol_idx] = flag_array[ iblt, ifreq, ipol ] nsamples_dset[ blt_idx, freq_idx, pol_idx ] = nsample_array[iblt, ifreq, ipol] else: flags_dset[blt_idx, :, freq_idx, pol_idx] = flag_array[ iblt, :, ifreq, ipol ] nsamples_dset[ blt_idx, :, freq_idx, pol_idx ] = nsample_array[iblt, :, ifreq, ipol] # append to history if desired if add_to_history is not None: history = np.string_(self.history) + np.string_(add_to_history) if "history" in f["Header"]: # erase dataset first b/c it has fixed-length string datatype del f["Header"]["history"] f["Header"]["history"] = np.string_(history) return