swh:1:snp:a72e953ecd624a7df6e6196bbdd05851996c5e40
Raw File
Tip revision: a6f2b7f3868705007659d8a3702d6637359609e2 authored by Shuhei Kadowaki on 31 October 2023, 16:59:36 UTC
revisit #47137: avoid round-trip of locally-cached inferred source
Tip revision: a6f2b7f
runtests.jl
# This file is a part of Julia. License is MIT: https://julialang.org/license

using Test
using Distributed
using Dates
import REPL
using Printf: @sprintf
using Base: Experimental

# Load the helper scripts that sit next to this file. `choosetests` (called just below)
# is not defined in this file, so it presumably comes from "choosetests.jl"; the other
# helpers used later (e.g. `addprocs_with_testenv`, `STDLIBS`, `STDLIB_DIR`) are assumed
# to come from one of these two includes -- TODO confirm which.
include("choosetests.jl")
include("testenv.jl")

# Turn the command-line arguments into the run configuration. The destructured fields
# are read by the rest of this script: the list of test names, whether networking is
# available, whether to stop after the first failure, whether to enable Revise, and the
# RNG seed that is forwarded to every `runtests` call.
(; tests, net_on, exit_on_error, use_revise, seed) = choosetests(ARGS)
# Drop duplicate names so each selected test is scheduled only once.
tests = unique(tests)

# `running_under_rr()` returns a `Bool`. It is always `false` off Linux; on Linux it
# issues the raw syscall numbered `SYS_rrcall_check_presence` with all-zero arguments
# and reports `true` exactly when that call returns 0 (presumably only the rr debugger
# answers this syscall number successfully -- TODO confirm against the rr sources).
if !Sys.islinux()
    global running_under_rr() = false
else
    const SYS_rrcall_check_presence = 1008
    global running_under_rr() = iszero(
        ccall(:syscall, Int, (Int, Int, Int, Int, Int, Int, Int),
              SYS_rrcall_check_presence, 0, 0, 0, 0, 0, 0))
end

# Optional Revise support, enabled through the `use_revise` flag obtained from
# `choosetests` above.
if use_revise
    using Revise
    # Add every stdlib name (as a `Symbol`) to Revise's `stdlib_names` collection.
    union!(Revise.stdlib_names, Symbol.(STDLIBS))
    # Remote-eval the following to initialize Revise in workers
    const revise_init_expr = quote
        using Revise
        # `$STDLIBS` splices this process's list into the expression, so the evaluating
        # process does not need `STDLIBS` to be defined beforehand.
        const STDLIBS = $STDLIBS
        union!(Revise.stdlib_names, Symbol.(STDLIBS))
        # `revise_trackall` is not defined in this file; presumably it is provided by
        # "testdefs.jl", which is loaded on every process before this expression is
        # evaluated -- TODO confirm.
        revise_trackall()
    end
end

# Per-worker memory ceiling in bytes. `JULIA_TEST_MAXRSS_MB` (an integer number of MiB)
# enables the limit; when it is unset the ceiling is `typemax(Csize_t)`, which doubles
# as the "no limit" sentinel checked by `limited_worker_rss`.
const max_worker_rss = let mb = get(ENV, "JULIA_TEST_MAXRSS_MB", nothing)
    mb === nothing ? typemax(Csize_t) : parse(Int, mb) * 2^20
end
limited_worker_rss = max_worker_rss != typemax(Csize_t)

# Map a test name to the path of its test file, without the ".jl" extension.
#
# * Names whose first '/'-separated component is listed in `STDLIBS` resolve inside
#   that stdlib's "test" directory under `STDLIB_DIR`: "Lib/file" maps to "file",
#   any other shape (e.g. plain "Lib") maps to "runtests".
# * Every other name is resolved relative to the directory containing this script.
function test_path(test)
    parts = split(test, '/')
    stdlib = parts[1]
    stdlib in STDLIBS || return joinpath(@__DIR__, test)
    file = length(parts) == 2 ? parts[2] : "runtests"
    return joinpath(STDLIB_DIR, stdlib, "test", file)
