Skip to main content
  • Home
  • Development
  • Documentation
  • Donate
  • Operational login
  • Browse the archive

swh logo
SoftwareHeritage
Software
Heritage
Archive
Features
  • Search

  • Downloads

  • Save code now

  • Add forge now

  • Help

  • 9fe4569
  • /
  • covariance.py
Raw File Download

To reference or cite the objects present in the Software Heritage archive, permalinks based on SoftWare Hash IDentifiers (SWHIDs) must be used.
Select below a type of object currently browsed in order to display its associated SWHID and permalink.

  • content
  • directory
content badge
swh:1:cnt:75b17fbeae3f6ef737493f0397c265865562d526
directory badge
swh:1:dir:9fe4569968d540ed47abf6a3d37372f784c1ca22

This interface enables to generate software citations, provided that the root directory of browsed objects contains a citation.cff or codemeta.json file.
Select below a type of object currently browsed in order to generate citations for them.

  • content
  • directory
(requires biblatex-software package)
Generating citation ...
(requires biblatex-software package)
Generating citation ...
covariance.py
#   This Python module is part of the PyRate software package.
#
#   Copyright 2017 Geoscience Australia
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
"""
This Python module implements covariance calculation and
Variance/Covariance matrix functionality. The algorithms
are based on functions 'cvdcalc.m' and 'vcmt.m' from
the Matlab Pirate package.
"""
from __future__ import print_function
from os.path import basename, join
import logging
from numpy import array, where, isnan, real, imag, sqrt, meshgrid
from numpy import zeros, vstack, ceil, mean, exp, reshape
from numpy.linalg import norm
import numpy as np
from scipy.fftpack import fft2, ifft2, fftshift
from scipy.optimize import fmin

from pyrate import shared
from pyrate.shared import PrereadIfg
from pyrate.algorithm import master_slave_ids
from pyrate import ifgconstants as ifc
from pyrate import config as cf

# pylint: disable=too-many-arguments
# distance division factor of 1000 converts to km and is needed to match
# Matlab code output
DISTFACT = 1000

log = logging.getLogger(__name__)


def _pendiffexp(alphamod, cvdav):
    """
    Fits an exponential model to data.

    :param float alphamod: Exponential decay exponent.
    :param ndarray cvdav: Function magnitude at 0 radius (2 col array of
    radius, variance)
    """
    # pylint: disable=invalid-name
    # maxvar usually at zero lag
    mx = cvdav[1, 0]
    return norm(cvdav[1, :] - (mx * exp(-alphamod * cvdav[0, :])))


# this is not used any more
def _unique_points(points):  # pragma: no cover
    """
    Returns unique points from a list of coordinates.

    :param points: Sequence of (y,x) or (x,y) tuples.
    """
    return vstack([array(u) for u in set(points)])


def cvd(ifg_path, params, r_dist, calc_alpha=False,
        write_vals=False, save_acg=False):
    """
    Calculate the 1D covariance function of an entire interferogram as the
    radial average of its 2D autocorrelation.

    :param str ifg_path: An interferogram file path. OR
    :param Ifg class ifg_path: A pyrate.shared.Ifg class object
    :param dict params: Dictionary of configuration parameters
    :param ndarray r_dist: Array of distance values from the image centre
                (See Rdist class for more details)
    :param bool calc_alpha: If True calculate alpha
    :param bool write_vals: If True write maxvar and alpha values to
                interferogram metadata
    :param bool save_acg: If True write autocorrelation and radial distance
                data to numpy array file on disk

    :return: maxvar: The maximum variance (at zero lag)
    :rtype: float
    :return: alpha: the exponential length-scale of decay factor
    :rtype: float
    """

    if isinstance(ifg_path, str):  # used during MPI
        ifg = shared.Ifg(ifg_path)
        ifg.open()
    else:
        ifg = ifg_path

    shared.nan_and_mm_convert(ifg, params)
    # calculate 2D auto-correlation of image using the
    # spectral method (Wiener-Khinchin theorem)
    if ifg.nan_converted:  # if nancoverted earlier, convert nans back to 0's
        phase = where(isnan(ifg.phase_data), 0, ifg.phase_data)
    else:
        phase = ifg.phase_data

    maxvar, alpha = cvd_from_phase(phase, ifg, r_dist, calc_alpha,
                                   save_acg=save_acg, params=params)

    if write_vals:
        _add_metadata(ifg, maxvar, alpha)

    if isinstance(ifg_path, str):
        ifg.close()

    return maxvar, alpha


