本文共 10423 字,大约阅读时间需要 34 分钟。
使用stream操作表达更高级的数据处理请求,Part 1
作者:Raoul-Gabriel Urma 译者:石头狮子(v1.lion@qq.com) 校对:吴京润
没有了集合你会怎么做?几乎每一个Java应用都建立和处理集合。对于许多编程任务而言,这是基础的技术:集合分组和处理数据。例如,你可能想要建立一个银行交易集合来代表用户的账户记录。然后,你想要处理所有的集合找出用户花费了多少金额。尽管集合如此重要,但是Java的实现远非完美。
首先,典型的集合处理模式有点像SQL操作,例如”查找”(查找最大值的交易)或”分组”(编组所有与杂货购买有关的交易)。大部分的数据库可以允许我们声明式地指定这些操作。例如,后面的SQL查询可以让我们找出最高值的交易ID:”SELECT id, MAX(value) from transactions”。
正如所见,我们并不需要去实现如何计算最大值(例如,使用循环,一个变量跟踪最大的值)。我仅需要表达我们需要的。这个原则意味着,你并不需要担忧如何明确地实现这些查询–这完全不需要你处理。为什么我们不能让集合做相同的事情呢?想想你使用循环一次又一次的重新实现了这些操作几次?
其次,我们怎样才能有效率的处理大型的集合?理论上讲,需要加快处理的速度,可能要使用多核架构。然而,写出并行处理的代码并不容易,而且也容易出错。
Java SE 8 解决了这个问题。 Java API的设计者使用新的称为Stream的抽象更新了API,使得可以声明式的处理数据。此外,streams可以使用多核心架构,而你并不需要写任何一行关于多核心处理的代码。听起来很美,确实是这样吗?这就是本系列文章要表述的内容。
在我们详细表述使用streams可以做什么之前,先让我们看看一个例子。以便有一个使用Java SE 8 streams新的编程方式的概念。假设我们需要找出所有类型为grocery的交易,返回以交易金额为降序的交易ID列表。Java SE 7中,我们所做的如Listing 1。Java SE 8中,我们所做的如Listing 2。
01 | List<Transaction> groceryTransactions = new Arraylist<>(); |
02 | for (Transaction t: transactions){ |
03 | if (t.getType() == Transaction.GROCERY){ |
04 | groceryTransactions.add(t); |
05 | } |
06 | } |
07 | Collections.sort(groceryTransactions, new Comparator(){ |
08 | public int compare(Transaction t1, Transaction t2){ |
09 | return t2.getValue().compareTo(t1.getValue()); |
10 | } |
11 | }); |
12 | List<Integer> transactionIds = new ArrayList<>(); |
13 | for (Transaction t: groceryTransactions){ |
14 | transactionsIds.add(t.getId()); |
15 | } |
Listing 1
1 | List<Integer> transactionsIds = |
2 | transactions.stream() |
3 | .filter(t -> t.getType() == Transaction.GROCERY) |
4 | .sorted(comparing(Transaction::getValue).reversed()) |
5 | .map(Transaction::getId) |
6 | .collect(toList()); |
Listing 2
Figure 1 描述了Java SE 8的代码。首先,我们使用List上可用的stream()方法从transactions(数据源)列表上取到stream。随后,几个操作(filter,sorted,map,collect)串联起来形成pipeline(管道),pipeline可以看成是对数据查询的一种形式。
Figure 1
可是,如何才能并行执行代码呢?对于Java SE 8来说,这是否容易做到:只要使用parallelStream()替换stream()方法,正如Listing 3所示,Streams API内部会分解你的查询,使用你电脑上的多个核心。
1 | </pre> |
2 | List<Integer> transactionsIds = transactions.parallelStream() |
3 | .filter(t -> t.getType() == Transaction.GROCERY) |
4 | .sorted(comparing(Transaction::getValue).reversed()) |
5 | .map(Transaction::getId) |
6 | .collect(toList()); |
Listing 3
不必担忧这段代码是否无法理解。我们会在下一章中继续探究代码是如何工作的。注意到lambda 表达式(例如, t-> t.getCategory() == Transaction.GROCERY),和方法引用(例如,Transaction::getId)的使用。这些概念目前你应该是熟悉的。
现在,已经看到stream作为有效表达的抽象,就像集合数据上的SQL操作。此外,这些操作可以简洁的使用lambda 表达式参数化。
在学习Java SE 8 streams系列文章之后,你应该能够使用Streams API写出类似Listing 3上的代码,表达出强有力的查询。
使用Streams基础
我们先从一些理论开始。一个stream的定义是什么?简短的定义是”从一个支持聚集操作的源上获取的一序列元素”。让我们逐个解释:序列元素:stream为特定元素类型值集合提供了一个接口。但是,stream并不实际存储元素;元素只在需要的时候被计算。
源:Stream从数据提供源上消费数据,源可以是集合、数组、I/O资源等。聚集操作,Stream支持类SQL的操作,和函数式编程语言的共通操作,例如 filter, map, reduce, find, match, sorted等等。此外,stream操作有两个基本的特征,使得其和集合操作有极大的不同。
管道:许多stream 操作返回stream自身。这可以让操作串联成一个大的管道。这也使得某些优化技术,例如惰性(laziness)和短路(short-circuiting)得以实现,这些概念我们都会在后面阐释。
内部迭代:与集合相比,集合的迭代是明确地(外部迭代),而stream操作执行的迭代你无法感知到。让我们重新看看之前的代码来阐述这个概念。Figure 2表述了Listing 2的更多细节。
Figure 2
首先,我们从transactions list上调用stream()获取到stream。数据源是transaction list,并且提供元素序列给stream。接下来,我们使用一系列stream上的聚合操作:filter (使用给定的predicate过滤元素), sorted (使用给定的comparator排序元素), and map (抽取信息)。所有这些操作除了collect之外,都返回stream。所以,这些操作可以串联形成一个管道,管道可以看成是对源查询的视图。所有的操作只有在调用collect的时候才会执行。collect操作会开始处理管道,返回结果(一些不是stream;例子上是List)。不要太关心collect;我们会在之后的文章中详细阐述。现在,你可以把collect看成一个需要指定如何聚集stream元素汇总成结果的操作。例子中,toList()则描述了需要从Stream转换为List。
在我们阐述stream的方法之前,暂停并回顾一下stream 和collection之间的不同。
集合与stream在序列元素上所提供接口的新概念,都同时在java上存在。所以,不同的是什么?简而言之,集合是关于数据的,stream是关于计算的。想想存储在DVD上的电影。这就是集合(可能是字节,又可能是帧–这里,我们并不关心),因为其包含所有的数据结构。现在我们想想相同的视频,当视频是互联网上的流的情况。则这个时候就是stream(比特或帧)。视频流播放器只需要下载用户现在观看位置之前的几帧,所以你才可以从流的起始开始播放,在这之前,流里面的数据已经是被计算过了(想象下足球直播流)。
粗略的讲,集合和stream之间的不同则是在处理计算的事情时。集合是一个内存上的数据结构,持有所有的这个数据结构的值–集合上的每个元素在要添加进集合之前都需要被计算。相反,stream概念上是固定的数据结构,流内的每个元素只在需要的时候计算。
使用Collection接口则需要用户来完成迭代(例如,使用称为foreach的增强for循环);这个被叫做外部迭代。
相反,Streams库使用内部迭代–为你执行迭代操作并且在某处维护执行结果;你仅仅只要提供一个函数说我要完成这个。Listing 4里面的的代码(使用集合的外部迭代)和Listing 5(使用stream的内部迭代)则阐述了这点不同。
1 | List<String> transactionIds = new ArrayList<>(); |
2 | for (Transaction t: transactions){ |
3 | transactionIds.add(t.getId()); |
4 | } |
Listing 4
1 | List<Integer> transactionIds = |
2 | transactions.stream() |
3 | .map(Transaction::getId) |
4 | .collect(toList()); |
Listing 5
Listing 4上,我们明确地顺序迭代transactions list,抽取出每个交易ID并添加给聚集器。相反,当使用stream,并没有明确地迭代。Listing 5上的代码建立一个查询,其中map操作参数化为抽取交易ID,collect操作转换结果Stream到List。
到目前为止,你应该明确知道stream是什么,并且你可以使用它。现在,让我们看看stream提供的其他操作,这些操作可以让你表达你自己的数据处理查询。
java.util .stream.Stream中的Stream接口定义了许多操作,主要可以分成两类。正如Figure 1里面的例子,可以看到如下的操作:
filter, sorted, 和map, 这些可以从管道上连接在一起的。
collect 关闭管道并放回结果。Stream 上可以连接的操作称为中间操作。因为其返回的类型是Stream。关闭stream管道的操作称为结束操作。其从管道上产生结果,例如List,一个整数,甚至是void(任何非stream类型)。
你也许会疑惑这些物质的重要性。当然,中间操作在stream管道上执行结束之前是不会执行;中间操作是惰性的(Lazy),主要是因为中间操作通常是合并的,并且被结束操作处理进通道。
01 | List<Integer> numbers = Arrays.asList( 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 ); |
02 | List<Integer> twoEvenSquares = |
03 | numbers.stream() |
04 | .filter(n -> { |
05 | System.out.println( "filtering " + n); |
06 | return n % 2 == 0 ; |
07 | }) |
08 | .map(n -> { |
09 | System.out.println( "mapping " + n); |
10 | return n * n; |
11 | }) |
12 | .limit( 2 ) |
13 | .collect(toList()); |
Listing 6
例如,看看Listing 6上的代码,计算给定number list上两个偶数的平方:
filtering 1
filtering 2 mapping 2 filtering 3 filtering 4 mapping 4因为limit(2)使用短路特性;我们需要只处理stream的部分,并非全部地返回结果。这和计算用and串联操作的布尔表达式有点类似:只要一个表达式返回false,我们可以推断出整个表达式返回false,而不用全部计算。这里,limit操作返回大小为2的stream。
当然,filter和map操作合并到相同的通道中。
总结下我们目前学习到的,宏观上处理stream包括这三件事:
一个数据源(例如集合),在数据源上执行的查询
串联的中间操作,这些操作形成stream管道 一个结束操作, 执行stream管道,并且产生结果。现在,先看看stream上可用的一些操作。查阅java.util .stream.Stream接口获取全部的列表,同样也是这篇文章后面引用的资源。
Filtering. 有几个操作可以用来从stream中过滤元素:
filter(Predicate): 使用predicate (java.util.function.Predicate)作为参数,并返回包含所有匹配给定predict元素的stream。distinct: 返回一个有唯一元素的stream(根据stream中元素的equals实现)。
limit(n): 返回一个不长于给定大小n的stream。 skip(n): 返回一个丢弃了前面n个元素的stream。Finding and matching. 一个通常的数据处理模式是决定是否某些元素匹配给定的属性。你可以使用anyMatch,allMatch和noneMatch操作来帮助你完成这些操作。所有这些操作使用Predicate作为参数,返回一个布尔值作为结果(因此,这些是决定式的操作)。例如,你可以使用allMatch检查transaction stream中所有交易额大于100的元素,如 Listing 7所示的。
1 | boolean expensive = transactions.stream() |
2 | .allMatch(t -> t.getValue() > 100 ); |
Listing 7
Stream接口提供 findFirst 和findAny操作,用于从stream中取回任意的元素。主要可以用于连接其他的stream操作,例如filter。
findFirst 和findAny返回Optional对象,如Listing 8所示。1 | Optional<Transaction> = transactions.stream() |
2 | .filter(t -> t.getType() == Transaction.GROCERY) |
3 | .findAny(); |
Listing 8
Optional<T>类(java.util .Optional)是一个容器类,用于代表一个值存在或不存在。Listing 8中,findAny可能并不会返回任何grocery类型的交易。
Optional类有一些方法用于测试元素是否存在。例如,如果有交易存在,我们可以选择使用ifPresent方法选择对optional对象上应用操作,如Listing 9(我们只是打印交易)。
1 | transactions.stream() |
2 | .filter(t -> t.getType() == Transaction.GROCERY) |
3 | .findAny() |
4 | .ifPresent(System.out::println); |
Listing 9
Mapping. Stream支持map方法,使用function(java.util.function.Function)作为参数用于映射stream中的元素到另外一种形式。function会应用到每一个元素,映射元素到新的元素。
例如,你可能想要从stream的每个元素中抽出信息。Listing 10的例子中,我们从一个list上返回每个词长度的list。Reducing. 目前,我们所见的结束操作返回boolean(allMatch等),void(forEach),或一个Optional对象(findAny等)。并且同样已经使用collect组合所有stream中的元素为List。
1 | List<String> words = Arrays.asList( "Oracle" , "Java" , "Magazine" ); |
2 | List<Integer> wordLengths = |
3 | words.stream() |
4 | .map(String::length) |
5 | .collect(toList()); |
Listing 10
当然,你同样可以组合stream中的所有元素表述成更复杂的处理请求,例如,最高ID的交易是什么?或计算所有交易额的总数。
这可以使用stream上的reduce操作,这个操作重复地为每个元素应用操作(例如,添加两个数字),直到产生结果。函数式程序中一般称这操作为折叠操作(fold),你可以把这个操作看成是重复地折叠纸张的一部分(你的stream),直到形成一个小正方形,这就是折叠操作的结果。
先看下我们如何使用for循环计算list的和:
1 | int sum = 0 ; |
2 | for ( int x : numbers) { |
3 | sum += x; |
4 | } |
Numbers list上的每个元素重复地使用添加操作来产生一个结果。实际上,我们缩小numbers list到一个数值。代码中则有两个参数:sum变量的初始值,例子上为0,和组合所有list元素的操作,例子上为+。
使用stream的reduce方法,我们可以累加所有的stream元素。如 Listing 11所示的。
reduce方法使用两个参数:
1 | int sum = numbers.stream().reduce( 0 , (a, b) -> a + b); |
Listing 11
一个初始值,0
BinaryOperator<T>,用于组合两个元素并产生一个新的值。
reduce方法本质上抽象了重复的应用模式。其他查询例如”计算产品”或”计算最大值(见Listing 12)”则是成为reduce方法的特定例子。
1 | int product = numbers.stream().reduce( 1 , (a, b) -> a * b); |
2 | int product = numbers.stream().reduce( 1 , Integer::max); |
Listing 12
现在,已经看过了使用reduce方法用于计算整数stream和的例子。但是,这其中还是有一定的开销:我们执行多次装箱(boxing)操作,重复的在integer对象上求和。如果可以调用一个sum方法,可能会更好一点,正如Listing 13所示,是否更明确我们代码的目的?
1 | int statement = transactions.stream() |
2 | .map(Transaction::getValue) |
3 | .sum(); // error since Stream has no sum method |
Listing 13
Java SE 8 引入3个特定的primitive stream接口用于处理这个问题–IntStream,DoubleStream和LongStream–各自代表stream中的元素是int,double和long。
通常要转换stream到特定版本的stream所执行的方法是mapToInt,mapToDouble和mapToLong。这些方法工作起来完全像是我们之前见到的map方法,不同的是这些方法返回特定的stream而不是Stream<T>。例如,我们可以改进Listing 13的代码,如Listing 14所展示的。你同样可以通过装箱(boxed)操作从primitive stream转换为某个对象stream。
1 | int statementSum = |
2 | transactions.stream() |
3 | .mapToInt(Transaction::getValue) |
4 | .sum(); // works! |
Listing 14
最后,另一个numeric streams有用的形式是数字范围(numeric ranges)。例如,你可能想要产生所有1到100之间的数值。Java SE 8则引入了 IntStream, DoubleStream, 和LongStream上可用的2个静态方法辅助产生这样的范围:range和rangeClosed。
这两个方法都使用范围的起始作为首个参数,范围的结束作为第二个参数。range方法是开区间,而rangeClosed是闭区间的。 Listing 15则是一个使用rangeClose方法的例子,返回10到30之间数值的stream。
1 | IntStream oddNumbers = |
2 | IntStream.rangeClosed( 10 , 30 ) |
3 | .filter(n -> n % 2 == 1 ); |
Listing 15
有几种方式用于构建stream。我们已经看到如何从集合上获取到stream。同样,我也使用了number stream。你同样可以从值、数组或文件上建立stream。此外甚至可以从一个函数上获取stream 来产生无限的stream。
从值或从数组上建立stream十分简单:只要为值调用Stream.of的静态方法和为数组调用Arrays.stream生成。如 Listing 16所示。
1 | Stream<Integer> numbersFromValues = Stream.of( 1 , 2 , 3 , 4 ); |
2 | int [] numbers = { 1 , 2 , 3 , 4 }; |
3 | IntStream numbersFromArray = Arrays.stream(numbers); |
Listing 16
同样也可以使用Files.lines静态方法将文件转换为一个stream。例如,Listing 17计算文件中的行数。
1 | long numberOfLines = |
2 | Files.lines(Paths.get(“yourFile.txt”), Charset.defaultCharset()) |
3 | .count(); |
Listing 17
Infinite streams. 最后,在我们结束关于stream的这篇文章之前,还有一个令人兴奋的概念。到目前为止,应该理解stream内的元素是按需产生的。这里有两个静态方法–Stream.iterate 和 Stream.generate可以从函数上建立stream。然而,由于元素是按需计算的,这两个操作可以一直产生元素。这就是为什么称为 infinite stream:没有固定大小的stream,与我们从固定集合建立的流相比。
Listing 18 是使用iterate的例子,创建一个所有10倍数的数字stream。Iterate方法使用一个初始值(例子上是,0)和一个用于连续地产生每个新值的lambda(类型为UnaryOperator<T>)。
1 | Stream<Integer> numbers = Stream.iterate( 0 , n -> n + 10 ); |
Listing 18
我们可以把这个无限的stream转换成固定大小的stream,通过使用limit操作。例如,我们可以限制stream的大小为5,如Listing 19所示。1 | numbers.limit( 5 ).forEach(System.out::println); // 0, 10, 20, 30, 40 |
Listing 19
Java SE 8 引入的stream API,可以让我们表达更复杂的数据处理逻辑。本文中,你已经看到stream支持许多方法,例如filter,map,reduce和iterate,这些方法组合可以写出简洁的代码并表达数据处理查询。这种新的代码编写方式与Java SE8 之前你要处理的集合十分的不同。显然,这有许多好处。首先,Stream API使用了许多技术,例如惰性和短路来优化数据处理查询。其次,stream可以是并行自动地使用多核心架构。本系列的下一章节中,我们会表述更高级的操作,例如flatMap和collect。