# https://bitbucket.org/NetaRS/sched_analytics
# Raw File
# Tip revision: ed1f2acca39de9eb5f34a6cb5b0c8db1492f74f2 authored by NetaRS on 12 December 2020, 09:53:39 UTC
# bounded traffic distributed
# Tip revision: ed1f2ac
# traffic_gen.py
import random
import math
from scipy.stats import rv_continuous
import sys
import numpy
import threading
from os import path
import json
from matrices import MatrixList, make_zero_matrix
import argparse
from bisect import bisect_left
from compute_tp import SeriesStats

# Generate traffic rack by rack according to the flow size distribution (flow_dist and get_flow_size) and the arrival process (nextTime).
# It goes host by host under the rack and generates each host's traffic. If there are x active flows simultaneously, they share the bandwidth
# equally until one of them completes.


class HULL_Dist0(rv_continuous):
    """Pareto-shaped flow-size distribution built on ``scipy.stats.rv_continuous``.

    The CDF is ``1 - (scale / x) ** 1.05`` for ``x >= scale``, where
    ``scale = BASE_HULL_SCALE * factor`` bytes.  With these constants the mean
    is approximately ``BASE_HULL_AVERAGE * factor`` bytes.

    Attributes set on the instance:
        factor:  multiplier applied to the base scale.
        average: nominal mean flow size in bytes (``BASE_HULL_AVERAGE * factor``).
    """
    BASE_HULL_AVERAGE = 100000  # in bytes
    BASE_HULL_SCALE = 4761.904  # in bytes; smallest possible flow size for factor=1

    def __init__(self, factor=1):
        # Declare the lower end of the support: below the scale the closed-form
        # CDF would be negative (or NaN for negative x), so scipy must not
        # evaluate it there.
        super(HULL_Dist0, self).__init__(a=HULL_Dist0.BASE_HULL_SCALE * factor)
        self.factor = factor
        # Use this class's own constant so the class does not depend on HULL_Dist.
        self.average = HULL_Dist0.BASE_HULL_AVERAGE * factor

    def _cdf(self, x):
        return 1 - (HULL_Dist0.BASE_HULL_SCALE * self.factor / x)**1.05

    def _ppf(self, q):
        # Closed-form inverse of _cdf; avoids scipy's numerical root search.
        return HULL_Dist0.BASE_HULL_SCALE * self.factor / (1 - q)**(1 / 1.05)


class HULL_Dist(object):
    """Pareto-shaped flow-size distribution sampled by inverse transform.

    A sample is ``constant / u ** (1 / 1.05)`` with ``u`` uniform on (0, 1),
    so every sample is at least ``constant`` bytes.

    Attributes set on the instance:
        factor:   multiplier applied to the base constants.
        average:  nominal mean flow size in bytes (``BASE_HULL_AVERAGE * factor``).
        constant: scale (minimum flow size) in bytes (``BASE_HULL_CONSTANT * factor``).
    """
    BASE_HULL_AVERAGE = 100000  # in bytes
    BASE_HULL_CONSTANT = 100000.0 / 21

    def __init__(self, factor=1):
        self.factor = factor
        self.average = HULL_Dist.BASE_HULL_AVERAGE * factor
        self.constant = HULL_Dist.BASE_HULL_CONSTANT * factor

    def rvs(self):
        """Draw one flow size (in bytes, as a float)."""
        u = random.random()
        # random.random() is uniform on [0.0, 1.0); an exact 0.0 would divide
        # by zero below, so redraw in that (extremely rare) case.  Non-zero
        # draws are used unchanged, so seeded runs keep their sequence.
        while u == 0.0:
            u = random.random()
        return self.constant / u**(1/1.05)
  
    
