1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816 | ##*****************************************************************************
## Functions for initializing the cluster.
##
## The (internal) global variables are also predefined here, mainly to
## satisfy the code check of R.
##
## Compatibility issue: snowfall needs to know whether sfCluster is in use
## (including old versions of sfCluster).
## --session could also be set by other solutions.
## Therefore --lockfile was chosen as the sfCluster indicator, as it most
## likely has no use outside of sfCluster.
## Consequently, setting --lockfile (LOCKFILE) manually can cause trouble.
##*****************************************************************************
## Package-internal state. The variables are declared here so that the code
## check of R does not complain about undefined globals. They are not altered
## directly: setOption() changes them inside the namespace, so no objects are
## created in the global environment.

## Static switch for debugging messages.
DEBUG <- FALSE

## Configuration of this master.
.sfOption <- list()

## Preset CPU amount (max. allocatable).
.sfPresetCPUs <- 0

## Placeholders needed at specific points. They could be handled differently,
## but would raise R CMD check warnings if they were not defined beforehand.

## Temporary variable for sfLibrary.
.sfPars <- ""

## Temporary variable for loading.
.sfLoadError <- ""

## Export test in sfTest().
.sfTestVar5 <- 0
##*****************************************************************************
## Initialise snowfall: fill the internal option list (.sfOption), merge it
## with the function arguments and the commandline (searchCommandline()) and,
## in parallel mode, start the snow cluster.
##
## Attention: this package does nasty things with explicit side effects (not
## only using "require" and "options" etc.)...
##
## "Nodes" and "CPUs" are used synonymously (not pretty).
##
## PARAMETER: [Boolean parallel - overwrites commandline settings]
##            [Int cpus - overwrites commandline settings]
##            [String type - {'MPI','SOCK', 'PVM', 'NWS'}]
##            [Vector socketHosts - List of all hosts used in socket mode]
##            [Boolean restore - Globally set restore]
##            [slaveOutfile - filename for output on slaves]
##            [Boolean nostart - If set, no cluster start will be run.
##                               Needed for nested usage of snowfall]
##            [useRscript - Startup via R Script or shellscript. Only snow>0.3]
##            (The formerly documented, deprecated argument "nodes" is not
##             part of this signature.)
## RETURN:    Boolean TRUE - visible if nostart is set, invisible otherwise.
##            If a required library cannot be loaded, the result of the
##            fallback call sfInit( parallel=FALSE ) is returned.
##*****************************************************************************
sfInit <- function( parallel=NULL,
cpus=NULL,
type=NULL,
socketHosts=NULL,
restore=NULL,
slaveOutfile=NULL,
nostart=FALSE,
useRscript=FALSE ## snow: Default is TRUE.
) {
## Flag for detection of reconnect (means: non-first calls to sfInit())
reconnect <- FALSE
## Separate environment as target for data(), so that the configuration is
## not loaded into the global environment.
initEnv <- new.env()
## Saves users from writing many if-clauses of their own: with nostart set,
## nothing is done at all.
if( nostart ) return( TRUE )
## Are the options already set? An empty list means: first call.
if( length( .sfOption ) == 0 ) {
debug( "Setup sfOption..." )
## Add 1.62: list from sysdata cleared and created again
setOption( "parallel", FALSE )
setOption( "session", NULL )
setOption( "priority", 1 )
setOption( "nodes", 1 )
setOption( "stopped", FALSE )
setOption( "init", FALSE )
## Load configuration file: delivered with package and changeable by user.
## The configuration is converted to a named list (names are taken from the
## column names of the transposed matrix).
## NOTE(review): data() loads "config" into initEnv, but "config" is read
## below without referring to initEnv - presumably it is resolved elsewhere
## (e.g. package data); confirm.
data( "config", package="snowfall", envir=initEnv )
configM <- as.matrix( t( config ) )
config <- as.list( configM )
names( config ) <- dimnames( configM )[[2]]
## Node count is limited in snowfall as well (as it is usable without
## sfCluster) and you probably don't want an arbitrary amount of CPUs
## requested by a careless user.
## If a changed preset exists (.sfPresetCPUs > 0), take this number.
if( .sfPresetCPUs > 0 )
setOption( "MAXNODES", .sfPresetCPUs )
else
setOption( "MAXNODES", as.numeric( config[["MAXNODES"]] ) )
## Startup lockfile (only coming from sfCluster and, if available,
## signalling that snowfall is started through sfCluster).
## LOCKFILE can only be set through commandline --lockfile
setOption( "LOCKFILE", "" )
## Temporary directory (for logfiles, esp. on the slaves).
## A configured value of "-" means: not set, take the default.
if( as.character( config[["TMPDIR"]] ) != "-" )
setOption( "TMPDIR", path.expand( as.character( config[["TMPDIR"]] ) ) )
else {
## Default tempdir on Unix systems is R session tempdir
if( .Platform$OS.type == "unix" )
setOption( "TMPDIR", file.path( Sys.getenv( "R_SESSION_TMPDIR" ), "sfCluster" ) )
## On any non *nix system: take local dir (R_SESSION_TMPDIR unset on Win)
else
setOption( "TMPDIR", "" )
}
## Additional variables for save/restore (only used in sfClusterApplySR).
setOption( "RESTOREFILES", NULL ) ## List with restore files (for cleanup)
setOption( "RESTOREUPDATE", 5 ) ## Updates percent output any 5%
setOption( "RESTORE", FALSE ) ## Restore previous results?
setOption( "CURRENT", NULL ) ## Currently executed R-File
## Default cluster type (unchangeable by config to ensure runnability
## of a specific code in any setting).
setOption( "type", "SOCK" )
setOption( "sockHosts", NULL )
## Restore file directory (for saved intermediate results) - not necessarily
## under/in TMPDIR.
## (As log files probably would be set in a global dir, restore files should
## be stored under the user's home - as they don't contain a session-ID or
## another generic unique thing to tell them apart.)
## A configured value of "-" means: not set, take the default.
if( as.character( config[["RESTDIR"]] ) != "-" )
setOption( "RESTDIR", path.expand( as.character( config[["RESTDIR"]] ) ) )
else
setOption( "RESTDIR", file.path( Sys.getenv( "HOME" ), ".sfCluster", "restore" ) )
## Remove config from initEnv again (the environment data() loaded it into).
rm( config, envir=initEnv )
#pos=globalenv() )
}
## If .sfOption exists, sfInit() was called before: restart.
## (sfCluster should be able to handle this - although slaves are iterated and
## slave killing is only done through snow).
else {
reconnect <- TRUE
if( .sfOption$stopped && !.sfOption$init )
debug( "Irregluar init state (error on previous init)..." )
## If not stopped, but initialised: stop the running cluster first.
if( !.sfOption$stopped && .sfOption$init ) {
message( "Explicit sfStop() is missing: stop now." )
sfStop()
}
}
##**************************************************************************
## Values for parallel/session can be in the commandline or the environment.
## Function parameters overwrite commandline.
##**************************************************************************
searchCommandline( parallel, cpus=cpus, type=type, socketHosts=socketHosts,
restore=restore )
## In verbose mode print the complete option list (first init only).
if( getOption( 'verbose' ) && !reconnect )
print( .sfOption )
## If given restore-directory does not exist, create it.
if( !file.exists( .sfOption$RESTDIR ) ) {
## 1.62: removed
## .sfOption$RESTDIR <<- path.expand( "~/.sfCluster/restore" )
dirCreateStop( .sfOption$RESTDIR )
}
## Running in parallel mode? That means: Cluster setup.
## Will be blocked if argument "nostart" is set (for usage of snowfall
## inside of packages).
if( .sfOption$parallel && !nostart ) {
## Internal stopper. Running in parallel mode a session-ID is needed.
## For testing purposes can be anything (mainly used for pathnames
## of logfiles).
if( startedWithSfCluster() && is.null( .sfOption$session ) )
stop( "No session-ID but parallel run with sfCluster (something went wrong here?)..." )
## @TODO regenerate session id if missing.
## If the amount of nodes is missing or not numeric, it will be 2.
if( is.null( .sfOption$nodes ) || is.na( as.numeric( .sfOption$nodes ) ) )
setOption( "nodes", 2 )
else
setOption( "nodes", as.numeric( .sfOption$nodes ) )
## Preload required libraries if needed (as an extended error check).
## Maps the cluster type to the library it needs ("" = none).
libList <- list( "PVM"="rpvm", "MPI"="Rmpi", "NWS"="nws", "SOCK"="" )
if( libList[[.sfOption$type]] != "" ) {
if( !require( libList[[.sfOption$type]], character.only=TRUE ) ) {
message( paste( "Failed to load required library:", libList[[.sfOption$type]],
"for parallel mode", .sfOption$type, "\nFallback to sequential execution" ) )
## Fallback to sequential mode.
return( sfInit( parallel=FALSE ) )
}
else
message( paste( "Library", libList[[.sfOption$type]], "loaded." ) )
}
## In any parallel mode, load snow if needed.
if( !require( snow ) ) {
message( paste( "Failed to load library 'snow' required for parallel mode.\n",
"Switching to sequential mode (1 cpu only)!." ) );
## Fallback to sequential mode.
return( sfInit( parallel=FALSE ) )
}
## Chg. 1.62
## Temporary file for output.
## If sfCluster is running (LOCKFILE given): session is taken.
## If sfCluster is not running but the user set the slaveOutfile option:
## take the argument.
## Else (default): no slave outfiles (writing to /dev/null|nul).
if( startedWithSfCluster() ) {
tmp <- file.path( .sfOption$TMPDIR,
paste( "rout_", .sfOption$session, sep="" ) )
## Only create temporary directory once and if needed.
## Only needed if running with sfCluster. If the user sets his own
## slaveOutfile, he has to ensure himself that the paths exist.
## If needed create temporary path. Problem: this is executed only on
## master, not on slaves. The clusterstarter needs to manage this.
if( !reconnect )
dirCreateStop( .sfOption$TMPDIR )
}
else
tmp <- ifelse( is.null( slaveOutfile ), '/dev/null', slaveOutfile )
## @TODO Exception handler.
## @TODO Timeout on init.
## Likewise: timeout - it is extremely ugly if the cluster does not start
## correctly and hangs (e.g. if too many CPUs are requested for the
## cluster - what PVM swallows apparently gives MPI a headache).
setDefaultClusterOptions( type = .sfOption$type )
setDefaultClusterOptions( homogenous = FALSE )
## On socket connections the list of hosts needs to be given.
## If none is set, use localhost with default R.
if( .sfOption$type == "SOCK" ) {
## No host information given: use localhost with wished CPUs.
## Else: host settings overwrite wished CPUs (important for error checks!).
if( is.null( .sfOption$sockHosts ) || ( length( .sfOption$sockHosts ) == 0 ) )
setOption( "sockHosts", c( rep( "localhost", .sfOption$nodes ) ) )
else
setOption( "nodes", length( .sfOption$sockHosts ) )
setOption( "cluster", try( makeCluster( .sfOption$sockHosts,
type = "SOCK",
outfile = tmp,
homogenous = TRUE
) ) )
}
# PVM cluster
else if( .sfOption$type == "PVM" ) {
setOption( "cluster", try( makeCluster( .sfOption$nodes,
outfile = tmp ) ) )
}
# Network Spaces
else if( .sfOption$type == "NWS" ) {
## Same host list handling as for socket clusters.
if( is.null( .sfOption$sockHosts ) || ( length( .sfOption$sockHosts ) == 0 ) )
setOption( "sockHosts", c( rep( "localhost", .sfOption$nodes ) ) )
else
setOption( "nodes", length( .sfOption$sockHosts ) )
## Patch Markus Schmidberger (Mail 11/25/2008).
setOption( "cluster", try( makeNWScluster(
.sfOption$sockHosts[1:.sfOption$nodes],
type = "NWS",
outfile = tmp
) ) )
}
# MPI cluster (also default for irregular type).
else {
## 1.81: useRScript must be FALSE. Else sfCluster won't work
## with snow > 0.3 (on older snow versions this option
## is ignored). Also homogenous is always on.
## 1.83: But for non-sfCluster usage at least it has to be modifiable.
setOption( "cluster", try( makeMPIcluster( .sfOption$nodes,
outfile = tmp,
homogenous = TRUE,
useRscript = useRscript
) ) )
}
## Startup successful? If not: stop. The cluster start is wrapped in try(),
## so a failure shows up here as an object of class "try-error".
if( is.null( .sfOption$cluster ) ||
inherits( .sfOption$cluster, "try-error" ) )
stop( paste( "Starting of snow cluster failed!",
geterrmessage(), .sfOption$cluster ) )
## Cluster setup finished. Set flag (used in error handlers and stop).
## Also: no function can be called if init is not set.
setOption( "init", TRUE )
setOption( "stopped", FALSE )
if( !reconnect ) {
## As the snow init spawned all the required R processes, the proprietary
## lockfile can be deleted now (if it exists).
## Problem: now all R procs are spawned, but the observer most
## likely didn't catch them until the next time of his observing
## loop.
if( !is.null( .sfOption$LOCKFILE ) && file.exists( .sfOption$LOCKFILE ) ) {
if( unlink( .sfOption$LOCKFILE ) != 0 )
warning( "Unable to remove startup lockfile: ", .sfOption$LOCKFILE )
else
message( "Startup Lockfile removed: ", .sfOption$LOCKFILE )
}
## Report the slave log location: as message in verbose mode, else only as
## debugging output.
if( getOption( 'verbose' ) ) {
if( tmp == '/dev/null' )
message( "Slave output suppressed. Use 'slaveOutfile' to activate." )
else
message( paste( "Temporary log for STDOUT/STDERR (on each node): ", tmp, "\n",
"Cluster started with", .sfOption$nodes, "CPUs.", "\n" ) )
}
else
debug( paste( "Temporary log for STDOUT/STDERR (on each node): ", tmp, "\n",
"Cluster started with", .sfOption$nodes, "CPUs.", "\n" ) )
## Write R-Version and Time in (slave-)logfiles.
.startInfo <- strsplit( Sys.info(), "\n" );
.startMsg <- paste( sep="",
"JOB STARTED AT ", date(), # Global Var!
" ON ", .startInfo$nodename, " (OS", .startInfo$sysname,
") ", .startInfo$release, "\n" )
sfExport( ".sfOption", ".startMsg", local=TRUE, namespace="snowfall", debug=DEBUG )
sfCat( .startMsg, "\n", master=FALSE ) ## No master
sfCat( paste( "R Version: ", R.version$version.string, "\n\n" ) )
## Remove starting message.
sfRemove( ".startMsg" )
}
## @TODO Check whether this export is really still needed (Jan 10)
else
sfExport( ".sfOption", local=FALSE, namespace="snowfall" )
}
## Sequential mode or option "nostart":
## init will be set. If someone calls sfInit with nostart and aims
## it to be started, it's his or her problem.
else {
## Setup finished. Set flag (used in error handlers and stop).
## Also: no function can be called if init is not set.
## There is no cluster object in sequential mode.
setOption( "init", TRUE )
setOption( "stopped", FALSE )
setOption( "cluster", NULL )
}
## Print init message (esp. the state of parallel and the snowfall
## version).
if( sfParallel() ) {
message( paste( "snowfall ", packageDescription( "snowfall" )$Version,
" initialized (using snow ", packageDescription( "snow" )$Version,
"): parallel execution on ", sfCpus(), " CPUs.\n", sep="" ) );
}
else {
message( paste( "snowfall", packageDescription( "snowfall" )$Version,
"initialized: sequential execution, one CPU.\n" ) );
}
return( invisible( TRUE ) )
}
##*****************************************************************************
## Ensure that snowfall is initialised.
## Called at the beginning of any function which needs an initialised cluster.
##
## Formerly this stopped with an error; now sfInit() is called without
## arguments instead, so sfInit() does not have to be called explicitly
## (requested from Harald).
##
## (Not exported to namespace).
##
## RETURN: invisible TRUE if snowfall is running, else the (invisible) result
##         of sfInit().
##*****************************************************************************
sfCheck <- function() {
  ## Nothing to do if snowfall is already up.
  if( sfIsRunning() )
    return( invisible( TRUE ) )

  ## Tell the user about the implicit initialisation, then perform it.
  notice <- paste( "Calling a snowfall function without calling 'sfInit'",
                   "first or after sfStop().\n'sfInit()' is called now." )
  message( notice )

  invisible( sfInit() )
}
##*****************************************************************************
## Exported as user function.
## Tells whether sfInit() was called and snowfall was not stopped since
## (maybe helpful inside of packages etc.).
##
## RETURN: Boolean TRUE if options exist, init is set and stopped is not set.
##*****************************************************************************
sfIsRunning <- function() {
  ## No options at all: sfInit() was never called.
  if( length( .sfOption ) == 0 )
    return( FALSE )

  ## Options exist, but initialisation was not finished.
  if( !.sfOption$init )
    return( FALSE )

  ## Add 1.62: a stopped cluster does not count as running.
  if( .sfOption$stopped )
    return( FALSE )

  TRUE
}
##*****************************************************************************
## Stop snowfall. In parallel mode the snow cluster is stopped via snow's
## stopCluster(); in any mode the state flags are reset and stored restore
## files are deleted.
##
## PARAMETER: [Boolean nostop: don't stop, return TRUE immediately]
## RETURN:    TRUE if nostop is set, else invisible NULL
##*****************************************************************************
sfStop <- function( nostop=FALSE ) {
  ## Saves users from writing many if-clauses of their own.
  if( nostop )
    return( TRUE )

  ## Without (filled) options there is nothing to stop.
  if( !exists( ".sfOption" ) || ( length( .sfOption ) == 0 ) )
    return( invisible( NULL ) )

  ## Only stop the cluster if initialised, not yet stopped and running
  ## parallel.
  clusterActive <- !.sfOption$stopped && .sfOption$init && .sfOption$parallel

  if( clusterActive ) {
    message( "\nStopping cluster\n" )

    ## The cluster is taken directly from the options: sfGetCluster() is not
    ## used here, as it would call sfCheck() again.
    stopCluster( .sfOption$cluster )
  }

  ## Reset state flags.
  setOption( "stopped", TRUE )
  setOption( "parallel", FALSE )

  ## Delete possibly stored result files (can also exist in sequential mode!)
  deleteRestoreFiles()

  invisible( NULL )
}
##*****************************************************************************
## Is the program running in parallel mode? Wrapper around the internal
## option list (therefore exported, of course).
##
## RETURN: Boolean Running in parallel mode
##*****************************************************************************
sfParallel <- function() {
  ## Make sure the options are initialised before reading them.
  sfCheck()

  .sfOption$parallel
}
##*****************************************************************************
## Shall sfClusterApplySR restore results?
##
## RETURN: Value of the option RESTORE
##*****************************************************************************
sfRestore <- function() {
  ## Make sure the options are initialised before reading them.
  sfCheck()

  .sfOption$RESTORE
}
##*****************************************************************************
## Get the snow cluster handler (for direct calls to snow functions).
##
## RETURN: Value of the option cluster
##*****************************************************************************
sfGetCluster <- function() {
  ## Make sure the options are initialised before reading them.
  sfCheck()

  .sfOption$cluster
}
##*****************************************************************************
## Get the amount of currently used CPUs (sequential: 1).
##
## RETURN: Value of the option nodes
##*****************************************************************************
sfCpus <- function() {
  ## Make sure the options are initialised before reading them.
  sfCheck()

  .sfOption$nodes
}
## Getter for the amount of nodes. Plain wrapper around sfCpus().
sfNodes <- function() {
  sfCpus()
}
##*****************************************************************************
## Get the type of the current cluster.
##
## RETURN: Value of the option type in parallel mode, else the string
##         "- sequential -"
##*****************************************************************************
sfType <- function() {
  sfCheck()

  ## Sequential execution has no cluster type.
  if( !sfParallel() )
    return( "- sequential -" )

  .sfOption$type
}
##*****************************************************************************
## Get the list of all socket hosts.
##
## RETURN: Value of the option sockHosts if a socket cluster is used, else
##         invisible NULL (with a warning).
##*****************************************************************************
sfSocketHosts <- function() {
  ## Only socket clusters have a host list to report.
  if( sfType() != "SOCK" ) {
    warning( paste( "No socket cluster used:", sfType() ) )
    return( invisible( NULL ) )
  }

  sfCheck()

  .sfOption$sockHosts
}
##*****************************************************************************
## Getter for the session-ID.
##
## RETURN: Value of the option session
##*****************************************************************************
sfSession <- function() {
  ## Make sure the options are initialised before reading them.
  sfCheck()

  .sfOption$session
}
##*****************************************************************************
## Set the max. number of CPUs usable per process by storing it in
## .sfPresetCPUs (via setVar()).
## The value is not checked for being sensible (if the user wants 1000, he
## gets 1000 :)).
##
## PARAMETER: [Int number - new CPU limit, default 32]
## RETURN:    Result of setVar()
##*****************************************************************************
sfSetMaxCPUs <- function( number=32 ) {
  setVar( ".sfPresetCPUs",
          number )
}
##*****************************************************************************
## Internal function:
##
## Merge the arguments of sfInit() with the arguments found on the
## commandline and store the result in the option list (via setOption()).
## Arguments given on the function call overwrite the values on the
## commandline; a warning is raised if a differing commandline value is
## overruled that way.
##
## Basically the arguments on the commandline come from sfCluster, but can of
## course be set manually or via another load- or sessionmanager.
##
## Commandline arguments: --parallel(=[01])*
##                        --session=<id>
##                        --nodes=<n> | --cpus=<n>
##                        --type={PVM|MPI|SOCK|NWS}
##                        --hosts=host[:cpus](,host[:cpus])*
##                        --tmpdir=<directory>
##                        --restdir=<directory>
##                        --restoreSR | --restore
##                        --lockfile=<file>
##
## PARAMETER: [Boolean parallel - overwrites --parallel]
##            [Int cpus - overwrites --nodes/--cpus]
##            [Vector socketHosts - overwrites --hosts]
##            [String type - overwrites --type; must be exactly one of
##                           PVM, MPI, SOCK, NWS, else fallback to SOCK]
##            [Boolean restore - overwrites --restoreSR/--restore]
## RETURN:    invisible NULL (results are stored in the options parallel,
##            session, nodes, type, sockHosts, RESTORE, CURRENT, TMPDIR,
##            RESTDIR and LOCKFILE)
##*****************************************************************************
searchCommandline <- function( parallel=NULL, cpus=NULL,
                               socketHosts=NULL, type=NULL,
                               restore=NULL ) {
  knownTypes <- c( "PVM", "MPI", "SOCK", "NWS" )

  ## TRUE only for a single, non-missing, non-empty string. Needed as a
  ## missing value shows up as NA (never as NULL) and nchar( NA ) is not
  ## usable inside of if().
  hasValue <- function( value )
    ( length( value ) == 1 ) && !is.na( value ) && nzchar( value )

  ## Expand one "host[:cpus]" entry to the hostname repeated cpus times
  ## (required for snows argument format). Without CPU amount: assume 1.
  expandHost <- function( host ) {
    info <- unlist( strsplit( host, ":" ) )

    if( ( length( info ) == 0 ) || !nzchar( info[1] ) ) {
      warning( paste( "Empty host entry in --hosts ignored: '", host, "'",
                      sep="" ) )
      return( NULL )
    }

    cpuCount <- 1L

    if( length( info ) >= 2 ) {
      cpuCount <- suppressWarnings( as.integer( info[2] ) )

      if( is.na( cpuCount ) )
        stop( paste( "NOT NUMERIC: '", info[2], "'", sep="" ) )

      if( cpuCount <= 0 )
        cpuCount <- 1L
    }

    rep( info[1], cpuCount )
  }

  ##**************************************************************************
  ## Part 1: arguments from the function call.
  ##**************************************************************************
  if( !is.null( cpus ) ) {
    setOption( "nodes", max( 1, cpus ) )

    ## For socket/NWS clusters: force rebuild of hostlist (as probably
    ## changed). (If not overwritten later by users own arguments.)
    ## Requesting more than one CPU deliberately does NOT force parallel
    ## mode.
    setOption( "sockHosts", NULL )
  }

  if( !is.null( parallel ) ) {
    setOption( "parallel", parallel )

    if( parallel ) {
      ## A fixed session number "XXXXXXXX" is not good enough, as many users
      ## can run without a session-ID at the same time. The name is needed
      ## for the filename on cluster init, so the cluster itself cannot be
      ## used to generate it: username and time are added instead.
      ## *nix: LOGNAME, Windows: USERNAME (LOGNAME/USER is not set there).
      uname <- Sys.getenv( "LOGNAME" )

      if( uname == "" )
        uname <- Sys.getenv( "USERNAME" )

      if( uname == "" )
        uname <- "___"

      ## Add R for RunSnowMode heterogenous mode.
      ## XXX Check R version and fill in correct version.
      setOption( "session", paste( sep="_",
                                   "XXXXXXXXR",
                                   uname,
                                   format( Sys.time(), "%H%M%S_%m%d%y" ) ) )
    } else {
      ## Sequential mode: reduce to one CPU.
      setOption( "nodes", 1 )
    }
  }

  ## If socket hosts are set, take them.
  if( !is.null( socketHosts ) )
    setOption( "sockHosts", socketHosts )

  ## Type of the cluster. Only exact matches are accepted: a substring match
  ## would let irregular types (e.g. "SOCKS") through.
  if( !is.null( type ) ) {
    if( ( length( type ) == 1 ) && ( type %in% knownTypes ) ) {
      setOption( "type", type )
    } else {
      warning( paste( "Unknown cluster type:", paste( type, collapse="," ),
                      "Allowed are: {PVM,MPI,SOCK,NWS}. Fallback to SOCKet." ) )
      setOption( "type", "SOCK" )
    }
  } else {
    ## Default value: socket cluster.
    setOption( "type", "SOCK" )
  }

  ## Global restore setting (for sfClusterApplySR).
  if( !is.null( restore ) )
    setOption( "RESTORE", restore )

  ##**************************************************************************
  ## Part 2: currently executed R-file (used for save/restore).
  ## Detected by the argument following the option "-f". The first "-f" with
  ## a non-empty follow-up wins; a "-f" as last argument has no follow-up
  ## and is ignored.
  ## @todo Find a better way to detect R-file (is there any?)
  ##**************************************************************************
  arguments <- commandArgs()

  flagPos <- which( arguments == "-f" )
  flagPos <- flagPos[flagPos < length( arguments )]
  rFiles <- arguments[flagPos + 1]
  rFiles <- rFiles[!is.na( rFiles ) & ( rFiles != "" )]

  if( length( rFiles ) > 0 )
    setOption( "CURRENT", rFiles[1] )

  ## No R-file given: set to DEFAULT filename (always occurs in interactive
  ## mode).
  if( is.null( .sfOption$CURRENT ) )
    setOption( "CURRENT", "DEFAULT" )

  ##**************************************************************************
  ## Part 3: go through all arguments from the commandline.
  ##**************************************************************************
  for( arg in arguments ) {
    ## Non sfCluster-like argument? Skip.
    ## (Only arguments allowed without value are '--parallel', '--restoreSR'
    ## and '--restore'.)
    eqPos <- regexpr( "=", arg, fixed=TRUE )

    if( ( eqPos < 0 ) &&
        !( arg %in% c( "--parallel", "--restoreSR", "--restore" ) ) )
      next

    ## Arguments in form "--name=value": split at the FIRST "=" only, so
    ## values containing "=" stay intact. No "=" at all: value is NA.
    if( eqPos > 0 ) {
      name <- substr( arg, 1, eqPos - 1 )
      value <- substr( arg, eqPos + 1, nchar( arg ) )
    } else {
      name <- arg
      value <- NA_character_
    }

    if( name == "--parallel" ) {
      ## Marker for parallel execution.
      ## --parallel is allowed without (numeric) value (means: true).
      numValue <- NA

      if( hasValue( value ) )
        numValue <- suppressWarnings( as.numeric( value ) )

      cmdParallel <- is.na( numValue ) || ( numValue > 0 )

      ## If parallel was set via function arguments, the commandline is
      ## ignored (with a warning if both differ).
      if( is.null( parallel ) )
        setOption( "parallel", cmdParallel )
      else if( parallel != cmdParallel )
        warning( paste( "Commandline argument --parallel",
                        "overwritten with sfInit argument parallel=", parallel ) )
    } else if( ( name == "--restoreSR" ) || ( name == "--restore" ) ) {
      ## Marker for general restore (only used in sfClusterApplySR).
      if( is.null( restore ) )
        setOption( "RESTORE", TRUE )
      else if( !restore )
        warning( paste( "Commandline argument", name,
                        "overwritten with sfInit argument restore=FALSE" ) )
    } else if( name == "--session" ) {
      ## Marker for Session-ID (no fixed length since sfCluster >=0.23).
      if( hasValue( value ) )
        setOption( "session", value )
      else
        warning( paste( "Empty or irregular Session-ID: '", value, "'\n" ) )
    } else if( ( name == "--nodes" ) || ( name == "--cpus" ) ) {
      ## Amount of CPUs (formerly called "nodes", kept for backward
      ## compatibility).
      nodes <- NA

      if( hasValue( value ) )
        nodes <- suppressWarnings( as.numeric( value ) )

      if( !is.na( nodes ) ) {
        if( nodes > .sfOption$MAXNODES ) {
          stop( paste( "Too much CPUs allocated:", nodes, "Max.:",
                       .sfOption$MAXNODES,
                       "\n - Call sfSetMaxCPUs() before sfInit() if you need more." ) )
        }

        nodes <- max( 1, nodes )

        ## If set via function arguments, the commandline is ignored (with
        ## a warning if both differ).
        if( is.null( cpus ) )
          setOption( "nodes", nodes )
        else if( cpus != nodes )
          warning( paste( "Commandline --cpus=", nodes,
                          " overwritten by sfInit() argument cpus=", cpus, sep="" ) )
      } else {
        warning( paste( "Empty or irregular nodes amount: '", value, "'\n" ) )
      }
    } else if( name == "--type" ) {
      ## Type of the network.
      if( !hasValue( value ) ) {
        warning( "No cluster-type is given as value for argument --type" )
      } else if( !( value %in% knownTypes ) ) {
        warning( paste( "Unknown cluster type on commandline:", value,
                        "Allowed are: {PVM,MPI,SOCK,NWS}" ) )
      } else if( is.null( type ) ) {
        setOption( "type", value )
      } else if( !identical( type, value ) ) {
        warning( paste( "Commandline --type=", value,
                        " overwritten by sfInit() argument type=",
                        paste( type, collapse="," ), sep="" ) )
      }
    } else if( name == "--hosts" ) {
      ## Hosts for socket mode.
      ## Arguments come in format:
      ##   nodename:cpus -> On node X are Y cpus used.
      ##   nodename      -> On node X one cpu is used.
      ## Entries are comma separated (no whitespace allowed!):
      ##   node1:3,node2,node3:2
      if( hasValue( value ) ) {
        hostSpecs <- unlist( strsplit( value, "," ) )
        cmdHosts <- unlist( lapply( hostSpecs, expandHost ) )

        if( is.null( socketHosts ) )
          setOption( "sockHosts", cmdHosts )
        else if( paste( cmdHosts, collapse="" ) != paste( socketHosts, collapse="" ) ) {
          warning( paste( "Commandline --hosts=", value,
                          " overwritten by sfInit() argument hosts=", paste( socketHosts, collapse="," ),
                          sep="" ) )
        }
      } else {
        warning( "No hosts are given as value for --hosts" )
      }
    } else if( name == "--tmpdir" ) {
      ## Temporary directory: slave logs.
      if( hasValue( value ) )
        setOption( "TMPDIR", value )
      else
        warning( "No temporary directory given as value for --tmpdir" )
    } else if( name == "--restdir" ) {
      ## Restore directory: intermediate results are stored here.
      if( hasValue( value ) )
        setOption( "RESTDIR", value )
      else
        warning( "No restore/result directory given as value for --restdir" )
    } else if( name == "--lockfile" ) {
      ## Startup lock.
      ## Add 1.62: should only be used from sfCluster => it is the marker
      ## that snowfall is started through sfCluster!
      if( hasValue( value ) )
        setOption( "LOCKFILE", value )
      else
        warning( "No lockfile given as value for --lockfile" )
    } else if( name != "--gui" ) {
      ## Unknown option (Add 1.62).
      ## Add 1.72: patch from Michael Siegel for Mac OS X, which sets --gui.
      warning( paste( "Unknown option on commandline:", name ) )
    }
  }

  invisible( NULL )
}
|