BoundedBuffer

Typical model scenario: a machine produces widgets which another machine consumes. Because the machines work at different and/or stochastic speeds, they are coupled by a buffer of a given size. The producer puts one or more widgets into the buffer at a time, and the consumer takes one or more out.

Problem: how to model the processes in SimPy so that the consumer can never take a widget out of an empty buffer (buffer underflow) and the producer can never put a widget into a buffer which is already full (buffer overflow). Clearly, (potentially) blocking synchronization is needed for both producer and consumer.

Such a model system can be implemented in SimPy as is, by testing before putting a widget into the buffer or taking one out, and passivating the process if necessary to avoid overflow or underflow (see the producer/consumer approach presented on the HowTo's page). However, this approach has several problems:

- it only works for models where one widget at a time is put into or taken out of the buffer,
- it only works for "one producer/one consumer" models,
- it makes for messy, error-prone application programming of synchronizations.

A solution approach: It is possible to provide a clean solution for the bounded buffer problem by introducing two new yield synchronization statement constructs and a buffer class on which they work:

yield put,<producer>,<buffer>,<amount>

Try to put <amount> items into <buffer>. Blocks if insufficient space left in <buffer>.

yield get,<consumer>,<buffer>,<amount>

Try to get <amount> items from <buffer>. Blocks if insufficient number of items in <buffer>.

class Bin( . . . ): #the buffer class

    def __init__(self, . . .)

        self.capacity #buffer capacity

        self.inBin    #nr items in buffer

    def _put(self, . . .)

Implements 'yield put' semantics. Activates any consumer processes waiting for items, as long as there are enough items in the buffer.

    def _get(self, . . .)

Implements 'yield get' semantics. Activates any producer processes waiting for buffer space as long as there is enough space free in buffer. 
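To make the bookkeeping described above concrete, here is a minimal sketch that models only the buffer state and the two wait queues, without SimPy. The class name SketchBin, its method names and the (accepted, woken) return value are assumptions made for illustration; they are not part of SimPy. Waiting processes are served in arrival order, and serving stops at the first waiter whose request cannot be met.

```python
class SketchBin:
    """SimPy-free model of the bounded buffer bookkeeping (hypothetical names)."""

    def __init__(self, capacity, in_bin=0):
        self.capacity = capacity   # buffer capacity
        self.in_bin = in_bin       # nr items in buffer
        self.wait_put = []         # (who, amount): producers waiting for space
        self.wait_get = []         # (who, amount): consumers waiting for items

    def put(self, who, amount):
        """Try to put; return (accepted, list of consumers woken up)."""
        if self.in_bin + amount > self.capacity:
            self.wait_put.append((who, amount))    # block: not enough space
            return False, []
        self.in_bin += amount
        woken = []
        # serve waiting consumers in arrival order while their requests fit
        while self.wait_get and self.wait_get[0][1] <= self.in_bin:
            waiter, wanted = self.wait_get.pop(0)
            self.in_bin -= wanted
            woken.append(waiter)
        return True, woken

    def get(self, who, amount):
        """Try to get; return (accepted, list of producers woken up)."""
        if amount > self.in_bin:
            self.wait_get.append((who, amount))    # block: not enough items
            return False, []
        self.in_bin -= amount
        woken = []
        # serve waiting producers in arrival order while their batches fit
        while self.wait_put and self.wait_put[0][1] <= self.capacity - self.in_bin:
            waiter, offered = self.wait_put.pop(0)
            self.in_bin += offered
            woken.append(waiter)
        return True, woken


b = SketchBin(capacity=5)
print(b.get("consumer", 3))   # (False, [])            consumer has to wait
print(b.put("producer", 2))   # (True, [])             2 items, consumer needs 3
print(b.put("producer", 2))   # (True, ['consumer'])   4 items, consumer takes 3
print(b.in_bin)               # 1
```

In a real simulation, "woken" processes would be reactivated by the scheduler; here they are just returned so that the effect can be inspected.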

In addition to these add-ons, the dispatch mechanism in function simulate(. . .) of the SimPy module Simulation must be amended to recognize the two new yield constructs. This is a trivial change: two entries are added to the dispatch table.
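The idea of a dispatch table can be sketched independently of SimPy. In the toy loop below, a process is a generator which yields (command, amount) tuples, and the loop looks up the handler for each command in a dictionary. All names (PUT, GET, handle_put, handle_get, DISPATCH, run) are made up for this illustration, and the sketch does no blocking and has no event list; it only shows why supporting a new yield construct amounts to adding one dictionary entry.

```python
PUT, GET = "put", "get"          # hypothetical command codes

def handle_put(state, amount):
    # handler for a yielded PUT command
    state["in_bin"] += amount

def handle_get(state, amount):
    # handler for a yielded GET command
    state["in_bin"] -= amount

# one entry per yield construct; a new construct needs one new entry
DISPATCH = {PUT: handle_put, GET: handle_get}

def run(process, state):
    """Drive a generator and dispatch each yielded (command, amount) tuple."""
    for command, amount in process:
        DISPATCH[command](state, amount)
    return state

def demo():
    yield PUT, 2
    yield PUT, 2
    yield GET, 3

result = run(demo(), {"in_bin": 0})
print(result)                    # {'in_bin': 1}
```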

An experimental implementation

Below is an implementation of the bounded buffer, implemented as an extension of the Simulation module. It just adds the new constructs and defines its own simulate(. . . .) function, which replaces the one in SimPy.Simulation. It must therefore be imported after SimPy.Simulation. 

#addBinFinite.py

#Experimental implementation of bounded buffers for SimPy

import SimPy.Simulation as Sim
from SimPy.Lister import *
put=9999
get=666
def putfunc(a):
    a[0][2]._put(a)
def getfunc(a):
    a[0][2]._get(a)
class Bin(Lister):
    "Bounded buffer"
    def __init__(self,capacity=0,initialAvail=0,name="a_bin"):
        self.name=name
        self.inBin=initialAvail
        self.waitQput=[]
        self.waitQget=[]
        self.capacity=capacity
    def _put(self,par):
        "Put in buffer (blocking)"
        menge=par[0][3]
        if menge+self.inBin<= self.capacity:
            self.inBin+=menge
            if self.waitQget:
                for qd in self.waitQget[:]: #iterate over a copy, entries are removed in the loop
                    if qd[1]<=self.inBin:
                        Sim.reactivate(qd[0],delay=0,prior=True)
                        self.inBin-=qd[1]
                        self.waitQget.remove(qd)
                    else:
                        break
            who=par[1]
            Sim._e._post(who,at=Sim._t)
        else:
            #wait for bin space
            self.waitQput.append((par[0][1],menge))
            par[0][1]._nextTime=None

    def _get(self,par):
        "Get from buffer (blocking)"
        menge=par[0][3]
        if menge>self.inBin:
            #queue here        process   menge (==requirement)
            self.waitQget.append((par[0][1],menge))
            par[0][1]._nextTime=None
        else :
            self.inBin-=menge
            who=par[1]
            # look for processes waiting for bin space
            if self.waitQput:
                for qd in self.waitQput[:]: #iterate over a copy, entries are removed in the loop
                    if qd[1]<=(self.capacity-self.inBin):
                        Sim.reactivate(qd[0],delay=0,prior=True)
                        self.inBin+=qd[1]
                        self.waitQput.remove(qd)
                    else:
                        break
            Sim._e._post(who,at=Sim._t)
def simulate(until=0):
    Sim._stop=False
    if Sim._e == None:
        raise Sim.Simerror("Fatal SimPy error: Simulation not initialized")
    if Sim._e.events == {}:
        message="SimPy: No activities scheduled"
        return message
    Sim._endtime=until
    message="SimPy: Normal exit"
    dispatch={Sim.hold:Sim.holdfunc,Sim.request:Sim.requestfunc,Sim.release:Sim.releasefunc,
              Sim.passivate:Sim.passivatefunc,put:putfunc,get:getfunc}
    commandcodes=dispatch.keys()
    commandwords={Sim.hold:"hold",Sim.request:"request",Sim.release:"release",Sim.passivate:"passivate",
        Sim.waitevent:"waitevent",Sim.queueevent:"queueevent",Sim.waituntil:"waituntil",
        put:"put",get:"get"}
    while not Sim._stop and Sim._t<=Sim._endtime:
        try:
            a=Sim._e._nextev()
            if not a[0]==None:
                command = a[0][0]
                dispatch[command](a)
        except Sim.Simerror, error:
            message="SimPy: "+error.value
            Sim._stop = True
    Sim._e=None
    return message
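
A general Python point which matters for wait-queue loops such as those in _put and _get: removing entries from a list while iterating over that same list makes the loop skip the entry after each removal. Iterating over a copy avoids this. The sketch below shows the difference; the function names and the sample queue are invented for the illustration.

```python
def serve_in_place(queue, available):
    """Pitfall: the list being iterated is modified inside the loop."""
    served = []
    for who, amount in queue:
        if amount <= available:
            available -= amount
            served.append(who)
            queue.remove((who, amount))   # shifts the remaining entries left
        else:
            break
    return served

def serve_over_copy(queue, available):
    """Same logic, but the loop runs over a copy of the list."""
    served = []
    for who, amount in queue[:]:
        if amount <= available:
            available -= amount
            served.append(who)
            queue.remove((who, amount))
        else:
            break
    return served

q1 = [("a", 1), ("b", 1), ("c", 1)]
print(serve_in_place(q1, 3), q1)    # ['a', 'c'] [('b', 1)]

q2 = [("a", 1), ("b", 1), ("c", 1)]
print(serve_over_copy(q2, 3), q2)   # ['a', 'b', 'c'] []
```

In the first call, waiter "b" is left in the queue although there was enough for all three.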

Sample application demo program

To demo the new constructs, here is a SimPy model which has a producer feed a consumer via a buffer of capacity 5. The producer tries to put batches of 2 items into the buffer and the consumer tries to take three items out at a time. 

#addBinFiniteTest.py
#Test/demo of bounded buffer  
from SimPy.Simulation import *
from addBinFinite import *

class producer(Process):
    def produce(self):
        while True:
            yield put,self,buffer,2
            print now(),"put in 2"
            assert buffer.inBin<=buffer.capacity,"buffer overflow"
            yield hold,self,2

class consumer(Process):
    def consume(self):
        while True:
            yield get,self,buffer,3
            assert buffer.inBin>=0,"buffer underflow"
            print now(),"took out 3"
            yield hold,self,15

initialize()
buffer=Bin(capacity=5,name="boundBin")
p=producer("producer")
activate(p,p.produce())
c=consumer("consumer")
activate(c,c.consume())
print simulate(until=200)
OUTPUT:

0 put in 2
2 took out 3
2 put in 2
4 put in 2
6 put in 2
17 put in 2
17 took out 3
32 put in 2
32 took out 3

.  .  .  .  .  .  .  .  . (some output omitted)
154 put in 2
167 put in 2
167 took out 3
182 put in 2
182 took out 3
184 put in 2
197 put in 2
197 took out 3
SimPy: Normal exit

The assert statements test for violations of the buffer overflow and underflow conditions.

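The output shows the producer blocking correctly. For example, the put at time 6 fills the buffer (5 items), so the producer's next put, attempted at time 8, cannot complete until time 17, when the consumer has taken three items out.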
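
The times in the output can be checked by hand or with a few lines of plain Python. The sketch below is not SimPy and is not event driven: it steps through integer time and lets each process act as soon as its pause is over and the buffer permits. It uses the demo's parameters (capacity 5, batches of 2 every 2 time units, takes of 3 every 15 time units). The function name trace and its parameter names are assumptions for this illustration, and the order of events within one time step may differ from the SimPy output.

```python
def trace(capacity=5, put_amount=2, get_amount=3,
          put_pause=2, get_pause=15, until=40):
    """Return (put_times, get_times) for the demo's producer and consumer."""
    in_bin = 0
    next_put = next_get = 0     # earliest time of each process's next attempt
    put_times, get_times = [], []
    for t in range(until + 1):
        progress = True
        while progress:         # repeat until neither process can act at time t
            progress = False
            if next_put <= t and in_bin + put_amount <= capacity:
                in_bin += put_amount
                put_times.append(t)
                next_put = t + put_pause
                progress = True
            if next_get <= t and in_bin >= get_amount:
                in_bin -= get_amount
                get_times.append(t)
                next_get = t + get_pause
                progress = True
            # same conditions as the assert statements in the demo
            assert 0 <= in_bin <= capacity
    return put_times, get_times

puts, gets = trace()
print(puts)   # [0, 2, 4, 6, 17, 32, 34]
print(gets)   # [2, 17, 32]
```

The gap between the puts at times 6 and 17 is the period during which the producer is blocked.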

Download

If you want to experiment with these experimental constructs yourself, download here:

- addBinFinite.py (the extension module)
- addBinFiniteTest.py (the demo)
 

©Copyright 2004, 2005, 2006, 2007, 2008 SimPy Developer Team.
For problems or questions regarding this web contact SimPy webmaster.
Last updated: 20/06/08.