##### https://git.tu-berlin.de/afigiel/undo-vc-drr
Tip revision: 4e025b1
recovery.py
``````import sys

def build_neighbourhood(edgelist):
neigh = dict()
for a, b in edgelist:
if a not in neigh:
neigh[a] = []
neigh[a].append(b)
if b not in neigh:
neigh[b] = []
neigh[b].append(a)
return neigh

def verify_solution(edgelist, sol):
feasible = True
uncovered = 0
for a, b in edgelist:
if a not in sol and b not in sol:
uncovered += 1
if uncovered == 0:
print("# Solution is feasible")
else:
print("# Solution is unfeasible")
feasible = False

neigh = build_neighbourhood(edgelist)

local_optimum = True
for a, neighs in neigh.items():
if a not in sol:
continue
found = True
for n in neighs:
if n not in sol:
found = False
break
if found:
local_optimum = False
break

if local_optimum:
print("# Solution passed basic optimality test")
else:
print("# Solution failed basic optimality test")

return feasible

def get_variables(types, file):
var = dict()
for i in range(len(types)):
line = next(file)
assert(line.startswith("#"))
words = line.split()
assert(len(words) >= 3)
assert(words[2] == "=")

if types[i] == "s":
var[words[1]] = words[3]
elif types[i] == "l":
var[words[1]] = words[3:]
return var

kernel_file = open(sys.argv[1], "r")
kernel_solution = open(sys.argv[2], "r")

# read solution for the kernel
VC = set()
for line in kernel_solution:
if line.startswith("#"):
continue
words = line.split()
if len(words) == 0:
continue
assert(len(words) == 1)
assert(words[0] not in VC)

print("# Kernel solution size =", len(VC))

kernel_edgelist = []

for line in kernel_file:
if line.startswith("#"):
continue
words = line.split()
if len(words) == 0:
continue
assert(len(words) == 2)
kernel_edgelist.append((words[0], words[1]))

print("# Verifying kernel solution")
feasible = verify_solution(kernel_edgelist, VC)
if not feasible:
print("[ERROR] Kernel solution is infeasible")
sys.exit(1)

# read and apply instructions to recover solution for original graph
kernel_file.seek(0)

inoptimality_detected = False
fixed_inoptimalities = 0

used_units = set()
kernel_solsize = len(VC)
recovery_kdiff = 0

while True:
line = next(kernel_file, None)
if line is None:
break
if line.startswith("# RECOVERY_KDIFF"):
recovery_kdiff += int(line.split()[3])
continue
if not line.startswith("## START_RECOVERY_UNIT"):
continue

words = line.split()
unit = words[2]

solsize_backup = len(VC)
expected_kdiff = 0

