pyina module documentation

ez_map module

The ez_map function is a helper to parallel_map to further simplify parallel programming. Primarily ez_map provides a standard interface for parallel_map, and facilitates running parallel jobs with serial python.

Usage

A call to ez_map will roughly follow this example:
>>> # get the parallel mapper
>>> from pyina.ez_map import ez_map
>>> # construct a target function
>>> def host(id):
...     import socket
...     return "Rank: %d -- %s" % (id, socket.gethostname())
...
>>> # launch the parallel map of the target function
>>> results = ez_map(host, range(100), nodes = 10)
>>> for result in results:
...     print(result)

Implementation

A parallel application is launched by using a helper script (e.g. ezrun.py) as an intermediary between the MPI implementation of the parallel map (e.g. pyina.mpi_pool.parallel_map) and the user’s serial python.

The system call that submits the mpi job is blocking. Reasons are::
  1. If the main program exits before the parallel job starts, any temp files used by ez_map will be lost.

  2. User is supposed to want to use the return value of the map, so blocking at the result of map shouldn’t be a big hinderance.

  3. If we were to allow the call to be asynchronous, we would need to implement some kind of ‘deferred’ mechanism or job monitoring.

Argument movement for the argument list and the returned results are pickled, while the mapped function is either saved to and imported from a temporary file (e.g. pyina.ez_map.ez_map), or transferred through serialization (e.g. pyina.ez_map.ez_map2). Either implementation has it’s own advantages and weaknesses, and one mapper may succeed in a case where the other may fail.

aprun_launcher(kdict={})

prepare launch for parallel execution using aprun syntax: aprun -n(nodes) (python) (program) (progargs)

Notes

run non-python commands with: {‘python’:’’, …} fine-grained resource utilization with: {‘nodes’:’4 -N 1’, …}

aprun_tasks(nodes)

Helper function. compute aprun task_string from node string of pattern = N[:TYPE][:ppn=P] For example, aprun_tasks(“3:core4:ppn=2”) yields ‘3 -N 2’

ez_map(func, *arglist, **kwds)

higher-level map interface for selected mapper and launcher

maps function ‘func’ across arguments ‘arglist’. arguments and results are stored and sent as pickled strings, while function ‘func’ is inspected and written as a source file to be imported.

Further Input:

nodes – the number of parallel nodes launcher – the launcher object scheduler – the scheduler object mapper – the mapper object timelimit – string representation of maximum run time (e.g. ‘00:02’) queue – string name of selected queue (e.g. ‘normal’)

ez_map2(func, *arglist, **kwds)

higher-level map interface for selected mapper and launcher

maps function ‘func’ across arguments ‘arglist’. arguments and results are stored and sent as pickled strings, the function ‘func’ is also stored and sent as pickled strings. This is different than ‘ez_map’, in that it does not use temporary files to store the mapped function.

Further Input:

nodes – the number of parallel nodes launcher – the launcher object scheduler – the scheduler object mapper – the mapper object timelimit – string representation of maximum run time (e.g. ‘00:02’) queue – string name of selected queue (e.g. ‘normal’)

launch(command)

launch mechanism for prepared launch command

moab_launcher(kdict={})

prepare launch for moab submission using srun, mpirun, aprun, or serial syntax: echo “srun -n(nodes) (python) (program) (progargs)” | msub -l nodes=(nodes) -l walltime=(timelimit) -o (outfile) -e (errfile) -q (queue) syntax: echo “%(mpirun)s -np (nodes) (python) (program) (progargs)” | msub -l nodes=(nodes) -l walltime=(timelimit) -o (outfile) -e (errfile) -q (queue) syntax: echo “aprun -n (nodes) (python) (program) (progargs)” | msub -l nodes=(nodes) -l walltime=(timelimit) -o (outfile) -e (errfile) -q (queue) syntax: echo “(python) (program) (progargs)” | msub -l nodes=(nodes) -l walltime=(timelimit) -o (outfile) -e (errfile) -q (queue)

Notes

run non-python commands with: {‘python’:’’, …} fine-grained resource utilization with: {‘nodes’:’4:ppn=1,partition=xx’, …}

class moab_scheduler

Bases: object

moab scheduler – configured for mpirun, srun, aprun, or serial

aprun = 'moab_aprun'
mpirun = 'moab_mpirun'
serial = 'moab_serial'
srun = 'moab_srun'
mpirun_launcher(kdict={})

prepare launch for parallel execution using mpirun syntax: mpiexec -np (nodes) (python) (program) (progargs)

Notes

run non-python commands with: {‘python’:’’, …}

mpirun_tasks(nodes)

Helper function. compute mpirun task_string from node string of pattern = N[:TYPE][:ppn=P] For example, mpirun_tasks(“3:core4:ppn=2”) yields 6

serial_launcher(kdict={})

prepare launch for standard execution syntax: (python) (program) (progargs)

Notes

run non-python commands with: {‘python’:’’, …}

srun_launcher(kdict={})

prepare launch for parallel execution using srun syntax: srun -n(nodes) (python) (program) (progargs)

Notes

run non-python commands with: {‘python’:’’, …} fine-grained resource utilization with: {‘nodes’:’4 -N1’, …}

srun_tasks(nodes)

Helper function. compute srun task_string from node string of pattern = N[:ppn=P][,partition=X] For example, srun_tasks(“3:ppn=2,partition=foo”) yields ‘3 -N2’

torque_launcher(kdict={})

prepare launch for torque submission using mpiexec, srun, aprun, or serial syntax: echo “mpiexec -np (nodes) (python) (program) (progargs)” | qsub -l nodes=(nodes) -l walltime=(timelimit) -o (outfile) -e (errfile) -q (queue) syntax: echo “srun -n(nodes) (python) (program) (progargs)” | qsub -l nodes=(nodes) -l walltime=(timelimit) -o (outfile) -e (errfile) -q (queue) syntax: echo “aprun -n (nodes) (python) (program) (progargs)” | qsub -l nodes=(nodes) -l walltime=(timelimit) -o (outfile) -e (errfile) -q (queue) syntax: echo “(python) (program) (progargs)” | qsub -l nodes=(nodes) -l walltime=(timelimit) -o (outfile) -e (errfile) -q (queue)

