Skip to content

Commit

Permalink
Allow passing no bucket argument to rate controllers
Browse files Browse the repository at this point in the history
  • Loading branch information
corentin-regent authored Apr 21, 2024
2 parents 32e3ef4 + 06752bc commit 65d90ce
Show file tree
Hide file tree
Showing 22 changed files with 160 additions and 253 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ This library adheres to `Semantic Versioning 2.0 <http://semver.org/>`_.
Unreleased
----------

Nothing changed yet.
* Now allow passing no bucket argument to the rate controllers.

* **Breaking change**: Removed the now obsolete ``UnlimitedBucket``.

2.0.0
-----
Expand Down
10 changes: 0 additions & 10 deletions docs/buckets.rst
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,6 @@ Incoming requests consume a certain amount of tokens from the bucket,
and the tokens consumed by each request are replenished
``duration`` seconds after the request has been made.

:class:`.UnlimitedBucket`
-------------------------

As the name suggests, the :class:`.UnlimitedBucket` has no restriction on
the amount of tokens it can acquire.

It can be useful for situations where you would like to set
a concurrency limit but no rate limits per se, for example
if you are looking to prevent overwhelming your web server with requests.

Integrating custom bucket algorithms
------------------------------------

Expand Down
4 changes: 2 additions & 2 deletions docs/examples/max_concurrency/max_concurrency_anyio.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from anyio import run
from rate_control import RateLimit, RateLimiter, UnlimitedBucket
from rate_control import RateLimit, RateLimiter

async def main() -> None:
async with RateLimiter(UnlimitedBucket(), max_concurrency=1) as rate_limiter:
async with RateLimiter(max_concurrency=1) as rate_limiter:
async with rate_limiter.request():
print('First request passes')
try:
Expand Down
4 changes: 2 additions & 2 deletions docs/examples/max_concurrency/max_concurrency_asyncio.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from asyncio import run
from rate_control import RateLimit, RateLimiter, UnlimitedBucket
from rate_control import RateLimit, RateLimiter

async def main() -> None:
async with RateLimiter(UnlimitedBucket(), max_concurrency=1) as rate_limiter:
async with RateLimiter(max_concurrency=1) as rate_limiter:
async with rate_limiter.request():
print('First request passes')
try:
Expand Down
4 changes: 2 additions & 2 deletions docs/examples/max_concurrency/max_concurrency_trio.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from trio import run
from rate_control import RateLimit, RateLimiter, UnlimitedBucket
from rate_control import RateLimit, RateLimiter

async def main() -> None:
async with RateLimiter(UnlimitedBucket(), max_concurrency=1) as rate_limiter:
async with RateLimiter(max_concurrency=1) as rate_limiter:
async with rate_limiter.request():
print('First request passes')
try:
Expand Down
7 changes: 5 additions & 2 deletions docs/queues.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
Request queues
==============

The :class:`.Scheduler` queues requests using the queue algorithm
you pass to its constructor. This page is a reference for these algorithms.

Available queues
----------------

Expand All @@ -12,8 +15,8 @@ The priority queue sorts requests so that they are processed by ascending weight
This allows to let through more lightweight requests,
that could otherwise be blocked by a heavier one.

.. warning::
Requests with identical weight are not guaranteed
.. note::
Requests with identical weights are not guaranteed
to be processed in the order they arrived.

:class:`.FifoQueue`
Expand Down
3 changes: 0 additions & 3 deletions docs/reference/buckets.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,4 @@ Buckets
.. autoclass:: rate_control.LeakyBucket
.. autoclass:: rate_control.SlidingWindowLog

.. autoclass:: rate_control.UnlimitedBucket
:no-inherited-members:

.. autoclass:: rate_control.BucketGroup
38 changes: 19 additions & 19 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ optional = true
[tool.poetry.group.dev.dependencies]
anyio = {version = "*", extras = ["trio"]}
mypy = "^1.6"
ruff = "^0.3.0"
ruff = "^0.4.0"

[tool.poetry.group.test]
optional = true
Expand Down
3 changes: 1 addition & 2 deletions rate_control/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,10 @@
'ReachedMaxPending',
'Scheduler',
'SlidingWindowLog',
'UnlimitedBucket',
]

from rate_control._bucket_group import BucketGroup
from rate_control._buckets import Bucket, FixedWindowCounter, LeakyBucket, SlidingWindowLog, UnlimitedBucket
from rate_control._buckets import Bucket, FixedWindowCounter, LeakyBucket, SlidingWindowLog
from rate_control._controllers import RateController, RateLimiter, Scheduler
from rate_control._enums import Duration, Priority
from rate_control._errors import RateLimit, ReachedMaxPending
5 changes: 2 additions & 3 deletions rate_control/_bucket_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

from rate_control._buckets import Bucket
from rate_control._helpers import mk_repr
from rate_control._helpers._validation import validate_buckets

