import threading
import time
class ThreadGateException(Exception):
pass
class ThreadGate(object):
""" A class which works as a FIFO 'gate' for threads. By
default the gate is open and any thread calling on the
'enter' method returns immediately.
A thread can 'close' the gate by calling the method 'close'.
This thread becomes the owner of the gate. Any other thread
calling 'enter' after this is automatically blocked till
the owner calls reopens the gate by calling 'open'.
The gate requires a certain number of threads to block
before the owner exits from the 'close' method. Otherwise,
the owner waits for a timeout before returning from the 'close'
method, without actually closing the gate.
The gate class can be used to block running threads for a
particular operation and making sure that they resume after
the operation is complete, for a fixed number of threads.
"""
def __init__(self, numthreads, timeout=0):
self.lock = threading.Lock()
self.sem = threading.BoundedSemaphore(1)
self.evt = threading.Event()
self.count = 0
self.owner_timeout = timeout
self.owner = None
self.nthreads = numthreads
# Open by default
self.position = 1
def close(self):
""" Close the gate. The calling thread
becomes the owner of the gate and blocks
till the requisite number of threads block
on the gate or a timeout occurs, whichever
is first.
It is an error if the gate is already closed """
if self.position == 0:
# Already closed
raise ThreadGateException,"trying to close an already closed gate"
try:
self.lock.acquire()
self.position = 0
self.owner = threading.currentThread()
self.sem.acquire()
finally:
self.lock.release()
# Wait on the event till timeout
self.evt.clear()
self.evt.wait(self.owner_timeout)
# If event was set, requisite number off
# threads have blocked, else reset the gate
if not self.evt.isSet():
try:
print 'Owner thread timedout, re-setting gate'
self.lock.acquire()
self.position = 1
self.owner = None
self.sem.release()
finally:
self.lock.release()
return -1
return 0
def open(self):
""" Open the gate. The calling thread should
be the owner of the gate. It is an error if
the gate is already open """
if self.position == 1:
# Already open
raise ThreadGateException,"trying to open an already opened gate"
if threading.currentThread() != self.owner:
raise ThreadGateException,"not owner, cannot open gate"
try:
self.lock.acquire()
self.position = 1
self.owner = None
self.sem.release()
finally:
self.lock.release()
def enter(self):
""" Enter the gate. If the gate is open, returns
immediately, else gets blocked till the gate is
opened by the owner """
if self.position==1:
return 0
# Lock mutex and increment count
try:
self.lock.acquire()
self.count += 1
if self.count==self.nthreads:
self.evt.set()
finally:
self.lock.release()
ct = threading.currentThread()
print 'Thread %s - Entering Gate' % (ct.getName())
# Lock mutex and decrement count
try:
# Will block here
self.sem.acquire()
self.lock.acquire()
self.count -= 1
finally:
self.lock.release()
self.sem.release()
print 'Thread %s - Exiting Gate' % (ct.getName())
def get_count(self):
""" Return count of blocked threads """
return self.count
def test():
""" Test code """
import random
import Queue
enterlog = Queue.Queue(0)
exitlog = Queue.Queue(0)
def enter(index):
enterlog.put(index)
def exit(index):
exitlog.put(index)
class OwnerThread(threading.Thread):
""" Owner thread class for gate demo """
def __init__(self, gate):
self.gate = gate
threading.Thread.__init__(self, None, 'Owner thread')
def run(self):
# Close the gate
print 'Closing gate...'
ret = self.gate.close()
if ret==0:
print 'Gate closed successfully'
print 'Gate count=>',self.gate.get_count()
# Open gate after sleeping some time
time.sleep(5)
if ret==0:
print 'Opening gate'
self.gate.open()
else:
print 'Gate closing not successful'
class SampleThread(threading.Thread):
""" Sample thread class for gate demo """
def __init__(self, index, gate):
self.gate = gate
self.index = index
threading.Thread.__init__(self, None, 'Thread %d' % self.index, None)
def run(self):
# Sleep randomly
time.sleep(random.choice(range(1,10)))
# Mark entry to gate
enter(self.index)
self.gate.enter()
# Mark exit out of gate
exit(self.index)
def test1():
""" Test code where gate is closed successfully """
print 'test1()...'
gate = ThreadGate(10, 20)
random.seed()
print 'Starting threads...'
# Create 10 threads
threads = []
threads.append(OwnerThread(gate))
for x in range(10):
threads.append(SampleThread(x, gate))
# Start threads and join
for x in range(11):
threads[x].start()
# Join with threads
for x in range(11):
threads[x].join()
print 'Joined with threads'
print 'Gate count=>',gate.get_count()
# Exit and entry logs must be same
print enterlog
print exitlog
def test2():
""" Test code where gate is closed unsuccessfully """
print 'test1()...'
gate = ThreadGate(10, 5)
random.seed()
print 'Starting threads...'
# Create 10 threads
threads = []
threads.append(OwnerThread(gate))
for x in range(10):
threads.append(SampleThread(x, gate))
# Start threads and join
for x in range(11):
threads[x].start()
# Join with threads
for x in range(11):
threads[x].join()
print 'Joined with threads'
print 'Gate count=>',gate.get_count()
# Exit and entry logs must be same
print enterlog
print exitlog
test1()
while not enterlog.empty():
print enterlog.get(),
print
while not exitlog.empty():
print exitlog.get(),
print
test2()
while not enterlog.empty():
print enterlog.get(),
print
while not exitlog.empty():
print exitlog.get(),
print
if __name__ == "__main__":
test()