笔记 笔记
首页
  • 开发工具
  • Java Web
  • Java 进阶
  • 容器化技术
  • Java 专栏

    • Java 核心技术面试精讲
    • Java 业务开发常见错误 100 例
  • 数据库专栏

    • MySQL 实战 45 讲
    • Redis 核心技术与实战
  • 安全专栏

    • OAuth 2.0 实战课
  • 计算机系统
  • 程序设计语言
  • 数据结构
  • 知识产权
  • 数据库
  • 面向对象
  • UML
  • 设计模式
  • 操作系统
  • 结构化开发
  • 软件工程
  • 计算机网络
  • 上午题错题
在线工具 (opens new window)

EasT-Duan

Java 开发
首页
  • 开发工具
  • Java Web
  • Java 进阶
  • 容器化技术
  • Java 专栏

    • Java 核心技术面试精讲
    • Java 业务开发常见错误 100 例
  • 数据库专栏

    • MySQL 实战 45 讲
    • Redis 核心技术与实战
  • 安全专栏

    • OAuth 2.0 实战课
  • 计算机系统
  • 程序设计语言
  • 数据结构
  • 知识产权
  • 数据库
  • 面向对象
  • UML
  • 设计模式
  • 操作系统
  • 结构化开发
  • 软件工程
  • 计算机网络
  • 上午题错题
在线工具 (opens new window)

购买兑换码请添加

添加时候请写好备注,否则无法通过。

  • Maven

  • Bootstrap

  • Spring

  • Spring MVC

  • MyBatis

  • JUnit

  • GitFlow 工作流指南

  • SpringBoot

  • Reactor

    • 前置条件
    • Reactor 核心
      • 介绍
      • 依赖
        • 拓展
      • Reactive(响应式编程)入门
        • 为什么需要Reactive呢?
        • 1.阻塞会造成浪费
        • 2.异步也无法解决
        • 在 subscribe() 之前,什么都不会发生
      • Flux
        • 案例
        • 说明
        • doOn系列
      • Mono
  • 微服务

  • Java Web
  • Reactor
EasT-Duan
2025-02-24
目录

Reactor 核心

欢迎来到我的 ChatGPT 中转站,极具性价比,为付费不方便的朋友提供便利,有需求的可以添加左侧 QQ 二维码,另外,邀请新用户能获取余额哦!最后说一句,那啥:请自觉遵守《生成式人工智能服务管理暂行办法》。

# 介绍

Reactor 是一个用于 JVM 的完全非阻塞的响应式编程框架,具备高效的需求管理(即对 “背压(backpressure)” 的控制)能力。它与 Java 8 函数式 API 直接集成,比如 CompletableFuture, Stream, 以及 Duration。它提供了异步序列 API Flux(用于 [N] 个元素)和 Mono(用于 [0|1] 个元素),并完全遵循和实现了 “响应式扩展规范”(Reactive Extensions Specification)。

Reactor 的 reactor-ipc 组件还支持非阻塞的进程间通信(inter-process communication, IPC)。 Reactor IPC 为 HTTP(包括 Websockets)、TCP 和 UDP 提供了支持背压的网络引擎,从而适合 应用于微服务架构。并且完整支持响应式编解码(reactive encoding and decoding)。

# 依赖

<!--时间2025年2月24日14:38:41的最新版本-->
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-bom</artifactId>
            <version>2024.0.3</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
1
2
3
4
5
6
7
8
9
10
11
12
<dependencies>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId>
    </dependency>
    <!--单元测试的相关包,可以不引用-->
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
1
2
3
4
5
6
7
8
9
10
11
12

# 拓展

如果想使用 Milestones 版或者 Snapshots 版本,需要在 pom 文件里添加以下依赖,否则在下载不下来。

<!--Milestones-->
<repositories>
	<repository>
		<id>spring-milestones</id>
		<name>Spring Milestones Repository</name>
		<url>https://repo.spring.io/milestone</url>
	</repository>
</repositories>
1
2
3
4
5
6
7
8
<!--Snapshots-->
<repositories>
	<repository>
		<id>spring-snapshots</id>
		<name>Spring Snapshot Repository</name>
		<url>https://repo.spring.io/snapshot</url>
	</repository>
