Producer/Consumer

Back Up Next

Synchronizing processes and resources part 2: producer/consumer 

(Author: Klaus G. Muller; kgmuller at users.sourceforge.net)

In producer/consumer synchronizations, producer processes make items available to consumer processes. Examples are a message sender and a message receiver, or two machines working on items in sequence. The synchronization here must ensure that the consumer process does not consume more items than have been produced. If necessary, the consumer process is blocked (must wait) if no item is available to be consumed. Producer and consumer processes are coupled by a buffer to allow asynchronous production and consumption. The buffer can be bounded (have a capacity limit) or unbounded (be able to store an unlimited number of items).

The structure of a producer/consumer synchronization with an unbounded buffer looks like this in SimPy: 

class Producer(Process):
    def produce(self):
        while True:
            yield hold,self,productionTime            #produce an item
            put item into buffer
            if consumer.passive():                       #if consumer waiting for item,
                 reactivate(consumer)                   #       wake it up

class Consumer(Process):
    def consume(self):
        while True:
            if not buffer:                                     #if no item in buffer,
                yield passivate,self                        #      wait for item
            while buffer:                                     #while items in buffer,
                take item out of buffer
               
yield hold,self,consumptionTime       #      consume item

buffer=[ ]
producer=Producer()
consumer=Consumer()

Note: this only works for the coordination/synchronization between one producer and one consumer. With several consumers working in parallel, one consumer could erroneously try to take an item out of the buffer while another process is consuming the last item in the buffer. 

Here is a simple yet complete model with producer/consumer synchronization with an unbounded buffer:

"""
Simple producer/consumer model with unbounded buffer
"""
from SimPy.Simulation import *
import random

class Producer(Process):
    def produce(self):
        self.produced=0
        while True:
            yield hold,self,random.uniform(1.0,5.0)     #produce
            buffer.append(Item())                       #put item into buffer
            if cons.passive():                          #reactivate consumer if passive
                reactivate(cons)
            print "%s item produced"%now()
            self.produced+=1

class Consumer(Process):
    def consume(self):
        self.consumed=0
        while True:
            if not buffer:                              #if buffer empty, passivate
                yield passivate,self
            while buffer:                               #while items in buffer
                buffer.pop(0)                           #take item out of buffer
                self.consumed+=1                        #consume item
                print "%s item consumed"%now()
                yield hold,self,random.uniform(1.0,4.9)

class Item:
    pass

buffer=[]
produced=consumed=0
initialize()
prod=Producer("Producer")
activate(prod,prod.produce())
cons=Consumer("Consumer")
activate(cons,cons.consume())
simulate(until=20)
print "produced: %s, consumed: %s, in buffer: %s"\
      %(prod.produced,cons.consumed,len(buffer))

OUTPUT:

1.18998162499 item produced
1.18998162499 item consumed
3.04978978053 item produced
4.67639404396 item consumed
7.91845737849 item produced
7.91845737849 item consumed
9.39241701538 item produced
10.7954940036 item produced
12.5224683604 item consumed
14.2282703847 item produced
14.2545615616 item consumed
16.3758414166 item consumed
17.4764134234 item produced
17.8687107056 item consumed
19.9345355896 item produced
produced: 8, consumed: 7, in buffer: 1

Clearly, if the buffer access must be in mutual exclusion between producer putting and consumer getting, the mutual exclusion synchronization construct yield request/yield release must be added in both processes. An example where this may be necessary is a sender process putting messages into shared memory from which a receiver process retrieves them.

What about buffers with limited capacity (bounded buffers)? Here, the producer process must not put items into a full buffer. The producer must wait until the consumer process has removed an item. The consumer must then wake up the producer. 

Here is the general SimPy structure for producer/consumer synchronization with a bounded buffer:

class Producer(Process):
    def produce(self):
        while True:
            if len(buffer)==bufferSize:                       #if buffer full, 
                 yield passivate, self                           #     wait for space to be freed
            yield hold,self,productionTime                     #produce an item
            put item into buffer

            if consumer.passive():
                reactivate(consumer)

class Consumer(Process):
    def consume(self):
        while True:
            if not buffer:
                yield passivate self
            while buffer:
                take item out of buffer
                if producer.passive():                             #if producer waiting for space,
                    
reactivate(producer)                         #     wake it up
               
yield hold,self,consumptionTime             #consume item

buffer=[ ]
bufferSize=x
consumer=Consumer()
producer=Producer()

Again, this is only a valid construction for one producer and one consumer.

Better, more flexible and robust solutions to the Producer/Consumer problem are possible with classes Level and Store, and the new yield statements 'yield put' and 'yield get', all introduced in SimPy 1.7.

 

horizontal rule

©Copyright 2004, 2005, 2006, 2007, 2008 SimPy Developer Team.
For problems or questions regarding this web contact SimPy webmaster.
Last updated: 20/06/08.