https://github.com/estherjulien/HybridML
Raw File
Tip revision: 9985e6d930e8b98eb03330a964c2c3fc8788630c authored by estherjulien on 01 August 2022, 11:54:59 UTC
HybridCode deleted from test_data_gen.py
Tip revision: 9985e6d
LGT_network.py
import networkx as nx
import numpy as np
import random


'''
Code for generating LGT network
- Apdated from Remie Janssen's code for the paper:
Janssen, R., & Liu, P. (2021). Comparing the topology of phylogenetic network generators. 
Journal of Bioinformatics and Computational Biology, 19(06), 2140012.

- Originally, the method is from the paper:
Joan Carles Pons, Celine Scornavacca, and Gabriel Cardona. Generation of level-k LGT
networks. IEEE/ACM transactions on computational biology and bioinformatics, 17(1):158–164,
2019.
'''


# get last node from network
def last_node(net):
    return max(net.nodes())


# speciation event
def speciate(net, leaf):
    l = last_node(net)
    net.add_edge(leaf, l+1, length=np.random.random())
    net.add_edge(leaf, l+2, length=np.random.random())


# lateral gene transfer event
def lgt(net, leaf1, leaf2):
    # change length of either first or second leaf, depending on which one is higher.
    leaf1_path_tmp = nx.edge_dfs(net, leaf1, orientation='reverse')
    leaf1_path = {(p1, p2) for p1, p2, _ in leaf1_path_tmp}

    len_leaf1 = 0
    for edge in leaf1_path:
        try:
            len_leaf1 += net.edges[edge]["length"]
        except KeyError:
            len_leaf1 += net.edges[edge]["lenght"]

    leaf2_path_tmp = nx.edge_dfs(net, leaf2, orientation='reverse')
    leaf2_path = {(p1, p2) for p1, p2, _ in leaf2_path_tmp}

    len_leaf2 = 0
    for edge in leaf2_path:
        try:
            len_leaf2 += net.edges[edge]["length"]
        except KeyError:
            len_leaf2 += net.edges[edge]["lenght"]

    change_leaf_ind = np.argmin([len_leaf1, len_leaf2])
    change_leaf = [leaf1, leaf2][change_leaf_ind]
    change_leaf_length = [len_leaf1, len_leaf2][change_leaf_ind]
    other_leaf_length = [len_leaf1, len_leaf2][1 - change_leaf_ind]

    for p in net.predecessors(change_leaf):
        leaf_parent = p

    try:
        old_path_length = net.edges[(leaf_parent, change_leaf)]["length"]
    except KeyError:
        old_path_length = net.edges[(leaf_parent, change_leaf)]["lenght"]

    new_path_length = other_leaf_length - (change_leaf_length - old_path_length)
    net.remove_edge(leaf_parent, change_leaf)
    net.add_edge(leaf_parent, change_leaf, length=new_path_length)

    # add new edge
    net.add_edge(leaf1, leaf2, secondary=True, length=0)
    l = last_node(net)
    net.add_edge(leaf1, l+1, length=np.random.random())
    net.add_edge(leaf2, l+2, length=np.random.random())


# return leaves from network
def leaves(net):
    return [u for u in net.nodes() if net.out_degree(u) == 0]


# return the non trivial blobs of the network
def non_trivial_blobs(net):
    blobs = list(nx.biconnected_components(nx.Graph(net)))
    return [bl for bl in blobs if len(bl) > 2]


def internal_blobs(net):
    internal_nodes = set([u for u in net.nodes() if net.out_degree(u)>0])
    blobs = list(nx.biconnected_components(nx.Graph(net)))
    blobs = [bl for bl in blobs if len(bl) > 2]
    nodes_in_blobs = set().union(*blobs)
    nodes_not_in_blobs = internal_nodes - nodes_in_blobs
    blobs.extend([{u} for u in nodes_not_in_blobs])
    return blobs


def compute_hash(net):
    mapping_blobs = {}
    blobs = internal_blobs(net)
    for blob in blobs:
        for node in blob:
            mapping_blobs[node] = blob

    mapping = {}
    for l in leaves(net):
        # parent = net.predecessors(l)
        parent = list(net.pred[l])[0]
        mapping[l] = mapping_blobs[parent]
    return mapping


# return internal and external pair of leaves
def internal_and_external_pairs(net):
    lvs = leaves(net)
    pairs = [(l1, l2) for l1 in lvs for l2 in lvs if l1 != l2]
    mapping = compute_hash(net)
    internal_pairs = []
    external_pairs = []
    for pair in pairs:
        # check if pair has same parent node
        if net.in_degree(pair[0]) == 1 and net.in_degree(pair[1]) == 1:
            # same parent not allowed
            p_both = []
            for l in pair:
                for p in net.predecessors(l):
                    p_both.append(p)
            if p_both[0] == p_both[1]:
                continue
            # if grandparent of x is parent of y
            try:
                for gp in net.predecessors(p_both[0]):
                    gp_x = gp
                if gp_x == p_both[1]:
                    continue
            except NameError:
                pass
            if is_ret_cherry(net, *pair[::-1]) or is_ret_cherry(net, *pair):
                continue
        else:
            pass
        if mapping[pair[0]] == mapping[pair[1]]:
            internal_pairs.append(pair)
        else:
            external_pairs.append(pair)
    return internal_pairs, external_pairs


# return a random leave from the network
def random_leaf(net):
    return random.choice(leaves(net))


# return a random pair of leaves from the network
def random_pair(net, wint, wext):
    int_pairs, ext_pairs = internal_and_external_pairs(net)
    return random.choices(int_pairs+ext_pairs, weights=[wint]*len(int_pairs)+[wext]*len(ext_pairs))[0]


# SIMULATION
def simulation(num_steps, prob_lgt, wint, wext, ret=None):
    # probl_lgt: alpha
    # wint: weight internal, i.e. 1
    # wext: weight external, i.e. beta

    # initialize network
    net = nx.DiGraph()
    net.add_edge(0, 1, length=np.random.random())
    net.add_edge(0, 2, length=np.random.random())
    num_ret = 0
    for i in range(num_steps):
        if len(net.nodes) == 3:
            event = "spec"
        elif ret is not None and num_ret == ret:
            event = "spec"
        else:
            event = random.choices(['spec', 'lgt'], [1-prob_lgt, prob_lgt])[0]
        if event == 'spec':
            l = random.choice(leaves(net))
            speciate(net, l)
        else:
            num_ret += 1
            pair = random_pair(net, wint, wext)
            # change length of highest hanging leaf to lowest hanging leaf. Only then we can have an LGT event.
            lgt(net, pair[0], pair[1])
    return net, num_ret


# return reticulation nodes
def reticulations(G):
    return [v for v in G.nodes() if G.in_degree(v) == 2]


def is_ret_cherry(net, x, y):
    for p in net.pred[y]:
        if net.out_degree(p) > 1:
            for cp in net.succ[p]:
                if cp == y:
                    continue
                if net.in_degree(cp) > 1:
                    for ccp in net.succ[cp]:
                        if ccp == x:
                            return True
    return False


# find level per blob
def local_level(G, bicc):
    rets = list(set(reticulations(G)).intersection(bicc))
    if len(rets) == 0:
        return 0
    else:
        bicc_edges = [e for e in G.edges() if ((e[0] in bicc) & (e[1] in bicc))]
        end_nodes = [e[1] for e in bicc_edges]
        return len([ret for ret in rets if ret in end_nodes])




back to top