软硬件环境
- ubuntu 18.04 64bit
- anaconda3 with python 3.6.4
- RabbitMQ
- pika 0.12.0
AMQP是什么
AMQP
(Advanced Message Queuing Protocol
),顾名思义,它是一个消息协议,能够使得遵循该协议的客户端和消息中间件(Broker
)进行通讯。
下图是官方给出的模型示意图,中间框内的就是Broker
消息发布给Exchange
,Exchange
相当于邮局或者信箱,它接收到消息后会根据不同的规则(称为Bindings
)把消息发送出去。
Exchange
总共有4
种类型,分别是
Direct Exchange
将消息中的Routing key
与该exchange
关联的所有Binding
中的Routing key
进行比较,如果相等,则发送到该Binding
对应的Queue
中。Topic Exchange
将消息中的Routing key
与该Exchange
关联的所有Binding
中的Routing key
进行对比,如果匹配上了,则发送到该Binding
对应的Queue
中。Fanout Exchange
直接将消息转发到所有Binding
的对应Queue
中,这种Exchange
在路由转发的时候,忽略Routing key
。Headers Exchange
将消息中的headers
与该Exchange
相关联的所有Binging
中的参数进行匹配,如果匹配上了,则发送到该Binding
对应的Queue
中
RabbitMQ简介
RabbitMQ
是当下最流行的开源Message Broker
,Broker
中文是经纪、掮客的意思,它非常轻量且容易部署,不管是在本地还是在云端,同时支持多种消息协议,而且支持多种编程语言。RabbitMQ
是针对AMQP
的一种实现。
Producing
意为发送,发送的程序就是producer
生产者Queue
即队列,相当于RabbitMQ
中的邮箱名称,producer
发送的消息就是存储在这里,队列受制于主机的内存大小和磁盘空间,本质上队列就是缓存。多个producer
可以发送数据到同一个队列,而consumer
可以从同一个队列中接收数据Consuming
意为接收,接收的程序就是consumer
消费者
RabbitMQ的消息处理流程
producer
发送消息到Exchange
Exchange
接收消息并且准备将消息根据Routing_key
路由到Queue
- 绑定
Queue
和Exchange
根据binding_key
(在上图中绑定了两个Queue
到这个Exchange
),Exchange
将消息路由到Queue
- 消息在被
consumer
接收处理之前,一直在Queue
中 consumer
接收处理消息
安装RabbitMQ
通过apt
安装
sudo apt install rabbitmq-server
接下来安装python
语言的支持包pika
,pika
同时支持python2
和python3
pip install pika
开启RabbitMQ
的web
管理界面
sudo rabbitmq-plugins enable rabbitmq_management
之后可以登录浏览器打开http://localhost:15672
,查看API
的话打开http://localhost:15672/api
开启消息的跟踪
sudo rabbitmq-plugins enable rabbitmq_tracing
然后打开http://localhost:15672
, 进入到admin
–> Tracing
–> Add a new trace
,然后就可以查看你添加的log
文件了。类似下面这样,其中Payload
就是消息内容了,发送方和接收方也都能看的很清楚了。
================================================================================
2019-04-17 18:26:54:998: Message published
Node: rabbit@ubuntu-server
Connection: 172.21.2.20:33828 -> 172.21.2.20:5672
Virtual host: /
User: longjing
Channel: 1
Exchange:
Routing keys: [<<"live">>]
Routed queues: [<<"live">>]
Properties: []
Payload:
{"hls": "http://172.21.2.20:8081/live/172.21.168.72:554/index.m3u8", "cmd": "livestart", "status": "200", "camera": "rtsp://admin:ABC123##X@172.21.168.72:554/cam/realmonitor?channel=1&subtype=0"}
================================================================================
2019-04-17 18:29:44:569: Message received
Node: rabbit@ubuntu-server
Connection: 10.1.1.40:53338 -> 172.21.2.20:5672
Virtual host: /
User: longjing
Channel: 1
Exchange:
Routing keys: [<<"live">>]
Queue: live
Properties: []
Payload:
{"hls": "http://172.21.2.20:8081/live/172.21.168.72:554/index.m3u8", "cmd": "livestart", "status": "200", "camera": "rtsp://admin:ABC123##X@172.21.168.72:554/cam/realmonitor?channel=1&subtype=0"}
用户管理
创建用户
sudo rabbitmqctl add_user xugaoxiang xugaoxiang
设置用户权限
sudo rabbitmqctl set_user_tags xugaoxiang administrator
sudo rabbitmqctl set_permissions -p / xugaoxiang '.*' '.*' '.*'
重启rabbitmq-server
,就可以新用户xugaoxiang
登录管理进行管理了。
RabbitMQ中的Hello world
消息发送端代码producer.py
# -*- coding: utf-8 -*-
# @Time : 18-12-21 下午11:12
# @Author : xugaoxiang
# @Email : xugx.ai@gmail.com
# @Website : http://www.xugaoxiang.com
# @File : producer.py
# @Software: PyCharm
import pika
import sys
message = sys.argv[1]
# 可以连接远程,localhost换成ip地址或者域名
credentials = pika.PlainCredentials('xugaoxiang', 'xugaoxiang')
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', credentials))
channel = connection.channel()
# 发送消息之前需要确认接收队列是否存在? 如果不存在,RabbitMQ就会丢弃。下面语句是我们创建`hello`队列
channel.queue_declare(queue='hello')
# 最后是发送
channel.basic_publish(exchange='', routing_key='hello', body=message)
print('Sent message: {}'.format(message))
connection.close()
消息接收端代码consumer.py
# -*- coding: utf-8 -*-
# @Time : 18-12-21 下午11:18
# @Author : xugaoxiang
# @Email : xugx.ai@gmail.com
# @Website : http://www.xugaoxiang.com
# @File : consumer.py
# @Software: PyCharm
import pika
def callback(ch, method, properties, body):
print('Receive: {}'.format(body))
# 建立连接跟发送端的一样
credentials = pika.PlainCredentials('xugaoxiang', 'xugaoxiang')
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', credentials))
channel = connection.channel()
channel.queue_declare(queue='hello')
# 注册回调函数callback, no_ack表示不支持消息反馈,在多任务的情况下,这个很有用,`producer`能够告诉RabbitMQ消息正被接收和处理。如果处理该消息的进程挂了,可以通过这个机制
channel.basic_consume(consumer_callback=callback, queue='hello', no_ack=True)
print('Waiting for messages. To exit press CTRL+C')
# 进入事件循环
channel.start_consuming()
执行顺序是: consumer.py
-> producer.py
在上面的示例中,我们使用的是BlockingConnection
适配器,除此以外,pika
还提供了其它几个适配器,根据需要选择使用
- BlockingConnection 最简单的一种,同步、阻塞
- AsyncioConnection 适用于
python3
异步IO
事件循环 - SelectConnection 同样是异步,但不依赖与第三方库
- TornadoConnection 依赖于
Tornado
- TwistedProtocolConnection 适用于
Twisted
异步包
在安装RabbitMQ
中,额外会生成命令行工具rabbitmqctl
和rabbitmqadmin
,这两个命令非常有用,在代码调试中也可以通过它们来查看当前系统状态,具体的参数可以通过--help
来查看。