阻塞队列深度解析与简易实现——多线程编程案例解析系列之四

目录

前言

阻塞队列

 阻塞队列相比普通队列的优势

1.天然线程安全

2.实现生产者-消费者模型更加简单

3.自动等待与唤醒

 生产者-消费者模型

 JAVA标准库中的阻塞队列

阻塞队列的简单实现

前言

在现代软件开发中,多线程编程能力已经成为程序员必须掌握的一项核心技能。随着计算机硬件的不断升级,单核CPU早已无法满足复杂应用的性能需求,多核并行运算已成为主流。而多线程正是实现资源最大化利用、任务高效并行执行的基础手段。

而本博客是笔者多线程编程的第四篇博客!

前三篇URL在此:

如何简单的去使用jconsloe 查看线程 (多线程编程篇1)_java jconsloe-CSDN博客

浅谈Thread类及常见方法与线程的状态(多线程编程篇2)_thread.join() 和thread.get()-CSDN博客

多线程编程的简单案例——单例模式[多线程编程篇(3)]-CSDN博客

 笔者将继续介绍和分享学习多线程编程时遇见过的简单案例, 希望读者们通过这些简单案例能加深对于多线程编程的理解

在本篇博客中,笔者将会分享两个知识点:

1.阻塞式队列

2.生产者消费者模型

为什么分享呢

其实原因很简单:根据资料显示,阻塞式队列,是多线程编程里非常经典、非常实用的模型。在实际开发中,无论是任务调度、生产者消费者,还是定时触发某些功能,模型几乎无处不在。

可以说:理解它,就等于掌握了并发编程的一块基础地基。

那么,让我们开始介绍阻塞队列吧!!!!

博客中出现的参考图都是笔者手画的,代码示例也是笔者手敲的!影响虽小,但请勿抄袭

阻塞队列

首先我们要明白,阻塞队列作为一种数据结构,阻塞只是定语,它终究是一种队列,作为队列,它也同样遵守"先进先出"的规则

但它与一般的队列又有不同,它拥有以下的特征:

1.当队列满的时候
,
继续入队列就会阻塞
,
直到有其他线程从队列中取走元素
.

2.当队列空的时候
,
继续出队列也会阻塞
,
直到有其他线程往队列中插入元素
.