Notes

run non-python commands with: {‘python’:’’, …} fine-grained resource utilization with: {‘nodes’:’4:nodetype:ppn=1’, …}

class torque_scheduler

Bases: object

torque scheduler – configured for mpirun, srun, aprun, or serial

aprun = 'torque_aprun'
mpirun = 'torque_mpirun'
serial = 'torque_serial'
srun = 'torque_srun'

launchers module

This module contains prepared launchers for parallel execution, including bindings to some common combinations of launchers and schedulers.

Base classes:

SerialMapper - base class for pipe-based mapping with python ParallelMapper - base class for pipe-based mapping with mpi4py

Parallel launchers:

Mpi - Slurm - Alps -

Pre-built combinations of the above launchers and schedulers:

TorqueMpi, TorqueSlurm, MoabMpi, MoabSlurm

Pre-configured maps using the ‘scatter-gather’ strategy:

MpiScatter, SlurmScatter, AlpsScatter, TorqueMpiScatter, TorqueSlurmScatter, MoabMpiScatter, MoabSlurmScatter

Pre-configured maps using the ‘worker pool’ strategy:

MpiPool, SlurmPool, AlpsPool, TorqueMpiPool, TorqueSlurmPool, MoabMpiPool, MoabSlurmPool

Usage

A typical call to a pyina mpi map will roughly follow this example:

>>> # instantiate and configure a scheduler
>>> from pyina.schedulers import Torque
>>> config = {'nodes'='32:ppn=4', 'queue':'dedicated', 'timelimit':'11:59'}
>>> torque = Torque(**config)
>>>
>>> # instantiate and configure a worker pool
>>> from pyina.launchers import Mpi
>>> pool = Mpi(scheduler=torque)
>>>
>>> # do a blocking map on the chosen function
>>> results = pool.map(pow, [1,2,3,4], [5,6,7,8])

Several common configurations are available as pre-configured maps. The following is identical to the above example:

>>> # instantiate and configure a pre-configured worker pool
>>> from pyina.launchers import TorqueMpiPool
>>> config = {'nodes'='32:ppn=4', 'queue':'dedicated', 'timelimit':'11:59'}
>>> pool = TorqueMpiPool(**config)
>>>
>>> # do a blocking map on the chosen function
>>> results = pool.map(pow, [1,2,3,4], [5,6,7,8])

Notes

This set of parallel maps leverage the mpi4py module, and thus has many of the limitations associated with that module. The function f and the sequences in args must be serializable. The maps provided here…

<<< FIXME >>

functionality when run from a script, however are somewhat limited when used in the python interpreter. Both imported and interactively-defined functions in the interpreter session may fail due to the pool failing to find the source code for the target function. For a work-around, try:

<<< END FIXME >>>

class Alps(*args, **kwds)

Bases: ParallelMapper

Important class members:

nodes - number (and potentially description) of workers ncpus - number of worker processors servers - list of worker servers scheduler - the associated scheduler workdir - associated $WORKDIR for scratch calculations/files

Other class members:

scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’) source - False, if minimal use of TemporaryFiles is desired timeout - number of seconds to wait for return value from scheduler

NOTE: if number of nodes is not given, will try to grab the number of nodes from the associated scheduler, and failing will count the local cpus. If workdir is not given, will default to scheduler’s workdir or $WORKDIR. If scheduler is not given, will default to only run on the current node. If pickle is not given, will attempt to minimially use TemporaryFiles.

For more details, see the docstrings for the “map” method, or the man page for the associated launcher (e.g mpirun, mpiexec).

_launcher(kdict={})

prepare launch for parallel execution using aprun

equivalent to: aprun -n (nodes) (python) (program) (progargs)

Notes

run non-python commands with: {‘python’:’’, …} fine-grained resource utilization with: {‘nodes’:’4 -N 1’, …}

map(func, *args, **kwds)

The function ‘func’, it’s arguments, and the results of the map are all stored and shipped across communicators as pickled strings.

Optional Keyword Arguments:
  • onall = if True, include master as a worker [default: True]

NOTE: ‘onall’ defaults to True for both the scatter-gather and the worker pool strategies. A worker pool with onall=True may have added difficulty in pickling functions, due to asynchronous message passing with itself.

Additional keyword arguments are passed to ‘func’ along with ‘args’.

prepare launch command for pipe-based execution

equivalent to: (python) (program) (progargs)

Notes

run non-python commands with: {‘python’:’’, …}

prepare launch for parallel execution using aprun

equivalent to: aprun -n (nodes) (python) (program) (progargs)

Notes

run non-python commands with: {‘python’:’’, …} fine-grained resource utilization with: {‘nodes’:’4 -N 1’, …}

njobs(nodes)

convert node_string intended for scheduler to aprun node_string

compute aprun task_string from node string of pattern = N[:TYPE][:ppn=P] For example, aprun.njobs(“3:core4:ppn=2”) yields ‘3 -N 2’

class AlpsPool(*args, **kwds)

Bases: Alps

Important class members:

nodes - number (and potentially description) of workers ncpus - number of worker processors servers - list of worker servers scheduler - the associated scheduler workdir - associated $WORKDIR for scratch calculations/files

Other class members:

scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’) source - False, if minimal use of TemporaryFiles is desired timeout - number of seconds to wait for return value from scheduler

NOTE: if number of nodes is not given, will try to grab the number of nodes from the associated scheduler, and failing will count the local cpus. If workdir is not given, will default to scheduler’s workdir or $WORKDIR. If scheduler is not given, will default to only run on the current node. If pickle is not given, will attempt to minimially use TemporaryFiles.

For more details, see the docstrings for the “map” method, or the man page for the associated launcher (e.g mpirun, mpiexec).

class AlpsScatter(*args, **kwds)

Bases: Alps

Important class members:

