https://github.com/cran/snowfall
Raw File
Tip revision: 0966757dabea1100ccf7bb9026adbb671933a0f1 authored by Jochen Knaus on 05 March 2010, 00:00:00 UTC
version 1.83
Tip revision: 0966757
clusterFunctions.R
##*****************************************************************************
## Functions which extend Snow usage or implement some higher level usage.
## Wrappers for Snow functions are in snowWrappers.R
##
## Functions:
##    sfLoadLib   - Load library in cluster (path conversion, flags...)
##    sfSource    - Load source (path conversion...)
##
##    sfExport    - Export local and global objects to cluster
##    sfExportAll - Export all global objects (with given exception list)
##    sfRemove    - Remove objects from nodes
##    sfRemoveAll - Remove all objects from nodes (with given exception list)
##
##    sfCat       - Cat something on cluster
##    sfPrint     - Print something on cluster
##*****************************************************************************

##*****************************************************************************
## Load a library depending on sequential or parallel execution.
##
## Should behave much like the built-in library() function.
##
## Running in sequential mode: normal "library()" command.
## Running in parallel mode: the library is loaded on ALL nodes.
##
## PARAMETERS: 
## RETURN:     Logical TRUE/FALSE on success. On failure, if stopOnError is
##             set, stop immediately.
##*****************************************************************************
sfLibrary <- function( package,
                       pos = 2,
                       lib.loc = NULL,
                       character.only = FALSE,
                       warn.conflicts = TRUE,
                       keep.source = getOption("keep.source.pkgs"),
                       verbose = getOption("verbose"),
                       version,
                       stopOnError = TRUE ) {
  ## Load a package on the master and, in parallel mode, on every node first.
  ##
  ## PARAMETERS: package, pos, lib.loc, character.only, warn.conflicts,
  ##             keep.source, verbose, version
  ##                         - handed on to library() (as far as the running
  ##                           R's library() accepts them)
  ##             stopOnError - TRUE: stop() on any failure
  ##                           FALSE: warning() and return FALSE
  ## RETURN:     invisible TRUE on success, invisible FALSE on failure (only
  ##             reachable with stopOnError=FALSE).
  sfCheck();

  ## Argument list handed to library() via do.call().
  sfPars <- list()

  if( !missing( package ) ) {
    if( character.only ) {
      if( is.character( package ) )
        sfPars$package <- package
      else
        stop( paste( "Package", package, "is no character string." ) )
    }
    else {
      ## Take the name without evaluating the argument. A literal string
      ## ("boot") is used as it is; deparse() would wrap it in extra quotes.
      pkgExpr <- substitute( package )

      if( is.character( pkgExpr ) )
        sfPars$package <- pkgExpr
      else
        sfPars$package <- deparse( pkgExpr )
    }
  }

  ## package is now a string in any case.
  sfPars$character.only <- TRUE

  sfPars$pos            <- pos
  sfPars$lib.loc        <- lib.loc
  sfPars$warn.conflicts <- warn.conflicts
  sfPars$keep.source    <- keep.source
  sfPars$verbose        <- verbose

  ## All libraries are loaded internally with logical.return.
  sfPars$logical.return <- TRUE

  if( !missing( version ) )
    sfPars$version <- version

  ## Name used in all messages. Never paste the raw 'package' argument: with
  ## character.only=FALSE it is an unevaluated symbol and forcing it fails.
  pkgName <- sfPars$package

  ## Only hand over arguments the running R's library() really has; an
  ## unknown argument would make do.call() fail with "unused argument".
  ## NOTE(review): assumes master and nodes run comparable R versions.
  sfPars <- sfPars[ names( sfPars ) %in% names( formals( base::library ) ) ]

  if( sfParallel() ) {
    ## On Nodes load location with absolute path.
    if( !is.null( sfPars$lib.loc ) )
      sfPars$lib.loc <- absFilePath( sfPars$lib.loc )

    ## Export to namespace.
    setVar( ".sfPars", sfPars )

    ## Weird enough ".sfPars" need to be exported (else it would not be found
    ## on slave, although it is a parameter)
    sfExport( ".sfPars", local=FALSE, namespace="snowfall" )

    ## The expression is evaluated on the nodes, therefore it has to refer to
    ## the exported variable .sfPars and not to the local sfPars.
    result <- try( sfClusterEval( do.call( "library", .sfPars ) ) )

    if( inherits( result, "try-error" ) ||
        ( length( result ) != sfCpus() ) ||
        !all( checkTryErrorAny( result ) ) ||
        !all( unlist( result ) ) ) {
      if( stopOnError )
        stop( paste( "Stop: error loading library on slave(s):", pkgName ) )
      else {
        warning( paste( "Error loading library on slave(s):", pkgName ) )
        return( invisible( FALSE ) )
      }
    }
    else {
      ## Load message in slave logs.
      sfCat( paste( "Library", pkgName, "loaded.\n" ) )

      ## Message in masterlog.
      message( paste( "Library", pkgName, "loaded in cluster.\n" ) )
    }
  }

  ## Load on the master as well (this is the only load in sequential mode).
  result <- try( do.call( "library", sfPars ) )

  ## Remove global var from cluster (and local).
  ## Do before exception checks, as there might be a stop.
  sfRemove( ".sfPars" )

  if( inherits( result, "try-error" ) || !isTRUE( result ) ) {
    ## Same contract as on the nodes: stop if requested, else warn and
    ## report the failure through the return value.
    if( stopOnError )
      stop( paste( "Unable to load library:", pkgName ) )
    else {
      warning( paste( "Unable to load library:", pkgName ) )
      return( invisible( FALSE ) )
    }
  }

  if( verbose )
    message( paste( "Library", pkgName, "loaded.\n" ) )

  invisible( TRUE )
}

