Python程序如何使用RabbitMQ进行消息传递

目录

一、使用思路

二、Rabbitmq服务器的准备

三、代码示例:python程序中连接RabbitMQ和使用

四、名词解释


一、使用思路

RabbitMQ也称 面向消息的中间件。RabbitMQ以服务器的形式 需要我们的python程序去连接它,进而向RabbitMQ中放消息(生产) 或 从其中拿消息(消费)。

二、Rabbitmq服务器的准备

(注意: 如果开发者所在公司已提供RabbitMQ服务,则可省略此步骤,直接拿到RabbitMQ配置如 账号密码等信息 后直接连接就好啦 )

如公司没提供则可以选择自己搭建RabbitMQ服务;也可以选择使用阿里云打造的RabbitMQ云消息服务,因为直接使用阿里云的更方便和安全,这里使用这种方式 (需付费)

关于如何使用阿里云的RabbitMQ服务:        阿里云官网进入'控制台'(目前在官网页面右上角) -> 登录 '消息队列RabbitMQ版控制台'  -> 创建/购买 '实例' -> '实例列表' 中点击创建好的实例名称则进一步可以查看和管理 当前实例下的 vhost列表、exchange列表、queue列表、消息内容等

三、代码示例:python程序中连接RabbitMQ和使用

代码中封装了RabbitMQ类,RabbitMQ类中封装了连接rabbitmq、发送消息get函数、获取消息set函数。类的实例化对象rabbit_obj调用类中实例方法,实现生产消息发送消息相关操作。

callback函数中拿到了消息,这里把消息打印了出来。

使用开子线程去执行消费的原因是,避免消费任务监听消息时阻塞主程序向下执行。

(阅读注释可方便理解)

import pika  # 安装pika模块连接mq
import json
import os
import threading


# 项目基本配置
class BaseConfig():
    env= os.environ.get('env','sit')  # 获取环境变量

# rabbitmq 配置信息
class RabbitMQConfig(object):
    if BaseConfig.env in ['dev', 'sit']:
        host = "***.***.***.***"
        port = ****
        username = "****"
        password = "******"       
        vhost = f"hello_mq_{BaseConfig.env}"
        exchange = f"hello_mq_{BaseConfig.env}"

        routing_key = f"hello_mq_{BaseConfig.env}"
        queue_name = f"hello_mq_{BaseConfig.env}"

    if BaseConfig.env in ['prod',]:
        pass

# 消费者消费时basic_consume函数中的on_message_callback参数会执行此方法,我们Python程序进而在这里可以拿到消息内容 和 给rabbitmq回调ack
def callback(ch, method, properties, body):
    print('callback body ', body.decode())  # 在这里对消息内容进行我们想做的处理
    ch.basic_ack(delivery_tag=method.delivery_tag)  # 手动应答ack,确保消息真正消费后才应答

class RabbitMQ(object):
    def __init__(self, username, password, host, port, virtual_host):
        self.username = username
        self.password = password
        self.host = host
        self.port = port
        self.virtual_host = virtual_host
        self.get_connect()  # 初始化RabbitMQ实例对象时完成连接rabbitmq服务器


    def get_connect(self):
        credentials = pika.PlainCredentials(username=self.username, password=self.password)  # 登录凭证
        self.connect = pika.BlockingConnection(pika.ConnectionParameters(host=self.host, port=self.port, virtual_host=self.virtual_host, credentials=credentials))
        self.channel = self.connect.channel()  # 客户端连接rabbitmq服务端后开辟管道,每个channel代表一个会话任务

    
    def set(self, exchange, routing_key, queue, body):
        # 创建exchange消息交换机, 并且exchange持久化
        self.channel.exchange_declare(
                                        exchange=exchange, 
                                        exchange_type='direct', 
                                        durable=True)
        # 声明队列, 并且队列持久化
        self.channel.queue_declare(queue=queue, durable=True)
        # 通过routing_key 绑定 消息交换机和队列
        self.channel.queue_bind(queue, exchange, routing_key)
        # 发消息
        self.channel.basic_publish(
                                    exchange=exchange,
                                    routing_key=routing_key,  # 根据exchange和routing进而指定 队列
                                    body=body,  # 发送的数据
                                    properties=pika.BasicProperties(delivery_mode=2)  # 消息持久化存到硬盘, 1是非持久化
                                )


    def get(self, exchange, queue, callback):
        print('消费任务启动')  
        self.channel.exchange_declare(exchange=exchange, exchange_type='direct', durable=True)
        self.channel.queue_declare(queue=queue, durable=True)
        self.channel.basic_consume(
                                    queue=queue,  # 队列名
                                    on_message_callback=callback,  # 指定回调函数
                                    auto_ack=False,  # 关闭自动ack采用手动应答
                                )
        self.channel.start_consuming()  # 开始接收信息,并进入阻塞状态,队列里有信息才会调用on_message_callback进行处理


# 将要发送的消息
a = {
    "消息内容": "message",
    }


# 实例化的rabbbitmq对象 
rabbit_obj = RabbitMQ(username=RabbitMQConfig.username, 
                      password=RabbitMQConfig.password, 
                      host=RabbitMQConfig.host,
                      port=RabbitMQConfig.port, 
                      virtual_host=RabbitMQConfig.vhost)

                      
# 发消息
for i in range(20):
    rabbit_obj.set(RabbitMQConfig.exchange, RabbitMQConfig.routing_key, RabbitMQConfig.queue_name, json.dumps(a))
    print('生产者发送第 {} 个消息成功 '.format(i))

# 开子线程 去监听消费消息
p = threading.Thread(target=rabbit_obj.get,args=(RabbitMQConfig.exchange, RabbitMQConfig.queue_name, callback,))
p.start()

print('\n -- 主程序不被阻塞,执行其它操作 --- \n')

四、名词解释

vhost:虚拟主机,一个实例里可以开设多个vhost,用作不同用户的权限分离。
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务

producer:消息生产者,就是投递消息的程序。
consumer:消息消费者,就是接受消息的程序。

名词解释部分 借鉴了其他作者的文章,原文地址Python中RabbitMQ的使用_意大利面拌42号混凝土的博客-CSDN博客_python安装rabbitmq一、简介RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现的产品,RabbitMQ是一个消息代理,从“生产者”接收消息并传递消息至“消费者”,期间可根据规则路由、缓存、持久化消息。“生产者”也即message发送者以下简称P,相对应的“消费者”乃message接收者以下简称C,message通过queue由P到C,queue存在于RabbitMQ,可存储尽可能多的message,多个P可向同一queue发送message,多个C可从同一个queuhttps://blog.csdn.net/prigilm/article/details/122416805

物联沃分享整理
物联沃-IOTWORD物联网 » Python程序如何使用RabbitMQ进行消息传递

发表评论