nodes - number (and potentially description) of workers ncpus - number of worker processors servers - list of worker servers scheduler - the associated scheduler workdir - associated $WORKDIR for scratch calculations/files

Other class members:

scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’) source - False, if minimal use of TemporaryFiles is desired timeout - number of seconds to wait for return value from scheduler

NOTE: if number of nodes is not given, will try to grab the number of nodes from the associated scheduler, and failing will count the local cpus. If workdir is not given, will default to scheduler’s workdir or $WORKDIR. If scheduler is not given, will default to only run on the current node. If pickle is not given, will attempt to minimially use TemporaryFiles.

For more details, see the docstrings for the “map” method, or the man page for the associated launcher (e.g mpirun, mpiexec).

class MoabMpi(*args, **kwds)

Bases: Mpi

Important class members:

nodes - number (and potentially description) of workers ncpus - number of worker processors servers - list of worker servers scheduler - the associated scheduler workdir - associated $WORKDIR for scratch calculations/files

Other class members:

scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’) source - False, if minimal use of TemporaryFiles is desired timeout - number of seconds to wait for return value from scheduler

NOTE: if number of nodes is not given, will try to grab the number of nodes from the associated scheduler, and failing will count the local cpus. If workdir is not given, will default to scheduler’s workdir or $WORKDIR. If scheduler is not given, will default to only run on the current node. If pickle is not given, will attempt to minimially use TemporaryFiles.

For more details, see the docstrings for the “map” method, or the man page for the associated launcher (e.g mpirun, mpiexec).

class MoabMpiPool(*args, **kwds)

Bases: MoabMpi

Important class members:

nodes - number (and potentially description) of workers ncpus - number of worker processors servers - list of worker servers scheduler - the associated scheduler workdir - associated $WORKDIR for scratch calculations/files

Other class members:

scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’) source - False, if minimal use of TemporaryFiles is desired timeout - number of seconds to wait for return value from scheduler

NOTE: if number of nodes is not given, will try to grab the number of nodes from the associated scheduler, and failing will count the local cpus. If workdir is not given, will default to scheduler’s workdir or $WORKDIR. If scheduler is not given, will default to only run on the current node. If pickle is not given, will attempt to minimially use TemporaryFiles.

For more details, see the docstrings for the “map” method, or the man page for the associated launcher (e.g mpirun, mpiexec).

class MoabMpiScatter(*args, **kwds)

Bases: MoabMpi

Important class members:

nodes - number (and potentially description) of workers ncpus - number of worker processors servers - list of worker servers scheduler - the associated scheduler workdir - associated $WORKDIR for scratch calculations/files

Other class members:

scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’) source - False, if minimal use of TemporaryFiles is desired timeout - number of seconds to wait for return value from scheduler

NOTE: if number of nodes is not given, will try to grab the number of nodes from the associated scheduler, and failing will count the local cpus. If workdir is not given, will default to scheduler’s workdir or $WORKDIR. If scheduler is not given, will default to only run on the current node. If pickle is not given, will attempt to minimially use TemporaryFiles.

For more details, see the docstrings for the “map” method, or the man page for the associated launcher (e.g mpirun, mpiexec).

class MoabSlurm(*args, **kwds)

Bases: Slurm

Important class members:

nodes - number (and potentially description) of workers ncpus - number of worker processors servers - list of worker servers scheduler - the associated scheduler workdir - associated $WORKDIR for scratch calculations/files

Other class members:

scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’) source - False, if minimal use of TemporaryFiles is desired timeout - number of seconds to wait for return value from scheduler

NOTE: if number of nodes is not given, will try to grab the number of nodes from the associated scheduler, and failing will count the local cpus. If workdir is not given, will default to scheduler’s workdir or $WORKDIR. If scheduler is not given, will default to only run on the current node. If pickle is not given, will attempt to minimially use TemporaryFiles.

For more details, see the docstrings for the “map” method, or the man page for the associated launcher (e.g mpirun, mpiexec).

class MoabSlurmPool(*args, **kwds)

Bases: MoabSlurm

Important class members:

nodes - number (and potentially description) of workers ncpus - number of worker processors servers - list of worker servers scheduler - the associated scheduler workdir - associated $WORKDIR for scratch calculations/files

Other class members:

scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’) source - False, if minimal use of TemporaryFiles is desired timeout - number of seconds to wait for return value from scheduler

NOTE: if number of nodes is not given, will try to grab the number of nodes from the associated scheduler, and failing will count the local cpus. If workdir is not given, will default to scheduler’s workdir or $WORKDIR. If scheduler is not given, will default to only run on the current node. If pickle is not given, will attempt to minimially use TemporaryFiles.

For more details, see the docstrings for the “map” method, or the man page for the associated launcher (e.g mpirun, mpiexec).

class MoabSlurmScatter(*args, **kwds)

Bases: MoabSlurm

Important class members:

nodes - number (and potentially description) of workers ncpus - number of worker processors servers - list of worker servers scheduler - the associated scheduler workdir - associated $WORKDIR for scratch calculations/files

Other class members:

scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’) source - False, if minimal use of TemporaryFiles is desired timeout - number of seconds to wait for return value from scheduler

NOTE: if number of nodes is not given, will try to grab the number of nodes from the associated scheduler, and failing will count the local cpus. If workdir is not given, will default to scheduler’s workdir or $WORKDIR. If scheduler is not given, will default to only run on the current node. If pickle is not given, will attempt to minimially use TemporaryFiles.

For more details, see the docstrings for the “map” method, or the man page for the associated launcher (e.g mpirun, mpiexec).

class Mpi(*args, **kwds)

Bases: ParallelMapper

Important class members:

nodes - number (and potentially description) of workers ncpus - number of worker processors servers - list of worker servers scheduler - the associated scheduler workdir - associated $WORKDIR for scratch calculations/files

Other class members:

scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’) source - False, if minimal use of TemporaryFiles is desired timeout - number of seconds to wait for return value from scheduler

