MPI programs have always started with the worker processes described as the world for the computation (MPI_COMM_WORLD
). The MPI-2 standard includes the concept of something larger than that world: the universe encompasses all processing resources available, where the world is a subset of the universe. An MPI job granted 24 processing units may startup an MPI program with just one worker, making its world = 1 processing unit and its universe = 24 processing units. The MPI program can use the MPI_Comm_spawn()
function to launch additional workers (which could be the same or a completely unique MPI program) whose MPI_COMM_WORLD
is a subset of the 24-processing element universe associated with the top-level job. An intercommunicator will be created connecting the parent worker to the child processes it has launched (similar to pipes between Unix processes).
The R language has the Rmpi library to provide an interface to MPI programming paradigms. The standard Send/Receive/Broadcast functions are present, as well as the MPI_Comm_spawn()
functionality. Examples included with the library make use of the sub-program spawning, so it's important that it work as expected. One Caviness user1) found the situation to be the contrary:
MPI_Comm_spawn()
worked fineMPI_Comm_spawn()
function, the sub-programs failed to launchIn each test, the same error message was cited: failure of MPI_Init() inside the DPM (Dynamic Process Management) initialization.
Being unfamiliar with this portion of the MPI-2 API, I started by authoring a simple program making use of MPI_Comm_spawn()
.
--np 1
with a universe size of 4MPI_COMM_WORLD
of 3)The code was compiled against Open MPI 3.1.0 (as was the Rmpi library being used) and worked as expected: the manager displayed its random number, then the three workers were started by the manager and received the broadcast, displaying the same random number.
#include <stdio.h> #include <string.h> #include <stdlib.h> #include <error.h> #include <errno.h> #include "mpi.h" #ifdef BUILD_MANAGER void choose_worker_program( char *s, size_t slen ) { strncpy(s, "./worker", slen); s[slen - 1] = '\0'; } int main(int argc, char *argv[]) { int world_size, universe_size, *universe_sizep, flag, ball, rc; MPI_Comm everyone; /* intercommunicator */ char worker_program[100]; MPI_Init(&argc, &argv); MPI_Comm_size(MPI_COMM_WORLD, &world_size); if (world_size != 1) error(0, 0, "Top heavy with management"); MPI_Comm_get_attr(MPI_COMM_WORLD, MPI_UNIVERSE_SIZE, &universe_sizep, &flag); if (!flag) { printf("This MPI does not support UNIVERSE_SIZE. How many\nprocesses total?"); scanf("%d", &universe_size); } else universe_size = *universe_sizep; if (universe_size == 1) error(0, 0, "No room to start workers"); /* * Now spawn the workers. Note that there is a run-time determination * of what type of worker to spawn, and presumably this calculation must * be done at run time and cannot be calculated before starting * the program. If everything is known when the application is * first started, it is generally better to start them all at once * in a single MPI_COMM_WORLD. */ int spawnError; choose_worker_program(worker_program, sizeof(worker_program)); rc=MPI_Comm_spawn(worker_program, MPI_ARGV_NULL, universe_size-world_size, MPI_INFO_NULL, 0, MPI_COMM_WORLD, &everyone, &spawnError); printf("%s: MPI_Comm_spawn() = %d (%d)\n", argv[0], rc, spawnError); fflush(stdout); /* * Parallel code here. The communicator "everyone" can be used * to communicate with the spawned processes, which have ranks 0,.. * MPI_UNIVERSE_SIZE-1 in the remote group of the intercommunicator * "everyone". */ srandom(time(NULL) & getpid()); ball = random(); printf("%s: Broadcasting ball = %08X\n", argv[0], ball); MPI_Bcast(&ball, 1, MPI_INT, MPI_ROOT, everyone); MPI_Barrier(everyone); MPI_Finalize(); return 0; } #else int main(int argc, char *argv[], char* env[]) { int rank=0, size=0, parentSize=0, ball=0xCAFEBABE, rc; MPI_Comm parent = NULL; /*char** p = env; while ( *p ) printf("%s\n", *p++);*/ unsetenv("PSM2_DEVICES"); rc=MPI_Init(&argc, &argv); printf("%s: MPI_Init() called (%d)\n", argv[0], rc); fflush(stdout); rc=MPI_Comm_get_parent(&parent); if (parent == MPI_COMM_NULL) error(0, 0, "No parent!"); printf("%s: MPI_Comm_get_parent() = %p (%d)\n", argv[0], (void*)parent, rc);fflush(stdout); rc=MPI_Comm_size(MPI_COMM_WORLD, &size); printf("%s: MPI_Comm_size() = %d (%d)\n", argv[0], size, rc); rc=MPI_Comm_rank(MPI_COMM_WORLD, &rank); printf("%s: MPI_Comm_rank() = %d (%d)\n", argv[0], rank, rc); rc=MPI_Comm_remote_size(parent, &parentSize); if (parentSize != 1) error(0, 0, "Something's wrong with the parent"); printf("%s: MPI_Comm_remote_size() = %d (%d)\n", argv[0], parentSize, rc); /* * Parallel code here. * The manager is represented as the process with rank 0 in (the remote * group of) MPI_COMM_PARENT. If the workers need to communicate among * themselves, they can use MPI_COMM_WORLD. */ rc=MPI_Bcast(&ball, 1, MPI_INT, 0, parent); printf("%s: %d/%d[%d] : ball = %08X (%d)\n", argv[0], rank, size, parentSize, ball, rc); MPI_Barrier(parent); MPI_Finalize(); return 0; } #endif
The issue did not appear to be a fault in the Open MPI MPI_Comm_spawn()
function. The error messages from Rmpi did not say much more than "program failed to start" so more information was needed.
Our standard Open MPI job script template has a single flag that controls the enablement of debug logging for the various Open MPI sub-units: BTL, MTL, PML, etc. With that flag set, the Slurm output for another test of the Rmpi code didn't show much beyond the same finger pointing at DPM initialization inside MPI_Init(). The environment was propagating properly to all child processes (more importantly, PATH
and LD_LIBRARY_PATH
).
I reasoned that the top-level of the Rmpi code was working properly, right up until MPI_Comm_spawn()
; and my C worker program worked fine as a sub-program. So rather than having the Rmpi code spawn copies of itself, I would have it spawn copies of the C worker. I edited the R script and was again greeted with failure in DPM initialization. So the Rmpi library wasn't fundamentally flawed, either.
If Open MPI and R were not to blame, then it had to be an underlying library. A quick search via Google pointed toward many issues with the Intel PSM2 library when working with MPI_Comm_spawn()
. I set PSM2_VERBOSE_ENV
and PSM2_TRACEMASK
in the job script and saw that the PSM2 library was not finding shared memory communicator files for some of the worker processes.
The information about PSM2's failure to find shmem communicators finally produced a useful result in a Google search. Early in the lifetime of Intel Omni-path and the PSM2 library, Intel advocated for a performance optimization in programs using the PSM2 library:
In some instances, enabling only the devices that are required may improve performance.
MPI users not using Intel Omni-Path can set this to enable running in shared memory mode on a single node. It is automatically set for Intel Omni-Path MPI.
If an MPI program is running all workers exclusively on a single node, PSM2-directed communications will be more efficient via mmap()'d files in the /dev/shm
file system on the node. In such cases, the Omni-path hardware can be explicitly excluded by setting PSM2_DEVICES=self,shm
in the environment. In fact, the implementation of Open MPI's PSM2 MTL module did just that: if the size of MPI_COMM_WORLD
matches the number of MPI workers on the current node, then PSM2_DEVICES=self,shm
would be set. Later releases of the PSM2 programmer's manual removed the second sentence cited above.
Naturally, this conflicts with how the MPI-2 universe works. The manager program's MPI_COMM_WORLD is typically a single worker (size = 1) so obviously all worker processes are present on the local node; but the universe in which it will launch additional MPI programs may span many nodes. With PSM2_DEVICES=self,shm
set in the manager program's environment and propagated to the worker programs, the worker programs were restricted to run PSM2 via shared-memory communicators only. So if workers spanned multiple nodes, no internode PSM2 communications channels could exist.
Suddenly the success of my test C program made sense: I had been using a development node with all 4 tasks on a single node. Modifying my test job to instead request 4 tasks (cores) split evenly across 2 nodes, it predictably failed during MPI_Init() inside the DPM initialization.
One possible mitigation was to instruct users that the MPI_Comm_spawn()
API required that they disable the PSM2 MTL module in their MPI jobs. Most versions of Open MPI managed for users on the cluster are also linked against the OFI library, with its own PSM2 implementation that did not forcible set PSM2_DEVICES
.
The problem with setting PSM2_DEVICES
was addressed starting in the 4.0 release of Open MPI. The change was actually very simple: if the total MPI workers in MPI_COMM_WORLD
== the number of local workers on this node, and the total MPI workers > 1, then set PSM2_DEVICES=self,shm
. The second part of the conditional is inspired by the MPI_Comm_spawn()
model of the manager program's being run as a single worker in the universe.
Rebuilding my test C program with a 4.1.2 release of Open MPI available on the cluster, it worked regardless of the 4 tasks' being on 1 or 2 nodes. Rather than expecting all users to alter their job scripts (and remember to do so henceforth), this very simple change to the PSM2 MTL component initialization – which only made use of a variable already part of the comparison between total and local worker counts – took moments to introduce into the source for each managed pre-4.0 version, compile, and install (only the ompi_mtl_psm2.so module was affected).
Thus, as of March 27, 2022, all pre-4.0 versions of Open MPI on Caviness that are managed by IT-RCI have been patched to normalize the behavior of the PSM2 MTL with respect to setting PSM2_DEVICES=self,shm
in the MPI processes' environment. The user who reported the issue has affirmed that the Rmpi code now works as expected.