Skip to main content
  • Home
  • Development
  • Documentation
  • Donate
  • Operational login
  • Browse the archive

swh logo
SoftwareHeritage
Software
Heritage
Archive
Features
  • Search

  • Downloads

  • Save code now

  • Add forge now

  • Help

  • 9fe4569
  • /
  • orbital.py
Raw File Download

To reference or cite the objects present in the Software Heritage archive, permalinks based on SoftWare Hash IDentifiers (SWHIDs) must be used.
Select below a type of object currently browsed in order to display its associated SWHID and permalink.

  • content
  • directory
content badge
swh:1:cnt:812ba860218b19feaaa0c278ad1b4b81aff1804c
directory badge
swh:1:dir:9fe4569968d540ed47abf6a3d37372f784c1ca22

This interface enables to generate software citations, provided that the root directory of browsed objects contains a citation.cff or codemeta.json file.
Select below a type of object currently browsed in order to generate citations for them.

  • content
  • directory
(requires biblatex-software package)
Generating citation ...
(requires biblatex-software package)
Generating citation ...
orbital.py
#   This Python module is part of the PyRate software package.
#
#   Copyright 2017 Geoscience Australia
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
"""
This Python module implements residual orbital corrections for interferograms.
"""
# pylint: disable=invalid-name
import logging
from collections import OrderedDict
from numpy import empty, isnan, reshape, float32, squeeze
from numpy import dot, vstack, zeros, meshgrid
import numpy as np
from numpy.linalg import pinv
# from joblib import Parallel, delayed
from scipy.linalg import lstsq

from pyrate.algorithm import master_slave_ids, get_all_epochs
from pyrate import mst, shared, prepifg
from pyrate.shared import nanmedian, Ifg
from pyrate import config as cf
from pyrate import ifgconstants as ifc

log = logging.getLogger(__name__)

# Orbital correction tasks
#
# TODO: options for multilooking
# 1) do the 2nd stage mlook at prepifg.py/generate up front, then delete in
#    workflow afterward
# 2) refactor prep_ifgs() call to take input filenames and params & generate
#    mlooked versions from that
#    this needs to be more generic to call at any point in the runtime.


# Design notes:
# The orbital correction code is based on Matlab Pirate, but includes several
# enhancements. Pirate creates sparse arrays for the linear inversion, which
# contain many empty cells. This is unnecessary for the independent method, and
# temporarily wastes potentially a lot of memory.
#
# For the independent method, PyRate makes individual small design matrices and
# corrects the Ifgs one by one. If required in the correction, the offsets
# option adds an extra column of ones to include in the inversion.
#
# Network method design matrices are mostly empty, and offsets are handled
# differently. Individual design matrices (== independent method DMs) are
# placed in the sparse network design matrix. Offsets are not included in the
# smaller DMs to prevent unwanted cols being inserted. This is why some funcs
# appear to ignore the offset parameter in the networked method. Network DM
# offsets are cols of 1s in a diagonal line on the LHS of the sparse array.

# ORBITAL ERROR correction constants
INDEPENDENT_METHOD = cf.INDEPENDENT_METHOD
NETWORK_METHOD = cf.NETWORK_METHOD

PLANAR = cf.PLANAR
QUADRATIC = cf.QUADRATIC
PART_CUBIC = cf.PART_CUBIC


def remove_orbital_error(ifgs, params, preread_ifgs=None):
    """
    Wrapper function for PyRate orbital error removal functionality.

    NB: the ifg data is modified in situ, rather than create intermediate
    files. The network method assumes the given ifgs have already been reduced
    to a minimum spanning tree network.

    :param list ifgs: List of interferograms class objects
    :param dict params: Dictionary containing configuration parameters
    :param dict preread_ifgs: Dictionary containing information specifically
        for MPI jobs (optional)

    :return: None - interferogram phase data is updated and saved to disk
    """

    ifg_paths = [i.data_path for i in ifgs] \
        if isinstance(ifgs[0], Ifg) else ifgs

    mlooked = None

    # mlooking is not necessary for independent correction
    # can use multiple procesing if write_to_disc=True
    if params[cf.ORBITAL_FIT_METHOD] == 2:
        mlooked_dataset = prepifg.prepare_ifgs(
            ifg_paths,
            crop_opt=prepifg.ALREADY_SAME_SIZE,
            xlooks=params[cf.ORBITAL_FIT_LOOKS_X],
            ylooks=params[cf.ORBITAL_FIT_LOOKS_Y],
            thresh=params[cf.NO_DATA_AVERAGING_THRESHOLD],
            write_to_disc=False)
        mlooked = [Ifg(m[1]) for m in mlooked_dataset]

        for m in mlooked:
            m.initialize()
            m.nodata_value = params[cf.NO_DATA_VALUE]
            m.convert_to_nans()
            m.convert_to_mm()

    _orbital_correction(ifgs, params, mlooked=mlooked,
                        preread_ifgs=preread_ifgs)


