Skip to main content
  • Home
  • Development
  • Documentation
  • Donate
  • Operational login
  • Browse the archive

swh logo
SoftwareHeritage
Software
Heritage
Archive
Features
  • Search

  • Downloads

  • Save code now

  • Add forge now

  • Help

Revision 686db1b2da0eb33a4dd5a1509a3e51129a43609f authored by Debabrata Deka on 02 December 2020, 10:17:21 UTC, committed by GitHub on 02 December 2020, 10:17:21 UTC
Add architecture ppc64le to travis build
1 parent 031e062
  • Files
  • Changes
  • fdbb28c
  • /
  • ragout
  • /
  • breakpoint_graph
  • /
  • permutation.py
Raw File Download

To reference or cite the objects present in the Software Heritage archive, permalinks based on SoftWare Hash IDentifiers (SWHIDs) must be used.
Select below a type of object currently browsed in order to display its associated SWHID and permalink.

  • revision
  • directory
  • content
revision badge
swh:1:rev:686db1b2da0eb33a4dd5a1509a3e51129a43609f
directory badge
swh:1:dir:a6bc7e7d4c7eb04432b536dfa8632503d11bfa4b
content badge
swh:1:cnt:55ad3b1b4e4101f6bc07ff1dce6c62aba6089a21

This interface enables to generate software citations, provided that the root directory of browsed objects contains a citation.cff or codemeta.json file.
Select below a type of object currently browsed in order to generate citations for them.

  • revision
  • directory
  • content
Generate software citation in BibTex format (requires biblatex-software package)
Generating citation ...
Generate software citation in BibTex format (requires biblatex-software package)
Generating citation ...
Generate software citation in BibTex format (requires biblatex-software package)
Generating citation ...
permutation.py
#(c) 2013-2014 by Authors
#This file is a part of Ragout program.
#Released under the BSD license (see LICENSE file)

"""
This module provides PermutationContainer class
which parses files, stores permutations and
provides some other usefull functions
"""

from __future__ import absolute_import
from __future__ import division
from collections import defaultdict
import logging
import os
from copy import deepcopy

from ragout.shared.debug import DebugConfig
from ragout.shared import config
from ragout.shared.datatypes import Block, Permutation, output_permutations
import ragout.breakpoint_graph.repeat_resolver as rr
from ragout.six.moves import filter

logger = logging.getLogger()
debugger = DebugConfig.get_instance()


class PermException(Exception):
    pass


class PermutationContainer:
    def __init__(self, block_coords_file, recipe,
                 resolve_repeats, allow_ref_indels, phylogeny):
        """
        Parses permutation files referenced from recipe and filters duplications
        """
        self.ref_perms = []
        self.target_perms = []
        self.recipe = recipe

        logging.info("Reading %s", block_coords_file)
        permutations = _parse_blocks_coords(block_coords_file)

        has_sequences = set()
        draft_names = set()
        for p in permutations:
            if p.genome_name not in recipe["genomes"]:
                continue
            p.draft = recipe["genomes"][p.genome_name]["draft"]
            if p.draft:
                draft_names.add(p.genome_name)

            has_sequences.add(p.genome_name)
            if p.genome_name == recipe["target"]:
                self.target_perms.append(p)
            else:
                self.ref_perms.append(p)

        for genome in recipe["genomes"]:
            if genome not in has_sequences:
                raise PermException("No sequences read for genome {0}. Check "
                                    "recipe for correctness.".format(genome))
        _check_coverage(self.ref_perms + self.target_perms)

        logger.debug("Read %d reference sequences", len(self.ref_perms))
        if not len(self.ref_perms):
            raise PermException("No synteny blocks found in "
                                "reference sequences")
        logger.debug("Read %d target sequences", len(self.target_perms))
        if not len(self.target_perms):
            raise PermException("No synteny blocks found in "
                                "target sequences")

        self._filter_indels(allow_ref_indels)
        logger.debug("%d target sequences left after indel filtering",
                     len(self.target_perms))

        repeats = _find_repeats(self.ref_perms + self.target_perms)
        ###
        if resolve_repeats:
            if phylogeny is None:
                raise PermException("Resolving repeats with "
                                    "yet unknown phylogeny")
            rr.resolve_repeats(self.ref_perms, self.target_perms,
                               repeats, phylogeny, draft_names)
        ###
        self._filter_repeats(repeats)
        logger.debug("%d target sequences left after repeat filtering",
                     len(self.target_perms))
        if not len(self.target_perms):
            raise PermException("No synteny blocks found in the target "
                                "genome after repeat/indel filtering.")

        if debugger.debugging:
            filtered_file = os.path.join(debugger.debug_dir, "filtered_contigs.txt")
            output_permutations(self.target_perms, filtered_file)

    def _filter_indels(self, allow_ref_indels):
        """
        Keep only blocks that appear in target and
        all references (or one reference, if allow_ref_indels is set)
        """
        multiplicity = defaultdict(int)
        target_blocks = set()
        reference_blocks = set()
        def process(perms, block_set):
            for perm in perms:
                for block in perm.blocks:
                    multiplicity[block.block_id] += 1
                    block_set.add(block.block_id)

        process(self.target_perms, target_blocks)
        process(self.ref_perms, reference_blocks)

        if allow_ref_indels:
            to_keep = target_blocks.intersection(reference_blocks)
        else:
            num_genomes = len(self.recipe["genomes"])
            #to_keep = set(filter(lambda b: multiplicity[b] >= num_genomes,
            #                     multiplicity))
            to_keep = set([b for b in multiplicity if multiplicity[b] >= num_genomes])

        self.ref_perms = _filter_permutations(self.ref_perms, to_keep)
        self.target_perms = _filter_permutations(self.target_perms, to_keep)


    def _filter_repeats(self, repeats):
        """
        Filters repetitive blocks
        """
        self.target_perms = _filter_permutations(self.target_perms, repeats,
                                                 inverse=True)
        self.ref_perms = _filter_permutations(self.ref_perms, repeats,
                                              inverse=True)


