Skip to main content
  • Home
  • Development
  • Documentation
  • Donate
  • Operational login
  • Browse the archive

swh logo
SoftwareHeritage
Software
Heritage
Archive
Features
  • Search

  • Downloads

  • Save code now

  • Add forge now

  • Help

  • 9fe4569
  • /
  • pyaps.py
Raw File Download

To reference or cite the objects present in the Software Heritage archive, permalinks based on SoftWare Hash IDentifiers (SWHIDs) must be used.
Select below a type of object currently browsed in order to display its associated SWHID and permalink.

  • content
  • directory
content badge
swh:1:cnt:283c14d6a2d8a514075bfb80f66d0d76fcb7d968
directory badge
swh:1:dir:9fe4569968d540ed47abf6a3d37372f784c1ca22

This interface enables to generate software citations, provided that the root directory of browsed objects contains a citation.cff or codemeta.json file.
Select below a type of object currently browsed in order to generate citations for them.

  • content
  • directory
(requires biblatex-software package)
Generating citation ...
(requires biblatex-software package)
Generating citation ...
pyaps.py
#   This Python module is part of the PyRate software package.
#
#   Copyright 2017 Geoscience Australia
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
"""
This Python module implements atmospheric corrections derived 
from external weather model data. The algorithms are based on 
those implemented in the 'PyAPS' package developed by Caltech
"""
from __future__ import print_function
import logging
import os
import re
import glob2
import numpy as np
from joblib import Parallel, delayed
from osgeo import gdalconst, gdal
import PyAPS as pa

from pyrate import config as cf
from pyrate import ifgconstants as ifc
from pyrate import prepifg
from pyrate import gamma
from operator import itemgetter

log = logging.getLogger(__name__)

PYRATEPATH = os.environ['PYRATEPATH']
ECMWF_DIR = os.path.join(PYRATEPATH, 'ECMWF')
ECMWF_PRE = 'ERA-Int_'
ECMWF_EXT = '_12.grib'  # TODO: build dynamically with closest available grib
APS_STATUS = 'REMOVED'
GEOTIFF = 'GEOTIFF'
ECMWF = 'ECMWF'
GAMMA_PTN = re.compile(r'\d{8}')
ROIPAC_PTN = re.compile(r'\d{6}')


def remove_aps_delay(input_ifgs, params, process_indices=None):

    def get_incidence_map():
        if params[cf.APS_ELEVATION_MAP] is not None:
            f, e = os.path.basename(params[cf.APS_ELEVATION_MAP]).split(
                '.')
        else:
            f, e = os.path.basename(params[cf.APS_INCIDENCE_MAP]).split(
                '.')
        multilooked = os.path.join(
            params[cf.OUT_DIR],
            f + '_' + e +
            '_{looks}rlks_{crop}cr.tif'.format(
                looks=params[cf.IFG_LKSX],
                crop=params[
                    cf.IFG_CROP_OPT]))
        assert os.path.exists(multilooked), \
            'cropped and multilooked incidence map file not found. ' \
            'Use apsmethod=1, Or run prepifg with gamma processor'
        ds = gdal.Open(multilooked, gdalconst.GA_ReadOnly)
        if params[cf.APS_INCIDENCE_MAP] is not None:
            incidence_map = ds.ReadAsArray()
        else:
            incidence_map = 90 - ds.ReadAsArray()
        ds = None  # close file
        return incidence_map

    if process_indices is not None:
        ifgs = [itemgetter(p)(input_ifgs) for p in process_indices]
        [ifg.close() for i, ifg in enumerate(input_ifgs)
         if i not in process_indices]
    else:
        ifgs = input_ifgs

    lat, lon, nx, ny, dem, mlooked_dem = read_dem(params)
    dem_header = (lon, lat, nx, ny)

    incidence_angle = None

    if params[cf.APS_METHOD] == 1:
        incidence_map = np.ones_like(dem)  # dummy
    elif params[cf.APS_METHOD] == 2:
        incidence_map = get_incidence_map()
    else:
        raise PyAPSException('PyAPS method must be 1 or 2')

    list_of_dates_for_grb_download = []

    parallel = params[cf.PARALLEL]
    data_paths = [i.data_path for i in ifgs]

    if parallel:
        aps_delay = Parallel(n_jobs=params[cf.PROCESSES], verbose=50)(
            delayed(parallel_aps)(d, dem, dem_header,
                               incidence_angle,
                               incidence_map, list_of_dates_for_grb_download,
                               mlooked_dem, params)
            for d in data_paths)
    else:
        aps_delay = []
        for d in data_paths:  # demo for only one ifg
            aps_delay.append(parallel_aps(d, dem, dem_header, incidence_angle,
                                          incidence_map,
                                          list_of_dates_for_grb_download,
                                          mlooked_dem, params))

    for i, ifg in enumerate(ifgs):
        ifg.phase_data -= aps_delay[i]  # remove delay
        # add to ifg.meta_data
        ifg.meta_data[ifc.PYRATE_WEATHER_ERROR] = APS_STATUS
        # write meta_data to file
        ifg.dataset.SetMetadataItem(ifc.PYRATE_WEATHER_ERROR, APS_STATUS)
        ifg.write_modified_phase()
        # ifg.close()  # close ifg files, required for gdal dataset to close files

    return ifgs