##*****************************************************************************
## Include a source file.
## @todo Include complete source() parameter list.
##
## Should behave much like the built-in source() function.
##
## Running in sequential mode: normal "source()" command (relative path)
## Running in parallel mode: the sourcefile is loaded on ALL nodes
## (abs. path and no echo).
##
## PARAMETERS: filename
## RETURN:     Logical TRUE on success, FALSE otherwise. If stopOnError=TRUE => stop
##*****************************************************************************
sfSource <- function( file,
                      encoding = getOption("encoding"),
                      stopOnError = TRUE
                    ) {
  ## Source an R file on all nodes (parallel mode only, without echo) and
  ## afterwards on the master (with echo).
  ##
  ## PARAMETERS: file        - file to source (resolved via absFilePath())
  ##             encoding    - handed on to source()
  ##             stopOnError - TRUE: stop() on failure, FALSE: message() and
  ##                           return FALSE
  ## RETURN:     TRUE (invisible) on success, FALSE on failure.
  sfCheck()

  srcPath <- absFilePath( file )

  ## Guard: without the file there is nothing to do.
  if( !file.exists( srcPath ) ) {
    missingMsg <- paste( "File does not exist:", file )

    if( stopOnError )
      stop( missingMsg )

    message( missingMsg )
    return( invisible( FALSE ) )
  }

  ## Nodes first.
  if( sfParallel() ) {
    nodeResult <- sfClusterCall( source, file=srcPath, encoding=encoding,
                                 echo=FALSE, local=FALSE, chdir=FALSE )

    if( inherits( nodeResult, "try-error" ) ) {
      nodeMsg <- paste( "Try error on cluster source call: 'source ",
                        srcPath, "'", sep="" )

      if( stopOnError )
        stop( nodeMsg )

      message( nodeMsg )
      return( FALSE )
    }

    sfCat( paste( "Source", file, "loaded.\n" ) )
    message( paste( "Source", file, "loaded in cluster.\n" ) )
  }

  ## As in sfLibrary(): the master includes the file, too.
  masterResult <- try( source( file=srcPath, encoding=encoding,
                               echo=TRUE, local=FALSE, chdir=FALSE ) )

  if( inherits( masterResult, "try-error" ) ) {
    masterMsg <- paste( "Try error loading on master: '", file, "'", sep="" )

    if( stopOnError )
      stop( masterMsg )

    message( masterMsg )
    return( invisible( FALSE ) )
  }

  invisible( TRUE )
}

##****************************************************************************
## Export a single variable.
## On slaves, ALL exported variables are global!
## From the master, either global or local variables can be exported.
##
## PARAMETERS: ...   - Names or Variables
##             local - if TRUE, local vars will be exported
##             namespace - also exports from namespace
##             debug - with local=TRUE, prints where vars are found
##             list  - List of variable names (like snows clusterExport)
## RETURN:     -
##****************************************************************************
sfExport <- function( ..., list=NULL, local=TRUE, namespace=NULL,
                      debug=FALSE, stopOnError=TRUE ) {
  ## Export variables from the master to the global environment of all nodes.
  ##
  ## PARAMETERS: ...         - names of the variables to export
  ##             list        - further names (calling style of clusterExport)
  ##             local       - TRUE: search the calling frames (innermost
  ##                           first, global environment last);
  ##                           FALSE: only the global environment is searched
  ##             namespace   - name of a namespace which is searched first
  ##             debug       - with local=TRUE: print where a variable was
  ##                           found and its value
  ##             stopOnError - TRUE: stop() on failure,
  ##                           FALSE: warning()/message() and return FALSE
  ## RETURN:     invisible TRUE on success, invisible FALSE on any failure
  ##             (only reachable with stopOnError=FALSE).
  sfCheck();

  ## Export is only needed if running parallel, on master all vars are visible.
  ## @TODO: Although all vars are visible, they are not surely in global env!
  if( !sfParallel() ) {
    warning( "sfExport() writes to global environment in sequential mode.\n" )
  }

  ## List of given names in dot arguments.
  names <- fetchNames( ... )

  ## If extra list is given (calling style of clusterExport()), just add this
  ## list.
  if( !is.null( list ) ) {
    ## Test from rm, see fetchNames for details.
    if( !length( list ) ||
        !all( sapply( list, function(x) is.symbol(x) || is.character(x) ) ) ) {
      if( stopOnError )
        stop( "'list' must contain names or character strings" )
      else {
        warning( "Error in sfExport: 'list' must contain names or character strings" )
        return( invisible( FALSE ) )
      }
    }

    names <- c( names, list )
  }

  for( name in names ) {
    ## Also examine namespace (from snowfall package?). Only needed for internal
    ## functions.
    if( !is.null( namespace ) && is.character( namespace ) ) {
      ## The explicit error handler is needed, else errors on objects which
      ## are not found are not caught.
      val <- tryCatch( getFromNamespace( name, namespace ),
                       error = function(x) { NULL } )

      if( !is.null( val ) && !inherits( val, "try-error" ) ) {
        res <- sfClusterCall( assign, name, val, env = globalenv(),
                              stopOnError = FALSE )

        ## Error on export?
        if( is.null( res ) || !all( checkTryErrorAny( res ) ) ) {
          if( stopOnError )
            stop( paste( "Error exporting '", name, "': ",
                         geterrmessage(), sep="" ) )
          else {
            warning( paste( "Error exporting '", name, "': ",
                            geterrmessage(), sep="" ) )
            return( invisible(FALSE) )
          }
        }

        ## Found in namespace: skip local/global lookup.
        next
      }
    }

    ## Check if exists before exporting.
    if( local ) {
      found <- FALSE

      ## Traverse back through the calling frames. sys.frame( -pframe ) is
      ## resolved relative to this function's frame, so this loop must stay
      ## directly in the body of sfExport(). The last step,
      ## pframe == sys.nframe(), is frame 0: the global environment.
      for( pframe in seq( 1, sys.nframe() ) ) {
        ## Var exists in this frame?
        if( exists( name, inherits=FALSE, envir=sys.frame( -pframe ) ) ) {
          found <- TRUE

          ## Debug output: where was the variable found (helps to locate
          ## variables shadowing each other in nested functions).
          if( debug ) {
            definedIn <- gsub( "\n", "|", as.character( sys.call( -pframe ) ) )

            cat( "Export '", name, "' defined in '", definedIn, "'",
                 "\n", sep="" )
            print( get( name, envir=sys.frame( -pframe ) ) )
          }

          ## Export it.
          res <- sfClusterCall( assign, name,
                                get( name,
                                     envir=sys.frame( -pframe ) ),
                                env = globalenv(),
                                stopOnError = FALSE )

          ## Error on export?
          if( is.null( res ) || !all( checkTryErrorAny( res ) ) ) {
            if( stopOnError )
              stop( paste( "Error exporting '", name, "': ",
                           geterrmessage(), sep="" ) )
            else {
              message( paste( "Error exporting '", name, "': ",
                              geterrmessage(), sep="" ) )
              return( invisible(FALSE) )
            }
          }

          ## Innermost definition wins.
          break
        }
      }

      ## If variable to export is not found.
      if( !found ) {
        if( stopOnError )
          stop( paste( "Unknown/unfound variable ", name,
                       " in export. (local=", local, ")", sep="" ) )
        else {
          message( paste( "Unknown/unfound variable ", name,
                          " in export. (local=", local, ")", sep="" ) )
          return( invisible( FALSE ) )
        }
      }
    }
    ## Global export only.
    else {
      if( exists( name, inherits=FALSE, envir=globalenv() ) ) {
        res <- sfClusterCall( assign, name,
                              get( name, inherits=FALSE, envir=globalenv() ),
                              env = globalenv(), stopOnError = FALSE  )

        if( is.null( res ) || !all( checkTryErrorAny( res ) ) ) {
          if( stopOnError )
            stop( paste( "Error exporting global '", name, "': ",
                         geterrmessage(), sep="" ) )
          else {
            warning( paste( "Error exporting global '", name, "': ",
                            geterrmessage(), sep="" ) )
            ## Failure: report FALSE like every other error path.
            return( invisible( FALSE ) )
          }
        }
      }
      else {
        if( stopOnError )
          stop( paste( "Unknown variable ", name, " in export." ) )
        else {
          warning( paste( "Unknown variable ", name, " in export." ) )
          ## Failure: report FALSE like every other error path.
          return( invisible( FALSE ) )
        }
      }
    }
  }

  invisible( TRUE )
}

