pyina module documentation
ez_map module
The ez_map function is a helper for parallel_map that further simplifies parallel programming. Primarily, ez_map provides a standard interface for parallel_map, and facilitates launching parallel jobs from serial python.
Usage
- A call to ez_map will roughly follow this example:
>>> # get the parallel mapper
>>> from pyina.ez_map import ez_map
>>> # construct a target function
>>> def host(id):
...     import socket
...     return "Rank: %d -- %s" % (id, socket.gethostname())
...
>>> # launch the parallel map of the target function
>>> results = ez_map(host, range(100), nodes = 10)
>>> for result in results:
...     print(result)
Implementation
A parallel application is launched by using a helper script (e.g. ezrun.py) as an intermediary between the MPI implementation of the parallel map (e.g. pyina.mpi_pool.parallel_map) and the user’s serial python.
- The system call that submits the mpi job is blocking, for the following reasons:
  - If the main program exits before the parallel job starts, any temp files used by ez_map will be lost.
  - The user generally wants the return value of the map, so blocking on the result of the map should not be much of a hindrance.
  - If the call were allowed to be asynchronous, some kind of 'deferred' mechanism or job monitoring would need to be implemented.
The argument list and the returned results are transferred as pickled strings, while the mapped function is either saved to and imported from a temporary file (e.g. pyina.ez_map.ez_map) or transferred through serialization (e.g. pyina.ez_map.ez_map2). Each implementation has its own advantages and weaknesses, and one mapper may succeed in a case where the other fails.
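For example, since the two interfaces differ only in how the function is shipped, a caller can try one and fall back to the other. The following is a minimal sketch (the fallback logic is illustrative, not part of pyina):

>>> from pyina.ez_map import ez_map, ez_map2
>>> def squared(x):
...     return x*x
...
>>> try:
...     # ez_map writes the source of 'squared' to a temp file and imports it
...     results = ez_map(squared, range(8), nodes=4)
... except Exception:
...     # ez_map2 ships 'squared' as a pickled string instead
...     results = ez_map2(squared, range(8), nodes=4)
...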
- aprun_launcher(kdict={})
prepare launch for parallel execution using aprun
syntax: aprun -n(nodes) (python) (program) (progargs)
Notes
run non-python commands with: {'python':'', ...}
fine-grained resource utilization with: {'nodes':'4 -N 1', ...}
- aprun_tasks(nodes)
Helper function. Computes the aprun task_string from a node string of pattern N[:TYPE][:ppn=P]. For example, aprun_tasks("3:core4:ppn=2") yields '3 -N 2'.
- ez_map(func, *arglist, **kwds)
higher-level map interface for selected mapper and launcher
Maps the function 'func' across the arguments in 'arglist'. Arguments and results are stored and sent as pickled strings, while the function 'func' is inspected and written to a source file to be imported.
- Further Input:
nodes – the number of parallel nodes
launcher – the launcher object
scheduler – the scheduler object
mapper – the mapper object
timelimit – string representation of maximum run time (e.g. '00:02')
queue – string name of selected queue (e.g. 'normal')
- ez_map2(func, *arglist, **kwds)
higher-level map interface for selected mapper and launcher
Maps the function 'func' across the arguments in 'arglist'. Arguments and results are stored and sent as pickled strings, and the function 'func' is also stored and sent as a pickled string. This differs from 'ez_map' in that it does not use temporary files to store the mapped function.
- Further Input:
nodes – the number of parallel nodes
launcher – the launcher object
scheduler – the scheduler object
mapper – the mapper object
timelimit – string representation of maximum run time (e.g. '00:02')
queue – string name of selected queue (e.g. 'normal')
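For instance, a scheduler-aware call might look like the following sketch (the queue and timelimit values are illustrative):

>>> from pyina.ez_map import ez_map2
>>> def squared(x):
...     return x*x
...
>>> # ship the function by serialization; request 8 nodes on the 'normal'
>>> # queue with a two-minute time limit
>>> results = ez_map2(squared, range(16), nodes=8, queue='normal', timelimit='00:02')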
- launch(command)
launch mechanism for prepared launch command
- moab_launcher(kdict={})
prepare launch for moab submission using srun, mpirun, aprun, or serial
syntax: echo "srun -n(nodes) (python) (program) (progargs)" | msub -l nodes=(nodes) -l walltime=(timelimit) -o (outfile) -e (errfile) -q (queue)
syntax: echo "%(mpirun)s -np (nodes) (python) (program) (progargs)" | msub -l nodes=(nodes) -l walltime=(timelimit) -o (outfile) -e (errfile) -q (queue)
syntax: echo "aprun -n (nodes) (python) (program) (progargs)" | msub -l nodes=(nodes) -l walltime=(timelimit) -o (outfile) -e (errfile) -q (queue)
syntax: echo "(python) (program) (progargs)" | msub -l nodes=(nodes) -l walltime=(timelimit) -o (outfile) -e (errfile) -q (queue)
Notes
run non-python commands with: {'python':'', ...}
fine-grained resource utilization with: {'nodes':'4:ppn=1,partition=xx', ...}
- class moab_scheduler
Bases:
object
moab scheduler – configured for mpirun, srun, aprun, or serial
- aprun = 'moab_aprun'
- mpirun = 'moab_mpirun'
- serial = 'moab_serial'
- srun = 'moab_srun'
- mpirun_launcher(kdict={})
prepare launch for parallel execution using mpirun
syntax: mpiexec -np (nodes) (python) (program) (progargs)
Notes
run non-python commands with: {‘python’:’’, …}
- mpirun_tasks(nodes)
Helper function. Computes the mpirun task_string from a node string of pattern N[:TYPE][:ppn=P]. For example, mpirun_tasks("3:core4:ppn=2") yields 6.
- serial_launcher(kdict={})
prepare launch for standard execution
syntax: (python) (program) (progargs)
Notes
run non-python commands with: {‘python’:’’, …}
- srun_launcher(kdict={})
prepare launch for parallel execution using srun
syntax: srun -n(nodes) (python) (program) (progargs)
Notes
run non-python commands with: {'python':'', ...}
fine-grained resource utilization with: {'nodes':'4 -N1', ...}
- srun_tasks(nodes)
Helper function. Computes the srun task_string from a node string of pattern N[:ppn=P][,partition=X]. For example, srun_tasks("3:ppn=2,partition=foo") yields '3 -N2'.
- torque_launcher(kdict={})
prepare launch for torque submission using mpiexec, srun, aprun, or serial
syntax: echo "mpiexec -np (nodes) (python) (program) (progargs)" | qsub -l nodes=(nodes) -l walltime=(timelimit) -o (outfile) -e (errfile) -q (queue)
syntax: echo "srun -n(nodes) (python) (program) (progargs)" | qsub -l nodes=(nodes) -l walltime=(timelimit) -o (outfile) -e (errfile) -q (queue)
syntax: echo "aprun -n (nodes) (python) (program) (progargs)" | qsub -l nodes=(nodes) -l walltime=(timelimit) -o (outfile) -e (errfile) -q (queue)
syntax: echo "(python) (program) (progargs)" | qsub -l nodes=(nodes) -l walltime=(timelimit) -o (outfile) -e (errfile) -q (queue)
Notes
run non-python commands with: {'python':'', ...}
fine-grained resource utilization with: {'nodes':'4:nodetype:ppn=1', ...}
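The launcher functions above share a small settings dictionary whose keys mirror the placeholders in the syntax strings; 'python' and 'nodes' appear in the Notes, while 'program' and 'progargs' are assumed here from the placeholders. A minimal sketch:

>>> from pyina.ez_map import srun_launcher, launch
>>> # keys mirror the (placeholders) in the syntax strings above;
>>> # 'program' and 'progargs' are assumptions for illustration
>>> settings = {'nodes': '4', 'python': 'python', 'program': 'runme.py', 'progargs': ''}
>>> command = srun_launcher(settings)
>>> launch(command)  # blocking system call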
launchers module
This module contains prepared launchers for parallel execution, including bindings to some common combinations of launchers and schedulers.
- Base classes:
SerialMapper - base class for pipe-based mapping with python
ParallelMapper - base class for pipe-based mapping with mpi4py
- Parallel launchers:
Mpi -
Slurm -
Alps -
- Pre-built combinations of the above launchers and schedulers:
TorqueMpi, TorqueSlurm, MoabMpi, MoabSlurm
- Pre-configured maps using the ‘scatter-gather’ strategy:
MpiScatter, SlurmScatter, AlpsScatter, TorqueMpiScatter, TorqueSlurmScatter, MoabMpiScatter, MoabSlurmScatter
- Pre-configured maps using the ‘worker pool’ strategy:
MpiPool, SlurmPool, AlpsPool, TorqueMpiPool, TorqueSlurmPool, MoabMpiPool, MoabSlurmPool
Usage
A typical call to a pyina mpi map will roughly follow this example:
>>> # instantiate and configure a scheduler
>>> from pyina.schedulers import Torque
>>> config = {'nodes':'32:ppn=4', 'queue':'dedicated', 'timelimit':'11:59'}
>>> torque = Torque(**config)
>>>
>>> # instantiate and configure a worker pool
>>> from pyina.launchers import Mpi
>>> pool = Mpi(scheduler=torque)
>>>
>>> # do a blocking map on the chosen function
>>> results = pool.map(pow, [1,2,3,4], [5,6,7,8])
Several common configurations are available as pre-configured maps. The following is identical to the above example:
>>> # instantiate and configure a pre-configured worker pool
>>> from pyina.launchers import TorqueMpiPool
>>> config = {'nodes':'32:ppn=4', 'queue':'dedicated', 'timelimit':'11:59'}
>>> pool = TorqueMpiPool(**config)
>>>
>>> # do a blocking map on the chosen function
>>> results = pool.map(pow, [1,2,3,4], [5,6,7,8])
Notes
This set of parallel maps leverages the mpi4py module, and thus has many of the limitations associated with that module. The function f and the sequences in args must be serializable. The maps provided here have full functionality when run from a script, however are somewhat limited when used in the python interpreter: both imported and interactively-defined functions may fail, as the pool can fail to find the source code for the target function. For a work-around, try:
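One approach consistent with the note above (a sketch; 'myfunc.py' is a hypothetical module name) is to define the target function in a module file, so the pool can locate its source:

>>> # assume 'myfunc.py' defines the target function:
>>> #     def host(id):
>>> #         import socket
>>> #         return "Rank: %d -- %s" % (id, socket.gethostname())
>>> from myfunc import host
>>> from pyina.launchers import MpiPool
>>> pool = MpiPool(nodes=4)
>>> results = pool.map(host, range(4))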
- class Alps(*args, **kwds)
Bases:
ParallelMapper
- Important class members:
nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files
- Other class members:
scatter - True, if uses 'scatter-gather' (instead of 'worker-pool')
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler
NOTE: if the number of nodes is not given, will try to grab the number of nodes from the associated scheduler, and failing that will count the local cpus. If workdir is not given, will default to the scheduler's workdir or $WORKDIR. If scheduler is not given, will default to only run on the current node. If pickle is not given, will attempt to minimally use TemporaryFiles.
For more details, see the docstrings for the "map" method, or the man page for the associated launcher (e.g. mpirun, mpiexec).
- _launcher(kdict={})
prepare launch for parallel execution using aprun
equivalent to: aprun -n (nodes) (python) (program) (progargs)
Notes
run non-python commands with: {'python':'', ...}
fine-grained resource utilization with: {'nodes':'4 -N 1', ...}
- map(func, *args, **kwds)
The function 'func', its arguments, and the results of the map are all stored and shipped across communicators as pickled strings.
- Optional Keyword Arguments:
onall = if True, include master as a worker [default: True]
NOTE: ‘onall’ defaults to True for both the scatter-gather and the worker pool strategies. A worker pool with onall=True may have added difficulty in pickling functions, due to asynchronous message passing with itself.
- Additional keyword arguments are passed to ‘func’ along with ‘args’.
- njobs(nodes)
convert node_string intended for scheduler to aprun node_string
compute the aprun task_string from a node string of pattern N[:TYPE][:ppn=P]. For example, aprun.njobs("3:core4:ppn=2") yields '3 -N 2'.
- class AlpsPool(*args, **kwds)
Bases:
Alps
- Important class members:
nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files
- Other class members:
scatter - True, if uses 'scatter-gather' (instead of 'worker-pool')
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler
NOTE: if the number of nodes is not given, will try to grab the number of nodes from the associated scheduler, and failing that will count the local cpus. If workdir is not given, will default to the scheduler's workdir or $WORKDIR. If scheduler is not given, will default to only run on the current node. If pickle is not given, will attempt to minimally use TemporaryFiles.
For more details, see the docstrings for the "map" method, or the man page for the associated launcher (e.g. mpirun, mpiexec).
- class AlpsScatter(*args, **kwds)
Bases:
Alps
- Important class members:
nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files
- Other class members:
scatter - True, if uses 'scatter-gather' (instead of 'worker-pool')
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler
NOTE: if the number of nodes is not given, will try to grab the number of nodes from the associated scheduler, and failing that will count the local cpus. If workdir is not given, will default to the scheduler's workdir or $WORKDIR. If scheduler is not given, will default to only run on the current node. If pickle is not given, will attempt to minimally use TemporaryFiles.
For more details, see the docstrings for the "map" method, or the man page for the associated launcher (e.g. mpirun, mpiexec).
- class MoabMpi(*args, **kwds)
Bases:
Mpi
- Important class members:
nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files
- Other class members:
scatter - True, if uses 'scatter-gather' (instead of 'worker-pool')
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler
NOTE: if the number of nodes is not given, will try to grab the number of nodes from the associated scheduler, and failing that will count the local cpus. If workdir is not given, will default to the scheduler's workdir or $WORKDIR. If scheduler is not given, will default to only run on the current node. If pickle is not given, will attempt to minimally use TemporaryFiles.
For more details, see the docstrings for the "map" method, or the man page for the associated launcher (e.g. mpirun, mpiexec).
- class MoabMpiPool(*args, **kwds)
Bases:
MoabMpi
- Important class members:
nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files
- Other class members:
scatter - True, if uses 'scatter-gather' (instead of 'worker-pool')
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler
NOTE: if the number of nodes is not given, will try to grab the number of nodes from the associated scheduler, and failing that will count the local cpus. If workdir is not given, will default to the scheduler's workdir or $WORKDIR. If scheduler is not given, will default to only run on the current node. If pickle is not given, will attempt to minimally use TemporaryFiles.
For more details, see the docstrings for the "map" method, or the man page for the associated launcher (e.g. mpirun, mpiexec).
- class MoabMpiScatter(*args, **kwds)
Bases:
MoabMpi
- Important class members:
nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files
- Other class members:
scatter - True, if uses 'scatter-gather' (instead of 'worker-pool')
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler
NOTE: if the number of nodes is not given, will try to grab the number of nodes from the associated scheduler, and failing that will count the local cpus. If workdir is not given, will default to the scheduler's workdir or $WORKDIR. If scheduler is not given, will default to only run on the current node. If pickle is not given, will attempt to minimally use TemporaryFiles.
For more details, see the docstrings for the "map" method, or the man page for the associated launcher (e.g. mpirun, mpiexec).
- class MoabSlurm(*args, **kwds)
Bases:
Slurm
- Important class members:
nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files
- Other class members:
scatter - True, if uses 'scatter-gather' (instead of 'worker-pool')
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler
NOTE: if the number of nodes is not given, will try to grab the number of nodes from the associated scheduler, and failing that will count the local cpus. If workdir is not given, will default to the scheduler's workdir or $WORKDIR. If scheduler is not given, will default to only run on the current node. If pickle is not given, will attempt to minimally use TemporaryFiles.
For more details, see the docstrings for the "map" method, or the man page for the associated launcher (e.g. mpirun, mpiexec).
- class MoabSlurmPool(*args, **kwds)
Bases:
MoabSlurm
- Important class members:
nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files
- Other class members:
scatter - True, if uses 'scatter-gather' (instead of 'worker-pool')
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler
NOTE: if the number of nodes is not given, will try to grab the number of nodes from the associated scheduler, and failing that will count the local cpus. If workdir is not given, will default to the scheduler's workdir or $WORKDIR. If scheduler is not given, will default to only run on the current node. If pickle is not given, will attempt to minimally use TemporaryFiles.
For more details, see the docstrings for the "map" method, or the man page for the associated launcher (e.g. mpirun, mpiexec).
- class MoabSlurmScatter(*args, **kwds)
Bases:
MoabSlurm
- Important class members:
nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files
- Other class members:
scatter - True, if uses 'scatter-gather' (instead of 'worker-pool')
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler
NOTE: if the number of nodes is not given, will try to grab the number of nodes from the associated scheduler, and failing that will count the local cpus. If workdir is not given, will default to the scheduler's workdir or $WORKDIR. If scheduler is not given, will default to only run on the current node. If pickle is not given, will attempt to minimally use TemporaryFiles.
For more details, see the docstrings for the "map" method, or the man page for the associated launcher (e.g. mpirun, mpiexec).
- class Mpi(*args, **kwds)
Bases:
ParallelMapper
- Important class members:
nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files
- Other class members:
scatter - True, if uses 'scatter-gather' (instead of 'worker-pool')
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler
NOTE: if the number of nodes is not given, will try to grab the number of nodes from the associated scheduler, and failing that will count the local cpus. If workdir is not given, will default to the scheduler's workdir or $WORKDIR. If scheduler is not given, will default to only run on the current node. If pickle is not given, will attempt to minimally use TemporaryFiles.
For more details, see the docstrings for the "map" method, or the man page for the associated launcher (e.g. mpirun, mpiexec).
- _launcher(kdict={})
prepare launch command for parallel execution using mpirun
equivalent to: mpiexec -np (nodes) (python) (program) (progargs)
Notes
run non-python commands with: {‘python’:’’, …}
- map(func, *args, **kwds)
The function 'func', its arguments, and the results of the map are all stored and shipped across communicators as pickled strings.
- Optional Keyword Arguments:
onall = if True, include master as a worker [default: True]
NOTE: ‘onall’ defaults to True for both the scatter-gather and the worker pool strategies. A worker pool with onall=True may have added difficulty in pickling functions, due to asynchronous message passing with itself.
- Additional keyword arguments are passed to ‘func’ along with ‘args’.
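For example, to exclude the master from the set of workers (a sketch, assuming a local MPI installation):

>>> from pyina.launchers import Mpi
>>> pool = Mpi(nodes=4)
>>> # run the map on the workers only; the master rank coordinates
>>> results = pool.map(pow, [1,2,3,4], [5,6,7,8], onall=False)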
- njobs(nodes)
convert node_string intended for scheduler to mpirun node_string
compute the mpirun task_string from a node string of pattern N[:TYPE][:ppn=P]. For example, mpirun.njobs("3:core4:ppn=2") yields 6.
- class MpiPool(*args, **kwds)
Bases:
Mpi
- Important class members:
nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files
- Other class members:
scatter - True, if uses 'scatter-gather' (instead of 'worker-pool')
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler
NOTE: if the number of nodes is not given, will try to grab the number of nodes from the associated scheduler, and failing that will count the local cpus. If workdir is not given, will default to the scheduler's workdir or $WORKDIR. If scheduler is not given, will default to only run on the current node. If pickle is not given, will attempt to minimally use TemporaryFiles.
For more details, see the docstrings for the "map" method, or the man page for the associated launcher (e.g. mpirun, mpiexec).
- class MpiScatter(*args, **kwds)
Bases:
Mpi
- Important class members:
nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files
- Other class members:
scatter - True, if uses 'scatter-gather' (instead of 'worker-pool')
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler
NOTE: if the number of nodes is not given, will try to grab the number of nodes from the associated scheduler, and failing that will count the local cpus. If workdir is not given, will default to the scheduler's workdir or $WORKDIR. If scheduler is not given, will default to only run on the current node. If pickle is not given, will attempt to minimally use TemporaryFiles.
For more details, see the docstrings for the "map" method, or the man page for the associated launcher (e.g. mpirun, mpiexec).
- class ParallelMapper(*args, **kwds)
Bases:
Mapper
Mapper base class for pipe-based mapping with mpi4py.
- Important class members:
nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files
- Other class members:
scatter - True, if uses 'scatter-gather' (instead of 'worker-pool')
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler
NOTE: if the number of nodes is not given, will try to grab the number of nodes from the associated scheduler, and failing that will count the local cpus. If workdir is not given, will default to the scheduler's workdir or $WORKDIR. If scheduler is not given, will default to only run on the current node. If pickle is not given, will attempt to minimally use TemporaryFiles.
For more details, see the docstrings for the "map" method, or the man page for the associated launcher (e.g. mpirun, mpiexec).
- __get_nodes()
get the number of nodes in the pool
- __nodes = None
- __repr__()
Return repr(self).
- __set_nodes(nodes)
set the number of nodes in the pool
- _launcher(kdict={})
prepare launch command for pipe-based execution
equivalent to: (python) (program) (progargs)
Notes
run non-python commands with: {‘python’:’’, …}
- map(func, *args, **kwds)
The function 'func', its arguments, and the results of the map are all stored and shipped across communicators as pickled strings.
- Optional Keyword Arguments:
onall = if True, include master as a worker [default: True]
NOTE: ‘onall’ defaults to True for both the scatter-gather and the worker pool strategies. A worker pool with onall=True may have added difficulty in pickling functions, due to asynchronous message passing with itself.
- Additional keyword arguments are passed to ‘func’ along with ‘args’.
- njobs(nodes)
convert node_string intended for scheduler to int number of nodes
compute int from node string. For example, parallel.njobs(“4”) yields 4
- property nodes
get the number of nodes in the pool
- class SerialMapper(*args, **kwds)
Bases:
Mapper
Mapper base class for pipe-based mapping with python.
- Important class members:
nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files
- Other class members:
scatter - True, if uses 'scatter-gather' (instead of 'worker-pool')
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler
NOTE: if the number of nodes is not given, will default to 1. If source is not given, will attempt to minimally use TemporaryFiles. If workdir is not given, will default to the scheduler's workdir or $WORKDIR. If scheduler is not given, will default to only run on the current node. If timeout is not given, will default to the scheduler's timelimit or INF.
For more details, see the docstrings for the "map" method, or the man page for the associated launcher (e.g. mpirun, mpiexec).
- __repr__()
Return repr(self).
- _launcher(kdict={})
prepare launch command for pipe-based execution
equivalent to: (python) (program) (progargs)
Notes
run non-python commands with: {‘python’:’’, …}
- map(func, *args, **kwds)
The function 'func', its arguments, and the results of the map are all stored and shipped across communicators as pickled strings.
- Optional Keyword Arguments:
onall = if True, include master as a worker [default: True]
NOTE: ‘onall’ defaults to True for both the scatter-gather and the worker pool strategies. A worker pool with onall=True may have added difficulty in pickling functions, due to asynchronous message passing with itself.
- Additional keyword arguments are passed to ‘func’ along with ‘args’.
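For comparison, the serial base class can be used directly for a local, single-process map (a minimal sketch):

>>> from pyina.launchers import SerialMapper
>>> serial = SerialMapper()
>>> # same map interface as the parallel mappers; handy for debugging
>>> results = serial.map(pow, [1,2,3,4], [5,6,7,8])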
- class Slurm(*args, **kwds)
Bases:
ParallelMapper
- Important class members:
nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files
- Other class members:
scatter - True, if uses 'scatter-gather' (instead of 'worker-pool')
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler
NOTE: if the number of nodes is not given, will try to grab the number of nodes from the associated scheduler, and failing that will count the local cpus. If workdir is not given, will default to the scheduler's workdir or $WORKDIR. If scheduler is not given, will default to only run on the current node. If pickle is not given, will attempt to minimally use TemporaryFiles.
For more details, see the docstrings for the "map" method, or the man page for the associated launcher (e.g. mpirun, mpiexec).
- _launcher(kdict={})
prepare launch for parallel execution using srun
equivalent to: srun -n(nodes) (python) (program) (progargs)
Notes
run non-python commands with: {'python':'', ...}
fine-grained resource utilization with: {'nodes':'4 -N1', ...}
- map(func, *args, **kwds)
The function 'func', its arguments, and the results of the map are all stored and shipped across communicators as pickled strings.
- Optional Keyword Arguments:
onall = if True, include master as a worker [default: True]
NOTE: ‘onall’ defaults to True for both the scatter-gather and the worker pool strategies. A worker pool with onall=True may have added difficulty in pickling functions, due to asynchronous message passing with itself.
- Additional keyword arguments are passed to ‘func’ along with ‘args’.
- njobs(nodes)
convert node_string intended for scheduler to srun node_string
compute the srun task_string from a node string of pattern N[:ppn=P][,partition=X]. For example, srun.njobs("3:ppn=2,partition=foo") yields '3 -N2'.
- class SlurmPool(*args, **kwds)
Bases:
Slurm
- Important class members:
nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files
- Other class members:
scatter - True, if uses 'scatter-gather' (instead of 'worker-pool')
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler
NOTE: if the number of nodes is not given, will try to grab the number of nodes from the associated scheduler, and failing that will count the local cpus. If workdir is not given, will default to the scheduler's workdir or $WORKDIR. If scheduler is not given, will default to only run on the current node. If pickle is not given, will attempt to minimally use TemporaryFiles.
For more details, see the docstrings for the "map" method, or the man page for the associated launcher (e.g. mpirun, mpiexec).
- class SlurmScatter(*args, **kwds)
Bases:
Slurm
- Important class members:
nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files
- Other class members:
scatter - True, if uses 'scatter-gather' (instead of 'worker-pool')
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler
NOTE: if the number of nodes is not given, will try to grab the number of nodes from the associated scheduler, and failing that will count the local cpus. If workdir is not given, will default to the scheduler's workdir or $WORKDIR. If scheduler is not given, will default to only run on the current node. If pickle is not given, will attempt to minimally use TemporaryFiles.
For more details, see the docstrings for the "map" method, or the man page for the associated launcher (e.g. mpirun, mpiexec).
- class TorqueMpi(*args, **kwds)
Bases:
Mpi
- Important class members:
nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files
- Other class members:
scatter - True, if uses 'scatter-gather' (instead of 'worker-pool')
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler
NOTE: if the number of nodes is not given, will try to grab the number of nodes from the associated scheduler, and failing that will count the local cpus. If workdir is not given, will default to the scheduler's workdir or $WORKDIR. If scheduler is not given, will default to only run on the current node. If pickle is not given, will attempt to minimally use TemporaryFiles.
For more details, see the docstrings for the "map" method, or the man page for the associated launcher (e.g. mpirun, mpiexec).
- class TorqueMpiPool(*args, **kwds)
Bases:
TorqueMpi
- Important class members:
nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files
- Other class members:
scatter - True, if uses 'scatter-gather' (instead of 'worker-pool')
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler
NOTE: if the number of nodes is not given, will try to grab the number of nodes from the associated scheduler, and failing that will count the local cpus. If workdir is not given, will default to the scheduler's workdir or $WORKDIR. If scheduler is not given, will default to only run on the current node. If pickle is not given, will attempt to minimally use TemporaryFiles.
For more details, see the docstrings for the "map" method, or the man page for the associated launcher (e.g. mpirun, mpiexec).
- class TorqueMpiScatter(*args, **kwds)
Bases:
TorqueMpi
- Important class members:
nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files
- Other class members:
scatter - True, if uses 'scatter-gather' (instead of 'worker-pool')
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler
NOTE: if the number of nodes is not given, will try to grab the number of nodes from the associated scheduler, and failing that will count the local cpus. If workdir is not given, will default to the scheduler's workdir or $WORKDIR. If scheduler is not given, will default to only run on the current node. If pickle is not given, will attempt to minimally use TemporaryFiles.
For more details, see the docstrings for the "map" method, or the man page for the associated launcher (e.g. mpirun, mpiexec).
- class TorqueSlurm(*args, **kwds)
Bases:
Slurm
- Important class members:
nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files
- Other class members:
scatter - True, if uses 'scatter-gather' (instead of 'worker-pool')
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler
NOTE: if the number of nodes is not given, will try to grab the number of nodes from the associated scheduler, and failing that will count the local cpus. If workdir is not given, will default to the scheduler's workdir or $WORKDIR. If scheduler is not given, will default to only run on the current node. If pickle is not given, will attempt to minimally use TemporaryFiles.
For more details, see the docstrings for the "map" method, or the man page for the associated launcher (e.g. mpirun, mpiexec).
- class TorqueSlurmPool(*args, **kwds)
Bases:
TorqueSlurm
- Important class members:
nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files
- Other class members:
scatter - True, if uses 'scatter-gather' (instead of 'worker-pool')
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler
NOTE: if the number of nodes is not given, will try to grab the number of nodes from the associated scheduler, and failing that will count the local cpus. If workdir is not given, will default to the scheduler's workdir or $WORKDIR. If scheduler is not given, will default to only run on the current node. If pickle is not given, will attempt to minimally use TemporaryFiles.
For more details, see the docstrings for the "map" method, or the man page for the associated launcher (e.g. mpirun, mpiexec).
- class TorqueSlurmScatter(*args, **kwds)
Bases:
TorqueSlurm
- Important class members:
nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files
- Other class members:
scatter - True, if uses 'scatter-gather' (instead of 'worker-pool')
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler
NOTE: if the number of nodes is not given, will try to grab the number of nodes from the associated scheduler, and failing that will count the local cpus. If workdir is not given, will default to the scheduler's workdir or $WORKDIR. If scheduler is not given, will default to only run on the current node. If pickle is not given, will attempt to minimally use TemporaryFiles.
For more details, see the docstrings for the "map" method, or the man page for the associated launcher (e.g. mpirun, mpiexec).
mappers module
tiny function wrappers to make the ez_map interface for mappers more standard
- provides:
mapper_str = mapper() interface
(for the raw map function, use parallel_map directly)
- scatter_gather()
use the 'scatter-gather' strategy, i.e. split the workload as equally as possible across all available workers in a single pass
- worker_pool()
use the 'worker pool' strategy, i.e. allocate one job to each worker, handing out a new work item whenever a worker completes its current one
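A sketch of selecting a strategy for ez_map (the mapper keyword is taken from the ez_map signature documented above):

>>> from pyina.ez_map import ez_map
>>> from pyina.mappers import worker_pool, scatter_gather
>>> def squared(x):
...     return x*x
...
>>> # dynamic load balancing with the worker pool strategy
>>> results = ez_map(squared, range(16), nodes=4, mapper=worker_pool)
>>> # or split the work evenly in a single pass
>>> results = ez_map(squared, range(16), nodes=4, mapper=scatter_gather)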
mpi module
This module contains the base map and pipe interfaces to the mpi4py module.
- Pipe methods provided:
???
- Map methods provided:
map - blocking and ordered worker pool [returns: list]
- Base classes:
Mapper - base class for pipe-based mapping
Usage
A typical call to a pyina mpi map will roughly follow this example:
>>> # instantiate and configure a scheduler
>>> from pyina.schedulers import Torque
>>> config = {'nodes':'32:ppn=4', 'queue':'dedicated', 'timelimit':'11:59'}
>>> torque = Torque(**config)
>>>
>>> # instantiate and configure a worker pool
>>> from pyina.launchers import Mpi
>>> pool = Mpi(scheduler=torque)
>>>
>>> # do a blocking map on the chosen function
>>> print(pool.map(pow, [1,2,3,4], [5,6,7,8]))
Several common configurations are available as pre-configured maps. The following is identical to the above example:
>>> # instantiate and configure a pre-configured worker pool
>>> from pyina.launchers import TorqueMpiPool
>>> config = {'nodes':'32:ppn=4', 'queue':'dedicated', 'timelimit':'11:59'}
>>> pool = TorqueMpiPool(**config)
>>>
>>> # do a blocking map on the chosen function
>>> print(pool.map(pow, [1,2,3,4], [5,6,7,8]))
Notes
See pyina.launchers and pyina.schedulers for more launchers and schedulers.
- class Mapper(*args, **kwds)
Bases:
AbstractWorkerPool
Mapper base class for pipe-based mapping.
- Important class members:
nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files
- Other class members:
scatter - True, if uses 'scatter-gather' (instead of 'worker-pool')
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler
NOTE: if the number of nodes is not given, will default to 1. If source is not given, will attempt to minimally use TemporaryFiles. If workdir is not given, will default to the scheduler's workdir or $WORKDIR. If scheduler is not given, will default to only run on the current node. If timeout is not given, will default to the scheduler's timelimit or INF.
For more details, see the docstrings for the "map" method, or the man page for the associated launcher (e.g. mpirun, mpiexec).
- __launch(command)
launch mechanism for prepared launch command
- __repr__()
Return repr(self).
- __settings()
apply default settings, then update with given settings
- _cleanup(*args)
clean-up any additional tempfiles:
- path to pickled function output (e.g. 'my_results')
- path to pickled function source (e.g. 'my_func.py' or 'my_func.pik')
- path to pickled function inputs (e.g. 'my_args.arg')
- _launcher(kdict={})
prepare launch command based on current settings
equivalent to: NotImplemented
- _modularize(func)
pickle.dump function to tempfile
- _modulenamemangle(modfilename)
mangle modulename string for use by mapper
- _pickleargs(args, kwds)
pickle.dump args and kwds to tempfile
- _save_in(*args)
save input tempfiles:
- path to pickled function source (e.g. 'my_func.py' or 'my_func.pik')
- path to pickled function inputs (e.g. 'my_args.arg')
- _save_out(*args)
save output tempfiles:
- path to pickled function output (e.g. 'my_results')
- map(func, *args, **kwds)
The function 'func', its arguments, and the results of the map are all stored and shipped across communicators as pickled strings.
- Optional Keyword Arguments:
onall = if True, include master as a worker [default: True]
NOTE: ‘onall’ defaults to True for both the scatter-gather and the worker pool strategies. A worker pool with onall=True may have added difficulty in pickling functions, due to asynchronous message passing with itself.
Additional keyword arguments are passed to ‘func’ along with ‘args’.
- property settings
apply default settings, then update with given settings
- _debug(boolean)
if True, print debugging info and save temporary files after pickling
- _save(boolean)
if True, save temporary files after pickling; useful for debugging
mpi_pool module
- MPool
alias of
Pool
- __index(*inputs)
build an index iterator for the given inputs
- __queue(*inputs)
iterator that groups inputs by index (i.e. [(x[0], a[0]),(x[1], a[1])])
- _debug(boolean)
print debug statements
- lookup(inputs, *index)
get tuple of inputs corresponding to the given index
- parallel_map(func, *seq, **kwds)
the worker pool strategy for mpi
mpi_scatter module
- __index(*inputs)
build an index iterator for the given inputs
- __queue(*inputs)
iterator that groups inputs by index (i.e. [(x[0], a[0]),(x[1], a[1])])
- balance_workload(nproc, popsize, *index, **kwds)
divide popsize elements into 'nproc' chunks
nproc: int number of nodes
popsize: int number of jobs
index: int rank of node(s) to calculate for (using slice notation)
skip: int rank of node upon which to not calculate (i.e. the master)
returns (begin, end) index vectors
- get_workload(index, nproc, popsize, skip=None)
returns the workload that this processor is responsible for
index: int rank of node to calculate for
nproc: int number of nodes
popsize: int number of jobs
skip: int rank of node upon which to not calculate (i.e. the master)
returns (begin, end) index
- lookup(inputs, *index)
get tuple of inputs corresponding to the given index
- parallel_map(func, *seq, **kwds)
the scatter-gather strategy for mpi
schedulers module
This module contains bindings to some common schedulers.
- Base classes:
Scheduler - base class for cpu cluster scheduling
- Schedulers:
Torque -
Moab -
Lsf -
Usage
A typical call to a pyina mpi map will roughly follow this example:
>>> # instantiate and configure a scheduler
>>> from pyina.schedulers import Torque
>>> config = {'nodes':'32:ppn=4', 'queue':'dedicated', 'timelimit':'11:59'}
>>> torque = Torque(**config)
>>>
>>> # instantiate and configure a worker pool
>>> from pyina.launchers import Mpi
>>> pool = Mpi(scheduler=torque)
>>>
>>> # do a blocking map on the chosen function
>>> results = pool.map(pow, [1,2,3,4], [5,6,7,8])
Notes
The schedulers provided here are built through pipes rather than direct bindings, and are currently somewhat limited in their ability to inspect the status of a submitted job or to kill a submitted job. The use of pre-built scheduler job files is also not currently supported.
- class Lsf(*args, **kwds)
Bases:
Scheduler
Scheduler that leverages the lsf scheduler.
- Important class members:
nodes - number (and potentially description) of workers
queue - name of the scheduler queue [default: 'normal']
timelimit - upper limit of clocktime for each scheduled job
workdir - associated $WORKDIR for scratch calculations/files
- Other class members:
jobfile - name of the 'job' file pyina.mpi builds for the scheduler
outfile - name of the 'output' file the scheduler will write to
errfile - name of the 'error' file the scheduler will write to
NOTE: The format for timelimit is typically 'HH:MM' or 'HH:MM:SS', while the format for nodes is typically 'n' or some variant of 'n:ppn=m', where 'n' is the number of nodes and 'm' is processors per node. For more details, see the docstrings for the "submit" method, or the man page for the associated scheduler.
- _submit(command, kdict={})
prepare the given command for submission with bsub
equivalent to: bsub -K -W (timelimit) -n (nodes) -o (outfile) -e (errfile) -q (queue) -J (progname) “(command)”
Notes
if mpich='mx', uses "-a mpich_mx mpich_mx_wrapper" instead of the given launcher
if mpich='gm', uses "-a mpich_gm gmmpirun_wrapper" instead of the given launcher
run non-python commands with: {'python':'', ...}
- submit(command)
submit the given command with bsub
equivalent to: bsub -K -W (timelimit) -n (nodes) -o (outfile) -e (errfile) -q (queue) -J (progname) “(command)”
Notes
if mpich='mx', uses "-a mpich_mx mpich_mx_wrapper" instead of the given launcher
if mpich='gm', uses "-a mpich_gm gmmpirun_wrapper" instead of the given launcher
run non-python commands with: {'python':'', ...}
- class Moab(*args, **kwds)
Bases:
Scheduler
Scheduler that leverages the moab scheduler.
- Important class members:
nodes - number (and potentially description) of workers
queue - name of the scheduler queue [default: 'normal']
timelimit - upper limit of clocktime for each scheduled job
workdir - associated $WORKDIR for scratch calculations/files
- Other class members:
jobfile - name of the 'job' file pyina.mpi builds for the scheduler
outfile - name of the 'output' file the scheduler will write to
errfile - name of the 'error' file the scheduler will write to
NOTE: The format for timelimit is typically 'HH:MM' or 'HH:MM:SS', while the format for nodes is typically 'n' or some variant of 'n:ppn=m', where 'n' is the number of nodes and 'm' is processors per node. For more details, see the docstrings for the "submit" method, or the man page for the associated scheduler.
- _submit(command, kdict={})
prepare the given command for submission with msub
equivalent to: echo "(command)" | msub -l nodes=(nodes) -l walltime=(timelimit) -o (outfile) -e (errfile) -q (queue)
Notes
run non-python commands with: {'python':'', ...}
fine-grained resource utilization with: {'nodes':'4:ppn=1,partition=xx', ...}
- submit(command)
submit the given command with msub
equivalent to: echo "(command)" | msub -l nodes=(nodes) -l walltime=(timelimit) -o (outfile) -e (errfile) -q (queue)
Notes
run non-python commands with: {'python':'', ...}
fine-grained resource utilization with: {'nodes':'4:ppn=1,partition=xx', ...}
- class Scheduler(*args, **kwds)
Bases:
object
Scheduler base class for cpu cluster scheduling.
- Important class members:
nodes - number (and potentially description) of workers
queue - name of the scheduler queue [default: 'normal']
timelimit - upper limit of clocktime for each scheduled job
workdir - associated $WORKDIR for scratch calculations/files
- Other class members:
jobfile - name of the 'job' file pyina.mpi builds for the scheduler
outfile - name of the 'output' file the scheduler will write to
errfile - name of the 'error' file the scheduler will write to
NOTE: The format for timelimit is typically 'HH:MM' or 'HH:MM:SS', while the format for nodes is typically 'n' or some variant of 'n:ppn=m', where 'n' is the number of nodes and 'm' is processors per node. For more details, see the docstrings for the "submit" method, or the man page for the associated scheduler.
- __init(*args, **kwds)
default filter for __init__ inputs
- __launch(command)
launch mechanism for prepared launch command
- __nodes = 1
- __repr__()
Return repr(self).
- __settings()
fetch the settings for the map (from defaults and self.__dict__)
- _cleanup()
clean-up scheduler files (jobfile, outfile, and errfile)
- _prepare()
prepare the scheduler files (jobfile, outfile, and errfile)
- _submit(command, kdict={})
prepare the given command for the scheduler
equivalent to: (command)
- fetch(outfile, subproc=None)
fetch result from the results file
- property settings
fetch the settings for the map (from defaults and self.__dict__)
- submit(command)
submit the given command to the scheduler
equivalent to: (command)
- class Torque(*args, **kwds)
Bases:
Scheduler
Scheduler that leverages the torque scheduler.
- Important class members:
nodes - number (and potentially description) of workers
queue - name of the scheduler queue [default: 'normal']
timelimit - upper limit of clocktime for each scheduled job
workdir - associated $WORKDIR for scratch calculations/files
- Other class members:
jobfile - name of the 'job' file pyina.mpi builds for the scheduler
outfile - name of the 'output' file the scheduler will write to
errfile - name of the 'error' file the scheduler will write to
NOTE: The format for timelimit is typically 'HH:MM' or 'HH:MM:SS', while the format for nodes is typically 'n' or some variant of 'n:ppn=m', where 'n' is the number of nodes and 'm' is processors per node. For more details, see the docstrings for the "submit" method, or the man page for the associated scheduler.
- _submit(command, kdict={})
prepare the given command for submission with qsub
equivalent to: echo “(command)” | qsub -l nodes=(nodes) -l walltime=(timelimit) -o (outfile) -e (errfile) -q (queue)
Notes
run non-python commands with: {'python':'', ...}
fine-grained resource utilization with: {'nodes':'4:nodetype:ppn=1', ...}
- submit(command)
submit the given command with qsub
equivalent to: echo “(command)” | qsub -l nodes=(nodes) -l walltime=(timelimit) -o (outfile) -e (errfile) -q (queue)
Notes
run non-python commands with: {'python':'', ...}
fine-grained resource utilization with: {'nodes':'4:nodetype:ppn=1', ...}
tools module
Various mpi python tools
- Main functions exported are:
ensure_mpi: make sure the script is called by mpi-enabled python
get_workload: get the workload the processor is responsible for
- balance_workload(nproc, popsize, *index, **kwds)
divide popsize elements into 'nproc' chunks
nproc: int number of nodes
popsize: int number of jobs
index: int rank of node(s) to calculate for (using slice notation)
skip: int rank of node upon which to not calculate (i.e. the master)
returns (begin, end) index vectors
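For example (a sketch; how any remainder is distributed across chunks is an implementation detail, shown here assuming earlier chunks receive the extra elements):

>>> from pyina.tools import balance_workload, get_workload
>>> # split 10 jobs across 3 workers
>>> begin, end = balance_workload(3, 10)
>>> # begin and end are index vectors, e.g. begin=(0, 4, 7), end=(4, 7, 10)
>>> # the slice of the job list owned by worker 1
>>> lo, hi = get_workload(1, 3, 10)
>>> my_jobs = list(range(10))[lo:hi]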
- ensure_mpi(size=1, doc=None)
ensure that mpi-enabled python is being called with the appropriate size
- inputs:
size: minimum required size of the MPI world [default = 1]
doc: error string to throw if size restriction is violated
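A typical guard at the top of an MPI script might look like this sketch:

>>> from pyina.tools import ensure_mpi
>>> # abort early unless launched with at least 4 MPI workers,
>>> # e.g. via: mpiexec -np 4 python myscript.py
>>> ensure_mpi(size=4, doc="this script requires at least 4 MPI workers")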
- get_workload(index, nproc, popsize, skip=None)
returns the workload that this processor is responsible for
index: int rank of node to calculate for
nproc: int number of nodes
popsize: int number of jobs
skip: int rank of node upon which to not calculate (i.e. the master)
returns (begin, end) index
- isoformat(seconds)
generate an isoformat timestring for the given time in seconds
- isoseconds(time)
calculate number of seconds from a given isoformat timestring
- lookup(inputs, *index)
get tuple of inputs corresponding to the given index
- mpiprint(string='', end='\n', rank=0, comm=None)
print the given string to the given rank
- which_mpirun(mpich=None, fullpath=False)
try to autodetect an available mpi launcher
if mpich=True, only look for mpich; if mpich=False, only look for openmpi
- which_python(lazy=False, fullpath=True)
get an invocation for this python on the execution path
- which_strategy(scatter=True, lazy=False, fullpath=True)
try to autodetect an available strategy (scatter or pool)
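These helpers can be combined when assembling a launch command by hand; a small sketch (the detected values depend on the local installation):

>>> from pyina.tools import which_mpirun, which_python
>>> mpirun = which_mpirun()               # e.g. 'mpiexec', if detected
>>> python = which_python(fullpath=True)  # invocation for this python
>>> print("%s -np 4 %s myscript.py" % (mpirun, python))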