NOTE: This is an auto-generated file. Please edit docs/docs/en/index.md instead.
Effortless Event Stream integration for your services
FastStream is a powerful and easy-to-use Python library for building asynchronous services that interact with Event streams topics. Built on top of Pydantic and AsyncAPI, FastStream simplifies the process of writing producers and consumers for Message Queues, handling all the parsing, networking, task scheduling and data generation automatically. With FastStream, you can quickly prototype and develop high-performance Event-based services with minimal code, making it an ideal choice for developers looking to streamline their workflow and accelerate their projects.
FastStream works on macOS, Linux, and most Unix-style operating systems.
You can install it with pip
as usual:
pip install faststream
Here is an example python app using FastStream that consumes data from a topic, increments the value, and outputs the data to another topic.
from pydantic import BaseModel, Field, NonNegativeFloat
from faststream import FastStream, Logger
from faststream.kafka import KafkaBroker
class DataBasic(BaseModel):
data: NonNegativeFloat = Field(
..., examples=[0.5], description="Float data example"
)
broker = KafkaBroker("localhost:9092")
app = FastStream(broker)
@broker.publisher("output_data")
@broker.subscriber("input_data")
async def on_input_data(msg: DataBasic, logger: Logger) -> DataBasic:
logger.info(msg)
return DataBasic(data=msg.data + 1.0)
FastStream uses Pydantic to parse input
JSON-encoded data into Python objects, making it easy to work with
structured data in your Kafka-based applications. Pydantic’s
BaseModel
class allows you
to define messages using a declarative syntax, making it easy to specify
the fields and types of your messages.
This example defines one message class for use in a FastStream
application, Data
.
from pydantic import BaseModel, Field, NonNegativeFloat
from faststream import FastStream, Logger
from faststream.kafka import KafkaBroker
class DataBasic(BaseModel):
data: NonNegativeFloat = Field(
..., examples=[0.5], description="Float data example"
)
# Code below omitted 👇
👀 Full file preview
from pydantic import BaseModel, Field, NonNegativeFloat
from faststream import FastStream, Logger
from faststream.kafka import KafkaBroker
class DataBasic(BaseModel):
data: NonNegativeFloat = Field(
..., examples=[0.5], description="Float data example"
)
broker = KafkaBroker("localhost:9092")
app = FastStream(broker)
@broker.publisher("output_data")
@broker.subscriber("input_data")
async def on_input_data(msg: DataBasic, logger: Logger) -> DataBasic:
logger.info(msg)
return DataBasic(data=msg.data + 1.0)
These message class will be used to parse and validate incoming data when consuming and to produce a JSON-encoded message when producing. Using Pydantic’s BaseModel in combination with FastStream makes it easy to work with structured data in your Event-based applications.
This example shows how to initialize a FastStream application.
It starts by initialising a Broker
object with the address of the Message broker.
Next, an object of the FastStream
class is created and a Broker
object is passed to it.
# Code above omitted 👆
broker = KafkaBroker("localhost:9092")
app = FastStream(broker)
# Code below omitted 👇
👀 Full file preview
from pydantic import BaseModel, Field, NonNegativeFloat
from faststream import FastStream, Logger
from faststream.kafka import KafkaBroker
class DataBasic(BaseModel):
data: NonNegativeFloat = Field(
..., examples=[0.5], description="Float data example"
)
broker = KafkaBroker("localhost:9092")
app = FastStream(broker)
@broker.publisher("output_data")
@broker.subscriber("input_data")
async def on_input_data(msg: DataBasic, logger: Logger) -> DataBasic:
logger.info(msg)
return DataBasic(data=msg.data + 1.0)
FastStream brokers provide convenient function decorators broker.subscriber
and @broker.publisher
to allow you to delegate the actual process of
-
consuming and producing data to Event queues, and
-
decoding and encoding JSON encoded messages
from user defined functions to the framework. The FastStream framework delegates these jobs to AIOKafka and Pydantic libraries.
These decorators make it easy to specify the processing logic for your consumers and producers, allowing you to focus on the core business logic of your application without worrying about the underlying integration.
This following example shows how to use the @broker.subscriber
and
@broker.publisher
decorators in a FastKafka application:
-
The
@broker.subscriber
decorator is applied to theon_input_data
function, which specifies that this function should be called whenever a message is received on the “input_data” Kafka topic. Theon_input_data
function takes a single argument which is expected to be an instance of theData
message class. Specifying the type of the single argument is instructing the Pydantic to useInputData.parse_raw()
on the consumed message before passing it to the user defined functionon_input_data
. -
The
@broker.publisher
decorator is applied also to theon_input_data
function, which specifies that this function should produce a message to the “output_data” topic whenever it is called. Theon_input_data
function takes the input data and creates a newData
message with incremented value and then returns it. The framework will call theData.json().encode("utf-8")
function on the returned value and produce it to the specified topic.
from pydantic import BaseModel, Field, NonNegativeFloat
from faststream import FastStream, Logger
from faststream.kafka import KafkaBroker
class DataBasic(BaseModel):
data: NonNegativeFloat = Field(
..., examples=[0.5], description="Float data example"
)
broker = KafkaBroker("localhost:9092")
app = FastStream(broker)
@broker.publisher("output_data")
@broker.subscriber("input_data")
async def on_input_data(msg: DataBasic, logger: Logger) -> DataBasic:
logger.info(msg)
return DataBasic(data=msg.data + 1.0)
The service can be tested using the TestBroker
context managers which, by default, puts the Broker into "testing mode".
The Tester will redirect your subscriber
and publisher
decorated functions to the InMemory brokers so that you can quickly test your app without the need for a running broker and all its dependencies.
Using pytest, the test for our service would look like this:
import pytest
from faststream.kafka import TestKafkaBroker
from .basic import DataBasic, broker, on_input_data
@pytest.mark.asyncio
async def test_base_app():
@broker.subscriber("output_data")
async def on_output_data(msg: DataBasic):
pass
async with TestKafkaBroker(broker):
await broker.publish(DataBasic(data=0.2), "input_data")
on_input_data.mock.assert_called_once_with(dict(DataBasic(data=0.2)))
on_output_data.mock.assert_called_once_with(dict(DataBasic(data=1.2)))
First we pass our broker to the TestKafkaBroker
import pytest
from faststream.kafka import TestKafkaBroker
from .basic import DataBasic, broker, on_input_data
@pytest.mark.asyncio
async def test_base_app():
@broker.subscriber("output_data")
async def on_output_data(msg: DataBasic):
pass
async with TestKafkaBroker(broker):
await broker.publish(DataBasic(data=0.2), "input_data")
on_input_data.mock.assert_called_once_with(dict(DataBasic(data=0.2)))
on_output_data.mock.assert_called_once_with(dict(DataBasic(data=1.2)))
After passing the broker to the TestKafkaBroker
we can publish an event to "input_data" and check if the tested broker produced a response as a reaction to it.
To check the response, we registered an additional on_output_data
subscriber which will capture events on "output_data" topic.
import pytest
from faststream.kafka import TestKafkaBroker
from .basic import DataBasic, broker, on_input_data
@pytest.mark.asyncio
async def test_base_app():
@broker.subscriber("output_data")
async def on_output_data(msg: DataBasic):
pass
async with TestKafkaBroker(broker):
await broker.publish(DataBasic(data=0.2), "input_data")
on_input_data.mock.assert_called_once_with(dict(DataBasic(data=0.2)))
on_output_data.mock.assert_called_once_with(dict(DataBasic(data=1.2)))
The application can be started using builtin FastStream CLI command.
First we will save our application code to app.py
file. Here is the application code again:
from pydantic import BaseModel, Field, NonNegativeFloat
from faststream import FastStream, Logger
from faststream.kafka import KafkaBroker
class DataBasic(BaseModel):
data: NonNegativeFloat = Field(
..., examples=[0.5], description="Float data example"
)
broker = KafkaBroker("localhost:9092")
app = FastStream(broker)
@broker.publisher("output_data")
@broker.subscriber("input_data")
async def on_input_data(msg: DataBasic, logger: Logger) -> DataBasic:
logger.info(msg)
return DataBasic(data=msg.data + 1.0)
To run the service, use the FastStream CLI command and pass the module (in this case, the file where the app implementation is located) and the app simbol to the command.
faststream run basic:app
After running the command you should see the following output:
INFO - FastStream app starting...
INFO - input_data | - `OnInputData` waiting for messages
INFO - FastStream app started successfully! To exit press CTRL+C