Programmer's Python Async - Sharing Data Pipes & Queues
Written by Mike James   
Tuesday, 18 July 2023

A simple example demonstrates the basics of using a Queue. An addCount function runs as one process and puts 1000 integers into the queue, then two processes running getCount read the data back out:

import multiprocessing
def addCount(q):
    for i in range(1000):
        q.put(i)
def getCount(q,l):
    while True:
        i=q.get()
        with l:
            print(i)
if __name__ == '__main__':
    q=multiprocessing.Queue()
    pLock=multiprocessing.Lock()
    p1=multiprocessing.Process(target=addCount,args=(q,))
    p2=multiprocessing.Process(target=getCount,args=(q,pLock))
    p3=multiprocessing.Process(target=getCount,args=(q,pLock))
    p1.start()
    p2.start()
    p3.start()
    p3.join()

To keep things tidy, a lock is also used to control access to the screen so that results are printed without a race condition.

When using a Queue the put and get methods can raise the Full and Empty exceptions, which are defined in the standard queue module. These don't always mean that the queue really is full or empty; they can simply mean that a timeout has occurred. To find out which it is you can use:

empty()
full()

and to find how many items are in the queue:

qsize()

Each of these methods is described as unreliable in the documentation, but all this means is that the status of the Queue can change between getting a value from one of the methods and acting on it. For example, a common approach is to check whether the Queue is full before trying to put a new value. Even if the test finds the Queue not full, you could still have to wait when adding the value because another process might have filled the Queue before you had a chance to do the job.
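
A safer pattern is to skip the test altogether and let put or get report what happened via a timeout. As a minimal sketch, note that the exceptions caught are the Full and Empty classes from the standard queue module:

import multiprocessing
import queue
q=multiprocessing.Queue(maxsize=10)
try:
    item=q.get(timeout=0.5)   # raises queue.Empty on timeout
except queue.Empty:
    print("no item arrived in time")
try:
    q.put(42,timeout=0.5)     # raises queue.Full on timeout
except queue.Full:
    print("no space appeared in time")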

You can also control the Queue and the background feeder thread that transfers data to it. The close() method shuts the Queue down so that the current process can add no more data. The Queue can still be read by other processes. After a close it is recommended that you allow any data put to the Queue to get there using join_thread(). The only problem with this is that if the Queue is full the feeder thread will wait until it has space and, if this never happens, you have deadlock. Any process that is not the creator of the Queue does a join_thread by default when it exits and hence deadlock can occur if the data cannot be entered into the Queue. To avoid this you can use cancel_join_thread(), which results in the process exiting without waiting for the data to be sent to the Queue, at the cost of possibly losing that data.
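
As a minimal sketch of the shutdown sequence, assuming a producer that finishes its work and then closes the queue:

import multiprocessing
def producer(q):
    for i in range(10):
        q.put(i)
    q.close()        # this process will put no more data
    q.join_thread()  # wait for the feeder thread to flush buffered items
    # q.cancel_join_thread() would exit without waiting,
    # at the risk of losing buffered data
if __name__ == '__main__':
    q=multiprocessing.Queue()
    p=multiprocessing.Process(target=producer,args=(q,))
    p.start()
    for i in range(10):
        print(q.get())
    p.join()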

As well as Queue there are also two variations on the basic idea.

SimpleQueue is a reduced version which lacks the ability to restrict its overall size and has no join_thread() or qsize(). It only supports:

  • get()
  • put()
  • close()
  • empty()

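For example, a minimal sketch of handing a single result back from a worker using SimpleQueue:

import multiprocessing
def worker(q):
    q.put("result")   # send one value back to the parent
if __name__ == '__main__':
    q=multiprocessing.SimpleQueue()
    p=multiprocessing.Process(target=worker,args=(q,))
    p.start()
    print(q.get())    # blocks until the worker puts its result
    p.join()
    q.close()
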
JoinableQueue is a subclass of Queue that makes it easier to wait for a queue to be empty. It has two extra methods:

  • join() waits until all items in the queue have been processed, i.e. task_done() has been called for every item that was put

  • task_done() signals that an item has been removed from the queue and has been processed

The idea is that every time an item is put to the queue the count of unprocessed items goes up by one. The count goes down by one when task_done() is called. When the count reaches zero any join() unblocks and the waiting process resumes.

For example, you can convert the previous example to use a JoinableQueue by changing getCount to call task_done():

def getCount(q,l):
    while True:
        i=q.get()
        q.task_done()
        with l:
            print(i)

 

Now the main program can wait for the queue to be empty:

if __name__ == '__main__':
    q=multiprocessing.JoinableQueue()
    pLock=multiprocessing.Lock()
    p1=multiprocessing.Process(target=addCount,args=(q,))
    p2=multiprocessing.Process(target=getCount,args=(q,pLock))
    p3=multiprocessing.Process(target=getCount,args=(q,pLock))
    p1.start()
    p2.start()
    p3.start()
    q.join()
    print("done")

Now the main program waits for the queue to empty and prints done.

Running this example using Python 3.10 under Linux and Windows produces different results. Under Windows the parent process never prints done and waits forever. Under Linux it works as intended: the main process prints done and then waits forever, because the getCount processes never exit.