##****************************************************************************
## Export all GLOBAL variables and functions to the whole cluster.
## Aware of memory usage.
##
## PARAMETERS: [Vector/List Names or variables NOT to export]
## RETURN:     Logical Success
##****************************************************************************
sfExportAll <- function( except=NULL, debug=FALSE ) {
  ## Export all objects of the master's global environment to the nodes.
  ##
  ## PARAMETERS: except - vector/list with names of objects NOT to export
  ##             debug  - TRUE: print the names of the exported objects
  ## RETURN:     invisible TRUE on success (also in sequential mode, where
  ##             nothing is done), invisible FALSE if 'except' is unusable.
  sfCheck();

  if( !sfParallel() ) {
    message( "sfExportAll() ignored in sequential mode.\n" )
    return( invisible( TRUE ) )
  }

  ## List with the names of all global objects.
  expList <- as.list( objects( pos = globalenv() ) )

  ## Now remove all names which are listed in except.
  if( !is.null( except ) ) {
    if( is.list( except ) )
      except <- unlist( except )

    if( !is.vector( except ) ) {
      warning( "sfExportAll: except is not a vector.\n" )
      return( invisible( FALSE ) )
    }

    ## Logical mask instead of negative indices: x[-integer(0)] selects
    ## NOTHING, so an 'except' without any match would drop every object.
    expList <- expList[ !( unlist( expList ) %in% except ) ]
  }

  ## sfExport() rejects an empty list, and there is nothing to do anyway.
  if( length( expList ) == 0 ) {
    if( debug )
      message( "sfExportAll: No variables to export." )

    return( invisible( TRUE ) )
  }

  ## Exporting mode with explicit global mode.
  sfExport( list=expList, local=FALSE )

  if( debug ) {
    message( "sfExportAll: Following variables are exported:" )
    message( paste( expList, collapse=", " ) )
  }

  invisible( TRUE )
}

##****************************************************************************
## Remove objects from global environment (on whole cluster)
## or at least from master (sequential mode).
##
## PARAMETERS: List with Names(!) of the variables.
## RETURN:     -
##****************************************************************************
sfRemove <- function( ..., list=NULL, master=FALSE, debug=FALSE ) {
  ## Remove objects from the global environment of all nodes (parallel mode)
  ## and, if requested, from the global environment of the master.
  ##
  ## PARAMETERS: ...    - names of the objects to remove
  ##             list   - further names (names or character strings)
  ##             master - TRUE: remove on the master as well
  ##             debug  - TRUE: print each name before removal on the nodes
  ## RETURN:     invisible NULL
  sfCheck();

  ## List of given names in dot arguments.
  .sfNames <- fetchNames( ... )

  ## If extra list is given (calling style of clusterExport()), just add this
  ## list.
  if( !is.null( list ) ) {
    ## Test from rm, see fetchNames for details.
    if( !length( list ) ||
        !all( sapply( list, function(x) is.symbol(x) || is.character(x) ) ) )
      stop( "list must contain names or character strings" )

    .sfNames <- c( .sfNames, list )
  }

  ## rm( list= ) only takes a character vector, but the test above also lets
  ## symbols and R lists pass: flatten everything to plain strings.
  ## NOTE(review): presumes fetchNames() returns names/strings - confirm.
  .sfNames <- as.character( unlist( lapply( .sfNames, as.character ) ) )

  ## If running parallel, remove objects from slaves.
  if( sfParallel() ) {
    if( debug )
      for( name in .sfNames )
        cat( "REMOVE:", name, "\n" )

    ## The variable is exported under its local name and the expressions
    ## below refer to exactly this name, so it must stay ".sfNames".
    sfExport( ".sfNames", local=TRUE )
    sfClusterEval( rm( list=.sfNames, pos=globalenv() ) )
    sfClusterEval( rm( .sfNames, pos=globalenv() ) )
  }

  ## Remove on master as well?
  if( master )
    rm( list=.sfNames, pos=globalenv() )

  invisible( NULL )
}

