Revision 7eb0dc6129f9471df56f3af48826e1d27796927c authored by Nathan Daly on 10 April 2017, 20:44:33 UTC, committed by Nathan Daly on 12 April 2017, 22:24:38 UTC
As part of #14933, added docstring and jldoctest examples to Base.operator_precedence(s::Symbol).
1 parent 31337ae
Raw File
distributed_exec.jl
# This file is a part of Julia. License is MIT: http://julialang.org/license

using Base.Test

inline_flag = Base.JLOptions().can_inline == 1 ? `` : `--inline=no`
cov_flag = ``
if Base.JLOptions().code_coverage == 1
    cov_flag = `--code-coverage=user`
elseif Base.JLOptions().code_coverage == 2
    cov_flag = `--code-coverage=all`
end

# Test a few "remote" invocations when no workers are present
@test remote(myid)() == 1
@test pmap(identity, 1:100) == [1:100...]
@test 100 == @parallel (+) for i in 1:100
        1
    end

addprocs(4; exeflags=`$cov_flag $inline_flag --check-bounds=yes --startup-file=no --depwarn=error`)

# Test remote()
let
    pool = Base.default_worker_pool()

    count = 0
    count_condition = Condition()

    function remote_wait(c)
        @async begin
            count += 1
            remote(take!)(c)
            count -= 1
            notify(count_condition)
        end
        yield()
    end

    testchannels = [RemoteChannel() for i in 1:nworkers()]
    testcount = 0
    @test isready(pool) == true
    for c in testchannels
        @test count == testcount
        remote_wait(c)
        testcount += 1
    end
    @test count == testcount
    @test isready(pool) == false

    for c in testchannels
        @test count == testcount
        put!(c, "foo")
        testcount -= 1
        wait(count_condition)
        @test count == testcount
        @test isready(pool) == true
    end

    @test count == 0

    for c in testchannels
        @test count == testcount
        remote_wait(c)
        testcount += 1
    end
    @test count == testcount
    @test isready(pool) == false

    for c in reverse(testchannels)
        @test count == testcount
        put!(c, "foo")
        testcount -= 1
        wait(count_condition)
        @test count == testcount
        @test isready(pool) == true
    end

    @test count == 0
end


id_me = myid()
id_other = filter(x -> x != id_me, procs())[rand(1:(nprocs()-1))]

# Test Futures
function testf(id)
    f=Future(id)
    @test isready(f) == false
    @test isnull(f.v) == true
    put!(f, :OK)
    @test isready(f) == true
    @test isnull(f.v) == false

    @test_throws ErrorException put!(f, :OK) # Cannot put! to a already set future
    @test_throws MethodError take!(f) # take! is unsupported on a Future

    @test fetch(f) == :OK
end

testf(id_me)
testf(id_other)

# Distributed GC tests for Futures
function test_futures_dgc(id)
    f = remotecall(myid, id)
    fid = Base.remoteref_id(f)

    # remote value should be deleted after a fetch
    @test remotecall_fetch(k->(yield();haskey(Base.Distributed.PGRP.refs, k)), id, fid) == true
    @test isnull(f.v) == true
    @test fetch(f) == id
    @test isnull(f.v) == false
    @test remotecall_fetch(k->(yield();haskey(Base.Distributed.PGRP.refs, k)), id, fid) == false


    # if unfetched, it should be deleted after a finalize
    f = remotecall(myid, id)
    fid = Base.remoteref_id(f)
    @test remotecall_fetch(k->(yield();haskey(Base.Distributed.PGRP.refs, k)), id, fid) == true
    @test isnull(f.v) == true
    finalize(f)
    Base.Distributed.flush_gc_msgs()
    @test remotecall_fetch(k->(yield();haskey(Base.Distributed.PGRP.refs, k)), id, fid) == false
end

test_futures_dgc(id_me)
test_futures_dgc(id_other)

# if sent to another worker, it should not be deleted till the other worker has fetched.
wid1 = workers()[1]
wid2 = workers()[2]
f = remotecall(myid, wid1)
fid = Base.remoteref_id(f)

fstore = RemoteChannel(wid2)
put!(fstore, f)

@test fetch(f) == wid1
@test remotecall_fetch(k->haskey(Base.Distributed.PGRP.refs, k), wid1, fid) == true
remotecall_fetch(r->fetch(fetch(r)), wid2, fstore)
@test remotecall_fetch(k->haskey(Base.Distributed.PGRP.refs, k), wid1, fid) == false

# put! should release remote reference since it would have been cached locally
f = Future(wid1)
fid = Base.remoteref_id(f)

# should not be created remotely till accessed
@test remotecall_fetch(k->haskey(Base.Distributed.PGRP.refs, k), wid1, fid) == false
# create it remotely
isready(f)

@test remotecall_fetch(k->haskey(Base.Distributed.PGRP.refs, k), wid1, fid) == true
put!(f, :OK)
@test remotecall_fetch(k->haskey(Base.Distributed.PGRP.refs, k), wid1, fid) == false
@test fetch(f) == :OK

# RemoteException should be thrown on a put! when another process has set the value
f = Future(wid1)
fid = Base.remoteref_id(f)

fstore = RemoteChannel(wid2)
put!(fstore, f) # send f to wid2
put!(f, :OK) # set value from master

@test remotecall_fetch(k->haskey(Base.Distributed.PGRP.refs, k), wid1, fid) == true

testval = remotecall_fetch(wid2, fstore) do x
    try
        put!(fetch(x), :OK)
        return 0
    catch e
        if isa(e, RemoteException)
            return 1
        else
            return 2
        end
    end
end
@test testval == 1

# Distributed GC tests for RemoteChannels
function test_remoteref_dgc(id)
    rr = RemoteChannel(id)
    put!(rr, :OK)
    rrid = Base.remoteref_id(rr)

    # remote value should be deleted after finalizing the ref
    @test remotecall_fetch(k->(yield();haskey(Base.Distributed.PGRP.refs, k)), id, rrid) == true
    @test fetch(rr) == :OK
    @test remotecall_fetch(k->(yield();haskey(Base.Distributed.PGRP.refs, k)), id, rrid) == true
    finalize(rr)
    Base.Distributed.flush_gc_msgs()
    @test remotecall_fetch(k->(yield();haskey(Base.Distributed.PGRP.refs, k)), id, rrid) == false
