Consumer
Consumer
Один Consumer соответствует одной очереди и одному обработчику:
queue = QueueConfig(
'orders',
dead_letter_exchange='orders.dlx',
dead_letter_routing_key='orders.failed',
)
consumer = Consumer(queue=queue, prefetch_count=1)
@consumer
def handle(channel, method, properties, body: bytes) -> None:
order = json.loads(body)
process(order)
channel.basic_ack(delivery_tag=method.delivery_tag)Handler получает нативные объекты Pika. Библиотека не декодирует body и не делает auto-ack.
Успешный handler обязан сам вызвать basic_ack после фиксации бизнес-изменений. Если handler выбрасывает исключение, Flask-RMQ выполняет basic_nack(..., requeue=False). С настроенным DLX сообщение попадёт в failure queue; без DLX оно будет отброшено брокером.
prefetch_count=1 даёт честное распределение. Увеличивайте его только после измерений коротких I/O-bound обработчиков. Для параллелизма масштабируйте replicas consumer-процесса.
При обрыве соединения задержка растёт экспоненциально от initial до max. SIGINT/SIGTERM прерывает и consume polling, и backoff wait.
Consumer threads запускаются с app.app_context(), поэтому доступны current_app и Flask extensions. Request context отсутствует. Управление ORM session и транзакциями остаётся ответственностью приложения.