import random
import math
from scipy.stats import rv_continuous
import sys
import numpy
import threading
from os import path
import json
from matrices import MatrixList, make_zero_matrix
import argparse
from bisect import bisect_left
from compute_tp import SeriesStats
# Generate traffic rack by rack according to the flow distribution (flow_dist and get_flow_size) and arrival process (nextTime).
# It goes host by host under the rack and generates its traffic. If there are x active flows simultaneously, they share bandwidth
# equally until one of them completes.
class HULL_Dist0(rv_continuous):
    """Flow-size distribution (Pareto-style tail) built on scipy's rv_continuous.

    Sizes are in bytes; `factor` scales the distribution linearly.
    """
    BASE_HULL_AVERAGE = 100000  # mean flow size in bytes when factor == 1

    def __init__(self, factor=1):
        super(HULL_Dist0, self).__init__()
        self.factor = factor
        # BUGFIX: previously read HULL_Dist.BASE_HULL_AVERAGE (the sibling
        # class); use this class's own constant so the class stands alone.
        self.average = HULL_Dist0.BASE_HULL_AVERAGE * factor

    def _cdf(self, x):
        # Pareto-like CDF: F(x) = 1 - (xm / x)**alpha with alpha = 1.05 and
        # xm = 4761.904 * factor, giving mean alpha*xm/(alpha-1) ~= 100000*factor bytes.
        return 1 - (4761.904 * self.factor / x)**1.05
class HULL_Dist(object):
    """Flow-size distribution in bytes with a Pareto-style tail.

    Samples are drawn by inverse-transform; `factor` scales sizes linearly.
    """
    BASE_HULL_AVERAGE = 100000  # mean size in bytes at factor == 1
    BASE_HULL_CONSTANT = 100000.0 / 21  # scale parameter (minimum size) at factor == 1

    def __init__(self, factor=1):
        self.factor = factor
        self.average = HULL_Dist.BASE_HULL_AVERAGE * factor
        self.constant = HULL_Dist.BASE_HULL_CONSTANT * factor

    def rvs(self):
        """Draw one sample: constant / U**(1/alpha) with alpha = 1.05."""
        uniform_draw = random.random()
        inverse_alpha = 1 / 1.05
        return self.constant / uniform_draw ** inverse_alpha
def cdf_to_expected_value_smooth(cdf):
    """Expected value of a piecewise-linear CDF given as (x, y) points.

    Each segment contributes its probability mass (y - prev_y) times the
    segment midpoint (x + prev_x) / 2 — i.e. trapezoidal integration,
    starting from the implicit origin (0, 0).
    """
    expected = 0
    prev_x, prev_y = 0, 0
    for x, y in cdf:
        expected += 0.5 * (y - prev_y) * (x + prev_x)
        prev_x, prev_y = x, y
    return expected
def cdf_to_expected_value_discrete(cdf):
    """Expected value of a discrete CDF given as (x, y) points.

    Each point contributes half its probability mass (y - y0) times x.
    NOTE(review): the 0.5 factor halves the usual discrete expectation
    sum((y - y0) * x); preserved as-is — confirm it is intentional.
    (Removed unused x0 bookkeeping from the original.)
    """
    y0 = 0
    ev = 0
    for x, y in cdf:
        ev += 0.5 * (y - y0) * x
        y0 = y
    return ev
def read_cdf(file_name):
    """Read a CDF from a whitespace-separated file: each line 'x y' becomes [x, y] floats.

    Fixes over the original: the file handle is closed (with-statement),
    blank lines are skipped instead of yielding empty entries that would
    break downstream indexing, and the result is a concrete list of lists
    (the py2 `map` call returned lazy map objects under py3).
    """
    with open(file_name) as f:
        return [[float(tok) for tok in line.split()] for line in f if line.strip()]