def _find_repeats(permutations):
    """
    Returns a set of repetitive blocks
    """
    index = defaultdict(set)
    repeats = set()
    for perm in permutations:
        for block in perm.blocks:
            if perm.genome_name in index[block.block_id]:
                repeats.add(block.block_id)
            else:
                index[block.block_id].add(perm.genome_name)
    return repeats


def _filter_permutations(permutations, blocks, inverse=False):
    """
    Filters given blocks from permutations
    """
    new_perms = []
    filter_func = lambda b: (b.block_id in blocks) != inverse

    for perm in permutations:
        new_blocks = list(filter(filter_func, perm.blocks))
        if new_blocks:
            new_perms.append(deepcopy(perm))
            new_perms[-1].blocks = new_blocks
    return new_perms


def _parse_blocks_coords(filename):
    """
    Parses a file with blocks coords
    """
    perm_by_id = {}
    with open(filename, "r") as f:
        header = True
        for line in f:
            line = line.strip()
            if not line:
                continue

            if header:
                if line.startswith("Seq_id"):
                    continue

                if line.startswith("-"):
                    header = False
                    continue

                chr_id, chr_size, seq_name = line.split("\t")
                tokens = seq_name.split(".", 1)
                if len(tokens) != 2:
                    raise PermException("permutation ids in " + filename +
                                        " do not follow naming convention: " +
                                        "'genome.chromosome'")

                genome_name, chr_name = tokens
                perm_by_id[chr_id] = Permutation(genome_name, chr_name,
                                                 int(chr_size), [])

            else:
                if line.startswith("Seq_id") or line.startswith("-"):
                    continue

                if line.startswith("Block"):
                    block_id = int(line.split(" ")[1][1:])
                    continue

                seq_id, sign, start, end, _length = line.split("\t")
                if sign == "-":
                    start, end = end, start
                if int(end) < int(start):
                    raise PermException("Error in permutations file format")

                sign_num = 1 if sign == "+" else -1
                perm_by_id[seq_id].blocks.append(Block(block_id, sign_num,
                                                      int(start), int(end)))

    for perm in perm_by_id.values():
        perm.blocks.sort(key=lambda b: b.start)

    #out_perms = list(filter(lambda b: len(b.blocks), perm_by_id.values()))
    out_perms = [b for b in  perm_by_id.values() if len(b.blocks)]
    if not len(out_perms):
        raise PermException("Permutations file is empty")
    return out_perms


def _check_coverage(permutations):
    """
    Checks if synteny blocks coverage is acceptable
    """
    by_genome = defaultdict(list)
    for perm in permutations:
        by_genome[perm.genome_name].append(perm)

    warning_genomes = []
    for genome_name, genome_perms in by_genome.items():
        total_length = 0
        total_covered = 0
        for perm in genome_perms:
            total_length += perm.length()
            for block in perm.blocks:
                total_covered += block.length()

        coverage = float(total_covered) / total_length
        logger.info("\"{0}\" synteny blocks coverage: {1:2.4}%"
                     .format(genome_name, 100 * coverage))
        if coverage < config.vals["min_synteny_coverage"]:
            warning_genomes.append((genome_name, coverage))

    if warning_genomes:
        #for genome_name, coverage in warning_genomes:
        #    logger.warning("\"{0}\" synteny blocks coverage: {1:2.4}%"
        #                 .format(genome_name, 100 * coverage))
        logger.warning("Some genomes have low synteny block coverage "
                       "- results can be inaccurate. "
                       "Possible reasons: \n"
                       "\t1. Genome is too distant from others\n"
                       "\t2. Synteny block parameters are chosen incorrectly"
                       "\n\tTry to change synteny blocks paramsters or "
                       "remove this genome from the comparison")
The diff you're trying to view is too large. Only the first 1000 changed files have been loaded.
Showing with 0 additions and 0 deletions (0 / 0 diffs computed)
swh spinner

Computing file changes ...

back to top

Software Heritage — Copyright (C) 2015–2025, The Software Heritage developers. License: GNU AGPLv3+.
The source code of Software Heritage itself is available on our development forge.
The source code files archived by Software Heritage are available under their own copyright and licenses.
Terms of use: Archive access, API— Content policy— Contact— JavaScript license information— Web API