Tip revision: c2820e1
snowfall.Snw
% \VignetteIndexEntry{An R Package for easier cluster programming based on snow}
% \VignetteKeyword{Parallel Computing}
% \VignetteKeyword{Cluster}
% \VignetteKeyword{HPC}
% \VignetteKeyword{snow}
% \VignetteKeyword{LAM}
% \VignetteKeyword{MPI}

\documentclass[10pt,oneside]{article}

\usepackage{url}

\begin{document}

\pagestyle{empty}

\setlength{\baselineskip}{1.25em}
\setlength{\parskip}{0.5em}
\setlength{\parindent}{0.0em}
\begin{titlepage}
\title{Developing parallel programs using snowfall}
\author{Jochen Knaus}
\date{2010-03-04}
\maketitle

\begin{abstract}
\texttt{snowfall} is an R package for easier parallel programming
using clusters.
Basically it is build upon the package \texttt{snow} \cite{TIERNEY08} using it's
network and cluter abilities and therefore offering use of
Socket, MPI, PVM and NetWorkSpaces support and can be seen as an
"usability wrapper".

\texttt{snow} functions can used from within \texttt{snowfall} as well.

\texttt{snowfall} offers additional support for implicit sequential
execution (e.g. for distributing packages using optional parallel
support), additional calculation functions, extended error handling,
and many functions for more comfortable programming.

Also, \texttt{snowfall} can be configured via command line
arguments, making the change of cluster settings easier without
program change. This can be used to connect to batch- and workloadmanagers.

Finally \texttt{snowfall} can be directly connected to the R-specific
cluster manager \emph{sfCluster}.

\texttt{snowfall} does not add an technical layer of abstraction
to \texttt{snow}. But beside from the connector to \texttt{sfCluster}, it
builds an extra layer of usability on the top of \texttt{snow}.

It is not thought as an replacement for \texttt{snow}, but an addition for
inexperienced users or those who seek more comfort using parallel
computing and R.

A further introduction to snowfall is published in the R-Journal
\cite{Knau:Porz:Bind:Schw:easi:2009}.

