Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
davorrunje committed Sep 1, 2023
1 parent 194cbeb commit 63eb355
Show file tree
Hide file tree
Showing 16 changed files with 112 additions and 32 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ jobs:
key: ${{ runner.os }}-python-${{ env.pythonLocation }}-${{ hashFiles('pyproject.toml') }}-test-v03
- name: Install Dependencies
if: steps.cache.outputs.cache-hit != 'true'
run: pip install .[rabbit,kafka,docs,testing]
run: pip install .[rabbit,kafka,docs,testing]
- name: Install Pydantic v1
if: matrix.pydantic-version == 'pydantic-v1'
run: pip install "pydantic>=1.10.0,<2.0.0"
Expand Down
38 changes: 38 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# See https://pre-commit.com for more information
# See https://pre-commit.com/hooks.html for more hooks
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v3.2.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-yaml
- id: check-added-large-files

- repo: https://github.com/pycqa/isort
rev: 5.12.0
hooks:
- id: isort
args: ["--profile", "black", "--filter-files"]

- repo: https://github.com/psf/black
rev: 23.3.0
hooks:
- id: black

# - repo: https://github.com/astral-sh/ruff-pre-commit
# # Ruff version.
# rev: v0.0.286
# hooks:
# - id: ruff

- repo: local
hooks:
- id: mypy
name: mypy
entry: "scripts/run_mypy.sh"
language: python
language_version: python3.8
types: [python]
require_serial: true
verbose: true
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Activate the new environment with:
source ./venv/bin/activate
```

Make sure you have the latest pip version on your virtual environment to
Make sure you have the latest pip version on your virtual environment to

```bash
python -m pip install --upgrade pip
Expand Down
2 changes: 1 addition & 1 deletion LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
THE SOFTWARE.
4 changes: 2 additions & 2 deletions docs/en/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pip install faststream

## Writing server code

Here is an example python app using FastStream that consumes data from a
Here is an example python app using FastStream that consumes data from a
topic, increments the value, and outputs the data to another topic.

``` python
Expand Down Expand Up @@ -122,4 +122,4 @@ This following example shows how to use the `@broker.subscriber` and

``` python hl_lines="14-18"
{!> ../../docs_src/kafka/base_example/app.py!}
```
```
5 changes: 4 additions & 1 deletion docs_src/kafka/base_example/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,19 @@
from faststream import FastStream, Logger
from faststream.kafka import KafkaBroker


class Data(BaseModel):
data: NonNegativeFloat = Field(
..., examples=[0.5], description="Float data example"
)


broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


@broker.publisher("output_data")
@broker.subscriber("input_data")
async def on_input_data(msg: Data, logger: Logger) -> Data:
logger.info(msg)
return Data(data=msg.data+1.0)
return Data(data=msg.data + 1.0)
9 changes: 6 additions & 3 deletions docs_src/kafka/base_example/app_chain.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,28 @@
import asyncio
from pydantic import BaseModel, Field, NonNegativeFloat

from faststream import FastStream, Logger
from faststream.kafka import KafkaBroker


class Data(BaseModel):
data: NonNegativeFloat = Field(
..., examples=[0.5], description="Float data example"
)


broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


@broker.publisher("intermediate_data")
@broker.subscriber("input_data")
async def on_input_data(msg: Data, logger: Logger) -> Data:
logger.info(msg)
return Data(data=msg.data+1.0)
return Data(data=msg.data + 1.0)


@broker.publisher("output_data")
@broker.subscriber("intermediate_data")
async def on_intermediate(msg: Data, logger: Logger) -> Data:
logger.info(msg)
return Data(data=msg.data*2.0)
return Data(data=msg.data * 2.0)
12 changes: 4 additions & 8 deletions docs_src/kafka/base_example/testing.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
import pytest

from docs_src.kafka.base_example.app import Data, broker, on_input_data
from faststream.kafka import TestKafkaBroker
from docs_src.kafka.base_example.app import (
broker,
on_input_data,
Data
)


@pytest.mark.asyncio
async def test_base_app():
Expand All @@ -14,9 +11,8 @@ async def on_output_data(msg: Data):
pass

async with TestKafkaBroker(broker) as tester:

await tester.publish(Data(data=0.2), "input_data")

on_input_data.mock.assert_called_with(dict(Data(data=0.2)))
on_output_data.mock.assert_called_once_with(dict(Data(data=1.2)))

on_output_data.mock.assert_called_once_with(dict(Data(data=1.2)))
9 changes: 3 additions & 6 deletions docs_src/kafka/base_example/testing_chain.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
import pytest

