# Raw File
# Tip revision: ed1f2acca39de9eb5f34a6cb5b0c8db1492f74f2 authored by NetaRS on 12 December 2020, 09:53:39 UTC
# bounded traffic distributed
# Tip revision: ed1f2ac
import sys
from os import path
import json
import argparse
from matrices import MatrixList
from mwm import DictList

# Compute the throughput based on traffic files (in millisecond representations)
# and maximum weight matching of different granularities and timing

# Let MWM(t1,t2) be the maximum weight matching obtained by looking at all
# traffic in interval [t1,t2]

# argv[1] is the algorithm we are using:
# + offline_milisecond - compute throughput for t based on MWM(t,t)
# + online_milisecond - compute throughput for t based on MWM(t-1,t-1)

# The next options use the frame size given in argv[2] (e.g., 100 milliseconds).
# For time t and frame size f, let start_frame be the first time-slot of
# the frame containing t.
# + epoch_milisecond - throughput for t based on MWM(start_frame-1,start_frame-1)
# + offline_epoch - throughput for t based on MWM(start_frame,start_frame+f-1)
# + online_epoch - throughput for t based on MWM(start_frame-f,start_frame-1)

# The script will print the aggregated total throughput of 3 seconds on
# all racks, and will write to file the vector of per-millisecond total throughput

def get_new_mode_name_by_params(compute_epoch=1, agg_interval=1, agg_epoch_delay=0):
    """Build the mode name used in result file names and CSV rows.

    A zero ``agg_epoch_delay`` gives the prefix ``offline``; any other value
    gives ``online_delay<agg_epoch_delay>``. The suffix
    ``_epoch<compute_epoch>_agg<agg_interval>`` is always appended.

    Returns:
        The assembled mode name as a string.
    """
    if agg_epoch_delay == 0:
        mode = "offline"
    else:
        # Without this branch the "offline" prefix was always overwritten and
        # a non-zero delay left ``mode`` unbound.
        mode = "online_delay" + str(agg_epoch_delay)
    mode += "_epoch" + str(compute_epoch) + "_agg" + str(agg_interval)
    return mode

def get_params_by_ol_mode_name(mode_name, compute_epoch=1):
    """Translate a legacy mode name into the keyword parameters it stands for.

    Args:
        mode_name: one of "offline_milisecond", "online_milisecond",
            "epoch_milisecond", "offline_epoch" or "online_epoch".
        compute_epoch: epoch length substituted into the epoch-based modes.

    Returns:
        A fresh dict with the keys ``compute_epoch``, ``agg_interval`` and
        ``agg_epoch_delay``, or an empty dict for an unrecognised name.
    """
    param_names = ("compute_epoch", "agg_interval", "agg_epoch_delay")
    # Each entry: (legacy name, values in the order given by param_names).
    legacy_table = (
        ("offline_milisecond", (1, 1, 0)),
        ("online_milisecond", (1, 1, 1)),
        ("epoch_milisecond", (compute_epoch, 1, 1)),
        ("offline_epoch", (compute_epoch, compute_epoch, 0)),
        ("online_epoch", (compute_epoch, compute_epoch, 1)),
    )
    for legacy_name, values in legacy_table:
        if mode_name == legacy_name:
            return dict(zip(param_names, values))
    return {}

