Skip to main content
  • Home
  • Development
  • Documentation
  • Donate
  • Operational login
  • Browse the archive

swh logo
SoftwareHeritage
Software
Heritage
Archive
Features
  • Search

  • Downloads

  • Save code now

  • Add forge now

  • Help

https://github.com/GeoscienceAustralia/PyRate
09 August 2023, 08:52:18 UTC
  • Code
  • Branches (23)
  • Releases (1)
  • Visits
    • Branches
    • Releases
    • HEAD
    • refs/heads/CI-patch
    • refs/heads/data
    • refs/heads/dependabot/pip/joblib-1.2.0
    • refs/heads/dependabot/pip/numpy-1.22.0
    • refs/heads/dependabot/pip/scipy-1.10.0
    • refs/heads/develop
    • refs/heads/gh-pages
    • refs/heads/master
    • refs/heads/mg/actions
    • refs/heads/sb/largetifs-enhancements
    • refs/heads/sb/orbfit-independent-method
    • refs/heads/sb/orbital-correction-experiements
    • refs/heads/sb/phase-closure-correction
    • refs/heads/sb/upgrade-ci-ubuntu
    • refs/heads/sb/use-mpi-shared
    • refs/tags/0.3.0
    • refs/tags/0.4.0
    • refs/tags/0.4.1
    • refs/tags/0.4.2
    • refs/tags/0.4.3
    • refs/tags/0.5.0
    • refs/tags/0.6.0
    • refs/tags/0.6.1
    • 0.2.0
  • e7b697c
  • /
  • pyrate
  • /
  • process.py
Raw File Download Save again
Take a new snapshot of a software origin

If the archived software origin currently browsed is not synchronized with its upstream version (for instance when new commits have been issued), you can explicitly request Software Heritage to take a new snapshot of it.

Use the form below to proceed. Once a request has been submitted and accepted, it will be processed as soon as possible. You can then check its processing state by visiting this dedicated page.
swh spinner

Processing "take a new snapshot" request ...

To reference or cite the objects present in the Software Heritage archive, permalinks based on SoftWare Hash IDentifiers (SWHIDs) must be used.
Select below a type of object currently browsed in order to display its associated SWHID and permalink.

  • content
  • directory
  • revision
  • snapshot
origin badgecontent badge
swh:1:cnt:c9ff7700bca2a5691333d634174b2e3d45ad47a8
origin badgedirectory badge
swh:1:dir:498a694dac19c7d882850c13ee2c4c9309b9b777
origin badgerevision badge
swh:1:rev:29ce59c68bc134810a3df70cdcce0284e8d35980
origin badgesnapshot badge
swh:1:snp:e85aafb5fc900c1df2eebb773a8b8e11798084c1

This interface enables to generate software citations, provided that the root directory of browsed objects contains a citation.cff or codemeta.json file.
Select below a type of object currently browsed in order to generate citations for them.

  • content
  • directory
  • revision
  • snapshot
(requires biblatex-software package)
Generating citation ...
(requires biblatex-software package)
Generating citation ...
(requires biblatex-software package)
Generating citation ...
(requires biblatex-software package)
Generating citation ...
Tip revision: 29ce59c68bc134810a3df70cdcce0284e8d35980 authored by Matt Garthwaite on 26 June 2020, 00:13:01 UTC
Merge pull request #274 from GeoscienceAustralia/develop
Tip revision: 29ce59c
process.py
#   This Python module is part of the PyRate software package.
#
#   Copyright 2020 Geoscience Australia
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
# coding: utf-8
"""
This Python module runs the main PyRate processing workflow
"""
import os
from os.path import join
import pickle as cp
from collections import OrderedDict
from typing import List, Tuple
import numpy as np

from pyrate.core import (shared, algorithm, orbital, ref_phs_est as rpe, 
                         ifgconstants as ifc, mpiops, config as cf, 
                         timeseries, mst, covariance as vcm_module, 
                         stack, refpixel)
from pyrate.core.aps import wrap_spatio_temporal_filter
from pyrate.core.shared import Ifg, PrereadIfg, get_tiles, mpi_vs_multiprocess_logging
from pyrate.core.logger import pyratelogger as log
from pyrate.prepifg import find_header
from pyrate.configuration import MultiplePaths

MASTER_PROCESS = 0


