https://bitbucket.org/NetaRS/sched_analytics
Revision 07c8d061e765d2f96d8e1ad7d8bde35f1fbeba32 authored and committed by NetaRS on 17 October 2020, 12:40:45 UTC
1 parent 6aa6250
Commit message: Add peer num to graph name
run_analysis.py
import argparse
import datetime
import json
from os import path

from traffic_gen import generate_network_traffic, per_tor_to_per_mili
from mwm import compute_intervals_mwm, time_stat
import compute_tp
import distributed_win
import compute_total_load

# Generate traffic rack by rack according to the flow size distribution (flow_dist and
# get_flow_size) and the arrival process (nextTime). Traffic is generated host by host
# under each rack; if x flows are active simultaneously, they share the bandwidth
# equally until one of them completes.
#
# Usage examples:
#   python run_analysis.py --generate_traffic
#   python run_analysis.py --skip_mwm --skip_centralized --runs 3
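
# A minimal sketch, for illustration only and never called by this script, of the
# equal-share model described above: flows that start together split the link rate
# evenly, and the survivors re-share the capacity whenever a flow completes. The
# helper name and the link_rate parameter are illustrative, not part of traffic_gen.
def _fair_share_completion_times(flow_sizes, link_rate=1.0):
    """Completion times of flows that start together and share one link equally."""
    times = []
    now = 0.0
    drained = 0.0  # volume already sent by every still-active flow
    for i, size in enumerate(sorted(flow_sizes)):
        active = len(flow_sizes) - i
        # the smallest remaining flow finishes next, at per-flow rate link_rate / active
        now += (size - drained) * active / link_rate
        drained = size
        times.append(now)
    return times
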

# Default parameter sweeps for the runs below; a conf.json can override any of them
# through update_globals() (an illustrative override fragment follows that function).
AGG_INTERVALS = [20, 40, 80]
AGG_EPOCH_DELAYS = [0, 1, 2]
COMPUTE_EPOCHES = AGG_INTERVALS
WINDOWS = [1, 2]
THRESHOLDS = [0.1 * i for i in range(11)]  # 0.0, 0.1, ..., 1.0
ITERATIONS = [1, 2, 3]
FORCE_COMPUTE_EQUAL_AGG = True
DEGREES = [1, 2, 3, 4]


def update_globals(agg_intervals=AGG_INTERVALS,
                   agg_epoch_delays=AGG_EPOCH_DELAYS,
                   compute_epoches=COMPUTE_EPOCHES,
                   windows=WINDOWS,
                   thresholds=THRESHOLDS,
                   iterations=ITERATIONS,
                   force_compute_equal_agg=FORCE_COMPUTE_EQUAL_AGG,
                   degrees=DEGREES,
                   max_degree=None,
                   **kwargs):
    global AGG_INTERVALS, AGG_EPOCH_DELAYS, COMPUTE_EPOCHES, WINDOWS, THRESHOLDS, ITERATIONS, FORCE_COMPUTE_EQUAL_AGG, DEGREES
    AGG_INTERVALS = agg_intervals
    AGG_EPOCH_DELAYS = agg_epoch_delays
    COMPUTE_EPOCHES = compute_epoches
    WINDOWS = windows
    THRESHOLDS = thresholds
    ITERATIONS = iterations
    FORCE_COMPUTE_EQUAL_AGG = force_compute_equal_agg
    if max_degree is not None:
        DEGREES = [max_degree]
    else:
        DEGREES = degrees
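

# A hypothetical conf.json fragment overriding the sweep defaults above; the keys
# mirror the keyword arguments of update_globals(), and the values shown are purely
# illustrative:
#
#     {
#         "agg_intervals": [20, 40],
#         "agg_epoch_delays": [0, 1],
#         "compute_epoches": [20, 40],
#         "windows": [1],
#         "thresholds": [0.0, 0.5, 1.0],
#         "iterations": [1, 2],
#         "force_compute_equal_agg": true,
#         "max_degree": 4
#     }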


def run_optimal(conf):
    # "Optimal" baseline: fresh per-epoch statistics (interval 1) with zero reporting delay.
    optimal_conf = conf.copy()
    optimal_conf["top"] = None
    optimal_conf["compute_epoch"] = 1
    optimal_conf["agg_interval"] = 1
    optimal_conf["agg_epoch_delay"] = 0
    tps, total_tp, dif_stats = compute_tp.compute_throughput(**optimal_conf)
    compute_tp.write_results(tps, total_tp, dif_stats, **optimal_conf)


def run_online(conf):
    # "Online" baseline: per-epoch statistics that become available one epoch late.
    online_conf = conf.copy()
    online_conf["top"] = None
    online_conf["compute_epoch"] = 1
    online_conf["agg_interval"] = 1
    online_conf["agg_epoch_delay"] = 1
    tps, total_tp, dif_stats = compute_tp.compute_throughput(**online_conf)
    compute_tp.write_results(tps, total_tp, dif_stats, **online_conf)


def run_centralized(conf):
    run_optimal(conf)
    run_online(conf)
    for agg_interval in AGG_INTERVALS:
        for compute_epoch in COMPUTE_EPOCHES:
            # keep only compute epochs no shorter than the aggregation interval
            # (exactly equal when FORCE_COMPUTE_EQUAL_AGG is set)
            if FORCE_COMPUTE_EQUAL_AGG and compute_epoch != agg_interval:
                continue
            if compute_epoch < agg_interval:
                continue
            for agg_epoch_delay in AGG_EPOCH_DELAYS:
                conf["compute_epoch"] = compute_epoch
                conf["agg_interval"] = agg_interval
                conf["agg_epoch_delay"] = agg_epoch_delay
                tps, total_tp, dif_stats = compute_tp.compute_throughput(**conf)
                compute_tp.write_results(tps, total_tp, dif_stats, **conf)


def run_distributed_only(conf):
    for window in WINDOWS:
        for iterations in ITERATIONS:
            conf["window"] = window
            conf["threshold"] = -1
            conf["iterations"] = iterations
            tps, total_tp, dif_stats = distributed_win.compute_dist_only_throughput_ex(**conf)
            distributed_win.write_results(tps, total_tp, dif_stats, **conf)


def run_distributed(conf):
    for agg_interval in AGG_INTERVALS:
        for compute_epoch in COMPUTE_EPOCHES:
            if FORCE_COMPUTE_EQUAL_AGG and compute_epoch != agg_interval:
                continue
            if compute_epoch < agg_interval:
                continue
            for agg_epoch_delay in AGG_EPOCH_DELAYS:
                conf["compute_epoch"] = compute_epoch
                conf["agg_interval"] = agg_interval
                conf["agg_epoch_delay"] = agg_epoch_delay
                for window in WINDOWS:
                    for threshold in THRESHOLDS:
                        for iterations in ITERATIONS:
                            conf["window"] = window
                            conf["threshold"] = threshold
                            conf["iterations"] = iterations

                            tps, total_tp, dif_stats = distributed_win.compute_throughput_ex(**conf)
                            distributed_win.write_results(tps, total_tp, dif_stats, **conf)


def compute_mwm(conf):
    time_stats = {}
    for agg_interval in AGG_INTERVALS:
        time_stat.reset()
        compute_intervals_mwm(interval_length=agg_interval, **conf)
        var = time_stat.get_var()
        # note: var ** 0.5 is the standard deviation, recorded under the "std_err" key
        time_stats[agg_interval] = dict(avg=time_stat.get_avg(), var=var, std_err=var**0.5)
        json.dump(time_stats, open(path.join(conf["output_dir"], "mwm_time_stats.json"), "w"),
                  sort_keys=True, indent=4, separators=(',', ': '))
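

# For orientation only: the real matching logic lives in compute_intervals_mwm
# (mwm.py). Below is a hedged sketch of a single maximum-weight-matching step over
# a rack-to-rack demand matrix, using networkx as an assumed extra dependency; the
# helper is illustrative and never called by this script.
def _mwm_sketch(demand):
    import networkx as nx  # assumption: networkx is installed
    g = nx.Graph()
    n = len(demand)
    for src in range(n):
        for dst in range(n):
            if demand[src][dst] > 0:
                # bipartite graph: source racks on one side, destination racks on the other
                g.add_edge(("src", src), ("dst", dst), weight=demand[src][dst])
    # a set of matched node pairs that maximizes the total demand served
    return nx.max_weight_matching(g)
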
def main():
    parser = argparse.ArgumentParser(description='Generate random traffic and analyze scheduling throughput.')
    parser.add_argument('--generate_traffic', action="store_true",
                        help='should generate traffic')
    parser.add_argument('--compute_load', action="store_true",
                        help='should explicitly compute load')
    parser.add_argument('--skip_mwm', action="store_true",
                        help='should skip computing MWMs')
    parser.add_argument('--skip_centralized', action="store_true",
                        help='should skip simulating centralized')
    parser.add_argument('--skip_distributed', action="store_true",
                        help='should skip simulating distributed')
    parser.add_argument('--conf', default="conf.json", type=open,
                        help='configuration file (default: conf.json)')
    parser.add_argument('--runs', default=1, type=int,
                        help='number of runs')
    
    args = parser.parse_args()
    base_conf = json.load(args.conf)
    for run in range(args.runs):
        conf = dict(base_conf)
        update_globals(**conf)
        if args.generate_traffic:
            flow_size_avg, flow_size_var = generate_network_traffic(**conf)  # Generates host random traffic matrices
            per_tor_to_per_mili(**conf)
            total_load = compute_total_load.compute_load(**conf)
            print ("****** the total load is: ", total_load)
            compute_total_load.write_results(total_load, **conf)
            conf["check_existing"] = False
        elif args.compute_load:
            total_load = compute_total_load.compute_load(**conf)
            print ("****** the total load is: ", total_load)
            compute_total_load.write_results(total_load, **conf)
            flow_size_stats = json.load(open(path.join(conf["output_dir"], "flow_size_stats.json")))
            flow_size_avg, flow_size_var = flow_size_stats["avg"], flow_size_stats["var"]
        else:
            total_load = json.load(open(compute_total_load.get_total_load_file_path(**conf)))["total_load"]
            flow_size_stats = json.load(open(path.join(conf["output_dir"], "flow_size_stats.json")))
            flow_size_avg, flow_size_var = flow_size_stats["avg"], flow_size_stats["var"]
        conf["run_id"] = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        conf["total_load"] = total_load
        conf["flow_avg"] = flow_size_avg
        conf["flow_var"] = flow_size_var
        if path.isfile("flow_size_multi_stats.json"):
            flow_size_multi_stats = json.load(open("flow_size_multi_stats.json"))
        else:
            flow_size_multi_stats = {}
        flow_size_multi_stats[conf["run_id"]] = dict(avg=flow_size_avg, var=flow_size_var)
        json.dump(
            flow_size_multi_stats,
            open(path.join(conf["output_dir"], "flow_size_multi_stats.json"), "w"), indent=True, sort_keys=True)
        for max_degree in DEGREES:
            conf["max_degree"] = max_degree
            if not args.skip_mwm:
                compute_mwm(conf)
            if not args.skip_centralized:
                run_centralized(conf)
            if not args.skip_distributed:
                run_distributed_only(conf)
                run_distributed(conf)
                

if __name__ == "__main__":
    main()