欢迎访问shiker.tech

请允许在我们的网站上展示广告

您似乎使用了广告拦截器,请关闭广告拦截器。我们的网站依靠广告获取资金。

steam流处理是怎么执行的?
(last modified Aug 18, 2024, 6:22 PM )
by
侧边栏壁纸
  • 累计撰写 181 篇文章
  • 累计创建 64 个标签
  • 累计收到 4 条评论

目 录CONTENT

文章目录

steam流处理是怎么执行的?

橙序员
2024-08-18 / 0 评论 / 0 点赞 / 126 阅读 / 2,664 字 / 正在检测百度是否收录... 正在检测必应是否收录...
文章摘要(AI生成)

通过使用JDK的流处理函数,可以快速对集合进行处理操作。流的内部结构包括中间操作和终端操作,其中中间操作分为非状态操作和状态操作。流处理有多个优点,比如声明式编程、并行处理、链式操作和避免副作用。在流处理中,源码实现是通过创建Stream实例来代表数据视图,进行懒加载的机制,当终端操作发生时才执行计算。ArrayList的stream()方法创建了一个ArrayListSpliterator来遍历和分割ArrayList,Spliterator提供了有效的方式来处理ArrayList的流操作。通过以上流处理的源码实现,可以更好地了解流处理的内部机制。


使用JDK的流处理函数可以对集合完成快捷的处理操作:

List<String> result = integers.stream().filter(integer -> integer > 5).map(integer -> "test" + integer).collect(Collectors.toList());

相比于for循环处理,流处理有以下优点:

  1. 声明式编程:流处理让代码更简洁、易读。
  2. 并行处理:流可以很容易地实现并行操作,提升性能。
  3. 链式操作:可以将多个操作链式连接,提高可维护性。
  4. 避免副作用:流操作通常是无状态的,更少产生副作用。

那么,在上述流处理在执行时源码是如何实现的呢?

1. 创建 Stream

ArrayListstream() 方法创建一个 Stream 实例。这个 Stream 代表了数据的一个视图,但不立即计算任何内容。它实际上是一个懒加载的机制,只有在终端操作发生时才会执行计算。

    default Stream<E> stream() {
        return StreamSupport.stream(spliterator(), false);
    }

这个spliterator方法创建了一个ArrayListSpliterator, ArrayListSpliterator 实现了 Spliterator 接口,它负责遍历 ArrayList 内部的数组元素。ArrayListSpliterator 使用内部数组的起始和结束索引来维护当前的遍历位置,并能够按需分割(trySplit)流。

    @Override
    public Spliterator<E> spliterator() {
        return new ArrayListSpliterator<>(this, 0, -1, 0);
    }

在流操作中,Spliterator 提供了一种有效的方式来遍历和分割 ArrayList 的数据,支持并行处理,并能维持 ArrayList 内部数据的一致性。流的创建和处理基于这个 Spliterator 和它操作的 ArrayList 内部数组数据结构。

    static final class ArrayListSpliterator<E> implements Spliterator<E> {

        private final ArrayList<E> list;
        private int index; // current index, modified on advance/split
        private int fence; // -1 until used; then one past last index
        private int expectedModCount; // initialized when fence set

        ArrayListSpliterator(ArrayList<E> list, int origin, int fence,
                             int expectedModCount) {
            this.list = list; // OK if null unless traversed
            this.index = origin;
            this.fence = fence;
            this.expectedModCount = expectedModCount;
        }

		... ...
        //分割list
        public ArrayListSpliterator<E> trySplit() {
            int hi = getFence(), lo = index, mid = (lo + hi) >>> 1;
            return (lo >= mid) ? null : // divide range in half unless too small
                new ArrayListSpliterator<E>(list, lo, index = mid,
                                            expectedModCount);
        }
		//遍历list
        public void forEachRemaining(Consumer<? super E> action) {
            int i, hi, mc; // hoist accesses and checks from loop
            ArrayList<E> lst; Object[] a;
            if (action == null)
                throw new NullPointerException();
            if ((lst = list) != null && (a = lst.elementData) != null) {
                if ((hi = fence) < 0) {
                    mc = lst.modCount;
                    hi = lst.size;
                }
                else
                    mc = expectedModCount;
                if ((i = index) >= 0 && (index = hi) <= a.length) {
                    for (; i < hi; ++i) {
                        @SuppressWarnings("unchecked") E e = (E) a[i];
                        action.accept(e);
                    }
                    if (lst.modCount == mc)
                        return;
                }
            }
            throw new ConcurrentModificationException();
        }
        
        ... ...
    }

2. 流的内部结构

流的内部结构包括以下几个核心组件:

  • Source:流的源头,通常是集合、数组或其他数据结构。
  • Pipeline:流的操作链,包括中间操作和终端操作。
  • Operator:具体的操作(如 map, filter, distinct 等),每个操作都有一个对应的实现类。

image-20240818180922047

