https://gitlab.com/migvasc/lowcarboncloud/
Raw File
Tip revision: 1c324b269dac1ace8b6a023e5570558c644a9cbb authored by migvasc on 09 February 2023, 12:29:43 UTC
Merge branch 'master' into 'main'
Tip revision: 1c324b2
low_carbon_cloud.py
from pulp import *
import time
import argparse
import os
from util import Util

#### Util functions

class LowCarbonCloudLP:
    """Linear program for sizing a low-carbon cloud.

    The model is built with PuLP. Its decision variables are, per data center
    (DC), the PV panel area and the battery capacity, and, per DC and
    timeslot, the battery level, the charge/discharge power, the workload
    assigned to the DC and a grid power term. The objective functions defined
    here minimise the sum of the grid, PV and battery terms, each weighted by
    its CO2 factor from the input file.
    """
    
    def __init__(self,name):
        """Parse the command line, load the inputs and create the LP variables.

        Note that the constructor itself calls ``parser.parse_args()``, so it
        reads the options from the process command line (``sys.argv``).

        Args:
            name: Name given to the PuLP problem created by ``build_lp``.

        Raises:
            RuntimeError: If ``--input_file`` is missing/empty, or the given
                path does not exist.
        """
        parser = argparse.ArgumentParser(description='Low-carbon cloud sizing tool')
        parser.add_argument('--input_file', metavar = 'input_file', type=str,
                            help = 'filename with the inputs')
        parser.add_argument('--use_gurobi', help = 'use the gurobi solver', action='store_true')
        parser.add_argument('--only_write_lp_file', help = 'Write the .lp file to be used with an external solver', action='store_true')
        parser.add_argument('--write_sol_file', help = 'Write the variables and their values to a .csv file', action='store_true')
        res = parser.parse_args()  
        
        # Keep the full namespace; --write_sol_file is only reachable through self.args.
        self.args = res
        self.use_gurobi = res.use_gurobi
        self.only_write_lp_file = res.only_write_lp_file
        self.input_file = res.input_file

        if self.input_file is None or self.input_file == '':
          raise RuntimeError('Need to pass a input file!')

        if not os.path.exists(self.input_file):
          raise RuntimeError('Input file must exist!')

        # Util.load_inputs is defined in the project's util module; it must
        # return a mapping with at least the keys read below.
        inputs = Util.load_inputs(self.input_file)    

        self.name = name
        # Indexed per timeslot / per DC as shown by their use in the methods
        # below: workload[k], irradiance[d][k], C[d], pv_co2[d], grid_co2[d],
        # pIdleDC[d], pNetIntraDC[d], PUE[d]. pCore, bat_co2, eta_ch, eta_dch
        # and eta_pv are used as single values shared by all DCs.
        self.timeslots = inputs['timeslots'] 
        self.DCs = inputs['DCs']
        self.workload = inputs['workload']
        self.irradiance = inputs['irradiance']
        self.C = inputs['C']
        self.pv_co2 =  inputs['pv_co2']
        self.grid_co2 = inputs['grid_co2']
        self.pIdleDC = inputs['pIdleDC']
        self.pNetIntraDC = inputs['pNetIntraDC']
        self.PUE = inputs['PUE']
        self.pCore = inputs['pCore']
        self.bat_co2= inputs['bat_co2']
        self.eta_ch = inputs['eta_ch']
        self.eta_dch= inputs['eta_dch']
        self.eta_pv =inputs['eta_pv']
        
        ### Variables (all continuous with lower bound 0)
        self.A   = LpVariable.dicts("A",self.DCs, 0,    cat='Continuous')                           # PV panels area (m²)
        self.Bat = LpVariable.dicts("Bat",self.DCs, 0, cat='Continuous')                            # Battery capacity in Wh
        self.B  = LpVariable.dicts('B_', (self.DCs,self.timeslots),lowBound=0, cat='Continuous')     # Level of energy
        self.Pdch = LpVariable.dicts('Pdch_', (self.DCs,self.timeslots),lowBound=0,cat='Continuous') # Power to discharge (W)
        self.Pch = LpVariable.dicts('Pch_', (self.DCs,self.timeslots),lowBound=0, cat='Continuous')  # Power to charge (W)
        self.w = LpVariable.dicts('w_', (self.DCs,self.timeslots),lowBound=0, cat='Continuous')  # workload to be sent to each DC
        # Grid power term: it is on the supply side of the power balance in
        # build_lp and is weighted by grid_co2 in the objective.
        # NOTE(review): the previous comment described it as "green power
        # surplus sold back to the grid", which does not match that usage;
        # presumably it is the power drawn from the grid — confirm.
        self.Pgrid = LpVariable.dicts('Pgrid_', (self.DCs,self.timeslots),lowBound=0, cat='Continuous')
        
        

    ### Auxiliary functions
    def getDCPowerConsumption(self,d,k):
        """Return the power consumption expression of DC ``d`` at timeslot ``k``.

        Timeslot 0 is special-cased to a consumption of 0. For any other
        timeslot the result is
        ``PUE[d] * (pNetIntraDC[d] + pIdleDC[d] + w[d][k] * pCore) / 1000``.
        The division by 1000 is presumably a W -> kW conversion, as in
        ``Pre`` — confirm against the input units.
        """
        if(k ==0):
            return 0
        return self.PUE[d] * (self.pNetIntraDC[d]+ self.pIdleDC[d]  + self.w[d][k] * self.pCore ) * 1/1000
    
    def FPgrid(self,d,k):
        """Return the grid term of the objective: ``grid_co2[d] * Pgrid[d][k]``."""
        return self.grid_co2[d] * self.Pgrid[d][k]

    def FPpv(self,d,k):
        """Return the PV term of the objective: ``Pre(d, k) * pv_co2[d]``."""
        return self.Pre(d,k) * self.pv_co2[d]

    def FPbat(self,d):
        """Return the battery term of the objective: ``Bat[d] * bat_co2``."""
        return self.Bat[d] * self.bat_co2

    def P(self,d,k):
        """Return the power demand of DC ``d`` at timeslot ``k`` (alias of ``getDCPowerConsumption``)."""
        return self.getDCPowerConsumption(d,k) 

    def Pre(self,d,k):
        """Return the PV production expression: ``A[d] * irradiance[d][k] * eta_pv / 1000``."""
        return (self.A[d] * self.irradiance[d][k] * self.eta_pv) / 1000 # convert to kw

    def use_original_objective_function(self,prob):    
        """Add the base objective to ``prob`` and return ``prob``.

        For every DC the objective sums ``FPgrid + FPpv`` over all timeslots
        and adds ``FPbat`` once.
        """
        # For each DC the per-timeslot list is added to the FPbat expression;
        # this relies on PuLP's expression addition accepting a list operand.
        prob +=  lpSum(
            [  self.FPgrid(d,k)  + self.FPpv(d,k)  for k in self.timeslots ]  + self.FPbat(d) for d in self.DCs)  
        return prob        
        
    def use_Pdch_obj_function(self,prob):    
        """Add the objective with an extra, tiny discharge term and return ``prob``.

        Same as ``use_original_objective_function`` plus
        ``Pdch[d][k] * pv_co2[d] * 1e-10`` for every DC and timeslot.
        NOTE(review): the 1e-10 weight presumably acts only as a tie-breaker
        that discourages needless discharging — confirm.
        """
        prob +=  lpSum(
            [  self.FPgrid(d,k)  + self.FPpv(d,k)  + self.Pdch[d][k]*self.pv_co2[d]*0.0000000001 for k in self.timeslots ]  + self.FPbat(d) for d in self.DCs)  
        return prob

    
    def build_lp(self):        
        """Create the minimisation problem and add all constraints.

        No objective is set here; call one of the ``use_*_function`` methods
        on the returned problem.

        Returns:
            The ``LpProblem`` named ``self.name`` with the constraints added.
        """
        prob = LpProblem(self.name, LpMinimize)

        # No charging or discharging in the first timeslot (index 0).
        for d in self.DCs:
            prob.addConstraint( self.Pch[d][0]   == 0.0 )
            prob.addConstraint( self.Pdch[d][0]   == 0.0 )

        # Battery dynamics for every timeslot after the first one.
        # The k-1 lookup assumes timeslots are consecutive integers — confirm
        # against the input file format.
        for d in self.DCs:
            for k in self.timeslots[1:] :
                # Level at k = previous level + charged energy - discharged energy.
                prob.addConstraint( self.B[d][k]  == self.B[d][k-1] + self.Pch[d][k]*self.eta_ch  - self.Pdch[d][k]*self.eta_dch )
                # Charging is limited to the headroom left below 80% of the capacity.
                prob.addConstraint( self.Pch[d][k]  * self.eta_ch <= 0.8 * self.Bat[d] - self.B[d][k-1] )
                # Discharging is limited to the energy stored above 20% of the capacity.
                prob.addConstraint( self.Pdch[d][k] * self.eta_dch <= self.B[d][k-1] -  0.2 * self.Bat[d] )


        for d in self.DCs:
            for k in self.timeslots :
                # Power balance: demand must be covered by grid + PV + discharge - charge.
                prob.addConstraint( self.P(d,k) <= self.Pgrid[d][k] + self.Pre(d,k) + self.Pdch[d][k] - self.Pch[d][k]  )
                prob.addConstraint( self.FPgrid(d,k) >= 0 )
                # The battery can only be charged with PV production.
                prob.addConstraint( self.Pre(d,k) >= self.Pch[d][k]   )
                # Battery level stays between 20% and 80% of the capacity.
                prob.addConstraint( self.B[d][k]   >= 0.2 * self.Bat[d] )
                prob.addConstraint( self.B[d][k]   <= 0.8 * self.Bat[d] )
                # Workload assigned to a DC cannot exceed C[d].
                prob.addConstraint( self.w[d][k]<=self.C[d])

        # The whole workload of each timeslot must be distributed among the DCs.
        for k in self.timeslots:
                prob +=  lpSum([  self.w[d][k]   for d in self.DCs]) == self.workload[k]    
        
        return prob




