% \VignetteIndexEntry{An R Package for easier cluster programming based on snow}
% \VignetteKeyword{Parallel Computing}
% \VignetteKeyword{Cluster}
% \VignetteKeyword{HPC}
% \VignetteKeyword{snow}
% \VignetteKeyword{LAM}
% \VignetteKeyword{MPI}

\documentclass[10pt,oneside]{article}

\usepackage{url}

\begin{document}

\pagestyle{empty}

\setlength{\baselineskip}{1.25em}
\setlength{\parskip}{0.5em}
\setlength{\parindent}{0.0em}
\begin{titlepage}
\title{Developing parallel programs using snowfall}
\author{Jochen Knaus}
\date{2008-04-29}
\maketitle

\begin{abstract}
\texttt{snowfall} is an R package for easier parallel programming
using clusters. Basically it is built upon the package
\texttt{snow}, extending it with implicitly usable sequential support,
some additional calculation functions and tools, and a direct connection
to the cluster manager \emph{sfCluster}.
\end{abstract}
\end{titlepage}

%% Table of contents
\tableofcontents
\newpage
\section{snowfall}
\subsection{Getting started}
\subsubsection{Requirements for sequential execution}
Basically \texttt{snowfall} is able to run without any external
library. In this case it is not possible to execute commands in
parallel; all potential calls to parallel functions will be
executed sequentially.

Programs written for sequential use with \texttt{snowfall} calls
can then run in parallel without any code change.

\subsubsection{Requirements for parallel execution}
If you want to run programs using more than one CPU (on
one machine or a complete cluster of machines), you first need to set up
a LAM/MPI cluster on all machines.

If you are using Debian/Ubuntu Linux, just call\\
\texttt{aptitude install xmpi lam4-dev}\footnote{Other Linux distributions
offer similar packages, probably under different names. It is important
that you install the development version of the LAM package, as the
\texttt{Rmpi} package needs these files for installation.}

Further, you need to install the R packages \texttt{snow} and
\texttt{Rmpi}.

If your program uses libraries, ensure that these are available on all
nodes. If they are not present in the default R library path (on a given
machine), ensure that they are accessible in the same location on all
machines (for example \texttt{/home/xy/R.libs}).

If you want to run programs only on your (multi-core) computer, without
a cluster of many machines, you do not have to set up the cluster
yourself; it will be started implicitly during \texttt{snowfall}'s
initialisation.

To use two or more machines for cluster calculations, you need to set up
a LAM/MPI cluster and start the cluster explicitly.

This is no big thing at all. For example, edit a small text file like this
one:

\texttt{machine1.yourdomain.com cpu=4 sched=yes\\
machine2.yourdomain.com cpu=2 sched=yes}

Just enter the machines for your cluster and the number of CPUs each
offers. You start a LAM/MPI cluster using\\
\texttt{lamboot hostfile}\\
where \texttt{hostfile} is the small configuration file edited
above.

To shut down, just call \texttt{lamhalt}.

For further details on LAM/MPI setup, see \cite{burns94:_lam}.

Note: all parallel programs you start run in this cluster.
If your program requests 100 CPUs on your private dual-core machine,
you get that amount, and 100 R processes are spawned, independent
of available resources (memory, CPUs).

For workgroups or larger clusters, management solutions like
\emph{sfCluster} are strongly recommended.

\subsection{(Short) introduction to parallel programming}
The general goal of parallelising your R program is to vectorise the data
or calculation loops (probably with wrapper functions), as all
calculation functions of \texttt{snowfall} are essentially reimplementations
of R list/vector functions.

A good introduction to parallel programming for statistical
purposes can be found in \cite{ROSS_07} and \cite{HANA_STAT04}.

\subsection{Introduction to usage of snowfall}
Basically, usage of \texttt{snowfall} always works with the
following scheme:
\begin{enumerate}
\item
Initialization using \texttt{sfInit()}. Set up the cluster (if needed)
and the internal functions. \texttt{sfInit} must be called before
using any function of the \texttt{snowfall} package.\footnote{The only
exception is the function \texttt{sfSetMaxCPUs()}, which raises or lowers
the configured maximum CPU count.}
\item Export needed variables/objects to all slaves.
\item
Do some parallel calculations using \texttt{snowfall} calculation
functions. Repeat as many times as needed.
\item
End parallel execution using \texttt{sfStop()}.
\end{enumerate}
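
The scheme above can be sketched as a minimal program (a sketch only;
\texttt{myData} and \texttt{myFun} are hypothetical names standing for
your own data and worker function):

\begin{verbatim}
library(snowfall)

## 1. Initialisation: start a parallel cluster with 2 CPUs.
sfInit( parallel=TRUE, cpus=2 )

myData <- c( 1, 2, 3, 4 )

## 2. Export needed variables to all slaves.
sfExport( "myData" )

myFun <- function( idx ) sum( myData ) + idx

## 3. Parallel calculations (repeat as often as needed).
result <- sfLapply( 1:4, myFun )

## 4. End parallel execution.
sfStop()
\end{verbatim}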

The initialisation differs depending on whether you use \texttt{snowfall}
alone or with the management tool \emph{sfCluster}. In this chapter we only
cover standalone usage of \texttt{snowfall}.
For usage with \emph{sfCluster}, see XXX.

If you are familiar with the R package \texttt{snow}, starting with
or porting your program to \texttt{snowfall} is easy.

The complete initialisation is done with a single call to
\texttt{sfInit()}.
Its arguments are \texttt{parallel} and \texttt{cpus}, giving the running
mode (parallel or sequential execution) and the number of CPUs to use.
If running in sequential mode, \texttt{cpus} is ignored (and set
to one).

On calling \texttt{sfInit( parallel=TRUE )} without a running LAM
cluster (but with LAM installed), a \emph{local} cluster will be started
which only contains your local machine. This can be handy on single
multi-core machines. But note that \texttt{sfStop} will not shut down
this cluster, so you have to stop it manually yourself (if wished).

Sequential mode can be useful for developing the program, for example
on a single-core laptop without an installed cluster or running the
Windows operating system. Sequential mode is also needed to safely deploy
a package using \texttt{snowfall},
where you cannot assume a user has a usable cluster installed.

If the initialisation fails, probably because of missing base libraries
\texttt{Rmpi} and \texttt{snow},
\texttt{snowfall} falls back to sequential mode with a warning message.

In sequential and parallel execution,
all functions are usable in the same way and return
the same results.

\begin{verbatim}
sfInit( parallel=FALSE )

sfLapply( 1:10, exp )

sfStop()

sfInit( parallel=TRUE, cpus=5 )

## Now, index 1 is calculated on CPU1, 2 on CPU2 and so on.
## Index 6 is again on CPU1.
## So the whole call is done in two steps on the 5 CPUs.
sfLapply( 1:10, exp )

sfStop()
\end{verbatim}

Please note: most of the \texttt{snowfall} functions stop
the program on failure by default (by calling
\texttt{stop()}). This is much safer for inexperienced users. If you
want your own failure handling, install your own handler via
\texttt{options(error = ...)} to prevent \texttt{snowfall} from stopping in
general. Also, most of the functions feature an argument \texttt{stopOnError}
which, set to \texttt{FALSE}, prevents the functions from stopping.
Do not forget to handle potential errors in your program if you use this
feature.

The given behavior is not only better for inexperienced users; any other
behavior would be very nasty on package deployment.
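
For illustration, a hedged sketch of the second option
(\texttt{somePkg} is a hypothetical package name; the argument
\texttt{stopOnError} is available for functions like \texttt{sfLibrary}
and \texttt{sfSource}):

\begin{verbatim}
sfInit( parallel=TRUE, cpus=2 )

## With stopOnError=FALSE a failure does not call stop(),
## so the program continues and you must check the outcome yourself.
sfLibrary( somePkg, stopOnError=FALSE )

## ... your own error handling here ...

sfStop()
\end{verbatim}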

\subsection{Writing parallel programs with snowfall}
\subsubsection{General notes and simple example}
If you have detected parts of your program which can be parallelised (loops
etc.), it is in most cases a fast step to give them a parallel run.

First, rewrite them using R's list operators (\texttt{lapply},
\texttt{apply}) instead of loops (if they are not yet calculated by list
operators).

Then write a wrapper function to be called by the list operators, managing
a single parallel step. Note that there are no local variables on the
slaves; only the data from the list index will be given as an argument.

If you need more than one variable argument, you need to make the required
variables global (assign them to the global environment) and export them to
all slaves. \texttt{snowfall} provides some functions to make this process
easier (take a look at the package help).

\begin{verbatim}
sfInit( parallel=TRUE, cpus=4 )

b <- c( 3.4, 5.7, 10.8, 8, 7 )

## Export b in its current state to all slaves.
sfExport( "b" )

parWrapper <- function( datastep, add1, add2 ) {
  cat( "Data: ", datastep, "ADD1:", add1, "ADD2:", add2, "\n" )

  ## Only possible as "b" is exported!
  cat( "b:", b[datastep] )

  ## Do something

  return( datastep )
}

## Calls parWrapper with each index from 1 to 5 and additional
## arguments 2 and 3.
result <- sfLapply( 1:5, parWrapper, 2, 3 )

sfStop()
\end{verbatim}

\subsubsection{Basic load balancing using \texttt{sfClusterApplyLB}}
All parallel wrappers around the R list operators are executed in
blocks: in one step the first $n$ indices are calculated, then the next
$n$ indices, where $n$ is the number of CPUs in the cluster.

This behavior is quite ok in a homogeneous cluster, where all or nearly
all machines are built with equal hardware and therefore offer the same
speed. In heterogeneous infrastructures, speed depends on the
slowest machine in the cluster, as the faster machines have to wait for it
to finish its calculation.

If your parallel algorithm uses different amounts of time for different
problems, load balancing will greatly reduce overall time even in
homogeneous clusters.

\texttt{snow}, and so \texttt{snowfall}, features a simple load-balancing
method to avoid waiting times in such environments. When calling
\texttt{sfClusterApplyLB}, the faster machines get further indices to
calculate without waiting for the slowest to finish its step.
\texttt{sfClusterApplyLB} is called like \texttt{lapply}.

If your local infrastructure is such a heterogeneous structure, this
function is the way to go. It can also be handy in homogeneous clusters
where other users spawn processes, too, so load sometimes differs
temporarily.

A visualisation of basic load balancing can be found in \cite{ROSS_07}.

\begin{verbatim}
sfInit( parallel=TRUE, cpus=2 )

calcPar <- function( x ) {
  x1 <- matrix( 0, x, x )
  x2 <- matrix( 0, x, x )

  for( var in 1:nrow( x1 ) ) x1[var,] = runif( ncol( x1 ) )
  for( var in 1:nrow( x2 ) ) x2[var,] = runif( ncol( x2 ) )

  b <- sum( diag( ( x1 %*% x2 ) %*% x1 ) )
  return( b )
}

result <- sfClusterApplyLB( 50:100, calcPar )

sfStop()
\end{verbatim}

\subsubsection{Intermediate result saving and restoring using
               \texttt{sfClusterApplySR}}
Another helpful function for long-running clusters is
\texttt{sfClusterApplySR}, which saves intermediate results after
processing $n$ indices (where $n$ is the number of CPUs). If it is likely
that you have to interrupt your program (probably because of server
maintenance), you can start using \texttt{sfClusterApplySR} and restart
your program without losing the results produced up to the shutdown time.

Please note: only complete $n$-blocks are saved, as the
function \texttt{sfLapply} is used internally.\footnote{This function is an
addition to \texttt{snow} and therefore could not be integrated into the
load-balanced version.}

The result files are saved in the temporary folder
\texttt{\~{}/.sfCluster/RESTORE/x}, where \texttt{x} is a string built from
a given name and the name of the input R file.

\texttt{sfClusterApplySR} is called like \texttt{sfClusterApplyLB} and
therefore like \texttt{lapply}.

When using the function \texttt{sfClusterApplySR}, results are always
saved in the intermediate result file. But if the cluster stopped and
results could be restored, the restore itself is only done if explicitly
requested. This aims to prevent false results if a program was interrupted
intentionally and restarted with different internal parameters (where, with
automatic restore, results from previous runs would probably be inserted).
So handle with care if you want to restore!

If you only use one call to \texttt{sfClusterApplySR} in your program,
the parameter \texttt{name} does not need to be changed; it is only
important if you use more than one call to \texttt{sfClusterApplySR}.

\begin{verbatim}
sfInit( parallel=TRUE, cpus=2 )

## Saves under name "default"
resultA <- sfClusterApplySR( somelist, somefunc )

## Must use another name.
resultB <- sfClusterApplySR( someotherlist, someotherfunc, name="CALC_TWO" )

sfStop()
\end{verbatim}

If the cluster stops, for example during the run of \texttt{someotherfunc},
and is restarted with the restore option, the complete result of
\texttt{resultA} is loaded, and therefore no calculation of
\texttt{somefunc} is done. \texttt{resultB} is restored with all the data
available at shutdown, and calculation begins with the first undefined
result.

\emph{Note on restoring errors}: if restoration of the data fails (probably
because the list size differs between the saved and the current run),
\texttt{sfClusterApplySR} stops. For safety reasons it does not delete
the RESTORE files itself, but prompts the user with the complete path to
delete manually and explicitly.

\subsection{Fault tolerance}
Differing from \texttt{snowFT}, the fault tolerance extension for
\texttt{snow}, \texttt{snowfall} does not feature fault tolerance
(see \cite{HANA_04}).

This is due to the lack of an MPI implementation of \texttt{snowFT}.

\subsection{Traps, Internals}
\texttt{snowfall} limits the number of CPUs by default (to 40). If you
need more CPUs, call \texttt{sfSetMaxCPUs()} \emph{before} calling
\texttt{sfInit()}. Beware of requesting more CPUs than you have resources
for: as many R processes are spawned as CPUs are requested. They are
distributed across your cluster according to the given scheme of the LAM
host configuration. You can easily kill all machines in your cluster by
requesting huge amounts of CPUs or running very memory-consuming functions
across the cluster. To avoid such common problems, use \emph{sfCluster}.
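
As a short sketch (note the order: the call to \texttt{sfSetMaxCPUs()}
must come \emph{before} \texttt{sfInit()}):

\begin{verbatim}
## Raise the configured maximum CPU count first ...
sfSetMaxCPUs( 100 )

## ... then more than the default maximum of 40 CPUs can be requested.
sfInit( parallel=TRUE, cpus=60 )

sfStop()
\end{verbatim}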

For some functions of \texttt{snowfall} it is necessary to create global
variables on the master. All these variables start with the prefix
``\texttt{.sf}''; please do not delete them. The internal control structure
of \texttt{snowfall} is saved in the variable \texttt{.sfOptions}, which
should be accessed through the wrapper functions, as the structure may
change in the future.

\section{Using \emph{sfCluster} with \texttt{snowfall}}
\subsection{About \emph{sfCluster}}
\emph{sfCluster} is a small management tool, helping to run parallel
R programs using \texttt{snowfall}. Mainly, it relieves the user of setting
up a LAM/MPI cluster on his own. Further, it allows multiple clusters per
user and therefore executes each parallel R program in its own cluster.
These clusters are built according to the current load and usage of your
cluster (this means: only machines with free resources are taken).

Also, execution is observed and if problems arise, the cluster is shut down.

\emph{sfCluster} can be used with the interactive R shell or in batch
mode, and also features a special batch mode with a visual logfile and
process display.

For further details about installation, administration and configuration of
\emph{sfCluster}, please visit \url{http://www.imbi.uni-freiburg.de/XXX} or
run \texttt{sfCluster {-}{-}help} once you have installed it.

\subsection{Starting R using \emph{sfCluster}}
An \emph{sfCluster} execution follows these steps:

\begin{enumerate}
 \item Test the memory usage of the program if not explicitly given. This
       is done via a temporary (by default 10 minutes) sequential run to
       determine the maximum usage of RAM on a slave.
       This is important for allocating resources on the slaves.
 \item Detect free resources in the cluster universe.\footnote{Which are
       all potentially usable machines.} Take machines with free resources
       matching the user's request.
 \item Start a LAM/MPI cluster with the previously built settings.
 \item Run R with parameters for \texttt{snowfall} control.
 \item LOOP: Observe execution (check processes, memory usage, and machine state). In
       monitoring mode: Display state of cluster and logfiles on screen.
 \item On interruption or regular end: shut down the cluster.
\end{enumerate}

\subsection{Using \emph{sfCluster}}
The most common parameter of \emph{sfCluster} is \texttt{{-}{-}cpus}, with
which you request a certain number of CPUs in the cluster (default is 2 in
parallel and 1 in sequential mode).
There is a built-in limit on the number of CPUs, which is changeable using
the \emph{sfCluster} configuration.

There are four execution modes:

\begin{tabular}{lll}
-b & Batch mode (default) & Run silently on the terminal.\\
-i & Interactive R shell & Ability to use the interactive R shell with the cluster.\\
-m & Monitoring mode & Visual process monitor and logfile viewer.\\
-s & Sequential execution (no cluster usage) & Run without a cluster on a single CPU.\\
\end{tabular} 

To avoid the (time-consuming) memory test, you can specify a maximum amount
of memory usable per slave via the option \texttt{{-}{-}mem}. The behavior
on exceeding this memory usage is configurable (default: cluster stop).

The memory usage limit is very important to keep your machines from
swapping (meaning: a shortage of physical RAM), which would hurt performance
badly.

So, simple calls to \emph{sfCluster} could be

\begin{verbatim}
  ## Run a given R program with 8 cpus and max. 500MB (0.5 gigabytes) in monitoring mode
  sfCluster -m --cpus=8 --mem=0.5G myRprogram.R

  ## Run nonstopping cluster with real quiet output.
  nohup sfCluster -b --cpus=8 --mem=500M myRprogram.R --quiet

  ## Start R interactive shell with 4 cores. With 300MB memory (MB is default unit)
  ## No R-file is given for interactive mode.
  sfCluster -i --cpus=4 --mem=300
\end{verbatim}

For all possible options and further examples for \emph{sfCluster} usage,
see \texttt{sfCluster {-}{-}help}.

\subsection{The snowfall-side of \emph{sfCluster}}
If you start an R program using \texttt{snowfall} with \emph{sfCluster},
the latter waits until \texttt{sfInit()} is called and then starts
observing the execution.

The default behavior when using \emph{sfCluster} is just to call
\texttt{sfInit()} without any arguments. Use arguments only if you want to
explicitly overwrite the settings given by \emph{sfCluster}.

\subsection{Proposed development cycle}
The following development cycle is of course a proposal. You can skip
or replace any step depending on your own needs.

\begin{enumerate}
\item
Develop program in sequential mode (start using option \texttt{-s}).
\item
Test in parallel mode using the interactive mode to directly detect
problems with parallelisation (start using option \texttt{-i}).
\item
Try larger test runs using monitoring mode, observing the cluster and
possible side effects during parallel execution
(start using option \texttt{-m}). Problems arising on single nodes
(like incorrectly working libraries) will become visible.
\item
Do real runs using silent batch mode (start using options
\texttt{-b {-}{-}quiet}). You probably want to run these in the
background of your Unix shell using \texttt{nohup}.
\end{enumerate}

\subsection{Future}

These additions are planned for the future:

\begin{itemize}
\item Easy to use installation program
\item Port to OpenMPI
\item Faster SSH connections for observing
\item Extended scheduler for system ressources
\item Extended batch capabilities
\item Port to Windows/CygWin
\end{itemize}
\bibliographystyle{plain}
\bibliography{all-bib}
\end{document}