from os import path
import json
import argparse

from matrices import MatrixList
from mwm import DictList

# Compute the throughput based on traffic files (in millisecond representation)
# and maximum weight matchings of different granularities and timings.
#
# Let MWM(t1,t2) be the maximum weight matching obtained by looking at all
# traffic in the interval [t1,t2]. The --compute_epoch, --agg_interval and
# --agg_epoch_delay flags select the mode (see the argparse epilog for the
# exact combinations):
# + offline_milisecond - throughput for t based on MWM(t,t)
# + online_milisecond  - throughput for t based on MWM(t-1,t-1)
# The remaining modes use a frame size f (e.g., 100 milliseconds).
# For time t and frame size f, let start_frame be the first time-slot of
# the frame containing t.
# + epoch_milisecond   - throughput for t based on MWM(start_frame-1,start_frame-1)
# + offline_epoch      - throughput for t based on MWM(start_frame,start_frame+f-1)
# + online_epoch       - throughput for t based on MWM(start_frame-f,start_frame-1)
#
# The script writes the aggregated total throughput over all time-slots and
# all racks, together with the vector of per-millisecond total throughput,
# to result files (see write_results).


def get_new_mode_name_by_params(compute_epoch=1, agg_interval=1, agg_epoch_delay=0):
    if agg_epoch_delay == 0:
        mode = "offline"
    else:
        mode = "online_delay" + str(agg_epoch_delay)
    mode += "_epoch" + str(compute_epoch) + "_agg" + str(agg_interval)
    return mode


def get_params_by_ol_mode_name(mode_name, compute_epoch=1):
    # Map the legacy mode names (see the header comment) to parameter sets.
    if mode_name == "offline_milisecond":
        return dict(compute_epoch=1, agg_interval=1, agg_epoch_delay=0)
    if mode_name == "online_milisecond":
        return dict(compute_epoch=1, agg_interval=1, agg_epoch_delay=1)
    if mode_name == "epoch_milisecond":
        return dict(compute_epoch=compute_epoch, agg_interval=1, agg_epoch_delay=1)
    if mode_name == "offline_epoch":
        return dict(compute_epoch=compute_epoch, agg_interval=compute_epoch, agg_epoch_delay=0)
    if mode_name == "online_epoch":
        return dict(compute_epoch=compute_epoch, agg_interval=compute_epoch, agg_epoch_delay=1)
    return {}


def write_results(tps, total_tp, dif_stats, compute_epoch=1, agg_interval=1,
                  agg_epoch_delay=0, n_milis=5000, top=None, output_dir=".",
                  max_degree=1, total_load=0, run_id="", flow_avg=0, flow_var=0,
                  **kwargs):
    mode_name = get_new_mode_name_by_params(compute_epoch, agg_interval, agg_epoch_delay)
    test_file_path = path.join(output_dir, "res_" + str(n_milis) + "_" + mode_name + ".json")
    with open(test_file_path, "w") as test_res_file:
        json.dump({"total_tp": total_tp, "tps": tps}, test_res_file)

    all_res_file_path = path.join(output_dir, "res_" + str(n_milis) + ".csv")
    # Write the CSV header once; the per-millisecond throughputs spill into
    # the columns starting at "tps", one value per column.
    if not path.isfile(all_res_file_path):
        with open(all_res_file_path, "w") as all_res_file:
            row = ["mode", "total_tp", "n_milis", "compute_epoch", "agg_interval",
                   "agg_epoch_delay", "top", "max_degree", "links_avg", "links_var",
                   "change_avg", "change_var", "total_load", "run_id",
                   "flow_avg", "flow_var", "tps"]
            all_res_file.write(",".join(map(str, row)) + "\n")
    with open(all_res_file_path, "a") as all_res_file:
        row = [mode_name, total_tp, n_milis, compute_epoch, agg_interval,
               agg_epoch_delay, top, max_degree,
               dif_stats["totals"].get_avg(), dif_stats["totals"].get_var(),
               dif_stats["changes"].get_avg(), dif_stats["changes"].get_var(),
               total_load, run_id, flow_avg, flow_var] + tps
        all_res_file.write(",".join(map(str, row)) + "\n")


class SeriesStats:
    """Running mean and population variance of a series, in O(1) memory."""

    def __init__(self):
        self.sum = 0
        self.sum_sqr = 0
        self.count = 0

    def add(self, x):
        self.sum += x
        self.sum_sqr += x ** 2
        self.count += 1

    def get_avg(self):
        return self.sum * 1.0 / self.count

    def get_var(self):
        # Var(X) = E[X^2] - E[X]^2
        return self.sum_sqr * 1.0 / self.count - self.get_avg() ** 2
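
# A minimal usage sketch for SeriesStats (not part of the pipeline): the class
# keeps running sums for E[X] and E[X^2], so for the series 1, 2, 3:
#
#   >>> stats = SeriesStats()
#   >>> for x in (1, 2, 3):
#   ...     stats.add(x)
#   >>> stats.get_avg()
#   2.0
#   >>> round(stats.get_var(), 4)   # population variance: 14/3 - 2**2 == 2/3
#   0.6667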

def matches_dif(new_matches, old_matches, dif_stats):
    # Record the size of the current matching and the number of edges that
    # changed (symmetric difference) relative to the previous matching.
    new_matches = {tuple(p) for p in new_matches}
    old_matches = {tuple(p) for p in old_matches}
    total = len(new_matches)
    change = len(new_matches ^ old_matches)
    dif_stats["totals"].add(total)
    dif_stats["changes"].add(change)


def compute_throughput(compute_epoch=1, agg_interval=1, agg_epoch_delay=0,
                       n_milis=5000, top=None, output_dir=".", **kwargs):
    print(dict(compute_epoch=compute_epoch, agg_interval=agg_interval,
               agg_epoch_delay=agg_epoch_delay))
    per_mili_pattern = path.join(output_dir, "matrix_mili_%d")
    per_mili_matrix_list = MatrixList(per_mili_pattern)
    if top:
        per_mili_match_pattern = path.join(
            output_dir,
            "mwm_mili_%d_top_" + str(top) + ("_deg_%d" % kwargs.get("max_degree", 1)))
    else:
        per_mili_match_pattern = path.join(
            output_dir,
            "mwm_mili_%d" + ("_deg_%d" % kwargs.get("max_degree", 1)))
    per_mili_match_list = DictList(per_mili_match_pattern)
    if top:
        per_interval_match_pattern = path.join(
            output_dir,
            "mwm_agg_%d_%d-%d_top_" + str(top) + ("_deg_%d" % kwargs.get("max_degree", 1)))
    else:
        per_interval_match_pattern = path.join(
            output_dir,
            "mwm_agg_%d_%d-%d" + ("_deg_%d" % kwargs.get("max_degree", 1)))
    per_interval_match_list = DictList(per_interval_match_pattern)

    def get_matches(t):
        # The matching used at time t covers the interval [start, end); see
        # the argparse description below for the formula.
        end = t - t % compute_epoch - (agg_epoch_delay - 1) * compute_epoch
        if end > n_milis:
            # No aggregated matching exists past the end of the trace; fall
            # back one compute epoch.
            end -= compute_epoch
        start = end - agg_interval
        print("start", start, "end", end)
        if start < 0:
            return []
        if agg_interval == 1:
            return list(per_mili_match_list[start])
        return per_interval_match_list[(agg_interval, start, end - 1)]

    tps = []
    total_tp = 0
    dif_stats = dict(totals=SeriesStats(), changes=SeriesStats())
    old_matches = []
    for t in range(n_milis):
        print("\r%d / %d" % (t, n_milis), end="", flush=True)
        matrix = list(per_mili_matrix_list[t])
        matches = get_matches(t)
        matches_dif(matches, old_matches, dif_stats)
        # Traffic matrices are directed; a matched pair carries traffic both ways.
        tp = sum(matrix[m[0]][m[1]] + matrix[m[1]][m[0]] for m in matches)
        tps.append(tp)
        total_tp += tp
        old_matches = matches
    return tps, total_tp, dif_stats


def main():
    parser = argparse.ArgumentParser(
        description="""Compute throughput for each mili [t,t+1) using the MWM
of interval [start, end) where
end = t - t%compute_epoch - (agg_epoch_delay-1)*compute_epoch, and
start = end - agg_interval.""",
        epilog="""Examples:
online_milisecond:  "compute_tp.py --agg_epoch_delay 1 --agg_interval 1 --compute_epoch 1"
offline_milisecond: "compute_tp.py --agg_epoch_delay 0 --agg_interval 1 --compute_epoch 1"
epoch_milisecond:   "compute_tp.py --agg_epoch_delay 1 --agg_interval 1 --compute_epoch 100"
online_epoch:       "compute_tp.py --agg_epoch_delay 1 --agg_interval 100 --compute_epoch 100"
offline_epoch:      "compute_tp.py --agg_epoch_delay 0 --agg_interval 100 --compute_epoch 100"
""")
    parser.add_argument('--conf', default="conf.json", type=open,
                        help='configuration file (default: conf.json)')
    parser.add_argument('--compute_epoch', default=1, type=int,
                        help='length of each compute interval (default: 1 = mili)')
    parser.add_argument('--agg_interval', default=1, type=int,
                        help='length of each aggregation interval (default: 1 = mili)')
    parser.add_argument('--agg_epoch_delay', default=0, type=int,
                        help='number of compute-epoch delays before aggregation '
                             '(default: 0 = offline)')
    args = parser.parse_args()
    conf = json.load(args.conf)
    conf["compute_epoch"] = args.compute_epoch
    conf["agg_interval"] = args.agg_interval
    conf["agg_epoch_delay"] = args.agg_epoch_delay

    tps, total_tp, dif_stats = compute_throughput(**conf)
    write_results(tps, total_tp, dif_stats, **conf)


if __name__ == "__main__":
    main()
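
# Worked example of the interval arithmetic in get_matches (a sketch, using
# the online_epoch parameters from the epilog: compute_epoch=100,
# agg_interval=100, agg_epoch_delay=1). For t = 250:
#
#   end   = 250 - 250 % 100 - (1 - 1) * 100 = 200
#   start = 200 - 100                       = 100
#
# so the throughput at t = 250 is evaluated against MWM(100, 199), i.e. the
# matching computed on the frame preceding the frame containing t, exactly
# the online_epoch mode described in the header comment.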