def write_results(tps, total_tp, dif_stats,
                  compute_epoch=1, agg_interval=1, agg_epoch_delay=0, n_milis=5000, top=None,
                  output_dir=".", max_degree=1, total_load=0, run_id="", flow_avg=0, flow_var=0, **kwargs):
    """Persist the results of one run as a JSON file and as a CSV row.

    Two files are written under ``output_dir``:

    * ``res_<n_milis>_<mode>.json`` holding ``total_tp`` and ``tps``
      (overwritten on every call);
    * ``res_<n_milis>.csv``, created with a header line when it does not
      exist yet, to which one row for this run is appended.

    Args:
        tps: list of throughput values; written after the fixed CSV columns.
        total_tp: aggregated throughput value.
        dif_stats: mapping with "totals" and "changes" entries, each offering
            ``get_avg()`` and ``get_var()``.
        compute_epoch, agg_interval, agg_epoch_delay: timing parameters; they
            determine the mode name and are stored in the row.
        n_milis, top, max_degree, total_load, run_id, flow_avg, flow_var:
            stored verbatim in the CSV row (``n_milis`` is also part of both
            file names).
        output_dir: directory receiving both files.
        **kwargs: ignored; lets callers pass a whole configuration dict.
    """
    mode_name = get_new_mode_name_by_params(compute_epoch, agg_interval, agg_epoch_delay)
    file_stem = "res_" + str(n_milis)

    # Per-mode JSON dump of the raw numbers.
    json_path = path.join(output_dir, file_stem + "_" + mode_name + ".json")
    with open(json_path, "w") as json_file:
        json.dump({"total_tp": total_tp, "tps": tps}, json_file)

    # CSV shared by all modes run with the same n_milis.
    csv_path = path.join(output_dir, file_stem + ".csv")
    if not path.isfile(csv_path):
        header = ["mode", "total_tp", "n_milis", "compute_epoch", "agg_interval", "agg_epoch_delay", "top",
                  "max_degree", "links_avg", "links_var", "change_avg", "change_var", "total_load", "run_id",
                  "flow_avg", "flow_var", "tps"]
        with open(csv_path, "w") as csv_file:
            csv_file.write(",".join(header) + "\n")
    with open(csv_path, "a+") as csv_file:
        link_stats = dif_stats["totals"]
        change_stats = dif_stats["changes"]
        fixed_columns = [
            mode_name, total_tp, n_milis, compute_epoch, agg_interval, agg_epoch_delay, top, max_degree,
            link_stats.get_avg(), link_stats.get_var(),
            change_stats.get_avg(), change_stats.get_var(),
            total_load, run_id, flow_avg, flow_var,
        ]
        # The throughput vector follows the fixed columns, one value per cell.
        cells = [str(value) for value in fixed_columns + tps]
        csv_file.write(",".join(cells) + "\n")

class SeriesStats:
    """Running accumulator that reports the mean and variance of added values.

    Only the sum, the sum of squares and the number of samples are stored.
    """

    def __init__(self):
        # Running totals over every value passed to add().
        self.sum = 0
        self.sum_sqr = 0
        self.count = 0

    def add(self, x):
        """Fold one sample ``x`` into the running totals."""
        self.sum += x
        self.sum_sqr += x ** 2
        self.count += 1

    def get_avg(self):
        """Return the mean of the samples.

        Raises ZeroDivisionError when no sample has been added.
        """
        # Multiplying by 1.0 forces float division for integer sums.
        float_sum = self.sum * 1.0
        return float_sum / self.count

    def get_var(self):
        """Return the variance, computed as mean of squares minus squared mean.

        Raises ZeroDivisionError when no sample has been added.
        """
        mean_of_squares = self.sum_sqr * 1.0 / self.count
        mean = self.get_avg()
        return mean_of_squares - mean ** 2

def matches_dif(new_matches, old_matches, dif_stats):
    """Record the size of a matching and how much it differs from the previous one.

    Args:
        new_matches: iterable of pairs forming the current matching.
        old_matches: iterable of pairs forming the previous matching.
        dif_stats: mapping with "totals" and "changes" entries, each offering
            an ``add(value)`` method. "totals" receives the number of distinct
            pairs in ``new_matches``; "changes" receives the number of pairs
            present in exactly one of the two matchings.

    Pairs are compared as ordered tuples, so (a, b) and (b, a) count as
    different pairs.
    """
    new_pairs = {tuple(p) for p in new_matches}
    old_pairs = {tuple(p) for p in old_matches}
    # Previously both values were computed and discarded, leaving the
    # accumulators empty so that their averages could not be computed.
    dif_stats["totals"].add(len(new_pairs))
    dif_stats["changes"].add(len(new_pairs ^ old_pairs))