if sys.version_info >= (3, 9):
from collections.abc import Iterable
Expand Down Expand Up @@ -40,7 +39,6 @@ def __init__(self, *buckets: Bucket, should_enter_context: bool = True, **kwargs
Defaults to `True`.
"""
super().__init__(**kwargs)
validate_buckets(buckets)
self._buckets = buckets
self._should_enter_context = should_enter_context
self._send_stream, self._recv_stream = create_memory_object_stream[Bucket]()
Expand Down Expand Up @@ -89,7 +87,8 @@ async def wait_for_refill(self) -> None:

@override
def can_acquire(self, tokens: float) -> bool:
"""
"""Whether the given amount of tokens can be acquired.
Args:
tokens: The amount of tokens that we want to acquire.
Expand Down
2 changes: 0 additions & 2 deletions rate_control/_buckets/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@
'FixedWindowCounter',
'LeakyBucket',
'SlidingWindowLog',
'UnlimitedBucket',
]

from ._base import Bucket
from ._fixed_window_counter import FixedWindowCounter
from ._leaky_bucket import LeakyBucket
from ._sliding_window_log import SlidingWindowLog
from ._unlimited import UnlimitedBucket
5 changes: 3 additions & 2 deletions rate_control/_buckets/_base/_abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,13 @@ async def wait_for_refill(self) -> None:

@abstractmethod
def can_acquire(self, tokens: float) -> bool:
"""
"""Whether the given amount of tokens can be acquired.
Args:
tokens: The amount of tokens that we want to acquire.
Returns:
Whether the given amount of tokens is available.
Whether the given amount of tokens is available to consume.
"""

@abstractmethod
Expand Down
30 changes: 0 additions & 30 deletions rate_control/_buckets/_unlimited.py

This file was deleted.

30 changes: 20 additions & 10 deletions rate_control/_controllers/_abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from rate_control._buckets import Bucket
from rate_control._errors import RateLimit
from rate_control._helpers import mk_repr
from rate_control._helpers._validation import validate_buckets, validate_max_concurrency
from rate_control._helpers._validation import validate_max_concurrency

if sys.version_info >= (3, 9):
from collections.abc import AsyncIterator, Iterator
Expand Down Expand Up @@ -41,18 +41,21 @@ def __init__(
) -> None:
"""
Args:
bucket: The buckets that will be managed by the rate controller.
buckets: The buckets that will be managed by the rate controller, optional.
should_enter_context: Whether entering the context of the rate controller
should also enter the context of the underlying buckets.
should also enter the context of the underlying buckets, if any.
Defaults to True.
max_concurrency: The maximum amount of concurrent requests allowed.
Defaults to `None` (no limit).
"""
super().__init__(**kwargs)
validate_buckets(buckets)
validate_max_concurrency(max_concurrency)
self._bucket = (
buckets[0] if len(buckets) == 1 else BucketGroup(*buckets, should_enter_context=should_enter_context)
None
if not buckets
else buckets[0]
if len(buckets) == 1
else BucketGroup(*buckets, should_enter_context=should_enter_context)
)
self._should_enter_context = should_enter_context
self._max_concurrency = max_concurrency
Expand All @@ -64,7 +67,7 @@ async def __aenter__(self) -> Self:
Also enters the context of the underlying buckets,
if the `should_enter_context` flag was set to `True`.
"""
if self._should_enter_context:
if self._should_enter_context and self._bucket is not None:
await self._bucket.__aenter__()
return self

Expand All @@ -74,14 +77,21 @@ async def __aexit__(self, *exc_info: Any) -> Optional[bool]:
Also exits the context of the underlying buckets,
if the `should_enter_context` flag was set to `True`.
"""
if self._should_enter_context:
if self._should_enter_context and self._bucket is not None:
await self._bucket.__aexit__(*exc_info)
return False

@override
def __repr__(self) -> str:
return mk_repr(
self, self._bucket, should_enter_context=self._should_enter_context, max_concurrency=self._max_concurrency
return (
mk_repr(
self,
self._bucket,
should_enter_context=self._should_enter_context,
max_concurrency=self._max_concurrency,
)
if self._bucket is not None
else mk_repr(self, max_concurrency=self._max_concurrency)
)

def can_acquire(self, tokens: float = 1) -> bool:
Expand All @@ -93,7 +103,7 @@ def can_acquire(self, tokens: float = 1) -> bool:
Returns:
Whether a request for the given amount of tokens can be processed instantly.
"""
return not self._is_concurrency_limited and self._bucket.can_acquire(tokens)
return not self._is_concurrency_limited and (self._bucket is None or self._bucket.can_acquire(tokens))

@asynccontextmanager
@abstractmethod
Expand Down
4 changes: 3 additions & 1 deletion rate_control/_controllers/_rate_limiter.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ async def request(self, tokens: float = 1) -> AsyncIterator[None]:
Args:
tokens: The number of tokens to acquire.
Defaults to `1`.
Raises:
RateLimit: The request cannot be fulfilled instantly.
"""
self._assert_can_acquire(tokens)
self._bucket.acquire(tokens)
if self._bucket is not None:
self._bucket.acquire(tokens)
with self._hold_concurrency():
yield
Loading

0 comments on commit 65d90ce

Please sign in to comment.