# This Python module is part of the PyRate software package. # # Copyright 2017 Geoscience Australia # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ This Python module implements residual orbital corrections for interferograms. """ # pylint: disable=invalid-name import logging from collections import OrderedDict from numpy import empty, isnan, reshape, float32, squeeze from numpy import dot, vstack, zeros, meshgrid import numpy as np from numpy.linalg import pinv # from joblib import Parallel, delayed from scipy.linalg import lstsq from pyrate.algorithm import master_slave_ids, get_all_epochs from pyrate import mst, shared, prepifg from pyrate.shared import nanmedian, Ifg from pyrate import config as cf from pyrate import ifgconstants as ifc log = logging.getLogger(__name__) # Orbital correction tasks # # TODO: options for multilooking # 1) do the 2nd stage mlook at prepifg.py/generate up front, then delete in # workflow afterward # 2) refactor prep_ifgs() call to take input filenames and params & generate # mlooked versions from that # this needs to be more generic to call at any point in the runtime. # Design notes: # The orbital correction code is based on Matlab Pirate, but includes several # enhancements. Pirate creates sparse arrays for the linear inversion, which # contain many empty cells. This is unnecessary for the independent method, and # temporarily wastes potentially a lot of memory. # # For the independent method, PyRate makes individual small design matrices and # corrects the Ifgs one by one. If required in the correction, the offsets # option adds an extra column of ones to include in the inversion. # # Network method design matrices are mostly empty, and offsets are handled # differently. Individual design matrices (== independent method DMs) are # placed in the sparse network design matrix. Offsets are not included in the # smaller DMs to prevent unwanted cols being inserted. This is why some funcs # appear to ignore the offset parameter in the networked method. Network DM # offsets are cols of 1s in a diagonal line on the LHS of the sparse array. # ORBITAL ERROR correction constants INDEPENDENT_METHOD = cf.INDEPENDENT_METHOD NETWORK_METHOD = cf.NETWORK_METHOD PLANAR = cf.PLANAR QUADRATIC = cf.QUADRATIC PART_CUBIC = cf.PART_CUBIC def remove_orbital_error(ifgs, params, preread_ifgs=None): """ Wrapper function for PyRate orbital error removal functionality. NB: the ifg data is modified in situ, rather than create intermediate files. The network method assumes the given ifgs have already been reduced to a minimum spanning tree network. :param list ifgs: List of interferograms class objects :param dict params: Dictionary containing configuration parameters :param dict preread_ifgs: Dictionary containing information specifically for MPI jobs (optional) :return: None - interferogram phase data is updated and saved to disk """ ifg_paths = [i.data_path for i in ifgs] \ if isinstance(ifgs[0], Ifg) else ifgs mlooked = None # mlooking is not necessary for independent correction # can use multiple procesing if write_to_disc=True if params[cf.ORBITAL_FIT_METHOD] == 2: mlooked_dataset = prepifg.prepare_ifgs( ifg_paths, crop_opt=prepifg.ALREADY_SAME_SIZE, xlooks=params[cf.ORBITAL_FIT_LOOKS_X], ylooks=params[cf.ORBITAL_FIT_LOOKS_Y], thresh=params[cf.NO_DATA_AVERAGING_THRESHOLD], write_to_disc=False) mlooked = [Ifg(m[1]) for m in mlooked_dataset] for m in mlooked: m.initialize() m.nodata_value = params[cf.NO_DATA_VALUE] m.convert_to_nans() m.convert_to_mm() _orbital_correction(ifgs, params, mlooked=mlooked, preread_ifgs=preread_ifgs) def _orbital_correction(ifgs_or_ifg_paths, params, mlooked=None, offset=True, preread_ifgs=None): """ Convenience function to perform orbital correction. """ degree = params[cf.ORBITAL_FIT_DEGREE] method = params[cf.ORBITAL_FIT_METHOD] # parallel = params[cf.PARALLEL] # not implemented if degree not in [PLANAR, QUADRATIC, PART_CUBIC]: msg = "Invalid degree of %s for orbital correction" % degree raise OrbitalError(msg) log.info('Removing orbital error using orbital method correction {}' ' and degree={}'.format(method, degree)) if method == NETWORK_METHOD: if mlooked is None: network_orbital_correction(ifgs_or_ifg_paths, degree, offset, params, m_ifgs=mlooked, preread_ifgs=preread_ifgs) else: _validate_mlooked(mlooked, ifgs_or_ifg_paths) network_orbital_correction(ifgs_or_ifg_paths, degree, offset, params, mlooked, preread_ifgs) elif method == INDEPENDENT_METHOD: # not running in parallel # raises swig object pickle error # Parallel(n_jobs=params[cf.PROCESSES], verbose=50)( # delayed(_independent_correction)(ifg, degree, offset, params) # for ifg in ifgs) for ifg in ifgs_or_ifg_paths: independent_orbital_correction(ifg, degree, offset, params) else: msg = "Unknown method: '%s', need INDEPENDENT or NETWORK method" raise OrbitalError(msg % method) def _validate_mlooked(mlooked, ifgs): ''' Basic sanity checking of the multilooked ifgs. ''' if len(mlooked) != len(ifgs): msg = "Mismatching # ifgs and # multilooked ifgs" raise OrbitalError(msg) if not all([hasattr(i, 'phase_data') for i in mlooked]): msg = "Mismatching types in multilooked ifgs arg:\n%s" % mlooked raise OrbitalError(msg) def _get_num_params(degree, offset=None): ''' Returns number of model parameters from string parameter ''' if degree == PLANAR: nparams = 2 elif degree == QUADRATIC: nparams = 5 elif degree == PART_CUBIC: nparams = 6 else: msg = "Invalid orbital model degree: %s" % degree raise OrbitalError(msg) # NB: independent method only, network method handles offsets separately if offset is True: nparams += 1 # eg. y = mx + offset return nparams def independent_orbital_correction(ifg, degree, offset, params): """ Calculates and removes an orbital error surface from a single independent interferogram. Warning: This will write orbital error corrected phase_data to the ifg. :param Ifg class instance ifg: the interferogram to be corrected :param str degree: model to fit (PLANAR / QUADRATIC / PART_CUBIC) :param bool offset: True to calculate the model using an offset :param dict params: dictionary of configuration parameters :return: None - interferogram phase data is updated and saved to disk """ ifg = shared.Ifg(ifg) if isinstance(ifg, str) else ifg if not ifg.is_open: ifg.open() shared.nan_and_mm_convert(ifg, params) # vectorise, keeping NODATA vphase = reshape(ifg.phase_data, ifg.num_cells) dm = get_design_matrix(ifg, degree, offset) # filter NaNs out before getting model clean_dm = dm[~isnan(vphase)] data = vphase[~isnan(vphase)] model = lstsq(clean_dm, data)[0] # first arg is the model params # calculate forward model & morph back to 2D if offset: fullorb = np.reshape(np.dot(dm[:, :-1], model[:-1]), ifg.phase_data.shape) else: fullorb = np.reshape(np.dot(dm, model), ifg.phase_data.shape) offset_removal = nanmedian(np.ravel(ifg.phase_data - fullorb)) # subtract orbital error from the ifg ifg.phase_data -= (fullorb - offset_removal) # set orbfit meta tag and save phase to file _save_orbital_error_corrected_phase(ifg) if ifg.open(): ifg.close() def network_orbital_correction(ifgs, degree, offset, params, m_ifgs=None, preread_ifgs=None): """ This algorithm implements a network inversion to determine orbital corrections for a set of interferograms forming a connected network. Warning: This will write orbital error corrected phase_data to the ifgs. :param list ifgs: List of Ifg class objects reduced to a minimum spanning tree network :param str degree: model to fit (PLANAR / QUADRATIC / PART_CUBIC) :param bool offset: True to calculate the model using offsets :param dict params: dictionary of configuration parameters :param list m_ifgs: list of multilooked Ifg class objects (sequence must be multilooked versions of 'ifgs' arg) :param dict preread_ifgs: Dictionary containing information specifically for MPI jobs (optional) :return: None - interferogram phase data is updated and saved to disk """ # pylint: disable=too-many-locals, too-many-arguments src_ifgs = ifgs if m_ifgs is None else m_ifgs src_ifgs = mst.mst_from_ifgs(src_ifgs)[3] # use networkx mst vphase = vstack([i.phase_data.reshape((i.num_cells, 1)) for i in src_ifgs]) vphase = squeeze(vphase) B = get_network_design_matrix(src_ifgs, degree, offset) # filter NaNs out before getting model B = B[~isnan(vphase)] orbparams = dot(pinv(B, 1e-6), vphase[~isnan(vphase)]) ncoef = _get_num_params(degree) if preread_ifgs: temp_ifgs = OrderedDict(sorted(preread_ifgs.items())).values() ids = master_slave_ids(get_all_epochs(temp_ifgs)) else: ids = master_slave_ids(get_all_epochs(ifgs)) coefs = [orbparams[i:i+ncoef] for i in range(0, len(set(ids)) * ncoef, ncoef)] # create full res DM to expand determined coefficients into full res # orbital correction (eg. expand coarser model to full size) if preread_ifgs: temp_ifg = Ifg(ifgs[0]) # ifgs here are paths temp_ifg.open() dm = get_design_matrix(temp_ifg, degree, offset=False) temp_ifg.close() else: dm = get_design_matrix(ifgs[0], degree, offset=False) for i in ifgs: # open if not Ifg instance if isinstance(i, str): # pragma: no cover # are paths i = Ifg(i) i.open(readonly=False) shared.nan_and_mm_convert(i, params) _remove_network_orb_error(coefs, dm, i, ids, offset) def _remove_network_orb_error(coefs, dm, ifg, ids, offset): """ Convenience function to remove network orbital error from input interferograms """ orb = dm.dot(coefs[ids[ifg.slave]] - coefs[ids[ifg.master]]) orb = orb.reshape(ifg.shape) # offset estimation if offset: # bring all ifgs to same base level orb -= nanmedian(np.ravel(ifg.phase_data - orb)) # subtract orbital error from the ifg ifg.phase_data -= orb # set orbfit meta tag and save phase to file _save_orbital_error_corrected_phase(ifg) def _save_orbital_error_corrected_phase(ifg): """ Convenience function to update metadata and save latest phase after orbital fit correction """ # set orbfit tags after orbital error correction ifg.dataset.SetMetadataItem(ifc.PYRATE_ORBITAL_ERROR, ifc.ORB_REMOVED) ifg.write_modified_phase() ifg.close() # TODO: subtract reference pixel coordinate from x and y def get_design_matrix(ifg, degree, offset, scale=100.0): """ Returns orbital error design matrix with columns for model parameters. :param Ifg class instance ifg: interferogram to get design matrix for :param str degree: model to fit (PLANAR / QUADRATIC / PART_CUBIC) :param bool offset: True to include offset column, otherwise False. :param float scale: Scale factor to divide cell size by in order to improve inversion robustness :return: dm: design matrix :rtype: ndarray """ if degree not in [PLANAR, QUADRATIC, PART_CUBIC]: raise OrbitalError("Invalid degree argument") # scaling required with higher degree models to help with param estimation xsize = ifg.x_size / scale if scale else ifg.x_size ysize = ifg.y_size / scale if scale else ifg.y_size # mesh needs to start at 1, otherwise first cell resolves to 0 and ignored xg, yg = [g+1 for g in meshgrid(range(ifg.ncols), range(ifg.nrows))] x = xg.reshape(ifg.num_cells) * xsize y = yg.reshape(ifg.num_cells) * ysize # TODO: performance test this vs np.concatenate (n by 1 cols)?? dm = empty((ifg.num_cells, _get_num_params(degree, offset)), dtype=float32) # apply positional parameter values, multiply pixel coordinate by cell size # to get distance (a coord by itself doesn't tell us distance from origin) if degree == PLANAR: dm[:, 0] = x dm[:, 1] = y elif degree == QUADRATIC: dm[:, 0] = x**2 dm[:, 1] = y**2 dm[:, 2] = x * y dm[:, 3] = x dm[:, 4] = y elif degree == PART_CUBIC: dm[:, 0] = x * (y**2) dm[:, 1] = x**2 dm[:, 2] = y**2 dm[:, 3] = x * y dm[:, 4] = x dm[:, 5] = y if offset is True: dm[:, -1] = np.ones(ifg.num_cells) return dm def get_network_design_matrix(ifgs, degree, offset): # pylint: disable=too-many-locals """ Returns larger-format design matrix for network error correction. The network design matrix includes rows which relate to those of NaN cells. :param list ifgs: List of Ifg class objects :param str degree: model to fit (PLANAR / QUADRATIC / PART_CUBIC) :param bool offset: True to include offset cols, otherwise False. :return: netdm: network design matrix :rtype: ndarray """ if degree not in [PLANAR, QUADRATIC, PART_CUBIC]: raise OrbitalError("Invalid degree argument") nifgs = len(ifgs) if nifgs < 1: # can feasibly do correction on a single Ifg/2 epochs raise OrbitalError("Invalid number of Ifgs: %s" % nifgs) # init sparse network design matrix nepochs = len(set(get_all_epochs(ifgs))) # no offsets: they are made separately below ncoef = _get_num_params(degree) shape = [ifgs[0].num_cells * nifgs, ncoef * nepochs] if offset: shape[1] += nifgs # add extra block for offset cols netdm = zeros(shape, dtype=float32) # calc location for individual design matrices dates = [ifg.master for ifg in ifgs] + [ifg.slave for ifg in ifgs] ids = master_slave_ids(dates) offset_col = nepochs * ncoef # base offset for the offset cols tmpdm = get_design_matrix(ifgs[0], degree, offset=False) # iteratively build up sparse matrix for i, ifg in enumerate(ifgs): rs = i * ifg.num_cells # starting row m = ids[ifg.master] * ncoef # start col for master s = ids[ifg.slave] * ncoef # start col for slave netdm[rs:rs + ifg.num_cells, m:m + ncoef] = -tmpdm netdm[rs:rs + ifg.num_cells, s:s + ncoef] = tmpdm # offsets are diagonal cols across the extra array block created above if offset: netdm[rs:rs + ifg.num_cells, offset_col + i] = 1 # init offset cols return netdm class OrbitalError(Exception): """ Generic class for errors in orbital correction. """