def compute_throughput(compute_epoch=1, agg_interval=1, agg_epoch_delay=0, n_milis=5000, top=None, output_dir=".", **kwargs):
    """Compute the throughput of every millisecond under the configured matching.

    For each millisecond ``t`` in ``range(n_milis)`` the traffic matrix of
    ``t`` is read and the matching computed for the interval ``[start, end)``
    is applied to it, where::

        end   = t - t % compute_epoch - (agg_epoch_delay - 1) * compute_epoch
        start = end - agg_interval

    The throughput of ``t`` is the sum of ``matrix[a][b] + matrix[b][a]`` over
    all matched pairs ``(a, b)``.

    Args:
        compute_epoch: length of a compute epoch.
        agg_interval: length of the aggregation interval; 1 selects the
            per-millisecond matchings, anything else the aggregated ones.
        agg_epoch_delay: number of compute epochs by which the matching lags.
        n_milis: number of milliseconds to process.
        top: when truthy, matchings are read from the ``_top_<top>`` files.
        output_dir: directory holding the matrix and matching files.
        **kwargs: ``max_degree`` (default 1) selects the ``_deg_<n>`` files;
            other entries are ignored.

    Returns:
        ``(tps, total_tp, dif_stats)``: the per-millisecond throughput list,
        its sum, and a dict of ``SeriesStats`` ("totals", "changes") fed by
        ``matches_dif`` for every millisecond.
    """
    print(dict(compute_epoch=compute_epoch, agg_interval=agg_interval, agg_epoch_delay=agg_epoch_delay))

    # File name suffixes shared by the per-millisecond and aggregated matchings.
    # The "_top_" part is present only when ``top`` is given; previously the
    # plain pattern always overwrote it and a falsy ``top`` left the pattern
    # variables unbound.
    top_suffix = ("_top_" + str(top)) if top else ""
    deg_suffix = "_deg_%d" % kwargs.get("max_degree", 1)

    per_mili_matrix_list = MatrixList(path.join(output_dir, "matrix_mili_%d"))
    per_mili_match_list = DictList(
        path.join(output_dir, "mwm_mili_%d" + top_suffix + deg_suffix))
    per_interval_match_list = DictList(
        path.join(output_dir, "mwm_agg_%d_%d-%d" + top_suffix + deg_suffix))

    def get_matches(t):
        # Matching to apply at millisecond t: the one computed for [start, end).
        end = t - t % compute_epoch - (agg_epoch_delay - 1) * compute_epoch
        if end > n_milis:
            # The interval would run past the processed range; step back one epoch.
            end -= compute_epoch
        start = end - agg_interval
        print("start %s end %s" % (start, end))
        if start < 0:
            # No interval before the beginning of the trace: nothing is matched.
            return {}
        if agg_interval == 1:
            return list(per_mili_match_list[start])
        return per_interval_match_list[(agg_interval, start, end - 1)]

    tps = []
    total_tp = 0
    dif_stats = dict(totals=SeriesStats(), changes=SeriesStats())
    old_matches = []
    for t in range(n_milis):
        # Progress indicator, rewritten in place on the same line.
        sys.stdout.write("\r%s / %s " % (t, n_milis))
        matrix = list(per_mili_matrix_list[t])
        matches = get_matches(t)
        matches_dif(matches, old_matches, dif_stats)
        # Traffic in both directions flows over every matched pair.
        tp = sum([matrix[m[0]][m[1]] + matrix[m[1]][m[0]] for m in matches])
        # Keep the per-millisecond value; it was previously dropped, so the
        # returned vector was always empty.
        tps.append(tp)
        total_tp += tp
        old_matches = matches
    return tps, total_tp, dif_stats

def main():
    """Command-line entry point.

    Loads the JSON configuration named by ``--conf``, overrides its
    ``compute_epoch``, ``agg_interval`` and ``agg_epoch_delay`` entries with
    the command-line values, then computes the throughput and writes the
    results.
    """
    parser = argparse.ArgumentParser(
        description="""Compute throughput for each mili [t,t+1) using MWM of interval [start, end) where
    end = t - t%compute_epoch - (agg_epoch_delay-1)*compute_epoch, and
    start = end - agg_interval.
    online_milisecond:  " --agg_epoch_delay 1 --agg_interval 1   --compute_epoch 1"
    offline_milisecond: " --agg_epoch_delay 0 --agg_interval 1   --compute_epoch 1"
    epoch_milisecond:   " --agg_epoch_delay 1 --agg_interval 1   --compute_epoch 100"
    online_epoch:       " --agg_epoch_delay 1 --agg_interval 100 --compute_epoch 100"
    offline_epoch:      " --agg_epoch_delay 0 --agg_interval 100 --compute_epoch 100"
    """,
        # Keep the line layout of the table above in the --help output.
        formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument('--conf', default="conf.json",
                        help='configuration file (default: conf.json)')
    parser.add_argument('--compute_epoch', default=1, type=int,
                        help='each compute interval length (default: 1 = mili)')
    parser.add_argument('--agg_interval', default=1, type=int,
                        help='each aggregation interval length (default: 1 = mili)')
    parser.add_argument('--agg_epoch_delay', default=0, type=int,
                        help='number of compute epoch delays before aggregation (default: 0 = offline)')
    args = parser.parse_args()
    # Open the configuration explicitly so the file handle is closed again.
    with open(args.conf) as conf_file:
        conf = json.load(conf_file)
    # Command-line values take precedence over whatever the file specifies.
    conf["compute_epoch"] = args.compute_epoch
    conf["agg_interval"] = args.agg_interval
    conf["agg_epoch_delay"] = args.agg_epoch_delay
    tps, total_tp, dif_stats = compute_throughput(**conf)
    write_results(tps, total_tp, dif_stats, **conf)

# Run the command-line entry point only when executed as a script.
if __name__ == "__main__":
    main()