Programmer's Python: Async - Futures
Written by Mike James   
Monday, 28 August 2023

Locking and Sharing Data

Of course, you need to avoid race conditions even with futures and executors. They both make using threads and processes easier, but any shared resources still need to be locked if you want to avoid the problems of multiple access. There really should be no reason to repeat this here as concurrent.futures is no different from using raw threads and processes in this respect, but there is a tendency to think that such problems go away as the level of sophistication increases.

Although concurrent.futures provides a similar interface to threads and processes, the two differ in the way they work with shared data and locks. As already explained in detail, threads automatically share global data simply because they run in the same address space, i.e. they run in the same process. By contrast, processes are isolated and don’t share global resources. As a result you cannot generally take a program that works with threads and change it to work with processes by just changing the executor object. For example, the counter example from earlier automatically shares the global myCounter, but has to use a threading.Lock to avoid race conditions:

import concurrent.futures
import threading 
import time
from cmath import sqrt
myCounter=0
countlock=threading.Lock()
def count():
    global myCounter
    for i in range(100000):
        with countlock:
            temp=myCounter+1
            x=sqrt(2)
            myCounter=temp
with concurrent.futures.ThreadPoolExecutor() as execute:
    t1=time.perf_counter()
    f1= execute.submit(count)
    f2= execute.submit(count)
    concurrent.futures.wait([f1,f2],
               return_when=concurrent.futures.ALL_COMPLETED)
    t2=time.perf_counter()
    print((t2-t1)*1000)
    print(myCounter)

If you remove the with countlock then you will usually see a much smaller final count than 200000 due to race conditions.
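The isolation between processes described earlier can be seen with a minimal sketch. It is not the article's counter program: count here takes a parameter n and returns the value it sees, both of which are assumptions made for illustration. The worker increments its own copy of myCounter, so the value printed for the parent should still be 0.

```python
import concurrent.futures

myCounter = 0  # module-level global, as in the thread example

def count(n):
    # Increments the copy of myCounter that lives in whichever
    # process runs this function, and returns that copy's value.
    global myCounter
    for _ in range(n):
        myCounter += 1
    return myCounter

if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor(1) as execute:
        f1 = execute.submit(count, 1000)
        print("worker's copy:", f1.result())
    # The worker never touched the parent's global.
    print("parent's copy:", myCounter)
```

Swapping ProcessPoolExecutor for ThreadPoolExecutor in this sketch makes both values agree, which is why changing the executor alone is not enough to convert a program.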

Locking and Process Parameters

Converting the counter example to use processes is complicated because you have to implement the sharing of the counter and the lock. As was explained in Chapter 8, passing locks and any object which cannot be pickled is a problem for processes. The multiprocessing module treats locks as special when you create a Process directly and arranges to pass them to the child process in a way that works. The concurrent.futures module doesn’t do this, and this often causes problems when you first try using it.

You cannot pass lock objects to concurrent.futures processes as parameters. This also means you cannot pass shared ctypes, like Value, as these contain a default lock. If you try something like:

    myCounter=multiprocessing.Value(ctypes.c_int,0)
    with concurrent.futures.ProcessPoolExecutor(2) as execute:
        f1= execute.submit(counter, myCounter)

where the counter function is assumed to accept a single parameter which is a Value object, you will discover that it doesn’t work. It doesn’t generate an error message; it simply fails silently because the future, f1, captures the exception. You can see the problem by adding:

 print(f1.exception())

which displays:

Synchronized objects should only be shared between 
processes through inheritance

and isn’t entirely helpful. The problem really is that synchronized objects cannot be pickled and hence cannot be passed to a process.
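You can confirm that pickling is the cause without starting any processes. The following is a minimal sketch; try_pickle is a hypothetical helper written for this illustration, not part of any library. It attempts to pickle a plain integer, a Value and a Lock, and reports the error, if any:

```python
import ctypes
import multiprocessing
import pickle

def try_pickle(obj):
    # Hypothetical helper: returns None if obj can be pickled,
    # otherwise a string describing the exception that was raised.
    try:
        pickle.dumps(obj)
        return None
    except Exception as exc:
        return f"{type(exc).__name__}: {exc}"

shared = multiprocessing.Value(ctypes.c_int, 0)
lock = multiprocessing.Lock()

print(try_pickle(42))      # an ordinary object pickles without error
print(try_pickle(shared))  # the Value is refused
print(try_pickle(lock))    # so is a bare Lock
```

The Value and the Lock should both report a RuntimeError with the same "through inheritance" wording that f1.exception() displays.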

There are two solutions to this problem and the first isn’t well known. You can use the initializer parameter to set up some global objects ready for the process to start. The second is to make use of a process manager to share resources. The process manager approach is very general in that it uses its own process to hold the shared resources and proxy objects to access them from any other process. This means that it works with almost anything, but it isn’t particularly efficient. Let’s look at each approach in turn.

Using initializer to Create Shared Globals

If you want an efficient implementation using multiprocessing synchronization objects then you have to set up an initialization function and make sure it is called before each process starts:

import concurrent.futures
import multiprocessing
import time
import ctypes
def counter():
    global count
    for i in range(10000):
        with count:
            temp=count.value+1
            count.value=temp
def setup(var):
    global count
    count=var
if __name__ == '__main__':     
    myCounter=multiprocessing.Value(ctypes.c_int,0)
    with concurrent.futures.ProcessPoolExecutor(2,
              initializer=setup,initargs=(myCounter,)) as execute:
        t1=time.perf_counter()
        f1= execute.submit(counter)
        f2= execute.submit(counter)
        concurrent.futures.wait([f1,f2],
                 return_when=concurrent.futures.ALL_COMPLETED)
        t2=time.perf_counter()
        print(myCounter.value)
        print((t2-t1)*1000)

The setup function simply converts its parameter into a global. In general this could be used to create multiple global objects, but in this case we only need multiprocessing.Value as this includes a basic lock. The initializer parameter is used to ensure that each process gets the Value object as a global called count. The counter function uses count as a lock before it updates its value. Notice that the initializer is called only once, when the process is created. If the process is reused for another task, the globals that were created are not reset. Usually this is what you want as they are a means of communication between processes, but sometimes this can be a problem.

If you take out the with count statement then you will see that the reported count is less than 20000 due to race conditions.



Last Updated ( Monday, 28 August 2023 )