mpi4py.util.sync
Added in version 4.0.0.
The mpi4py.util.sync
module provides parallel synchronization
utilities.
Sequential execution
- class mpi4py.util.sync.Sequential
Sequential execution.
Context manager for sequential execution within a group of MPI processes.
The implementation is based in MPI-1 point-to-point communication. A process with rank i waits in a blocking receive until the previous process rank i-1 finish executing and signals the next rank i with a send.
- __init__(comm, tag=0)
Initialize sequential execution.
Global counter
- class mpi4py.util.sync.Counter
Global counter.
Produce consecutive values within a group of MPI processes. The counter interface is close to that of
itertools.count
.The implementation is based in MPI-3 one-sided operations. A root process (typically rank
0
) holds the counter, and its value is queried and incremented with an atomic RMA fetch-and-add operation.- __init__(start=0, step=1, *, typecode='i', comm=COMM_SELF, info=INFO_NULL, root=0)
Initialize global counter.
- next(incr=None)
Return current value and increment.
Mutual exclusion
- class mpi4py.util.sync.Mutex
Mutual exclusion.
Establish a critical section or mutual exclusion among MPI processes.
The mutex interface is close to that of
threading.Lock
andthreading.RLock
, allowing the use of either recursive or non-recursive mutual exclusion. However, a mutex should be used within a group of MPI processes, not threads.In non-recursive mode, the semantics of
Mutex
are somewhat different than these ofthreading.Lock
:Once acquired, a mutex is held and owned by a process until released.
Trying to acquire a mutex already held raises
RuntimeError
.Trying to release a mutex not yet held raises
RuntimeError
.
This mutex implementation uses the scalable and fair spinlock algorithm from [mcs-paper] and took inspiration from the MPI-3 RMA implementation of [uam-book].
- __init__(*, recursive=False, comm=COMM_SELF, info=INFO_NULL)
Initialize mutex object.
- acquire(blocking=True)
Acquire mutex, blocking or non-blocking.
John M. Mellor-Crummey and Michael L. Scott. Algorithms for scalable synchronization on shared-memory multiprocessors. ACM Transactions on Computer Systems, 9(1):21-65, February 1991. https://doi.org/10.1145/103727.103729
William Gropp, Torsten Hoefler, Rajeev Thakur, Ewing Lusk. Using Advanced MPI - Modern Features of the Message-Passing Interface. Chapter 4, Section 4.7, Pages 130-131. The MIT Press, November 2014. https://mitpress.mit.edu/9780262527637/using-advanced-mpi/
Condition variable
- class mpi4py.util.sync.Condition
Condition variable.
A condition variable allows one or more MPI processes to wait until they are notified by another processes.
The condition variable interface is close to that of
threading.Condition
, allowing the use of either recursive or non-recursive mutual exclusion. However, the condition variable should be used within a group of MPI processes, not threads.This condition variable implementation uses a MPI-3 RMA-based scalable and fair circular queue algorithm to track the set of waiting processes.
- __init__(mutex=None, *, recursive=True, comm=COMM_SELF, info=INFO_NULL)
Initialize condition variable.
- notify(n=1)
Wake up one or more processes waiting on this condition.
- notify_all()
Wake up all processes waiting on this condition.
- Returns:
The actual number of processes woken up.
- Return type:
Semaphore object
- class mpi4py.util.sync.Semaphore
Semaphore object.
A semaphore object manages an internal counter which is decremented by each
acquire()
call and incremented by eachrelease()
call. The internal counter never reaches a value below zero; whenacquire()
finds that it is zero, it blocks and waits until some other process callsrelease()
.The semaphore interface is close to that of
threading.Semaphore
andthreading.BoundedSemaphore
, allowing the use of either bounded (default) or unbounded semaphores. With a bounded semaphore, the internal counter never exceeds its initial value; otherwiserelease()
raisesValueError
.This semaphore implementation uses a global
Counter
and aCondition
variable to handle waiting and and notification.- __init__(value=1, *, bounded=True, comm=COMM_SELF, info=INFO_NULL)
Initialize semaphore object.
- acquire(blocking=True)
Acquire semaphore, decrementing the internal counter by one.
- release(n=1)
Release semaphore, incrementing the internal counter by one or more.
Examples
1from mpi4py import MPI
2from mpi4py.util.sync import Counter, Sequential
3
4comm = MPI.COMM_WORLD
5
6counter = Counter(comm)
7with Sequential(comm):
8 value = next(counter)
9counter.free()
10
11assert comm.rank == value
1from mpi4py import MPI
2from mpi4py.util.sync import Counter, Mutex
3
4comm = MPI.COMM_WORLD
5
6mutex = Mutex(comm)
7counter = Counter(comm)
8with mutex:
9 value = next(counter)
10counter.free()
11mutex.free()
12
13assert (
14 list(range(comm.size)) ==
15 sorted(comm.allgather(value))
16)