Consumer
Consumer
One Consumer represents one queue and one callback.
from flask_rmq import Consumer, QueueConfig
queue = QueueConfig(
"orders",
dead_letter_exchange="orders.dlx",
dead_letter_routing_key="orders.failed",
)
consumer = Consumer(queue=queue, prefetch_count=1)
@consumer
def handle(channel, method, properties, body: bytes) -> None:
order = json.loads(body)
process(order)
channel.basic_ack(delivery_tag=method.delivery_tag)The handler receives native Pika values. Flask-RMQ does not auto-decode, auto-ack, or hide AMQP metadata.
Acknowledgement contract
A successful handler must call basic_ack itself. This keeps the commit boundary explicit: update application state first, then acknowledge. If the handler raises, Flask-RMQ calls:
channel.basic_nack(delivery_tag=method.delivery_tag, requeue=False)With a configured DLX, RabbitMQ routes the message to a failure queue. Without a DLX, RabbitMQ discards it. Do not use unconditional requeue for malformed messages; that creates a hot poison-message loop.
QoS and concurrency
prefetch_count=1 is the default and gives fair dispatch. Increase it for short I/O handlers after measuring. The CLI runs each registered consumer in its own non-daemon thread; scale process or pod replicas for parallelism on the same queue.
Reconnect lifecycle
The consumer polls Pika events with a one-second time limit so shutdown remains responsive. Recoverable disconnections wait initial, then 2×, 4×, up to maximum. SIGINT and SIGTERM set a shared stop event, including during backoff waits.
Flask context and databases
CLI-created threads run under app.app_context(), so current_app and Flask extension proxies work. Database session cleanup is ORM-specific: for Flask-SQLAlchemy, close or remove the session in the handler as required by your transaction pattern. Flask-RMQ intentionally does not import an ORM or commit on your behalf.