import collections
import functools
from itertools import ifilterfalse
class Counter(dict):
    """Dict subclass whose absent keys read as zero (nothing is stored)."""

    def __missing__(self, missing_key):
        # dict.__getitem__ delegates here for absent keys; returning 0
        # without inserting it makes `c[k] += 1` work on unseen keys.
        return 0
def lru_cache(maxsize=100):
    '''Least-recently-used cache decorator.

    Arguments to the cached function must be hashable.
    Cache performance statistics stored in f.hits and f.misses.
    http://en.wikipedia.org/wiki/Cache_algorithms#Least_Recently_Used
    '''
    # Allow the access queue to grow to 10x the cache size before
    # compacting duplicates; this amortizes the O(len(queue)) compaction.
    maxqueue = maxsize * 10

    def decorating_function(user_function,
            len=len, iter=iter, tuple=tuple, sorted=sorted, KeyError=KeyError):
        # The injected builtins above are a lookup optimization: locals
        # resolve faster than globals inside the hot wrapper.

        # itertools.ifilterfalse (Python 2) was renamed filterfalse in
        # Python 3; import whichever is available under one local name.
        try:
            from itertools import ifilterfalse as filterfalse
        except ImportError:
            from itertools import filterfalse

        cache = {}                        # mapping of args to results
        queue = collections.deque()       # order that keys have been used
        refcount = collections.Counter()  # times each key is in the queue
        sentinel = object()               # marker for looping around the queue

        # lookup optimizations (ugly but fast)
        queue_append, queue_popleft = queue.append, queue.popleft
        queue_appendleft, queue_pop = queue.appendleft, queue.pop

        @functools.wraps(user_function)
        def wrapper(*args, **kwds):
            # cache key records both positional and keyword args
            key = args
            if kwds:
                key += tuple(sorted(kwds.items()))

            # record recent use of this key
            queue_append(key)
            refcount[key] += 1

            # get cache entry or compute if not found
            try:
                result = cache[key]
                wrapper.hits += 1
            except KeyError:
                result = user_function(*args, **kwds)
                cache[key] = result
                wrapper.misses += 1

            # purge least recently used cache entry: drain queue entries
            # from the old end until some key's refcount drops to zero --
            # that key has no more recent uses and is the true LRU victim.
            if len(cache) > maxsize:
                key = queue_popleft()
                refcount[key] -= 1
                while refcount[key]:
                    key = queue_popleft()
                    refcount[key] -= 1
                del cache[key], refcount[key]

            # periodically compact the queue by eliminating duplicate keys
            # while preserving order of most recent access
            if len(queue) > maxqueue:
                refcount.clear()
                queue_appendleft(sentinel)
                for key in filterfalse(refcount.__contains__,
                                       iter(queue_pop, sentinel)):
                    queue_appendleft(key)
                    refcount[key] = 1
                # Invariant only holds right after compaction, when the
                # queue contains each cached key exactly once.
                assert len(queue) == len(cache) == len(refcount) == \
                    sum(refcount.values())

            return result

        wrapper.hits = wrapper.misses = 0
        return wrapper
    return decorating_function
if __name__ == '__main__':
    # Smoke-test the decorator: hammer a tiny domain so most lookups
    # are cache hits, then report the hit/miss statistics.
    from random import choice

    @lru_cache(maxsize=20)
    def f(x, y):
        return 3 * x + y

    domain = range(5)
    for _ in range(1000):
        f(choice(domain), choice(domain))
    print(f.hits, f.misses)
Diff to Previous Revision
--- revision 4 2010-07-28 20:51:17
+++ revision 5 2010-07-30 19:44:38
@@ -2,36 +2,37 @@
import functools
from itertools import ifilterfalse
-class ZeroDict(dict):
+class Counter(dict):
'Mapping where default values are zero'
def __missing__(self, key):
return 0
-def lru_cache(maxsize):
- '''Decorator applying a least-recently-used cache with the given maximum size.
+def lru_cache(maxsize=100):
+ '''Least-recently-used cache decorator.
Arguments to the cached function must be hashable.
Cache performance statistics stored in f.hits and f.misses.
+ http://en.wikipedia.org/wiki/Cache_algorithms#Least_Recently_Used
'''
- maxcache = maxsize
maxqueue = maxsize * 10
- def decorating_function(f):
+ def decorating_function(user_function,
+ len=len, iter=iter, tuple=tuple, sorted=sorted, KeyError=KeyError):
cache = {} # mapping of args to results
queue = collections.deque() # order that keys have been used
- refcount = ZeroDict() # number of times each key is in the queue
+ refcount = Counter() # times each key is in the queue
sentinel = object() # marker for looping around the queue
# lookup optimizations (ugly but fast)
queue_append, queue_popleft = queue.append, queue.popleft
- _len, _tuple, _sorted, _KeyError = len, tuple, sorted, KeyError
+ queue_appendleft, queue_pop = queue.appendleft, queue.pop
- @functools.wraps(f)
+ @functools.wraps(user_function)
def wrapper(*args, **kwds):
# cache key records both positional and keyword args
key = args
if kwds:
- key += _tuple(_sorted(kwds.items()))
+ key += tuple(sorted(kwds.items()))
# record recent use of this key
queue_append(key)
@@ -41,38 +42,36 @@
try:
result = cache[key]
wrapper.hits += 1
- except _KeyError:
- result = f(*args, **kwds)
+ except KeyError:
+ result = user_function(*args, **kwds)
cache[key] = result
wrapper.misses += 1
# purge least recently used cache entry
- if _len(cache) > maxcache:
+ if len(cache) > maxsize:
key = queue_popleft()
refcount[key] -= 1
while refcount[key]:
key = queue_popleft()
refcount[key] -= 1
- del cache[key]
- del refcount[key]
+ del cache[key], refcount[key]
# periodically compact the queue by eliminating duplicate keys
# while preserving order of most recent access
- if _len(queue) > maxqueue:
- queue_appendleft, _refcount = queue.appendleft, refcount
- _refcount.clear()
+ if len(queue) > maxqueue:
+ refcount.clear()
queue_appendleft(sentinel)
- for key in ifilterfalse(_refcount.__contains__,
- iter(queue.pop, sentinel)):
- _refcount[key] = 1
+ for key in ifilterfalse(refcount.__contains__,
+ iter(queue_pop, sentinel)):
queue_appendleft(key)
+ refcount[key] = 1
+ assert len(queue) == len(cache) == len(refcount) == sum(refcount.itervalues())
return result
-
wrapper.hits = wrapper.misses = 0
return wrapper
+ return decorating_function
- return decorating_function
if __name__ == '__main__':
@@ -86,4 +85,4 @@
for i in range(1000):
r = f(choice(domain), choice(domain))
- print f.hits, f.misses
+ print(f.hits, f.misses)