import collections
import functools
from itertools import ifilterfalse
class ZeroDict(dict):
    """dict variant whose absent keys read as zero.

    Looking up a missing key yields 0 without inserting anything,
    so ``d[k] += 1`` works on never-seen keys while ``k in d``
    stays False until an explicit assignment.
    """
    def __missing__(self, absent_key):
        # Called by dict.__getitem__ on a miss; report zero, store nothing.
        return 0
def lru_cache(maxsize):
    '''Decorator applying a least-recently-used cache with the given maximum size.

    Arguments to the cached function must be hashable.
    Cache performance statistics stored in f.hits and f.misses.
    '''
    maxcache = maxsize
    # Let the access queue grow well beyond the cache before compacting it;
    # compaction is O(len(queue)), so doing it rarely amortizes the cost.
    maxqueue = maxsize * 10

    def decorating_function(f):
        cache = {}                               # mapping of args to results
        queue = collections.deque()              # order that keys have been used
        refcount = collections.defaultdict(int)  # times each key appears in queue
        sentinel = object()                      # marker for looping around the queue

        # lookup optimizations (ugly but fast)
        queue_append, queue_popleft = queue.append, queue.popleft
        queue_appendleft, queue_pop = queue.appendleft, queue.pop
        _len, _tuple, _sorted, _KeyError = len, tuple, sorted, KeyError

        @functools.wraps(f)
        def wrapper(*args, **kwds):
            # cache key records both positional and keyword args
            key = args
            if kwds:
                key += _tuple(_sorted(kwds.items()))

            # record recent use of this key
            queue_append(key)
            refcount[key] += 1

            # get cache entry or compute if not found
            try:
                result = cache[key]
                wrapper.hits += 1
            except _KeyError:
                result = f(*args, **kwds)
                cache[key] = result
                wrapper.misses += 1

                # purge least recently used cache entry: pop stale duplicate
                # entries from the left until some key's count hits zero --
                # that key's newest use is older than everyone else's.
                if _len(cache) > maxcache:
                    key = queue_popleft()
                    refcount[key] -= 1
                    while refcount[key]:
                        key = queue_popleft()
                        refcount[key] -= 1
                    del cache[key]
                    del refcount[key]

            # periodically compact the queue by eliminating duplicate keys
            # while preserving order of most recent access
            if _len(queue) > maxqueue:
                refcount.clear()
                queue_appendleft(sentinel)
                # Drain from the right (most recent first) until the sentinel
                # comes back around; keep only each key's first occurrence.
                # (Python-2-only ifilterfalse replaced by a plain membership
                # test -- works identically on Python 2 and 3.)
                for key in iter(queue_pop, sentinel):
                    if key not in refcount:
                        refcount[key] = 1
                        queue_appendleft(key)
            return result

        wrapper.hits = wrapper.misses = 0
        return wrapper
    return decorating_function
if __name__ == '__main__':
    # Smoke test: hammer the cache with a small argument domain so repeated
    # pairs produce hits, then report the hit/miss statistics.
    @lru_cache(maxsize=20)
    def f(x, y):
        return 3 * x + y

    domain = range(5)
    from random import choice
    for i in range(1000):
        r = f(choice(domain), choice(domain))

    # print() call instead of the Python-2-only print statement, which is a
    # SyntaxError under Python 3.
    print(f.hits, f.misses)
Diff to Previous Revision
--- revision 3 2010-07-28 17:20:11
+++ revision 4 2010-07-28 20:51:17
@@ -1,8 +1,9 @@
import collections
-import itertools
import functools
+from itertools import ifilterfalse
class ZeroDict(dict):
+ 'Mapping where default values are zero'
def __missing__(self, key):
return 0
@@ -14,63 +15,68 @@
'''
maxcache = maxsize
- maxqueue = maxsize * 4
+ maxqueue = maxsize * 10
def decorating_function(f):
cache = {} # mapping of args to results
- queue = collections.deque() # order that keys have been accessed
+ queue = collections.deque() # order that keys have been used
refcount = ZeroDict() # number of times each key is in the queue
+ sentinel = object() # marker for looping around the queue
+
+ # lookup optimizations (ugly but fast)
+ queue_append, queue_popleft = queue.append, queue.popleft
+ _len, _tuple, _sorted, _KeyError = len, tuple, sorted, KeyError
+
@functools.wraps(f)
def wrapper(*args, **kwds):
-
- # localize variable access (ugly but fast)
- _cache=cache; _len=len; _refcount=refcount; queue_append=queue.append
-
- # cache key based on both args and keyword args
+ # cache key records both positional and keyword args
key = args
if kwds:
- key += tuple(sorted(kwds.iteritems()))
+ key += _tuple(_sorted(kwds.items()))
# record recent use of this key
queue_append(key)
- _refcount[key] += 1
+ refcount[key] += 1
# get cache entry or compute if not found
try:
- result = _cache[key]
+ result = cache[key]
wrapper.hits += 1
- except KeyError:
- result = _cache[key] = f(*args, **kwds)
+ except _KeyError:
+ result = f(*args, **kwds)
+ cache[key] = result
wrapper.misses += 1
- # purge least recently used cache contents
- if _len(_cache) > maxcache:
- queue_popleft = queue.popleft
- while 1:
+ # purge least recently used cache entry
+ if _len(cache) > maxcache:
+ key = queue_popleft()
+ refcount[key] -= 1
+ while refcount[key]:
key = queue_popleft()
- _refcount[key] -= 1
- if not _refcount[key]:
- del _cache[key]
- del _refcount[key]
- break
-
+ refcount[key] -= 1
+ del cache[key]
+ del refcount[key]
+
# periodically compact the queue by eliminating duplicate keys
+ # while preserving order of most recent access
if _len(queue) > maxqueue:
- queue_popleft = queue.popleft
- for _ in itertools.repeat(None, _len(queue)):
- key = queue_popleft()
- if _refcount[key] == 1:
- queue_append(key)
- else:
- _refcount[key] -= 1
+ queue_appendleft, _refcount = queue.appendleft, refcount
+ _refcount.clear()
+ queue_appendleft(sentinel)
+ for key in ifilterfalse(_refcount.__contains__,
+ iter(queue.pop, sentinel)):
+ _refcount[key] = 1
+ queue_appendleft(key)
return result
+
wrapper.hits = wrapper.misses = 0
return wrapper
+
return decorating_function
if __name__ == '__main__':
-
+
@lru_cache(maxsize=20)
def f(x, y):
return 3*x+y