def _join_dicts(dicts):
    """
    Function to concatenate dictionaries
    """
    if dicts is None:  # pragma: no cover
        return
    assembled_dict = {k: v for D in dicts for k, v in D.items()}
    return assembled_dict


def _create_ifg_dict(dest_tifs, params):
    """
    1. Convert ifg phase data into numpy binary files.
    2. Save the preread_ifgs dict with information about the ifgs that are
    later used for fast loading of Ifg files in IfgPart class

    :param list dest_tifs: List of destination tifs
    :param dict params: Config dictionary
    :param list tiles: List of all Tile instances

    :return: preread_ifgs: Dictionary containing information regarding
                interferograms that are used later in workflow
    :rtype: dict
    """
    ifgs_dict = {}
    nifgs = len(dest_tifs)
    process_tifs = mpiops.array_split(dest_tifs)
    for d in process_tifs:
        ifg = shared._prep_ifg(d, params)
        ifgs_dict[d] = PrereadIfg(path=d,
                                  nan_fraction=ifg.nan_fraction,
                                  master=ifg.master,
                                  slave=ifg.slave,
                                  time_span=ifg.time_span,
                                  nrows=ifg.nrows,
                                  ncols=ifg.ncols,
                                  metadata=ifg.meta_data)
        ifg.close()
    ifgs_dict = _join_dicts(mpiops.comm.allgather(ifgs_dict))

    preread_ifgs_file = join(params[cf.TMPDIR], 'preread_ifgs.pk')

    if mpiops.rank == MASTER_PROCESS:

        # add some extra information that's also useful later
        gt, md, wkt = shared.get_geotiff_header_info(process_tifs[0])
        epochlist = algorithm.get_epochs(ifgs_dict)[0]
        log.info('Found {} unique epochs in the {} interferogram network'.format(len(epochlist.dates), nifgs))
        ifgs_dict['epochlist'] = epochlist
        ifgs_dict['gt'] = gt
        ifgs_dict['md'] = md
        ifgs_dict['wkt'] = wkt
        # dump ifgs_dict file for later use
        cp.dump(ifgs_dict, open(preread_ifgs_file, 'wb'))

    mpiops.comm.barrier()
    preread_ifgs = OrderedDict(sorted(cp.load(open(preread_ifgs_file, 'rb')).items()))
    log.debug('Finished converting phase_data to numpy in process {}'.format(mpiops.rank))
    return preread_ifgs


def _mst_calc(dest_tifs, params, tiles, preread_ifgs):
    """
    MPI wrapper function for MST calculation
    """
    process_tiles = mpiops.array_split(tiles)
    log.info('Calculating minimum spanning tree matrix')

    def _save_mst_tile(tile, i, preread_ifgs):
        """
        Convenient inner loop for mst tile saving
        """
        mst_tile = mst.mst_multiprocessing(tile, dest_tifs, preread_ifgs, params)
        # locally save the mst_mat
        mst_file_process_n = join(params[cf.TMPDIR], 'mst_mat_{}.npy'.format(i))
        np.save(file=mst_file_process_n, arr=mst_tile)

    for t in process_tiles:
        _save_mst_tile(t, t.index, preread_ifgs)
    log.debug('Finished mst calculation for process {}'.format(mpiops.rank))
    mpiops.comm.barrier()