也就是说,当 ArrayListSpliterator 被用作流的源(source)时,流基于 ArrayList 内部的数据结构进行创建。

    public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
        Objects.requireNonNull(spliterator);
        return new ReferencePipeline.Head<>(spliterator,
                                            StreamOpFlag.fromCharacteristics(spliterator),
                                            parallel);
    }

ReferencePipeline类中有三个内部类:HeadStatelessOpStatefulOp,在调用stream方法创建流时会创建一个Head,这个类代表了流管道的起点,负责将源数据提供给后续的中间操作。它可以将 Spliterator 传递给流操作链,并处理流的初始化:

        @Override
        public void forEach(Consumer<? super E_OUT> action) {
            if (!isParallel()) {
                sourceStageSpliterator().forEachRemaining(action);
            }
            else {
                super.forEach(action);
            }
        }

        @Override
        public void forEachOrdered(Consumer<? super E_OUT> action) {
            if (!isParallel()) {
                sourceStageSpliterator().forEachRemaining(action);
            }
            else {
                super.forEachOrdered(action);
            }
        }

3.中间操作

中间操作包括状态操作和非状态操作

3.1.非状态操作(Stateless Operations)

非状态操作在流的处理过程中并不会改变流的状态。它们会返回一个新的流,该流应用了这些操作。具体实现如下:

  • map():创建一个映射操作的中间操作,这个操作会将每个元素映射到一个新值。内部使用了函数式接口 Function 来处理映射。具体的实现通常是在 Streammap() 方法中创建一个新的 Stream 对象,并在该对象中维护一个映射函数。

        @Override
        @SuppressWarnings("unchecked")
        public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
            Objects.requireNonNull(mapper);
            return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                         StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
                @Override
                Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                    return new Sink.ChainedReference<P_OUT, R>(sink) {
                        @Override
                        public void accept(P_OUT u) {
                            downstream.accept(mapper.apply(u));
                        }
                    };
                }
            };
        }
    
  • filter():创建一个过滤操作的中间操作,内部会维护一个谓词(Predicate),用来决定哪些元素应该被保留。filter() 方法也会返回一个新的流,包含过滤后的结果。

        @Override
        public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
            Objects.requireNonNull(predicate);
            return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                         StreamOpFlag.NOT_SIZED) {
                @Override
                Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
                    return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                        @Override
                        public void begin(long size) {
                            downstream.begin(-1);
                        }
    
                        @Override
                        public void accept(P_OUT u) {
                            if (predicate.test(u))
                                downstream.accept(u);
                        }
                    };
                }
            };
        }
    

3.2. 状态操作(Stateful Operations)

状态操作可能会需要依赖于流中元素的状态或维护内部状态。例如:

  • distinct():去重操作内部使用了一个 Set 来存储已见过的元素,并用来过滤重复元素。这些操作需要维护一个内部集合以检查和存储已见过的元素。

        @Override
        public final Stream<P_OUT> distinct() {
            return DistinctOps.makeRef(this);
        }
        
    	//----DistinctOps
        static <T> ReferencePipeline<T, T> makeRef(AbstractPipeline<?, T, ?> upstream) {
            return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE,
                                                          StreamOpFlag.IS_DISTINCT | StreamOpFlag.NOT_SIZED) {
               	... ...
    
                @Override
                Sink<T> opWrapSink(int flags, Sink<T> sink) {
                    Objects.requireNonNull(sink);
    
                    if (StreamOpFlag.DISTINCT.isKnown(flags)) {
                        return sink;
                    } else if (StreamOpFlag.SORTED.isKnown(flags)) {
                        return new Sink.ChainedReference<T, T>(sink) {
                            boolean seenNull;
                            T lastSeen;
    
                            @Override
                            public void begin(long size) {
                                seenNull = false;
                                lastSeen = null;
                                downstream.begin(-1);
                            }
    
                            @Override
                            public void end() {
                                seenNull = false;
                                lastSeen = null;
                                downstream.end();
                            }
    
                            @Override
                            public void accept(T t) {
                                if (t == null) {
                                    if (!seenNull) {
                                        seenNull = true;
                                        downstream.accept(lastSeen = null);
                                    }
                                } else if (lastSeen == null || !t.equals(lastSeen)) {
                                    downstream.accept(lastSeen = t);
                                }
                            }
                        };
                    } else {
                        return new Sink.ChainedReference<T, T>(sink) {
                            Set<T> seen;
    
                            @Override
                            public void begin(long size) {
                                seen = new HashSet<>();
                                downstream.begin(-1);
                            }
    
                            @Override
                            public void end() {
                                seen = null;
                                downstream.end();
                            }
    
                            @Override
                            public void accept(T t) {
                                if (!seen.contains(t)) {
                                    seen.add(t);
                                    downstream.accept(t);
                                }
                            }
                        };
                    }
                }
            };
        }
    
  • sorted():排序操作会将流中的元素收集到一个临时集合中,然后对集合进行排序,最后返回一个新的排序后的流。

        @Override
        public final Stream<P_OUT> sorted() {
            return SortedOps.makeRef(this);
        }
        
    	//----SortedOps
    	static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream) {
            return new OfRef<>(upstream);
        }
        
        //----OfRef
        private static final class OfRef<T> extends ReferencePipeline.StatefulOp<T, T> {
            /**
             * Comparator used for sorting
             */
            private final boolean isNaturalSort;
            private final Comparator<? super T> comparator;
    
            /**
             * Sort using natural order of {@literal <T>} which must be
             * {@code Comparable}.
             */
            OfRef(AbstractPipeline<?, T, ?> upstream) {
                super(upstream, StreamShape.REFERENCE,
                      StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
                this.isNaturalSort = true;
                // Will throw CCE when we try to sort if T is not Comparable
                @SuppressWarnings("unchecked")
                Comparator<? super T> comp = (Comparator<? super T>) Comparator.naturalOrder();
                this.comparator = comp;
            }
    
            /**
             * Sort using the provided comparator.
             *
             * @param comparator The comparator to be used to evaluate ordering.
             */
            OfRef(AbstractPipeline<?, T, ?> upstream, Comparator<? super T> comparator) {
                super(upstream, StreamShape.REFERENCE,
                      StreamOpFlag.IS_ORDERED | StreamOpFlag.NOT_SORTED);
                this.isNaturalSort = false;
                this.comparator = Objects.requireNonNull(comparator);
            }
    
            @Override
            public Sink<T> opWrapSink(int flags, Sink<T> sink) {
                Objects.requireNonNull(sink);
    
                // If the input is already naturally sorted and this operation
                // also naturally sorted then this is a no-op
                if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort)
                    return sink;
                else if (StreamOpFlag.SIZED.isKnown(flags))
                    return new SizedRefSortingSink<>(sink, comparator);
                else
                    return new RefSortingSink<>(sink, comparator);
            }
    
            ... ...
        }
    

