Welcome, guest | Sign In | My Account | Store | Cart
import collections
import functools
from itertools import ifilterfalse

class Counter(dict):
    'Mapping where default values are zero'
    def __missing__(self, key):
        return 0

def lru_cache(maxsize=100):
    '''Least-recently-used cache decorator.

    Arguments to the cached function must be hashable.
    Cache performance statistics stored in f.hits and f.misses.
    http://en.wikipedia.org/wiki/Cache_algorithms#Least_Recently_Used

    '''
    maxqueue = maxsize * 10
    def decorating_function(user_function,
            len=len, iter=iter, tuple=tuple, sorted=sorted, KeyError=KeyError):
        cache = {}                  # mapping of args to results
        queue = collections.deque() # order that keys have been used
        refcount = Counter()        # times each key is in the queue
        sentinel = object()         # marker for looping around the queue

        # lookup optimizations (ugly but fast)
        queue_append, queue_popleft = queue.append, queue.popleft
        queue_appendleft, queue_pop = queue.appendleft, queue.pop

        @functools.wraps(user_function)
        def wrapper(*args, **kwds):
            # cache key records both positional and keyword args
            key = args
            if kwds:
                key += tuple(sorted(kwds.items()))

            # record recent use of this key
            queue_append(key)
            refcount[key] += 1

            # get cache entry or compute if not found
            try:
                result = cache[key]
                wrapper.hits += 1
            except KeyError:
                result = user_function(*args, **kwds)
                cache[key] = result
                wrapper.misses += 1

                # purge least recently used cache entry
                if len(cache) > maxsize:
                    key = queue_popleft()
                    refcount[key] -= 1
                    while refcount[key]:
                        key = queue_popleft()
                        refcount[key] -= 1
                    del cache[key], refcount[key]

            # periodically compact the queue by eliminating duplicate keys
            # while preserving order of most recent access
            if len(queue) > maxqueue:
                refcount.clear()
                queue_appendleft(sentinel)
                for key in ifilterfalse(refcount.__contains__,
                                        iter(queue_pop, sentinel)):
                    queue_appendleft(key)
                    refcount[key] = 1
                assert len(queue) == len(cache) == len(refcount) == sum(refcount.itervalues())

            return result
        wrapper.hits = wrapper.misses = 0
        return wrapper
    return decorating_function



if __name__ == '__main__':

    @lru_cache(maxsize=20)
    def f(x, y):
        return 3*x+y

    domain = range(5)
    from random import choice
    for i in range(1000):
        r = f(choice(domain), choice(domain))

    print(f.hits, f.misses)

Diff to Previous Revision

--- revision 4 2010-07-28 20:51:17
+++ revision 5 2010-07-30 19:44:38
@@ -2,36 +2,37 @@
 import functools
 from itertools import ifilterfalse
 
-class ZeroDict(dict):
+class Counter(dict):
     'Mapping where default values are zero'
     def __missing__(self, key):
         return 0
 
-def lru_cache(maxsize):
-    '''Decorator applying a least-recently-used cache with the given maximum size.
+def lru_cache(maxsize=100):
+    '''Least-recently-used cache decorator.
 
     Arguments to the cached function must be hashable.
     Cache performance statistics stored in f.hits and f.misses.
+    http://en.wikipedia.org/wiki/Cache_algorithms#Least_Recently_Used
 
     '''
-    maxcache = maxsize
     maxqueue = maxsize * 10
-    def decorating_function(f):
+    def decorating_function(user_function,
+            len=len, iter=iter, tuple=tuple, sorted=sorted, KeyError=KeyError):
         cache = {}                  # mapping of args to results
         queue = collections.deque() # order that keys have been used
-        refcount = ZeroDict()       # number of times each key is in the queue
+        refcount = Counter()        # times each key is in the queue
         sentinel = object()         # marker for looping around the queue
 
         # lookup optimizations (ugly but fast)
         queue_append, queue_popleft = queue.append, queue.popleft
-        _len, _tuple, _sorted, _KeyError = len, tuple, sorted, KeyError
+        queue_appendleft, queue_pop = queue.appendleft, queue.pop
 
-        @functools.wraps(f)
+        @functools.wraps(user_function)
         def wrapper(*args, **kwds):
             # cache key records both positional and keyword args
             key = args
             if kwds:
-                key += _tuple(_sorted(kwds.items()))
+                key += tuple(sorted(kwds.items()))
 
             # record recent use of this key
             queue_append(key)
@@ -41,38 +42,36 @@
             try:
                 result = cache[key]
                 wrapper.hits += 1
-            except _KeyError:
-                result = f(*args, **kwds)
+            except KeyError:
+                result = user_function(*args, **kwds)
                 cache[key] = result
                 wrapper.misses += 1
 
                 # purge least recently used cache entry
-                if _len(cache) > maxcache:
+                if len(cache) > maxsize:
                     key = queue_popleft()
                     refcount[key] -= 1
                     while refcount[key]:
                         key = queue_popleft()
                         refcount[key] -= 1
-                    del cache[key]
-                    del refcount[key]
+                    del cache[key], refcount[key]
 
             # periodically compact the queue by eliminating duplicate keys
             # while preserving order of most recent access
-            if _len(queue) > maxqueue:
-                queue_appendleft, _refcount = queue.appendleft, refcount
-                _refcount.clear()
+            if len(queue) > maxqueue:
+                refcount.clear()
                 queue_appendleft(sentinel)
-                for key in ifilterfalse(_refcount.__contains__,
-                                        iter(queue.pop, sentinel)):
-                    _refcount[key] = 1
+                for key in ifilterfalse(refcount.__contains__,
+                                        iter(queue_pop, sentinel)):
                     queue_appendleft(key)
+                    refcount[key] = 1
+                assert len(queue) == len(cache) == len(refcount) == sum(refcount.itervalues())
 
             return result
-
         wrapper.hits = wrapper.misses = 0
         return wrapper
+    return decorating_function
 
-    return decorating_function
 
 
 if __name__ == '__main__':
@@ -86,4 +85,4 @@
     for i in range(1000):
         r = f(choice(domain), choice(domain))
 
-    print f.hits, f.misses
+    print(f.hits, f.misses)

History