python-learning/rate_limiters/tests/test_leaky_bucket.py

30 lines
592 B
Python
Raw Normal View History

import asyncio
from time import time
import pytest
from leaky_bucket import LeakyBucketLimiter
2026-05-09 20:17:23 +00:00
@pytest.mark.asyncio
async def test_duration() -> None:
async def test_function():
pass
limiter = LeakyBucketLimiter(10, 100)
await limiter.start()
start = time()
tasks = [asyncio.create_task(limiter.execute(test_function)) for i in range(20)]
results = await asyncio.gather(*tasks, return_exceptions=True)
end = time()
duration = end - start
assert duration < 2.1
for item in results:
2026-05-09 20:17:23 +00:00
assert isinstance(item, Exception) is False