Revision 4e025b1d3b5ef68cd5804bf2712331b87c107c1e authored by Aleskander Figiel on 04 June 2022, 12:32:09 UTC, committed by Aleskander Figiel on 04 June 2022, 12:32:09 UTC
0 parent
Raw File
recovery.py
import sys

def build_neighbourhood(edgelist):
    """Build an adjacency map from a list of undirected edges.

    Every edge ``(a, b)`` appends ``b`` to the neighbour list of ``a`` and
    ``a`` to the neighbour list of ``b``. Repeated edges and self-loops are
    not filtered, so they show up as repeated entries.

    Returns a dict mapping each endpoint to the list of its neighbours, in
    the order the edges were encountered.
    """
    adjacency = {}
    for u, w in edgelist:
        # setdefault creates the list on first sight of an endpoint.
        adjacency.setdefault(u, []).append(w)
        adjacency.setdefault(w, []).append(u)
    return adjacency

def verify_solution(edgelist, sol):
    """Check a candidate vertex cover and print a short report.

    Two checks are run and reported on stdout as ``#``-prefixed lines:

    * feasibility: every edge of *edgelist* has at least one endpoint
      in *sol*;
    * a basic optimality test: it fails if some vertex of *sol* has all of
      its neighbours in *sol* as well (in a simple graph such a vertex is
      redundant, because its edges are already covered).

    Only the feasibility result is returned; the optimality test is
    informational.

    Returns True if the solution covers every edge, False otherwise.
    """
    uncovered_edges = sum(
        1 for u, w in edgelist if u not in sol and w not in sol
    )
    is_cover = uncovered_edges == 0
    print("# Solution is feasible" if is_cover else "# Solution is unfeasible")

    adjacency = build_neighbourhood(edgelist)

    # Look for a selected vertex whose whole neighbourhood is selected too.
    has_redundant_vertex = any(
        vertex in sol and all(other in sol for other in others)
        for vertex, others in adjacency.items()
    )

    if has_redundant_vertex:
        print("# Solution failed basic optimality test")
    else:
        print("# Solution passed basic optimality test")

    return is_cover

def get_variables(types, file):
    """Read one variable-definition line per entry of *types* from *file*.

    Each consumed line must have the form ``# <name> = <value> ...``
    (whitespace separated). *types* supplies one type code per line to read:

    * ``"s"`` stores the first value as a string (scalar);
    * ``"l"`` stores all values as a list of strings, possibly empty.

    A line read for any other type code is still consumed and format-checked
    but stores nothing.

    Returns a dict mapping each variable name to its parsed value.

    Raises AssertionError if a line does not match the expected form or a
    scalar variable has no value. StopIteration from ``next(file)``
    propagates if the input ends early.
    """
    var = dict()
    for kind in types:
        line = next(file)
        assert line.startswith("#"), "expected a '#' variable line, got %r" % (line,)
        words = line.split()
        assert len(words) >= 3, "malformed variable line %r" % (line,)
        assert words[2] == "=", "missing '=' in variable line %r" % (line,)

        if kind == "s":
            # A scalar needs a value after '='; without this check the
            # lookup below would fail with a bare IndexError.
            assert len(words) >= 4, "scalar variable %r has no value" % (words[1],)
            var[words[1]] = words[3]
        elif kind == "l":
            var[words[1]] = words[3:]
    return var
        

# Command line: <kernel_file> <kernel_solution> [<original_edgelist>]
if len(sys.argv) < 3:
    print("usage:", sys.argv[0],
          "<kernel_file> <kernel_solution> [<original_edgelist>]",
          file=sys.stderr)
    sys.exit(1)

# kernel_file is deliberately kept open: after the edge list is read here,
# the file is rewound and scanned again for the recovery instructions.
kernel_file = open(sys.argv[1], "r")

# read solution for the kernel: one vertex name per line; blank lines and
# lines starting with "#" are skipped, duplicates are rejected
VC = set()
with open(sys.argv[2], "r") as kernel_solution:
    for line in kernel_solution:
        if line.startswith("#"):
            continue
        words = line.split()
        if not words:
            continue
        assert(len(words) == 1)
        assert(words[0] not in VC)
        VC.add(words[0])

print("# Kernel solution size =", len(VC))

# read kernel edgelist: two vertex names per line, "#" lines are comments
kernel_edgelist = []

for line in kernel_file:
    if line.startswith("#"):
        continue
    words = line.split()
    if not words:
        continue
    assert(len(words) == 2)
    kernel_edgelist.append((words[0], words[1]))

