笔记 笔记
首页
  • 开发工具
  • Java Web
  • Java 进阶
  • 容器化技术
  • Java 专栏

    • Java 核心技术面试精讲
    • Java 业务开发常见错误 100 例
  • 数据库专栏

    • MySQL 实战 45 讲
    • Redis 核心技术与实战
  • 安全专栏

    • OAuth 2.0 实战课
  • 计算机系统
  • 程序设计语言
  • 数据结构
  • 知识产权
  • 数据库
  • 面向对象
  • UML
  • 设计模式
  • 操作系统
  • 结构化开发
  • 软件工程
  • 计算机网络
  • 上午题错题
在线工具 (opens new window)

EasT-Duan

Java 开发
首页
  • 开发工具
  • Java Web
  • Java 进阶
  • 容器化技术
  • Java 专栏

    • Java 核心技术面试精讲
    • Java 业务开发常见错误 100 例
  • 数据库专栏

    • MySQL 实战 45 讲
    • Redis 核心技术与实战
  • 安全专栏

    • OAuth 2.0 实战课
  • 计算机系统
  • 程序设计语言
  • 数据结构
  • 知识产权
  • 数据库
  • 面向对象
  • UML
  • 设计模式
  • 操作系统
  • 结构化开发
  • 软件工程
  • 计算机网络
  • 上午题错题
在线工具 (opens new window)

购买兑换码请添加

添加时候请写好备注,否则无法通过。

  • Maven

  • Bootstrap

  • Spring

  • Spring MVC

  • MyBatis

  • JUnit

  • GitFlow 工作流指南

  • SpringBoot

  • Reactor

    • 前置条件
    • Reactor 核心
      • 介绍
      • 依赖
        • 拓展
      • Reactor 入门
        • 为什么需要Reactive?
        • 1.阻塞会造成浪费
        • 2.异步也无法解决
        • Reactive 的真正价值是什么?
        • 1. Reactive 非阻塞模型
        • 2. Reactive 管理异步的复杂性
        • 3. Reactive 管理系统的弹性:背压
        • 4. Reactive 管理不确定性:统一的错误处理
        • 拓展:异步编程存在的问题?
      • Flux
        • Flux 的核心概念
        • 如何创建 Flux
        • 1. 从已知数据创建
        • 2. 创建序列
        • 3. 程序化生成
        • 4. 使用 Sinks 创建 (推荐)
        • 操作符 - 构建数据处理流水线
        • 1. 转换类操作符
        • 2. 组合类操作符
        • 订阅 - 启动流水线
        • 1. 基础订阅
        • 2. 终端操作符
        • 错误处理与恢复
        • 调度器 - 控制线程执行
        • doOnXXX 系列 - 副作用“窥视孔”
        • onXXX 系列 - 错误处理与恢复
        • hookOnXXX 系列 - 精细化的订阅者控制
        • 高级概念与最佳实践
        • 1. 热 Flux vs. 冷 Flux
        • 2. 背压策略
        • 3. Sinks - 现代化的编程式创建
        • onXXX 系列 - 错误处理与恢复
        • 1. onErrorReturn - 使用静态值恢复
        • 2. onErrorResume - 使用备用 Mono 恢复
        • 3. retry - 重试操作
        • hookOnXXX 系列 - 全局监控
        • Hooks.onEachOperator 和 Hooks.onLastOperator
        • 消费 Mono:subscribe() vs block()
        • 1. subscribe() - 异步消费
        • 2. block() - 阻塞消费
        • 高级概念与最佳实践
        • 1. 组合多个 Mono
        • 2. 缓存结果 (cache())
        • 3. 超时处理 (timeout())
        • 4. 上下文传递 (Context)
  • Spring WebFlux

  • 微服务

  • Java Web
  • Reactor
EasT-Duan
2025-02-24
目录

Reactor 核心

欢迎来到我的 ChatGPT 中转站,极具性价比,为付费不方便的朋友提供便利,有需求的可以添加左侧 QQ 二维码,另外,邀请新用户能获取余额哦!最后说一句,那啥:请自觉遵守《生成式人工智能服务管理暂行办法》。

# 介绍

Reactor 是一个用于 JVM 的完全非阻塞的响应式编程框架,具备高效的需求管理(即对 “背压(backpressure)” 的控制)能力。它与 Java 8 函数式 API 直接集成,比如 CompletableFuture, Stream, 以及 Duration。它提供了异步序列 API Flux(用于 [N] 个元素)和 Mono(用于 [0|1] 个元素),并完全遵循和实现了 “响应式扩展规范”(Reactive Extensions Specification)。

Reactor 的 reactor-ipc 组件还支持非阻塞的进程间通信(inter-process communication, IPC)。 Reactor IPC 为 HTTP(包括 Websockets)、TCP 和 UDP 提供了支持背压的网络引擎,从而适合 应用于微服务架构。并且完整支持响应式编解码(reactive encoding and decoding)。

# 依赖

<!--时间2025年2月24日14:38:41的最新版本-->
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-bom</artifactId>
            <version>2024.0.3</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
1
2
3
4
5
6
7
8
9
10
11
12
<dependencies>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId>
    </dependency>
    <!--单元测试的相关包,可以不引用-->
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
1
2
3
4
5
6
7
8
9
10
11
12

# 拓展

如果想使用 Milestones 版或者 Snapshots 版本,需要在 pom 文件里添加以下依赖,否则在下载不下来。

<!--Milestones-->
<repositories>
	<repository>
		<id>spring-milestones</id>
		<name>Spring Milestones Repository</name>
		<url>https://repo.spring.io/milestone</url>
	</repository>
</repositories>
1
2
3
4
5
6
7
8
<!--Snapshots-->
<repositories>
	<repository>
		<id>spring-snapshots</id>
		<name>Spring Snapshot Repository</name>
		<url>https://repo.spring.io/snapshot</url>
	</repository>
</repositories>
1
2
3
4
5
6
7
8

# Reactor 入门

Reactor 是 Reactive Programming(反应式编程)范式的一种实现。