</repositories>
1
2
3
4
5
6
7
8

# Reactive(响应式编程)入门

Reactor 是 Reactive Programming(反应式编程)范式的一种实现。

微软在 .NET 生态系统中创建了反应式扩展(Rx)库迈出了向反应式编程方向的第一步。随后,RxJava 在 JVM 上实现了反应式编程。随着时间的推移,Java 通过 Reactive Streams 努力实现了标准化,该规范为 JVM 上的反应式库定义了一系列接口和交互规则。其接口已集成到 Java 9 的 Flow 类中。

在面向对象的语言中,反应式编程范式通常是观察者模式的扩展。也可以与迭代器设计模式进行比较,因为所有这些库中的 Iterable - Iterator 对都具有二重性。其中一个主要区别是,迭代器是基于拉的,而反应流则是基于推的。

迭代器模式是一种命令式编程(Imperative Programming)模式,尽管访问值的方法完全由 Iterable 负责。但是实际上,开发者需要决定何时访问序列中的下一个元素(即调用 next() 方法)。在反应式流中,与上述 Iterable-Iterator 对等的模式是发布者 - 订阅者(Publisher-Subscriber)模式。但在这种模式中,是发布者通知订阅者新值的可用性,这种推送(push)特性是反应式的关键。

除了推送值,错误处理和完成方面也有明确的定义。一个 Publisher 可以向其 Subscriber 推送新值(通过调用 onNext ),但也可以发出错误信号(通过调用 onError )或完成信号(通过调用 onComplete )。错误和完成都会终止序列。这可以归纳如下

onNext x 0..N [onError | onComplete]
1

这种方法非常灵活。该模式支持无值、一个值或 n 个值(包括值的无限序列,如时钟的持续滴答声)的用例。

# 为什么需要 Reactive 呢?

# 1. 阻塞会造成浪费

提高计划绩效的方法大致有两种:

  • 并行化,以使用更多线程和更多硬件资源。

  • 提高使用效率。

通常,Java 开发人员在编写程序时会使用阻塞代码。在出现性能瓶颈之前,这种做法是没有问题的。这时就需要引入更多线程,运行类似的阻塞代码。但是,这种资源利用率的扩展很快就会带来争用和并发问题。

更糟糕的是,阻塞会浪费资源。如果你仔细观察,只要程序涉及一些延迟(尤其是 I/O,如数据库请求或网络调用),资源就会被浪费,因为线程(可能有很多线程)现在处于闲置状态,等待数据。

# 2. 异步也无法解决

通过编写异步、非阻塞代码,可以让执行切换到另一个使用相同底层资源的活动任务,然后在异步处理完成后返回当前进程。Java 提供了两种异步编程模式。

  • Callbacks:没有返回值,但需要一个额外的 callback 参数(一个 lambda 或匿名类),当结果可用时,该参数会被调用。一个众所周知的例子就是 Swing 的 EventListener 层次结构。
  • Futures: 会返回一个 Future<T> 值。异步进程会计算一个 T 值,但 Future 对象会封装对它的访问。该值不会立即可用,可以轮询该对象,直到该值可用为止。例如,运行 Callable<T> 任务的 ExecutorService 使用 Future 对象。

但是回调很难组合在一起,很快就会导致代码难以阅读和维护(被称为 "回调地狱")。

举个例子:在用户界面上显示用户的前五个收藏,如果用户没有收藏,则显示建议。这需要通过以下三种服务(一种是提供收藏 ID,第二种是获取收藏详细信息,第三种是提供包含详细信息的建议)。那么代码就会变成如下这样。