NOTE: if number of nodes is not given, will try to grab the number of nodes from the associated scheduler, and failing will count the local cpus. If workdir is not given, will default to scheduler’s workdir or $WORKDIR. If scheduler is not given, will default to only run on the current node. If pickle is not given, will attempt to minimially use TemporaryFiles.

For more details, see the docstrings for the “map” method, or the man page for the associated launcher (e.g mpirun, mpiexec).

_launcher(kdict={})

prepare launch command for parallel execution using mpirun

equivalent to: mpiexec -np (nodes) (python) (program) (progargs)

Notes

run non-python commands with: {‘python’:’’, …}

map(func, *args, **kwds)

The function ‘func’, it’s arguments, and the results of the map are all stored and shipped across communicators as pickled strings.

Optional Keyword Arguments:
  • onall = if True, include master as a worker [default: True]

NOTE: ‘onall’ defaults to True for both the scatter-gather and the worker pool strategies. A worker pool with onall=True may have added difficulty in pickling functions, due to asynchronous message passing with itself.

Additional keyword arguments are passed to ‘func’ along with ‘args’.

prepare launch command for pipe-based execution

equivalent to: (python) (program) (progargs)

Notes

run non-python commands with: {‘python’:’’, …}

prepare launch command for parallel execution using mpirun

equivalent to: mpiexec -np (nodes) (python) (program) (progargs)

Notes

run non-python commands with: {‘python’:’’, …}

njobs(nodes)

convert node_string intended for scheduler to mpirun node_string

compute mpirun task_string from node string of pattern = N[:TYPE][:ppn=P] For example, mpirun.njobs(“3:core4:ppn=2”) yields 6

class MpiPool(*args, **kwds)

Bases: Mpi

Important class members:

nodes - number (and potentially description) of workers ncpus - number of worker processors servers - list of worker servers scheduler - the associated scheduler workdir - associated $WORKDIR for scratch calculations/files

Other class members:

scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’) source - False, if minimal use of TemporaryFiles is desired timeout - number of seconds to wait for return value from scheduler

NOTE: if number of nodes is not given, will try to grab the number of nodes from the associated scheduler, and failing will count the local cpus. If workdir is not given, will default to scheduler’s workdir or $WORKDIR. If scheduler is not given, will default to only run on the current node. If pickle is not given, will attempt to minimially use TemporaryFiles.

For more details, see the docstrings for the “map” method, or the man page for the associated launcher (e.g mpirun, mpiexec).

class MpiScatter(*args, **kwds)

Bases: Mpi

Important class members:

nodes - number (and potentially description) of workers ncpus - number of worker processors servers - list of worker servers scheduler - the associated scheduler workdir - associated $WORKDIR for scratch calculations/files

Other class members:

scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’) source - False, if minimal use of TemporaryFiles is desired timeout - number of seconds to wait for return value from scheduler

NOTE: if number of nodes is not given, will try to grab the number of nodes from the associated scheduler, and failing will count the local cpus. If workdir is not given, will default to scheduler’s workdir or $WORKDIR. If scheduler is not given, will default to only run on the current node. If pickle is not given, will attempt to minimially use TemporaryFiles.

For more details, see the docstrings for the “map” method, or the man page for the associated launcher (e.g mpirun, mpiexec).

class ParallelMapper(*args, **kwds)

Bases: Mapper

Mapper base class for pipe-based mapping with mpi4py.

Important class members:

nodes - number (and potentially description) of workers ncpus - number of worker processors servers - list of worker servers scheduler - the associated scheduler workdir - associated $WORKDIR for scratch calculations/files

Other class members:

scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’) source - False, if minimal use of TemporaryFiles is desired timeout - number of seconds to wait for return value from scheduler

NOTE: if number of nodes is not given, will try to grab the number of nodes from the associated scheduler, and failing will count the local cpus. If workdir is not given, will default to scheduler’s workdir or $WORKDIR. If scheduler is not given, will default to only run on the current node. If pickle is not given, will attempt to minimially use TemporaryFiles.

For more details, see the docstrings for the “map” method, or the man page for the associated launcher (e.g mpirun, mpiexec).

__get_nodes()

get the number of nodes in the pool

__nodes = None
__repr__()

Return repr(self).

__set_nodes(nodes)

set the number of nodes in the pool

_launcher(kdict={})

prepare launch command for pipe-based execution

equivalent to: (python) (program) (progargs)

Notes

run non-python commands with: {‘python’:’’, …}

map(func, *args, **kwds)

The function ‘func’, it’s arguments, and the results of the map are all stored and shipped across communicators as pickled strings.

Optional Keyword Arguments:
  • onall = if True, include master as a worker [default: True]

NOTE: ‘onall’ defaults to True for both the scatter-gather and the worker pool strategies. A worker pool with onall=True may have added difficulty in pickling functions, due to asynchronous message passing with itself.

Additional keyword arguments are passed to ‘func’ along with ‘args’.

prepare launch command for pipe-based execution

equivalent to: (python) (program) (progargs)

Notes

run non-python commands with: {‘python’:’’, …}

njobs(nodes)

convert node_string intended for scheduler to int number of nodes

compute int from node string. For example, parallel.njobs(“4”) yields 4

property nodes

get the number of nodes in the pool

class SerialMapper(*args, **kwds)

Bases: Mapper

Mapper base class for pipe-based mapping with python.

Important class members:

nodes - number (and potentially description) of workers ncpus - number of worker processors servers - list of worker servers scheduler - the associated scheduler workdir - associated $WORKDIR for scratch calculations/files

Other class members:

scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’) source - False, if minimal use of TemporaryFiles is desired timeout - number of seconds to wait for return value from scheduler

NOTE: if number of nodes is not given, will default to 1. If source is not given, will attempt to minimially use TemporaryFiles. If workdir is not given, will default to scheduler’s workdir or $WORKDIR. If scheduler is not given, will default to only run on the current node. If timeout is not given, will default to scheduler’s timelimit or INF.

For more details, see the docstrings for the “map” method, or the man page for the associated launcher (e.g mpirun, mpiexec).

