Programmer's Python: Async - Futures
Written by Mike James   
Monday, 28 August 2023

Using a Process Manager to Share Resources

If you want to use a more sophisticated resource sharing method then you need to create a manager and pass the proxy objects it creates to the processes:

import concurrent.futures
import multiprocessing
import multiprocessing.managers
import time
import ctypes
def counter(count,lock):
    for i in range(10000):
        with lock:
            temp=count.value+1
            count.value=temp
if __name__ == '__main__':
     
    with multiprocessing.Manager() as man:
        with concurrent.futures.ProcessPoolExecutor(2) as execute:
            myCounter=man.Value(ctypes.c_int,0)
            myLock=man.Lock()
            t1=time.perf_counter()
            f1= execute.submit(counter,myCounter,myLock)
            f2= execute.submit(counter,myCounter,myLock)
            concurrent.futures.wait([f1,f2],
                   return_when=concurrent.futures.ALL_COMPLETED)
            t2=time.perf_counter()
            print(myCounter.value)
            print((t2-t1)*1000)

In this case we need to pass both a Value object and a Lock object because the manager’s Value object doesn’t have a built-in lock. The lock is used in the with statement in the counter function. If you remove it you will find that the result is less than 20000 due to race conditions.

Notice that we don’t need to use the initializer as now the shared objects are passed as parameters. To be more accurate, the proxies to the shared objects are passed as parameters and these are standard Python objects which are pickleable. The proxy objects connect to the manager’s server running in a separate process. This means we not only have one more process running, we also have the overhead of using a pipe to allow the proxy objects to communicate with the shared object. As a result this is slow. Compared to the use of the basic multiprocessing shared objects this takes more than ten times as long to complete.
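Whether an object can be passed as a parameter to a process comes down to whether it can be pickled. The following is a minimal sketch of checking this directly. The is_pickleable function is a hypothetical helper introduced here for illustration and is not part of any library:

```python
import multiprocessing
import pickle
import threading

def is_pickleable(obj):
    # Hypothetical helper: report whether pickle can serialize obj.
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False

if __name__ == '__main__':
    # Plain data can be sent to a worker process as a parameter.
    print(is_pickleable((1, "two", 3.0)))
    # Ordinary locks refuse to be pickled, which is why they cannot
    # be passed directly to a function submitted to a process pool.
    print(is_pickleable(threading.Lock()))
    print(is_pickleable(multiprocessing.Lock()))
```

A manager proxy passes the same check, which is what allows it to be handed to submit as an ordinary argument.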

Sharing Futures and Deadlock

There is another difference between using threads and processes when it comes to futures. In a threaded environment futures are generally accessible by more than one thread. The reason is that they are usually global to make sure that they live long enough to resolve and deliver their result. What this means is that not only can the use of locks result in deadlock, so can the use of futures by multiple threads. For example, consider what happens if we define two functions, taskA which waits for Future f2 and taskB which waits for Future f1:

import concurrent.futures
import time
def taskA():
    time.sleep(1)
    ans=f2.result()
    return ans
def taskB():
    time.sleep(1)
    ans=f1.result()
    return ans
     
with concurrent.futures.ThreadPoolExecutor(2) as execute:
    f1= execute.submit(taskA)
    f2= execute.submit(taskB)
    concurrent.futures.wait([f1,f2],
           return_when=concurrent.futures.ALL_COMPLETED)
    print(f1.result())

Of course the result is deadlock. The sleep at the start of taskA is necessary to allow taskB to be started and create f2 before taskA tries to use it.

This may be a contrived example, but in real life deadlocks due to waiting on futures happen in ways that are much more difficult to detect. Notice that this can’t happen with process-based futures because these aren’t shared between processes. If you can avoid accessing futures on threads that didn’t create them then you can avoid deadlock.
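One defensive option, which the example above does not use, is to give result a timeout so that a wait which can never complete turns into a TimeoutError instead of hanging. The following is a sketch under that assumption. The futures dictionary and the wait_for and run functions are illustrative names:

```python
import concurrent.futures
import time

futures = {}  # shared between threads, like the globals f1 and f2

def wait_for(name, timeout):
    time.sleep(0.2)  # let the main thread store both futures first
    try:
        # wait for the other task, but only for a limited time
        return futures[name].result(timeout=timeout)
    except concurrent.futures.TimeoutError:
        return "gave up waiting for " + name

def run():
    with concurrent.futures.ThreadPoolExecutor(2) as execute:
        # each task waits on the future belonging to the other one
        futures["a"] = execute.submit(wait_for, "b", 0.5)
        futures["b"] = execute.submit(wait_for, "a", 5)
        return futures["a"].result(), futures["b"].result()

if __name__ == '__main__':
    print(run())
```

Task a gives up after half a second, which resolves its future and so releases task b. The cycle is broken, but the program still has to decide what to do about the missing result.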

Computing Pi with Futures

Computing pi using futures is very similar to the previous example using a process pool. It would seem to be more instructive to implement the example using a thread pool but as this would show no speed advantage due to the GIL, a version using the process executor is more interesting. Converting the code to use a thread pool is a matter of changing one line:

import concurrent.futures
import time
def myPi(m,n):
    pi=0
    for k in range(m,n+1):
        s= 1 if k%2 else -1 
        pi += s / (2 * k - 1)
    return pi*4
if __name__ == '__main__':
    N=10000000
    with concurrent.futures.ProcessPoolExecutor(2) as execute:
        t1=time.perf_counter()
        f1=execute.submit(myPi,1,N//2)
        f2=execute.submit(myPi,N//2+1,N)
        PI=f1.result()
        PI+=f2.result()
        t2=time.perf_counter()
        print((t2-t1)*1000)
        print(PI)

Notice that now we pass the parameters to the called function without the need to use a list or tuple, and the calls to result make the main process wait until the relevant future resolves. If there is an exception in the function this is passed to the main process. Also notice that no locking is required as the worker processes do not make use of shared resources and they return their results using a future.
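The way an exception travels back through a future can be sketched as follows. A thread pool is used to keep the example short, and reciprocal and safe_reciprocal are illustrative names. Calling result on a future from a process pool re-raises the exception in the same way:

```python
import concurrent.futures

def reciprocal(x):
    return 1 / x  # raises ZeroDivisionError when x is 0

def safe_reciprocal(x):
    with concurrent.futures.ThreadPoolExecutor(1) as execute:
        f = execute.submit(reciprocal, x)
        try:
            # result re-raises any exception raised in the worker
            return f.result()
        except ZeroDivisionError:
            return None

if __name__ == '__main__':
    print(safe_reciprocal(4))
    print(safe_reciprocal(0))
```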

If you change the with to read:

    with concurrent.futures.ThreadPoolExecutor(2) as execute:

then, with no other changes, you have a version which works with threads. This takes more than twice as long to run as the process version, which is what you would expect.
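Because the two executors share the same interface, the choice can also be made a parameter. The following is a sketch, assuming a hypothetical compute_pi helper that is not part of the original program:

```python
import concurrent.futures

def myPi(m, n):
    pi = 0
    for k in range(m, n + 1):
        s = 1 if k % 2 else -1
        pi += s / (2 * k - 1)
    return pi * 4

def compute_pi(executor_class, N, workers=2):
    # Hypothetical helper: split the terms 1..N into one range per worker.
    step = N // workers
    with executor_class(workers) as execute:
        futures = []
        for w in range(workers):
            start = w * step + 1
            # the last worker takes any terms left over
            end = N if w == workers - 1 else (w + 1) * step
            futures.append(execute.submit(myPi, start, end))
        # each call to result waits for that future to resolve
        return sum(f.result() for f in futures)
```

Calling compute_pi(concurrent.futures.ThreadPoolExecutor, N) gives the thread version. Passing concurrent.futures.ProcessPoolExecutor instead gives the process version, in which case the call should be made from within the if __name__ == '__main__' block.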

Process Pool or Concurrent Futures?

Python seems to have two modules which do similar things. The multiprocessing.pool module provides a futures-like approach using AsyncResult and a wide range of map-like operations. However, it doesn’t do a good job of supporting a thread pool equivalent.

The concurrent.futures module, on the other hand, provides a more complete futures approach and both process and thread pools are well supported. You can also make use of multiprocessing managers, which isn’t surprising as the sharing by proxy approach does work with almost any type of process, irrespective of how it has been created.

In most cases the best choice is concurrent.futures backed up by multiprocessing. Only use multiprocessing.pool if you need the more advanced map-style functions.
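For simple cases the executors in concurrent.futures have a map method of their own. The following is a sketch using a thread pool, with cube and cubes as illustrative names:

```python
import concurrent.futures

def cube(x):
    return x ** 3

def cubes(values):
    with concurrent.futures.ThreadPoolExecutor(2) as execute:
        # map yields the results in the same order as the inputs
        return list(execute.map(cube, values))

if __name__ == '__main__':
    print(cubes([1, 2, 3]))
```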

Summary

  • A Future is an object that lives in the main thread/process that the child thread/process can use to signal its state and return a result at some time in the future – hence the name.

  • A Future can be in one of three states: Resolved – the function has completed, Pending – the function is still running, Canceled – the function has been canceled.

  • You can wait on a Future to either return a result or an exception.

  • Executors are objects which can run a function on a thread/process from the thread/process pool.

  • Functions can be submitted to the pool using the submit method. With a process pool, parameters are transferred to the submitted functions by being pickled and transmitted over a connecting pipe. The result of the function is returned as a future.

  • The wait function can be used to wait for all futures to complete, the first to complete or the first to give an exception.

  • You can also set callbacks which are activated when the future resolves. This is mostly useful in a single-threaded environment.

  • One of the big advantages of using futures is that they capture any exceptions that happen in the child thread/process and return them to the parent thread/process. This makes error handling as easy as for synchronous code.

  • Even if you are using futures you still have to worry about race conditions and locking is still necessary.

  • Threads share global resources like locks, but processes don’t. You can’t pass a lock to a process as a parameter because the lock isn’t pickleable.

  • To pass a lock to a process you can use the initializer parameter to run a function which creates global objects when the process is first created.

  • An alternative way to pass a lock is to use a process manager, but this is slow.

  • Futures shared between threads can cause deadlock.
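
The wait function and callbacks mentioned in the summary can be sketched together as follows. A thread pool is assumed, and slow_square and first_and_all are illustrative names:

```python
import concurrent.futures
import time

def slow_square(x, delay):
    time.sleep(delay)
    return x * x

def first_and_all():
    finished = []
    with concurrent.futures.ThreadPoolExecutor(2) as execute:
        fast = execute.submit(slow_square, 2, 0.1)
        slow = execute.submit(slow_square, 3, 0.5)
        for f in (fast, slow):
            # the callback receives the resolved future
            f.add_done_callback(
                lambda fut: finished.append(fut.result()))
        # return as soon as any one of the futures has resolved
        done, pending = concurrent.futures.wait(
            [fast, slow],
            return_when=concurrent.futures.FIRST_COMPLETED)
        first = [f.result() for f in done]
    # leaving the with block waits for the remaining future
    return first, finished

if __name__ == '__main__':
    print(first_and_all())
```

The first list holds the result of whichever future resolved first, and the second list is filled in by the callbacks as each future resolves.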
