Propan is a modern framework for building applications based on a messaging architecture.
The key features are:
- Easy: Designed to be easy to use and learn.
- Intuitive: Great editor support. Autocompletion everywhere.
- Dependencies management: Minimize code duplication. Multiple features from each argument and parameter declaration.
- Integrations: Propan is ready to be used alongside any HTTP framework you want.
- MQ independent: A single interface to popular message queues.
- Great to develop: The CLI tool provides a great development experience:
  - a framework-independent way to manage the application environment
  - application code hot reloading
Supported MQ brokers:
Broker | async | sync |
---|---|---|
RabbitMQ | ✔️ stable ✔️ | 🔍 planning 🔍 |
Nats | 🔍 planning 🔍 | |
NatsJS | 🛠️ in progress 🛠️ | 🔍 planning 🔍 |
Kafka | 🔍 planning 🔍 | 🔍 planning 🔍 |
Install using pip:
$ pip install "propan[async-rabbit]"
# or
$ pip install "propan[async-nats]"
Create an application with the following code in serve.py:
from propan import PropanApp
from propan.brokers.rabbit import RabbitBroker
# from propan.brokers.nats import NatsBroker
broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
# broker = NatsBroker("nats://localhost:4222")
app = PropanApp(broker)
@broker.handle("test")
async def base_handler(body):
    '''Handle all default exchange messages with `test` routing key'''
    print(body)
And just run it:
$ propan run serve:app
Propan uses pydantic to cast incoming function arguments to the types given by their annotations.
from pydantic import BaseModel
from propan import PropanApp, Context
from propan.brokers.rabbit import RabbitBroker
broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = PropanApp(broker)
class SimpleMessage(BaseModel):
    key: int

@broker.handle("test2")
async def second_handler(body: SimpleMessage):
    assert isinstance(body.key, int)
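To make the idea of annotation-driven casting concrete, here is a minimal standard-library sketch. It is not how Propan or pydantic implement it; `cast_arguments` and `handler` are hypothetical names used only for illustration, and only plain class annotations such as `int` are handled.

```python
import inspect
from typing import Any, Callable, get_type_hints


def cast_arguments(func: Callable[..., Any], raw: dict) -> dict:
    """Hypothetical helper: cast raw values to the types in func's annotations."""
    hints = get_type_hints(func)
    casted = {}
    for name in inspect.signature(func).parameters:
        value = raw[name]
        target = hints.get(name)
        # Cast only when the annotation is a plain class and the value
        # is not already an instance of it; unannotated values pass through.
        if isinstance(target, type) and not isinstance(value, target):
            value = target(value)
        casted[name] = value
    return casted


def handler(key: int, note):
    return key, note


# The string "42" is converted to int because of the `key: int` annotation.
result = handler(**cast_arguments(handler, {"key": "42", "note": "raw"}))
```

A real validation library does much more (nested models, error reporting, generic types); the sketch only shows why a type annotation is enough information to drive the conversion.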
Propan has a dependency management policy close to pytest fixtures.
You can specify in a function's arguments which dependencies you would like to use. The framework passes them from the global Context object.
The default context fields are: app, broker, context (itself), logger and message. If you request a field that does not exist, it is passed as None.
You can also specify your own dependencies, call dependency functions (like FastAPI Depends), and more.
from logging import Logger
import aio_pika
from propan import PropanApp, Context
from propan.brokers.rabbit import RabbitBroker
rabbit_broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = PropanApp(rabbit_broker)
@rabbit_broker.handle("test")
async def base_handler(body: dict,
                       app: PropanApp,
                       broker: RabbitBroker,
                       context: Context,
                       logger: Logger,
                       message: aio_pika.Message,
                       not_existed_field):
    assert broker is rabbit_broker
    assert not_existed_field is None
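The name-based lookup described above can be sketched with the standard library alone. This is an illustration under assumptions, not Propan's actual resolver: `resolve_from_context` is a hypothetical helper, the context is modelled as a plain dict, and the handler is assumed to take the message body as its first parameter.

```python
import inspect
from typing import Any, Callable


def resolve_from_context(func: Callable[..., Any], context: dict, body: Any) -> dict:
    """Hypothetical helper: build keyword arguments for a handler.

    The first parameter receives the message body; every other parameter
    is looked up in the context by name, with None for unknown names.
    """
    params = list(inspect.signature(func).parameters)
    kwargs = {params[0]: body}
    for name in params[1:]:
        kwargs[name] = context.get(name)
    return kwargs


def handler(body, logger, not_existed_field):
    return body, logger, not_existed_field


# Stand-in values; a real context would hold actual objects.
context = {"logger": "fake-logger", "broker": "fake-broker"}
result = handler(**resolve_from_context(handler, context, {"a": 1}))
```

Here `logger` is filled from the context, `broker` is ignored because the handler does not ask for it, and `not_existed_field` falls back to None.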
Propan has its own CLI tool that provides the following features:
- project generation
- multiprocessing workers
- project hot reloading
- custom context arguments passing
For example, pass your current .env project settings to the context:
$ propan run serve:app --env=.env.dev
from propan import PropanApp, Context
from propan.brokers.rabbit import RabbitBroker
from pydantic import BaseSettings
broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = PropanApp(broker)
class Settings(BaseSettings):
    ...

@app.on_startup
async def setup(env: str, context: Context):
    settings = Settings(_env_file=env)
    context.set_context("settings", settings)
The Propan CLI can also generate a production-ready application template:
$ propan create [projectname]
Notice: the project template requires pydantic[dotenv] to be installed.
Run the created project:
# Run RabbitMQ first
$ docker compose --file [projectname]/docker-compose.yaml up -d
# Run project
$ propan run [projectname].app.serve:app --env=.env --reload
Now you can enjoy a new development experience!
You can use Propan MQBrokers without PropanApp. Just start and stop them according to your application lifespan.
from fastapi import FastAPI
from propan.brokers.rabbit import RabbitBroker
broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastAPI()
@broker.handle("test")
async def base_handler(body):
    print(body)

@app.on_event("startup")
async def start_broker():
    await broker.start()

@app.on_event("shutdown")
async def stop_broker():
    await broker.close()
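The same start/stop pairing can be expressed as an async context manager, which guarantees the broker is closed even if the application body raises. The sketch below is framework-agnostic and assumes only that the broker exposes awaitable `start()` and `close()` methods as in the example above; `FakeBroker` and `broker_lifespan` are hypothetical names standing in for a real broker and your own lifespan hook.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeBroker:
    """Hypothetical stand-in with the same start()/close() pair."""

    def __init__(self):
        self.running = False
        self.events = []

    async def start(self):
        self.running = True
        self.events.append("start")

    async def close(self):
        self.running = False
        self.events.append("close")


@asynccontextmanager
async def broker_lifespan(broker):
    # Start the broker before the application body runs.
    await broker.start()
    try:
        yield broker
    finally:
        # Always close the broker, even if the body raised.
        await broker.close()


async def main():
    broker = FakeBroker()
    async with broker_lifespan(broker):
        was_running = broker.running
    return was_running, broker.running, broker.events


was_running, still_running, events = asyncio.run(main())
```

Whether to use startup/shutdown hooks or a context manager depends on what your HTTP framework offers; the point is only that the broker's lifetime follows the application's.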
For more usage examples, see examples/