Reactor 核心
# 介绍
Reactor 是一个用于 JVM 的完全非阻塞的响应式编程框架,具备高效的需求管理(即对 “背压(backpressure)” 的控制)能力。它与 Java 8 函数式 API 直接集成,比如 CompletableFuture, Stream, 以及 Duration。它提供了异步序列 API Flux(用于 [N] 个元素)和 Mono(用于 [0|1] 个元素),并完全遵循和实现了 “响应式扩展规范”(Reactive Extensions Specification)。
Reactor 的 reactor-ipc 组件还支持非阻塞的进程间通信(inter-process communication, IPC)。 Reactor IPC 为 HTTP(包括 Websockets)、TCP 和 UDP 提供了支持背压的网络引擎,从而适合 应用于微服务架构。并且完整支持响应式编解码(reactive encoding and decoding)。
# 依赖
<!--时间2025年2月24日14:38:41的最新版本-->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-bom</artifactId>
<version>2024.0.3</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
2
3
4
5
6
7
8
9
10
11
12
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
<!--单元测试的相关包,可以不引用-->
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
2
3
4
5
6
7
8
9
10
11
12
# 拓展
如果想使用 Milestones 版或者 Snapshots 版本,需要在 pom 文件里添加以下依赖,否则在下载不下来。
<!--Milestones-->
<repositories>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones Repository</name>
<url>https://repo.spring.io/milestone</url>
</repository>
</repositories>
2
3
4
5
6
7
8
<!--Snapshots-->
<repositories>
<repository>
<id>spring-snapshots</id>
<name>Spring Snapshot Repository</name>
<url>https://repo.spring.io/snapshot</url>
</repository>
</repositories>
2
3
4
5
6
7
8
# Reactive(响应式编程)入门
Reactor 是 Reactive Programming(反应式编程)范式的一种实现。
微软在 .NET 生态系统中创建了反应式扩展(Rx)库迈出了向反应式编程方向的第一步。随后,RxJava 在 JVM 上实现了反应式编程。随着时间的推移,Java 通过 Reactive Streams 努力实现了标准化,该规范为 JVM 上的反应式库定义了一系列接口和交互规则。其接口已集成到 Java 9 的 Flow
类中。
在面向对象的语言中,反应式编程范式通常是观察者模式的扩展。也可以与迭代器设计模式进行比较,因为所有这些库中的 Iterable
- Iterator
对都具有二重性。其中一个主要区别是,迭代器是基于拉的,而反应流则是基于推的。
迭代器模式是一种命令式编程(Imperative Programming)模式,尽管访问值的方法完全由 Iterable 负责。但是实际上,开发者需要决定何时访问序列中的下一个元素(即调用 next()
方法)。在反应式流中,与上述 Iterable-Iterator 对等的模式是发布者 - 订阅者(Publisher-Subscriber)模式。但在这种模式中,是发布者通知订阅者新值的可用性,这种推送(push)特性是反应式的关键。
除了推送值,错误处理和完成方面也有明确的定义。一个 Publisher
可以向其 Subscriber
推送新值(通过调用 onNext
),但也可以发出错误信号(通过调用 onError
)或完成信号(通过调用 onComplete
)。错误和完成都会终止序列。这可以归纳如下
onNext x 0..N [onError | onComplete]
这种方法非常灵活。该模式支持无值、一个值或 n 个值(包括值的无限序列,如时钟的持续滴答声)的用例。
# 为什么需要 Reactive 呢?
# 1. 阻塞会造成浪费
提高计划绩效的方法大致有两种:
并行化,以使用更多线程和更多硬件资源。
提高使用效率。
通常,Java 开发人员在编写程序时会使用阻塞代码。在出现性能瓶颈之前,这种做法是没有问题的。这时就需要引入更多线程,运行类似的阻塞代码。但是,这种资源利用率的扩展很快就会带来争用和并发问题。
更糟糕的是,阻塞会浪费资源。如果你仔细观察,只要程序涉及一些延迟(尤其是 I/O,如数据库请求或网络调用),资源就会被浪费,因为线程(可能有很多线程)现在处于闲置状态,等待数据。
# 2. 异步也无法解决
通过编写异步、非阻塞代码,可以让执行切换到另一个使用相同底层资源的活动任务,然后在异步处理完成后返回当前进程。Java 提供了两种异步编程模式。
- Callbacks:没有返回值,但需要一个额外的
callback
参数(一个 lambda 或匿名类),当结果可用时,该参数会被调用。一个众所周知的例子就是 Swing 的EventListener
层次结构。 - Futures: 会返回一个
Future<T>
值。异步进程会计算一个T
值,但Future
对象会封装对它的访问。该值不会立即可用,可以轮询该对象,直到该值可用为止。例如,运行Callable<T>
任务的ExecutorService
使用Future
对象。
但是回调很难组合在一起,很快就会导致代码难以阅读和维护(被称为 "回调地狱")。
举个例子:在用户界面上显示用户的前五个收藏,如果用户没有收藏,则显示建议。这需要通过以下三种服务(一种是提供收藏 ID,第二种是获取收藏详细信息,第三种是提供包含详细信息的建议)。那么代码就会变成如下这样。
// 需求很好理解,但是如果只从代码上去看,基本上很难直接看懂。
userService.getFavorites(userId, new Callback<List<String>>() {
// 成功时调用
public void onSuccess(List<String> list) {
if (list.isEmpty()) {
suggestionService.getSuggestions(new Callback<List<Favorite>>() {
public void onSuccess(List<Favorite> list) {
UiUtils.submitOnUiThread(() -> {
list.stream()
.limit(5)
.forEach(uiList::show);
});
}
public void onError(Throwable error) {
UiUtils.errorPopup(error);
}
});
} else {
list.stream()
.limit(5)
.forEach(favId -> favoriteService.getDetails(favId,
new Callback<Favorite>() {
public void onSuccess(Favorite details) {
UiUtils.submitOnUiThread(() -> uiList.show(details));
}
public void onError(Throwable error) {
UiUtils.errorPopup(error);
}
}
));
}
}
// 出错时调用
public void onError(Throwable error) {
UiUtils.errorPopup(error);
}
});
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// 使用reactor之后整个代码就变成这样了
userService.getFavorites(userId)
.flatMap(favoriteService::getDetails) // 将Favorite转成流
.switchIfEmpty(suggestionService.getSuggestions()) // 如果Favorite为空,那么从suggestionService拿推荐
.take(5) // 只获取5个
.publishOn(UiUtils.uiThreadScheduler())
.subscribe(uiList::show, UiUtils::errorPopup);
2
3
4
5
6
7
再拓展一下,如果在 800ms 以内没有检索到收藏 ID,则从缓存中获取它们。在基于回调的代码中,这是比较复杂的处理逻辑。在 Reactor 中,只需在链中添加一个 timeout
操作符即可 **,如下所示:**
userService.getFavorites(userId)
.timeout(Duration.ofMillis(800)) // 超过800ms
.onErrorResume(cacheService.cachedFavoritesFor(userId)) // 从缓存中获取
.flatMap(favoriteService::getDetails)
.switchIfEmpty(suggestionService.getSuggestions())
.take(5)
.publishOn(UiUtils.uiThreadScheduler())
.subscribe(uiList::show, UiUtils::errorPopup);
2
3
4
5
6
7
8
Future
对象比回调要好一些,但在组合方面仍然做得不好,尽管 CompletableFuture
在 Java 8 中有所改进。将多个 Future
对象协调在一起是可行的,但是也并不直观。此外, Future
还有其他问题。
- 调用
get()
方法很容易导致Future
对象出现另一种阻塞情况。 - 不支持懒计算。
- 不支持多值和高级错误处理。
再看一个例子:我们得到一个 ID 列表,想从中获取一个名称和一个统计信息,然后将它们配对组合,所有这些都是异步完成的。下面的示例使用 CompletableFuture
类型的列表实现了这一功能:
CompletableFuture<List<String>> ids = ifhIds();
CompletableFuture<List<String>> result = ids.thenComposeAsync(l -> {
Stream<CompletableFuture<String>> zip =
l.stream().map(i -> {
CompletableFuture<String> nameTask = ifhName(i);
CompletableFuture<Integer> statTask = ifhStat(i);
// 合并两个结果
return nameTask.thenCombineAsync(statTask, (name, stat) -> "Name " + name + " has stats " + stat);
});
List<CompletableFuture<String>> combinationList = zip.collect(Collectors.toList());
CompletableFuture<String>[] combinationArray = combinationList.toArray(new CompletableFuture[combinationList.size()]);
// 一起执行
CompletableFuture<Void> allDone = CompletableFuture.allOf(combinationArray);
return allDone.thenApply(v -> combinationList.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));
});
List<String> results = result.join();
assertThat(results).contains(
"Name NameJoe has stats 103",
"Name NameBart has stats 104",
"Name NameHenry has stats 105",
"Name NameNicole has stats 106",
"Name NameABSLAJNFOAJNFOANFANSF has stats 121");
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 用reactor改造
Flux<String> ids = ifhrIds();
Flux<String> combinations =
ids.flatMap(id -> {
Mono<String> nameTask = ifhrName(id);
Mono<Integer> statTask = ifhrStat(id);
return nameTask.zipWith(statTask,
(name, stat) -> "Name " + name + " has stats " + stat);
});
Mono<List<String>> result = combinations.collectList();
// 阻塞,等待所有任务完成,和CompletableFuture.allOf差不多。
List<String> results = result.block();
assertThat(results).containsExactly(
"Name NameJoe has stats 103",
"Name NameBart has stats 104",
"Name NameHenry has stats 105",
"Name NameNicole has stats 106",
"Name NameABSLAJNFOAJNFOANFANSF has stats 121"
);
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 在 subscribe()
之前,什么都不会发生
一个发布者最少要有一个订阅者,否则程序不会有任何行为。
# Flux
一个 Flux<T>
是一个标准的 Publisher<T>
,表示 0 到 N 个发射项的异步序列,可选择以完成信号或错误结束。与 Reactive Streams 规范中一样,这三种类型的信号可转化为对下游 Subscriber 的 onNext
、 onComplete
和 onError
方法的调用。
# 案例
public class FluxDemo01 {
public static void main(String[] args) {
// Flux 0 个或多个元素
// 创建一个Flux 流
Flux<Integer> just = Flux.just(1, 2, 3, 4, 5);
// 订阅,流不消费就没用
just.subscribe(e -> System.out.println("e1收到数据:" + e));
// 一个流可以多次订阅
just.subscribe(e -> System.out.println("e2收到数据:" + e));
}
}
2
3
4
5
6
7
8
9
10
11
12
public class FluxDemo02 {
public static void main(String[] args) {
// 创建一个每秒发送一次的流
Flux<Long> interval = Flux.interval(Duration.ofSeconds(1));
interval.subscribe(e -> System.out.println("e3收到数据:" + e));
// 主线程不能停止,否则程序结束,流还没打印
try {
System.in.read();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
public class FluxDemo03 {
/**
*
* 使用 Reactor 的 Flux 创建一个整数序列流,该流从1开始,包含10个元素。
* 每个元素在发出前都会延迟1秒。
*
* 通过 doOnComplete、doOnCancel 和 doOnNext 方法添加事件感知回调,
* 分别在流结束时、取消时和发出下一个元素时执行指定的操作。
*
* 注意:如果不订阅流,doOnComplete 方法将不会被调用。
*/
public static void main(String[] args) {
Flux<Integer> complete = Flux.range(1, 10)// 产生1-10的数字
// 延迟1秒
.delayElements(Duration.ofSeconds(1))
.doOnComplete(() -> System.out.println("流结束"))
.doOnCancel(() -> System.out.println("流取消"))
.doOnNext(e -> System.out.println("下一个元素是:" + e));
// 如果不订阅,doOnComplete方法不会被调用
complete.subscribe(e -> System.out.println("e4收到数据:" + e));
// 防止主线程结束
try {
System.in.read();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class FluxDemo04 {
public static void main(String[] args) {
Flux<Integer> range = Flux.range(1, 10)
// 延迟100毫秒
.delayElements(Duration.ofMillis(100));
range
// doOnNext,它会在流中每次发出一个元素时触发
.doOnNext(item -> System.out.println("下一个元素是: " + item))
.subscribe(item -> System.out.println("当前接收到的元素是: " + item));
// doOnEach,不仅能感知元素而且,还能感知信号
range
.doOnEach(signal -> {
if (signal.isOnNext()) {
Integer i = signal.get();
System.out.println("下一个元素是: " + i);
if (7 == i) {
i = i / 0;
}
} else if (signal.isOnComplete()) {
System.out.println("流结束了");
} else if (signal.isOnError()) {
System.out.println("流出现异常了: " + Objects.requireNonNull(signal.getThrowable()).getMessage());
}
})
.subscribe(item -> System.out.println("当前接收到的元素是: " + item));
// 防止主线程结束
try {
System.in.read();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class FluxDemo05 {
public static void main(String[] args) {
Flux<Integer> complete = Flux.range(1, 10)
// 延迟1秒
.delayElements(Duration.ofSeconds(1))
// 整流操作
.filter(i -> i % 2 == 0);
// 自定义订阅者
complete.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
System.out.println("开始订阅了");
// 请求一个数据,如果不写,则不会输出下一个数据
request(1);
// 请求无界的数据
requestUnbounded();
}
@Override
protected void hookOnNext(Integer value) {
System.out.println("接收到了元素:" + value);
// 再次请求一个元素,如果请求的是无界的数据那么这里就不需要进行请求了
request(1);
}
@Override
protected void hookOnComplete() {
System.out.println("流结束了");
}
@Override
protected void hookOnError(Throwable throwable) {
System.out.println("发生了错误");
}
@Override
protected void hookOnCancel() {
System.out.println("流取消了");
}
@Override
protected void hookFinally(SignalType type) {
System.out.println("最终执行了"+type);
}
});
// 防止主线程结束
try {
System.in.read();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class FluxDemo06 {
public static void main(String[] args) {
// 事件感知API,当流结束时,会调用onComplete方法, doOnxxx就是一个回调方法,本质是Hook方法
Flux<Integer> complete = Flux.range(1, 50)
// 延迟1秒
.delayElements(Duration.ofMillis(100))
// 整流操作
.filter(i -> i % 2 == 0).doOnNext(e -> {
if (e == 26)
e = 10 / 0;
})
// 出错时流直接完成
.onErrorComplete();
// 出错时跳过错误继续执行
// .onErrorContinue((throwable, o) -> {
// System.out.println(throwable.getMessage());
// System.out.println("发生报错的元素是" + o);
// });
// 自定义订阅者1
// complete.subscribe(System.out::println, throwable -> System.out.println(throwable.getMessage()),
// () -> System.out.println("完成"));
// 自定义订阅者2
complete.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
// 请求无界数量
requestUnbounded();
}
@Override
protected void hookOnNext(Integer value) {
System.out.println("元素是"+value);
}
@Override
protected void hookOnComplete() {
System.out.println("已完成");
}
@Override
protected void hookOnError(Throwable throwable) {
System.out.println(throwable.getMessage());
}
@Override
protected void hookOnCancel() {
System.out.println("流已取消");
}
});
// 防止主线程结束
try {
System.in.read();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public class FluxDemo07 {
public static void main(String[] args) {
Flux.range(1, 10).doOnRequest(r -> System.out.println("request of " + r))
.subscribe(new BaseSubscriber<>() {
@Override
public void hookOnSubscribe(Subscription subscription) {
requestUnbounded();
}
@Override
public void hookOnNext(Integer integer) {
System.out.println("元素是 " + integer);
// 取消流,程序直接完成。
cancel();
}
});
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class FluxDemo08 {
public static void main(String[] args) {
// buffer是一个缓冲区,能把指定的元素根据缓冲器的大小,分成一个个ArrayList
Flux<List<Integer>> flux = Flux.range(1, 10).buffer(3);
flux.subscribe(new BaseSubscriber<List<Integer>>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
request(1);
}
@Override
protected void hookOnNext(List<Integer> value) {
request(1);
System.out.println("请求到的元素是:" + Arrays.toString(value.toArray()));
}
});
}
//请求到的元素是:[1, 2, 3]
//请求到的元素是:[4, 5, 6]
//请求到的元素是:[7, 8, 9]
//请求到的元素是:[10]
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 说明
在上面的例子中会发现有很多的 doNoXXX、onXXX、hookOnXXX 操作。