https://bitbucket.org/NetaRS/sched_analytics
Revision 07c8d061e765d2f96d8e1ad7d8bde35f1fbeba32 authored by NetaRS on 17 October 2020, 12:40:45 UTC, committed by NetaRS on 17 October 2020, 12:40:45 UTC
1 parent 6aa6250
Tip revision: 07c8d061e765d2f96d8e1ad7d8bde35f1fbeba32 authored by NetaRS on 17 October 2020, 12:40:45 UTC
Add peer num to graph name
Add peer num to graph name
Tip revision: 07c8d06
run_analysis.py
import random
import math
from scipy.stats import rv_continuous
import sys
import numpy
import threading
from os import path
import json
from matrices import MatrixList, make_zero_matrix
import argparse
from traffic_gen import generate_network_traffic, per_tor_to_per_mili
from mwm import compute_intervals_mwm, time_stat
import compute_tp
import distributed_win
import compute_total_load
import datetime
# Generate traffic rack by rack according to the flow distribution (flow_dist and get_flow_size) and the arrival process (nextTime).
# It goes host by host under the rack and generates its traffic. If there are x active flows simultaneously, they share bandwidth
# equally until one is completed.
# python run_analysis.py --generate_traffic
# Default parameter sweeps; update_globals() may replace any of them.

# Values tried for conf["agg_interval"]; also the interval lengths used for MWM timing.
AGG_INTERVALS = [20, 40, 80]
# Values tried for conf["agg_epoch_delay"].
AGG_EPOCH_DELAYS = list(range(3))
# Values tried for conf["compute_epoch"]; by default the very same list as AGG_INTERVALS.
COMPUTE_EPOCHES = AGG_INTERVALS
# Values tried for conf["window"] in the distributed runs.
WINDOWS = [1, 2]
# Values tried for conf["threshold"]: eleven multiples of 0.1, starting at 0.
THRESHOLDS = [0.1 * step for step in range(11)]
# Values tried for conf["iterations"] in the distributed runs.
ITERATIONS = [1, 2, 3]
# When true, only (agg_interval, compute_epoch) pairs with equal values are simulated.
FORCE_COMPUTE_EQUAL_AGG = True
# Values tried for conf["max_degree"].
DEGREES = list(range(1, 5))
def update_globals(agg_intervals=AGG_INTERVALS,
                   agg_epoch_delays=AGG_EPOCH_DELAYS,
                   compute_epoches=COMPUTE_EPOCHES,
                   windows=WINDOWS,
                   thresholds=THRESHOLDS,
                   iterations=ITERATIONS,
                   force_compute_equal_agg=FORCE_COMPUTE_EQUAL_AGG,
                   degrees=DEGREES,
                   max_degree=None,
                   **kwargs):
    """Overwrite the module-level sweep settings.

    Every keyword replaces the global of the same (upper-case) name. The
    defaults are the values those globals held when this function was
    defined, so an omitted keyword resets its global to that initial value
    (in particular, compute_epoches does not follow a custom agg_intervals).

    If max_degree is given (not None) it wins over degrees and DEGREES
    becomes the single-element list [max_degree].

    Any other keyword is accepted and ignored, which lets callers pass a
    whole configuration dict via **conf.
    """
    global AGG_INTERVALS, AGG_EPOCH_DELAYS, COMPUTE_EPOCHES, WINDOWS
    global THRESHOLDS, ITERATIONS, FORCE_COMPUTE_EQUAL_AGG, DEGREES

    AGG_INTERVALS, AGG_EPOCH_DELAYS, COMPUTE_EPOCHES = (
        agg_intervals, agg_epoch_delays, compute_epoches)
    WINDOWS, THRESHOLDS, ITERATIONS = windows, thresholds, iterations
    FORCE_COMPUTE_EQUAL_AGG = force_compute_equal_agg
    # An explicit max_degree restricts the degree sweep to that one value.
    DEGREES = degrees if max_degree is None else [max_degree]
def run_optimal(conf):
    """Compute and write throughput for the "optimal" setting.

    Works on a copy of conf (the caller's dict is not modified) in which
    top is None, compute_epoch and agg_interval are 1 and agg_epoch_delay
    is 0.
    """
    optimal_conf = conf.copy()
    optimal_conf.update(top=None, compute_epoch=1, agg_interval=1, agg_epoch_delay=0)
    throughputs, overall_tp, diff_stats = compute_tp.compute_throughput(**optimal_conf)
    compute_tp.write_results(throughputs, overall_tp, diff_stats, **optimal_conf)
