RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件 ,RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。本文主要介绍Python中使用pika模块fanout消息订阅模式来实现与RabbitMQ通讯,以及相关的示例代码。

1、安装RabbitMQ

参考文档:

Windows 上下载安装 RabbitMQ 的方法步骤

Linux 上下载安装 RabbitMQ 的方法步骤

2、安装pika

pika 是 AMQP 0-9-1 协议的纯 Python 实现,它基本保持完全独立于底层网络支持库。Pika可以通过PyPI下载,并可以使用easy_install或pip安装:

pip install pika

easy_install pika

相关文档:https://pika.readthedocs.io/en/stable/

3、使用说明

1)关键词说明

关键词

说明

Broker

消息队列服务器实体。

Exchange

消息交换机,它指定消息按什么规则,路由到哪个队列。

Queue

消息队列载体,每个消息都会被投入到一个或多个队列。

Binding

绑定,它的作用就是把exchange

和queue按照路由规则绑定起来。

Routing Key

路由关键字,exchange根据这个关键字进行消息投递。

vhost

虚拟主机,一个broker里可以开设多个vhost,

用作不同用户的权限分离。

producer

消息生产者,就是投递消息的程序。

consumer

消息消费者,就是接受消息的程序。

channel

消息通道,在客户端的每个连接里,

可建立多个channel,每个channel代表一个会话任务。 

2)消息队列运行机制

客户端连接到消息队列服务器,打开一个channel。
客户端声明一个exchange,并设置相关属性。
客户端声明一个queue,并设置相关属性。
客户端使用routing key,在exchange和queue之间建立好绑定关系。
客户端投递消息到exchange。

Exchange接收到消息后,就根据消息的key和已经设置的binding,将消息投递到一个或多个队列里。

注意:在声明一个队列后,如果将其持久化,则下次不需要进行声明。

3)Exchange说明

Exchange类型

说明

Direct交换机

依据key进行投递,

如绑定时设置了routing key为”cjavapy”,

客户端提交的消息,

则只有设置了key为”cjavapy”的才会投递到队列。

Topic交换机

对key模式匹配后进行投递,

符号”#”匹配一个或多个词,符号”*”匹配一个词。

Fanout交换机

不需要key,采取广播模式,

一个消息进来时,投递到与该交换机绑定的所有队列

4、fanout消息订阅模式的使用

⽣产者将消息发送到Exchange,Exchange再转发到与之绑定的Queue中,每个消费者再到⾃⼰的Queue中取消息。

1)生产者

import pika
credentials = pika.PlainCredentials('admin', 'admin') # mq⽤户名和密码
# 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',port=5672,virtual_host='/',
credentials=credentials))
# 建⽴rabbit协议的通道
channel = connection.channel()
# fanout: 所有绑定到此exchange的queue都可以接收消息(实时⼴播)
# direct: 通过routingKey和exchange决定的那⼀组的queue可以接收消息(有选择接受)
# topic:所有符合routingKey(此时可以是⼀个表达式)的routingKey所bind的queue可以接收消息(更细致的过滤)
channel.exchange_declare('cjavapy', exchange_type='fanout',durable=True)
#因为是fanout⼴播类型的exchange,这⾥⽆需指定routing_key
for i in range(10):
    channel.basic_publish(exchange='cjavapy',routing_key='',body='Hello world!%s' % i,properties=pika.BasicProperties(delivery_mode=2))
# 关闭与rabbitmq server的连接
connection.close()

2)消费者

import pika
credentials = pika.PlainCredentials('admin', 'admin')
# BlockingConnection:同步模式
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',port=5672,virtual_host='/',credentials=credentials))
channel = connection.channel()
#作为好的习惯,在producer和consumer中分别声明⼀次以保证所要使⽤的exchange存在
channel.exchange_declare(exchange='cjavapy',exchange_type='fanout',durable=True)
# 随机⽣成⼀个新的空的queue,将exclusive置为True,这样在consumer从RabbitMQ断开后会删除该queue
# 是排他的。
queue_name='cjavapy_fanout'
# 定义⼀个回调函数来处理消息队列中的消息,这⾥是打印出来
def callback(ch, method, properties, body):
# ⼿动发送确认消息
    print(body.decode())
    ch.basic_ack(delivery_tag=method.delivery_tag)
# 告诉⽣产者,消费者已收到消息
#ch.basic_ack(delivery_tag=method.delivery_tag)
# 如果该消费者的channel上未确认的消息数达到了prefetch_count数,则不向该消费者发送消息
channel.basic_qos(prefetch_count=1)
# 告诉rabbitmq,⽤callback来接收消息
# 默认情况下是要对消息进⾏确认的,以防⽌消息丢失。
# 此处将no_ack明确指明为True,不对消息进⾏确认。
channel.basic_consume(queue=queue_name,on_message_callback=callback) # ⾃动发送确认消息
# 开始接收信息,并进⼊阻塞状态,队列⾥有信息才会调⽤callback进⾏处理
channel.start_consuming()

相关文档:

Python RabbitMQ pika的安装及work消息模型的使用

Python RabbitMQ pika的安装及单生产单消费模型的使用

推荐文档