"""Token-bucket rate limiters (threaded and asyncio) plus a small demo driver."""

import asyncio
import threading
from time import sleep, time


class RateLimiterException(Exception):
    """Base class for every error raised by the rate limiters in this module."""


class RateLimitExceeded(RateLimiterException):
    """Raised when a request arrives while the bucket holds less than one token.

    Attributes:
        retry_after: Seconds until one full token is expected to be available
            (``float("inf")`` when the bucket never refills).
        current_tokens: Fractional token count at the moment of rejection.
    """

    def __init__(self, retry_after: float, current_tokens: float):
        self.retry_after = retry_after
        self.current_tokens = current_tokens
        super().__init__(f"Rate exceeded! Retry after {retry_after:.2f} seconds.")


class _TokenBucket:
    """Token accounting shared by the sync and async limiters.

    This class does no locking of its own; subclasses serialize calls to
    ``_take_token`` with the lock type that suits their execution model.
    """

    def __init__(self, capacity: int, refill_rate: int):
        self.capacity = float(capacity)
        self.refill_rate = float(refill_rate)
        # The bucket starts full.
        self.tokens = float(capacity)
        self.last_refill_time = time()

    def _take_token(self) -> None:
        """Refill for the elapsed time, then consume one token.

        Raises:
            RateLimitExceeded: If less than one token is available after the
                refill. No token is consumed in that case.
        """
        now = time()
        # time() is a wall clock and may step backwards; never let that
        # translate into a negative refill that drains the bucket.
        elapsed = max(0.0, now - self.last_refill_time)
        self.tokens = min(self.tokens + elapsed * self.refill_rate, self.capacity)
        # Reuse the same timestamp the refill was computed from, so the time
        # spent between two clock reads is not silently lost.
        self.last_refill_time = now

        if self.tokens < 1:
            if self.refill_rate > 0:
                retry_after = (1 - self.tokens) / self.refill_rate
            else:
                # A bucket that does not refill will never have a token again.
                retry_after = float("inf")
            raise RateLimitExceeded(retry_after, self.tokens)

        self.tokens -= 1


class TokenBucketLimiter(_TokenBucket):
    """Token-bucket limiter used as a synchronous context manager.

    Entering the ``with`` block consumes one token or raises
    ``RateLimitExceeded``. The accounting runs under a ``threading.Lock``.

    Args:
        capacity: Maximum (and initial) number of tokens in the bucket.
        refill_rate: Tokens added per second.
    """

    def __init__(self, capacity: int, refill_rate: int):
        super().__init__(capacity, refill_rate)
        self._lock = threading.Lock()

    def __enter__(self) -> "TokenBucketLimiter":
        with self._lock:
            self._take_token()
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback) -> bool:
        # Nothing to release; never suppress exceptions from the body.
        return False


class AsyncTokenBucketLimiter(_TokenBucket):
    """Token-bucket limiter used as an asynchronous context manager.

    Entering the ``async with`` block consumes one token or raises
    ``RateLimitExceeded``. The accounting runs under an ``asyncio.Lock``.

    Args:
        capacity: Maximum (and initial) number of tokens in the bucket.
        refill_rate: Tokens added per second.
    """

    def __init__(self, capacity: int, refill_rate: int):
        super().__init__(capacity, refill_rate)
        self._lock = asyncio.Lock()

    async def __aenter__(self) -> "AsyncTokenBucketLimiter":
        async with self._lock:
            self._take_token()
        return self

    async def __aexit__(self, exc_type, exc_value, exc_traceback) -> bool:
        # Nothing to release; never suppress exceptions from the body.
        return False


def make_requests(number: int, limiter: TokenBucketLimiter) -> None:
    """Push ``number`` simulated requests through ``limiter`` and print a summary.

    A rejected request sleeps for the advertised ``retry_after`` and is retried
    once. If the retry is rejected too, the request is counted as failed.

    Args:
        number: How many requests to attempt.
        limiter: Context-manager limiter that raises ``RateLimitExceeded`` on
            entry when no token is available.
    """
    start_time = time()
    successful = 0
    rate_limited = 0
    failed = 0

    print(f"Starting request cycle with {number} requests...")

    # range(number), not range(1, number): issue exactly as many requests as
    # the banner above and the effective-rate figure below claim.
    for _ in range(number):
        try:
            with limiter:
                successful += 1
        except RateLimitExceeded as e:
            rate_limited += 1

            if e.retry_after == float("inf"):
                # The bucket never refills; sleeping would not help
                # (and sleep(inf) raises), so give up on this request.
                failed += 1
                print("Retry skipped! Limiter never refills.")
                continue

            sleep(e.retry_after)
            try:
                with limiter:
                    successful += 1
            except RateLimitExceeded:
                # The limiter raises RateLimitExceeded, not RuntimeError; catch
                # the right type so a failed retry is counted, not propagated.
                failed += 1
                print("Retry failed! Request limit reached!")

    end_time = time()
    execution_time = end_time - start_time
    # A coarse clock can report zero elapsed time for a short run.
    effective_rate = number / execution_time if execution_time > 0 else 0.0

    print(
        f"Results: \n\nSuccessfull: {successful}\nRate limited: {rate_limited}\nFailed: {failed}\n"
    )
    print(
        f"Total execution time: {execution_time:.2f}\nEffective rate: {effective_rate:.2f}"
    )


# Example usage:
# limiter = TokenBucketLimiter(10, 10)
# make_requests(100, limiter)