Raw File
assembly_refine.py
#This module performs refinement with the assembly graph
#########################################################

import networkx as nx
import re
import logging
from collections import namedtuple

from Bio import SeqIO
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord

from ragout.shared import config
from ragout.shared.datatypes import Contig, Scaffold

Edge = namedtuple("Edge", ["start", "end"])
logger = logging.getLogger()

#PUBLIC:
########################################################

#does the job
def refine_scaffolds(graph_file, scaffolds):
    max_path_len = config.ASSEMBLY_MAX_PATH_LEN
    logger.info("Refining with assembly graph")
    logger.debug("Max path len = {0}".format(max_path_len))
    new_scaffolds = _insert_from_graph(graph_file, scaffolds, max_path_len)
    return new_scaffolds


#PRIVATE:
#########################################################

#ignore heavy python-graphviz
def _load_dot(filename):
    graph = nx.MultiDiGraph()
    edges = {}
    pattern = re.compile("([0-9]+)\s*\->\s*([0-9]+)\s*\[.*=.*\"(.+)\".*\];")
    for line in open(filename, "r").read().splitlines():
        m = pattern.match(line)
        if not m:
            continue

        v1, v2 = int(m.group(1)), int(m.group(2))
        graph.add_node(v1)
        graph.add_node(v2)
        graph.add_edge(v1, v2, label=m.group(3))
        edges[m.group(3)] = Edge(v1, v2)
    return graph, edges


#check if there is no multiedges along the path
def _check_unique(graph, path):
    for v1, v2 in zip(path[:-1], path[1:]):
        assert graph.has_edge(v1, v2)
        if len(graph[v1][v2]) > 1:
            return False
    return True


#finds a unique path between two nodes in graph
def _get_unique_path(graph, edges, prev_cont, new_cont, max_path_len):
    try:
        src = edges[str(prev_cont)].end
        dst = edges[str(new_cont)].start
    except KeyError:
        logger.debug("contigs are not in the graph")
        return None

    if src == dst:
        logger.debug("adjacent contigs {0} -- {1}".format(prev_cont, new_cont))
        return None

    if not nx.has_path(graph, src, dst):
        logger.debug("no path {0} -- {1}".format(prev_cont, new_cont))
        return None

    paths = [p for p in nx.all_shortest_paths(graph, src, dst)]
    if len(paths) > 1 or not _check_unique(graph, paths[0]):
        logger.debug("multiple paths {0} -- {1}".format(prev_cont, new_cont))
        return None

    path = paths[0]
    if len(path) > max_path_len:
        logger.debug("too long path {0} -- {1} of length {2}"
                       .format(prev_cont, new_cont, len(path)))
        return None

    path_edges = []
    for p_start, p_end in zip(path[:-1], path[1:]):
        found_edge = None
        for edge_id, edge in edges.items():
            if edge == Edge(p_start, p_end):
                found_edge = edge_id
                break
        assert found_edge
        path_edges.append(found_edge)

    logger.debug("unique path {0} -- {1} of length {2}"
                 .format(prev_cont, new_cont, len(path)))

    return path_edges


#inserts contigs from the assembly graph into scaffolds
def _insert_from_graph(graph_file, scaffolds_in, max_path_len):
    new_scaffolds = []
    graph, edges = _load_dot(graph_file)

    ordered_contigs = set()
    for scf in scaffolds_in:
        ordered_contigs |= set(map(lambda s: s.name, scf.contigs))

    for scf in scaffolds_in:
        new_scaffolds.append(Scaffold(scf.name))

        for prev_cont, new_cont in zip(scf.contigs[:-1], scf.contigs[1:]):
            new_scaffolds[-1].contigs.append(prev_cont)

            #find unique path
            path_edges = _get_unique_path(graph, edges, prev_cont, new_cont, max_path_len)
            if path_edges is None:
                continue

            #check path consistency
            consistent = True
            for edge in path_edges:
                if edge[1:] in ordered_contigs:
                    logger.debug("Path inconsistency {0} -- {1}: {2}"
                                 .format(prev_cont, new_cont, edge))
                    consistent = False
                    break
            if not consistent:
                continue

            #insert contigs along the path
            for edge in path_edges:
                new_scaffolds[-1].contigs[-1].gap = 0
                new_scaffolds[-1].contigs.append(Contig.from_sting(edge))
                new_scaffolds[-1].contigs[-1].gap = 0

        new_scaffolds[-1].contigs.append(new_cont)
    return new_scaffolds
back to top