__repr__()

Return repr(self).

_launcher(kdict={})

prepare launch command for pipe-based execution

equivalent to: (python) (program) (progargs)

Notes

run non-python commands with: {‘python’:’’, …}

map(func, *args, **kwds)

The function ‘func’, it’s arguments, and the results of the map are all stored and shipped across communicators as pickled strings.

Optional Keyword Arguments:
  • onall = if True, include master as a worker [default: True]

NOTE: ‘onall’ defaults to True for both the scatter-gather and the worker pool strategies. A worker pool with onall=True may have added difficulty in pickling functions, due to asynchronous message passing with itself.

Additional keyword arguments are passed to ‘func’ along with ‘args’.

prepare launch command for pipe-based execution

equivalent to: (python) (program) (progargs)

Notes

run non-python commands with: {‘python’:’’, …}

class Slurm(*args, **kwds)

Bases: ParallelMapper

Important class members:

nodes - number (and potentially description) of workers ncpus - number of worker processors servers - list of worker servers scheduler - the associated scheduler workdir - associated $WORKDIR for scratch calculations/files

Other class members:

scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’) source - False, if minimal use of TemporaryFiles is desired timeout - number of seconds to wait for return value from scheduler

NOTE: if number of nodes is not given, will try to grab the number of nodes from the associated scheduler, and failing will count the local cpus. If workdir is not given, will default to scheduler’s workdir or $WORKDIR. If scheduler is not given, will default to only run on the current node. If pickle is not given, will attempt to minimially use TemporaryFiles.

For more details, see the docstrings for the “map” method, or the man page for the associated launcher (e.g mpirun, mpiexec).

_launcher(kdict={})

prepare launch for parallel execution using srun

equivalent to: srun -n(nodes) (python) (program) (progargs)

Notes

run non-python commands with: {‘python’:’’, …} fine-grained resource utilization with: {‘nodes’:’4 -N1’, …}

map(func, *args, **kwds)

The function ‘func’, it’s arguments, and the results of the map are all stored and shipped across communicators as pickled strings.

Optional Keyword Arguments:
  • onall = if True, include master as a worker [default: True]

NOTE: ‘onall’ defaults to True for both the scatter-gather and the worker pool strategies. A worker pool with onall=True may have added difficulty in pickling functions, due to asynchronous message passing with itself.

Additional keyword arguments are passed to ‘func’ along with ‘args’.

prepare launch command for pipe-based execution

equivalent to: (python) (program) (progargs)

Notes

run non-python commands with: {‘python’:’’, …}

prepare launch for parallel execution using srun

equivalent to: srun -n(nodes) (python) (program) (progargs)

Notes

run non-python commands with: {‘python’:’’, …} fine-grained resource utilization with: {‘nodes’:’4 -N1’, …}

njobs(nodes)

convert node_string intended for scheduler to srun node_string

compute srun task_string from node string of pattern = N[:ppn=P][,partition=X] For example, srun.njobs(“3:ppn=2,partition=foo”) yields ‘3 -N2’

class SlurmPool(*args, **kwds)

Bases: Slurm

Important class members:

nodes - number (and potentially description) of workers ncpus - number of worker processors servers - list of worker servers scheduler - the associated scheduler workdir - associated $WORKDIR for scratch calculations/files

Other class members:

scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’) source - False, if minimal use of TemporaryFiles is desired timeout - number of seconds to wait for return value from scheduler

NOTE: if number of nodes is not given, will try to grab the number of nodes from the associated scheduler, and failing will count the local cpus. If workdir is not given, will default to scheduler’s workdir or $WORKDIR. If scheduler is not given, will default to only run on the current node. If pickle is not given, will attempt to minimially use TemporaryFiles.

For more details, see the docstrings for the “map” method, or the man page for the associated launcher (e.g mpirun, mpiexec).

class SlurmScatter(*args, **kwds)

Bases: Slurm

Important class members:

nodes - number (and potentially description) of workers ncpus - number of worker processors servers - list of worker servers scheduler - the associated scheduler workdir - associated $WORKDIR for scratch calculations/files

Other class members:

scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’) source - False, if minimal use of TemporaryFiles is desired timeout - number of seconds to wait for return value from scheduler

NOTE: if number of nodes is not given, will try to grab the number of nodes from the associated scheduler, and failing will count the local cpus. If workdir is not given, will default to scheduler’s workdir or $WORKDIR. If scheduler is not given, will default to only run on the current node. If pickle is not given, will attempt to minimially use TemporaryFiles.

For more details, see the docstrings for the “map” method, or the man page for the associated launcher (e.g mpirun, mpiexec).

class TorqueMpi(*args, **kwds)

Bases: Mpi

Important class members:

nodes - number (and potentially description) of workers ncpus - number of worker processors servers - list of worker servers scheduler - the associated scheduler workdir - associated $WORKDIR for scratch calculations/files

Other class members:

scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’) source - False, if minimal use of TemporaryFiles is desired timeout - number of seconds to wait for return value from scheduler

NOTE: if number of nodes is not given, will try to grab the number of nodes from the associated scheduler, and failing will count the local cpus. If workdir is not given, will default to scheduler’s workdir or $WORKDIR. If scheduler is not given, will default to only run on the current node. If pickle is not given, will attempt to minimially use TemporaryFiles.

For more details, see the docstrings for the “map” method, or the man page for the associated launcher (e.g mpirun, mpiexec).

class TorqueMpiPool(*args, **kwds)

Bases: TorqueMpi

Important class members:

nodes - number (and potentially description) of workers ncpus - number of worker processors servers - list of worker servers scheduler - the associated scheduler workdir - associated $WORKDIR for scratch calculations/files

Other class members:

scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’) source - False, if minimal use of TemporaryFiles is desired timeout - number of seconds to wait for return value from scheduler

NOTE: if number of nodes is not given, will try to grab the number of nodes from the associated scheduler, and failing will count the local cpus. If workdir is not given, will default to scheduler’s workdir or $WORKDIR. If scheduler is not given, will default to only run on the current node. If pickle is not given, will attempt to minimially use TemporaryFiles.