\url{http://www.imbi.uni-freiburg.de/parallel}

\end{abstract}
\end{titlepage}

%% Inhaltsverzeichnis
\tableofcontents
\newpage
\section{snowfall}
\subsection{Getting started}
\subsubsection{Requirements for sequential execution}
Basically, \texttt{snowfall} is able to run without any external
library. In this case, it is not possible to use parallel execution
of commands. All potential calls to parallel functions will be
executed sequentially.

Programs written in sequential use with \texttt{snowfall} calls
can be running in parallel without any code change.

\subsubsection{Requirements for parallel execution: Basics}
If you just want to use parallel computing on your local PC or
laptop you are just fine with basically installation of
\texttt{snowfall} and \texttt{snow}.
You can use then a so called socket cluster,
for which no additional software needs to be installed.

If you are just wanting to use parallel programming on your
local workstation, PC or laptop, you are fine.

\subsubsection{Requirements for parallel execution: MPI}
You have a running MPI cluster (OpenMPI or any other kind of
MPI cluster) available.

Although snowfall is useable with OpenMPI as well, the
management software sfCluster can currently only used with
LAM/MPI.

\subsubsection{Requirements for parallel execution: LAM/MPI}
For using sfCluster with snowfall, currently LAM/MPI is needed.

If you are using Debian/Ubuntu Linux, just call\\
\texttt{aptitude install xmpi lam4-dev}\footnote{On other Linux distributions
there are similar packages with probably different name. It is important
that you install the development version of the LAM package, as the
\texttt{Rmpi} package need these files for installation.}

Further you need to install the R-packages \texttt{snow} and
\texttt{Rmpi}.

If your program uses libraries, ensure that these are available on all
nodes. If they are not present in R-default path (on given machine),
ensure that they are accessible in the same location on all machines
(for example \texttt{/home/xy/R.libs}).

If you want to run programs only on your (multi core) computer without
any cluster of many machines, you do not have to setup the cluster
yourself, it will be started implicitly in \texttt{snowfall}s initialisation.

Using two or more machines for cluster calculations, you need to setup
a LAM/MPI cluster and start cluster explicitely.

This is no big thing at all. For example, edit a small textfile like this
one:

\texttt{machine1.yourdomain.com cpu=4 sched=yes\\
machine2.yourdomain.com cpu=2 sched=yes}

Just enter the machines for your cluster and the amount of CPUs.
You start a LAM/MPI cluster using\\
\texttt{lamboot hostfile}\\
where \texttt{hostfile} is the little configuration file edited
above.

To shutdown just call \texttt{lamhalt}.

For further details upon LAM/MPI setup, see \cite{burns94:_lam}.

Note: All parallel programs you start are running in this cluster.
you get that amount and 100 R processes are spawn, independent
or available ressources (memory, cpus).

For workgroups or larger clusters, management solutions like
\emph{sfCluster} are strongly recommended.

\subsubsection{Requirements for parallel execution: PVM/NWS}
PVM and NetWorkSpaces/Sleight are supported in snowfall as these
are useable with snow. But both are less supported by sfCluster
(but at least a managed start can be done using sfCluster), so
there is no further documentation about their usage here.

\subsection{(Short) introduction to parallel programming}
The general goal of paralleling your R program is to vectorize the data
or calculation loops (probably with wrapper functions), as all
calculation functions of \texttt{snowfall} are kind of reimplementations
of R-list/vector functions.

A good introduction to parallel programming for statistical
purposes can be found in \cite{ROSS_07} and \cite{HANA_STAT04}.

\subsection{Introduction to usage of snowfall}
Basically, usage of \texttt{snowfall} always works with the
following scheme:
\begin{enumerate}
\item
Initialization using \texttt{sfInit()}. Set up the cluster (if needed)
and the internal functions. \texttt{sfInit} must be called before
using any function of the \texttt{snowfall} package.\footnote{The only
exception is the function \texttt{sfSetMaxCPUs()}, which raises or limits
the configured maximum CPU count.}
\item Export needed variables/objects to all slaves.
\item
Do some parallel calculations using \texttt{snowfall} calculation
functions. Repeat as many times as needed.
\item
End parallel execution using \texttt{sfStop()}.
\end{enumerate}

The initialisation differs if you use \texttt{snowfall} alone or with
the management tool \emph{sfCluster}. In this chapter we only cover a
standalone usage of \texttt{snowfall}.
For usage with \emph{sfCluster}, see chapter 2.

If you are firm on using the R package \texttt{snow}, starting with
\texttt{snowfall} is easy.

The complete initialisation is done with a single call to
\texttt{sfInit()}.
The main arguments are \texttt{parallel}, \texttt{cpus} and \texttt{type},
giving the running mode (parallel execution or sequential execution),
the amount of CPUs if executing in parallel mode and the type of the
underlying cluster.
If running in sequential mode, \texttt{cpus} is ignored (and set
to one).
Without a given \texttt{type} a socket cluster is started, which does not
need any further software installed and therefore most likely runs anywhere
immidiately. This is the desired choice for executing on a laptop or single
multicore machine, too. Please note, that on Windows an installed Personal

%On calling \texttt{sfInit( parallel=TRUE )} without a running LAM
%cluster (but LAM installed), a \emph{local} cluster will be started,
%which only contains your local machine. This can be handy on single
%multi-core machines. But note you \texttt{sfStop} will not shutdown
%this cluster, so you have to stop it yourself manually (if wished).

Sequential mode can be useful for developing the program, probably
on a single core laptop without installed cluster or running Windows
operating system. Also sequential mode is needed to deploy a package
using \texttt{snowfall} safely,
where you cannot assume a user have an useable cluster installed.

Other arguments for \texttt{sfCluster} are \texttt{restore}, \texttt{socketHosts},
\texttt{slaveOutfile} and \texttt{nostart}. See package help for
description.

If the initialisation fails, probably because of missing base libraries
\texttt{Rmpi} and \texttt{snow},
\texttt{snowfall} falls back to sequential mode with a warning message.

In sequential and parallel execution,
all functions are useable in both modes in the same way and returning
the same results.

\begin{verbatim}
sfInit( parallel=FALSE )

sfLapply( 1:10, exp )

sfStop()

sfInit( parallel=TRUE, cpus=5 )

## Now, index 1 is calculated on CPU1, 2 on CPU2 and so on.
## Index 6 is again on CPU1.
## So the whole call is done in two steps on the 5 CPUs.
sfLapply( 1:10, exp )

sfStop()
\end{verbatim}

Please note: Most of the \texttt{snowfall} functions are stopping
the program on failure by default (by calling
\texttt{stop()}). This is much safer for unexperienced users. If you
want own failure handling, install your own handler
\texttt{options(error = ...)} to prevent snowfall from stopping in
general. Also most of the functions feature an argument \texttt{stopOnError}
which set to \texttt{FALSE} prevents the functions from stopping.
Do not forget to handle potential errors in your program if using this
feature.

The given behavior is not only better for unexperienced users, any other
behavior would be very nasty on package deployment.

\subsection{Writing parallel programs with snowfall}
\subsubsection{General notes and simple example}
If you detected parts of your program which can be parallelised (loops
etc) it is in most cases a fast step to give them a parallel run.

First, rewrite them using Rs list operators (lapply, apply) instead of
loops (if they are not yet calculated by list operators).

Then write a wrapper function to be called by the list operators and
manage a single parallel step. Note there are no local variables, only
the data from the list index will be given as argument.

If you need more than one variable argument, you need to make the required
variables global (assign to global environment) and export them to all
slaves. \texttt{snowfall} provides some functions to make this process
easier (take a look at the package help).

\begin{verbatim}
sfInit( parallel=TRUE, cpus=4 )

b <- c( 3.4, 5.7, 10.8, 8, 7 )

## Export a and b in their current state to all slaves.
sfExport( ''b'' )

## Only possible as ''b'' is exported!
cat( ''b:'', b[datastep] )

## Do something

return( datastep )
}

