"""Leaky-bucket rate limiter for asyncio, plus a small load-generation demo."""

import asyncio
# `time` is no longer used below; the import is kept so that anything doing
# `from <this module> import time` keeps working.
from time import monotonic, time  # noqa: F401
from typing import Any, Awaitable, Callable, Optional


class LeakyBucketLimiter:
    """Run submitted coroutine functions one at a time at a bounded rate.

    Calls are queued by :meth:`execute` and drained by a single background
    worker that sleeps ``1 / rate`` seconds before invoking each one.  A second
    background task prints a metrics report every ``monitor_interval`` seconds.
    """

    def __init__(
        self,
        rate: float,
        max_queue_size: int,
        *,
        monitor_interval: float = 2.0,
    ):
        """Create a stopped limiter; call :meth:`start` before :meth:`execute`.

        Args:
            rate: Maximum number of calls per second; must be positive.
            max_queue_size: ``maxsize`` handed to ``asyncio.Queue``.
            monitor_interval: Seconds between two metrics reports.

        Raises:
            ValueError: If ``rate`` is not a positive number.
        """
        # Validate first so a bad rate fails with the same error as set_rate()
        # instead of a ZeroDivisionError from the delay computation.
        self._validate_rate(rate)
        self.rate = rate
        self.max_queue_size = max_queue_size
        self.monitor_interval = monitor_interval
        self._calculate_delay()
        self._queue = asyncio.Queue(maxsize=max_queue_size)
        self._running = False
        self._worker_task: Optional[asyncio.Task] = None
        self._monitor_task: Optional[asyncio.Task] = None
        self.metrics = {
            "processed_count": 0,
            "total_dwell_time": 0.0,  # Total time tasks waited in queue
            # Nothing in this class increments this counter: execute() waits
            # for queue space instead of rejecting work.  The key is kept
            # because the metrics report prints it.
            "dropped_tasks": 0,
        }

    @staticmethod
    def _validate_rate(rate: float) -> None:
        """Raise ``ValueError`` unless ``rate`` is a positive number."""
        # `not rate > 0` (rather than `rate <= 0`) also rejects NaN.
        if not rate > 0:
            raise ValueError("Rate must be a positive number!")

    def _calculate_delay(self):
        """Recompute the pause between two calls from the current rate."""
        self.delay = 1.0 / self.rate

    @property
    def queue_depth(self) -> int:
        """Number of calls currently waiting in the queue."""
        return self._queue.qsize()

    async def start(self):
        """Start the worker and monitor tasks; a no-op if already running."""
        if not self._running:
            self._running = True
            self._worker_task = asyncio.create_task(self._worker())
            self._monitor_task = asyncio.create_task(self._monitor())
            print("Limiter started!")

    async def stop(self):
        """Stop accepting calls, finish the queued ones, end background tasks.

        Calls that were already accepted by :meth:`execute` are still
        processed before the worker is shut down.
        """
        self._running = False
        worker, monitor = self._worker_task, self._monitor_task

        if worker is not None and not worker.done():
            # Wait until every accepted call has been marked done, but also
            # wake up if the worker dies, so stop() cannot hang on a queue
            # that nobody drains any more.
            drained = asyncio.ensure_future(self._queue.join())
            try:
                await asyncio.wait(
                    {drained, worker}, return_when=asyncio.FIRST_COMPLETED
                )
            finally:
                drained.cancel()

        # The worker is now idle in queue.get() and the monitor is sleeping;
        # cancelling is the only way to end them promptly.
        background = [task for task in (worker, monitor) if task is not None]
        for task in background:
            task.cancel()
        await asyncio.gather(*background, return_exceptions=True)
        print("Limiter stopped!")

    async def set_rate(self, new_rate: float):
        """Change the rate; it applies from the next processed call on.

        Raises:
            ValueError: If ``new_rate`` is not a positive number.
        """
        self._validate_rate(new_rate)
        print(f"Changing rate from {self.rate} to {new_rate}...")
        self.rate = new_rate
        self._calculate_delay()

    def _format_report(self) -> str:
        """Build the text of one metrics report from the current counters."""
        processed = self.metrics["processed_count"]
        avg_dwell = (
            self.metrics["total_dwell_time"] / processed if processed > 0 else 0
        )
        return "\n".join(
            [
                "\n--- [METRICS REPORT] ---",
                f" Queue Depth: {self.queue_depth} tasks",
                f" Throughput: {processed} tasks total",
                f" Avg Dwell: {avg_dwell:.2f} seconds",
                f" Drops: {self.metrics['dropped_tasks']}",
                "-------------------------\n",
            ]
        )

    async def _monitor(self):
        """Print a metrics report every ``monitor_interval`` seconds."""
        while self._running:
            await asyncio.sleep(self.monitor_interval)
            print(self._format_report())

    async def _process(self, item) -> None:
        """Pace, run and resolve one queued call.

        ``item`` is the ``(callable, future, arrival_time)`` tuple built by
        :meth:`execute`.
        """
        func, future, arrival_time = item
        self.metrics["total_dwell_time"] += monotonic() - arrival_time

        # Pacing pause.  It also always yields to the event loop, so a
        # producer that queue.get() has just unblocked enqueues its item
        # before this one is marked done; stop() relies on that when it waits
        # for the queue to drain.
        await asyncio.sleep(self.delay)
        try:
            result = await func()
        except Exception as e:
            # The caller may have been cancelled in the meantime; resolving a
            # finished future would raise InvalidStateError.
            if not future.done():
                future.set_exception(e)
        else:
            if not future.done():
                future.set_result(result)
        finally:
            self.metrics["processed_count"] += 1

    async def _worker(self):
        """Drain the queue forever; ended by cancellation from :meth:`stop`."""
        while True:
            item = await self._queue.get()
            try:
                await self._process(item)
            except Exception as e:
                # Keep the worker alive on unexpected failures.
                print(f"Worker error!\nError: {e}")
            finally:
                # Always balance get() so queue.join() in stop() can finish.
                self._queue.task_done()

    async def execute(self, func: Callable[..., Awaitable[Any]], *args, **kwargs):
        """Queue ``func(*args, **kwargs)`` and return its result once it ran.

        Waits for free space when the queue is full.

        Args:
            func: Coroutine function (or other callable returning an
                awaitable) to invoke.
            *args: Positional arguments for ``func``.
            **kwargs: Keyword arguments for ``func``.

        Returns:
            Whatever awaiting ``func(*args, **kwargs)`` returns.

        Raises:
            RuntimeError: If the limiter is not running.
            Exception: Any exception raised by ``func`` is re-raised here.
        """
        if not self._running:
            raise RuntimeError("Limiter is not running! Call .start() first.")

        future = asyncio.get_running_loop().create_future()
        payload = (lambda: func(*args, **kwargs), future, monotonic())
        await self._queue.put(payload)
        return await future


