import random
import math
from scipy.stats import rv_continuous
import sys
import numpy
import threading
from os import path, fork
import os
import json
from matrices import MatrixList, make_zero_matrix
import argparse
from traffic_gen import generate_network_traffic, per_tor_to_per_mili
from mwm import compute_intervals_mwm, time_stat
import compute_tp
import distributed_win
import compute_total_load
import datetime
import time
from make_graphs import make_graphs_ex
import shutil
import default_params
import logging
# Configure the root logger to append all messages (DEBUG and up) to a log
# file named after this script, e.g. "run_analysis.py.log".
logger = logging.getLogger()
handler = logging.FileHandler(__file__+".log")
# Timestamped records: "<time> <logger-name> <level> <message>".
formatter = logging.Formatter(
'%(asctime)s %(name)-12s %(levelname)-8s %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)
# Generate traffic rack by rack according to the flow distribution (flow_dist and get_flow_size) and arrival process (nextTime).
# It goes host by host under each rack and generates its traffic. If there are x simultaneously active flows, they share bandwidth
# equally until one is completed.
# python run_analysis.py --generate_traffic
def run_optimal(conf):
    """Simulate the idealized baseline: no topology cap, unit epochs, zero delay."""
    params = dict(conf)
    params.update(top=None, compute_epoch=1, agg_interval=1, agg_epoch_delay=0)
    tps, total_tp, dif_stats = compute_tp.compute_throughput(**params)
    compute_tp.write_results(tps, total_tp, dif_stats, **params)
def run_online(conf):
    """Simulate the online baseline: like optimal but with a one-epoch aggregation delay."""
    params = dict(conf)
    params.update(top=None, compute_epoch=1, agg_interval=1, agg_epoch_delay=1)
    tps, total_tp, dif_stats = compute_tp.compute_throughput(**params)
    compute_tp.write_results(tps, total_tp, dif_stats, **params)
def run_centralized(conf):
    """Run the optimal/online baselines, then sweep all centralized parameter combinations.

    Mutates `conf` in place with each (compute_epoch, agg_interval, agg_epoch_delay)
    combination before simulating it.
    """
    run_optimal(conf)
    run_online(conf)
    for interval in default_params.AGG_INTERVALS:
        for epoch in default_params.COMPUTE_EPOCHES:
            # Only epoch >= interval is meaningful; optionally force equality.
            if (default_params.FORCE_COMPUTE_EQUAL_AGG and epoch != interval) or epoch < interval:
                continue
            for delay in default_params.AGG_EPOCH_DELAYS:
                conf.update(compute_epoch=epoch, agg_interval=interval, agg_epoch_delay=delay)
                tps, total_tp, dif_stats = compute_tp.compute_throughput(**conf)
                compute_tp.write_results(tps, total_tp, dif_stats, **conf)
def run_distributed_only(conf):
    """Sweep the purely distributed scheduler (threshold disabled via -1).

    Mutates `conf` in place with each (window, iterations) combination.
    """
    for win in default_params.WINDOWS:
        for iters in default_params.ITERATIONS:
            conf.update(window=win, threshold=-1, iterations=iters)
            tps, total_tp, dif_stats = distributed_win.compute_dist_only_throughput_ex(**conf)
            distributed_win.write_results(tps, total_tp, dif_stats, **conf)
def run_distributed(conf):
    """Sweep the hybrid distributed scheduler over every centralized x distributed combo.

    Mutates `conf` in place with each (compute_epoch, agg_interval, agg_epoch_delay,
    window, threshold, iterations) combination before simulating it.
    """
    for interval in default_params.AGG_INTERVALS:
        for epoch in default_params.COMPUTE_EPOCHES:
            # Only epoch >= interval is meaningful; optionally force equality.
            if (default_params.FORCE_COMPUTE_EQUAL_AGG and epoch != interval) or epoch < interval:
                continue
            for delay in default_params.AGG_EPOCH_DELAYS:
                conf.update(compute_epoch=epoch, agg_interval=interval, agg_epoch_delay=delay)
                for win in default_params.WINDOWS:
                    for thresh in default_params.THRESHOLDS:
                        for iters in default_params.ITERATIONS:
                            conf.update(window=win, threshold=thresh, iterations=iters)
                            tps, total_tp, dif_stats = distributed_win.compute_throughput_ex(**conf)
                            distributed_win.write_results(tps, total_tp, dif_stats, **conf)
def compute_mwm(conf):
    """Compute MWMs for each aggregation interval and record per-interval timing stats.

    Writes avg/var/std-err of the MWM compute time to
    <output_dir>/mwm_time_stats.json.
    """
    time_stats = {}
    for agg_interval in default_params.AGG_INTERVALS:
        time_stat.reset()
        compute_intervals_mwm(interval_length=agg_interval, **conf)
        var = time_stat.get_var()
        time_stats[agg_interval] = dict(avg=time_stat.get_avg(), var=var, std_err=var**0.5)
    # FIX: use a context manager so the stats file is flushed and closed
    # deterministically (the original passed a bare open() to json.dump and
    # left the handle for the GC to reclaim).
    with open(path.join(conf["output_dir"], "mwm_time_stats.json"), "w") as out:
        json.dump(time_stats, out, sort_keys=True, indent=4, separators=(',', ': '))
def _load_flow_size_stats(conf):
    """Return (avg, var) flow-size stats previously written into conf["output_dir"]."""
    with open(path.join(conf["output_dir"], "flow_size_stats.json")) as f:
        stats = json.load(f)
    return stats["avg"], stats["var"]
def run(conf, generate_traffic, compute_load, skip_mwm, skip_centralized, skip_distributed):
    """Perform one full simulation pass for the given configuration.

    Depending on the flags, (re)generates traffic and/or the total-load file,
    records flow-size statistics for this run, then sweeps max_degree over the
    MWM, centralized, and distributed simulations (each of which may be skipped).
    Mutates `conf` in place (run_id, total_load, flow stats, max_degree, ...).
    """
    default_params.update_globals(**conf)
    if generate_traffic:
        # Generates host random traffic matrices.
        flow_size_avg, flow_size_var = generate_network_traffic(**conf)
        per_tor_to_per_mili(**conf)
        total_load = compute_total_load.compute_load(**conf)
        print ("****** the total load is: ", total_load)
        compute_total_load.write_results(total_load, **conf)
        conf["check_existing"] = False
    elif compute_load:
        total_load = compute_total_load.compute_load(**conf)
        print ("****** the total load is: ", total_load)
        compute_total_load.write_results(total_load, **conf)
        flow_size_avg, flow_size_var = _load_flow_size_stats(conf)
    else:
        # Reuse the total load computed by an earlier run.
        with open(compute_total_load.get_total_load_file_path(**conf)) as f:
            total_load = json.load(f)["total_load"]
        flow_size_avg, flow_size_var = _load_flow_size_stats(conf)
    conf["run_id"] = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    conf["total_load"] = total_load
    conf["flow_avg"] = flow_size_avg
    conf["flow_var"] = flow_size_var
    # BUG FIX: the existence check (and load) previously looked for
    # flow_size_multi_stats.json in the current working directory, while the
    # file is written under output_dir — so stats accumulated by earlier runs
    # were silently discarded. Read and write the same path.
    multi_stats_path = path.join(conf["output_dir"], "flow_size_multi_stats.json")
    if path.isfile(multi_stats_path):
        with open(multi_stats_path) as f:
            flow_size_multi_stats = json.load(f)
    else:
        flow_size_multi_stats = {}
    flow_size_multi_stats[conf["run_id"]] = dict(avg=flow_size_avg, var=flow_size_var)
    with open(multi_stats_path, "w") as f:
        json.dump(flow_size_multi_stats, f, indent=True, sort_keys=True)
    for max_degree in default_params.DEGREES:
        conf["max_degree"] = max_degree
        if not skip_mwm:
            compute_mwm(conf)
        if not skip_centralized:
            run_centralized(conf)
        if not skip_distributed:
            run_distributed_only(conf)
            run_distributed(conf)
def multi_run(base_conf, runs, generate_traffic, compute_load, skip_mwm, skip_centralized, skip_distributed, del_dir=False):
    """Execute `runs` independent simulation passes, then render summary graphs.

    Creates (or, with del_dir=True, recreates) the output directory and redirects
    stdout/stderr into <output_dir>/multi_run.out for the rest of the process.
    """
    out_dir = base_conf["output_dir"]
    if not path.isdir(out_dir):
        os.mkdir(out_dir)
    elif del_dir:
        shutil.rmtree(out_dir)
        os.mkdir(out_dir)
    # Deliberately left open: it backs sys.stdout/sys.stderr from here on.
    log_file = open(path.join(out_dir, "multi_run.out"), "w")
    sys.stderr = log_file
    sys.stdout = log_file
    for _ in range(runs):
        # Each pass gets its own copy so runs don't see each other's mutations.
        run(dict(base_conf), generate_traffic, compute_load, skip_mwm, skip_centralized, skip_distributed)
    graphs_dir = base_conf.get("graphs_dir", "multi_res")
    if not path.isdir(graphs_dir):
        os.mkdir(graphs_dir)
    make_graphs_ex(base_conf, graphs_dir=graphs_dir)
def is_existed(pid):
    """Non-blocking check whether child `pid` has exited; logs its return code.

    Returns True (and logs the code: negative signal number if killed by a
    signal, otherwise the exit status) when the child has terminated, else False.
    """
    waited_pid, status = os.waitpid(pid, os.WNOHANG)
    if waited_pid != pid:
        # WNOHANG returned without a state change — child is still running.
        return False
    if os.WIFSIGNALED(status):
        returncode = -os.WTERMSIG(status)
    else:
        assert os.WIFEXITED(status)
        returncode = os.WEXITSTATUS(status)
    logger.debug("pid %d existed, returncode=%d", pid, returncode)
    return True
def poll_forks(forks, max_forks):
    """Block until at most `max_forks` children remain alive; return the survivors.

    Polls the child pids once per minute, dropping (and logging) any that have
    exited, until the live list is small enough.
    """
    while len(forks) > max_forks:
        still_running = []
        for child in forks:
            if is_existed(child):
                logger.info("child %d exited", child)
            else:
                still_running.append(child)
        forks = still_running
        time.sleep(60)
    return forks
def main():
    """CLI entry point: parse arguments and run every (rate, factor) combination.

    Without --max_forks the combinations run sequentially in this process.
    With --max_forks N, each combination runs in a forked child (at most N
    concurrent); the parent waits for the remaining children at the end.
    """
    logger.info("start")
    parser = argparse.ArgumentParser(description='Generate random traffic.')
    parser.add_argument('--generate_traffic', action="store_true",
                        help='should generate traffic')
    parser.add_argument('--compute_load', action="store_true",
                        help='should explicitly compute load')
    parser.add_argument('--skip_mwm', action="store_true",
                        help='should skip computing MWMs')
    parser.add_argument('--skip_centralized', action="store_true",
                        help='should skip simulating centralized')
    parser.add_argument('--skip_distributed', action="store_true",
                        help='should skip simulating distributed')
    parser.add_argument('--conf', default="conf.json", type=open,
                        help='configuration file (default: conf.json)')
    parser.add_argument('--runs', default=1, type=int,
                        help='number of runs')
    parser.add_argument('--max_forks', default=0, type=int,
                        help='should fork processes')
    parser.add_argument('--del_dir', action="store_true",
                        help='should delete existing output dir')
    args = parser.parse_args()
    base_conf = json.load(args.conf)
    args.conf.close()  # argparse opened the file (type=open); release it once parsed
    factors = base_conf.get("flow_dist_factors")
    if factors is None:
        factors = [base_conf["flow_dist_factor"]]
    rates = base_conf.get("rates")
    if rates is None:
        rates = [base_conf["rate"]]
    forks = []
    for rate in rates:
        for factor in factors:
            if args.max_forks:
                # Keep at most max_forks - 1 children before spawning another.
                forks = poll_forks(forks, args.max_forks - 1)
                # FIX: removed the dead `pid < 0` branch — os.fork() raises
                # OSError on failure and never returns a negative pid.
                pid = fork()
                if pid > 0:
                    # Parent: record the child and move to the next combination.
                    logger.info("forked %d for factor=%f, rate=%d", pid, factor, rate)
                    forks.append(pid)
                    continue
                # pid == 0: the child falls through and runs this combination.
            conf = dict(base_conf)
            conf["rate"] = rate
            conf["flow_dist_factor"] = factor
            # Give each combination its own output directory.
            conf["output_dir"] = base_conf["output_dir"] + "_%f_%d" % (factor, rate)
            multi_run(conf, args.runs, args.generate_traffic, args.compute_load,
                      args.skip_mwm, args.skip_centralized, args.skip_distributed, args.del_dir)
            if args.max_forks:
                # Forked child: done with its single combination.
                exit(0)
    logger.info("waiting last forks %s", str(forks))
    poll_forks(forks, 0)
    logger.info("end.")
if __name__ == "__main__":
main()