Raw File
cst_beam.py
import os
import sys
import re
import numpy as np
import warnings
from uvbeam import UVBeam
import utils as uvutils


class CSTBeam(UVBeam):
    """
    Defines a CST-specific subclass of UVBeam for reading CST text files.
    This class should not be interacted with directly, instead use the
    read_cst_beam method on the UVBeam class.
    """

    def read_cst_beam(self, filename, beam_type='power', feed_pol='x',
                      rotate_pol=True, frequency=None, telescope_name=None,
                      feed_name=None, feed_version=None, model_name=None, model_version=None,
                      history='', run_check=True, run_check_acceptability=True):

        """
        Read in data from a cst file.

        Args:
            filename: The cst file to read from.
            beam_type: what beam_type to read in ('power' or 'efield'). Defaults to 'power'.
            feed_pol: what feed or polarization the files correspond to.
                Defaults to 'x' (meaning x for efield or xx for power beams).
            rotate_pol: If True, assume the structure in the simulation is symmetric under
                90 degree rotations about the z-axis (so that the y polarization can be
                constructed by rotating the x polarization or vice versa). Default: True.
            frequency: the frequency corresponding to the filename.
                If not passed, the code attempts to parse it from the filename.
            telescope_name: the name of the telescope corresponding to the filename.
            feed_name: the name of the feed corresponding to the filename.
            feed_version: the version of the feed corresponding to the filename.
            model_name: the name of the model corresponding to the filename.
            model_version: the version of the model corresponding to the filename.
            history: A string detailing the history of the filename.
            run_check: Option to check for the existence and proper shapes of
                required parameters after reading in the file. Default is True.
            run_check_acceptability: Option to check acceptable range of the values of
                required parameters after reading in the file. Default is True.
        """
        self.telescope_name = telescope_name
        self.feed_name = feed_name
        self.feed_version = feed_version
        self.model_name = model_name
        self.model_version = model_version
        self.history = history
        if not uvutils.check_history_version(self.history, self.pyuvdata_version_str):
            self.history += self.pyuvdata_version_str

        if beam_type is 'power':
            self.set_power()
            self.Naxes_vec = 1
            if rotate_pol:
                if feed_pol is 'x':
                    self.polarization_array = np.array([-5, -6])
                else:
                    self.polarization_array = np.array([-6, -5])
            else:
                if feed_pol is 'x':
                    self.polarization_array = np.array([-5])
                else:
                    self.polarization_array = np.array([-6])
            self.Npols = len(self.polarization_array)
        else:
            self.set_efield()
            self.Naxes_vec = 2
            if rotate_pol:
                if feed_pol is 'x':
                    self.feed_array = np.array(['x', 'y'])
                else:
                    self.feed_array = np.array(['y', 'x'])
            else:
                if feed_pol is 'x':
                    self.feed_array = np.array(['x'])
                else:
                    self.feed_array = np.array(['y'])
            self.Nfeeds = len(self.feed_array)

        self.data_normalization = 'physical'
        self.antenna_type = 'simple'

        self.Nfreqs = 1
        self.Nspws = 1
        self.freq_array = np.zeros((self.Nspws, self.Nfreqs))
        self.bandpass_array = np.zeros((self.Nspws, self.Nfreqs))

        self.spw_array = np.array([0])
        self.pixel_coordinate_system = 'az_za'
        self.set_cs_params()

        out_file = open(filename, 'r')
        line = out_file.readline().strip()  # Get the first line
        out_file.close()
        raw_names = line.split(']')
        raw_names = [raw_name for raw_name in raw_names if not raw_name == '']
        column_names = []
        units = []
        for raw_name in raw_names:
            column_name, unit = tuple(raw_name.split('['))
            column_names.append(''.join(column_name.lower().split(' ')))
            units.append(unit.lower().strip())

        data = np.loadtxt(filename, skiprows=2)

        theta_col = np.where(np.array(column_names) == 'theta')[0][0]
        phi_col = np.where(np.array(column_names) == 'phi')[0][0]

        if 'deg' in units[theta_col]:
            theta_data = np.radians(data[:, theta_col])
        else:
            theta_data = data[:, theta_col]
        if 'deg' in units[phi_col]:
            phi_data = np.radians(data[:, phi_col])
        else:
            phi_data = data[:, phi_col]

        theta_axis = np.sort(np.unique(theta_data))
        phi_axis = np.sort(np.unique(phi_data))
        if not theta_axis.size * phi_axis.size == theta_data.size:
            raise ValueError('Data does not appear to be on a grid')

        delta_theta = np.diff(theta_axis)
        if not np.isclose(np.max(delta_theta), np.min(delta_theta)):
            raise ValueError('Data does not appear to be regularly gridded in zenith angle')
        delta_theta = delta_theta[0]

        delta_phi = np.diff(phi_axis)
        if not np.isclose(np.max(delta_phi), np.min(delta_phi)):
            raise ValueError('Data does not appear to be regularly gridded in azimuth angle')
        delta_phi = delta_phi[0]

        self.axis1_array = phi_axis
        self.Naxes1 = self.axis1_array.size
        self.axis2_array = theta_axis
        self.Naxes2 = self.axis2_array.size

        if self.beam_type == 'power':
            self.data_array = np.zeros(self._data_array.expected_shape(self), dtype=np.float)
        else:
            self.data_array = np.zeros(self._data_array.expected_shape(self), dtype=np.complex)

        if frequency is not None:
            self.freq_array[0] = frequency
        else:
            self.freq_array[0] = self.name2freq(filename)

        if rotate_pol:
            # for second polarization, rotate by pi/2
            rot_phi = phi_axis + np.pi / 2
            rot_phi[np.where(rot_phi >= 2 * np.pi)] -= 2 * np.pi
            roll_rot_phi = np.roll(rot_phi, int((np.pi / 2) / delta_phi))
            if not np.allclose(roll_rot_phi, phi_axis):
                raise ValueError('Rotating by pi/2 failed')

        # get beam
        if self.beam_type is 'power':
            data_col = np.where(np.array(column_names) == 'abs(v)')[0][0]
            power_beam1 = data[:, data_col].reshape((theta_axis.size, phi_axis.size), order='F') ** 2.

            self.data_array[0, 0, 0, 0, :, :] = power_beam1

            if rotate_pol:
                # rotate by pi/2 for second polarization
                power_beam2 = np.roll(power_beam1, int((np.pi / 2) / delta_phi), axis=0)
                self.data_array[0, 0, 1, 0, :, :] = power_beam2
        else:
            self.basis_vector_array = np.zeros((self.Naxes_vec, 2, self.Naxes2, self.Naxes1))
            self.basis_vector_array[0, 0, :, :] = 1.0
            self.basis_vector_array[1, 1, :, :] = 1.0

            theta_mag_col = np.where(np.array(column_names) == 'abs(theta)')[0][0]
            theta_phase_col = np.where(np.array(column_names) == 'phase(theta)')[0][0]
            phi_mag_col = np.where(np.array(column_names) == 'abs(phi)')[0][0]
            phi_phase_col = np.where(np.array(column_names) == 'phase(phi)')[0][0]

            theta_mag = data[:, theta_mag_col].reshape((theta_axis.size, phi_axis.size), order='F')
            phi_mag = data[:, phi_mag_col].reshape((theta_axis.size, phi_axis.size), order='F')
            if 'deg' in units[theta_phase_col]:
                theta_phase = np.radians(data[:, theta_phase_col])
            else:
                theta_phase = data[:, theta_phase_col]
            if 'deg' in units[phi_phase_col]:
                phi_phase = np.radians(data[:, phi_phase_col])
            else:
                phi_phase = data[:, phi_phase_col]
            theta_phase = theta_phase.reshape((theta_axis.size, phi_axis.size), order='F')
            phi_phase = theta_phase.reshape((theta_axis.size, phi_axis.size), order='F')

            theta_beam = theta_mag * np.exp(1j * theta_phase)
            phi_beam = theta_mag * np.exp(1j * theta_phase)

            self.data_array[0, 0, 0, 0, :, :] = phi_beam
            self.data_array[1, 0, 0, 0, :, :] = theta_beam

            if rotate_pol:
                # rotate by pi/2 for second polarization
                theta_beam2 = np.roll(theta_beam, int((np.pi / 2) / delta_phi), axis=0)
                phi_beam2 = np.roll(phi_beam, int((np.pi / 2) / delta_phi), axis=0)
                self.data_array[0, 0, 1, 0, :, :] = phi_beam2
                self.data_array[1, 0, 1, 0, :, :] = theta_beam2

        self.bandpass_array[0] = 1

        if frequency is None:
            warnings.warn('No frequency provided. Detected frequency is: '
                          '{freqs} Hz'.format(freqs=self.freq_array))

        if run_check:
            self.check(run_check_acceptability=run_check_acceptability)

    def name2freq(self, fname):
        """
        Method to extract the frequency from the file name, assuming the file name
        contains a substring with the frequency channel in MHz that the data represents.
        e.g. "HERA_Sim_120.87MHz.txt" should yield 120.87e6

        Args:
            fname: filename (string)

        Returns:
            extracted frequency
        """
        fi = fname.find('Hz')
        frequency = float(re.findall('\d+?\.?\d*', fname[:fi])[0])

        si_prefix = fname[fi - 1]
        si_dict = {'k': 1e3, 'M': 1e6, 'G': 1e9}
        if si_prefix in si_dict.keys():
            frequency = frequency * si_dict[si_prefix]

        return frequency
back to top