async def mock_external_api_call(req_info: str):
    """Stand-in for a remote call: does nothing and returns ``None``."""
    return None


async def make_requests(number: int, limiter_rate: float, limiter_max_queue_size: int):
    """Push ``number`` mock requests through a fresh limiter and print a summary.

    Args:
        number: How many requests to submit.
        limiter_rate: Rate (calls per second) for the limiter.
        limiter_max_queue_size: Queue capacity for the limiter.
    """
    limiter = LeakyBucketLimiter(limiter_rate, limiter_max_queue_size)
    await limiter.start()
    try:
        start_time = monotonic()
        tasks = [
            asyncio.create_task(
                limiter.execute(mock_external_api_call, f"REQ: {i}")
            )
            for i in range(number)
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        execution_time = monotonic() - start_time

        # BaseException so that a CancelledError result counts as a failure.
        failures = sum(isinstance(r, BaseException) for r in results)
        success = len(results) - failures
        # Guard against a zero reading on coarse clocks or with no requests.
        effective_rate = success / execution_time if execution_time > 0 else 0.0

        print(
            f"Total execution time: {execution_time:.2f}\nEffective rate: {effective_rate:.2f}"
        )
        print(f"Operation complete with {success} successful requests and {failures} failures.")
    finally:
        # Always shut the background tasks down, even if the run failed.
        await limiter.stop()


if __name__ == "__main__":
    asyncio.run(make_requests(1000, 20, 60))