##****************************************************************************
## Remove all variables from nodes (important: only global vars from nodes
## - NOT the master R process - are deleted).
## To delete on master as well, use sfRemove().
##
## PARAMETERS: [Vector/List Names of objects NOT to remove].
## RETURN:     Boolean Success (invisible)
##****************************************************************************
sfRemoveAll <- function( except=NULL, debug=FALSE, hidden=TRUE ) {
  ## Remove all global variables from the nodes (never from the master).
  ##
  ## PARAMETERS: except - vector/list with names of objects NOT to remove
  ##             debug  - TRUE: print the names which get removed
  ##             hidden - TRUE: also list objects with a leading dot
  ## RETURN:     invisible TRUE on success (and in sequential mode),
  ##             invisible FALSE otherwise.
  sfCheck()

  ## In sequential mode nothing is done and it counts as success.
  if( !sfParallel() ) {
    message( "sfRemoveAll() ignored in sequential mode.\n" )
    return( invisible( TRUE ) )
  }

  ## The expression is evaluated on the nodes, so 'hidden' cannot be used
  ## inside of it: one literal call per case.
  if( hidden ) {
    namesPerNode <- sfClusterEval( ls( pos=globalenv(), all.names=TRUE ) )
  }
  else {
    namesPerNode <- sfClusterEval( ls( pos=globalenv(), all.names=FALSE ) )
  }

  if( length( namesPerNode ) == 0 ) {
    message( "sfRemoveAll: problems fetching variables from nodes (or none existant)...\n" )
    return( invisible( FALSE ) )
  }

  ## Only the result of one node is used: the one with the most variables.
  ## @todo: Merge result lists from all nodes.
  ## The name "sfTmp" must stay, the variable is exported under this name.
  biggest <- which.max( sapply( namesPerNode, length ) )
  sfTmp   <- namesPerNode[[biggest]]

  if( length( sfTmp ) == 0 ) {
    message( "sfRemoveAll: no variables on nodes.\n" )
    return( invisible( FALSE ) )
  }

  ## Take out everything listed in except.
  if( !is.null( except ) ) {
    if( is.list( except ) )
      except <- unlist( except )

    if( !is.vector( except ) ) {
      warning( "sfRemoveAll: except is not a vector.\n" )
      return( invisible( FALSE ) )
    }

    sfTmp <- sort( sfTmp[ !( sfTmp %in% except ) ], na.last = NA )
  }

  if( debug ) {
    message( "sfRemoveAll: Remove variables from nodes:" )
    message( paste( sfTmp, collapse=", " ) )
  }

  ## Ship the list of names to the nodes, delete the listed objects there
  ## and finally the list itself.
  sfExport( "sfTmp", local=TRUE )

  sfClusterEval( rm( list=sfTmp, pos=globalenv() ) )
  sfClusterEval( rm( "sfTmp", pos=globalenv() ) )

  invisible( TRUE )
}

##****************************************************************************
## Fast messages on the cluster (all nodes!).
## For testing proposes mainly.
##
## PARAMETER: Vector  x      Objects to print,
##            String  sep    Separator
##            Boolean master Print on master as well
##****************************************************************************
sfCat <- function( ..., sep=" ", master=TRUE ) {
  ## cat() the given objects on every node and, optionally, on the master.
  ##
  ## PARAMETERS: ...    - objects to print (combined with c())
  ##             sep    - separator handed to cat()
  ##             master - TRUE: print on the master as well
  ## RETURN:     invisible TRUE, or invisible NULL if there is nothing to
  ##             print.
  sfCheck()

  ## Both names must stay as they are: the variables are exported to the
  ## nodes under exactly these names.
  .sfTmpX   <- c( ... )
  .sfTmpSEP <- sep

  if( length( .sfTmpX ) > 0 ) {
    if( sfParallel() ) {
      sfExport( ".sfTmpSEP", ".sfTmpX", local=TRUE )

      sfClusterCall( cat, .sfTmpX, sep=.sfTmpSEP )

      sfRemove( ".sfTmpX", ".sfTmpSEP", master=FALSE )
    }

    ## Master & sequential mode.
    if( master ) {
      cat( .sfTmpX, sep=.sfTmpSEP )
    }

    return( invisible( TRUE ) )
  }

  ## Nothing given, nothing printed.
  invisible( NULL )
}

