https://bitbucket.org/NetaRS/sched_analytics
run_analysis.py (tip revision ed1f2acca39de9eb5f34a6cb5b0c8db1492f74f2, "bounded traffic distributed", authored by NetaRS on 12 December 2020, 09:53:39 UTC)
import random
import math
from scipy.stats import rv_continuous
import sys
import numpy
import threading
from os import path, fork
import os
import json
from matrices import MatrixList, make_zero_matrix
import argparse

from traffic_gen import generate_network_traffic, per_tor_to_per_mili
from mwm import compute_intervals_mwm, time_stat
import compute_tp
import distributed_win
import compute_total_load
import datetime
import time
from make_graphs import make_graphs_ex
import shutil
import default_params
import logging

logger = logging.getLogger()
handler = logging.FileHandler(__file__+".log")
formatter = logging.Formatter(
        '%(asctime)s %(name)-12s %(levelname)-8s %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)

# Generates traffic rack by rack according to the flow distribution (flow_dist and get_flow_size) and the arrival process (nextTime).
# It goes host by host under each rack and generates its traffic. If there are x active flows simultaneously, they share bandwidth
# equally until one of them completes.
# python run_analysis.py --generate_traffic
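#
# For reference, a minimal conf.json sketch (keys inferred from main() and multi_run()
# below; the values are illustrative only, and any remaining keys are forwarded as
# **conf to traffic_gen, compute_tp, distributed_win, compute_total_load and
# default_params.update_globals, so their full schema lives in those modules):
# {
#     "output_dir": "results",          # prefix; "_<factor>_<rate>" is appended per sweep point
#     "rate": 10,                       # or "rates": [...] to sweep several rates
#     "flow_dist_factor": 1.0,          # or "flow_dist_factors": [...] to sweep several factors
#     "graphs_dir": "multi_res"         # optional, defaults to "multi_res"
# }
# Typical invocations (flags defined in main() below):
#     python run_analysis.py --generate_traffic --conf conf.json
#     python run_analysis.py --conf conf.json --runs 3 --max_forks 4 --del_dir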


def run_optimal(conf):
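    """Baseline 'optimal' run: full traffic matrices (top=None), unit compute and
    aggregation epochs, and zero aggregation delay."""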
    optimal_conf = conf.copy()
    optimal_conf["top"] = None
    optimal_conf["compute_epoch"] = 1
    optimal_conf["agg_interval"] = 1
    optimal_conf["agg_epoch_delay"] = 0
    tps, total_tp, dif_stats = compute_tp.compute_throughput(**optimal_conf)
    compute_tp.write_results(tps, total_tp, dif_stats, **optimal_conf)


def run_online(conf):
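    """'Online' baseline: same idealized settings as run_optimal, but with a
    one-epoch aggregation delay (agg_epoch_delay=1)."""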
    online_conf = conf.copy()
    online_conf["top"] = None
    online_conf["compute_epoch"] = 1
    online_conf["agg_interval"] = 1
    online_conf["agg_epoch_delay"] = 1
    tps, total_tp, dif_stats = compute_tp.compute_throughput(**online_conf)
    compute_tp.write_results(tps, total_tp, dif_stats, **online_conf)


def run_centralized(conf):
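    """Centralized scheduling sweep: runs the optimal and online baselines, then every
    (agg_interval, compute_epoch, agg_epoch_delay) combination from default_params,
    skipping combinations with compute_epoch < agg_interval (and, when
    FORCE_COMPUTE_EQUAL_AGG is set, combinations where the two differ)."""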
    run_optimal(conf)
    run_online(conf)
    for agg_interval in default_params.AGG_INTERVALS:
        for compute_epoch in default_params.COMPUTE_EPOCHES:
            if default_params.FORCE_COMPUTE_EQUAL_AGG and compute_epoch != agg_interval:
                continue
            if compute_epoch < agg_interval:
                continue
            for agg_epoch_delay in default_params.AGG_EPOCH_DELAYS:
                conf["compute_epoch"] = compute_epoch
                conf["agg_interval"] = agg_interval
                conf["agg_epoch_delay"] = agg_epoch_delay
                tps, total_tp, dif_stats = compute_tp.compute_throughput(**conf)
                compute_tp.write_results(tps, total_tp, dif_stats, **conf)


def run_distributed_only(conf):
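    """Purely distributed sweep: evaluates every (window, iterations) combination from
    default_params with the threshold fixed to -1."""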
    for window in default_params.WINDOWS:
        for iterations in default_params.ITERATIONS:
            conf["window"] = window
            conf["threshold"] = -1
            conf["iterations"] = iterations
            tps, total_tp, dif_stats = distributed_win.compute_dist_only_throughput_ex(**conf)
            distributed_win.write_results(tps, total_tp, dif_stats, **conf)


def run_distributed(conf):
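    """Combined sweep: for every centralized (agg_interval, compute_epoch,
    agg_epoch_delay) combination allowed in run_centralized, also sweeps the
    distributed (window, threshold, iterations) parameters."""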
    for agg_interval in default_params.AGG_INTERVALS:
        for compute_epoch in default_params.COMPUTE_EPOCHES:
            if default_params.FORCE_COMPUTE_EQUAL_AGG and compute_epoch != agg_interval:
                continue
            if compute_epoch < agg_interval:
                continue
            for agg_epoch_delay in default_params.AGG_EPOCH_DELAYS:
                conf["compute_epoch"] = compute_epoch
                conf["agg_interval"] = agg_interval
                conf["agg_epoch_delay"] = agg_epoch_delay
                for window in default_params.WINDOWS:
                    for threshold in default_params.THRESHOLDS:
                        for iterations in default_params.ITERATIONS:
                            conf["window"] = window
                            conf["threshold"] = threshold
                            conf["iterations"] = iterations

                            tps, total_tp, dif_stats = distributed_win.compute_throughput_ex(**conf)
                            distributed_win.write_results(tps, total_tp, dif_stats, **conf)


def compute_mwm(conf):
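    """Computes MWM (maximum weight matching) schedules for every aggregation interval
    and writes per-interval timing statistics to mwm_time_stats.json in the output
    directory."""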
    time_stats = {}
    for agg_interval in default_params.AGG_INTERVALS:
        time_stat.reset()
        compute_intervals_mwm(interval_length=agg_interval, **conf)
        var = time_stat.get_var()
        time_stats[agg_interval] = dict(avg=time_stat.get_avg(), var=var, std_err=var**0.5)
        json.dump(time_stats, open(path.join(conf["output_dir"], "mwm_time_stats.json"), "w"),
                  sort_keys=True, indent=4, separators=(',', ': '))


def run(conf, generate_traffic, compute_load, skip_mwm, skip_centralized, skip_distributed):
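    """Single end-to-end run: optionally (re)generates traffic and/or recomputes the
    total load, records flow-size statistics, then for every max_degree in
    default_params.DEGREES runs the MWM, centralized and distributed analyses unless
    the corresponding skip flag is set."""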
    default_params.update_globals(**conf)
    if generate_traffic:
        flow_size_avg, flow_size_var = generate_network_traffic(**conf)  # Generates host random traffic matrices
        per_tor_to_per_mili(**conf)
        total_load = compute_total_load.compute_load(**conf)
        print ("****** the total load is: ", total_load)
        compute_total_load.write_results(total_load, **conf)
        conf["check_existing"] = False
    elif compute_load:
        total_load = compute_total_load.compute_load(**conf)
        print ("****** the total load is: ", total_load)
        compute_total_load.write_results(total_load, **conf)
        flow_size_stats = json.load(open(path.join(conf["output_dir"], "flow_size_stats.json")))
        flow_size_avg, flow_size_var = flow_size_stats["avg"], flow_size_stats["var"]
    else:
        total_load = json.load(open(compute_total_load.get_total_load_file_path(**conf)))["total_load"]
        flow_size_stats = json.load(open(path.join(conf["output_dir"], "flow_size_stats.json")))
        flow_size_avg, flow_size_var = flow_size_stats["avg"], flow_size_stats["var"]
    conf["run_id"] = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    conf["total_load"] = total_load
    conf["flow_avg"] = flow_size_avg
    conf["flow_var"] = flow_size_var
    if path.isfile("flow_size_multi_stats.json"):
        flow_size_multi_stats = json.load(open("flow_size_multi_stats.json"))
    else:
        flow_size_multi_stats = {}
    flow_size_multi_stats[conf["run_id"]] = dict(avg=flow_size_avg, var=flow_size_var)
    json.dump(
        flow_size_multi_stats,
        open(path.join(conf["output_dir"], "flow_size_multi_stats.json"), "w"), indent=True, sort_keys=True)
    for max_degree in default_params.DEGREES:
        conf["max_degree"] = max_degree
        if not skip_mwm:
            compute_mwm(conf)
        if not skip_centralized:
            run_centralized(conf)
        if not skip_distributed:
            run_distributed_only(conf)
            run_distributed(conf)
 
 
def multi_run(base_conf, runs, generate_traffic, compute_load, skip_mwm, skip_centralized, skip_distributed, del_dir=False):
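    """Performs `runs` independent runs into output_dir (optionally wiping it first),
    redirecting stdout/stderr to multi_run.out, and finally rendering summary graphs
    into graphs_dir via make_graphs_ex."""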
    output_dir = base_conf["output_dir"]
    if path.isdir(output_dir):
        if del_dir:
            shutil.rmtree(output_dir)
            os.mkdir(output_dir)
    else:
        os.mkdir(output_dir)
    output_file = open(path.join(output_dir, "multi_run.out"), "w")
    sys.stderr = output_file
    sys.stdout = output_file
    for i in range(runs):
        conf = dict(base_conf)
        run(conf, generate_traffic, compute_load, skip_mwm, skip_centralized, skip_distributed)
    graphs_dir = base_conf.get("graphs_dir", "multi_res")
    if not path.isdir(graphs_dir):
        os.mkdir(graphs_dir)
    make_graphs_ex(base_conf, graphs_dir=graphs_dir)


def has_exited(pid):
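    """Non-blocking check of whether forked child `pid` has exited; logs its return
    code when it has."""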
    res, status = os.waitpid(pid, os.WNOHANG)
    if res == pid:
        if os.WIFSIGNALED(status):
            returncode = -os.WTERMSIG(status)
        else:
            assert os.WIFEXITED(status)
            returncode = os.WEXITSTATUS(status)
        logger.debug("pid %d existed, returncode=%d", pid, returncode)
        return True
    return False


def poll_forks(forks, max_forks):
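    """Reaps finished children, polling every 60 seconds, until at most max_forks
    child processes remain; returns the list of pids still running."""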
    while len(forks) > max_forks:
        new_forks = []
        for pid in forks:
            if has_exited(pid):
                logger.info("child %d exited", pid)
            else:
                new_forks.append(pid)
        forks = new_forks
        time.sleep(60)
    return forks
 
    
def main():
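    """Parses the CLI flags, loads the base configuration and launches one multi_run
    per (rate, flow_dist_factor) pair, optionally forking up to --max_forks parallel
    worker processes."""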
    logger.info("start")
    parser = argparse.ArgumentParser(description='Generate random traffic and run the scheduling throughput analysis.')
    parser.add_argument('--generate_traffic', action="store_true",
                        help='should generate traffic')
    parser.add_argument('--compute_load', action="store_true",
                        help='should explicitly compute load')
    parser.add_argument('--skip_mwm', action="store_true",
                        help='should skip computing MWMs')
    parser.add_argument('--skip_centralized', action="store_true",
                        help='should skip simulating centralized')
    parser.add_argument('--skip_distributed', action="store_true",
                        help='should skip simulating distributed')
    parser.add_argument('--conf', default="conf.json", type=open,
                        help='configuration file (default: conf.json)')
    parser.add_argument('--runs', default=1, type=int,
                        help='number of runs')
    parser.add_argument('--max_forks', default=0, type=int,
                        help='maximum number of forked worker processes (0: do not fork)')
    parser.add_argument('--del_dir', action="store_true",
                        help='should delete existing output dir')
    
    args = parser.parse_args()
    base_conf = json.load(args.conf)
    factors = base_conf.get("flow_dist_factors")
    if factors is None:
        factors = [base_conf["flow_dist_factor"]]
    rates = base_conf.get("rates")
    if rates is None:
        rates = [base_conf["rate"]]
    forks = []
    for rate in rates:
        for factor in factors:
            if args.max_forks:
                forks = poll_forks(forks, args.max_forks - 1)
                try:
                    pid = fork()
                except OSError:
                    # os.fork() reports failure by raising OSError rather than
                    # returning a negative pid.
                    logger.exception("fork exception")
                    raise
                if pid > 0:
                    logger.info("forked %d for factor=%f, rate=%d", pid, factor, rate)
                    forks.append(pid)
                    continue
            conf = dict(base_conf)
            conf["rate"] = rate
            conf["flow_dist_factor"] = factor
            conf["output_dir"] = base_conf["output_dir"] + "_%f_%d" % (factor, rate)
            multi_run(conf, args.runs, args.generate_traffic, args.compute_load,
                      args.skip_mwm, args.skip_centralized, args.skip_distributed, args.del_dir)
            if args.max_forks:
                sys.exit(0)

    logger.info("waiting last forks %s", str(forks))
    poll_forks(forks, 0)
    logger.info("end.")


if __name__ == "__main__":
    main()