# https://bitbucket.org/NetaRS/sched_analytics
# Tip revision: ed1f2acca39de9eb5f34a6cb5b0c8db1492f74f2 authored by NetaRS on 12 December 2020, 09:53:39 UTC
# bounded traffic distributed
# bounded traffic distributed
# Tip revision: ed1f2ac
# compute_tp.py
import sys
from os import path
import json
import argparse
from matrices import MatrixList
from mwm import DictList
# Compute the throughput based on traffic files (in milisecond representation)
# and maximum weight matchings of different granularities and timing.
# Let MWM(t1,t2) be the maximum weight matching obtained by looking at all
# traffic in the interval [t1,t2].
# argv[1] is the algorithm we are using:
# + offline_milisecond - compute throughput for t based on MWM(t,t)
# + online_milisecond - compute throughput for t based on MWM(t-1,t-1)
# The next options use frame sizes given in argv[2] (e.g., 100 miliseconds).
# For time t and frame size f, let start_frame be the first time-slot of
# the frame containing t.
# + epoch_milisecond - throughput for t based on MWM(start_frame-1,start_frame-1)
# + offline_epoch - throughput for t based on MWM(start_frame,start_frame+f-1)
# + online_epoch - throughput for t based on MWM(start_frame-f,start_frame-1)
# The script will print the aggregated total throughput of 3 seconds on
# all racks, and will print to file the vector of per-milisecond total throughput.
def get_new_mode_name_by_params(compute_epoch=1, agg_interval=1, agg_epoch_delay=0):
    """Build the canonical mode name that encodes a parameter combination.

    A delay of 0 means the matching may look at the current epoch ("offline");
    any positive delay yields an "online_delay<k>" prefix. The epoch length and
    aggregation interval are always appended so result files are unambiguous.
    """
    prefix = "offline" if agg_epoch_delay == 0 else "online_delay" + str(agg_epoch_delay)
    suffix = "_epoch" + str(compute_epoch) + "_agg" + str(agg_interval)
    return prefix + suffix
def get_params_by_ol_mode_name(mode_name, compute_epoch=1):
    """Translate a legacy mode name into the keyword parameters used by
    compute_throughput(); unknown names map to an empty dict.

    "*_milisecond" modes aggregate over a single mili, "*_epoch" modes over a
    whole compute epoch; "online" variants lag one epoch behind.
    """
    table = {
        "offline_milisecond": dict(compute_epoch=1, agg_interval=1, agg_epoch_delay=0),
        "online_milisecond": dict(compute_epoch=1, agg_interval=1, agg_epoch_delay=1),
        "epoch_milisecond": dict(compute_epoch=compute_epoch, agg_interval=1, agg_epoch_delay=1),
        "offline_epoch": dict(compute_epoch=compute_epoch, agg_interval=compute_epoch, agg_epoch_delay=0),
        "online_epoch": dict(compute_epoch=compute_epoch, agg_interval=compute_epoch, agg_epoch_delay=1),
    }
    return table.get(mode_name, {})
