前置条件
先定义一个接口
public interface MyInterface {
int sum(int i, int j);
}
2
3
# 传统方式
//实现类的方式
public class MyInterfaceImpl implements MyInterface {
@Override
public int sum(int i, int j) {
return i + j;
}
}
2
3
4
5
6
7
public class Main {
public static void main(String[] args) {
// 传统方式
// 方式 1
MyInterface myInterface = new MyInterfaceImpl();
int res = myInterface.sum(1, 2);
System.out.println(res);
// 方式 2 内部类实现
MyInterface myInterface1 = new MyInterface() {
@Override
public int sum(int i, int j) {
return i + j;
}
};
res = myInterface1.sum(1, 2);
System.out.println(res);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# Lambda
public class Main {
public static void main(String[] args) {
MyInterface myInterface = (i, j) -> i + j;
// 更加简洁的写法
// MyInterface myInterface = Integer::sum;
int res = myInterface.sum(1, 2);
System.out.println(res);
}
}
2
3
4
5
6
7
8
9
Lambda 表达式的语法糖: 参数列表 + 箭头 + 方法体 。函数式接口是 Lambda 表达式的目标类型。
# 函数式接口
- 单一抽象方法(Single Abstract Method, SAM):函数式接口只能有一个抽象方法。这个方法也被称为函数式接口的函数描述符。
- 可以包含默认方法和静态方法:函数式接口可以包含默认方法(使用
default关键字定义的方法)和静态方法,这些方法可以有具体的实现,不违反函数式接口的定义。 - 可以包含 Object 类中的公共方法:函数式接口可以声明
equals,hashCode和toString等方法,因为这些方法是从Object类继承而来的,不是新增的抽象方法。
简单来说:
- 只要是函数式接口就可以用 Lambda 表达式简化
- 接口中有且只有一个未实现的方法,这个接口就叫函数式接口
//在Lambda中的例子其实就是一个函数式接口的实现
//参数类型可以不写,只写(参数名),参数变量名随意定义;
interface MyHaha {
int haha();
}
MyHaha myHaha = () -> 2;
res = myHaha.haha();
System.out.println(res);
interface MyHehe {
int hehe(int i);
}
MyHehe myHehe = x -> x + 1;
res = myHehe.hehe(1);
System.out.println(res);
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 如何识别
通过 @FunctionalInterface 注解可以检测当前这个接口是否符合规范,例如
@FunctionalInterface //检查注解,帮我们快速检查我们写的接口是否函数式接口
interface MyHehe {
int hehe(int i);
// 如定义中所描述,有默认实现的也可以是个函数式接口
default int heihei() {
return hehe(10);
}
// 但如果是个抽象的定义,这种就会发生错误
// int lala(int i);
}
2
3
4
5
6
7
8
9
10
# java.util.function
最核心的四个函数式接口:
Consumer、Function、Supplier和Predicate
好的,我们来详细地聊一聊 Java 8 引入的 java.util.function 包下最核心的四个函数式接口: Consumer 、 Function 、 Supplier 和 Predicate 。
这四个接口是 Java 函数式编程的基石,它们分别代表了四种最常见的行为模式:消费、转换、供给 和 判断。理解了它们,你就掌握了使用 Lambda 表达式和 Stream API 的精髓。
# Consumer - 消费者
核心概念
Consumer 的名字很直观,它是一个 “消费者”。它的作用是接收一个参数,对其进行处理(消费),但不返回任何结果。
你可以把它想象成一个只干活不汇报的员工。你给他一个任务(输入参数),他执行完毕后,不会给你任何反馈(返回 void )。
核心方法
void accept(T t);
- 参数
T t: 要被消费的数据。 - 返回值
void: 没有返回值。
代码示例
示例 1:基本用法
import java.util.function.Consumer;
public class ConsumerExample {
public static void main(String[] args) {
// 定义一个 Consumer,它接收一个字符串并打印到控制台
Consumer<String> printer = str -> System.out.println("打印: " + str);
// 调用 accept 方法来消费数据
printer.accept("Hello, Consumer!");
// 输出: 打印: Hello, Consumer!
}
}
2
3
4
5
6
7
8
9
10
11
12
示例 2:在 Stream 中的应用
Consumer 最常见的应用场景就是 Stream.forEach() 方法。 forEach 需要一个 Consumer 来遍历流中的每一个元素。
import java.util.Arrays;
import java.util.List;
public class ConsumerStreamExample {
public static void main(String[] args) {
List<String> names = Arrays.asList("Alice", "Bob", "Charlie");
// 使用 Lambda 表达式
System.out.println("--- 使用 Lambda ---");
names.forEach(name -> System.out.println("Hello, " + name));
// 使用方法引用 (Method Reference),更简洁
System.out.println("--- 使用方法引用 ---");
// System.out::println 等价于 s -> System.out.println(s)
names.forEach(System.out::println);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
默认方法
Consumer 接口还有一个非常有用的默认方法 andThen ,用于链式调用。它允许你将多个 Consumer 组合起来,按顺序执行。
Consumer<String> printer = System.out::println;
Consumer<String> upperCasePrinter = str -> System.out.println(str.toUpperCase());
// 组合两个 Consumer,先执行 printer,再执行 upperCasePrinter
Consumer<String> combinedConsumer = printer.andThen(upperCasePrinter);
combinedConsumer.accept("hello");
// 输出:
// hello
// HELLO
2
3
4
5
6
7
8
9
10
# Function - 函数
核心概念
Function 代表一个 “函数”。它的作用是接收一个参数,经过处理后,返回一个结果。这是最符合数学中函数概念(f (x) = y)的接口。
你可以把它想象成一个加工厂,你送入原材料(输入参数),它给你产出成品(返回值)。
核心方法
R apply(T t);
- 参数
T t: 输入数据的类型。 - 返回值
R: 输出(处理后)数据的类型。
代码示例
示例 1:基本用法
import java.util.function.Function;
public class FunctionExample {
public static void main(String[] args) {
// 定义一个 Function,接收一个字符串,返回其长度
Function<String, Integer> stringLength = str -> str.length();
// 调用 apply 方法
Integer length = stringLength.apply("Hello, Function!");
System.out.println("字符串长度为: " + length); // 输出: 字符串长度为: 16
// 使用方法引用
Function<String, Integer> lengthRef = String::length;
System.out.println("字符串长度为: " + lengthRef.apply("Java")); // 输出: 字符串长度为: 4
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
示例 2:在 Stream 中的应用
Function 最常见的应用场景是 Stream.map() 方法,用于将流中的每个元素转换为另一种形式。
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
public class FunctionStreamExample {
public static void main(String[] args) {
List<String> words = Arrays.asList("apple", "banana", "cherry");
// 将字符串列表转换为其长度列表
List<Integer> lengths = words.stream()
.map(String::length) // map 接收一个 Function
.collect(Collectors.toList());
System.out.println(lengths); // 输出: [5, 6, 6]
// 将字符串列表转换为大写列表
List<String> upperWords = words.stream()
.map(String::toUpperCase)
.collect(Collectors.toList());
System.out.println(upperWords); // 输出: [APPLE, BANANA, CHERRY]
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
默认方法
Function 也有 andThen 和 compose 两个默认方法,用于函数的组合。
andThen:function1.andThen(function2).apply(x)等价于function2.apply(function1.apply(x))。先执行function1,再将其结果作为function2的输入。compose:function1.compose(function2).apply(x)等价于function1.apply(function2.apply(x))。先执行function2,再将其结果作为function1的输入。
Function<String, Integer> toLength = String::length;
Function<Integer, String> toString = Object::toString;
// andThen: "Hello" -> 5 -> "5"
String result1 = toLength.andThen(toString).apply("Hello");
System.out.println(result1); // 输出: 5
// compose: "Hello" -> "Hello" (toString) -> 5 (toLength)
// 注意:这里 compose 的参数类型必须与当前 Function 的输入类型匹配
// toString 的输出是 String,正好是 toLength 的输入
Integer result2 = toLength.compose(toString).apply("Hello");
System.out.println(result2); // 输出: 5
2
3
4
5
6
7
8
9
10
11
12
# Supplier - 供给者
核心概念
Supplier 是一个 “供给者”。它的作用是不接收任何参数,只返回一个结果。
你可以把它想象成一个自动售货机或者一个工厂,你不需要给它任何东西,按一下按钮(调用方法),它就会给你一个商品(返回值)。它常用于创建新对象或生成值。
核心方法
T get();
- 参数:无。
- 返回值
T: 供给的数据类型。
代码示例
示例 1:基本用法
import java.util.function.Supplier;
public class SupplierExample {
public static void main(String[] args) {
// 定义一个 Supplier,每次调用都返回一个随机数
Supplier<Double> randomSupplier = () -> Math.random();
// 调用 get 方法获取值
System.out.println("随机数1: " + randomSupplier.get());
System.out.println("随机数2: " + randomSupplier.get());
// 使用方法引用
Supplier<String> stringSupplier = String::new; // 创建一个新的空字符串
System.out.println("新字符串: '" + stringSupplier.get() + "'");
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
示例 2:在 Stream 中的应用
Supplier 常用于 Stream.generate() 方法,创建一个无限流。
import java.util.stream.Stream;
public class SupplierStreamExample {
public static void main(String[] args) {
// 使用 Supplier 生成一个无限的随机数流
Stream.generate(Math::random)
.limit(5) // 限制只取前5个
.forEach(System.out::println);
// 使用 Supplier 生成一个无限的整数序列
Stream<Integer> infiniteStream = Stream.generate(new Supplier<Integer>() {
private int value = 0;
@Override
public Integer get() {
return value++; // 每次调用返回当前值并自增
}
});
infiniteStream.limit(5).forEach(System.out::println); // 输出 0,1, 2, 3, 4
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# Predicate - 谓词
核心概念
Predicate 的意思是 “谓词”,在逻辑和数学中指一个判断真假的陈述。它的作用是接收一个参数,返回一个布尔值( true 或 false )。
你可以把它想象成一个过滤器或者一个质检员,你给它一个产品(输入参数),它告诉你这个产品是否合格(返回 boolean )。
核心方法
boolean test(T t);
- 参数
T t: 要被测试的数据。 - 返回值
boolean: 测试结果。
代码示例
示例 1:基本用法
import java.util.function.Predicate;
public class PredicateExample {
public static void main(String[] args) {
// 定义一个 Predicate,判断字符串是否为空
Predicate<String> isEmpty = String::isEmpty;
System.out.println(isEmpty.test("")); // 输出: true
System.out.println(isEmpty.test("Hello")); // 输出: false
// 判断数字是否大于10
Predicate<Integer> isGreaterThan10 = num -> num > 10;
System.out.println(isGreaterThan10.test(5)); // 输出: false
System.out.println(isGreaterThan10.test(15)); // 输出: true
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
示例 2:在 Stream 中的应用
Predicate 最常见的应用场景是 Stream.filter() 方法,用于根据条件过滤流中的元素。
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
public class PredicateStreamExample {
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(1, 5, 8, 12, 15, 20);
// 过滤出所有大于10的数字
List<Integer> filteredNumbers = numbers.stream()
.filter(num -> num > 10) // filter 接收一个 Predicate
.collect(Collectors.toList());
System.out.println(filteredNumbers); // 输出: [12, 15, 20]
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
默认方法
Predicate 提供了三个强大的默认方法用于逻辑组合: and 、 or 、 negate 。
and: 逻辑与,两个条件都为true时才为true。or: 逻辑或,只要有一个条件为true就为true。negate: 逻辑非,对当前条件取反。
Predicate<String> isNotEmpty = str -> !str.isEmpty();
Predicate<String> isLongerThan5 = str -> str.length() > 5;
// 组合条件:字符串不为空 且 长度大于5
Predicate<String> complexPredicate = isNotEmpty.and(isLongerThan5);
System.out.println(complexPredicate.test("Hello")); // false (长度不大于5)
System.out.println(complexPredicate.test("HelloWorld")); // true
System.out.println(complexPredicate.test("")); // false (为空)
// 使用 negate
Predicate<String> isShort = isLongerThan5.negate(); // 等价于 str -> str.length() <= 5
System.out.println(isShort.test("Hi")); // true
2
3
4
5
6
7
8
9
10
11
12
13
# 总结与对比
| 接口 | 核心方法 | 用途 | 输入 | 输出 |
|---|---|---|---|---|
| Consumer | void accept(T t) | 消费数据,执行操作 | 有 | 无 ( void ) |
| Supplier | T get() | 供给数据,创建对象 | 无 | 有 |
| Function | R apply(T t) | 转换数据,类型映射 | 有 | 有 (类型可不同) |
| Predicate | boolean test(T t) | 判断数据,条件测试 | 有 | 有 ( boolean ) |
# 协同工作
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class CombinedExample {
public static void main(String[] args) {
// 1. Supplier: 提供数据源
Supplier<Stream<String>> streamSupplier = () -> Stream.of("apple", "banana", "kiwi", "avocado", "orange");
List<String> result = streamSupplier.get() // 获取流
// 2. Predicate: 过滤数据
.filter(fruit -> fruit.length() > 5) // 只保留长度大于5的水果
// 3. Function: 转换数据
.map(String::toUpperCase) // 转换为大写
// 4. Consumer: 消费数据 (forEach 内部)
.collect(Collectors.toList()); // collect 内部会使用 Consumer 来将元素添加到 List 中
System.out.println(result); // 输出: [BANANA, AVOCADO, ORANGE]
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
通过这个例子,可以清晰地看到:
- Supplier (
Stream.of) 负责创建数据流。 - Predicate (
filter) 负责根据条件筛选数据。 - Function (
map) 负责将数据从一种形式转换为另一种形式。 - Consumer (
collect内部使用) 负责最终消费处理好的数据,将其存入集合。
# java.util.function 中的变体
进入到这个包下会发现除了那四个核心接口,包里还提供了大量的变体,它们主要是为了解决两个问题:
- 处理多个参数(以
Bi开头的接口)。 - 避免自动装箱 / 拆箱,提升性能(针对原始类型
int,long,double的接口)。
# “Bi” 系列 - 处理双参数
Bi 是 "Binary"(二元)的缩写,表示这类接口需要两个输入参数。它们是核心接口的直接扩展。
| 核心接口 | “Bi” 系列变体 | 核心方法 | 用途 |
|---|---|---|---|
Consumer<T> | BiConsumer<T, U> | void accept(T t, U u) | 接收两个参数并消费它们 |
Function<T, R> | BiFunction<T, U, R> | R apply(T t, U u) | 接收两个参数,处理后返回一个结果 |
Predicate<T> | BiPredicate<T, U> | boolean test(T t, U u) | 接收两个参数,进行判断并返回 boolean |
Supplier<T> | (无) | (无) | 供给者本身就不需要参数,所以没有 “Bi” 版本 |
代码示例
BiConsumer<T, U>import java.util.HashMap; import java.util.Map; import java.util.function.BiConsumer; public class BiConsumerExample { public static void main(String[] args) { Map<String, Integer> scores = new HashMap<>(); scores.put("Alice", 95); scores.put("Bob", 88); // 使用 BiConsumer 打印 Map 的每个键值对 BiConsumer<String, Integer> printer = (name, score) -> System.out.println(name + " 的分数是: " + score); scores.forEach(printer); // 或者直接使用 Lambda scores.forEach((name, score) -> System.out.println(name + " -> " + score)); } }1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19BiFunction<T, U, R>常用于Map的compute、merge、replaceAll等操作,这些操作需要同时知道键和旧值来计算新值。import java.util.HashMap; import java.util.Map; import java.util.function.BiFunction; public class BiFunctionExample { public static void main(String[] args) { Map<String, String> capitals = new HashMap<>(); capitals.put("China", "Beijing"); capitals.put("Japan", "Tokyo"); // 使用 BiFunction 替换所有值(这里简单地把值变大写) BiFunction<String, String, String> mapper = (country, city) -> city.toUpperCase(); capitals.replaceAll(mapper); System.out.println(capitals); // {China=BEIJING, Japan=TOKYO} // 更复杂的例子:如果键不存在,则根据键计算一个新值 capitals.computeIfAbsent("Korea", BiFunctionExample::getCapitalByCountry); System.out.println(capitals); // {China=BEIJING, Japan=TOKYO, Korea=Seoul} } private static String getCapitalByCountry(String country) { if ("Korea".equals(country)) return "Seoul"; return "Unknown"; } }1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26**
BiPredicate<T, U>** 基于两个条件进行判断时使用。import java.util.function.BiPredicate; public class BiPredicateExample { public static void main(String[] args) { // 定义一个 BiPredicate,检查一个字符串是否包含指定的子串,并且长度大于5 BiPredicate<String, String> containsAndIsLong = (str, sub) -> str.contains(sub) && str.length() > 5; System.out.println(containsAndIsLong.test("HelloWorld", "World")); // true System.out.println(containsAndIsLong.test("Java", "va")); // false (长度不大于5) System.out.println(containsAndIsLong.test("Programming", "Py")); // false (不包含) } }1
2
3
4
5
6
7
8
9
10
11
12
13
# 原始类型特化 - 提升性能
Java 的泛型不支持原始类型(如 int , double ),只能使用它们的包装类( Integer , Double )。从 int 到 Integer 的转换称为装箱,反之称为拆箱。这个过程会带来额外的性能开销,尤其是在大量数据处理时(如 Stream 中)。
为了解决这个问题, java.util.function 包为 int , long , double 这三种最常用的原始类型提供了专门的接口。
代码示例
原始类型特化接口在 Stream API 中被大量使用,例如 mapToInt , mapToLong , mapToDouble 等。
import java.util.Arrays;
import java.util.List;
import java.util.function.IntFunction;
import java.util.function.ToIntFunction;
import java.util.stream.IntStream;
public class PrimitiveSpecializationExample {
public static void main(String[] args) {
List<String> numbers = Arrays.asList("1", "2", "3", "4", "5");
// 不使用特化接口:Stream<String> -> Stream<Integer> -> int
// 每个元素都经历了 String -> Integer (拆箱) -> int (拆箱) 的过程
int sum1 = numbers.stream()
.map(Integer::parseInt) // 返回 Stream<Integer>
.mapToInt(Integer::intValue) // 再转为 IntStream
.sum();
// 使用特化接口:直接从 Stream<String> 映射到 IntStream
// mapToInt 接收一个 ToIntFunction,它接收 T,直接返回 int,避免了中间的 Integer 对象
int sum2 = numbers.stream()
.mapToInt(Integer::parseInt) // parseInt 返回 int,直接适配 ToIntFunction
.sum();
System.out.println("Sum is: " + sum2); // Sum is: 15
// 直接使用 IntStream
IntStream intStream = IntStream.range(1, 6);
IntFunction<String[]> intFunction = size -> new String[size]; // 接收int,返回String[]
String[] array = intStream.mapToObj(String::valueOf).toArray(intFunction);
System.out.println(Arrays.toString(array)); // [1, 2, 3, 4, 5]
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
在这个例子中, mapToInt(Integer::parseInt) 比 map(Integer::parseInt).mapToInt(Integer::intValue) 更高效,因为它直接生成了原始类型的 IntStream ,省去了创建 Integer 对象的开销。
# 操作符特化 - 特殊的 Function
还有两个特殊的接口,它们是 Function 和 BiFunction 的子接口,专门用于表示 “操作”,即输入和输出类型相同的情况。
UnaryOperator<T>(一元操作符)
- 继承自:
Function<T, T> - 核心方法:
T apply(T t) - 用途:接收一个参数,返回一个相同类型的结果。可以看作是对一个值的 “操作”。
- 常见应用:
List.replaceAll()。
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;
public class UnaryOperatorExample {
public static void main(String[] args) {
List<String> words = Arrays.asList("apple", "banana", "cherry");
// 定义一个一元操作符:将字符串转换为大写
UnaryOperator<String> toUpperCase = String::toUpperCase;
// 使用 replaceAll 替换列表中的每个元素
words.replaceAll(toUpperCase);
System.out.println(words); // [APPLE, BANANA, CHERRY]
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
BinaryOperator<T>(二元操作符)
- 继承自:
BiFunction<T, T, T> - 核心方法:
T apply(T t1, T t2) - 用途:接收两个相同类型的参数,返回一个相同类型的结果。可以看作是对两个值的 “组合操作”。
- 常见应用:
Stream.reduce(),用于将流中的元素归约成一个结果。
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;
public class BinaryOperatorExample {
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
// 定义一个二元操作符:求和
BinaryOperator<Integer> sum = Integer::sum;
// 使用 reduce 进行归约操作
// (1 + 2) -> 3, (3 + 3) -> 6, (6 + 4) -> 10, (10 + 5) -> 15
int result = numbers.stream()
.reduce(0, sum); // 0 是初始值
System.out.println("Sum is: " + result); // Sum is: 15
// BinaryOperator 还提供了两个便捷方法来求最大/最小值
BinaryOperator<Integer> max = BinaryOperator.maxBy(Integer::compareTo);
Integer maxValue = numbers.stream().reduce(max).orElse(-1);
System.out.println("Max value is: " + maxValue); // Max value is: 5
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 总结与类比
| 工具类别 | 解决的问题 | 代表工具 |
|---|---|---|
| 核心基础工具 | 基本的消费、供给、转换、判断 | Consumer , Supplier , Function , Predicate |
| “Bi” 系列工具 | 需要同时处理两个零件(参数) | BiConsumer , BiFunction , BiPredicate |
| 高性能工具 | 针对特定材质(原始类型)进行高效加工,避免损耗 | IntConsumer , LongFunction , DoublePredicate 等 |
| 操作专用工具 | 用于组合或修改零件,且输入输出类型一致 | UnaryOperator (一元操作), BinaryOperator (二元操作) |
# Stream API
先明确几个核心概念:
- 数据源:Stream 的来源,通常是集合(如
List,Set)、数组、I/O channel 等。 - 数据处理的 “做什么” 而非 “怎么做”:你只需要告诉 Stream 你想要做什么(比如 “过滤掉所有偶数”),而不需要关心它内部是如何实现的(比如如何遍历、如何创建新集合等)。
- 不可变性:Stream 不会修改原始的数据源。它总是返回一个新的 Stream,操作的结果是产生一个新的数据集或一个最终值。
- 惰性求值:中间操作不会立即执行。它们只是被 “记录” 下来,只有当终止操作被调用时,整个流水线才会一次性执行。这可以提高效率。
- 可并行:只需简单地将
stream()换成parallelStream(),就可以让数据处理过程并行化,充分利用多核 CPU 的优势。
# 创建 Stream
# 从集合创建 (最常用)
Java 8 为 Collection 接口新增了两个方法:
stream(): 创建一个串行流。parallelStream(): 创建一个并行流。
List<String> list = Arrays.asList("a", "b", "c");
// 创建一个串行流
Stream<String> stream = list.stream();
// 创建一个并行流
Stream<String> parallelStream = list.parallelStream();
2
3
4
5
6
# 从数组创建
Arrays 工具类提供了一个静态方法 stream() 来创建流。
String[] array = {"d", "e", "f"};
Stream<String> streamFromArray = Arrays.stream(array);
2
# 直接使用 Stream 的静态方法
Stream 接口本身提供了几个静态工厂方法:
Stream.of(T... values): 直接将一系列元素转换成一个流。Stream.generate(Supplier s): 创建一个无限流,通过Supplier不断生成元素。通常需要配合limit()来限制大小。Stream.iterate(T seed, UnaryOperator f): 创建一个无限流,从初始元素seed开始,通过f函数不断迭代产生下一个元素。也需要配合limit()。
// Stream.of()
Stream<String> streamOf = Stream.of("g", "h", "i");
// Stream.generate() - 生成10个随机数
Stream<Double> streamGenerate = Stream.generate(Math::random).limit(10);
// Stream.iterate() - 生成0到9的整数
Stream<Integer> streamIterate = Stream.iterate(0, n -> n + 1).limit(10);
2
3
4
5
6
7
8
# 其他方式
IntStream.range(int start, int end): 创建一个从start到end(不包含)的IntStream。BufferedReader.lines(): 读取文件内容,将每一行转换成一个流。Pattern.splitAsStream(CharSequence input): 将字符串按正则表达式分割成一个流。
# 中间操作
中间操作是流水线上的 “加工工序”。它们返回一个新的 Stream 对象,因此可以链式调用。这些操作是惰性的,不会立即执行。
# 筛选与切片
filter(Predicate<? super T> predicate): 过滤元素,只保留满足predicate条件的元素。distinct(): 去除流中重复的元素(通过equals()和hashCode()判断)。limit(long maxSize): 截断流,使其最多包含maxSize个元素。skip(long n): 跳过流的前n个元素。
Stream<String> s = Stream.of("apple", "banana", "orange", "apple", "grape");
s.filter(fruit -> fruit.length() > 5) // 只保留长度大于5的
.distinct() // 去重
.limit(2) // 只取前两个
.forEach(System.out::println);
// 输出:
// banana
// orange
2
3
4
5
6
7
8
# 映射
map(Function<? super T, ? extends R> mapper): 将流中的每个元素T通过mapper函数转换为另一个元素R。这是最常用的方法之一。flatMap(Function<? super T, ? extends Stream<? extends R>> mapper): 将流中的每个元素T映射为一个流Stream<R>,然后再将这多个小流 “扁平化” 成一个大的流。常用于处理嵌套集合。
// map 示例:将字符串转换为其长度
Stream.of("a", "bb", "ccc").map(String::length)
.forEach(System.out::println);
// 输出: 1 2 3
// flatMap 示例:将 List<List<String>> 扁平化为 List<String>
List<List<String>> nestedList = Arrays.asList(
Arrays.asList("a", "b"),
Arrays.asList("c", "d"),
Arrays.asList("e", "f")
);
nestedList.stream()
.flatMap(List::stream) // 将每个小List转换成一个Stream,然后合并
.forEach(System.out::println);
// 输出: a b c d e f
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 排序
sorted(): 产生一个新流,其中按自然顺序排序。sorted(Comparator<? super T> comparator): 产生一个新流,其中按自定义比较器排序。
Stream.of(3, 1, 4, 1, 5, 9)
.sorted()
.forEach(System.out::print); // 输出: 11459
List<String> list = Arrays.asList("banana", "apple", "orange");
list.stream()
.sorted(String::compareTo) // 或者写为 Comparator.naturalOrder()
.forEach(System.out::println);
// 输出: apple, banana, orange
2
3
4
5
6
7
8
9
# 终止操作
终止操作是流水线的 “终点”。当调用终止操作时,才会触发整个中间操作链的执行,并产生最终结果。一个 Stream 只能被消费一次(即只能有一个终止操作)。
# 查找与匹配
anyMatch(Predicate<? super T> predicate): 检查是否至少有一个元素匹配条件。返回boolean。allMatch(Predicate<? super T> predicate): 检查是否所有元素都匹配条件。返回boolean。noneMatch(Predicate<? super T> predicate): 检查是否没有元素匹配条件。返回boolean。findFirst(): 返回流的第一个元素。返回Optional<T>。findAny(): 返回流中的任意一个元素。在并行流中可能更高效。返回Optional<T>。
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
boolean anyEven = numbers.stream().anyMatch(n -> n % 2 == 0); // true
boolean allLessThan10 = numbers.stream().allMatch(n -> n < 10); // true
Optional<String> first = Stream.of("a", "b", "c").findFirst(); // Optional[a]
2
3
4
5
# 归约
reduce(T identity, BinaryOperator<T> accumulator): 将流中元素反复结合,得到一个值。identity是初始值。reduce(BinaryOperator<T> accumulator): 与上面类似,但没有初始值,返回Optional<T>。
// 求和
int sum = Stream.of(1, 2, 3, 4, 5).reduce(0, (a, b) -> a + b); // 15
// 使用方法引用更简洁
int sum2 = Stream.of(1, 2, 3, 4, 5).reduce(0, Integer::sum); // 15
// 求最大值
Optional<Integer> max = Stream.of(1, 2, 3, 4, 5).reduce(Integer::max); // Optional[5]
2
3
4
5
6
7
# 收集
collect(Collector<? super T, A, R> collector): 最灵活、最常用的终止操作。它将流中的元素收集到另一个容器中,如List,Set,Map等。Collectors类提供了大量的静态工厂方法来创建常用的Collector。
List<String> fruits = Arrays.asList("apple", "banana", "orange", "apple");
// 收集为 List
List<String> fruitList = fruits.stream().collect(Collectors.toList());
// 收集为 Set (自动去重)
Set<String> fruitSet = fruits.stream().collect(Collectors.toSet());
// 拼接字符串
String joined = fruits.stream().collect(Collectors.joining(", ")); // "apple, banana, orange, apple"
// 分组
Map<String, List<String>> groupedByLength = fruits.stream()
.collect(Collectors.groupingBy(String::length));
// 结果: {5=[apple], 6=[banana, orange]}
// 求和、平均值、最大值、最小值等统计信息
IntSummaryStatistics stats = fruits.stream()
.collect(Collectors.summarizingInt(String::length));
System.out.println(stats); // IntSummaryStatistics{count=4, sum=22, min=5, average=5.500000, max=6}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 遍历
forEach(Consumer<? super T> action): 对流中的每个元素执行一个操作。通常用于输出或产生副作用。
Stream.of("a", "b", "c").forEach(System.out::println);
# 整合
假设有一个 Transaction 交易类,包含类型、金额和城市。现在要找出所有来自北京的 “餐饮” 类交易,并按金额从高到低排序,最后只提取交易金额。
class Transaction {
private String type;
private int value;
private String city;
// 构造函数, getter, setter...
}
List<Transaction> transactions = ...; // 假设这里有很多交易数据
List<Integer> beijingRestaurantValues = transactions.stream() // 1. 创建流
.filter(t -> "北京".equals(t.getCity())) // 2. 中间操作:过滤出北京的
.filter(t -> "餐饮".equals(t.getType())) // 2. 中间操作:过滤出餐饮的
.sorted(Comparator.comparing(Transaction::getValue).reversed()) // 2. 中间操作:按金额降序
.map(Transaction::getValue) // 2. 中间操作:提取金额
.collect(Collectors.toList()); // 3. 终止操作:收集到List
// 现在 beijingRestaurantValues 就包含了所有符合条件的交易金额列表
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 总结
| 阶段 | 作用 | 特点 | 常用方法 |
|---|---|---|---|
| 创建 | 获取一个 Stream 对象 | 数据来源多样 | Collection.stream() , Arrays.stream() , Stream.of() |
| 中间 | 对数据进行处理和转换 | 惰性求值、链式调用、返回新 Stream | filter() , map() , flatMap() , sorted() , distinct() , limit() |
| 终止 | 触发计算并获取最终结果 | 立即执行、消费 Stream | collect() , reduce() , forEach() , match() , find() |
在 Stream 中进行操作的时候,注意两个东西
- 接收的参数:就如上面讲的
Function响应式编程一样,看清入参和返回,并且要看清当前函数式接口继承于什么,例如<R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper);这种就说明要返回是一个 Stream 类型,而<R> Stream<R> map(Function<? super T, ? extends R> mapper);这种就说明返回的是自定义的类型。 - 是中间操作还是终止操作:点开方法的源代码,要看清文档标注的是什么类型的操作,如果是一个中间操作,那么后续可以继续进行操作,如果是一个终止操作,这个方法写完后就无法再对这个流进行操作了。
# Reactive-Stream
Reactive Streams 是 JVM 面向流的库的标准和规范
- 处理可能无限数量的元素
- 有序
- 在组件之间异步传递元素
- 强制性非阻塞,背压模式
推荐阅读:响应式宣言:https://www.reactivemanifesto.org/zh-CNReactiveStream: https://github.com/reactive-streams/reactive-streams-jvm/blob/master/README.md
# 正压(Push)
正压指的是生产者(数据源)主动推动数据流向消费者。也就是说,数据在流动过程中是由生产者控制的,消费者无需主动请求数据,而是自动接受和处理流入的数据。正压是响应式编程的核心机制之一。

# 背压(Backpressure)
背压则是指当消费者处理数据的速度跟不上生产者生成数据的速度时,消费者向生产者发出请求,表示暂时无法继续接收更多的数据。背压机制能有效防止由于生产者过快产生数据导致消费者无法及时处理,进而避免内存溢出或系统崩溃等问题。
背压通常是在流数据中遇到的消费者处理能力不及生产者速度时进行的一种调节方式。

# 多线程
线程数绝对不是越多越好,当线程数量多于 CPU 核心数时,系统需要频繁地在线程之间进行切换,这种切换会带来额外的开销,包括上下文切换(保存和恢复线程状态)、内存开销(更多的线程占用更多的内存)以及时间开销(线程等待时间增加)。过多的线程会导致资源浪费,因为线程之间会竞争 CPU 时间片,并且频繁切换带来的开销会使系统整体性能下降。
最好的方法是基于核心数合理设置线程池大小,使每个线程保持繁忙,而不是创建大量线程去等待和切换。
# 实时的数据流系统
通过全异步 + 缓冲区的方式可以构建出来一个实时的数据流系统,其模式比较类似于 MQ。
MQ 通常是基于异步通信的模式,在生产者(发送者)和消费者(接收者)之间解耦。生产者发送消息时不会等待消费者处理完毕,而是直接将消息放入队列中,接着可以继续处理其他任务。消费者从队列中取出消息进行处理,这种异步方式提高了系统的吞吐量和响应性。
高可用:分片、复制、选领导。
高并发:缓存、异步、队排好。

// 分布式响应式系统,MQ就是类似这种做法,在传统的开发模式中,就需要引入MQ等等一些列的中间件完成无阻塞
public class NonBlockingDemo {
private static final String[] buffer = new String[10];
public static void main(String[] args) {
NonBlockingDemo nonBlockingDemo = new NonBlockingDemo();
new Thread(nonBlockingDemo::a).start();//发布数据
new Thread(nonBlockingDemo::b).start();//消费数据
}
private void a() {
String A = "A";
System.out.println("A发送了消息");
buffer[0] = A; // 放入缓冲区
// 执行其他的任务
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("A完成了任务");
}
private void b() {
// B的执行并不会等待A的执行完成,
String A = null;
while (A == null) {
// 重试去拿数据
A = buffer[0];
}
System.out.println("B完成了任务" + A);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# Flow API
提示
java.util.concurrent.Flow API 是 Java 9 正式引入的一套接口,它完整地实现了 Reactive Streams 规范。
# 核心组件
Flow API 的核心就是四个接口,它们与 Reactive Streams 规范中的四个接口完全对应:
Flow.Publisher<T>:数据发布者。- 方法:
void subscribe(Flow.Subscriber<? super T> subscriber) - 这是数据流的源头,当有
Subscriber订阅时,它会与之建立联系。
- 方法:
Flow.Subscriber<T>:数据订阅者。- 方法:
void onSubscribe(Flow.Subscription subscription):订阅成功后,Publisher调用此方法,传递一个Subscription对象。void onNext(T item):接收到一个数据项时被调用。void onError(Throwable throwable):发生错误时被调用。void onComplete():数据流正常结束时被调用。
- 方法:
Flow.Subscription:订阅关系。- 方法:
void request(long n):背压的核心!Subscriber通过此方法告诉Publisher“我还能处理n个数据,请发给我”。void cancel():取消订阅,告诉Publisher不再需要数据。
- 方法:
Flow.Processor<T, R>:数据处理器。- 它既是
Subscriber又是Publisher,可以看作是数据流的一个中间处理环节,接收上游数据,处理后发送给下游。
- 它既是
# Flow API 核心组件交互
- Publisher 启动数据流,通过
subscribe方法通知一个Subscriber来订阅数据。 - 订阅后,
Publisher会将数据流传递给Subscriber,并通过Subscription进行数据请求与流量控制。 Subscriber在接收到数据时,调用onNext方法进行处理。如果需要更多的数据,调用request方法告诉Publisher继续传送数据。Subscription也允许Subscriber在处理过程中取消数据流(通过cancel方法)或控制数据请求量,避免被过多数据淹没。
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
public class FlowApiExample {
// 1. 创建一个自定义的 Subscriber
static class MySubscriber<T> implements Flow.Subscriber<T> {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.println("【Subscriber】订阅建立成功!");
this.subscription = subscription;
// 关键:向 Publisher 请求第一个数据
this.subscription.request(1);
}
@Override
public void onNext(T item) {
System.out.println("【Subscriber】接收到数据: " + item);
// 处理完一个后,再请求下一个
// 这就是背压:按需索取
this.subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
System.err.println("【Subscriber】发生错误: " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("【Subscriber】数据接收完毕!");
}
}
public static void main(String[] args) throws InterruptedException {
// 2. 创建一个 Publisher (使用 JDK 提供的便利类)
// 参数:第一个是数据类型,第二个是发布任务的线程池,第三个是最大缓冲区
try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
// 3. 创建 Subscriber 并订阅 Publisher
MySubscriber<String> subscriber = new MySubscriber<>();
publisher.subscribe(subscriber);
System.out.println("【Publisher】开始发布数据...");
// 4. Publisher 发布数据
// submit 方法是非阻塞的
publisher.submit("Hello");
publisher.submit("Reactive");
publisher.submit("Streams");
System.out.println("【Publisher】数据已提交到缓冲区。");
// 5. 关闭 Publisher,这会触发 Subscriber 的 onComplete 方法
publisher.close();
}
// 主线程等待一下,确保异步的订阅和接收过程完成
Thread.sleep(1000);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# 流的生命周期
一个流完整的生命周期分为以下五个阶段:
# 阶段一:创建与订阅
这是流的 “诞生” 阶段。
- 创建
Publisher:首先,需要一个数据源,即Publisher。它可以是任何能够产生数据流的对象,比如一个包装了集合的Flux,或者一个代表 HTTP 请求的Mono。 - 创建
Subscriber:需要一个消费者来处理数据。 - 建立订阅:
Subscriber调用Publisher.subscribe()方法,表达 “我想接收你的数据” 的意愿。
关键事件:
Publisher接收到订阅请求后,会立即回调Subscriber的onSubscribe(Subscription s)方法。- 这个
Subscription对象是两者之间的 “契约”,它代表了这次订阅关系本身。
状态:流已建立,但数据尚未开始流动。就像水龙头已经接上水管,但阀门还没打开。
# 阶段二:请求与数据传输
这是流的 “活跃” 阶段,也是背压机制发挥作用的核心。
- 发起请求:
Subscriber通过Subscription对象调用request(long n)方法,向上游Publisher声明:“我准备好接收n个数据了”。 - 数据推送:
Publisher收到请求后,开始向Subscriber发送数据。它会调用Subscriber的onNext(T item)方法,最多调用n次。 - 循环:
Subscriber每处理完一个或一批数据后,会再次调用request(n)来请求更多数据。这个request -> onNext -> request -> onNext的循环构成了数据流的主体。
关键事件:
subscription.request(n)subscriber.onNext(item)
状态:数据正在流动。流速由 Subscriber 的 request 调用频率和 n 的大小共同控制。
# 阶段三:完成
这是流的 “正常死亡” 阶段。
- 数据发送完毕:
Publisher已经发送了所有它需要发送的数据。 - 发出完成信号:
Publisher调用Subscriber的onComplete()方法。
关键事件:
subscriber.onComplete()
状态:流正常结束。此后, Publisher 不能再调用 onNext 或 onError 。整个订阅关系终止。
# 阶段四:错误
这是流的 “异常死亡” 阶段。
- 发生不可恢复的错误:在数据流的任何环节(如数据生成、处理、转换),发生了一个异常。
- 发出错误信号:
Publisher调用Subscriber的onError(Throwable t)方法,并将错误对象传递过去。
关键事件:
subscriber.onError(throwable)
状态:流异常结束。与 onComplete 一样,此后不能再有任何 onNext 调用。整个订阅关系立即终止。
# 阶段五:取消
这是流的 “主动终止” 阶段,由 Subscriber 发起。
- 不再需要数据:
Subscriber因为某种原因(比如用户关闭了页面,或者找到了所需的数据),决定不再接收后续数据。 - 发出取消信号:
Subscriber调用Subscription的cancel()方法。
关键事件:
subscription.cancel()
状态:流被主动取消。 Publisher 收到信号后,应该停止生产数据并清理资源。这是一个干净的终止方式。
# 背压机制(Backpressure)
背压机制是 Flow API 的重要特性,它允许 Subscriber 控制从 Publisher 请求数据的数量。 Subscriber 通过 subscription.request(n) 来指定希望接收的数据量,而 Publisher 只能根据消费者的请求量来发送数据。这避免了消费者处理速度慢时,生产者发送过多数据导致的内存溢出。
# 和 mq 的区别
Reactive 和 MQ 的在核心功能上有一定的相似性。但是两者的用法和适用场景是完全不一致的。
相似性
- 异步通信:两者都支持异步通信,可以在不同的线程或进程中处理消息的发布和接收。
- 解耦:两者都能解耦生产者和消费者,使得系统更加灵活和可扩展。
区别
- 实现复杂度:
SubmissionPublisher和Flow.Subscriber是 Java 标准库中的一部分,不需要额外的依赖。而 MQ 通常需要依赖外部的消息中间件(如 RabbitMQ、Kafka 等),这可能会增加系统的复杂性和运维成本。 - 适用场景:MQ 通常用于跨进程、跨机器的通信,适用于需要高吞吐量、高可靠性的分布式系统。而
SubmissionPublisher和Flow.Subscriber更适合在 **同一 JVM 内** 进行异步任务处理。 - 性能:在同一个 JVM 内,
SubmissionPublisher和Flow.Subscriber的通信开销相对较小,因为它们不需要序列化 / 反序列化数据,也不需要网络传输。 - 灵活性:使用 Java 标准库中的组件,可以更容易地进行单元测试和调试,因为所有逻辑都在同一个 JVM 内运行。
# 总结
响应式编程
- 底层:基于数据缓冲队列 + 消息驱动模型 + 异步回调机制
- 编码:流式编程 + 链式调用 + 声明式 API
- 效果:优雅全异步 + 消息实时处理 + 高吞吐量 + 占用少量资源