Wrote some simple implementation of pool for pymongo package under gevent coroutine library.
Base bug here was with pymongo.connection.Pool because in the original package it is thread-local, so when you spawn new greenlet and trying to get already open connection, it creates new connection because in this greenlet pool is empty. So if you will implement your own pool don’t forget about this.
Example of use:
# Create Pool. 
db = Mongo('test_db',10)
# Get connection from pool
conn = db.get_conn()
# Get raw connection for GridFS
raw_conn = conn.getDB
#Mongo is a singleton. So if you want to get connection in another part of application just type
db = Mongo()
conn = db.get_conn()
#Connection will get back to pool when context will be closed.
  1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119  | __author__ = "Andrey Nikishaev"
__email__ = "creotiv@gmail.com"
 
import pymongo
from gevent.queue import PriorityQueue
import os
import time
class MongoPoolException(Exception):
    pass
class MongoPoolCantConnect(MongoPoolException):
    pass
    
class MongoPoolAutoReconnect(MongoPoolException):
    pass
class GPool(object):
    """
        Rewrited non-thread local implementation of pymongo.connection._Pool
    """
    __slots__ = ["sockets", "socket_factory", "pool_size","sock"]
    def __init__(self, socket_factory):
        object.__init__(self)
        self.pool_size = 1
        if not hasattr(self,"sock"):
            self.sock = None
        self.socket_factory = socket_factory
        if not hasattr(self, "sockets"):
            self.sockets = []
    def socket(self):
        # we store the pid here to avoid issues with fork /
        # multiprocessing - see
        # test.test_connection:TestConnection.test_fork for an example
        # of what could go wrong otherwise
        pid = os.getpid()
        if self.sock is not None and self.sock[0] == pid:
            return self.sock[1]
        try:
            self.sock = (pid, self.sockets.pop())
        except IndexError:
            self.sock = (pid, self.socket_factory())
        return self.sock[1]
    def return_socket(self):
        
        if self.sock is not None and self.sock[0] == os.getpid():
            # There's a race condition here, but we deliberately
            # ignore it.  It means that if the pool_size is 10 we
            # might actually keep slightly more than that.
            if len(self.sockets) < self.pool_size:
                self.sockets.append(self.sock[1])
            else:
                self.sock[1].close()
        self.sock = None
pymongo.connection._Pool = GPool
class MongoConnection(object):
    """Memcache pool auto-destruct connection"""
    def __init__(self,pool,conn):
        self.pool = pool
        self.conn = conn
        
    def getDB(self):
        return self.conn
    def __getattr__(self, name):
        return getattr(self.conn, name)
    
    def __getitem__(self, name):
        return self.conn[name]
                                             
    def __del__(self):
        self.pool.queue.put((time.time(),self.conn))
        del self.pool
        del self.conn
class Mongo(object):    
    """MongoDB Pool"""
    def __new__(cls,size=5,dbname='',*args,**kwargs):
        if not hasattr(cls,'_instance'):
            cls._instance = object.__new__(cls)
            cls._instance.dbname = dbname
            cls._instance.queue = PriorityQueue(size)
            for x in xrange(size):
                try:
                    cls._instance.queue.put(
                        (time.time(),pymongo.Connection(*args,**kwargs)[dbname])
                    )
                except Exception,e:
                    raise MongoPoolCantConnect('Can\'t connect to mongo servers: %s' % e)
                    
        return cls._instance     
        
    def get_conn(self,block=True,timeout=None):
        """Get Mongo connection wrapped in MongoConnection"""
        obj = MongoConnection
        return obj(self,self.queue.get(block,timeout)[1]) 
        
def autoreconnect(func,*args,**kwargs):
    while True
        try:
            result = func(*args,**kwargs)
        except pymongo.errors.AutoReconnect:
            raise MongoPoolAutoReconnect('Can\'t connect to DB, it may gone.')      
        else: 
            return result
            break
        
    
        
 | 
Download
Copy to clipboard