Skip to main content
  • Home
  • Development
  • Documentation
  • Donate
  • Operational login
  • Browse the archive

swh logo
SoftwareHeritage
Software
Heritage
Archive
Features
  • Search

  • Downloads

  • Save code now

  • Add forge now

  • Help

  • 57c9476
  • /
  • R
  • /
  • doParallel.R
Raw File Download

To reference or cite the objects present in the Software Heritage archive, permalinks based on SoftWare Hash IDentifiers (SWHIDs) must be used.
Select below a type of object currently browsed in order to display its associated SWHID and permalink.

  • content
  • directory
content badge
swh:1:cnt:a2b0a88b367688a07fa05fc2e9970e03756cc3fc
directory badge
swh:1:dir:14bb01b47550b273fbc62614923edb351e7aa34e

This interface enables to generate software citations, provided that the root directory of browsed objects contains a citation.cff or codemeta.json file.
Select below a type of object currently browsed in order to generate citations for them.

  • content
  • directory
Generate software citation in BibTex format (requires biblatex-software package)
Generating citation ...
Generate software citation in BibTex format (requires biblatex-software package)
Generating citation ...
doParallel.R
#
# Copyright (c) 2008-2010, Revolution Analytics
#
# This program is free software; you can redistribute it and/or modify 
# it under the terms of the GNU General Public License (version 2) as 
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful, 
# but WITHOUT ANY WARRANTY; without even the implied warranty of 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 
# General Public License for more details.
#
# A copy of the GNU General Public License is available at
# http://www.r-project.org/Licenses/
#

.options <- new.env(parent=emptyenv())
.revoDoParCluster <- NULL

# this explicitly registers a multicore parallel backend
registerDoParallel <- function(cl, cores=NULL, ...) {
  opts <- list(...)
  optnames <- names(opts)
  if (is.null(optnames))
    optnames <- rep('', length(opts))

  # filter out unnamed arguments with a warning
  unnamed <- ! nzchar(optnames)
  if (any(unnamed)) {
    warning('ignoring doParallel package option(s) specified with unnamed argument')
    opts <- opts[!unnamed]
    optnames <- optnames[!unnamed]
  }

  # filter out unrecognized options with a warning
  recog <- optnames %in% c('nocompile')
  if (any(!recog)) {
    warning(sprintf('ignoring unrecognized doParallel package option(s): %s',
	  			  paste(optnames[!recog], collapse=', ')), call.=FALSE)
    opts <- opts[recog]
    optnames <- optnames[recog]
  }

  # clear .options in case registerDoParallel is called multiple times
  old.optnames <- ls(.options, all.names=TRUE)
  rm(list=old.optnames, pos=.options)

  # set new options
  for (i in seq(along=opts)) {
    assign(optnames[i], opts[[i]], pos=.options)
  }

  if (missing(cl) || is.numeric(cl)) {
    if (.Platform$OS.type == "windows") {
	  if (!missing(cl) && is.numeric(cl)) {
            cl <- makeCluster(cl)
	  } else {
        if (!missing(cores) && is.numeric(cores)){
            cl <- makeCluster(cores)
        } else {
            cl <- makeCluster(3)
        }
	  }
	  assign(".revoDoParCluster", cl, pos=.options)
	  setDoPar(doParallelSNOW, cl, snowinfo)
    } else {
      if (!missing(cl) && is.numeric(cl)) {
        cores <- cl
      }
      # register multicore backend
      setDoPar(doParallelMC, cores, mcinfo) 
    }
  } else {
    setDoPar(doParallelSNOW, cl, snowinfo)
  }
}

"stopImplicitCluster" <- function()
{
    if (exists(".revoDoParCluster", where=.options) && !is.null(.revoDoParCluster)) {
        stopCluster(.revoDoParCluster)
        remove(".revoDoParCluster", where=.options)
    }
}

# internal function that determines the number of workers to use
workers <- function(data) {
  if ("cluster" %in% class(data)) {
    length(data)
  } else {
    cores <- data
    if (!is.null(cores)) {
      # use the number specified when registering doMC
      cores
    } else {
      cores <- getOption('cores')
      if (!is.null(cores)) {
        # use the number specified via the 'cores' option
        cores
      } else {
        # use 1/2 the number detected by parallel 
		cores <- parallel::detectCores()
		if (cores > 2) {
		  cores <- ceiling(cores/2)
		}
		cores
	  }
	}
  }
}