end

# Every selected test must resolve to an existing "<path>.jl" file; abort before any
# worker is started if one of them does not.
isfiles = [isfile(test_path(name) * ".jl") for name in tests]
all(isfiles) || error("did not find test files for the following tests: ",
                      join(tests[.!(isfiles)], ", "))

# Tests that have to run on the master process (node 1) instead of on a worker.
const node1_tests = String[]

# Move test `t` from the shared `tests` queue to `node1_tests`.
# Does nothing when `t` is not (or no longer) queued. Always returns `nothing`.
function move_to_node1(t)
    idx = findfirst(isequal(t), tests)
    if idx !== nothing
        deleteat!(tests, idx)
        push!(node1_tests, t)
    end
    return nothing
end

# Tests that are pinned to node 1, in the order in which they will run there:
#   * Base.compilecache only works from node 1, so the precompile test is handled
#     specially;
#   * "stress" is kept off the workers to ensure things like consuming all kernel pipe
#     memory doesn't interfere with other tests.
for name in ("ccall", "precompile", "SharedArrays", "threads", "Distributed", "gc", "stress")
    move_to_node1(name)
end

# In a constrained memory environment, run the "distributed" test after all other tests
# since it starts a lot of workers and can easily exceed the maximum memory.
# Currently a no-op: "Distributed" is already moved unconditionally by the loop above,
# and `move_to_node1` ignores names that are no longer queued in `tests`.
limited_worker_rss && move_to_node1("Distributed")

# LinearAlgebra tests take a while, so pull them out of the queue and put them back at
# the front (relative order preserved) to get them all started early.
linalg_test_ids = findall(contains("LinearAlgebra"), tests)
linalg_tests = splice!(tests, linalg_test_ids)
prepend!(tests, linalg_tests)