你可以理解为一个传送带

  • 一边的工人(线程)往上传送包裹  -> put()

  • 另一边的工人(线程)从传送带上取包裹 -> take()

  • 如果传送带满了,送货的工人只能在原地等着;如果传送带空了,取货的工人也得站着干等。

     阻塞队列相比普通队列的优势

    结合网上各类资料说明,我大致总结出了那么三点

    1.天然线程安全

    普通队列在多线程场景下,容易出现数据错乱、并发冲突,需要我们手动加锁,编写复杂的同步逻辑。而阻塞队列内部已经实现了线程安全机制,不需要我们操心锁的问题,简化开发,避免Bug

    举个例子:下面的代码演示了在多线程环境下使用普通队列,可能出现的问题

    import java.util.LinkedList;
    import java.util.Queue;
    
    public class NormalQueueTest {
        static Queue<Integer> queue = new LinkedList<>();
    
        public static void main(String[] args) throws InterruptedException {
            Thread producer = new Thread(() -> {
                for (int i = 0; i < 10000; i++) {
                    queue.add(i); // 非线程安全,可能抛异常
                }
            });
    
            Thread consumer = new Thread(() -> {
                int count = 0;
                while (count < 10000) {
                    Integer val = queue.poll(); // 可能返回 null,导致数据丢失
                    if (val != null) {
                        count++;
                    }
                }
            });
    
            producer.start();
            consumer.start();
            producer.join();
            consumer.join();
    
            //检查队列是否为空、数量是否一致
            System.out.println("剩余元素数量:" + queue.size());
            System.out.println("主线程走到这里了?");
        }
    }

    笔者使用两个线程,一个负责生产数据 producer,另一个负责消费数据 consumer

    生产线程尝试向队列中添加 10,000 个整数;

    消费线程尝试从队列中取出 10,000 个元素;

    期望结果: 所有数据被完整消费,最终队列应为空,输出结果应为:

    剩余元素数量:0
    主线程走到这里了?

    但是实际上呢?我们可以跑一遍看看:

     

    结果是查无此人,主线程压根走不到最后就被卡死了,出BUG了 

    如果我们换成阻塞队列实现,结果又将如何?

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    
    public class BlockingQueueTest {
        static BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10000); // 容量固定为 10000
    
        public static void main(String[] args) throws InterruptedException {
            Thread producer = new Thread(() -> {
                try {
                    for (int i = 0; i < 10000; i++) {
                        queue.put(i); // 阻塞式放入,不会丢数据
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
    
            Thread consumer = new Thread(() -> {
                int count = 0;
                try {
                    while (count < 10000) {
                        Integer val = queue.take(); // 阻塞式获取,不会空取
                        count++;
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
    
            producer.start();
            consumer.start();
            producer.join();
            consumer.join();
            System.out.println("剩余元素数量:" + queue.size()); // ✅ 结果稳定:0
            System.out.println("主线程走到这里了?");
        }
    }
    

    结果如下:

    可以发现,利用阻塞队列,进程可以结束,代码可以走完 

    为什么用一般队列会卡死? 原因出在这里

            Thread consumer = new Thread(() -> {
                int count = 0;
                while (count < 10000) {
                    Integer val = queue.poll(); // 可能返回 null,导致数据丢失
                    if (val != null) {
                        count++;
                    }
                }
            });
    

    假设:consumer 线程先于 producer 启动

    此时队列是空的,但由于我们使用的是 poll() 方法:

  • 如果队列为空,poll()直接返回 null

  • 然而程序中并没有任何等待机制或线程阻塞逻辑;

  • 所以 consumer不断地空转轮询,疯狂执行 poll(),每次都拿不到数据。

  • 这会导致什么问题?

  • 如果 producer 启动得太慢,或者系统调度延迟,consumer 就一直处于无效轮询状态;

  • 因为 count 只有在拿到有效数据(val != null)时才增加,否则一直卡在 < 10000 的条件中;

  • 最终结果是:程序一直运行不结束,也没有任何输出,看起来就像“卡死”了一样。

  • 假设我们选用阻塞式队列:

            Thread producer = new Thread(() -> {
                try {
                    for (int i = 0; i < 10000; i++) {
                        queue.put(i); // 阻塞式放入,不会丢数据
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
    
            Thread consumer = new Thread(() -> {
                int count = 0;
                try {
                    while (count < 10000) {
                        Integer val = queue.take(); // 阻塞式获取,不会空取
                        count++;
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
    

    如果 consumer 线程比 producer 先运行,会发生什么?

  • consumer 执行 queue.take() 时,发现队列是空的;

  • 不同于 poll() 返回 nulltake()自动进入等待状态;直到 有数据被put 进队列中,才会被唤醒

  • 同理,如果队列满了,put() 也会阻塞

  • 如果你设置了队列容量上限(比如 new ArrayBlockingQueue<>(1000));

  • 当队列被填满,put() 方法也会阻塞;

  • 等到 consumer 取走了数据,才继续放入。

  • 它会一直阻塞在那里,直到有生产者线程调用 put() 放入数据;

  • 也就是说,consumer 不会“瞎忙活”,而是乖乖等着数据到来

  • 2.实现生产者-消费者模型更加简单

    什么是生产者-消费者模型我们等下再介绍

    阻塞队列最经典的应用场景就是生产者-消费者模式。普通队列实现这个模型需要显式控制线程间的协调与资源共享,而阻塞队列通过 put()take() 方法,天然支持这一模式,既高效又稳定

    3.自动等待与唤醒

    你当然可以说,阻塞队列的逻辑用普通队列当然可以实现了,但是有一个不方便的点,在普通队列中,如果队列为空、或队列满了,开发者必须自己编写控制逻辑,比如使用 wait/notify 或者 ReentrantLock/Condition 这些低层API。而阻塞队列会自动阻塞和唤醒线程,隐藏了这些底层细节,让线程间协作变得更自然、更优雅

     

    接下来,我们介绍什么是生产者消费者模型 

     

     生产者-消费者模型

    什么是生产者模型呢? 

    生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。

    生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取.

    笔者这里举一个例子,请读者看如下的Demo:

    有一个客户端向服务器X发送请求,希望得到一些数据,假设仅仅只靠服务器X无法胜任改任务

    而需要多个服务器相互配合,因此X要继续请求服务器Y, 服务器Y返回数据给X,X返回给客户端

    这样的话我产生几个问题

    第一:

    耦合度太高

    如果 X 和 Y 直接交互,它们彼此之间耦合度就会很高,如果 Y 出现问题,那 X 也会被影响,同理,X出问题,Y也会被影响

    第二:

    难以横向扩展

    如果未来客户端的请求产生了变化,还需要服务器Z来帮忙,那么X中的代码也需要被修改

     

     

    这该怎么办?

     这个时候我们可以引入一个阻塞队列架在 服务器X,Y之间,如图所示

    原先X将请求直接发送给Y,现在,时代变了,X只会把请求发送到阻塞队列中, Y也不会直接收到X的请求,而是通过阻塞队列获取请求 .

    这样做的好处是显而易见的,那就是,耦合度被大大降低了, 后续假设有服务器Z参与,他也只是从队列中获取请求

     

    因为在服务器XYZ他们仨的视角里,从始至终都不知道对方的存在,它们只和阻塞队列交互了

    这就是一个简单的生产者消费者模型

     当把阻塞队列封装成单独的服务器程序时,它被称为"消息队列"

    刚刚笔者写了它的第一个优势——有效降低各个模块这件的耦合度,还有一个好处就是 降峰削流,提升系统稳定性,还是用刚刚那个例子

    我们假设客户端在短时间内向服务器 X 发起大量请求,而每个请求都需要服务器 Y 进一步处理。如果 X 把这些请求直接转发给 Y,Y 可能会瞬间被压垮,导致系统崩溃或响应超时。

     

    这时候就可以在 X 和 Y 之间引入一个 阻塞队列

  • X 接收到请求后,并不直接转发给 Y,而是先将请求 放入阻塞队列 中;

  • Y 则以自己的节奏,一个一个地从队列中取出请求进行处理

  • 这样一来:

  • X 能快速响应客户端,不必等待 Y 处理完再继续;

  • Y 能在自己的处理能力范围内稳定工作,不会被“淹没”;

  • 系统整体也更能承受高并发压力.

  • 比如在“抢票”场景下,服务器在短时间内可能会接收到大量用户的支付请求。如果直接同步处理这些请求,服务器极有可能被瞬间压垮 —— 毕竟每个支付请求都涉及一系列复杂、耗时的操作。

    这个时候,引入阻塞队列作为缓冲区,就是一个非常实用的思路。所有请求会先被有序地放入队列中,系统得以“兜住流量高峰”;而后端的消费者线程则按自身处理能力,逐个从队列中取出请求进行处理。

    这种“削峰填谷”的机制,有效防止了高并发导致的资源挤兑,从而保障了系统的稳定运行。

     JAVA标准库中的阻塞队列

    正是因为阻塞队列在多线程编程中具有重要性,Java 的并发包 java.util.concurrent 中早已为我们提供了多个成熟、线程安全的阻塞队列实现。我们常用的包括:

  • ArrayBlockingQueue

  • 基于数组实现,有界队列,大小在创建时就确定。

  • 适合生产速度和消费速度接近的场景,能够很好地限制内存使用,防止请求无限堆积。

  • LinkedBlockingQueue

  • 基于链表实现,可以是有界无界(默认 Integer.MAX_VALUE)。

  • 适合生产者远快于消费者的情况,可存放更多任务,避免任务丢失。

  • SynchronousQueue

  • 一个“无容量”的队列,每一个 put() 操作都必须等待一个 take() 操作,反之亦然。

  • 适用于任务直接交付的场景,例如线程池中用于任务“即时交付”给工作线程。

  • PriorityBlockingQueue

  • 支持元素按优先级出队(而不是 FIFO),没有固定容量。

  • 可用于带有任务优先级的调度系统,例如消息优先级推送等。

  • 为了更好地贯彻“面向接口编程”的设计理念,Java 提供了一个统一的接口 BlockingQueue ,用于规范所有阻塞队列的行为。这样做不仅提升了代码的扩展性和灵活性,也让我们在使用不同实现类时可以保持一致的编程方式。

    示例代码: 其中 put 方法用于阻塞式的入队列, take 用于阻塞式的出队列

    class Demo16 {
        public static void main(String[] args) throws InterruptedException {
            BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(1000);
            blockingQueue.put(2);
            blockingQueue.take();
        }
    }

    阻塞队列的简单实现

    那么接下来,就让我们简单实现一个阻塞队列吧!!!

    首先展示阻塞队列的特性:

    1.当队列中没有元素时,take()方法进入阻塞等待,直到有新的元素进入才会被唤醒

    2.当队列中的元素数量等于既定数量后,put()方法进去阻塞等待,直到有元素被取出

    很显然,我们只要在 实现take()和put()之前,判断它们是否需要进入阻塞等待即可

    笔者这里给一个泛型版本:

    public class MyBlockQueue<T> {
        private final Object[] data = new Object[1000];
        private volatile int head = 0;
        private volatile int tail = 0;
        private volatile int size = 0;
    
        public synchronized void put(T elem) throws InterruptedException {
            while (size == data.length) {
                this.wait();
            }
            data[tail] = elem;
            tail = (tail + 1) % data.length;
            size++;
            this.notifyAll(); // 避免只唤醒 put 或 take 的线程
        }
    
        public synchronized T take() throws InterruptedException {
            while (size == 0) {
                this.wait();
            }
            @SuppressWarnings("unchecked")
            T res = (T) data[head];
            head = (head + 1) % data.length;
            size--;
            this.notifyAll(); // 同样需要唤醒所有线程
            return res;
        }
    }
    

     这里唯一注意的是检查 大小的时候需要用while而不是if

  • 如果使用 if 语句,假设有多个线程在等待 take(),其中一个被唤醒后执行,但它还发现队列为空,那么它会继续执行而不会再次进入阻塞状态,导致它可能出错。

  • 但是,如果使用 while 循环,线程会继续检查队列的状态,只有在确实满足条件时才会继续执行,否则就会再次进入阻塞等待。 

    然后笔者给一个数据类型为String 的例子:

    public class MyBlockingQueue {
        // 此处这里的最大长度, 也可以指定构造方法, 由构造方法的参数来制定.
        private String[] data = new String[10];
        // 队列的起始位置.
        private volatile int head = 0;
        // 队列的结束位置的下一个位置.
        private volatile int tail = 0;
        // 保证每次都从内存读取数据,而不是从寄存器或者缓存中读取
        // 队列中有效元素的个数.
        private volatile int size = 0;
    
        // private final Object locker = new Object();
    
        // 提供核心方法, 入队列和出队列.
    
        // 阻塞队列的实现
        public void put(String elem) throws InterruptedException {
            synchronized (this) {
                while (size == data.length) {
                    // 队列满了.
                    // 如果是队列满, 继续插入元素, 就会阻塞.
                    this.wait();
                }
                // 队列没满, 真正的往里面添加元素
                data[tail] = elem;
                tail++;
                // 如果 tail 自增之后, 到达了数组末尾. 这个时候就需要让它回到开头 (环形队列)
                if (tail == data.length) {
                    tail = 0;
                }
                size++;
                // 这个 notify 用来唤醒 take 中的 wait
                this.notify();
            }
        }
    
        public String take() throws InterruptedException {
            synchronized (this) {
                while (size == 0) {
                    // 队列空了.
                    this.wait();
                }
                // 队列不空, 就可以把队首元素 (head 位置的元素) 删除掉, 并进行返回.
                String ret = data[head];
                head++;
                if (head == data.length) {
                    head = 0;
                }
                size--;
                // 这个 notify 用来唤醒 put 中的 wait
                this.notify();
                return ret;
            }
        }
    }
    
    class Demo16 {
        public static void main(String[] args) {
            // 生产者, 消费者, 分别使用一个线程表示. (也可以使用多个线程)
    
            MyBlockingQueue queue = new MyBlockingQueue();
    
            // 生产者
            Thread t1 = new Thread(() -> {
                int num = 1;
                while (true) {
                    try {
                        queue.put(num + "");
                        System.out.println("生产元素: " + num);
                        num++;
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            });
            // 消费者
            Thread t2 = new Thread(() -> {
                while (true) {
                    try {
                        String result = queue.take();
                        System.out.println("消费元素: " + result);
    
                        // 暂时先不 sleep
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            });
            t1.start();
            t2.start();
        }
    }

    笔者可以拷贝到IDE中自己试验一下,结果肯定和你事先想好的一模一样!!!

    结尾:

    这一期本来还想介绍一下定时器的,但是篇幅有点长了,笔者也需要重新整理一下思路

    写博客的过程也就是整理思路的过程,有些人觉得这事纯浪费时间,但对于笔者来说,这相当于二次复习和分享知识,只要有人有收获笔者就会有满满的成就感!!! 

     

     

    作者:callJJ

    物联沃分享整理
    物联沃-IOTWORD物联网 » 阻塞队列深度解析与简易实现——多线程编程案例解析系列之四

    发表回复