import sys
from os import path
import json
import random
import argparse
from matrices import MatrixList, add_with_matrix, make_zero_matrix, subtract_with_matrix, multiply_with_scalar
from mwm import DictList
from collections import defaultdict
from copy import deepcopy
# import pdb; pdb.set_trace()
# from pudb import set_trace; set_trace()
from numpy import inf
# This script runs our distributed algorithm, which in turn is based on iSLIP
from compute_tp import SeriesStats
DIR = "outputs/run0/"
DEBUG_ITERATION = False
# computing timeslot [x, x+1]
# filename contains the traffic for [x, x+1] --> loaded into traffic
# current_matching is the matching used at x
# prev_traffic corresponds to timeslot [x-1, x]
# mwm_matching is the last computed MWM; its per-millisecond weights are in weight_mwm
def update_distributed_pairing(last_pairing, window_weight, centralized_pairing, centralized_weight, threshold, iterations, n_tors=80):
def window_pair_weight(x, y):
return window_weight[x][y] + window_weight[y][x]
def connect(x, y):
if last_pairing[x] == y:
return
if last_pairing[x] != -1:
last_pairing[last_pairing[x]] = -1
if last_pairing[y] != -1:
last_pairing[last_pairing[y]] = -1
last_pairing[x] = y
last_pairing[y] = x
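    # Illustrative sketch of connect() on a hypothetical state: with
    # last_pairing = {0: 1, 1: 0, 2: -1}, connect(0, 2) first detaches the
    # old peer (last_pairing[1] becomes -1) and then links 0 and 2,
    # leaving {0: 2, 1: -1, 2: 0}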
requests = []
def request_connect(x, y):
requests.append((x, y))
def process_requests():
random.shuffle(requests)
updated = set()
for (x , y) in requests:
if x in updated or y in updated:
continue
connect(x, y)
updated.add(x)
updated.add(y)
    # This loop runs between every two MWM computations
for iteration in range(iterations):
for tor in range(n_tors):
cur_peer = last_pairing[tor] # get the last (distributed) connection of the specific ToR
if cur_peer == -1:
cur_traffic = 0
elif cur_peer == centralized_pairing[tor]:
cur_traffic = window_pair_weight(tor, cur_peer)
if cur_traffic >= threshold * centralized_weight[tor]:
continue
else:
cur_traffic = window_pair_weight(tor, cur_peer)
ivector = {}
for j in range(n_tors): # going over all the other ToRs
bi_traffic = window_pair_weight(tor, j) # compute their weights to and from the specific ToR
if bi_traffic > cur_traffic:
ivector[j] = bi_traffic
            # if DEBUG_ITERATION:  # candidates to replace the current peer
            #     print sorted(ivector.keys(), key=ivector.get, reverse=True)
for k in sorted(ivector.keys(), key=ivector.get,
reverse=True): # going from the maximal to the lowest weight ToR
dk = last_pairing[k] # checking the (distributed) ToR peer of the selected ToR
if dk == -1:
                    # if the ToR we want to connect to has no distributed connection,
                    # we can connect to it (this check can be reordered with the last "if")
request_connect(tor, k)
break
                if centralized_pairing[k] == dk and window_pair_weight(k, dk) >= threshold * centralized_weight[k]:
                    # if the current connection was made by the centralized matching we must
                    # respect the threshold; with threshold == 0 we never break a centralized link
                    continue
                if window_pair_weight(k, dk) < window_pair_weight(k, tor):
                    # the peer's current connection is lighter than a connection to us,
                    # so request to replace it
                    request_connect(tor, k)
                    break
process_requests()
def update_distributed_pairing_ex(last_pairing, window_weight, centralized_pairing, centralized_weight, threshold,
iterations, n_tors=80, max_degree=1, use_max_peer_cent_weights=False, **kwargs):
max_peer_centralized_weight = {
u: max([centralized_weight[u][v] for v in centralized_pairing[u]] + [0]) for u in centralized_pairing
}
def window_pair_weight(x, y):
return window_weight[x][y] + window_weight[y][x]
def is_connected(x, y):
if x not in last_pairing:
return False
return y in last_pairing[x]
def is_protected(x, y):
if not is_connected(x, y):
return False
if x not in centralized_pairing:
return False
if y not in centralized_pairing[x]:
return False
weight = window_pair_weight(x, y)
if use_max_peer_cent_weights:
return weight >= threshold * max([max_peer_centralized_weight[x], max_peer_centralized_weight[y]])
return weight >= threshold * centralized_weight[x][y]
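    # Illustrative reading of is_protected (hypothetical numbers): with
    # threshold = 0.5 and centralized_weight[x][y] = 10, a centralized edge
    # (x, y) stays protected while its window weight is at least 5; with
    # threshold = 0 every connected centralized edge is protected, since any
    # weight satisfies weight >= 0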
def get_minimal_unprotected_peer(x):
if len(last_pairing[x]) < max_degree:
return None, 0
min_peer = None
min_weight = inf
for peer in last_pairing[x]:
if is_protected(x, peer):
continue
peer_weight = window_pair_weight(x, peer)
if min_peer is None or peer_weight < min_weight:
min_peer = peer
min_weight = peer_weight
return min_peer, min_weight
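    # Note the (None, 0) return while the node still has spare degree: a
    # min_weight of 0 lets any positive-weight candidate claim a free slot
    # without evicting an existing peer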
def connect(x, y):
assert x != y
if y in last_pairing[x]:
assert x in last_pairing[y]
return
xd, xdw = get_minimal_unprotected_peer(x)
yd, ydw = get_minimal_unprotected_peer(y)
weight = window_pair_weight(x, y)
if weight < xdw or weight < ydw:
return
if xd is not None:
last_pairing[xd].remove(x)
last_pairing[x].remove(xd)
if yd is not None:
last_pairing[yd].remove(y)
last_pairing[y].remove(yd)
last_pairing[x].add(y)
last_pairing[y].add(x)
requests = []
def request_connect(x, y):
requests.append((x, y))
def process_requests():
random.shuffle(requests)
for (x, y) in requests:
connect(x, y)
    # This loop runs between every two MWM computations
for iteration in range(iterations):
for tor in range(n_tors):
min_peer, min_weight = get_minimal_unprotected_peer(tor)
ivector = {}
free_degree = max_degree
for j in range(n_tors): # going over all the other ToRs
if j == tor:
continue
#if j in last_pairing[tor]:
# free_degree -= 1
# continue
if is_protected(tor, j):
free_degree -= 1
continue
peer_weight = window_pair_weight(tor, j)
if peer_weight == 0 or peer_weight < min_weight:
continue
_, peer_min_weight = get_minimal_unprotected_peer(j)
if peer_weight <= peer_min_weight:
continue
ivector[j] = peer_weight
for peer in sorted(ivector.keys(), key=ivector.get,
reverse=True)[:free_degree]: # going from the maximal to the lowest weight ToR
request_connect(tor, peer)
process_requests()
def update_distributed_pairing_ex_bounded(last_pairing, window_weight, centralized_pairing, centralized_weight,
threshold, iterations, n_tors=80, max_degree=1,
use_max_peer_cent_weights=False, max_reqs=None, **kwargs):
if max_reqs is None:
return update_distributed_pairing_ex(last_pairing, window_weight, centralized_pairing, centralized_weight,
threshold, iterations, n_tors=n_tors, max_degree=max_degree,
use_max_peer_cent_weights=use_max_peer_cent_weights, **kwargs)
max_peer_centralized_weight = {
u: max([centralized_weight[u][v] for v in centralized_pairing[u]] + [0]) for u in centralized_pairing
}
def window_pair_weight(x, y):
return window_weight[x][y] + window_weight[y][x]
def is_connected(x, y):
if x not in last_pairing:
return False
return y in last_pairing[x]
def is_protected(x, y):
if not is_connected(x, y):
return False
if x not in centralized_pairing:
return False
if y not in centralized_pairing[x]:
return False
weight = window_pair_weight(x, y)
if use_max_peer_cent_weights:
return weight >= threshold * max([max_peer_centralized_weight[x], max_peer_centralized_weight[y]])
return weight >= threshold * centralized_weight[x][y]
def get_minimal_unprotected_peer(x):
if len(last_pairing[x]) < max_degree:
return None, 0
min_peer = None
min_weight = inf
for peer in last_pairing[x]:
if is_protected(x, peer):
continue
peer_weight = window_pair_weight(x, peer)
if min_peer is None or peer_weight < min_weight:
min_peer = peer
min_weight = peer_weight
return min_peer, min_weight
def disconnect(x, y):
if x is None or y is None:
return
if y not in last_pairing[x]:
return
last_pairing[y].remove(x)
last_pairing[x].remove(y)
def simple_connect(x, y):
last_pairing[x].add(y)
last_pairing[y].add(x)
assert len(last_pairing[x]) <= max_degree
assert len(last_pairing[y]) <= max_degree
def disconnect_unprotected(x):
dsts = list(last_pairing[x])
for y in dsts:
if not is_protected(x, y):
disconnect(x, y)
def connect(x, y):
assert x != y
if y in last_pairing[x]:
assert x in last_pairing[y]
return
xd, xdw = get_minimal_unprotected_peer(x)
yd, ydw = get_minimal_unprotected_peer(y)
weight = window_pair_weight(x, y)
if weight < xdw or weight < ydw:
return
if xd is not None:
last_pairing[xd].remove(x)
last_pairing[x].remove(xd)
if yd is not None:
last_pairing[yd].remove(y)
last_pairing[y].remove(yd)
last_pairing[x].add(y)
last_pairing[y].add(x)
requests = []
def request_connect(x, y):
requests.append((x, y))
def process_requests():
random.shuffle(requests)
for (x, y) in requests:
connect(x, y)
    # This loop runs between every two MWM computations
for iteration in range(iterations):
requested_peers = defaultdict(set)
free_degrees = {}
for tor in range(n_tors):
min_peer, min_weight = get_minimal_unprotected_peer(tor)
free_degree = max_degree
peer_weights = {}
for j in range(n_tors): # going over all the other ToRs
if j == tor:
continue
# if j in last_pairing[tor]:
# free_degree -= 1
# continue
if is_protected(tor, j):
free_degree -= 1
continue
                if is_connected(tor, j):
                    # tear down existing unprotected links up front; they can be
                    # re-established below only if the grant is mutual
                    disconnect(tor, j)
peer_weight = window_pair_weight(tor, j)
if peer_weight == 0 or peer_weight < min_weight:
continue
peer_weights[j] = peer_weight
free_degrees[tor] = free_degree
potential_peers = sorted(peer_weights.keys(), key=lambda j: peer_weights[j], reverse=True)
requested_peers[tor] = set(potential_peers[:max_reqs])
        granted_peers = defaultdict(set)
for tor in range(n_tors):
ivector = {}
free_degree = free_degrees[tor]
for j in requested_peers[tor]:
if tor in requested_peers[j]:
peer_weight = window_pair_weight(tor, j)
ivector[j] = peer_weight
            granted_peers[tor] = set(sorted(ivector.keys(), key=ivector.get, reverse=True)[:free_degree])
        for tor in range(n_tors):
            for j in granted_peers[tor]:
                if tor in granted_peers[j]:
                    #request_connect(tor, j)
                    #connect(tor, j)
                    simple_connect(tor, j)  # link only mutually granted pairs
#process_requests()
def compare_matchings(a, b, n_tors=80):
    # print "comparing "
    # print a
    # print b
    for i in range(n_tors):
        if a[i] != b[i]:
            print str(i) + " " + str(a[i]) + "->" + str(b[i])
def verify_matching(matching):
    # a matching is valid iff no peer value other than -1 appears twice
prev = -1
for a in sorted(matching.values()):
if a != -1 and a == prev:
return False
prev = a
return True
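# e.g. verify_matching({0: 1, 1: 0, 2: -1}) is True, while
# verify_matching({0: 1, 2: 1}) is False (ToR 1 is claimed twice)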
class AllZeroRow(object):
def __getitem__(self, item):
return 0
class AllZerosMatrix(object):
zero_row = AllZeroRow()
def __getitem__(self, item):
return self.zero_row
zero_matrix = AllZerosMatrix()
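# Usage note: zero_matrix is a read-only all-zeros stand-in returned before
# any traffic has been observed; zero_matrix[i][j] == 0 for any (i, j),
# e.g. zero_matrix[7][42] == 0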
class TrafficWindow:
    # sliding average of the last `length` per-mili traffic matrices, optionally
    # delayed by `delay` milis before a matrix enters the window
    def __init__(self, length, delay=0):
self.length = length
self.delay = delay
self.sum = None
self.pending = []
self.matrices = []
def add(self, matrix):
if self.delay:
self.pending.append(matrix)
if len(self.pending) > self.delay:
matrix = self.pending.pop(0)
else:
return
if len(self.matrices) == self.length:
out = self.matrices.pop(0)
subtract_with_matrix(self.sum, out, 1.0 / self.length)
if self.sum is None:
self.sum = multiply_with_scalar(matrix, 1.0 / self.length)
else:
add_with_matrix(self.sum, matrix, 1.0 / self.length)
self.matrices.append(matrix)
def get_sum(self):
if self.sum is None:
return zero_matrix
return self.sum
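# A minimal sketch of TrafficWindow behaviour, assuming (as the code above
# relies on) that add_with_matrix / subtract_with_matrix mutate their first
# argument in place while multiply_with_scalar returns a new matrix:
#   w = TrafficWindow(length=2, delay=1)
#   w.add(m0)   # m0 is pending; get_sum() is still zero_matrix
#   w.add(m1)   # m0 enters the window; get_sum() ~ m0 / 2
#   w.add(m2)   # m1 enters the window; get_sum() ~ (m0 + m1) / 2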
def pairs_dif(new_pairs, old_pairs, dif_stats):
total = len(new_pairs)
change = len(new_pairs ^ old_pairs)
dif_stats["totals"].add(total)
dif_stats["changes"].add(change)
def pairing_to_pairs(pairing):
return {tuple(sorted([k, pairing[k]])) for k in pairing if pairing[k] != -1}
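# e.g. pairing_to_pairs({0: 1, 1: 0, 2: -1}) == {(0, 1)}: each matched pair
# appears once and unmatched ToRs (-1) are dropped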
def compute_dist_only_throughput(window=1, iterations=3, win_delay=0,
n_milis=5000, n_tors=80, output_dir=".", **kwargs):
per_mili_pattern = path.join(output_dir, "matrix_mili_%d")
per_mili_matrix_list = MatrixList(per_mili_pattern)
tps = []
total_tp = 0
    traffic_window = TrafficWindow(window, win_delay)  # the backward window of traffic from which the distributed decision is computed
distributed_pairing = defaultdict(lambda: -1) # initialize the current pairs
centralized_matches = dict(pairing=defaultdict(lambda: -1), weights=defaultdict(int))
dif_stats = dict(totals=SeriesStats(), changes=SeriesStats())
old_pairs = pairing_to_pairs(distributed_pairing)
for t in range(n_milis):
print "\r", t, "/", n_milis,
matrix = list(per_mili_matrix_list[t])
if t != 0:
update_distributed_pairing(
last_pairing=distributed_pairing,
                window_weight=traffic_window.get_sum(),
centralized_pairing=centralized_matches["pairing"],
                centralized_weight=centralized_matches["weights"],  # irrelevant: the centralized pairing is empty in distributed-only mode
                threshold=1,  # irrelevant for the same reason
iterations=iterations,
n_tors=n_tors
)
new_pairs = pairing_to_pairs(distributed_pairing)
pairs_dif(new_pairs, old_pairs, dif_stats)
old_pairs = new_pairs
        tp = sum([matrix[x][distributed_pairing[x]] for x in distributed_pairing if
                  distributed_pairing[x] != -1])  # the throughput served by the current pairing in this mili
tps.append(tp)
total_tp += tp
        traffic_window.add(matrix)  # update the traffic window with the current traffic
return tps, total_tp, dif_stats
def pairing_to_pairs_ex(pairing):
return {tuple(sorted([k, j])) for k in pairing for j in pairing[k] if j != -1}
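# e.g. pairing_to_pairs_ex({0: {1, 2}, 1: {0}, 2: {0}}) == {(0, 1), (0, 2)}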
def compute_dist_only_throughput_ex(window=1, iterations=3, win_delay=0,
n_milis=5000, n_tors=80, output_dir=".", **kwargs):
if kwargs.get("max_degree", 1) == 1:
return compute_dist_only_throughput(window=window, iterations=iterations, win_delay=win_delay,
n_milis=n_milis, n_tors=n_tors, output_dir=output_dir, **kwargs)
per_mili_pattern = path.join(output_dir, "matrix_mili_%d")
per_mili_matrix_list = MatrixList(per_mili_pattern)
tps = []
total_tp = 0
    traffic_window = TrafficWindow(window, win_delay)  # the backward window of traffic from which the distributed decision is computed
distributed_pairing = defaultdict(set) # initialize the current pairs
centralized_matches = dict(links=defaultdict(set), weights=None)
dif_stats = dict(totals=SeriesStats(), changes=SeriesStats())
old_pairs = pairing_to_pairs_ex(distributed_pairing)
for t in range(n_milis):
print "\r", t, "/", n_milis,
matrix = list(per_mili_matrix_list[t])
if t != 0:
update_distributed_pairing_ex_bounded(
last_pairing=distributed_pairing,
                window_weight=traffic_window.get_sum(),
centralized_pairing=centralized_matches["links"],
                centralized_weight=centralized_matches["weights"],  # irrelevant: no centralized links in distributed-only mode
iterations=iterations,
n_tors=n_tors,
**kwargs
)
new_pairs = pairing_to_pairs_ex(distributed_pairing)
pairs_dif(new_pairs, old_pairs, dif_stats)
old_pairs = new_pairs
tp = sum([matrix[x][y] for x in distributed_pairing for y in distributed_pairing[x]])
tps.append(tp)
total_tp += tp
        traffic_window.add(matrix)  # update the traffic window with the current traffic
return tps, total_tp, dif_stats
def compute_throughput(compute_epoch=1, agg_interval=1, agg_epoch_delay=0, top=None,
window=1, threshold=1, iterations=3, win_delay=0,
n_milis=5000, n_tors=80, output_dir=".", **kwargs):
print dict(compute_epoch=compute_epoch, agg_interval=agg_interval, agg_epoch_delay=agg_epoch_delay)
per_mili_pattern = path.join(output_dir, "matrix_mili_%d")
per_mili_matrix_list = MatrixList(per_mili_pattern)
if top:
per_mili_match_pattern = path.join(output_dir, "mwm_mili_%d_top_" + str(top)+("_deg_%d" % kwargs.get("max_degree", 1)))
else:
per_mili_match_pattern = path.join(output_dir, "mwm_mili_%d"+("_deg_%d" % kwargs.get("max_degree", 1)))
per_mili_match_list = DictList(per_mili_match_pattern)
if top:
per_interval_match_pattern = path.join(output_dir, "mwm_agg_%d_%d-%d_top_" + str(top)+("_deg_%d" % kwargs.get("max_degree", 1)))
else:
per_interval_match_pattern = path.join(output_dir, "mwm_agg_%d_%d-%d"+("_deg_%d" % kwargs.get("max_degree", 1)))
per_interval_match_list = DictList(per_interval_match_pattern)
def get_centralized_matches(t):
        end = t - t % compute_epoch - (agg_epoch_delay - 1) * compute_epoch
        # t - t % compute_epoch = the beginning of the current decision interval
        # (agg_epoch_delay - 1) * compute_epoch = how many decision intervals we go back to reach the MWM interval
        # therefore end is the end of the interval the MWM considers
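        # Worked example (illustrative numbers): with compute_epoch=10,
        # agg_interval=20, agg_epoch_delay=2 and t=57:
        #   end   = 57 - 57 % 10 - (2 - 1) * 10 = 40
        #   start = 40 - 20 = 20, i.e. the MWM covers milis [20, 40)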
if end > n_milis:
end -= compute_epoch
        start = end - agg_interval  # the point where the MWM interval starts
print "start", start, "end", end
if start < 0:
mwm = []
#traffic = make_zero_matrix(n_tors, n_tors)
elif agg_interval == 1: # if the MWM is based on 1 millisec
mwm = per_mili_match_list[start] # reading the MWM of the given mili
traffic = list(per_mili_matrix_list[start]) # reading the traffic from given milli matrix
else:
mwm = per_interval_match_list[(agg_interval, start, end - 1)]
traffic = make_zero_matrix(n_tors, n_tors)
            for tt in range(start, end):  # each mili's traffic is accumulated for the threshold computation below
matrix = per_mili_matrix_list[tt]
add_with_matrix(traffic, matrix)
pairing = defaultdict(lambda : -1)
pairing.update({m[0]: m[1] for m in mwm})
pairing.update({m[1]: m[0] for m in mwm})
mwm_weight = defaultdict(int)
        mwm_weight.update({x: (traffic[x][pairing[x]] + traffic[pairing[x]][x]) / agg_interval for x in pairing})  # per-ToR averaged weight of its MWM pair
res = {}
res["pairing"] = pairing
res["weights"] = mwm_weight
#res["last_traffic"] = traffic
return res
tps = []
total_tp = 0
    traffic_window = TrafficWindow(window, win_delay)  # the backward window of traffic from which the distributed decision is computed
distributed_pairing = defaultdict(lambda : -1) # initialize the current pairs
dif_stats = dict(totals=SeriesStats(), changes=SeriesStats())
old_pairs = pairing_to_pairs(distributed_pairing)
    for z in range(n_milis / compute_epoch):  # number of centralized computations: total experiment time (in milis) / milis considered per computation
        start = z*compute_epoch  # the mili at which the current compute epoch starts
centralized_matches = get_centralized_matches(start)
distributed_pairing = centralized_matches["pairing"].copy()
for t in range(z*compute_epoch, (z+1)*compute_epoch): # between each two MWM updates, the dist computation is taking place
print "\r", t, "/", n_milis,
matrix = list(per_mili_matrix_list[t])
            if t != 0:  # using t != z*compute_epoch here would also prevent an immediate deviation from the fresh centralized matching
update_distributed_pairing(
distributed_pairing,
                    traffic_window.get_sum(),
centralized_matches["pairing"],
centralized_matches["weights"],
threshold,
iterations,
n_tors
)
new_pairs = pairing_to_pairs(distributed_pairing)
pairs_dif(new_pairs, old_pairs, dif_stats)
old_pairs = new_pairs
            tp = sum([matrix[x][distributed_pairing[x]] for x in distributed_pairing if distributed_pairing[x] != -1])  # the throughput served by the current pairing in this mili
tps.append(tp)
total_tp += tp
            traffic_window.add(matrix)  # update the traffic window with the current traffic
return tps, total_tp, dif_stats
def compute_throughput_ex(compute_epoch=1, agg_interval=1, agg_epoch_delay=0, top=None,
window=1, threshold=1, iterations=3, win_delay=0,
n_milis=5000, n_tors=80, output_dir=".", **kwargs):
if kwargs.get("max_degree", 1) == 1:
return compute_throughput(compute_epoch=compute_epoch, agg_interval=agg_interval, agg_epoch_delay=agg_epoch_delay, top=top,
window=window, iterations=iterations, win_delay=win_delay, threshold=threshold,
n_milis=n_milis, n_tors=n_tors, output_dir=output_dir, **kwargs)
print dict(compute_epoch=compute_epoch, agg_interval=agg_interval, agg_epoch_delay=agg_epoch_delay)
per_mili_pattern = path.join(output_dir, "matrix_mili_%d")
per_mili_matrix_list = MatrixList(per_mili_pattern)
if top:
per_mili_match_pattern = path.join(output_dir, "mwm_mili_%d_top_" + str(top)+("_deg_%d" % kwargs.get("max_degree", 1)))
else:
per_mili_match_pattern = path.join(output_dir, "mwm_mili_%d"+("_deg_%d" % kwargs.get("max_degree", 1)))
per_mili_match_list = DictList(per_mili_match_pattern)
if top:
per_interval_match_pattern = path.join(output_dir, "mwm_agg_%d_%d-%d_top_" + str(top)+("_deg_%d" % kwargs.get("max_degree", 1)))
else:
per_interval_match_pattern = path.join(output_dir, "mwm_agg_%d_%d-%d"+("_deg_%d" % kwargs.get("max_degree", 1)))
per_interval_match_list = DictList(per_interval_match_pattern)
def get_centralized_matches(t):
        end = t - t % compute_epoch - (agg_epoch_delay - 1) * compute_epoch
        # t - t % compute_epoch = the beginning of the current decision interval
        # (agg_epoch_delay - 1) * compute_epoch = how many decision intervals we go back to reach the MWM interval
        # therefore end is the end of the interval the MWM considers
if end > n_milis:
end -= compute_epoch
        start = end - agg_interval  # the point where the MWM interval starts
print "start", start, "end", end
if start < 0:
#mwm = []
# traffic = make_zero_matrix(n_tors, n_tors)
return dict(links=defaultdict(set), weights=None)
elif agg_interval == 1: # if the MWM is based on 1 millisec
mwm = per_mili_match_list[start] # reading the MWM of the given mili
traffic = list(per_mili_matrix_list[start]) # reading the traffic from given milli matrix
else:
mwm = per_interval_match_list[(agg_interval, start, end - 1)]
traffic = make_zero_matrix(n_tors, n_tors)
            for tt in range(start, end):  # each mili's traffic is accumulated for the threshold computation below
matrix = per_mili_matrix_list[tt]
add_with_matrix(traffic, matrix)
links = {} #defaultdict(set)
mwm_weight = defaultdict(lambda: defaultdict(int))
for m in mwm:
x, y = m
if x not in links:
links[x] = set()
if y not in links:
links[y] = set()
links[x].add(y)
links[y].add(x)
weight = (traffic[x][y] + traffic[y][x]) / agg_interval
mwm_weight[x][y] = weight
mwm_weight[y][x] = weight
res = dict(links=links, weights=mwm_weight)
# res["last_traffic"] = traffic
return res
tps = []
total_tp = 0
    traffic_window = TrafficWindow(window, win_delay)  # the backward window of traffic from which the distributed decision is computed
distributed_pairing = defaultdict(set) # initialize the current pairs
dif_stats = dict(totals=SeriesStats(), changes=SeriesStats())
old_pairs = pairing_to_pairs_ex(distributed_pairing)
    for z in range(n_milis / compute_epoch):  # number of centralized computations: total experiment time (in milis) / milis considered per computation
        start = z * compute_epoch  # the mili at which the current compute epoch starts
centralized_matches = get_centralized_matches(start)
distributed_pairing = defaultdict(set)
distributed_pairing.update({x: set(centralized_matches["links"][x]) for x in centralized_matches["links"]})
for t in range(z * compute_epoch,
(z + 1) * compute_epoch): # between each two MWM updates, the dist computation is taking place
print "\r", t, "/", n_milis,
matrix = list(per_mili_matrix_list[t])
            if t != 0:  # using t != z*compute_epoch here would also prevent an immediate deviation from the fresh centralized matching
update_distributed_pairing_ex_bounded(
last_pairing=distributed_pairing,
                    window_weight=traffic_window.get_sum(),
centralized_pairing=centralized_matches["links"],
centralized_weight=centralized_matches["weights"],
threshold=threshold,
iterations=iterations,
n_tors=n_tors,
**kwargs
)
"""if distributed_pairing != centralized_matches["links"]:
dif = {}
for n in range(n_tors):
if distributed_pairing.get(n,set()) != centralized_matches["links"].get(n,set()):
in_dist = distributed_pairing[n] - centralized_matches["links"][n]
in_cent = centralized_matches["links"][n] - distributed_pairing[n]
dif[n] = (in_dist, in_cent)
print "deviation from centralized"
"""
new_pairs = pairing_to_pairs_ex(distributed_pairing)
pairs_dif(new_pairs, old_pairs, dif_stats)
old_pairs = new_pairs
tp = sum([matrix[u][v] for u in distributed_pairing for v in distributed_pairing[u]])
tps.append(tp)
total_tp += tp
            traffic_window.add(matrix)  # update the traffic window with the current traffic
return tps, total_tp, dif_stats
def write_results(tps, total_tp, dif_stats,
compute_epoch=1, agg_interval=1, agg_epoch_delay=0, top=None,
window=1, threshold=1, iterations=3, win_delay=0,
n_milis=5000, output_dir=".",
max_degree=1, total_load=0, run_id="", flow_avg=0, flow_var=0, **kwargs):
conf_name = "dist_delay" + str(agg_epoch_delay) + "_epoch"+str(compute_epoch) + "_agg"+str(agg_interval)
conf_name += "_win"+str(window) + "_t"+str(threshold) + "_i"+str(iterations) + "_wd"+str(win_delay)
test_file_path = path.join(output_dir, "res_"+str(n_milis)+"_"+conf_name+".json")
with open(test_file_path, "w") as test_res_file:
json.dump({"total_tp": total_tp, "tps": tps}, test_res_file)
all_res_file_path = path.join(output_dir, "res_"+str(n_milis)+".csv")
if not path.isfile(all_res_file_path):
with open(all_res_file_path, "w") as all_res_file:
row = ["mode", "total_tp", "n_milis", "compute_epoch", "agg_interval", "agg_epoch_delay", "top", "max_degree",
"links_avg", "links_var", "change_avg", "change_var", "total_load", "run_id", "flow_avg", "flow_var", "tps"]
all_res_file.write(",".join(map(str, row))+"\n")
with open(all_res_file_path, "a+") as all_res_file:
row = [conf_name, total_tp, n_milis, compute_epoch, agg_interval, agg_epoch_delay, top, max_degree,
dif_stats["totals"].get_avg(), dif_stats["totals"].get_var(),
dif_stats["changes"].get_avg(), dif_stats["changes"].get_var(),
total_load, run_id, flow_avg, flow_var] + tps
all_res_file.write(",".join(map(str, row))+"\n")
def main():
parser = argparse.ArgumentParser(
description="""Compute throughput for each mili [t,t+1) using MWM of interval [start, end) where
end = t - t%compute_epoch - (agg_epoch_delay-1)*compute_epoch, and
start = end - agg_interval.
""",
epilog="""
""")
    parser.add_argument('--output_dir', default=".", type=str,
                        help='output directory (default: ".")')
parser.add_argument('--conf', default="conf.json", type=open,
help='configuration file (default: conf.json)')
# centralized
parser.add_argument('--compute_epoch', default=1, type=int,
help='each compute interval length (default: 1 = mili)') # y
parser.add_argument('--agg_interval', default=1, type=int,
help='each aggregation interval length according to which the MWM is computed (default: 1 = mili)')#z
parser.add_argument('--agg_epoch_delay', default=0, type=int,
help='number of compute epoch delays before aggregation (default: 0 = offline)')
    parser.add_argument('--threshold', default=1, type=int,
                        help='the weight ratio below which the distributed optimization may break a centralized edge; -1 for distributed only (default: 1)')
parser.add_argument('--window', default=1, type=int,
help='the length of a sliding window that the algorithm looks into when deciding whether to keep or break an edge in the matching (default: 1 = mili)')
    parser.add_argument('--iterations', default=3, type=int,
                        help='the number of distributed iterations per mili decision (default: 3)')
args = parser.parse_args()
conf = json.load(args.conf)
random.seed(conf.get("seed", 1))
conf["compute_epoch"] = args.compute_epoch
conf["agg_interval"] = args.agg_interval
conf["agg_epoch_delay"] = args.agg_epoch_delay
conf["threshold"] = args.threshold
conf["window"] = args.window
conf["iterations"] = args.iterations
if conf["threshold"] == -1:
tps, total_tp = compute_dist_only_throughput_ex(**conf)
else:
tps, total_tp = compute_throughput_ex(**conf)
write_results(tps, total_tp, **conf)
if __name__ == "__main__":
main()