approx2.jl
###############################################################################
# SCHEDULING INDEPENDENT MOLDABLE TASKS ON MULTI-CORES WITH GPUS
# Copyright (C) 2014 Sascha Hunold <sascha@hunoldscience.net>
# Raphaël Bleuse <raphael.bleuse@imag.fr>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
###############################################################################
import JSON
using Logging
using Printf          # @printf (stdlib since Julia 0.7)
using DataStructures  # PriorityQueue, enqueue!, dequeue!

include("approx_common.jl")
include("jedule_output.jl")
include("output_csv.jl")
# Global accumulator for per-run statistics (iteration count, solve times).
stats = Dict()

abstract type PU end

mutable struct CPU <: PU
    archtype::Int      # architecture id used in the output schedule (0 = CPU)
    pid::Int           # processor id
    lasttime::Float64  # time at which this core becomes available again
end

mutable struct GPU <: PU
    archtype::Int      # architecture id used in the output schedule (1 = GPU)
    pid::Int           # GPU id
    lasttime::Float64  # time at which this GPU becomes available again
end
function Base.show(io::IO, cpu::CPU)
    print(io, "CPU: $(cpu.archtype), $(cpu.pid), $(cpu.lasttime)")
end
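
# Ratio of a task's CPU time (at its canonical core count for lambda) to its
# GPU time; used below as the sort key that decides which hybrid tasks
# benefit most from being placed on a GPU.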
function get_task_efficiency(instance::SchedulerInput, lambda::Float64, task_str::AbstractString)
    nb_cpu = get_procs_by_lambda(instance, task_str, lambda)
    cpu_time = get_time_by_procs(instance, task_str, nb_cpu)
    gpu_time = float(instance["gpudata"][task_str])
    ratio = cpu_time / gpu_time
    return ratio
end

function get_task_gputime(instance::SchedulerInput, task_str::AbstractString)
    gpu_time = float(instance["gpudata"][task_str])
    return gpu_time
end

function get_task_canonical_core_count(instance::SchedulerInput, lambda::Float64, task_str::AbstractString)
    nb_cpu = get_procs_by_lambda(instance, task_str, lambda)
    return nb_cpu
end
# function spt_comparator(task1_str::String, task2_str::String, instance::SchedulerInput)
# gpu_time1 = instance["gpudata"][task1_str]
# gpu_time2 = instance["gpudata"][task2_str]
# return gpu_time1 - gpu_time2
# end
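
# Group an allocation of cores into maximal runs of contiguous pids, so that
# each run can be emitted as a single rectangle in the output schedule.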
function partition_cores(cores::Vector{CPU})
    partitions = Vector{CPU}[]
    @debug "unsorted cores: $cores"
    sort!(cores, by = c -> c.pid)
    @debug "sorted cores: $cores"
    start_idx = 1
    end_idx = 1
    cur_idx = 2
    while cur_idx <= length(cores)
        if cores[end_idx].pid == cores[cur_idx].pid - 1
            # pids are contiguous: extend the current partition
            end_idx = cur_idx
            cur_idx += 1
        else
            # gap in the pids: close the current partition and start a new one
            part = cores[start_idx:end_idx]
            push!(partitions, part)
            start_idx = cur_idx
            end_idx = cur_idx
            cur_idx += 1
        end
    end
    part = cores[start_idx:end_idx]
    push!(partitions, part)
    partitions
end
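
# Illustrative example (archtype/lasttime values are dummies):
#   partition_cores([CPU(0,5,0.0), CPU(0,1,0.0), CPU(0,2,0.0), CPU(0,6,0.0)])
# returns two partitions, one holding pids 1-2 and one holding pids 5-6.

# Dual-approximation driver: bisect lambda between the instance's lower and
# upper makespan bounds. For each guess, classify tasks (CPU-only, GPU-only,
# hybrid), pack the GPUs first, then the CPUs; any failure to fit within
# 2*lambda rejects the guess and raises the lower bound.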
function solve_problem_2approx(instance::SchedulerInput,
                               inst_params::ScheduleInstanceParams)
    N = Int(instance["meta"]["n"])   # number of tasks
    M = Int(instance["meta"]["m"])   # number of CPU cores
    K = Int(instance["meta"]["k"])   # number of GPUs

    lb = compute_lowerbound(instance)
    ub = compute_upperbound(instance)

    upper = ub
    lower = lb
    best_lambda = -1.0
    best_schedule = ScheduleRect[]

    empty!(stats)
    stats["nb_of_iterations"] = 0
    stats["mean_solve_time"] = 0.0
    stats["total_solve_time"] = 0.0

    start_time = time_ns()
    while upper / lower > getCutoffRatio()
        task_id_hybrid_lst = AbstractString[]
        task_id_cpu_lst = AbstractString[]
        task_id_gpu_lst = AbstractString[]
        schedule = ScheduleRect[]

        # current makespan guess
        bisect = lower + (upper - lower) / 2.0
        lambda = bisect
        @debug "lb=$lb ub=$ub lambda=$bisect"

        # processors keyed by the time at which they become available
        queue_cores = PriorityQueue{CPU,Float64}()
        queue_gpus = PriorityQueue{GPU,Float64}()
        cpus = CPU[]
        gpus = GPU[]
        for i = 1:M
            cpu = CPU(0, i, 0.0)
            push!(cpus, cpu)
            enqueue!(queue_cores, cpu, cpu.lasttime)
        end
        for k = 1:K
            gpu = GPU(1, k, 0.0)
            push!(gpus, gpu)
            enqueue!(queue_gpus, gpu, gpu.lasttime)
        end
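
        # Classify every task for the current guess: a task whose GPU time
        # exceeds lambda must run on CPUs; a task whose canonical core count
        # exceeds M must run on a GPU; if both hold, lambda is infeasible.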
        for i = 1:N
            task_str = get_task_str(i)
            gpu_time = float(instance["gpudata"][task_str])
            canonical_nb_cpu = get_procs_by_lambda(instance, task_str, bisect)
            if gpu_time > bisect && canonical_nb_cpu > M
                # fits neither on a GPU nor on the CPUs: lambda is too small
                lower = bisect
                @goto iter_end
            elseif gpu_time > bisect
                push!(task_id_cpu_lst, task_str)
            elseif canonical_nb_cpu > M
                push!(task_id_gpu_lst, task_str)
            else
                push!(task_id_hybrid_lst, task_str)
            end
        end
        @debug "cpu only: $task_id_cpu_lst"
        @debug "gpu only: $task_id_gpu_lst"
        @debug "hybrid  : $task_id_hybrid_lst"

        task_id_gpu_lst = sort(task_id_gpu_lst, by = t -> get_task_gputime(instance, t))
        task_id_hybrid_lst = sort(task_id_hybrid_lst, by = t -> get_task_efficiency(instance, lambda, t))
        @debug "sorted gpu only: $task_id_gpu_lst"
        @debug "sorted hybrid  : $task_id_hybrid_lst"
        cur_gpu_work = 0.0
        while !isempty(task_id_gpu_lst) && cur_gpu_work <= bisect * K
            # schedule the task on the least loaded GPU
            cur_gpu = dequeue!(queue_gpus)
            cur_task_str = pop!(task_id_gpu_lst)
            cur_starttime = cur_gpu.lasttime
            cur_gpu.lasttime += get_task_gputime(instance, cur_task_str)
            cur_endtime = cur_gpu.lasttime
            cur_gpu_work += get_task_gputime(instance, cur_task_str)
            # add the task to the schedule
            srect = ScheduleRect(get_task_id(cur_task_str), cur_gpu.archtype, cur_gpu.pid, 1, cur_starttime, cur_endtime)
            push!(schedule, srect)
            enqueue!(queue_gpus, cur_gpu, cur_gpu.lasttime)
        end
        if !isempty(task_id_gpu_lst)
            # some GPU-only task could not be scheduled: lambda is too small
            lower = bisect
            @goto iter_end
        end

        # same as above with hybrid tasks; we do not check for leftovers here,
        # that is done while trying to schedule them on the CPUs
        while !isempty(task_id_hybrid_lst) && cur_gpu_work <= bisect * K
            cur_task_str = pop!(task_id_hybrid_lst)
            cur_gpu = dequeue!(queue_gpus)
            cur_starttime = cur_gpu.lasttime
            cur_gpu.lasttime += get_task_gputime(instance, cur_task_str)
            cur_endtime = cur_gpu.lasttime
            cur_gpu_work += get_task_gputime(instance, cur_task_str)
            # actually schedule the task
            srect = ScheduleRect(get_task_id(cur_task_str), cur_gpu.archtype, cur_gpu.pid, 1, cur_starttime, cur_endtime)
            push!(schedule, srect)
            # put the GPU back into the pool of available resources
            enqueue!(queue_gpus, cur_gpu, cur_gpu.lasttime)
        end

        # everything that did not make it onto a GPU goes to the CPUs,
        # ordered by canonical core count
        task_id_remaining = append!(task_id_cpu_lst, task_id_hybrid_lst)
        task_id_remaining = sort(task_id_remaining, by = t -> get_task_canonical_core_count(instance, lambda, t))
        @debug "remaining tasks: $task_id_remaining"
        while !isempty(task_id_remaining)
            cur_task_str = pop!(task_id_remaining)
            cur_nb_cores = get_task_canonical_core_count(instance, lambda, cur_task_str)
            cur_cores = CPU[]
            for i in 1:cur_nb_cores
                push!(cur_cores, dequeue!(queue_cores))
            end
            # the last dequeued core has the latest availability time
            cur_starttime = cur_cores[cur_nb_cores].lasttime
            cur_endtime = cur_starttime + get_time_by_procs(instance, cur_task_str, cur_nb_cores)
            if cur_endtime > 2 * bisect
                # the task would exceed the 2*lambda horizon: guess infeasible
                lower = bisect
                @goto iter_end
            end
            for i in 1:cur_nb_cores
                cur_cores[i].lasttime = cur_endtime
                enqueue!(queue_cores, cur_cores[i], cur_cores[i].lasttime)
            end

            # the allocated cores need not be contiguous; emit one schedule
            # rectangle per contiguous run of pids
            partitioned_cores = partition_cores(cur_cores)
            @debug "task: $cur_task_str"
            for i = 1:length(partitioned_cores)
                @debug "partition $i: $(partitioned_cores[i])"
                partition = partitioned_cores[i]
                first_pid = partition[1].pid
                nb_p = length(partition)
                # actually schedule the task
                srect = ScheduleRect(get_task_id(cur_task_str), partition[1].archtype, first_pid, nb_p, cur_starttime, cur_endtime)
                push!(schedule, srect)
            end
        end

        # all tasks fit within 2*lambda: the guess is feasible
        upper = bisect
        best_lambda = bisect
        best_schedule = schedule

        @label iter_end
        stats["nb_of_iterations"] += 1
    end
    end_time = time_ns()
    stats["total_solve_time"] = (end_time - start_time) / 1e9
    stats["mean_solve_time"] = stats["total_solve_time"] / stats["nb_of_iterations"]

    # makespan = latest end time over all scheduled rectangles
    cmax = maximum(r -> r.etime, best_schedule)

    @printf "best lambda: %f\n" best_lambda
    for key in keys(stats)
        @printf("%s: %s\n", key, stats[key])
    end
    @printf "bound: %f\n" (2 * best_lambda)
    @printf "makespan: %f\n" cmax

    if inst_params.jedfile != ""
        write_jedule_output(instance, best_schedule, inst_params.jedfile)
    end
    if inst_params.csvfile != ""
        write_csv_output(instance, best_schedule, inst_params.csvfile)
    end
end
# To enable the @debug messages with the stdlib Logging module:
#   ENV["JULIA_DEBUG"] = "Main"
function test_2approx()
    #infile = "/Users/sascha/work/progs/perfpack/task_sched/2015-01-imts/000-test_small_instances_opt-2015-01-12-1710/02-exp/input/instances/problem_n10_m8_k2_i0.in"
    infile = "/Users/sascha/work/progs/perfpack/task_sched/2016/000-sched_julia_lpt_changed-2016-03-09-1307/02-exp/input/instances/problem_n10_m4_k2_i9.in.bz2"
    #infile = "/Users/sascha/work/progs/perfpack/task_sched/2016/000-sched_julia_lpt_changed-2016-03-09-1307/02-exp/input/instances/problem_n1000_m512_k32_i6.in.bz2"
    instance = load_scheduling_instance(infile)
    # solve_problem_2approx takes the instance plus a ScheduleInstanceParams;
    # the constructor is defined in approx_common.jl, and the positional
    # (outfile, jedfile, csvfile) order below is an assumption based on the
    # three paths that used to be passed here directly.
    inst_params = ScheduleInstanceParams(
        "/Users/sascha/tmp/approxsched/julia.out",
        "/Users/sascha/tmp/approxsched/julia.jed",
        "/Users/sascha/tmp/approxsched/julia.csv"
    )
    solve_problem_2approx(instance, inst_params)
end
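
# Run interactively (the instance paths above are machine-specific):
#   julia> include("approx2.jl")
#   julia> test_2approx()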