Skip to content

Commit

Permalink
Resolve Issue 1386, Add rpc_prefix (airtai#1484)
Browse files Browse the repository at this point in the history
* update adding nuid usage as well as reply_to_prefix

* add missing commas

* update reply_to nuid generation, add missing imports

* rename reply_to_prefix to rpc_prefix

* refactor rpc reply_to address generation.

* remove reply_to, update nats implentation

* add back reply_to

* remove rpc_prefix, update NatsJSFastProducer

* Add nats client inbox_prefix test

* fixes to inbox_prefix test

* lint: fix bandit

---------

Co-authored-by: Pastukhov Nikita <nikita@pastukhov-dev.ru>
Co-authored-by: Nikita Pastukhov <diementros@yandex.ru>
  • Loading branch information
3 people committed Jun 17, 2024
1 parent dfec397 commit 1e87e76
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 10 deletions.
2 changes: 2 additions & 0 deletions docs/docs/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -1007,6 +1007,8 @@ search:
- [to_async](api/faststream/utils/functions/to_async.md)
- no_cast
- [NoCast](api/faststream/utils/no_cast/NoCast.md)
- nuid
- [NUID](api/faststream/utils/nuid/NUID.md)
- path
- [compile_path](api/faststream/utils/path/compile_path.md)
- Contributing
Expand Down
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/utils/nuid/NUID.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.utils.nuid.NUID
9 changes: 2 additions & 7 deletions faststream/nats/publisher/producer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import asyncio
from secrets import token_hex
from typing import TYPE_CHECKING, Any, Dict, Optional
from uuid import uuid4

import nats
from typing_extensions import override
Expand Down Expand Up @@ -71,9 +69,7 @@ async def publish( # type: ignore[override]
if reply_to:
raise WRONG_PUBLISH_ARGS

token = client._nuid.next()
token.extend(token_hex(2).encode())
reply_to = token.decode()
reply_to = client.new_inbox()

future: asyncio.Future[Msg] = asyncio.Future()
sub = await client.subscribe(reply_to, future=future, max_msgs=1)
Expand Down Expand Up @@ -148,8 +144,7 @@ async def publish( # type: ignore[override]
if rpc:
if reply_to:
raise WRONG_PUBLISH_ARGS

reply_to = str(uuid4())
reply_to = self._connection._nc.new_inbox()
future: asyncio.Future[Msg] = asyncio.Future()
sub = await self._connection._nc.subscribe(
reply_to, future=future, max_msgs=1
Expand Down
7 changes: 4 additions & 3 deletions faststream/redis/publisher/producer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from typing import TYPE_CHECKING, Any, Optional
from uuid import uuid4

from typing_extensions import override

Expand All @@ -10,6 +9,7 @@
from faststream.redis.parser import RawMessage, RedisPubSubParser
from faststream.redis.schemas import INCORRECT_SETUP_MSG
from faststream.utils.functions import timeout_scope
from faststream.utils.nuid import NUID

if TYPE_CHECKING:
from redis.asyncio.client import PubSub, Redis
Expand Down Expand Up @@ -67,8 +67,9 @@ async def publish( # type: ignore[override]
if rpc:
if reply_to:
raise WRONG_PUBLISH_ARGS

reply_to = str(uuid4())
nuid = NUID()
rpc_nuid = str(nuid.next(), "utf-8")
reply_to = rpc_nuid
psub = self._connection.pubsub()
await psub.subscribe(reply_to)

Expand Down
68 changes: 68 additions & 0 deletions faststream/utils/nuid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# Copyright 2016-2018 The NATS Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

from random import Random
from secrets import randbelow, token_bytes
from sys import maxsize as max_int

DIGITS = b"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
BASE = 62
PREFIX_LENGTH = 12
SEQ_LENGTH = 10
MAX_SEQ = 839299365868340224 # BASE**10
MIN_INC = 33
MAX_INC = 333
INC = MAX_INC - MIN_INC
TOTAL_LENGTH = PREFIX_LENGTH + SEQ_LENGTH


class NUID:
"""NUID created is a utility to create a new id.
NUID is an implementation of the approach for fast generation
of unique identifiers used for inboxes in NATS.
"""

def __init__(self) -> None:
self._prand = Random(randbelow(max_int)) # nosec B311
self._seq = self._prand.randint(0, MAX_SEQ)
self._inc = MIN_INC + self._prand.randint(BASE + 1, INC)
self._prefix = bytearray()
self.randomize_prefix()

def next(self) -> bytearray:
"""Next returns the next unique identifier."""
self._seq += self._inc
if self._seq >= MAX_SEQ:
self.randomize_prefix()
self.reset_sequential()

l_seq = self._seq
prefix = self._prefix[:]
suffix = bytearray(SEQ_LENGTH)
for i in reversed(range(SEQ_LENGTH)):
suffix[i] = DIGITS[int(l_seq) % BASE]
l_seq //= BASE

prefix.extend(suffix)
return prefix

def randomize_prefix(self) -> None:
random_bytes = token_bytes(PREFIX_LENGTH)
self._prefix = bytearray(DIGITS[c % BASE] for c in random_bytes)

def reset_sequential(self) -> None:
self._seq = self._prand.randint(0, MAX_SEQ)
self._inc = MIN_INC + self._prand.randint(0, INC)
12 changes: 12 additions & 0 deletions tests/brokers/nats/test_test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,18 @@ def subscriber(m):

assert event.is_set()

@pytest.mark.nats()
async def test_inbox_prefix_with_real(
self,
queue: str,
):
broker = NatsBroker(inbox_prefix="test")

async with TestNatsBroker(broker, with_real=True) as br:
assert br._connection._inbox_prefix == b"test"
assert "test" in str(br._connection.new_inbox())


async def test_respect_middleware(self, queue):
routes = []

Expand Down

0 comments on commit 1e87e76

Please sign in to comment.