def parallel_aps(data_path, dem, dem_header, incidence_angle, incidence_map,
                 list_of_dates_for_grb_download, mlooked_dem, params):
    if params[cf.PROCESSOR] == 1:  # gamma
        date_pair = [i for i in GAMMA_PTN.findall(os.path.basename(data_path))]
    elif params[cf.PROCESSOR] == 0:  # roipac
        # adding 20 to dates here, so dates before 2000 won't work
        # TODO: fix pre 2000 dates
        date_pair = ['20' + i for i in
                     ROIPAC_PTN.findall(os.path.basename(data_path))]
    else:
        raise AttributeError('processor needs to be gamma(1) or roipac(0)')
    list_of_dates_for_grb_download += date_pair
    first_grb = os.path.join(ECMWF_DIR,
                             ECMWF_PRE + date_pair[0] + ECMWF_EXT)
    second_grb = os.path.join(ECMWF_DIR,
                              ECMWF_PRE + date_pair[1] + ECMWF_EXT)
    # download .grb file if does not exist
    if not (os.path.exists(first_grb) and os.path.exists(second_grb)):
        # download weather files at 12:00 UTC (other options 00:00, 06:00, 18:00)
        pa.ecmwf_download(date_pair, '12', 'ECMWF')

    # rdr_correction(date_pair)
    # TODO: lat lon correction when lat and lon files are available
    # aps1.getgeodelay(phs1, inc=23.0, wvl=0.056,
    #   lat=os.path.join(PYAPS_EXAMPLES, 'lat.flt'),
    #   lon=os.path.join(PYAPS_EXAMPLES, 'lon.flt'))
    # aps2.getgeodelay(phs2, inc=23.0, wvl=0.056,
    #   lat=os.path.join(PYAPS_EXAMPLES, 'lat.flt'),
    #   lon=os.path.join(PYAPS_EXAMPLES, 'lon.flt'))
    # LLphs = phs2-phs1
    # print dem_header, mlooked_dem
    if params[cf.APS_METHOD] == 1:
        # no need to calculate incidence angle for all ifgs, they are the same
        if incidence_angle is None:
            incidence_angle = get_incidence_angle(date_pair, params)
        aps_delay = geo_correction(date_pair, dem_header, dem, incidence_angle)
    elif params[cf.APS_METHOD] == 2:
        # no need to calculate incidence map for all ifgs, they are the same
        aps_delay = geo_correction(date_pair, dem_header, dem, incidence_map)
    else:
        raise PyAPSException('APS method must be 1 or 2')
    return aps_delay