def _add_metadata(ifg, maxvar, alpha):
    """
    Convenience function for saving metadata to ifg
    """
    md = ifg.meta_data
    md[ifc.PYRATE_MAXVAR] = str(maxvar)
    md[ifc.PYRATE_ALPHA] = str(alpha)
    ifg.write_modified_phase()


def _save_cvd_data(acg, r_dist, ifg_path, outdir):
    """
    Function to save numpy array of autocorrelation data to disk
    """
    data = np.column_stack((acg, r_dist))
    data_file = join(outdir, 'cvd_data_{b}.npy'.format(
        b=basename(ifg_path).split('.')[0]))
    np.save(file=data_file, arr=data)


def cvd_from_phase(phase, ifg, r_dist, calc_alpha, save_acg=False,
                   params=None):
    """
    A convenience function used to compute radial autocovariance from phase
    data

    :param ndarray phase: An array of interferogram phase data
    :param Ifg class ifg: A pyrate.shared.Ifg class instance
    :param ndarray r_dist: Array of distance values from the image centre
                (See Rdist class for more details)
    :param bool calc_alpha: If True calculate alpha
    :param bool save_acg: If True write autocorrelation and radial distance
                data to numpy array file on disk
    :param dict params: [optional] Dictionary of configuration parameters;
                Must be provided if save_acg=True

    :return: maxvar: The maximum variance (at zero lag)
    :rtype: float
    :return: alpha: the exponential length-scale of decay factor
    :rtype: float
    """
    # pylint: disable=invalid-name
    # pylint: disable=too-many-locals

    autocorr_grid = _get_autogrid(phase)
    acg = reshape(autocorr_grid, phase.size, order='F')
    # Symmetry in image; keep only unique points
    # tmp = _unique_points(zip(acg, r_dist))
    # Sudipta: Is this faster than keeping only the 1st half as in Matlab?
    # Sudipta: Unlikely, as unique_point is a search/comparison,
    # whereas keeping 1st half is just numpy indexing.
    # If it is not faster, why was this done differently here?
    # r_dist = r_dist[:int(ceil(phase.size / 2.0)) + nrows]
    acg = acg[:len(r_dist)]
    # Alternative method to remove duplicate cells (from Matlab Pirate code)
    # r_dist = r_dist[:ceil(len(r_dist)/2)+nlines]
    #  Reason for '+nlines' term unknown
    # eg. array([x for x in set([(1,1), (2,2), (1,1)])])
    # the above shortens r_dist by some number of cells

    # pick the smallest axis to determine circle search radius
    if (ifg.x_centre * ifg.x_size) < (ifg.y_centre * ifg.y_size):
        maxdist = (ifg.x_centre+1) * ifg.x_size / DISTFACT
    else:
        maxdist = (ifg.y_centre+1) * ifg.y_size / DISTFACT

    # filter out data where the of lag distance is greater than maxdist
    # r_dist = array([e for e in rorig if e <= maxdist]) #
    # MG: prefers to use all the data
    # acg = array([e for e in rorig if e <= maxdist])
    indices_to_keep = r_dist < maxdist
    acg = acg[indices_to_keep]

    # optionally save acg vs dist observations to disk
    if save_acg:
        _save_cvd_data(acg, r_dist[indices_to_keep],
                       ifg.data_path, params[cf.TMPDIR])

    if calc_alpha:
        # bin width for collecting data
        bin_width = max(ifg.x_size, ifg.y_size) * 2 / DISTFACT  # km
        r_dist = r_dist[indices_to_keep]  # km
        # classify values of r_dist according to bin number
        rbin = ceil(r_dist / bin_width).astype(int)
        maxbin = max(rbin) - 1  # consistent with Matlab code

        cvdav = zeros(shape=(2, maxbin + 1))

        # the following stays in numpy land
        # distance instead of bin number
        cvdav[0, :] = np.multiply(range(maxbin + 1), bin_width)
        # mean variance for the bins
        cvdav[1, :] = [mean(acg[rbin == b]) for b in range(maxbin + 1)]
        # calculate best fit function maxvar*exp(-alpha*r_dist)
        alphaguess = 2 / (maxbin * bin_width)
        alpha = fmin(_pendiffexp, x0=alphaguess, args=(cvdav,), disp=False,
                     xtol=1e-6, ftol=1e-6)
        log.info("1st guess alpha {}, converged "
                 "alpha: {}".format(alphaguess, alpha))
        # maximum variance usually at the zero lag: max(acg[:len(r_dist)])
        return np.max(acg), alpha[0]  # alpha unit 1/km
    else:
        return np.max(acg), None