end
test_remoteref_dgc(id_me)
test_remoteref_dgc(id_other)

# if sent to another worker, it should not be deleted till the other worker has also finalized.
wid1 = workers()[1]
wid2 = workers()[2]
rr = RemoteChannel(wid1)
rrid = Base.remoteref_id(rr)

fstore = RemoteChannel(wid2)
put!(fstore, rr)

@test remotecall_fetch(k->haskey(Base.Distributed.PGRP.refs, k), wid1, rrid) == true
finalize(rr); Base.Distributed.flush_gc_msgs() # finalize locally
@test remotecall_fetch(k->haskey(Base.Distributed.PGRP.refs, k), wid1, rrid) == true
remotecall_fetch(r->(finalize(take!(r)); Base.Distributed.flush_gc_msgs(); nothing), wid2, fstore) # finalize remotely
sleep(0.5) # to ensure that wid2 messages have been executed on wid1
@test remotecall_fetch(k->haskey(Base.Distributed.PGRP.refs, k), wid1, rrid) == false

@test fetch(@spawnat id_other myid()) == id_other
@test (@fetchfrom id_other myid()) == id_other

pids=[]
for i in 1:nworkers()
    push!(pids, @fetch myid())
end
@test sort(pids) == sort(workers())


# test getindex on Futures and RemoteChannels
function test_indexing(rr)
    a = rand(5,5)
    put!(rr, a)
    @test rr[2,3] == a[2,3]
    @test rr[] == a
end

test_indexing(Future())
test_indexing(Future(id_other))
test_indexing(RemoteChannel())
test_indexing(RemoteChannel(id_other))

dims = (20,20,20)

if is_linux()
    S = SharedArray{Int64,3}(dims)
    @test startswith(S.segname, "/jl")
    @test !ispath("/dev/shm" * S.segname)

    S = SharedArray{Int64,3}(dims; pids=[id_other])
    @test startswith(S.segname, "/jl")
    @test !ispath("/dev/shm" * S.segname)
end

# TODO : Need a similar test of shmem cleanup for OSX

##### SharedArray tests

function check_pids_all(S::SharedArray)
    pidtested = falses(size(S))
    for p in procs(S)
        idxes_in_p = remotecall_fetch(p, S) do D
            parentindexes(D.loc_subarr_1d)[1]
        end
        @test all(sdata(S)[idxes_in_p] .== p)
        pidtested[idxes_in_p] = true
    end
    @test all(pidtested)
end

d = Base.shmem_rand(1:100, dims)
a = convert(Array, d)

partsums = Array{Int}(length(procs(d)))
@sync begin
    for (i, p) in enumerate(procs(d))
        @async partsums[i] = remotecall_fetch(p, d) do D
            sum(D.loc_subarr_1d)
        end
    end
end
@test sum(a) == sum(partsums)

d = Base.shmem_rand(dims)
for p in procs(d)
    idxes_in_p = remotecall_fetch(p, d) do D
        parentindexes(D.loc_subarr_1d)[1]
    end
    idxf = first(idxes_in_p)
    idxl = last(idxes_in_p)
    d[idxf] = Float64(idxf)
    rv = remotecall_fetch(p, d,idxf,idxl) do D,idxf,idxl
        assert(D[idxf] == Float64(idxf))
        D[idxl] = Float64(idxl)
        D[idxl]
    end
    @test d[idxl] == rv
end

@test ones(10, 10, 10) == Base.shmem_fill(1.0, (10,10,10))
@test zeros(Int32, 10, 10, 10) == Base.shmem_fill(0, (10,10,10))

d = Base.shmem_rand(dims)
s = Base.shmem_rand(dims)
copy!(s, d)
@test s == d
s = Base.shmem_rand(dims)
copy!(s, sdata(d))
@test s == d
a = rand(dims)
@test sdata(a) == a

d = SharedArray{Int}(dims, init = D->fill!(D.loc_subarr_1d, myid()))
for p in procs(d)
    idxes_in_p = remotecall_fetch(p, d) do D
        parentindexes(D.loc_subarr_1d)[1]
    end
    idxf = first(idxes_in_p)
    idxl = last(idxes_in_p)
    @test d[idxf] == p
    @test d[idxl] == p
end

d = @inferred(SharedArray{Float64,2}((2,3)))
@test isa(d[:,2], Vector{Float64})

### SharedArrays from a file

# Mapping an existing file
fn = tempname()
write(fn, 1:30)
sz = (6,5)
Atrue = reshape(1:30, sz)

S = @inferred(SharedArray{Int,2}(fn, sz))
@test S == Atrue
@test length(procs(S)) > 1
@sync begin
    for p in procs(S)
        @async remotecall_wait(p, S) do D
            fill!(D.loc_subarr_1d, myid())
        end
    end
end
check_pids_all(S)

filedata = similar(Atrue)
read!(fn, filedata)
@test filedata == sdata(S)
finalize(S)

# Error for write-only files
@test_throws ArgumentError SharedArray{Int,2}(fn, sz, mode="w")

# Error for file doesn't exist, but not allowed to create
@test_throws ArgumentError SharedArray{Int,2}(joinpath(tempdir(),randstring()), sz, mode="r")

# Creating a new file
fn2 = tempname()
S = SharedArray{Int,2}(fn2, sz, init=D->D[localindexes(D)] = myid())
@test S == filedata
filedata2 = similar(Atrue)
read!(fn2, filedata2)
@test filedata == filedata2
finalize(S)

# Appending to a file
fn3 = tempname()
write(fn3, ones(UInt8, 4))
S = SharedArray{UInt8}(fn3, sz, 4, mode="a+", init=D->D[localindexes(D)]=0x02)
len = prod(sz)+4
@test filesize(fn3) == len
filedata = Array{UInt8}(len)
read!(fn3, filedata)
@test all(filedata[1:4] .== 0x01)
@test all(filedata[5:end] .== 0x02)
finalize(S)

# call gc 3 times to avoid unlink: operation not permitted (EPERM) on Windows
S = nothing
@everywhere gc()
@everywhere gc()
@everywhere gc()
rm(fn); rm(fn2); rm(fn3)

### Utility functions

