diff --git a/rate_limiters/.vscode/settings.json b/rate_limiters/.vscode/settings.json new file mode 100644 index 0000000..cc081ee --- /dev/null +++ b/rate_limiters/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "python-envs.defaultEnvManager": "ms-python.python:venv" +} \ No newline at end of file diff --git a/rate_limiters/leaky_bucket.py b/rate_limiters/leaky_bucket.py new file mode 100644 index 0000000..ec0dbb8 --- /dev/null +++ b/rate_limiters/leaky_bucket.py @@ -0,0 +1,158 @@ +import asyncio +from time import time + + +class LeakyBucketLimiter: + def __init__(self, rate: float, max_queue_size: int): + self.rate = rate + self.max_queue_size = max_queue_size + self._calculate_delay() + self._queue = asyncio.Queue(maxsize=max_queue_size) + self._running = False + self._worker_task = None + self._monitor_task = None + + self.metrics = { + "processed_count": 0, + "total_dwell_time": 0.0, # Total time tasks waited in queue + "dropped_tasks": 0 + } + + + def _calculate_delay(self): + self.delay = 1.0 / self.rate + + + @property + def queue_depth(self) -> int: + return self._queue.qsize() + + + async def start(self): + if not self._running: + self._running = True + self._worker_task = asyncio.create_task(self._worker()) + self._monitor_task = asyncio.create_task(self._monitor()) + print("Limiter started!") + + + async def stop(self): + self._running = False + if self._worker_task: + await self._worker_task + print("Limiter stopped!") + + + async def set_rate(self, new_rate: float): + if new_rate <= 0: + raise ValueError("Rate must be a positive number!") + + print(f"Changing rate from {self.rate} to {new_rate}...") + self.rate = new_rate + self._calculate_delay() + + + async def _monitor(self): + while self._running: + await asyncio.sleep(2) + avg_dwell = (self.metrics["total_dwell_time"] / self.metrics["processed_count"] + if self.metrics["processed_count"] > 0 else 0) + + print("\n--- [METRICS REPORT] ---") + print(f" Queue Depth: {self.queue_depth} tasks") + print(f" Throughput: {self.metrics['processed_count']} tasks total") + print(f" Avg Dwell: {avg_dwell:.2f} seconds") + print(f" Drops: {self.metrics['dropped_tasks']}") + print("-------------------------\n") + + + async def _worker(self): + while self._running or not self._queue.empty(): + try: + item = await asyncio.wait_for(self._queue.get(), timeout=1) + + func, future, arrival_time = item + + dwell_time = time() - arrival_time + self.metrics["total_dwell_time"] += dwell_time + + await asyncio.sleep(self.delay) + + try: + result = await func() + future.set_result(result) + except Exception as e: + future.set_exception(e) + finally: + self.metrics["processed_count"] += 1 + self._queue.task_done() + + except asyncio.TimeoutError: + continue + except Exception as e: + print(f"Worker error!\nError: {e}") + + async def execute(self, func, *args, **kwargs): + if not self._running: + raise RuntimeError("Limiter is not running! Call .start() first.") + + # if self._queue.full(): + # raise RuntimeError(f"Queue is full! Retry in {self.delay:.2f} seconds.") + + loop = asyncio.get_running_loop() + future = loop.create_future() + + arrival_time = time() + + payload = (lambda: func(*args, **kwargs), future, arrival_time) + await self._queue.put(payload) + + return await future + + +async def mock_external_api_call(req_info: str): + pass + # await asyncio.sleep(0.000001) + + +async def make_requests(number: int, limiter_rate: float, limiter_max_queue_size: int): + limiter = LeakyBucketLimiter(limiter_rate, limiter_max_queue_size) + + await limiter.start() + + start_time = time() + + tasks = [] + + for i in range(number): + task = asyncio.create_task(limiter.execute(mock_external_api_call, f"REQ: {i}")) + tasks.append(task) + + results = await asyncio.gather(*tasks, return_exceptions=True) + + end_time = time() + + + success = 0 + failures = 0 + + for r in results: + if isinstance(r, Exception): + failures += 1 + continue + success += 1 + + execution_time = end_time - start_time + effective_rate = success / execution_time + + print( + f"Total execution time: {execution_time:.2f}\nEffective rate: {effective_rate:.2f}" + ) + + print(f"Operation complete with {success} successful requests and {failures} failures.") + + await limiter.stop() + + +if __name__ == "__main__": + asyncio.run(make_requests(1000, 20, 60)) \ No newline at end of file diff --git a/rate_limiters/load_test.sh b/rate_limiters/load_test.sh new file mode 100755 index 0000000..ed6bacf --- /dev/null +++ b/rate_limiters/load_test.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +ab -c 10 -n 10000 http://192.168.0.40:8001/ \ No newline at end of file diff --git a/rate_limiters/middleware_starlette.py b/rate_limiters/middleware_starlette.py new file mode 100644 index 0000000..beeacc1 --- /dev/null +++ b/rate_limiters/middleware_starlette.py @@ -0,0 +1,45 @@ +from starlette.applications import Starlette +from starlette.requests import Request +from starlette.responses import Response +from starlette.routing import Route +from starlette.middleware import Middleware + +from token_bucket import AsyncTokenBucketLimiter, RateLimitExceeded + + +class RateLimiterMiddleware: + def __init__(self, app): + self.app = app + self.clients = {} + + async def __call__(self, scope, receive, send): + if scope["type"] != "http": + await self.app(scope, receive, send) + + client_data = scope["client"] + client_ip = client_data[0] if client_data else "UNKNOWN" + + limiter = self.clients.setdefault(client_ip, AsyncTokenBucketLimiter(10, 10)) + + try: + async with limiter: + await self.app(scope, receive, send) + except RateLimitExceeded as e: + retry_after = str(e.retry_after) + response = Response( + content=f"Rate limit exceeded! Retry after: {retry_after}", + headers={"Retry-After": retry_after}, + status_code=429 + ) + return await response(scope, receive, send) + + +def main(request: Request): + return Response(content="Hello there!") + + +routes = [Route("/", main)] + +middleware = [Middleware(RateLimiterMiddleware)] + +app = Starlette(routes=routes, middleware=middleware) diff --git a/rate_limiters/pyproject.toml b/rate_limiters/pyproject.toml index 70ecdfc..99952cb 100644 --- a/rate_limiters/pyproject.toml +++ b/rate_limiters/pyproject.toml @@ -5,5 +5,9 @@ description = "Add your description here" readme = "README.md" requires-python = ">=3.11" dependencies = [ + "httpx>=0.28.1", "pytest>=9.0.3", + "pytest-asyncio>=1.3.0", + "starlette>=1.0.0", + "uvicorn>=0.46.0", ] diff --git a/rate_limiters/start_app.sh b/rate_limiters/start_app.sh new file mode 100755 index 0000000..e09c3ee --- /dev/null +++ b/rate_limiters/start_app.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +uvicorn --host 0.0.0.0 --port 8001 --reload middleware_starlette:app \ No newline at end of file diff --git a/rate_limiters/tests/test_leaky_bucket.py b/rate_limiters/tests/test_leaky_bucket.py new file mode 100644 index 0000000..6912f15 --- /dev/null +++ b/rate_limiters/tests/test_leaky_bucket.py @@ -0,0 +1,29 @@ +import asyncio +from time import time + +import pytest + +from leaky_bucket import LeakyBucketLimiter + +@pytest.mark.asyncio +async def test_duration() -> None: + async def test_function(): + pass + + + limiter = LeakyBucketLimiter(10, 100) + await limiter.start() + + start = time() + + tasks = [asyncio.create_task(limiter.execute(test_function)) for i in range(20)] + + results = await asyncio.gather(*tasks, return_exceptions=True) + + end = time() + duration = end - start + + assert duration < 2.1 + + for item in results: + assert isinstance(item, Exception) is False \ No newline at end of file diff --git a/rate_limiters/tests/test_middleware_starlette.py b/rate_limiters/tests/test_middleware_starlette.py new file mode 100644 index 0000000..f147698 --- /dev/null +++ b/rate_limiters/tests/test_middleware_starlette.py @@ -0,0 +1,49 @@ +from time import sleep + +from starlette.testclient import TestClient + +from middleware_starlette import app + +def test_app_runs(): + client = TestClient(app) + + response = client.get("/") + + assert response.status_code == 200 + assert "Hello there" in response.text + + +def test_below_rate_limit(): + client = TestClient(app) + + # Limit is 10/s + for i in range(20): + response = client.get("/") + + assert response.status_code == 200 + assert "Hello there" in response.text + sleep(0.1) + + +def test_above_rate_limit(): + client = TestClient(app) + + successful = 0 + rate_limited = 0 + + # Limit is 10/s + for i in range(30): + response = client.get("/") + + if response.status_code != 200: + rate_limited += 1 + assert response.status_code == 429 + assert "Rate limit exceeded" in response.text + assert response.headers.get("retry-after") is not None + else: + successful += 1 + + assert successful != 0 + assert rate_limited != 0 + + \ No newline at end of file diff --git a/rate_limiters/token_bucket.py b/rate_limiters/token_bucket.py index 448e453..5ebc8e7 100644 --- a/rate_limiters/token_bucket.py +++ b/rate_limiters/token_bucket.py @@ -1,5 +1,6 @@ import threading from time import sleep, time +import asyncio class RateLimiterException(Exception): @@ -40,6 +41,34 @@ class TokenBucketLimiter: def __exit__(self, exc_type, exc_value, exc_traceback) -> bool: return False + +class AsyncTokenBucketLimiter: + def __init__(self, capacity: int, refill_rate: int): + self.capacity = float(capacity) + self.refill_rate = float(refill_rate) + self.tokens = float(capacity) + self.last_refill_time = time() + self._lock = asyncio.Lock() + + async def __aenter__(self): + async with self._lock: + now = time() + + elapsed = now - self.last_refill_time + new_tokens = elapsed * self.refill_rate + + self.tokens = min(self.tokens + new_tokens, self.capacity) + self.last_refill_time = time() + + if self.tokens < 1: + retry_after = (1 - self.tokens) / self.refill_rate + raise RateLimitExceeded(retry_after, self.tokens) + + self.tokens -= 1 + return self + + async def __aexit__(self, exc_type, exc_value, exc_traceback) -> bool: + return False def make_requests(number: int, limiter: TokenBucketLimiter) -> None: diff --git a/rate_limiters/uv.lock b/rate_limiters/uv.lock index cc32065..6e5ea33 100644 --- a/rate_limiters/uv.lock +++ b/rate_limiters/uv.lock @@ -2,6 +2,40 @@ version = 1 revision = 3 requires-python = ">=3.11" +[[package]] +name = "anyio" +version = "4.13.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "idna" }, + { name = "typing-extensions", marker = "python_full_version < '3.13'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/19/14/2c5dd9f512b66549ae92767a9c7b330ae88e1932ca57876909410251fe13/anyio-4.13.0.tar.gz", hash = "sha256:334b70e641fd2221c1505b3890c69882fe4a2df910cba14d97019b90b24439dc", size = 231622, upload-time = "2026-03-24T12:59:09.671Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/da/42/e921fccf5015463e32a3cf6ee7f980a6ed0f395ceeaa45060b61d86486c2/anyio-4.13.0-py3-none-any.whl", hash = "sha256:08b310f9e24a9594186fd75b4f73f4a4152069e3853f1ed8bfbf58369f4ad708", size = 114353, upload-time = "2026-03-24T12:59:08.246Z" }, +] + +[[package]] +name = "certifi" +version = "2026.4.22" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/25/ee/6caf7a40c36a1220410afe15a1cc64993a1f864871f698c0f93acb72842a/certifi-2026.4.22.tar.gz", hash = "sha256:8d455352a37b71bf76a79caa83a3d6c25afee4a385d632127b6afb3963f1c580", size = 137077, upload-time = "2026-04-22T11:26:11.191Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/22/30/7cd8fdcdfbc5b869528b079bfb76dcdf6056b1a2097a662e5e8c04f42965/certifi-2026.4.22-py3-none-any.whl", hash = "sha256:3cb2210c8f88ba2318d29b0388d1023c8492ff72ecdde4ebdaddbb13a31b1c4a", size = 135707, upload-time = "2026-04-22T11:26:09.372Z" }, +] + +[[package]] +name = "click" +version = "8.3.3" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "colorama", marker = "sys_platform == 'win32'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/bb/63/f9e1ea081ce35720d8b92acde70daaedace594dc93b693c869e0d5910718/click-8.3.3.tar.gz", hash = "sha256:398329ad4837b2ff7cbe1dd166a4c0f8900c3ca3a218de04466f38f6497f18a2", size = 328061, upload-time = "2026-04-22T15:11:27.506Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/ae/44/c1221527f6a71a01ec6fbad7fa78f1d50dfa02217385cf0fa3eec7087d59/click-8.3.3-py3-none-any.whl", hash = "sha256:a2bf429bb3033c89fa4936ffb35d5cb471e3719e1f3c8a7c3fff0b8314305613", size = 110502, upload-time = "2026-04-22T15:11:25.044Z" }, +] + [[package]] name = "colorama" version = "0.4.6" @@ -11,6 +45,52 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/d1/d6/3965ed04c63042e047cb6a3e6ed1a63a35087b6a609aa3a15ed8ac56c221/colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6", size = 25335, upload-time = "2022-10-25T02:36:20.889Z" }, ] +[[package]] +name = "h11" +version = "0.16.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/01/ee/02a2c011bdab74c6fb3c75474d40b3052059d95df7e73351460c8588d963/h11-0.16.0.tar.gz", hash = "sha256:4e35b956cf45792e4caa5885e69fba00bdbc6ffafbfa020300e549b208ee5ff1", size = 101250, upload-time = "2025-04-24T03:35:25.427Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/04/4b/29cac41a4d98d144bf5f6d33995617b185d14b22401f75ca86f384e87ff1/h11-0.16.0-py3-none-any.whl", hash = "sha256:63cf8bbe7522de3bf65932fda1d9c2772064ffb3dae62d55932da54b31cb6c86", size = 37515, upload-time = "2025-04-24T03:35:24.344Z" }, +] + +[[package]] +name = "httpcore" +version = "1.0.9" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "certifi" }, + { name = "h11" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/06/94/82699a10bca87a5556c9c59b5963f2d039dbd239f25bc2a63907a05a14cb/httpcore-1.0.9.tar.gz", hash = "sha256:6e34463af53fd2ab5d807f399a9b45ea31c3dfa2276f15a2c3f00afff6e176e8", size = 85484, upload-time = "2025-04-24T22:06:22.219Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/7e/f5/f66802a942d491edb555dd61e3a9961140fd64c90bce1eafd741609d334d/httpcore-1.0.9-py3-none-any.whl", hash = "sha256:2d400746a40668fc9dec9810239072b40b4484b640a8c38fd654a024c7a1bf55", size = 78784, upload-time = "2025-04-24T22:06:20.566Z" }, +] + +[[package]] +name = "httpx" +version = "0.28.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "anyio" }, + { name = "certifi" }, + { name = "httpcore" }, + { name = "idna" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/b1/df/48c586a5fe32a0f01324ee087459e112ebb7224f646c0b5023f5e79e9956/httpx-0.28.1.tar.gz", hash = "sha256:75e98c5f16b0f35b567856f597f06ff2270a374470a5c2392242528e3e3e42fc", size = 141406, upload-time = "2024-12-06T15:37:23.222Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/2a/39/e50c7c3a983047577ee07d2a9e53faf5a69493943ec3f6a384bdc792deb2/httpx-0.28.1-py3-none-any.whl", hash = "sha256:d909fcccc110f8c7faf814ca82a9a4d816bc5a6dbfea25d6591d6985b8ba59ad", size = 73517, upload-time = "2024-12-06T15:37:21.509Z" }, +] + +[[package]] +name = "idna" +version = "3.13" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/ce/cc/762dfb036166873f0059f3b7de4565e1b5bc3d6f28a414c13da27e442f99/idna-3.13.tar.gz", hash = "sha256:585ea8fe5d69b9181ec1afba340451fba6ba764af97026f92a91d4eef164a242", size = 194210, upload-time = "2026-04-22T16:42:42.314Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/5d/13/ad7d7ca3808a898b4612b6fe93cde56b53f3034dcde235acb1f0e1df24c6/idna-3.13-py3-none-any.whl", hash = "sha256:892ea0cde124a99ce773decba204c5552b69c3c67ffd5f232eb7696135bc8bb3", size = 68629, upload-time = "2026-04-22T16:42:40.909Z" }, +] + [[package]] name = "iniconfig" version = "2.3.0" @@ -63,13 +143,71 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/d4/24/a372aaf5c9b7208e7112038812994107bc65a84cd00e0354a88c2c77a617/pytest-9.0.3-py3-none-any.whl", hash = "sha256:2c5efc453d45394fdd706ade797c0a81091eccd1d6e4bccfcd476e2b8e0ab5d9", size = 375249, upload-time = "2026-04-07T17:16:16.13Z" }, ] +[[package]] +name = "pytest-asyncio" +version = "1.3.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "pytest" }, + { name = "typing-extensions", marker = "python_full_version < '3.13'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/90/2c/8af215c0f776415f3590cac4f9086ccefd6fd463befeae41cd4d3f193e5a/pytest_asyncio-1.3.0.tar.gz", hash = "sha256:d7f52f36d231b80ee124cd216ffb19369aa168fc10095013c6b014a34d3ee9e5", size = 50087, upload-time = "2025-11-10T16:07:47.256Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/e5/35/f8b19922b6a25bc0880171a2f1a003eaeb93657475193ab516fd87cac9da/pytest_asyncio-1.3.0-py3-none-any.whl", hash = "sha256:611e26147c7f77640e6d0a92a38ed17c3e9848063698d5c93d5aa7aa11cebff5", size = 15075, upload-time = "2025-11-10T16:07:45.537Z" }, +] + [[package]] name = "rate-limiters" version = "0.1.0" source = { virtual = "." } dependencies = [ + { name = "httpx" }, { name = "pytest" }, + { name = "pytest-asyncio" }, + { name = "starlette" }, + { name = "uvicorn" }, ] [package.metadata] -requires-dist = [{ name = "pytest", specifier = ">=9.0.3" }] +requires-dist = [ + { name = "httpx", specifier = ">=0.28.1" }, + { name = "pytest", specifier = ">=9.0.3" }, + { name = "pytest-asyncio", specifier = ">=1.3.0" }, + { name = "starlette", specifier = ">=1.0.0" }, + { name = "uvicorn", specifier = ">=0.46.0" }, +] + +[[package]] +name = "starlette" +version = "1.0.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "anyio" }, + { name = "typing-extensions", marker = "python_full_version < '3.13'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/81/69/17425771797c36cded50b7fe44e850315d039f28b15901ab44839e70b593/starlette-1.0.0.tar.gz", hash = "sha256:6a4beaf1f81bb472fd19ea9b918b50dc3a77a6f2e190a12954b25e6ed5eea149", size = 2655289, upload-time = "2026-03-22T18:29:46.779Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/0b/c9/584bc9651441b4ba60cc4d557d8a547b5aff901af35bda3a4ee30c819b82/starlette-1.0.0-py3-none-any.whl", hash = "sha256:d3ec55e0bb321692d275455ddfd3df75fff145d009685eb40dc91fc66b03d38b", size = 72651, upload-time = "2026-03-22T18:29:45.111Z" }, +] + +[[package]] +name = "typing-extensions" +version = "4.15.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/72/94/1a15dd82efb362ac84269196e94cf00f187f7ed21c242792a923cdb1c61f/typing_extensions-4.15.0.tar.gz", hash = "sha256:0cea48d173cc12fa28ecabc3b837ea3cf6f38c6d1136f85cbaaf598984861466", size = 109391, upload-time = "2025-08-25T13:49:26.313Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/18/67/36e9267722cc04a6b9f15c7f3441c2363321a3ea07da7ae0c0707beb2a9c/typing_extensions-4.15.0-py3-none-any.whl", hash = "sha256:f0fa19c6845758ab08074a0cfa8b7aecb71c999ca73d62883bc25cc018c4e548", size = 44614, upload-time = "2025-08-25T13:49:24.86Z" }, +] + +[[package]] +name = "uvicorn" +version = "0.46.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "click" }, + { name = "h11" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/1f/93/041fca8274050e40e6791f267d82e0e2e27dd165627bd640d3e0e378d877/uvicorn-0.46.0.tar.gz", hash = "sha256:fb9da0926999cc6cb22dc7cd71a94a632f078e6ae47ff683c5c420750fb7413d", size = 88758, upload-time = "2026-04-23T07:16:00.151Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/31/a3/5b1562db76a5a488274b2332a97199b32d0442aca0ed193697fd47786316/uvicorn-0.46.0-py3-none-any.whl", hash = "sha256:bbebbcbed972d162afca128605223022bedd345b7bc7855ce66deb31487a9048", size = 70926, upload-time = "2026-04-23T07:15:58.355Z" }, +]