https://github.com/pierre-guillou/pdiags_bench
Tip revision: 8a2de479abf7bcbf2002b4f8f620c635217cc088 authored by Julien J Tierny on 30 January 2023, 16:30:54 UTC
added reference to the arxiv repo
added reference to the arxiv repo
Tip revision: 8a2de47
dionysus_gudhi_persistence.py
import argparse
import os
import re
import subprocess
import sys
import time
import dionysus
import numpy as np
def read_simplicial_complex(dataset):
start = time.time()
with open(dataset, "rb") as src:
magic = src.read(20)
if magic != b"TTKSimplicialComplex":
print("Not a TTK Simplicial Complex file")
raise TypeError
ncells = int.from_bytes(src.read(4), "little", signed=True)
print(f"Number of cells: {ncells}")
dim = int.from_bytes(src.read(4), "little", signed=True)
print(f"Global dataset dimension: {dim}")
dims = [0, 0, 0, 0]
for i, _ in enumerate(dims):
dims[i] = int.from_bytes(src.read(4), "little", signed=True)
for i in range(dim + 1):
print(f" {dims[i]} cells of dimension {i}")
values = np.fromfile(src, dtype=np.double, count=ncells)
num_entries = int.from_bytes(src.read(4), "little", signed=True)
print(f"Number of entries in boundary matrix: {num_entries}")
edges = np.fromfile(src, dtype=np.int32, count=2 * dims[1])
triangles = np.fromfile(src, dtype=np.int32, count=3 * dims[2])
tetras = np.fromfile(src, dtype=np.int32, count=4 * dims[3])
print(f"Read TTK Simplicial Complex file: {time.time() - start:.3f}s")
return dims, values, (edges, triangles, tetras)
class Ripser_SparseDM:
def __init__(self):
self.dist_mat = None
self.diag = None
self.maxdim = 0
print("Using the Ripser backend")
def fill_dist_mat(self, dims, vals, edges):
edges = edges.reshape(-1, 2)
I = np.zeros(dims[0] + dims[1], dtype=np.int32)
J = np.zeros(dims[0] + dims[1], dtype=np.int32)
V = np.zeros(dims[0] + dims[1], dtype=np.double)
if dims[2] != 0:
self.maxdim = 1
if dims[3] != 0:
self.maxdim = 2
for i in range(dims[0]):
I[i] = i
J[i] = i
V[i] = vals[i]
for i, e in enumerate(edges):
o = dims[0] + i
I[o] = e[0]
J[o] = e[1]
V[o] = vals[dims[0] + i]
with open("dist_mat", "w") as dst:
for i, j, v in zip(I, J, V):
dst.write(f"{i} {j} {v}\n")
def compute_pers(self):
cmd = (
["backends_src/ripser/ripser"]
+ ["--format", "sparse"]
+ ["--dim", "2"]
+ ["dist_mat"]
)
with subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
) as proc:
self.diag = [[], [], []]
if proc.returncode != 0:
print(proc.stderr.read())
raise subprocess.CalledProcessError(proc.returncode, cmd)
for line in proc.stdout.readlines():
if "intervals" in line:
dim = int(line.strip()[-2])
line = line.strip()
m = re.search(r"\[(\d+|\d+.\d+),(\d+|\d+.\d+)?\)", line)
if m is not None:
self.diag[dim].append((m.groups()[0], m.groups()[1]))
os.remove("dist_mat")
def write_diag(self, output):
n_pairs = 0
for pairs in self.diag:
n_pairs += len(pairs)
if n_pairs == 0:
return
with open(output, "w") as dst:
for dim, pairs in enumerate(self.diag):
for birth, death in pairs:
dst.write(f"{dim} {birth} {death}\n")
class Dionysus_Filtration:
def __init__(self):
self.f = dionysus.Filtration()
self.diag = None
print("Using the Dionysus2 backend")
def add(self, verts, val):
self.f.append(dionysus.Simplex(verts, val))
def compute_pers(self):
self.f.sort()
m = dionysus.homology_persistence(self.f)
self.diag = dionysus.init_diagrams(m, self.f)
def write_diag(self, output):
with open(output, "w") as dst:
for i, pair in enumerate(self.diag):
for pt in pair:
dst.write(f"{i} {pt.birth} {pt.death}\n")
class Gudhi_SimplexTree:
def __init__(self):
import gudhi
self.st = gudhi.SimplexTree()
self.pairs = []
print("Using the Gudhi Simplex Tree backend")
def add(self, verts, val):
self.st.insert(verts, filtration=val)
def compute_pers(self):
self.pairs = self.st.persistence()
def write_diag(self, output):
with open(output, "w") as dst:
for dim, (birth, death) in self.pairs:
dst.write(f"{dim} {birth:.3f} {death:.3f}\n")
def compute_persistence(wrapper, dims, values, cpx, output):
start = time.time()
edges, triangles, tetras = cpx
if isinstance(wrapper, Ripser_SparseDM):
wrapper.fill_dist_mat(dims, values, edges)
else:
for i in range(dims[0]):
wrapper.add([i], values[i])
for i in range(dims[1]):
o = 2 * i
a = dims[0] + i
wrapper.add(edges[o : o + 2], values[a])
for i in range(dims[2]):
o = 3 * i
a = dims[0] + dims[1] + i
wrapper.add(triangles[o : o + 3], values[a])
for i in range(dims[3]):
o = 4 * i
a = dims[0] + dims[1] + dims[2] + i
wrapper.add(tetras[o : o + 4], values[a])
prec = round(time.time() - start, 3)
print(f"Filled filtration/simplex tree/distance matrix: {prec}s")
start = time.time()
wrapper.compute_pers()
pers = round(time.time() - start, 3)
print(f"Computed persistence: {pers}s")
wrapper.write_diag(output)
return (prec, pers)
def run(dataset, output, backend="Gudhi", simplicial=True):
if simplicial:
dims, vals, cpx = read_simplicial_complex(dataset)
dispatch = {
"Dionysus": Dionysus_Filtration,
"Gudhi": Gudhi_SimplexTree,
"Ripser": Ripser_SparseDM,
}
return compute_persistence(dispatch[backend](), dims, vals, cpx, output)
if backend == "Gudhi":
print("Use the Gudhi Cubical Complex backend")
import gudhi
start = time.time()
cpx = gudhi.CubicalComplex(perseus_file=dataset)
print(f"Loaded Perseus file: {time.time() - start:.3f}s")
print(f"Number of simplices: {cpx.num_simplices()}")
print(f"Global dimension: {cpx.dimension()}")
start = time.time()
diag = cpx.persistence()
pers = round(time.time() - start, 3)
print(f"Computed persistence: {pers}s")
with open(output, "w") as dst:
for dim, (birth, death) in diag:
dst.write(f"{dim} {birth:.3f} {death:.3f}\n")
return (0.0, pers)
print("Cannot use Dionysus with cubical complexes")
return (0.0, 0.0)
def main():
parser = argparse.ArgumentParser(
description="Apply Gudhi, Dionysus2 or Ripser on the given dataset"
)
parser.add_argument(
"-i",
type=str,
help="Path to input dataset",
default="datasets/fuel_64x64x64_uint8_order_expl.tsc",
dest="input_dataset",
)
parser.add_argument(
"-o",
type=str,
help="Output diagram file name",
default="out.gudhi",
dest="output_diagram",
)
parser.add_argument(
"-p",
"--gudhi_path",
type=str,
help="Path to Gudhi Python module",
default="build_dirs/gudhi/src/python",
)
parser.add_argument(
"-b", choices=["gudhi", "dionysus", "ripser"], default="gudhi", dest="backend"
)
args = parser.parse_args()
ext = args.input_dataset.split(".")[-1]
if ext not in ["tsc", "pers"]:
print("Input dataset not supported")
if ext == "pers" and args.backend != "gudhi":
print("Perseus Cubical Complex files can only be processed by Gudhi")
return
if ext == "pers" and "expl" in args.input_dataset:
print("Perseus Simplicial Complex files not supported")
return
# prepend path to Gudhi Python package to PYTHONPATH
sys.path = [args.gudhi_path] + sys.path
run(
args.input_dataset,
args.output_diagram,
backend=args.backend.capitalize(),
simplicial="expl" in args.input_dataset or "tsc" in args.input_dataset,
)
if __name__ == "__main__":
main()