
Gazizza, Bill

The exact topic of my dissertation has changed several times.

Here's what I presented to my committee last week, with their comments applied, as well as with information from my coding it up (particularly with their changes). Those who aren't geek-minded can probably ignore the rest of this message.

Fair warning: this is a pretty long journal entry!

Background and Algorithmic Overview

The idea is to have a "fully generalized manager-worker framework". However, the end result is that it's not quite the manager-worker model -- it's more like a "fully generalized, threaded distributed work farm". I started with a model for any [serial] kind of computation that looked something like this (it won't render right in pine -- deal -- go look at the web version):

    |=======|   |===========|   |========|
    | Input |-->| Calculate |-->| Output |
    |=======|   |===========|   |========| 

If you throw some queues in there, you can duplicate and therefore parallelize the Calculate step (keep the Input and Output steps serial, because a) they'll probably take very little time, and b) any part that can be parallelized can be thrown into the Calculate step):

    |=======|   |===|   |===========|   |===|   |========|
    | Input |-->| Q |-->| Calculate |-->| Q |-->| Output |
    |=======|   |===|   |===========|   |===|   |========|
                  |     |===========|     |
                  |====>| Calculate |====>|
                  |     |===========|     |
                 ...         ...         ...
                  |     |===========|     |
                  |====>| Calculate |====>|
                        |===========|

That's pretty standard stuff, actually. That's essentially the manager-worker model.

So what I'm doing is two things: extending this model to include threads (still a relatively unexplored area with MPI, particularly since the two major freeware MPI implementations have little multithreading support), and making it a distributed scatter/gather scheme.

The goal here is to present a framework (i.e., a library) to the user such that they only have to supply the Input, Calculate, and Output steps. Yes, they do have to be aware of the parallelism, but only enough to make their problem decomposable. The framework takes care of all the bookkeeping. Hence, the user essentially writes three functions (actually, three classes, each with a virtual run()-like function, plus functions to pack/unpack their input and output data; as briefly mentioned in previous journal entries, I ended up using C++ templates heavily so that the type safety would all work out).
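
To make that concrete, here's a minimal sketch of what the user-facing side of such an interface could look like. Everything in it (class names, method signatures, the Framework type at the end) is illustrative only -- it is not the framework's actual API:

    #include <vector>

    // The user's input and output data types, with pack/unpack methods so the
    // framework can ship them between nodes via MPI.
    struct MyInput {
        std::vector<double> values;
        void pack(std::vector<char> &buf) const { /* serialize values into buf */ }
        void unpack(const std::vector<char> &buf) { /* rebuild values from buf */ }
    };

    struct MyOutput {
        double result;
        void pack(std::vector<char> &buf) const { /* serialize result into buf */ }
        void unpack(const std::vector<char> &buf) { /* rebuild result from buf */ }
    };

    // The three user-supplied steps, each a class with a virtual run()-like method.
    template <typename In>
    class InputStep {
    public:
        virtual ~InputStep() {}
        // Produce the next unit of input; return false when there is no more work.
        virtual bool run(In &next) = 0;
    };

    template <typename In, typename Out>
    class CalculateStep {
    public:
        virtual ~CalculateStep() {}
        // Turn one unit of input into one unit of output.
        virtual void run(const In &in, Out &out) = 0;
    };

    template <typename Out>
    class OutputStep {
    public:
        virtual ~OutputStep() {}
        // Consume one unit of output (write it to disk, accumulate it, etc.).
        virtual void run(const Out &out) = 0;
    };

    // The user then hands instances of the three classes to the framework, e.g.:
    //   Framework<MyInput, MyOutput> f(my_input, my_calc, my_output);
    //   f.go(argc, argv);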

The target audience is people who want parallelism but don't really care how it works. That would be most engineers and scientists -- even some computer scientists. Most of these kinds of users just want to get their results, and get them faster -- they don't care how it works. After all, that's our job (as computer scientists), right?

Back to the description...

From the above picture, if we're only running on one machine (say, a 4-way SMP), the Calculate boxes (instances) will be individual threads. The Input and Output instances will be threads, too. By default, there will be one Calculate thread per CPU -- the Input and Output threads will be "extra" and cause some thrashage of CPU scheduling, but not very much -- particularly when the Calculate step is large enough to run for a while.

Note that the two queues do not have threads running in them -- those queues are just data structures with some intelligent accessor functions. The Input, Calculate, and Output threads call into the queues and temporarily become the thread active in the queue; there are no separate threads running the queues themselves.
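
For the curious, here's roughly the shape of such a queue, sketched with modern C++ synchronization primitives (illustrative only -- the real queues carry a good deal more intelligence, particularly the output queue):

    #include <condition_variable>
    #include <deque>
    #include <mutex>

    // The callers' threads do all the work inside enqueue()/dequeue();
    // no thread runs inside the queue itself.
    template <typename T>
    class WorkQueue {
    public:
        void enqueue(T item) {
            std::lock_guard<std::mutex> lock(mutex_);
            items_.push_back(std::move(item));
            not_empty_.notify_one();   // wake one thread blocked in dequeue()
        }

        // Blocks the calling thread until an item is available.
        T dequeue() {
            std::unique_lock<std::mutex> lock(mutex_);
            not_empty_.wait(lock, [this] { return !items_.empty(); });
            T item = std::move(items_.front());
            items_.pop_front();
            return item;
        }

    private:
        std::mutex mutex_;
        std::condition_variable not_empty_;
        std::deque<T> items_;
    };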

Using threads is nice because it avoids the whole issue of extraneous memory copying and allows message passing latency hiding (even with a single-threaded MPI implementation). If we used the same model with pure MPI instead of threads -- i.e., where Input, each of the Calculate instances, and the Output were all separate MPI ranks on the same node -- we'd be doing sends and receives between each of the instances (the queues would possibly be located in the Input and Output ranks), which would invoke at least one memory copy (and probably more). If the input data and output data are large, this could add up to a non-trivial portion of the wall clock execution time. Using threads within a single process, pointers to input/output data can just be passed between the Input, Calculate, and Output blocks -- i.e., passed by reference instead of by value. Therefore, it makes no difference how large (or small) the input and output data is.

Extending this model to cover multiple nodes, let's throw in a definition first. The node on which the Input and Output are run is called the "Server". While it would certainly be possible to run the Input and Output phases on different nodes, this model will assume that they are on the same node, just for simplicity. It is [probably] not difficult to separate them, but this work doesn't focus on that issue. Hence, there's only one Server in this model, regardless of how many nodes are involved in the computation.

To extend this model to include multiple nodes, we add a special kind of Calculate instance to the diagram from above -- a "Calculate Relay":

    |=======|   |===|   |===========|   |===|   |========|
    | Input |==>| Q |==>| Calculate |==>| Q |==>| Output |
    |=======|   |===|   |===========|   |===|   |========|
                  |     |===========|     |
                  |====>| Calculate |====>|
                  |     |===========|     |
                 ...         ...         ...
                  |     |===========|     |
                  |====>| Calculate |====>|
                  |     |===========|     |
                  |====>| RelayCalc |====>|
                        |===========|

This RelayCalc instance has the MPI smarts to send input data to, and receive output data from, remote nodes. Notice that it just dequeues input data and enqueues output data just like the other Calculate instances. Hence, the Input and Output instances do not need to know anything special about remote access.

Also note that there will be a thread running the RelayCalc instance. One could conceivably model the relays in the queues, but this would entail having 2 relays, and would cause some issues with non-thread safe MPI implementations (although these issues arise elsewhere, anyway), and it would destroy the idea of not having threads running in the queues. While threads are nice and lightweight, we don't need to have extraneous threads running where we don't need them. Not only are they not free (in terms of resources), they do add complexity (e.g., what would threads running in the queues do?).

The RelayCalc fits in the same category as Input and Output -- it's an "extra" thread, but it is not expected to take many CPU cycles (particularly when the Calculate phase is non-trivial).

Note that there is only one RelayCalc instance, regardless of how many nodes it is relaying to. This greatly simplifies the relaying with a single-threaded MPI implementation -- indeed, to have N instances of RelayCalc to relay to N remote nodes would mean that a global lock would have to be used to allow only one RelayCalc instance in MPI at any time. This would mean that all the RelayCalc instances would have to poll with functions such as MPI_TEST. And this would involve continually locking, testing, and unlocking between all the RelayCalc instances, which would certainly keep one or more CPUs busy doing message passing rather than working in the Calculate instances, which is not desirable.

Hence, there's only one RelayCalc instance that can do blocking MPI_WAITANY calls to check for messages from any of the nodes that it is expecting output data from (and checking for completion of sent messages -- see below). This will probably serialize message passing in the server, but that is to be expected with a single-threaded MPI implementation anyway.

Indeed, even if the MPI implementation were multi-threaded, there will frequently be fewer network interfaces than remote nodes (typically only one), so the network messages will likely be at least somewhat serialized anyway. The best that a multi-threaded MPI implementation could do would be to pipeline messages to different destinations across the available NICs, but that's within the MPI implementation, and not within the user's (i.e., the framework's) control. Indeed, a quality single-threaded MPI implementation can pipeline messages anyway (if non-blocking sends are used). So there's actually little gain (and potentially a lot of CPU cycles to lose) in having multiple RelayCalc instances when using a single-threaded MPI implementation -- the same end result of having multiple RelayCalc instances with a true multi-threaded MPI implementation can be achieved with a carefully coded single RelayCalc instance using non-blocking sends and receives with a single-threaded MPI implementation.
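
Here's roughly what that single RelayCalc loop looks like with non-blocking operations and MPI_WAITANY -- a simplified sketch only (tags, buffer management, and the actual enqueue/dequeue of work are elided or made up):

    #include <mpi.h>
    #include <vector>

    // One buffer per child for sends and receives; one MPI_Waitany covers
    // every outstanding request, so a single thread can service all of the
    // remote nodes even with a single-threaded MPI implementation.
    void relay_calc_loop(const std::vector<int> &children, MPI_Comm comm,
                         std::vector<std::vector<char> > &send_bufs,
                         std::vector<std::vector<char> > &recv_bufs)
    {
        std::vector<MPI_Request> reqs;

        for (size_t i = 0; i < children.size(); ++i) {
            MPI_Request r;
            // Non-blocking send of input data to this child...
            MPI_Isend(send_bufs[i].data(), (int) send_bufs[i].size(), MPI_BYTE,
                      children[i], 1, comm, &r);
            reqs.push_back(r);
            // ...and a non-blocking receive for its output data.
            MPI_Irecv(recv_bufs[i].data(), (int) recv_bufs[i].size(), MPI_BYTE,
                      children[i], 2, comm, &r);
            reqs.push_back(r);
        }

        // Block until *any* request completes; the CPU is free for the
        // Calculate threads in the meantime.
        while (!reqs.empty()) {
            int index;
            MPI_Status status;
            MPI_Waitany((int) reqs.size(), reqs.data(), &index, &status);
            // A completed receive would be unpacked and enqueued on the output
            // queue here; a completed send frees that input buffer for reuse.
            reqs.erase(reqs.begin() + index);
        }
    }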

(There's a lot of the finer details that I didn't cover in the previous two paragraphs; those are currently left as an exercise to the reader. :-) Read my dissertation for the full scoop)

So now let's look at a typical non-server node in the model:

    |=========|   |===|   |===========|   |===|   |==========|
    | RelayIn |==>| Q |==>| Calculate |==>| Q |==>| RelayOut |
    |=========|   |===|   |===========|   |===|   |==========|
                    |     |===========|     |
                    |====>| Calculate |====>|
                    |     |===========|     |
                   ...         ...         ...
                    |     |===========|     |
                    |====>| Calculate |====>|
                          |===========|

A few interesting notes here:

  • The Input and Output instances have been replaced by RelayIn and RelayOut instances, respectively.

  • As far as the Calculate instances are concerned, the model is the same -- each one dequeues input, processes it, and enqueues output.

The RelayIn and RelayOut instances are the MPI entry and exit points -- input data is relayed to the RelayIn instance from the RelayCalc instance on the Server, and output data is relayed back to the RelayCalc instance by RelayOut. This is why the user has to supply not only the Input, Calculate, and Output instances, but also methods to pack and unpack their input and output data -- the framework will call them automatically to send and receive the data between nodes.

But again, in terms of the Calculate phase -- nothing is different. It operates exactly as it does on the server node. The framework has just added some magic that moves the input and output data around transparently.

There are now two threads vying for control of MPI. Since we only have a single-threaded MPI implementation, we cannot have both of them making MPI calls simultaneously. The following algorithm allows both threads to "share" access to MPI in a fair manner.

In the beginning of the run, the RelayIn instance has control of MPI because we expect to receive some number of messages to seed the input queue. After those messages have been received, control of MPI is given to the RelayOut. The RelayOut will block while dequeuing output data from the output queue (since the Calculate threads started acting on the data as soon as it was put in the input queue), and then return the output data to the Server. Control is then given back to the RelayIn in order to receive more input data.

That is, the message passing happens at specific times:

  • Messages will only be received at the beginning of the run, or after messages have been sent back to the Server

  • Messages will only be sent after messages have been received and the Calculate threads have converted them to output data

Specifically, incoming and outgoing messages will occur at different (and easily categorizable) points in time. Indeed, outgoing messages will [eventually] trigger new incoming messages, and vice versa. So the simple "handoff" model of switching control of MPI between the RelayIn and RelayOut instances works nicely.
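
A minimal sketch of that handoff -- a single token guarded by a mutex and condition variable (illustrative, not necessarily the framework's actual mechanism):

    #include <condition_variable>
    #include <mutex>

    enum class MpiOwner { RelayIn, RelayOut };

    class MpiHandoff {
    public:
        // RelayIn starts with control, since the run begins by receiving the
        // messages that seed the input queue.
        MpiHandoff() : owner_(MpiOwner::RelayIn) {}

        // Block until this relay owns MPI.
        void acquire(MpiOwner who) {
            std::unique_lock<std::mutex> lock(mutex_);
            cv_.wait(lock, [&] { return owner_ == who; });
        }

        // Give control of MPI to the other relay.
        void hand_to(MpiOwner who) {
            {
                std::lock_guard<std::mutex> lock(mutex_);
                owner_ = who;
            }
            cv_.notify_all();
        }

    private:
        std::mutex mutex_;
        std::condition_variable cv_;
        MpiOwner owner_;
    };

    // RelayIn:  acquire(RelayIn);  ...receive input data...;  hand_to(RelayOut);
    // RelayOut: acquire(RelayOut); ...send output data...;    hand_to(RelayIn);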

A big performance-determining factor in MPI codes can be latency hiding, particularly in high-latency networks such as 10/100Mbps ethernet. An advantage of this model is that even with a single-threaded MPI, progress can be made on message passing calls while actual calculation work is being done in other threads. This pipelined model can hide most of the latency caused by message passing.

That is, the RelayIn thread can request more input data before the Calculate threads will require it. Hence, when the Calculate threads finish one set of data, the next set is already available --
they don't have to wait for new data to arrive.

A possible method to do this is to initially send twice the amount of expected work to each node. That is, if there are N Calculate threads on a given node, send 2N input data packets. The Calculate threads will dequeue the first N input data packets, and eventually enqueue their results in the output queue. The next N input data packets will immediately be available for the Calculate threads to dequeue and start working on.

Meanwhile, the RelayOut thread will return the output data and the RelayIn thread will [effectively] request N more input data packets. When the N input data packets arrive, they will be queued in the input queue for eventual dequeuing by the Calculate threads. This occurs while the Calculate threads are working -- the message passing latency is hidden from them.

This scheme works as long as the Calculate phase takes longer than the time necessary to send output data back to the Server and receive N new input data packets. If the Calculate phase is short, the RelayIn can initially request more than 2N input data packets, and/or be sure to use non-blocking communication to request new input data blocks so that requests back to the Server can be pipelined.
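
As a tiny illustration of that bookkeeping (the prefetch factor of 2 and the function name are made up for this sketch), the RelayIn just has to keep the number of locally queued plus already-requested packets at or above its target:

    // Returns how many more input packets to request from the parent so that
    // at least prefetch_factor * N packets are queued or in flight
    // (N = number of local Calculate threads).  A larger factor helps when
    // the Calculate phase is short relative to the round trip to the parent.
    int input_packets_to_request(int n_calc_threads, int queued_locally,
                                 int already_requested, int prefetch_factor)
    {
        int target  = prefetch_factor * n_calc_threads;    // e.g., 2 * N
        int on_hand = queued_locally + already_requested;
        return (target > on_hand) ? (target - on_hand) : 0;
    }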

To improve the scalability of the system by removing some of the bottlenecks in the scattering/gathering, non-server nodes can also have a RelayCalc instance:

    |=========|   |===|   |===========|   |===|   |==========|
    | RelayIn |==>| Q |==>| Calculate |==>| Q |==>| RelayOut |
    |=========|   |===|   |===========|   |===|   |==========|
                    |     |===========|     |
                    |====>| Calculate |====>|
                    |     |===========|     |
                   ...         ...         ...
                    |     |===========|     |
                    |====>| Calculate |====>|
                    |     |===========|     |
                    |====>| RelayCalc |====>|
                          | (optional)|
                          |===========|

This RelayCalc instance will relay input data to additional remote nodes, and gather the output data from them, just like the RelayCalc on the Server node.

The implication of having a RelayCalc step is that we can have arbitrary trees of input and output. That is, the Server is not the only node who can scatter input out to, and gather output data from, remote nodes -- arbitrary trees can be created to mimic network topology (for example). Consider the following tree:

                  the_big_cheese (0)
              !===========!============!
              !                        !
        child_a0 (1)             child_a1 (3)
              !                  !=====!=====!
              !                  !           !
        child_b0 (2)      child_c0 (4)  child_c1 (5)

the_big_cheese is the Server. It has two children, child_a0 and child_a1. child_a0 has only one child, but child_a1 has two children. The numbers in parentheses represent the MPI rank numbers (with respect to MPI_COMM_WORLD). Note that there is no restriction to a maximum of two children -- this is just an example. Each node also has one or more Calculate instances. So the end result can be a large, distributed farm of compute nodes.

This refines some of the previous discussion: the various Relay instances (In, Calc, Out) will not necessarily talk to the Server -- they'll talk to their parent, their children, and their parent, respectively. In some cases, the parent will be the Server. In other cases, the parent will be just another relay.

The RelayIn will now need to request enough input data packets to keep not only its local Calculate threads busy, but also all of its children. This is accomplished by doing a tree reduction during the startup of the framework that counts the total number of Calculate instances in each subtree. This allows a RelayIn to know how many Calculate instances it needs to service. The RelayIn can then use an appropriate formula/algorithm to keep its input buffer full (as described above) before any of the local Calculate instances or the RelayCalc instance needs data.
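
A sketch of what that startup reduction might look like (the tag and overall structure are assumptions for illustration, not the actual code):

    #include <mpi.h>
    #include <vector>

    // Each node waits for the Calculate-instance count of every child subtree,
    // adds its own local count, and reports the total to its parent.  After
    // this, every relay knows how many Calculate instances it must service.
    int count_calculate_instances(int n_local_calc, int parent_rank,
                                  const std::vector<int> &children,
                                  MPI_Comm comm)
    {
        int total = n_local_calc;

        for (size_t i = 0; i < children.size(); ++i) {
            int subtree_count = 0;
            MPI_Recv(&subtree_count, 1, MPI_INT, children[i], 10, comm,
                     MPI_STATUS_IGNORE);
            total += subtree_count;
        }

        if (parent_rank >= 0) {   // the Server (root) has no parent
            MPI_Send(&total, 1, MPI_INT, parent_rank, 10, comm);
        }
        return total;
    }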

The astute reader will realize that there are now three threads vying for control of MPI. Therefore, the simple handoff protocol discussed above will not work (although the handoff protocol is still applicable for "leaf" nodes, where there is no RelayCalc instance). To make matters worse, both RelayIn and RelayCalc will potentially need to block in receive calls, waiting for messages to asynchronously arrive. RelayIn will only receive messages at discrete times (as discussed above), but the frequency at which RelayCalc can receive messages is determined by the node's children, and therefore could effectively be at any time. That is, since RelayIn/RelayOut are driven not only by the actions of the node's children, but also by the local Calculate threads, the times at which RelayCalc will need to receive messages are not always related to when RelayIn/RelayOut will need to communicate.

Specifically, there will be times when RelayCalc needs to receive a message that will be independent of what RelayIn and RelayOut are doing.

It would be easiest if RelayCalc could just block on its MPI_WAITANY while waiting on a return message from any of its children. But this would disallow any new outgoing messages from RelayOut (and therefore any new incoming messages to RelayIn). The implication is that nodes will have to wait for a message from any of their children before they can send the output data from their local Calculate threads back to their parent, and therefore before they can request any new input data.

This can be disastrous if a node's children are slower than it is. In this case, a fast node could potentially drain its entire input queue and be blocked waiting for results from any of its children before being able to ask for more input data from its parent. Even worse, this effect can daisy-chain such that slow nodes could cause the same problem in multiple [faster] parent nodes; the fast parents could all get trapped waiting for results from the one slow child.

These questions are addressed further in the following section.

This tree design will help eliminate the bottleneck of having a single Server that has to communicate with N nodes (especially as N grows large) -- the problems of serializing the message passing could easily dwarf the CPU cycles given to the Calculate instances. That is, the communication could become more costly than the Calculation.

But just having a tree structure for scattering/gathering is not sufficient. Indeed, if a leaf node (e.g., child_c0) sends an output block back to its parent, and its parent immediately sends it to its parent, etc., all the way back up to the Server, this would effectively be no different than if all N nodes were connected to the Server directly -- the Server will get a message for every single output data block. This model would then only add hops to input and output data rather than increase scalability.

Instead, the various Relay instances will gather multiple messages into a single message (or a single group of messages) before sending them up the tree. For example, a RelayOut instance can wait for output data from each of its Calculate instances before sending them back to its parent. The RelayOut instance will send all of its N messages at once in a "burst" such that its parent RelayCalc instance will be able to process them all in a short period of time and then relinquish its CPU back to a Calculate instance. I'll refer to this group of messages as a "mega message", below.

Likewise, there will need to be some flow control on the messages from RelayCalc instances. It is desirable to group together multiple "mega messages" into a single "mega mega message" in order to send larger and larger messages as output data propagates up the tree, and therefore decrease the number and frequency of messages at upper levels in the tree. Hence, the mega messages that are received by a RelayCalc must be grouped together, possibly in conjunction with the output data from the local Calculate instances, before sending to the node's parent.

But how to do this? Does the RelayCalc just wait for mega messages from all of its children before enqueuing them all to the output? It would seem simpler to just enqueue the mega messages as they come in, and when the RelayOut sees "enough" messages, it can pass a mega message of its own (potentially larger than any of the individual mega messages that it received) to its parent.

One definition for "enough" messages could be N mega messages (where N is the number of children for this node), and/or M output data enqueues (where M is the number of Calculate instances on this node). This may also be a problem-dependent value -- for example, if the Calculate process is short, "enough" messages may need to be a relatively small value.
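
In code, the "enough" test could be as simple as this (thresholds illustrative only):

    // Forward a combined mega message once we have either one mega message per
    // child or one output per local Calculate instance -- the thresholds would
    // almost certainly need per-problem tuning.
    bool enough_to_forward(int mega_msgs_received, int n_children,
                           int local_outputs_enqueued, int n_local_calc)
    {
        return mega_msgs_received >= n_children ||
               local_outputs_enqueued >= n_local_calc;
    }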

This scheme will probably work well in a homogeneous world. But what if the node and its children are heterogeneous? What if some nodes are more powerful/faster than others, or if the network connections between some of the children are heterogeneous? For example, what if some of the children nodes are connected via IMPI, where network communication to them is almost guaranteed to be slower than network communication to local MPI ranks?

The heterogeneity effect implies the problem discussed in the previous section -- that slow Calculate instances can cause parent nodes to block with no more work to do, and not be able to obtain any more work because RelayCalc has not returned from waiting for results from a child.

Another alternative is to use the "separate MPI thread" approach (where all threads needing access to MPI use simple event queues to a separate MPI thread), and have the separate MPI thread use all non-blocking communication. But instead of using a blocking MPI_WAIT approach, use the non-blocking MPI_TEST polling approach. The problem with this, as discussed previously, is that it could incur an undesirably significant number of CPU cycles, and therefore detract from the main computations in the Calculate instances. If polling only happened infrequently, perhaps using a backoff method (finitely bounded, of course), this might be acceptable.

Note that there will be one "incoming event queue" for the MPI thread where threads can place new events for the MPI thread to handle. But there will be multiple "return event queues" where the MPI thread places the result of the incoming events -- one for each thread that enqueues incoming events.

                  |=============================|   |========|
   |=============>| Shared incoming event queue |==>|        |
   |              |=============================|   |        |
   |                                                |        |
   |   |====|   |===============================|   |        |
   |<==| T1 |<==| Return event queue / thread 1 |==>|        |
   |   |====|   |===============================|   |  MPI   |
   |   |====|   |===============================|   | thread |
   |<==| T2 |<==| Return event queue / thread 2 |==>|        |
   |   |====|   |===============================|   |        |
  ...                                              ...      ...
   |   |====|   |===============================|   |        |
   |<==| TN |<==| Return event queue / thread N |==>|        |
       |====|   |===============================|   |========| 

The various threads that need to access MPI place events on the MPI thread's shared incoming event queue, and then block on (or poll) their respective return event queues to know when the event has finished. An event is a set of data necessary for an MPI send or receive.

The general idea is that the MPI thread will take events from its incoming queue and start the communication in a non-blocking manner. It will poll MPI periodically (the exact frequency of polling is discussed below) with MPI_TEST, and also check for new events on its incoming queue. As MPI indicates that events have finished, the MPI thread will place events on the relevant return event queue. The MPI thread never blocks in an MPI call; it must maintain a polling cycle of checking both the incoming event queue and MPI for event completions.

A special case, however, is that the MPI thread can block on the incoming event queue if there are no MPI events pending. This allows the MPI thread to "go to sleep" when there are no messages pending for MPI (although this will rarely happen).
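
Putting those pieces together, the MPI thread's main loop is roughly the following (a sketch only -- the event queues and the code that starts each operation with MPI_ISEND/MPI_IRECV are elided, and the sleep in step 4 is the backoff discussed next):

    #include <mpi.h>
    #include <vector>

    void mpi_thread_loop()
    {
        std::vector<MPI_Request> requests;  // one per in-flight MPI operation
        std::vector<int>         owners;    // whose return event queue to notify

        for (;;) {
            // 1. Drain the shared incoming event queue: start each new event
            //    with MPI_Isend or MPI_Irecv and record its request (and owner).
            //    If 'requests' is empty, this is the one place we may block.

            // 2. Poll MPI -- never block inside MPI while operations are pending.
            if (!requests.empty()) {
                int index = MPI_UNDEFINED, flag = 0;
                MPI_Status status;
                MPI_Testany((int) requests.size(), requests.data(),
                            &index, &flag, &status);
                if (flag && index != MPI_UNDEFINED) {
                    // 3. Place a completion on owners[index]'s return event queue.
                    requests.erase(requests.begin() + index);
                    owners.erase(owners.begin() + index);
                    continue;   // something happened -- poll again immediately
                }
            }

            // 4. Nothing completed: sleep according to the backoff policy
            //    described below, then go around again.
        }
    }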

The polling frequency is critical. It cannot be so high that it takes many cycles away from Calculate threads, nor can it be so low that input queues become drained or threads become otherwise blocked unduly while waiting for MPI events to complete. These conflicting goals seem to indicate that an adaptive polling frequency is necessary.

That is, it would seem that the polling frequency should be high when events are being completed, and should be low when no events are occurring. This would preserve the "bursty" model described above; when an event occurs, it is likely that more events will follow in rapid succession. When nothing is happening, it is likely that nothing will continue to happen for a [potentially long] period of time.

A backoff method fits these criteria: the sleep time between polling is initially small (perhaps even zero). Several loops are made with this small/zero value (probably with a thread yield call in each loop iteration, to allow other threads to wake up and generate/consume MPI events). If nothing "interesting" occurs in this time, gradually increase the sleep time value. If something "interesting" does occur, set the sleep time back to the small/zero value to allow more rapid polling.

This allows the polling to occur rapidly when messages arrive or need to be sent, and slowly when no message passing is occurring (e.g., when the Calculate threads are running at full speed).

An obvious optimization to the polling model is to allow the MPI thread to loop until there are no new actions before going to sleep. Hence, if an event appears in the incoming queue, or MPI_TEST indicates that some communication has finished, the sleep time is reduced and the MPI thread polls again without sleeping.
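
Sketched out, the backoff might look something like this (the constants are placeholders, and it's written with modern C++ thread calls for brevity):

    #include <algorithm>
    #include <chrono>
    #include <thread>

    // Poll rapidly while MPI traffic is happening; back off geometrically --
    // up to a finite bound -- while nothing is happening.
    class PollBackoff {
    public:
        void something_happened() { sleep_us_ = 0; }    // back to rapid polling

        void nothing_happened() {
            if (sleep_us_ == 0)
                sleep_us_ = 50;                              // start small
            else
                sleep_us_ = std::min(sleep_us_ * 2, 10000);  // finitely bounded
        }

        void wait() {
            if (sleep_us_ == 0)
                std::this_thread::yield();   // let other threads generate events
            else
                std::this_thread::sleep_for(std::chrono::microseconds(sleep_us_));
        }

    private:
        int sleep_us_ = 0;
    };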

This stuff hasn't been implemented yet; these are questions that I do not yet have definitive answers to. It is likely that my first implementation will be modeled on the backoff polling described above. We'll see how that works out.

There's a whole bit in here that I haven't really described about a primitive level of fault tolerance -- if a node disappears, all of the work that it (and all of its children) was doing will be lost, and reallocated to other workers. That is, as long as one Calculate thread remains, the entire computation will [eventually] finish, but likely at a slower rate.

The gist of this is to set the error handler MPI_ERRORS_RETURN such that MPI will not abort if an error occurs (such as a node disappearing). There's some extra bookkeeping code in the RelayCalc that keeps track of what node had what work assigned to it, and will reassign it to another node (including its local Calculate threads, if the local Calculate threads become idle and/or no remote Calculate threads remain alive).
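
The error handler part is a single call (the bookkeeping is the hard part, and isn't shown here); newer MPI versions spell this MPI_Comm_set_errhandler:

    #include <mpi.h>

    // Tell MPI to return error codes instead of aborting the job when, e.g.,
    // a node disappears.
    void install_error_handler(MPI_Comm comm)
    {
        MPI_Errhandler_set(comm, MPI_ERRORS_RETURN);
    }

    // Every subsequent MPI call's return code is then checked, e.g.:
    //   if (MPI_Send(...) != MPI_SUCCESS) { /* mark child dead, reassign work */ }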

Just to clarify: I am not trying to be fault tolerant for programmer error. If you have broken code (e.g., a seg fault), this framework will not recover from that error. This framework will also not attempt to bring nodes back that previously failed; once nodes die, they -- and all their children -- are dead for the rest of the computation. Future work may make this better, but not this version. :-)

Probably a good way to describe this fault tolerant work is: if you have a program that will run correctly when all nodes are up, your program will run correctly as long as the Input and Output threads stay alive, and at least one Calculate thread stays alive.

Implementation Details

Woof. This journal entry is already much longer than I thought it would be (we're approaching 600 lines here!), so I'll be a little sparse on the implementation details, especially since it's all in a state of flux anyway.

The implementation is currently dependent upon LAM/MPI and will not run under MPICH. This is because I make use of the MPI_COMM_SPAWN function call to spawn off all the non-server ranks. The code could be adapted to allow for using mpirun to start the entire job, but that's a "feature" that I don't intend to add yet.

Unfortunately, the only way that I could think to specify the tree used for the distribution was to specify an additional configuration file. Attempting to specify the tree on the command line resulted in a clunky interface, and arbitrarily long command lines. I used inilib to parse the file; it's a fairly simplistic, yet flexible format.

The server starts up, parses the configuration file, determines how many total non-server nodes it needs, and spawns them. The non-server nodes are spawned with an additional "-child" command line argument so that they know that they do not need to read a configuration file. Instead, they receive their configuration information from their parent in the tree.
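
Roughly, the spawn looks like this (argument values are illustrative; the real code computes the child count from the configuration file):

    #include <mpi.h>
    #include <stddef.h>

    // Spawn n_children copies of 'program' with the "-child" flag so they know
    // to get their configuration from their parent rather than from a file.
    void spawn_children(const char *program, int n_children, MPI_Comm *children)
    {
        char *child_argv[] = { (char *) "-child", NULL };
        MPI_Comm_spawn((char *) program, child_argv, n_children, MPI_INFO_NULL,
                       0 /* root */, MPI_COMM_SELF, children,
                       MPI_ERRCODES_IGNORE);
    }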

Sidenote: what's interesting to me about using MPI_COMM_SPAWN to start all the non-server nodes is that startup protocols have to be used. I'm used to writing startup protocols to coordinate between multiple processes for socket-level (and other IPC) programs, but I've never used MPI itself for startup meta information. It just seems weird. :-)

The server sends a sub-tree of the entire tree to each of its direct children. The sub-tree that it sends is the tree with that child as the root; hence, every child of the server learns about all of its children, but does not learn about its siblings or their children. The process repeats -- each child then sends a sub-tree to each of its direct children, until there are no more sub-trees to send.

One of the parameters in the configuration file is "how many Calculate threads to have on this node". If this value is -1 in the configuration, the value will be determined at run time by how many CPUs the node has. As such, after the sub-trees are distributed, a global sum reduction is initiated from the leaves back to the server. In this way, each relay will learn the total number of Calculate threads that it serves.

After this, it's fairly straightforward (hah!) bookkeeping to distribute input data and gather output data (mainly as described in the algorithms discussed above). The queues actually contain quite a bit of intelligence (particularly the output queue), and are worthy of discussion. However, I'm pretty sure that I have discussed the queues in a previous journal entry (they've been implemented for quite some time now), so I won't repeat that discussion here.

Future Work

Here's a conglomerated list of items from the text above, as well as a few new items that would be desirable in future versions of this software:

  • When a node dies, try to bring it back.

  • When a node dies, allow all of its children to continue in the computation if possible. This may be as simple as the grandparent assuming control of all the children, or may entail something more complicated than a simple tree distribution (perhaps something more like a graph [perhaps a jmesh?]).

  • Allow the job to be started with a full MPI_COMM_WORLD --
    i.e., don't rely on MPI_COMM_SPAWN to start the non-server nodes.

  • Multiple, different Calculate thread kernels for different kinds of input.

  • "Chainable" Calculate threads, such that the output of one CalculateA instance can be given to the input of a CalculateB instance.

  • Allow the Input and Output instances to run on arbitrary (and potentially different) nodes.

  • Allow multiple Input and/or Output instances.

  • World peace.
