"""
Purpose: The main purpose is to demonstrate how to find the running median, mode and
mean over a sequence (list) of integers or reals or a mix of integers and reals.
The secondary purpose, is to inspire Python programmers to explore some of
the powerful packages (e.g. collections) available to the Python community and
to learn more about list comprehension and lambda functions.
Note:
1. Much of the code here has been taken from code posted to the web (e.g. stackoverflow)
by other Python programmers (e.g. Peter Otten)
Author: V. Stokes (vs@it.uu.se)
Version: 2013.03.06
"""
import numpy as np
#*******************************************************
from collections import deque,Counter
from bisect import insort, bisect_left
from itertools import islice
def RunningMode(seq,N,M):
"""
Purpose: Find the mode for the points in a sliding window as it
is moved from left (beginning of seq) to right (end of seq)
by one point at a time.
Inputs:
seq -- list containing items for which a running mode (in a sliding window) is
to be calculated
N -- length of sequence
M -- number of items in window (window size) -- must be an integer > 1
Otputs:
modes -- list of modes with size M - N + 1
Note:
1. The mode is the value that appears most often in a set of data.
2. In the case of ties it the last of the ties that is taken as the mode (this
is not by definition).
"""
# Load deque with first window of seq
d = deque(seq[0:M])
modes = [Counter(d).most_common(1)[0][0]] # contains mode of first window
# Now slide the window by one point to the right for each new position (each pass through
# the loop). Stop when the item in the right end of the deque contains the last item in seq
for item in islice(seq,M,N):
old = d.popleft() # pop oldest from left
d.append(item) # push newest in from right
modes.append(Counter(d).most_common(1)[0][0])
return modes
def RunningMedian(seq, M):
"""
Purpose: Find the median for the points in a sliding window (odd number in size)
as it is moved from left to right by one point at a time.
Inputs:
seq -- list containing items for which a running median (in a sliding window)
is to be calculated
M -- number of items in window (window size) -- must be an integer > 1
Otputs:
medians -- list of medians with size N - M + 1
Note:
1. The median of a finite list of numbers is the "center" value when this list
is sorted in ascending order.
2. If M is an even number the two elements in the window that
are close to the center are averaged to give the median (this
is not by definition)
"""
seq = iter(seq)
s = []
m = M // 2
# Set up list s (to be sorted) and load deque with first window of seq
s = [item for item in islice(seq,M)]
d = deque(s)
# Simple lambda function to handle even/odd window sizes
median = lambda : s[m] if bool(M&1) else (s[m-1]+s[m])*0.5
# Sort it in increasing order and extract the median ("center" of the sorted window)
s.sort()
medians = [median()]
# Now slide the window by one point to the right for each new position (each pass through
# the loop). Stop when the item in the right end of the deque contains the last item in seq
for item in seq:
old = d.popleft() # pop oldest from left
d.append(item) # push newest in from right
del s[bisect_left(s, old)] # locate insertion point and then remove old
insort(s, item) # insert newest such that new sort is not required
medians.append(median())
return medians
def RunningMean(seq,N,M):
"""
Purpose: Find the mean for the points in a sliding window (fixed size)
as it is moved from left to right by one point at a time.
Inputs:
seq -- list containing items for which a mean (in a sliding window) is
to be calculated (N items)
N -- length of sequence
M -- number of items in sliding window
Otputs:
means -- list of means with size N - M + 1
"""
# Load deque (d) with first window of seq
d = deque(seq[0:M])
means = [np.mean(d)] # contains mean of first window
# Now slide the window by one point to the right for each new position (each pass through
# the loop). Stop when the item in the right end of the deque contains the last item in seq
for item in islice(seq,M,N):
old = d.popleft() # pop oldest from left
d.append(item) # push newest in from right
means.append(np.mean(d)) # mean for current window
return means
#*** Start of test area *****************************************************
#
# Set random seed for repeatability
#np.random.seed(7919) # the 1000th prime number
# Try the following sequences
#yn = np.random.random(18)*10 - 5
yn = [3,2,-1.0,2.0,3.0,5.0,-5.0,6.0,-5.0,4.0,9.0,6.3,1.3,0.0,-7.0,1.3,-5.0]
#yn = [3,2,-1.0,-1.0,-1.0,5.0,5.0]
#yn = [5,5.0,5,5.0,5]
#yn = [3,3,3,3,3,3,3]
#yn = [3.,3.,3.,3.,3.,3.,3.]
#yn = [-1,1,-1,1,-1,1]
#yn = [3,2,-1.0,2.0]
#yn = [5,3,2,2,-1]
#yn = [-5.0,3.0,2.0,2.0,-1.0]
N = len(yn)
# Try the follwing window sizes
#M = 1
#M = 2
M = 3
#M = 4
#M = 5
#M = 6
if M <= N and M >= 1:
print 'M = %2d,'%M,
print 'yn:'
print yn
means = RunningMean(yn,N,M)
print ' Means(%d):' %(N-M+1)
print means
medians = RunningMedian(yn,M)
print ' Medians(%d):' %(N-M+1)
print medians
modes = RunningMode(yn,N,M)
print ' Modes(%d):' %(N-M+1)
print modes
else:
print 'Window size (M=%d) out of range'%M