import random import math from scipy.stats import rv_continuous import sys import numpy import threading from os import path import json from matrices import MatrixList, make_zero_matrix import argparse from traffic_gen import generate_network_traffic, per_tor_to_per_mili from mwm import compute_intervals_mwm, time_stat import compute_tp import distributed_win import compute_total_load import datetime # Generate traffic rack by rack according ot the flow distribution (flow_dist and get_flow_size), and arrival process (nextTime). # It goes host by host under the rack and generate its traffic. If there are x active flow simultanously they will share bandwidth # equally until one is completed. # python run_analysis.py --generate_traffic AGG_INTERVALS = [20, 40, 80] AGG_EPOCH_DELAYS = [0, 1, 2] COMPUTE_EPOCHES = AGG_INTERVALS WINDOWS = [1, 2] THRESHOLDS = [0.1*i for i in range(11)] ITERATIONS = [1,2,3] FORCE_COMPUTE_EQUAL_AGG = True DEGREES = [1,2,3,4] def update_globals(agg_intervals=AGG_INTERVALS, agg_epoch_delays=AGG_EPOCH_DELAYS, compute_epoches=COMPUTE_EPOCHES, windows=WINDOWS, thresholds=THRESHOLDS, iterations=ITERATIONS, force_compute_equal_agg=FORCE_COMPUTE_EQUAL_AGG, degrees=DEGREES, max_degree=None, **kwargs): global AGG_INTERVALS, AGG_EPOCH_DELAYS, COMPUTE_EPOCHES, WINDOWS, THRESHOLDS, ITERATIONS, FORCE_COMPUTE_EQUAL_AGG, DEGREES AGG_INTERVALS = agg_intervals AGG_EPOCH_DELAYS = agg_epoch_delays COMPUTE_EPOCHES = compute_epoches WINDOWS = windows THRESHOLDS = thresholds ITERATIONS = iterations FORCE_COMPUTE_EQUAL_AGG = force_compute_equal_agg if max_degree is not None: DEGREES = [max_degree] else: DEGREES = degrees def run_optimal(conf): optimal_conf = conf.copy() optimal_conf["top"] = None optimal_conf["compute_epoch"] = 1 optimal_conf["agg_interval"] = 1 optimal_conf["agg_epoch_delay"] = 0 tps, total_tp, dif_stats = compute_tp.compute_throughput(**optimal_conf) compute_tp.write_results(tps, total_tp, dif_stats, **optimal_conf) def run_online(conf): online_conf = conf.copy() online_conf["top"] = None online_conf["compute_epoch"] = 1 online_conf["agg_interval"] = 1 online_conf["agg_epoch_delay"] = 1 tps, total_tp, dif_stats = compute_tp.compute_throughput(**online_conf) compute_tp.write_results(tps, total_tp, dif_stats, **online_conf) def run_centralized(conf): run_optimal(conf) run_online(conf) for agg_interval in AGG_INTERVALS: for compute_epoch in COMPUTE_EPOCHES: if FORCE_COMPUTE_EQUAL_AGG and compute_epoch != agg_interval: continue if compute_epoch < agg_interval: continue for agg_epoch_delay in AGG_EPOCH_DELAYS: conf["compute_epoch"] = compute_epoch conf["agg_interval"] = agg_interval conf["agg_epoch_delay"] = agg_epoch_delay tps, total_tp, dif_stats = compute_tp.compute_throughput(**conf) compute_tp.write_results(tps, total_tp, dif_stats, **conf) def run_distributed_only(conf): for window in WINDOWS: for iterations in ITERATIONS: conf["window"] = window conf["threshold"] = -1 conf["iterations"] = iterations tps, total_tp, dif_stats = distributed_win.compute_dist_only_throughput_ex(**conf) distributed_win.write_results(tps, total_tp, dif_stats, **conf) def run_distributed(conf): for agg_interval in AGG_INTERVALS: for compute_epoch in COMPUTE_EPOCHES: if FORCE_COMPUTE_EQUAL_AGG and compute_epoch != agg_interval: continue if compute_epoch < agg_interval: continue for agg_epoch_delay in AGG_EPOCH_DELAYS: conf["compute_epoch"] = compute_epoch conf["agg_interval"] = agg_interval conf["agg_epoch_delay"] = agg_epoch_delay for window in WINDOWS: for threshold in THRESHOLDS: for iterations in ITERATIONS: conf["window"] = window conf["threshold"] = threshold conf["iterations"] = iterations tps, total_tp, dif_stats = distributed_win.compute_throughput_ex(**conf) distributed_win.write_results(tps, total_tp, dif_stats, **conf) def compute_mwm(conf): time_stats = {} for agg_interval in AGG_INTERVALS: time_stat.reset() compute_intervals_mwm(interval_length=agg_interval, **conf) var = time_stat.get_var() time_stats[agg_interval] = dict(avg=time_stat.get_avg(), var=var, std_err=var**0.5) json.dump(time_stats, open(path.join(conf["output_dir"], "mwm_time_stats.json"), "w"), sort_keys=True, indent=4, separators=(',', ': ')) def main(): parser = argparse.ArgumentParser(description='Generate random traffic.') parser.add_argument('--generate_traffic', action="store_true", help='should generate traffic') parser.add_argument('--compute_load', action="store_true", help='should explicitly compute load') parser.add_argument('--skip_mwm', action="store_true", help='should skip computing MWMs') parser.add_argument('--skip_centralized', action="store_true", help='should skip simulating centralized') parser.add_argument('--skip_distributed', action="store_true", help='should skip simulating distributed') parser.add_argument('--conf', default="conf.json", type=open, help='configuration file (default: conf.json)') parser.add_argument('--runs', default=1, type=int, help='number of runs') args = parser.parse_args() base_conf = json.load(args.conf) for run in range(args.runs): conf = dict(base_conf) update_globals(**conf) if args.generate_traffic: flow_size_avg, flow_size_var = generate_network_traffic(**conf) # Generates host random traffic matrices per_tor_to_per_mili(**conf) total_load = compute_total_load.compute_load(**conf) print ("****** the total load is: ", total_load) compute_total_load.write_results(total_load, **conf) conf["check_existing"] = False elif args.compute_load: total_load = compute_total_load.compute_load(**conf) print ("****** the total load is: ", total_load) compute_total_load.write_results(total_load, **conf) flow_size_stats = json.load(open(path.join(conf["output_dir"], "flow_size_stats.json"))) flow_size_avg, flow_size_var = flow_size_stats["avg"], flow_size_stats["var"] else: total_load = json.load(open(compute_total_load.get_total_load_file_path(**conf)))["total_load"] flow_size_stats = json.load(open(path.join(conf["output_dir"], "flow_size_stats.json"))) flow_size_avg, flow_size_var = flow_size_stats["avg"], flow_size_stats["var"] conf["run_id"] = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") conf["total_load"] = total_load conf["flow_avg"] = flow_size_avg conf["flow_var"] = flow_size_var if path.isfile("flow_size_multi_stats.json"): flow_size_multi_stats = json.load(open("flow_size_multi_stats.json")) else: flow_size_multi_stats = {} flow_size_multi_stats[conf["run_id"]] = dict(avg=flow_size_avg, var=flow_size_var) json.dump( flow_size_multi_stats, open(path.join(conf["output_dir"], "flow_size_multi_stats.json"), "w"), indent=True, sort_keys=True) for max_degree in DEGREES: conf["max_degree"] = max_degree if not args.skip_mwm: compute_mwm(conf) if not args.skip_centralized: run_centralized(conf) if not args.skip_distributed: run_distributed_only(conf) run_distributed(conf) if __name__ == "__main__": main()