Finding the maximum in a sliding window
Assume you are monitoring a network flow. At any given time Ti there is a network traffic volume Vi. Note that the Ti are not necessarily evenly spaced: there can be pauses between downloads, multiple processes may access the network at the same time, and so on.
The task is to find the maximum traffic volume within a given time window w. For example, the task could be to find the maximum in the first 5 seconds. To make the task more complex, assume the window slides across the data: seconds 1..5, then 2..6, 3..7, 4..8 and so on.
Ask yourself: Do I understand the problem?
- Input is an array of pairs (Ti, Vi). By definition Ti must be increasing (unless we've just invented a time machine to go backwards in time), whereas Vi can be anything, really.
- Let's assume that Ti is in seconds, but that the actual data can be fractions of a second.
- Further input is a window size w: the time window we should be looking at. In our example above: 5 seconds.
- Further input is an increment i: the step size used to slide the window across the data. In our example above: 1 second.
- Output is not really specified. We should compute the maximum in each window, so maybe the best output would be a list of tuples (start time, stop time, max within that range).
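To make that output format concrete, here is a tiny hand-made example. The sample data and the exact boundary handling are illustrative assumptions, not part of the original specification:

```python
# Hypothetical sample data: (timestamp, volume) pairs with ascending timestamps
A = [(0.0, 2.0), (1.5, 5.0), (3.0, 9.0), (6.0, 3.0), (7.5, 4.0)]
w, step = 5.0, 1.0           # window size and increment, in seconds

# Naive reference: for each window [t0, t0 + w), report (start, stop, max)
windows = []
t0 = 0.0
while t0 + w <= A[-1][0]:
    vols = [v for (t, v) in A if t0 <= t < t0 + w]
    if vols:
        windows.append((t0, t0 + w, max(vols)))
    t0 += step
# windows == [(0.0, 5.0, 9.0), (1.0, 6.0, 9.0), (2.0, 7.0, 9.0)]
```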
Setting up a test harness
As usual, our first step is to prepare a testbed. We want to verify that our solution is correct, and the easiest way to do that is to compare it against a known solution. We do this by computing the result list in several different ways and verifying that each algorithm returns the same results.
#! -*- Encoding: Latin-1 -*-
import random
import time

def selftest():
    t0 = [0.0]
    def create_timestamp(k):
        result = t0[0]
        t0[0] += random.random()
        return result
    A = [ (create_timestamp(k), random.random(), ) for k in range(1000) ]
    methods = { 'using brute force'           : max_sliding_window_brute_force,
                'using simple max-queue'      : max_sliding_window_simaque,
                'using stack-based max-queue' : max_sliding_window_stabaque,
              }
    expected_result = None
    for method_name in methods:
        method = methods[method_name]
        t_start = time.time()
        received_result = method(A, 5.0, 1.0)
        print "%30s took %.8f seconds" % (method_name, time.time() - t_start, )
        assert received_result is not None
        if expected_result is None:
            expected_result = received_result
        else:
            assert expected_result == received_result

if __name__ == "__main__":
    selftest()
There is a slight trick here: the method create_timestamp adds a random increment to the previous timestamp, thus ensuring that the sequence of timestamps is ascending (but random).
Using brute-force
What could brute-force mean here? Well, we need to slide the window across the data, and for each window calculate the max, duh! But if you think about it like this, you see it is really two problems:
- Finding the items that match the current window
- Finding the max in those items
It turns out that the first task is the same in all solutions: it's the second task where the solutions differ.
def max_sliding_window_brute_force(A, window_size_in_seconds, step_in_seconds):
    result = []
    N = len(A)
    # the start point of the current window
    t0 = 0.0
    # the end point of the current window
    tmax = t0 + window_size_in_seconds
    # index in the array where the current window starts
    t0_index = 0
    # visit each item in the input array
    for i in xrange(N):
        # if the timestamp of this item exceeds the window, we've seen all items in the window
        t = A[i][0]
        if t >= tmax:
            # calculate the maximum in that window by testing each item from t0_index to i-1
            m = A[t0_index][1]
            for k in xrange(t0_index + 1, i):
                if A[k][1] > m:
                    m = A[k][1]
            result.append( (t0, tmax, m, ) )
            # move the window up by step_in_seconds
            t0 += step_in_seconds
            while A[t0_index][0] < t0:
                t0_index += 1
            tmax += step_in_seconds
    return result
Some notes on this algorithm:
- The time complexity of this algorithm is O(NM), where M is the number of items in a (typical) window.
- The algorithm doesn't allocate any additional memory except for the one needed for holding the result.
- You may be tempted to rewrite the four lines that calculate the max somewhat like this:

m = max(A[t0_index:i])

but a) it wouldn't work as written (A is an array of tuples, not of volumes), and b) it would allocate additional memory for the slice and end up being (much) slower, so don't.
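For completeness, a tuple-aware version of that one-liner does exist (using the key parameter of max), though it still allocates a slice and remains slower than the explicit loop. The values for A, t0_index and i below are illustrative:

```python
# Illustrative stand-ins for A, t0_index and i from the loop above
A = [(0.0, 2.0), (1.5, 5.0), (3.0, 9.0)]
t0_index, i = 0, 3

# pick the (timestamp, volume) pair with the largest volume, then extract the volume
m = max(A[t0_index:i], key=lambda pair: pair[1])[1]
assert m == 9.0
```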
We need a max queue!
So what could we optimize here? Well, the way the max is calculated. I should point out that for small window sizes (where small is defined as: having few entries falling into the time window) this algorithm is hard to beat. It breaks down when the window size passes a certain threshold: you can easily see that if M is "almost" N, the algorithm behaves more like O(N²). So we want to find a better way to calculate the maximum.
What are we really doing when we slide the window across the data? We're using something like an ADT queue: adding a few new elements at the top, and throwing a few old elements out. So what we need would be a queue-like data structure, that also keeps track of the current maximum.
Well, a trivial implementation would be to just use a regular queue and calculate the max of the items in the queue. But clearly that could be at best as efficient as our brute force solution, so we need to find something better.
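A quick sketch of that trivial variant (using collections.deque as the queue) shows why it doesn't help:

```python
from collections import deque

q = deque()
for v in [2, 5, 9, 3, 4]:
    q.append(v)          # enqueue is O(1) ...
assert max(q) == 9       # ... but every max query scans the whole queue: O(M)
q.popleft()              # dequeue the oldest item is cheap ...
assert max(q) == 9       # ... yet the max query still scans everything
```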
A first idea can be seen by studying the following sequence:
Vi:      2  5  9  3  4
Vi,max:  9  9  9  4  4
The second table obviously somehow encodes the maximum. Let's add a new, higher number: 7. We adjust all previous maximum values less than the new maximum, like this:
Vi:      2  5  9  3  4  7
Vi,max:  9  9  9  7  7  7
If we add a lower number, it simply means that no other maximum values are adjusted.
Vi:      2  5  9  3  4  7  2  1
Vi,max:  9  9  9  7  7  7  2  1
And if a newer higher number comes, we adjust the lower numbers once again
Vi:      2  5  9  3  4  7  2  1  6
Vi,max:  9  9  9  7  7  7  6  6  6
What is the current maximum? It is always going to be V0,max, the value at the front of the list. You can see why if you look at what happens when you delete values from the front of the list.
Vi:      5  9  3  4  7  2  1  6
Vi,max:  9  9  7  7  7  6  6  6

Vi:      9  3  4  7  2  1  6
Vi,max:  9  7  7  7  6  6  6

Vi:      3  4  7  2  1  6
Vi,max:  7  7  7  6  6  6
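The adjustment rule can be sketched directly on two plain lists (a throwaway sketch of the idea, not a final data structure):

```python
vals = [2, 5, 9, 3, 4]
maxi = []                        # maxi[k] will hold the max of vals[k:]
for v in vals:
    maxi.append(v)
    # walk backwards, raising any stale maxima that are below the new value
    k = len(maxi) - 2
    while k >= 0 and maxi[k] < v:
        maxi[k] = v
        k -= 1
assert maxi == [9, 9, 9, 4, 4]   # matches the first table above
```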
With that idea in mind, we can write our first max-queue, which we're going to call the "simple max queue"
class simple_max_queue(object):
    def __init__(self):
        # list of Vi
        self.data = []
        # list of Vi,max
        self.maxi = []

    def get_max(self):
        return self.maxi[0]

    def enqueue(self, value):
        self.data.append( value )
        self.maxi.append( value )
        k = len(self.maxi) - 2
        while k >= 0:
            if self.maxi[k] < value:
                self.maxi[k] = value
                k -= 1
            else:
                break

    def dequeue(self):
        result = self.data[0]
        del self.data[0]
        del self.maxi[0]
        return result
Now, we can quite easily put that into the body of our algorithm found above:
def max_sliding_window_simaque(A, window_size_in_seconds, step_in_seconds):
    result = []
    N = len(A)
    # the start point of the current window
    t0 = 0.0
    # the end point of the current window
    tmax = t0 + window_size_in_seconds
    # index in the array where the current window starts
    t0_index = 0
    # new: get a max queue
    mq = simple_max_queue()
    # visit each item in the input array
    for i in xrange(N):
        # if the timestamp of this item exceeds the window, we've seen all items in the window
        t = A[i][0]
        # new: add items in the window to the max-queue
        if t < tmax:
            mq.enqueue(A[i][1])
        else:
            result.append( (t0, tmax, mq.get_max(), ) )
            # new: enqueue this item as well (easy to forget, but crucial!)
            mq.enqueue(A[i][1])
            # new: dequeue items that fall before the start of the next time window
            t0 += step_in_seconds
            while A[t0_index][0] < t0:
                t0_index += 1
                mq.dequeue()
            tmax += step_in_seconds
    return result
Some notes on this algorithm:
- The time complexity is hard to state precisely, because our max-queue doesn't use linked lists but Python lists, and operations such as del self.data[0] are themselves O(n), which wreaks havoc with much of the Big-Oh semantics. But assume that a regular queue has O(1) enqueue and dequeue; then we need to add the time needed for adjusting the maxima on enqueue, which is O(M) in the worst case (assume data that is only increasing in volume, so each enqueue has to adjust every item in the queue).
- The algorithm needs additional space for the queue: O(M), where M is the number of items in a window.
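That worst case (monotonically increasing volumes) is easy to demonstrate by counting the adjustments the enqueue step performs:

```python
# Degenerate input for the simple max-queue: strictly increasing values.
adjustments = 0
maxi = []
for v in [1, 2, 3, 4, 5]:
    maxi.append(v)
    k = len(maxi) - 2
    while k >= 0 and maxi[k] < v:   # fires for every earlier entry
        maxi[k] = v
        adjustments += 1
        k -= 1
assert maxi == [5, 5, 5, 5, 5]
assert adjustments == 10            # 0+1+2+3+4 = N*(N-1)/2, i.e. O(N^2) overall
```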
Actually, there is an almost perverse way to get a max-queue that's better: using two max-stacks. Let's discuss a max-stack first, then the max-queue, and then look at the results.
A Max-Stack
We can implement a very simple stack class in Python:
class stack(object):
    def __init__(self):
        self.data = []

    def push(self, value):
        self.data.append( value )

    def pop(self):
        result = self.data[-1]
        del self.data[-1]
        return result
If we want to keep track of the maximum of the values in the stack, the easiest solution is to keep two stacks: one for the values, and one for the running maxima. Let's see what this looks like:
class max_stack(object):
    def __init__(self):
        self.data = []
        self.maxi = []

    def get_max(self):
        return self.maxi[-1]

    def push(self, value):
        if self.data:
            last_known_max = self.maxi[-1]
            if value > last_known_max:
                last_known_max = value
            self.data.append( value )
            self.maxi.append( last_known_max )
        else:
            self.data.append( value )
            self.maxi.append( value )

    def pop(self):
        result = self.data[-1]
        del self.data[-1]
        del self.maxi[-1]
        return result
As you can see, get_max is O(1), and so is the push operation (well, within the limits of the Python language).
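An equivalent way to picture the invariant is a single stack of (value, running_max) pairs, where each entry remembers the maximum of everything at or below it:

```python
# Same idea as max_stack, but with one list of (value, running_max) pairs.
stack = []
for v in [2, 5, 9, 3]:
    running_max = max(v, stack[-1][1]) if stack else v
    stack.append((v, running_max))

assert stack[-1][1] == 9   # get_max is a simple peek at the top of the stack
stack.pop()                # popping the 3 ...
assert stack[-1][1] == 9   # ... leaves the older maxima untouched
```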
A Max-Queue based on Max-Stacks
A typical programming question asks to implement a queue using two stacks. By using two max-stacks rather than two regular ones, we trivially get a queue that also has a max operation.
class max_stack_based_max_queue(object):
    def __init__(self):
        self.stacks = [max_stack(), max_stack()]

    def enqueue(self, value):
        self.stacks[1].push(value)

    def dequeue(self):
        if len(self.stacks[0].data) == 0:
            while len(self.stacks[1].data):
                self.stacks[0].push(self.stacks[1].pop())
        return self.stacks[0].pop()

    def get_max(self):
        if self.stacks[0].data:
            if self.stacks[1].data:
                return max(self.stacks[0].get_max(), self.stacks[1].get_max())
            return self.stacks[0].get_max()
        return self.stacks[1].get_max()
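To see the hand-off between the two stacks in action, here is a self-contained sketch that stores (value, running_max) pairs directly in plain lists (the names push, inbox, outbox and queue_max are illustrative, not part of the class above):

```python
def push(stack, v):
    # each entry remembers the max of everything at or below it
    stack.append((v, max(v, stack[-1][1]) if stack else v))

inbox, outbox = [], []       # inbox receives enqueues, outbox serves dequeues
for v in [2, 5, 9, 3]:
    push(inbox, v)

def queue_max():
    return max(s[-1][1] for s in (inbox, outbox) if s)

assert queue_max() == 9
# dequeue: when the outbox is empty, refill it from the inbox (reversing order)
if not outbox:
    while inbox:
        push(outbox, inbox.pop()[0])
assert outbox.pop()[0] == 2  # FIFO order: the oldest value comes out first
assert queue_max() == 9
```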
It is instructive to see that the code for the max-queue is actually smaller than the code for the max-stack. Now, given this object it is trivial to change our code to add a third variant based on this type of queue:
def max_sliding_window_stabaque(A, window_size_in_seconds, step_in_seconds):
    result = []
    N = len(A)
    # the start point of the current window
    t0 = 0.0
    # the end point of the current window
    tmax = t0 + window_size_in_seconds
    # index in the array where the current window starts
    t0_index = 0
    # new: get a max queue
    mq = max_stack_based_max_queue()
    # visit each item in the input array
    for i in xrange(N):
        # if the timestamp of this item exceeds the window, we've seen all items in the window
        t = A[i][0]
        # new: add items in the window to the max-queue
        if t < tmax:
            mq.enqueue(A[i][1])
        else:
            result.append( (t0, tmax, mq.get_max(), ) )
            # new: enqueue this item as well (easy to forget, but crucial!)
            mq.enqueue(A[i][1])
            # new: dequeue items that fall before the start of the next time window
            t0 += step_in_seconds
            while A[t0_index][0] < t0:
                t0_index += 1
                mq.dequeue()
            tmax += step_in_seconds
    return result
So the only change really is that we're now using a different max-queue class.
But how does it perform?
Unsurprisingly, performance very much depends on window size, step size, and input data size. The following table was calculated on an array with 50,000 items (times in seconds):
window size in seconds          5           50          500         5,000
using brute force               0.06800008  0.33299994  2.88400006  23.00000000
using simple max-queue          0.13399982  0.16500020  0.21699977   0.47900009
using stack-based max-queue     0.24600005  0.24499989  0.23000002   0.19899988
So for small window sizes, brute force outperforms the queue approaches, but already at a window size of 50 seconds it fares worse than the other two solutions. Our simple max-queue does better and holds its own, but it too breaks down once the window size is large enough (why? because on average too many items need to be adjusted on each enqueue). The most complex solution, using the max_stack_based_max_queue, initially takes a hit because of its overhead, but for large window sizes it beats the other two approaches.