微软在 .NET 生态系统中创建了反应式扩展(Rx)库迈出了向反应式编程方向的第一步。随后,RxJava 在 JVM 上实现了反应式编程。随着时间的推移,Java 通过 Reactive Streams 努力实现了标准化,该规范为 JVM 上的反应式库定义了一系列接口和交互规则。其接口已集成到 Java 9 的 Flow 类中。

在面向对象的语言中,反应式编程范式通常是观察者模式的扩展。也可以与迭代器设计模式进行比较,因为所有这些库中的 Iterable - Iterator 对都具有二重性。其中一个主要区别是,迭代器是基于拉的,而反应流则是基于推的。

迭代器模式是一种命令式编程(Imperative Programming)模式,尽管访问值的方法完全由 Iterable 负责。但是实际上,开发者需要决定何时访问序列中的下一个元素(即调用 next() 方法)。在反应式流中,与上述 Iterable-Iterator 对等的模式是发布者 - 订阅者(Publisher-Subscriber)模式。但在这种模式中,是发布者通知订阅者新值的可用性,这种推送(push)特性是反应式的关键。

除了推送值,错误处理和完成方面也有明确的定义。一个 Publisher 可以向其 Subscriber 推送新值(通过调用 onNext ),但也可以发出错误信号(通过调用 onError )或完成信号(通过调用 onComplete )。错误和完成都会终止序列。这可以归纳如下

onNext x 0..N [onError | onComplete]
1

这种方法非常灵活。该模式支持无值、一个值或 n 个值(包括值的无限序列,如时钟的持续滴答声)的用例。

# 为什么需要 Reactive?

# 1. 阻塞会造成浪费

想象一下传统的餐厅服务模式:

  1. 客人(请求):一位客人坐下,点了一道需要长时间烹饪的菜(比如慢炖牛肉)。
  2. 厨师(线程):餐厅为这位客人分配了一位专属厨师。
  3. 阻塞等待:这位厨师从接单开始,就一直站在炉灶前,全程等待这道菜烹饪完成。在此期间,他不能去服务其他客人。
  4. 完成服务:菜做好后,厨师端给客人,然后才能被解放出来,去服务下一位客人。

这个模型的问题是什么?

  • 资源浪费:厨师(线程)大部分时间都在 “等待”(I/O 等待,如等待数据库、网络响应),而不是在工作。这是一种巨大的资源浪费。
  • 扩展性极差:如果餐厅同时来了 100 位客人,你就需要雇佣 100 位厨师。厨师的工资(线程的内存开销)非常昂贵!餐厅的厨房(服务器内存)很快就会爆满。
  • 脆弱性:一旦厨房里坐满了厨师,新来的客人就只能在外面排队(请求延迟),甚至被直接拒绝(服务不可用)。整个餐厅系统会变得非常缓慢甚至瘫痪。

这就是传统 “线程 - per - 请求” 的阻塞模型。它在并发量小的时候工作良好,但一旦面临高并发,就会立刻撞上性能天花板。

# 2. 异步也无法解决

异步模型并非万能银弹,它有几个根本性的局限。

  1. 无法解决 CPU 密集型任务

这是最核心的局限。异步模型的优势在于等待。当一个任务在等待 I/O(网络、磁盘)时,CPU 可以被释放去处理其他任务。

但是,如果一个任务本身就需要大量的 CPU 计算,比如:

  • 复杂的科学计算
  • 视频编码与解码
  • 大规模数据的加密 / 解密

这些任务没有 “等待” 时间,它们会持续占用 CPU。

  • 在传统模型中:你会启动多个线程,让它们在不同的 CPU 核心上并行计算,充分利用多核优势。
  • 在异步模型中:你通常只有一个(或少数几个)事件循环线程。如果这个线程被一个耗时的计算任务卡住,它就无法处理任何其他 I/O 事件。整个事件循环会被阻塞,导致所有请求都得不到响应,系统看起来就像 “卡死” 了一样。

一个形象的比喻:

  • I/O 密集型:服务员去厨房下单,他可以在厨房门口等待,也可以先去服务别的客人。等待是可被利用的间隙。
  • CPU 密集型:服务员接到一个任务,需要他亲手花 10 分钟揉一个复杂的面团。在这 10 分钟里,他什么也干不了,只能死死地揉面。即使有再多客人,他也无法响应。

结论:对于 CPU 密集型任务,传统的多线程并行模型通常是更好的选择。

  1. 无法消除物理延迟

异步不能让网络变快,也不能让硬盘读取加速。一个数据库查询需要 100ms,无论你用同步等待还是异步回调,这 100ms 的物理延迟是客观存在的。

  • 异步提高的是吞吐量:在等待这 100ms 的时候,系统可以处理成千上万个其他请求。
  • 异步不降低单次请求的延迟:对于那个发起查询的用户来说,他依然需要等待至少 100ms 才能拿到结果。
  • 手动处理多个异步依赖是极其困难的,会陷入回调地狱之中。

很多人误以为 “异步 = 快”,这是一个需要澄清的误区。异步是高效,不一定是高速。

举个例子:在用户界面上显示用户的前五个收藏,如果用户没有收藏,则显示建议。这需要通过以下三种服务(一种是提供收藏 ID,第二种是获取收藏详细信息,第三种是提供包含详细信息的建议)。那么代码就会变成如下这样。

// 需求很好理解,但是如果只从代码上去看,基本上很难直接看懂。
userService.getFavorites(userId, new Callback<List<String>>() { 
  // 成功时调用
  public void onSuccess(List<String> list) { 
    if (list.isEmpty()) { 
      suggestionService.getSuggestions(new Callback<List<Favorite>>() {
        public void onSuccess(List<Favorite> list) { 
          UiUtils.submitOnUiThread(() -> { 
            list.stream()
                .limit(5)
                .forEach(uiList::show); 
            });
        }

        public void onError(Throwable error) { 
          UiUtils.errorPopup(error);
        }
      });
    } else {
      list.stream() 
          .limit(5)
          .forEach(favId -> favoriteService.getDetails(favId, 
            new Callback<Favorite>() {
              public void onSuccess(Favorite details) {
                UiUtils.submitOnUiThread(() -> uiList.show(details));
              }

              public void onError(Throwable error) {
                UiUtils.errorPopup(error);
              }
            }
          ));
    }
  } 
  // 出错时调用
  public void onError(Throwable error) {
    UiUtils.errorPopup(error);
  }
});
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