def _orbital_correction(ifgs_or_ifg_paths, params, mlooked=None, offset=True,
                        preread_ifgs=None):
    """
    Convenience function to perform orbital correction.
    """
    degree = params[cf.ORBITAL_FIT_DEGREE]
    method = params[cf.ORBITAL_FIT_METHOD]
    # parallel = params[cf.PARALLEL]  # not implemented

    if degree not in [PLANAR, QUADRATIC, PART_CUBIC]:
        msg = "Invalid degree of %s for orbital correction" % degree
        raise OrbitalError(msg)

    log.info('Removing orbital error using orbital method correction {}'
             ' and degree={}'.format(method, degree))

    if method == NETWORK_METHOD:
        if mlooked is None:
            network_orbital_correction(ifgs_or_ifg_paths, degree, offset,
                                       params, m_ifgs=mlooked,
                                       preread_ifgs=preread_ifgs)
        else:
            _validate_mlooked(mlooked, ifgs_or_ifg_paths)
            network_orbital_correction(ifgs_or_ifg_paths, degree, offset,
                                       params, mlooked, preread_ifgs)

    elif method == INDEPENDENT_METHOD:
        # not running in parallel
        # raises swig object pickle error
        # Parallel(n_jobs=params[cf.PROCESSES], verbose=50)(
        #     delayed(_independent_correction)(ifg, degree, offset, params)
        #     for ifg in ifgs)
        for ifg in ifgs_or_ifg_paths:
            independent_orbital_correction(ifg, degree, offset, params)
    else:
        msg = "Unknown method: '%s', need INDEPENDENT or NETWORK method"
        raise OrbitalError(msg % method)


def _validate_mlooked(mlooked, ifgs):
    '''
    Basic sanity checking of the multilooked ifgs.
    '''

    if len(mlooked) != len(ifgs):
        msg = "Mismatching # ifgs and # multilooked ifgs"
        raise OrbitalError(msg)

    if not all([hasattr(i, 'phase_data') for i in mlooked]):
        msg = "Mismatching types in multilooked ifgs arg:\n%s" % mlooked
        raise OrbitalError(msg)


def _get_num_params(degree, offset=None):
    '''
    Returns number of model parameters from string parameter
    '''

    if degree == PLANAR:
        nparams = 2
    elif degree == QUADRATIC:
        nparams = 5
    elif degree == PART_CUBIC:
        nparams = 6
    else:
        msg = "Invalid orbital model degree: %s" % degree
        raise OrbitalError(msg)

    # NB: independent method only, network method handles offsets separately
    if offset is True:
        nparams += 1  # eg. y = mx + offset
    return nparams


def independent_orbital_correction(ifg, degree, offset, params):
    """
    Calculates and removes an orbital error surface from a single independent
    interferogram.

    Warning: This will write orbital error corrected phase_data to the ifg.

    :param Ifg class instance ifg: the interferogram to be corrected
    :param str degree: model to fit (PLANAR / QUADRATIC / PART_CUBIC)
    :param bool offset: True to calculate the model using an offset
    :param dict params: dictionary of configuration parameters

    :return: None - interferogram phase data is updated and saved to disk
    """
    ifg = shared.Ifg(ifg) if isinstance(ifg, str) else ifg
    if not ifg.is_open:
        ifg.open()
    shared.nan_and_mm_convert(ifg, params)
    # vectorise, keeping NODATA
    vphase = reshape(ifg.phase_data, ifg.num_cells)
    dm = get_design_matrix(ifg, degree, offset)

    # filter NaNs out before getting model
    clean_dm = dm[~isnan(vphase)]
    data = vphase[~isnan(vphase)]
    model = lstsq(clean_dm, data)[0]  # first arg is the model params

    # calculate forward model & morph back to 2D
    if offset:
        fullorb = np.reshape(np.dot(dm[:, :-1], model[:-1]),
                             ifg.phase_data.shape)
    else:
        fullorb = np.reshape(np.dot(dm, model), ifg.phase_data.shape)
    offset_removal = nanmedian(np.ravel(ifg.phase_data - fullorb))
    # subtract orbital error from the ifg
    ifg.phase_data -= (fullorb - offset_removal)
    # set orbfit meta tag and save phase to file
    _save_orbital_error_corrected_phase(ifg)
    if ifg.open():
        ifg.close()


