# Copyright (c) 2024 Radio Astronomy Software Group # Licensed under the 2-clause BSD License import copy import numpy as np import pytest from pyuvdata import ( AiryBeam, BeamInterface, GaussianBeam, ShortDipoleBeam, UniformBeam, UVBeam, utils, ) from pyuvdata.testing import check_warnings @pytest.fixture(scope="function") def airy() -> AiryBeam: return AiryBeam(diameter=14.0) @pytest.fixture() def gaussian() -> GaussianBeam: return GaussianBeam(diameter=14.0) @pytest.fixture() def xy_grid_coarse(): nfreqs = 5 freqs = np.linspace(100e6, 130e6, nfreqs) xy_half_n = 6 zmax = np.radians(60) # Degrees arr = np.arange(-xy_half_n, xy_half_n) x_arr, y_arr = np.meshgrid(arr, arr) x_arr = x_arr.flatten() y_arr = y_arr.flatten() radius = np.sqrt(x_arr**2 + y_arr**2) / float(xy_half_n) za_array = radius * zmax az_array = np.arctan2(y_arr, x_arr) + np.pi # convert from -180->180 to 0->360 return az_array, za_array, freqs @pytest.fixture() def gaussian_uv(gaussian, az_za_coords) -> UVBeam: az, za = az_za_coords return gaussian.to_uvbeam( axis1_array=az, axis2_array=za, freq_array=np.array([1e8]) ) @pytest.mark.parametrize( ["beam_obj", "kwargs"], [ [AiryBeam, {"diameter": 14.0}], [GaussianBeam, {"diameter": 14.0}], [UniformBeam, {"include_cross_pols": False}], [ShortDipoleBeam, {}], ], ) @pytest.mark.parametrize("init_beam_type", ["efield", "power"]) @pytest.mark.parametrize("final_beam_type", ["efield", "power"]) @pytest.mark.parametrize("coord_sys", ["az_za", "healpix"]) def test_beam_interface( beam_obj, kwargs, init_beam_type, final_beam_type, az_za_coords, xy_grid_coarse, coord_sys, ): analytic = beam_obj(**kwargs) nfreqs = 20 freq_array = np.linspace(100e6, 150e6, nfreqs) if coord_sys == "healpix": nside = 64 if init_beam_type == "efield": ordering = "ring" else: ordering = "nested" healpix_pixel_array = np.arange(12 * nside**2, dtype=int) to_uvbeam_kwargs = { "nside": nside, "ordering": ordering, "healpix_pixel_array": healpix_pixel_array, } try: from astropy_healpix import HEALPix except ImportError: with pytest.raises( ImportError, match="astropy_healpix is not installed but is " "required for healpix functionality. ", ): uvb = analytic.to_uvbeam( beam_type=init_beam_type, freq_array=freq_array, **to_uvbeam_kwargs ) pytest.importorskip("astropy_healpix") hp_obj = HEALPix(nside=nside, order=ordering) hpx_lon, hpx_lat = hp_obj.healpix_to_lonlat(healpix_pixel_array) za_array, az_array = utils.coordinates.hpx_latlon_to_zenithangle_azimuth( hpx_lat.radian, hpx_lon.radian ) # downselect places to check above_horizon = np.nonzero(za_array <= (np.pi / 2.0)) az_array = az_array[above_horizon] za_array = za_array[above_horizon] az_array = az_array[::10] za_array = za_array[::10] else: az_array, za_array = az_za_coords to_uvbeam_kwargs = {"axis1_array": az_array, "axis2_array": za_array} include_cross_pols = kwargs.get("include_cross_pols", True) to_uvbeam_kwargs["pixel_coordinate_system"] = coord_sys uvb = analytic.to_uvbeam( beam_type=init_beam_type, freq_array=freq_array, **to_uvbeam_kwargs ) bi_analytic = BeamInterface(analytic, final_beam_type) if final_beam_type != init_beam_type: if final_beam_type == "efield": with pytest.raises( ValueError, match="Input beam is a power UVBeam but beam_type is specified as " "'efield'. It's not possible to convert a power beam to an " "efield beam, either provide an efield UVBeam or do not " "specify `beam_type`.", ): BeamInterface(uvb, final_beam_type) return warn_type = UserWarning msg = ( "Input beam is an efield UVBeam but beam_type is specified as " "'power'. Converting efield beam to power." ) else: warn_type = None msg = "" with check_warnings(warn_type, match=msg): bi_uvbeam = BeamInterface( uvb, final_beam_type, include_cross_pols=include_cross_pols ) if coord_sys == "az_za": az_za_grid = True else: az_za_grid = False analytic_data = bi_analytic.compute_response( az_array=az_array, za_array=za_array, freq_array=freq_array, az_za_grid=az_za_grid, ) uvb_data = bi_uvbeam.compute_response( az_array=az_array, za_array=za_array, freq_array=freq_array, az_za_grid=az_za_grid, ) np.testing.assert_allclose(analytic_data, uvb_data, rtol=0, atol=1e-14) # now on a grid that is not the same as where the beam was evaluated # larger differences of course az_vals, za_vals, freqs = xy_grid_coarse analytic_data = bi_analytic.compute_response( az_array=az_vals, za_array=za_vals, freq_array=freqs, az_za_grid=True ) uvb_data = bi_uvbeam.compute_response( az_array=az_vals, za_array=za_vals, freq_array=freqs, az_za_grid=True ) if not (coord_sys == "healpix" and isinstance(analytic, ShortDipoleBeam)): np.testing.assert_allclose(analytic_data, uvb_data, rtol=0, atol=1e-1) else: # the comparison falls apart at zenith because there's no healpix # pixel right at zenith and the dipole beam changes quickly there. az_mesh, za_mesh = np.meshgrid(az_vals, za_vals) az_mesh = az_mesh.flatten() za_mesh = za_mesh.flatten() wh_not_zenith = np.nonzero(za_mesh != 0) np.testing.assert_allclose( analytic_data[:, :, :, wh_not_zenith], uvb_data[:, :, :, wh_not_zenith], rtol=0, atol=1e-1, ) def test_beam_interface_errors(): with pytest.raises( ValueError, match="beam must be a UVBeam or an AnalyticBeam instance." ): BeamInterface("foo", "power") @pytest.mark.parametrize( ["param", "value"], [ ["az_array", None], ["za_array", None], ["freq_array", None], ["az_array", np.zeros((10, 10), dtype=float)], ["za_array", np.zeros((10, 10), dtype=float)], ["freq_array", np.zeros((10, 10), dtype=float)], ], ) def test_compute_response_errors(param, value): orig_kwargs = { "az_array": np.deg2rad(np.linspace(0, 360, 36, endpoint=False)), "za_array": np.deg2rad(np.linspace(0, 90, 10)), "freq_array": np.deg2rad(np.linspace(100, 200, 5)), } compute_kwargs = copy.deepcopy(orig_kwargs) compute_kwargs["az_za_grid"] = True compute_kwargs[param] = value analytic = ShortDipoleBeam() bi_analytic = BeamInterface(analytic, beam_type="efield") with pytest.raises( ValueError, match=f"{param} must be a one-dimensional numpy array" ): bi_analytic.compute_response(**compute_kwargs) uvb = analytic.to_uvbeam( beam_type="power", freq_array=orig_kwargs["freq_array"], axis1_array=orig_kwargs["az_array"], axis2_array=orig_kwargs["za_array"], ) bi_uvb = BeamInterface(uvb) if param != "freq_array": with pytest.raises( ValueError, match=f"{param} must be a one-dimensional numpy array" ): bi_uvb.compute_response(**compute_kwargs) elif value is not None: with pytest.raises(ValueError, match="freq_array must be one-dimensional"): bi_uvb.compute_response(**compute_kwargs) else: # this shouldn't error bi_uvb.compute_response(**compute_kwargs) @pytest.mark.parametrize("beam_obj", ["airy", "gaussian", "gaussian_uv"]) def test_idempotent_instantiation(beam_obj, request): beam = BeamInterface(request.getfixturevalue(beam_obj)) beam2 = BeamInterface(beam) assert beam == beam2 def test_properties(airy: AiryBeam): intf = BeamInterface(airy) assert airy.Npols == intf.Npols assert airy.Nfeeds == intf.Nfeeds assert np.all(airy.polarization_array == intf.polarization_array) assert np.all(airy.feed_array == intf.feed_array) assert np.all(airy.feed_angle == intf.feed_angle) def test_clone(airy): intf = BeamInterface(airy) intf_clone = intf.clone(beam_type="power") assert intf != intf_clone @pytest.mark.parametrize("uvbeam", [True, False], ids=["uvbeam", "analytic"]) @pytest.mark.parametrize("allow_mutation", [True, False], ids=["mutate", "nomutate"]) @pytest.mark.parametrize( "include_cross_pols", [True, False, None], ids=["incx", "nox", "xpolnone"] ) def test_as_power( uvbeam: bool, allow_mutation: bool, include_cross_pols: bool, gaussian, gaussian_uv ): beam = gaussian_uv if uvbeam else gaussian intf = BeamInterface(beam) intf_power = intf.as_power_beam( allow_beam_mutation=allow_mutation, include_cross_pols=include_cross_pols ) if include_cross_pols is None: include_cross_pols = True assert intf_power.beam_type == "power" assert intf_power.Npols == 4 if include_cross_pols else 2 if uvbeam: if allow_mutation: assert intf.beam.beam_type == "power" else: assert intf.beam.beam_type == "efield" def test_as_power_noop(airy): """Ensure that calling as_power_beam on a power beam is a no-op.""" intf = BeamInterface(airy, beam_type="power") intf2 = intf.as_power_beam() assert intf is intf2 with pytest.warns(UserWarning, match="as_power_beam does not modify cross pols"): intf2 = intf.as_power_beam(include_cross_pols=False) assert intf is intf2 @pytest.mark.parametrize("uvbeam", [True, False]) def test_with_feeds(uvbeam: bool, gaussian, gaussian_uv): beam = gaussian_uv if uvbeam else gaussian intf = BeamInterface(beam) intf_feedx = intf.with_feeds(["x"]) assert intf_feedx.feed_array == ["x"] def test_with_feeds_ordering(airy): intf = BeamInterface(airy) intf_feedx = intf.with_feeds(["y", "x"], maintain_ordering=True) assert np.all(intf_feedx.feed_array == ["x", "y"]) intf_feedyx = intf.with_feeds(["y", "x"], maintain_ordering=False) assert np.all(intf_feedyx.feed_array == ["y", "x"]) @pytest.mark.filterwarnings("ignore:Input beam is an efield UVBeam") @pytest.mark.filterwarnings("ignore:Selected polarizations are not evenly spaced") def test_with_feeds_ordering_power(gaussian_uv): # beam = AiryBeam(diameter=14.0).to_uvbeam(freq_array=np.array([1e8]), nside=16) intf = BeamInterface(gaussian_uv, beam_type="power") intf_feedx = intf.with_feeds(["y", "x"], maintain_ordering=True) assert np.all(intf_feedx.polarization_array == [-5, -6, -7, -8]) intf_feedyx = intf.with_feeds(["y", "x"], maintain_ordering=False) # N.b. (Karto), this used to check against [-6, -8, -7, -5], but I _think_ this # was actually a bug, in that UVBeam.select was sensitive to the ordering of # pol arguments for polarization_array *only*, and not anything else with a # polarization axis. assert np.all(intf_feedyx.polarization_array == [-5, -6, -7, -8])