# -*- mode: python; coding: utf-8 -*- # Copyright (c) 2018 Radio Astronomy Software Group # Licensed under the 2-clause BSD License """Class for reading and writing UVH5 files.""" import numpy as np import os import warnings import h5py import json from .uvdata import UVData from .. import utils as uvutils __all__ = ["UVH5"] # define HDF5 type for interpreting HERA correlator outputs (integers) as # complex numbers _hera_corr_dtype = np.dtype([("r", "= 2: if custom_dtype: indices = (blt_inds, freq_inds, pol_inds) _write_complex_astype(data_array, visdata_dset, indices) else: if self.future_array_shapes: visdata_dset[blt_inds, freq_inds, pol_inds] = data_array else: visdata_dset[blt_inds, :, freq_inds, pol_inds] = data_array if self.future_array_shapes: flags_dset[blt_inds, freq_inds, pol_inds] = flag_array nsamples_dset[blt_inds, freq_inds, pol_inds] = nsample_array else: flags_dset[blt_inds, :, freq_inds, pol_inds] = flag_array nsamples_dset[blt_inds, :, freq_inds, pol_inds] = nsample_array elif n_reg_spaced == 1: # figure out which axis is regularly spaced if blt_reg_spaced: for ifreq, freq_idx in enumerate(freq_inds): for ipol, pol_idx in enumerate(pol_inds): if custom_dtype: indices = (blt_inds, freq_idx, pol_idx) if self.future_array_shapes: _write_complex_astype( data_array[:, ifreq, ipol], visdata_dset, indices, ) else: _write_complex_astype( data_array[:, :, ifreq, ipol], visdata_dset, indices, ) else: if self.future_array_shapes: visdata_dset[ blt_inds, freq_idx, pol_idx ] = data_array[:, ifreq, ipol] else: visdata_dset[ blt_inds, :, freq_idx, pol_idx ] = data_array[:, :, ifreq, ipol] if self.future_array_shapes: flags_dset[blt_inds, freq_idx, pol_idx] = flag_array[ :, ifreq, ipol ] nsamples_dset[ blt_inds, freq_idx, pol_idx ] = nsample_array[:, ifreq, ipol] else: flags_dset[blt_inds, :, freq_idx, pol_idx] = flag_array[ :, :, ifreq, ipol ] nsamples_dset[ blt_inds, :, freq_idx, pol_idx ] = nsample_array[:, :, ifreq, ipol] elif freq_reg_spaced: for iblt, blt_idx in enumerate(blt_inds): for ipol, pol_idx in enumerate(pol_inds): if custom_dtype: indices = (blt_idx, freq_inds, pol_idx) if self.future_array_shapes: _write_complex_astype( data_array[iblt, :, ipol], visdata_dset, indices ) else: _write_complex_astype( data_array[iblt, :, :, ipol], visdata_dset, indices, ) else: if self.future_array_shapes: visdata_dset[ blt_idx, freq_inds, pol_idx ] = data_array[iblt, :, ipol] else: visdata_dset[ blt_idx, :, freq_inds, pol_idx ] = data_array[iblt, :, :, ipol] if self.future_array_shapes: flags_dset[blt_idx, freq_inds, pol_idx] = flag_array[ iblt, :, ipol ] nsamples_dset[ blt_idx, freq_inds, pol_idx ] = nsample_array[iblt, :, ipol] else: flags_dset[blt_idx, :, freq_inds, pol_idx] = flag_array[ iblt, :, :, ipol ] nsamples_dset[ blt_idx, :, freq_inds, pol_idx ] = nsample_array[iblt, :, :, ipol] else: # pol_reg_spaced for iblt, blt_idx in enumerate(blt_inds): for ifreq, freq_idx in enumerate(freq_inds): if custom_dtype: indices = (blt_idx, freq_idx, pol_inds) if self.future_array_shapes: _write_complex_astype( data_array[iblt, ifreq, :], visdata_dset, indices, ) else: _write_complex_astype( data_array[iblt, :, ifreq, :], visdata_dset, indices, ) else: if self.future_array_shapes: visdata_dset[ blt_idx, freq_idx, pol_inds ] = data_array[iblt, ifreq, :] else: visdata_dset[ blt_idx, :, freq_idx, pol_inds ] = data_array[iblt, :, ifreq, :] if self.future_array_shapes: flags_dset[blt_idx, freq_idx, pol_inds] = flag_array[ iblt, ifreq, : ] nsamples_dset[ blt_idx, freq_idx, pol_inds ] = nsample_array[iblt, ifreq, :] else: flags_dset[blt_idx, :, freq_idx, pol_inds] = flag_array[ iblt, :, ifreq, : ] nsamples_dset[ blt_idx, :, freq_idx, pol_inds ] = nsample_array[iblt, :, ifreq, :] else: # all axes irregularly spaced # perform a triple loop -- probably very slow! for iblt, blt_idx in enumerate(blt_inds): for ifreq, freq_idx in enumerate(freq_inds): for ipol, pol_idx in enumerate(pol_inds): if custom_dtype: indices = (blt_idx, freq_idx, pol_idx) if self.future_array_shapes: _write_complex_astype( data_array[iblt, ifreq, ipol], visdata_dset, indices, ) else: _write_complex_astype( data_array[iblt, :, ifreq, ipol], visdata_dset, indices, ) else: if self.future_array_shapes: visdata_dset[ blt_idx, freq_idx, pol_idx ] = data_array[iblt, ifreq, ipol] else: visdata_dset[ blt_idx, :, freq_idx, pol_idx ] = data_array[iblt, :, ifreq, ipol] if self.future_array_shapes: flags_dset[blt_idx, freq_idx, pol_idx] = flag_array[ iblt, ifreq, ipol ] nsamples_dset[ blt_idx, freq_idx, pol_idx ] = nsample_array[iblt, ifreq, ipol] else: flags_dset[blt_idx, :, freq_idx, pol_idx] = flag_array[ iblt, :, ifreq, ipol ] nsamples_dset[ blt_idx, :, freq_idx, pol_idx ] = nsample_array[iblt, :, ifreq, ipol] # append to history if desired if add_to_history is not None: history = np.string_(self.history) + np.string_(add_to_history) if "history" in f["Header"]: # erase dataset first b/c it has fixed-length string datatype del f["Header"]["history"] f["Header"]["history"] = np.string_(history) return