testing rate-limiting requests via ASGI middleware in starlette

This commit is contained in:
V 2026-05-09 16:35:03 +01:00
parent 29656c8d15
commit d536dde6c9
10 changed files with 462 additions and 1 deletions

3
rate_limiters/.vscode/settings.json vendored Normal file
View File

@ -0,0 +1,3 @@
{
"python-envs.defaultEnvManager": "ms-python.python:venv"
}

View File

@ -0,0 +1,158 @@
import asyncio
from time import time
class LeakyBucketLimiter:
def __init__(self, rate: float, max_queue_size: int):
self.rate = rate
self.max_queue_size = max_queue_size
self._calculate_delay()
self._queue = asyncio.Queue(maxsize=max_queue_size)
self._running = False
self._worker_task = None
self._monitor_task = None
self.metrics = {
"processed_count": 0,
"total_dwell_time": 0.0, # Total time tasks waited in queue
"dropped_tasks": 0
}
def _calculate_delay(self):
self.delay = 1.0 / self.rate
@property
def queue_depth(self) -> int:
return self._queue.qsize()
async def start(self):
if not self._running:
self._running = True
self._worker_task = asyncio.create_task(self._worker())
self._monitor_task = asyncio.create_task(self._monitor())
print("Limiter started!")
async def stop(self):
self._running = False
if self._worker_task:
await self._worker_task
print("Limiter stopped!")
async def set_rate(self, new_rate: float):
if new_rate <= 0:
raise ValueError("Rate must be a positive number!")
print(f"Changing rate from {self.rate} to {new_rate}...")
self.rate = new_rate
self._calculate_delay()
async def _monitor(self):
while self._running:
await asyncio.sleep(2)
avg_dwell = (self.metrics["total_dwell_time"] / self.metrics["processed_count"]
if self.metrics["processed_count"] > 0 else 0)
print("\n--- [METRICS REPORT] ---")
print(f" Queue Depth: {self.queue_depth} tasks")
print(f" Throughput: {self.metrics['processed_count']} tasks total")
print(f" Avg Dwell: {avg_dwell:.2f} seconds")
print(f" Drops: {self.metrics['dropped_tasks']}")
print("-------------------------\n")
async def _worker(self):
while self._running or not self._queue.empty():
try:
item = await asyncio.wait_for(self._queue.get(), timeout=1)
func, future, arrival_time = item
dwell_time = time() - arrival_time
self.metrics["total_dwell_time"] += dwell_time
await asyncio.sleep(self.delay)
try:
result = await func()
future.set_result(result)
except Exception as e:
future.set_exception(e)
finally:
self.metrics["processed_count"] += 1
self._queue.task_done()
except asyncio.TimeoutError:
continue
except Exception as e:
print(f"Worker error!\nError: {e}")
async def execute(self, func, *args, **kwargs):
if not self._running:
raise RuntimeError("Limiter is not running! Call .start() first.")
# if self._queue.full():
# raise RuntimeError(f"Queue is full! Retry in {self.delay:.2f} seconds.")
loop = asyncio.get_running_loop()
future = loop.create_future()
arrival_time = time()
payload = (lambda: func(*args, **kwargs), future, arrival_time)
await self._queue.put(payload)
return await future
async def mock_external_api_call(req_info: str):
pass
# await asyncio.sleep(0.000001)
async def make_requests(number: int, limiter_rate: float, limiter_max_queue_size: int):
limiter = LeakyBucketLimiter(limiter_rate, limiter_max_queue_size)
await limiter.start()
start_time = time()
tasks = []
for i in range(number):
task = asyncio.create_task(limiter.execute(mock_external_api_call, f"REQ: {i}"))
tasks.append(task)
results = await asyncio.gather(*tasks, return_exceptions=True)
end_time = time()
success = 0
failures = 0
for r in results:
if isinstance(r, Exception):
failures += 1
continue
success += 1
execution_time = end_time - start_time
effective_rate = success / execution_time
print(
f"Total execution time: {execution_time:.2f}\nEffective rate: {effective_rate:.2f}"
)
print(f"Operation complete with {success} successful requests and {failures} failures.")
await limiter.stop()
if __name__ == "__main__":
asyncio.run(make_requests(1000, 20, 60))