class RDist():
    """
    RDist class used for caching r_dist during maxvar/alpha computation
    """
    # pylint: disable=invalid-name
    def __init__(self, ifg):
        self.r_dist = None
        self.ifg = ifg
        self.nrows, self.ncols = ifg.shape

    def __call__(self):

        if self.r_dist is None:
            size = self.nrows * self.ncols
            # pixel distances from pixel at zero lag (image centre).
            xx, yy = meshgrid(range(self.ncols), range(self.nrows))
            # r_dist is distance from the center
            # doing np.divide and np.sqrt will improve performance as it keeps
            # calculations in the numpy land
            self.r_dist = np.divide(np.sqrt(((xx - self.ifg.x_centre) *
                                             self.ifg.x_size) ** 2 +
                                            ((yy - self.ifg.y_centre) *
                                             self.ifg.y_size) ** 2),
                                    DISTFACT)  # km
            self.r_dist = reshape(self.r_dist, size, order='F')
            self.r_dist = self.r_dist[:int(ceil(size / 2.0)) + self.nrows]

        return self.r_dist


def _get_autogrid(phase):
    """
    Helper function to assist with memory re-allocation during FFT calculation
    """
    autocorr_grid = _calc_autoc_grid(phase)
    nzc = np.sum(np.sum(phase != 0))
    autocorr_grid = fftshift(real(autocorr_grid)) / nzc
    return autocorr_grid


def _calc_autoc_grid(phase):
    """
    Helper function to assist with memory re-allocation during FFT calculation
    """
    pspec = _calc_power_spectrum(phase)
    autocorr_grid = ifft2(pspec)
    return autocorr_grid.astype(dtype=np.complex64)


def _calc_power_spectrum(phase):
    """
    Helper function to assist with memory re-allocation during FFT calculation
    """
    fft_phase = fft2(phase)
    pspec = real(fft_phase) ** 2 + imag(fft_phase) ** 2
    return pspec.astype(dtype=np.float32)


def get_vcmt(ifgs, maxvar):
    """
    Assembles a temporal variance/covariance matrix using the method
    described by Biggs et al., Geophys. J. Int, 2007. Matrix elements are
    evaluated according to sig_i * sig_j * C_ij where i and j are two
    interferograms and C is a matrix of coefficients:

    C = 1 if the master and slave epochs of i and j are equal
    C = 0.5 if have i and j share either a common master or slave epoch
    C = -0.5 if the master of i or j equals the slave of the other
    C = 0 otherwise

    :param list ifgs: A list of pyrate.shared.Ifg class objects.
    :param ndarray maxvar: numpy array of maximum variance values for the
                interferograms.

    :return: vcm_t: temporal variance-covariance matrix
    :rtype: ndarray
    """
    # pylint: disable=too-many-locals
    # c=0.5 for common master or slave; c=-0.5 if master
    # of one matches slave of another

    if isinstance(ifgs, dict):
        from collections import OrderedDict
        ifgs = {k: v for k, v in ifgs.items() if isinstance(v, PrereadIfg)}
        ifgs = OrderedDict(sorted(ifgs.items()))
        # pylint: disable=redefined-variable-type
        ifgs = ifgs.values()

    nifgs = len(ifgs)
    vcm_pat = zeros((nifgs, nifgs))

    dates = [ifg.master for ifg in ifgs] + [ifg.slave for ifg in ifgs]
    ids = master_slave_ids(dates)

    for i, ifg in enumerate(ifgs):
        mas1, slv1 = ids[ifg.master], ids[ifg.slave]

        for j, ifg2 in enumerate(ifgs):
            mas2, slv2 = ids[ifg2.master], ids[ifg2.slave]
            if mas1 == mas2 or slv1 == slv2:
                vcm_pat[i, j] = 0.5

            if mas1 == slv2 or slv1 == mas2:
                vcm_pat[i, j] = -0.5

            if mas1 == mas2 and slv1 == slv2:
                vcm_pat[i, j] = 1.0  # diagonal elements

    # make covariance matrix in time domain
    std = sqrt(maxvar).reshape((nifgs, 1))
    vcm_t = std * std.transpose()
    return vcm_t * vcm_pat

back to top

Software Heritage — Copyright (C) 2015–2026, The Software Heritage developers. License: GNU AGPLv3+.
The source code of Software Heritage itself is available on our development forge.
The source code files archived by Software Heritage are available under their own copyright and licenses.
Terms of use: Archive access, API— Content policy— Contact— JavaScript license information— Web API