# passed to setDoPar via registerDoParallel, and called by getDoParWorkers, etc
mcinfo <- function(data, item) {
  switch(item,
         workers=workers(data),
         name='doParallelMC',
         version=packageDescription('doParallel', fields='Version'),
         NULL)
}

# passed to setDoPar via registerDoParallel, and called by getDoParWorkers, etc
snowinfo <- function(data, item) {
  switch(item,
         workers=workers(data),
         name='doParallelSNOW',
         version=packageDescription('doParallel', fields='Version'),
         NULL)
}

comp <- if (getRversion() < "2.13.0") {
  function(expr, ...) expr
} else {
  function(expr, ...) {
    if (isTRUE(.options$nocompile))
      expr
    else
      compiler::compile(expr, ...)
  }
}


parSpl <- try(parallel::splitList, silent=TRUE)
## Use the "splitList" function from parallel if it's exported
## Otherwise, use the definition it had in R 3.0.2.
"splitList" <- if (inherits(parSpl, "try-error")) {
    function (x, ncl) 
	lapply(splitIndices(length(x), ncl), function(i) x[i])
} else {
	parSpl
}

doParallelMC <- function(obj, expr, envir, data) {
  # set the default mclapply options
  preschedule <- TRUE
  set.seed <- TRUE
  silent <- FALSE
  cores <- workers(data)

  if (!inherits(obj, 'foreach'))
    stop('obj must be a foreach object')

  it <- iter(obj)
  argsList <- as.list(it)
  accumulator <- makeAccum(it)

  # make sure all of the necessary libraries have been loaded
  for (p in obj$packages)
    library(p, character.only=TRUE)

  # check for multicore-specific options
  options <- obj$options$multicore
  if (!is.null(options)) {
    nms <- names(options)
    recog <- nms %in% c('preschedule', 'set.seed', 'silent', 'cores')
    if (any(!recog))
      warning(sprintf('ignoring unrecognized multicore option(s): %s',
                      paste(nms[!recog], collapse=', ')), call.=FALSE)

    if (!is.null(options$preschedule)) {
      if (!is.logical(options$preschedule) || length(options$preschedule) != 1) {
        warning('preschedule must be logical value', call.=FALSE)
      } else {
        if (obj$verbose)
          cat(sprintf('setting mc.preschedule option to %d\n', options$preschedule))
        preschedule <- options$preschedule
      }
    }

    if (!is.null(options$set.seed)) {
      if (!is.logical(options$set.seed) || length(options$set.seed) != 1) {
        warning('set.seed must be logical value', call.=FALSE)
      } else {
        if (obj$verbose)
          cat(sprintf('setting mc.set.seed option to %d\n', options$set.seed))
        set.seed <- options$set.seed
      }
    }

    if (!is.null(options$silent)) {
      if (!is.logical(options$silent) || length(options$silent) != 1) {
        warning('silent must be logical value', call.=FALSE)
      } else {
        if (obj$verbose)
          cat(sprintf('setting mc.silent option to %d\n', options$silent))
        silent <- options$silent
      }
    }

    if (!is.null(options$cores)) {
      if (!is.numeric(options$cores) || length(options$cores) != 1 ||
          options$cores < 1) {
        warning('cores must be numeric value >= 1', call.=FALSE)
      } else {
        if (obj$verbose)
          cat(sprintf('setting mc.cores option to %d\n', options$cores))
        cores <- options$cores
      }
    }
  }

  # define the "worker" function, compiling expr if possible
  c.expr <- comp(expr, env=envir, options=list(suppressUndefined=TRUE))
  FUN <- function(args) tryCatch(eval(c.expr, envir=args, enclos=envir),
                                 error=function(e) e)

  # execute the tasks
  results <- mclapply(argsList, FUN, mc.preschedule=preschedule,
                      mc.set.seed=set.seed, mc.silent=silent,
                      mc.cores=cores)

  # call the accumulator with all of the results
  tryCatch(accumulator(results, seq(along=results)), error=function(e) {
    cat('error calling combine function:\n')
    print(e)
    NULL
  })

  # check for errors
  errorValue <- getErrorValue(it)
  errorIndex <- getErrorIndex(it)

  # throw an error or return the combined results
  if (identical(obj$errorHandling, 'stop') && !is.null(errorValue)) {
    msg <- sprintf('task %d failed - "%s"', errorIndex,
                   conditionMessage(errorValue))
    stop(simpleError(msg, call=expr))
  } else {
    getResult(it)
  }
}