def rdr_correction(date_pair):
    """ using rdr coordinates to remove APS """
    raise NotImplementedError("This has not been implemented yet for PyRate")
    # aps1 = pa.PyAPS_rdr(
    #     os.path.join(ECMWF_DIR, ECMWF_PRE + date_pair[0] + ECMWF_EXT),
    #     SML_TEST_DEM_ROIPAC, grib='ECMWF', verb=True,
    #     demfmt='HGT', demtype=np.int16)
    # aps2 = pa.PyAPS_rdr(
    #     os.path.join(ECMWF_DIR, ECMWF_PRE + date_pair[1] + ECMWF_EXT),
    #     SML_TEST_DEM_ROIPAC, grib='ECMWF', verb=True,
    #     demfmt='HGT', demtype=np.int16)
    # phs1 = np.zeros((aps1.ny, aps1.nx))
    # phs2 = np.zeros((aps2.ny, aps2.nx))
    # print 'Without Lat Lon files'
    # # using random incidence angle
    # aps1.getdelay(phs1, inc=23.0)
    # aps2.getdelay(phs2, inc=23.0)
    # aps_delay = phs2 - phs1  # delay in meters as we don't provide wavelength
    # return aps_delay


def geo_correction(date_pair, dem_header, dem, incidence_angle_or_map):

    """ using geo coordinates to remove APS """

    aps1 = pa.PyAPSPyRateGeo(
        os.path.join(ECMWF_DIR, ECMWF_PRE + date_pair[0] + ECMWF_EXT),
        dem_header=dem_header, dem=dem, grib=ECMWF, verb=True)
    aps2 = pa.PyAPSPyRateGeo(
        os.path.join(ECMWF_DIR, ECMWF_PRE + date_pair[1] + ECMWF_EXT),
        dem_header=dem_header, dem=dem, grib=ECMWF, verb=True)
    phs1 = np.zeros((aps1.ny, aps1.nx))
    phs2 = np.zeros((aps2.ny, aps2.nx))
    print('Without Lat Lon files')
    aps1.getdelay_pyrate(phs1, dem, inc=incidence_angle_or_map)
    aps2.getdelay_pyrate(phs2, dem, inc=incidence_angle_or_map)
    aps_delay = phs2 - phs1  # delay in meters as we don't provide wavelength
    return aps_delay