## Calls parWrapper with each value of a and additional
## arguments 2 and 3.
result <- sfLapply( 1:5, parWrapper, 2, 3 )

sfStop()
\end{verbatim}

All parallel wrappers around the R-list operators are executed in
blocks: On one step the first $n$ indices are calculated, then the next
$n$ indices, where $n$ is the number of CPUs in the cluster.

This behavior is quite ok in a homogenous cluster, where all or mostly
all machines are built with equal hardware and therefore offer the same
speed. In heterogenous infrastructures, speed is depending on the
slowest machine in the cluster, as the faster machines have to wait for it
to finish its calculation.

If your parallel algorithm is using different time for different problems,
load balancing will reduce overall time in homogenous clusters greatly.

\texttt{snow} and so \texttt{snowfall} feature a simple load balanced method
to avoid waiting times in such environments. If calling
\texttt{sfClusterApplyLB} the faster machines get further indices to
calculate without waiting for the slowest to finish its step.
\texttt{sfClusterApplyLB} is called like \texttt{lapply}.

If your local infrastructure is such an heterogenous structure, this
function is the way to go. It can also be handy in homogenous clusters
where other users spawn processes, too, so sometimes load differs
temporarily.

A visualisation of basic load balacing can be found in \cite{ROSS_07}.

\begin{verbatim}
sfInit( parallel=TRUE, cpus=2 )

calcPar <- function( x ) {
x1 <- matrix( 0, x, x )
x2 <- matrix( 0, x, x )

for( var in 1:nrow( x1 ) ) x1[var,] = runif( ncol( x1 ) )
for( var in 1:nrow( x2 ) ) x2[var,] = runif( ncol( x1 ) )

b <- sum( diag( ( x1 %*% x2 ) %*% x1 ) )
return( b )
}

result <- sfClusterApplyLB( 50:100, calcPar )

sfStop()
\end{verbatim}

\subsubsection{Intermediate result saving and restoring using
\texttt{sfClusterApplySR}}
Another helpful function for long running clusters is
\texttt{sfClusterApplySR}, which saves intermediate results after
processing $n$-indices (where $n$ is the amount of CPUs). If it is likely
you have to interrupt your program (probably because of server
maintenance) you can start using \texttt{sfClusterApplySR} and restart
your program without the results produced up to the shutdown time.

Please note: Only complete $n$-blocks are saved, as the
function \texttt{sfLapply} is used internally.\footnote{This function is an addition
to \texttt{snow} and therefore could not be integrated in the load
balanced version.}