def run_online(conf):
    """Compute and write throughput for the "online" setting.

    Works on a copy of conf (the caller's dict is not modified) in which
    top is None, compute_epoch and agg_interval are 1 and agg_epoch_delay
    is 1.
    """
    online_conf = conf.copy()
    online_conf.update(top=None, compute_epoch=1, agg_interval=1, agg_epoch_delay=1)
    throughputs, overall_tp, diff_stats = compute_tp.compute_throughput(**online_conf)
    compute_tp.write_results(throughputs, overall_tp, diff_stats, **online_conf)
def run_centralized(conf):
    """Run the optimal and online baselines, then the centralized sweep.

    The sweep covers AGG_INTERVALS x COMPUTE_EPOCHES x AGG_EPOCH_DELAYS.
    A pair is skipped when compute_epoch is smaller than agg_interval, or
    when FORCE_COMPUTE_EQUAL_AGG is set and the two differ.

    Note that conf itself is updated in place with the swept keys
    (compute_epoch, agg_interval, agg_epoch_delay); the last combination
    that ran remains in it afterwards.
    """
    run_optimal(conf)
    run_online(conf)
    for agg_interval in AGG_INTERVALS:
        for compute_epoch in COMPUTE_EPOCHES:
            must_match = FORCE_COMPUTE_EQUAL_AGG and compute_epoch != agg_interval
            if must_match or compute_epoch < agg_interval:
                continue
            for agg_epoch_delay in AGG_EPOCH_DELAYS:
                conf.update(
                    compute_epoch=compute_epoch,
                    agg_interval=agg_interval,
                    agg_epoch_delay=agg_epoch_delay,
                )
                throughputs, overall_tp, diff_stats = compute_tp.compute_throughput(**conf)
                compute_tp.write_results(throughputs, overall_tp, diff_stats, **conf)
def run_distributed_only(conf):
    """Run the distributed-only computation for every window/iterations pair.

    For each combination of WINDOWS and ITERATIONS, conf is updated in place
    (window, threshold fixed to -1, iterations) and the results returned by
    distributed_win.compute_dist_only_throughput_ex are written out.
    """
    for window in WINDOWS:
        for iteration_count in ITERATIONS:
            conf.update(window=window, threshold=-1, iterations=iteration_count)
            throughputs, overall_tp, diff_stats = (
                distributed_win.compute_dist_only_throughput_ex(**conf))
            distributed_win.write_results(throughputs, overall_tp, diff_stats, **conf)
def run_distributed(conf):
    """Run the distributed computation over the full parameter sweep.

    Outer sweep: AGG_INTERVALS x COMPUTE_EPOCHES x AGG_EPOCH_DELAYS, skipping
    pairs where compute_epoch is smaller than agg_interval, or where
    FORCE_COMPUTE_EQUAL_AGG is set and the two differ.
    Inner sweep: WINDOWS x THRESHOLDS x ITERATIONS.

    conf is updated in place with the swept keys before each call to
    distributed_win.compute_throughput_ex, whose results are then written.
    """
    for agg_interval in AGG_INTERVALS:
        for compute_epoch in COMPUTE_EPOCHES:
            must_match = FORCE_COMPUTE_EQUAL_AGG and compute_epoch != agg_interval
            if must_match or compute_epoch < agg_interval:
                continue
            for agg_epoch_delay in AGG_EPOCH_DELAYS:
                conf.update(
                    compute_epoch=compute_epoch,
                    agg_interval=agg_interval,
                    agg_epoch_delay=agg_epoch_delay,
                )
                for window in WINDOWS:
                    for threshold in THRESHOLDS:
                        for iteration_count in ITERATIONS:
                            conf.update(
                                window=window,
                                threshold=threshold,
                                iterations=iteration_count,
                            )
                            throughputs, overall_tp, diff_stats = (
                                distributed_win.compute_throughput_ex(**conf))
                            distributed_win.write_results(
                                throughputs, overall_tp, diff_stats, **conf)
def compute_mwm(conf):
    """Compute the interval MWMs for every aggregation interval and save timing stats.

    For each value in AGG_INTERVALS the shared time_stat collector is reset,
    compute_intervals_mwm is run with interval_length set to that value, and
    the collector's average and variance are recorded (std_err is stored as
    the square root of the variance).

    The stats are written to <conf["output_dir"]>/mwm_time_stats.json.

    Args:
        conf: configuration dict; forwarded as keyword arguments to
            compute_intervals_mwm and must contain "output_dir".
    """
    time_stats = {}
    for agg_interval in AGG_INTERVALS:
        time_stat.reset()
        compute_intervals_mwm(interval_length=agg_interval, **conf)
        var = time_stat.get_var()
        time_stats[agg_interval] = dict(avg=time_stat.get_avg(), var=var, std_err=var**0.5)
    stats_path = path.join(conf["output_dir"], "mwm_time_stats.json")
    # Use a context manager so the output is flushed and the handle closed
    # deterministically instead of relying on garbage collection.
    with open(stats_path, "w") as stats_file:
        json.dump(time_stats, stats_file,
                  sort_keys=True, indent=4, separators=(',', ': '))
