## Wrappers for Snow function. ## ## The wrappers do the following: decide whether we run in parallel or ## sequential mode. ## In parallel mode the according Snow functions are used. ## In sequential mode, if it makes sense, the sequential counterparts ## of the Snow functions are used. ##**************************************************************************** ## Wrapper for: clusterSplit ##**************************************************************************** sfClusterSplit <- function( seq ) { sfCheck(); if( sfParallel() ) return( clusterSplit( sfGetCluster(), seq ) ) ## In sequential mode return a list with everything in element 1 (means: ## everything is run on one node). else return( list( seq ) ) } ##**************************************************************************** ## Wrapper for: clusterCall ## ## Catches for errors. Return them or stop immidiately. ##**************************************************************************** sfClusterCall <- function( fun, ..., stopOnError=TRUE ) { sfCheck(); if( !checkFunction( fun, stopOnError=FALSE ) ) { if( stopOnError ) stop( "No function or not defined object in sfClusterCall" ) else { message( "No function or not defined object in sfClusterCall" ) return( NULL ) } } if( sfParallel() ) { ## Exec via Snow. result <- clusterCall( sfGetCluster(), fun, ... ) ## Not enough results? ## @TODO Check if this test is needed (Snow allways return list of correct size?) if( length( result ) != sfCpus() ) { if( stopOnError ) stop( paste( "Error in sfClusterCall (not all slaves responded).\n", "Call from: ", as.character( sys.call( -1 ) ) ) ) else { message( paste( "Error in sfClusterCall (not all slaves responded).\n", "Call from: ", as.character( sys.call( -1 ) ) ) ) return( result ); } } ## Check if snow throw an exception on any of the slaves. if( !all( checkTryErrorAny( result ) ) ) { if( stopOnError ) { stop( paste( "Error in sfClusterCall (catched TRY-ERROR).\n", "Call from: ", as.character( sys.call( -1 ) ) ) ) } else { message( paste( "Error in sfClusterCall (catched TRY-ERROR).\n", "Call from: ", as.character( sys.call( -1 ) ) ) ) return( result ) } } return( result ) } else return( do.call( fun, list( ... ) ) ) } ##**************************************************************************** ## Wrapper for: clusterEvalQ - renamed as indeed "eval" is executed and not ## "evalq". ##**************************************************************************** sfClusterEval <- function( expr, stopOnError=TRUE ) { sfCheck(); if( sfParallel() ) { return( sfClusterCall( eval, substitute( expr ), env=globalenv(), stopOnError=stopOnError ) ) } else { ## Problems can arise through "enclos", which is default set to parent ## and therefore here, too: on this way local variables (higher environments ## are visible, which badly are not visible in parallel runs...). ## There should be a fix or something. return( eval( expr, envir=globalenv(), enclos=parent.frame() ) ) } } ## Snows clusterEvalQ uses "eval" and not "evalq", so this wrapper is an alias. sfClusterEvalQ <- function( expr ) return( sfClusterEval( expr ) ) ##**************************************************************************** ## Wrapper for: clusterMap. ## Currently not used. ##**************************************************************************** sfClusterMap <- function( fun, ..., MoreArgs=NULL, RECYCLE=TRUE ) stop( "Currently no wrapper for clusterMap" ) ##**************************************************************************** ## Wrapper for: clusterApply (snow parallel) - lapply (sequential) ## Adds additional warnings before the execution (esp. in sequential mode, ## where exec works fine but can cause problems runnin in parallel). ## ## PARAMETERS: Parameters like clusterApply ## RETURN: Result ##**************************************************************************** sfClusterApply <- function( x, fun, ... ) { sfCheck(); checkFunction( fun ) ## However snow limits list size to cluster nodes in "normal" ## execution. ## This is a fatal error in parallel mode and a warning in sequential. if( length( x ) > sfCpus() ) { if( sfParallel() ) stop( "More list entries as nodes => use sfClusterApplyLB instead. See Snow/Snowfall documentation." ) else warning( "More list entries as nodes => causes error in parallel mode. use sfClusterApplyLB instead." ) } if( sfParallel() ) return( clusterApply( sfGetCluster(), x, fun, ... ) ) else return( lapply( x, fun, ... ) ) } ##**************************************************************************** ## Wrapper for: clusterApplyLB (snow parallel) - lapply (sequential) ## ## PARAMETERS: Parameters like clusterApply ## RETURN: Result ##**************************************************************************** sfClusterApplyLB <- function( x, fun, ... ) { sfCheck(); checkFunction( fun ) if( sfParallel() ) return( clusterApplyLB( sfGetCluster(), x, fun, ... ) ) else ## array... korrigieren. return( lapply( x, fun, ... ) ) } ##**************************************************************************** ## Also snow-Handler handling is hidden to the user. ## ## Wrapper for: parLappy (snow parallel) - lapply (sequential) ## ## As lapply parameters were inkonsitent ("x"/"fun") they were corrected to ## ""x"/"fun". ## ## PARAMETERS: Parameters like lapply ## RETURN: Result ##**************************************************************************** sfLapply <- function( x, fun, ... ) { sfCheck(); checkFunction( fun ) if( sfParallel() ) return( parLapply( sfGetCluster(), x, fun, ... ) ) else return( lapply( x, fun, ... ) ) } ##**************************************************************************** ## Wrapper for: parSapply (snow parallel) - sapply (sequential) ## ## PARAMETERS: Parameters like sapply ## RETURN: Result ##**************************************************************************** sfSapply <- function( x, fun, ..., simplify=TRUE, USE.NAMES=TRUE ) { sfCheck(); checkFunction( fun ) if( sfParallel() ) return( parSapply( sfGetCluster(), x, fun, ..., simplify, USE.NAMES ) ) else return( sapply( x, fun, ..., simplify, USE.NAMES ) ) } ##**************************************************************************** ## Wrapper for: parApply (snow parallel) - apply (sequential) ## ## PARAMETERS: Parameters like apply ## RETURN: Result ##**************************************************************************** sfApply <- function( x, margin, fun, ... ) { sfCheck(); checkFunction( fun ) if( sfParallel() ) return( parApply( sfGetCluster(), x, margin, fun, ... ) ) else return( apply( x, margin, fun, ... ) ) } sfRapply <- function( x, fun, ... ) { stop( "sfRapply does not exists yet. Use Snow's parRapply instead." ) return( invisible( NULL ) ); } sfCapply <- function( x, fun, ... ) { stop( "sfCapply does not exists yet. Use Snow's parCapply instead." ) return( invisible( NULL ) ); } ##**************************************************************************** ## Wrapper for: parMM (snow parallel) - %*% (sequential) ## ## PARAMETERS: Matrix a, Matrix b ## RETURN: Result ##**************************************************************************** sfMM <- function( a, b ) { sfCheck(); if( sfParallel() ) return( parMM( sfGetCluster(), a, b ) ) else return( a %*% b ) } ##**************************************************************************** ## Wrappers for the two uniform RNGs used in snow. ## Basically, at the moment these are not used in sequential (means: none ## of the two is included here for sequential execution). ## @TODO Sequential use of the RNGs. ##**************************************************************************** sfClusterSetupSPRNG <- function( seed = round( 2^32 * runif(1) ), prngkind = "default", para = 0, ... ) { sfCheck(); if( sfParallel() ) clusterSetupSPRNG( sfGetCluster(), seed, prngkind, para, ... ) else { warning( "Uniform random number streams (currently) not available in serial execution." + "Random numbers may differ in serial & parallel execution." ) set.seed( seed ) } } sfClusterSetupRNGstream <- function( seed=rep( 12345, 6 ), ... ) { sfCheck(); if( sfParallel() ) clusterSetupRNGstream( sfGetCluster(), seed=seed, ... ) else { warning( "Uniform random number streams (currently) not available in serial execution." + "Random numbers may differ in serial & parallel execution." ) set.seed( seed[1] ) } } sfClusterSetupRNG <- function( type="RNGstream", ... ) { sfCheck(); if( sfParallel() ) clusterSetupRNG( sfGetCluster(), type=type, ... ) else { warning( "Uniform random number streams (currently) not available in serial execution." + "Random numbers may differ in serial & parallel execution." ) } }