def write_results(tps, total_tp, dif_stats,
        compute_epoch=1, agg_interval=1, agg_epoch_delay=0, n_milis=5000, top=None,
        output_dir=".", max_degree=1, total_load=0, run_id="", flow_avg=0, flow_var=0, **kwargs):
    """Persist one run's results.

    Writes a per-mode JSON file (total and per-mili throughputs) and appends a
    row to the shared per-n_milis CSV, creating the CSV header first if the
    file does not exist yet. dif_stats must map "totals"/"changes" to objects
    exposing get_avg()/get_var() (see SeriesStats).
    """
    mode_name = get_new_mode_name_by_params(compute_epoch, agg_interval, agg_epoch_delay)
    json_path = path.join(output_dir, "res_" + str(n_milis) + "_" + mode_name + ".json")
    with open(json_path, "w") as json_file:
        json.dump({"total_tp": total_tp, "tps": tps}, json_file)
    csv_path = path.join(output_dir, "res_" + str(n_milis) + ".csv")
    if not path.isfile(csv_path):
        # First run for this n_milis: emit the header row.
        header = ["mode", "total_tp", "n_milis", "compute_epoch", "agg_interval", "agg_epoch_delay", "top", "max_degree",
                  "links_avg", "links_var", "change_avg", "change_var", "total_load", "run_id", "flow_avg", "flow_var", "tps"]
        with open(csv_path, "w") as csv_file:
            csv_file.write(",".join(map(str, header)) + "\n")
    values = [mode_name, total_tp, n_milis, compute_epoch, agg_interval, agg_epoch_delay, top, max_degree,
              dif_stats["totals"].get_avg(), dif_stats["totals"].get_var(),
              dif_stats["changes"].get_avg(), dif_stats["changes"].get_var(),
              total_load, run_id, flow_avg, flow_var] + tps
    with open(csv_path, "a+") as csv_file:
        csv_file.write(",".join(map(str, values)) + "\n")
class SeriesStats:
    """Streaming accumulator for the mean and population variance of a series.

    Keeps only the running sum, sum of squares and count, so memory is O(1)
    regardless of how many samples are added.
    """

    def __init__(self):
        # Running totals over every sample added so far.
        self.sum = 0
        self.sum_sqr = 0
        self.count = 0

    def add(self, x):
        """Fold one sample into the running totals."""
        self.sum += x
        self.sum_sqr += x * x
        self.count += 1

    def get_avg(self):
        """Arithmetic mean of the samples (raises ZeroDivisionError if empty)."""
        return float(self.sum) / self.count

    def get_var(self):
        """Population variance via E[x^2] - E[x]^2."""
        mean = self.get_avg()
        return float(self.sum_sqr) / self.count - mean * mean
def matches_dif(new_matches, old_matches, dif_stats):
    """Record matching size and churn between two consecutive matchings.

    Each matching is an iterable of edge pairs; pairs are normalized to tuples
    so lists and tuples compare equal. Adds the current matching size to
    dif_stats["totals"] and the symmetric-difference size (edges added plus
    edges dropped) to dif_stats["changes"].
    """
    current = {tuple(pair) for pair in new_matches}
    previous = {tuple(pair) for pair in old_matches}
    dif_stats["totals"].add(len(current))
    dif_stats["changes"].add(len(current ^ previous))
