https://github.com/cran/multicore
Tip revision: 3ea66bd3ecb5f9467b3db36480ee97c06fc001e4 authored by Simon Urbanek on 08 August 1977, 00:00:00 UTC
version 0.1-7
version 0.1-7
Tip revision: 3ea66bd
mclapply.R
mclapply <- function(X, FUN, ..., mc.preschedule=TRUE, mc.set.seed=TRUE, mc.silent=FALSE, mc.cores=getOption("cores"), mc.cleanup=TRUE) {
env <- parent.frame()
cores <- mc.cores
if (is.null(cores)) cores <- volatile$detectedCores
cores <- as.integer(cores)
jobs <- list()
cleanup <- function() {
# kill children if cleanup is requested
if (length(jobs) && mc.cleanup) {
## first take care of uncollected children
collect(children(jobs), FALSE)
kill(children(jobs), if (is.integer(mc.cleanup)) mc.cleanup else SIGTERM)
collect(children(jobs))
}
if (length(jobs)) {
## just in case there are zombies
collect(children(jobs), FALSE)
}
}
on.exit(cleanup())
if (!mc.preschedule) { # sequential (non-scheduled)
FUN <- match.fun(FUN)
if (length(X) <= cores) { # all-out, we can use one-shot parallel
jobs <- lapply(seq(X), function(i) parallel(FUN(X[[i]], ...), name=names(X)[i], mc.set.seed=mc.set.seed, silent=mc.silent))
res <- collect(jobs)
if (length(res) == length(X)) names(res) <- names(X)
has.errors <- sum(sapply(res, inherits, "try-error"))
if (has.errors) warning(has.errors, "of all function calls resulted in an error")
return(res)
} else { # more complicated, we have to wait for jobs selectively
sx <- seq(X)
res <- .Call(create_list, length(sx))
names(res) <- names(X)
ent <- rep(FALSE, length(X)) # values entered (scheduled)
fin <- rep(FALSE, length(X)) # values finished
jobid <- 1:cores
jobs <- lapply(jobid, function(i) parallel(FUN(X[[i]], ...), mc.set.seed=mc.set.seed, silent=mc.silent))
jobsp <- processID(jobs)
ent[jobid] <- TRUE
has.errors <- 0L
while (!all(fin)) {
s <- selectChildren(jobs, 0.5)
if (is.null(s)) break # no children -> no hope
if (is.integer(s)) for (ch in s) {
ji <- which(jobsp == ch)[1]
ci <- jobid[ji]
r <- readChild(ch)
if (is.raw(r)) {
child.res <- unserialize(r)
if (inherits(child.res, "try-error")) has.errors <- has.errors + 1L
# we can't jsut assign it since a NULL assignment would remove it from the list
if (!is.null(child.res)) res[[ci]] <- child.res
} else {
fin[ci] <- TRUE
# cat("fin: "); print(fin)
# cat("res: "); print(unlist(lapply(res, is.null)))
if (!all(ent)) { # still something to do, spawn a new job
nexti <- which(!ent)[1]
jobid[ji] <- nexti
jobs[[ji]] <- parallel(FUN(X[[nexti]], ...), mc.set.seed=mc.set.seed, silent=mc.silent)
jobsp[ji] <- processID(jobs[[ji]])
ent[nexti] <- TRUE
}
}
}
}
if (has.errors) warning(has.errors, "of all function calls resulted in an error")
return(res)
}
}
if (length(X) < cores) cores <- length(X)
if (cores < 2) return(lapply(X, FUN, ...))
sindex <- lapply(1:cores, function(i) seq(i,length(X), by=cores))
schedule <- lapply(1:cores, function(i) X[seq(i,length(X), by=cores)])
ch <- list()
res <- .Call(create_list, length(X))
names(res) <- names(X)
cp <- rep(0L, cores)
fin <- rep(FALSE, cores)
dr <- rep(FALSE, cores)
inner.do <- function(core) {
S <- schedule[[core]]
f <- fork()
if (inherits(f, "masterProcess")) { # child process
on.exit(exit(1,structure("fatal error in wrapper code",class="try-error")))
if (isTRUE(mc.set.seed)) set.seed(Sys.getpid())
if (isTRUE(mc.silent)) closeStdout()
sendMaster(try(lapply(S, FUN, ...), silent=TRUE))
exit(0)
}
jobs[[core]] <<- ch[[core]] <<- f
cp[core] <<- f$pid
NULL
}
job.res <- lapply(1:cores, inner.do)
ac <- cp[cp > 0]
has.errors <- integer(0)
while (!all(fin)) {
s <- selectChildren(ac, 1)
if (is.null(s)) break # no children -> no hope we get anything
if (is.integer(s)) for (ch in s) {
a <- readChild(ch)
if (is.integer(a)) {
core <- which(cp == a)
fin[core] <- TRUE
} else if (is.raw(a)) {
core <- which(cp == attr(a, "pid"))
ijr <- unserialize(a)
if (inherits(ijr, "try-error")) {
has.errors <- c(has.errors, core)
ijr <- rep(list(ijr), length(schedule[[core]]))
}
job.res[[core]] <- ijr
dr[core] <- TRUE
}
}
}
for (i in 1:cores) res[sindex[[i]]] <- job.res[[i]]
if (length(has.errors)) {
if (length(has.errors) == cores)
warning("all scheduled cores encountered errors in user code")
else if (length(has.errors) == 1L)
warning("scheduled core ", has.errors, " encountered error in user code, all values of the job will be affected")
else
warning("scheduled cores ",paste(has.errors, sep=", "), " encountered errors in user code, all values of the jobs will be affected")
}
res
}
#mcapply(1:4, function(i) i+1)