##****************************************************************************
## Mainly a parallelised lapply with intermediate result savings and restore
## on later callback.
## Resultfiles are saved on each step, where a step is defined by the amount
## of CPUs given (e.g. 4 cpus, 100 steps => 25 savings).
##
## Characteristic of the called functions: they are not allowed to return NULL
## values, as these indicate uncalculated portions in the result file. Please
## use NA or any other marker for undefined values.
##
## Files are saved under directory .sfOption$RESTDIR with the form:
##    SAVE_file_name
##
## where  file : name of current R-file or "DEFAULT" in interactive mode
##        name : usergiven name for this current calculation (default: "default")
##               If a program uses more than one call to sfClusterApplySR(),
##               then name MUST be set!
##
## As this function itself calls sfLapply(), there is no explicit sequential
## form here.
##
## To disable printing of progress, set perUpdate to 100.
##
## PARAMETERS: List x,         \ Like lapply
##             Function fun,   |
##             ...             /
##             [String name Name for this call of sfClusterApplySR],
##             [perUpdate int Percent Update frequency for process report],
##             [Logical restore: restore previous results or don't restore]
## RETURN:     List
##****************************************************************************
sfClusterApplySR <- function( x, fun,
                              ...,
                              name="default",
                              perUpdate=NULL,
                              restore=sfRestore()
                             ) {
  ## lapply() over x in chunks of sfCpus() elements; the result list is saved
  ## after each chunk and can be picked up again by a later call.
  ##
  ## PARAMETERS: x, fun, ... - like lapply(); fun must not return NULL, as
  ##                           NULL marks a not yet calculated element
  ##             name        - name of this calculation (part of the filename)
  ##             perUpdate   - progress report frequency in percent (0..100);
  ##                           anything else selects .sfOption$RESTOREUPDATE
  ##             restore     - TRUE: continue from an existing result file
  ## RETURN:     List with one result per element of x.
  sfCheck();

  checkFunction( fun )

  ## Nothing to calculate. seq() and 1:length( x ) below cannot deal with an
  ## empty input.
  if( length( x ) == 0 )
    return( list() )

  ## If none or no regular update frequency is given.
  if( is.null( perUpdate ) || !is.numeric( perUpdate ) ||
      ( length( perUpdate ) != 1 ) ||
      ( perUpdate < 0 ) || ( perUpdate > 100 ) )
    perUpdate <- .sfOption$RESTOREUPDATE

  ## Ensure destination directory's existing, if not: create.
  if( !file.exists( .sfOption$RESTDIR ) )
    dirCreateStop( .sfOption$RESTDIR )

  ## No R-file given?
  if( is.null( .sfOption$CURRENT ) )
    setOption( "CURRENT", "DEFAULT" )

  ## Abs. file path
  file <- file.path( .sfOption$RESTDIR,
                     paste( "SAVE_", .sfOption$CURRENT, "_", name, sep="" ) )

  ## Mark this file for deletion on (regular) cluster stop - even if the file
  ## itself does not exist atm.
  addRestoreFile( file )

  ## Resultfile is present: try to load it, check if variable result is included
  ## and check how many results are present in the file.
  ## If it seems that the results are ok, take them and continue at their end.
  if( file.exists( file ) && restore ) {
    ## Load into a separate environment, so the file's content cannot
    ## overwrite local variables of this function. A loading error comes
    ## back as condition object.
    loadEnv   <- new.env()
    loadError <- tryCatch( { load( file, envir=loadEnv ); NULL },
                           error=function( e ) e )

    if( !is.null( loadError ) )
      stop( paste( "Loading error:", conditionMessage( loadError ) ) )

    cat( "Restoring previous made results from file:", file, "\n" )
    errMsg <- "\nPlease remove file manually.\n"

    ## First check the contents of the file.
    ## If these don't match, do NOT remove file or overwrite it automatically,
    ## as (due to the weak filenames) mistakes could be done.
    ## Commit removal to user (with message).

    ## Variable "result" is loaded?
    if( !exists( "result", envir=loadEnv, inherits=FALSE ) )
      stop( paste( "Result variable not found in datafile:", file, errMsg ) )

    result <- get( "result", envir=loadEnv, inherits=FALSE )

    ## Check if variable result is a list.
    if( !is.list( result ) )
      stop( paste( "Variable result is no list in datafile:", file, errMsg ) )

    ## Check if variable result has correct length.
    if( length( result ) != length( x ) )
      stop( paste( "Variable result from resultfile has different length to data:",
                   length( result ), "<->", length( x ), errMsg ) )

    ## Positions of all non-NULL (= calculated) elements. The calculation
    ## continues behind the last of them.
    done <- which( !unlist( lapply( result, is.null ) ) )

    ## Complete unprocessed resultset found? Then start at first element.
    if( length( done ) == 0 ) {
      startIndex <- 1
    }
    ## At least some parts in the resultset are given.
    else {
      startIndex <- max( done ) + 1

      ## Complete processed result? Can happen in programs with more than one
      ## parallelised call. Strictly greater: with startIndex equal to
      ## length( result ) the last element still has to be calculated.
      if( startIndex > length( result ) ) {
        return( result )
      }

      ## Message for user where restore begins.
      perCent <- ( ( startIndex - 1 ) * 100 ) / length( result )
      cat( "Starting calculation at ", round( perCent, 1 ),
           "% (", startIndex, "/", length( result ), ")\n" )
    }
  }
  ## No resultfile given/present: generate clear result with all NULL fields.
  else {
    if( !restore )
      message( "Restore is not active! No results are loaded." )

    message( paste( "Saving results to: ", file, "\n" ) )

    ## Resultlist: init with NULL in any element.
    result <- vector( "list", length( x ) )

    startIndex <- 1        # Start at the beginning
  }

  lastPrintPercent <- 0

  ## Chunk size (loop invariant).
  cpus <- sfCpus()

  ## Calculating list parts in cluster.
  for( sIndex in seq( startIndex, length( x ), by=cpus ) ) {
    ## Endindex, clipped to the end of the data.
    eIndex <- min( sIndex + cpus - 1, length( x ) )

    newResult <- sfLapply( x[sIndex:eIndex], fun, ... )

    ## Fill cells with new results.
    result[sIndex:eIndex] <- newResult

    ## Intermediate save of current results.
    save( result, file=file )

    ## Calculated percentage.
    perCent <- eIndex * 100 / length( result )

    ## If message about process is wanted, print it (also is a connector for
    ## sfCluster to show the calculation process).
    ## Also catch the case where mod rarely matches (if amount of CPUs is
    ## bigger than perUpdate).
    if( ( ( round( perCent, 0 ) -
            round( sIndex * 100 / length( result ), 0 ) ) >= perUpdate )
        ||
        ( ( ( round( perCent, 0 ) %% perUpdate ) == 0 ) &&
          ( ( round( perCent, 0 ) - lastPrintPercent ) >= perUpdate ) ) ) {
      cat( "SR '", name, "' processed: ",
           round( perCent, 1 ), "%\n", sep="" )
      lastPrintPercent <- round( perCent, 0 )
    }
  }

  return( result )
}

