The concurrent.futures module provides a high-level interface for asynchronously executing callables using pools of threads or processes.

concurrent.futures.Executor is an abstract class that executes calls asynchronously. The important methods are submit(fn, *args, **kwargs), which schedules the callable to be executed with the given arguments and returns a Future, and map(fn, *iterables), which calls the function asynchronously with arguments taken from the iterables, similar to the built-in map(). Executor should not be used directly; instead, use one of its subclasses, ThreadPoolExecutor or ProcessPoolExecutor.
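As a quick sketch of the two methods before the main example (using a trivial square function purely for illustration):

```python
import concurrent.futures

def square(x):
    # Trivial function used only to illustrate the Executor API.
    return x * x

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    # submit() schedules a single call and immediately returns a Future.
    future = executor.submit(square, 3)
    print(future.result())  # blocks until the result is ready -> 9

    # map() applies the function to each item and yields results in input order.
    print(list(executor.map(square, [1, 2, 3])))  # -> [1, 4, 9]
```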

Let’s jump into an example. The purpose of the following program is to find the sum of all prime numbers up to a given number. Two functions demonstrate how to use a pool of threads and a pool of processes: sum_primes_thread(nums) uses threads and sum_primes_process(nums) uses processes. Notice that the only difference between the two functions is that one uses ThreadPoolExecutor while the other uses ProcessPoolExecutor.

import concurrent.futures
import time

def is_prime(num):
    if num <= 1:
        return False
    elif num <= 3:
        return True
    elif num%2 == 0 or num%3 == 0:
        return False
    i = 5
    while i*i <= num:
        if num%i == 0 or num%(i+2) == 0:
            return False
        i += 6
    return True

def find_sum(num):
    sum_of_primes = 0

    ix = 2

    while ix <= num:
        if is_prime(ix):
            sum_of_primes += ix
        ix += 1

    return sum_of_primes

def sum_primes_thread(nums):
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        for number, sum_res in zip(nums, executor.map(find_sum, nums)):
            print("{} : Sum = {}".format(number, sum_res))

def sum_primes_process(nums):
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        for number, sum_res in zip(nums, executor.map(find_sum, nums)):
            print("{} : Sum = {}".format(number, sum_res))

if __name__ == '__main__':
    nums = [100000, 200000, 300000]
    start = time.time()
    sum_primes_thread(nums)  # swap in sum_primes_process(nums) to use processes
    print("Time taken = {0:.5f}".format(time.time() - start))

Output when executing sum_primes_process:

100000 : Sum = 454396537
200000 : Sum = 1709600813
300000 : Sum = 3709507114
Time taken = 0.71783

Output when executing sum_primes_thread:

100000 : Sum = 454396537
200000 : Sum = 1709600813
300000 : Sum = 3709507114
Time taken = 1.23388

This was a simple example to demonstrate how to use pools of threads and processes.

Note : Don’t use threads for CPU-intensive tasks. Python’s Global Interpreter Lock (GIL) prevents threads from executing Python bytecode in parallel, which is why the thread version is slower here.

Though the callables themselves are executed asynchronously, the results are printed in the order the calls were submitted, because executor.map yields results in input order. The output can also be displayed as each task completes, as I demonstrate in the following example.

This program just retrieves the titles of web pages.

The output is displayed as and when a request is completed, so the order of the output usually changes between multiple runs of the program.

The concurrent.futures.as_completed(fs, timeout=None) function accepts an iterable of Future objects and yields them as they complete. The iterable is a dictionary in this program (iterating over a dictionary yields its keys, which here are the futures), but any iterable of futures works (e.g. a list).
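To illustrate completion order with a plain list instead of a dictionary, here is a toy sketch where sleeps stand in for real work:

```python
import concurrent.futures
import time

def slow_double(x):
    # Sleep proportionally to the input so smaller inputs finish first,
    # regardless of the order the futures were submitted in.
    time.sleep(x * 0.1)
    return x * 2

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(slow_double, n) for n in (3, 1, 2)]
    # as_completed yields futures in completion order, not submission order.
    for future in concurrent.futures.as_completed(futures):
        print(future.result())
```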

import concurrent.futures
import time
import requests
import bs4
import os

def load_url(current_url):
    res = requests.get(current_url)
    res.raise_for_status()

    current_page = bs4.BeautifulSoup(res.text,"html.parser")
    current_title = current_page.select('title')[0].getText()
    return current_title

def process_urls_thread_alt(urls):
    with concurrent.futures.ThreadPoolExecutor(max_workers = 4) as executor:
        future_to_url = {executor.submit(load_url, url): url for url in urls}
        for future in concurrent.futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                data = future.result()
            except Exception as exc:
                print('%r generated an exception: %s' % (url, exc))
            else:
                print("{} : Url = {}".format(data, url))


def process_urls_process_alt(urls):
    with concurrent.futures.ProcessPoolExecutor(max_workers = 4) as executor:
        future_to_url = {executor.submit(load_url, url): url for url in urls}
        for future in concurrent.futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                data = future.result()
            except Exception as exc:
                print('%r generated an exception: %s' % (url, exc))
            else:
                print("{} : Url = {}".format(data, url))

if __name__ == '__main__':
    url_list = ["https://www.google.com", "https://www.ploggingdev.com/2016/11/beginning-python-3/", "https://www.ploggingdev.com/archive/", "https://www.ploggingdev.com/2016/11/data-types-in-python-3/", "https://www.ploggingdev.com/2016/11/strings-in-python-3/"]
    start = time.time()
    process_urls_process_alt(url_list)
    print("Time taken = {0:.5f}".format(time.time() - start))

Output of the program (same when using threads or processes):

Google : Url = https://www.google.com
Beginning Python 3 : Url = https://www.ploggingdev.com/2016/11/beginning-python-3/
Archive : Url = https://www.ploggingdev.com/archive/
Strings in Python 3 : Url = https://www.ploggingdev.com/2016/11/strings-in-python-3/
Data types in Python 3 : Url = https://www.ploggingdev.com/2016/11/data-types-in-python-3/
Time taken = 1.82259

There are three exceptions that can occur:

  • concurrent.futures.CancelledError is raised when a future is cancelled

  • concurrent.futures.TimeoutError is raised when a future operation exceeds the given timeout

  • concurrent.futures.process.BrokenProcessPool is raised when one of the workers of a ProcessPoolExecutor has terminated in a non-clean fashion

In the above program they are handled in a catch-all except Exception block, but this can be narrowed depending on the requirements.
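For example, a timeout can be handled specifically by catching concurrent.futures.TimeoutError when calling result() with a timeout. This is a minimal sketch (slow_task is a stand-in for real work):

```python
import concurrent.futures
import time

def slow_task():
    # Simulate work that takes longer than the timeout below.
    time.sleep(1)
    return "done"

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(slow_task)
    try:
        # Ask for the result with a timeout shorter than the task takes.
        print(future.result(timeout=0.1))
    except concurrent.futures.TimeoutError:
        print("timed out waiting for the result")
    # The task itself keeps running; a second call without a timeout
    # blocks until it finishes normally.
    print(future.result())
```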

Code for today's blog is here.

Reference : Official library reference