Spent much of yesterday thinking about the generalized manager/worker problem, and spent most of today re-writing my paper about it (I had a good quote to Tracy today, "if I were a theoretician, the paper would be done". But I'm not, and there were still a number of non-trivial practical issues that concerned me. So I re-wrote it). I solved the problem with the variable rate input/output in the input data unit queue -- a cool use of a condition variable and something I call a "reservation system" (which really amounts to two queues managed by mutex and condition variable... which are more or less the same thing anyway ;-). You get all the benefits of a variable rate I/O and blocking (instead of spinning). Here's the problem:
The input thread reads in chunks of input at a time and preprocesses them (working on the idea that preprocessing takes much less time than the "real" calculation). The preprocessed data units go into a data queue. The calculate entities (which can be local threads or remote threads that are serviced by MPI proxies) are removed from the input data unit queue (which will involve an MPI_SEND/MPI_RECV pair if the thread is remote from the manager) and processed. This is the step that takes the most time -- the actual calculation. When each unit is finished processing, it is placed in the output data unit queue (which, again, will involve MPI_SEND/MPI_RECV is the thread is remote). The output thread removes the output data unit from the queue, postprocesses it (again, taking orders of magnitude time less than the actual calculation), and then writes it out to the output datafile.
Believe it or not, this is not rocket science. Pretty standard stuff, actually. Here's the problem:
When we mix local and remote threads, however, he need to give different amounts of work to threads based upon their location -- this can help hide latency for remote threads, because we give them larger amounts of work. They request work less often, which directly translates to less messages, and therefore less latency (latency is bad, Bad, BAD!).
Since we're dealing with multiple threads that have to share the same data structures (i.e., the input data queue), that data structure must be locked in some fashion to prevent multiple threads from entering it simultaneously. That would also be BAD! So we lock it with a mutex -- again, pretty simple. The input thread locks the mutex, adds one or more input units. It's will probably be more than one, actually, for efficiency. Just like sending a smaller number of large messages is more efficient than sending a larger number of small messages -- even if the total number of data bytes is the same in both cases -- it is more efficient to lock something a fewer number of times for longer periods of time rather than a larger number of times, each for a short amount of time. The total lock time is the same in both cases, it's the overhead time (i.e., the time necessary to lock and unlock) that is different. But that's not the whole story -- there's concurrency as well! So it's not quite so simple; hopefully our worker threads will be busy enough with real calculations that their blocking time will be minimal in comparison.
And it's even more complicated than that. In order to save latency again (provided that the input units are somewhat small -- and yes, this is problem-specific), we want to only send one message to each remote node, not one message per input unit. So we have to request M input units per CPU per node. Hence, the proxy MPI thread must determine at the beginning of time how many CPUs each worker node has (no problem -- MPI_GATHER into num_cpus[]). For each remote node, the MPI proxy requests M*num_cpus[i] units, packs them into a single message, and sends them off. A similar thing happens with the output data on the way back --
the MPI proxy on the worker node packs all the output data into a single message and sends it back to the manager MPI proxy.
Anyway, back to the problem. So the input data unit queue is locked with a mutex. We want local threads to be able to retrieve X data units from the queue, but want remote data threads to retrieve Y data units. This is for several reasons:
- The latency reasons that were cited above; when Y > X, the remote threads do more work than the local threads to hide latency.
- When X and Y are relatively prime, given that one input unit translates to some T amount of time of computation, it can help offset the synchronizations necessary between local and remote threads. That is, it adds "jitter" to the scheduling, such that local and remote threads will be synchronizing at different times, which can reduce contention by preventing bottlenecks.
The question is how to do this? My previous scheme used a semaphore and mutex such that any thread (either a local thread or an MPI proxy thread) could remove single units at a time. This is no good, because retrieving a series of individual units may not guarantee their contiguousness -- another thread may slip in and grab a unit from the middle of your stream. I needed a way to atomically get an arbitrary number of units from the queue, and to be able to do it with blocking, not spinning (spinning == eating CPU cycles, and therefore taking them from someone else; blocking == allowing the thread to be swapped out until some external event wakes it up).
So here's what happens: the input works as before, putting in Q input units at a time (where Q >= 1). It then broadcasts to the condition variable (RTFM Tannenbaum). The calculate and MPI proxy threads are a bit more complicated. They get the mutex and check to see if the reservation queue is empty (this is different than the input data unit queue, but protected by the same mutex). If it is, it puts (threadID, num_requested_units) at the tail of the reservation queue. If it is actually at the head of the reservation queue, it checks to see if there are already enough units in the input data unit queue. If so, it takes them, removes itself from the reservation queue, and unlocks the mutex. If there aren't enough input data units, or this thread is not at the head of the reservation queue, it goes to sleep on the condition variable.
The input thread's broadcast to the condition variable wakes up all the calculate threads that are waiting on it (if any) and they all check to see if a) they are at the head of the reservation queue, and b) if there are enough input data units to service their request. If a thread finds that both a) and b) are true, it services itself, removes itself from the reservation queue, and broadcasts to the condition variable again to wake up the next thread in line. And so on.
Of course, it's not quite as clean as this -- there's other sticky issues like the input queue can be drained (i.e., the input is exhausted) but there's not enough units to fulfill a thread's request, etc. So the extra logic gets a bit corn-fuzing.
Interestingly, the output thread protection is much simpler -- the output thread comes in and takes as many contiguous output units off the output queue as possible. It gets more complicated when we can have an arbitrary number of input and output files being processed simultaneously; dequeuing from the output queue becomes an absolute nightmare. Consider just one issue: if we're processing B input files into B output files, all B input files will be read fairly quickly. Their data will be processed in order, and the processing will take some time. But we want the output files to be ready upon output of the final output unit in each file -- i.e., we have to close() the file. Hence, since only the input thread knows when the input has been exhausted, it has to submit some kind of sentinel value into the input data unit queue for a given output file that filters all the way through the pipeline to the output thread such that it says, "when you get all the output data units from this file, you can close it."
Pair that with the fact that the output data units will be coming in a random order from the calculation threads since they may all be operating at different speeds (indeed, since we're using MPI, the remote worker nodes may not be the same kind of machine as the manager machine). So some kind of order has be associated with input/output data (i.e., sequence numbers). The output thread has to re-order the output units into order, postprocess them, and then write them out to disk. And know when to close() the file since most POSIX systems work with write-on-close semantics.
Whew!
The paper is up to 18 pages now.
I'll probably spend more time re-writing the paper again. It seems that I get all the way through it and then realize one more critical threading/synchronization issue that throws the whole thing off, and makes me start the pipeline from scratch all the way back at the input stage. Still, it's all good, because it's way cool stuff. I've redesigned the pipeline from input to output 3-4 times now, each time better than before. :-)
I talked to Lummy about the paper today, and told him that I'd like to have something for him (and possibly Jeremy, Rich, and maybe even Jeremiah) to read in the next day or two.