The result files are saved in the temporary folder
\texttt{~/.sfCluster/RESTORE/x}, where x is a string with a given name
and the name of the input R-file.

\texttt{sfClusterApplySR} is called like \texttt{sfClusterApplyLB} and
therefore like \texttt{lapply}.

If using the function \texttt{sfClusterApplySR} result are always saved
in the intermediate result file. But, if cluster stopped and results
could be restored, restore itself is only done if explicitly stated.
This aims to prevent false results if a program was interrupted by
intend and restarted with different internal parameters (where with
automatical restore probably results from previous runs would be
inserted). So handle with care if you want to restore!

If you only use one call to \texttt{sfClusterApplySR} in your program,
the parameter \texttt{name} does not need to be changed, it only is
important if you use more than one call to \texttt{sfClusterApplySR}.

\begin{center}
\begin{verbatim}
sfInit( parallel=TRUE, cpus=2 )

# Saves under Name default
resultA <- sfClusterApplySR( somelist, somefunc )

# Must be another name.
resultB <- sfClusterApplySR( someotherlist, someotherfunc, name="CALC_TWO" )

sfStop()
\end{verbatim}
\end{center}

If cluster stops probably during run of \texttt{someotherfunc} and
restarted with restore-Option, the complete result of \texttt{resultA}
is loaded and therefore no calculation on \texttt{somefunc} is done.
\texttt{resultB} is restored with all the data available at shutdown
and calculation begins with the first undefined result.

\emph{Note on restoring errors}: If restoration of data fails (probably
because list size is different in saving and current run),
\texttt{sfClusterApplySR} stops. For securely reason it does not delete
the RESTORE-files itself, but prompt the user the complete path to
delete manually and explicitly.

\subsection{Fault tolerance}
Differing from \texttt{snowFT}, the fault tolerance extension for
\texttt{snow}, \texttt{snowfall} does not feature fault tolerance
(see \cite{HANA_04}).

This is due to the lack of an MPI implementation of \texttt{snowFT}.

\subsection{Controlling snowfall using the command line}
snowfall can be widely controlled via command line arguments.

This is useful for fast changing of cluster parameters (e.g. changing
the host names in a Socket cluster) on a raw installation and it serves
as connection to sfCluster. Of course it can be used as connection to
any other workload- or batch managing software, too.

On the commandline there are the following parameters:

\begin{tabular}{lp{10cm}}
parallel & Switch to parallel execution. Default is sequential execution \\
cpus=X & Amount of CPUs wanted. Without {-}{-}parallel, a value $X > 1$ switch to parallel execution. \\
type=X & Type of cluster. Allowed values are SOCK, MPI, PVM and NWS. \\
session=X & Session number. snowfall logfiles contain number, but only needed with sfCluster. \\
restoreSR & Enables restoring of previously saved results from \texttt{sfClusterApplySR} calls. \\
hosts=X & List of hosts for Socket (SOCK) or NetWorkSpaces (NWS) clusters. Entries are comma seperated. Any entry may contain colon seperated value for the amount of processors on this machine. Example: \texttt{{-}{-}hosts=machine1:4,machine2,123.123.12.13:2} (this spawns 4 workers on machine1, one on machine2 and two on 123.123.12.13). \\
tmpdir=X & Specify temporary directory for logfiles and R-output. \\
\end{tabular}

For using these arguments, just add these after an \texttt{--args} on the commandline (which
forces R not to treat these arguments as R ones).

\begin{center}
\texttt{R --no-save --args --parallel --cpus=2 < program.R}
\end{center}

Starts R and forces snowfall to start in parallel mode with 2 CPUs (in this case: using a
Socket-cluster, as this is the default).

\textit{Note}: arguments on the command line have lower priority as settings from the \texttt{sfInit} call.
That means that the above example only works if initialisation is done via \texttt{sfInit()}, but
not with \texttt{sfInit( parallel=FALSE )}, as then sequential execution is forced.

Further examples should explan the feature:

\begin{itemize}
\item \texttt{R --no-save --args --parallel --type=MPI --cpus=4 < program.R} (start using 4 workers in an existing MPI cluster. If no MPI cluster exists, a plain one is started on your local machine only. Beware of this, as you have to shutdown this cluster afterwards manually.).
\item \texttt{R --no-save --args --parallel --type=SOCK --hosts=localhost:3,singlema,othmach:4 < program.R}
(Starts a socket cluster with two machines and 7 CPUs: 3 on \texttt{localhost}, 4 on \texttt{othmach} and one worker on \texttt{singlema}).
\end{itemize}

\subsection{Traps, Internals}
\texttt{snowfall} limits the amount of CPUs by default (to 40). If you
need more CPUs, call \texttt{sfSetMaxCPUs()} \emph{before} calling \texttt{sfInit()}.
Beware of requesting more CPUs as you have ressources: there are as many R processes
spawned as CPUs wanted. They are distributed across your cluster like in the
given scheme of the LAM host configuration. You can easily kill all machines in
your cluster by requesting huge amounts of CPUs or running very memory consuming
functions across the cluster. To avoid such common problems use \emph{sfCluster}.

For some functions of \texttt{snowfall} it is needed to create global variables
on the master. All these variables start with prefix \texttt{.sf}'', please do
not delete them. The internal control structure of \texttt{snowfall} is saved in
the variable \texttt{.sfOptions}, which should be accessed through the wrapper
functions as the structure may change in the future.\section{Using \emph{sfCluster} with \texttt{snowfall}}
\emph{sfCluster} is a small management tool, helping to run parallel R-programs
using \texttt{snowfall}. Mainly, it exculpates the user from setting up a LAM/MPI
cluster on his own. Further, it allows multiple clusters per user and
therefore executes any parallel R program in a single cluster. These clusters
are built according to the current load and usage of your cluster (this
means: only machines are taken with free ressources).

Also, execution is observed and if problems arise, the cluster is shut down.

\emph{sfCluster} can be used with R-interactive shell or batch mode and also feature
a special batch mode with visual logfile and process-displaying.

run \texttt{sfCluster {-}{-}help} if you installed it yet.

\subsection{Starting R using \emph{sfCluster}}
An \emph{sfCluster} execution is following these steps:

\begin{enumerate}
\item Test memory usage of program if not explicitly given. This is done via
a default temporary (10 minutes) sequential run to determinate the maximum usage
of RAM on a slave.
This is important for allocating ressources on slaves.
\item Detect free ressources in cluster universe.\footnote{Which are all potentially
useable machines.} Take machines with free ressources matching users
request.
\item Start LAM/MPI cluster with previous built setting.
\item Run R with parameters for \texttt{snowfall} control.
\item LOOP: Observe execution (check processes, memory usage, and machine state). In
monitoring mode: Display state of cluster and logfiles on screen.
\item On interruption or regular end: shutdown cluster.
\end{enumerate}

\subsection{Using \emph{sfCluster}}
The most common parameters of \emph{sfCluster} are \texttt{{-}{-}cpus}, with which you request a certain
amount of CPUs among the cluster (default is 2 in parallel and 1 in sequential mode).
There is a builtin limit for the amount of CPUs, which is changeable using the
\emph{sfCluster} configuration.

There are four execution modes:

\begin{tabular}{lp{3cm}p{9cm}}
-b & Batchmode (Default) & Run silent on terminal.\\
-i & Interactive R-shell & Ability to use interactive R-shell with cluster.\\
-m & Monitoring mode & Visual processmonitor and logfile viewer.\\
-s & Sequential execution (no cluster usage) & Run without cluster on single CPU.\\
\end{tabular}

To avoid the (time consuming) memory test, you can specify a maximum amount of
memory usable per slave via option \texttt{{-}{-}mem}. The behavior on excessing this memory
usage is configurable (default: cluster stop).

The memory usage limit is very important for not getting your machines into
swapping (means: shortage of physical RAM), which would hurt performance badly.

So, simple calls to \emph{sfCluster} could be

\begin{verbatim}
## Run a given R program with 8 cpus and max. 500MB (0.5 gigabytes) in monitoring mode
sfCluster -m --cpus=8 --mem=0.5G myRprogram.R

## Run nonstopping cluster with real quiet output.
nohup sfCluster -b --cpus=8 --mem=500M myRprogram.R --quiet

## Start R interactive shell with 4 cores. With 300MB memory (MB is default unit)
## No R-file is given for interactive mode.
sfCluster -i --cpus=4 --mem=300
\end{verbatim}

