# python-learning/rate_limiters/token_bucket.py

import threading
from time import sleep, time
class RateLimiterException(Exception):
    """Root of the rate-limiter exception hierarchy.

    Carries no extra state; it exists so callers can catch every
    rate-limiter error with a single ``except`` clause.
    """
class RateLimitExceeded(RateLimiterException):
    """Signals that a request was rejected because no token was available.

    Attributes are set from the constructor arguments:
        retry_after: value passed by the raiser, rendered in the message
            as a number of seconds to wait before retrying.
        current_tokens: token count passed by the raiser.
    """

    def __init__(self, retry_after: float, current_tokens: float):
        """Store both values and build the human-readable message.

        Args:
            retry_after: Seconds to wait, formatted with two decimals
                in the exception message.
            current_tokens: Token count at the time of rejection.
        """
        message = f"Rate exceeded! Retry after {retry_after:.2f} seconds."
        super().__init__(message)
        self.retry_after = retry_after
        self.current_tokens = current_tokens
class TokenBucketLimiter:
    """Token-bucket rate limiter used as a context manager.

    The bucket starts full with ``capacity`` tokens and is topped up at
    ``refill_rate`` tokens per second of elapsed time, never exceeding
    ``capacity``. Entering the context consumes one token; if less than
    one whole token is available, ``RateLimitExceeded`` is raised instead.

    All bucket state is read and updated while holding an internal
    ``threading.Lock``.
    """

    def __init__(self, capacity: float, refill_rate: float):
        """Create a full bucket.

        Args:
            capacity: Maximum number of tokens the bucket can hold.
            refill_rate: Tokens added per second of elapsed time.
        """
        self.capacity = float(capacity)
        self.refill_rate = float(refill_rate)
        # The bucket starts full.
        self.tokens = float(capacity)
        # Timestamp (from time()) of the most recent refill calculation.
        self.last_refill_time = time()
        self._lock = threading.Lock()

    def __enter__(self):
        """Refill the bucket for the elapsed time, then take one token.

        Returns:
            This limiter instance.

        Raises:
            RateLimitExceeded: If fewer than one token is available. Its
                ``retry_after`` is the time needed to refill the missing
                fraction, or ``inf`` when the bucket never refills.
        """
        with self._lock:
            # Read the clock once so the instant used for the refill is
            # also the instant stored; a second reading would silently
            # drop the time that passed between the two calls.
            now = time()
            # time() is a wall clock and may step backwards; never let a
            # negative interval remove tokens from the bucket.
            elapsed = max(0.0, now - self.last_refill_time)
            self.tokens = min(
                self.tokens + elapsed * self.refill_rate, self.capacity
            )
            self.last_refill_time = now

            if self.tokens < 1:
                deficit = 1 - self.tokens
                if self.refill_rate > 0:
                    retry_after = deficit / self.refill_rate
                else:
                    # A bucket that does not refill can never serve the
                    # request; report that instead of dividing by zero.
                    retry_after = float("inf")
                raise RateLimitExceeded(retry_after, self.tokens)

            self.tokens -= 1
            return self

    def __exit__(self, exc_type, exc_value, exc_traceback) -> bool:
        """Leave the context; exceptions from the body are not suppressed."""
        return False
def make_requests(number: int, limiter: "TokenBucketLimiter") -> None:
    """Push ``number`` requests through ``limiter`` and print a summary.

    Each request enters the limiter once. If that attempt raises
    ``RateLimitExceeded``, the function sleeps for the exception's
    ``retry_after`` and tries exactly once more; a second rejection is
    counted as a failure and the loop moves on to the next request.

    Args:
        number: How many requests to attempt.
        limiter: Context manager that raises ``RateLimitExceeded`` on
            entry when a request must be rejected.
    """
    start_time = time()
    successful = 0
    rate_limited = 0
    failed = 0
    print(f"Starting request cycle with {number} requests...")

    # range(number), not range(1, number): the banner above and the rate
    # computed below both assume exactly `number` attempts.
    for _ in range(number):
        try:
            with limiter:
                successful += 1
        except RateLimitExceeded as e:
            rate_limited += 1
            sleep(e.retry_after)
            try:
                with limiter:
                    successful += 1
            # The limiter signals rejection with RateLimitExceeded, which
            # is not a RuntimeError; catching the wrong type would let a
            # failed retry abort the whole run.
            except RateLimitExceeded:
                failed += 1
                print("Retry failed! Request limit reached!")

    execution_time = time() - start_time
    # A coarse clock can report zero elapsed time for a very short run.
    effective_rate = number / execution_time if execution_time > 0 else 0.0
    print(
        f"Results: \n\nSuccessfull: {successful}\nRate limited: {rate_limited}\nFailed: {failed}\n"
    )
    print(
        f"Total execution time: {execution_time:.2f}\nEffective rate: {effective_rate:.2f}"
    )
# Example usage:
# limiter = TokenBucketLimiter(10, 10)
# make_requests(100, limiter)