makeDotsEnv <- function(...) {
  list(...)
  function() NULL
}

.doSnowGlobals <- new.env(parent=emptyenv())

getparentenv <- function(pkgname) {
  parenv <- NULL

  # if anything goes wrong, print the error object and return
  # the global environment
  tryCatch({
    # pkgname is NULL in many cases, as when the foreach loop
    # is executed interactively or in an R script
    if (is.character(pkgname)) {
      # load the specified package
      if (require(pkgname, character.only=TRUE)) {
        # search for any function in the package
        pkgenv <- as.environment(paste0('package:', pkgname))
        for (sym in ls(pkgenv)) {
          fun <- get(sym, pkgenv, inherits=FALSE)
          if (is.function(fun)) {
            env <- environment(fun)
            if (is.environment(env)) {
              parenv <- env
              break
            }
          }
        }
        if (is.null(parenv)) {
          stop('loaded ', pkgname, ', but parent search failed', call.=FALSE)
        } else {
          message('loaded ', pkgname, ' and set parent environment')
        }
      }
    }
  },
  error=function(e) {
    cat(sprintf('Error getting parent environment: %s\n',
                conditionMessage(e)))
  })

  # return the global environment by default
  if (is.null(parenv)) globalenv() else parenv
}

workerInit <- function(expr, exportenv, pkgname, packages, attach=FALSE) {
  assign('expr', expr, .doSnowGlobals)
  assign('exportenv', exportenv, .doSnowGlobals)
  exportEnv <- .doSnowGlobals$exportenv
  parent.env(exportEnv) <- getparentenv(pkgname)
  if (attach) {
    attach(exportEnv)
  }
  
  tryCatch({
    for (p in packages)
      library(p, character.only=TRUE)

    NULL  # indicates success
  },
  error=function(e) {
    # a character string indicates an error
    conditionMessage(e)
  })
}
workerCleanup <- function() {
	if ("exportEnv" %in% search()) {
		detach(exportEnv)
	}
}

evalWrapper <- function(args) {
  lapply(names(args), function(n) assign(n, args[[n]], pos=.doSnowGlobals$exportenv))
  tryCatch(eval(.doSnowGlobals$expr, envir=.doSnowGlobals$exportenv), error=function(e) e)
}

# This function takes the place of workerInit and evalWrapper when
# preschedule is enabled.  It is executed by the master via clusterApply
# such that there is a single chunked task for each worker in the
# cluster, rather than using clusterCall to initialize the workers and
# clusterApplyLB to compute the tasks one-by-one.  This strategy can be
# significantly more efficient when there are many small tasks, and is
# very similar to the default behavior of mclapply.
workerPreschedule <- function(largs, expr, exportenv, pkgname, packages) {
  parent.env(exportenv) <- getparentenv(pkgname)
  task <- function(args) {
    lapply(names(args), function(n) assign(n, args[[n]], pos=exportenv))
    eval(expr, envir=exportenv)
  }

  tryCatch({
    # load all necessary packages
    for (p in packages)
      library(p, character.only=TRUE)

    # execute all of the tasks
    lapply(largs, task)
  },
  error=function(e) {
    # only one exception was thrown, but we don't know which one,
    # so we'll return it for all of the tasks
    lapply(seq_along(largs), function(i) e)
  })
}

