The concurrent.futures module is part of the standard library and provides a high-level API for launching asynchronous tasks. We will discuss the common usages of this module and walk through code samples for each.
Executors
This module features the Executor class, which is abstract and cannot be used directly. However, it has two very useful concrete subclasses: ThreadPoolExecutor and ProcessPoolExecutor. As their names suggest, one uses multithreading and the other uses multiprocessing. In both cases, we get a pool of threads or processes and we can submit tasks to it. The pool assigns tasks to the available resources (threads or processes) and schedules them to run.
ThreadPoolExecutor
Let’s first look at some code:
from concurrent.futures import ThreadPoolExecutor
from time import sleep

def return_after_5_secs(message):
    sleep(5)
    return message

pool = ThreadPoolExecutor(3)

# submit() schedules the callable and immediately returns a Future
future = pool.submit(return_after_5_secs, "hello")
print(future.done())
sleep(5)
print(future.done())
print(future.result())
I hope the code is pretty self-explanatory. We first construct a ThreadPoolExecutor with the number of threads we want in the pool. If we don't pass a number, a sensible default based on the number of processors is used (since Python 3.5), but we chose 3 just because we can ;-). Then we submit a task to the thread pool executor: a function that waits 5 seconds before returning the message it gets as its first argument. When we submit() a task, we get back a Future. As we can see in the docs, the Future object has a done() method which tells us whether the future has resolved, that is, whether a value has been set for that particular future object. When a task finishes (returns a value or raises an exception), the executor sets that value (or exception) on the future object.
In our example, the task doesn’t complete for 5 seconds, so the first call to done() returns False. We then sleep for 5 seconds, after which the future has resolved and done() returns True. We can get the result of the future by calling its result() method.
A good understanding of the Future object and its methods is really beneficial for understanding and doing async programming in Python, so I highly recommend taking the time to read through the docs.
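For example, besides done() and result(), a future also has exception(), cancel() and add_done_callback(). Here is a small sketch of the callback and exception-handling methods; the might_fail helper and its values are made up purely for illustration:

from concurrent.futures import ThreadPoolExecutor
from time import sleep

def might_fail(n):
    sleep(1)
    if n % 2:
        raise ValueError("odd numbers are not welcome")
    return n * 10

def report(future):
    # exception() returns None if the task finished without raising
    if future.exception() is not None:
        print("Failed:", future.exception())
    else:
        print("Succeeded:", future.result())

pool = ThreadPoolExecutor(2)
for n in range(3):
    # add_done_callback() runs report() once the future resolves
    pool.submit(might_fail, n).add_done_callback(report)

pool.shutdown(wait=True)  # block until all submitted tasks finish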
ProcessPoolExecutor
The process pool executor has a very similar API, so let’s modify our previous example to use ProcessPoolExecutor instead:
from concurrent.futures import ProcessPoolExecutor
from time import sleep

def return_after_5_secs(message):
    sleep(5)
    return message

# The __main__ guard matters here: on platforms that spawn worker
# processes (e.g. Windows), the module is re-imported in each worker.
if __name__ == '__main__':
    pool = ProcessPoolExecutor(3)

    future = pool.submit(return_after_5_secs, "hello")
    print(future.done())
    sleep(5)
    print(future.done())
    print("Result: " + future.result())
It works perfectly! But of course, we would want to use the ProcessPoolExecutor for CPU-intensive tasks. The ThreadPoolExecutor is better suited for network operations or other I/O.
While the API is similar, we must remember that the ProcessPoolExecutor uses the multiprocessing module and is not affected by the Global Interpreter Lock. However, we cannot use objects that are not picklable, so we need to carefully choose what we pass to and return from the callable given to the process pool executor.
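As a quick illustration, a module-level function can be pickled and sent to a worker process, but a lambda or a nested function cannot. The names below are made up for this sketch:

from concurrent.futures import ProcessPoolExecutor

def square(n):
    # module-level functions are picklable
    return n * n

if __name__ == '__main__':
    with ProcessPoolExecutor() as executor:
        print(executor.submit(square, 4).result())  # prints 16

        # Something like the following would fail with a pickling error,
        # because lambdas (and nested functions) cannot be pickled:
        # executor.submit(lambda n: n * n, 4).result()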
Executor.map()
Both executors have a common method: map(). Like the built-in function, it applies a provided function to each item in an iterable, except that here the calls are made concurrently. With ProcessPoolExecutor, the iterable is broken into chunks and each chunk is submitted to the worker processes as a batch. We can control the chunk size with the chunksize keyword parameter; by default it is 1.
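For example, with a large input list we could hand each worker process a batch of items per round trip instead of a single item. A minimal sketch (the function and numbers are arbitrary):

from concurrent.futures import ProcessPoolExecutor

def double(n):
    return n * 2

if __name__ == '__main__':
    with ProcessPoolExecutor() as executor:
        # Each worker process receives the inputs in batches of 100
        results = list(executor.map(double, range(1000), chunksize=100))
    print(results[:5])  # [0, 2, 4, 6, 8]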
Here’s the ThreadPoolExecutor example from the official docs:
import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the url and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))
And the ProcessPoolExecutor example:
import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True   # 2 is the only even prime
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()
as_completed() & wait()
The concurrent.futures module has two functions for dealing with the futures returned by the executors: as_completed() and wait().
The as_completed() function takes an iterable of Future objects (for example, the futures collected from repeated submit() calls) and starts yielding them as soon as they resolve. The main difference between the aforementioned map method and as_completed is that map returns results in the order of the input iterable – the first result from map is the result for the first item. The first future yielded by as_completed, on the other hand, is whichever one completed first.
Let’s see an example:
from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep
from random import randint

def return_after_random_secs(num):
    sleep(randint(1, 5))
    return "Return of {}".format(num)

pool = ThreadPoolExecutor(5)
futures = []

for x in range(5):
    futures.append(pool.submit(return_after_random_secs, x))

# Results are printed in completion order, not submission order
for x in as_completed(futures):
    print(x.result())
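For contrast, if we feed the same helper to map(), the results come back in submission order even though the tasks finish at random times. A small sketch reusing the function above:

from concurrent.futures import ThreadPoolExecutor
from time import sleep
from random import randint

def return_after_random_secs(num):
    sleep(randint(1, 5))
    return "Return of {}".format(num)

pool = ThreadPoolExecutor(5)

# Yields "Return of 0" ... "Return of 4" in that order,
# regardless of which task completed first.
for result in pool.map(return_after_random_secs, range(5)):
    print(result)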
The wait() function returns a named tuple containing two sets: one with the futures that completed (either returned a result or raised an exception) and the other with the ones that did not complete.
We can see an example here:
from concurrent.futures import ThreadPoolExecutor, wait
from time import sleep
from random import randint

def return_after_random_secs(num):
    sleep(randint(1, 5))
    return "Return of {}".format(num)

pool = ThreadPoolExecutor(5)
futures = []

for x in range(5):
    futures.append(pool.submit(return_after_random_secs, x))

# Prints a named tuple with the sets of done and not-done futures
print(wait(futures))
We can control the behavior of the wait function by telling it when it should return. We pass one of these values to its return_when parameter: FIRST_COMPLETED, FIRST_EXCEPTION or ALL_COMPLETED. The default is ALL_COMPLETED, so wait returns only when all futures have completed. With the other two values, we can make it return as soon as the first future completes or the first exception is raised.
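Here is a small sketch reusing the helper from above, this time returning as soon as any one future resolves:

from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
from time import sleep
from random import randint

def return_after_random_secs(num):
    sleep(randint(1, 5))
    return "Return of {}".format(num)

pool = ThreadPoolExecutor(5)
futures = [pool.submit(return_after_random_secs, x) for x in range(5)]

# Returns as soon as any one future resolves; the rest keep running.
done, not_done = wait(futures, return_when=FIRST_COMPLETED)
print("Done so far:", [f.result() for f in done])
print("Still pending:", len(not_done))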