# -*- coding: utf-8 -*-
"""Class for reading and writing Miriad files."""
from __future__ import absolute_import, division, print_function

from astropy import constants as const
import os
import shutil
import numpy as np
import copy
import six
import warnings

from .uvdata import UVData
from . import telescopes as uvtel
from . import utils as uvutils
import itertools
from . import aipy_extracts


class Miriad(UVData):
    """
    Defines a Miriad-specific subclass of UVData for reading and writing Miriad files.

    This class should not be interacted with directly; instead use the
    read_miriad and write_miriad methods on the UVData class.
    """

    def _pol_to_ind(self, pol):
        if self.polarization_array is None:
            raise ValueError("Can't index polarization {p} because "
                             "polarization_array is not set".format(p=pol))
        pol_ind = np.argwhere(self.polarization_array == pol)
        if len(pol_ind) != 1:
            raise ValueError("multiple matches for pol={pol} in "
                             "polarization_array".format(pol=pol))
        return pol_ind

    def read_miriad(self, filepath, antenna_nums=None, ant_str=None, bls=None,
                    polarizations=None, time_range=None, read_data=True,
                    phase_type=None, correct_lat_lon=True, run_check=True,
                    check_extra=True, run_check_acceptability=True):
        """
        Read in data from a miriad file.

        Args:
            filepath: The miriad file directory to read from.
            antenna_nums: The antenna numbers to read into the object.
            bls: A list of antenna number tuples (e.g. [(0,1), (3,2)]) or a
                list of baseline 3-tuples (e.g. [(0,1,'xx'), (2,3,'yy')])
                specifying baselines to keep in the object. For length-2
                tuples, the ordering of the numbers within the tuple does not
                matter. For length-3 tuples, the polarization string is in the
                order of the two antennas. If length-3 tuples are provided,
                the polarizations argument below must be None.
            ant_str: A string containing information about what kinds of
                visibility data to read in. Can be 'auto', 'cross' or 'all'.
                Cannot provide ant_str if antenna_nums and/or bls is not None.
            polarizations: List of polarization integers or strings to read in.
                Ex: ['xx', 'yy', ...]
            time_range: len-2 list containing min and max range of times
                (Julian Date) to read in. Ex: [2458115.20, 2458115.40]
            read_data: Read in the visibility and flag data. If set to False,
                only the basic header info and metadata will be read in.
                Results in an incompletely defined object (check will not
                pass). Default True.
            phase_type: Either 'drift' meaning zenith drift, 'phased' meaning
                the data are phased to a single RA/Dec, or None in which case
                it will be guessed at based on the file. Default None.
            correct_lat_lon: flag -- that only matters if altitude is missing --
                to update the latitude and longitude from the known_telescopes list.
            run_check: Option to check for the existence and proper shapes of
                parameters after reading in the file. Default is True.
            check_extra: Option to check optional parameters as well as
                required ones. Default is True.
            run_check_acceptability: Option to check acceptable range of the
                values of parameters after reading in the file. Default is True.
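
        Example:
            A minimal usage sketch (the directory name here is hypothetical).
            As noted in the class docstring, reading is normally done through
            the UVData class rather than by using this class directly::

                from pyuvdata import UVData
                uv = UVData()
                uv.read_miriad('path/to/data.uv', ant_str='cross',
                               time_range=[2458115.20, 2458115.40])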
""" if not os.path.exists(filepath): raise IOError(filepath + ' not found') uv = aipy_extracts.UV(filepath) # load metadata (default_miriad_variables, other_miriad_variables, extra_miriad_variables, check_variables) = self.read_miriad_metadata(uv, correct_lat_lon=correct_lat_lon) # read through the file and get the data _source = uv['source'] # check source of initial visibility history_update_string = ' Downselected to specific ' n_selects = 0 # select on ant_str if provided if ant_str is not None: # type check assert isinstance(ant_str, (str, np.str)), "ant_str must be fed as a string" assert antenna_nums is None and bls is None, "ant_str must be None if antenna_nums or bls is not None" aipy_extracts.uv_selector(uv, ant_str) if ant_str != 'all': history_update_string += 'antenna pairs' n_selects += 1 # select on antenna_nums and/or bls using aipy.uv_selector if antenna_nums is not None or bls is not None: antpair_str = '' if antenna_nums is not None: # type check err_msg = "antenna_nums must be fed as a list of antenna number integers" assert isinstance(antenna_nums, (np.ndarray, list)), err_msg assert isinstance(antenna_nums[0], (int, np.integer)), err_msg # get all possible combinations antpairs = list(itertools.combinations_with_replacement(antenna_nums, 2)) # convert antenna numbers to string form required by aipy_extracts.uv_selector antpair_str += ','.join(['_'.join([str(a) for a in ap]) for ap in antpairs]) history_update_string += 'antennas' n_selects += 1 if bls is not None: if isinstance(bls, tuple) and (len(bls) == 2 or len(bls) == 3): bls = [bls] if not all(isinstance(item, tuple) for item in bls): raise ValueError( 'bls must be a list of tuples of antenna numbers (optionally with polarization).') if all([len(item) == 2 for item in bls]): if not all([isinstance(item[0], six.integer_types + (np.integer,)) for item in bls] + [isinstance(item[1], six.integer_types + (np.integer,)) for item in bls]): raise ValueError( 'bls must be a list of tuples of antenna numbers (optionally with polarization).') elif all([len(item) == 3 for item in bls]): if polarizations is not None: raise ValueError('Cannot provide length-3 tuples and also specify polarizations.') if not all([isinstance(item[2], str) for item in bls]): raise ValueError('The third element in each bl must be a polarization string') else: raise ValueError('bls tuples must be all length-2 or all length-3') # convert ant-pair tuples to string form required by aipy_extracts.uv_selector if len(antpair_str) > 0: antpair_str += ',' bl_str_list = [] bl_pols = set() for bl in bls: if bl[0] <= bl[1]: bl_str_list.append(str(bl[0]) + '_' + str(bl[1])) if len(bl) == 3: bl_pols.add(bl[2]) else: bl_str_list.append(str(bl[1]) + '_' + str(bl[0])) if len(bl) == 3: bl_pols.add(bl[2][::-1]) antpair_str += ','.join(bl_str_list) if len(bl_pols) > 0: polarizations = list(bl_pols) if n_selects > 0: history_update_string += ', baselines' else: history_update_string += 'baselines' n_selects += 1 aipy_extracts.uv_selector(uv, antpair_str) # select on time range if time_range is not None: # type check err_msg = "time_range must be a len-2 list of Julian Date floats, Ex: [2458115.2, 2458115.6]" assert isinstance(time_range, (list, np.ndarray)), err_msg assert len(time_range) == 2, err_msg assert np.array([isinstance(t, (float, np.float, np.float64)) for t in time_range]).all(), err_msg uv.select('time', time_range[0], time_range[1], include=True) if n_selects > 0: history_update_string += ', times' else: history_update_string += 'times' n_selects += 1 # 
        # select on polarizations
        if polarizations is not None:
            # type check
            err_msg = "pols must be a list of polarization strings or ints, Ex: ['xx', ...] or [-5, ...]"
            assert isinstance(polarizations, (list, np.ndarray)), err_msg
            assert np.array([isinstance(p, (str, np.str, int, np.integer))
                             for p in polarizations]).all(), err_msg
            # convert to pol integer if string
            polarizations = [p if isinstance(p, (int, np.integer)) else uvutils.polstr2num(p)
                             for p in polarizations]
            # iterate through all possible pols and reject if not in pols
            pol_list = []
            for p in np.arange(-8, 5):
                if p not in polarizations:
                    uv.select('polarization', p, p, include=False)
                else:
                    pol_list.append(p)
            # assert not empty
            assert len(pol_list) > 0, "No polarizations in data matched {}".format(polarizations)
            if n_selects > 0:
                history_update_string += ', polarizations'
            else:
                history_update_string += 'polarizations'
            n_selects += 1

        history_update_string += ' using pyuvdata.'
        if n_selects > 0:
            self.history += history_update_string

        data_accumulator = {}
        pol_list = []
        for (uvw, t, (i, j)), d, f in uv.all(raw=True):
            # control for the case of only a single spw not showing up in
            # the dimension
            # Note that the (i, j) tuple is calculated from a baseline number in
            # _miriad (see miriad_wrap.h). The i, j values are also adjusted by
            # _miriad to start at 0 rather than 1.
            if len(d.shape) == 1:
                d.shape = (1,) + d.shape
                self.Nspws = d.shape[0]
                self.spw_array = np.arange(self.Nspws)
            else:
                raise ValueError("Sorry. Files with more than one spectral "
                                 "window (spw) are not yet supported. A great "
                                 "project for the interested student!")
            try:
                cnt = uv['cnt']
            except(KeyError):
                cnt = np.ones(d.shape, dtype=np.float64)
            ra = uv['ra']
            dec = uv['dec']
            lst = uv['lst']
            source = uv['source']
            if source != _source:
                raise ValueError('This appears to be a multi source file, which is not supported.')
            else:
                _source = source

            # check extra variables for changes compared with initial value
            # (iterate over a copy of the keys so entries can be popped safely)
            for extra_variable in list(check_variables.keys()):
                if type(check_variables[extra_variable]) == str:
                    if uv[extra_variable] != check_variables[extra_variable]:
                        check_variables.pop(extra_variable)
                else:
                    if not np.allclose(uv[extra_variable], check_variables[extra_variable]):
                        check_variables.pop(extra_variable)

            try:
                data_accumulator[uv['pol']].append([uvw, t, i, j, d, f, cnt, ra, dec])
            except(KeyError):
                data_accumulator[uv['pol']] = [[uvw, t, i, j, d, f, cnt, ra, dec]]
                pol_list.append(uv['pol'])
                # NB: flag types in miriad are usually ints

        if len(list(data_accumulator.keys())) == 0:
            raise ValueError('No data is present, probably as a result of '
                             'select on read that excludes all the data')

        for pol, data in data_accumulator.items():
            data_accumulator[pol] = np.array(data)

        self.polarization_array = np.array(pol_list)
        if polarizations is None:
            # A select on read would make the header npols not match the pols in the data
            if len(self.polarization_array) != self.Npols:
                warnings.warn('npols={npols} but found {n} pols in data file'.format(
                    npols=self.Npols, n=len(self.polarization_array)))
        self.Npols = len(pol_list)

        # makes a data_array (and flag_array) of zeroes to be filled in by
        # data values
        # any missing data will have zeros

        # use set to get the unique list of all times ever listed in the file
        # iterate over polarizations and all spectra (bls and times) in two
        # nested loops, then flatten into a single vector, then set
        # then list again.
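        # A small illustration (hypothetical values, not executed here) of the
        # flatten -> set -> sort pattern described above:
        #
        #     data_accumulator = {-5: [[uvw, 2458115.2, 0, 1, ...], ...],
        #                         -6: [[uvw, 2458115.2, 0, 1, ...], ...]}
        #     # np.concatenate([[k[1] for k in d] for d in data_accumulator.values()])
        #     # gathers the times from every pol; set() keeps the unique ones and
        #     # np.sort orders them.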
times = list(set( np.concatenate([[k[1] for k in d] for d in data_accumulator.values()]))) times = np.sort(times) ant_i_unique = list(set( np.concatenate([[k[2] for k in d] for d in data_accumulator.values()]))) ant_j_unique = list(set( np.concatenate([[k[3] for k in d] for d in data_accumulator.values()]))) sorted_unique_ants = sorted(list(set(ant_i_unique + ant_j_unique))) ant_i_unique = np.array(ant_i_unique) ant_j_unique = np.array(ant_j_unique) # Determine maximum digits needed to distinguish different values if sorted_unique_ants[-1] > 0: ndig_ant = np.ceil(np.log10(sorted_unique_ants[-1])).astype(int) + 1 else: ndig_ant = 1 # Be excessive in precision because we use the floating point values as dictionary keys later prec_t = - 2 * np.floor(np.log10(self._time_array.tols[-1])).astype(int) ndig_t = (np.ceil(np.log10(times[-1])).astype(int) + prec_t + 2) blts = [] for d in data_accumulator.values(): for k in d: blt = ["{1:.{0}f}".format(prec_t, k[1]).zfill(ndig_t), str(k[2]).zfill(ndig_ant), str(k[3]).zfill(ndig_ant)] blt = "_".join(blt) blts.append(blt) unique_blts = np.unique(np.array(blts)) reverse_inds = dict(zip(unique_blts, range(len(unique_blts)))) self.Nants_data = len(sorted_unique_ants) # load antennas and antenna positions using sorted unique ants list self._load_antpos(uv, sorted_unique_ants=sorted_unique_ants) # form up a grid which indexes time and baselines along the 'long' # axis of the visdata array tij_grid = np.array([list(map(float, x.split("_"))) for x in unique_blts]) t_grid, ant_i_grid, ant_j_grid = tij_grid.T # set the data sizes if antenna_nums is None and bls is None and ant_str is None and time_range is None: try: self.Nblts = uv['nblts'] if self.Nblts != len(t_grid): warnings.warn('Nblts does not match the number of unique blts in the data') self.Nblts = len(t_grid) except(KeyError): self.Nblts = len(t_grid) else: # The select on read will make the header nblts not match the number of unique blts self.Nblts = len(t_grid) if time_range is None: try: self.Ntimes = uv['ntimes'] if self.Ntimes != len(times): warnings.warn('Ntimes does not match the number of unique times in the data') self.Ntimes = len(times) except(KeyError): self.Ntimes = len(times) else: # The select on read will make the header ntimes not match the number of unique times self.Ntimes = len(times) self.time_array = t_grid self.ant_1_array = ant_i_grid.astype(int) self.ant_2_array = ant_j_grid.astype(int) self.baseline_array = self.antnums_to_baseline(ant_i_grid.astype(int), ant_j_grid.astype(int)) if antenna_nums is None and bls is None and ant_str is None: try: self.Nbls = uv['nbls'] if self.Nbls != len(np.unique(self.baseline_array)): warnings.warn('Nbls does not match the number of unique baselines in the data') self.Nbls = len(np.unique(self.baseline_array)) except(KeyError): self.Nbls = len(np.unique(self.baseline_array)) else: # The select on read will make the header nbls not match the number of unique bls self.Nbls = len(np.unique(self.baseline_array)) # slot the data into a grid self.data_array = np.zeros((self.Nblts, self.Nspws, self.Nfreqs, self.Npols), dtype=np.complex64) self.flag_array = np.ones(self.data_array.shape, dtype=np.bool) self.uvw_array = np.zeros((self.Nblts, 3)) # NOTE: Using our lst calculator, which uses astropy, # instead of _miriad values which come from pyephem. # The differences are of order 5 seconds. 
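        # Roughly what the LST recomputation amounts to (a sketch using astropy
        # directly; the exact helper pyuvdata calls may differ). Not executed here:
        #
        #     from astropy.time import Time
        #     from astropy.coordinates import EarthLocation
        #     from astropy import units as u
        #     loc = EarthLocation.from_geocentric(*(self.telescope_location * u.m))
        #     lsts = Time(self.time_array, format='jd',
        #                 location=loc).sidereal_time('apparent').radian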
        if self.telescope_location is not None:
            self.set_lsts_from_time_array()

        self.nsample_array = np.ones(self.data_array.shape, dtype=np.float64)
        self.freq_array = (np.arange(self.Nfreqs) * self.channel_width
                           + uv['sfreq'] * 1e9)
        # Tile freq_array to shape (Nspws, Nfreqs).
        # Currently does not actually support Nspws > 1!
        self.freq_array = np.tile(self.freq_array, (self.Nspws, 1))

        # Temporary arrays to hold polarization axis, which will be collapsed
        ra_pol_list = np.zeros((self.Nblts, self.Npols))
        dec_pol_list = np.zeros((self.Nblts, self.Npols))
        uvw_pol_list = np.zeros((self.Nblts, 3, self.Npols))
        c_ns = const.c.to('m/ns').value
        for pol, data in data_accumulator.items():
            pol_ind = self._pol_to_ind(pol)
            for ind, d in enumerate(data):
                blt = ["{1:.{0}f}".format(prec_t, d[1]).zfill(ndig_t),
                       str(d[2]).zfill(ndig_ant), str(d[3]).zfill(ndig_ant)]
                blt = "_".join(blt)
                blt_index = reverse_inds[blt]
                self.data_array[blt_index, :, :, pol_ind] = d[4]
                self.flag_array[blt_index, :, :, pol_ind] = d[5]
                self.nsample_array[blt_index, :, :, pol_ind] = d[6]
                # because there are uvws/ra/dec for each pol, and one pol may not
                # have that visibility, we collapse along the polarization
                # axis but avoid any missing visibilities
                uvw = d[0] * c_ns
                uvw.shape = (1, 3)
                uvw_pol_list[blt_index, :, pol_ind] = uvw
                ra_pol_list[blt_index, pol_ind] = d[7]
                dec_pol_list[blt_index, pol_ind] = d[8]

        # Collapse pol axis for ra_list, dec_list, and uvw_list
        ra_list = np.zeros(self.Nblts)
        dec_list = np.zeros(self.Nblts)
        for blt_index in range(self.Nblts):
            test = ~np.all(self.flag_array[blt_index, :, :, :], axis=(0, 1))
            good_pol = np.where(test)[0]
            if len(good_pol) == 1:
                # Only one good pol, use it
                self.uvw_array[blt_index, :] = uvw_pol_list[blt_index, :, good_pol]
                ra_list[blt_index] = ra_pol_list[blt_index, good_pol]
                dec_list[blt_index] = dec_pol_list[blt_index, good_pol]
            elif len(good_pol) > 1:
                # Multiple good pols, check for consistency. pyuvdata does not
                # support pol-dependent uvw, ra, or dec.
                if np.any(np.diff(uvw_pol_list[blt_index, :, good_pol], axis=0)):
                    raise ValueError('uvw values are different by polarization.')
                else:
                    self.uvw_array[blt_index, :] = uvw_pol_list[blt_index, :, good_pol[0]]
                if np.any(np.diff(ra_pol_list[blt_index, good_pol])):
                    raise ValueError('ra values are different by polarization.')
                else:
                    ra_list[blt_index] = ra_pol_list[blt_index, good_pol[0]]
                if np.any(np.diff(dec_pol_list[blt_index, good_pol])):
                    raise ValueError('dec values are different by polarization.')
                else:
                    dec_list[blt_index] = dec_pol_list[blt_index, good_pol[0]]
            else:
                # No good pols for this blt. Fill with first one.
                self.uvw_array[blt_index, :] = uvw_pol_list[blt_index, :, 0]
                ra_list[blt_index] = ra_pol_list[blt_index, 0]
                dec_list[blt_index] = dec_pol_list[blt_index, 0]

        # get unflagged blts
        blt_good = np.where(~np.all(self.flag_array, axis=(1, 2, 3)))
        single_ra = np.isclose(np.mean(np.diff(ra_list[blt_good])), 0.)
        single_time = np.isclose(np.mean(np.diff(self.time_array[blt_good])), 0.)

        # first check to see if the phase_type was specified.
        if phase_type is not None:
            if phase_type == 'phased':
                self.set_phased()
            elif phase_type == 'drift':
                self.set_drift()
            else:
                raise ValueError('The phase_type was not recognized. '
                                 'Set the phase_type to "drift" or "phased" to '
                                 'reflect the phasing status of the data')
        else:
            # check if ra is constant throughout file; if it is, the file is
            # tracking; if not, the file is drift scanning.
            # check if there's only one unflagged time
            if not single_time:
                if single_ra:
                    self.set_phased()
                else:
                    self.set_drift()
            else:
                # if there's only one time, checking for consistent RAs doesn't work.
                # instead check for the presence of an epoch variable, which isn't
                # really a good option, but at least it prevents crashes.
                if 'epoch' in uv.vartable.keys():
                    self.set_phased()
                else:
                    self.set_drift()

        if self.phase_type == 'phased':
            # check that the RA values do not vary
            if not single_ra:
                raise ValueError('phase_type is "phased" but the RA values are varying.')
            self.phase_center_ra = float(ra_list[0])
            self.phase_center_dec = float(dec_list[0])
            self.phase_center_epoch = uv['epoch']
        else:
            # check that the RA values are not constant (if more than one time present)
            if (single_ra and not single_time):
                raise ValueError('phase_type is "drift" but the RA values are constant.')
            self.zenith_ra = ra_list
            self.zenith_dec = dec_list

        try:
            self.set_telescope_params()
        except ValueError as ve:
            warnings.warn(str(ve))

        # check if object has all required uv_properties set
        if run_check:
            self.check(check_extra=check_extra,
                       run_check_acceptability=run_check_acceptability)

    def write_miriad(self, filepath, run_check=True, check_extra=True,
                     run_check_acceptability=True, clobber=False, no_antnums=False):
        """
        Write the data to a miriad file.

        Args:
            filepath: The miriad file directory to write to.
            run_check: Option to check for the existence and proper shapes of
                parameters before writing the file. Default is True.
            check_extra: Option to check optional parameters as well as
                required ones. Default is True.
            run_check_acceptability: Option to check acceptable range of the
                values of parameters before writing the file. Default is True.
            clobber: Option to overwrite the file if it already exists.
                Default is False.
            no_antnums: Option to not write the antnums variable to the file.
                Should only be used for testing purposes.
        """
        if run_check:
            self.check(check_extra=check_extra,
                       run_check_acceptability=run_check_acceptability)

        # check for multiple spws
        if self.data_array.shape[1] > 1:
            raise ValueError('write_miriad currently only handles single spw files.')

        if os.path.exists(filepath):
            if clobber:
                print('File exists: clobbering')
                shutil.rmtree(filepath)
            else:
                raise ValueError('File exists: skipping')

        if self.Nfreqs > 1:
            freq_spacing = self.freq_array[0, 1:] - self.freq_array[0, :-1]
            if not np.isclose(np.min(freq_spacing), np.max(freq_spacing),
                              rtol=self._freq_array.tols[0], atol=self._freq_array.tols[1]):
                raise ValueError('The frequencies are not evenly spaced (probably '
                                 'because of a select operation). The miriad format '
                                 'does not support unevenly spaced frequencies.')
            if not np.isclose(np.max(freq_spacing), self.channel_width,
                              rtol=self._freq_array.tols[0], atol=self._freq_array.tols[1]):
                raise ValueError('The frequencies are separated by more than their '
                                 'channel width (probably because of a select operation). '
                                 'The miriad format does not support frequencies '
                                 'that are spaced by more than their channel width.')

        uv = aipy_extracts.UV(filepath, status='new')

        # initialize header variables
        uv._wrhd('obstype', 'mixed-auto-cross')

        # avoid inserting extra \n.
if not self.history[-1] == '\n': self.history += '\n' uv._wrhd('history', self.history) # recognized miriad variables uv.add_var('nchan', 'i') uv['nchan'] = self.Nfreqs uv.add_var('npol', 'i') uv['npol'] = self.Npols uv.add_var('nspect', 'i') uv['nspect'] = self.Nspws uv.add_var('inttime', 'd') uv['inttime'] = self.integration_time uv.add_var('sdf', 'd') uv['sdf'] = self.channel_width / 1e9 # in GHz uv.add_var('source', 'a') uv['source'] = self.object_name uv.add_var('telescop', 'a') uv['telescop'] = self.telescope_name uv.add_var('latitud', 'd') uv['latitud'] = self.telescope_location_lat_lon_alt[0] uv.add_var('longitu', 'd') uv['longitu'] = self.telescope_location_lat_lon_alt[1] uv.add_var('nants', 'i') if self.x_orientation is not None: uv.add_var('xorient', 'a') uv['xorient'] = self.x_orientation if self.antenna_diameters is not None: if not np.allclose(self.antenna_diameters, self.antenna_diameters[0]): warnings.warn('Antenna diameters are not uniform, but miriad only' 'supports a single diameter. Skipping.') else: uv.add_var('antdiam', 'd') uv['antdiam'] = float(self.antenna_diameters[0]) # These are added to make files written by pyuvdata more "miriad correct", and # should be changed when support for more than one spectral window is added. # 'nschan' is the number of channels per spectral window, and 'ischan' is the # starting channel for each spectral window. Both should be arrays of size Nspws. # Also note that indexing in Miriad is 1-based uv.add_var('nschan', 'i') uv['nschan'] = self.Nfreqs uv.add_var('ischan', 'i') uv['ischan'] = 1 # Miriad has no way to keep track of antenna numbers, so the antenna # numbers are simply the index for each antenna in any array that # describes antenna attributes (e.g. antpos for the antenna_postions). # Therefore on write, nants (which gives the size of the antpos array) # needs to be increased to be the max value of antenna_numbers+1 and the # antpos array needs to be inflated with zeros at locations where we # don't have antenna information. These inflations need to be undone at # read. If the file was written by pyuvdata, then the variable antnums # will be present and we can use it, otherwise we need to test for zeros # in the antpos array and/or antennas with no visibilities. nants = np.max(self.antenna_numbers) + 1 uv['nants'] = nants if self.antenna_positions is not None: # Miriad wants antenna_positions to be in absolute coordinates # (not relative to array center) in a rotated ECEF frame where the # x-axis goes through the local meridian. rel_ecef_antpos = np.zeros((nants, 3), dtype=self.antenna_positions.dtype) for ai, num in enumerate(self.antenna_numbers): rel_ecef_antpos[num, :] = self.antenna_positions[ai, :] # find zeros so antpos can be zeroed there too antpos_length = np.sqrt(np.sum(np.abs(rel_ecef_antpos)**2, axis=1)) ecef_antpos = rel_ecef_antpos + self.telescope_location longitude = self.telescope_location_lat_lon_alt[1] antpos = uvutils.rotECEF_from_ECEF(ecef_antpos, longitude) # zero out bad locations (these are checked on read) antpos[np.where(antpos_length == 0), :] = [0, 0, 0] uv.add_var('antpos', 'd') # Miriad stores antpos values in units of ns, pyuvdata uses meters. 
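            # For reference (a sketch, not executed here): const.c.to('m/ns').value
            # is roughly 0.2998 m/ns, so a 30 m baseline component becomes about
            # 100 ns in the miriad convention:
            #
            #     antpos_ns = antpos_m / const.c.to('m/ns').value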
uv['antpos'] = antpos.T.flatten() / const.c.to('m/ns').value uv.add_var('sfreq', 'd') uv['sfreq'] = self.freq_array[0, 0] / 1e9 # first spw; in GHz if self.phase_type == 'phased': uv.add_var('epoch', 'r') uv['epoch'] = self.phase_center_epoch # required pyuvdata variables that are not recognized miriad variables uv.add_var('ntimes', 'i') uv['ntimes'] = self.Ntimes uv.add_var('nbls', 'i') uv['nbls'] = self.Nbls uv.add_var('nblts', 'i') uv['nblts'] = self.Nblts uv.add_var('visunits', 'a') uv['visunits'] = self.vis_units uv.add_var('instrume', 'a') uv['instrume'] = self.instrument uv.add_var('altitude', 'd') uv['altitude'] = self.telescope_location_lat_lon_alt[2] # optional pyuvdata variables that are not recognized miriad variables if self.dut1 is not None: uv.add_var('dut1', 'd') uv['dut1'] = self.dut1 if self.earth_omega is not None: uv.add_var('degpdy', 'd') uv['degpdy'] = self.earth_omega if self.gst0 is not None: uv.add_var('gst0', 'd') uv['gst0'] = self.gst0 if self.rdate is not None: uv.add_var('rdate', 'a') uv['rdate'] = self.rdate if self.timesys is not None: uv.add_var('timesys', 'a') uv['timesys'] = self.timesys # other extra keywords # set up dictionaries to map common python types to miriad types # NB: arrays/lists/dicts could potentially be written as strings or 1D # vectors. This is not supported at present! # NB: complex numbers *should* be supportable, but are not currently # supported due to unexplained errors in _miriad and/or its underlying libraries numpy_types = {np.int8: int, np.int16: int, np.int32: int, np.int64: int, np.uint8: int, np.uint16: int, np.uint32: int, np.uint64: int, np.float16: float, np.float32: float, np.float64: float, np.float128: float, } types = {str: 'a', int: 'i', float: 'd', bool: 'a', # booleans are stored as strings and changed back on read } for key, value in self.extra_keywords.items(): if type(value) in numpy_types.keys(): if numpy_types[type(value)] == int: value = int(value) elif numpy_types[type(value)] == float: value = float(value) elif type(value) == bool: value = str(value) elif type(value) not in types.keys(): raise TypeError('Extra keyword {keyword} is of {keytype}. ' 'Only strings and real numbers are ' 'supported in miriad.'.format(keyword=key, keytype=type(value))) if len(str(key)) > 8: warnings.warn('key {key} in extra_keywords is longer than 8 ' 'characters. It will be truncated to 8 as required ' 'by the miriad file format.'.format(key=key)) uvkeyname = str(key)[:8] # name must be string, max 8 letters typestring = types[type(value)] uv.add_var(uvkeyname, typestring) uv[uvkeyname] = value if not no_antnums: # Add in the antenna_numbers so we have them if we read this file back in. # For some reason Miriad doesn't handle an array of integers properly, # so convert to floats here and integers on read. uv.add_var('antnums', 'd') uv['antnums'] = self.antenna_numbers.astype(np.float64) # antenna names is a foreign concept in miriad but required in other formats. # Miriad can't handle arrays of strings, so we make it into one long # comma-separated string and convert back on read. 
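        # A sketch of the round trip (hypothetical names, not executed here):
        #
        #     ['ant1', 'ant2', 'ant3']        ->  '[ant1, ant2, ant3]'        (written below)
        #     ant_name_str[1:-1].split(', ')  ->  ['ant1', 'ant2', 'ant3']    (on read)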
ant_name_str = '[' + ', '.join(self.antenna_names) + ']' uv.add_var('antnames', 'a') uv['antnames'] = ant_name_str # variables that can get updated with every visibility uv.add_var('pol', 'i') uv.add_var('lst', 'd') uv.add_var('cnt', 'd') uv.add_var('ra', 'd') uv.add_var('dec', 'd') # write data c_ns = const.c.to('m/ns').value for viscnt, blt in enumerate(self.data_array): uvw = (self.uvw_array[viscnt] / c_ns).astype(np.double) # NOTE issue 50 on conjugation t = self.time_array[viscnt] i = self.ant_1_array[viscnt] j = self.ant_2_array[viscnt] uv['lst'] = self.lst_array[viscnt] if self.phase_type == 'phased': uv['ra'] = self.phase_center_ra uv['dec'] = self.phase_center_dec elif self.phase_type == 'drift': uv['ra'] = self.zenith_ra[viscnt] uv['dec'] = self.zenith_dec[viscnt] else: raise ValueError('The phasing type of the data is unknown. ' 'Set the phase_type to "drift" or "phased" to ' 'reflect the phasing status of the data') # NOTE only writing spw 0, not supporting multiple spws for write for polcnt, pol in enumerate(self.polarization_array): uv['pol'] = pol.astype(np.int) uv['cnt'] = self.nsample_array[viscnt, 0, :, polcnt].astype(np.double) data = self.data_array[viscnt, 0, :, polcnt] flags = self.flag_array[viscnt, 0, :, polcnt] if i > j: i, j, data = j, i, np.conjugate(data) preamble = (uvw, t, (i, j)) uv.write(preamble, data, flags) def read_miriad_metadata(self, filename, correct_lat_lon=True): """ Read in metadata (parameter info) but not data from a miriad file. Args: filename : The miriad file to read correct_lat_lon: flag -- that only matters if altitude is missing -- to update the latitude and longitude from the known_telescopes list Returns: default_miriad_variables: list of default miriad variables other_miriad_variables: list of other miriad variables extra_miriad_variables: list of extra, non-standard variables check_variables: dict of extra miriad variables """ # check for data array if self.data_array is not None: raise ValueError('data_array is already defined, cannot read metadata') # get UV descriptor if isinstance(filename, (str, np.str)): uv = aipy_extracts.UV(filename) elif isinstance(filename, aipy_extracts.UV): uv = filename # load miriad variables (default_miriad_variables, other_miriad_variables, extra_miriad_variables) = self._load_miriad_variables(uv) # dict of extra variables check_variables = {} for extra_variable in extra_miriad_variables: check_variables[extra_variable] = uv[extra_variable] # keep all single valued extra_variables as extra_keywords for key in check_variables.keys(): if type(check_variables[key]) == str: value = check_variables[key].replace('\x00', '') # check for booleans encoded as strings if value == 'True': value = True elif value == 'False': value = False self.extra_keywords[key] = value else: self.extra_keywords[key] = check_variables[key] # Check for items in itemtable to put into extra_keywords # These will end up as variables in written files, but is internally consistent. for key in uv.items(): # A few items that are not needed, we read elsewhere, or is not supported # when downselecting, so we don't read here. 
if key not in ['vartable', 'history', 'obstype'] and key not in other_miriad_variables: if type(uv[key]) == str: value = uv[key].replace('\x00', '') value = uv[key].replace('\x01', '') if value == 'True': value = True elif value == 'False': value = False self.extra_keywords[key] = value else: self.extra_keywords[key] = uv[key] # load telescope coords self._load_telescope_coords(uv, correct_lat_lon=correct_lat_lon) # load antenna positions self._load_antpos(uv) return (default_miriad_variables, other_miriad_variables, extra_miriad_variables, check_variables) def _load_miriad_variables(self, uv): """ Load miriad variables from an aipy.miriad UV descriptor. Args: uv: aipy.miriad.UV instance Returns: default_miriad_variables: list of default miriad variables other_miriad_varialbes: list of other miriad varialbes extra_miriad_variables: list of extra, non-standard variables """ # list of miriad variables always read # NB: this includes variables in try/except (i.e. not all variables are # necessarily present in the miriad file) default_miriad_variables = ['nchan', 'npol', 'inttime', 'sdf', 'source', 'telescop', 'latitud', 'longitu', 'altitude', 'history', 'visunits', 'instrume', 'dut1', 'gst0', 'rdate', 'timesys', 'xorient', 'cnt', 'ra', 'dec', 'lst', 'pol', 'nants', 'antnames', 'nblts', 'ntimes', 'nbls', 'sfreq', 'epoch', 'antpos', 'antnums', 'degpdy', 'antdiam', ] # list of miriad variables not read, but also not interesting # NB: nspect (I think) is number of spectral windows, will want one day # NB: xyphase & xyamp are "On-line X Y phase/amplitude measurements" which we may want in # a calibration object some day # NB: systemp, xtsys & ytsys are "System temperatures of the antenna/X/Y feed" # which we may want in a calibration object some day # NB: freqs, leakage and bandpass may be part of a calibration object some day other_miriad_variables = ['nspect', 'obsdec', 'vsource', 'ischan', 'restfreq', 'nschan', 'corr', 'freq', 'freqs', 'leakage', 'bandpass', 'tscale', 'coord', 'veldop', 'time', 'obsra', 'operator', 'version', 'axismax', 'axisrms', 'xyphase', 'xyamp', 'systemp', 'xtsys', 'ytsys', 'baseline'] extra_miriad_variables = [] for variable in uv.vars(): if (variable not in default_miriad_variables and variable not in other_miriad_variables): extra_miriad_variables.append(variable) miriad_header_data = {'Nfreqs': 'nchan', 'Npols': 'npol', 'integration_time': 'inttime', 'channel_width': 'sdf', # in Ghz! 
'object_name': 'source', 'telescope_name': 'telescop' } for item in miriad_header_data: if isinstance(uv[miriad_header_data[item]], str): header_value = uv[miriad_header_data[item]].replace('\x00', '') else: header_value = uv[miriad_header_data[item]] setattr(self, item, header_value) self.history = uv['history'] if not uvutils.check_history_version(self.history, self.pyuvdata_version_str): self.history += self.pyuvdata_version_str self.channel_width *= 1e9 # change from GHz to Hz # check for pyuvdata variables that are not recognized miriad variables if 'visunits' in uv.vartable.keys(): self.vis_units = uv['visunits'].replace('\x00', '') else: self.vis_units = 'UNCALIB' # assume no calibration if 'instrume' in uv.vartable.keys(): self.instrument = uv['instrume'].replace('\x00', '') else: self.instrument = self.telescope_name # set instrument = telescope if 'dut1' in uv.vartable.keys(): self.dut1 = uv['dut1'] if 'degpdy' in uv.vartable.keys(): self.earth_omega = uv['degpdy'] if 'gst0' in uv.vartable.keys(): self.gst0 = uv['gst0'] if 'rdate' in uv.vartable.keys(): self.rdate = uv['rdate'].replace('\x00', '') if 'timesys' in uv.vartable.keys(): self.timesys = uv['timesys'].replace('\x00', '') if 'xorient' in uv.vartable.keys(): self.x_orientation = uv['xorient'].replace('\x00', '') return default_miriad_variables, other_miriad_variables, extra_miriad_variables def _load_telescope_coords(self, uv, correct_lat_lon=True): """ Load telescope lat, lon alt coordinates from aipy.miriad UV descriptor. Args: uv: aipy.miriad.UV instance correct_lat_lon: flag -- that only matters if altitude is missing -- to update the latitude and longitude from the known_telescopes list """ # check if telescope name is present if self.telescope_name is None: self._load_miriad_variables(uv) latitude = uv['latitud'] # in units of radians longitude = uv['longitu'] try: altitude = uv['altitude'] self.telescope_location_lat_lon_alt = (latitude, longitude, altitude) except(KeyError): # get info from known telescopes. Check to make sure the lat/lon values match reasonably well telescope_obj = uvtel.get_telescope(self.telescope_name) if telescope_obj is not False: tol = 2 * np.pi * 1e-3 / (60.0 * 60.0 * 24.0) # 1mas in radians lat_close = np.isclose(telescope_obj.telescope_location_lat_lon_alt[0], latitude, rtol=0, atol=tol) lon_close = np.isclose(telescope_obj.telescope_location_lat_lon_alt[1], longitude, rtol=0, atol=tol) if correct_lat_lon: self.telescope_location_lat_lon_alt = telescope_obj.telescope_location_lat_lon_alt else: self.telescope_location_lat_lon_alt = (latitude, longitude, telescope_obj.telescope_location_lat_lon_alt[2]) if lat_close and lon_close: if correct_lat_lon: warnings.warn('Altitude is not present in Miriad file, ' 'using known location values for ' '{telescope_name}.'.format(telescope_name=telescope_obj.telescope_name)) else: warnings.warn('Altitude is not present in Miriad file, ' 'using known location altitude value ' 'for {telescope_name} and lat/lon from ' 'file.'.format(telescope_name=telescope_obj.telescope_name)) else: warn_string = ('Altitude is not present in file ') if not lat_close and not lon_close: warn_string = warn_string + 'and latitude and longitude values do not match values ' else: if not lat_close: warn_string = warn_string + 'and latitude value does not match value ' else: warn_string = warn_string + 'and longitude value does not match value ' if correct_lat_lon: warn_string = (warn_string + 'for {telescope_name} in known ' 'telescopes. 
Using values from known ' 'telescopes.'.format(telescope_name=telescope_obj.telescope_name)) warnings.warn(warn_string) else: warn_string = (warn_string + 'for {telescope_name} in known ' 'telescopes. Using altitude value from known ' 'telescopes and lat/lon from ' 'file.'.format(telescope_name=telescope_obj.telescope_name)) warnings.warn(warn_string) else: warnings.warn('Altitude is not present in Miriad file, and ' 'telescope {telescope_name} is not in ' 'known_telescopes. Telescope location will be ' 'set using antenna positions.' .format(telescope_name=self.telescope_name)) def _load_antpos(self, uv, sorted_unique_ants=[], correct_lat_lon=True): """ Load antennas and their positions from a Miriad UV descriptor. Args: uv: aipy.miriad.UV instance. sorted_unique_ants: list of unique antennas correct_lat_lon: flag -- that only matters if altitude is missing -- to update the latitude and longitude from the known_telescopes list """ # check if telescope coords exist if self.telescope_location_lat_lon_alt is None: self._load_telescope_coords(uv, correct_lat_lon=correct_lat_lon) latitude = uv['latitud'] # in units of radians longitude = uv['longitu'] # Miriad has no way to keep track of antenna numbers, so the antenna # numbers are simply the index for each antenna in any array that # describes antenna attributes (e.g. antpos for the antenna_postions). # Therefore on write, nants (which gives the size of the antpos array) # needs to be increased to be the max value of antenna_numbers+1 and the # antpos array needs to be inflated with zeros at locations where we # don't have antenna information. These inflations need to be undone at # read. If the file was written by pyuvdata, then the variable antnums # will be present and we can use it, otherwise we need to test for zeros # in the antpos array and/or antennas with no visibilities. try: # The antnums variable will only exist if the file was written with pyuvdata. # For some reason Miriad doesn't handle an array of integers properly, # so we convert to floats on write and back here self.antenna_numbers = uv['antnums'].astype(int) self.Nants_telescope = len(self.antenna_numbers) except(KeyError): self.antenna_numbers = None self.Nants_telescope = None nants = uv['nants'] try: # Miriad stores antpos values in units of ns, pyuvdata uses meters. antpos = uv['antpos'].reshape(3, nants).T * const.c.to('m/ns').value # first figure out what are good antenna positions so we can only # use the non-zero ones to evaluate position information antpos_length = np.sqrt(np.sum(np.abs(antpos)**2, axis=1)) good_antpos = np.where(antpos_length > 0)[0] mean_antpos_length = np.mean(antpos_length[good_antpos]) if mean_antpos_length > 6.35e6 and mean_antpos_length < 6.39e6: absolute_positions = True else: absolute_positions = False # Miriad stores antpos values in a rotated ECEF coordinate system # where the x-axis goes through the local meridan. Need to convert # these positions back to standard ECEF and if they are absolute positions, # subtract off the telescope position to make them relative to the array center. 
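            # A sketch of the relationship (not executed here): rotECEF_from_ECEF
            # and ECEF_from_rotECEF are inverses for a fixed longitude, so
            #
            #     xyz == uvutils.ECEF_from_rotECEF(
            #         uvutils.rotECEF_from_ECEF(xyz, longitude), longitude)
            #
            # up to floating point error.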
ecef_antpos = uvutils.ECEF_from_rotECEF(antpos, longitude) if self.telescope_location is not None: if absolute_positions: rel_ecef_antpos = ecef_antpos - self.telescope_location # maintain zeros because they mark missing data rel_ecef_antpos[np.where(antpos_length == 0)[0]] = ecef_antpos[np.where(antpos_length == 0)[0]] else: rel_ecef_antpos = ecef_antpos else: self.telescope_location = np.mean(ecef_antpos[good_antpos, :], axis=0) valid_location = self._telescope_location.check_acceptability()[0] # check to see if this could be a valid telescope_location if valid_location: mean_lat, mean_lon, mean_alt = self.telescope_location_lat_lon_alt tol = 2 * np.pi / (60.0 * 60.0 * 24.0) # 1 arcsecond in radians mean_lat_close = np.isclose(mean_lat, latitude, rtol=0, atol=tol) mean_lon_close = np.isclose(mean_lon, longitude, rtol=0, atol=tol) if mean_lat_close and mean_lon_close: # this looks like a valid telescope_location, and the # mean antenna lat & lon values are close. Set the # telescope_location using the file lat/lons and the mean alt. # Then subtract it off of the antenna positions warnings.warn('Telescope location is not set, but antenna ' 'positions are present. Mean antenna latitude and ' 'longitude values match file values, so ' 'telescope_position will be set using the ' 'mean of the antenna altitudes') self.telescope_location_lat_lon_alt = (latitude, longitude, mean_alt) rel_ecef_antpos = ecef_antpos - self.telescope_location else: # this looks like a valid telescope_location, but the # mean antenna lat & lon values are not close. Set the # telescope_location using the file lat/lons at sea level. # Then subtract it off of the antenna positions self.telescope_location_lat_lon_alt = (latitude, longitude, 0) warn_string = ('Telescope location is set at sealevel at ' 'the file lat/lon coordinates. Antenna ' 'positions are present, but the mean ' 'antenna ') rel_ecef_antpos = ecef_antpos - self.telescope_location if not mean_lat_close and not mean_lon_close: warn_string += ('latitude and longitude values do not ' 'match file values so they are not used ' 'for altiude.') elif not mean_lat_close: warn_string += ('latitude value does not ' 'match file values so they are not used ' 'for altiude.') else: warn_string += ('longitude value does not ' 'match file values so they are not used ' 'for altiude.') warnings.warn(warn_string) else: # This does not give a valid telescope_location. Instead # calculate it from the file lat/lon and sea level for altiude self.telescope_location_lat_lon_alt = (latitude, longitude, 0) warn_string = ('Telescope location is set at sealevel at ' 'the file lat/lon coordinates. 
Antenna ' 'positions are present, but the mean ' 'antenna ') warn_string += ('position does not give a ' 'telescope_location on the surface of the ' 'earth.') if absolute_positions: rel_ecef_antpos = ecef_antpos - self.telescope_location else: warn_string += (' Antenna positions do not appear to be ' 'on the surface of the earth and will be treated ' 'as relative.') rel_ecef_antpos = ecef_antpos warnings.warn(warn_string) if self.Nants_telescope is not None: # in this case there is an antnums variable # (meaning that the file was written with pyuvdata), so we'll use it if nants == self.Nants_telescope: # no inflation, so just use the positions self.antenna_positions = rel_ecef_antpos else: # there is some inflation, just use the ones that appear in antnums self.antenna_positions = np.zeros((self.Nants_telescope, 3), dtype=antpos.dtype) for ai, num in enumerate(self.antenna_numbers): self.antenna_positions[ai, :] = rel_ecef_antpos[num, :] else: # there is no antnums variable (meaning that this file was not # written by pyuvdata), so we test for antennas with non-zero # positions and/or that appear in the visibility data # (meaning that they have entries in ant_1_array or ant_2_array) antpos_length = np.sqrt(np.sum(np.abs(antpos)**2, axis=1)) good_antpos = np.where(antpos_length > 0)[0] # take the union of the antennas with good positions (good_antpos) # and the antennas that have visisbilities (sorted_unique_ants) # if there are antennas with visibilities but zeroed positions we issue a warning below ants_use = set(good_antpos).union(sorted_unique_ants) # ants_use are the antennas we'll keep track of in the UVData # object, so they dictate Nants_telescope self.Nants_telescope = len(ants_use) self.antenna_numbers = np.array(list(ants_use)) self.antenna_positions = np.zeros((self.Nants_telescope, 3), dtype=rel_ecef_antpos.dtype) for ai, num in enumerate(self.antenna_numbers): if antpos_length[num] == 0: warnings.warn('antenna number {n} has visibilities ' 'associated with it, but it has a position' ' of (0,0,0)'.format(n=num)) else: # leave bad locations as zeros to make them obvious self.antenna_positions[ai, :] = rel_ecef_antpos[num, :] except(KeyError): # there is no antpos variable warnings.warn('Antenna positions are not present in the file.') self.antenna_positions = None if self.antenna_numbers is None: # there are no antenna_numbers or antenna_positions, so just use # the antennas present in the visibilities # (Nants_data will therefore match Nants_telescope) self.antenna_numbers = np.array(sorted_unique_ants) self.Nants_telescope = len(self.antenna_numbers) # antenna names is a foreign concept in miriad but required in other formats. try: # Here we deal with the way pyuvdata tacks it on to keep the # name information if we have it: # make it into one long comma-separated string ant_name_var = uv['antnames'] if isinstance(ant_name_var, str): ant_name_str = ant_name_var.replace('\x00', '') ant_name_list = ant_name_str[1:-1].split(', ') self.antenna_names = ant_name_list else: # Backwards compatibility for old way of storing antenna_names. # This is a horrible hack to save & recover antenna_names array. # Miriad can't handle arrays of strings and AIPY use to not handle # long enough single strings to put them all into one string # so we convert them into hex values and then into floats on # write and convert back to strings here warnings.warn('This file was written with an old version of ' 'pyuvdata, which has been deprecated. 
Rewrite this ' 'file with write_miriad to ensure future ' 'compatibility')
                ant_name_flt = uv['antnames']
                ant_name_list = []
                for elem in ant_name_flt:
                    an = '%x' % elem.astype(np.int64)
                    # python2 in try, python3 in except
                    try:
                        an = an.decode('hex')
                    except AttributeError:
                        an = bytes.fromhex(an).decode()
                    ant_name_list.append(an)
                self.antenna_names = ant_name_list
        except(KeyError):
            self.antenna_names = self.antenna_numbers.astype(str).tolist()

        # check for antenna diameters
        try:
            self.antenna_diameters = uv['antdiam']
        except(KeyError):
            # backwards compatibility for when keyword was 'diameter'
            try:
                self.antenna_diameters = uv['diameter']
                # if we find it, we need to remove it from extra_keywords
                # to keep from writing it out
                self.extra_keywords.pop('diameter')
            except(KeyError):
                pass
        if self.antenna_diameters is not None:
            self.antenna_diameters = (self.antenna_diameters
                                      * np.ones(self.Nants_telescope, dtype=np.float64))
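
# ---------------------------------------------------------------------------
# A minimal end-to-end sketch (file paths are hypothetical). As noted in the
# class docstring, reading and writing normally go through the UVData class:
#
#     from pyuvdata import UVData
#     uv = UVData()
#     uv.read_miriad('some_obs.uv', polarizations=['xx', 'yy'])
#     uv.write_miriad('some_obs_copy.uv', clobber=True)
# ---------------------------------------------------------------------------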