def remove_aps_delay_original(ifgs, params):
    list_of_dates_for_grb_download = []

    incidence_angle = None
    incidence_map = None
    for ifg in ifgs:  # demo for only one ifg
        if params[cf.PROCESSOR] == 1:  # gamma
            PTN = re.compile(r'\d{8}')
            date_pair = [i for i in PTN.findall(os.path.basename(ifg.data_path))]
        elif params[cf.PROCESSOR] == 0:  # roipac
            # adding 20 to dates here, so dates before 2000 won't work
            # TODO: fix pre 2000 dates
            PTN = re.compile(r'\d{6}')
            date_pair = ['20' + i for i in
                         PTN.findall(os.path.basename(ifg.data_path))]
        else:
            raise AttributeError('processor needs to be gamma(1) or roipac(0)')

        list_of_dates_for_grb_download += date_pair

        first_grb = os.path.join(ECMWF_DIR,
                                 ECMWF_PRE + date_pair[0] + ECMWF_EXT)
        second_grb = os.path.join(ECMWF_DIR,
                                  ECMWF_PRE + date_pair[1] + ECMWF_EXT)

        # download .grb file if does not exist
        if not (os.path.exists(first_grb) and os.path.exists(second_grb)):
            # download weather files at 12:00 UTC (other options 00:00, 06:00, 18:00)
            pa.ecmwf_download(date_pair, '12', 'ECMWF')

        def get_incidence_map():
            """
            :param incidence_map:
            :param params:
            :param inc_or_ele: 1 when incidence map, 0 when elevation map
            :return:
            """
            if params[cf.APS_ELEVATION_MAP] is not None:
                f, e = os.path.basename(params[cf.APS_ELEVATION_MAP]).split(
                    '.')
            else:
                f, e = os.path.basename(params[cf.APS_INCIDENCE_MAP]).split(
                    '.')
            multilooked = os.path.join(
                params[cf.OUT_DIR],
                f + '_' + e +
                '_{looks}rlks_{crop}cr.tif'.format(
                    looks=params[cf.IFG_LKSX],
                    crop=params[
                        cf.IFG_CROP_OPT]))
            assert os.path.exists(multilooked), \
                'cropped and multilooked incidence map file not found. ' \
                'Use apsmethod=1, Or run prepifg with gamma processor'
            ds = gdal.Open(multilooked, gdalconst.GA_ReadOnly)
            if params[cf.APS_INCIDENCE_MAP] is not None:
                incidence_map = ds.ReadAsArray()
            else:
                incidence_map = 90 - ds.ReadAsArray()
            ds = None  # close file
            return incidence_map

        if params[cf.APS_METHOD] == 1:
            # no need to calculate incidence angle for all ifgs, they are the same
            if incidence_angle is None:
                incidence_angle = get_incidence_angle(date_pair, params)
            aps_delay = geo_correction(date_pair, params, incidence_angle)
        elif params[cf.APS_METHOD] == 2:
            # no need to calculate incidence map for all ifgs, they are the same
            if incidence_map is None:
                if params[cf.APS_INCIDENCE_MAP] is not None:
                    incidence_map = get_incidence_map()
                else:  # elevation map was provided
                    assert params[cf.APS_ELEVATION_MAP] is not None
                    incidence_map = get_incidence_map()
            aps_delay = geo_correction_original(date_pair, params,
                                                incidence_map)
        else:
            raise PyAPSException('PyAPS method must be 1 or 2')


        ifg.phase_data -= aps_delay  # remove delay
        # add it to the meta_data dict
        ifg.meta_data[ifc.PYRATE_WEATHER_ERROR] = APS_STATUS
        # write meta_data to file
        ifg.dataset.SetMetadataItem(ifc.PYRATE_WEATHER_ERROR, APS_STATUS)

        ifg.write_modified_phase()


def geo_correction_original(date_pair, params, incidence_angle_or_map):

    dem_file = params[cf.DEM_FILE]
    geotif_dem = os.path.join(
        params[cf.OUT_DIR], os.path.basename(dem_file).split('.')[0] + '.tif')

    mlooked_dem = prepifg.mlooked_path(geotif_dem,
                                       looks=params[cf.IFG_LKSX],
                                       crop_out=params[cf.IFG_CROP_OPT])
    # make sure mlooked dem exist
    if not os.path.exists(mlooked_dem):
        raise prepifg.PreprocessError('mlooked dem was not found.'
                                      'Please run prepifg.')

    dem_header = gamma.parse_dem_header(params[cf.DEM_HEADER_FILE])
    lat, lon, nx, ny = return_pyaps_lat_lon(dem_header)


    """ using geo coordinates to remove APS """

    aps1 = pa.PyAPS_geo(
        os.path.join(ECMWF_DIR, ECMWF_PRE + date_pair[0] + ECMWF_EXT),
        mlooked_dem, grib=ECMWF, verb=True,
        demfmt=GEOTIFF, demtype=np.float32, dem_header=(lon, lat, nx, ny))
    aps2 = pa.PyAPS_geo(
        os.path.join(ECMWF_DIR, ECMWF_PRE + date_pair[1] + ECMWF_EXT),
        mlooked_dem, grib=ECMWF, verb=True,
        demfmt=GEOTIFF, demtype=np.float32, dem_header=(lon, lat, nx, ny))
    phs1 = np.zeros((aps1.ny, aps1.nx))
    phs2 = np.zeros((aps2.ny, aps2.nx))
    print('Without Lat Lon files')
    aps1.getdelay(phs1, inc=incidence_angle_or_map)
    aps2.getdelay(phs2, inc=incidence_angle_or_map)
    aps_delay = phs2 - phs1  # delay in meters as we don't provide wavelength
    return aps_delay