# Reactive 的真正价值是什么?

回到上面的问题中,Reactive 提供了很好的解决方案。

# 1. Reactive 非阻塞模型

  1. 客人(请求):一位客人坐下,点了一道慢炖牛肉。
  2. 服务员(事件循环线程):餐厅只有一位非常高效的服务员。他记下客人的菜单,然后把订单交给厨房。
  3. 非阻塞,立即释放:服务员不会在厨房门口傻等。他立刻回到大厅,去服务下一位客人,或者给其他桌倒水。他从不阻塞。
  4. 回调 / 事件驱动:当厨房把菜做好后,它会按一下铃(发出一个 “完成” 事件)。服务员听到铃声后,才去把菜端给对应的客人。

这个模型的优势是什么?

  • 极高的资源利用率:服务员(少量线程)几乎从不空闲,总是在处理事情。他一个人就能高效地服务几十甚至上百桌客人。
  • 极好的扩展性:无论来多少客人,餐厅都只需要这一位服务员。系统的资源消耗是固定的、可预测的,不会随着并发量线性增长。
  • 高弹性和响应性:即使客人再多,服务员也只是更忙一点,但系统不会崩溃。新来的客人总能被快速响应(至少服务员能过来记个单),而不会在门口无限等待。

这就是 Reactive 非阻塞模型。它用少量、固定的线程,通过事件驱动和回调机制,处理海量的并发操作。

# 2. Reactive 管理异步的复杂性

下面的代码对上面的代码进行简化

// 使用reactor之后整个代码就变成这样了
userService.getFavorites(userId)
           .flatMap(favoriteService::getDetails)  // 将Favorite转成流
           .switchIfEmpty(suggestionService.getSuggestions()) // 如果Favorite为空,那么从suggestionService拿推荐
           .take(5)  // 只获取5个
           .publishOn(UiUtils.uiThreadScheduler())
           .subscribe(uiList::show, UiUtils::errorPopup);
1
2
3
4
5
6
7

再拓展一下,如果在 800ms 以内没有检索到收藏 ID,则从缓存中获取它们。在基于回调的代码中,这是比较复杂的处理逻辑。在 Reactor 中,只需在链中添加一个 timeout 操作符即可,如下所示:

userService.getFavorites(userId)
           .timeout(Duration.ofMillis(800)) // 超过800ms
           .onErrorResume(cacheService.cachedFavoritesFor(userId))  // 从缓存中获取
           .flatMap(favoriteService::getDetails)
           .switchIfEmpty(suggestionService.getSuggestions())
           .take(5)
           .publishOn(UiUtils.uiThreadScheduler())
           .subscribe(uiList::show, UiUtils::errorPopup);
1
2
3
4
5
6
7
8

# 3. Reactive 管理系统的弹性:背压

这是 Reactive 最独特的贡献之一。单纯的异步模型没有背压概念。

想象一个场景:一个快速的生产者(如股票行情源)和一个慢消费者(如写入数据库)。

  • 没有背压的异步:生产者会快速产生事件,消费者异步处理。但消费者的处理队列会无限增长,最终导致内存溢出。
  • 有背压的 Reactive:消费者可以主动向上游反馈:“我只能处理 10 个”,生产者就会暂停发送。这是一种主动的、系统级的自我保护机制,是单纯的异步所不具备的。

# 4. Reactive 管理不确定性:统一的错误处理

在异步世界里,错误可能在任何时间点发生。Reactor 提供了 onErrorReturn , onErrorResume , retry 等操作符,让你可以在数据流的任何位置定义统一的错误恢复策略,而不是在每个回调里写 try-catch 。

# 拓展:异步编程存在的问题?

Future 和 CompletableFuture 。它们解决了 “如何获取一个异步任务的结果” 这个问题,但它们并没有完全解决 “如何优雅地处理异步数据流” 这个问题。

Future 对象比回调要好一些,但在组合方面仍然做得不好,尽管 CompletableFuture 在 Java 8 中有所改进。将多个 Future 对象协调在一起是可行的,但是也并不直观。此外, Future 还有其他问题。

  1. 调用 get() 方法很容易导致 Future 对象出现另一种阻塞情况。

  2. 不支持懒计算, CompletableFuture 关联的任务通常是急切执行的,而响应式流(如 Flux )是真正懒的,只有在被订阅时才会启动数据生产。

  3. 不支持多值和高级错误处理。

再看一个例子:我们得到一个 ID 列表,想从中获取一个名称和一个统计信息,然后将它们配对组合,所有这些都是异步完成的。下面的示例使用 CompletableFuture 类型的列表实现了这一功能:

CompletableFuture<List<String>> ids = ifhIds();

CompletableFuture<List<String>> result = ids.thenComposeAsync(l -> {
	Stream<CompletableFuture<String>> zip =
			l.stream().map(i -> {
				CompletableFuture<String> nameTask = ifhName(i);
				CompletableFuture<Integer> statTask = ifhStat(i);
				// 合并两个结果
				return nameTask.thenCombineAsync(statTask, (name, stat) -> "Name " + name + " has stats " + stat);
			});
    
List<CompletableFuture<String>> combinationList = zip.collect(Collectors.toList());
CompletableFuture<String>[] combinationArray = combinationList.toArray(new CompletableFuture[combinationList.size()]);
	// 一起执行
	CompletableFuture<Void> allDone = CompletableFuture.allOf(combinationArray);
	return allDone.thenApply(v -> combinationList.stream()
			.map(CompletableFuture::join)
			.collect(Collectors.toList()));
});

List<String> results = result.join();
assertThat(results).contains(
		"Name NameJoe has stats 103",
		"Name NameBart has stats 104",
		"Name NameHenry has stats 105",
		"Name NameNicole has stats 106",
		"Name NameABSLAJNFOAJNFOANFANSF has stats 121");
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 用reactor改造
Flux<String> ids = ifhrIds();