4. 终端操作(Terminal Operations)

终端操作会触发实际的流处理并生成结果。它们会遍历流中的元素并应用所有之前定义的中间操作。常见的终端操作包括:

  • collect():将流中的元素收集到一个集合中,内部使用了 Collector 来完成收集操作。Collector 定义了如何将元素累积到目标集合中。

        @Override
        public final <R> R collect(Supplier<R> supplier,
                                   BiConsumer<R, ? super P_OUT> accumulator,
                                   BiConsumer<R, R> combiner) {
            return evaluate(ReduceOps.makeRef(supplier, accumulator, combiner));
        }
    
    
  • reduce():将流中的元素按照给定的归约操作合并为一个单一的结果。内部使用了 BinaryOperator 来进行归约操作。

        @Override
        public final P_OUT reduce(final P_OUT identity, final BinaryOperator<P_OUT> accumulator) {
            return evaluate(ReduceOps.makeRef(identity, accumulator, accumulator));
        }
    

ReducingSink创建:

    public static <T, U> TerminalOp<T, U>
    makeRef(U seed, BiFunction<U, ? super T, U> reducer, BinaryOperator<U> combiner) {
        Objects.requireNonNull(reducer);
        Objects.requireNonNull(combiner);
        class ReducingSink extends Box<U> implements AccumulatingSink<T, U, ReducingSink> {
            @Override
            public void begin(long size) {
                state = seed;
            }

            @Override
            public void accept(T t) {
                state = reducer.apply(state, t);
            }

            @Override
            public void combine(ReducingSink other) {
                state = combiner.apply(state, other.state);
            }
        }
        return new ReduceOp<T, U, ReducingSink>(StreamShape.REFERENCE) {
            @Override
            public ReducingSink makeSink() {
                return new ReducingSink();
            }
        };
    }

evaluate方法通过调用集合拆分器,循环执行sink中的中间操作:

    final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
        assert getOutputShape() == terminalOp.inputShape();
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;
		//判断是否并发流处理
        return isParallel()
               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
    }

	@Override
    public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
                                       Spliterator<P_IN> spliterator) {
        return helper.wrapAndCopyInto(makeSink(), spliterator).get();
    }

    final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
        copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
        return sink;
    }

    @Override
    final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
        Objects.requireNonNull(wrappedSink);

        if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
            wrappedSink.begin(spliterator.getExactSizeIfKnown());
            //调用拆分器,循环执行管道中的操作
            spliterator.forEachRemaining(wrappedSink);
            wrappedSink.end();
        }
        else {
            copyIntoWithCancel(wrappedSink, spliterator);
        }
    }

    @Override
    @SuppressWarnings("unchecked")
    final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
        Objects.requireNonNull(sink);

        for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
            //执行不同中间操作
            sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
        }
        return (Sink<P_IN>) sink;
    }

5. 总结

ArrayListStream API 在底层使用了延迟计算的方式(lazy evaluation)和管道化处理(pipelining)。流操作的执行是懒惰的,只有在终端操作触发时才会进行实际计算。中间操作生成新的流,而终端操作则会对流进行遍历并执行实际的数据处理任务。这样,流的操作可以高效地处理数据,同时提供了清晰的 API 来进行数据转换和处理。

0

评论区