Revision c344a8c951aaa948d86121f2f37288012f37e67b authored by Johannes Blaschke on 05 November 2023, 01:41:23 UTC, committed by Johannes Blaschke on 05 November 2023, 01:41:23 UTC
1 parent c72875d
pointtopoint.jl
"""
Send(buf, comm::Comm; dest::Integer, tag::Integer=0)
Perform a blocking send from the buffer `buf` to MPI rank `dest` of communicator
`comm` using the message tag `tag`.
Send(obj, comm::Comm; dest::Integer, tag::Integer=0)
Complete a blocking send of an `isbits` object `obj` to MPI rank `dest` of
communicator `comm` using with the message tag `tag`.
# External links
$(_doc_external("MPI_Send"))
"""
Send(data, comm::Comm; dest::Integer, tag::Integer=Cint(0)) =
Send(data, dest, tag, comm)
function Send(buf::Buffer, dest::Integer, tag::Integer, comm::Comm)
# int MPI_Send(const void* buf, int count, MPI_Datatype datatype, int dest,
# int tag, MPI_Comm comm)
API.MPI_Send(buf.data, buf.count, buf.datatype, dest, tag, comm)
return nothing
end
Send(arr::Union{Ref,AbstractArray}, dest::Integer, tag::Integer, comm::Comm) =
Send(Buffer(arr), dest, tag, comm)
function Send(obj::T, dest::Integer, tag::Integer, comm::Comm) where T
buf = Ref{T}(obj)
Send(buf, dest, tag, comm)
end
"""
send(obj, comm::Comm; dest::Integer, tag::Integer=0)
Complete a blocking send using a serialized version of `obj` to MPI rank
`dest` of communicator `comm` using with the message tag `tag`.
"""
send(obj, comm::Comm; dest::Integer, tag::Integer=0) =
send(obj, dest, tag, comm)
function send(obj, dest::Integer, tag::Integer, comm::Comm)
buf = MPI.serialize(obj)
Send(buf, dest, tag, comm)
end
"""
Isend(data, comm::Comm[, req::AbstractRequest = Request()]; dest::Integer, tag::Integer=0)
Starts a nonblocking send of `data` to MPI rank `dest` of communicator `comm` using with
the message tag `tag`.
`data` can be a [`Buffer`](@ref), or any object for which [`Buffer_send`](@ref) is defined.
Returns the [`AbstractRequest`](@ref) object for the nonblocking send.
# External links
$(_doc_external("MPI_Isend"))
"""
Isend(data, comm::Comm, req::AbstractRequest=Request(); dest::Integer, tag::Integer=0) =
Isend(data, dest, tag, comm, req)
function Isend(buf::Buffer, dest::Integer, tag::Integer, comm::Comm, req::AbstractRequest=Request())
@assert isnull(req)
# int MPI_Isend(const void* buf, int count, MPI_Datatype datatype, int dest,
# int tag, MPI_Comm comm, MPI_Request *request)
API.MPI_Isend(buf.data, buf.count, buf.datatype, dest, tag, comm, req)
setbuffer!(req, buf)
return req
end
Isend(data, dest::Integer, tag::Integer, comm::Comm, req::AbstractRequest=Request()) =
Isend(Buffer_send(data), dest, tag, comm, req)
"""
isend(obj, comm::Comm[, req::AbstractRequest = Request()]; dest::Integer, tag::Integer=0)
Starts a nonblocking send of using a serialized version of `obj` to MPI rank
`dest` of communicator `comm` using with the message tag `tag`.
Returns the commication `Request` for the nonblocking send.
"""
isend(data, comm::Comm, req::AbstractRequest = Request(); dest::Integer, tag::Integer=0) =
isend(data, dest, tag, comm, req)
function isend(obj, dest::Integer, tag::Integer, comm::Comm, req::AbstractRequest = Request())
buf = MPI.serialize(obj)
Isend(buf, dest, tag, comm, req)
end
"""
data = Recv!(recvbuf, comm::Comm;
source::Integer=MPI.ANY_SOURCE, tag::Integer=MPI.ANY_TAG)
data, status = Recv!(recvbuf, comm::Comm, MPI.Status;
source::Integer=MPI.ANY_SOURCE, tag::Integer=MPI.ANY_TAG)
Completes a blocking receive into the buffer `recvbuf` from MPI rank `source` of communicator
`comm` using with the message tag `tag`.
`recvbuf` can be a [`Buffer`](@ref), or any object for which `Buffer(recvbuf)` is defined.
Optionally returns the [`Status`](@ref) object of the receive.
# See also
- [`Recv`](@ref)
- [`recv`](@ref)
# External links
$(_doc_external("MPI_Recv"))
"""
Recv!(recvbuf, comm::Comm, status=nothing; source=API.MPI_ANY_SOURCE[], tag=API.MPI_ANY_TAG[]) =
Recv!(recvbuf, source, tag, comm, status)
function Recv!(recvbuf::Buffer, source::Integer, tag::Integer, comm::Comm, status::Union{Ref{Status},Nothing})
# int MPI_Recv(void* buf, int count, MPI_Datatype datatype, int source,
# int tag, MPI_Comm comm, MPI_Status *status)
API.MPI_Recv(recvbuf.data, recvbuf.count, recvbuf.datatype, source, tag, comm, something(status, API.MPI_STATUS_IGNORE[]))
return recvbuf.data
end
Recv!(recvbuf, source::Integer, tag::Integer, comm::Comm, status::Union{Ref{Status},Nothing}) =
Recv!(Buffer(recvbuf), source, tag, comm, status)
function Recv!(recvbuf, source::Integer, tag::Integer, comm::Comm, ::Type{Status})
status = Ref(STATUS_ZERO)
data = Recv!(recvbuf, source, tag, comm, status)
return data, status[]
end
"""
data = Recv(::Type{T}, comm::Comm;
source::Integer=MPI.ANY_SOURCE, tag::Integer=MPI.ANY_TAG)
data, status = Recv(::Type{T}, comm::Comm, MPI.Status;
source::Integer=MPI.ANY_SOURCE, tag::Integer=MPI.ANY_TAG)
Completes a blocking receive of a single `isbits` object of type `T` from MPI rank `source` of
communicator `comm` using with the message tag `tag`.
Returns a tuple of the object of type `T` and optionally the [`Status`](@ref) of the receive.
# See also
- [`Recv!`](@ref)
- [`recv`](@ref)
# External links
$(_doc_external("MPI_Recv"))
"""
Recv(::Type{T}, comm::Comm, status=nothing; source=API.MPI_ANY_SOURCE[], tag=API.MPI_ANY_TAG[]) where {T} =
Recv(T, source, tag, comm, status)
function Recv(::Type{T}, source::Integer, tag::Integer, comm::Comm, status::Union{Ref{Status}, Nothing}) where T
data = Recv!(Ref{T}(), source, tag, comm, status)
return data[]
end
function Recv(::Type{T}, source::Integer, tag::Integer, comm::Comm, ::Type{Status}) where T
status = Ref(STATUS_ZERO)
val = Recv(T, source, tag, comm, status)
return val, status[]
end
"""
obj = recv(comm::Comm;
source::Integer=MPI.ANY_SOURCE, tag::Integer=MPI.ANY_TAG)
obj, status = recv(comm::Comm, MPI.Status;
source::Integer=MPI.ANY_SOURCE, tag::Integer=MPI.ANY_TAG)
Completes a blocking receive of a serialized object from MPI rank `source` of communicator
`comm` using with the message tag `tag`.
Returns the deserialized object and optionally the [`Status`](@ref) of the receive.
"""
recv(comm::Comm, status=nothing; source::Integer=API.MPI_ANY_SOURCE[], tag::Integer=API.MPI_ANY_TAG[]) =
recv(source, tag, comm, status)
function recv(source::Integer, tag::Integer, comm::Comm, status::Union{Ref{Status}, Nothing})
msg, stat = Mprobe(comm, Status; source=source, tag=tag)
count = Get_count(stat, UInt8)
buf = Array{UInt8}(undef, count)
Mrecv!(buf, msg, status)
return MPI.deserialize(buf)
end
function recv(source::Integer, tag::Integer, comm::Comm, ::Type{Status})
status = Ref(STATUS_ZERO)
val = recv(source, tag, comm, status)
return val, status[]
end
"""
req = Irecv!(recvbuf, comm::Comm[, req::AbstractRequest = Request()];
source::Integer=MPI.ANY_SOURCE, tag::Integer=MPI.ANY_TAG)
Starts a nonblocking receive into the buffer `data` from MPI rank `source` of communicator
`comm` using with the message tag `tag`.
`data` can be a [`Buffer`](@ref), or any object for which `Buffer(data)` is defined.
Returns the [`AbstractRequest`](@ref) object for the nonblocking receive.
# External links
$(_doc_external("MPI_Irecv"))
"""
Irecv!(recvbuf, comm::Comm, req::AbstractRequest=Request(); source::Integer=API.MPI_ANY_SOURCE[], tag::Integer=API.MPI_ANY_TAG[]) =
Irecv!(recvbuf, source, tag, comm, req)
function Irecv!(buf::Buffer, source::Integer, tag::Integer, comm::Comm, req::AbstractRequest=Request())
@assert isnull(req)
# int MPI_Irecv(void* buf, int count, MPI_Datatype datatype, int source,
# int tag, MPI_Comm comm, MPI_Request *request)
API.MPI_Irecv(buf.data, buf.count, buf.datatype, source, tag, comm, req)
setbuffer!(req, buf)
return req
end
Irecv!(data, source::Integer, tag::Integer, comm::Comm, req::AbstractRequest=Request()) =
Irecv!(Buffer(data), source, tag, comm, req)
"""
data = Sendrecv!(sendbuf, recvbuf, comm;
dest::Integer, sendtag::Integer=0, source::Integer=MPI.ANY_SOURCE, recvtag::Integer=MPI.ANY_TAG)
data, status = Sendrecv!(sendbuf, recvbuf, comm, MPI.Status;
dest::Integer, sendtag::Integer=0, source::Integer=MPI.ANY_SOURCE, recvtag::Integer=MPI.ANY_TAG)
Complete a blocking send-receive operation over the MPI communicator `comm`. Send `sendbuf`
to the MPI rank `dest` using message tag `sendtag`, and receive from MPI rank `source` into the
buffer `recvbuf` using message tag `recvtag`. Return a [`Status`](@ref) object.
# External links
$(_doc_external("MPI_Sendrecv"))
"""
Sendrecv!(sendbuf, recvbuf, comm::MPI.Comm, status=nothing; dest::Integer, sendtag::Integer=0, source::Integer=API.MPI_ANY_SOURCE[], recvtag::Integer=API.MPI_ANY_TAG[]) =
Sendrecv!(sendbuf, dest, sendtag, recvbuf, source, recvtag, comm, status)
function Sendrecv!(sendbuf::Buffer, dest::Integer, sendtag::Integer,
recvbuf::Buffer, source::Integer, recvtag::Integer,
comm::Comm, status::Union{Ref{Status}, Nothing})
# int MPI_Sendrecv(const void *sendbuf, int sendcount, MPI_Datatype sendtype, int dest, int sendtag,
# void *recvbuf, int recvcount, MPI_Datatype recvtype, int source, int recvtag,
# MPI_Comm comm, MPI_Status *status)
API.MPI_Sendrecv(sendbuf.data, sendbuf.count, sendbuf.datatype, dest, sendtag,
recvbuf.data, recvbuf.count, recvbuf.datatype, source, recvtag,
comm, something(status, API.MPI_STATUS_IGNORE[]))
return recvbuf.data
end
Sendrecv!(sendbuf, dest::Integer, sendtag::Integer, recvbuf, source::Integer, recvtag::Integer, comm::Comm, status::Union{Ref{Status}, Nothing}) =
Sendrecv!(Buffer(sendbuf), dest, sendtag, Buffer(recvbuf), source, recvtag, comm, status)
function Sendrecv!(sendbuf, dest::Integer, sendtag::Integer, recvbuf, source::Integer, recvtag::Integer, comm::Comm, ::Type{Status})
status = Ref(STATUS_ZERO)
data = Sendrecv!(sendbuf, dest, sendtag, recvbuf, source, recvtag, comm, status)
return data, status[]
end
# persistent requests
"""
Send_init(buf, comm::MPI.Comm[, req::AbstractRequest = Request()];
dest, tag=0)
Allocate a persistent send request, returning a [`AbstractRequest`](@ref) object. Use
[`Start`](@ref) or [`Startall`](@ref) to start the communication operation, and
`free` to deallocate the request.
# External links
$(_doc_external("MPI_Send_init"))
"""
Send_init(buf, comm::Comm, req::AbstractRequest=Request(); dest::Integer, tag::Integer=0) =
Send_init(buf, dest, tag, comm, req)
function Send_init(buf::Buffer, dest::Integer, tag::Integer, comm::Comm, req::AbstractRequest=Request())
@assert isnull(req)
API.MPI_Send_init(buf.data, buf.count, buf.datatype, dest, tag, comm, req)
setbuffer!(req, buf)
return req
end
Send_init(buf, dest::Integer, tag::Integer, comm::Comm, req::AbstractRequest=Request()) =
Send_init(Buffer(buf), dest, tag, comm, req)
"""
Recv_init(buf, comm::MPI.Comm[, req::AbstractRequest = Request()];
source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG)
Allocate a persistent receive request, returning a [`AbstractRequest`](@ref) object. Use
[`Start`](@ref) or [`Startall`](@ref) to start the communication operation, and
`free` to deallocate the request.
# External links
$(_doc_external("MPI_Recv_init"))
"""
Recv_init(buf, comm::Comm, req::AbstractRequest=Request(); source=API.MPI_ANY_SOURCE[], tag=API.MPI_ANY_TAG[]) =
Recv_init(buf, source, tag, comm, req)
function Recv_init(buf::Buffer, source::Integer, tag::Integer, comm::Comm, req::AbstractRequest=Request())
@assert isnull(req)
API.MPI_Recv_init(buf.data, buf.count, buf.datatype, source, tag, comm, req)
setbuffer!(req, buf)
return req
end
Recv_init(buf, source::Integer, tag::Integer, comm::Comm, req::AbstractRequest=Request()) =
Recv_init(Buffer(buf), source, tag, comm, req)
"""
Start(request::AbstractRequest)
Start a persistent communication request created by [`Send_init`](@ref) or
[`Recv_init`](@ref). Call [`Wait`](@ref) to complete the request.
# External links
$(_doc_external("MPI_Start"))
"""
function Start(req::AbstractRequest)
API.MPI_Start(req)
return nothing
end
"""
Startall(reqs::AbstractVector{Request})
Start a set of persistent communication requests created by [`Send_init`](@ref)
or [`Recv_init`](@ref). Call [`Waitall`](@ref) to complete the requests.
# External links
$(_doc_external("MPI_Startall"))
"""
Startall(reqs::AbstractVector{Request}) = Startall(RequestSet(reqs))
function Startall(reqs::Union{AbstractMultiRequest, RequestSet})
API.MPI_Startall(length(reqs), reqs.vals)
update!(reqs)
return nothing
end
"""
MPI.Message
An MPI message handle object, used by matched receive operations. These are
returned by [`MPI.Mprobe`](@ref) and [`MPI.Improbe`](@ref) operations, and must
be received by either [`MPI.Mrecv!`](@ref) or [`MPI.Imrecv!`](@ref).
"""
mutable struct Message
val::MPI_Message
end
Base.unsafe_convert(::Type{Ptr{MPI_Message}}, msg::Message) = convert(Ptr{MPI_Message}, pointer_from_objref(msg))
Message() = Message(API.MPI_MESSAGE_NULL[])
isnull(msg::Message) = msg.val == API.MPI_MESSAGE_NULL[]
"""
ismsg, msg = MPI.Improbe(comm::MPI.Comm;
source::Integer=MPI.ANY_SOURCE, tag::Integer=MPI.ANY_TAG)
ismsg, msg, status = MPI.Improbe(comm::MPI.Comm, MPI.Status;
source::Integer=MPI.ANY_SOURCE, tag::Integer=MPI.ANY_TAG)
Matching non-blocking probe. Similar to [`MPI.Iprobe`](@ref), except that it
also returns `msg`, an [`MPI.Message`](@ref) object.
Checks if there is a message that can be received matching `source`, `tag` and
`comm`. If so, returns `ismsg = true`, and a [`Message`](@ref) objec `msg`,
which must be received by either [`MPI.Mrecv!`](@ref) or [`MPI.Imrecv!`](@ref).
Otherwise `msg` is set to be a null `Message`.
The `Status` argument additionally returns the [`Status`](@ref) of the completed
request.
# External links
$(_doc_external("MPI_Improbe"))
"""
Improbe(comm::Comm, status=nothing; source::Integer=API.MPI_ANY_SOURCE[], tag::Integer=API.MPI_ANY_TAG[]) =
Improbe(source, tag, comm, status)
function Improbe(source::Integer, tag::Integer, comm::Comm, status::Union{Ref{Status}, Nothing})
flag = Ref{Cint}()
msg = Message()
API.MPI_Improbe(source, tag, comm, flag, msg, something(status, API.MPI_STATUS_IGNORE[]))
return flag[] != 0, msg
end
function Improbe(source::Integer, tag::Integer, comm::Comm, ::Type{Status})
status = Ref(STATUS_ZERO)
ismsg, msg = Improbe(source, tag, comm, status)
return ismsg, msg, status[]
end
"""
msg = MPI.Mprobe(comm::MPI.Comm;
source::Integer=MPI.ANY_SOURCE, tag::Integer=MPI.ANY_TAG)
msg, status = MPI.Mprobe(comm::MPI.Comm, MPI.Status;
source::Integer=MPI.ANY_SOURCE, tag::Integer=MPI.ANY_TAG)
Matching blocking probe. Similar to [`MPI.Probe`](@ref), except that it also
returns `msg`, an [`MPI.Message`](@ref) object.
Blocks until a message that can be received matching `source`, `tag` and `comm`,
returning a [`Message`](@ref) objec `msg`, which must be received by either
[`MPI.Mrecv!`](@ref) or [`MPI.Imrecv!`](@ref).
The `Status` argument additionally returns the [`Status`](@ref) of the completed
request.
# External links
$(_doc_external("MPI_Mprobe"))
"""
Mprobe(comm::Comm, status=nothing; source::Integer=API.MPI_ANY_SOURCE[], tag::Integer=API.MPI_ANY_TAG[]) =
Mprobe(source, tag, comm, status)
function Mprobe(source::Integer, tag::Integer, comm::Comm, status::Union{Ref{Status}, Nothing})
msg = Message()
API.MPI_Mprobe(source, tag, comm, msg, something(status, API.MPI_STATUS_IGNORE[]))
return msg
end
function Mprobe(source::Integer, tag::Integer, comm::Comm, ::Type{Status})
status = Ref(STATUS_ZERO)
msg = Mprobe(source, tag, comm, status)
return msg, status[]
end
"""
data = MPI.Mrecv!(recvbuf, msg::MPI.Message)
data, status = MPI.Mrecv!(recvbuf, msg::MPI.Message, MPI.Status)
Completes a blocking receive matched by a matching probe operation into the
buffer `recvbuf`, and the [`Message`](@ref) `msg`.
`recvbuf` can be a [`Buffer`](@ref), or any object for which `Buffer(recvbuf)`
is defined.
Optionally returns the [`Status`](@ref) object of the receive.
# External links
$(_doc_external("MPI_Mrecv"))
"""
function Mrecv!(recvbuf::Buffer, msg::Message, status::Union{Ref{Status},Nothing}=nothing)
API.MPI_Mrecv(recvbuf.data, recvbuf.count, recvbuf.datatype, msg, something(status, API.MPI_STATUS_IGNORE[]))
return recvbuf.data
end
Mrecv!(recvbuf, msg::Message, status::Union{Ref{Status},Nothing}=nothing) =
Mrecv!(Buffer(recvbuf), msg, status)
function Mrecv!(recvbuf,msg::Message, ::Type{Status})
status = Ref(STATUS_ZERO)
data = Mrecv!(recvbuf, msg, status)
return data, status[]
end
"""
req = MPI.Imrecv!(recvbuf, msg::MPI.Message[, req::AbstractRequest=Request()])
Starts a nonblocking receive matched by a matching probe operation into the
buffer `recvbuf`, and the [`Message`](@ref) `msg`.
`recvbuf` can be a [`Buffer`](@ref), or any object for which `Buffer(recvbuf)` is defined.
Returns `req`, an [`AbstractRequest`](@ref) object for the nonblocking receive.
# External links
$(_doc_external("MPI_Imrecv"))
"""
function Imrecv!(buf::Buffer, msg::Message, req::AbstractRequest=Request())
@assert isnull(req)
API.MPI_Imrecv(buf.data, buf.count, buf.datatype, msg, req)
setbuffer!(req, buf)
return req
end
Imrecv!(data, msg::Message, req::AbstractRequest=Request()) =
Imrecv!(Buffer(data), msg, req)
![swh spinner](/static/img/swh-spinner.gif)
Computing file changes ...