test_collect_loops.py
from typing import List
from datetime import date
import pytest
import numpy as np
import networkx as nx
from pyrate.core.phase_closure.mst_closure import Edge, __setup_edges, __find_closed_loops
from pyrate.core.phase_closure.collect_loops import dedupe_loops, find_loops
def test_collect_loops():
graph = np.array(
[
[0, 1, 0, 1, 0],
[1, 0, 1, 0, 1],
[0, 1, 0, 1, 0],
[1, 0, 1, 0, 1],
[0, 1, 0, 1, 0]
]
)
n = 4
count, all_loops = find_loops(graph, n)
assert count == 6
deduped_loops = dedupe_loops(all_loops)
np.testing.assert_array_equal(deduped_loops, [[0, 1, 2, 3], [0, 1, 4, 3], [1, 2, 3, 4]])
def test_count_loops():
graph = np.array(
[
[0, 1, 1, 1],
[1, 0, 1, 1],
[1, 1, 0, 1],
[1, 1, 1, 0],
]
)
n = 4
count, all_loops = find_loops(graph, n)
assert len(all_loops) == 6
np.testing.assert_array_equal(all_loops, [[0, 1, 2, 3], [0, 1, 3, 2], [0, 2, 1, 3], [0, 2, 3, 1], [0, 3, 1, 2],
[0, 3, 2, 1]])
deduped_loops = dedupe_loops(all_loops)
np.testing.assert_array_equal(deduped_loops, [all_loops[0]])
def __find_closed_loops_nx(edges: List[Edge], max_loop_length: int) -> List[List[date]]:
g = nx.Graph()
edges = [(we.first, we.second) for we in edges]
g.add_edges_from(edges)
dg = nx.DiGraph(g)
print(f"Evaluating all possible loops using NetworkX simple_cycles function")
simple_cycles = list(nx.simple_cycles(dg))
print(f"Total number of possible loops is {len(simple_cycles)}")
print(f"Discarding loops with less than 3 edges and more than {max_loop_length} edges")
loop_subset = [scc for scc in simple_cycles
if
(len(scc) > 2) # three or more edges reqd for closed loop
and
(len(scc) <= max_loop_length) # discard loops exceeding max loop length
]
print(f"Number of remaining loops is {len(loop_subset)}")
# also discard loops when the loop members are the same
return dedupe_loops(loop_subset)
@pytest.fixture(scope='module')
def available_edges(cropa_geotifs):
available_edges = __setup_edges(cropa_geotifs)
return available_edges
def max_loop_length(available_edges):
all_possible_loops = __find_closed_loops_nx(available_edges, max_loop_length=100)
max_length = max([len(l) for l in all_possible_loops])
return max_length
def test_find_closed_loops_vs_networkx(available_edges):
max_length = max_loop_length(available_edges)
for n in range(max_length + 1):
print(f'====checking for max_loop_length {n}=====')
networkx_loops = __find_closed_loops_nx(available_edges, max_loop_length=n)
networkx_loops = [sorted(l) for l in networkx_loops]
networkx_set = {tuple(l) for l in networkx_loops}
our_loops = __find_closed_loops(available_edges, max_loop_length=n)
our_loops = [sorted(l) for l in our_loops]
our_set = {tuple(l) for l in our_loops}
assert networkx_set == our_set