For more details, see the docstrings for the “map” method, or the man page for the associated launcher (e.g mpirun, mpiexec).

class TorqueMpiScatter(*args, **kwds)

Bases: TorqueMpi

Important class members:

nodes - number (and potentially description) of workers ncpus - number of worker processors servers - list of worker servers scheduler - the associated scheduler workdir - associated $WORKDIR for scratch calculations/files

Other class members:

scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’) source - False, if minimal use of TemporaryFiles is desired timeout - number of seconds to wait for return value from scheduler

NOTE: if number of nodes is not given, will try to grab the number of nodes from the associated scheduler, and failing will count the local cpus. If workdir is not given, will default to scheduler’s workdir or $WORKDIR. If scheduler is not given, will default to only run on the current node. If pickle is not given, will attempt to minimially use TemporaryFiles.

For more details, see the docstrings for the “map” method, or the man page for the associated launcher (e.g mpirun, mpiexec).

class TorqueSlurm(*args, **kwds)

Bases: Slurm

Important class members:

nodes - number (and potentially description) of workers ncpus - number of worker processors servers - list of worker servers scheduler - the associated scheduler workdir - associated $WORKDIR for scratch calculations/files

Other class members:

scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’) source - False, if minimal use of TemporaryFiles is desired timeout - number of seconds to wait for return value from scheduler

NOTE: if number of nodes is not given, will try to grab the number of nodes from the associated scheduler, and failing will count the local cpus. If workdir is not given, will default to scheduler’s workdir or $WORKDIR. If scheduler is not given, will default to only run on the current node. If pickle is not given, will attempt to minimially use TemporaryFiles.

For more details, see the docstrings for the “map” method, or the man page for the associated launcher (e.g mpirun, mpiexec).

class TorqueSlurmPool(*args, **kwds)

Bases: TorqueSlurm

Important class members:

nodes - number (and potentially description) of workers ncpus - number of worker processors servers - list of worker servers scheduler - the associated scheduler workdir - associated $WORKDIR for scratch calculations/files

Other class members:

scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’) source - False, if minimal use of TemporaryFiles is desired timeout - number of seconds to wait for return value from scheduler

NOTE: if number of nodes is not given, will try to grab the number of nodes from the associated scheduler, and failing will count the local cpus. If workdir is not given, will default to scheduler’s workdir or $WORKDIR. If scheduler is not given, will default to only run on the current node. If pickle is not given, will attempt to minimially use TemporaryFiles.

For more details, see the docstrings for the “map” method, or the man page for the associated launcher (e.g mpirun, mpiexec).

class TorqueSlurmScatter(*args, **kwds)

Bases: TorqueSlurm

Important class members:

nodes - number (and potentially description) of workers ncpus - number of worker processors servers - list of worker servers scheduler - the associated scheduler workdir - associated $WORKDIR for scratch calculations/files

Other class members:

scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’) source - False, if minimal use of TemporaryFiles is desired timeout - number of seconds to wait for return value from scheduler

NOTE: if number of nodes is not given, will try to grab the number of nodes from the associated scheduler, and failing will count the local cpus. If workdir is not given, will default to scheduler’s workdir or $WORKDIR. If scheduler is not given, will default to only run on the current node. If pickle is not given, will attempt to minimially use TemporaryFiles.

For more details, see the docstrings for the “map” method, or the man page for the associated launcher (e.g mpirun, mpiexec).

mappers module

tiny function wrapper to make ez_map interface for mappers more standard

provides:

mapper_str = mapper() interface

(for a the raw map function, use parallel_map directly)

scatter_gather()

use the ‘scatter-gather’ strategy; hence split the workload as equally as possible across all available workers in a single pass

worker_pool()

use the ‘worker pool’ strategy; hence one job is allocated to each worker, and the next new work item is provided when a node completes its work

mpi module

This module contains the base of map and pipe interfaces to the mpi4py module.

Pipe methods provided:

???

Map methods provided:

map - blocking and ordered worker pool [returns: list]

Base classes:

Mapper - base class for pipe-based mapping

Usage

A typical call to a pyina mpi map will roughly follow this example:

>>> # instantiate and configure a scheduler
>>> from pyina.schedulers import Torque
>>> config = {'nodes'='32:ppn=4', 'queue':'dedicated', 'timelimit':'11:59'}
>>> torque = Torque(**config)
>>>
>>> # instantiate and configure a worker pool
>>> from pyina.launchers import Mpi
>>> pool = Mpi(scheduler=torque)
>>>
>>> # do a blocking map on the chosen function
>>> print (pool.map(pow, [1,2,3,4], [5,6,7,8]))

Several common configurations are available as pre-configured maps. The following is identical to the above example:

>>> # instantiate and configure a pre-configured worker pool
>>> from pyina.launchers import TorqueMpiPool
>>> config = {'nodes'='32:ppn=4', 'queue':'dedicated', 'timelimit':'11:59'}
>>> pool = TorqueMpiPool(**config)
>>>
>>> # do a blocking map on the chosen function
>>> print (pool.map(pow, [1,2,3,4], [5,6,7,8]))

Notes

See pyina.launchers and pyina.schedulers for more launchers and schedulers.

class Mapper(*args, **kwds)

Bases: AbstractWorkerPool

Mapper base class for pipe-based mapping.

Important class members:

nodes - number (and potentially description) of workers ncpus - number of worker processors servers - list of worker servers scheduler - the associated scheduler workdir - associated $WORKDIR for scratch calculations/files

Other class members:

scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’) source - False, if minimal use of TemporaryFiles is desired timeout - number of seconds to wait for return value from scheduler

NOTE: if number of nodes is not given, will default to 1. If source is not given, will attempt to minimially use TemporaryFiles. If workdir is not given, will default to scheduler’s workdir or $WORKDIR. If scheduler is not given, will default to only run on the current node. If timeout is not given, will default to scheduler’s timelimit or INF.