Flux<String> combinations =
		ids.flatMap(id -> {
			Mono<String> nameTask = ifhrName(id);
			Mono<Integer> statTask = ifhrStat(id);

			return nameTask.zipWith(statTask,
					(name, stat) -> "Name " + name + " has stats " + stat);
		});

Mono<List<String>> result = combinations.collectList();
// 阻塞,等待所有任务完成,和CompletableFuture.allOf差不多。
List<String> results = result.block();

assertThat(results).containsExactly(
		"Name NameJoe has stats 103",
		"Name NameBart has stats 104",
		"Name NameHenry has stats 105",
		"Name NameNicole has stats 106",
		"Name NameABSLAJNFOAJNFOANFANSF has stats 121"
);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

好的,这份 Flux 文档的结构和内容已经非常出色了!特别是对 doOnXXX 、 onXXX 和 hookOnXXX 的区分和总结表格,做得非常清晰。

我将在此基础上,帮你修正一些细节、补充几个关键的高级概念,并优化示例代码,使其成为一份更加全面和权威的 Flux 指南。


# Flux

提示

Flux 是一个可以发射多个数据项的异步消防水管。一旦打开阀门(订阅),水(数据)就会源源不断地流出来,直到水管排空(序列结束)或者发生爆管(错误)。

# Flux 的核心概念

  1. 发射 0 到 N 个元素:这是 Flux 与 Mono 最根本的区别。它可以代表一个数据流,例如:
    • 来自数据库的多条查询结果。
    • 来自消息队列(如 Kafka, RabbitMQ)的连续消息。
    • 用户界面的一系列点击事件。
    • 一个定时器周期性发出的时间戳。
  2. 异步与背压: Flux 是 Reactive Streams 规范的实现。这意味着它天生是异步的,并且强制支持背压。下游的订阅者可以向上游的发布者反馈自己的处理能力,从而避免因生产者速度过快而导致系统崩溃。
  3. 声明式 API:与 Mono 一样,你通过一系列操作符来声明一个数据处理流水线。这个流水线本身是惰性的,只有在被订阅时才会真正开始工作。

# 如何创建 Flux

创建 Flux 是使用它的第一步。Reactor 提供了非常丰富的静态工厂方法。

# 1. 从已知数据创建

// 1. Flux.just() - 从一个或多个直接指定的元素创建
Flux<String> fluxFromJust = Flux.just("Apple", "Banana", "Cherry");

// 2. Flux.fromIterable() - 从任何 Iterable 集合创建
List<Integer> list = Arrays.asList(1, 2, 3, 4);
Flux<Integer> fluxFromList = Flux.fromIterable(list);

// 3. Flux.fromArray() - 从数组创建
String[] array = {"A", "B", "C"};
Flux<String> fluxFromArray = Flux.fromArray(array);
1
2
3
4
5
6
7
8
9
10

# 2. 创建序列

// 4. Flux.range() - 创建一个整数序列
// 从 1 开始,发射 5 个元素:1, 2, 3, 4, 5
Flux<Integer> rangeFlux = Flux.range(1, 5);

// 5. Flux.interval() - 创建一个从 0 开始的递增 Long 序列,并按指定周期发射
// 每隔 1 秒发射一个数字:0, 1, 2, 3...
// 默认运行在 Schedulers.parallel() 线程池上
Flux<Long> intervalFlux = Flux.interval(Duration.ofSeconds(1));
1
2
3
4
5
6
7
8

# 3. 程序化生成

// 6. Flux.generate() - 同步地、逐个地生成元素
// 适用于状态同步生成
Flux<String> generatedFlux = Flux.generate(
    () -> 0, // 初始状态
    (state, sink) -> {
        sink.next("3 x " + state + " = " + (3 * state)); // 发射一个元素
        if (state == 10) sink.complete(); // 当状态达到10时,完成流
        return state + 1; // 返回下一个状态
    }
);

// 7. Flux.create() - 更灵活的异步创建方式
// 适用于桥接现有的异步API,如监听器
Flux<String> listenerFlux = Flux.create(sink -> {
    // 模拟一个事件监听器
    someAsyncEventListener(event -> {
        sink.next(event.getData()); // 当事件到达时,发射数据
        if (event.isLast()) {
            sink.complete();
        }
    });
});
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

# 4. 使用 Sinks 创建 (推荐)

Sinks 是 Reactor 3.4+ 引入的、更安全、更灵活的以编程方式创建 Flux 和 Mono 的方式,是 Flux.create 和 Mono.create 的现代化替代品。

// Sinks.many().multicast(): //多播 创建一个可以向多个订阅者广播数据的 Sink
// Sinks.many().unicast(); // 单播,这个管道只能绑定单个消费者
// Sinks.many().replay(); // 重放,这个管道可以重放元素,把之前的元素重发
// 它支持背压,并且是线程安全的
Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();

Flux<String> fluxFromSink = sink.asFlux();

// 在其他线程中,可以向 sink 推送数据
sink.tryEmitNext("Hello");
sink.tryEmitNext("World");
sink.tryEmitComplete();

// 订阅者可以接收到这些数据
fluxFromSink.subscribe(System.out::println); // 输出: Hello, World
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

提示

目前项目中真实在用的方式。这个架构的核心是解耦和实时通信:

  1. 解耦请求与处理:用户的 HTTP 请求( customAi )是瞬时的,而 AI 算法处理是耗时的。不能让 HTTP 连一直等待结果。因此,系统采用 “请求 - 响应 - 分离” 模式:

    • 请求:HTTP 接口立即返回一个 Flux ,作为数据传输的 “管道”。
    • 处理:真正的 AI 任务被异步地投递到消息队列(RabbitMQ)中,由后台消费者处理。
  2. 通过 Sinks.many ().multicast () 多播的方式来使得同一时间可供给多个消费者,questionId 为唯一键来隔离会话。

