Java8 Stream源码精讲中间操作原理详解

通过分析创建Stream的过程,详细介绍了Spliterator接口定义,Spliterator子类的实现细节,Spliterator在Stream中的调用时机,以及代表源阶段Stream的Head类结构。本章将继续带着大家深入理解什么是Stream中间操作,进入每一个中间操作的源码了解我们定义的lambda表达式是如何在流上处理数据的。

中间操作

Stream是惰性流,中间操作只是将lambda表达式记录下来,返回一个新的Stream,只有终止操作被调用时,才会触发计算。这样可以保证数据被尽量少的遍历,这也是Stream高效的原因之一。中间操作分为无状态操作和有状态操作,无状态操作不会存储元素的中间状态,有状态操作会保存元素中间状态。

我看到有这样的说法:无状态操作是指元素的处理不受之前元素的影响;有状态操作是指该操作只有拿到所有元素之后才能继续下去。实际上真的是这样吗?我们后面会通过分析源码寻找答案。现在先来看看中间操作的划分吧:

操作分类 方法
无状态操作 filter() map() mapToInt() mapToLong() mapToDouble() flatMap() flatMapToInt() flatMapToLong() flatMapToDouble() peek() unordered()
有状态操作 distinct() sorted() limit() skip()

无状态操作

在Java8 Stream源码精讲(一):从一个简单的例子入手 中提到,map()和filter()中间操作被调用之后,返回的是一个StatelessOp匿名子类的实例。通过类继承结构可以看到,它跟Head一样都是继承ReferencePipeline,不同的是它是一个抽象类,所以具体的逻辑还是放在子类中的。

实际上StatelessOp就代表无状态中间操作,它将操作声明的lambda函数保存在某个地方,在适当的时机调用。

