Rate Limiting

Formula

rate = (count_in_previous_window * weight) + count_in_current_window

where,

  • count_in_previous_window = number of requests in the previous window
  • count_in_current_window = number of requests in the current window
  • weight = (window_size_in_seconds - time_elapsed_in_current_window_in_seconds) / window_size_in_seconds

Code

import time

class RateLimiter:
    def __init__(self, limit, window_size=60):
        self.limit = limit
        self.window_size = window_size
        
        self.prev_count = 0
        self.curr_count = 0
        self.window_start = time.time()

    def allow(self):
        now = time.time()
        
        # Check if we have crossed into a new fixed window
        if now >= self.window_start + self.window_size:
            self.prev_count = self.curr_count
            self.curr_count = 0
            self.window_start = now

        # Calculate weight (how much of the previous window still 'overlaps')
        elapsed = now - self.window_start
        weight = (self.window_size - elapsed) / self.window_size
        
        # Estimate total requests in the sliding window
        estimated_count = (self.prev_count * weight) + self.curr_count

        if estimated_count >= self.limit:
            return False  # Rate limited!
            
        self.curr_count += 1
        return True