软硬件环境
- ubuntu 18.04 64bit
- anaconda3 with python 3.6.4
- RabbitMQ
- pika 0.12.0
AMQP是什么
AMQP(Advanced Message Queuing Protocol),顾名思义,它是一个消息协议,能够使得遵循该协议的客户端和消息中间件(Broker)进行通讯。
下图是官方给出的模型示意图,中间框内的就是Broker

消息发布给Exchange,Exchange相当于邮局或者信箱,它接收到消息后会根据不同的规则(称为Bindings)把消息发送出去。
Exchange总共有4种类型,分别是
- Direct Exchange将消息中的- Routing key与该- exchange关联的所有- Binding中的- Routing key进行比较,如果相等,则发送到该- Binding对应的- Queue中。
- Topic Exchange将消息中的- Routing key与该- Exchange关联的所有- Binding中的- Routing key进行对比,如果匹配上了,则发送到该- Binding对应的- Queue中。
- Fanout Exchange直接将消息转发到所有- Binding的对应- Queue中,这种- Exchange在路由转发的时候,忽略- Routing key。
- Headers Exchange将消息中的- headers与该- Exchange相关联的所有- Binging中的参数进行匹配,如果匹配上了,则发送到该- Binding对应的- Queue中
RabbitMQ简介
RabbitMQ是当下最流行的开源Message Broker,Broker中文是经纪、掮客的意思,它非常轻量且容易部署,不管是在本地还是在云端,同时支持多种消息协议,而且支持多种编程语言。RabbitMQ是针对AMQP的一种实现。
- Producing意为发送,发送的程序就是- producer生产者
- Queue即队列,相当于- RabbitMQ中的邮箱名称,- producer发送的消息就是存储在这里,队列受制于主机的内存大小和磁盘空间,本质上队列就是缓存。多个- producer可以发送数据到同一个队列,而- consumer可以从同一个队列中接收数据
- Consuming意为接收,接收的程序就是- consumer消费者
RabbitMQ的消息处理流程

- producer发送消息到- Exchange
- Exchange接收消息并且准备将消息根据- Routing_key路由到- Queue
- 绑定Queue和Exchange根据binding_key(在上图中绑定了两个Queue到这个Exchange),Exchange将消息路由到Queue
- 消息在被consumer接收处理之前,一直在Queue中
- consumer接收处理消息
安装RabbitMQ
通过apt安装
sudo apt install rabbitmq-server接下来安装python语言的支持包pika,pika同时支持python2和python3
pip install pika开启RabbitMQ的web管理界面
sudo rabbitmq-plugins enable rabbitmq_management之后可以登录浏览器打开http://localhost:15672,查看API的话打开http://localhost:15672/api
开启消息的跟踪
sudo rabbitmq-plugins enable rabbitmq_tracing然后打开http://localhost:15672, 进入到admin –> Tracing –> Add a new trace,然后就可以查看你添加的log文件了。类似下面这样,其中Payload就是消息内容了,发送方和接收方也都能看的很清楚了。
================================================================================
2019-04-17 18:26:54:998: Message published
Node:         rabbit@ubuntu-server
Connection:   172.21.2.20:33828 -> 172.21.2.20:5672
Virtual host: /
User:         longjing
Channel:      1
Exchange:     
Routing keys: [<<"live">>]
Routed queues: [<<"live">>]
Properties:   []
Payload: 
{"hls": "http://172.21.2.20:8081/live/172.21.168.72:554/index.m3u8", "cmd": "livestart", "status": "200", "camera": "rtsp://admin:ABC123##X@172.21.168.72:554/cam/realmonitor?channel=1&subtype=0"}
================================================================================
2019-04-17 18:29:44:569: Message received
Node:         rabbit@ubuntu-server
Connection:   10.1.1.40:53338 -> 172.21.2.20:5672
Virtual host: /
User:         longjing
Channel:      1
Exchange:     
Routing keys: [<<"live">>]
Queue:        live
Properties:   []
Payload: 
{"hls": "http://172.21.2.20:8081/live/172.21.168.72:554/index.m3u8", "cmd": "livestart", "status": "200", "camera": "rtsp://admin:ABC123##X@172.21.168.72:554/cam/realmonitor?channel=1&subtype=0"}用户管理
创建用户
sudo rabbitmqctl add_user xugaoxiang xugaoxiang设置用户权限
sudo rabbitmqctl set_user_tags xugaoxiang administrator
sudo rabbitmqctl set_permissions -p / xugaoxiang '.*' '.*' '.*'重启rabbitmq-server,就可以新用户xugaoxiang登录管理进行管理了。

RabbitMQ中的Hello world
消息发送端代码producer.py
# -*- coding: utf-8 -*-
# @Time    : 18-12-21 下午11:12
# @Author  : xugaoxiang
# @Email   : xugx.ai@gmail.com
# @Website : http://www.xugaoxiang.com
# @File    : producer.py
# @Software: PyCharm
import pika
import sys
message = sys.argv[1]
# 可以连接远程,localhost换成ip地址或者域名
credentials = pika.PlainCredentials('xugaoxiang', 'xugaoxiang')
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', credentials))
channel = connection.channel()
# 发送消息之前需要确认接收队列是否存在? 如果不存在,RabbitMQ就会丢弃。下面语句是我们创建`hello`队列
channel.queue_declare(queue='hello')
# 最后是发送
channel.basic_publish(exchange='', routing_key='hello', body=message)
print('Sent message: {}'.format(message))
connection.close()消息接收端代码consumer.py
# -*- coding: utf-8 -*-
# @Time    : 18-12-21 下午11:18
# @Author  : xugaoxiang
# @Email   : xugx.ai@gmail.com
# @Website : http://www.xugaoxiang.com
# @File    : consumer.py
# @Software: PyCharm
import pika
def callback(ch, method, properties, body):
    print('Receive: {}'.format(body))
# 建立连接跟发送端的一样
credentials = pika.PlainCredentials('xugaoxiang', 'xugaoxiang')
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', credentials))
channel = connection.channel()
channel.queue_declare(queue='hello')
# 注册回调函数callback, no_ack表示不支持消息反馈,在多任务的情况下,这个很有用,`producer`能够告诉RabbitMQ消息正被接收和处理。如果处理该消息的进程挂了,可以通过这个机制
channel.basic_consume(consumer_callback=callback, queue='hello', no_ack=True)
print('Waiting for messages. To exit press CTRL+C')
# 进入事件循环
channel.start_consuming()执行顺序是: consumer.py -> producer.py

在上面的示例中,我们使用的是BlockingConnection适配器,除此以外,pika还提供了其它几个适配器,根据需要选择使用
- BlockingConnection 最简单的一种,同步、阻塞
- AsyncioConnection 适用于python3异步IO事件循环
- SelectConnection 同样是异步,但不依赖与第三方库
- TornadoConnection 依赖于Tornado
- TwistedProtocolConnection 适用于Twisted异步包
在安装RabbitMQ中,额外会生成命令行工具rabbitmqctl和rabbitmqadmin,这两个命令非常有用,在代码调试中也可以通过它们来查看当前系统状态,具体的参数可以通过--help来查看。



