Topology and Click CLI
Topology functions are ordinary idempotent callables that receive a Pika channel:
def setup_orders(channel) -> None:
    channel.exchange_declare(exchange='orders', exchange_type='direct', durable=True)
    channel.queue_declare(queue='orders.created', durable=True)
    channel.queue_bind(
        exchange='orders',
        queue='orders.created',
        routing_key='orders.created',
    )
Register after init_app:
with app.app_context():
    get_setup_registry().register(setup_orders)
    get_consumers_registry().register(order_consumer)
The registry guards against registering the same object twice, which is useful with the development reloader. Distinct callbacks are kept in registration order.
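The duplicate guard and ordering described above can be pictured with a small stand-in. This is only a sketch: `CallbackRegistry` is a hypothetical class written for illustration, not the extension's actual registry, and the identity-based comparison is an assumption about how "the same object" is detected.

```python
from typing import Callable, Iterator, List


class CallbackRegistry:
    """Hypothetical stand-in for the extension's registry (illustration only)."""

    def __init__(self) -> None:
        self._callbacks: List[Callable] = []

    def register(self, callback: Callable) -> Callable:
        # Assumption: "same object" means identity, so a callback that is
        # already present is skipped instead of being appended again.
        if not any(existing is callback for existing in self._callbacks):
            self._callbacks.append(callback)
        return callback

    def __iter__(self) -> Iterator[Callable]:
        # Callbacks come back in the order they were first registered.
        return iter(self._callbacks)


def setup_orders(channel) -> None:
    """Placeholder topology callback."""


def setup_payments(channel) -> None:
    """Placeholder topology callback (hypothetical second service)."""


registry = CallbackRegistry()
registry.register(setup_orders)
registry.register(setup_payments)
registry.register(setup_orders)  # ignored: this object is already registered

names = [callback.__name__ for callback in registry]
print(names)
```

Registering `setup_orders` a second time leaves the list unchanged, while `setup_payments` keeps its position after it.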
Commands
# no network access; validates config and registration
flask --app service:create_app rmq check
# declares topology on every alias
flask --app service:create_app rmq setup
# starts all consumer threads
flask --app service:create_app rmq consume
# scope either operation to one broker
flask --app service:create_app rmq setup --using analytics
flask --app service:create_app rmq consume --using analytics
These are Click commands registered on Flask's standard CLI. This is important for app factories: Flask performs app discovery, loads environment/configuration, initializes extensions, and only then invokes the RabbitMQ command.
setup prints each callback as it completes. Declarations should always be idempotent: RabbitMQ accepts a repeated declaration only when its properties match those of the existing entity, and otherwise closes the channel with a 406 PRECONDITION_FAILED error.
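To make the redeclaration rule concrete, here is a toy model of it. `FakeChannel` and `PreconditionFailed` are hypothetical names invented for this sketch; the fake only compares the `durable` flag, whereas a real broker checks the full set of declared properties and also closes the channel on a mismatch.

```python
class PreconditionFailed(Exception):
    """Stands in for the broker's 406 PRECONDITION_FAILED channel error."""


class FakeChannel:
    """Toy in-memory channel that mimics only the redeclaration rule."""

    def __init__(self) -> None:
        self.queues = {}  # queue name -> durable flag

    def queue_declare(self, queue: str, durable: bool = False) -> None:
        existing = self.queues.get(queue)
        if existing is not None and existing != durable:
            # Same name, different properties: the declaration is rejected.
            raise PreconditionFailed(f"inequivalent arg 'durable' for queue '{queue}'")
        self.queues[queue] = durable


def setup_orders(channel) -> None:
    channel.queue_declare(queue="orders.created", durable=True)


channel = FakeChannel()
setup_orders(channel)
setup_orders(channel)  # repeated with identical properties: accepted

try:
    # Conflicting redeclaration of an existing durable queue.
    channel.queue_declare(queue="orders.created", durable=False)
    conflict_rejected = False
except PreconditionFailed:
    conflict_rejected = True

print(channel.queues, conflict_rejected)
```

Running the same setup callback twice is harmless, but changing a property of an existing queue requires deleting and recreating it (or choosing a new name) rather than simply editing the declaration.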
Deployment split
Run topology setup as a release job or init job, then run consumers as a long-lived process:
command: ['flask', '--app', 'service:create_app', 'rmq', 'consume']
Do not start background consumers from a WSGI worker. Gunicorn may fork or restart workers, creating unpredictable duplicate threads. A dedicated consumer process gives signals, replicas, health checks, and resource limits a clear owner.