def read_dem(params):
    dem_file = params[cf.DEM_FILE]
    geotif_dem = os.path.join(
        params[cf.OUT_DIR], os.path.basename(dem_file).split('.')[0] + '.tif')
    mlooked_dem = prepifg.mlooked_path(geotif_dem,
                                       looks=params[cf.IFG_LKSX],
                                       crop_out=params[cf.IFG_CROP_OPT])
    # make sure mlooked dem exist
    if not os.path.exists(mlooked_dem):
        raise prepifg.PreprocessError('mlooked dem was not found.'
                                      'Please run prepifg.')
    dem_header = gamma.parse_dem_header(params[cf.DEM_HEADER_FILE])
    lat, lon, nx, ny = return_pyaps_lat_lon(dem_header)

    ds = gdal.Open(mlooked_dem, gdalconst.GA_ReadOnly)
    dem = ds.ReadAsArray()
    ds = None
    return lat, lon, nx, ny, dem, mlooked_dem


def get_incidence_angle(date_pair, params):
    # incidence angle exists in unw header files, not in dem
    SLC_DIR = params[cf.SLC_DIR] if params[cf.SLC_DIR] else \
        params[cf.OBS_DIR]
    header_path = glob2.glob(os.path.join(
        SLC_DIR, '**/*%s*slc.par' % date_pair[0]))[0]
    header = gamma.parse_epoch_header(header_path)
    incidence_angle = header[ifc.INCIDENCE_ANGLE]
    return incidence_angle


def return_pyaps_lat_lon(dem_header):
    nx, ny = dem_header[ifc.PYRATE_NCOLS], dem_header[ifc.PYRATE_NROWS]
    lat = np.zeros((2, 1))
    lon = np.zeros((2, 1))
    lat[1] = dem_header[ifc.PYRATE_LAT]
    lon[0] = dem_header[ifc.PYRATE_LONG]
    if lon[0] < 0:
        lon[0] += 360.0
    dx = np.float(dem_header[ifc.PYRATE_X_STEP])
    dy = np.float(dem_header[ifc.PYRATE_Y_STEP])
    lat[0] = lat[1] + dy * ny
    lon[1] = lon[0] + dx * nx
    return lat, lon, nx, ny


class PyAPSException(Exception):
    """
    generic exception class for APS correction
    """
    pass


def _check_aps_ifgs(ifgs):
    # this function to be replaced with generic status check in shared module
    flags = [i.dataset.GetMetadataItem(ifc.PYRATE_WEATHER_ERROR) for i in ifgs]
    count = sum([f == APS_STATUS for f in flags])
    if (count < len(flags)) and (count > 0):
        log.debug('Detected mix of corrected and uncorrected '
                      'PyAPS delay in ifgs')

        for i, flag in zip(ifgs, flags):
            if flag:
                msg = '%s: prior PyAPS delay correction detected'
            else:
                msg = '%s: no PyAPS delay correction detected'
            logging.debug(msg % i.data_path)
        raise PyAPSException('Mixed PyAPS removal status in ifg list')


def _aps_delay_required(ifgs, params):
    # this functionality to be integrated in to run_pyrate script
    log.info('Removing PyAPS delay')

    if not params[cf.APS_CORRECTION]:
        log.info('PyAPS delay removal not required')
        return False

    # perform some general error/sanity checks
    flags = [i.dataset.GetMetadataItem(ifc.PYRATE_WEATHER_ERROR) for i in ifgs]

    if all(flags):
        log.info('Skipped PyAPS delay removal, ifgs are already PyAPS corrected')
        return False
    else:
        _check_aps_ifgs(ifgs)
    return True

back to top

Software Heritage — Copyright (C) 2015–2026, The Software Heritage developers. License: GNU AGPLv3+.
The source code of Software Heritage itself is available on our development forge.
The source code files archived by Software Heritage are available under their own copyright and licenses.
Terms of use: Archive access, API— Content policy— Contact— JavaScript license information— Web API