Finding the maximum in a sliding window

Assume you are monitoring a network flow. At any given time Ti, there is a network traffic volume Vi. Note that the timestamps Ti are not necessarily evenly spaced: there could be pauses between downloads, or sometimes multiple processes access the network at the same time, and so on.

The task is to find the maximum traffic volume Vi,max within a given time window w. For example, the task could be to find the maximum in the first 5 seconds. To make the task more complex, assume the window slides across the data: seconds 1..5, 2..6, 3..7, 4..8 and so on.
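To make the window definition concrete, here is a minimal sketch. The sample data and the helper name window_max are made up for illustration, and it assumes each window is the half-open interval from start up to (but not including) start + w:

```python
# Hypothetical sample data: (timestamp in seconds, traffic volume) pairs,
# sorted by timestamp but not evenly spaced.
samples = [(0.2, 10), (0.9, 40), (3.5, 25), (5.1, 70), (6.0, 5), (7.4, 30)]

def window_max(samples, start, w):
    """Return the largest volume with start <= t < start + w, or None if empty."""
    volumes = [v for (t, v) in samples if start <= t < start + w]
    return max(volumes) if volumes else None

# Slide a 5-second window forward in 1-second steps.
for start in range(4):
    print(start, start + 5, window_max(samples, start, 5))
```

Note that the number of samples per window varies from window to window, which is exactly what makes this harder than a fixed-count sliding window.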

Ask yourself: Do I understand the problem?

Setting up a test harness

As usual, our first step is to prepare a testbed. We want to verify that our solution is correct, and the easiest way to do that is to compare it to a known solution. We do this by implementing the algorithm in several different ways and verifying that each implementation returns the same result list.

# -*- coding: latin-1 -*-

import random
import time

def selftest():
    # next timestamp to hand out; kept in a list so the nested function can update it
    next_timestamp = [0.0]
    def create_timestamp():
        result = next_timestamp[0]
        next_timestamp[0] += random.random()
        return result

    # input data: (timestamp, traffic volume) pairs with ascending timestamps
    A = [ (create_timestamp(), random.random(), ) for k in range(1000)]

    methods = { 'using brute force' : max_sliding_window_brute_force,
                'using simple max-queue' : max_sliding_window_simaque,
                'using stack-based max-queue' : max_sliding_window_stabaque
              }

    # the first method that runs provides the reference result for all others
    expected_result = None
    for method_name in methods:
        method = methods[method_name]
        start = time.time()
        received_result = method(A,5.0,1.0)
        print("%30s took %.8f seconds" % (method_name, time.time() - start, ))
        assert received_result is not None
        if expected_result is None:
            expected_result = received_result
        else:
            assert expected_result == received_result

if __name__ == "__main__":
    selftest()

There is a slight trick here: the function create_timestamp adds a random value to the last timestamp, thus ensuring that the sequence of timestamps is ascending (but randomly spaced).
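The same trick can be written as a running sum over random gaps. The following is only a sketch for Python 3 (itertools.accumulate is not available in Python 2); the function name make_timestamps and the seed parameter are assumptions added here to make test runs repeatable:

```python
import itertools
import random

def make_timestamps(n, seed=None):
    """Return n ascending timestamps starting at 0.0 with random gaps in [0, 1)."""
    if n <= 0:
        return []
    rng = random.Random(seed)
    gaps = [rng.random() for _ in range(n - 1)]
    # Running sum of the gaps; the leading 0.0 is the first timestamp.
    return list(itertools.accumulate([0.0] + gaps))

timestamps = make_timestamps(10, seed=42)
print(timestamps)
```

Fixing the seed means a failing comparison between two algorithms can be reproduced with the same input.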

Using brute-force

What could brute-force mean here? Well, we need to slide the window across the data, and for each window calculate the max, duh! But if you think about it like this, you see it is really two problems:

  1. Finding the items that match the current window
  2. Finding the max in those items

It turns out that the first task is the same in all solutions: it's the second task where the solutions differ.

def max_sliding_window_brute_force(A,window_size_in_seconds,step_in_seconds):
    result = []
    N = len(A)

    # the start point of the current window
    t0 = 0.0

    # the end point of the current window
    tmax = t0 + window_size_in_seconds

    # index in the array where the current window starts
    t0_index = 0

    # visit each item in the input array
    for i in xrange(N):

        # if the timestamp of this item exceeds the window, then we've seen all items in the window
        t = A[i][0]
        if t >= tmax:

            # calculate maximum in that window by testing each item from t0_index to i
            m = A[t0_index][1]
            for k in xrange(t0_index+1, i):
                if A[k][1] > m:
                    m = A[k][1]

            result.append( (t0, tmax, m, ) )

            # move window up stepsize seconds
            t0 += step_in_seconds
            while A[t0_index][0] < t0:
                t0_index += 1
            tmax += step_in_seconds

    return result

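Some notes on this algorithm: the outer loop visits each of the N input items once, but every time a window is complete the inner loop rescans all the items in that window. If M is the number of items that fall in a window, the running time is therefore on the order of O(N·M).

We need a max queue!

So what could we optimize here? Well, the way the max is calculated. I should point out that for small window sizes (where small means that few entries fall in the time window) this algorithm is hard to beat. It breaks down when the window size passes a certain threshold: you can easily see that if M is "almost" N, the algorithm behaves more like O(N^2). So we want to find a better way to calculate the maximum.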
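To get a feeling for how the cost of rescanning grows, one can count the value comparisons instead of measuring time. This is a rough sketch, not the benchmark used later: count_comparisons is a hypothetical helper that mirrors the window bookkeeping of the brute-force loop, and the evenly spaced timestamps are an assumption chosen to keep the numbers easy to check:

```python
def count_comparisons(timestamps, window, step):
    """Count value comparisons made by a rescan-every-window strategy."""
    comparisons = 0
    start, end = 0.0, window
    first = 0  # index of the first item inside the current window
    for i, t in enumerate(timestamps):
        if t >= end:
            # Rescanning items first..i-1 costs one comparison per item after the first.
            comparisons += max(0, i - first - 1)
            start += step
            end += step
            while timestamps[first] < start:
                first += 1
    return comparisons

timestamps = [float(k) for k in range(1000)]  # hypothetical evenly spaced data
for window in (5.0, 50.0, 500.0):
    print(window, count_comparisons(timestamps, window, 1.0))
```

With the input size held fixed, the count grows roughly in proportion to the number of items per window.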

What are we really doing when we slide the window across the data? We're using something like a queue ADT: adding a few new elements at the back, and throwing a few old elements out at the front. So what we need is a queue-like data structure that also keeps track of the current maximum.

Well, a trivial implementation would be to just use a regular queue and calculate the max of the items in the queue. But clearly that could be at best as efficient as our brute force solution, so we need to find something better.
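For reference, that trivial version could look like the following sketch. The class name naive_max_queue is made up here, and collections.deque is used only because it removes items from the front cheaply:

```python
from collections import deque

class naive_max_queue(object):
    """Queue whose get_max rescans every stored item (O(M) per call)."""

    def __init__(self):
        self.data = deque()

    def enqueue(self, value):
        self.data.append(value)

    def dequeue(self):
        return self.data.popleft()

    def get_max(self):
        # No bookkeeping at all: look at every item currently in the queue.
        return max(self.data)

q = naive_max_queue()
for v in [2, 5, 9, 3, 4]:
    q.enqueue(v)
print(q.get_max())
```

Enqueue and dequeue are cheap here, but every get_max call pays for a full scan, which is the same work the brute-force loop does.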

A first idea can be seen by studying the following sequence:

    Vi        2   5   9   3   4
    Vi,max    9   9   9   4   4

The second row encodes the maximum: each entry Vi,max is the largest value from position i to the end of the list. Let's add a new, higher number: 7. We raise every previous maximum value that is less than the new value, like this:

    Vi        2   5   9   3   4   7
    Vi,max    9   9   9   7   7   7

If we add a lower number, it simply means that no other maximum values are adjusted.

    Vi        2   5   9   3   4   7   2   1
    Vi,max    9   9   9   7   7   7   2   1

And if a newer, higher number comes in, we adjust the lower numbers once again:

    Vi        2   5   9   3   4   7   2   1   6
    Vi,max    9   9   9   7   7   7   6   6   6

What is the current maximum? It is always going to be V0,max, the first entry of the second row. You can see that if you look at what happens when you delete values from the front of the list:

    Vi        5   9   3   4   7   2   1   6
    Vi,max    9   9   7   7   7   6   6   6
    Vi        9   3   4   7   2   1   6
    Vi,max    9   7   7   7   6   6   6
    Vi        3   4   7   2   1   6
    Vi,max    7   7   7   6   6   6
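Put differently, the second row of each table is the list of "suffix maxima" of the first row. The following sketch recomputes that row from scratch so the tables can be checked; suffix_maxima is a hypothetical helper name and is not part of any of the queue implementations:

```python
def suffix_maxima(values):
    """Return a list where entry i is max(values[i:])."""
    result = []
    current = None
    # Walk from the back so that 'current' is always the max seen so far.
    for v in reversed(values):
        if current is None or v > current:
            current = v
        result.append(current)
    result.reverse()
    return result

values = [2, 5, 9, 3, 4, 7, 2, 1, 6]
print(suffix_maxima(values))        # [9, 9, 9, 7, 7, 7, 6, 6, 6]
print(suffix_maxima(values[3:]))    # [7, 7, 7, 6, 6, 6]
```

Deleting from the front never changes the remaining entries, which is why the first entry is always the current maximum.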

With that idea in mind, we can write our first max-queue, which we're going to call the "simple max queue":

class simple_max_queue(object):
    def __init__(self):

        # list of the values Vi
        self.data = []

        # list of the running maxima Vi,max
        self.maxi = []
        
    def get_max(self):
        return self.maxi[0]
        
    def enqueue(self, value):
        self.data.append( value )
        self.maxi.append( value )
       
        k = len(self.maxi)-2
        
        while k >= 0:
            if self.maxi[k] < value:
                self.maxi[k] = value
                k -= 1
            else:
                break
        
    def dequeue(self):
        result = self.data[0]
        del self.data[0]
        del self.maxi[0]
        return result
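How much work enqueue does depends heavily on the order of the incoming values. The sketch below uses a condensed, instrumented copy of the class (the name counting_max_queue and the updates counter are additions for illustration only) to count how many entries of maxi get rewritten:

```python
class counting_max_queue(object):
    """Condensed simple max-queue that also counts rewrites of maxi entries."""

    def __init__(self):
        self.data = []
        self.maxi = []
        self.updates = 0  # instrumentation added for this sketch only

    def get_max(self):
        return self.maxi[0]

    def enqueue(self, value):
        self.data.append(value)
        self.maxi.append(value)
        k = len(self.maxi) - 2
        # Raise every older maximum that is smaller than the new value.
        while k >= 0 and self.maxi[k] < value:
            self.maxi[k] = value
            self.updates += 1
            k -= 1

    def dequeue(self):
        del self.maxi[0]
        return self.data.pop(0)

def count_updates(values):
    mq = counting_max_queue()
    for v in values:
        mq.enqueue(v)
    return mq.updates

print(count_updates(range(100, 0, -1)))  # descending input
print(count_updates(range(1, 101)))      # ascending input
```

Descending input never touches an older entry, while ascending input rewrites every older entry on every enqueue, so the cost per enqueue can grow with the number of items in the queue.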

Now, we can quite easily put that into the body of our algorithm found above:

def max_sliding_window_simaque(A,window_size_in_seconds,step_in_seconds):

    result = []
    N = len(A)

    # the start point of the current window
    t0 = 0.0

    # the end point of the current window
    tmax = t0 + window_size_in_seconds

    # index in the array where the current window starts
    t0_index = 0

    # new: get a max queue
    mq = simple_max_queue()

    # visit each item in the input array
    for i in xrange(N):

        # timestamp of the current item
        t = A[i][0]

        # new: add items in the window to the max-queue
        if t < tmax:
            mq.enqueue(A[i][1])

        else:
            # this item lies beyond the window, so we've seen all items in the window
            result.append( (t0, tmax, mq.get_max(), ) )

            # new: enqueue this item as well (easy to forget, but crucial!)
            mq.enqueue(A[i][1])

            # new: dequeue items from the beginning of the time window
            t0 += step_in_seconds
            while A[t0_index][0] < t0:
                t0_index += 1
                mq.dequeue()
            tmax += step_in_seconds

    return result

Actually, there is an almost perverse way to get a max-queue that's better: using two max-stacks. Let's discuss a max-stack first, then the max-queue, and then see the results.

A Max-Stack

We can implement a very simple stack class in Python:

class stack(object):

    def __init__(self):
        self.data = []

    def push(self, value):
        self.data.append( value )

    def pop(self):
        result = self.data[-1]
        del self.data[-1]
        return result

If we want to keep track of the maximum of the values in the stack, the easiest solution is to keep two stacks: one for the values, and one for the maximum values. Let's see what this looks like:

class max_stack(object):

    def __init__(self):
        self.data = []
        self.maxi = []

    def get_max(self):
        return self.maxi[-1]

    def push(self, value):
        if self.data:
            last_known_max = self.maxi[-1]
            if value > last_known_max:
                last_known_max = value
            self.data.append( value )
            self.maxi.append( last_known_max )

        else:
            self.data.append( value )
            self.maxi.append( value )

    def pop(self):
        result = self.data[-1]
        del self.data[-1]
        del self.maxi[-1]
        return result

As you can see, get_max is O(1), and so are the push and pop operations (well, within the limits of the Python language: appending to a list is amortized O(1)).
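A short trace shows why popping needs no extra work: each entry of maxi remembers the maximum of everything below it, so removing the top simply exposes the previous maximum. The class is repeated here in condensed form only so that the snippet runs on its own; the input values are arbitrary:

```python
class max_stack(object):
    # Condensed copy of the class above so this snippet runs on its own.
    def __init__(self):
        self.data = []
        self.maxi = []

    def get_max(self):
        return self.maxi[-1]

    def push(self, value):
        # The new maximum is either the old one or the pushed value.
        if self.maxi and self.maxi[-1] > value:
            self.maxi.append(self.maxi[-1])
        else:
            self.maxi.append(value)
        self.data.append(value)

    def pop(self):
        self.maxi.pop()
        return self.data.pop()

s = max_stack()
for v in [3, 1, 4, 1, 5]:
    s.push(v)
print(s.maxi)       # [3, 3, 4, 4, 5]
s.pop()
print(s.get_max())  # 4
```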

A Max-Queue based on Max-Stacks

A typical programming question asks you to implement a queue using two stacks. By using two max-stacks rather than two regular ones, we trivially get a queue that also has a max operation.

class max_stack_based_max_queue(object):
    def __init__(self):
        self.stacks = [max_stack(), max_stack()]

    def enqueue(self, value):
        self.stacks[1].push(value)

    def dequeue(self):
        if len(self.stacks[0].data) == 0:
            while len(self.stacks[1].data):
                self.stacks[0].push(self.stacks[1].pop())

        return self.stacks[0].pop()

    def get_max(self):
        if self.stacks[0].data:
            if self.stacks[1].data:
                return max(self.stacks[0].get_max(), self.stacks[1].get_max())
            return self.stacks[0].get_max()
        return self.stacks[1].get_max()
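The reason this is cheap is that each value is pushed at most twice and popped at most twice over its lifetime, so enqueue and dequeue are O(1) on average. The following sketch is a compact variant, not the class above: the name pair_max_queue is made up, and it stores (value, maximum so far) pairs in plain lists. It then checks itself against a plain deque with random operations:

```python
import random
from collections import deque

class pair_max_queue(object):
    """Two-stack max-queue; each stack entry is (value, max of stack up to here)."""

    def __init__(self):
        self.inbox = []   # receives enqueued values
        self.outbox = []  # serves dequeues in FIFO order

    @staticmethod
    def _push(stack, value):
        best = max(value, stack[-1][1]) if stack else value
        stack.append((value, best))

    def enqueue(self, value):
        self._push(self.inbox, value)

    def dequeue(self):
        if not self.outbox:
            # Reverse the inbox into the outbox; each item moves at most once.
            while self.inbox:
                self._push(self.outbox, self.inbox.pop()[0])
        return self.outbox.pop()[0]

    def get_max(self):
        tops = [s[-1][1] for s in (self.inbox, self.outbox) if s]
        return max(tops)

# Cross-check against a plain deque using random operations.
rng = random.Random(0)
mq, reference = pair_max_queue(), deque()
for _ in range(1000):
    if reference and rng.random() < 0.4:
        assert mq.dequeue() == reference.popleft()
    else:
        value = rng.random()
        mq.enqueue(value)
        reference.append(value)
    if reference:
        assert mq.get_max() == max(reference)
print("all checks passed")
```

Comparing against a slow but obviously correct reference is the same idea as the test harness at the top, applied to the data structure alone.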

It is instructive to see that the code for the max-queue is actually smaller than the code for a max-stack. Now, given this object it is trivial to change our code to add a third variant based on this type of queue:

def max_sliding_window_stabaque(A,window_size_in_seconds,step_in_seconds):

    result = []
    N = len(A)

    # the start point of the current window
    t0 = 0.0

    # the end point of the current window
    tmax = t0 + window_size_in_seconds

    # index in the array where the current window starts
    t0_index = 0

    # new: get a max queue    
    mq = max_stack_based_max_queue()

    # visit each item in the input array
    for i in xrange(N):

        # timestamp of the current item
        t = A[i][0]

        # new: add items in the window to the max-queue
        if t < tmax:
            mq.enqueue(A[i][1])

        else:
            # this item lies beyond the window, so we've seen all items in the window
            result.append( (t0, tmax, mq.get_max(), ) )

            # new: enqueue this item as well (easy to forget, but crucial!)
            mq.enqueue(A[i][1])

            # new: dequeue items from the beginning of the time window
            t0 += step_in_seconds
            while A[t0_index][0] < t0:
                t0_index += 1
                mq.dequeue()
            tmax += step_in_seconds

    return result

So the only change really is that we're now using a different max-queue class.

But how does it perform?

Unsurprisingly, performance very much depends on window size, step size, and input data size. The following table shows the running times in seconds, measured on an array with 50,000 items:

        window size in seconds         5            50            500          5,000
             using brute force    0.06800008    0.33299994    2.88400006   23.00000000
        using simple max-queue    0.13399982    0.16500020    0.21699977    0.47900009
   using stack-based max-queue    0.24600005    0.24499989    0.23000002    0.19899988

So for small window sizes, brute-force outperforms the queue approaches, but even for a 50-second window it fares worse than the other two solutions. Our simple max-queue fares better and holds its own, but it still falls behind if the window size is large enough (why? because on average too many items need to be updated). The most complex solution, using the max_stack_based_max_queue, initially takes a hit because the code is more complex, but for large window sizes it beats the other two approaches.