def network_orbital_correction(ifgs, degree, offset, params, m_ifgs=None,
                               preread_ifgs=None):
    """
    This algorithm implements a network inversion to determine orbital
    corrections for a set of interferograms forming a connected network.

    Warning: This will write orbital error corrected phase_data to the ifgs.

    :param list ifgs: List of Ifg class objects reduced to a minimum spanning
        tree network
    :param str degree: model to fit (PLANAR / QUADRATIC / PART_CUBIC)
    :param bool offset: True to calculate the model using offsets
    :param dict params: dictionary of configuration parameters
    :param list m_ifgs: list of multilooked Ifg class objects
        (sequence must be multilooked versions of 'ifgs' arg)
    :param dict preread_ifgs: Dictionary containing information specifically
        for MPI jobs (optional)

    :return: None - interferogram phase data is updated and saved to disk
    """
    # pylint: disable=too-many-locals, too-many-arguments
    src_ifgs = ifgs if m_ifgs is None else m_ifgs
    src_ifgs = mst.mst_from_ifgs(src_ifgs)[3]  # use networkx mst

    vphase = vstack([i.phase_data.reshape((i.num_cells, 1)) for i in src_ifgs])
    vphase = squeeze(vphase)

    B = get_network_design_matrix(src_ifgs, degree, offset)

    # filter NaNs out before getting model
    B = B[~isnan(vphase)]
    orbparams = dot(pinv(B, 1e-6), vphase[~isnan(vphase)])

    ncoef = _get_num_params(degree)
    if preread_ifgs:
        temp_ifgs = OrderedDict(sorted(preread_ifgs.items())).values()
        ids = master_slave_ids(get_all_epochs(temp_ifgs))
    else:
        ids = master_slave_ids(get_all_epochs(ifgs))
    coefs = [orbparams[i:i+ncoef] for i in
             range(0, len(set(ids)) * ncoef, ncoef)]

    # create full res DM to expand determined coefficients into full res
    # orbital correction (eg. expand coarser model to full size)

    if preread_ifgs:
        temp_ifg = Ifg(ifgs[0])  # ifgs here are paths
        temp_ifg.open()
        dm = get_design_matrix(temp_ifg, degree, offset=False)
        temp_ifg.close()
    else:
        dm = get_design_matrix(ifgs[0], degree, offset=False)

    for i in ifgs:
        # open if not Ifg instance
        if isinstance(i, str):  # pragma: no cover
            # are paths
            i = Ifg(i)
            i.open(readonly=False)
            shared.nan_and_mm_convert(i, params)
        _remove_network_orb_error(coefs, dm, i, ids, offset)


def _remove_network_orb_error(coefs, dm, ifg, ids, offset):
    """
    Convenience function to remove network orbital error from input
    interferograms
    """
    orb = dm.dot(coefs[ids[ifg.slave]] - coefs[ids[ifg.master]])
    orb = orb.reshape(ifg.shape)
    # offset estimation
    if offset:
        # bring all ifgs to same base level
        orb -= nanmedian(np.ravel(ifg.phase_data - orb))
    # subtract orbital error from the ifg
    ifg.phase_data -= orb
    # set orbfit meta tag and save phase to file
    _save_orbital_error_corrected_phase(ifg)


def _save_orbital_error_corrected_phase(ifg):
    """
    Convenience function to update metadata and save latest phase after
    orbital fit correction
    """
    # set orbfit tags after orbital error correction
    ifg.dataset.SetMetadataItem(ifc.PYRATE_ORBITAL_ERROR, ifc.ORB_REMOVED)
    ifg.write_modified_phase()
    ifg.close()


