Welcome, guest | Sign In | My Account | Store | Cart
import collections
import functools
from itertools import ifilterfalse

class ZeroDict(dict):
    """Dictionary in which looking up an absent key yields zero.

    A failed ``d[key]`` lookup returns 0 and does not store the key, so
    ``d[key] += 1`` works for keys that have never been seen while
    ``key in d`` stays False until something is actually assigned.
    """

    def __missing__(self, key):
        # dict.__getitem__ calls this hook only when ``key`` is not present.
        return 0

def lru_cache(maxsize):
    '''Decorator applying a least-recently-used cache with the given maximum size.

    Arguments to the cached function must be hashable.
    Cache performance statistics stored in f.hits and f.misses.

    The cache holds at most ``maxsize`` results; when a new result would
    exceed that, the entry whose last use is oldest is discarded.  Uses are
    logged in a queue that may contain a key several times; the queue is
    compacted once it grows beyond ``maxsize * 10`` entries.

    A call is only logged after its result is known, so a call that raises
    (or that is still in progress while it recurses into the cached
    function) never leaves a key in the queue that is absent from the cache.

    '''
    maxcache = maxsize
    maxqueue = maxsize * 10

    def decorating_function(f):
        cache = {}                               # mapping of call keys to results
        queue = collections.deque()              # keys in order of use, oldest on the left
        refcount = collections.defaultdict(int)  # number of times each key is in the queue
        kwd_mark = object()                      # separates positional from keyword args in a key

        # bound-method lookups done once instead of on every call
        queue_append, queue_popleft = queue.append, queue.popleft

        @functools.wraps(f)
        def wrapper(*args, **kwds):
            # cache key records both positional and keyword args; the marker
            # keeps f(('a', 1)) and f(a=1) from producing the same key
            key = args
            if kwds:
                key += (kwd_mark,) + tuple(sorted(kwds.items()))

            # get cache entry or compute if not found
            try:
                result = cache[key]
            except KeyError:
                # if f raises, nothing has been recorded yet
                result = f(*args, **kwds)
                cache[key] = result
                wrapper.misses += 1
                added = True
            else:
                wrapper.hits += 1
                added = False

            # record recent use of this key (the key is in the cache by now)
            queue_append(key)
            refcount[key] += 1

            # purge least recently used cache entry: drop queue entries from
            # the old end until one turns out to be the last mention of its key
            if added and len(cache) > maxcache:
                oldest = queue_popleft()
                refcount[oldest] -= 1
                while refcount[oldest]:
                    oldest = queue_popleft()
                    refcount[oldest] -= 1
                del cache[oldest]
                del refcount[oldest]

            # periodically compact the queue by eliminating duplicate keys
            # while preserving order of most recent access
            if len(queue) > maxqueue:
                refcount.clear()
                newest_first = []
                for seen in reversed(queue):
                    if seen not in refcount:
                        refcount[seen] = 1
                        newest_first.append(seen)
                # rebuild in place: the bound methods above refer to this deque
                queue.clear()
                queue.extend(reversed(newest_first))

            return result

        wrapper.hits = wrapper.misses = 0
        return wrapper

    return decorating_function


if __name__ == '__main__':
    # Demo: hammer a small cached function with random arguments and
    # report how many calls were cache hits and how many were misses.
    from random import choice

    @lru_cache(maxsize=20)
    def f(x, y):
        return 3*x+y

    # 5 * 5 = 25 distinct argument pairs compete for the 20 cache slots,
    # so entries get evicted during the run.
    domain = range(5)
    for _ in range(1000):
        f(choice(domain), choice(domain))

    # A single pre-formatted argument prints "hits misses" identically
    # whether print is a statement (Python 2) or a function (Python 3).
    print('%d %d' % (f.hits, f.misses))

Diff to Previous Revision

--- revision 3 2010-07-28 17:20:11
+++ revision 4 2010-07-28 20:51:17
@@ -1,8 +1,9 @@
 import collections
-import itertools
 import functools
+from itertools import ifilterfalse
 
 class ZeroDict(dict):
+    'Mapping where default values are zero'
     def __missing__(self, key):
         return 0
 
@@ -14,63 +15,68 @@
 
     '''
     maxcache = maxsize
-    maxqueue = maxsize * 4
+    maxqueue = maxsize * 10
     def decorating_function(f):
         cache = {}                  # mapping of args to results
-        queue = collections.deque() # order that keys have been accessed
+        queue = collections.deque() # order that keys have been used
         refcount = ZeroDict()       # number of times each key is in the queue
+        sentinel = object()         # marker for looping around the queue
+
+        # lookup optimizations (ugly but fast)
+        queue_append, queue_popleft = queue.append, queue.popleft
+        _len, _tuple, _sorted, _KeyError = len, tuple, sorted, KeyError
+
         @functools.wraps(f)
         def wrapper(*args, **kwds):
-            
-            # localize variable access (ugly but fast)
-            _cache=cache; _len=len; _refcount=refcount; queue_append=queue.append
-
-            # cache key based on both args and keyword args
+            # cache key records both positional and keyword args
             key = args
             if kwds:
-                key += tuple(sorted(kwds.iteritems()))
+                key += _tuple(_sorted(kwds.items()))
 
             # record recent use of this key
             queue_append(key)
-            _refcount[key] += 1
+            refcount[key] += 1
 
             # get cache entry or compute if not found
             try:
-                result = _cache[key]
+                result = cache[key]
                 wrapper.hits += 1
-            except KeyError:
-                result = _cache[key] = f(*args, **kwds)
+            except _KeyError:
+                result = f(*args, **kwds)
+                cache[key] = result
                 wrapper.misses += 1
 
-                # purge least recently used cache contents
-                if _len(_cache) > maxcache:
-                    queue_popleft = queue.popleft
-                    while 1:
+                # purge least recently used cache entry
+                if _len(cache) > maxcache:
+                    key = queue_popleft()
+                    refcount[key] -= 1
+                    while refcount[key]:
                         key = queue_popleft()
-                        _refcount[key] -= 1
-                        if not _refcount[key]:
-                            del _cache[key]
-                            del _refcount[key]
-                            break
-    
+                        refcount[key] -= 1
+                    del cache[key]
+                    del refcount[key]
+
             # periodically compact the queue by eliminating duplicate keys
+            # while preserving order of most recent access
             if _len(queue) > maxqueue:
-                queue_popleft = queue.popleft
-                for _ in itertools.repeat(None, _len(queue)):
-                    key = queue_popleft()
-                    if _refcount[key] == 1:
-                        queue_append(key)
-                    else:
-                        _refcount[key] -= 1
+                queue_appendleft, _refcount = queue.appendleft, refcount
+                _refcount.clear()
+                queue_appendleft(sentinel)
+                for key in ifilterfalse(_refcount.__contains__,
+                                        iter(queue.pop, sentinel)):
+                    _refcount[key] = 1
+                    queue_appendleft(key)
 
             return result
+
         wrapper.hits = wrapper.misses = 0
         return wrapper
+
     return decorating_function
 
 
 if __name__ == '__main__':
-         
+
     @lru_cache(maxsize=20)
     def f(x, y):
         return 3*x+y

History