def write_sol_file(filename,dict_variables):
    """Write the solution variables to ``results/<filename>/solution.csv``.

    Each entry of ``dict_variables`` becomes one ``name;value`` line, with the
    value rounded to two decimal places. An existing file is overwritten.

    Args:
        filename: Sub-directory of ``results/`` to write into; it may contain
            several path components.
        dict_variables: Mapping from variable name to its numeric value.
    """
    # makedirs (unlike mkdir) also creates a missing 'results/' parent and any
    # nested directories; exist_ok avoids the racy exists-then-create check.
    os.makedirs('results/' + filename, exist_ok=True)
    # The context manager closes the file even if a write or round() fails.
    with open(f'results/{filename}/solution.csv', 'w') as result_file:
        for var_name, var_value in dict_variables.items():
            result_file.write(f'{var_name};{round(var_value,2)}\n')

def write_summary(total_emissions,runtime,input_file,output_path):
    """Write a two-line, semicolon-separated summary of a run.

    The file holds a header line followed by one line with the values.
    An existing file at ``output_path`` is overwritten.

    Args:
        total_emissions: Objective value to report.
        runtime: Time spent solving, as computed by the caller.
        input_file: Path of the input file the run was based on.
        output_path: Path of the summary file to create; its directory must
            already exist.
    """
    # The context manager closes the file even if one of the writes fails.
    with open(output_path, 'w') as result_file:
        result_file.write('input_file;total_emissions;runtime\n')
        result_file.write(f'{input_file};{total_emissions};{runtime}\n')