def _ref_pixel_calc(ifg_paths: List[str], params: dict) -> Tuple[int, int]:
    """
    Wrapper for reference pixel calculation
    """

    lon = params[cf.REFX]
    lat = params[cf.REFY]

    ifg = Ifg(ifg_paths[0])
    ifg.open(readonly=True)
    # assume all interferograms have same projection and will share the same transform
    transform = ifg.dataset.GetGeoTransform()

    if lon == -1 or lat == -1:

        log.info('Searching for best reference pixel location')
        half_patch_size, thresh, grid = refpixel.ref_pixel_setup(ifg_paths, params)
        process_grid = mpiops.array_split(grid)
        refpixel.save_ref_pixel_blocks(process_grid, half_patch_size, ifg_paths, params)
        mean_sds = refpixel._ref_pixel_mpi(process_grid, half_patch_size, ifg_paths, thresh, params)
        mean_sds = mpiops.comm.gather(mean_sds, root=0)
        if mpiops.rank == MASTER_PROCESS:
            mean_sds = np.hstack(mean_sds)

        refpixel_returned = mpiops.run_once(refpixel.find_min_mean, mean_sds, grid)

        if isinstance(refpixel_returned, ValueError):
            from pyrate.core.refpixel import RefPixelError
            raise RefPixelError(
                "Reference pixel calculation returned an all nan slice!\n"
                "Cannot continue downstream computation. Please change reference pixel algorithm used before "
                "continuing.")
        refy, refx = refpixel_returned   # row first means first value is latitude
        log.info('Selected reference pixel coordinate (x, y): ({}, {})'.format(refx, refy))
        lon, lat = refpixel.convert_pixel_value_to_geographic_coordinate(refx, refy, transform)
        log.info('Selected reference pixel coordinate (lon, lat): ({}, {})'.format(lon, lat))

    else:
        log.info('Using reference pixel from config file (lon, lat): ({}, {})'.format(lon, lat))
        log.warning("Ensure user supplied reference pixel values are in lon/lat")
        refx, refy = refpixel.convert_geographic_coordinate_to_pixel_value(lon, lat, transform)
        log.info('Converted reference pixel coordinate (x, y): ({}, {})'.format(refx, refy))

    refpixel.update_refpix_metadata(ifg_paths, refx, refy, transform, params)

    log.debug("refpx, refpy: "+str(refx) + " " + str(refy))
    ifg.close()
    return int(refx), int(refy)


def _orb_fit_calc(multi_paths: List[MultiplePaths], params, preread_ifgs=None) -> None:
    """
    MPI wrapper for orbital fit correction
    """
    if not params[cf.ORBITAL_FIT]:
        log.info('Orbital correction not required!')
        print('Orbital correction not required!')
        return
    log.info('Calculating orbital correction')

    ifg_paths = [p.sampled_path for p in multi_paths]
    if preread_ifgs:  # don't check except for mpi tests
        # perform some general error/sanity checks
        log.debug('Checking Orbital error correction status')
        if mpiops.run_once(shared.check_correction_status, ifg_paths, ifc.PYRATE_ORBITAL_ERROR):
            log.debug('Orbital error correction not required as all ifgs are already corrected!')
            return  # return if True condition returned

    if params[cf.ORBITAL_FIT_METHOD] == 1:
        prcs_ifgs = mpiops.array_split(ifg_paths)
        orbital.remove_orbital_error(prcs_ifgs, params, preread_ifgs)
    else:
        # Here we do all the multilooking in one process, but in memory
        # can use multiple processes if we write data to disc during
        # remove_orbital_error step
        # A performance comparison should be made for saving multilooked
        # files on disc vs in memory single process multilooking
        if mpiops.rank == MASTER_PROCESS:
            headers = [find_header(p, params) for p in multi_paths]
            orbital.remove_orbital_error(ifg_paths, params, headers, preread_ifgs=preread_ifgs)
    mpiops.comm.barrier()
    log.debug('Finished Orbital error correction')


def _ref_phase_estimation(ifg_paths, params, refpx, refpy):
    """
    Wrapper for reference phase estimation.
    """
    log.info("Calculating reference phase and correcting each interferogram")
    if len(ifg_paths) < 2:
        raise rpe.ReferencePhaseError(
            "At least two interferograms required for reference phase correction ({len_ifg_paths} "
            "provided).".format(len_ifg_paths=len(ifg_paths))
        )

    if mpiops.run_once(shared.check_correction_status, ifg_paths, ifc.PYRATE_REF_PHASE):
        log.debug('Finished reference phase correction')
        return

    if params[cf.REF_EST_METHOD] == 1:
        ref_phs = rpe.est_ref_phase_method1(ifg_paths, params)
    elif params[cf.REF_EST_METHOD] == 2:
        ref_phs = rpe.est_ref_phase_method2(ifg_paths, params, refpx, refpy)
    else:
        raise rpe.ReferencePhaseError("No such option, use '1' or '2'.")

    # Save reference phase numpy arrays to disk.
    ref_phs_file = os.path.join(params[cf.TMPDIR], 'ref_phs.npy')
    if mpiops.rank == MASTER_PROCESS:
        collected_ref_phs = np.zeros(len(ifg_paths), dtype=np.float64)
        process_indices = mpiops.array_split(range(len(ifg_paths)))
        collected_ref_phs[process_indices] = ref_phs
        for r in range(1, mpiops.size):
            process_indices = mpiops.array_split(range(len(ifg_paths)), r)
            this_process_ref_phs = np.zeros(shape=len(process_indices),
                                            dtype=np.float64)
            mpiops.comm.Recv(this_process_ref_phs, source=r, tag=r)
            collected_ref_phs[process_indices] = this_process_ref_phs
        np.save(file=ref_phs_file, arr=collected_ref_phs)
    else:
        mpiops.comm.Send(ref_phs, dest=MASTER_PROCESS, tag=mpiops.rank)
    log.debug('Finished reference phase correction')

    # Preserve old return value so tests don't break.
    if isinstance(ifg_paths[0], Ifg):
        ifgs = ifg_paths
    else:
        ifgs = [Ifg(ifg_path) for ifg_path in ifg_paths]
    mpiops.comm.barrier()
    return ref_phs, ifgs


