% \VignetteIndexEntry{An R Package for easier cluster programming based on snow} % \VignetteKeyword{Parallel Computing} % \VignetteKeyword{Cluster} % \VignetteKeyword{HPC} % \VignetteKeyword{snow} % \VignetteKeyword{LAM} % \VignetteKeyword{MPI} \documentclass[10pt,oneside]{article} \usepackage{url} \usepackage{/usr/local/R/R-2.5.1/lib/R/share/texmf/Sweave} \begin{document} \pagestyle{empty} \setlength{\baselineskip}{1.25em} \setlength{\parskip}{0.5em} \setlength{\parindent}{0.0em} \begin{titlepage} \title{Developing parallel programs using snowfall} \author{Jochen Knaus} \date{2008-10-07} \maketitle \begin{abstract} \texttt{snowfall} is an R package for easier parallel programming using clusters. Basically it is build upon the package \texttt{snow} using it's network and cluter abilities and therefore offering use of Socket, MPI, PVM and NetWorkSpaces support. \texttt{snow} functions can used from within \texttt{snowfall} as well. \texttt{snowfall} offers additional support for implicit sequential execution (e.g. for distributing packages using optional parallel support), additional calculation functions and many functions for more comfortable programming. Also, \texttt{snowfall} can be configured via command line arguments, making the change of cluster settings easier without program change. This can be used to connect to batch- and workloadmanagers. Finally \texttt{snowfall} can be directly connected to the R-specific cluster manager \emph{sfCluster}. \texttt{snowfall} does not add an technical layer of abstraction to \texttt{snow}. But beside from the connector to \texttt{sfCluster}, it builds an extra layer of usability on the top of \texttt{snow}. It is not thought as an replacement for \texttt{snow}, but an addition for inexperienced users or those who seek more comfort using parallel computing and R. \end{abstract} \end{titlepage} %% Inhaltsverzeichnis \tableofcontents \newpage \section{snowfall} \subsection{Getting started} \subsubsection{Requirements for sequential execution} Basically \texttt{snowfall} is able to run without any external library. In this case, it is not possible to use parallel execution of commands. All potential calls to parallel functions will be executed sequential. Programs written in sequential use with \texttt{snowfall} calls can be running in parallel without any code change. \subsubsection{Requirements for parallel execution: Basics} If you just want to use parallel computing on your local PC or laptop you are just fine with basically installation of \texttt{snowfall} and \texttt{snow}. You can use then a so called socket cluster, for which no additional software needs to be installed. If you are just wanting to use parallel programming on your local workstation, PC or laptop, you are fine. \subsubsection{Requirements for parallel execution: MPI} You have a running MPI cluster (OpenMPI or any other kind of MPI cluster) available. Although snowfall is useable with OpenMPI as well, the management software sfCluster can currently only used with LAM/MPI. \subsubsection{Requirements for parallel execution: LAM/MPI} For using sfCluster with snowfall, currently LAM/MPI is needed. If you are using Debian/Ubuntu Linux, just call\\ \texttt{aptitude install xmpi lam4-dev}\footnote{On other Linux distributions there are similar packages with probably different name. It is important that you install the development version of the LAM package, as the \texttt{Rmpi} package need these files for installation.} Further you need to install the R-packages \texttt{snow} and \texttt{Rmpi}. If your program uses libraries, ensure that these are available on all nodes. If they are not present in R-default path (on given machine), ensure that they are accessible in the same location on all machines (for example \texttt{/home/xy/R.libs}). If you want to run programs only on your (multi core) computer without any cluster of many machines, you do not have to setup the cluster yourself, it will be started implicitly in \texttt{snowfall}s initialisation. Using two or more machines for cluster calculations, you need to setup a LAM/MPI cluster and start cluster explicitely. This is no big thing at all. For example, edit a small textfile like this one: \texttt{machine1.yourdomain.com cpu=4 sched=yes\\ machine2.yourdomain.com cpu=2 sched=yes} Just enter the machines for your cluster and the amount of CPUs. You start a LAM/MPI cluster using\\ \texttt{lamboot hostfile}\\ where \texttt{hostfile} is the little configuration file edited above. To shutdown just call \texttt{lamhalt}. For further details upon LAM/MPI setup, see \cite{burns94:_lam}. Note: All parallel programs you start are running in this cluster. If your program requests 100 CPUs on your private dual-core machine, you get that amount and 100 R processes are spawn, independant or available ressources (memory, cpus). For workgroups or larger clusters, management solutions like \emph{sfCluster} are strongly recommended. \subsubsection{Requirements for parallel execution: PVM/NWS} PVM and NetWorkSpaces/Sleight are supported in snowfall as these are useable with snow. But both are less supported by sfCluster (but at least a managed start can be done using sfCluster), so there is no further documentation about their usage here. \subsection{(Short) introduction to parallel programming} The general goal of paralleling your R program is to vectorize the data or calculation loops (probably with wrapper functions), as all calculation functions of \texttt{snowfall} are kind of reimplementations of R-list/vector functions. A good introduction to parallel programming for statistical purposes can be found in \cite{ROSS_07} and \cite{HANA_STAT04}. \subsection{Introduction to usage of snowfall} Basically, usage of \texttt{snowfall} always works with the following scheme: \begin{enumerate} \item Initialization using \texttt{sfInit()}. Set up the cluster (if needed) and the internal functions. \texttt{sfInit} must be called before using any function of the \texttt{snowfall} package.\footnote{The only exception is the function \texttt{sfSetMaxCPUs()}, which raise or limit the configured maximum CPU count.} \item Export needed variables/objects to all slaves. \item Do some parallel calculations using \texttt{snowfall} calculation functions. Repeat as many times as needed. \item End parallel execution using \texttt{sfStop()}. \end{enumerate} The initialisation differs if you use \texttt{snowfall} alone or with the management tool \emph{sfCluster}. In this chapter we only cover a standalone usage of \texttt{snowfall}. For usage with \emph{sfCluster}, see XXX. If you are firm on using the R package \texttt{snow}, starting with or porting your program to \texttt{snowfall} is easy. The complete initialisation is done with a single call to \texttt{sfInit()}. Arguments are \texttt{parallel} and \texttt{cpus}, giving the running mode (parallel execution or sequential execution). If running in sequential mode, \texttt{cpus} is ignored (and set to one). On calling \texttt{sfInit( parallel=TRUE )} without a running LAM cluster (but LAM installed), a \emph{local} cluster will be started, which only contains your local machine. This can be handy on single multi-core machines. But note you \texttt{sfStop} will not shutdown this cluster, so you have to stop it yourself manually (if wished). Sequential mode can be useful for developing the program, probably on a single core laptop without installed cluster or running Windows operating system. Also sequential mode is needed to deploy a package using \texttt{snowfall} safely, where you cannot assume a user have an useable cluster installed. If the initialisation fails, probably because of missing base libraries \texttt{Rmpi} and \texttt{snow}, \texttt{snowfall} falls back to sequential mode with a warning message. In sequential and parallel execution, all functions are useable in both modes in the same way and returning the same results. \begin{verbatim} sfInit( parallel=FALSE ) sfLapply( 1:10, exp ) sfStop() sfInit( parallel=TRUE, cpus=5 ) ## Now, index 1 is calculated on CPU1, 2 on CPU2 and so on. ## Index 6 is again on CPU1. ## So the whole call is done in two steps on the 5 CPUs. sfLapply( 1:10, exp ) sfStop() \end{verbatim} Please note: Most of the \texttt{snowfall} functions are stopping the program on failure by default (by calling \texttt{stop()}). This is much safer for unexperienced users. If you want own failure handling, install your own handler \texttt{options(error = ...)} to prevent snowfall from stopping in general. Also most of the functions feature an argument \texttt{stopOnError} which set to \texttt{FALSE} prevents the functions from stopping. Do not forget to handle potential errors in your program if using this feature. The given behavior is not only better for unexperienced users, any other behavior would be very nasty on package deployment. \subsection{Writing parallel programs with snowfall} \subsubsection{General notes and simple example} If you detected parts of your program which can be parallised (loops etc) it is in most cases an fast step to give them a parallel run. First, rewrite them using Rs list operators (lapply, apply) instead of loops (if they are not yet calculated by list operators). Then write a wrapper function to be called by the list operators and manage a single parallel step. Note there are no local variables, only the data from the list index will be given as argument. If you need more than one variable argument, you need to make the required variables global (assign to global environment) and export them to all slaves. \texttt{snowfall} provides some functions to make this process easier (take a look at the package help). \begin{verbatim} sfInit( parallel=TRUE, cpus=4 ) b <- c( 3.4, 5.7, 10.8, 8, 7 ) ## Export a and b in their current state to all slaves. sfExport( ''b'' ) parWrapper <- function( datastep, add1, add2 ) { cat( ''Data: '', datastep, ''ADD1:'', add1, ''ADD2:'', add2, ''\n'' ) ## Only possible as ''b'' is exported! cat( ''b:'', b[datastep] ) ## Do something return( datastep ) } ## Calls parWrapper with each value of a and additional ## arguments 2 and 3. result <- sfLapply( 1:5, parWrapper, 2, 3 ) sfStop() \end{verbatim} \subsubsection{Basic load balancing using \texttt{sfClusterApplyLB}} All parallel wrappers around the R-list operators are executed in blocks: On one step the first $n$ indices are calculated, then the next $n$ indices, where $n$ is the number of CPUs in the cluster. This behavior is quite ok in a homogenous cluster, where all or mostly all machines are built with equal hardware and therefore offer the same speed. In heterogenous infrastructures, speed is depending on the slowest machine in the cluster, as the faster machines have to wait for it to finish its calculation. If your parallel algorithm is using different time for different problems, load balancing will reduce overall time in homogenous clusters greatly. \texttt{snow} and so \texttt{snowfall} feature a simple load balanced method to avoid waiting times in such environments. If calling \texttt{sfClusterApplyLB} the faster machines get further indices to calculate without waiting for the slowest to finish its step. \texttt{sfClusterApplyLB} is called like \texttt{lapply}. If your local infrastructure is such an heterogenous structure, this function is the way to go. It can also be handy in homogenous clusters where other users spawn processes, too, so sometimes load differs temporarily. A visualisation of basic load balacing can be found in \cite{ROSS_07}. \begin{verbatim} sfInit( parallel=TRUE, cpus=2 ) calcPar <- function( x ) { x1 <- matrix( 0, x, x ) x2 <- matrix( 0, x, x ) for( var in 1:nrow( x1 ) ) x1[var,] = runif( ncol( x1 ) ) for( var in 1:nrow( x2 ) ) x2[var,] = runif( ncol( x1 ) ) b <- sum( diag( ( x1 %*% x2 ) %*% x1 ) ) return( b ) } result <- sfClusterApplyLB( 50:100, calcPar ) sfStop() \end{verbatim} \subsubsection{Intermediate result saving and restoring using \texttt{sfClusterApplySR}} Another helpful function for long running clusters is \texttt{sfClusterApplySR}, which saves intermediate results after processing $n$-indices (where $n$ is the amount of CPUs). If it is likely you have to interrupt your program (probably because of server maintenance) you can start using \texttt{sfClusterApplySR} and restart your program without the results produced up to the shutdown time. Please note: Only complete $n$-blocks are saved, as the function \texttt{sfLapply} is used internally.\footnote{This function is an addition to \texttt{snow} and therefore could not be integrated in the load balanced version.} The result files are saved in the temporary folder \texttt{~/.sfCluster/RESTORE/x}, where x is a string with a given name and the name of the input R-file. \texttt{sfClusterApplySR} is called like \texttt{sfClusterApplyLB} and therefore like \texttt{lapply}. If using the function \texttt{sfClusterApplySR} result are always saved in the intermediate result file. But, if cluster stopped and results could be restored, restore itself is only done if explicitly stated. This aims to prevent false results if a program was interrupted by intend and restarted with different internal parameters (where with automatical restore probably results from previous runs would be inserted). So handle with care if you want to restore! If you only use one call to \texttt{sfClusterApplySR} in your program, the parameter \texttt{name} does not need to be changed, it only is important if you use more than one call to \texttt{sfClusterApplySR}. \begin{center} \begin{verbatim} sfInit( parallel=TRUE, cpus=2 ) # Saves under Name default resultA <- sfClusterApplySR( somelist, somefunc ) # Must be another name. resultB <- sfClusterApplySR( someotherlist, someotherfunc, name="CALC_TWO" ) sfStop() \end{verbatim} \end{center} If cluster stops probably during run of \texttt{someotherfunc} and restarted with restore-Option, the complete result of \texttt{resultA} is loaded and therefore no calculation on \texttt{somefunc} is done. \texttt{resultB} is restored with all the data available at shutdown and calculation begins with the first undefined result. \emph{Note on restoring errors}: If restoration of data fails (probably because list size is different in saving and current run), \texttt{sfClusterApplySR} stops. For securely reason it does not delete the RESTORE-files itself, but prompt the user the complete path to delete manually and explicitly. \subsection{Fault tolerance} Differing from \texttt{snowFT}, the fault tolerance extension for \texttt{snow}, \texttt{snowfall} does not feature fault tolerance (see \cite{HANA_04}). This is due to the lack of an MPI implementation of \texttt{snowFT}. \subsection{Controlling snowfall using the command line} snowfall can be widely controlled via command line arguments. This is useful for fast changing of cluster parameters (e.g. changing the host names in a Socket cluster) on a raw installation and it serves as connection to sfCluster. Of course it can be used as connection to any other workload- or batch managing software, too. On the commandline there are the following parameters: \begin{tabular}{lp{10cm}} parallel & Switch to parallel execution. Default is sequential execution \\ cpus=X & Amount of CPUs wanted. Without --parallel, a value $X > 1$ switch to parallel execution. \\ type=X & Type of cluster. Allowed values are SOCK, MPI, PVM and NWS. \\ session=X & Session number. snowfall logfiles contain number, but only needed with sfCluster. \\ restoreSR & Enables restoring of peviously saved results from sfClusterApplySR calls. \\ hosts=X & List of hosts for Socket (SOCK) or NetWorkSpaces (NWS) clusters. Entries are comma seperated. Any entry may contain colon seperated value for the amount of processors on this machine. Example: \texttt{--hosts=machine1:4,machine2,123.123.12.13:2} (this spawns 4 workers on machine1, one on machine2 and two on 123.123.12.13). \\ tmpdir=X & Specify temporary directory for logfiles and R-output. \\ \end{tabular} For using these arguments, just add these after an \texttt{--args} on the commandline (which forces R not to treat these arguments as R ones). \begin{center} \texttt{R --no-save --args --parallel --cpus=2 < program.R} \end{center} Starts R and forces snowfall to start in parallel mode with 2 CPUs (in this case: using a Socket-cluster, as this is the default). \textit{Note}: arguments on the command line have lower priority as settings from the \texttt{sfInit} call. That means that the above example only works if initialisation is done via \texttt{sfInit()}, but not with \texttt{sfInit( parallel=FALSE )}, as then sequential execution is forced. Further examples should explan the feature: \begin{itemize} \item \texttt{R --no-save --args --parallel --type=MPI --cpus=4 < program.R} (start using 4 workers in an existing MPI cluster. If no MPI cluster exists, a plain one is started on your local machine only. Beware of this, as you have to shutdown this cluster afterwards manually.). \item \texttt{R --no-save --args --parallel --type=SOCK --hosts=localhost:3,singlema,othmach:4 < program.R} (Starts a socket cluster with two machines and 7 CPUs: 3 on \texttt{localhost}, 4 on \texttt{othmach} and one worker on \texttt{singlema}). \end{itemize} \subsection{Traps, Internals} \texttt{snowfall} limits the amount of CPUs by default (to 40). If you need more CPUs, call \texttt{sfSetMaxCPUs()} \emph{before} calling \texttt{sfInit()}. Beware of requesting more CPUs as you have ressources: there are as many R processes spawned as CPUs wanted. They are distributed across your cluster like in the given scheme of the LAM host configuration. You can easily kill all machines in your cluster by requesting huge amounts of CPUs or running very memory consuming functions across the cluster. To avoid such common problems use \emph{sfCluster}. For some functions of \texttt{snowfall} it is needed to create global variables on the master. All these variables start with prefix ``\texttt{.sf}'', please do not delete them. The internal control structure of \texttt{snowfall} is saved in the variable \texttt{.sfOptions}, which should be accessed through the wrapper functions as the structure may change in the future.\section{Using \emph{sfCluster} with \texttt{snowfall}} \subsection{About \emph{sfCluster}} \emph{sfCluster} is a small management tool, helping to run parallel R-programs using \texttt{snowfall}. Mainly, it exculpates the user from setting up a LAM/MPI cluster on his own. Further, it allows multiple clusters per user and therefore executes any parallel R program in a single cluster. These clusters are built according to the current load and usage of your cluster (this means: only machines are taken with free ressources). Also, execution is observed and if problems arise, the cluster is shut down. \emph{sfCluster} can be used with R-interactive shell or batch mode and also feature a special batch mode with visual logfile and process-displaying. For further details about installation, administration and configuration of \emph{sfCluster}, please visit \url{http://www.imbi.uni-freiburg.de/XXX} or run \texttt{sfCluster {-}{-}help} if you installed it yet. \subsection{Starting R using \emph{sfCluster}} An \emph{sfCluster} execution is following these steps: \begin{enumerate} \item Test memory usage of program if not explicitly given. This is done via a default temporary (10 minutes) sequential run to determinate the maximum usage of RAM on a slave. This is important for allocating ressources on slaves. \item Detect free ressources in cluster universe.\footnote{Which are all potentially useable machines.} Take machines with free ressources matching users request. \item Start LAM/MPI cluster with previous built setting. \item Run R with parameters for \texttt{snowfall} control. \item LOOP: Observe execution (check processes, memory usage, and machine state). In monitoring mode: Display state of cluster and logfiles on screen. \item On interruption or regular end: shutdown cluster. \end{enumerate} \subsection{Using \emph{sfCluster}} The most common parameters of \emph{sfCluster} are \texttt{{-}{-}cpus}, with which you request a certain amount of CPUs among the cluster (default is 2 in parallel and 1 in sequential mode). There is a builtin limit for the amount of CPUs, which is changeable using the \emph{sfCluster} configuration. There are four execution modes: \begin{tabular}{lp{3cm}p{9cm}} -b & Batchmode (Default) & Run silent on terminal.\\ -i & Interactive R-shell & Ability to use interactive R-shell with cluster.\\ -m & Monitoring mode & Visual processmonitor and logfile viewer.\\ -s & Sequential execution (no cluster usage) & Run without cluster on single CPU.\\ \end{tabular} To avoid the (time consuming) memory test, you can specify a maximum amount of memory usable per slave via option \texttt{{-}{-}mem}. The behavior on excessing this memory usage is configurable (default: cluster stop). The memory usage limit is very important for not getting your machines into swapping (means: shortage of physical RAM), which would hurt performance badly. So, simple calls to \emph{sfCluster} could be \begin{verbatim} ## Run a given R program with 8 cpus and max. 500MB (0.5 gigabytes) in monitoring mode sfCluster -m --cpus=8 --mem=0.5G myRprogram.R ## Run nonstopping cluster with real quiet output. nohup sfCluster -b --cpus=8 --mem=500M myRprogram.R --quiet ## Start R interactive shell with 4 cores. With 300MB memory (MB is default unit) ## No R-file is given for interactive mode. sfCluster -i --cpus=4 --mem=300 \end{verbatim} For all possible options and further examples for \emph{sfCluster} usage, see \texttt{sfCluster {-}{-}help}. \subsection{The snowfall-side of \emph{sfCluster}} If you start an R program using \texttt{snowfall} with \emph{sfCluster}, the latter waits until \texttt{sfInit()} is called and then starts the observation of the execution. The default behavior if using \emph{sfCluster} is just to call \texttt{sfInit()} without any argument. Use arguments only if you want to explicitly overwrite given settings by \emph{sfCluster}. \subsection{Proposed development cycle} The following development cycle is of course a proposal. You can skip or replace any step depending on your own needs. \begin{enumerate} \item Develop program in sequential mode (start using option \texttt{-s}). \item Test in parallel mode using interactive mode to detect directly problems on parallelisation (start using option \texttt{-i}). \item Try larger test runs using monitoring mode, observing the cluster and probably side effects during parallel execution (start using option \texttt{-m}). Problems arise on single nodes will be visible (like non correct working libraries). \item Do real runs using silent batch mode (start using options \texttt{-b {-}{-}quiet}). Probably you want to run these runs in the background of your Unix shell using \texttt{nohup}. \end{enumerate} \subsection{Future sfCluster} These additions are planned for the future: \begin{itemize} \item Port to OpenMPI \item Faster SSH connections for observing \item Extended scheduler for system ressources \end{itemize} \bibliographystyle{plain} \bibliography{all-bib} \end{document}