# construct PR #13514
S = @inferred(SharedArray{Int}((1,2,3)))
@test size(S) == (1,2,3)
@test typeof(S) <: SharedArray{Int}
S = @inferred(SharedArray{Int}(2))
@test size(S) == (2,)
@test typeof(S) <: SharedArray{Int}
S = @inferred(SharedArray{Int}(1,2))
@test size(S) == (1,2)
@test typeof(S) <: SharedArray{Int}
S = @inferred(SharedArray{Int}(1,2,3))
@test size(S) == (1,2,3)
@test typeof(S) <: SharedArray{Int}

# reshape

d = Base.shmem_fill(1.0, (10,10,10))
@test ones(100, 10) == reshape(d,(100,10))
d = Base.shmem_fill(1.0, (10,10,10))
@test_throws DimensionMismatch reshape(d,(50,))

# rand, randn
d = Base.shmem_rand(dims)
@test size(rand!(d)) == dims
d = Base.shmem_fill(1.0, dims)
@test size(randn!(d)) == dims

# similar
d = Base.shmem_rand(dims)
@test size(similar(d, Complex128)) == dims
@test size(similar(d, dims)) == dims

# issue #6362
d = Base.shmem_rand(dims)
s = copy(sdata(d))
ds = deepcopy(d)
@test ds == d
pids_d = procs(d)
remotecall_fetch(setindex!, pids_d[findfirst(id->(id != myid()), pids_d)], d, 1.0, 1:10)
@test ds != d
@test s != d
copy!(d, s)
@everywhere setid!(A) = A[localindexes(A)] = myid()
@sync for p in procs(ds)
    @async remotecall_wait(setid!, p, ds)
end
@test d == s
@test ds != s
@test first(ds) == first(procs(ds))
@test last(ds)  ==  last(procs(ds))


# SharedArray as an array
# Since the data in d will depend on the nprocs, just test that these operations work
a = d[1:5]
@test_throws BoundsError d[-1:5]
a = d[1,1,1:3:end]
d[2:4] = 7
d[5,1:2:4,8] = 19