def cdf_to_expected_value_smooth(cdf):
    """Return the mean of a piecewise-linear CDF.

    ``cdf`` is an iterable of ``(x, y)`` pairs, walked in order starting from
    an implicit ``(0, 0)`` point.  Each segment contributes its probability
    mass ``y - previous_y`` times the segment's midpoint ``(x + previous_x) / 2``.
    """
    total = 0
    prev_x, prev_y = 0, 0
    for point in cdf:
        cur_x, cur_y = point
        total = total + 0.5 * (cur_y - prev_y) * (cur_x + prev_x)
        prev_x, prev_y = cur_x, cur_y
    return total


def cdf_to_expected_value_discrete(cdf):
    """Return the mean of a discrete (step) CDF.

    ``cdf`` is an iterable of ``(x, y)`` pairs in increasing order, where ``y``
    is the cumulative probability up to and including ``x``.  The jump
    ``y - previous_y`` is the probability of the value ``x`` itself, so the
    mean is the sum of ``x * (y - previous_y)``; the cumulative probability
    before the first pair is taken to be 0.
    """
    ev = 0
    y0 = 0
    for x, y in cdf:
        # Full weight of the jump: there is no averaging with the previous x
        # for a step function (that only applies to the piecewise-linear case).
        ev += (y - y0) * x
        y0 = y
    return ev


def read_cdf(file_name):
    """Read a whitespace-separated numeric table from ``file_name``.

    Each non-blank line becomes a list of floats (one per field); the lines
    are returned in file order.  Blank lines are skipped.  The file is closed
    before returning.

    Raises:
        IOError/OSError: if the file cannot be opened.
        ValueError: if a field is not a valid float.
    """
    points = []
    with open(file_name) as cdf_file:
        for line in cdf_file:
            fields = line.split()
            if not fields:
                continue  # tolerate blank / trailing empty lines
            # Build real lists (not lazy map objects) so rows can be indexed
            # and unpacked repeatedly.
            points.append([float(field) for field in fields])
    return points


class RVContinuesDistByArray:
    """Flow-size distribution defined by a tabulated, piecewise-linear CDF.

    ``cdf_array`` is a sequence of ``(x, y)`` pairs with non-decreasing ``y``
    (cumulative probability).  Samples are drawn by inverse transform with
    linear interpolation between neighbouring points and then multiplied by
    ``factor``.

    Attributes set on the instance:
        cdf_array: the table as given.
        ys:        the cumulative probabilities, used for the binary search.
        factor:    multiplier applied to every sample and to ``average``.
        average:   mean of the piecewise-linear CDF times ``factor``.
    """
    def __init__(self, cdf_array, factor=1):
        self.cdf_array = cdf_array
        self.average = cdf_to_expected_value_smooth(cdf_array) * factor
        self.ys = [t[1] for t in self.cdf_array]
        self.factor = factor

    def rvs(self):
        """Draw one sample from the tabulated distribution."""
        u = random.random()
        # Leftmost table entry whose cumulative probability is >= u.
        i = bisect_left(self.ys, u)
        if i >= len(self.cdf_array):
            # The table stops below probability 1; clamp to its largest x
            # instead of indexing past the end.
            return self.factor * self.cdf_array[-1][0]
        x1, y1 = self.cdf_array[i]
        if i == 0:
            # Before the first entry, interpolate from the implicit (0, 0)
            # origin (the same origin the mean computation uses) rather than
            # wrapping around to the last entry via index -1.
            x0, y0 = 0, 0
        else:
            x0, y0 = self.cdf_array[i-1]
        if y1 == y0:
            # Zero-width probability step (only reachable at the origin).
            return self.factor * x1
        val = (u-y0) / (y1-y0) * (x1-x0) + x0
        return self.factor * val

    @staticmethod
    def from_file(file_name, factor=1):
        """Build a distribution from a CDF table file read with ``read_cdf``."""
        return RVContinuesDistByArray(read_cdf(file_name), factor)