abstract static class StatelessOp         extends ReferencePipeline {          //传入上一个阶段的Stream,构建双向链表     StatelessOp(AbstractPipeline upstream,                 StreamShape inputShape,                 int opFlags) {         super(upstream, opFlags);         assert upstream.getOutputShape() == inputShape;     }      //标识当前是一个无状态操作     @Override     final boolean opIsStateful() {         return false;     } } 复制代码

可以看到StatelessOp并没有多少内容,只是可以通过构造器与上一个Stream连接成一个链表,然后实现了opIsStateful()方法。还得看一下它的父类的父类AbstractPipeline的构造函数:

AbstractPipeline(AbstractPipeline previousStage, int opFlags) {     if (previousStage.linkedOrConsumed)         throw new IllegalStateException(MSG_STREAM_LINKED);     previousStage.linkedOrConsumed = true;     //构建双向链表的过程     previousStage.nextStage = this;      this.previousStage = previousStage;     this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;     this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);     this.sourceStage = previousStage.sourceStage;     //每一次中间操作,如果是有状态的,那么sourceStage的sourceAnyStateful会被标记为true     if (opIsStateful())         sourceStage.sourceAnyStateful = true;     this.depth = previousStage.depth + 1; } 复制代码

下面我们来看看每一个中间操作方法有哪些异同吧。

filter()方法

filter()方法大家都很熟悉了,返回一个匹配predicate函数的元素组成的Stream。

public final Stream filter(Predicate predicate) {     Objects.requireNonNull(predicate);     //把当前阶段的Stream传入,将新返回的StatelessOp加入链表末尾     return new StatelessOp(this, StreamShape.REFERENCE,                                  StreamOpFlag.NOT_SIZED) {         @Override         Sink opWrapSink(int flags, Sink sink) {             return new Sink.ChainedReference(sink) {                 @Override                 public void begin(long size) {                     downstream.begin(-1);                 }                  //predicate不会被立即被调用,会在恰当的时机触发                 @Override                 public void accept(P_OUT u) {                     if (predicate.test(u))                         downstream.accept(u);                 }             };         }     }; } 复制代码

跟上面分析的一样,调用filter()会返回一个新的StatelessOp,与上一个Stream组成链表,lambda表达式不会被马上调用,只是保存在内部。 它被调用的地方是Sink.ChainedReference的accept()方法。

这里大家可能被绕晕,Sink是什么?ChainedReference又是什么,它内部的downstream又是什么?

Sink接口扩展自Consumer,用于在流管道的各个阶段传递元素,并使用其begin()、end()和cancellationRequested()方法来管理大小信息、控制流。

interface Sink extends Consumer {          default void begin(long size) {}      default void end() {}      default boolean cancellationRequested() {         return false;     } } 复制代码
  • begin()方法:在Stream中的元素传递给sink之前会调用这个方法,对于有状态操作,通常会做一些初始化工作,参数是元素大小,如果大小不确定,传递-1。
  • accept()方法:Stream中的每一个元素都会经过accept()方法进行逻辑处理。
  • end()方法:当所有元素都被sink处理了,会调用这个方法表示结束,对于有状态操作,会做清理工作以及将结果发送给下一个sink。
  • cancellationRequested():如果返回true,表示sink不再处理Stream中后续的元素,用于短路操作。

ChainedReference实现了Sink接口,通过一个downstream变量来连接下游的sink形成一个sink链。begin()、end()、cancellationRequested()方法都直接向下游传递。

static abstract class ChainedReference implements Sink {     protected final Sink downstream;      //downstream为传入下游的sink对象,形成链表     public ChainedReference(Sink downstream) {         this.downstream = Objects.requireNonNull(downstream);     }      @Override     public void begin(long size) {         downstream.begin(size);     }      @Override     public void end() {         downstream.end();     }      @Override     public boolean cancellationRequested() {         return downstream.cancellationRequested();     } } 复制代码

前两章讲过,调用终止操作时会从Pipeline链表的尾部向前遍历直到Head头节点(不包含),每遍历到一个节点,就会调用它的opWrapSink()方法,通过downstream连接传入的后一个节点的sink对象,返回一个新的sink对象。最终形成一个从第一个中间操作到终止操作(终止操作就是一个特殊的sink)的sink链。

比如调用stream.map().filter().forEach()就会形成下面这样一个结构:

我们再来看filter()方法中的sink:

return new Sink.ChainedReference(sink) {     @Override     public void begin(long size) {         downstream.begin(-1);     }      @Override     public void accept(P_OUT u) {         if (predicate.test(u))             downstream.accept(u);     } }; 复制代码
  • 重写了begin()方法,因为经过filter过滤,传递给下游sink的元素大小是未知的,所以这里传入的是-1。
  • 实现了accept()方法,元素只有匹配predicate,才会传递给下游sink。

map()方法

map()方法,返回一个原Stream中的元素经过参数mapper函数转换的结果组成的Stream。

public final  Stream map(Function mapper) {     Objects.requireNonNull(mapper);     return new StatelessOp(this, StreamShape.REFERENCE,                                  StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {         @Override         Sink opWrapSink(int flags, Sink sink) {             return new Sink.ChainedReference(sink) {                 //其它的都跟filter()类似,重点关注这里                 @Override                 public void accept(P_OUT u) {                     downstream.accept(mapper.apply(u));                 }             };         }     }; } 复制代码
  • 实现了accept()方法,将元素经过mapper函数转化之后再传递给下游的sink处理。

flatMap()方法

flatMap()方法,返回一个Stream,这个Stream中的元素是原Stream中的每一个元素经过mapper函数转换为一个新的Stream中的元素合并的结果。这样说很拗口,我们还是来看源码吧。

public final  Stream flatMap(Function> mapper) {     Objects.requireNonNull(mapper);     // We can do better than this, by polling cancellationRequested when stream is infinite     return new StatelessOp(this, StreamShape.REFERENCE,                                  StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {         @Override         Sink opWrapSink(int flags, Sink sink) {             return new Sink.ChainedReference(sink) {                 //重写了begin()方法                 @Override                 public void begin(long size) {                     downstream.begin(-1);                 }                                  //重点关注这里                 @Override                 public void accept(P_OUT u) {                     //每一个元素都会经过mapper映射为一个Stream                     try (Stream result = mapper.apply(u)) {                         // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it                         if (result != null)                             //然后遍历Stream中的元素,传递给下游sink处理                             result.sequential().forEach(downstream);                     }                 }             };         }     }; } 复制代码

源码还是比较容易理解的。

  • 重写了begin()方法,因为每个元素都被转换成一个Stream,所以经过flatMap()处理之后的元素大小是未知的,所以传递的参数是-1。
  • 实现了accept()方法,每一个元素都会被mapper函数映射成一个新的Stream,然后遍历这个Stream,将其中的元素传递给下游的sink继续处理。

peek()方法

peek()方法,返回一个新的Stream,其中的元素就是原Stream中的元素,只是每一个元素都会被action额外的处理。这个方法大家应该用得少,主要是用于Stream调试,千万别理解成forEach()方法哈,这是一个中间操作,没有调用终止操作的话是不会被触发的。

public final Stream peek(Consumer action) {     Objects.requireNonNull(action);     return new StatelessOp(this, StreamShape.REFERENCE,                                  0) {         @Override         Sink opWrapSink(int flags, Sink sink) {             return new Sink.ChainedReference(sink) {                 //关注这里                 @Override                 public void accept(P_OUT u) {                     action.accept(u);                     downstream.accept(u);                 }             };         }     }; } 复制代码
  • 每一个元素都会被action的accept()方法处理一下,然后继续传递给下游sink。

unordered()方法

unordered()方法,返回一个无序的Stream。

public Stream unordered() {     if (!isOrdered())         return this;     return new StatelessOp(this, StreamShape.REFERENCE, StreamOpFlag.NOT_ORDERED) {         @Override         Sink opWrapSink(int flags, Sink sink) {             return sink;         }     }; } 复制代码

有状态操作

调用有状态操作,返回的是StatefulOp的子类实例,它跟StatelessOp一样,都继承自ReferencePipeline。

abstract static class StatefulOp         extends ReferencePipeline {      StatefulOp(AbstractPipeline upstream,                StreamShape inputShape,                int opFlags) {         super(upstream, opFlags);         assert upstream.getOutputShape() == inputShape;     }      @Override     final boolean opIsStateful() {         return true;     }      @Override     abstract  Node opEvaluateParallel(PipelineHelper helper,                                                    Spliterator spliterator,                                                    IntFunction generator); } 复制代码

唯一不同的是opIsStateful()返回true。

distinct()方法

distinct()方法,返回一个原Stream中不同元素组成的Stream,也就是将元素去重。

public final Stream distinct() {     return DistinctOps.makeRef(this); } 复制代码

distinct()调用了DistinctOps#makeRef()工厂方法,进入里面详细分析:

static  ReferencePipeline makeRef(AbstractPipeline upstream) {     //省略了并行流处理相关的方法     return new ReferencePipeline.StatefulOp(upstream, StreamShape.REFERENCE,                                                   StreamOpFlag.IS_DISTINCT | StreamOpFlag.NOT_SIZED) {          @Override         Sink opWrapSink(int flags, Sink sink) {             Objects.requireNonNull(sink);                          //1.如果原Stream中的元素已经不重复了,则直接返回下一个节点的sink             if (StreamOpFlag.DISTINCT.isKnown(flags)) {                 return sink;             //2.原Stream中的元素已经是有序的             } else if (StreamOpFlag.SORTED.isKnown(flags)) {                ...             //3.原Stream中的元素是无序的             } else {                ...             }         }     }; } 复制代码

opWrapSink()方法内部非常复杂,主要是根据原Stream的标记是否重复、是否有序来判断,然后返回不同的Sink,我们分开来看。

  1. 原Stream元素已经不重复,则直接返回下游的Sink对象,不再经过额外处理。
if (StreamOpFlag.DISTINCT.isKnown(flags)) {         return sink;     } 复制代码
  1. 原Stream中的元素是有序的。
return new Sink.ChainedReference(sink) {                     //是否已经有为null的元素被当前sink处理                     boolean seenNull;                     //最后被当前sink处理的元素                     T lastSeen;                                          //Stream中的元素被处理之前被调用                     @Override                     public void begin(long size) {                         //表示还没有为null的元素被处理                            seenNull = false;                         //没有最后被当前sink处理的元素                         lastSeen = null;                         //调用下一下sink的begin()方法,结果distinct()处理过后元素大小未知,所以传-1                         downstream.begin(-1);                     }                                          //清理状态                     @Override                     public void end() {                         seenNull = false;                         lastSeen = null;                         downstream.end();                     }                      @Override                     public void accept(T t) {                         //这里的逻辑是如果传入的元素是null,并且之前没有处理过,                         //则向下游sink传递,并且更改seenNull和lastSeen的值;否则不向下游传递                         if (t == null) {                             if (!seenNull) {                                 seenNull = true;                                 downstream.accept(lastSeen = null);                             }                         //元素为非null的值,则判断是否跟最后一次被处理的元素相同,                         //不同则表示不重复,向下游sink传递;否则元素重复,不再传递                         } else if (lastSeen == null || !t.equals(lastSeen)) {                             downstream.accept(lastSeen = t);                         }                     }                 }; 复制代码

元素是按顺序被sink处理的,所以可以通过seenNull和lastSeen两个变量判断元素是否重复。

  1. 原Stream中的元素是无序的
return new Sink.ChainedReference(sink) {                     Set seen;                      //元素被处理之前,做初始化工作                     @Override                     public void begin(long size) {                         //初始化一个HashSet                         seen = new HashSet<>();                         //去重之后元素大小未知,所以传递-1                         downstream.begin(-1);                     }                      //清理工作                     @Override                     public void end() {                         seen = null;                         downstream.end();                     }                      @Override                     public void accept(T t) {                         //Set中没有相同的元素,则添加,向下游sink传递                         if (!seen.contains(t)) {                             seen.add(t);                             downstream.accept(t);                         }                     }                 }; 复制代码

对于无序的元素,是通过使用一个HashSet来去重的。

sorted()方法

sorted()方法,返回一个将元素排序之后的Stream,有两个重载方法,一个表示按照自然顺序排序,一个利用比较器排序。

我们来分析带比较器的方法:

public final Stream sorted(Comparator comparator) {     return SortedOps.makeRef(this, comparator); } 复制代码

SortedOps#makeRef():

static  Stream makeRef(AbstractPipeline upstream,                             Comparator comparator) {     return new OfRef<>(upstream, comparator); } 复制代码

返回OfRef对象,OfRef继承自ReferencePipeline.StatefulOp:

private static final class OfRef extends ReferencePipeline.StatefulOp  复制代码

我们还是看它的opWrapSink()方法:

public Sink opWrapSink(int flags, Sink sink) {     Objects.requireNonNull(sink);          //1.原Stream是有序的,并且排序规则是自然顺序     // If the input is already naturally sorted and this operation     // also naturally sorted then this is a no-op     if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort)         return sink;     //2.元素大小确定     else if (StreamOpFlag.SIZED.isKnown(flags))         return new SizedRefSortingSink<>(sink, comparator);     //3.元素大小不确定     else         return new RefSortingSink<>(sink, comparator); } 复制代码

还是根据原Stream中元素的标记情况走不同的逻辑,我们分开看:

  1. 原Stream是有序的,并且需要排序的规则是自然顺序,则直接返回下游的sink节点,不做操作
if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort)     return sink; 复制代码
  1. 元素大小确定的情况,返回SizedRefSortingSink对象,我们进入SizedRefSortingSink类分析:
private static final class SizedRefSortingSink extends AbstractRefSortingSink {     //临时保存元素的数组     private T[] array;     //数组下标偏移量     private int offset;      SizedRefSortingSink(Sink sink, Comparator comparator) {         super(sink, comparator);     }      @Override     @SuppressWarnings("unchecked")     public void begin(long size) {         if (size >= Nodes.MAX_ARRAY_SIZE)             throw new IllegalArgumentException(Nodes.BAD_SIZE);         //初始化用于排序的数组         array = (T[]) new Object[(int) size];     }      @Override     public void end() {         //利用比较器排序数组元素         Arrays.sort(array, 0, offset, comparator);         //排序完毕,调用下游sink         downstream.begin(offset);         //判断是否短路,没有取消请求则非短路,将排序之后的元素依次发送给下游sink         if (!cancellationWasRequested) {             for (int i = 0; i < offset; i++)                 downstream.accept(array[i]);         }         //短路,可能发送部分元素就结束         else {             for (int i = 0; i < offset && !downstream.cancellationRequested(); i++)                 downstream.accept(array[i]);         }         downstream.end();         array = null;     }      @Override     public void accept(T t) {         //每一个元素经过排序sink处理,只是将元素缓存到数组上         array[offset++] = t;     } } 复制代码
  • 对于已知元素大小的sorted()操作,会利用一个数组来排序,在元素被发送给排序sink之前,先初始化一个数组,然后每一个元素发送给sink都保存到数组上,这个时候不会传递给下游sink,当sink#end()被调用时,表示再没有元素了,开始排序,然后按顺序将数组元素传递给下游的sink。

3.元素大小不确定的情况,返回的是RefSortingSink对象。

private static final class RefSortingSink extends AbstractRefSortingSink {     private ArrayList list;      RefSortingSink(Sink sink, Comparator comparator) {         super(sink, comparator);     }      @Override     public void begin(long size) {         if (size >= Nodes.MAX_ARRAY_SIZE)             throw new IllegalArgumentException(Nodes.BAD_SIZE);         list = (size >= 0) ? new ArrayList((int) size) : new ArrayList();     }      @Override     public void end() {         list.sort(comparator);         downstream.begin(list.size());         if (!cancellationWasRequested) {             list.forEach(downstream::accept);         }         else {             for (T t : list) {                 if (downstream.cancellationRequested()) break;                 downstream.accept(t);             }         }         downstream.end();         list = null;     }      @Override     public void accept(T t) {         list.add(t);     } } 复制代码
  • 可以看到它跟SizedRefSortingSink类似,不同的是由于元素大小未知,所以是利用一个ArrayList来缓存元素、排序的。

limit()方法

limit()方法,返回一个由原Stream中的元素组成的新的Stream,Stream中的元素是原Stream中从第一个元素开始,不超过maxSize的部分元素。

public final Stream limit(long maxSize) {     if (maxSize < 0)         throw new IllegalArgumentException(Long.toString(maxSize));     return SliceOps.makeRef(this, 0, maxSize); } 复制代码

limit()方法校验了maxSize参数,然后还是利用工厂方法SliceOps#makeRef()来创建一个StatefulOp:

public static  Stream makeRef(AbstractPipeline upstream,                                     long skip, long limit) {     if (skip < 0)         throw new IllegalArgumentException("Skip must be non-negative: " + skip);      return new ReferencePipeline.StatefulOp(upstream, StreamShape.REFERENCE,                                                   flags(limit)) {         //省略了方法         ......         }; } 复制代码

注意makeRef()的参数,upstream代表上一个Stream,skip代表跳过多少元素,limit代表传递给下游sink的元素最大值。我们还是来看ReferencePipeline.StatefulOp实现的opWrapSink()方法:

Sink opWrapSink(int flags, Sink sink) {     return new Sink.ChainedReference(sink) {         //上面传递的参数赋值,对于limit()操作,skip是0         long n = skip;         long m = limit >= 0 ? limit : Long.MAX_VALUE;          @Override         public void begin(long size) {             //计算经过limit()操作之后元素大小,传递给下游             downstream.begin(calcSize(size, skip, m));         }          @Override         public void accept(T t) {             //limit()操作n是0,将元素传递给下游sink,同时m-1,直到m等于0,后面的元素不再传递             if (n == 0) {                 if (m > 0) {                     m--;                     downstream.accept(t);                 }             }             //这是skip()的逻辑,开始的n个元素不传递,直到n等于0之后,再走上面的逻辑             else {                 n--;             }         }          @Override         public boolean cancellationRequested() {             //判断短路             return m == 0 || downstream.cancellationRequested();         }     }; } 复制代码
  • begin()方法就是将经过limit()处理之后的元素大小传递给下游sink,看下它的计算逻辑:
private static long calcSize(long size, long skip, long limit) {     //如果大小未知,直接返回-1,否则比较limit和size-skip的最小值     return size >= 0 ? Math.max(-1, Math.min(size - skip, limit)) : -1; } 复制代码
  • accept()方法的逻辑注释在代码上了。

skip()方法

skip()方法,返回一个丢弃了Stream中前n个元素之后,剩余的元素组成的新的Stream。

public final Stream skip(long n) {     if (n < 0)         throw new IllegalArgumentException(Long.toString(n));     if (n == 0)         return this;     else         return SliceOps.makeRef(this, n, -1); } 复制代码

可以看到跟limit()一样,返回的同样是由SliceOps#makeRef()创建的StatefulOp,注意传入参数的区别,不再展开讲解。

总结

本章先解释了Stream中间操作的概念,然后说明了无状态操作和有状态操作怎样划分,以及哪些方法属于无状态操作,哪些属于有状态操作。通过分析源码的方式,深入理解了分别代表无状态和有状态操作的StatelessOp和StatefulOp,最后分析了每一个中间操作方法是如何通过继承StatelessOp或StatefulOp而具备相应功能的。