# python-learning/rate_limiters/leaky_bucket.py
#
# Web-viewer header captured with this file: 158 lines, 4.5 KiB, Python
# (viewer controls: Raw / Normal View / History).

import asyncio
from time import time
class LeakyBucketLimiter:
    """Asynchronous leaky-bucket rate limiter backed by a bounded queue.

    Callers submit coroutine functions through :meth:`execute`. A single
    background worker takes them off the queue one at a time and waits
    ``1 / rate`` seconds before running each, so calls are started no
    faster than ``rate`` per second. A second background task prints a
    metrics report every two seconds while the limiter is running.

    Attributes:
        rate: Target number of calls per second.
        max_queue_size: Capacity passed to the underlying ``asyncio.Queue``.
        delay: Seconds the worker waits before each call (``1 / rate``).
        metrics: Counters with keys ``processed_count``,
            ``total_dwell_time`` and ``dropped_tasks``.
    """

    def __init__(self, rate: float, max_queue_size: int):
        """Create a stopped limiter.

        Args:
            rate: Calls per second; must be positive.
            max_queue_size: Maximum number of pending calls held in the queue.

        Raises:
            ValueError: If ``rate`` is not positive.
        """
        # Same validation as set_rate(): a zero rate would divide by zero
        # and a negative one would yield a negative delay.
        if rate <= 0:
            raise ValueError("Rate must be a positive number!")
        self.rate = rate
        self.max_queue_size = max_queue_size
        self._calculate_delay()
        self._queue = asyncio.Queue(maxsize=max_queue_size)
        self._running = False
        self._worker_task = None
        self._monitor_task = None
        self.metrics = {
            "processed_count": 0,
            "total_dwell_time": 0.0,  # Total time tasks waited in queue
            # Reported by the monitor; nothing in this class increments it.
            "dropped_tasks": 0,
        }

    def _calculate_delay(self) -> None:
        """Recompute the per-call delay from the current rate."""
        self.delay = 1.0 / self.rate

    @property
    def queue_depth(self) -> int:
        """Number of calls currently waiting in the queue."""
        return self._queue.qsize()

    async def start(self) -> None:
        """Start the worker and monitor tasks; a no-op if already running."""
        if self._running:
            return
        self._running = True
        self._worker_task = asyncio.create_task(self._worker())
        self._monitor_task = asyncio.create_task(self._monitor())
        print("Limiter started!")

    async def stop(self) -> None:
        """Stop accepting work, drain the queue and shut down background tasks."""
        self._running = False
        if self._worker_task:
            # The worker keeps going until the queue is empty, then exits.
            await self._worker_task
            self._worker_task = None
        if self._monitor_task:
            # The monitor only produces reports, so cancel it instead of
            # waiting out its sleep; gather() absorbs the CancelledError.
            self._monitor_task.cancel()
            await asyncio.gather(self._monitor_task, return_exceptions=True)
            self._monitor_task = None
        print("Limiter stopped!")

    async def set_rate(self, new_rate: float) -> None:
        """Change the rate; applies from the next call the worker processes.

        Raises:
            ValueError: If ``new_rate`` is not positive.
        """
        if new_rate <= 0:
            raise ValueError("Rate must be a positive number!")
        print(f"Changing rate from {self.rate} to {new_rate}...")
        self.rate = new_rate
        self._calculate_delay()

    async def _monitor(self) -> None:
        """Print a metrics report every two seconds while running."""
        while self._running:
            await asyncio.sleep(2)
            processed = self.metrics["processed_count"]
            avg_dwell = (
                self.metrics["total_dwell_time"] / processed if processed > 0 else 0
            )
            print("\n--- [METRICS REPORT] ---")
            print(f" Queue Depth: {self.queue_depth} tasks")
            print(f" Throughput: {processed} tasks total")
            print(f" Avg Dwell: {avg_dwell:.2f} seconds")
            print(f" Drops: {self.metrics['dropped_tasks']}")
            print("-------------------------\n")

    async def _worker(self) -> None:
        """Run queued calls one at a time, pausing ``self.delay`` before each."""
        loop = asyncio.get_running_loop()
        while self._running or not self._queue.empty():
            try:
                # Poll with a timeout so the loop condition is re-checked
                # even when no work arrives.
                item = await asyncio.wait_for(self._queue.get(), timeout=1)
            except asyncio.TimeoutError:
                continue
            try:
                func, future, arrival_time = item
                # Same monotonic clock that execute() stamped the item with.
                self.metrics["total_dwell_time"] += loop.time() - arrival_time
                await asyncio.sleep(self.delay)
                try:
                    result = await func()
                except Exception as exc:
                    # The caller may have been cancelled meanwhile; setting
                    # an outcome on a finished future raises InvalidStateError.
                    if not future.done():
                        future.set_exception(exc)
                else:
                    if not future.done():
                        future.set_result(result)
                finally:
                    self.metrics["processed_count"] += 1
                    self._queue.task_done()
            except Exception as exc:
                # Keep the worker alive on unexpected failures.
                print(f"Worker error!\nError: {exc}")

    async def execute(self, func, *args, **kwargs):
        """Queue ``func(*args, **kwargs)`` and return its awaited result.

        Args:
            func: Callable returning an awaitable (e.g. a coroutine function).
            *args: Positional arguments forwarded to ``func``.
            **kwargs: Keyword arguments forwarded to ``func``.

        Returns:
            Whatever awaiting ``func(*args, **kwargs)`` produces.

        Raises:
            RuntimeError: If the limiter has not been started.
            Exception: Any exception raised by ``func`` is re-raised here.
        """
        if not self._running:
            raise RuntimeError("Limiter is not running! Call .start() first.")
        loop = asyncio.get_running_loop()
        future = loop.create_future()
        # Monotonic timestamp, so dwell time is immune to wall-clock jumps.
        arrival_time = loop.time()
        payload = (lambda: func(*args, **kwargs), future, arrival_time)
        # When the queue is full, put() waits for a free slot, so callers
        # are held back rather than rejected.
        await self._queue.put(payload)
        return await future