def main(params):
    """
    Top level function to perform PyRate workflow on given interferograms

    :return: refpt: tuple of reference pixel x and y position
    :rtype: tuple
    :return: maxvar: array of maximum variance values of interferograms
    :rtype: ndarray
    :return: vcmt: Variance-covariance matrix array
    :rtype: ndarray
    """
    mpi_vs_multiprocess_logging("process", params)

    ifg_paths = []
    for ifg_path in params[cf.INTERFEROGRAM_FILES]:
        ifg_paths.append(ifg_path.sampled_path)

    rows, cols = params["rows"], params["cols"]

    return process_ifgs(ifg_paths, params, rows, cols)


def process_ifgs(ifg_paths, params, rows, cols):
    """
    Top level function to perform PyRate workflow on given interferograms

    :param list ifg_paths: List of interferogram paths
    :param dict params: Dictionary of configuration parameters
    :param int rows: Number of sub-tiles in y direction
    :param int cols: Number of sub-tiles in x direction

    :return: refpt: tuple of reference pixel x and y position
    :rtype: tuple
    :return: maxvar: array of maximum variance values of interferograms
    :rtype: ndarray
    :return: vcmt: Variance-covariance matrix array
    :rtype: ndarray
    """

    if mpiops.size > 1:  # turn of multiprocessing during mpi jobs
        params[cf.PARALLEL] = False
    outdir = params[cf.TMPDIR]
    if not os.path.exists(outdir):
        shared.mkdir_p(outdir)

    tiles = mpiops.run_once(get_tiles, ifg_paths[0], rows, cols)

    preread_ifgs = _create_ifg_dict(ifg_paths, params=params)

    # validate user supplied ref pixel
    refpixel.validate_supplied_lat_lon(params)
    refpx, refpy = _ref_pixel_calc(ifg_paths, params)

    # remove non ifg keys
    _ = [preread_ifgs.pop(k) for k in ['gt', 'epochlist', 'md', 'wkt']]

    multi_paths = params[cf.INTERFEROGRAM_FILES]
    _orb_fit_calc(multi_paths, params, preread_ifgs)

    _ref_phase_estimation(ifg_paths, params, refpx, refpy)

    shared.save_numpy_phase(ifg_paths, tiles, params)
    _mst_calc(ifg_paths, params, tiles, preread_ifgs)

    # spatio-temporal aps filter
    wrap_spatio_temporal_filter(ifg_paths, params, tiles, preread_ifgs)

    maxvar, vcmt = _maxvar_vcm_calc(ifg_paths, params, preread_ifgs)
    # save phase data tiles as numpy array for timeseries and stackrate calc

    shared.save_numpy_phase(ifg_paths, tiles, params)

    _timeseries_calc(ifg_paths, params, vcmt, tiles, preread_ifgs)

    _stack_calc(ifg_paths, params, vcmt, tiles, preread_ifgs)

    log.info('PyRate workflow completed')
    return (refpx, refpy), maxvar, vcmt


def _stack_calc(ifg_paths, params, vcmt, tiles, preread_ifgs):
    """
    MPI wrapper for stacking calculation
    """
    process_tiles = mpiops.array_split(tiles)
    log.info('Calculating rate map from stacking')
    output_dir = params[cf.TMPDIR]
    for t in process_tiles:
        log.info('Stacking of tile {}'.format(t.index))
        ifg_parts = [shared.IfgPart(p, t, preread_ifgs, params) for p in ifg_paths]
        mst_grid_n = np.load(os.path.join(output_dir, 'mst_mat_{}.npy'.format(t.index)))
        rate, error, samples = stack.stack_rate_array(ifg_parts, params, vcmt, mst_grid_n)
        # declare file names
        np.save(file=os.path.join(output_dir, 'stack_rate_{}.npy'.format(t.index)), arr=rate)
        np.save(file=os.path.join(output_dir, 'stack_error_{}.npy'.format(t.index)), arr=error)
        np.save(file=os.path.join(output_dir, 'stack_samples_{}.npy'.format(t.index)), arr=samples)
    mpiops.comm.barrier()
    log.debug("Finished stack rate calc!")