def nextTime(rateParameter):
    """Return one exponentially distributed waiting time.

    ``rateParameter`` is the rate of the exponential, i.e. ``1 / lambda_time``
    where ``lambda_time`` is the mean waiting time.
    """
    # 1 - random() lies in (0, 1], so the logarithm is always finite.
    remaining = 1.0 - random.random()
    log_remaining = math.log(remaining)
    return -log_remaining / rateParameter


# Module-level defaults.  Within the visible part of this file only
# flow_size_stats is referenced by the functions below; the other names may be
# used by importers -- TODO confirm before removing any of them.

# Presumably a mean flow inter-arrival time (same value as the default
# lambda_time of generate_network_traffic0).
lambda_time0 = 5208.3  # in micro
# print lambda_time
rate = 250  # in bits per microsecond
number_of_hosts = 10
number_of_hosts_per_rack = 10
# Running statistics over every generated flow size (in bits): reset by
# get_flow_size_dist(), updated by get_flow_size(), and read by
# generate_network_traffic().
flow_size_stats = SeriesStats()


def get_flow_size_dist(flow_dist_name="HULL", flow_dist_factor=1, **kwargs):
    """Create the flow-size distribution named ``flow_dist_name``.

    Also resets the module-level ``flow_size_stats`` accumulator.

    ``"HULL"`` selects the built-in ``HULL_Dist``; any other name is treated
    as the prefix of a CDF table file called ``<flow_dist_name>_CDF``.
    ``flow_dist_factor`` is forwarded to the chosen distribution and extra
    keyword arguments are accepted and ignored.
    """
    global flow_size_stats
    flow_size_stats = SeriesStats()
    if flow_dist_name != "HULL":
        return RVContinuesDistByArray.from_file(flow_dist_name + "_CDF",
                                                flow_dist_factor)
    return HULL_Dist(flow_dist_factor)
        

def get_flow_size(dist):  # in bits
    """Draw one flow size from ``dist`` and record it in ``flow_size_stats``.

    ``dist.rvs()`` is taken to be a size in bytes; it is truncated to a whole
    number of bytes and returned in bits.
    """
    whole_bytes = int(dist.rvs())
    size_in_bits = whole_bytes * 8
    # NOTE: a 30 MB cap on the flow size used to be applied here; it is
    # currently disabled.
    flow_size_stats.add(size_in_bits)
    return size_in_bits


