Revision bdc080d3acb04ab5f1977c15aa2bda23299e7ea8 authored by Leif Strand on 23 July 2005, 09:02:47 UTC, committed by Leif Strand on 23 July 2005, 09:02:47 UTC
1 parent 5040de8
Launchers.py
#!/usr/bin/env python
#
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#
#<LicenseText>
#=====================================================================
#
# CitcomS.py
# ---------------------------------
#
# Authors:
# Eh Tan, Eun-seo Choi, and Pururav Thoutireddy
# (c) California Institute of Technology 2002-2005
#
# By downloading and/or installing this software you have
# agreed to the CitcomS.py-LICENSE bundled with this software.
# Free for non-commercial academic research ONLY.
# This program is distributed WITHOUT ANY WARRANTY whatsoever.
#
#=====================================================================
#
# Copyright June 2005, by the California Institute of Technology.
# ALL RIGHTS RESERVED. United States Government Sponsorship Acknowledged.
#
# Any commercial use must be negotiated with the Office of Technology
# Transfer at the California Institute of Technology. This software
# may be subject to U.S. export control laws and regulations. By
# accepting this software, the user agrees to comply with all
# applicable U.S. export laws and regulations, including the
# International Traffic and Arms Regulations, 22 C.F.R. 120-130 and
# the Export Administration Regulations, 15 C.F.R. 730-744. User has
# the responsibility to obtain export licenses, or other export
# authority as may be required before exporting such information to
# foreign countries or providing access to foreign nationals. In no
# event shall the California Institute of Technology be liable to any
# party for direct, indirect, special, incidental or consequential
# damages, including lost profits, arising out of the use of this
# software and its documentation, even if the California Institute of
# Technology has been advised of the possibility of such damage.
#
# The California Institute of Technology specifically disclaims any
# warranties, including the implied warranties or merchantability and
# fitness for a particular purpose. The software and documentation
# provided hereunder is on an "as is" basis, and the California
# Institute of Technology has no obligations to provide maintenance,
# support, updates, enhancements or modifications.
#
#=====================================================================
#</LicenseText>
#
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#
from mpi.Launcher import Launcher
from pyre.util import expandMacros
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# utility functions
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def which(filename):
from os.path import abspath, exists, join
from os import environ, pathsep
dirs = environ['PATH'].split(pathsep)
for dir in dirs:
pathname = join(dir, filename)
if exists(pathname):
return abspath(pathname)
return filename
def hms(t):
return (int(t / 3600), int((t % 3600) / 60), int(t % 60))
defaultEnvironment = "[EXPORT_ROOT,LD_LIBRARY_PATH,PYTHONPATH]"
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Launcher for LAM/MPI
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
class LauncherLAMMPI(Launcher):
class Inventory(Launcher.Inventory):
import pyre.inventory
dry = pyre.inventory.bool("dry", default=False)
debug = pyre.inventory.bool("debug", default=False)
nodegen = pyre.inventory.str("nodegen")
extra = pyre.inventory.str("extra")
command = pyre.inventory.str("command", default="mpirun")
python_mpi = pyre.inventory.str("python-mpi", default=which("mpipython.exe"))
environment = pyre.inventory.list("environment", default=defaultEnvironment)
def launch(self):
args = self._buildArgumentList()
if not args:
return False
command = " ".join(args)
self._info.log("executing: {%s}" % command)
dry = self.inventory.dry
if not dry:
import os
os.system(command)
return True
return False
def __init__(self):
Launcher.__init__(self, "lam-mpi")
return
def _buildArgumentList(self):
import sys
nodes = self.nodes
nodelist = self.nodelist
nodegen = self.inventory.nodegen
python_mpi = self.inventory.python_mpi
if not nodes:
nodes = len(nodelist)
if nodes < 2:
self.inventory.nodes = 1
return []
# build the command
args = []
args.append(self.inventory.command)
args.append(self.inventory.extra)
args.append("-x %s" % ','.join(self.inventory.environment))
args.append("-np %d" % nodes)
# use only the specific nodes specified explicitly
if nodelist:
args.append("n" + ",".join([(nodegen) % node for node in nodelist]))
# add the parallel version of the interpreter on the command line
args.append(python_mpi)
args += sys.argv
args.append("--mode=worker")
return args
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# These are Launchers for batch schedulers found on the TeraGrid.
# These should be incorporated into Pythia eventually.
# With something like StringTemplate by Terence Parr and Marq Kole,
# the batch scripts could be generated entirely from an
# inventory-data-driven template.
# http://www.stringtemplate.org/doc/python-doc.html
# This code uses a hybrid approach, mixing Python logic with primitive
# templates powered by pyre.util.expandMacros().
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
class LauncherBatch(Launcher):
class Inventory(Launcher.Inventory):
import os, sys
import pyre.inventory
from pyre.units.time import minute
dry = pyre.inventory.bool("dry", default=False)
debug = pyre.inventory.bool("debug", default=False)
python_mpi = pyre.inventory.str("python-mpi", default=which("mpipython.exe"))
task = pyre.inventory.str("task")
walltime = pyre.inventory.dimensional("walltime", default=30.0*minute)
mail = pyre.inventory.bool("mail", default=False)
queue = pyre.inventory.str("queue")
directory = pyre.inventory.str("directory", default="${cwd}")
script = pyre.inventory.str("script", default="${directory}/script")
stdout = pyre.inventory.str("stdout", default="${directory}/stdout")
stderr = pyre.inventory.str("stderr", default="${directory}/stderr")
environment = pyre.inventory.list("environment", default=defaultEnvironment)
cwd = pyre.inventory.str("cwd", default=os.getcwd())
argv = pyre.inventory.str("argv", default=(' '.join(sys.argv)))
def launch(self):
# write the script
scriptFile = open(expandMacros("${script}", self.inv), "w")
scriptFile.write(self._buildScript())
scriptFile.close()
# build the batch command
command = self._buildBatchCommand()
self._info.log("executing: {%s}" % command)
dry = self.inventory.dry
if not dry:
import os
os.system(command)
return True
return False
def __init__(self, name):
Launcher.__init__(self, name)
# Used to recursively expand ${macro) in format strings using my inventory.
class InventoryAdapter(object):
def __init__(self, launcher):
self.launcher = launcher
def __getitem__(self, key):
return expandMacros(str(self.launcher.inventory.getTraitValue(key)), self)
self.inv = InventoryAdapter(self)
return
def _buildBatchCommand(self):
return expandMacros("${batch-command} ${script}", self.inv)
def _buildScript(self):
script = [
"#!/bin/sh",
]
self._buildScriptDirectives(script)
script += [
expandMacros('''\
cd ${directory}
${command} ${python-mpi} ${argv} --mode=worker
''', self.inv)
]
script = "\n".join(script) + "\n"
return script
# Note: mpi.LauncherPBS in Pythia-0.8 does not work!
class LauncherPBS(LauncherBatch):
class Inventory(LauncherBatch.Inventory):
import pyre.inventory
command = pyre.inventory.str("command", default="mpirun -np ${nodes} -machinefile $PBS_NODEFILE") # Sub-launcher?
batch_command = pyre.inventory.str("batch-command", default="qsub")
def __init__(self):
LauncherBatch.__init__(self, "pbs")
def _buildScriptDirectives(self, script):
queue = self.inventory.queue
if queue:
script.append("#PBS -q %s" % queue)
task = self.inventory.task
if task:
script.append("#PBS -N %s" % task)
if self.inventory.stdout:
script.append(expandMacros("#PBS -o ${stdout}", self.inv))
if self.inventory.stderr:
script.append(expandMacros("#PBS -e ${stderr}", self.inv))
resourceList = self._buildResourceList()
script += [
"#PBS -V", # export qsub command environment to the batch job
"#PBS -l %s" % resourceList,
]
return script
def _buildResourceList(self):
resourceList = [
"nodes=%d" % self.nodes,
]
walltime = self.inventory.walltime.value
if walltime:
resourceList.append("walltime=%d:%02d:%02d" % hms(walltime))
resourceList = ",".join(resourceList)
return resourceList
class LauncherLSF(LauncherBatch):
class Inventory(LauncherBatch.Inventory):
import pyre.inventory
command = pyre.inventory.str("command", default="pam -g 1 gmmpirun_wrapper") # TACC-specific?
batch_command = pyre.inventory.str("batch-command", default="bsub")
def __init__(self):
LauncherBatch.__init__(self, "lsf")
def _buildBatchCommand(self):
return expandMacros("${batch-command} < ${script}", self.inv)
def _buildScriptDirectives(self, script):
# LSF scripts must have a job name; otherwise strange "/bin/sh: Event not found"
# errors occur (tested on TACC's Lonestar system).
task = self.inventory.task
if not task:
task = "jobname"
script.append("#BSUB -J %s" % task)
queue = self.inventory.queue
if queue:
script.append("#BSUB -q %s" % queue)
walltime = self.inventory.walltime.value
if walltime:
script.append("#BSUB -W %d:%02d" % hms(walltime)[0:2])
if self.inventory.stdout:
script.append(expandMacros("#BSUB -o ${stdout}", self.inv))
if self.inventory.stderr:
script.append(expandMacros("#BSUB -e ${stderr}", self.inv))
script += [
"#BSUB -n %d" % self.nodes,
]
return script
class LauncherGlobus(LauncherBatch):
class Inventory(LauncherBatch.Inventory):
import pyre.inventory
batch_command = pyre.inventory.str("batch-command", default="globusrun")
resource = pyre.inventory.str("resource", default="localhost")
def _buildBatchCommand(self):
return expandMacros("${batch-command} -b -r ${resource} -f ${script}", self.inv)
def __init__(self):
LauncherBatch.__init__(self, "globus")
return
def _buildScript(self):
import sys
script = [
expandMacros('''\
& (jobType=mpi)
(executable="${python-mpi}")
(count=${nodes})
(directory="${directory}")
(stdout="${stdout}")
(stderr="${stderr}")''', self.inv),
]
script.append(' (environment = %s)' % self._buildEnvironment())
# add the arguments
args = sys.argv
args.append("--mode=worker")
command = ' (arguments= ' + ' '.join([('"%s"' % arg) for arg in args]) + ')'
script.append(command)
script = '\n'.join(script) + '\n'
return script
def _buildEnvironment(self):
from os import environ
#vars = environ.keys()
vars = self.inventory.environment
env = [('(%s "%s")' % (var, environ.get(var,""))) for var in vars]
env = ' '.join(env)
return env
# main
if __name__ == "__main__":
from pyre.applications.Script import Script
class TestApp(Script):
class Inventory(Script.Inventory):
import pyre.inventory
from mpi.Launcher import Launcher
launcher = pyre.inventory.facility("launcher", default="mpich")
def main(self, *args, **kwds):
launcher = self.inventory.launcher
if launcher:
try:
# batch launcher
print launcher._buildScript()
print
print "# submit with", launcher._buildBatchCommand()
except AttributeError:
# direct launcher
print ' '.join(launcher._buildArgumentList())
return
def __init__(self):
Script.__init__(self, "citcoms")
app = TestApp()
app.run()
# version
__id__ = "$Id: Launchers.py,v 1.1.2.1 2005/07/22 03:04:51 leif Exp $"
# End of file
Computing file changes ...