Springboot与Python通过RabbitMQ实现双向异步消息交互的示例演示
Spring Boot后端和Python算法之间解耦设计,采用通过消息总线RabbitMQ进行双向异步交互,以下是一个demo样例,罗列出了实现该功能需要做的工作,包括软件安装、RabbitMQ基本介绍、Springboot后端demo代码、Python demo代码、运行流程以及调试遇到问题
软件安装
Win 10 本地需要安装RabbitMQ,作为Springboot后端和Python模块通讯的消息中间件。RabbitMQ使用Erlang编写,所以需要先安装Erlang,在此基础上再安装RabbitMQ。
erlang:Erlang/OTP 27.2.4
RabbitMQ:rabbitmq-server-3.13.7
可参考如下链接进行安装
Win 10 本地安装RabbitMQ
RabbitMQ简介
什么是RabbitMQ
RabbitMQ 是一个开源的消息队列中间件,它实现了高度可靠、灵活和可扩展的消息传递模型。它基于 AMQP(高级消息队列协议)来进行消息的传输和交互。
以下是 RabbitMQ 的一些重要组成部分和特性的详细介绍:
消息队列:RabbitMQ 使用消息队列来存储和传递消息。消息队列通过先进先出(FIFO)的方式处理消息,允许生产者将消息发送到队列,然后消费者从队列中接收这些消息。
生产者:生产者是发送消息到 RabbitMQ 交换机的应用程序。生产者将消息发布到特定的交换机,并且可以选择将消息发送到特定的队列或交换机。
交换机:交换机是 RabbitMQ 接收生产者消息并路由到相应队列的组件。它根据指定的规则(路由键)将消息发送到一个或多个绑定的队列。
队列:队列是 RabbitMQ 中消息的目的地。生产者通过交换机将消息发送到队列,而消费者从队列中接收消息以进行处理。
消费者:消费者是从 RabbitMQ 队列中获取消息并对其进行处理的应用程序。消费者订阅一个或多个队列,并接收队列中的消息。
路由:RabbitMQ 使用路由机制将消息从交换机路由到队列。这是通过在交换机和队列之间建立绑定关系,并使用路由键来匹配消息。
ACK 机制:RabbitMQ 提供了 ACK(确认)机制,确保消息被正确处理。一旦消费者接收到并处理了消息,它可以发送一个 ACK 给 RabbitMQ,告知消息已被处理。如果消费者在处理消息过程中发生故障或崩溃,RabbitMQ 将重新传递未确认的消息给其他消费者。
可靠性:RabbitMQ 提供了可靠的消息传递机制。使用持久化(durable)队列和消息可以确保即使在发生故障或重启后,消息也不会丢失。
RabbitMQ五种消息模型
RabbitMQ 支持多种消息模型,以下是其中五种常见的消息模型:
简单模式(Simple Mode生产者消费者模式):
在简单模式中,一个生产者将消息发送到一个队列,然后一个消费者从该队列接收并处理消息。这是最基本的消息模型,适用于简单的应用场景。
工作队列模式(Work Queue Mode 广播模式):
工作队列模式也被称为任务队列模式。多个消费者共享一个队列,并通过轮询的方式接收消息。每个消息只会被一个消费者处理。适用于分布式任务的情况。
发布/订阅模式(Publish/Subscribe Mode):
在发布/订阅模式中,一个生产者将消息发送到交换机(Exchange),而不是直接发送到队列。然后,绑定到该交换机的多个队列都会收到消息。适用于广播类型的消息发送。
路由模式(Routing Mode):
路由模式中,消息根据路由键(Routing Key)的匹配规则被发送到特定的队列。生产者将消息发送到交换机,并指定一个路由键,在消费者端,队列通过绑定键(Binding Key)与交换机进行绑定。适用于根据条件筛选消息的情况。
主题模式(Topic Mode):
主题模式是路由模式的扩展,它支持使用通配符进行更灵活的匹配。通过使用特定的通配符匹配规则,可以实现灵活而强大的消息路由策略。适用于订阅特定主题的场景。
本项目中我们采用简单模式即可满足要求。
本项目采用RabbitMQ简单模式(生产者消费者模式)
Spring Boot 模块和Python模块 通过 RabbitMQ 进行双向通信,具体角色如下:
Python 模块:
生产者:向 Spring Boot 的消费者发送消息(Python → Spring Boot)。
消费者:接收来自 Spring Boot 生产者的消息(Spring Boot → Python)。
Spring Boot 模块:
消费者:接收来自 Python 生产者的消息(Python → Spring Boot)。
生产者:向 Python 的消费者发送消息(Spring Boot → Python)
RabbitMQ 队列:
python_to_spring_queue:Python 生产者 → Spring Boot 消费者。
spring_to_python_queue:Spring Boot 生产者 → Python 消费者。
消息格式:JSON(便于跨语言解析)。
Python代码
安装 pika
首先需要安装pika库,在终端(Windows 的 cmd 或 PowerShell)运行:
pip install pika
Python 生产者(向 Spring Boot 发送消息)
#python_producer.py
import pika
import json
# RabbitMQ 连接配置
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明队列(Spring Boot 消费者会监听这个队列)
queue_name = 'python_to_spring_queue'
channel.queue_declare(queue=queue_name, durable=True) # 持久化队列
# 消息内容(JSON 格式)
message = {"type": "python_message", "data": "Hello from Python!"}
message_json = json.dumps(message)
# 发送消息
channel.basic_publish(
exchange='',
routing_key=queue_name,
body=message_json,
properties=pika.BasicProperties(delivery_mode=2) # 消息持久化
)
print(f" [x] Python sent: {message_json}")
connection.close()
Python 消费者(接收 Spring Boot 的消息))
# python_consumer.py
import pika
import json
# RabbitMQ 连接配置
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明队列(Spring Boot 生产者会发送消息到这个队列)
queue_name = 'spring_to_python_queue'
channel.queue_declare(queue=queue_name, durable=True) # 持久化队列
# 消息处理回调函数
def callback(ch, method, properties, body):
try:
# 将字节流解码为 UTF-8 字符串,再解析为 JSON
message = json.loads(body.decode('utf-8'))
print(f" [x] Python received: {message}")
except json.JSONDecodeError as e:
print(f" [x] Failed to decode JSON: {e}, raw data: {body}")
ch.basic_ack(delivery_tag=method.delivery_tag) # 手动确认消息
# 订阅队列
channel.basic_consume(
queue=queue_name,
on_message_callback=callback,
auto_ack=False # 关闭自动确认
)
print(' [*] Python waiting for messages. To exit press CTRL+C')
channel.start_consuming() # 阻塞监听
Springboot代码
依赖(pom.xml)
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
配置(application.yml)
spring: rabbitmq: connection-factory: cache-mode: CHANNEL # 默认就是 CHANNEL(按通道缓存) channel-cache-size: 1 # 限制通道缓存大小(避免复用旧通道) listener: simple: acknowledge-mode: manual retry: enabled: true max-attempts: 3
Spring Boot 消费者(接收 Python 的消息)
package com.znzx.quartz.listen;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
/**
* @author surq
* @data 2025/6/7 20:29
* @description
*/
@Component
public class PythonToSpringConsumer {
@RabbitListener(queues = "python_to_spring_queue", concurrency = "1")
public void receiveMessage(String messageJson, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws IOException {
try {
ObjectMapper objectMapper = new ObjectMapper();
Map<String, Object> message = objectMapper.readValue(messageJson, Map.class);
System.out.println(" [x] Spring Boot received from Python: " + message);
channel.basicAck(deliveryTag, false); // 确认消息
}
catch (JsonProcessingException e) {
System.out.println(" [x] Failed to parse JSON message: {}" + e.getMessage());
// 拒绝消息并重新入队(如果解析失败)
channel.basicNack(deliveryTag, false, true);
} catch (IOException e) {
System.out.println(" [x] IO error while acknowledging message: {}"+ e.getMessage());
// 如果是通道问题,可能需要重建连接
throw e; // 让 Spring AMQP 处理连接恢复
} catch (Exception e) {
System.out.println(" [x] Unexpected error: {}"+ e.getMessage());
// 拒绝消息并重新入队(其他异常)
channel.basicNack(deliveryTag, false, true);
}
}
}
Spring Boot 生产者(向 Python 发送消息)
//SpringToPythonProducer.java
package com.znzx.quartz.service.impl;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.znzx.quartz.service.IRabbitmqService;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
@Component
public class SpringToPythonProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage() {
Map<String, Object> message = new HashMap<>();
message.put("type", "spring_message");
message.put("data", "Hello from Spring Boot!");
// 将 Map 转为 JSON 字符串(UTF-8 编码)
ObjectMapper objectMapper = new ObjectMapper();
String jsonMessage = "";
try {
jsonMessage = objectMapper.writeValueAsString(message);
} catch (Exception e) {
e.printStackTrace();
}
rabbitTemplate.convertAndSend("spring_to_python_queue", jsonMessage);
System.out.println(" [x] Spring Boot sent: " + jsonMessage);
}
}
调用生产者
// DemoController.java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class DemoController {
@Autowired
private SpringToPythonProducer springToPythonProducer;
@GetMapping("/send-to-python")
public String sendToPython() {
springToPythonProducer.sendMessage();
return "Message sent to Python!";
}
}
运行流程
-
Python → Spring Boot:
-
运行
python_producer.py,发送消息到python_to_spring_queue。 -
Spring Boot 的
PythonToSpringConsumer接收并打印消息。 -
Spring Boot → Python:
-
访问
http://localhost:8080/send-to-python,Spring Boot 发送消息到spring_to_python_queue。 -
Python 的
python_consumer.py接收并打印消息。
调试结果
springboot打印发送消息:
[x] Spring Boot sent: {"data":"Hello from Spring Boot!","type":"spring_message"}
[x] Spring Boot sent: {"data":"Hello from Spring Boot!","type":"spring_message"}
python_consumer.py打印接收消息:
[x] Python received: {'data': 'Hello from Spring Boot!', 'type': 'spring_message'}
[x] Python received: {'data': 'Hello from Spring Boot!', 'type': 'spring_message'}
python_producer.py打印发送消息:
[x] Python sent: {"type": "python_message", "data": "Hello from Python!"}
springboot打印接收消息:
[x] Spring Boot received from Python: {type=python_message, data=Hello from Python!}
至此,该demo版本实现了springboot和python之间通过rabbitmq双向消息交互。
调试发现问题
问题1:
springboot和python之间约定要json交互,注意代码中要做转换,否则可能出现解析不了的情况。
问题2:
com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80) 表明 Spring Boot 消费者在尝试确认消息(ACK)时失败,原因是 使用了无效的 delivery tag(交付标签)。
解决措施:增加如下配置,在连接恢复后,让 Spring AMQP 自动重建通道,并确保每次消息确认都在新的通道上进行
spring:
rabbitmq:
connection-factory:
cache-mode: CHANNEL # 默认就是 CHANNEL(按通道缓存)
channel-cache-size: 1 # 限制通道缓存大小(避免复用旧通道)
listener:
simple:
acknowledge-mode: manual
retry:
enabled: true
max-attempts: 3
作者:同心圆码农