徐高翔的个人网站

python中如何使用RabbitMQ

2018-11-23

软硬件环境

  • ubuntu 18.04 64bit
  • anaconda3 with python 3.6.4
  • RabbitMQ
  • pika 0.12.0

AMQP是什么

AMQP(Advanced Message Queuing Protocol),顾名思义,它是一个消息协议,能够使得遵循该协议的客户端和消息中间件(Broker)进行通讯。

下图是官方给出的模型示意图,中间框内的就是Broker

rabbitmq_hello

消息发布给ExchangeExchange相当于邮局或者信箱,它接收到消息后会根据不同的规则(称为Bindings)把消息发送出去。

Exchange总共有4种类型,分别是

  • Direct Exchange 将消息中的Routing key与该exchange关联的所有Binding中的Routing key进行比较,如果相等,则发送到该Binding对应的Queue中。
  • Topic Exchange 将消息中的Routing key与该Exchange关联的所有Binding中的Routing key进行对比,如果匹配上了,则发送到该Binding对应的Queue中。
  • Fanout Exchange 直接将消息转发到所有Binding的对应Queue中,这种Exchange在路由转发的时候,忽略Routing key
  • Headers Exchange 将消息中的headers与该Exchange相关联的所有Binging中的参数进行匹配,如果匹配上了,则发送到该Binding对应的Queue

RabbitMQ简介

RabbitMQ是当下最流行的开源Message BrokerBroker中文是经纪、掮客的意思,它非常轻量且容易部署,不管是在本地还是在云端,同时支持多种消息协议,而且支持多种编程语言。RabbitMQ是针对AMQP的一种实现。

  • Producing意为发送,发送的程序就是producer生产者
  • Queue即队列,相当于RabbitMQ中的邮箱名称,producer发送的消息就是存储在这里,队列受制于主机的内存大小和磁盘空间,本质上队列就是缓存。多个producer可以发送数据到同一个队列,而consumer可以从同一个队列中接收数据
  • Consuming意为接收,接收的程序就是consumer消费者

RabbitMQ的消息处理流程

rabbitmq_hello

  • producer发送消息到Exchange
  • Exchange接收消息并且准备将消息根据Routing_key路由到Queue
  • 绑定QueueExchange根据binding_key(在上图中绑定了两个Queue到这个Exchange),Exchange将消息路由到Queue
  • 消息在被consumer接收处理之前,一直在Queue
  • consumer接收处理消息

安装RabbitMQ

通过apt安装

1
sudo apt install rabbitmq-server

接下来安装python语言的支持包pikapika同时支持python2python3

1
pip install pika

开启RabbitMQweb管理界面

1
sudo rabbitmq-plugins enable rabbitmq_management

之后可以登录浏览器打开http://localhost:15672,查看API的话打开http://localhost:15672/api

开启消息的跟踪

1
sudo rabbitmq-plugins enable rabbitmq_tracing

然后打开http://localhost:15672, 进入到admin –> Tracing –> Add a new trace,然后就可以查看你添加的log文件了。类似下面这样,其中Payload就是消息内容了,发送方和接收方也都能看的很清楚了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

================================================================================
2019-04-17 18:26:54:998: Message published

Node: rabbit@ubuntu-server
Connection: 172.21.2.20:33828 -> 172.21.2.20:5672
Virtual host: /
User: longjing
Channel: 1
Exchange:
Routing keys: [<<"live">>]
Routed queues: [<<"live">>]
Properties: []
Payload:
{"hls": "http://172.21.2.20:8081/live/172.21.168.72:554/index.m3u8", "cmd": "livestart", "status": "200", "camera": "rtsp://admin:ABC123##X@172.21.168.72:554/cam/realmonitor?channel=1&subtype=0"}

================================================================================
2019-04-17 18:29:44:569: Message received

Node: rabbit@ubuntu-server
Connection: 10.1.1.40:53338 -> 172.21.2.20:5672
Virtual host: /
User: longjing
Channel: 1
Exchange:
Routing keys: [<<"live">>]
Queue: live
Properties: []
Payload:
{"hls": "http://172.21.2.20:8081/live/172.21.168.72:554/index.m3u8", "cmd": "livestart", "status": "200", "camera": "rtsp://admin:ABC123##X@172.21.168.72:554/cam/realmonitor?channel=1&subtype=0"}

用户管理

创建用户

1
sudo rabbitmqctl add_user xugaoxiang xugaoxiang

设置用户权限

1
2
sudo rabbitmqctl set_user_tags xugaoxiang administrator
sudo rabbitmqctl set_permissions -p / xugaoxiang '.*' '.*' '.*'

重启rabbitmq-server,就可以新用户xugaoxiang登录管理进行管理了。

rabbitmq_user

RabbitMQ中的Hello world

消息发送端代码producer.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# -*- coding: utf-8 -*-
# @Time : 18-12-21 下午11:12
# @Author : xugaoxiang
# @Email : djstava@gmail.com
# @Website : http://www.xugaoxiang.com
# @File : producer.py
# @Software: PyCharm

import pika
import sys

message = sys.argv[1]

# 可以连接远程,localhost换成ip地址或者域名
credentials = pika.PlainCredentials('xugaoxiang', 'xugaoxiang')
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', credentials))
channel = connection.channel()

# 发送消息之前需要确认接收队列是否存在? 如果不存在,RabbitMQ就会丢弃。下面语句是我们创建`hello`队列
channel.queue_declare(queue='hello')

# 最后是发送
channel.basic_publish(exchange='', routing_key='hello', body=message)
print('Sent message: {}'.format(message))
connection.close()

消息接收端代码consumer.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# -*- coding: utf-8 -*-
# @Time : 18-12-21 下午11:18
# @Author : xugaoxiang
# @Email : djstava@gmail.com
# @Website : http://www.xugaoxiang.com
# @File : consumer.py
# @Software: PyCharm

import pika

def callback(ch, method, properties, body):
print('Receive: {}'.format(body))

# 建立连接跟发送端的一样
credentials = pika.PlainCredentials('xugaoxiang', 'xugaoxiang')
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', credentials))
channel = connection.channel()

channel.queue_declare(queue='hello')

# 注册回调函数callback, no_ack表示不支持消息反馈,在多任务的情况下,这个很有用,`producer`能够告诉RabbitMQ消息正被接收和处理。如果处理该消息的进程挂了,可以通过这个机制
channel.basic_consume(consumer_callback=callback, queue='hello', no_ack=True)
print('Waiting for messages. To exit press CTRL+C')

# 进入事件循环
channel.start_consuming()

执行顺序是: consumer.py -> producer.py

rabbitmq_code

在上面的示例中,我们使用的是BlockingConnection适配器,除此以外,pika还提供了其它几个适配器,根据需要选择使用

  • BlockingConnection 最简单的一种,同步、阻塞
  • AsyncioConnection 适用于python3异步IO事件循环
  • SelectConnection 同样是异步,但不依赖与第三方库
  • TornadoConnection 依赖于Tornado
  • TwistedProtocolConnection 适用于Twisted异步包

在安装RabbitMQ中,额外会生成命令行工具rabbitmqctlrabbitmqadmin,这两个命令非常有用,在代码调试中也可以通过它们来查看当前系统状态,具体的参数可以通过--help来查看。

参考资料

推荐文章(由hexo文章推荐插件驱动)

使用支付宝打赏
使用微信打赏

请博主喝咖啡!