import collections
import itertools
import functools
class ZeroDict(dict):
    '''Dictionary that reports 0 for any key it does not contain.

    A failed lookup returns 0 without storing the key, so ``d[key] += 1``
    works for unseen keys while a plain read leaves the dict unchanged.
    '''

    def __missing__(self, key):
        # Invoked by dict.__getitem__ when the key is absent; nothing is
        # inserted here, the caller only receives the default value.
        return 0
def lru_cache(maxsize):
    '''Decorator applying a least-recently-used cache with the given maximum size.

    Arguments to the cached function must be hashable.
    Cache performance statistics stored in f.hits and f.misses.

    Positional and keyword arguments are cached under distinct keys, so
    f(('a', 1)) and f(a=1) never share an entry.  A call whose wrapped
    function raises is neither cached nor counted; the exception propagates.

    '''
    maxcache = maxsize
    maxqueue = maxsize * 4
    # Unique separator between the positional and keyword parts of a key.
    # Without it, args == (('a', 1),) and kwds == {'a': 1} build the same key.
    kwd_mark = (object(),)

    def decorating_function(f):
        cache = {}                   # mapping of keys to results
        queue = collections.deque()  # order that keys have been accessed
        refcount = {}                # number of times each key is in the queue

        @functools.wraps(f)
        def wrapper(*args, **kwds):
            # localize variable access (ugly but fast)
            _cache = cache
            _len = len
            _refcount = refcount
            queue_append = queue.append

            # cache key based on both args and keyword args
            key = args
            if kwds:
                # .items() works on both Python 2 and 3 (iteritems is 2-only)
                key += kwd_mark + tuple(sorted(kwds.items()))

            # get cache entry or compute if not found
            try:
                result = _cache[key]
            except KeyError:
                result = f(*args, **kwds)
                _cache[key] = result
                wrapper.misses += 1
            else:
                wrapper.hits += 1

            # Record recent use of this key.  This is done only once the key
            # is known to be in the cache: recording it earlier left a queued
            # key with no cache entry when f raised or recursed, and the
            # purge below then failed with KeyError on "del _cache[key]".
            queue_append(key)
            _refcount[key] = _refcount.get(key, 0) + 1

            # purge least recently used cache contents
            if _len(_cache) > maxcache:
                queue_popleft = queue.popleft
                while 1:
                    oldest = queue_popleft()
                    remaining = _refcount[oldest] - 1
                    if remaining:
                        # a more recent use is still queued; keep the entry
                        _refcount[oldest] = remaining
                    else:
                        del _cache[oldest]
                        del _refcount[oldest]
                        break

            # periodically compact the queue by eliminating duplicate keys,
            # keeping only the most recent occurrence of each key
            if _len(queue) > maxqueue:
                queue_popleft = queue.popleft
                for _ in itertools.repeat(None, _len(queue)):
                    queued = queue_popleft()
                    if _refcount[queued] == 1:
                        queue_append(queued)
                    else:
                        _refcount[queued] -= 1

            return result

        wrapper.hits = wrapper.misses = 0
        return wrapper

    return decorating_function
if __name__ == '__main__':
    # Demonstration: call a cached two-argument function 1000 times with
    # arguments drawn at random from a 5x5 domain, then report the cache
    # hit and miss counters.
    from random import choice

    @lru_cache(maxsize=20)
    def f(x, y):
        return 3*x+y

    domain = range(5)
    for _ in range(1000):
        f(choice(domain), choice(domain))

    # Formatted explicitly so the output is "hits misses" on both Python 2
    # and Python 3 (the bare print statement is a SyntaxError on Python 3).
    print('%d %d' % (f.hits, f.misses))
Diff to Previous Revision
--- revision 2 2006-11-06 00:45:26
+++ revision 3 2010-07-28 17:20:11
@@ -1,54 +1,69 @@
-from collections import deque
+import collections
+import itertools
+import functools
+
+class ZeroDict(dict):
+ def __missing__(self, key):
+ return 0
def lru_cache(maxsize):
'''Decorator applying a least-recently-used cache with the given maximum size.
Arguments to the cached function must be hashable.
Cache performance statistics stored in f.hits and f.misses.
+
'''
+ maxcache = maxsize
+ maxqueue = maxsize * 4
def decorating_function(f):
- cache = {} # mapping of args to results
- queue = deque() # order that keys have been accessed
- refcount = {} # number of times each key is in the access queue
- def wrapper(*args):
+ cache = {} # mapping of args to results
+ queue = collections.deque() # order that keys have been accessed
+ refcount = ZeroDict() # number of times each key is in the queue
+ @functools.wraps(f)
+ def wrapper(*args, **kwds):
# localize variable access (ugly but fast)
- _cache=cache; _len=len; _refcount=refcount; _maxsize=maxsize
- queue_append=queue.append; queue_popleft = queue.popleft
+ _cache=cache; _len=len; _refcount=refcount; queue_append=queue.append
+
+ # cache key based on both args and keyword args
+ key = args
+ if kwds:
+ key += tuple(sorted(kwds.iteritems()))
+
+ # record recent use of this key
+ queue_append(key)
+ _refcount[key] += 1
# get cache entry or compute if not found
try:
- result = _cache[args]
+ result = _cache[key]
wrapper.hits += 1
except KeyError:
- result = _cache[args] = f(*args)
+ result = _cache[key] = f(*args, **kwds)
wrapper.misses += 1
- # record that this key was recently accessed
- queue_append(args)
- _refcount[args] = _refcount.get(args, 0) + 1
-
- # Purge least recently accessed cache contents
- while _len(_cache) > _maxsize:
- k = queue_popleft()
- _refcount[k] -= 1
- if not _refcount[k]:
- del _cache[k]
- del _refcount[k]
+ # purge least recently used cache contents
+ if _len(_cache) > maxcache:
+ queue_popleft = queue.popleft
+ while 1:
+ key = queue_popleft()
+ _refcount[key] -= 1
+ if not _refcount[key]:
+ del _cache[key]
+ del _refcount[key]
+ break
- # Periodically compact the queue by duplicate keys
- if _len(queue) > _maxsize * 4:
- for i in [None] * _len(queue):
- k = queue_popleft()
- if _refcount[k] == 1:
- queue_append(k)
+ # periodically compact the queue by eliminating duplicate keys
+ if _len(queue) > maxqueue:
+ queue_popleft = queue.popleft
+ for _ in itertools.repeat(None, _len(queue)):
+ key = queue_popleft()
+ if _refcount[key] == 1:
+ queue_append(key)
else:
- _refcount[k] -= 1
- assert len(queue) == len(cache) == len(refcount) == sum(refcount.itervalues())
+ _refcount[key] -= 1
return result
- wrapper.__doc__ = f.__doc__
- wrapper.__name__ = f.__name__
wrapper.hits = wrapper.misses = 0
return wrapper
return decorating_function