def _load_json_file(file_path):
    """Parse the JSON file at file_path, closing the handle afterwards."""
    with open(file_path) as json_file:
        return json.load(json_file)


def main():
    """Command-line entry point: optionally generate traffic, then run the simulations.

    The configuration JSON (--conf) is loaded once. For each of --runs runs:
      * the sweep globals are refreshed from the configuration;
      * the total load and flow-size stats are either produced
        (--generate_traffic), recomputed/loaded (--compute_load), or loaded
        from previously written files;
      * the run's flow-size stats are appended, keyed by a timestamp run id,
        to flow_size_multi_stats.json in conf["output_dir"];
      * for every degree in DEGREES the MWM, centralized and distributed
        stages run unless disabled by the matching --skip_* flag.
    """
    parser = argparse.ArgumentParser(description='Generate random traffic.')
    parser.add_argument('--generate_traffic', action="store_true",
                        help='should generate traffic')
    parser.add_argument('--compute_load', action="store_true",
                        help='should explicitly compute load')
    parser.add_argument('--skip_mwm', action="store_true",
                        help='should skip computing MWMs')
    parser.add_argument('--skip_centralized', action="store_true",
                        help='should skip simulating centralized')
    parser.add_argument('--skip_distributed', action="store_true",
                        help='should skip simulating distributed')
    parser.add_argument('--conf', default="conf.json", type=open,
                        help='configuration file (default: conf.json)')
    parser.add_argument('--runs', default=1, type=int,
                        help='number of runs')
    args = parser.parse_args()
    # argparse opened the file for us (type=open); close it once parsed.
    with args.conf as conf_file:
        base_conf = json.load(conf_file)
    for run in range(args.runs):
        # Fresh copy per run: the run_* helpers mutate conf in place.
        conf = dict(base_conf)
        update_globals(**conf)
        if args.generate_traffic:
            # Generates host random traffic matrices
            flow_size_avg, flow_size_var = generate_network_traffic(**conf)
            per_tor_to_per_mili(**conf)
            total_load = compute_total_load.compute_load(**conf)
            print("****** the total load is: ", total_load)
            compute_total_load.write_results(total_load, **conf)
            conf["check_existing"] = False
        elif args.compute_load:
            total_load = compute_total_load.compute_load(**conf)
            print("****** the total load is: ", total_load)
            compute_total_load.write_results(total_load, **conf)
            flow_size_stats = _load_json_file(path.join(conf["output_dir"], "flow_size_stats.json"))
            flow_size_avg, flow_size_var = flow_size_stats["avg"], flow_size_stats["var"]
        else:
            total_load = _load_json_file(
                compute_total_load.get_total_load_file_path(**conf))["total_load"]
            flow_size_stats = _load_json_file(path.join(conf["output_dir"], "flow_size_stats.json"))
            flow_size_avg, flow_size_var = flow_size_stats["avg"], flow_size_stats["var"]
        conf["run_id"] = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        conf["total_load"] = total_load
        conf["flow_avg"] = flow_size_avg
        conf["flow_var"] = flow_size_var
        # Read and write the accumulated stats at the SAME location. Checking
        # the current working directory while writing to output_dir would
        # discard the entries of earlier runs whenever the two differ.
        multi_stats_path = path.join(conf["output_dir"], "flow_size_multi_stats.json")
        if path.isfile(multi_stats_path):
            flow_size_multi_stats = _load_json_file(multi_stats_path)
        else:
            flow_size_multi_stats = {}
        flow_size_multi_stats[conf["run_id"]] = dict(avg=flow_size_avg, var=flow_size_var)
        with open(multi_stats_path, "w") as multi_stats_file:
            json.dump(flow_size_multi_stats, multi_stats_file, indent=True, sort_keys=True)
        for max_degree in DEGREES:
            conf["max_degree"] = max_degree
            if not args.skip_mwm:
                compute_mwm(conf)
            if not args.skip_centralized:
                run_centralized(conf)
            if not args.skip_distributed:
                run_distributed_only(conf)
                run_distributed(conf)


if __name__ == "__main__":
    main()
Computing file changes ...