def compute_throughput(compute_epoch=1, agg_interval=1, agg_epoch_delay=0, n_milis=5000, top=None, output_dir=".", **kwargs):
    """Replay n_milis per-mili traffic matrices and sum the throughput served
    by the matching in effect at each mili.

    compute_epoch   -- length (in milis) of each compute interval
    agg_interval    -- length of the aggregation window the matching was built on
    agg_epoch_delay -- epochs the matching lags behind t (0 = offline)
    top, max_degree (kwarg) -- select which precomputed matching files to read
    Returns (tps, total_tp, dif_stats): per-mili throughputs, their sum, and
    SeriesStats of matching size / churn.
    """
    print dict(compute_epoch=compute_epoch, agg_interval=agg_interval, agg_epoch_delay=agg_epoch_delay)
    # Per-mili traffic matrices, loaded by index from files on disk.
    per_mili_pattern = path.join(output_dir, "matrix_mili_%d")
    per_mili_matrix_list = MatrixList(per_mili_pattern)
    # Per-mili matchings; file names encode the optional top-k and degree args.
    if top:
        per_mili_match_pattern = path.join(output_dir, "mwm_mili_%d_top_" + str(top)+("_deg_%d" % kwargs.get("max_degree", 1)))
    else:
        per_mili_match_pattern = path.join(output_dir, "mwm_mili_%d"+("_deg_%d" % kwargs.get("max_degree", 1)))
    per_mili_match_list = DictList(per_mili_match_pattern)
    # Matchings precomputed over aggregated intervals, keyed (interval, start, end).
    if top:
        per_interval_match_pattern = path.join(output_dir, "mwm_agg_%d_%d-%d_top_" + str(top)+("_deg_%d" % kwargs.get("max_degree", 1)))
    else:
        per_interval_match_pattern = path.join(output_dir, "mwm_agg_%d_%d-%d"+("_deg_%d" % kwargs.get("max_degree", 1)))
    per_interval_match_list = DictList(per_interval_match_pattern)
    def get_matches(t):
        """Matching in effect at mili t, built over [start, end) where
        end = t - t%compute_epoch - (agg_epoch_delay-1)*compute_epoch."""
        end = t - t % compute_epoch - (agg_epoch_delay-1) * compute_epoch
        # NOTE(review): presumably clamps the window so it never extends past
        # the trace (can happen when agg_epoch_delay == 0) -- confirm intent.
        if end > n_milis:
            end -= compute_epoch
        start = end - agg_interval
        print "start", start, "end", end
        if start < 0:
            # Warm-up period: no matching is available yet, serve nothing.
            return {}
        if agg_interval == 1:
            return list(per_mili_match_list[start])
        return per_interval_match_list[(agg_interval, start, end-1)]
    tps = []
    total_tp = 0
    dif_stats = dict(totals=SeriesStats(), changes=SeriesStats())
    old_matches = []
    for t in range(n_milis):
        print "\r", t, "/", n_milis,
        matrix = list(per_mili_matrix_list[t])
        matches = get_matches(t)
        # Track matching size and churn relative to the previous mili.
        matches_dif(matches, old_matches, dif_stats)
        # An edge m=(i,j) serves the traffic in both directions between i and j.
        tp = sum([matrix[m[0]][m[1]] + matrix[m[1]][m[0]] for m in matches])
        tps.append(tp)
        total_tp += tp
        old_matches = matches
    return tps, total_tp, dif_stats
def main():
    """CLI entry point: parse options, merge them into the JSON configuration,
    run the throughput computation and write the results to disk."""
    parser = argparse.ArgumentParser(
        description="""Compute throughput for each mili [t,t+1) using MWM of interval [start, end) where
        end = t - t%compute_epoch - (agg_epoch_delay-1)*compute_epoch, and
        start = end - agg_interval.
        """,
        epilog="""Examples:
        online_milisecond: "compute_tp.py --agg_epoch_delay 1 --agg_interval 1 --compute_epoch 1"
        offline_milisecond: "compute_tp.py --agg_epoch_delay 0 --agg_interval 1 --compute_epoch 1"
        epoch_milisecond: "compute_tp.py --agg_epoch_delay 1 --agg_interval 1 --compute_epoch 100"
        online_epoch: "compute_tp.py --agg_epoch_delay 1 --agg_interval 100 --compute_epoch 100"
        offline_epoch: "compute_tp.py --agg_epoch_delay 0 --agg_interval 100 --compute_epoch 100"
        """)
    # NOTE(review): type=open means argparse opens the file at parse time and
    # the handle is never explicitly closed; the default "conf.json" must exist.
    parser.add_argument('--conf', default="conf.json", type=open,
        help='configuration file (default: conf.json)')
    parser.add_argument('--compute_epoch', default=1, type=int,
        help='each compute interval length (default: 1 = mili)')
    parser.add_argument('--agg_interval', default=1, type=int,
        help='each aggregation interval length (default: 1 = mili)')
    parser.add_argument('--agg_epoch_delay', default=0, type=int,
        help='number of compute epoch delays before aggregation (default: 0 = offline)')
    args = parser.parse_args()
    # CLI flags override whatever the configuration file specifies.
    conf = json.load(args.conf)
    conf["compute_epoch"] = args.compute_epoch
    conf["agg_interval"] = args.agg_interval
    conf["agg_epoch_delay"] = args.agg_epoch_delay
    tps, total_tp, dif_stats = compute_throughput(**conf)
    write_results(tps, total_tp, dif_stats, **conf)
if __name__ == "__main__":
    main()