def generate_tor_traffic(
                        tor,
                        n_tors,
                        n_hosts_per_tor,
                        n_host_peers,
                        n_milis,
                        lambda_time,
                        max_rate,
                        flow_size_dist):
    """Simulate the traffic sent by every host of rack ``tor``.

    For each host, flows arrive with exponentially distributed gaps (mean
    ``lambda_time`` microseconds), each flow gets a random destination among
    ``n_host_peers`` peers chosen outside this rack and a size drawn from
    ``flow_size_dist``.  Time advances in 100 microsecond ticks; in each tick
    the host's unfinished flows that started before the tick share
    ``100 * max_rate`` bits equally.

    Args:
        tor: index of the source rack.
        n_tors: number of racks.
        n_hosts_per_tor: number of hosts in every rack.
        n_host_peers: number of peers drawn (with repetition) per host.
        n_milis: simulated duration in milliseconds.
        lambda_time: mean flow inter-arrival time in microseconds.
        max_rate: host sending rate in bits per microsecond.
        flow_size_dist: distribution object passed to ``get_flow_size``.

    Returns:
        The matrix created by ``make_zero_matrix(n_milis, n_tors)``, where
        entry ``[t][dst]`` holds the bits sent from this rack to rack ``dst``
        during millisecond ``t``.
    """
    tick_micros = 100                       # simulation granularity in micro seconds
    ticks_per_mili = 1000 // tick_micros
    mili_vectors = make_zero_matrix(n_milis, n_tors)

    for host in range(n_hosts_per_tor):
        # Flow arrival times in micro seconds.  The last one may fall past the
        # end of the simulation; it simply never becomes active.
        flow_start_times = []
        last = 0
        while last < n_milis * 1000:                # convert n_milis to micro seconds
            # 1.0 / ... keeps the rate a float even for an integer lambda_time.
            last += int(nextTime(1.0 / lambda_time))
            flow_start_times.append(last)

        # Choose the hosts to communicate with: draw among all hosts outside
        # this rack, then shift ids at or above this rack's first host past
        # the rack so that the rack's own hosts are skipped.
        peers = []
        for _ in range(n_host_peers):
            peer = random.randint(0, (n_tors - 1) * n_hosts_per_tor - 1)
            if peer >= tor * n_hosts_per_tor:
                peer += n_hosts_per_tor
            peers.append(peer)

        # Destination host and size (in bits) of every flow.
        hosts_per_flow = [peers[random.randint(0, n_host_peers - 1)]
                          for _ in flow_start_times]
        flow_sizes = [get_flow_size(flow_size_dist) for _ in flow_start_times]

        for tt in range(n_milis * ticks_per_mili):
            t = tt * tick_micros                    # tick start in micro seconds
            micro_vector = [0] * n_tors

            # Unfinished flows that started before this tick.  Start times
            # are non-decreasing, so stop at the first flow not yet started.
            active_flows = []
            for x, start in enumerate(flow_start_times):
                if start >= t:
                    break
                if flow_sizes[x] > 0:
                    active_flows.append(x)

            if active_flows:
                # Equal share of the tick's capacity per active flow.
                dec = (tick_micros * max_rate) / len(active_flows)
            else:
                dec = 0

            for flow in active_flows:
                # Rack of the destination host: hosts are numbered rack by
                # rack, n_hosts_per_tor per rack.
                dst_tor = hosts_per_flow[flow] // n_hosts_per_tor
                if flow_sizes[flow] >= dec:
                    micro_vector[dst_tor] += dec
                    flow_sizes[flow] -= dec
                else:
                    # The flow finishes within this tick.
                    micro_vector[dst_tor] += flow_sizes[flow]
                    flow_sizes[flow] = 0

            # Fold the tick into its millisecond right away instead of
            # keeping every tick vector in memory until the host is done.
            mili_row = mili_vectors[tt // ticks_per_mili]
            for dst in range(n_tors):
                mili_row[dst] += micro_vector[dst]

        print("done host " + str(host) + " at rack " + str(tor))
    return mili_vectors


def generate_network_traffic0(n_tors=80,
                             n_hosts_per_tor=10,
                             n_host_peers=10,
                             n_milis=5000,
                             lambda_time=5208.3,  # in micro
                             rate=250,  # in bits per microsecond
                             output_dir=".",
                             **kwargs):
    """Generate per-rack traffic with an explicitly given ``lambda_time``.

    Variant of ``generate_network_traffic`` in which the mean flow
    inter-arrival time is a parameter and ``rate`` is used directly as the
    host sending rate.  One matrix per rack is stored through
    ``MatrixList`` under ``<output_dir>/matrix_tor_<i>``.

    Extra keyword arguments are forwarded to ``get_flow_size_dist`` (for
    example ``flow_dist_name`` and ``flow_dist_factor``).
    """
    # generate_tor_traffic requires a flow-size distribution; build it here.
    flow_size_dist = get_flow_size_dist(**kwargs)
    output_pattern = path.join(output_dir, "matrix_tor_%d")
    matrix_list = MatrixList(output_pattern)
    for i in range(n_tors):
        print("tor: " + str(i))
        mili_vectors = generate_tor_traffic(i,
                                            n_tors=n_tors,
                                            n_hosts_per_tor=n_hosts_per_tor,
                                            n_host_peers=n_host_peers,
                                            n_milis=n_milis,
                                            lambda_time=lambda_time,
                                            max_rate=rate,
                                            flow_size_dist=flow_size_dist)
        matrix_list[i] = mili_vectors


def generate_network_traffic(n_tors=80,
                             n_hosts_per_tor=10,
                             n_host_peers=10,
                             n_milis=5000,
                             rate=250,  # average rate in bits per microsecond
                             max_rate=1000,  # in bits per microsecond
                             output_dir=".",
                             flow_dist_name="HULL",
                             **kwargs):
    """Generate traffic for every rack and record flow-size statistics.

    The mean flow inter-arrival time is derived from the distribution's mean
    flow size (``average``, in bytes) and the target average ``rate``, so
    that each host offers ``rate`` bits per microsecond on average.  One
    matrix per rack is stored through ``MatrixList`` under
    ``<output_dir>/matrix_tor_<i>`` and the average and variance of the
    generated flow sizes are written to ``<output_dir>/flow_size_stats.json``.

    Extra keyword arguments are forwarded to ``get_flow_size_dist``.

    Returns:
        ``(avg, var)`` as reported by ``flow_size_stats``.
    """
    flow_size_dist = get_flow_size_dist(flow_dist_name, **kwargs)
    lambda_time = flow_size_dist.average * 8.0 / rate  # in micro secs
    output_pattern = path.join(output_dir, "matrix_tor_%d")
    matrix_list = MatrixList(output_pattern)
    for i in range(n_tors):
        print("tor: " + str(i))
        mili_vectors = generate_tor_traffic(i,
                                            n_tors=n_tors,
                                            n_hosts_per_tor=n_hosts_per_tor,
                                            n_host_peers=n_host_peers,
                                            n_milis=n_milis,
                                            lambda_time=lambda_time,
                                            max_rate=max_rate,
                                            flow_size_dist=flow_size_dist)
        matrix_list[i] = mili_vectors
    avg = flow_size_stats.get_avg()
    var = flow_size_stats.get_var()
    stats_path = path.join(output_dir, "flow_size_stats.json")
    # Close the file deterministically so the JSON is flushed to disk.
    with open(stats_path, "w") as stats_file:
        json.dump(dict(avg=avg, var=var), stats_file,
                  indent=True, sort_keys=True)
    return avg, var


def per_tor_to_per_mili(n_tors=80, n_milis=5000, output_dir=".", **kwargs):
    """Regroup the per-rack output into one matrix per millisecond.

    Reads the ``matrix_tor_<i>`` entries under ``output_dir`` lazily and, for
    each of the ``n_milis`` steps, takes the next row from every rack (in
    rack order) and stores the resulting list as ``matrix_mili_<t>``.
    Extra keyword arguments are accepted and ignored.
    """
    per_tor_pattern = path.join(output_dir, "matrix_tor_%d")
    per_tor_matrix_list = MatrixList(per_tor_pattern, lazy_read=True)
    per_mili_pattern = path.join(output_dir, "matrix_mili_%d")
    per_mili_matrix_list = MatrixList(per_mili_pattern)

    # With lazy_read=True each entry is consumed as an iterator of rows.
    per_tor_iterators = [per_tor_matrix_list[i] for i in range(n_tors)]

    for t in range(n_milis):
        # The next() builtin works with both the Python 2 and the Python 3
        # iterator protocol, unlike calling the .next() method directly.
        mili_matrix = [next(row_iter) for row_iter in per_tor_iterators]
        per_mili_matrix_list[t] = mili_matrix
    

def main():
    """Command-line entry point.

    Loads the JSON configuration given by ``--conf`` (default ``conf.json``)
    and passes its keys as keyword arguments to ``generate_network_traffic``
    and then to ``per_tor_to_per_mili``.
    """
    parser = argparse.ArgumentParser(description='Generate random traffic.')
    parser.add_argument('--conf', default="conf.json", type=open,
                        help='configuration file (default: conf.json)')

    args = parser.parse_args()
    # argparse opened the file through ``type=open``; close it as soon as the
    # configuration has been parsed.
    with args.conf as conf_file:
        conf = json.load(conf_file)

    generate_network_traffic(**conf)
    per_tor_to_per_mili(**conf)


if __name__ == "__main__":
    main()
# back to top