def main():
    """Build the LP from the command-line inputs, solve it and write the results.

    With ``--only_write_lp_file`` the model is only exported next to the input
    file (``.json`` replaced by ``.lp``) and nothing is solved. Otherwise the
    model is solved with CBC, or with Gurobi when ``--use_gurobi`` is given,
    and ``solution.csv`` and ``summary_results.csv`` are written below
    ``results/<input name>/``.

    Raises:
        RuntimeError: If the solver finished without assigning values to the
            variables, so that no objective value is available.
    """
    lpModel = LowCarbonCloudLP("Low_carbon_cloud")
    lpProb = lpModel.build_lp()
    lpProb = lpModel.use_Pdch_obj_function(lpProb)

    ### The problem data is written to an .lp file
    if lpModel.only_write_lp_file:
        lpProb.writeLP(lpModel.input_file.replace('.json','') + '.lp')
        return

    start = time.time()
    # Only instantiate the solver that is actually going to be used.
    solver = GUROBI_CMD() if lpModel.use_gurobi else PULP_CBC_CMD()

    lpProb.solve(solver)

    #### The status of the solution is printed to the screen
    print("Status:", LpStatus[lpProb.status])

    #### The optimised objective function value is printed to the screen
    objective_value = value(lpProb.objective)
    if objective_value is None:
        # PuLP returns None when a variable has no value (e.g. the solver
        # found no solution); fail with a clear message instead of the
        # TypeError that round(None, 2) would raise.
        raise RuntimeError(
            f'No objective value available, solver status: {LpStatus[lpProb.status]}')
    emissions = round(objective_value,2) # here we have the value in grams
    emissions = emissions / 1000000 # here we have the value in tons

    print("Total emissions of the cloud = ", emissions)
    end = time.time()
    runtime = round(end - start,2)
    print(f"Executed in {end - start} s")

    variables = Util.extract_dict_variables(lpProb)

    file_name = lpModel.input_file
    folder_name = file_name.replace('.json','').replace('input/', '')

    # NOTE(review): the --write_sol_file flag is parsed but never consulted;
    # the solution file is always written. write_summary also depends on the
    # results directory that write_sol_file creates.
    write_sol_file(folder_name,variables)
    write_summary(emissions,runtime,file_name, f'results/{folder_name}/summary_results.csv')
# Run the tool only when this file is executed as a script, not when it is imported.
if __name__ == '__main__':
    main()
back to top