https://github.com/prosysscience/CPWorkloadBalancing
Tip revision: 8c5e2d6ad1af00c81c112dab2cb8711894d654c2 authored by Benjamin Kovács on 28 May 2021, 22:00:19 UTC
fix import
fix import
Tip revision: 8c5e2d6
instance_reader.py
from collections import defaultdict
from typing import List
class CapacityManager2:
def __init__(self, json_data):
self.first_day, self.last_day = None, None
self.machine_capacities = defaultdict(lambda: defaultdict(lambda: 0.0))
for m in json_data['machines']:
for ik, k in enumerate(m['capacities']):
self.machine_capacities[str(m['id'])][ik] = k
def get_max_capacity(self, machine):
return min([c for c in self.machine_capacities[str(machine)].values() if c > 0])
def get_capacity(self, machine, ordinal_day):
machine = str(machine)
return self.machine_capacities[machine][ordinal_day]
class Machine:
def __init__(self, id, external: bool, day_zero):
self.id = id
self.external = external
self.day_zero = day_zero
assert type(day_zero) is int
def max_capacity(self, cm):
return cm.get_max_capacity(self.id)
def capacity(self, time_step, cm):
return cm.get_capacity(self.id, time_step + self.day_zero)
def virtual_t(self, t, cm):
if self.capacity(t, cm) == 0:
return None
else:
o = 0
for i in range(t):
if self.capacity(i, cm) > 0:
o += 1
return o
def next_timestep(self, t, cm):
i = 1
while self.capacity(t + i, cm) == 0:
i += 1
if i > 1000:
raise Exception(f'No capacity in next 1000 day for machine {self.id} and day {t + self.day_zero}')
return t + i
class Job:
def __init__(self, id, deadline: int, project: str):
self.id = id
self.deadline = deadline
self.tasks = []
self.external_timeout = 0
self.project = project
self.solver_delay = None
@property
def result_delay(self):
return max(0, self.tasks[-1].result_processing_day - self.deadline) if len(self.tasks) > 0 else 0
@property
def min_length(self):
return sum([t.free_days_before for t in self.tasks]) + len(self.tasks)
class Task:
def __init__(self, job: Job, machine: Machine, length: int, earliest_start: int, free_days_before: int):
self.job = job
self.machine = machine
self.length = length
self.earliest_start = earliest_start
self.result_processing_day = None
self.solver_result_processing_day = None
self.directly_after_last = False
self.free_days_before = free_days_before
self.id = None
self.original_id = None
self.job_id = None
self.min_days_after = 0
def load_presolved(json_obj):
tasks = []
jobs = {}
machines = {}
for j in json_obj['jobs']:
job = Job(j['id'], j['deadline'], 'PROJECT')
jobs[job.id] = job
for m in json_obj['machines']:
machine = Machine(m['id'], m['external'], 0)
machines[machine.id] = machine
for t in json_obj['tasks']:
task = Task(jobs[t['job']], machines[t['machine']], t['length'], t['earliest_start'], t['free_days_before'])
task.id = t['id']
task.original_id = str(['id'])
task.min_days_after = 0
task.job_id = t['job']
task.directly_after_last = t['directly_after_last']
task.earliest_start = t['earliest_start']
tasks.append(task)
for t_pre in task.job.tasks:
t_pre.min_days_after += 1 + task.free_days_before
jobs[t['job']].tasks.append(task)
return tasks, machines.values(), jobs.values()
class PenaltyManager:
def __init__(self, objective):
self.project_weights = {a['project']: a['jobWeight'] for a in objective['projects']}
self.default_weight = objective['jobWeight']
self.pen_per_day = objective['penaltyPerDay']
self.pen_per_job = objective['oneTimePenalty']
def project(self, project):
return self.project_weights[project] if project in self.project_weights else self.default_weight
def perf_measures(job_list: List[Job]):
sum_delay = sum([job.result_delay for job in job_list])
max_delay = max([job.result_delay for job in job_list])
avg_delay = round(sum_delay / len(job_list), 2)
return dict(sum_delay=sum_delay, max_delay=max_delay, avg_delay=avg_delay)