def _maxvar_vcm_calc(ifg_paths, params, preread_ifgs):
    """
    MPI wrapper for maxvar and vcmt computation
    """
    log.info('Calculating the temporal variance-covariance matrix')
    process_indices = mpiops.array_split(range(len(ifg_paths)))

    def _get_r_dist(ifg_path):
        """
        Get RDIst class object
        """
        ifg = Ifg(ifg_path)
        ifg.open()
        r_dist = vcm_module.RDist(ifg)()
        ifg.close()
        return r_dist

    r_dist = mpiops.run_once(_get_r_dist, ifg_paths[0])
    prcs_ifgs = mpiops.array_split(ifg_paths)
    process_maxvar = []
    for n, i in enumerate(prcs_ifgs):
        log.debug('Calculating maxvar for {} of process ifgs {} of total {}'.format(n+1, len(prcs_ifgs), len(ifg_paths)))
        process_maxvar.append(vcm_module.cvd(i, params, r_dist, calc_alpha=True, write_vals=True, save_acg=True)[0])
    if mpiops.rank == MASTER_PROCESS:
        maxvar = np.empty(len(ifg_paths), dtype=np.float64)
        maxvar[process_indices] = process_maxvar
        for i in range(1, mpiops.size):  # pragma: no cover
            rank_indices = mpiops.array_split(range(len(ifg_paths)), i)
            this_process_ref_phs = np.empty(len(rank_indices), dtype=np.float64)
            mpiops.comm.Recv(this_process_ref_phs, source=i, tag=i)
            maxvar[rank_indices] = this_process_ref_phs
    else:  # pragma: no cover
        maxvar = np.empty(len(ifg_paths), dtype=np.float64)
        mpiops.comm.Send(np.array(process_maxvar, dtype=np.float64), dest=MASTER_PROCESS, tag=mpiops.rank)

    mpiops.comm.barrier()
    maxvar = mpiops.comm.bcast(maxvar, root=0)
    vcmt = mpiops.run_once(vcm_module.get_vcmt, preread_ifgs, maxvar)
    log.debug("Finished maxvar and vcm calc!")
    return maxvar, vcmt


def _timeseries_calc(ifg_paths, params, vcmt, tiles, preread_ifgs):
    """
    MPI wrapper for time series calculation.
    """
    if params[cf.TIME_SERIES_CAL] == 0:
        log.info('Time Series Calculation not required')
        return

    if params[cf.TIME_SERIES_METHOD] == 1:
        log.info('Calculating time series using Laplacian Smoothing method')
    elif params[cf.TIME_SERIES_METHOD] == 2:
        log.info('Calculating time series using SVD method')

    output_dir = params[cf.TMPDIR]
    total_tiles = len(tiles)
    process_tiles = mpiops.array_split(tiles)
    for t in process_tiles:
        log.debug("Calculating time series for tile "+str(t.index)+" out of "+str(total_tiles))
        ifg_parts = [shared.IfgPart(p, t, preread_ifgs, params) for p in ifg_paths]
        mst_tile = np.load(os.path.join(output_dir, 'mst_mat_{}.npy'.format(t.index)))
        res = timeseries.time_series(ifg_parts, params, vcmt, mst_tile)
        tsincr, tscum, _ = res
        np.save(file=os.path.join(output_dir, 'tsincr_{}.npy'.format(t.index)), arr=tsincr)
        np.save(file=os.path.join(output_dir, 'tscuml_{}.npy'.format(t.index)), arr=tscum)
    mpiops.comm.barrier()
    log.debug("Finished timeseries calc!")

back to top

Software Heritage — Copyright (C) 2015–2026, The Software Heritage developers. License: GNU AGPLv3+.
The source code of Software Heritage itself is available on our development forge.
The source code files archived by Software Heritage are available under their own copyright and licenses.
Terms of use: Archive access, API— Content policy— Contact— JavaScript license information— Web API