##****************************************************************************
## Complete "unit test" of most of the built-in functions.
## Mainly integrated for development, but can be used for testing the
## R functionality on all nodes, too.
##
## PARAMETER: -
## RETURN:    Int amount of errors (0: everything is ok).
##****************************************************************************
sfTest <- function() {
  sfCheck();

  if( !sfParallel() ) {
    message( "Tests only work in parallel mode." )
    return( invisible( FALSE ) )
  }

  ##***************************************************************************
  ## Basic checks for Calls/Evals.
  ##***************************************************************************
  checkResultBasic <- function( result ) {
    if( is.null( result ) )
      return( c( FALSE, "Result was NULL" ) )

    if( !is.list( result ) )
      return( c( FALSE, "No proper return type (no list)." ) )

    if( length( result ) != sfCpus() )
      return( c( FALSE, "No proper return type (wrong length)." ) )

    if( inherits( result, "try-error" ) )
      return( c( FALSE, "TRY-ERROR raised on result." ) )

    if( !all( sapply( result,
                      function( x )
                        if( inherits( x, "try-error" ) )
                          return( FALSE )
                        else
                          return( TRUE )
                    ) ) )
      return( c( FALSE, "Result elements raised TRY-ERROR(s)." ) )

    return( c( TRUE, "" ) )
  }

  ##***************************************************************************
  ## Checks if each element of a given list is equal to a certain value !=
  ## NA/NULL).
  ##***************************************************************************
  checkAllEqual <- function( result, equal ) {
    if( !all( sapply( unlist( result ), function( x ) return( x == equal ) ) ) )
      return( FALSE )

    return( TRUE )
  }

  ##***************************************************************************
  ## Test a list of lists against a vector (each sublist must be equal to the
  ## given list).
  ##***************************************************************************
  checkAllEqualList <- function( result, equal ) {
    if( is.list( equal ) )
      equal <- sort( unlist( equal ) )
    else
      equal <- sort( equal )

    for( res in result ) {
##      res <- sort( res )

      if( ( length( res ) != length( equal ) ) ||
          ( length( which( sort( res ) == equal ) ) < length( equal ) ) )
        return( FALSE )
      
##      i <- 1
##     while( i <= length( res ) ) {
##        if( res[i] != equal[i] )
##          return( FALSE )
##        i <- i + 1
##      }
    }

    return( TRUE )
  }

  ##***************************************************************************
  ## Compare vectors.
  ##***************************************************************************
  checkVecCmp <- function( x, y ) {
    if( length( x ) != length( y ) )
      return( FALSE )

    for( i in seq( 1, length( x ) ) ) {
      ## If NULL or NA, nothing is to compare. But only if both vals are NA/NULL
      ## If not, compare (throws exception/stop)
      if( ( is.na( x[i] ) && is.na( y[i] ) ) ||
          ( is.null( x[i] ) && is.null( y[i] ) ) )
        next

      if( x[i] != y[i] )
        return( FALSE )
    }

    return( TRUE )
  }

  ##***************************************************************************
  ## Testing sfLibrary.
  ##***************************************************************************
  ## Test sfLibrary: load an existing package, check data that became
  ## available through it, and verify that loading a missing package fails
  ## safely.
  ##
  ## RETURN: c( status, message ); status is TRUE on success, FALSE otherwise
  ##         (both are coerced to character by c()).
  testLib <- function() {
    ## Package "boot" is assumed to be always installed.
    if( !sfLibrary( "boot", character.only=TRUE, stopOnError=FALSE ) )
      return( c( FALSE, "Unable to load library 'boot'" ) )

    ## calcium is a dataframe (presumably provided by boot - verify).
    result <- sfClusterEval( as.matrix( get('calcium') )[,2] )

    ## Compare if all nodes delivered the same data for variable "calcium"
    ## get needed to avoid R CMD check warnings.
    for( res in result )
      if( !checkVecCmp( res, as.matrix( get( "calcium" ) )[,2] ) )
        return( c( FALSE, "Wrong data delivered..." ) )

    ## Load surely uninstalled package to test if lib call fails safely.
    loaded <- try( sfLibrary( "xxxyyyzzz", character.only=TRUE,
                              stopOnError=FALSE ),
                   silent=TRUE )

    ## Only a plain FALSE is a regular outcome. An exception (try-error) or
    ## any other value means the call did not fail safely; testing the raw
    ## try() result in if() would itself raise an error on a try-error.
    if( inherits( loaded, "try-error" ) ||
        !identical( as.logical( loaded ), FALSE ) )
      return( c( FALSE, "Irregular return on loading inexisting library." ) )

    return( c( TRUE, "ok" ) )
  }

  ##***************************************************************************
  ## testing sfSource.
  ##***************************************************************************
  ## Test sfSource: source the file data/test.R of the installed snowfall
  ## package on all nodes and call the sourced functions f1 and f2 there.
  ##
  ## RETURN: c( status, message ); status is TRUE on success, FALSE otherwise
  ##         (both are coerced to character by c()).
  testSource <- function() {
    sfRemoveAll()

    ## Find path of the installed snowfall package. find.package() is the
    ## public replacement of .find.package(), which is defunct in current R.
    pkgPath <- try( find.package( "snowfall" ) )

    if( inherits( pkgPath, "try-error" ) )
      return( c( FALSE, paste( "Exception: cannot locate package snowfall.",
                               geterrmessage() ) ) )

    if( is.null( pkgPath ) || length( pkgPath ) == 0 )
      return( c( FALSE, "Cannot locate package snowfall." ) )

    ## Use the first hit only, so exactly one file name is built.
    srcFile <- file.path( pkgPath[1], "data", "test.R" )

    cat( "PACKAGE...: ", srcFile, "\n" )

    ## Dump the file content for debugging. The connection is closed again
    ## even if reading fails.
    con <- file( srcFile, "r", blocking=FALSE )
    content <- tryCatch( readLines( con, n=-1 ), finally=close( con ) )

    ## NOTE(review): debug() is presumably a logging helper of the enclosing
    ## function, not base::debug - confirm.
    debug( "test.R content:" )
    debug( content )

    result <- sfSource( srcFile, stopOnError=FALSE )

    if( inherits( result, "try-error" ) )
      return( c( FALSE, paste( "Exception: cannot source on slaves.",
                               geterrmessage() ) ) )

    ## Sourced function f1 is expected to return 999 on every node.
    ## get to satisfy R CMD check
    result <- sfClusterEval( get("f1")(), stopOnError=FALSE )

    resBasic <- checkResultBasic( result )

    if( resBasic[1] == FALSE )
      return( resBasic )

    if( !checkAllEqual( result, 999 ) )
      return( c( FALSE, "Wrong results on sourced function f1." ) )

    ## Sourced function f2 is expected to return 100 for the arguments 99, 1.
    ## get to satisfy R CMD check
    result <- sfClusterEval( get("f2")( 99, 1 ), stopOnError=FALSE )

    resBasic <- checkResultBasic( result )

    if( resBasic[1] == FALSE )
      return( resBasic )

    if( !checkAllEqual( result, 100 ) )
      return( c( FALSE, "Wrong results on sourced function f2." ) )

    return( c( TRUE, "ok" ) )
  }
  
  ##***************************************************************************
  ## Testing sfClusterCall. Always the first test.
  ##***************************************************************************
  ## Exercise sfClusterCall with two functions (paste and sum) and verify
  ## that every node delivers the expected value.
  ##
  ## RETURN: c( status, message ); for a failed basic check the result of
  ##         checkResultBasic() is handed back unchanged.
  testCall <- function() {
    ## First call: string concatenation on all nodes.
    pasted <- sfClusterCall( paste, "a", "b", "c", sep="", stopOnError=FALSE )
    basicCheck <- checkResultBasic( pasted )

    if( basicCheck[1] == FALSE )
      return( basicCheck )

    if( !checkAllEqual( pasted, "abc" ) )
      return( c( FALSE, "Wrong results on paste." ) )

    ## Second call: numeric sum, compared against the locally computed sum.
    summands <- c( 99, 7, 3.4 )
    summed <- sfClusterCall( sum, summands, stopOnError=FALSE )
    expected <- sum( summands )

    if( checkAllEqual( summed, expected ) )
      return( c( TRUE, "ok" ) )

    return( c( FALSE, "Wrong result on sum." ) )
  }

  ##***************************************************************************
  ## Testing sfClusterEval
  ##***************************************************************************
  ## Exercise sfClusterEval: evaluate a numeric expression on all nodes and
  ## compare each node's value with the same expression evaluated locally.
  ##
  ## RETURN: c( status, message ); for a failed basic check the result of
  ##         checkResultBasic() is handed back unchanged.
  testEval <- function() {
    ## Evaluate on the nodes.
    nodeValues <- sfClusterEval( sum( sapply( 1:10, exp ) ) )

    basicCheck <- checkResultBasic( nodeValues )

    if( basicCheck[1] == FALSE )
      return( basicCheck )

    ## Same expression, evaluated locally as reference.
    reference <- sum( sapply( 1:10, exp ) )

    if( checkAllEqual( nodeValues, reference ) )
      return( c( TRUE, "ok" ) )

    return( c( FALSE, "Wrong results on sum." ) )
  }

  ##***************************************************************************
  ## Testing the export function
  ##***************************************************************************
  ## Test sfExport together with sfRemoveAll: variables from the global
  ## environment, from local function frames and from the snowfall namespace
  ## are exported, checked on the nodes and removed again.
  ##
  ## RETURN: c( status, message ); status is TRUE on success, FALSE otherwise
  ##         (both are coerced to character by c()).
  testExport <- function() {
    ## Needed to have a clean comparison global env.
    sfRemoveAll( hidden=TRUE )

    ## List everything (hidden names included) left in the nodes' global env.
    vars <- sfClusterEval( ls( all.names=TRUE, envir=globalenv() ) )

    print( vars )
    
    ## Every node must report an empty listing.
    if( length( vars ) != 0 )
      if( !all( sapply( vars,
                        function( x ) return( length( x ) == 0 ) ) ) )
        return( c( FALSE, "sfRemoveAll() didn't kill everything" ) )

    ## Setting global variable via assign, as <<- invokes warnings on
    ## package check.
    ## Note: var1 and var2 are written to the caller's global environment.
    assign( "var1", 99, pos=globalenv() )
    assign( "var2", 101, pos=globalenv() )
#    var1 <<- 99    # Global
#    var2 <<- 101
    ## Local to testExport. var3 is shadowed inside iTest() below (88 is the
    ## value expected on the nodes), var4 exists only in this frame.
    var3 <-  103   # Local
    var4 <-  7

    ## Setting var in namespace ("snowfall").
    setVar( ".sfTestVar5", 77 )

    if( getVar( ".sfTestVar5" ) != 77 )
      return( c( FALSE, "Access to namespace failed." ) )
    
    ## Nested function, so the export is started one frame below the frame
    ## that holds var3/var4.
    ## NOTE(review): presumably this checks that local=TRUE finds variables in
    ## calling frames, nearest frame first - confirm against sfExport.
    iTest <- function() {
      var3 <- 88

      res <- FALSE

      ## Export by name: three names as arguments, two via the list argument.
      res <- sfExport( "var1", "var2", ".sfTestVar5",
                       list=list( "var3", "var4" ),
                       local=TRUE, namespace="snowfall", stopOnError=FALSE )

      if( inherits( res, "try-error" ) )
        return( c( FALSE, "Exception on export." ) )

      if( !res )
        return( c( FALSE, "Unexpected Exception on export." ) )

      ## Diagnostic output of the nodes' global environments.
      print( "GLOBALENV..." )
      print( sfClusterCall( ls, envir=globalenv() ) )
      
      ## Each node's global env is checked against the five exported names.
      if( !checkAllEqualList( sfClusterCall( ls, all.names=TRUE,
                                             envir=globalenv() ),
                              c( "var1", "var3", "var2", "var4",
                                 ".sfTestVar5" ) ) )
        return( c( FALSE, "Not all vars exported." ) )

      ## Values of the variables taken from the global environment.
      ## get to satisfy R CMD check
      if( !checkAllEqual( sfClusterEval( get("var1") ), 99 ) ||
          !checkAllEqual( sfClusterEval( get("var2") ), 101 ) )
        return( c( FALSE, "Error exporting global var." ) )

      ## Values of the local variables: var3 from iTest (88, not 103) and
      ## var4 from testExport.
      ## get to satisfy R CMD check
      if( !checkAllEqual( sfClusterEval( get("var3") ), 88 ) ||
          !checkAllEqual( sfClusterEval( get("var4") ), 7 ) )
        return( c( FALSE, "Error exporting local var." ) )

      ## Value of the variable set via setVar() above.
      if( !checkAllEqual( sfClusterEval( get(".sfTestVar5") ), 77 ) )
        return( c( FALSE, "Error exporting namespace var." ) )
      
      ## Test removeAll with an exception list: only var2 and var3 may be
      ## left in the (non-hidden) listing afterwards.
      sfRemoveAll( except=list( "var2", "var3" ) )

      if( !checkAllEqualList( sfClusterCall( ls, envir=globalenv() ),
                              list( "var2", "var3" ) ) )
        return( c( FALSE, "Error on removeAll except-list." ) )

      ## Final cleanup on the nodes.
      sfRemoveAll()

      return( c( TRUE, "ok" ) )
    }   
    
    return( iTest() )
  }    


  ##***************************************************************************
  ## Testing Calculation Function Part 1
  ##***************************************************************************
  ## Test the apply-style wrappers (sfLapply, sfClusterApplyLB,
  ## sfClusterApplySR, sfClusterApply) by computing the row sums of a random
  ## matrix on the cluster and comparing them with locally computed sums.
  ##
  ## RETURN: c( status, message ); status is TRUE on success, FALSE otherwise
  ##         (both are coerced to character by c()).
  testCalc1 <- function() {
    size <- 50

    ## Random square matrix. byrow=TRUE fills row after row, i.e. the same
    ## layout as assigning runif( size ) to each row in turn.
    mat <- matrix( runif( size * size ), size, size, byrow=TRUE )

    ## Sum of a single matrix row (function executed on the nodes).
    rSum <- function( row, mat ) {
      s <- 0
      for( col in seq_len( ncol( mat ) ) )
        s <- s + mat[row,col]
      return( s )
    }

    ## Row indices to process and the locally computed reference sums.
    rows <- seq_len( nrow( mat ) )
    cmp  <- unlist( lapply( rows, rSum, mat ) )

    sfExport( "rSum", local=TRUE, debug=TRUE )

    ## Testing sfLapply
    result <- sfLapply( rows, rSum, mat )

    if( !checkVecCmp( unlist( result ), cmp ) )
      return( c( FALSE, "Wrong results on sfLapply." ) )

    ## Testing sfClusterApplyLB
    result <- sfClusterApplyLB( rows, rSum, mat )

    if( !checkVecCmp( unlist( result ), cmp ) )
      return( c( FALSE, "Wrong results on sfClusterApplyLB." ) )

    ## Testing sfClusterApplySR
    result <- sfClusterApplySR( rows, rSum, mat,
                                name="TEST",
                                restore=FALSE, perUpdate=100 )

    if( !checkVecCmp( unlist( result ), cmp ) )
      return( c( FALSE, "Wrong results on sfClusterApplySR." ) )

    ## As clusterApply only works for #nodes samples, reduce data size depending
    ## on it. The reference values are the matching head of cmp.
    nodeRows <- seq_len( min( sfCpus(), nrow( mat ) ) )
    result   <- sfClusterApply( nodeRows, rSum, mat )

    if( !checkVecCmp( unlist( result ), cmp[nodeRows] ) )
      return( c( FALSE, "Wrong results on sfClusterApply." ) )

    return( c( TRUE, "ok" ) )
  }

  ##***************************************************************************
  ## Testing Calculation Function Part 2
  ## Further snow Wrappers.
  ##***************************************************************************
  ## Multiply two random square matrices via sfMM and via %*% and compare
  ## the two products row by row.
  ##
  ## RETURN: c( status, message ); status is TRUE on success, FALSE otherwise
  ##         (both are coerced to character by c()).
  testCalc2 <- function() {
    n <- 50

    ## byrow=TRUE places the random numbers row after row, first for the
    ## left and then for the right operand.
    left  <- matrix( runif( n * n ), n, n, byrow=TRUE )
    right <- matrix( runif( n * n ), n, n, byrow=TRUE )

    viaCluster <- sfMM( left, right )
    viaLocal   <- left %*% right

    ## Row-wise comparison, leave on the first differing row.
    r <- 1
    while( r <= n ) {
      if( !checkVecCmp( viaCluster[r,], viaLocal[r,] ) )
        return( c( FALSE, "Wrong results on sfParMM." ) )

      r <- r + 1
    }

    return( c( TRUE, "ok" ) )
  }

  
  ##***************************************************************************
  ## Run single test (with given functionname)
  ##***************************************************************************
  ## Run a single test function and normalise its outcome.
  ##
  ## PARAMETERS: fun - name of the test function (character string)
  ## RETURN:     The c( status, message ) vector of the test. If the test
  ##             throws an exception, or returns NULL, a non-vector or NA as
  ##             first element, c( FALSE, <description> ) is returned instead.
  runTest <- function( fun ) {
    cat( "Run test: ", fun, "\n" )

    ## Call the test by name; exceptions are caught and reported below.
    res <- try( do.call( fun, list() ) )

    ## Name the test that actually failed in the message.
    if( inherits( res, "try-error" ) )
      return( c( FALSE, paste( paste( "TRY-ERROR on ", fun, "()", sep="" ),
                               geterrmessage() ) ) )

    if( is.null( res ) || !is.vector( res ) || is.na( res[1] ) )
      return( c( FALSE, paste( "Hidden exception on test.", geterrmessage() ) ) )

    return( res )
  }

  ## Result container: runTest() stores one c( status, message ) vector per
  ## test under the test's name.
  complete <- list( errors=0, warnings=0 )

  ## @todo - libraries / source
  ## @todo - other applies / parMM
  ## @todo - exportAll
  tests <- c( "testCall",
              "testEval",
              "testExport",
              "testCalc1",
              "testCalc2",
              "testLib",
              "testSource" )

  ## Run tests.
  for( test in tests )
    complete[[test]] <- runTest( test )

  ## Print results.
  cat( "\n\nRESULTS ON TEST:\n\n" )
  errors <- 0

  for( test in tests ) {
    ## Align names to same length: pad with blanks to 13 characters. format()
    ## adds nothing for names of 13 or more characters, whereas a
    ## seq( 1, 13 - nchar ) based padding would count backwards there.
    label <- format( test, width=13 )

    ## Anything but a clear TRUE status (including NA) counts as failure.
    if( isTRUE( as.logical( complete[[test]][1] ) ) )
      cat( label, ": ok", "\n", sep="" )
    else {
      cat( label, ": FAILED! (", complete[[test]][2], ")\n", sep="" )

      errors <- errors + 1
    }
  }

  cat( "\n----------------------------\n", errors, "tests failed.\n\n" )

  ## Number of failed tests, returned invisibly.
  return( invisible( errors ) )
}
back to top