// 需求很好理解,但是如果只从代码上去看,基本上很难直接看懂。
userService.getFavorites(userId, new Callback<List<String>>() { 
  // 成功时调用
  public void onSuccess(List<String> list) { 
    if (list.isEmpty()) { 
      suggestionService.getSuggestions(new Callback<List<Favorite>>() {
        public void onSuccess(List<Favorite> list) { 
          UiUtils.submitOnUiThread(() -> { 
            list.stream()
                .limit(5)
                .forEach(uiList::show); 
            });
        }

        public void onError(Throwable error) { 
          UiUtils.errorPopup(error);
        }
      });
    } else {
      list.stream() 
          .limit(5)
          .forEach(favId -> favoriteService.getDetails(favId, 
            new Callback<Favorite>() {
              public void onSuccess(Favorite details) {
                UiUtils.submitOnUiThread(() -> uiList.show(details));
              }

              public void onError(Throwable error) {
                UiUtils.errorPopup(error);
              }
            }
          ));
    }
  } 
  // 出错时调用
  public void onError(Throwable error) {
    UiUtils.errorPopup(error);
  }
});
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// 使用reactor之后整个代码就变成这样了
userService.getFavorites(userId)
           .flatMap(favoriteService::getDetails)  // 将Favorite转成流
           .switchIfEmpty(suggestionService.getSuggestions()) // 如果Favorite为空,那么从suggestionService拿推荐
           .take(5)  // 只获取5个
           .publishOn(UiUtils.uiThreadScheduler())
           .subscribe(uiList::show, UiUtils::errorPopup);
1
2
3
4
5
6
7

再拓展一下,如果在 800ms 以内没有检索到收藏 ID,则从缓存中获取它们。在基于回调的代码中,这是比较复杂的处理逻辑。在 Reactor 中,只需在链中添加一个 timeout 操作符即可 **,如下所示:**

userService.getFavorites(userId)
           .timeout(Duration.ofMillis(800)) // 超过800ms
           .onErrorResume(cacheService.cachedFavoritesFor(userId))  // 从缓存中获取
           .flatMap(favoriteService::getDetails)
           .switchIfEmpty(suggestionService.getSuggestions())
           .take(5)
           .publishOn(UiUtils.uiThreadScheduler())
           .subscribe(uiList::show, UiUtils::errorPopup);
1
2
3
4
5
6
7
8

Future 对象比回调要好一些,但在组合方面仍然做得不好,尽管 CompletableFuture 在 Java 8 中有所改进。将多个 Future 对象协调在一起是可行的,但是也并不直观。此外, Future 还有其他问题。

  1. 调用 get() 方法很容易导致 Future 对象出现另一种阻塞情况。
  2. 不支持懒计算。
  3. 不支持多值和高级错误处理。

再看一个例子:我们得到一个 ID 列表,想从中获取一个名称和一个统计信息,然后将它们配对组合,所有这些都是异步完成的。下面的示例使用 CompletableFuture 类型的列表实现了这一功能:

CompletableFuture<List<String>> ids = ifhIds();

CompletableFuture<List<String>> result = ids.thenComposeAsync(l -> {
	Stream<CompletableFuture<String>> zip =
			l.stream().map(i -> {
				CompletableFuture<String> nameTask = ifhName(i);
				CompletableFuture<Integer> statTask = ifhStat(i);
				// 合并两个结果
				return nameTask.thenCombineAsync(statTask, (name, stat) -> "Name " + name + " has stats " + stat);
			});
    
	List<CompletableFuture<String>> combinationList = zip.collect(Collectors.toList());
	CompletableFuture<String>[] combinationArray = combinationList.toArray(new CompletableFuture[combinationList.size()]);
	// 一起执行
	CompletableFuture<Void> allDone = CompletableFuture.allOf(combinationArray);
	return allDone.thenApply(v -> combinationList.stream()
			.map(CompletableFuture::join)
			.collect(Collectors.toList()));
});

List<String> results = result.join();
assertThat(results).contains(
		"Name NameJoe has stats 103",
		"Name NameBart has stats 104",
		"Name NameHenry has stats 105",
		"Name NameNicole has stats 106",
		"Name NameABSLAJNFOAJNFOANFANSF has stats 121");
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 用reactor改造
Flux<String> ids = ifhrIds();

Flux<String> combinations =
		ids.flatMap(id -> {
			Mono<String> nameTask = ifhrName(id);
			Mono<Integer> statTask = ifhrStat(id);

			return nameTask.zipWith(statTask,
					(name, stat) -> "Name " + name + " has stats " + stat);
		});

Mono<List<String>> result = combinations.collectList();
// 阻塞,等待所有任务完成,和CompletableFuture.allOf差不多。
List<String> results = result.block();

