Welcome, guest | Sign In | My Account | Store | Cart
import collections
import itertools
import functools

class ZeroDict(dict):
    '''Dictionary in which a lookup of an absent key reads as zero.

    The zero is only returned, never stored: a plain read such as d[k]
    leaves the dictionary unchanged, while an augmented assignment such
    as d[k] += 1 stores the new value through the normal item assignment.

    '''

    def __missing__(self, key):
        # dict.__getitem__ calls this hook when key is not present.
        return int()

def lru_cache(maxsize):
    '''Decorator applying a least-recently-used cache with the given maximum size.

    Arguments to the cached function must be hashable.
    Cache performance statistics stored in f.hits and f.misses.

    Positional and keyword arguments are kept apart in the cache key, so
    f(1) and f(x=1) are cached as separate entries.  A call that raises is
    neither cached nor counted, and the exception propagates to the caller.
    No locking is performed around the cache bookkeeping.

    Works on both Python 2 and Python 3.

    '''
    maxcache = maxsize
    maxqueue = maxsize * 4      # queue length that triggers a compaction pass

    def decorating_function(f):
        cache = {}                  # mapping of call keys to results
        queue = collections.deque() # order that keys have been accessed
        refcount = {}               # number of times each key is in the queue
        kwd_mark = object()         # separates positional from keyword parts

        @functools.wraps(f)
        def wrapper(*args, **kwds):

            # cache key based on both args and keyword args; the private
            # marker keeps f(('a', 1)) from colliding with f(a=1)
            key = args
            if kwds:
                key += (kwd_mark,) + tuple(sorted(kwds.items()))

            # get cache entry or compute if not found
            try:
                result = cache[key]
            except KeyError:
                # Call f before touching any bookkeeping: if it raises, or
                # re-enters this wrapper recursively, the queue never holds
                # a key that is missing from the cache.
                result = f(*args, **kwds)
                cache[key] = result
                wrapper.misses += 1
            else:
                wrapper.hits += 1

            # record recent use of this key
            queue.append(key)
            refcount[key] = refcount.get(key, 0) + 1

            # purge least recently used cache contents: walk the queue from
            # the oldest access and evict a key once its last queued
            # reference has been consumed
            while len(cache) > maxcache:
                oldest = queue.popleft()
                remaining = refcount[oldest] - 1
                if remaining:
                    refcount[oldest] = remaining
                else:
                    del refcount[oldest]
                    del cache[oldest]

            # periodically compact the queue by eliminating duplicate keys,
            # keeping only the most recent occurrence of each
            if len(queue) > maxqueue:
                for _ in itertools.repeat(None, len(queue)):
                    queued = queue.popleft()
                    if refcount[queued] == 1:
                        queue.append(queued)
                    else:
                        refcount[queued] -= 1

            return result

        wrapper.hits = wrapper.misses = 0
        return wrapper
    return decorating_function


if __name__ == '__main__':
    # Demo: hammer a small cached function with random arguments and report
    # the hit/miss counters.  The 5 x 5 argument space (25 keys) is larger
    # than maxsize, so both cache hits and evictions are exercised.
    from random import choice

    @lru_cache(maxsize=20)
    def f(x, y):
        return 3*x+y

    domain = range(5)
    for _ in range(1000):
        f(choice(domain), choice(domain))

    # Function-call form with a single formatted string prints the same
    # "hits misses" text on Python 2 and Python 3.
    print('%d %d' % (f.hits, f.misses))

Diff to Previous Revision

--- revision 2 2006-11-06 00:45:26
+++ revision 3 2010-07-28 17:20:11
@@ -1,54 +1,69 @@
-from collections import deque
+import collections
+import itertools
+import functools
+
+class ZeroDict(dict):
+    def __missing__(self, key):
+        return 0
 
 def lru_cache(maxsize):
     '''Decorator applying a least-recently-used cache with the given maximum size.
 
     Arguments to the cached function must be hashable.
     Cache performance statistics stored in f.hits and f.misses.
+
     '''
+    maxcache = maxsize
+    maxqueue = maxsize * 4
     def decorating_function(f):
-        cache = {}              # mapping of args to results
-        queue = deque()         # order that keys have been accessed
-        refcount = {}           # number of times each key is in the access queue
-        def wrapper(*args):
+        cache = {}                  # mapping of args to results
+        queue = collections.deque() # order that keys have been accessed
+        refcount = ZeroDict()       # number of times each key is in the queue
+        @functools.wraps(f)
+        def wrapper(*args, **kwds):
             
             # localize variable access (ugly but fast)
-            _cache=cache; _len=len; _refcount=refcount; _maxsize=maxsize
-            queue_append=queue.append; queue_popleft = queue.popleft
+            _cache=cache; _len=len; _refcount=refcount; queue_append=queue.append
+
+            # cache key based on both args and keyword args
+            key = args
+            if kwds:
+                key += tuple(sorted(kwds.iteritems()))
+
+            # record recent use of this key
+            queue_append(key)
+            _refcount[key] += 1
 
             # get cache entry or compute if not found
             try:
-                result = _cache[args]
+                result = _cache[key]
                 wrapper.hits += 1
             except KeyError:
-                result = _cache[args] = f(*args)
+                result = _cache[key] = f(*args, **kwds)
                 wrapper.misses += 1
 
-            # record that this key was recently accessed
-            queue_append(args)
-            _refcount[args] = _refcount.get(args, 0) + 1
-
-            # Purge least recently accessed cache contents
-            while _len(_cache) > _maxsize:
-                k = queue_popleft()
-                _refcount[k] -= 1
-                if not _refcount[k]:
-                    del _cache[k]
-                    del _refcount[k]
+                # purge least recently used cache contents
+                if _len(_cache) > maxcache:
+                    queue_popleft = queue.popleft
+                    while 1:
+                        key = queue_popleft()
+                        _refcount[key] -= 1
+                        if not _refcount[key]:
+                            del _cache[key]
+                            del _refcount[key]
+                            break
     
-            # Periodically compact the queue by duplicate keys
-            if _len(queue) > _maxsize * 4:
-                for i in [None] * _len(queue):
-                    k = queue_popleft()
-                    if _refcount[k] == 1:
-                        queue_append(k)
+            # periodically compact the queue by eliminating duplicate keys
+            if _len(queue) > maxqueue:
+                queue_popleft = queue.popleft
+                for _ in itertools.repeat(None, _len(queue)):
+                    key = queue_popleft()
+                    if _refcount[key] == 1:
+                        queue_append(key)
                     else:
-                        _refcount[k] -= 1
-                assert len(queue) == len(cache) == len(refcount) == sum(refcount.itervalues())
+                        _refcount[key] -= 1
 
             return result
-        wrapper.__doc__ = f.__doc__
-        wrapper.__name__ = f.__name__
         wrapper.hits = wrapper.misses = 0
         return wrapper
     return decorating_function

History