class RVContinuesDistByArray:
    """Continuous distribution defined by a sampled CDF array of (x, y) points.

    (Class name kept as-is for backward compatibility; "Continues" is a
    historical typo for "Continuous".)
    """

    def __init__(self, cdf_array, factor=1):
        # cdf_array: list of [x, y] pairs with y non-decreasing in [0, 1].
        self.cdf_array = cdf_array
        self.average = cdf_to_expected_value_smooth(cdf_array) * factor
        self.ys = [t[1] for t in self.cdf_array]
        self.factor = factor

    def rvs(self):
        """Draw one sample by inverse-transform on the piecewise-linear CDF."""
        u = random.random()
        # BUGFIX: clamp the segment index to [1, len-1]. Previously, a draw
        # below ys[0] gave i == 0 and cdf_array[i-1] silently wrapped to the
        # LAST entry, and a draw above ys[-1] raised IndexError.
        i = min(max(bisect_left(self.ys, u), 1), len(self.ys) - 1)
        x1, y1 = self.cdf_array[i]
        x0, y0 = self.cdf_array[i - 1]
        # linear interpolation of the inverse CDF inside segment [y0, y1]
        val = (u - y0) / (y1 - y0) * (x1 - x0) + x0
        return self.factor * val

    @staticmethod
    def from_file(file_name, factor=1):
        """Alternate constructor: load the CDF array from a file."""
        return RVContinuesDistByArray(read_cdf(file_name), factor)
def nextTime(rateParameter):  # rateParameter = 1/lambda_time
    """Sample an exponential inter-arrival time with the given rate.

    Inverse-transform sampling: -ln(1 - U) / rate, so the mean is
    1 / rateParameter (i.e. lambda_time when called with 1/lambda_time).
    """
    uniform_draw = random.random()
    return -math.log(1.0 - uniform_draw) / rateParameter
# Module-level defaults. NOTE(review): lambda_time0, number_of_hosts and
# number_of_hosts_per_rack appear unused by the functions below — presumably
# kept for backward compatibility; confirm before removing.
lambda_time0 = 5208.3 # mean flow inter-arrival time, in microseconds
# print lambda_time
rate = 250 # in bits per microsecond
number_of_hosts = 10
number_of_hosts_per_rack = 10
# Global accumulator of generated flow sizes; reset by get_flow_size_dist()
# and dumped to JSON by generate_network_traffic().
flow_size_stats = SeriesStats()
def get_flow_size_dist(flow_dist_name="HULL", flow_dist_factor=1, **kwargs):
    """Build the flow-size distribution and reset the global size statistics.

    "HULL" selects the analytic HULL_Dist; any other name is treated as the
    basename of an on-disk CDF file named "<name>_CDF".
    """
    global flow_size_stats
    # start fresh statistics for this generation run
    flow_size_stats = SeriesStats()
    if flow_dist_name == "HULL":
        return HULL_Dist(flow_dist_factor)
    cdf_file_name = flow_dist_name + "_CDF"
    return RVContinuesDistByArray.from_file(cdf_file_name, flow_dist_factor)
def get_flow_size(dist):  # in bits
    """Draw one flow size from `dist` (bytes), convert it to bits, and record
    it in the global flow_size_stats accumulator."""
    size_bits = int(dist.rvs()) * 8  # distribution yields bytes; 8 bits per byte
    # (a historical 30MB cap on flow size was here; it remains disabled)
    flow_size_stats.add(size_bits)
    return size_bits