For all possible options and further examples for \emph{sfCluster} usage,
see \texttt{sfCluster {-}{-}help}.

\subsection{The snowfall-side of \emph{sfCluster}}
If you start an R program using \texttt{snowfall} with \emph{sfCluster}, the latter waits
until \texttt{sfInit()} is called and then starts the observation of the execution.

The default behavior if using \emph{sfCluster} is just to call \texttt{sfInit()}
without any argument. Use arguments only if you want to explicitly overwrite
given settings by \emph{sfCluster}.

\subsection{Proposed development cycle}
The following development cycle is of course a proposal. You can skip
or replace any step depending on your own needs.

\begin{enumerate}
\item
Develop program in sequential mode (start using option \texttt{-s}).
\item
Test in parallel mode using interactive mode to detect directly problems
on parallelisation (start using option \texttt{-i}).
\item
Try larger test runs using monitoring mode, observing the cluster and
probably side effects during parallel execution
(start using option \texttt{-m}). Problems arise on single nodes
will be visible (like non correct working libraries).
\item
Do real runs using silent batch mode (start using options
\texttt{-b {-}{-}quiet}). Probably you want to run these runs in the
background of your Unix shell using \texttt{nohup}.
\end{enumerate}

\subsection{Future sfCluster}

These additions are planned for the future:

\begin{itemize}
\item Port to OpenMPI
\item Faster SSH connections for observing
\item Extended scheduler for system ressources
\end{itemize}

%% History.
\section{History of snowfall changes}
You can also call: RShowDoc("NEWS", package="snowfall")

\begin{itemize}

\item 1.83 (API changes: minor additions)
\begin{itemize}
\item sfIsRunning: new function giving a logical is sfInit() was called or not. Needed, as all other snowfall functions implicitely call sfInit() if it was not called.
\end{itemize}

\item 1.82
\begin{itemize}
\item Internal refactorings.
\end{itemize}

\item 1.81
\begin{itemize}
\item Change in sfInit() MPI startup so sfCluster can run with snow > 0.3 now.
\item sfExport now also works in sequential mode (writing to global environment). This prevented sequential execution in some cases.
\end{itemize}

\item 1.80 (API changes: minor additions)
\begin{itemize}
\item snowfall passes packages checks of R 2.10.1 without warning or error. Internal state is now only saved in the namespace itself (thanks to Uwe Ligges for the tipp).
\item sfExport can now also export objects in a specific namespace (argument 'namespace')
\item sfExport: behavior in error case manageable (stopOnError)
\item sfExport: smaller bugfixes.
\item sfRemoveAll can now also remove hidden names (argument 'hidden')
\item sfRemoveAll is more robust now (some minor bugfixes, more checks)
\item sfRemoveAll bugfix for multiple removals (thanks to Greggory Jefferis)
\item Bugfix on exception list on sfExportAll
\item Refactorings in sfTest()
\item snowfall now has a NEWS doc ;)
\item No warning on Mac OS because of default Mac-R command line arg 'gui' (thanks to Michael Siegel).
\end{itemize}

\item 1.71 (API changes: none)
\begin{itemize}
\item Exporting of objects using \texttt{sfExport} is speed up (round 30%)
\item Fixed a bug on Windows in \texttt{sfSource}
\end{itemize}
\item 1.70 (API changes: minor additions, BEHAVIOR CHANGES: logging)
\begin{itemize}
\item Behavior change: new default: no logging of slave/worker output.
\item API change: new argument \texttt{slaveOutfile} on \texttt{sfInit()}.
\item API change: new argument \texttt{restore} on \texttt{sfInit()}.
\item API change: new argument \texttt{master} on \texttt{sfCat}.
\item Windows startup fixed.
\item NWS startup fixed.
\item sfSapply is working as intended.
\item Changing CPU amount during runtime (with multiple sfInit() calls with different
settings in a single program) is now possible using socket and NWS clusters.
\item Dozens of small glitches inside snowfall fixed (also messages are made
more precisly).
\item Package vignette slightly extended.
\end{itemize}
\end{itemize}

\bibliographystyle{plain}
\bibliography{all-bib}
\end{document}