async def mock_external_api_call(req_info: str):
    """Stand-in for an external API call.

    Accepts a request label, does no work, and completes immediately with
    ``None``. An ``asyncio.sleep`` could be added here to simulate latency.
    """
    return None
async def make_requests(number: int, limiter_rate: float, limiter_max_queue_size: int):
    """Push ``number`` mock requests through a fresh limiter and print a summary.

    Args:
        number: How many requests to submit.
        limiter_rate: Rate (calls per second) given to the limiter.
        limiter_max_queue_size: Queue capacity given to the limiter.

    The summary reports total elapsed time, the effective rate of successful
    requests, and success/failure counts. The limiter is stopped afterwards,
    even if the run is interrupted.
    """
    limiter = LeakyBucketLimiter(limiter_rate, limiter_max_queue_size)
    await limiter.start()
    try:
        start_time = time()
        tasks = [
            asyncio.create_task(limiter.execute(mock_external_api_call, f"REQ: {i}"))
            for i in range(number)
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        execution_time = time() - start_time

        # gather() can hand back CancelledError, which derives from
        # BaseException, so test against BaseException to count it as a failure.
        failures = sum(isinstance(r, BaseException) for r in results)
        success = len(results) - failures

        # A zero-length run (e.g. number == 0 on a coarse clock) must not
        # divide by zero.
        effective_rate = success / execution_time if execution_time > 0 else 0.0
        print(
            f"Total execution time: {execution_time:.2f}\nEffective rate: {effective_rate:.2f}"
        )
        print(f"Operation complete with {success} successful requests and {failures} failures.")
    finally:
        # Always shut the background tasks down, including on cancellation.
        await limiter.stop()
if __name__ == "__main__":
    # Demo run: 1000 mock requests, limiter rate 20, queue capacity 60.
    asyncio.run(
        make_requests(number=1000, limiter_rate=20, limiter_max_queue_size=60)
    )