Rate Limiting
Formula
rate = (count_in_previous_window * weight) + count_in_current_window
where,
- count_in_previous_window = number of requests in the previous window
- count_in_current_window = number of requests in the current window
- weight = (window_size_in_seconds - time_elapsed_in_current_window_in_seconds) / window_size_in_seconds
Code
import time
class RateLimiter:
def __init__(self, limit, window_size=60):
self.limit = limit
self.window_size = window_size
self.prev_count = 0
self.curr_count = 0
self.window_start = time.time()
def allow(self):
now = time.time()
# Check if we have crossed into a new fixed window
if now >= self.window_start + self.window_size:
self.prev_count = self.curr_count
self.curr_count = 0
self.window_start = now
# Calculate weight (how much of the previous window still 'overlaps')
elapsed = now - self.window_start
weight = (self.window_size - elapsed) / self.window_size
# Estimate total requests in the sliding window
estimated_count = (self.prev_count * weight) + self.curr_count
if estimated_count >= self.limit:
return False # Rate limited!
self.curr_count += 1
return True