Springboot与Python通过RabbitMQ实现双向异步消息交互的示例演示

Spring Boot后端和Python算法之间解耦设计,采用通过消息总线RabbitMQ进行双向异步交互,以下是一个demo样例,罗列出了实现该功能需要做的工作,包括软件安装、RabbitMQ基本介绍、Springboot后端demo代码、Python demo代码、运行流程以及调试遇到问题

软件安装

Win 10 本地需要安装RabbitMQ,作为Springboot后端和Python模块通讯的消息中间件。RabbitMQ使用Erlang编写,所以需要先安装Erlang,在此基础上再安装RabbitMQ。

  • erlang:Erlang/OTP 27.2.4

  • RabbitMQ:rabbitmq-server-3.13.7

  • 可参考如下链接进行安装

    Win 10 本地安装RabbitMQ

    RabbitMQ简介

    什么是RabbitMQ

    RabbitMQ 是一个开源的消息队列中间件,它实现了高度可靠、灵活和可扩展的消息传递模型。它基于 AMQP(高级消息队列协议)来进行消息的传输和交互。

    以下是 RabbitMQ 的一些重要组成部分和特性的详细介绍:

  • 消息队列:RabbitMQ 使用消息队列来存储和传递消息。消息队列通过先进先出(FIFO)的方式处理消息,允许生产者将消息发送到队列,然后消费者从队列中接收这些消息。

  • 生产者:生产者是发送消息到 RabbitMQ 交换机的应用程序。生产者将消息发布到特定的交换机,并且可以选择将消息发送到特定的队列或交换机。

  • 交换机:交换机是 RabbitMQ 接收生产者消息并路由到相应队列的组件。它根据指定的规则(路由键)将消息发送到一个或多个绑定的队列。

  • 队列:队列是 RabbitMQ 中消息的目的地。生产者通过交换机将消息发送到队列,而消费者从队列中接收消息以进行处理。

  • 消费者:消费者是从 RabbitMQ 队列中获取消息并对其进行处理的应用程序。消费者订阅一个或多个队列,并接收队列中的消息。

  • 路由:RabbitMQ 使用路由机制将消息从交换机路由到队列。这是通过在交换机和队列之间建立绑定关系,并使用路由键来匹配消息。

  • ACK 机制:RabbitMQ 提供了 ACK(确认)机制,确保消息被正确处理。一旦消费者接收到并处理了消息,它可以发送一个 ACK 给 RabbitMQ,告知消息已被处理。如果消费者在处理消息过程中发生故障或崩溃,RabbitMQ 将重新传递未确认的消息给其他消费者。

  • 可靠性:RabbitMQ 提供了可靠的消息传递机制。使用持久化(durable)队列和消息可以确保即使在发生故障或重启后,消息也不会丢失。

  • RabbitMQ五种消息模型

    RabbitMQ 支持多种消息模型,以下是其中五种常见的消息模型:

  • 简单模式(Simple Mode生产者消费者模式):

    在简单模式中,一个生产者将消息发送到一个队列,然后一个消费者从该队列接收并处理消息。这是最基本的消息模型,适用于简单的应用场景。

  • 工作队列模式(Work Queue Mode 广播模式):

    工作队列模式也被称为任务队列模式。多个消费者共享一个队列,并通过轮询的方式接收消息。每个消息只会被一个消费者处理。适用于分布式任务的情况。

  • 发布/订阅模式(Publish/Subscribe Mode):

    在发布/订阅模式中,一个生产者将消息发送到交换机(Exchange),而不是直接发送到队列。然后,绑定到该交换机的多个队列都会收到消息。适用于广播类型的消息发送。

  • 路由模式(Routing Mode):

    路由模式中,消息根据路由键(Routing Key)的匹配规则被发送到特定的队列。生产者将消息发送到交换机,并指定一个路由键,在消费者端,队列通过绑定键(Binding Key)与交换机进行绑定。适用于根据条件筛选消息的情况。

  • 主题模式(Topic Mode):

    主题模式是路由模式的扩展,它支持使用通配符进行更灵活的匹配。通过使用特定的通配符匹配规则,可以实现灵活而强大的消息路由策略。适用于订阅特定主题的场景。

    本项目中我们采用简单模式即可满足要求。

  • 本项目采用RabbitMQ简单模式(生产者消费者模式)

    Spring Boot 模块和Python模块 通过 RabbitMQ 进行双向通信,具体角色如下:

    Python 模块:

  • 生产者:向 Spring Boot 的消费者发送消息(Python → Spring Boot)。

  • 消费者:接收来自 Spring Boot 生产者的消息(Spring Boot → Python)。

  • Spring Boot 模块:

  • 消费者:接收来自 Python 生产者的消息(Python → Spring Boot)。

  • 生产者:向 Python 的消费者发送消息(Spring Boot → Python)

  • RabbitMQ 队列:

  • python_to_spring_queue:Python 生产者 → Spring Boot 消费者。

  • spring_to_python_queue:Spring Boot 生产者 → Python 消费者。

  • 消息格式:JSON(便于跨语言解析)。

  • Python代码

    安装 pika

    首先需要安装pika库,在终端(Windows 的 cmd 或 PowerShell)运行:

    pip install pika

    Python 生产者(向 Spring Boot 发送消息)

    #python_producer.py
    import pika
    import json
    ​
    # RabbitMQ 连接配置
    ​
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    ​
    # 声明队列(Spring Boot 消费者会监听这个队列)
    ​
    queue_name = 'python_to_spring_queue'
    channel.queue_declare(queue=queue_name, durable=True)  # 持久化队列
    ​
    # 消息内容(JSON 格式)
    ​
    message = {"type": "python_message", "data": "Hello from Python!"}
    message_json = json.dumps(message)
    ​
    # 发送消息
    ​
    channel.basic_publish(
        exchange='',
        routing_key=queue_name,
        body=message_json,
        properties=pika.BasicProperties(delivery_mode=2)  # 消息持久化
    )
    print(f" [x] Python sent: {message_json}")
    ​
    connection.close()

    Python 消费者(接收 Spring Boot 的消息)

    # python_consumer.py
    import pika
    import json
    ​
    # RabbitMQ 连接配置
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    ​
    # 声明队列(Spring Boot 生产者会发送消息到这个队列)
    queue_name = 'spring_to_python_queue'
    channel.queue_declare(queue=queue_name, durable=True)  # 持久化队列
    ​
    # 消息处理回调函数
    def callback(ch, method, properties, body):
        try:
            # 将字节流解码为 UTF-8 字符串,再解析为 JSON
            message = json.loads(body.decode('utf-8'))
            print(f" [x] Python received: {message}")
        except json.JSONDecodeError as e:
            print(f" [x] Failed to decode JSON: {e}, raw data: {body}")
        ch.basic_ack(delivery_tag=method.delivery_tag)  # 手动确认消息
    ​
    # 订阅队列
    channel.basic_consume(
        queue=queue_name,
        on_message_callback=callback,
        auto_ack=False  # 关闭自动确认
    )
    ​
    print(' [*] Python waiting for messages. To exit press CTRL+C')
    channel.start_consuming()  # 阻塞监听

    Springboot代码

    依赖(pom.xml)

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    配置(application.yml)

    spring:
     rabbitmq:
      connection-factory:
       cache-mode: CHANNEL  # 默认就是 CHANNEL(按通道缓存)
       channel-cache-size: 1  # 限制通道缓存大小(避免复用旧通道)
      listener:
       simple:
       acknowledge-mode: manual
        retry:
         enabled: true
         max-attempts: 3

    Spring Boot 消费者(接收 Python 的消息)

    package com.znzx.quartz.listen;
    ​
    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.messaging.handler.annotation.Header;
    import org.springframework.stereotype.Component;
    ​
    import java.io.IOException;
    import java.util.Map;
    /**
     * @author surq
     * @data 2025/6/7 20:29
     * @description
     */
    @Component
    public class PythonToSpringConsumer {
        @RabbitListener(queues = "python_to_spring_queue", concurrency = "1")
        public void receiveMessage(String messageJson, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws IOException {
            try {
                ObjectMapper objectMapper = new ObjectMapper();
                Map<String, Object> message = objectMapper.readValue(messageJson, Map.class);
                System.out.println(" [x] Spring Boot received from Python: " + message);
                channel.basicAck(deliveryTag, false);  // 确认消息
            }
            catch (JsonProcessingException e) {
                System.out.println(" [x] Failed to parse JSON message: {}" + e.getMessage());
                // 拒绝消息并重新入队(如果解析失败)
                channel.basicNack(deliveryTag, false, true);
            } catch (IOException e) {
                System.out.println(" [x] IO error while acknowledging message: {}"+ e.getMessage());
                // 如果是通道问题,可能需要重建连接
                throw e;  // 让 Spring AMQP 处理连接恢复
            } catch (Exception e) {
                System.out.println(" [x] Unexpected error: {}"+  e.getMessage());
                // 拒绝消息并重新入队(其他异常)
                channel.basicNack(deliveryTag, false, true);
            }
        }
    }
    ​

    Spring Boot 生产者(向 Python 发送消息)

    //SpringToPythonProducer.java
    package com.znzx.quartz.service.impl;
    ​
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.znzx.quartz.service.IRabbitmqService;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    ​
    import java.util.HashMap;
    import java.util.Map; 
    @Component
    public class SpringToPythonProducer {
          @Autowired
          private RabbitTemplate rabbitTemplate;
          public void sendMessage() {
            Map<String, Object> message = new HashMap<>();
            message.put("type", "spring_message");
            message.put("data", "Hello from Spring Boot!");
            // 将 Map 转为 JSON 字符串(UTF-8 编码)
            ObjectMapper objectMapper = new ObjectMapper();
            String jsonMessage = "";
            try {
                 jsonMessage = objectMapper.writeValueAsString(message);
            } catch (Exception e) {
                e.printStackTrace();
            }
            rabbitTemplate.convertAndSend("spring_to_python_queue", jsonMessage);
            System.out.println(" [x] Spring Boot sent: " + jsonMessage);
        }
    }

    调用生产者

    // DemoController.java
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    ​
    @RestController
    public class DemoController {
    ​
        @Autowired
        private SpringToPythonProducer springToPythonProducer;
    ​
        @GetMapping("/send-to-python")
        public String sendToPython() {
            springToPythonProducer.sendMessage();
            return "Message sent to Python!";
        }
    }

    运行流程

    1. Python → Spring Boot

    2. 运行 python_producer.py,发送消息到 python_to_spring_queue

    3. Spring Boot 的 PythonToSpringConsumer 接收并打印消息。

    4. Spring Boot → Python

    5. 访问 http://localhost:8080/send-to-python,Spring Boot 发送消息到 spring_to_python_queue

    6. Python 的 python_consumer.py 接收并打印消息。

    调试结果

    springboot打印发送消息:

     [x] Spring Boot sent: {"data":"Hello from Spring Boot!","type":"spring_message"}
     [x] Spring Boot sent: {"data":"Hello from Spring Boot!","type":"spring_message"}

    python_consumer.py打印接收消息:

    [x] Python received: {'data': 'Hello from Spring Boot!', 'type': 'spring_message'}
    [x] Python received: {'data': 'Hello from Spring Boot!', 'type': 'spring_message'}

    python_producer.py打印发送消息:

    [x] Python sent: {"type": "python_message", "data": "Hello from Python!"}

    springboot打印接收消息:

     [x] Spring Boot received from Python: {type=python_message, data=Hello from Python!}

    至此,该demo版本实现了springboot和python之间通过rabbitmq双向消息交互。

    调试发现问题

    问题1:

    springboot和python之间约定要json交互,注意代码中要做转换,否则可能出现解析不了的情况。

    问题2:

    com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80) 表明 Spring Boot 消费者在尝试确认消息(ACK)时失败,原因是 使用了无效的 delivery tag(交付标签)。

    解决措施:增加如下配置,在连接恢复后,让 Spring AMQP 自动重建通道,并确保每次消息确认都在新的通道上进行

    spring:
      rabbitmq:
        connection-factory:
          cache-mode: CHANNEL  # 默认就是 CHANNEL(按通道缓存)
          channel-cache-size: 1  # 限制通道缓存大小(避免复用旧通道)
        listener:
          simple:
            acknowledge-mode: manual
            retry:
              enabled: true
              max-attempts: 3

    作者:同心圆码农

    物联沃分享整理
    物联沃-IOTWORD物联网 » Springboot与Python通过RabbitMQ实现双向异步消息交互的示例演示

    发表回复