For more details, see the docstrings for the “map” method, or the man page for the associated launcher (e.g mpirun, mpiexec).

__launch(command)

launch mechanism for prepared launch command

__repr__()

Return repr(self).

__settings()

apply default settings, then update with given settings

_cleanup(*args)

clean-up any additional tempfiles - path to pickled function output (e.g. ‘my_results’) - path to pickled function source (e.g. ‘my_func.py or ‘my_func.pik’) - path to pickled function inputs (e.g. ‘my_args.arg’)

_launcher(kdict={})

prepare launch command based on current settings

equivalent to: NotImplemented

_modularize(func)

pickle.dump function to tempfile

_modulenamemangle(modfilename)

mangle modulename string for use by mapper

_pickleargs(args, kwds)

pickle.dump args and kwds to tempfile

_save_in(*args)

save input tempfiles - path to pickled function source (e.g. ‘my_func.py or ‘my_func.pik’) - path to pickled function inputs (e.g. ‘my_args.arg’)

_save_out(*args)

save output tempfiles - path to pickled function output (e.g. ‘my_results’)

map(func, *args, **kwds)

The function ‘func’, it’s arguments, and the results of the map are all stored and shipped across communicators as pickled strings.

Optional Keyword Arguments:
  • onall = if True, include master as a worker [default: True]

NOTE: ‘onall’ defaults to True for both the scatter-gather and the worker pool strategies. A worker pool with onall=True may have added difficulty in pickling functions, due to asynchronous message passing with itself.

Additional keyword arguments are passed to ‘func’ along with ‘args’.

property settings

apply default settings, then update with given settings

_debug(boolean)

if True, print debuging info and save temporary files after pickling

_save(boolean)

if True, save temporary files after pickling; useful for debugging

mpi_pool module

MPool

alias of Pool

__index(*inputs)

build an index iterator for the given inputs

__queue(*inputs)

iterator that groups inputs by index (i.e. [(x[0], a[0]),(x[1], a[1])])

_debug(boolean)

print debug statements

lookup(inputs, *index)

get tuple of inputs corresponding to the given index

parallel_map(func, *seq, **kwds)

the worker pool strategy for mpi

mpi_scatter module

__index(*inputs)

build an index iterator for the given inputs

__queue(*inputs)

iterator that groups inputs by index (i.e. [(x[0], a[0]),(x[1], a[1])])

balance_workload(nproc, popsize, *index, **kwds)

divide popsize elements on ‘nproc’ chunks

nproc: int number of nodes popsize: int number of jobs index: int rank of node(s) to calculate for (using slice notation) skip: int rank of node upon which to not calculate (i.e. the master)

returns (begin, end) index vectors

get_workload(index, nproc, popsize, skip=None)

returns the workload that this processor is responsible for

index: int rank of node to calculate for nproc: int number of nodes popsize: int number of jobs skip: int rank of node upon which to not calculate (i.e. the master)

returns (begin, end) index

lookup(inputs, *index)

get tuple of inputs corresponding to the given index

parallel_map(func, *seq, **kwds)

the scatter-gather strategy for mpi

schedulers module

This module contains bindings to some common schedulers.

Base classes:

Scheduler - base class for cpu cluster scheduling

Schedulers:

Torque - Moab - Lsf -

Usage

A typical call to a pyina mpi map will roughly follow this example:

>>> # instantiate and configure a scheduler
>>> from pyina.schedulers import Torque
>>> config = {'nodes'='32:ppn=4', 'queue':'dedicated', 'timelimit':'11:59'}
>>> torque = Torque(**config)
>>>
>>> # instantiate and configure a worker pool
>>> from pyina.mpi import Mpi
>>> pool = Mpi(scheduler=torque)
>>>
>>> # do a blocking map on the chosen function
>>> results = pool.map(pow, [1,2,3,4], [5,6,7,8])

Notes

The schedulers provided here are built through pipes and not direct bindings, and are currently somewhat limited on inspecting the status of a submitted job and killing a submitted job. Currently, the use of pre-built scheduler job files are also not supported.

class Lsf(*args, **kwds)

Bases: Scheduler

Scheduler that leverages the lsf scheduler.

Important class members:

nodes - number (and potentially description) of workers queue - name of the scheduler queue [default: ‘normal’] timelimit - upper limit of clocktime for each scheduled job workdir - associated $WORKDIR for scratch calculations/files

Other class members:

jobfile - name of the ‘job’ file pyina.mpi builds for the scheduler outfile - name of the ‘output’ file the scheduler will write to errfile - name of the ‘error’ file the scheduler will write to

NOTE: The format for timelimit is typically ‘HH:MM’ or ‘HH:MM:SS’, while the format for nodes is typically ‘n’ or some variant of ‘n:ppn=m’ where ‘n’ is number of nodes and ‘m’ is processors per node. For more details, see the docstrings for the “sumbit” method, or the man page for the associated scheduler.

_submit(command, kdict={})

prepare the given command for submission with bsub

equivalent to: bsub -K -W (timelimit) -n (nodes) -o (outfile) -e (errfile) -q (queue) -J (progname) “(command)”

Notes

if mpich=’mx’, uses “-a mpich_mx mpich_mx_wrapper” instead of given launcher if mpich=’gm’, uses “-a mpich_gm gmmpirun_wrapper” instead of given launcher run non-python commands with: {‘python’:’’, …}

submit(command)

submit the given command with bsub

equivalent to: bsub -K -W (timelimit) -n (nodes) -o (outfile) -e (errfile) -q (queue) -J (progname) “(command)”

Notes

if mpich=’mx’, uses “-a mpich_mx mpich_mx_wrapper” instead of given launcher if mpich=’gm’, uses “-a mpich_gm gmmpirun_wrapper” instead of given launcher run non-python commands with: {‘python’:’’, …}

class Moab(*args, **kwds)

Bases: Scheduler

Scheduler that leverages the moab scheduler.

Important class members:

nodes - number (and potentially description) of workers queue - name of the scheduler queue [default: ‘normal’] timelimit - upper limit of clocktime for each scheduled job workdir - associated $WORKDIR for scratch calculations/files