import LinearAlgebra
cd(@__DIR__) do
    # `net_on` implies that we have access to the loopback interface which is
    # necessary for Distributed multi-processing. There are some test
    # environments that do not allow access to loopback, so we must disable
    # addprocs when `net_on` is false. Note that there exist build environments,
    # including Nix, where `net_on` is false but we still have access to the
    # loopback interface. It would be great to make this check more specific to
    # identify those situations somehow. See
    #   * https://github.com/JuliaLang/julia/issues/6722
    #   * https://github.com/JuliaLang/julia/pull/29384
    #   * https://github.com/JuliaLang/julia/pull/40348
    # `n` is the number of processes used for the parallel phase. It stays at 1 (no
    # extra workers are added) unless multi-process testing is enabled below; later code
    # checks `n > 1` before recycling a worker.
    n = 1
    JULIA_TEST_USE_MULTIPLE_WORKERS = Base.get_bool_env("JULIA_TEST_USE_MULTIPLE_WORKERS", false)
    # If the `JULIA_TEST_USE_MULTIPLE_WORKERS` environment variable is set to `true`, we use
    # multiple worker processes regardless of the value of `net_on`.
    # Otherwise, we use multiple worker processes if and only if `net_on` is true.
    if net_on || JULIA_TEST_USE_MULTIPLE_WORKERS
        # Never start more workers than there are CPU threads or queued tests.
        n = min(Sys.CPU_THREADS, length(tests))
        # `addprocs_with_testenv` is not defined in this file (presumably testenv.jl --
        # TODO confirm).
        n > 1 && addprocs_with_testenv(n)
        # Restrict BLAS on this process to a single thread (presumably to avoid
        # oversubscribing the CPU while several workers run -- TODO confirm).
        LinearAlgebra.BLAS.set_num_threads(1)
    end
    # Number of queued tests dropped after a failure when `exit_on_error` is set;
    # reported in the final summary.
    skipped = 0

    # Load the test definitions on every process. `runtests`, which is called for each
    # test further down, is defined by this include.
    @everywhere include("testdefs.jl")

    if use_revise
        # Set up Revise tracking on this process, then evaluate the same initialization
        # expression on all workers.
        Base.invokelatest(revise_trackall)
        Distributed.remotecall_eval(Main, workers(), revise_init_expr)
    end

    # Banner describing the resources available to this run.
    println("""
        Running parallel tests with:
          nworkers() = $(nworkers())
          nthreads() = $(Threads.threadpoolsize())
          Sys.CPU_THREADS = $(Sys.CPU_THREADS)
          Sys.total_memory() = $(Base.format_bytes(Sys.total_memory()))
          Sys.free_memory() = $(Base.format_bytes(Sys.free_memory()))
        """)

    # Lay out and print the header of the per-test statistics table (time, GC and
    # memory usage); the `*_align` widths are reused by the row printers below.
    testgroupheader = "Test"
    workerheader = "(Worker)"
    # The first column has to fit the header itself as well as every
    # "<test name> (<worker id>)" label: 3 extra columns plus the digits of the worker
    # count are reserved per test name.
    header_width  = textwidth(testgroupheader) + textwidth(" ") + textwidth(workerheader)
    name_align    = mapreduce(max, tests; init=header_width) do test
        textwidth(test) + 3 + ndigits(nworkers())
    end
    elapsed_align, gc_align, percent_align, alloc_align, rss_align =
        textwidth.(("Time (s)", "GC (s)", "GC %", "Alloc (MB)", "RSS (MB)"))
    printstyled(testgroupheader, color=:white)
    printstyled(lpad(workerheader, name_align - textwidth(testgroupheader) + 1), " | ", color=:white)
    printstyled("Time (s) | GC (s) | GC % | Alloc (MB) | RSS (MB)\n", color=:white)
    # One `(test name, response, duration)` tuple is pushed here per finished test.
    results = []
    # All table output is serialized through one lock. When stdout is a libuv stream
    # its own lock is reused, and stderr (if it is a libuv stream too) is switched to
    # the same lock.
    print_lock = if stdout isa Base.LibuvStream
        stdout.lock
    else
        ReentrantLock()
    end
    stderr isa Base.LibuvStream && (stderr.lock = print_lock)

    # Print one row of the statistics table for `test`, which ran on worker `wrkr`.
    #
    # `resp` is read positionally: `resp[2]` fills the "Time (s)" column,
    # `resp[5].total_time` (divided by 10^9) the "GC (s)" column, `resp[3]` and `resp[6]`
    # (both divided by 2^20) the "Alloc (MB)" and "RSS (MB)" columns.
    # The whole row is written while holding `print_lock`. Returns `nothing`.
    function print_testworker_stats(test, wrkr, resp)
        @nospecialize resp
        # Left-pad `text` to `width` columns and print it followed by `tail`.
        cell(text, width, tail) = printstyled(lpad(text, width, " "), tail, color=:white)
        lock(print_lock) do
            printstyled(test, color=:white)
            cell("($wrkr)", name_align - textwidth(test) + 1, " | ")
            cell(@sprintf("%7.2f", resp[2]), elapsed_align, " | ")
            cell(@sprintf("%5.2f", resp[5].total_time / 10^9), gc_align, " | ")
            # since there may be quite a few digits in the percentage,
            # the left-padding here is less to make sure everything fits
            cell(@sprintf("%4.1f", 100 * resp[5].total_time / (10^9 * resp[2])),
                 percent_align, " | ")
            cell(@sprintf("%5.2f", resp[3] / 2^20), alloc_align, " | ")
            cell(@sprintf("%5.2f", resp[6] / 2^20), rss_align, "\n")
        end
        return nothing
    end

    # Global callback that announces that test `name` has started on worker `wrkr`.
    # When running under rr the worker's process id is fetched and appended to the
    # line; otherwise no pid is printed. The line is written while holding `print_lock`.
    # Returns `nothing`.
    global print_testworker_started = function (name, wrkr)
        pid = if running_under_rr()
            remotecall_fetch(getpid, wrkr)
        else
            0
        end
        worker_label = lpad("($wrkr)", name_align - textwidth(name) + 1, " ")
        lock(print_lock) do
            printstyled(name, worker_label, " |", " "^elapsed_align,
                        "started at $(now())",
                        (pid > 0 ? " on pid $pid" : ""),
                        "\n", color=:white)
        end
        return nothing
    end

    # Print (in red) that test `name` failed on worker `wrkr`, followed by details:
    #   * for a `Test.TestSetException`, each entry of `errors_and_fails` is shown on
    #     its own line;
    #   * for any other non-`nothing` value, `showerror` output is written to stdout;
    #   * `nothing` prints no details.
    # Everything is written while holding `print_lock`.
    function print_testworker_errored(name, wrkr, @nospecialize(e))
        lock(print_lock) do
            printstyled(name, color=:red)
            worker_label = lpad("($wrkr)", name_align - textwidth(name) + 1, " ")
            printstyled(worker_label, " |",
                " "^elapsed_align, " failed at $(now())\n", color=:red)
            if isa(e, Test.TestSetException)
                foreach(e.errors_and_fails) do item
                    show(item)
                    println()
                end
            elseif !(e === nothing)
                Base.showerror(stdout, e)
            end
            println()
        end
    end


    # Run every selected test and collect one `(name, response, duration)` tuple per
    # finished test in `results`.
    #   Phase 1: one task per worker process drains the shared `tests` queue.
    #   Phase 2: the `node1_tests` run sequentially on this process.
    # A response of length 1 carries an error/failure object instead of statistics.
    # An interrupt (^C) stops the run early but still falls through to the summary that
    # follows this block; any other exception is rethrown.
    all_tests = [tests; node1_tests]

    local stdin_monitor
    all_tasks = Task[]
    o_ts_duration = 0.0
    try
        # Monitor stdin and kill this task on ^C
        # but don't do this on Windows, because it may deadlock in the kernel
        running_tests = Dict{String, DateTime}()
        if !Sys.iswindows() && isa(stdin, Base.TTY)
            main_task = current_task()
            stdin_monitor = @async begin
                term = REPL.Terminals.TTYTerminal("xterm", stdin, stdout, stderr)
                try
                    REPL.Terminals.raw!(term, true)
                    while true
                        c = read(term, Char)
                        if c == '\x3'
                            # ^C: raise the interrupt in the task driving the test run
                            Base.throwto(main_task, InterruptException())
                            break
                        elseif c == '?'
                            # '?': list the tests in flight, longest-running first
                            println("Currently running: ")
                            # deliberately not called `tests`, to avoid shadowing the
                            # global queue of pending tests
                            running = sort(collect(running_tests), by=x->x[2])
                            foreach(running) do (test, date)
                                println(test, " (running for ", round(now()-date, Minute), ")")
                            end
                        end
                    end
                catch e
                    isa(e, InterruptException) || rethrow()
                finally
                    REPL.Terminals.raw!(term, false)
                end
            end
        end
        # `@elapsed` times the whole parallel phase; the value becomes the duration of
        # the "Overall" testset in the summary.
        o_ts_duration = @elapsed Experimental.@sync begin
            for p in workers()
                @async begin
                    push!(all_tasks, current_task())
                    while length(tests) > 0
                        test = popfirst!(tests)
                        running_tests[test] = now()
                        # `p` is rebound below when the worker gets recycled, so keep
                        # the id of the process that actually runs this test
                        wrkr = p
                        before = time()
                        resp, duration = try
                                r = remotecall_fetch(runtests, wrkr, test, test_path(test); seed=seed)
                                r, time() - before
                            catch e
                                isa(e, InterruptException) && return
                                Any[CapturedException(e, catch_backtrace())], time() - before
                            end
                        delete!(running_tests, test)
                        push!(results, (test, resp, duration))
                        if length(resp) == 1
                            print_testworker_errored(test, wrkr, exit_on_error ? nothing : resp[1])
                            if exit_on_error
                                # drop everything still queued; every worker task then
                                # leaves its `while` loop
                                skipped = length(tests)
                                empty!(tests)
                            elseif n > 1
                                # the worker encountered some failure, recycle it
                                # so future tests get a fresh environment
                                rmprocs(wrkr, waitfor=30)
                                p = addprocs_with_testenv(1)[1]
                                remotecall_fetch(include, p, "testdefs.jl")
                                if use_revise
                                    Distributed.remotecall_eval(Main, p, revise_init_expr)
                                end
                            end
                        else
                            print_testworker_stats(test, wrkr, resp)
                            # the last element of the response is compared against the
                            # limit (presumably the worker's RSS in bytes -- TODO confirm
                            # against `runtests` in testdefs.jl)
                            if resp[end] > max_worker_rss
                                # the worker has reached the max-rss limit, recycle it
                                # so future tests start with a smaller working set
                                if n > 1
                                    rmprocs(wrkr, waitfor=30)
                                    p = addprocs_with_testenv(1)[1]
                                    remotecall_fetch(include, p, "testdefs.jl")
                                    if use_revise
                                        Distributed.remotecall_eval(Main, p, revise_init_expr)
                                    end
                                else # single process testing
                                    # report the value that was actually compared, not
                                    # the whole response vector
                                    error("Halting tests. Memory limit reached : $(resp[end]) > $max_worker_rss")
                                end
                            end
                        end
                    end
                    if p != 1
                        # Free up memory =)
                        rmprocs(p, waitfor=30)
                    end
                end
            end
        end

        # NOTE(review): the header is only printed when there are at least TWO node-1
        # tests; `> 0` may have been intended -- confirm before changing the output.
        n > 1 && length(node1_tests) > 1 && print("\nExecuting tests that run on node 1 only:\n")
        for t in node1_tests
            # As above, try to run each test
            # which must run on node 1. If
            # the test fails, catch the error,
            # and either way, append the results
            # to the overall aggregator
            isolate = true
            t == "SharedArrays" && (isolate = false)
            before = time()
            resp, duration = try
                    r = Base.invokelatest(runtests, t, test_path(t), isolate, seed=seed) # runtests is defined by the include above
                    r, time() - before
                catch e
                    isa(e, InterruptException) && rethrow()
                    Any[CapturedException(e, catch_backtrace())], time() - before
                end
            if length(resp) == 1
                print_testworker_errored(t, 1, resp[1])
            else
                print_testworker_stats(t, 1, resp)
            end
            push!(results, (t, resp, duration))
        end
    catch e
        isa(e, InterruptException) || rethrow()
        # If the test suite was merely interrupted, still print the
        # summary, which can be useful to diagnose what's going on
        foreach(task -> begin
                istaskstarted(task) || return
                istaskdone(task) && return
                try
                    schedule(task, InterruptException(); error=true)
                catch ex
                    @error "InterruptException" exception=ex,catch_backtrace()
                end
            end, all_tasks)
        foreach(wait, all_tasks)
    finally
        if @isdefined stdin_monitor
            # Stop the stdin monitor so that it restores the terminal mode. Skip this if
            # the monitor has already finished (e.g. reading stdin failed): `schedule`
            # rejects tasks that are no longer runnable, and that error would otherwise
            # replace the real outcome of the run.
            if !istaskdone(stdin_monitor)
                schedule(stdin_monitor, InterruptException(); error=true)
            end
        end
    end

    #=
    Construct a testset on the master node which will hold results from all the
    test files run on workers and on node1. The loop goes through the results,
    inserting them as children of the overall testset if they are testsets,
    handling errors otherwise.

    Since the workers don't return information about passing/broken tests, only
    errors or failures, those Result types get passed `nothing` for their test
    expressions (and expected/received result in the case of Broken).

    If a test failed, returning a `RemoteException`, the error is displayed and
    the overall testset has a child testset inserted, with the (empty) Passes
    and Brokens from the worker and the full information about all errors and
    failures encountered running the tests. This information will be displayed
    as a summary at the end of the test run.

    If a test failed, returning an `Exception` that is not a `RemoteException`,
    it is likely the julia process running the test has encountered some kind
    of internal error, such as a segfault.  The entire testset is marked as
    Errored, and execution continues until the summary at the end of the test
    run, where the test file is printed out as the "failed expression".
    =#
    # Build the "Overall" testset from `results`, print the summary and throw if
    # anything did not pass. Per-testset printing is disabled while the tree is built.
    Test.TESTSET_PRINT_ENABLE[] = false
    o_ts = Test.DefaultTestSet("Overall")
    o_ts.time_end = o_ts.time_start + o_ts_duration # manually populate the timing
    Test.push_testset(o_ts)

    # Record `child` under the overall testset while `child` is the active testset.
    adopt_testset = function (child)
        Test.push_testset(child)
        Test.record(o_ts, child)
        Test.pop_testset()
        return nothing
    end

    completed_tests = Set{String}()
    for (testname, (resp,), duration) in results
        push!(completed_tests, testname)
        if isa(resp, Test.DefaultTestSet)
            # A real testset came back: only its end time needs to be filled in.
            resp.time_end = resp.time_start + duration
            adopt_testset(resp)
        elseif isa(resp, Test.TestSetException)
            # Only counts of passing/broken tests are available, so placeholder results
            # are recorded for those; errors and failures are recorded as received.
            placeholder = Test.DefaultTestSet(testname)
            placeholder.time_end = placeholder.time_start + duration
            for _ in 1:resp.pass
                Test.record(placeholder, Test.Pass(:test, nothing, nothing, nothing, LineNumberNode(@__LINE__, @__FILE__)))
            end
            for _ in 1:resp.broken
                Test.record(placeholder, Test.Broken(:test, nothing))
            end
            foreach(item -> Test.record(placeholder, item), resp.errors_and_fails)
            adopt_testset(placeholder)
        else
            # If this test raised an exception that is not a remote testset exception,
            # i.e. not a RemoteException capturing a TestSetException that means
            # the test runner itself had some problem, so we may have hit a segfault,
            # deserialization errors or something similar.  Record this testset as Errored.
            failure = isa(resp, Exception) ? resp :
                ErrorException(string("Unknown result type : ", typeof(resp)))
            placeholder = Test.DefaultTestSet(testname)
            placeholder.time_end = placeholder.time_start + duration
            Test.record(placeholder, Test.Error(:nontest_error, testname, nothing, Any[(failure, [])], LineNumberNode(1)))
            adopt_testset(placeholder)
        end
    end

    # Tests that never produced a result (interrupted or skipped) are recorded as errors.
    for test in Iterators.filter(!in(completed_tests), all_tests)
        placeholder = Test.DefaultTestSet(test)
        Test.record(placeholder, Test.Error(:test_interrupted, test, nothing, [("skipped", [])], LineNumberNode(1)))
        adopt_testset(placeholder)
    end

    Test.TESTSET_PRINT_ENABLE[] = true
    println()
    # o_ts.verbose = true # set to true to show all timings when successful
    Test.print_test_results(o_ts, 1)
    if o_ts.anynonpass
        println("    \033[31;1mFAILURE\033[0m\n")
        if skipped > 0
            println("$skipped test", skipped > 1 ? "s were" : " was", " skipped due to failure.")
        end
        println("The global RNG seed was 0x$(string(seed, base = 16)).\n")
        Test.print_test_errors(o_ts)
        throw(Test.FallbackTestSetException("Test run finished with errors"))
    end
    println("    \033[32;1mSUCCESS\033[0m")
end
back to top