assertThat(results).containsExactly(
		"Name NameJoe has stats 103",
		"Name NameBart has stats 104",
		"Name NameHenry has stats 105",
		"Name NameNicole has stats 106",
		"Name NameABSLAJNFOAJNFOANFANSF has stats 121"
);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

# 在 subscribe() 之前,什么都不会发生

一个发布者最少要有一个订阅者,否则程序不会有任何行为。

# Flux

一个 Flux<T> 是一个标准的 Publisher<T> ,表示 0 到 N 个发射项的异步序列,可选择以完成信号或错误结束。与 Reactive Streams 规范中一样,这三种类型的信号可转化为对下游 Subscriber 的 onNext 、 onComplete 和 onError 方法的调用。

# 案例

public class FluxDemo01 {
    public static void main(String[] args) {
        // Flux 0 个或多个元素
        // 创建一个Flux 流
        Flux<Integer> just = Flux.just(1, 2, 3, 4, 5);

        // 订阅,流不消费就没用
        just.subscribe(e -> System.out.println("e1收到数据:" + e));
        // 一个流可以多次订阅
        just.subscribe(e -> System.out.println("e2收到数据:" + e));
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
public class FluxDemo02 {
    public static void main(String[] args) {
        // 创建一个每秒发送一次的流
        Flux<Long> interval = Flux.interval(Duration.ofSeconds(1));
        interval.subscribe(e -> System.out.println("e3收到数据:" + e));
        // 主线程不能停止,否则程序结束,流还没打印
        try {
            System.in.read();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
public class FluxDemo03 {
    /**
     *
     * 使用 Reactor 的 Flux 创建一个整数序列流,该流从1开始,包含10个元素。
     * 每个元素在发出前都会延迟1秒。
     *
     * 通过 doOnComplete、doOnCancel 和 doOnNext 方法添加事件感知回调,
     * 分别在流结束时、取消时和发出下一个元素时执行指定的操作。
     *
     * 注意:如果不订阅流,doOnComplete 方法将不会被调用。
     */
    public static void main(String[] args) {
        Flux<Integer> complete = Flux.range(1, 10)// 产生1-10的数字
                // 延迟1秒
                .delayElements(Duration.ofSeconds(1))
                .doOnComplete(() -> System.out.println("流结束"))
                .doOnCancel(() -> System.out.println("流取消"))
                .doOnNext(e -> System.out.println("下一个元素是:" + e));
        // 如果不订阅,doOnComplete方法不会被调用
        complete.subscribe(e -> System.out.println("e4收到数据:" + e));
        // 防止主线程结束
        try {
            System.in.read();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class FluxDemo04 {
    public static void main(String[] args) {
        Flux<Integer> range = Flux.range(1, 10)
                // 延迟100毫秒
                .delayElements(Duration.ofMillis(100));
        
        range
            	// doOnNext,它会在流中每次发出一个元素时触发
                .doOnNext(item -> System.out.println("下一个元素是: " + item))
                .subscribe(item -> System.out.println("当前接收到的元素是: " + item));
        // doOnEach,不仅能感知元素而且,还能感知信号
        range
                .doOnEach(signal -> {
                    if (signal.isOnNext()) {
                        Integer i = signal.get();
                        System.out.println("下一个元素是: " + i);
                        if (7 == i) {
                            i = i / 0;
                        }
                    } else if (signal.isOnComplete()) {
                        System.out.println("流结束了");
                    } else if (signal.isOnError()) {
                        System.out.println("流出现异常了: " + Objects.requireNonNull(signal.getThrowable()).getMessage());
                    }
                })
                .subscribe(item -> System.out.println("当前接收到的元素是: " + item));
        // 防止主线程结束
        try {
            System.in.read();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class FluxDemo05 {
    public static void main(String[] args) {
        Flux<Integer> complete = Flux.range(1, 10)
                // 延迟1秒
                .delayElements(Duration.ofSeconds(1))
                // 整流操作
                .filter(i -> i % 2 == 0);            
        // 自定义订阅者
        complete.subscribe(new BaseSubscriber<Integer>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                System.out.println("开始订阅了");
                // 请求一个数据,如果不写,则不会输出下一个数据
                request(1);
                // 请求无界的数据
                requestUnbounded();
            }

            @Override
            protected void hookOnNext(Integer value) {
                System.out.println("接收到了元素:" + value);
                // 再次请求一个元素,如果请求的是无界的数据那么这里就不需要进行请求了
                request(1);
            }

            @Override
            protected void hookOnComplete() {
                System.out.println("流结束了");
            }

            @Override
            protected void hookOnError(Throwable throwable) {
                System.out.println("发生了错误");
            }

            @Override
            protected void hookOnCancel() {
                System.out.println("流取消了");
            }

            @Override
            protected void hookFinally(SignalType type) {
                System.out.println("最终执行了"+type);
            }
        });
        // 防止主线程结束
        try {
            System.in.read();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class FluxDemo06 {
	public static void main(String[] args) {
		// 事件感知API,当流结束时,会调用onComplete方法, doOnxxx就是一个回调方法,本质是Hook方法
		Flux<Integer> complete = Flux.range(1, 50)
				// 延迟1秒
				.delayElements(Duration.ofMillis(100))
				// 整流操作
				.filter(i -> i % 2 == 0).doOnNext(e -> {
					if (e == 26)
						e = 10 / 0;
				})
            	// 出错时流直接完成
                .onErrorComplete();
       			// 出错时跳过错误继续执行
//				.onErrorContinue((throwable, o) -> {
//					System.out.println(throwable.getMessage());
//					System.out.println("发生报错的元素是" + o);
//				});
		// 自定义订阅者1
//		complete.subscribe(System.out::println, throwable -> System.out.println(throwable.getMessage()),
//				() -> System.out.println("完成"));
		// 自定义订阅者2
		complete.subscribe(new BaseSubscriber<Integer>() {
			@Override
			protected void hookOnSubscribe(Subscription subscription) {
				// 请求无界数量
				requestUnbounded();
			}

			@Override
			protected void hookOnNext(Integer value) {
				System.out.println("元素是"+value);
			}

			@Override
			protected void hookOnComplete() {
				System.out.println("已完成");
			}

			@Override
			protected void hookOnError(Throwable throwable) {
				System.out.println(throwable.getMessage());
			}

			@Override
			protected void hookOnCancel() {
				System.out.println("流已取消");
			}

		});

		// 防止主线程结束
		try {
			System.in.read();
		} catch (IOException e) {
			throw new RuntimeException(e);
		}
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public class FluxDemo07 {
	public static void main(String[] args) {
		Flux.range(1, 10).doOnRequest(r -> System.out.println("request of " + r))
				.subscribe(new BaseSubscriber<>() {

                    @Override
                    public void hookOnSubscribe(Subscription subscription) {
                        requestUnbounded();
                    }

                    @Override
                    public void hookOnNext(Integer integer) {
                        System.out.println("元素是 " + integer);
						// 取消流,程序直接完成。
                        cancel();
                    }
                });
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class FluxDemo08 {
    public static void main(String[] args) {
        // buffer是一个缓冲区,能把指定的元素根据缓冲器的大小,分成一个个ArrayList
        Flux<List<Integer>> flux = Flux.range(1, 10).buffer(3);

        flux.subscribe(new BaseSubscriber<List<Integer>>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                request(1);
            }

            @Override
            protected void hookOnNext(List<Integer> value) {
                request(1);
                System.out.println("请求到的元素是:" + Arrays.toString(value.toArray()));
            }
        });
    }
    //请求到的元素是:[1, 2, 3]
    //请求到的元素是:[4, 5, 6]
    //请求到的元素是:[7, 8, 9]
    //请求到的元素是:[10]
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

# 说明

在上面的例子中会发现有很多的 doNoXXX、onXXX、hookOnXXX 操作。

# doOn 系列

# Mono

#响应式编程
上次更新: 2025/04/12, 05:37:39
前置条件
微服务架构的概念

← 前置条件 微服务架构的概念→

最近更新
01
前置条件
10-30
02
计算机网络
09-13
03
软件工程
09-13
更多文章>
Theme by Vdoing | Copyright © 2019-2025 powered by Vdoing
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式