from docs_src.kafka.base_example.app_chain import Data, broker
from faststream.kafka import TestKafkaBroker
from docs_src.kafka.base_example.app_chain import (
Data,
broker,
)


@pytest.mark.asyncio
async def test_end_to_end():

@broker.subscriber("output_data")
async def on_output_data(msg: Data):
pass

async with TestKafkaBroker(broker) as tester:
await tester.publish(Data(data=0.2), "input_data")
on_output_data.mock.assert_called_once_with({"data": 2.4})
on_output_data.mock.assert_called_once_with({"data": 2.4})
2 changes: 1 addition & 1 deletion faststream/broker/fastapi/route.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def get_session(
) -> Callable[[NativeMessage[Any]], Awaitable[SendableMessage]]:
if dependant.call is None:
raise RuntimeError()

func = get_app(dependant, dependency_overrides_provider)

dependencies_names = tuple(i.name for i in dependant.dependencies)
Expand Down
4 changes: 3 additions & 1 deletion faststream/broker/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,9 @@ async def consume(self, msg: MsgType) -> SendableMessage: # type: ignore[overri

if await filter_(message):
if processed:
raise RuntimeError("You can't proccess a message with multiple consumers")
raise RuntimeError(
"You can't proccess a message with multiple consumers"
)

try:
async with AsyncExitStack() as consume_stack:
Expand Down
6 changes: 4 additions & 2 deletions faststream/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,12 @@ def run(
False, "--reload", is_flag=True, help="Restart app at directory files changes"
),
app_dir: Optional[str] = typer.Option(
None, "--app-dir", help=(
None,
"--app-dir",
help=(
"Look for APP in the specified directory, by adding this to the PYTHONPATH."
" Defaults to the current working directory."
)
),
),
) -> None:
"""Run [MODULE:APP] FastStream application"""
Expand Down
2 changes: 1 addition & 1 deletion faststream/kafka/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ async def publish( # type: ignore[override]
) -> None:
if self._producer is None:
raise RuntimeError("Please, setup `_producer` first")
if not(self.batch or len(messages) < 2):
if not (self.batch or len(messages) < 2):
raise RuntimeError("You can't send multiple messages without `batch` flag")

if not self.batch:
Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ lint = [
"ruff==0.0.275",
"pyupgrade-directories",
"bandit==1.7.5",
"pre-commit==3.3.3",
]

testing = [
Expand Down Expand Up @@ -134,7 +135,7 @@ disallow_untyped_defs = true
no_implicit_optional = true
check_untyped_defs = true
warn_return_any = true
show_error_codes = true
show_error_codes = true
warn_unused_ignores = true

disallow_incomplete_defs = true
Expand Down
32 changes: 32 additions & 0 deletions scripts/run_mypy.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#!/usr/bin/env bash

# from: https://jaredkhan.com/blog/mypy-pre-commit

# A script for running mypy,
# with all its dependencies installed.

set -o errexit

# Change directory to the project root directory.
echo "$(dirname "$0")"/..
cd "$(dirname "$0")"/..

# Install the dependencies into the mypy env.
# Note that this can take seconds to run.
# In my case, I need to use a custom index URL.
# Avoid pip spending time quietly retrying since
# likely cause of failure is lack of VPN connection.
pip install --editable ".[dev]" \
--retries 1 \
--no-input \
--quiet

# Run on all files,
# ignoring the paths passed to this script,
# so as not to miss type errors.
# My repo makes use of namespace packages.
# Use the namespace-packages flag
# and specify the package to run on explicitly.
# Note that we do not use --ignore-missing-imports,
# as this can give us false confidence in our results.
mypy faststream
12 changes: 9 additions & 3 deletions tests/asyncapi/base/fastapi.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from typing import Any, Callable, Type

from dirty_equals import IsStr
from fastapi import FastAPI
from fastapi.testclient import TestClient
from dirty_equals import IsStr

from faststream.asyncapi.generate import get_app_schema
from faststream.broker.core.abc import BrokerUsecase
Expand Down Expand Up @@ -44,8 +44,14 @@ def test_fastapi_full_information(self):
"title": "CustomApp",
"version": "1.1.1",
"description": "Test description",
"contact": {"name": "support", "url": IsStr(regex=r"https\:\/\/support\.com\/?")},
"license": {"name": "some", "url": IsStr(regex=r"https\:\/\/some\.com\/?")},
"contact": {
"name": "support",
"url": IsStr(regex=r"https\:\/\/support\.com\/?"),
},
"license": {
"name": "some",
"url": IsStr(regex=r"https\:\/\/some\.com\/?"),
},
},
"servers": {
"development": {
Expand Down

0 comments on commit 63eb355

Please sign in to comment.