Tuesday, August 4, 2009

Dead simple parallelization in Python

Posted by Danny Tarlow
I'm at the stage in one research project where I've done all of the brainstorming, derivations, and prototyping, and most indicators suggest that we might have a good idea. So the next step is to figure out how it works on a lot of real-world data. For me, this usually means running the program on lots of different inputs and/or with many combinations of parameters (e.g., for cross validation). Further, right now I'm working a lot with images and doing computations at the individual pixel level -- which can get pretty expensive pretty quickly if you're not careful.

I started this phase (as I usually do, actually) by re-writing all of my code in C++. I like to do this first to speed things up, but it's also useful as a debugging and sanity-check stage, because I have to go through everything I've done in great detail, and I get to think about whether it's still the right idea.

Anyhow, when the task just involves lots of executions of an executable with different inputs, there's no real need to do any fancy parallelization. Instead, I use a dead simple version, where a script spawns off N threads, waits for them to finish, then spawns off N more threads. It keeps doing this until it has worked through the whole list of commands.

Now I know there are plenty of other ways to do this, but the following Python code works great for my purposes.
import os
from threading import Thread

NUM_THREADS = 7

class ThreadedCommand(Thread):

    def __init__(self, shell_call):
        Thread.__init__(self)
        self.shell_call = shell_call

    def run(self):
        # Run the shell command this thread was given.
        os.system(self.shell_call)


commands = []
for parameter_1 in [5, 25, 100]:
    for parameter_2 in [5, 25, 100]:
        for parameter_3 in [.1, .25, .35]:
            command = "./command do_stuff --parameter_1 %s --parameter_2 %s--parameter_3 %s" % (parameter_1, parameter_2, parameter_3)
            commands.append(command)

while len(commands) > 0:
    running_threads = []
    for i in range(NUM_THREADS):
        try:
            command = commands.pop(0)
            new_thread = ThreadedCommand(command)
            running_threads.append(new_thread)
            new_thread.start()
        except IndexError:
            # The command list may run out partway through a batch.
            break

    for thread in running_threads:
        thread.join()
        print "Joined %s" % thread.shell_call

8 comments:

Haz said...

Hey Danny,
Seems that you only spawn 7 threads here...and you've got 3^3 commands (because of the parameter spread) to execute. Am I missing something?

I've had this setup come up time and time again, so I've added it to a toolkit I'm working on:

- http://code.google.com/p/krtoolkit/wiki/utils#Running_an_Experiment

- http://code.google.com/p/krtoolkit/source/browse/trunk/kr/utils/experimentation.py

It's not fully documented yet, but the code needed to mirror your setup would be:

from kr.utils.experimentation import run_experiment

run_experiment(
    base_command = './command do_stuff',
    parameters = {
        '--parameter_1': [5, 25, 100],
        '--parameter_2': [5, 25, 100],
        '--parameter_3': [.1, .25, .35]},
    results_dir = "results",
    processors = 7
)

Nice thing is that the processors are always kept busy even when the runtime of each command is variable (when one frees up, another kicks in). Arguments not used there include capping the memory / time limit, outputting the progress, and changing the default base location.
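
If all you need is that keep-the-processors-busy behavior, the multiprocessing module that ships with Python 2.6 gets you most of the way there, since each worker grabs the next command as soon as it frees up. (Not necessarily how the toolkit does it under the hood -- just a rough sketch of the idea.)

import os
from multiprocessing import Pool

# Build the list of shell commands, same as in the post.
commands = ["./command do_stuff --parameter_1 %s --parameter_2 %s --parameter_3 %s"
            % (p1, p2, p3)
            for p1 in [5, 25, 100]
            for p2 in [5, 25, 100]
            for p3 in [.1, .25, .35]]

if __name__ == "__main__":
    # 7 worker processes; each pulls the next command as soon as it finishes
    # its current one, so a single slow job never idles the rest.
    pool = Pool(processes=7)
    pool.map(os.system, commands)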

Cheers

Danny Tarlow said...

Hey Haz,

The outer while loop keeps going until all of the commands have been executed, and the joins at the end of the loop body make sure no new batch of threads gets spawned until the current one has finished. It won't make efficient use of the processors if there are, for example, six really fast jobs and one long one, but it should at least eventually run everything as expected.
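
If the batching ever becomes a real bottleneck, the small change would be to keep the N threads alive and have each one pop the next command off the shared list as soon as it finishes, instead of joining a whole batch at a time. Something along these lines (just a sketch in the same style as the post -- not something I've actually run on this project):

import os
from threading import Thread

NUM_THREADS = 7

class Worker(Thread):
    """Keeps pulling commands off a shared list until the list is empty."""

    def __init__(self, commands):
        Thread.__init__(self)
        self.commands = commands

    def run(self):
        while True:
            try:
                # list.pop is protected by the GIL in CPython, so two
                # workers won't grab the same command.
                shell_call = self.commands.pop(0)
            except IndexError:
                return  # nothing left to run
            os.system(shell_call)

commands = ["echo job %d" % i for i in range(20)]  # stand-in for the real commands

workers = [Worker(commands) for _ in range(NUM_THREADS)]
for worker in workers:
    worker.start()
for worker in workers:
    worker.join()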

The experimentation class you're working on looks great and is much better than what I have here. I'll give it a try if I come to a point where I need something cleaner and more efficient than this.

Haz said...

Ahh right -- ya I've run into issues where one parameter configuration would cause an extreme delay, so I had to move away from that "wait for everyone" approach.

If you ever do want to give the python lib a try, let me know if you'd like to see anything else added. The feature set, thus far, is entirely dictated by the problems that have cropped up in my own research.

Cheers

Danny Tarlow said...

Sure, I'll definitely let you know. One thing I've been doing on an individual level, but would like to do better, is outputting results (both intermediate internal states and final outputs) directly to a database.

Have you thought anything about a clean way to deal with the large number of outputs that result from all of these runs with different versions of code, different data sets, and different parameter settings?

Haz said...

Intermediate results are stored only to the point of keeping the output of each run in a separate file. The object returned is a list of the runs, which (tries to) provide an easy way of filtering through the settings. For example, if you need all of the runs that had --parameter_1 set to 25, it would be:

results = run_experiment( ... )
p1_25_results = filter(lambda x: '--parameter_1 25' in x[0][1], results)

Arbitrary, I know, but each result contains info on the arguments used, the time it took, the file the output is in, etc. When I get around to it, I plan on writing the result lines, as they come, to some file for intermediate results in case something goes wrong or needs to be halted.

A typical step I've had to do is pump the results through an ANOVA to find the effect of parameter settings and such -- I plan on automating this in the near future too, as soon as I can get rpy2 working correctly.

Andrew J. Montalenti said...

You should really consider using a WorkQueue instead of the approach you've taken. An open source implementation is provided with the reddit source code:

http://code.reddit.com/browser/r2/r2/lib/workqueue.py?rev=33fd4e9684ca7668b57aa32ab6a064cd2a86ef6d

There is also an example of a WorkQueue inside the threading module itself, but it is somewhat poorly written. Look for "Producer" and "Consumer" classes.

The Reddit one is pretty good.

Danny Tarlow said...

That does look pretty good. I'll report back if I find the time/motivation to give it a try at some point.

Pussinboots said...

Twisted makes this kind of stuff easy:

http://twistedmatrix.com/documents/current/api/twisted.internet.interfaces.IReactorProcess.spawnProcess.html

http://twistedmatrix.com/documents/8.2.0/api/twisted.internet.threads.html