Быстрый старт
Быстрый старт
Требования: Python 3.10–3.14, Flask 3.0–3.1 и RabbitMQ 3.13/4.x.
pip install Flask-RMQСоздайте extension отдельно от приложения:
# extensions.py
from flask_rmq import FlaskRMQ
rmq = FlaskRMQ()Инициализируйте после загрузки config и явно зарегистрируйте messaging-компоненты:
from flask import Flask
from flask_rmq import get_consumers_registry, get_setup_registry
from .extensions import rmq
from .messaging import consumer, setup_topology
def create_app() -> Flask:
app = Flask(__name__)
app.config['RABBITMQ_CONNECTIONS'] = {
'default': {'HOST': 'localhost'},
}
rmq.init_app(app)
with app.app_context():
get_setup_registry().register(setup_topology)
get_consumers_registry().register(consumer)
return appfrom flask_rmq import Consumer, Producer
producer = Producer(queue='events')
consumer = Consumer(queue='events')
@consumer
def handle_event(channel, method, properties, body: bytes) -> None:
print(body.decode())
channel.basic_ack(delivery_tag=method.delivery_tag)
def setup_topology(channel) -> None:
channel.queue_declare(queue='events', durable=True)Проверьте wiring, объявите очередь и запустите отдельный consumer-процесс:
flask --app service:create_app rmq check
flask --app service:create_app rmq setup
flask --app service:create_app rmq consumeПубликация из route или service layer:
producer.publish('{"event":"created"}')В production WSGI API и rmq consume должны быть разными процессами или контейнерами.