https://gitlab.com/migvasc/lowcarboncloud/
Tip revision: cb0d575f26877718aa9f68595ec5019ee65696ca authored by Miguel Felipe Silva Vasconcelos on 06 February 2023, 15:00:16 UTC
updating fix versions
updating fix versions
Tip revision: cb0d575
low_carbon_cloud.py
from pulp import *
import time
import argparse
import os
from util import Util
#### Util functions
class LowCarbonCloudLP:
def __init__(self,name):#, name, timeslots, DCs, workload, irradiance, C, pv_co2, grid_co2, pIdleDC, pNetIntraDC, PUE, pCore, bat_co2, eta_ch, eta_dch, eta_pv):
parser = argparse.ArgumentParser(description='Low-carbon cloud sizing tool')
parser.add_argument('--input_file', metavar = 'input_file', type=str,
help = 'filename with the inputs')
parser.add_argument('--use_gurobi', help = 'use the gurobi solver', action='store_true')
parser.add_argument('--only_write_lp_file', help = 'Write the .lp file to be used with an external solver', action='store_true')
parser.add_argument('--write_sol_file', help = 'Write the variables and their values to a .csv file', action='store_true')
res = parser.parse_args()
self.args = res
self.use_gurobi = res.use_gurobi
self.only_write_lp_file = res.only_write_lp_file
self.input_file = res.input_file
if self.input_file is None or self.input_file == '':
raise RuntimeError('Need to pass a input file!')
if not os.path.exists(self.input_file):
raise RuntimeError('Input file must exist!')
inputs = Util.load_inputs(self.input_file)
self.name = name
self.timeslots = inputs['timeslots']
self.DCs = inputs['DCs']
self.workload = inputs['workload']
self.irradiance = inputs['irradiance']
self.C = inputs['C']
self.pv_co2 = inputs['pv_co2']
self.grid_co2 = inputs['grid_co2']
self.pIdleDC = inputs['pIdleDC']
self.pNetIntraDC = inputs['pNetIntraDC']
self.PUE = inputs['PUE']
self.pCore = inputs['pCore']
self.bat_co2= inputs['bat_co2']
self.eta_ch = inputs['eta_ch']
self.eta_dch= inputs['eta_dch']
self.eta_pv =inputs['eta_pv']
### Variables
self.A = LpVariable.dicts("A",self.DCs, 0, cat='Continuous') # PV panels area (m²)
self.Bat = LpVariable.dicts("Bat",self.DCs, 0, cat='Continuous') # Battery capacity in Wh
self.B = LpVariable.dicts('B_', (self.DCs,self.timeslots),lowBound=0, cat='Continuous') # Level of energy
self.Pdch = LpVariable.dicts('Pdch_', (self.DCs,self.timeslots),lowBound=0,cat='Continuous') # Power to discharge (W)
self.Pch = LpVariable.dicts('Pch_', (self.DCs,self.timeslots),lowBound=0, cat='Continuous') # Power to charge (W)
self.w = LpVariable.dicts('w_', (self.DCs,self.timeslots),lowBound=0, cat='Continuous') # workload to be sent to each DC
self.Pgrid = LpVariable.dicts('Pgrid_', (self.DCs,self.timeslots),lowBound=0, cat='Continuous') # Green power surplus sold back to the grid
### Auxiliary functions
def getDCPowerConsumption(self,d,k):
if(k ==0):
return 0
return self.PUE[d] * (self.pNetIntraDC[d]+ self.pIdleDC[d] + self.w[d][k] * self.pCore ) * 1/1000
def FPgrid(self,d,k):
return self.grid_co2[d] * self.Pgrid[d][k]
def FPpv(self,d,k):
return self.Pre(d,k) * self.pv_co2[d]
def FPbat(self,d):
return self.Bat[d] * self.bat_co2
def P(self,d,k):
return self.getDCPowerConsumption(d,k)
def Pre(self,d,k):
return (self.A[d] * self.irradiance[d][k] * self.eta_pv) / 1000 # convert to kw
def use_original_objective_function(self,prob):
prob += lpSum(
[ self.FPgrid(d,k) + self.FPpv(d,k) for k in self.timeslots ] + self.FPbat(d) for d in self.DCs)
return prob
def use_Pdch_obj_function(self,prob):
prob += lpSum(
[ self.FPgrid(d,k) + self.FPpv(d,k) + self.Pdch[d][k]*self.pv_co2[d]*0.0000000001 for k in self.timeslots ] + self.FPbat(d) for d in self.DCs)
return prob
def build_lp(self):
prob = LpProblem(self.name, LpMinimize)
for d in self.DCs:
prob.addConstraint( self.Pch[d][0] == 0.0 )
prob.addConstraint( self.Pdch[d][0] == 0.0 )
for d in self.DCs:
for k in self.timeslots[1:] :
prob.addConstraint( self.B[d][k] == self.B[d][k-1] + self.Pch[d][k]*self.eta_ch - self.Pdch[d][k]*self.eta_dch )
prob.addConstraint( self.Pch[d][k] * self.eta_ch <= 0.8 * self.Bat[d] - self.B[d][k-1] ) # To ensure that only charge/discharge if
prob.addConstraint( self.Pdch[d][k] * self.eta_dch <= self.B[d][k-1] - 0.2 * self.Bat[d] ) # battery capacity >= 0
for d in self.DCs:
for k in self.timeslots :
prob.addConstraint( self.P(d,k) <= self.Pgrid[d][k] + self.Pre(d,k) + self.Pdch[d][k] - self.Pch[d][k] )
prob.addConstraint( self.FPgrid(d,k) >= 0 )
prob.addConstraint( self.Pre(d,k) >= self.Pch[d][k] )
prob.addConstraint( self.B[d][k] >= 0.2 * self.Bat[d] )
prob.addConstraint( self.B[d][k] <= 0.8 * self.Bat[d] )
prob.addConstraint( self.w[d][k]<=self.C[d])
for k in self.timeslots:
prob += lpSum([ self.w[d][k] for d in self.DCs]) == self.workload[k]
return prob
def write_sol_file(filename,dict_variables):
if not os.path.exists('results/'+filename):
os.mkdir('results/'+filename)
result_file = open(f'results/{filename}/solution.csv', 'w')
for d in dict_variables:
result_file.write(f'{d};{round(dict_variables[d],2)}\n')
result_file.close()
def write_summary(total_emissions,runtime,input_file,output_path):
result_file = open(output_path, 'w')
result_file.write('input_file;total_emissions;runtime\n')
result_file.write(f'{input_file};{total_emissions};{runtime}\n')
result_file.close()
def main():
lpModel = LowCarbonCloudLP("Low_carbon_cloud")
lpProb = lpModel.build_lp()
lpProb = lpModel.use_Pdch_obj_function(lpProb)
### The problem data is written to an .lp file
if(lpModel.only_write_lp_file):
lpProb.writeLP(lpModel.input_file.replace('.json','') + '.lp')
return
start = time.time()
solver = PULP_CBC_CMD()
if(lpModel.use_gurobi):
solver = GUROBI_CMD()
lpProb.solve(solver)
#### The status of the solution is printed to the screen
print("Status:", LpStatus[lpProb.status])
#### The optimised objective function value is printed to the screen
emissions = round(value(lpProb.objective),2) # here we have the value in gramms
emissions = emissions / 1000000 # here we have the value in tons
print("Total emissions of the cloud = ", emissions)
end = time.time()
runtime = round(end - start,2)
print(f"Executed in {end - start} s")
variables = Util.extract_dict_variables(lpProb)
file_name = lpModel.input_file
folder_name = file_name.replace('.json','').replace('input/', '')
write_sol_file(folder_name,variables)
write_summary(emissions,runtime,file_name, f'results/{folder_name}/summary_results.csv')
if __name__ == '__main__':
main()