doParallelSNOW <- function(obj, expr, envir, data) {
  cl <- data
  preschedule <- FALSE
  attachExportEnv <- FALSE

  if (!inherits(obj, 'foreach'))
    stop('obj must be a foreach object')

  it <- iter(obj)
  accumulator <- makeAccum(it)

  # check for snow-specific options
  options <- obj$options$snow
  if (!is.null(options)) {
    nms <- names(options)
    recog <- nms %in% c('preschedule', 'attachExportEnv')
    if (any(!recog))
      warning(sprintf('ignoring unrecognized snow option(s): %s',
                      paste(nms[!recog], collapse=', ')), call.=FALSE)

    if (!is.null(options$preschedule)) {
      if (!is.logical(options$preschedule) ||
          length(options$preschedule) != 1) {
        warning('preschedule must be logical value', call.=FALSE)
      } else {
        if (obj$verbose)
          cat(sprintf('bundling all tasks into %d chunks\n', length(cl)))
        preschedule <- options$preschedule
      }
    }
    if (!is.null(options$attachExportEnv)) {
      if (!is.logical(options$attachExportEnv) ||
          length(options$attachExportEnv) != 1) {
        warning('attachExportEnv must be logical value', call.=FALSE)
      } else {
        if (obj$verbose)
          cat("attaching export environment\n")
        attachExportEnv <- options$attachExportEnv
      }
    }
  }

  # setup the parent environment by first attempting to create an environment
  # that has '...' defined in it with the appropriate values
  exportenv <- tryCatch({
    qargs <- quote(list(...))
    args <- eval(qargs, envir)
    environment(do.call(makeDotsEnv, args))
  },
  error=function(e) {
    new.env(parent=emptyenv())
  })
  noexport <- union(obj$noexport, obj$argnames)
  getexports(expr, exportenv, envir, bad=noexport)
  vars <- ls(exportenv)
  if (obj$verbose) {
    if (length(vars) > 0) {
      cat('automatically exporting the following variables',
          'from the local environment:\n')
      cat(' ', paste(vars, collapse=', '), '\n')
    } else {
      cat('no variables are automatically exported\n')
    }
  }

  # compute list of variables to export
  export <- unique(obj$export)
  ignore <- intersect(export, vars)
  if (length(ignore) > 0) {
    warning(sprintf('already exporting variable(s): %s',
            paste(ignore, collapse=', ')))
    export <- setdiff(export, ignore)
  }

  # add explicitly exported variables to exportenv
  if (length(export) > 0) {
    if (obj$verbose)
      cat(sprintf('explicitly exporting variables(s): %s\n',
                  paste(export, collapse=', ')))

    for (sym in export) {
      if (!exists(sym, envir, inherits=TRUE))
        stop(sprintf('unable to find variable "%s"', sym))
      val <- get(sym, envir, inherits=TRUE)
      if (is.function(val) &&
          (identical(environment(val), .GlobalEnv) ||
           identical(environment(val), envir))) {
        # Changing this function's environment to exportenv allows it to
        # access/execute any other functions defined in exportenv.  This
        # has always been done for auto-exported functions, and not
        # doing so for explicitly exported functions results in
        # functions defined in exportenv that can't call each other.
        environment(val) <- exportenv
      }
      assign(sym, val, pos=exportenv, inherits=FALSE)
    }
  }

  # send exports to workers
  c.expr <- comp(expr, env=envir, options=list(suppressUndefined=TRUE))
  
   # packageName function added in R 3.0.0
   pkgname <- if (exists('packageName', mode='function'))
     packageName(envir)
   else
     NULL

  if (! preschedule) {
    # send exports to workers
    r <- clusterCall(cl, workerInit, c.expr, exportenv, pkgname, 
                     obj$packages, attachExportEnv)
    for (emsg in r) {
      if (!is.null(emsg))
        stop('worker initialization failed: ', emsg)
    }

    # execute the tasks
    argsList <- as.list(it)
    results <- clusterApplyLB(cl, argsList, evalWrapper)
	
	# clean up the workers
	if (attachExportEnv){
	  clusterCall(cl, workerCleanup)
	}
  } else {
    # convert argument iterator into a list of lists
    argsList <- splitList(as.list(it), length(cl))

    # execute the tasks
    results <- do.call(c, clusterApply(cl, argsList, workerPreschedule,
                                       c.expr, exportenv, pkgname, 
                                       obj$packages))
  }


  # call the accumulator with all of the results
  tryCatch(accumulator(results, seq(along=results)), error=function(e) {
    cat('error calling combine function:\n')
    print(e)
  })

  # check for errors
  errorValue <- getErrorValue(it)
  errorIndex <- getErrorIndex(it)

  # throw an error or return the combined results
  if (identical(obj$errorHandling, 'stop') && !is.null(errorValue)) {
    msg <- sprintf('task %d failed - "%s"', errorIndex,
                   conditionMessage(errorValue))
    stop(simpleError(msg, call=expr))
  } else {
    getResult(it)
  }
}

back to top

Software Heritage — Copyright (C) 2015–2026, The Software Heritage developers. License: GNU AGPLv3+.
The source code of Software Heritage itself is available on our development forge.
The source code files archived by Software Heritage are available under their own copyright and licenses.
Terms of use: Archive access, API— Content policy— Contact— JavaScript license information— Web API