var = get_variables("l", kernel_file)
for v in var["v"]:
expected_kdiff += 1
elif unit == "deg2":
var = get_variables("sss", kernel_file)
if var["a"] in VC:
else:
expected_kdiff = 1
elif unit == "deg3":
var = get_variables("ssss", kernel_file)
num = 0
S = [var["a"], var["b"], var["c"]]
for s in S:
if s in VC:
num += 1
if num == 3:
pass
elif num == 2:
for i in range(3):
if S[i] not in VC:
break
j = (i+1)%3
VC.remove(S[j])
elif num == 1:
assert(S[1] in VC)
VC.remove(S[1])
else:
assert(False)
expected_kdiff = 0
elif unit == "clique_neigh":
var = get_variables("sll", kernel_file)
M = [(var["M"][2*i], var["M"][2*i+1]) for i in range(len(var["M"]) // 2)]

num = 0
for c1, c2 in M:
if c1 in VC:
num += 1
if num == len(M):
for c2 in var["C2"]:
else:
assert(num == len(M)-1)
for c2 in var["C2"]:
for c1, c2 in M:
if c1 not in VC:
VC.remove(c2)
break
expected_kdiff = len(var["C2"])
elif unit == "struction":
var = get_variables("sll", kernel_file)
W = [(int(var["W"][3*i]), int(var["W"][3*i+1]), var["W"][3*i+2]) for i in range(len(var["W"]) // 3)]

count = 0
for i, j, v in W:
if v in VC:
count += 1
size = len(VC)
if count == len(W):
for i, j, v in W:
VC.remove(v)
for v in var["Nv"]:
else:
I = []
for i, j, v in W:
if v not in VC:
if len(I) == 0:
I.append(i)
assert(I[0] == i)
I.append(j)
for i, j, v in W:
if v in VC:
VC.remove(v)
for v in var["Nv"]:
for i in I:
VC.remove(var["Nv"][i])
expected_kdiff = len(var["Nv"]) - len(W)
elif unit == "oe_delete":
var = get_variables("sss", kernel_file)
if var["a"] not in VC and var["b"] not in VC:
assert(var["c"] in VC)
VC.remove(var["c"])
expected_kdiff = 0
elif unit == "magnet":
var = get_variables("sssll", kernel_file)
if var["c"] in VC:
VC.remove(var["c"])
else:
A_covered = True
B_covered = True
for x in var["A"]:
if x not in VC:
A_covered = False
break
for x in var["B"]:
if x not in VC:
B_covered = False
break
assert(A_covered or B_covered)
if A_covered:
elif B_covered:
expected_kdiff = 1
elif unit == "alternative":
var = get_variables("lllll", kernel_file)
NAmB_covered = True
NBmA_covered = True
for x in var["NAmB"]:
if x not in VC:
NAmB_covered = False
break
for x in var["NBmA"]:
if x not in VC:
NBmA_covered = False
break
if NAmB_covered:
for b in var["B"]:
elif NBmA_covered:
for a in var["A"]:
else:
assert(False)
for x in var["NAB"]:
expected_kdiff = len(var["A"]) + len(var["NAB"])
elif unit == "undeg2":
var = get_variables("ssss", kernel_file)
num = 0
S = [var["vv"], var["a"], var["b"]]
for x in S:
if x in VC:
num += 1
if num == 3:
inoptimality_detected = True
fixed_inoptimalities += 1
expected_kdiff -= 1
for x in S:
VC.remove(x)
elif num == 2:
for x in S:
if x in VC:
VC.remove(x)
elif num == 1:
assert(var["vv"] in VC)
VC.remove(var["vv"])
else:
assert(False)
expected_kdiff -= 1
elif unit == "undeg3":
var = get_variables("ssss", kernel_file)
num = 0
S = [var["v"], var["a"], var["b"], var["c"]]
for x in S:
if x in VC:
num += 1
if num == 4:
inoptimality_detected = True
fixed_inoptimalities += 1
VC.remove(var["v"])
expected_kdiff = -1
elif num == 3:
if var["v"] in VC:
VC.remove(var["v"])
if var["a"] not in VC:
elif var["b"] not in VC:
elif var["c"] not in VC:
elif num == 2:
assert(var["v"] in VC)
VC.remove(var["v"])
if var["a"] in VC:
elif var["b"] in VC:
elif var["c"] in VC:
else:
assert(False)
elif num == 1:
assert(var["v"] in VC)
VC.remove(var["v"])
else:
assert(False)
expected_kdiff += 0
elif unit == "uncn":
var = get_variables("ssss", kernel_file)
num = 0
S = [var["v"], var["a"], var["b"], var["c"]]
for x in S:
if x in VC:
num += 1
if num == 4:
inoptimality_detected = True
fixed_inoptimalities += 1
VC.remove(var["v"])
VC.remove(var["c"])
expected_kdiff = -1
elif num == 3:
if var["c"] in VC:
VC.remove(var["c"])
if var["v"] in VC:
VC.remove(var["v"])
elif num == 2:
assert(var["v"] in VC)
assert(var["c"] not in VC)
VC.remove(var["v"])
else:
assert(False)
expected_kdiff -= 1
elif unit == "undom":
var = get_variables("ss", kernel_file)
if var["u"] in VC:
VC.remove(var["u"])
else:
assert(var["v"] in VC)
VC.remove(var["v"])
expected_kdiff = -1
elif unit == "rename":
rename = dict()
while True:
line = next(kernel_file)
if line.startswith("## END_RECOVERY_UNIT"):
break
words = line.split()
assert(words[0] == "#" and words[2] == "->")
rename[words[1]] = words[3]

new_VC = set()
for v in VC:
assert(len(VC) == len(new_VC))
VC = new_VC
# skip the tail check below
continue
else:
print("[ERROR] Unknown recovery unit:", unit)
sys.exit(1)
assert(expected_kdiff == len(VC) - solsize_backup)

tail = next(kernel_file)
assert(tail.startswith("## END_RECOVERY_UNIT"))

# solution for the original graph is now recovered
print("# Recovered solution size =", len(VC))
if fixed_inoptimalities > 0:
print("# [WARN] fixed", fixed_inoptimalities, "inoptimalities")
else:
assert(len(VC) == kernel_solsize + recovery_kdiff)

if len(sys.argv) == 4:
original_edgelist = []

original_file = open(sys.argv[3], "r")
for line in original_file:
if line.startswith("#"):
continue
words = line.split()
if len(words) == 0:
continue
assert(len(words) == 2)
original_edgelist.append((words[0], words[1]))

print("# Verifying recovered solution")
feasible = verify_solution(original_edgelist, VC)
if not feasible:
print("[ERROR] Recovered solution is infeasible")
sys.exit(1)

for v in sorted(list(VC)):
print(v)
``````