有勇气的牛排博客

RabbitMQ 入门安装到Python场景应用

有勇气的牛排 63 服务器/中间件 2025-03-25 21:07:05

1 前言

1.1 什么是 RabbitMQ?

RabbitMQ 是一个开源的 消息中间件,基于 AMQP(Advanced Message Queuing Protocol)高级消息队列协议 开发,用于在分布式系统中传递消息、解耦模块、削峰填谷、实现异步处理。

简单理解:RabbitMQ 就像一个快递中转站,负责接收消息、存储消息、转发消息。

1.2 RabbitMQ 有哪些特点?

特点 说明
📜 协议标准 基于 AMQP 协议,跨平台、跨语言
🔧 高可靠性 支持消息持久化、ACK确认机制、镜像队列(集群时)
📈 高可用性 支持集群、高可用队列,节点挂了也能继续处理
🔁 灵活路由 支持多种交换机类型(直连、广播、通配符、Header)
📦 插件系统丰富 支持 Web 管理界面、监控、集群管理、Shovel、Federation 等插件
🧩 支持多语言客户端 Python、Java、Go、Node.js 等都可以使用

1.3 核心概念

名称 说明
Producer(生产者) 消息的发送者
Consumer(消费者) 消息的接收者
Queue(队列) 存储消息的缓冲区,消息在这里等待被消费者消费
Exchange(交换机) 根据规则将消息分发到不同的队列,可设定分发策略
Binding(绑定) 把队列和交换机按照路由规则连接起来
Routing Key 路由关键字,用于消息在交换机和队列之间分发的依据
Message(消息) 实际传递的数据内容,一般是字符串、JSON 等

1.4 四种交换机类型对比]

类型 路由规则 常见用途
fanout 广播到所有绑定队列 广播消息、日志系统
direct 精确匹配 routing_key 单个/指定消费者
topic 通配符匹配 routing_key 复杂路由,日志、事件
headers 根据消息 headers 条件匹配 header 控制精细路由
类型 说明 应用场景
direct 精确匹配 routing key 点对点消息
fanout 广播,忽略 routing key,发给绑定的所有队列 日志广播、消息通知
topic 模糊匹配,支持 *# 通配符 订阅/发布模型,事件匹配
headers 按照 header 属性分发 高度定制的路由规则

1.5 RabbitMQ 与 Kafka 对比

对比项 RabbitMQ Kafka
设计理念 消息队列(面向消息) 日志系统(面向事件)
传输协议 AMQP 自定义协议(Kafka协议)
消息持久化 默认支持 高效磁盘持久化
消费模型 推(Push)模型 拉(Pull)模型
消息顺序 有序,但集群时需要配置保证顺序 分区内严格有序
吞吐量 中等偏高 超高吞吐量,适合大数据流
使用场景 实时消息通信、业务解耦、异步任务 日志采集、行为数据、数据管道

1.6 使用场景举例

场景 描述
🛍️ 电商下单后异步发短信 下单成功后,将“发送短信”任务放入消息队列,由异步服务处理
🧾 日志异步处理 应用不直接写数据库/文件,而是把日志写入 MQ,由后台服务统一处理
🎮 游戏事件分发 玩家行为被打包成事件,广播给多个服务(比如日志、成就、排行榜)
📧 邮件通知/延迟任务队列 延迟消费邮件、短信等任务,用 TTL + 死信队列实现

2 安装

https://www.rabbitmq.com/docs/install-rpm

2.1 CentOS7 安装

2.2 docker安装

# latest RabbitMQ 4.0.x docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0-management

docker部署rabbitmq

2.3 开启管理面板

sudo rabbitmq-plugins enable rabbitmq_management

打开浏览器访问:
http://127.0.0.1:15672
默认账号:guest,密码:guest

rabbitmq管理面板

3 交换机类型(python案例)

RabbitMQ 的四种交换机类型:directtopicfanoutheaders 各有适用场景。

3.1 fanout(广播)

路由规则: 发送到所有绑定队列,不看 routing_key

