# -*- mode: python; coding: utf-8 -* # Copyright (c) 2018 Radio Astronomy Software Group # Licensed under the 2-clause BSD License """ Class for reading and writing casa measurement sets. Requires casacore. """ from __future__ import absolute_import, division, print_function import numpy as np import os import re import warnings from astropy import constants as const import astropy.time as time from . import UVData from . import parameter as uvp from . import telescopes from . import utils as uvutils try: import casacore.tables as tables except ImportError: # pragma: no cover uvutils._reraise_context('casacore is not installed but is required for ' 'measurement set functionality') """ This dictionary defines the mapping between CASA polarization numbers and AIPS polarization numbers """ polDict = {1: 1, 2: 2, 3: 3, 4: 4, 5: -1, 6: -3, 7: -4, 8: -2, 9: -5, 10: -7, 11: -8, 12: -6} # convert from casa polarization integers to pyuvdata class MS(UVData): """ Defines a class for reading and writing casa measurement sets. Attributes ---------- ms_required_extra : list of str Names of optional UVParameters that are required for MS """ ms_required_extra = ['datacolumn', 'antenna_positions'] def _ms_hist_to_string(self, history_table): ''' converts a CASA history table into a string that can be stored as the uvdata history parameter. Also stores messages column as a list for consitency with other uvdata types Args: history_table, a casa table object Returns: string containing only message column (consistent with other UVDATA history strings) string enconding complete casa history table converted with \n denoting rows and ';' denoting column breaks ''' # string to store just the usual uvdata history message_str = '' # string to store all the casa history info history_str = '' # Do not touch the history table if it has no information if history_table.nrows() > 0: history_str = 'APP_PARAMS;CLI_COMMAND;APPLICATION;MESSAGE;OBJECT_ID;OBSERVATION_ID;ORIGIN;PRIORITY;TIME\n' app_params = history_table.getcol('APP_PARAMS')['array'] cli_command = history_table.getcol('CLI_COMMAND')['array'] application = history_table.getcol('APPLICATION') message = history_table.getcol('MESSAGE') obj_id = history_table.getcol('OBJECT_ID') obs_id = history_table.getcol('OBSERVATION_ID') origin = history_table.getcol('ORIGIN') priority = history_table.getcol('PRIORITY') times = history_table.getcol('TIME') # Now loop through columns and generate history string ntimes = len(times) for tbrow in range(ntimes): message_str += str(message[tbrow]) newline = str(app_params[tbrow]) \ + ';' + str(cli_command[tbrow]) \ + ';' + str(application[tbrow]) \ + ';' + str(message[tbrow]) \ + ';' + str(obj_id[tbrow]) \ + ';' + str(obs_id[tbrow]) \ + ';' + str(origin[tbrow]) \ + ';' + str(priority[tbrow]) \ + ';' + str(times[tbrow]) + '\n' history_str += newline if tbrow < ntimes - 1: message_str += '\n' def is_not_ascii(s): return any(ord(c) >= 128 for c in s) def find_not_ascii(s): output = [] for c in s: if ord(c) >= 128: output += c return output return message_str, history_str # ms write functionality to be added later. def write_ms(self): ''' writing ms is not yet supported ''' def read_ms(self, filepath, run_check=True, check_extra=True, run_check_acceptability=True, data_column='DATA', pol_order='AIPS'): ''' read in a casa measurement set Args: filepath: name of the measurement set folder run_check: Option to check for the existence and proper shapes of parameters after reading in the file. Default is True. check_extra: Option to check optional parameters as well as required ones. Default is True. run_check_acceptability: Option to check the values of parameters after reading in the file. Default is True. data_column: specify which CASA measurement set data column to read from (can be 'DATA','CORRECTED', or 'MODEL') pol_order: use 'AIPS' or 'CASA' ordering of polarizations? ''' # make sure user requests a valid data_column if data_column != 'DATA' and data_column != 'CORRECTED_DATA' and data_column != 'MODEL': raise ValueError( 'Invalid data_column value supplied. Use \'Data\',\'MODEL\' or \'CORRECTED_DATA\'') if not os.path.exists(filepath): raise IOError(filepath + ' not found') # set visibility units if(data_column == 'DATA'): self.vis_units = "UNCALIB" elif(data_column == 'CORRECTED_DATA'): self.vis_units = "JY" elif(data_column == 'MODEL'): self.vis_units = "JY" # limit length of extra_keywords keys to 8 characters to match uvfits & miriad self.extra_keywords['DATA_COL'] = data_column # get frequency information from spectral window table tb_spws = tables.table(filepath + '/SPECTRAL_WINDOW', ack=False) spw_names = tb_spws.getcol('NAME') self.Nspws = len(spw_names) if self.Nspws > 1: raise ValueError('Sorry. Files with more than one spectral' 'window (spw) are not yet supported. A ' 'great project for the interested student!') freqs = tb_spws.getcol('CHAN_FREQ') self.freq_array = freqs self.Nfreqs = int(freqs.shape[1]) self.channel_width = float(tb_spws.getcol('CHAN_WIDTH')[0, 0]) self.Nspws = int(freqs.shape[0]) self.spw_array = np.arange(self.Nspws) tb_spws.close() # now get the data tb = tables.table(filepath, ack=False) # check for multiple subarrays. importuvfits does not appear to preserve subarray information! subarray = np.unique(np.int32(tb.getcol('ARRAY_ID')) - 1) if len(set(subarray)) > 1: raise ValueError('This file appears to have multiple subarray ' 'values; only files with one subarray are ' 'supported.') times_unique = time.Time( np.unique(tb.getcol('TIME') / (3600. * 24.)), format='mjd').jd # check for multiple data description ids (combination of spw id & pol id) data_desc_id = np.unique(np.int32(tb.getcol('DATA_DESC_ID') - 1)) if len(set(data_desc_id)) > 1: raise ValueError('This file appears to have multiple data description ID ' 'values; only files with one data description ID are ' 'supported.') self.Ntimes = int(len(times_unique)) # FITS uvw direction convention is opposite ours and Miriad's. # CASA's convention is unclear: the docs contradict themselves, # but empirically it appears to match uvfits # So conjugate the visibilities and flip the uvws: data_array = np.conj(tb.getcol(data_column)) self.Nblts = int(data_array.shape[0]) flag_array = tb.getcol('FLAG') # CASA stores data in complex array with dimension NbltsxNfreqsxNpols if(len(data_array.shape) == 3): data_array = np.expand_dims(data_array, axis=1) flag_array = np.expand_dims(flag_array, axis=1) self.data_array = data_array self.flag_array = flag_array self.Npols = int(data_array.shape[-1]) # FITS uvw direction convention is opposite ours and Miriad's. # CASA's convention is unclear: the docs contradict themselves, # but empirically it appears to match uvfits # So conjugate the visibilities and flip the uvws: self.uvw_array = -1 * tb.getcol('UVW') self.ant_1_array = tb.getcol('ANTENNA1').astype(np.int32) self.ant_2_array = tb.getcol('ANTENNA2').astype(np.int32) self.Nants_data = len(np.unique(np.concatenate( (np.unique(self.ant_1_array), np.unique(self.ant_2_array))))) self.baseline_array = self.antnums_to_baseline( self.ant_1_array, self.ant_2_array) self.Nbls = len(np.unique(self.baseline_array)) # Get times. MS from cotter are modified Julian dates in seconds (thanks to Danny Jacobs for figuring out the proper conversion) self.time_array = time.Time( tb.getcol('TIME') / (3600. * 24.), format='mjd').jd # Polarization array tbPol = tables.table(filepath + '/POLARIZATION', ack=False) num_pols = tbPol.getcol('NUM_CORR') # get pol setup ID from data description table tb_data_desc = tables.table(filepath + '/DATA_DESCRIPTION', ack=False) pol_id = tb_data_desc.getcol('POLARIZATION_ID')[0] # use an assert because I don't think it's possible to make a file that fails this, but I'm not sure assert(num_pols[pol_id] == self.Npols) if np.unique(num_pols).size > 1: # use getvarcol method, which returns a dict polList = tbPol.getvarcol('CORR_TYPE')['r' + str(pol_id + 1)][0].tolist() else: # list of lists, probably with each list corresponding to SPW. polList = tbPol.getcol('CORR_TYPE')[pol_id] self.polarization_array = np.zeros(len(polList), dtype=np.int32) for polnum in range(len(polList)): self.polarization_array[polnum] = int(polDict[polList[polnum]]) tbPol.close() # Integration time # use first interval and assume rest are constant (though measurement set has all integration times for each Nblt ) # self.integration_time=tb.getcol('INTERVAL')[0] # for some reason, interval ends up larger than the difference between times... if len(times_unique) == 1: self.integration_time = np.ones_like(self.time_array, dtype=np.float64) else: # assume that all times in the file are the same size int_time = self._calc_single_integration_time() self.integration_time = np.ones_like(self.time_array, dtype=np.float64) * int_time # open table with antenna location information tbAnt = tables.table(filepath + '/ANTENNA', ack=False) tbObs = tables.table(filepath + '/OBSERVATION', ack=False) self.telescope_name = tbObs.getcol('TELESCOPE_NAME')[0] self.instrument = tbObs.getcol('TELESCOPE_NAME')[0] tbObs.close() # Use Telescopes.py dictionary to set array position full_antenna_positions = tbAnt.getcol('POSITION') xyz_telescope_frame = tbAnt.getcolkeyword( 'POSITION', 'MEASINFO')['Ref'] antFlags = np.empty(len(full_antenna_positions), dtype=bool) antFlags[:] = False for antnum in range(len(antFlags)): antFlags[antnum] = np.all(full_antenna_positions[antnum, :] == 0) if(xyz_telescope_frame == 'ITRF'): self.telescope_location = np.array( np.mean(full_antenna_positions[np.invert(antFlags), :], axis=0)) if self.telescope_location is None: try: self.set_telescope_params() except ValueError: warnings.warn('Telescope frame is not ITRF and telescope is not ' 'in known_telescopes, so telescope_location is not set.') # antenna names ant_names = tbAnt.getcol('STATION') ant_diams = tbAnt.getcol('DISH_DIAMETER') self.antenna_diameters = ant_diams[ant_diams > 0] self.Nants_telescope = len(antFlags[np.invert(antFlags)]) test_name = ant_names[0] names_same = True for antnum in range(len(ant_names)): if(not(ant_names[antnum] == test_name)): names_same = False if(not(names_same)): # cotter measurement sets store antenna names in the NAMES column. self.antenna_names = ant_names else: # importuvfits measurement sets store antenna names in the STATION column. self.antenna_names = tbAnt.getcol('NAME') self.antenna_numbers = np.arange(len(self.antenna_names)).astype(int) nAntOrig = len(self.antenna_names) ant_names = [] for antNum in range(len(self.antenna_names)): if not(antFlags[antNum]): ant_names.append(self.antenna_names[antNum]) self.antenna_names = ant_names self.antenna_numbers = self.antenna_numbers[np.invert(antFlags)] relative_positions = np.zeros_like(full_antenna_positions) relative_positions = full_antenna_positions - self.telescope_location.reshape(1, 3) self.antenna_positions = relative_positions[np.invert(antFlags), :] tbAnt.close() tbField = tables.table(filepath + '/FIELD', ack=False) if(tbField.getcol('PHASE_DIR').shape[1] == 2): self.phase_type = 'drift' self.set_drift() elif(tbField.getcol('PHASE_DIR').shape[1] == 1): self.phase_type = 'phased' # MSv2.0 appears to assume J2000. Not sure how to specifiy otherwise epoch_string = tb.getcolkeyword('UVW', 'MEASINFO')['Ref'] # for measurement sets made with COTTER, this keyword is ITRF instead of the epoch if epoch_string == 'ITRF': self.phase_center_epoch = 2000. else: self.phase_center_epoch = float( tb.getcolkeyword('UVW', 'MEASINFO')['Ref'][1:]) self.phase_center_ra = float(tbField.getcol('PHASE_DIR')[0][0][0]) self.phase_center_dec = float(tbField.getcol('PHASE_DIR')[0][0][1]) self.set_phased() # set LST array from times and itrf self.set_lsts_from_time_array() # set the history parameter _, self.history = self._ms_hist_to_string(tables.table(filepath + '/HISTORY', ack=False)) # CASA weights column keeps track of number of data points averaged. if not uvutils._check_history_version(self.history, self.pyuvdata_version_str): self.history += self.pyuvdata_version_str # 'WEIGHT_SPECTRUM' is optional - some files may not have per-channel values if 'WEIGHT_SPECTRUM' in tb.colnames(): self.nsample_array = tb.getcol('WEIGHT_SPECTRUM') else: self.nsample_array = tb.getcol('WEIGHT') # Propagate the weights in frequency self.nsample_array = np.stack([self.nsample_array for chan in range(self.Nfreqs)], axis=1) if(len(self.nsample_array.shape) == 3): self.nsample_array = np.expand_dims(self.nsample_array, axis=1) self.object_name = tbField.getcol('NAME')[0] tbField.close() tb.close() # order polarizations self.reorder_pols(order=pol_order) if run_check: self.check(check_extra=check_extra, run_check_acceptability=run_check_acceptability)