threading Module

We will now introduce the higher-level threading module, which gives you not only a Thread class but also a wide variety of synchronization mechanisms to use to your heart's content. Table 17.2 lists all of the objects provided by the threading module.
In this section, we will examine how to use the Thread class to implement threading. Since we have already covered the basics of locking, we will not cover the locking primitives here. The Thread class also contains a form of synchronization, so explicit use of locking primitives is not necessary.

Thread Class

There are a variety of ways you can create threads using the Thread class. We cover three of them here, all quite similar. Pick the one you feel most comfortable with, not to mention the most appropriate for your application and future scalability (we like choice 3 the best):
Create Thread instance, passing in function

In our first example, we will just instantiate Thread, passing in our function (and its arguments) in a manner similar to our previous examples. This function is what will be executed when we direct the thread to begin execution. Taking our mtsleep2.py script and tweaking it by adding the use of Thread objects, we have mtsleep3.py, shown in Example 17.4. When we run it, we see output similar to its predecessors':

% mtsleep3.py
starting threads…
start loop 0 at: Sun Aug 13 18:16:38 2000
start loop 1 at: Sun Aug 13 18:16:38 2000
loop 1 done at: Sun Aug 13 18:16:40 2000
loop 0 done at: Sun Aug 13 18:16:42 2000
all DONE at: Sun Aug 13 18:16:42 2000

So what did change? Gone are the locks that we had to implement when using the thread module. Instead, we create a set of Thread objects. When each Thread is instantiated, we dutifully pass in the function (target) and arguments (args) and receive a Thread instance in return. The biggest difference between instantiating Thread [calling Thread()] and invoking thread.start_new_thread() is that the new thread does not begin execution right away. This is a useful synchronization feature, especially when you don't want the threads to start immediately.

Example 17.4. Using the threading Module (mtsleep3.py)

The Thread class from the threading module has a join() method that lets the main thread wait for thread completion.
1  #!/usr/bin/env python
2
3  import threading
4  from time import sleep, time, ctime
5
6  loops = [ 4, 2 ]
7
8  def loop(nloop, nsec):
9      print 'start loop', nloop, 'at:', ctime(time())
10     sleep(nsec)
11     print 'loop', nloop, 'done at:', ctime(time())
12
13 def main():
14     print 'starting threads…'
15     threads = []
16     nloops = range(len(loops))
17
18     for i in nloops:
19         t = threading.Thread(target=loop,
20             args=(i, loops[i]))
21         threads.append(t)
22
23     for i in nloops:        # start threads
24         threads[i].start()
25
26     for i in nloops:        # wait for all
27         threads[i].join()   # threads to finish
28
29     print 'all DONE at:', ctime(time())
30
31 if __name__ == '__main__':
32     main()

Once all the threads have been allocated, we let them go off to the races by invoking each thread's start() method, but not a moment before that. And rather than having to manage a set of locks (allocating, acquiring, releasing, checking lock state, etc.), we simply call the join() method for each thread. join() will wait until a thread terminates or, if provided, until a timeout occurs. Use of join() appears much cleaner than an infinite loop that waits for locks to be released (busy-waiting of the sort that gives such locks the name "spin locks"). One other important aspect of join() is that it does not need to be called at all. Once threads are started, they will execute until their given function completes, whereupon they will exit. If your main thread has things to do other than wait for threads to complete (such as other processing or waiting for new client requests), it should by all means do so. join() is useful only when you want to wait for thread completion.
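For readers on modern Python, the same pattern translates directly to Python 3 syntax (print() as a function). What follows is a minimal sketch, not the book's listing: the sleep times are shortened so it runs quickly, and a completed list is added purely so the finishing order can be observed.

```python
import threading
from time import sleep, ctime

loops = [0.4, 0.2]   # shortened from the book's 4 and 2 seconds
completed = []       # records which loop finishes, and in what order

def loop(nloop, nsec):
    print('start loop', nloop, 'at:', ctime())
    sleep(nsec)
    print('loop', nloop, 'done at:', ctime())
    completed.append(nloop)

def main():
    print('starting threads...')
    threads = []
    for i, nsec in enumerate(loops):
        # the Thread object is created here; it does not run yet
        t = threading.Thread(target=loop, args=(i, nsec))
        threads.append(t)
    for t in threads:        # now start them all
        t.start()
    for t in threads:        # wait for every thread to finish
        t.join()
    print('all DONE at:', ctime())

main()
```

Note that join() also accepts an optional timeout in seconds, e.g. t.join(2.0), after which control returns to the caller whether or not the thread has finished.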
Create Thread instance, passing in callable class instance

A similar offshoot of passing in a function when creating a thread is to create a callable class and pass in an instance for execution—this is the more OO approach to MT programming. Such a callable class embodies an execution environment that is much more flexible than a function or a choice from a set of functions. You now have the power of a class object behind you, as opposed to a single function or a list/tuple of functions. Adding our new class ThreadFunc to the code and making other slight modifications to mtsleep3.py, we get mtsleep4.py, given in Example 17.5. If we run mtsleep4.py, we get the expected output:

% mtsleep4.py
starting threads…
start loop 0 at: Sun Aug 13 18:49:17 2000
start loop 1 at: Sun Aug 13 18:49:17 2000
loop 1 done at: Sun Aug 13 18:49:19 2000
loop 0 done at: Sun Aug 13 18:49:21 2000
all DONE at: Sun Aug 13 18:49:21 2000

So what are the changes this time? The addition of the ThreadFunc class and a minor change to instantiate the Thread object, which also instantiates ThreadFunc, our callable class. In effect, we have a double instantiation going on here. Let's take a closer look at our ThreadFunc class. We want to make this class general enough to use with functions other than our loop() function, so we added some new infrastructure: the class holds the arguments for the function, the function itself, and a function name string. The constructor __init__() just sets all the values.

Example 17.5. Using Callable Classes (mtsleep4.py)

In this example we pass in a callable class (instance) as opposed to just a function. It presents more of an OO approach than mtsleep3.py.
1  #!/usr/bin/env python
2
3  import threading
4  from time import sleep, time, ctime
5
6  loops = [ 4, 2 ]
7
8  class ThreadFunc:
9
10     def __init__(self, func, args, name=''):
11         self.name = name
12         self.func = func
13         self.args = args
14
15     def __call__(self):
16         apply(self.func, self.args)
17
18 def loop(nloop, nsec):
19     print 'start loop', nloop, 'at:', ctime(time())
20     sleep(nsec)
21     print 'loop', nloop, 'done at:', ctime(time())
22
23 def main():
24     print 'starting threads…'
25     threads = []
26     nloops = range(len(loops))
27
28     for i in nloops:        # create all threads
29         t = threading.Thread( \
30             target=ThreadFunc(loop, (i, loops[i]),
31             loop.__name__))
32         threads.append(t)
33
34     for i in nloops:        # start all threads
35         threads[i].start()
36
37     for i in nloops:        # wait for completion
38         threads[i].join()
39
40     print 'all DONE at:', ctime(time())
41
42 if __name__ == '__main__':
43     main()

When the Thread code calls our ThreadFunc object upon creating a new thread, it will invoke the __call__() special method. Because we already have our set of arguments, we do not need to pass them to the Thread() constructor, but we do have to use apply() in our code now because we have an argument tuple. Those of you with Python 1.6 and higher can use the new function invocation syntax described in Section 11.6.3 instead of using apply() on line 16:

self.func(*self.args)

Subclass Thread and create subclass instance

The final introductory example involves subclassing Thread(), which turns out to be extremely similar to creating a callable class as in the previous example. Subclassing is a bit easier to read when you are creating your threads (lines 28–29).
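In modern Python the apply() built-in is gone entirely, so the __call__() body must use argument unpacking. Here is a minimal Python 3 sketch of the callable-class idea, with a hypothetical add() function standing in for loop() so the saved result can be inspected:

```python
import threading

class ThreadFunc:
    """Callable wrapper holding a function, its arguments, and a name."""
    def __init__(self, func, args, name=''):
        self.name = name
        self.func = func
        self.args = args

    def __call__(self):
        # modern replacement for apply(self.func, self.args)
        self.res = self.func(*self.args)

def add(a, b):                      # hypothetical stand-in for loop()
    return a + b

tf = ThreadFunc(add, (3, 4), add.__name__)
t = threading.Thread(target=tf)     # double instantiation: ThreadFunc, then Thread
t.start()
t.join()
print(tf.res)                       # prints 7
```

Because we keep a reference to the ThreadFunc instance, any value its function returned remains accessible after join().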
We will present the code for mtsleep5.py in Example 17.6 as well as the output obtained from its execution, and leave it as an exercise for the reader to compare mtsleep5.py to mtsleep4.py. Here is the output for mtsleep5.py—again, just what we expected:

% mtsleep5.py
starting threads…
start loop 0 at: Sun Aug 13 19:14:26 2000
start loop 1 at: Sun Aug 13 19:14:26 2000
loop 1 done at: Sun Aug 13 19:14:28 2000
loop 0 done at: Sun Aug 13 19:14:30 2000
all DONE at: Sun Aug 13 19:14:30 2000

While the reader compares the source between the mtsleep4 and mtsleep5 modules, we want to point out the most significant changes: (1) our MyThread subclass constructor must first invoke the base class constructor (line 10), and (2) the former special method __call__() must be named run() in the subclass.

We now modify our MyThread class with some diagnostic output, store it in a separate module called myThread (see Example 17.7), and import this class for the upcoming examples. Rather than simply calling apply() to run our functions, we also save the result to the instance attribute self.res and create a new method, getResult(), to retrieve that value.

Example 17.6. Subclassing Thread (mtsleep5.py)

Rather than instantiating the Thread class, we subclass it. This gives us more flexibility in customizing our threading objects and simplifies the thread creation call.
1  #!/usr/bin/env python
2
3  import threading
4  from time import sleep, time, ctime
5
6  loops = ( 4, 2 )
7
8  class MyThread(threading.Thread):
9      def __init__(self, func, args, name=''):
10         threading.Thread.__init__(self)
11         self.name = name
12         self.func = func
13         self.args = args
14
15     def run(self):
16         apply(self.func, self.args)
17
18 def loop(nloop, nsec):
19     print 'start loop', nloop, 'at:', ctime(time())
20     sleep(nsec)
21     print 'loop', nloop, 'done at:', ctime(time())
22
23 def main():
24     print 'starting threads…'
25     threads = []
26     nloops = range(len(loops))
27
28     for i in nloops:
29         t = MyThread(loop, (i, loops[i]), \
30             loop.__name__)
31         threads.append(t)
32
33     for i in nloops:
34         threads[i].start()
35
36     for i in nloops:
37         threads[i].join()
38
39     print 'all DONE at:', ctime(time())
40
41 if __name__ == '__main__':
42     main()

Example 17.7. MyThread Subclass of Thread (myThread.py)

To generalize our subclass of Thread from mtsleep5.py, we move the subclass to a separate module and add a getResult() method for callables that produce return values.
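The same subclass works in Python 3 once apply() is replaced with argument unpacking. A minimal sketch, with run() saving the function's return value so it can be read after join(); the built-in pow is used here only as a convenient stand-in callable:

```python
import threading

class MyThread(threading.Thread):
    def __init__(self, func, args, name=''):
        threading.Thread.__init__(self)   # base class constructor must run first
        self.name = name
        self.func = func
        self.args = args

    def run(self):                        # replaces __call__() from the callable-class version
        self.res = self.func(*self.args)

t = MyThread(pow, (2, 10), 'pow')
t.start()
t.join()                                  # wait for completion, then read the saved result
print(t.res)                              # prints 1024
```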
1  #!/usr/bin/env python
2
3  import threading
4  from time import time, ctime
5
6  class MyThread(threading.Thread):
7      def __init__(self, func, args, name=''):
8          threading.Thread.__init__(self)
9          self.name = name
10         self.func = func
11         self.args = args
12
13     def getResult(self):
14         return self.res
15
16     def run(self):
17         print 'starting', self.name, 'at:', \
18             ctime(time())
19         self.res = apply(self.func, self.args)
20         print self.name, 'finished at:', \
21             ctime(time())

Fibonacci and factorial… take 2, plus summation

The mtfacfib.py script, given in Example 17.8, compares execution of the recursive Fibonacci, factorial, and summation functions. The script runs all three functions in a single-threaded manner, then performs the same task using threads, to illustrate one of the advantages of having a threading environment.

Example 17.8. Fibonacci, Factorial, Summation (mtfacfib.py)

In this MT application, we execute three separate recursive functions—first in a single-threaded fashion, followed by the alternative with multiple threads.
1  #!/usr/bin/env python
2
3  from myThread import MyThread
4  from time import time, ctime, sleep
5
6  def fib(x):
7      sleep(0.005)
8      if x < 2: return 1
9      return (fib(x-2) + fib(x-1))
10
11 def fac(x):
12     sleep(0.1)
13     if x < 2: return 1
14     return (x * fac(x-1))
15
16 def sum(x):
17     sleep(0.1)
18     if x < 2: return 1
19     return (x + sum(x-1))
20
21 funcs = [fib, fac, sum]
22 n = 12
23
24 def main():
25     nfuncs = range(len(funcs))
26
27     print '*** SINGLE THREAD'
28     for i in nfuncs:
29         print 'starting', funcs[i].__name__, 'at:', \
30             ctime(time())
31         print funcs[i](n)
32         print funcs[i].__name__, 'finished at:', \
33             ctime(time())
34
35     print '\n*** MULTIPLE THREADS'
36     threads = []
37     for i in nfuncs:
38         t = MyThread(funcs[i], (n,),
39             funcs[i].__name__)
40         threads.append(t)
41
42     for i in nfuncs:
43         threads[i].start()
44
45     for i in nfuncs:
46         threads[i].join()
47         print threads[i].getResult()
48
49     print 'all DONE'
50
51 if __name__ == '__main__':
52     main()

Running in single-threaded mode simply involves calling the functions one at a time and displaying the corresponding results right after each function call. When running in multithreaded mode, we do not display the results right away. Because we want to keep our MyThread class as general as possible (able to execute callables that do and do not produce output), we wait until the end to call the getResult() method to finally show the return values of each function call.
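Stripped of the artificial sleep() calls and updated to Python 3, the three recursive functions are a quick check of the values the script reports for n = 12. Note that sum() is renamed here so as not to shadow the built-in, a hazard the original listing tolerates:

```python
def fib(x):
    if x < 2:
        return 1
    return fib(x - 2) + fib(x - 1)

def fac(x):
    if x < 2:
        return 1
    return x * fac(x - 1)

def summ(x):                 # renamed from sum() to avoid shadowing the built-in
    if x < 2:
        return 1
    return x + summ(x - 1)

n = 12
print([f(n) for f in (fib, fac, summ)])   # prints [233, 479001600, 78]
```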
Because these functions execute so quickly (well, except perhaps for the Fibonacci function), you will notice that we had to add calls to sleep() to each function to slow things down so that we can see how threading may improve performance—if indeed the actual work had varying execution times, you certainly wouldn't pad it with calls to sleep(). Anyway, here is the output:

% mtfacfib.py
*** SINGLE THREAD
starting fib at: Sun Jun 18 19:52:20 2000
233
fib finished at: Sun Jun 18 19:52:24 2000
starting fac at: Sun Jun 18 19:52:24 2000
479001600
fac finished at: Sun Jun 18 19:52:26 2000
starting sum at: Sun Jun 18 19:52:26 2000
78
sum finished at: Sun Jun 18 19:52:27 2000

*** MULTIPLE THREADS
starting fib at: Sun Jun 18 19:52:27 2000
starting fac at: Sun Jun 18 19:52:27 2000
starting sum at: Sun Jun 18 19:52:27 2000
fac finished at: Sun Jun 18 19:52:28 2000
sum finished at: Sun Jun 18 19:52:28 2000
fib finished at: Sun Jun 18 19:52:31 2000
233
479001600
78
all DONE

Producer-Consumer Problem and the Queue Module

The final example illustrates the producer-consumer scenario, where a producer creates goods and places them in a data structure such as a queue. The amount of time between the production of goods is non-deterministic, as is the amount of time the consumer takes to consume them. We use the Queue module to provide an interthread communication mechanism that allows threads to share data with each other. In particular, we create a queue into which the producer (thread) places new goods and from which the consumer (thread) consumes them. We will use the following attributes from the Queue module (see Table 17.3).
Without further ado, we present the code for prodcons.py, shown in Example 17.9.

Example 17.9. Producer-Consumer Problem (prodcons.py)

We feature an implementation of the producer-consumer problem using Queue objects and a random number of goods produced (and consumed). The producer and consumer are individually—and concurrently—executing threads.

1  #!/usr/bin/env python
2
3  from random import randint
4  from time import time, ctime, sleep
5  from Queue import Queue
6  from myThread import MyThread
7
8  def writeQ(queue):
9      print 'producing object for Q…',
10     queue.put('xxx', 1)
11     print "size now", queue.qsize()
12
13 def readQ(queue):
14     val = queue.get(1)
15     print 'consumed object from Q… size now', \
16         queue.qsize()
17
18 def writer(queue, loops):
19     for i in range(loops):
20         writeQ(queue)
21         sleep(randint(1, 3))
22
23 def reader(queue, loops):
24     for i in range(loops):
25         readQ(queue)
26         sleep(randint(2, 5))
27
28 funcs = [writer, reader]
29 nfuncs = range(len(funcs))
30
31 def main():
32     nloops = randint(2, 5)
33     q = Queue(32)
34
35     threads = []
36     for i in nfuncs:
37         t = MyThread(funcs[i], (q, nloops), \
38             funcs[i].__name__)
39         threads.append(t)
40
41     for i in nfuncs:
42         threads[i].start()
43
44     for i in nfuncs:
45         threads[i].join()
46
47     print 'all DONE'
48
49 if __name__ == '__main__':
50     main()

Here is the output from one execution of this script:

% prodcons.py
starting writer at: Sun Jun 18 20:27:07 2000
producing object for Q… size now 1
starting reader at: Sun Jun 18 20:27:07 2000
consumed object from Q… size now 0
producing object for Q… size now 1
consumed object from Q… size now 0
producing object for Q… size now 1
producing object for Q… size now 2
producing object for Q… size now 3
consumed object from Q… size now 2
consumed
object from Q… size now 1
writer finished at: Sun Jun 18 20:27:17 2000
consumed object from Q… size now 0
reader finished at: Sun Jun 18 20:27:25 2000
all DONE

As you can see, the producer and consumer do not necessarily alternate in execution. (Thank goodness for random numbers!) Seriously though, real life is generally random and non-deterministic.

Line-by-line explanation

Lines 1–6

In this module, we use the Queue.Queue object as well as our thread class myThread.MyThread, which we presented in Example 17.7. We use random.randint() to make production and consumption somewhat varied, and also grab the usual suspects from the time module.

Lines 8–16

The writeQ() and readQ() functions each have a specific purpose: to place an object in the queue (we are using the string 'xxx' as an example) and to consume a queued object, respectively. Notice that we produce one object and read one object each time.

Lines 18–26

The writer() is going to run as a single thread whose sole purpose is to produce an item for the queue, wait for a bit, and then do it again, up to the specified number of times, chosen randomly per script execution. The reader() will do likewise, with the exception of consuming an item, of course. You will notice that the random number of seconds that the writer sleeps is in general shorter than the amount of time the reader sleeps. This discourages the reader from trying to take items from an empty queue. By giving the writer a shorter period of waiting, it is more likely that there will already be an object for the reader to consume by the time its turn rolls around again.

Lines 28–29

These are just setup lines that establish the total number of threads to be spawned and executed.

Lines 31–47

Finally, our main() function, which should look quite similar to the main() in all of the other scripts in this chapter. We create the appropriate threads and send them on their way, finishing up when both threads have concluded execution.
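Under modern Python the Queue module was renamed queue, but the interface is the same. A minimal Python 3 sketch of the producer-consumer pattern above, with the sleeps removed so it finishes immediately and a consumed list added so the outcome can be inspected:

```python
import threading
from queue import Queue      # the Python 2 "Queue" module is "queue" in Python 3
from random import randint

consumed = []

def writer(q, loops):
    for i in range(loops):
        q.put('xxx', block=True)              # produce one object
        print('produced object; size now', q.qsize())

def reader(q, loops):
    for i in range(loops):
        consumed.append(q.get(block=True))    # blocks until an item is available
        print('consumed object; size now', q.qsize())

nloops = randint(2, 5)
q = Queue(32)
threads = [threading.Thread(target=func, args=(q, nloops))
           for func in (writer, reader)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print('all DONE')
```

Because get(block=True) waits for an item to arrive, the reader never raises an exception on an empty queue; the blocking call itself provides the synchronization that the staggered sleeps only encourage.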
We infer from this example that a program with multiple tasks to perform can be organized to use a separate thread for each task. This can result in a much cleaner design than a single-threaded program that attempts to do all of the tasks.

In this chapter, we illustrated how a single-threaded process may limit an application's performance. In particular, programs with independent, non-deterministic, and non-causal tasks that execute sequentially can be improved by dividing them into separate tasks executed by individual threads. Not every application will benefit from multithreading and its overheads, but you are now cognizant enough of Python's threading capability to use this tool to your advantage when appropriate.
© 2002, O'Reilly & Associates, Inc.