def generate_tor_traffic(
tor,
n_tors,
n_hosts_per_tor,
n_host_peers,
n_milis,
lambda_time,
max_rate,
flow_size_dist):
mili_vectors = make_zero_matrix(n_milis, n_tors)
for host in range(n_hosts_per_tor):
flow_start_times = []
last = 0
# t in micro
while last < n_milis * 1000: # convert n_milis to micro seconds
delta = int(nextTime(1 / lambda_time)) # lambda_time - the arrival frequency
last += delta
flow_start_times.append(last) # in micro seconds
peers = []
for x in range(n_host_peers): #choose the hosts to communicate with
t = random.randint(0, (n_tors-1)*n_hosts_per_tor - 1)
# t is host
if t >= tor * n_hosts_per_tor:
t += n_hosts_per_tor #skipping the rack's host
peers.append(t)
hosts_per_flow = []
for x in range(len(flow_start_times)):# assigning hosts to flows
y = random.randint(0, n_host_peers - 1)
hosts_per_flow.append(peers[y])
flow_sizes = []
for x in range(len(flow_start_times)):# assigning size to flows
flow_sizes.append(get_flow_size(flow_size_dist)) # in bits
micro_vectors = {}
mili_host_vectors = {}
for tt in range(n_milis * 10): # work in granularity of 100 micro sec
t = tt * 100 # convert to micro seconds
micro_vector = [0 for x in range(n_tors)]
active_flows = set()
for x in range(len(flow_start_times)):
if flow_start_times[x] < t: # adding to "active_flows" all the flows that should be sent in this second
if flow_sizes[x] > 0:
active_flows.add(x)
else:
break
num_active = len(active_flows) # the number of flows in the current second
if num_active > 0:
dec = (100 * max_rate) / num_active # dec = traffic per active flow in 100 micro sec.
else:
dec = 0 # if there are no flows in these 100 micro second - then there is no traffic
for flow in active_flows: # accumulate the flow sizes per second
dst_peer = hosts_per_flow[flow]
dst_tor = dst_peer // 10
if flow_sizes[flow] >= dec: # if the remaining flow size is higher than allowed
micro_vector[dst_tor] += dec # add dec to relevant tor (tor of flow peer)
flow_sizes[flow] -= dec
else:
micro_vector[dst_tor] += flow_sizes[flow]
flow_sizes[flow] = 0
micro_vectors[t] = micro_vector
for t in range(n_milis): # send the per micro second flows
for micro in range(10):
for hostt in range(n_tors):
mili_vectors[t][hostt] += micro_vectors[t *
1000 + micro * 100][hostt]
print "done host " + str(host) + " at rack " + str(tor)
return mili_vectors
def generate_network_traffic0(n_tors=80,
n_hosts_per_tor=10,
n_host_peers=10,
n_milis=5000,
lambda_time=5208.3, # in micro
rate=250, # in bits per microsecond
output_dir=".",
**kwargs):
output_pattern = path.join(output_dir, "matrix_tor_%d")
matrix_list = MatrixList(output_pattern)
for i in range(n_tors):
print "tor:", i
mili_vectors = generate_tor_traffic(i,
n_tors=n_tors,
n_hosts_per_tor=n_hosts_per_tor,
n_host_peers=n_host_peers,
n_milis=n_milis,
lambda_time=lambda_time,
max_rate=rate)
matrix_list[i] = mili_vectors
def generate_network_traffic(n_tors=80,
n_hosts_per_tor=10,
n_host_peers=10,
n_milis=5000,
rate=250, # average rate in bits per microsecond
max_rate=1000, # in bits per microsecond
output_dir=".",
flow_dist_name="HULL",
**kwargs):
flow_size_dist = get_flow_size_dist(flow_dist_name, **kwargs)
lambda_time = flow_size_dist.average * 8.0 / rate # in micro secs
output_pattern = path.join(output_dir, "matrix_tor_%d")
matrix_list = MatrixList(output_pattern)
for i in range(n_tors):
print "tor:", i
mili_vectors = generate_tor_traffic(i,
n_tors=n_tors,
n_hosts_per_tor=n_hosts_per_tor,
n_host_peers=n_host_peers,
n_milis=n_milis,
lambda_time=lambda_time,
max_rate=max_rate,
flow_size_dist=flow_size_dist)
matrix_list[i] = mili_vectors
avg = flow_size_stats.get_avg()
var = flow_size_stats.get_var()
json.dump(
dict(avg=avg, var=var),
open(path.join(output_dir, "flow_size_stats.json"), "w"), indent=True, sort_keys=True)
return avg, var
def per_tor_to_per_mili(n_tors=80, n_milis=5000, output_dir=".", **kwargs):
    """Transpose per-ToR matrix streams into per-millisecond matrices.

    Reads "matrix_tor_%d" lazily (one iterator per ToR) and writes one
    "matrix_mili_%d" matrix per millisecond, whose row i is ToR i's vector.
    """
    source_list = MatrixList(path.join(output_dir, "matrix_tor_%d"), lazy_read=True)
    target_list = MatrixList(path.join(output_dir, "matrix_mili_%d"))
    readers = [source_list[i] for i in range(n_tors)]
    for mili in range(n_milis):
        # pull the next millisecond row from every ToR stream in order
        target_list[mili] = [reader.next() for reader in readers]
def main():
    """CLI entry point: load the JSON configuration and run the pipeline
    (per-ToR generation followed by the per-millisecond transpose)."""
    parser = argparse.ArgumentParser(description='Generate random traffic.')
    parser.add_argument('--conf', default="conf.json", type=open,
                        help='configuration file (default: conf.json)')
    parsed = parser.parse_args()
    conf = json.load(parsed.conf)
    generate_network_traffic(**conf)
    per_tor_to_per_mili(**conf)
# Script entry point: run only when executed directly, not on import.
if __name__ == "__main__":
    main()