/**  接收mq发送的消息 **/
@RabbitListener(queues = "${screening.mq.screen-queue-name}")
public void receiveMsg(String message) {
    log.info("【收到AI定制处理计划】:{}", message);
    try {
        CustomAiAlgoResDTO algoReportDTO = JSONUtil.toBean(message, CustomAiAlgoResDTO.class);
        if (Objects.isNull(algoReportDTO)){
            log.warn("【收到报告解读消息】数据解析失败:{}", message);
        }

        // 响应到对应的数据流
        streamService.pushScreenCustomSink(algoReportDTO.getId(), JSONUtil.toJsonStr(algoReportDTO));

        // 关闭数据流
        if (1 == algoReportDTO.getCompleted()){
            streamService.delScreenCustomSink(algoReportDTO.getId());
        }
    } catch (Exception e) {
        log.warn("【收到报告解读消息】处理异常:{}", e.getMessage());
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class StreamService {

    /** 报告解读数据流 */
    private final ConcurrentHashMap<String, Sinks.Many<String>> reportAnalyzeSinks = new ConcurrentHashMap<>();

    /**
     * 获取筛查定制数据流
     * @param questionId 问题ID
     */
    public Flux<String> getScreenCustomSink(String questionId) {
        Sinks.Many<String> questionSink = reportAnalyzeSinks.get(questionId);
        return Objects.nonNull(questionSink) ?
                questionSink.asFlux() :
                Flux.using(() -> createReportAnalyzeSink(questionId),
                Sinks.Many::asFlux,
                sink -> delScreenCustomSink(questionId));
    }

    /**
     * 给筛查定制数据流推送数据
     * @param questionId 问题ID
     * @param data 数据
     */
    @Override
    public Boolean pushScreenCustomSink(String questionId, String data) {
        Sinks.Many<String> reportAnalyzeMany = reportAnalyzeSinks.get(questionId);
        reportAnalyzeMany.tryEmitNext(data);
        return true;
    }

    /**
     * 创建报告解读数据流
     * @param questionId 问题ID
     */
    private Sinks.Many<String> createReportAnalyzeSink(String questionId) {
        return reportAnalyzeSinks.computeIfAbsent(questionId, k -> Sinks.many().multicast().onBackpressureBuffer());
    }

    /**
     * 移除筛查定制数据流
     * @param questionId 问题ID
     */
    public void delScreenCustomSink(String questionId) {
        Sinks.Many<String> sink = reportAnalyzeSinks.get(questionId);
        if (null != sink) {
            sink.tryEmitComplete();
            reportAnalyzeSinks.remove(questionId);
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@Override
public Flux<String> customAi(CustomAiParams params) {
    String questionId = params.getQuestionId();
    // 创建流对象并响应
    Flux<String> reportAnalyzeSink = streamService.getScreenCustomSink(questionId);
    // 调用算法
    CustomAiAlgoDTO customAiAlgoDTO = new CustomAiAlgoDTO();
    customAiAlgoDTO.setId(questionId);
    // ....省略封装
    ThreadUtil.execAsync(() -> {
                HttpClient.sendPost(customAiUrl, JSONUtil.parseObj(customAiAlgoDTO));
            }
    );
    return reportAnalyzeSink;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Operation(summary = "AI定制处理")
@PostMapping(value = "/customAi", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> customAi(@RequestBody @Valid CustomAiParams params) {
    return screenPatientService.customAi(params);
}
1
2
3
4
5

# 操作符 - 构建数据处理流水线

# 1. 转换类操作符

Flux.range(1, 5)
    .map(i -> i * 2) // map: 一对一转换,1->2, 2->4, ...
    .filter(i -> i > 5) // filter: 过滤,只保留大于5的元素
    .subscribe(System.out::println); // 输出: 6, 8, 10
1
2
3
4

flatMap 用于将每个元素映射为一个新的 Flux,然后将所有这些 Flux 扁平化合并成一个 Flux 。

// 假设有两个服务调用
Flux<Integer> ids = Flux.just(1, 2, 3);

// 对于每个 id,异步地获取一个用户详情
// fetchUserDetails(id) 返回的是一个 Mono<UserDetail>
// flatMap 会订阅这个 Mono,并在其发出结果后将结果发射到主 Flux 中
Flux<UserDetail> userDetails = ids.flatMap(id -> {
    return fetchUserDetails(id); 
});

// 如果 fetchUserDetails 返回 Flux,flatMap 也能完美处理
// Flux<Item> items = ids.flatMap(id -> fetchItemsForUser(id));
1
2
3
4
5
6
7
8
9
10
11
12

# 2. 组合类操作符

Flux<String> flux1 = Flux.just("A", "B", "C");
Flux<String> flux2 = Flux.just("D", "E", "F");

// merge: 交错合并两个流,不保证顺序
Flux<String> merged = Flux.merge(flux1, flux2); // 可能输出 A, D, B, E, C, F

// concat: 按顺序连接,等前一个流完成后再订阅下一个
Flux<String> concatenated = Flux.concat(flux1, flux2); // 输出 A, B, C, D, E, F

// zip: 一对一配对,等待所有源都发射了元素后才将它们组合
Flux<String> zipped = Flux.zip(flux1, flux2, (s1, s2) -> s1 + s2);
// 输出: AD, BE, CF
1
2
3
4
5
6
7
8
9
10
11
12

# 订阅 - 启动流水线

Flux 是惰性的,定义好流水线后,必须通过 subscribe() 来启动它。

# 1. 基础订阅

Flux.just("red", "green", "blue")
    .subscribe(); // 启动了,但什么也不做

Flux.just("red", "green", "blue")
    .subscribe(color -> System.out.println("Color: " + color)); // 处理每个元素

Flux.just("red", "green", "blue")
    .subscribe(
        color -> System.out.println("Color: " + color), // onNext
        error -> System.err.println("Error: " + error), // onError
        () -> System.out.println("Stream completed!") // onComplete
    );
1
2
3
4
5
6
7
8
9
10
11
12

# 2. 终端操作符

这些操作符会隐式地触发订阅,并将结果收集起来。

// collectList: 将所有元素收集到一个 List 中,返回 Mono<List<T>>
Mono<List<String>> listMono = Flux.just("a", "b", "c").collectList();
listMono.subscribe(list -> System.out.println(list)); // 输出 [a, b, c]

// count: 计算元素数量,返回 Mono<Long>
Mono<Long> countMono = Flux.range(1, 10).count();
countMono.subscribe(cnt -> System.out.println("Count: " + cnt)); // 输出 Count: 10

// block(): 阻塞当前线程,等待流结束并返回最终结果
// **警告**: block() 应谨慎使用,主要在测试或无法使用异步环境时使用,可能导致死锁。
List<String> blockedList = Flux.just("x", "y", "z").collectList().block();
System.out.println(blockedList); // 输出 [x, y, z]
1
2
3
4
5
6
7
8
9
10
11
12

# 错误处理与恢复

在响应式流中,错误是一个终止事件。一旦发生错误,整个流就会停止。Reactor 提供了多种操作符来优雅地处理错误。

Flux.just(1, 2, 0, 4)
    .map(i -> 10 / i) // 当 i=0 时会抛出 ArithmeticException
    //.onErrorReturn(-1) // 发生错误时,返回一个默认值 -1 并正常结束流
    .onErrorResume(e -> Flux.just(99, 100)) // 发生错误时,切换到一个备用流
    .subscribe(
        result -> System.out.println("Result: " + result),
        err -> System.err.println("Unhandled Error: " + err), // 如果上面的错误处理没起作用,会到这里
        () -> System.out.println("Finished!")
    );
1
2
3
4
5
6
7
8
9

# 调度器 - 控制线程执行

默认情况下,操作符会继续在发布者所在的线程上执行。你可以使用 publishOn 和 subscribeOn 来切换执行线程。

  • publishOn: 影响其后续操作符的执行线程。
  • subscribeOn: 影响整个流的订阅和源头数据发射的执行线程。
Flux.just("A", "B", "C")
    .map(item -> { // 默认在 main 线程
        System.out.println("map 1: " + item + " on " + Thread.currentThread().getName());
        return item.toLowerCase();
    })
    .publishOn(Schedulers.boundedElastic()) // 切换线程池
    .map(item -> { // 后续操作在 boundedElastic 线程池中执行
        System.out.println("map 2: " + item + " on " + Thread.currentThread().getName());
        return item + "_processed";
    })
    .subscribe();
1
2
3
4
5
6
7
8
9
10
11

# doOnXXX 系列 - 副作用 “窥视孔”

doOnXXX 系列操作符是副作用操作符。它们像是在数据流上安装的 “窃听器” 或 “监控探头”,可以让你在不改变数据流本身的情况下,观察到流中发生的各种事件,并执行一些外部操作,如日志记录、指标发送、调试打印等。

核心特点:

  • 不修改流:它们只是 “看一眼”,然后让数据原封不动地继续向下传递。
  • 返回原始流: doOnXXX 的返回值仍然是 Flux<T> 或 Mono<T> 本身,所以它们可以无缝地嵌入到操作符链的任何位置。

示例

Flux.range(1, 5)
    .doOnSubscribe(subscription -> 
        System.out.println("Subscribed!")) // 订阅时触发
    .doOnNext(i -> 
        System.out.println("onNext: " + i)) // 每个元素发射时触发
    .doOnComplete(() -> 
        System.out.println("Sequence completed!")) // 流正常完成时触发
    .doOnError(error -> 
        System.err.println("Error occurred: " + error)) // 流发生错误时触发
    .doOnTerminate(() -> 
        System.out.println("Terminated.")) // 流终止时触发(无论成功或失败)
    .doOnCancel(() -> 
        System.out.println("Cancelled!")) // 流被取消时触发
    .subscribe();

// 输出:
// Subscribed!
// onNext: 1
// onNext: 2
// ...
// Sequence completed!
// Terminated.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

# onXXX 系列 - 错误处理与恢复

这个系列主要用于错误处理,它们是响应式编程中构建弹性系统的核心。与 doOnXXX 只是 “观察” 错误不同, onXXX 操作符会消费错误并改变流的行为。

核心特点:

  • 消费错误:一旦 onErrorXXX 操作符处理了错误,原始的 onError 信号就会被 “吞掉”,不会继续向下传递。
  • 改变流:它们会根据错误情况,让流以不同的方式继续或结束。
Flux.just(1, 2, 0, 4)
    .map(i -> 10 / i)
    .doOnError(e -> System.err.println("doOnError saw the error: " + e)) // 只是看一眼,错误会继续传递
    .onErrorReturn(-1) // 捕获错误,并返回一个默认值 -1,流正常结束
    .subscribe(
        result -> System.out.println("Result: " + result),
        err -> System.err.println("Final error: " + err), // 这行不会执行,因为错误被 onErrorReturn 消费了
        () -> System.out.println("Completed!")
    );

// 输出:
// Result: 10
// Result: 5
// doOnError saw the error: java.lang.ArithmeticException: / by zero
// Result: -1
// Completed!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

关键操作符对比:

  • onErrorReturn(E value) :发生错误时,发射一个静态的默认值,然后结束流。
  • onErrorResume(Function<Throwable, Publisher<T>> fallback) :发生错误时,切换到一个备用的 Flux 或 Mono。
  • onErrorContinue(BiConsumer<Throwable, Object> errorConsumer) :发生错误时,跳过导致错误的那个元素,然后继续处理后续元素。这在处理批量数据时非常有用。
  • onErrorStop() :与 onErrorContinue 相反,它是默认行为,遇到错误就终止整个流。

# hookOnXXX 系列 - 精细化的订阅者控制

这个系列与前两者完全不同。hookOnXXX 不是 Flux 或 Mono 的操作符,而是 BaseSubscriber 这个抽象类中的钩子方法。

当你需要对订阅行为本身进行精细控制,特别是手动管理背压时,你会选择创建一个 BaseSubscriber 的匿名子类,并重写这些钩子方法。

核心特点:

  • 不是操作符:它不是链式调用的一部分,而是 subscribe() 方法的参数。
  • 控制订阅生命周期:让你能介入到订阅、请求、接收、取消等每一个环节。
  • 手动背压:最核心的用途是实现自定义的背压策略。

下面的例子展示了如何创建一个每次只请求 1 个元素的订阅者,从而实现精确的背压控制。

// 修正后的示例
Flux<Integer> source = Flux.range(1, 10);

source.subscribe(new BaseSubscriber<Integer>() {
    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        System.out.println("Subscribed!");
        System.out.println("Requesting first element...");
        request(1); // 必须主动请求,否则不会有数据
    }

    @Override
    protected void hookOnNext(Integer value) {
        System.out.println("Received element: " + value);
        if (value < 10) {
            System.out.println("Requesting next element...");
            request(1); // 处理完一个后,再请求下一个
        } else {
            System.out.println("Last element received, no more requests.");
        }
    }

    @Override
    protected void hookOnComplete() {
        System.out.println("Stream completed!");
    }

    @Override
    protected void hookOnError(Throwable throwable) {
        System.err.println("Error occurred: " + throwable);
    }
});

// 输出:
// Subscribed!
// Requesting first element...
// Received element: 1
// Requesting next element...
// Received element: 2
// ...
// Received element: 10
// Last element received, no more requests.
// Stream completed!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

# 高级概念与最佳实践

# 1. 热 Flux vs. 冷 Flux

  • 冷 Flux:像一张光盘。每个订阅者都会从头到尾播放一遍,订阅者之间互不影响。 Flux.just 、 Flux.range 、 Flux.fromIterable 以及大多数基于 generate / create 的 Flux 都是冷的。
  • 热 Flux:像一场直播。数据流独立于订阅者存在,订阅者只能从它加入的时刻开始接收数据,错过了就是错过了。 Flux.interval 是一个典型的热 Flux 。

转换冷 Flux 为热 Flux

使用 publish() 和 refCount() (或更简单的 share() ) 可以将一个冷 Flux 变成热 Flux ,从而让多个订阅者共享同一个数据流。

Flux<Integer> coldFlux = Flux.range(1, 5).delayElements(Duration.ofMillis(200));

Flux<Integer> hotFlux = coldFlux.publish().refCount(); // 或者 .share()

// 第一个订阅者
hotFlux.subscribe(i -> System.out.println("Subscriber 1: " + i));

try { Thread.sleep(500); } catch (InterruptedException e) {}

// 第二个订阅者,它会错过前几个元素
hotFlux.subscribe(i -> System.out.println("Subscriber 2: " + i));

// 可能的输出:
// Subscriber 1: 1
// Subscriber 1: 2
// Subscriber 1: 3
// Subscriber 2: 3  // 从当前元素开始
// Subscriber 1: 4
// Subscriber 2: 4
// ...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

# 2. 背压策略

当下游处理速度跟不上上游发射速度时, Flux 提供了多种背压策略来决定如何处理 “积压” 的数据。这些策略通常用在 onBackpressureXXX 操作符中。

Flux.interval(Duration.ofMillis(10)) // 快速生产者
    .onBackpressureBuffer(100) // 缓冲最多100个元素,超过则抛出异常
    // .onBackpressureDrop() // 丢弃下游无法处理的元素
    // .onBackpressureLatest() // 只保留最新的元素,丢弃旧的
    .doOnNext(i -> {
        try { Thread.sleep(100); } catch (InterruptedException e) {} // 慢消费者
    })
    .subscribe(System.out::println);
1
2
3
4
5
6
7
8

# 3. Sinks - 现代化的编程式创建

Sinks 是 Flux.create 和 Mono.create 的现代化替代品,提供了更清晰的 API 和更好的线程安全保证。

  • Sinks.one() : 创建一个只能发射 0 或 1 个元素的 Mono 。
  • Sinks.many() : 创建一个可以发射 0 到 N 个元素的 Flux 。
    • multicast() : 支持多个订阅者,是热流。
    • unicast() : 只支持一个订阅者。
    • replay() : 会向新订阅者重放所有历史数据。
// 创建一个可以向多个订阅者广播的 Sink
Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
Flux<String> flux = sink.asFlux();

flux.subscribe(s -> System.out.println("Subscriber A: " + s));
sink.tryEmitNext("Hello");

flux.subscribe(s -> System.out.println("Subscriber B: " + s));
sink.tryEmitNext("World");
sink.tryEmitComplete();

// 输出:
// Subscriber A: Hello
// Subscriber A: World
// Subscriber B: World
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# onXXX 系列 - 错误处理与恢复

与 doOnXXX 不同, onXXX 系列操作符是错误处理操作符。它们可以捕获上游的错误,并决定如何恢复流,从而阻止错误向下游传播。

# 1. onErrorReturn - 使用静态值恢复

当发生错误时,忽略错误并发射一个静态的默认值。

Mono<String> monoWithError = Mono.error(new IllegalStateException("Failed"));
monoWithError
    .onErrorReturn("Fallback Value") // 捕获错误并返回默认值
    .subscribe(
        value -> System.out.println("Received: " + value),
        error -> System.err.println("Should not be here!") // 不会执行
    );
// 输出: Received: Fallback Value
1
2
3
4
5
6
7
8

# 2. onErrorResume - 使用备用 Mono 恢复

这是最灵活的错误恢复方式。当错误发生时,可以动态地切换到一个备用的 Mono 。

Mono<String> primaryService = Mono.error(new RuntimeException("Primary service is down"));
Mono<String> fallbackService = Mono.just("Response from fallback service");

primaryService
    .onErrorResume(error -> {
        System.out.println("Primary failed, invoking fallback. Reason: " + error.getMessage());
        return fallbackService; // 切换到备用 Mono
    })
    .subscribe(System.out::println);

// 输出:
// Primary failed, invoking fallback. Reason: Primary service is down
// Response from fallback service
1
2
3
4
5
6
7
8
9
10
11
12
13

# 3. retry - 重试操作

当发生错误时,自动重新订阅上游的 Mono ,尝试最多 n 次。

AtomicInteger attempt = new AtomicInteger(0);
Mono<String> unreliableMono = Mono.fromCallable(() -> {
        if (attempt.incrementAndGet() < 3) {
            throw new RuntimeException("Temporary failure");
        }
        return "Success after retries";
    })
    .retry(2) // 重试2次 (总共会尝试3次)
    .subscribe(
        System.out::println,
        error -> System.err.println("Final failure after all retries: " + error.getMessage())
    );

// 输出: Success after retries
1
2
3
4
5
6
7
8
9
10
11
12
13
14

注意: retry 会在错误发生后立即重试。对于更复杂的场景(如指数退避),应使用 retryWhen 。


# hookOnXXX 系列 - 全局监控

Hooks 提供了全局性的、跨所有响应式流的监控和修改能力。它们通常用于应用启动时设置,以进行统一的日志记录、追踪或指标收集。

# Hooks.onEachOperator 和 Hooks.onLastOperator

  • Hooks.onEachOperator : 将一个操作符应用到每一个新创建的操作符上。
  • Hooks.onLastOperator : 将一个操作符应用到每个响应式链的最后一个操作符上。

示例:为所有 Mono 添加全局日志

// 在应用启动时设置
Hooks.onEachOperator(operatorCall -> operatorCall
    .call("globalLogger", new MonoLogger()) // 自定义一个日志操作符
);

// ... 应用代码中的任意 Mono
Mono.just("Some data")
    .map(String::toUpperCase)
    .subscribe(); // 订阅时会触发全局日志

// 自定义日志操作符 (简化版)
class MonoLogger implements CoreSubscriber<Object> {
    // ... 实现细节略,主要是包装下游订阅者并在 onNext, onError 等方法中打印日志
    // 这是一个高级用法,实际中可以使用 reactor-tools-extra 等库
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# 消费 Mono: subscribe() vs block()

Mono 定义了一个异步流程,要触发它,你需要 “消费” 它。主要有两种方式:

# 1. subscribe() - 异步消费

这是标准的、非阻塞的方式。它会启动异步流程,并立即返回,不会等待结果完成。

Mono<String> mono = Mono.just("Async Result");

// 启动异步流程
Disposable disposable = mono.subscribe(
    result -> System.out.println("Got result: " + result), // onNext
    error -> System.err.println("Error: " + error),       // onError
    () -> System.out.println("Completed.")               // onComplete
);

System.out.println("This line prints immediately.");

// 输出:
// This line prints immediately.
// Got result: Async Result
// Completed.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

subscribe() 返回的 Disposable 对象可以用来取消订阅 ( disposable.dispose() )。

# 2. block() - 阻塞消费

block() 会阻塞当前线程,直到 Mono 发出元素或完成,然后返回结果。如果发生错误,它会抛出异常。

Mono<String> mono = Mono.just("Blocking Result");

String result = mono.block(); // 阻塞等待结果
System.out.println("Got result: " + result);

// 输出:
// Got result: Blocking Result
1
2
3
4
5
6
7

⚠️ 重要警告:

  • 永远不要在响应式环境(如 Spring WebFlux 控制器)中调用 block()。这会阻塞事件循环线程,完全抵消响应式编程的优势,并可能导致应用死锁。
  • block() 的合理使用场景包括:在 main 方法中、单元测试中,或者在与非响应式的遗留代码集成的边界上。

# 高级概念与最佳实践

# 1. 组合多个 Mono

  • Mono.zip(): 并行执行多个 Mono ,当它们都成功完成后,将结果组合成一个 Tuple 。

    Mono<String> userMono = findUser(1);
    Mono<String> orderMono = findOrder(123);
    
    Mono<Tuple2<String, String>> tupleMono = Mono.zip(userMono, orderMono);
    
    tupleMono.subscribe(tuple -> {
        System.out.println("User: " + tuple.getT1());
        System.out.println("Order: " + tuple.getT2());
    });
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
  • Mono.when(): 并行执行多个 Mono ,等待它们全部完成,但忽略它们的结果。返回一个 Mono<Void> ,适用于编排多个独立的副作用任务。

    Mono<Void> updateTask = updateUser(user);
    Mono<Void> logTask = logUpdate(user);
    
    Mono.when(updateTask, logTask)
        .subscribe(() -> System.out.println("All tasks completed."));
    
    1
    2
    3
    4
    5

# 2. 缓存结果 ( cache() )

cache() 操作符会缓存 Mono 的结果(成功或失败)。后续的订阅者将立即获得缓存的结果,而不会重新执行源操作。这对于昂贵的、只需计算一次的操作非常有用。

Mono<String> expensiveMono = Mono.fromCallable(() -> {
    System.out.println("Performing expensive computation...");
    Thread.sleep(1000);
    return "Computed Value";
}).cache(); // 缓存结果

System.out.println("First subscription:");
expensiveMono.subscribe(System.out::println);

System.out.println("Second subscription:");
expensiveMono.subscribe(System.out::println);

// 输出:
// First subscription:
// Performing expensive computation...
// Computed Value
// Second subscription:
// Computed Value (第二次没有重新计算)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# 3. 超时处理 ( timeout() )

timeout() 操作符可以防止 Mono 因下游服务响应慢而无限期等待。如果在指定时间内没有完成,它会发出一个 TimeoutException 。

Mono<String> slowMono = Mono.fromCallable(() -> {
    Thread.sleep(2000); // 模拟2秒延迟
    return "Finally";
});

slowMono
    .timeout(Duration.ofSeconds(1)) // 设置1秒超时
    .onErrorResume(TimeoutException.class, e -> Mono.just("Timeout! Using fallback."))
    .subscribe(System.out::println);

// 输出: Timeout! Using fallback.
1
2
3
4
5
6
7
8
9
10
11

# 4. 上下文传递 ( Context )

Context 是一个在响应式链中从上游向下游传递元数据(如请求 ID、用户凭证)的强大机制,它不作为数据流的一部分,而是作为附加信息传递。

String key = "message";

Mono<String> mono = Mono.deferContextual(contextView ->
        Mono.just("Hello, " + contextView.get(key))
    )
    .contextWrite(context -> context.put(key, "Reactor"));

mono.subscribe(System.out::println);

// 输出: Hello, Reactor
1
2
3
4
5
6
7
8
9
10
#响应式编程
上次更新: 2025/11/14, 07:03:26
前置条件
Spring WebFlux

← 前置条件 Spring WebFlux→

最近更新
01
Spring WebFlux
11-18
02
前置条件
10-30
03
计算机网络
09-13
更多文章>
Theme by Vdoing | Copyright © 2019-2025 powered by Vdoing
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式