3
rate_limiters/load_test.sh Executable file
View File

@ -0,0 +1,3 @@
#!/bin/bash
ab -c 10 -n 10000 http://192.168.0.40:8001/

View File

@ -0,0 +1,45 @@
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import Response
from starlette.routing import Route
from starlette.middleware import Middleware
from token_bucket import AsyncTokenBucketLimiter, RateLimitExceeded
class RateLimiterMiddleware:
def __init__(self, app):
self.app = app
self.clients = {}
async def __call__(self, scope, receive, send):
if scope["type"] != "http":
await self.app(scope, receive, send)
client_data = scope["client"]
client_ip = client_data[0] if client_data else "UNKNOWN"
limiter = self.clients.setdefault(client_ip, AsyncTokenBucketLimiter(10, 10))
try:
async with limiter:
await self.app(scope, receive, send)
except RateLimitExceeded as e:
retry_after = str(e.retry_after)
response = Response(
content=f"Rate limit exceeded! Retry after: {retry_after}",
headers={"Retry-After": retry_after},
status_code=429
)
return await response(scope, receive, send)
def main(request: Request):
return Response(content="Hello there!")
routes = [Route("/", main)]
middleware = [Middleware(RateLimiterMiddleware)]
app = Starlette(routes=routes, middleware=middleware)

View File

@ -5,5 +5,9 @@ description = "Add your description here"
readme = "README.md" readme = "README.md"
requires-python = ">=3.11" requires-python = ">=3.11"
dependencies = [ dependencies = [
"httpx>=0.28.1",
"pytest>=9.0.3", "pytest>=9.0.3",
"pytest-asyncio>=1.3.0",
"starlette>=1.0.0",
"uvicorn>=0.46.0",
] ]

3
rate_limiters/start_app.sh Executable file
View File

@ -0,0 +1,3 @@
#!/bin/bash
uvicorn --host 0.0.0.0 --port 8001 --reload middleware_starlette:app

View File

@ -0,0 +1,29 @@
import asyncio
from time import time
import pytest
from leaky_bucket import LeakyBucketLimiter
@pytest.mark.asyncio
async def test_duration() -> None:
async def test_function():
pass
limiter = LeakyBucketLimiter(10, 100)
await limiter.start()
start = time()
tasks = [asyncio.create_task(limiter.execute(test_function)) for i in range(20)]
results = await asyncio.gather(*tasks, return_exceptions=True)
end = time()
duration = end - start
assert duration < 2.1
for item in results:
assert isinstance(item, Exception) is False

View File

@ -0,0 +1,49 @@
from time import sleep
from starlette.testclient import TestClient
from middleware_starlette import app
def test_app_runs():
client = TestClient(app)
response = client.get("/")
assert response.status_code == 200
assert "Hello there" in response.text
def test_below_rate_limit():
client = TestClient(app)
# Limit is 10/s
for i in range(20):
response = client.get("/")
assert response.status_code == 200
assert "Hello there" in response.text
sleep(0.1)
def test_above_rate_limit():
client = TestClient(app)
successful = 0
rate_limited = 0
# Limit is 10/s
for i in range(30):
response = client.get("/")
if response.status_code != 200:
rate_limited += 1
assert response.status_code == 429
assert "Rate limit exceeded" in response.text
assert response.headers.get("retry-after") is not None
else:
successful += 1
assert successful != 0
assert rate_limited != 0

View File

@ -1,5 +1,6 @@
import threading import threading
from time import sleep, time from time import sleep, time
import asyncio
class RateLimiterException(Exception): class RateLimiterException(Exception):
@ -40,6 +41,34 @@ class TokenBucketLimiter:
def __exit__(self, exc_type, exc_value, exc_traceback) -> bool: def __exit__(self, exc_type, exc_value, exc_traceback) -> bool:
return False return False
class AsyncTokenBucketLimiter:
def __init__(self, capacity: int, refill_rate: int):
self.capacity = float(capacity)
self.refill_rate = float(refill_rate)
self.tokens = float(capacity)
self.last_refill_time = time()
self._lock = asyncio.Lock()
async def __aenter__(self):
async with self._lock:
now = time()
elapsed = now - self.last_refill_time
new_tokens = elapsed * self.refill_rate
self.tokens = min(self.tokens + new_tokens, self.capacity)
self.last_refill_time = time()
if self.tokens < 1:
retry_after = (1 - self.tokens) / self.refill_rate
raise RateLimitExceeded(retry_after, self.tokens)
self.tokens -= 1
return self
async def __aexit__(self, exc_type, exc_value, exc_traceback) -> bool:
return False
def make_requests(number: int, limiter: TokenBucketLimiter) -> None: def make_requests(number: int, limiter: TokenBucketLimiter) -> None:

View File

@ -2,6 +2,40 @@ version = 1
revision = 3 revision = 3
requires-python = ">=3.11" requires-python = ">=3.11"
[[package]]
name = "anyio"
version = "4.13.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "idna" },
{ name = "typing-extensions", marker = "python_full_version < '3.13'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/19/14/2c5dd9f512b66549ae92767a9c7b330ae88e1932ca57876909410251fe13/anyio-4.13.0.tar.gz", hash = "sha256:334b70e641fd2221c1505b3890c69882fe4a2df910cba14d97019b90b24439dc", size = 231622, upload-time = "2026-03-24T12:59:09.671Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/da/42/e921fccf5015463e32a3cf6ee7f980a6ed0f395ceeaa45060b61d86486c2/anyio-4.13.0-py3-none-any.whl", hash = "sha256:08b310f9e24a9594186fd75b4f73f4a4152069e3853f1ed8bfbf58369f4ad708", size = 114353, upload-time = "2026-03-24T12:59:08.246Z" },
]
[[package]]
name = "certifi"
version = "2026.4.22"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/25/ee/6caf7a40c36a1220410afe15a1cc64993a1f864871f698c0f93acb72842a/certifi-2026.4.22.tar.gz", hash = "sha256:8d455352a37b71bf76a79caa83a3d6c25afee4a385d632127b6afb3963f1c580", size = 137077, upload-time = "2026-04-22T11:26:11.191Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/22/30/7cd8fdcdfbc5b869528b079bfb76dcdf6056b1a2097a662e5e8c04f42965/certifi-2026.4.22-py3-none-any.whl", hash = "sha256:3cb2210c8f88ba2318d29b0388d1023c8492ff72ecdde4ebdaddbb13a31b1c4a", size = 135707, upload-time = "2026-04-22T11:26:09.372Z" },
]
[[package]]
name = "click"
version = "8.3.3"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "colorama", marker = "sys_platform == 'win32'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/bb/63/f9e1ea081ce35720d8b92acde70daaedace594dc93b693c869e0d5910718/click-8.3.3.tar.gz", hash = "sha256:398329ad4837b2ff7cbe1dd166a4c0f8900c3ca3a218de04466f38f6497f18a2", size = 328061, upload-time = "2026-04-22T15:11:27.506Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/ae/44/c1221527f6a71a01ec6fbad7fa78f1d50dfa02217385cf0fa3eec7087d59/click-8.3.3-py3-none-any.whl", hash = "sha256:a2bf429bb3033c89fa4936ffb35d5cb471e3719e1f3c8a7c3fff0b8314305613", size = 110502, upload-time = "2026-04-22T15:11:25.044Z" },
]
[[package]] [[package]]
name = "colorama" name = "colorama"
version = "0.4.6" version = "0.4.6"
@ -11,6 +45,52 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/d1/d6/3965ed04c63042e047cb6a3e6ed1a63a35087b6a609aa3a15ed8ac56c221/colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6", size = 25335, upload-time = "2022-10-25T02:36:20.889Z" }, { url = "https://files.pythonhosted.org/packages/d1/d6/3965ed04c63042e047cb6a3e6ed1a63a35087b6a609aa3a15ed8ac56c221/colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6", size = 25335, upload-time = "2022-10-25T02:36:20.889Z" },
] ]
[[package]]
name = "h11"
version = "0.16.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/01/ee/02a2c011bdab74c6fb3c75474d40b3052059d95df7e73351460c8588d963/h11-0.16.0.tar.gz", hash = "sha256:4e35b956cf45792e4caa5885e69fba00bdbc6ffafbfa020300e549b208ee5ff1", size = 101250, upload-time = "2025-04-24T03:35:25.427Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/04/4b/29cac41a4d98d144bf5f6d33995617b185d14b22401f75ca86f384e87ff1/h11-0.16.0-py3-none-any.whl", hash = "sha256:63cf8bbe7522de3bf65932fda1d9c2772064ffb3dae62d55932da54b31cb6c86", size = 37515, upload-time = "2025-04-24T03:35:24.344Z" },
]
[[package]]
name = "httpcore"
version = "1.0.9"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "certifi" },
{ name = "h11" },
]
sdist = { url = "https://files.pythonhosted.org/packages/06/94/82699a10bca87a5556c9c59b5963f2d039dbd239f25bc2a63907a05a14cb/httpcore-1.0.9.tar.gz", hash = "sha256:6e34463af53fd2ab5d807f399a9b45ea31c3dfa2276f15a2c3f00afff6e176e8", size = 85484, upload-time = "2025-04-24T22:06:22.219Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/7e/f5/f66802a942d491edb555dd61e3a9961140fd64c90bce1eafd741609d334d/httpcore-1.0.9-py3-none-any.whl", hash = "sha256:2d400746a40668fc9dec9810239072b40b4484b640a8c38fd654a024c7a1bf55", size = 78784, upload-time = "2025-04-24T22:06:20.566Z" },
]
[[package]]
name = "httpx"
version = "0.28.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "anyio" },
{ name = "certifi" },
{ name = "httpcore" },
{ name = "idna" },
]
sdist = { url = "https://files.pythonhosted.org/packages/b1/df/48c586a5fe32a0f01324ee087459e112ebb7224f646c0b5023f5e79e9956/httpx-0.28.1.tar.gz", hash = "sha256:75e98c5f16b0f35b567856f597f06ff2270a374470a5c2392242528e3e3e42fc", size = 141406, upload-time = "2024-12-06T15:37:23.222Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/2a/39/e50c7c3a983047577ee07d2a9e53faf5a69493943ec3f6a384bdc792deb2/httpx-0.28.1-py3-none-any.whl", hash = "sha256:d909fcccc110f8c7faf814ca82a9a4d816bc5a6dbfea25d6591d6985b8ba59ad", size = 73517, upload-time = "2024-12-06T15:37:21.509Z" },
]
[[package]]
name = "idna"
version = "3.13"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/ce/cc/762dfb036166873f0059f3b7de4565e1b5bc3d6f28a414c13da27e442f99/idna-3.13.tar.gz", hash = "sha256:585ea8fe5d69b9181ec1afba340451fba6ba764af97026f92a91d4eef164a242", size = 194210, upload-time = "2026-04-22T16:42:42.314Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/5d/13/ad7d7ca3808a898b4612b6fe93cde56b53f3034dcde235acb1f0e1df24c6/idna-3.13-py3-none-any.whl", hash = "sha256:892ea0cde124a99ce773decba204c5552b69c3c67ffd5f232eb7696135bc8bb3", size = 68629, upload-time = "2026-04-22T16:42:40.909Z" },
]
[[package]] [[package]]
name = "iniconfig" name = "iniconfig"
version = "2.3.0" version = "2.3.0"
@ -63,13 +143,71 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/d4/24/a372aaf5c9b7208e7112038812994107bc65a84cd00e0354a88c2c77a617/pytest-9.0.3-py3-none-any.whl", hash = "sha256:2c5efc453d45394fdd706ade797c0a81091eccd1d6e4bccfcd476e2b8e0ab5d9", size = 375249, upload-time = "2026-04-07T17:16:16.13Z" }, { url = "https://files.pythonhosted.org/packages/d4/24/a372aaf5c9b7208e7112038812994107bc65a84cd00e0354a88c2c77a617/pytest-9.0.3-py3-none-any.whl", hash = "sha256:2c5efc453d45394fdd706ade797c0a81091eccd1d6e4bccfcd476e2b8e0ab5d9", size = 375249, upload-time = "2026-04-07T17:16:16.13Z" },
] ]
[[package]]
name = "pytest-asyncio"
version = "1.3.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "pytest" },
{ name = "typing-extensions", marker = "python_full_version < '3.13'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/90/2c/8af215c0f776415f3590cac4f9086ccefd6fd463befeae41cd4d3f193e5a/pytest_asyncio-1.3.0.tar.gz", hash = "sha256:d7f52f36d231b80ee124cd216ffb19369aa168fc10095013c6b014a34d3ee9e5", size = 50087, upload-time = "2025-11-10T16:07:47.256Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/e5/35/f8b19922b6a25bc0880171a2f1a003eaeb93657475193ab516fd87cac9da/pytest_asyncio-1.3.0-py3-none-any.whl", hash = "sha256:611e26147c7f77640e6d0a92a38ed17c3e9848063698d5c93d5aa7aa11cebff5", size = 15075, upload-time = "2025-11-10T16:07:45.537Z" },
]
[[package]] [[package]]
name = "rate-limiters" name = "rate-limiters"
version = "0.1.0" version = "0.1.0"
source = { virtual = "." } source = { virtual = "." }
dependencies = [ dependencies = [
{ name = "httpx" },
{ name = "pytest" }, { name = "pytest" },
{ name = "pytest-asyncio" },
{ name = "starlette" },
{ name = "uvicorn" },
] ]
[package.metadata] [package.metadata]
requires-dist = [{ name = "pytest", specifier = ">=9.0.3" }] requires-dist = [
{ name = "httpx", specifier = ">=0.28.1" },
{ name = "pytest", specifier = ">=9.0.3" },
{ name = "pytest-asyncio", specifier = ">=1.3.0" },
{ name = "starlette", specifier = ">=1.0.0" },
{ name = "uvicorn", specifier = ">=0.46.0" },
]
[[package]]
name = "starlette"
version = "1.0.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "anyio" },
{ name = "typing-extensions", marker = "python_full_version < '3.13'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/81/69/17425771797c36cded50b7fe44e850315d039f28b15901ab44839e70b593/starlette-1.0.0.tar.gz", hash = "sha256:6a4beaf1f81bb472fd19ea9b918b50dc3a77a6f2e190a12954b25e6ed5eea149", size = 2655289, upload-time = "2026-03-22T18:29:46.779Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/0b/c9/584bc9651441b4ba60cc4d557d8a547b5aff901af35bda3a4ee30c819b82/starlette-1.0.0-py3-none-any.whl", hash = "sha256:d3ec55e0bb321692d275455ddfd3df75fff145d009685eb40dc91fc66b03d38b", size = 72651, upload-time = "2026-03-22T18:29:45.111Z" },
]
[[package]]
name = "typing-extensions"
version = "4.15.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/72/94/1a15dd82efb362ac84269196e94cf00f187f7ed21c242792a923cdb1c61f/typing_extensions-4.15.0.tar.gz", hash = "sha256:0cea48d173cc12fa28ecabc3b837ea3cf6f38c6d1136f85cbaaf598984861466", size = 109391, upload-time = "2025-08-25T13:49:26.313Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/18/67/36e9267722cc04a6b9f15c7f3441c2363321a3ea07da7ae0c0707beb2a9c/typing_extensions-4.15.0-py3-none-any.whl", hash = "sha256:f0fa19c6845758ab08074a0cfa8b7aecb71c999ca73d62883bc25cc018c4e548", size = 44614, upload-time = "2025-08-25T13:49:24.86Z" },
]
[[package]]
name = "uvicorn"
version = "0.46.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "click" },
{ name = "h11" },
]
sdist = { url = "https://files.pythonhosted.org/packages/1f/93/041fca8274050e40e6791f267d82e0e2e27dd165627bd640d3e0e378d877/uvicorn-0.46.0.tar.gz", hash = "sha256:fb9da0926999cc6cb22dc7cd71a94a632f078e6ae47ff683c5c420750fb7413d", size = 88758, upload-time = "2026-04-23T07:16:00.151Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/31/a3/5b1562db76a5a488274b2332a97199b32d0442aca0ed193697fd47786316/uvicorn-0.46.0-py3-none-any.whl", hash = "sha256:bbebbcbed972d162afca128605223022bedd345b7bc7855ce66deb31487a9048", size = 70926, upload-time = "2026-04-23T07:15:58.355Z" },
]