AA = rand(4,2)
A = @inferred(convert(SharedArray, AA))
B = @inferred(convert(SharedArray, AA'))
@test B*A == ctranspose(AA)*AA

d=SharedArray{Int64,2}((10,10); init = D->fill!(D.loc_subarr_1d, myid()), pids=[id_me, id_other])
d2 = map(x->1, d)
@test reduce(+, d2) == 100

@test reduce(+, d) == ((50*id_me) + (50*id_other))
map!(x->1, d, d)
@test reduce(+, d) == 100

@test fill!(d, 1) == ones(10, 10)
@test fill!(d, 2.) == fill(2, 10, 10)
@test d[:] == fill(2, 100)
@test d[:,1] == fill(2, 10)
@test d[1,:] == fill(2, 10)

# Boundary cases where length(S) <= length(pids)
@test 2.0 == remotecall_fetch(D->D[2], id_other, Base.shmem_fill(2.0, 2; pids=[id_me, id_other]))
@test 3.0 == remotecall_fetch(D->D[1], id_other, Base.shmem_fill(3.0, 1; pids=[id_me, id_other]))

# Shared arrays of singleton immutables
@everywhere struct ShmemFoo end
for T in [Void, ShmemFoo]
    s = @inferred(SharedArray{T}(10))
    @test T() === remotecall_fetch(x->x[3], workers()[1], s)
end

# Issue #14664
d = SharedArray{Int}(10)
@sync @parallel for i=1:10
    d[i] = i
end

for (x,i) in enumerate(d)
    @test x == i
end

# complex
sd = SharedArray{Int}(10)
se = SharedArray{Int}(10)
@sync @parallel for i=1:10
    sd[i] = i
    se[i] = i
end
sc = convert(SharedArray, complex.(sd,se))
for (x,i) in enumerate(sc)
    @test i == complex(x,x)
end

# Once finalized accessing remote references and shared arrays should result in exceptions.
function finalize_and_test(r)
    finalize(r)
    @test_throws ErrorException fetch(r)
end

for id in [id_me, id_other]
    finalize_and_test(Future(id))
    finalize_and_test((r=Future(id); put!(r, 1); r))
    finalize_and_test(RemoteChannel(id))
    finalize_and_test((r=RemoteChannel(id); put!(r, 1); r))
end

d = SharedArray{Int}(10)
finalize(d)
@test_throws BoundsError d[1]


# Test @parallel load balancing - all processors should get either M or M+1
# iterations out of the loop range for some M.
ids = @parallel((a,b)->[a;b], for i=1:7; myid(); end)
workloads = Int[sum(ids .== i) for i in 2:nprocs()]
@test maximum(workloads) - minimum(workloads) <= 1

# @parallel reduction should work even with very short ranges
@test @parallel(+, for i=1:2; i; end) == 3

@test_throws ArgumentError sleep(-1)
@test_throws ArgumentError timedwait(()->false, 0.1, pollint=-0.5)

# specify pids for pmap
@test sort(workers()[1:2]) == sort(unique(pmap(WorkerPool(workers()[1:2]), x->(sleep(0.1);myid()), 1:10)))

# Testing buffered  and unbuffered reads
# This large array should write directly to the socket
a = ones(10^6)
@test a == remotecall_fetch((x)->x, id_other, a)

# Not a bitstype, should be buffered
s = [randstring() for x in 1:10^5]
@test s == remotecall_fetch((x)->x, id_other, s)

#large number of small requests
num_small_requests = 10000
@test fill(id_other, num_small_requests) == [remotecall_fetch(myid, id_other) for i in 1:num_small_requests]

# test parallel sends of large arrays from multiple tasks to the same remote worker
ntasks = 10
rr_list = [Channel(1) for x in 1:ntasks]

for rr in rr_list
    let rr=rr
        @async try
            for i in 1:10
                a = rand(2*10^5)
                @test a == remotecall_fetch(x->x, id_other, a)
                yield()
            end
            put!(rr, :OK)
        catch
            put!(rr, :ERROR)
        end
    end
end

@test [fetch(rr) for rr in rr_list] == [:OK for x in 1:ntasks]

function test_channel(c)
    put!(c, 1)
    put!(c, "Hello")
    put!(c, 5.0)

    @test isready(c) == true
    @test fetch(c) == 1
    @test fetch(c) == 1   # Should not have been popped previously
    @test take!(c) == 1
    @test take!(c) == "Hello"
    @test fetch(c) == 5.0
    @test take!(c) == 5.0
    @test isready(c) == false
    close(c)
end

test_channel(Channel(10))
test_channel(RemoteChannel(()->Channel(10)))

c=Channel{Int}(1)
@test_throws MethodError put!(c, "Hello")

# test channel iterations
function test_iteration(in_c, out_c)
    t=@schedule for v in in_c
        put!(out_c, v)
    end

    isa(in_c, Channel) && @test isopen(in_c) == true
    put!(in_c, 1)
    @test take!(out_c) == 1
    put!(in_c, "Hello")
    close(in_c)
    @test take!(out_c) == "Hello"
    isa(in_c, Channel) && @test isopen(in_c) == false
    @test_throws InvalidStateException put!(in_c, :foo)
    yield()
    @test istaskdone(t) == true
end

test_iteration(Channel(10), Channel(10))
# make sure exceptions propagate when waiting on Tasks
@test_throws CompositeException (@sync (@async error("oops")))
try
    @sync begin
        for i in 1:5
            @async error(i)
        end
    end
    error("unexpected")
catch ex
    @test typeof(ex) == CompositeException
    @test length(ex) == 5
    @test typeof(ex.exceptions[1]) == CapturedException
    @test typeof(ex.exceptions[1].ex) == ErrorException
    errors = map(x->x.ex.msg, ex.exceptions)
    @test collect(1:5) == sort(map(x->parse(Int, x), errors))
end

function test_remoteexception_thrown(expr)
    try
        expr()
        error("unexpected")
    catch ex
        @test typeof(ex) == RemoteException
        @test typeof(ex.captured) == CapturedException
        @test typeof(ex.captured.ex) == ErrorException
        @test ex.captured.ex.msg == "foobar"
    end
end

for id in [id_other, id_me]
    test_remoteexception_thrown() do
        remotecall_fetch(id) do
            throw(ErrorException("foobar"))
        end
    end
    test_remoteexception_thrown() do
        remotecall_wait(id) do
            throw(ErrorException("foobar"))
        end
    end
    test_remoteexception_thrown() do
        wait(remotecall(id) do
            throw(ErrorException("foobar"))
        end)
    end
end

# make sure the stackframe from the remote error can be serialized
let ex
    try
        remotecall_fetch(id_other) do
            @eval module AModuleLocalToOther
                foo() = throw(ErrorException("A.error"))
                foo()
            end
        end
    catch ex
    end
    @test (ex::RemoteException).pid == id_other
    @test ((ex.captured::CapturedException).ex::ErrorException).msg == "A.error"
    bt = ex.captured.processed_bt::Array{Any,1}
    @test length(bt) > 1
    frame, repeated = bt[1]::Tuple{StackFrame, Int}
    @test frame.func == :foo
    @test isnull(frame.linfo)
    @test repeated == 1
end

# pmap tests. Needs at least 4 processors dedicated to the below tests. Which we currently have
# since the distributed tests are now spawned as a separate set.

# Test all combinations of pmap keyword args.
pmap_args = [
                (:distributed, [:default, false]),
                (:batch_size, [:default,2]),
                (:on_error, [:default, e -> (e.msg == "foobar" ? true : rethrow(e))]),
                (:retry_delays, [:default, fill(0.001, 1000)]),
                (:retry_check, [:default, (s,e) -> (s,endswith(e.msg,"foobar"))]),
            ]

kwdict = Dict()
function walk_args(i)
    if i > length(pmap_args)
        kwargs = []
        for (k,v) in kwdict
            if v !== :default
                push!(kwargs, (k,v))
            end
        end

        data = 1:100

        testw = kwdict[:distributed] === false ? [1] : workers()

        if kwdict[:retry_delays] !== :default
            mapf = x -> iseven(myid()) ? error("notfoobar") : (x*2, myid())
            results_test = pmap_res -> begin
                results = [x[1] for x in pmap_res]
                pids = [x[2] for x in pmap_res]
                @test results == [2:2:200...]
                for p in testw
                    if isodd(p)
                        @test p in pids
                    else
                        @test !(p in pids)
                    end
                end
            end
        elseif kwdict[:on_error] === :default
            mapf = x -> (x*2, myid())
            results_test = pmap_res -> begin
                results = [x[1] for x in pmap_res]
                pids = [x[2] for x in pmap_res]
                @test results == [2:2:200...]
                for p in testw
                    @test p in pids
                end
            end
        else
            mapf = x -> iseven(x) ? error("foobar") : (x*2, myid())
            results_test = pmap_res -> begin
                w = testw
                for (idx,x) in enumerate(data)
                    if iseven(x)
                        @test pmap_res[idx] == true
                    else
                        @test pmap_res[idx][1] == x*2
                        @test pmap_res[idx][2] in w
                    end
                end
            end
        end

        try
            results_test(pmap(mapf, data; kwargs...))
        catch e
            println("pmap executing with args : ", kwargs)
            rethrow(e)
        end

        return
    end

    kwdict[pmap_args[i][1]] = pmap_args[i][2][1]
    walk_args(i+1)

    kwdict[pmap_args[i][1]] = pmap_args[i][2][2]
    walk_args(i+1)
end

# Start test for various kw arg combinations
walk_args(1)

# Simple test for pmap throws error
error_thrown = false
try
    pmap(x -> x==50 ? error("foobar") : x, 1:100)
catch e
    @test e.captured.ex.msg == "foobar"
    error_thrown = true
end
@test error_thrown

# Test pmap with a generator type iterator
@test [1:100...] == pmap(x->x, Base.Generator(x->(sleep(0.0001); x), 1:100))

# Test pgenerate
n = 10
as = [rand(4,4) for i in 1:n]
bs = deepcopy(as)
cs = collect(Base.Distributed.pgenerate(x->(sleep(rand()*0.1); svdfact(x)), bs))
svdas = map(svdfact, as)
for i in 1:n
    @test cs[i][:U] ≈ svdas[i][:U]
    @test cs[i][:S] ≈ svdas[i][:S]
    @test cs[i][:V] ≈ svdas[i][:V]
end

# Test asyncmap
@test allunique(asyncmap(x->(sleep(1.0);object_id(current_task())), 1:10))

# num tasks
@test length(unique(asyncmap(x->(yield();object_id(current_task())), 1:20; ntasks=5))) == 5

# default num tasks
@test length(unique(asyncmap(x->(yield();object_id(current_task())), 1:200))) == 100

# ntasks as a function
let nt=0
    global nt_func
    nt_func() = (v=div(nt, 25); nt+=1; v)  # increment number of tasks by 1 for every 25th call.
                                           # nt_func() will be called initally once and then for every
                                           # iteration
end
@test length(unique(asyncmap(x->(yield();object_id(current_task())), 1:200; ntasks=nt_func))) == 7

# batch mode tests
let ctr=0
    global next_ctr
    next_ctr() = (ctr+=1; ctr)
end
resp = asyncmap(x->(v=next_ctr(); map(_->v, x)), 1:22; ntasks=5, batch_size=5)
@test length(resp) == 22
@test length(unique(resp)) == 5

input = rand(1:1000, 100)
@test asyncmap(x->map(args->identity(args...), x), input; ntasks=5, batch_size=5) == input

# check whether shape is retained
a=rand(2,2)
b=asyncmap(identity, a)
@test a == b
@test size(a) == size(b)

# check with an iterator that does not implement size()
c=Channel(32); foreach(i->put!(c,i), 1:10); close(c)
b=asyncmap(identity, c)
@test Int[1:10...] == b
@test size(b) == (10,)

# check with an iterator that has only implements length()
len_only_iterable = (1,2,3,4,5)
@test Base.iteratorsize(len_only_iterable) == Base.HasLength()
@test asyncmap(identity, len_only_iterable) == map(identity, len_only_iterable)

# Error conditions
@test_throws ArgumentError asyncmap(identity, 1:10; batch_size=0)
@test_throws ArgumentError asyncmap(identity, 1:10; batch_size="10")
@test_throws ArgumentError asyncmap(identity, 1:10; ntasks="10")

# asyncmap and pmap with various types. Test for equivalence with map
function testmap_equivalence(f, c...)
    x1 = asyncmap(f,c...)
    x2 = pmap(f,c...)
    x3 = map(f,c...)

    if Base.iteratorsize == Base.HasShape()
        @test size(x1) == size(x3)
        @test size(x2) == size(x3)
    else
        @test length(x1) == length(x3)
        @test length(x2) == length(x3)
    end

    @test eltype(x1) == eltype(x3)
    @test eltype(x2) == eltype(x3)

    for (v1,v2) in zip(x1,x3)
        @test v1==v2
    end
    for (v1,v2) in zip(x2,x3)
        @test v1==v2
    end
end

testmap_equivalence(identity, (1,2,3,4))
testmap_equivalence(x->x>0?1.0:0.0, sparse(eye(5)))
testmap_equivalence((x,y,z)->x+y+z, 1,2,3)
testmap_equivalence(x->x?false:true, BitArray(10,10))
testmap_equivalence(x->"foobar", BitArray(10,10))
testmap_equivalence((x,y,z)->string(x,y,z), BitArray(10), ones(10), "1234567890")

@test asyncmap(uppercase, "Hello World!") == map(uppercase, "Hello World!")
@test pmap(uppercase, "Hello World!") == map(uppercase, "Hello World!")


# Test that the default worker pool cycles through all workers
pmap(_->myid(), 1:nworkers())  # priming run
@test nworkers() == length(unique(pmap(_->myid(), 1:100)))

# Test same behaviour when executed on a worker
@test nworkers() == length(unique(remotecall_fetch(()->pmap(_->myid(), 1:100), id_other)))

# Same tests with custom worker pools.
wp = WorkerPool(workers())
@test nworkers() == length(unique(pmap(wp, _->myid(), 1:100)))
@test nworkers() == length(unique(remotecall_fetch(wp->pmap(wp, _->myid(), 1:100), id_other, wp)))


# CachingPool tests
wp = CachingPool(workers())
@test [1:100...] == pmap(wp, x->x, 1:100)

clear!(wp)
@test length(wp.map_obj2ref) == 0


# The below block of tests are usually run only on local development systems, since:
# - tests which print errors
# - addprocs tests are memory intensive
# - ssh addprocs requires sshd to be running locally with passwordless login enabled.
# The test block is enabled by defining env JULIA_TESTFULL=1

DoFullTest = Bool(parse(Int,(get(ENV, "JULIA_TESTFULL", "0"))))

if DoFullTest
    # Topology tests need to run externally since a given cluster at any
    # time can only support a single topology and the current session
    # is already running in parallel under the default topology.
    script = joinpath(dirname(@__FILE__), "topology.jl")
    cmd = `$(Base.julia_cmd()) $script`

    (strm, proc) = open(pipeline(cmd, stderr=STDERR))
    wait(proc)
    if !success(proc) && ccall(:jl_running_on_valgrind,Cint,()) == 0
        println(readstring(strm))
        error("Topology tests failed : $cmd")
    end

    println("Testing exception printing on remote worker from a `remote_do` call")
    println("Please ensure the remote error and backtrace is displayed on screen")

    remote_do(id_other) do
        throw(ErrorException("TESTING EXCEPTION ON REMOTE DO. PLEASE IGNORE"))
    end
    sleep(0.5)  # Give some time for the above error to be printed

    println("\n\nThe following 'invalid connection credentials' error messages are to be ignored.")
    all_w = workers()
    # Test sending fake data to workers. The worker processes will print an
    # error message but should not terminate.
    for w in Base.Distributed.PGRP.workers
        if isa(w, Base.Distributed.Worker)
            s = connect(get(w.config.host), get(w.config.port))
            write(s, randstring(32))
        end
    end
    @test workers() == all_w
    @test all([p == remotecall_fetch(myid, p) for p in all_w])

if is_unix() # aka have ssh
    function test_n_remove_pids(new_pids)
        for p in new_pids
            w_in_remote = sort(remotecall_fetch(workers, p))
            try
                @test intersect(new_pids, w_in_remote) == new_pids
            catch e
                print("p       :     $p\n")
                print("newpids :     $new_pids\n")
                print("w_in_remote : $w_in_remote\n")
                print("intersect   : $(intersect(new_pids, w_in_remote))\n\n\n")
                rethrow(e)
            end
        end

        remotecall_fetch(plst->rmprocs(plst; waitfor=5.0), 1, new_pids)
    end

    print("\n\nTesting SSHManager. A minimum of 4GB of RAM is recommended.\n")
    print("Please ensure sshd is running locally with passwordless login enabled.\n")

    sshflags = `-o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no -o LogLevel=ERROR `
    #Issue #9951
    hosts=[]
    localhost_aliases = ["localhost", string(getipaddr()), "127.0.0.1"]
    num_workers = parse(Int,(get(ENV, "JULIA_ADDPROCS_NUM", "9")))

    for i in 1:(num_workers/length(localhost_aliases))
        append!(hosts, localhost_aliases)
    end

    print("\nTesting SSH addprocs with $(length(hosts)) workers...\n")
    new_pids = remotecall_fetch(1, hosts, sshflags) do h, sf
        addprocs(h; sshflags=sf)
    end
    @test length(new_pids) == length(hosts)
    test_n_remove_pids(new_pids)

    print("\nMixed ssh addprocs with :auto\n")
    new_pids = sort(remotecall_fetch(1, ["localhost", ("127.0.0.1", :auto), "localhost"], sshflags) do h, sf
        addprocs(h; sshflags=sf)
    end)
    @test length(new_pids) == (2 + Sys.CPU_CORES)
    test_n_remove_pids(new_pids)

    print("\nMixed ssh addprocs with numeric counts\n")
    new_pids = sort(remotecall_fetch(1, [("localhost", 2), ("127.0.0.1", 2), "localhost"], sshflags) do h, sf
        addprocs(h; sshflags=sf)
    end)
    @test length(new_pids) == 5
    test_n_remove_pids(new_pids)

    print("\nssh addprocs with tunnel\n")
    new_pids = sort(remotecall_fetch(1, [("localhost", num_workers)], sshflags) do h, sf
        addprocs(h; tunnel=true, sshflags=sf)
    end)
    @test length(new_pids) == num_workers
    test_n_remove_pids(new_pids)
end # unix-only
end # full-test

let t = @task 42
    schedule(t, ErrorException(""), error=true)
    @test_throws ErrorException wait(t)
end

# issue #8207
let A = Any[]
    @parallel (+) for i in (push!(A,1); 1:2)
        i
    end
    @test length(A) == 1
end

# issue #13168
function f13168(n)
    val = 0
    for i=1:n val+=sum(rand(n,n)^2) end
    val
end
let t = schedule(@task f13168(100))
    @test t.state == :queued
    @test_throws ErrorException schedule(t)
    yield()
    @test t.state == :done
    @test_throws ErrorException schedule(t)
    @test isa(wait(t),Float64)
end

# issue #13122
@test remotecall_fetch(identity, workers()[1], C_NULL) === C_NULL

# issue #11062
function t11062()
    @async v11062 = 1
    v11062 = 2
end

@test t11062() == 2

# issue #15406
v15406 = remotecall_wait(() -> 1, id_other)
fetch(v15406)
remotecall_wait(fetch, id_other, v15406)

# Test various forms of remotecall* invocations

@everywhere f_args(v1, v2=0; kw1=0, kw2=0) = v1+v2+kw1+kw2

function test_f_args(result, args...; kwargs...)
    @test fetch(remotecall(args...; kwargs...)) == result
    @test fetch(remotecall_wait(args...; kwargs...)) == result
    @test remotecall_fetch(args...; kwargs...) == result

    # A visual test - remote_do should NOT print any errors
    remote_do(args...; kwargs...)
end

for tid in [id_other, id_me, Base.default_worker_pool()]
    test_f_args(1, f_args, tid, 1)
    test_f_args(3, f_args, tid, 1, 2)
    test_f_args(5, f_args, tid, 1; kw1=4)
    test_f_args(13, f_args, tid, 1; kw1=4, kw2=8)
    test_f_args(15, f_args, tid, 1, 2; kw1=4, kw2=8)
end

# Test remote_do
f=Future(id_me)
remote_do(fut->put!(fut, myid()), id_me, f)
@test fetch(f) == id_me

f=Future(id_other)
remote_do(fut->put!(fut, myid()), id_other, f)
@test fetch(f) == id_other

# github PR #14456
n = DoFullTest ? 6 : 5
for i = 1:10^n
    fetch(@spawnat myid() myid())
end

# issue #15451
@test remotecall_fetch(x->(y->2y)(x)+1, workers()[1], 3) == 7

# issue #16091
mutable struct T16091 end
wid = workers()[1]
@test try
    remotecall_fetch(()->T16091, wid)
    false
catch ex
    ((ex::RemoteException).captured::CapturedException).ex === UndefVarError(:T16091)
end
@test try
    remotecall_fetch(identity, wid, T16091)
    false
catch ex
    ((ex::RemoteException).captured::CapturedException).ex === UndefVarError(:T16091)
end

f16091a() = 1
remotecall_fetch(()->eval(:(f16091a() = 2)), wid)
@test remotecall_fetch(f16091a, wid) === 2
@test remotecall_fetch((myid)->remotecall_fetch(f16091a, myid), wid, myid()) === 1

# these will only heisen-fail, since it depends on the gensym counter collisions:
f16091b = () -> 1
remotecall_fetch(()->eval(:(f16091b = () -> 2)), wid)
@test remotecall_fetch(f16091b, 2) === 1
# Global anonymous functions are over-written...
@test remotecall_fetch((myid)->remotecall_fetch(f16091b, myid), wid, myid()) === 1

# ...while local anonymous functions are by definition, local.
let
    f16091c = () -> 1
    @test remotecall_fetch(f16091c, 2) === 1
    @test remotecall_fetch(
        myid -> begin
            let
                f16091c = () -> 2
                remotecall_fetch(f16091c, myid)
            end
        end, wid, myid()) === 2
end

# issue #16451
rng=RandomDevice()
retval = @parallel (+) for _ in 1:10
    rand(rng)
end
@test retval > 0.0 && retval < 10.0

rand(rng)
retval = @parallel (+) for _ in 1:10
    rand(rng)
end
@test retval > 0.0 && retval < 10.0

# serialization tests
wrkr1 = workers()[1]
wrkr2 = workers()[end]

@test remotecall_fetch(p->remotecall_fetch(myid, p), wrkr1, wrkr2) == wrkr2

# Send f to wrkr1 and wrkr2. Then try calling f on wrkr2 from wrkr1
f_myid = ()->myid()
@test wrkr1 == remotecall_fetch(f_myid, wrkr1)
@test wrkr2 == remotecall_fetch(f_myid, wrkr2)
@test wrkr2 == remotecall_fetch((f, p)->remotecall_fetch(f, p), wrkr1, f_myid, wrkr2)

# Deserialization error recovery test
# locally defined module, but unavailable on workers
module LocalFoo
    global foo=1
end

let
    @test_throws RemoteException remotecall_fetch(()->LocalFoo.foo, 2)

    bad_thunk = ()->NonexistantModule.f()
    @test_throws RemoteException remotecall_fetch(bad_thunk, 2)

    # Test that the stream is still usable
    @test remotecall_fetch(()->:test,2) == :test
    ref = remotecall(bad_thunk, 2)
    @test_throws RemoteException fetch(ref)
end

# Test calling @everywhere from a module not defined on the workers
module LocalBar
    bar() = @everywhere new_bar()=myid()
end
LocalBar.bar()
for p in procs()
    @test p == remotecall_fetch(new_bar, p)
end

# Test addprocs enable_threaded_blas parameter

const get_num_threads = function() # anonymous so it will be serialized when called
    blas = BLAS.vendor()
    # Wrap in a try to catch unsupported blas versions
    try
        if blas == :openblas
            return ccall((:openblas_get_num_threads, Base.libblas_name), Cint, ())
        elseif blas == :openblas64
            return ccall((:openblas_get_num_threads64_, Base.libblas_name), Cint, ())
        elseif blas == :mkl
            return ccall((:MKL_Get_Max_Num_Threads, Base.libblas_name), Cint, ())
        end

        # OSX BLAS looks at an environment variable
        if is_apple()
            return ENV["VECLIB_MAXIMUM_THREADS"]
        end
    end

    return nothing
end

function get_remote_num_threads(processes_added)
    return [remotecall_fetch(get_num_threads, proc_id) for proc_id in processes_added]
end

function test_blas_config(pid, expected)
    for worker in Base.Distributed.PGRP.workers
        if worker.id == pid
            @test get(worker.config.enable_threaded_blas) == expected
            return
        end
    end
end

function test_add_procs_threaded_blas()
    if get_num_threads() === nothing
        warn("Skipping blas num threads tests due to unsupported blas version")
        return
    end
    master_blas_thread_count = get_num_threads()

    # Test with default enable_threaded_blas false
    processes_added = addprocs(2)
    for proc_id in processes_added
        test_blas_config(proc_id, false)
    end

    # Master thread should not have changed
    @test get_num_threads() == master_blas_thread_count

    # Threading disabled in children by default
    thread_counts_by_process = get_remote_num_threads(processes_added)
    for thread_count in thread_counts_by_process
        @test thread_count == 1
    end
    rmprocs(processes_added)

    processes_added = addprocs(2, enable_threaded_blas=true)
    for proc_id in processes_added
        test_blas_config(proc_id, true)
    end

    @test get_num_threads() == master_blas_thread_count

    # BLAS.set_num_threads(`num`) doesn't  cause get_num_threads to return `num`
    # depending on the machine, the BLAS version, and BLAS configuration, so
    # we need a very lenient test.
    thread_counts_by_process = get_remote_num_threads(processes_added)
    for thread_count in thread_counts_by_process
        @test thread_count >= 1
    end
    rmprocs(processes_added)
end
test_add_procs_threaded_blas()

#19687
# ensure no race conditions between rmprocs and addprocs
for i in 1:5
    p = addprocs(1)[1]
    @spawnat p sleep(5)
    rmprocs(p; waitfor=0)
end

# Test if a wait has been called on rmprocs(...;waitfor=0), further remotecalls
# don't throw errors.
for i in 1:5
    p = addprocs(1)[1]
    np = nprocs()
    @spawnat p sleep(5)
    wait(rmprocs(p; waitfor=0))
    for pid in procs()
        @test pid == remotecall_fetch(myid, pid)
    end
    @test nprocs() == np - 1
end

# Test that an exception is thrown if workers are unable to be removed within requested time.
if DoFullTest
    pids=addprocs(4);
    @test_throws ErrorException rmprocs(pids; waitfor=0.001);
    # wait for workers to be removed
    while any(x -> (x in procs()), pids)
        sleep(0.1)
    end
end

# Test addprocs/rmprocs from master node only
for f in [ ()->addprocs(1), ()->rmprocs(workers()) ]
    try
        remotecall_fetch(f, id_other)
        error("Unexpected")
    catch ex
        @test isa(ex, RemoteException)
        @test ex.captured.ex.msg == "Only process 1 can add and remove workers"
    end
end

# Test the following addprocs error conditions
# - invalid host name - github issue #20372
# - julia exe exiting with an error
# - timeout reading host:port from worker STDOUT
# - host:port not found in worker STDOUT in the first 1000 lines

struct ErrorSimulator <: ClusterManager
    mode
end

function Base.launch(manager::ErrorSimulator, params::Dict, launched::Array, c::Condition)
    exename = params[:exename]
    dir = params[:dir]

    cmd = `$(Base.julia_cmd(exename)) --startup-file=no`
    if manager.mode == :timeout
        cmd = `$cmd -e "sleep(10)"`
    elseif manager.mode == :ntries
        cmd = `$cmd -e "[println(x) for x in 1:1001]"`
    elseif manager.mode == :exit
        cmd = `$cmd -e "exit(-1)"`
    else
        error("Unknown mode")
    end
    io, pobj = open(pipeline(detach(setenv(cmd, dir=dir)); stderr=STDERR), "r")

    wconfig = WorkerConfig()
    wconfig.process = pobj
    wconfig.io = io
    push!(launched, wconfig)
    notify(c)
end

testruns = Any[]

if DoFullTest
    append!(testruns, [(()->addprocs(["errorhost20372"]), "Unable to read host:port string from worker. Launch command exited with error?", ())])
end

append!(testruns, [
    (()->addprocs(ErrorSimulator(:exit)), "Unable to read host:port string from worker. Launch command exited with error?", ()),
    (()->addprocs(ErrorSimulator(:ntries)), "Unexpected output from worker launch command. Host:port string not found.", ()),
    (()->addprocs(ErrorSimulator(:timeout)), "Timed out waiting to read host:port string from worker.", ("JULIA_WORKER_TIMEOUT"=>"1",))
])

for (addp_testf, expected_errstr, env) in testruns
    try
        withenv(env...) do
            addp_testf()
        end
        error("Unexpected")
    catch ex
        @test isa(ex, CompositeException)
        @test ex.exceptions[1].ex.msg == expected_errstr
    end
end


# Auto serialization of globals from Main.
# bitstypes
global v1 = 1
@test remotecall_fetch(()->v1, id_other) == v1
@test remotecall_fetch(()->isdefined(Main, :v1), id_other)
for i in 2:5
    global v1 = i
    @test remotecall_fetch(()->v1, id_other) == i
end

# non-bitstypes
global v2 = zeros(10)
for i in 1:5
    v2[i] = i
    @test remotecall_fetch(()->v2, id_other) == v2
end

# Test that a global is not being repeatedly serialized when
# a) referenced multiple times in the closure
# b) hash value has not changed.

@everywhere begin
    global testsercnt_d = Dict()
    mutable struct TestSerCnt
        v
    end
    import Base.hash, Base.==
    hash(x::TestSerCnt, h::UInt) = hash(hash(x.v), h)
    ==(x1::TestSerCnt, x2::TestSerCnt) = (x1.v == x2.v)

    function Base.serialize(s::AbstractSerializer, t::TestSerCnt)
        Base.Serializer.serialize_type(s, TestSerCnt)
        serialize(s, t.v)
        global testsercnt_d
        cnt = get!(testsercnt_d, object_id(t), 0)
        testsercnt_d[object_id(t)] = cnt+1
    end

    Base.deserialize(s::AbstractSerializer, ::Type{TestSerCnt}) = TestSerCnt(deserialize(s))
end

# hash value of tsc is not changed
global tsc = TestSerCnt(zeros(10))
for i in 1:5
    remotecall_fetch(()->tsc, id_other)
end
# should have been serialized only once
@test testsercnt_d[object_id(tsc)] == 1

# hash values are changed
n=5
testsercnt_d[object_id(tsc)] = 0
for i in 1:n
    tsc.v[i] = i
    remotecall_fetch(()->tsc, id_other)
end
# should have been serialized as many times as the loop
@test testsercnt_d[object_id(tsc)] == n

# Multiple references in a closure should be serialized only once.
global mrefs = TestSerCnt(ones(10))
@test remotecall_fetch(()->(mrefs.v, 2*mrefs.v, 3*mrefs.v), id_other) == (ones(10), 2*ones(10), 3*ones(10))
@test testsercnt_d[object_id(mrefs)] == 1


# nested anon functions
global f1 = x->x
global f2 = x->f1(x)
v = rand()
@test remotecall_fetch(f2, id_other, v) == v
@test remotecall_fetch(x->f2(x), id_other, v) == v

# consts
const c1 = ones(10)
@test remotecall_fetch(()->c1, id_other) == c1
@test remotecall_fetch(()->isconst(Main, :c1), id_other)

# Test same calls with local vars
function wrapped_var_ser_tests()
    # bitstypes
    local lv1 = 1
    @test remotecall_fetch(()->lv1, id_other) == lv1
    @test !remotecall_fetch(()->isdefined(Main, :lv1), id_other)
    for i in 2:5
        lv1 = i
        @test remotecall_fetch(()->lv1, id_other) == i
    end

    # non-bitstypes
    local lv2 = zeros(10)
    for i in 1:5
        lv2[i] = i
        @test remotecall_fetch(()->lv2, id_other) == lv2
    end

    # nested anon functions
    local lf1 = x->x
    local lf2 = x->lf1(x)
    v = rand()
    @test remotecall_fetch(lf2, id_other, v) == v
    @test remotecall_fetch(x->lf2(x), id_other, v) == v
end

wrapped_var_ser_tests()

# Test internal data structures being cleaned up upon gc.
global ids_cleanup = ones(6)
global ids_func = ()->ids_cleanup

clust_ser = (Base.Distributed.worker_from_id(id_other)).w_serializer
@test remotecall_fetch(ids_func, id_other) == ids_cleanup

@test haskey(clust_ser.glbs_sent, object_id(ids_cleanup))
finalize(ids_cleanup)
@test !haskey(clust_ser.glbs_sent, object_id(ids_cleanup))

# TODO Add test for cleanup from `clust_ser.glbs_in_tnobj`

# reported github issues - Mostly tests with globals and various distributed macros
#2669, #5390
v2669=10
@test fetch(@spawn (1+v2669)) == 11

#12367
refs = []
if true
    n = 10
    for p in procs()
        push!(refs, @spawnat p begin
            @sync for i in 1:n
                nothing
            end
        end)
    end
end
foreach(wait, refs)

#14399
s = convert(SharedArray, [1,2,3,4])
@test pmap(i->length(s), 1:2) == [4,4]

#6760
if true
    a = 2
    x = @parallel (vcat) for k=1:2
        sin(a)
    end
end
@test x == map(_->sin(2), 1:2)

# Testing clear!
function setup_syms(n, pids)
    syms = []
    for i in 1:n
        symstr = string("clrtest", randstring())
        sym = Symbol(symstr)
        eval(:(global $sym = rand()))
        for p in pids
            eval(:(@test $sym == remotecall_fetch(()->$sym, $p)))
            eval(:(@test remotecall_fetch(isdefined, $p, Symbol($symstr))))
        end
        push!(syms, sym)
    end
    syms
end

function test_clear(syms, pids)
    for p in pids
        for sym in syms
            remote_val = remotecall_fetch(()->getfield(Main, sym), p)
            @test remote_val === nothing
            @test remote_val != getfield(Main, sym)
        end
    end
end

syms = setup_syms(1, [id_other])
clear!(syms[1], id_other)
test_clear(syms, [id_other])

syms = setup_syms(1, workers())
clear!(syms[1], workers())
test_clear(syms, workers())

syms = setup_syms(3, [id_other])
clear!(syms, id_other)
test_clear(syms, [id_other])

syms = setup_syms(3, workers())
clear!(syms, workers())
test_clear(syms, workers())

# Test partial recovery from a deserialization error in CapturedException
try
    expr = quote
                mutable struct DontExistOn1
                    x
                end
                throw(BoundsError(DontExistOn1(1), 1))
           end

    remotecall_fetch(()->eval(expr), id_other)
    error("unexpected")
catch ex
    @test isa(ex.captured.ex.exceptions[1].ex, ErrorException)
    @test contains(ex.captured.ex.exceptions[1].ex.msg, "BoundsError")
    @test ex.captured.ex.exceptions[2].ex == UndefVarError(:DontExistOn1)
end
back to top