文章摘要(AI生成)
流处理函数提供了对集合进行快速处理的功能,并具有声明式编程、并行处理、链式操作和避免副作用等优点。在实际执行时,通过ArrayList的stream()方法创建了一个Stream实例,利用ArrayListSpliterator来实现数据的遍历和分割。ArrayListSpliterator维护了当前的遍历位置和ArrayList内部数据的一致性,支持并行处理。通过trySplit方法可以分割list,而forEachRemaining方法则用于遍历list并执行相应操作。因此,使用流处理时,可以更高效地对集合进行操作,同时实现较好的性能和可维护性。
使用JDK的流处理函数可以对集合完成快捷的处理操作:
List<String> result = integers.stream().filter(integer -> integer > 5).map(integer -> "test" + integer).collect(Collectors.toList());
相比于for循环处理,流处理有以下优点:
- 声明式编程:流处理让代码更简洁、易读。
- 并行处理:流可以很容易地实现并行操作,提升性能。
- 链式操作:可以将多个操作链式连接,提高可维护性。
- 避免副作用:流操作通常是无状态的,更少产生副作用。
那么,在上述流处理在执行时源码是如何实现的呢?
1. 创建 Stream
ArrayList
的 stream()
方法创建一个 Stream
实例。这个 Stream
代表了数据的一个视图,但不立即计算任何内容。它实际上是一个懒加载的机制,只有在终端操作发生时才会执行计算。
default Stream<E> stream() {
return StreamSupport.stream(spliterator(), false);
}
这个spliterator方法创建了一个ArrayListSpliterator
, ArrayListSpliterator
实现了 Spliterator
接口,它负责遍历 ArrayList
内部的数组元素。ArrayListSpliterator
使用内部数组的起始和结束索引来维护当前的遍历位置,并能够按需分割(trySplit
)流。
@Override
public Spliterator<E> spliterator() {
return new ArrayListSpliterator<>(this, 0, -1, 0);
}
在流操作中,Spliterator
提供了一种有效的方式来遍历和分割 ArrayList
的数据,支持并行处理,并能维持 ArrayList
内部数据的一致性。流的创建和处理基于这个 Spliterator
和它操作的 ArrayList
内部数组数据结构。
static final class ArrayListSpliterator<E> implements Spliterator<E> {
private final ArrayList<E> list;
private int index; // current index, modified on advance/split
private int fence; // -1 until used; then one past last index
private int expectedModCount; // initialized when fence set
ArrayListSpliterator(ArrayList<E> list, int origin, int fence,
int expectedModCount) {
this.list = list; // OK if null unless traversed
this.index = origin;
this.fence = fence;
this.expectedModCount = expectedModCount;
}
... ...
//分割list
public ArrayListSpliterator<E> trySplit() {
int hi = getFence(), lo = index, mid = (lo + hi) >>> 1;
return (lo >= mid) ? null : // divide range in half unless too small
new ArrayListSpliterator<E>(list, lo, index = mid,
expectedModCount);
}
//遍历list
public void forEachRemaining(Consumer<? super E> action) {
int i, hi, mc; // hoist accesses and checks from loop
ArrayList<E> lst; Object[] a;
if (action == null)
throw new NullPointerException();
if ((lst = list) != null && (a = lst.elementData) != null) {
if ((hi = fence) < 0) {
mc = lst.modCount;
hi = lst.size;
}
else
mc = expectedModCount;
if ((i = index) >= 0 && (index = hi) <= a.length) {
for (; i < hi; ++i) {
@SuppressWarnings("unchecked") E e = (E) a[i];
action.accept(e);
}
if (lst.modCount == mc)
return;
}
}
throw new ConcurrentModificationException();
}
... ...
}
2. 流的内部结构
流的内部结构包括以下几个核心组件:
Source
:流的源头,通常是集合、数组或其他数据结构。Pipeline
:流的操作链,包括中间操作和终端操作。Operator
:具体的操作(如map
,filter
,distinct
等),每个操作都有一个对应的实现类。
也就是说,当 ArrayList
的 Spliterator
被用作流的源(source)时,流基于 ArrayList
内部的数据结构进行创建。
public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
Objects.requireNonNull(spliterator);
return new ReferencePipeline.Head<>(spliterator,
StreamOpFlag.fromCharacteristics(spliterator),
parallel);
}
ReferencePipeline
类中有三个内部类:Head
、StatelessOp
、StatefulOp
,在调用stream
方法创建流时会创建一个Head
,这个类代表了流管道的起点,负责将源数据提供给后续的中间操作。它可以将 Spliterator
传递给流操作链,并处理流的初始化:
@Override
public void forEach(Consumer<? super E_OUT> action) {
if (!isParallel()) {
sourceStageSpliterator().forEachRemaining(action);
}
else {
super.forEach(action);
}
}
@Override
public void forEachOrdered(Consumer<? super E_OUT> action) {
if (!isParallel()) {
sourceStageSpliterator().forEachRemaining(action);
}
else {
super.forEachOrdered(action);
}
}
3.中间操作
中间操作包括状态操作和非状态操作
3.1.非状态操作(Stateless Operations)
非状态操作在流的处理过程中并不会改变流的状态。它们会返回一个新的流,该流应用了这些操作。具体实现如下:
-
map()
:创建一个映射操作的中间操作,这个操作会将每个元素映射到一个新值。内部使用了函数式接口Function
来处理映射。具体的实现通常是在Stream
的map()
方法中创建一个新的Stream
对象,并在该对象中维护一个映射函数。@Override @SuppressWarnings("unchecked") public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) { Objects.requireNonNull(mapper); return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { @Override Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) { return new Sink.ChainedReference<P_OUT, R>(sink) { @Override public void accept(P_OUT u) { downstream.accept(mapper.apply(u)); } }; } }; }
-
filter()
:创建一个过滤操作的中间操作,内部会维护一个谓词(Predicate
),用来决定哪些元素应该被保留。filter()
方法也会返回一个新的流,包含过滤后的结果。@Override public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) { Objects.requireNonNull(predicate); return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SIZED) { @Override Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) { return new Sink.ChainedReference<P_OUT, P_OUT>(sink) { @Override public void begin(long size) { downstream.begin(-1); } @Override public void accept(P_OUT u) { if (predicate.test(u)) downstream.accept(u); } }; } }; }
3.2. 状态操作(Stateful Operations)
状态操作可能会需要依赖于流中元素的状态或维护内部状态。例如:
-
distinct()
:去重操作内部使用了一个Set
来存储已见过的元素,并用来过滤重复元素。这些操作需要维护一个内部集合以检查和存储已见过的元素。@Override public final Stream<P_OUT> distinct() { return DistinctOps.makeRef(this); } //----DistinctOps static <T> ReferencePipeline<T, T> makeRef(AbstractPipeline<?, T, ?> upstream) { return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE, StreamOpFlag.IS_DISTINCT | StreamOpFlag.NOT_SIZED) { ... ... @Override Sink<T> opWrapSink(int flags, Sink<T> sink) { Objects.requireNonNull(sink); if (StreamOpFlag.DISTINCT.isKnown(flags)) { return sink; } else if (StreamOpFlag.SORTED.isKnown(flags)) { return new Sink.ChainedReference<T, T>(sink) { boolean seenNull; T lastSeen; @Override public void begin(long size) { seenNull = false; lastSeen = null; downstream.begin(-1); } @Override public void end() { seenNull = false; lastSeen = null; downstream.end(); } @Override public void accept(T t) { if (t == null) { if (!seenNull) { seenNull = true; downstream.accept(lastSeen = null); } } else if (lastSeen == null || !t.equals(lastSeen)) { downstream.accept(lastSeen = t); } } }; } else { return new Sink.ChainedReference<T, T>(sink) { Set<T> seen; @Override public void begin(long size) { seen = new HashSet<>(); downstream.begin(-1); } @Override public void end() { seen = null; downstream.end(); } @Override public void accept(T t) { if (!seen.contains(t)) { seen.add(t); downstream.accept(t); } } }; } } }; }
-
sorted()
:排序操作会将流中的元素收集到一个临时集合中,然后对集合进行排序,最后返回一个新的排序后的流。@Override public final Stream<P_OUT> sorted() { return SortedOps.makeRef(this); } //----SortedOps static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream) { return new OfRef<>(upstream); } //----OfRef private static final class OfRef<T> extends ReferencePipeline.StatefulOp<T, T> { /** * Comparator used for sorting */ private final boolean isNaturalSort; private final Comparator<? super T> comparator; /** * Sort using natural order of {@literal <T>} which must be * {@code Comparable}. */ OfRef(AbstractPipeline<?, T, ?> upstream) { super(upstream, StreamShape.REFERENCE, StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED); this.isNaturalSort = true; // Will throw CCE when we try to sort if T is not Comparable @SuppressWarnings("unchecked") Comparator<? super T> comp = (Comparator<? super T>) Comparator.naturalOrder(); this.comparator = comp; } /** * Sort using the provided comparator. * * @param comparator The comparator to be used to evaluate ordering. */ OfRef(AbstractPipeline<?, T, ?> upstream, Comparator<? super T> comparator) { super(upstream, StreamShape.REFERENCE, StreamOpFlag.IS_ORDERED | StreamOpFlag.NOT_SORTED); this.isNaturalSort = false; this.comparator = Objects.requireNonNull(comparator); } @Override public Sink<T> opWrapSink(int flags, Sink<T> sink) { Objects.requireNonNull(sink); // If the input is already naturally sorted and this operation // also naturally sorted then this is a no-op if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort) return sink; else if (StreamOpFlag.SIZED.isKnown(flags)) return new SizedRefSortingSink<>(sink, comparator); else return new RefSortingSink<>(sink, comparator); } ... ... }
4. 终端操作(Terminal Operations)
终端操作会触发实际的流处理并生成结果。它们会遍历流中的元素并应用所有之前定义的中间操作。常见的终端操作包括:
-
collect()
:将流中的元素收集到一个集合中,内部使用了Collector
来完成收集操作。Collector
定义了如何将元素累积到目标集合中。@Override public final <R> R collect(Supplier<R> supplier, BiConsumer<R, ? super P_OUT> accumulator, BiConsumer<R, R> combiner) { return evaluate(ReduceOps.makeRef(supplier, accumulator, combiner)); }
-
reduce()
:将流中的元素按照给定的归约操作合并为一个单一的结果。内部使用了BinaryOperator
来进行归约操作。@Override public final P_OUT reduce(final P_OUT identity, final BinaryOperator<P_OUT> accumulator) { return evaluate(ReduceOps.makeRef(identity, accumulator, accumulator)); }
ReducingSink
创建:
public static <T, U> TerminalOp<T, U>
makeRef(U seed, BiFunction<U, ? super T, U> reducer, BinaryOperator<U> combiner) {
Objects.requireNonNull(reducer);
Objects.requireNonNull(combiner);
class ReducingSink extends Box<U> implements AccumulatingSink<T, U, ReducingSink> {
@Override
public void begin(long size) {
state = seed;
}
@Override
public void accept(T t) {
state = reducer.apply(state, t);
}
@Override
public void combine(ReducingSink other) {
state = combiner.apply(state, other.state);
}
}
return new ReduceOp<T, U, ReducingSink>(StreamShape.REFERENCE) {
@Override
public ReducingSink makeSink() {
return new ReducingSink();
}
};
}
evaluate
方法通过调用集合拆分器,循环执行sink中的中间操作:
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
assert getOutputShape() == terminalOp.inputShape();
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
//判断是否并发流处理
return isParallel()
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
@Override
public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
Spliterator<P_IN> spliterator) {
return helper.wrapAndCopyInto(makeSink(), spliterator).get();
}
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
return sink;
}
@Override
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
Objects.requireNonNull(wrappedSink);
if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
wrappedSink.begin(spliterator.getExactSizeIfKnown());
//调用拆分器,循环执行管道中的操作
spliterator.forEachRemaining(wrappedSink);
wrappedSink.end();
}
else {
copyIntoWithCancel(wrappedSink, spliterator);
}
}
@Override
@SuppressWarnings("unchecked")
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
Objects.requireNonNull(sink);
for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
//执行不同中间操作
sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
}
return (Sink<P_IN>) sink;
}
5. 总结
ArrayList
的 Stream
API 在底层使用了延迟计算的方式(lazy evaluation)和管道化处理(pipelining)。流操作的执行是懒惰的,只有在终端操作触发时才会进行实际计算。中间操作生成新的流,而终端操作则会对流进行遍历并执行实际的数据处理任务。这样,流的操作可以高效地处理数据,同时提供了清晰的 API 来进行数据转换和处理。
评论区