Other class members:

jobfile - name of the ‘job’ file pyina.mpi builds for the scheduler outfile - name of the ‘output’ file the scheduler will write to errfile - name of the ‘error’ file the scheduler will write to

NOTE: The format for timelimit is typically ‘HH:MM’ or ‘HH:MM:SS’, while the format for nodes is typically ‘n’ or some variant of ‘n:ppn=m’ where ‘n’ is number of nodes and ‘m’ is processors per node. For more details, see the docstrings for the “sumbit” method, or the man page for the associated scheduler.

_submit(command, kdict={})

prepare the given command for submission with msub ` equivalent to: echo “(command)” | msub -l nodes=(nodes) -l walltime=(timelimit) -o (outfile) -e (errfile) -q (queue)

Notes

run non-python commands with: {‘python’:’’, …} fine-grained resource utilization with: {‘nodes’:’4:ppn=1,partition=xx’, …}

submit(command)

submit the given command with msub ` equivalent to: echo “(command)” | msub -l nodes=(nodes) -l walltime=(timelimit) -o (outfile) -e (errfile) -q (queue)

Notes

run non-python commands with: {‘python’:’’, …} fine-grained resource utilization with: {‘nodes’:’4:ppn=1,partition=xx’, …}

class Scheduler(*args, **kwds)

Bases: object

Scheduler base class for cpu cluster scheduling.

Important class members:

nodes - number (and potentially description) of workers queue - name of the scheduler queue [default: ‘normal’] timelimit - upper limit of clocktime for each scheduled job workdir - associated $WORKDIR for scratch calculations/files

Other class members:

jobfile - name of the ‘job’ file pyina.mpi builds for the scheduler outfile - name of the ‘output’ file the scheduler will write to errfile - name of the ‘error’ file the scheduler will write to

NOTE: The format for timelimit is typically ‘HH:MM’ or ‘HH:MM:SS’, while the format for nodes is typically ‘n’ or some variant of ‘n:ppn=m’ where ‘n’ is number of nodes and ‘m’ is processors per node. For more details, see the docstrings for the “sumbit” method, or the man page for the associated scheduler.

__init(*args, **kwds)

default filter for __init__ inputs

__launch(command)

launch mechanism for prepared launch command

__nodes = 1
__repr__()

Return repr(self).

__settings()

fetch the settings for the map (from defaults and self.__dict__)

_cleanup()

clean-up scheduler files (jobfile, outfile, and errfile)

_prepare()

prepare the scheduler files (jobfile, outfile, and errfile)

_submit(command, kdict={})

prepare the given command for the scheduler

equivalent to: (command)

fetch(outfile, subproc=None)

fetch result from the results file

property settings

fetch the settings for the map (from defaults and self.__dict__)

submit(command)

submit the given command to the scheduler

equivalent to: (command)

class Torque(*args, **kwds)

Bases: Scheduler

Scheduler that leverages the torque scheduler.

Important class members:

nodes - number (and potentially description) of workers queue - name of the scheduler queue [default: ‘normal’] timelimit - upper limit of clocktime for each scheduled job workdir - associated $WORKDIR for scratch calculations/files

Other class members:

jobfile - name of the ‘job’ file pyina.mpi builds for the scheduler outfile - name of the ‘output’ file the scheduler will write to errfile - name of the ‘error’ file the scheduler will write to

NOTE: The format for timelimit is typically ‘HH:MM’ or ‘HH:MM:SS’, while the format for nodes is typically ‘n’ or some variant of ‘n:ppn=m’ where ‘n’ is number of nodes and ‘m’ is processors per node. For more details, see the docstrings for the “sumbit” method, or the man page for the associated scheduler.

_submit(command, kdict={})

prepare the given command for submission with qsub

equivalent to: echo “(command)” | qsub -l nodes=(nodes) -l walltime=(timelimit) -o (outfile) -e (errfile) -q (queue)

Notes

run non-python commands with: {‘python’:’’, …} fine-grained resource utilization with: {‘nodes’:’4:nodetype:ppn=1’, …}

submit(command)

submit the given command with qsub

equivalent to: echo “(command)” | qsub -l nodes=(nodes) -l walltime=(timelimit) -o (outfile) -e (errfile) -q (queue)

Notes

run non-python commands with: {‘python’:’’, …} fine-grained resource utilization with: {‘nodes’:’4:nodetype:ppn=1’, …}

tools module

Various mpi python tools

Main function exported are::
  • ensure_mpi: make sure the script is called by mpi-enabled python

  • get_workload: get the workload the processor is responsible for

balance_workload(nproc, popsize, *index, **kwds)

divide popsize elements on ‘nproc’ chunks

nproc: int number of nodes popsize: int number of jobs index: int rank of node(s) to calculate for (using slice notation) skip: int rank of node upon which to not calculate (i.e. the master)

returns (begin, end) index vectors

ensure_mpi(size=1, doc=None)

ensure that mpi-enabled python is being called with the appropriate size

inputs:
  • size: minimum required size of the MPI world [default = 1]

  • doc: error string to throw if size restriction is violated

get_workload(index, nproc, popsize, skip=None)

returns the workload that this processor is responsible for

index: int rank of node to calculate for nproc: int number of nodes popsize: int number of jobs skip: int rank of node upon which to not calculate (i.e. the master)

returns (begin, end) index

isoformat(seconds)

generate an isoformat timestring for the given time in seconds

isoseconds(time)

calculate number of seconds from a given isoformat timestring

lookup(inputs, *index)

get tuple of inputs corresponding to the given index

mpiprint(string='', end='\n', rank=0, comm=None)

print the given string to the given rank

which_mpirun(mpich=None, fullpath=False)

try to autodetect an available mpi launcher

if mpich=True only look for mpich, if False only look for openmpi

which_python(lazy=False, fullpath=True)

get an invocation for this python on the execution path

which_strategy(scatter=True, lazy=False, fullpath=True)

try to autodetect an available strategy (scatter or pool)