Java Stream 并行流详解:入门、应用及关键注意事项

1. 并行流简介

Java 8 引入了 Stream API,提供了一种高效的数据处理方式。而 ​并行流(Parallel Stream)​ 则是 Stream 的并行版本,能够将流操作分配到多个线程中执行,充分利用多核 CPU 的性能。

特点
  • 默认使用 ForkJoinPool.commonPool() 执行任务。
  • 适合处理 ​计算密集型 任务。
  • 任务执行顺序不确定。

  • 2. 并行流的简单使用

    将普通流转换为并行流非常简单,只需调用 parallel() 方法即可。

    示例:并行流的基本使用
    
    

    java

    import java.util.Arrays;
    import java.util.List;
    
    public class ParallelStreamExample {
        public static void main(String[] args) {
            List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    
            // 将流转换为并行流
            numbers.parallelStream()
                   .forEach(num -> System.out.println("线程: " + Thread.currentThread().getName() + ", 处理: " + num));
        }
    }

    输出示例

    线程: main, 处理: 6
    线程: ForkJoinPool.commonPool-worker-1, 处理: 3
    线程: ForkJoinPool.commonPool-worker-2, 处理: 8
    ...

    3. 配合自定义线程池

    默认情况下,并行流使用 ForkJoinPool.commonPool() 执行任务。你可以通过自定义线程池来控制并行流的执行环境。

    示例:自定义线程池
    
    

    java

    import java.util.List;
    import java.util.concurrent.ForkJoinPool;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;
    
    public class ParallelStreamCustomPool {
        public static void main(String[] args) {
            // 创建自定义线程池
            ForkJoinPool customPool = new ForkJoinPool(4);
    
            // 在自定义线程池中执行并行流任务
            customPool.submit(() -> {
                List<Integer> result = IntStream.rangeClosed(1, 10)
                                                .parallel()
                                                .map(i -> {
                                                    System.out.println("线程: " + Thread.currentThread().getName() + ", 处理: " + i);
                                                    return i * 2;
                                                })
                                                .boxed()
                                                .collect(Collectors.toList());
    
                System.out.println("结果: " + result);
            }).join(); // 等待任务完成
    
            customPool.shutdown(); // 关闭线程池
        }
    }

    示例:配合CompletableFuture实现异步

    java

    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;
    
    public class ParallelStreamWithCompletableFuture {
        public static void main(String[] args) {
            // 创建一个并行流
            List<CompletableFuture<Integer>> futures = IntStream.rangeClosed(1, 10)
                    .parallel()
                    .mapToObj(i -> CompletableFuture.supplyAsync(() -> {
                        System.out.println("线程: " + Thread.currentThread().getName() + ", 处理: " + i);
                        return i * 2; // 模拟计算任务
                    }))
                    .collect(Collectors.toList());
    
            // 等待所有任务完成并获取结果
            List<Integer> results = futures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
    
            System.out.println("结果: " + results);
        }
    }

    好处:

  • 并行流:适合处理数据流中的计算密集型任务,能够自动将任务分配到多个线程中执行。
  • CompletableFuture:提供强大的异步编程能力,可以处理任务的依赖关系、异常处理、结果合并等。
  • 结合两者的优势,可以实现:

    1. 异步并行处理:将并行流的任务异步化,进一步提升性能。
    2. 任务依赖管理:通过 CompletableFuture 管理任务之间的依赖关系。
    3. 结果合并:将多个任务的结果合并处理。

    4. 控制有序性

    并行流的任务执行顺序是不确定的。如果需要保持顺序,可以使用 forEachOrdered() 方法。

    示例:保持顺序
    
    

    java

    import java.util.Arrays;
    import java.util.List;
    
    public class ParallelStreamOrder {
        public static void main(String[] args) {
            List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    
            numbers.parallelStream()
                   .forEachOrdered(System.out::println); // 输出顺序与流中元素顺序一致
        }
    }

    5. 共享资源的安全性

    并行流在多个线程中执行操作,如果操作共享可变状态,可能会导致线程安全问题。

    示例:线程安全问题
    
    

    java

    import java.util.ArrayList;
    import java.util.List;
    
    public class ParallelStreamThreadSafety {
        public static void main(String[] args) {
            List<Integer> result = new ArrayList<>();
    
            IntStream.rangeClosed(1, 1000)
                     .parallel()
                     .forEach(result::add); // 这里会出现线程安全问题
    
            System.out.println("结果大小: " + result.size()); // 结果可能小于 1000
        }
    }

    解决方法

  • 使用线程安全的集合,如 Collections.synchronizedList()
  • 使用 collect() 方法将结果收集到线程安全的容器中。
  • 示例:线程安全的解决方案
    
    

    java

    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;
    
    public class ParallelStreamThreadSafety {
        public static void main(String[] args) {
            List<Integer> result = IntStream.rangeClosed(1, 1000)
                                            .parallel()
                                            .boxed()
                                            .collect(Collectors.toList()); // 使用 collect() 方法
    
            System.out.println("结果大小: " + result.size()); // 输出: 1000
        }
    }

    6. 注意事项

    1. 任务类型
    2. 适合 ​计算密集型 任务,不适合 ​I/O 密集型 任务。
    3. 线程安全
    4. 避免在并行流中操作共享可变状态。
    5. 任务顺序
    6. 并行流的任务执行顺序不确定,使用 forEachOrdered() 保持顺序。
    7. 线程池管理
    8. 使用自定义线程池时,记得关闭线程池,避免资源泄漏。

    7. 总结

    并行流是 Java 8 提供的一个强大工具,能够显著提升数据处理性能。但在使用时需要注意线程安全、任务顺序和线程池管理等问题。通过合理使用并行流,可以编写高效、灵活的代码。


    附录:完整代码

    
    

    java

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.ForkJoinPool;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;
    
    public class ParallelStreamDemo {
        public static void main(String[] args) {
            // 基本使用
            List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
            numbers.parallelStream()
                   .forEach(num -> System.out.println("线程: " + Thread.currentThread().getName() + ", 处理: " + num));
    
            // 自定义线程池
            ForkJoinPool customPool = new ForkJoinPool(4);
            customPool.submit(() -> {
                List<Integer> result = IntStream.rangeClosed(1, 10)
                                                .parallel()
                                                .map(i -> {
                                                    System.out.println("线程: " + Thread.currentThread().getName() + ", 处理: " + i);
                                                    return i * 2;
                                                })
                                                .boxed()
                                                .collect(Collectors.toList());
                System.out.println("结果: " + result);
            }).join();
            customPool.shutdown();
    
            // 保持顺序
            numbers.parallelStream()
                   .forEachOrdered(System.out::println);
    
            // 线程安全
            List<Integer> safeResult = IntStream.rangeClosed(1, 1000)
                                                .parallel()
                                                .boxed()
                                                .collect(Collectors.toList());
            System.out.println("结果大小: " + safeResult.size());
        }
    }

    希望这篇文章能帮助你更好地理解和使用 Java 的并行流!如果有任何问题,欢迎在评论区讨论!

    作者:向南怀瑾

    物联沃分享整理
    物联沃-IOTWORD物联网 » Java Stream 并行流详解:入门、应用及关键注意事项

    发表回复