场景示例:

  • 日志系统,一条日志需要发送给多个子系统(存储、邮件、短信、实时通知等)

结构图:

     Publisher
         |
     [Exchange: logs]  -- fanout
      /        \
 [Queue1]    [Queue2]    <-- 多个消费者绑定

publisher.py

import pika import json # 建立连接 mq_host = "192.168.56.20" if __name__ == '__main__': connection = pika.BlockingConnection(pika.ConnectionParameters(mq_host)) channel = connection.channel() # 声明交换机 channel.exchange_declare(exchange='logs', exchange_type='fanout') # 要发送的数据 data = { "event": "user_signup", "username": "cs", "nickname": "有勇气的牛排", "timestamp": "2099-01-01 12:00:00" } # 将字典转换为 JSON 字符串 message = json.dumps(data) channel.basic_publish(exchange='logs', routing_key='', body=message) print(f"[x] Sent: {message}") connection.close()

consumer.py

import pika import json mq_host = "192.168.56.20" if __name__ == '__main__': # 建立连接 connection = pika.BlockingConnection(pika.ConnectionParameters(mq_host)) channel = connection.channel() # 声明交换机 channel.exchange_declare( exchange='test', # 交换机名称 exchange_type='fanout', # 交换机类型:fanout, direct, topic, headers passive=False, # 检查是否存在,不创建(默认 False) durable=False, # 是否持久化(重启 RabbitMQ 后是否还在) auto_delete=False, # 无绑定队列时是否自动删除 internal=False, # 是否为内部交换机(客户端不能直接发布消息) # arguments=None # 额外参数(高级设置,如消息过期时间等) ) # 创建临时队列 result = channel.queue_declare(queue='', exclusive=True) queue_name = result.method.queue # 绑定队列 channel.queue_bind(exchange='logs', queue=queue_name) print('[*] Waiting for messages. To exit press CTRL+C') # 回调函数 def callback(ch, method, properties, body): # 将 JSON 字符串转换为字典 data = json.loads(body.decode()) print(f"收到消息:{data}") channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True) channel.start_consuming()

RabbitMQ fanout广播消息

3.2 direct(精准路由)

路由规则: 根据 routing_key 精确匹配队列绑定的 key。

场景:系统中不同服务只接收自己关心的消息,如 email 只处理 email 消息,sms 处理短信。

publisher.py

channel.exchange_declare(exchange='direct_logs', exchange_type='direct') channel.basic_publish(exchange='direct_logs', routing_key='email', body='邮件消息内容')

consumer_email.py

channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key='email')

consumer_sms.py

channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key='sms'

3.3 topic(模糊/通配路由)

路由规则: 支持通配符匹配 routing_key:

  • *:匹配一个单词
  • #:匹配零个或多个单词

场景:日志系统:按模块/级别动态订阅,如

  • log.error.app1
  • log.info.*
  • log.#(全接)

publisher.py

channel.exchange_declare(exchange='topic_logs', exchange_type='topic') channel.basic_publish(exchange='topic_logs', routing_key='log.error.app1', body='Error log')

consumer.py(接收所有 error)

channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key='log.error.*')

consumer.py(接收所有日志)

channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key='log.#'

3.4 headers(基于 headers 路由)

路由规则: 不依赖 routing_key,而是根据消息 headers 中的键值匹配。

场景:多条件筛选,如 “user_type=vip” 且 “region=us” 才接收消息。

publisher.py

headers = {'user_type': 'vip', 'region': 'us'} properties = pika.BasicProperties(headers=headers) channel.exchange_declare(exchange='header_logs', exchange_type='headers') channel.basic_publish(exchange='header_logs', routing_key='', body='hello headers', properties=properties)

consumer.py

args = { 'x-match': 'all', # 'all' 代表必须全部匹配,'any' 代表任意一个 'user_type': 'vip', 'region': 'us' } channel.queue_bind(exchange='header_logs', queue=queue_name, arguments=args)

留言

专栏
文章
加入群聊