# TODO: subtract reference pixel coordinate from x and y
def get_design_matrix(ifg, degree, offset, scale=100.0):
    """
    Returns orbital error design matrix with columns for model parameters.

    :param Ifg class instance ifg: interferogram to get design matrix for
    :param str degree: model to fit (PLANAR / QUADRATIC / PART_CUBIC)
    :param bool offset: True to include offset column, otherwise False.
    :param float scale: Scale factor to divide cell size by in order to
        improve inversion robustness

    :return: dm: design matrix
    :rtype: ndarray
    """

    if degree not in [PLANAR, QUADRATIC, PART_CUBIC]:
        raise OrbitalError("Invalid degree argument")

    # scaling required with higher degree models to help with param estimation
    xsize = ifg.x_size / scale if scale else ifg.x_size
    ysize = ifg.y_size / scale if scale else ifg.y_size

    # mesh needs to start at 1, otherwise first cell resolves to 0 and ignored
    xg, yg = [g+1 for g in meshgrid(range(ifg.ncols), range(ifg.nrows))]
    x = xg.reshape(ifg.num_cells) * xsize
    y = yg.reshape(ifg.num_cells) * ysize

    # TODO: performance test this vs np.concatenate (n by 1 cols)??
    dm = empty((ifg.num_cells, _get_num_params(degree, offset)), dtype=float32)

    # apply positional parameter values, multiply pixel coordinate by cell size
    # to get distance (a coord by itself doesn't tell us distance from origin)
    if degree == PLANAR:
        dm[:, 0] = x
        dm[:, 1] = y
    elif degree == QUADRATIC:
        dm[:, 0] = x**2
        dm[:, 1] = y**2
        dm[:, 2] = x * y
        dm[:, 3] = x
        dm[:, 4] = y
    elif degree == PART_CUBIC:
        dm[:, 0] = x * (y**2)
        dm[:, 1] = x**2
        dm[:, 2] = y**2
        dm[:, 3] = x * y
        dm[:, 4] = x
        dm[:, 5] = y
    if offset is True:
        dm[:, -1] = np.ones(ifg.num_cells)

    return dm


def get_network_design_matrix(ifgs, degree, offset):
    # pylint: disable=too-many-locals
    """
    Returns larger-format design matrix for network error correction. The
    network design matrix includes rows which relate to those of NaN cells.

    :param list ifgs: List of Ifg class objects
    :param str degree: model to fit (PLANAR / QUADRATIC / PART_CUBIC)
    :param bool offset: True to include offset cols, otherwise False.

    :return: netdm: network design matrix
    :rtype: ndarray
    """

    if degree not in [PLANAR, QUADRATIC, PART_CUBIC]:
        raise OrbitalError("Invalid degree argument")

    nifgs = len(ifgs)
    if nifgs < 1:
        # can feasibly do correction on a single Ifg/2 epochs
        raise OrbitalError("Invalid number of Ifgs: %s" % nifgs)

    # init sparse network design matrix
    nepochs = len(set(get_all_epochs(ifgs)))

    # no offsets: they are made separately below
    ncoef = _get_num_params(degree)
    shape = [ifgs[0].num_cells * nifgs, ncoef * nepochs]

    if offset:
        shape[1] += nifgs  # add extra block for offset cols

    netdm = zeros(shape, dtype=float32)

    # calc location for individual design matrices
    dates = [ifg.master for ifg in ifgs] + [ifg.slave for ifg in ifgs]
    ids = master_slave_ids(dates)
    offset_col = nepochs * ncoef  # base offset for the offset cols
    tmpdm = get_design_matrix(ifgs[0], degree, offset=False)

    # iteratively build up sparse matrix
    for i, ifg in enumerate(ifgs):
        rs = i * ifg.num_cells  # starting row
        m = ids[ifg.master] * ncoef  # start col for master
        s = ids[ifg.slave] * ncoef  # start col for slave
        netdm[rs:rs + ifg.num_cells, m:m + ncoef] = -tmpdm
        netdm[rs:rs + ifg.num_cells, s:s + ncoef] = tmpdm

        # offsets are diagonal cols across the extra array block created above
        if offset:
            netdm[rs:rs + ifg.num_cells, offset_col + i] = 1  # init offset cols

    return netdm


class OrbitalError(Exception):
    """
    Generic class for errors in orbital correction.
    """

back to top

Software Heritage — Copyright (C) 2015–2026, The Software Heritage developers. License: GNU AGPLv3+.
The source code of Software Heritage itself is available on our development forge.
The source code files archived by Software Heritage are available under their own copyright and licenses.
Terms of use: Archive access, API— Content policy— Contact— JavaScript license information— Web API