print("# Verifying kernel solution")
feasible = verify_solution(kernel_edgelist, VC)
if not feasible:
    print("[ERROR] Kernel solution is infeasible")
    sys.exit(1)

# read and apply instructions to recover solution for original graph
#
# The kernel file was already iterated to its end while reading the edge
# list, so it is rewound and scanned again. This pass only reacts to two
# kinds of "#" lines:
#   "# RECOVERY_KDIFF = <int>"        accumulated into recovery_kdiff
#   "## START_RECOVERY_UNIT <name>"   start of a recovery unit, followed by
#                                     "# <var> = ..." lines and closed by
#                                     "## END_RECOVERY_UNIT"
# Units are applied in file order, each one mutating VC in place.
# NOTE(review): the unit names look like vertex-cover reduction rules being
# undone (deg2, struction, magnet, ...); the comments below describe only
# what the code does -- confirm the rule semantics against the kernelizer.
kernel_file.seek(0)

# Set when a unit finds the kernel solution locally improvable; only
# fixed_inoptimalities is read after the loop in this file.
inoptimality_detected = False
fixed_inoptimalities = 0

# Names of the units encountered; collected but not read elsewhere in the
# visible code.
used_units = set()
kernel_solsize = len(VC)
recovery_kdiff = 0

while True:
    line = next(kernel_file, None)
    if line is None:
        break
    if line.startswith("# RECOVERY_KDIFF"):
        # fourth whitespace-separated token is the integer after "="
        recovery_kdiff += int(line.split()[3])
        continue
    if not line.startswith("## START_RECOVERY_UNIT"):
        continue

    words = line.split()
    unit = words[2]

    # Every unit sets expected_kdiff to the size change it should cause;
    # the assertion after the if/elif chain compares it with the real change.
    solsize_backup = len(VC)
    expected_kdiff = 0

    used_units.add(unit)

    if unit == "add_to_vc":
        # Add every listed vertex; each one is expected to be new to VC.
        var = get_variables("l", kernel_file)
        for v in var["v"]:
            VC.add(v)
            expected_kdiff += 1
    elif unit == "deg2":
        # If a is in the cover add b, otherwise add v; exactly one new
        # vertex is expected.
        var = get_variables("sss", kernel_file)
        if var["a"] in VC:
            VC.add(var["b"])
        else:
            VC.add(var["v"])
        expected_kdiff = 1
    elif unit == "deg3":
        # Depends on how many of a, b, c are in the cover; size must not
        # change.
        var = get_variables("ssss", kernel_file)
        num = 0
        S = [var["a"], var["b"], var["c"]]
        for s in S:
            if s in VC:
                num += 1
        if num == 3:
            pass
        elif num == 2:
            # i stops at the one vertex of S missing from VC; its cyclic
            # successor S[j] is swapped out for v.
            for i in range(3):
                if S[i] not in VC:
                    break
            j = (i+1)%3
            VC.remove(S[j])
            VC.add(var["v"])
        elif num == 1:
            # the single selected vertex must be b; it is swapped for v
            assert(S[1] in VC)
            VC.remove(S[1])
            VC.add(var["v"])
        else:
            assert(False)
        expected_kdiff = 0
    elif unit == "clique_neigh":
        var = get_variables("sll", kernel_file)
        # M is a flat list read as consecutive (c1, c2) pairs
        M = [(var["M"][2*i], var["M"][2*i+1]) for i in range(len(var["M"]) // 2)]

        # count the pairs whose first element is in the cover
        num = 0
        for c1, c2 in M:
            if c1 in VC:
                num += 1
        if num == len(M):
            for c2 in var["C2"]:
                VC.add(c2)
        else:
            # exactly one c1 may be missing: add all of C2, then swap the
            # partner c2 of that missing c1 for v
            assert(num == len(M)-1)
            for c2 in var["C2"]:
                VC.add(c2)
            for c1, c2 in M:
                if c1 not in VC:
                    VC.remove(c2)
                    break
            VC.add(var["v"])
        expected_kdiff = len(var["C2"])
    elif unit == "struction":
        var = get_variables("sll", kernel_file)
        # W is a flat list read as (int i, int j, vertex name) triples;
        # i and j are used as indices into Nv below
        W = [(int(var["W"][3*i]), int(var["W"][3*i+1]), var["W"][3*i+2]) for i in range(len(var["W"]) // 3)]

        count = 0
        for i, j, v in W:
            if v in VC:
                count += 1
        size = len(VC)  # not used afterwards
        if count == len(W):
            # every W vertex is selected: replace them all by Nv
            for i, j, v in W:
                VC.remove(v)
            for v in var["Nv"]:
                VC.add(v)
        else:
            # Collect indices from the triples whose vertex is NOT selected:
            # their common i (asserted equal) followed by each j.
            I = []
            for i, j, v in W:
                if v not in VC:
                    if len(I) == 0:
                        I.append(i)
                    assert(I[0] == i)
                    I.append(j)
            for i, j, v in W:
                if v in VC:
                    VC.remove(v)
            # add v and all of Nv, then take the collected Nv indices out
            VC.add(var["v"])
            for v in var["Nv"]:
                VC.add(v)
            for i in I:
                VC.remove(var["Nv"][i])
        expected_kdiff = len(var["Nv"]) - len(W)
    elif unit == "oe_delete":
        # Only acts when neither a nor b is selected: c must then be
        # selected and is swapped for b. Size must not change.
        var = get_variables("sss", kernel_file)
        if var["a"] not in VC and var["b"] not in VC:
            assert(var["c"] in VC)
            VC.remove(var["c"])
            VC.add(var["b"])
        expected_kdiff = 0
    elif unit == "magnet":
        var = get_variables("sssll", kernel_file)
        if var["c"] in VC:
            # c is replaced by both a and b
            VC.add(var["a"])
            VC.add(var["b"])
            VC.remove(var["c"])
        else:
            # A_covered / B_covered: every vertex of list A (resp. B) is in
            # the cover; at least one of the two must hold
            A_covered = True
            B_covered = True
            for x in var["A"]:
                if x not in VC:
                    A_covered = False
                    break
            for x in var["B"]:
                if x not in VC:
                    B_covered = False
                    break
            assert(A_covered or B_covered)
            if A_covered:
                VC.add(var["b"])
            elif B_covered:
                VC.add(var["a"])
        expected_kdiff = 1
    elif unit == "alternative":
        var = get_variables("lllll", kernel_file)
        # whether every vertex of NAmB (resp. NBmA) is already selected
        NAmB_covered = True
        NBmA_covered = True
        for x in var["NAmB"]:
            if x not in VC:
                NAmB_covered = False
                break
        for x in var["NBmA"]:
            if x not in VC:
                NBmA_covered = False
                break
        # add all of B if NAmB is fully selected, else all of A if NBmA is
        if NAmB_covered:
            for b in var["B"]:
                VC.add(b)
        elif NBmA_covered:
            for a in var["A"]:
                VC.add(a)
        else:
            assert(False)
        for x in var["NAB"]:
            VC.add(x)
        # NOTE(review): the expected growth uses len(A) even when B was
        # added -- presumably A and B always have equal size; confirm.
        expected_kdiff = len(var["A"]) + len(var["NAB"])
    elif unit == "undeg2":
        var = get_variables("ssss", kernel_file)
        num = 0
        S = [var["vv"], var["a"], var["b"]]
        for x in S:
            if x in VC:
                num += 1
        if num == 3:
            # all three selected: replace them by v alone; this shrinks the
            # cover by one more than usual and is counted as a fixed
            # inoptimality
            inoptimality_detected = True
            fixed_inoptimalities += 1
            expected_kdiff -= 1
            for x in S:
                VC.remove(x)
            VC.add(var["v"])
        elif num == 2:
            # replace the two selected vertices of S by v
            for x in S:
                if x in VC:
                    VC.remove(x)
            VC.add(var["v"])
        elif num == 1:
            # the single selected vertex must be vv; it is dropped
            assert(var["vv"] in VC)
            VC.remove(var["vv"])
        else:
            assert(False)
        # applies to every branch: -1 in total, -2 in the num == 3 case
        expected_kdiff -= 1
    elif unit == "undeg3":
        var = get_variables("ssss", kernel_file)
        num = 0
        S = [var["v"], var["a"], var["b"], var["c"]]
        for x in S:
            if x in VC:
                num += 1
        if num == 4:
            # v is redundant: drop it and count a fixed inoptimality
            inoptimality_detected = True
            fixed_inoptimalities += 1
            VC.remove(var["v"])
            expected_kdiff = -1
        elif num == 3:
            # if v is selected, swap it for the one of a, b, c that is
            # missing; otherwise leave the cover unchanged
            if var["v"] in VC:
                VC.remove(var["v"])
                if var["a"] not in VC:
                    VC.add(var["a"])
                elif var["b"] not in VC:
                    VC.add(var["b"])
                elif var["c"] not in VC:
                    VC.add(var["c"])
        elif num == 2:
            # v plus exactly one of a, b, c: swap v for a vertex chosen by
            # which one is present (a -> add c, b -> add a, c -> add b)
            assert(var["v"] in VC)
            VC.remove(var["v"])
            if var["a"] in VC:
                VC.add(var["c"])
            elif var["b"] in VC:
                VC.add(var["a"])
            elif var["c"] in VC:
                VC.add(var["b"])
            else:
                assert(False)
        elif num == 1:
            # only v is selected: swap it for b
            assert(var["v"] in VC)
            VC.remove(var["v"])
            VC.add(var["b"])
        else:
            assert(False)
        # no-op: keeps 0, or the -1 set in the num == 4 branch
        expected_kdiff += 0
    elif unit == "uncn":
        var = get_variables("ssss", kernel_file)
        num = 0
        S = [var["v"], var["a"], var["b"], var["c"]]
        for x in S:
            if x in VC:
                num += 1
        if num == 4:
            # drop both v and c; counted as a fixed inoptimality
            inoptimality_detected = True
            fixed_inoptimalities += 1
            VC.remove(var["v"])
            VC.remove(var["c"])
            expected_kdiff = -1
        elif num == 3:
            # end up with a and b selected and neither c nor v
            VC.add(var["a"])
            VC.add(var["b"])
            if var["c"] in VC:
                VC.remove(var["c"])
            if var["v"] in VC:
                VC.remove(var["v"])
        elif num == 2:
            # v must be selected and c must not be; v is dropped
            assert(var["v"] in VC)
            assert(var["c"] not in VC)
            VC.remove(var["v"])
        else:
            assert(False)
        # applies to every branch: -1 in total, -2 in the num == 4 case
        expected_kdiff -= 1
    elif unit == "undom":
        # Drop u if it is selected, otherwise v (which then must be).
        var = get_variables("ss", kernel_file)
        if var["u"] in VC:
            VC.remove(var["u"])
        else:
            assert(var["v"] in VC)
            VC.remove(var["v"])
        expected_kdiff = -1
    elif unit == "rename":
        # This unit has a variable number of "# <from> -> <to>" lines, so it
        # reads up to and including its own END marker.
        rename = dict()
        while True:
            line = next(kernel_file)
            if line.startswith("## END_RECOVERY_UNIT"):
                break
            words = line.split()
            assert(words[0] == "#" and words[2] == "->")
            rename[words[1]] = words[3]

        # Map every selected vertex through the table (KeyError if one is
        # missing); the size check guards against two names colliding.
        new_VC = set()
        for v in VC:
            new_VC.add(rename[v])
        assert(len(VC) == len(new_VC))
        VC = new_VC
        # skip the tail check below
        continue
    else:
        print("[ERROR] Unknown recovery unit:", unit)
        sys.exit(1)
    # the unit must have changed the cover size by exactly what it announced
    assert(expected_kdiff == len(VC) - solsize_backup)

    # get_variables consumed the unit's variable lines, so the next line
    # has to be the closing marker
    tail = next(kernel_file)
    assert(tail.startswith("## END_RECOVERY_UNIT"))

# solution for the original graph is now recovered

# All recovery instructions have been read; release the kernel file handle.
kernel_file.close()

print("# Recovered solution size =", len(VC))
if fixed_inoptimalities > 0:
    print("# [WARN] fixed", fixed_inoptimalities, "inoptimalities")
else:
    # Without fixed inoptimalities the recovered size must equal the kernel
    # solution size plus the sum of the RECOVERY_KDIFF values in the file.
    assert(len(VC) == kernel_solsize + recovery_kdiff)

# Optional third argument: the original edge list, used to double-check
# the recovered solution.
if len(sys.argv) == 4:
    # read original edgelist
    original_edgelist = []

    with open(sys.argv[3], "r") as original_file:
        for line in original_file:
            if line.startswith("#"):
                continue
            words = line.split()
            if not words:
                continue
            assert(len(words) == 2)
            original_edgelist.append((words[0], words[1]))

    print("# Verifying recovered solution")
    feasible = verify_solution(original_edgelist, VC)
    if not feasible:
        print("[ERROR] Recovered solution is infeasible")
        sys.exit(1)

# Vertex names are strings, so this is a lexicographic ordering.
for v in sorted(VC):
    print(v)
back to top