"""Class for reading and writing Miriad files.""" from astropy import constants as const import os import shutil import numpy as np import warnings import aipy as a from uvdata import UVData import telescopes as uvtel class Miriad(UVData): """ Defines a Miriad-specific subclass of UVData for reading and writing Miriad files. This class should not be interacted with directly, instead use the read_miriad and write_miriad methods on the UVData class. """ def _pol_to_ind(self, pol): if self.polarization_array is None: raise(ValueError, "Can't index polarization {p} because " "polarization_array is not set".format(p=pol)) pol_ind = np.argwhere(self.polarization_array == pol) if len(pol_ind) != 1: raise(ValueError, "multiple matches for pol={pol} in " "polarization_array".format(pol=pol)) return pol_ind def read_miriad(self, filepath, correct_lat_lon=True, run_check=True, run_check_acceptability=True): """ Read in data from a miriad file. Args: filepath: The miriad file directory to read from. correct_lat_lon: flag -- that only matters if altitude is missing -- to update the latitude and longitude from the known_telescopes list run_check: Option to check for the existence and proper shapes of required parameters after reading in the file. Default is True. run_check_acceptability: Option to check acceptable range of the values of required parameters after reading in the file. Default is True. """ if not os.path.exists(filepath): raise(IOError, filepath + ' not found') uv = a.miriad.UV(filepath) miriad_header_data = {'Nfreqs': 'nchan', 'Npols': 'npol', 'integration_time': 'inttime', 'channel_width': 'sdf', # in Ghz! 'object_name': 'source', # NB: telescope_name and instrument are treated # as the same 'telescope_name': 'telescop', 'instrument': 'telescop' } for item in miriad_header_data: if isinstance(uv[miriad_header_data[item]], str): header_value = uv[miriad_header_data[item]].replace('\x00', '') else: header_value = uv[miriad_header_data[item]] setattr(self, item, header_value) latitude = uv['latitud'] # in units of radians longitude = uv['longitu'] try: altitude = uv['altitude'] self.telescope_location_lat_lon_alt = (latitude, longitude, altitude) except(KeyError): # get info from known telescopes. Check to make sure the lat/lon values match reasonably well telescope_obj = uvtel.get_telescope(self.telescope_name) if telescope_obj is not False: tol = 2 * np.pi * 1e-3 / (60.0 * 60.0 * 24.0) # 1mas in radians lat_close = np.isclose(telescope_obj.telescope_location_lat_lon_alt[0], latitude, rtol=0, atol=tol) lon_close = np.isclose(telescope_obj.telescope_location_lat_lon_alt[1], longitude, rtol=0, atol=tol) if correct_lat_lon: self.telescope_location_lat_lon_alt = telescope_obj.telescope_location_lat_lon_alt else: self.telescope_location_lat_lon_alt = (latitude, longitude, telescope_obj.telescope_location_lat_lon_alt[2]) if lat_close and lon_close: if correct_lat_lon: warnings.warn('Altitude is not present in Miriad file, ' 'using known location values for ' '{telescope_name}.'.format(telescope_name=telescope_obj.telescope_name)) else: warnings.warn('Altitude is not present in Miriad file, ' 'using known location altitude value ' 'for {telescope_name} and lat/lon from ' 'file.'.format(telescope_name=telescope_obj.telescope_name)) else: warn_string = ('Altitude is not present in file ') if not lat_close and not lon_close: warn_string = warn_string + 'and latitude and longitude values do not match values ' else: if not lat_close: warn_string = warn_string + 'and latitude value does not match value ' else: warn_string = warn_string + 'and longitude value does not match value ' if correct_lat_lon: warn_string = (warn_string + 'for {telescope_name} in known ' 'telescopes. Using values from known ' 'telescopes.'.format(telescope_name=telescope_obj.telescope_name)) warnings.warn(warn_string) else: warn_string = (warn_string + 'for {telescope_name} in known ' 'telescopes. Using altitude value from known ' 'telescopes and lat/lon from ' 'file.'.format(telescope_name=telescope_obj.telescope_name)) warnings.warn(warn_string) else: warnings.warn('Altitude is not present in Miriad file, and ' + 'telescope {telescope_name} is not in ' + 'known_telescopes. Telescope location not ' 'set.'.format(telescope_name=self.telescope_name)) self.history = uv['history'] if self.pyuvdata_version_str not in self.history.replace('\n', ''): self.history += self.pyuvdata_version_str self.channel_width *= 1e9 # change from GHz to Hz # read through the file and get the data _source = uv['source'] # check source of initial visibility data_accumulator = {} pol_list = [] for (uvw, t, (i, j)), d, f in uv.all(raw=True): # control for the case of only a single spw not showing up in # the dimension # Note that the (i, j) tuple is calculated from a baseline number in # aipy (see miriad_wrap.h). The i, j values are also adjusted by aipy # to start at 0 rather than 1. if len(d.shape) == 1: d.shape = (1,) + d.shape self.Nspws = d.shape[0] self.spw_array = np.arange(self.Nspws) else: raise(ValueError, """Sorry. Files with more than one spectral window (spw) are not yet supported. A great project for the interested student!""") try: cnt = uv['cnt'] except(KeyError): cnt = np.ones(d.shape, dtype=np.float) ra = uv['ra'] dec = uv['dec'] lst = uv['lst'] source = uv['source'] if source != _source: raise(ValueError, 'This appears to be a multi source file, which is not supported.') else: _source = source try: data_accumulator[uv['pol']].append([uvw, t, i, j, d, f, cnt, ra, dec]) except(KeyError): data_accumulator[uv['pol']] = [[uvw, t, i, j, d, f, cnt, ra, dec]] pol_list.append(uv['pol']) # NB: flag types in miriad are usually ints self.polarization_array = np.array(pol_list) if len(self.polarization_array) != self.Npols: warnings.warn('npols={npols} but found {l} pols in data file'.format( npols=self.Npols, l=len(self.polarization_array))) # makes a data_array (and flag_array) of zeroes to be filled in by # data values # any missing data will have zeros # use set to get the unique list of all times ever listed in the file # iterate over polarizations and all spectra (bls and times) in two # nested loops, then flatten into a single vector, then set # then list again. times = list(set( np.concatenate([[k[1] for k in d] for d in data_accumulator.values()]))) times = np.sort(times) ant_i_unique = list(set( np.concatenate([[k[2] for k in d] for d in data_accumulator.values()]))) ant_j_unique = list(set( np.concatenate([[k[3] for k in d] for d in data_accumulator.values()]))) sorted_unique_ants = sorted(list(set(ant_i_unique + ant_j_unique))) unique_blts = [] for d in data_accumulator.values(): for k in d: blt = [k[1], k[2], k[3]] if blt not in unique_blts: unique_blts.append(blt) self.Nants_data = len(sorted_unique_ants) # Miriad has no way to keep track of antenna numbers, so the antenna # numbers are simply the index for each antenna in any array that # describes antenna attributes (e.g. antpos for the antenna_postions). # Therefore on write, nants (which gives the size of the antpos array) # needs to be increased to be the max value of antenna_numbers+1 and the # antpos array needs to be inflated with zeros at locations where we # don't have antenna information. These inflations need to be undone at # read. If the file was written by pyuvdata, then the variable antnums # will be present and we can use it, otherwise we need to test for zeros # in the antpos array and/or antennas with no visibilities. try: # The antnums variable will only exist if the file was written with pyuvdata. # For some reason Miriad doesn't handle an array of integers properly, # so we convert to floats on write and back here self.antenna_numbers = uv['antnums'].astype(int) self.Nants_telescope = len(self.antenna_numbers) except(KeyError): self.antenna_numbers = None self.Nants_telescope = None nants = uv['nants'] try: antpos = uv['antpos'].reshape(3, nants).T if self.Nants_telescope is not None: # in this case there is an antnums variable # (meaning that the file was written with pyuvdata), so we'll use it if nants == self.Nants_telescope: # no inflation, so just use the positions self.antenna_positions = antpos else: # there is some inflation, just use the ones that appear in antnums self.antenna_positions = np.zeros((self.Nants_telescope, 3), dtype=antpos.dtype) for ai, num in enumerate(self.antenna_numbers): self.antenna_positions[ai, :] = antpos[num, :] else: # there is no antnums variable (meaning that this file was not # written by pyuvdata), so we test for antennas with non-zero # positions and/or that appear in the visibility data # (meaning that they have entries in ant_1_array or ant_2_array) antpos_length = np.sqrt(np.sum(np.abs(antpos)**2, axis=1)) good_antpos = np.where(antpos_length > 0)[0] # take the union of the antennas with good positions (good_antpos) # and the antennas that have visisbilities (sorted_unique_ants) # if there are antennas with visibilities but zeroed positions we issue a warning below ants_use = set(good_antpos).union(sorted_unique_ants) # ants_use are the antennas we'll keep track of in the UVData # object, so they dictate Nants_telescope self.Nants_telescope = len(ants_use) self.antenna_numbers = np.array(list(ants_use)) self.antenna_positions = np.zeros((self.Nants_telescope, 3), dtype=antpos.dtype) for ai, num in enumerate(self.antenna_numbers): self.antenna_positions[ai, :] = antpos[num, :] if antpos_length[num] == 0: warnings.warn('antenna number {n} has visibilities ' 'associated with it, but it has a position' ' of (0,0,0)'.format(n=num)) except(KeyError): # there is no antpos variable self.antenna_positions = None if self.antenna_numbers is None: # there are no antenna_numbers or antenna_positions, so just use # the antennas present in the visibilities # (Nants_data will therefore match Nants_telescope) self.antenna_numbers = np.array(sorted_unique_ants) self.Nants_telescope = len(self.antenna_numbers) # antenna names is a foreign concept in miriad but required in other formats. try: # Here we deal with the way pyuvdata tacks it on to keep the name information if we have it: # This is a horrible hack to save & recover antenna_names array. Miriad can't handle arrays # of strings or a long enough single string to put them all into one string # so we convert them into hex values and then into floats on write and convert # back to strings here ant_name_flt = uv['antnames'] ant_name_list = [('%x' % elem.astype(np.int64)).decode('hex') for elem in ant_name_flt] self.antenna_names = ant_name_list except(KeyError): self.antenna_names = self.antenna_numbers.astype(str).tolist() # form up a grid which indexes time and baselines along the 'long' # axis of the visdata array t_grid = [] ant_i_grid = [] ant_j_grid = [] for t in times: for ant_i in ant_i_unique: for ant_j in ant_j_unique: if ant_i > ant_j: continue if [t, ant_i, ant_j] not in unique_blts: continue t_grid.append(t) ant_i_grid.append(ant_i) ant_j_grid.append(ant_j) ant_i_grid = np.array(ant_i_grid) ant_j_grid = np.array(ant_j_grid) t_grid = np.array(t_grid) # set the data sizes try: self.Nblts = uv['nblts'] if self.Nblts != len(t_grid): warnings.warn('Nblts does not match the number of unique blts in the data') except(KeyError): self.Nblts = len(t_grid) try: self.Ntimes = uv['ntimes'] if self.Ntimes != len(times): warnings.warn('Ntimes does not match the number of unique times in the data') except(KeyError): self.Ntimes = len(times) self.time_array = t_grid self.ant_1_array = ant_i_grid self.ant_2_array = ant_j_grid self.baseline_array = self.antnums_to_baseline(ant_i_grid, ant_j_grid) try: self.Nbls = uv['nbls'] if self.Nbls != len(np.unique(self.baseline_array)): warnings.warn('Nbls does not match the number of unique baselines in the data') except(KeyError): self.Nbls = len(np.unique(self.baseline_array)) # slot the data into a grid self.data_array = np.zeros((self.Nblts, self.Nspws, self.Nfreqs, self.Npols), dtype=np.complex64) self.flag_array = np.ones(self.data_array.shape, dtype=np.bool) self.uvw_array = np.zeros((self.Nblts, 3)) # NOTE: Using our lst calculator, which uses astropy, # instead of aipy values which come from pyephem. # The differences are of order 5 seconds. self.set_lsts_from_time_array() self.nsample_array = np.ones(self.data_array.shape, dtype=np.float) self.freq_array = (np.arange(self.Nfreqs) * self.channel_width + uv['sfreq'] * 1e9) # Tile freq_array to shape (Nspws, Nfreqs). # Currently does not actually support Nspws>1! self.freq_array = np.tile(self.freq_array, (self.Nspws, 1)) # Temporary arrays to hold polarization axis, which will be collapsed ra_pol_list = np.zeros((self.Nblts, self.Npols)) dec_pol_list = np.zeros((self.Nblts, self.Npols)) uvw_pol_list = np.zeros((self.Nblts, 3, self.Npols)) for pol, data in data_accumulator.iteritems(): pol_ind = self._pol_to_ind(pol) for ind, d in enumerate(data): t, ant_i, ant_j = d[1], d[2], d[3] blt_index = np.where(np.logical_and(np.logical_and(t == t_grid, ant_i == ant_i_grid), ant_j == ant_j_grid))[0].squeeze() self.data_array[blt_index, :, :, pol_ind] = d[4] self.flag_array[blt_index, :, :, pol_ind] = d[5] self.nsample_array[blt_index, :, :, pol_ind] = d[6] # because there are uvws/ra/dec for each pol, and one pol may not # have that visibility, we collapse along the polarization # axis but avoid any missing visbilities uvw = d[0] * const.c.to('m/ns').value uvw.shape = (1, 3) uvw_pol_list[blt_index, :, pol_ind] = uvw ra_pol_list[blt_index, pol_ind] = d[7] dec_pol_list[blt_index, pol_ind] = d[8] # Collapse pol axis for ra_list, dec_list, and uvw_list ra_list = np.zeros(self.Nblts) dec_list = np.zeros(self.Nblts) for blt_index in xrange(self.Nblts): test = ~np.all(self.flag_array[blt_index, :, :, :], axis=(0, 1)) good_pol = np.where(test)[0] if len(good_pol) == 1: # Only one good pol, use it self.uvw_array[blt_index, :] = uvw_pol_list[blt_index, :, good_pol] ra_list[blt_index] = ra_pol_list[blt_index, good_pol] dec_list[blt_index] = dec_pol_list[blt_index, good_pol] elif len(good_pol) > 1: # Multiple good pols, check for consistency. pyuvdata does not # support pol-dependent uvw, ra, or dec. if np.any(np.diff(uvw_pol_list[blt_index, :, good_pol], axis=0)): raise ValueError('uvw values are different by polarization.') else: self.uvw_array[blt_index, :] = uvw_pol_list[blt_index, :, good_pol[0]] if np.any(np.diff(ra_pol_list[blt_index, good_pol])): raise ValueError('ra values are different by polarization.') else: ra_list[blt_index] = ra_pol_list[blt_index, good_pol[0]] if np.any(np.diff(dec_pol_list[blt_index, good_pol])): raise ValueError('dec values are different by polarization.') else: dec_list[blt_index] = dec_pol_list[blt_index, good_pol[0]] else: # No good pols for this blt. Fill with first one. self.uvw_array[blt_index, :] = uvw_pol_list[blt_index, :, 0] ra_list[blt_index] = ra_pol_list[blt_index, 0] dec_list[blt_index] = dec_pol_list[blt_index, 0] # check if ra is constant throughout file; if it is, # file is tracking if not, file is drift scanning blt_good = np.where(~np.all(self.flag_array, axis=(1, 2, 3))) if np.isclose(np.mean(np.diff(ra_list[blt_good])), 0.): self.set_phased() self.phase_center_ra = ra_list[0] self.phase_center_dec = dec_list[0] self.phase_center_epoch = uv['epoch'] else: self.set_drift() self.zenith_ra = ra_list self.zenith_dec = dec_list self.vis_units = 'UNCALIB' # assume no calibration try: self.set_telescope_params() except ValueError, ve: warnings.warn(str(ve)) # check if object has all required uv_properties set if run_check: self.check(run_check_acceptability=run_check_acceptability) def write_miriad(self, filepath, run_check=True, run_check_acceptability=True, clobber=False, no_antnums=False): """ Write the data to a miriad file. Args: filename: The miriad file directory to write to. run_check: Option to check for the existence and proper shapes of required parameters before writing the file. Default is True. run_check_acceptability: Option to check acceptable range of the values of required parameters before writing the file. Default is True. clobber: Option to overwrite the filename if the file already exists. Default is False. no_antnums: Option to not write the antnums variable to the file. Should only be used for testing purposes. """ # check for multiple spws if self.data_array.shape[1] > 1: raise ValueError('write_miriad currently only handles single spw files.') if os.path.exists(filepath): if clobber: print 'File exists: clobbering' shutil.rmtree(filepath) else: raise ValueError('File exists: skipping') freq_spacing = self.freq_array[0, 1:] - self.freq_array[0, :-1] if not np.isclose(np.min(freq_spacing), np.max(freq_spacing), rtol=self._freq_array.tols[0], atol=self._freq_array.tols[1]): raise ValueError('The frequencies are not evenly spaced (probably ' 'because of a select operation). The miriad format ' 'does not support unevenly spaced frequencies.') if not np.isclose(np.max(freq_spacing), self.channel_width, rtol=self._freq_array.tols[0], atol=self._freq_array.tols[1]): raise ValueError('The frequencies are separated by more than their ' 'channel width (probably because of a select operation). ' 'The miriad format does not support frequencies ' 'that are spaced by more than their channel width.') uv = a.miriad.UV(filepath, status='new') # initialize header variables uv._wrhd('obstype', 'mixed-auto-cross') uv._wrhd('history', self.history + '\n') # recognized miriad variables uv.add_var('nchan', 'i') uv['nchan'] = self.Nfreqs uv.add_var('npol', 'i') uv['npol'] = self.Npols uv.add_var('nspect', 'i') uv['nspect'] = self.Nspws uv.add_var('inttime', 'd') uv['inttime'] = self.integration_time uv.add_var('sdf', 'd') uv['sdf'] = self.channel_width / 1e9 # in GHz uv.add_var('source', 'a') uv['source'] = self.object_name uv.add_var('telescop', 'a') uv['telescop'] = self.telescope_name uv.add_var('latitud', 'd') uv['latitud'] = self.telescope_location_lat_lon_alt[0] uv.add_var('longitu', 'd') uv['longitu'] = self.telescope_location_lat_lon_alt[1] uv.add_var('nants', 'i') # Miriad has no way to keep track of antenna numbers, so the antenna # numbers are simply the index for each antenna in any array that # describes antenna attributes (e.g. antpos for the antenna_postions). # Therefore on write, nants (which gives the size of the antpos array) # needs to be increased to be the max value of antenna_numbers+1 and the # antpos array needs to be inflated with zeros at locations where we # don't have antenna information. These inflations need to be undone at # read. If the file was written by pyuvdata, then the variable antnums # will be present and we can use it, otherwise we need to test for zeros # in the antpos array and/or antennas with no visibilities. nants = np.max(self.antenna_numbers) + 1 uv['nants'] = nants if self.antenna_positions is not None: antpos = np.zeros((nants, 3), dtype=self.antenna_positions.dtype) for ai, num in enumerate(self.antenna_numbers): antpos[num, :] = self.antenna_positions[ai, :] uv.add_var('antpos', 'd') uv['antpos'] = antpos.T.flatten() uv.add_var('sfreq', 'd') uv['sfreq'] = self.freq_array[0, 0] / 1e9 # first spw; in GHz if self.phase_type == 'phased': uv.add_var('epoch', 'r') uv['epoch'] = self.phase_center_epoch # required pyuvdata variables that are not recognized miriad variables uv.add_var('ntimes', 'i') uv['ntimes'] = self.Ntimes uv.add_var('nbls', 'i') uv['nbls'] = self.Nbls uv.add_var('nblts', 'i') uv['nblts'] = self.Nblts uv.add_var('visunits', 'a') uv['visunits'] = self.vis_units uv.add_var('instrume', 'a') uv['instrume'] = self.instrument uv.add_var('altitude', 'd') uv['altitude'] = self.telescope_location_lat_lon_alt[2] if not no_antnums: # Add in the antenna_numbers so we have them if we read this file back in # for some reason Miriad doesn't handle an array of integers properly, # so convert to floats here and integers on read uv.add_var('antnums', 'd') uv['antnums'] = self.antenna_numbers.astype(np.float64) # antenna names is a foreign concept in miriad but required in other formats. # This is a horrible hack to save antenna_names array. Miriad can't handle arrays # of strings or a long enough single string to put them all into one string # so we convert them into hex values and then into floats and convert # back to strings on read ant_name_flt = np.array([int(elem.encode("hex"), 16) for elem in self.antenna_names]).astype(np.float64) uv.add_var('antnames', 'd') uv['antnames'] = ant_name_flt # variables that can get updated with every visibility uv.add_var('pol', 'i') uv.add_var('lst', 'd') uv.add_var('cnt', 'd') uv.add_var('ra', 'd') uv.add_var('dec', 'd') # write data for viscnt, blt in enumerate(self.data_array): uvw = (self.uvw_array[viscnt] / const.c.to('m/ns').value).astype(np.double) # NOTE issue 50 on conjugation t = self.time_array[viscnt] i = self.ant_1_array[viscnt] j = self.ant_2_array[viscnt] uv['lst'] = self.lst_array[viscnt] if self.phase_type == 'phased': uv['ra'] = self.phase_center_ra uv['dec'] = self.phase_center_dec elif self.phase_type == 'drift': uv['ra'] = self.zenith_ra[viscnt] uv['dec'] = self.zenith_dec[viscnt] else: raise ValueError('The phasing type of the data is unknown. ' 'Set the phase_type to "drift" or "phased" to ' 'reflect the phasing status of the data') # NOTE only writing spw 0, not supporting multiple spws for write for polcnt, pol in enumerate(self.polarization_array): uv['pol'] = pol.astype(np.int) uv['cnt'] = self.nsample_array[viscnt, 0, :, polcnt].astype(np.double) data = self.data_array[viscnt, 0, :, polcnt] flags = self.flag_array[viscnt, 0, :, polcnt] if i > j: i, j, data = j, i, np.conjugate(data) preamble = (uvw, t, (i, j)) uv.write(preamble, data, flags) if run_check: """Check for acceptable units.""" self.check(run_check_acceptability=run_check_acceptability)