前置条件
先定义一个接口
public interface MyInterface {
int sum(int i, int j);
}
2
3
# 传统方式
//实现类的方式
public class MyInterfaceImpl implements MyInterface {
@Override
public int sum(int i, int j) {
return i + j;
}
}
2
3
4
5
6
7
public class Main {
public static void main(String[] args) {
// 传统方式
// 方式 1
MyInterface myInterface = new MyInterfaceImpl();
int res = myInterface.sum(1, 2);
System.out.println(res);
// 方式 2 内部类实现
MyInterface myInterface1 = new MyInterface() {
@Override
public int sum(int i, int j) {
return i + j;
}
};
res = myInterface1.sum(1, 2);
System.out.println(res);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# Lambda
public class Main {
public static void main(String[] args) {
MyInterface myInterface = (i, j) -> i + j;
// 更加简洁的写法
// MyInterface myInterface = Integer::sum;
int res = myInterface.sum(1, 2);
System.out.println(res);
}
}
2
3
4
5
6
7
8
9
Lambda 表达式的语法糖: 参数列表 + 箭头 + 方法体
。函数式接口是 Lambda 表达式的目标类型。
# 函数式接口
- 单一抽象方法(Single Abstract Method, SAM):函数式接口只能有一个抽象方法。这个方法也被称为函数式接口的函数描述符。
- 可以包含默认方法和静态方法:函数式接口可以包含默认方法(使用
default
关键字定义的方法)和静态方法,这些方法可以有具体的实现,不违反函数式接口的定义。 - 可以包含 Object 类中的公共方法:函数式接口可以声明
equals
,hashCode
和toString
等方法,因为这些方法是从Object
类继承而来的,不是新增的抽象方法。
简单来说:
- 只要是函数式接口就可以用 Lambda 表达式简化
- 接口中有且只有一个未实现的方法,这个接口就叫函数式接口
//在Lambda中的例子其实就是一个函数式接口的实现
//参数类型可以不写,只写(参数名),参数变量名随意定义;
interface MyHaha {
int haha();
}
MyHaha myHaha = () -> 2;
res = myHaha.haha();
System.out.println(res);
interface MyHehe {
int hehe(int i);
}
MyHehe myHehe = x -> x + 1;
res = myHehe.hehe(1);
System.out.println(res);
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 如何识别
通过 @FunctionalInterface
注解可以检测当前这个接口是否符合规范,例如
@FunctionalInterface //检查注解,帮我们快速检查我们写的接口是否函数式接口
interface MyHehe {
int hehe(int i);
// 如定义中所描述,有默认实现的也可以是个函数式接口
default int heihei() {
return hehe(10);
}
// 但如果是个抽象的定义,这种就会发生错误
// int lala(int i);
}
2
3
4
5
6
7
8
9
10
# Function
在 Java 中,函数式接口是只包含一个抽象方法的接口。它们是支持 Lambda 表达式的基础,因为 Lambda 表达式需要一个目标类型,这个目标类型必须是一个函数式接口。
函数式接口的出入参定义:
- 有入参,无出参【消费者】: function.accept
BiConsumer<String,String> function = (a,b)->{ //能接受两个入参
System.out.println("哈哈:"+a+";呵呵:"+b);
};
function.accept("1","2");
2
3
4
2、有入参,有出参【多功能函数】: function.apply
Function<String,Integer> function = (String x) -> Integer.parseInt(x);
System.out.println(function.apply("2"));
2
3、无入参,无出参【普通函数】:
Runnable runnable = () -> System.out.println("aaa");
new Thread(runnable).start();
2
4、无入参 ,有出参【提供者】: supplier.get ()
Supplier<String> supplier = ()-> UUID.randomUUID().toString();
String s = supplier.get();
System.out.println(s);
2
3
java.util.function 包下的所有 function 定义:
- Consumer: 消费者
- Supplier: 提供者
- Predicate: 断言
get/test/apply/accept 调用的函数方法;
private static void demo() {
Supplier<String> supplier = () -> "3";
Predicate<String> predicate = str -> str.matches("-?\\d+(\\.\\d+)?");
Function<String, Integer> function = Integer::parseInt;
Consumer<Integer> consumer = num -> {
if (num % 2 == 0) {
System.out.println(num + "是偶数");
} else {
System.out.println(num + "是奇数");
}
};
deal(supplier, predicate, function, consumer);
}
private static void deal(Supplier<String> supplier, Predicate<String> predicate, Function<String, Integer> function, Consumer<Integer> consumer) {
boolean test = predicate.test(supplier.get());
if (test) {
consumer.accept(function.apply(supplier.get()));
} else {
System.out.println("不是数字");
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# Stream API
Stream 所有数据和操作被组合成流管道流管道组成:
- 一个数据源(可以是一个数组、集合、生成器函数、I/O 管道)
- 零或多个中间操作(将一个流变形成另一个流)
- 一个终止操作(产生最终结果)
流是惰性的,只有在启动最终操作时才会对源数据进行计算,而且只在需要时才会消耗源元素
这句话的意思是:它们不会立即处理源数据,而是等到遇到 “终止操作” 时才会开始计算并处理数据。并且,在处理数据时,它们不会一次性加载和处理所有数据,而是只处理当前需要的数据部分。
# 创建流
of、builder、empty、ofNullable、generate、concat、集合.stream
# 中间操作(Intermediate Operations)
- filter:过滤;挑出我们用的元素
- map: 映射:一一映射,a 变成 b
- mapToInt、mapToLong、mapToDouble
- flatMap:打散、散列、展开、扩维:一对多映射
更多方法
filter、map、mapToInt、mapToLong、mapToDouble、flatMap、flatMapToInt、flatMapToLong、flatMapToDouble、mapMulti、mapMultiToInt、mapMultiToLong、mapMultiToDouble、parallel、unordered、onClose、sequential、distinct、sorted、peek、limit、skip、takeWhile、dropWhile。
# 终止操作(Terminal Operation)
- forEach:对此流的每个元素执行一个操作。
- toList:将此流的元素累积到 List 中。
- collect:对此流的元素执行可变归约操作,比如转成一个 Map、Set、List 等等,还支持分组等等一系列的操作。
更多方法
forEach、forEachOrdered、toArray、reduce、collect、toList、min、max、count、anyMatch、allMatch、noneMatch、findFirst、findAny、iterator。
public class StreamInOp {
public static void main(String[] args) {
List<Person> personList = List.of(
new Person("张三", "男", 20),
new Person("李四", "男", 22),
new Person("王五", "女", 20),
new Person("赵六", "女", 33),
new Person("钱七", "男", 44),
new Person("孙八", "女", 12),
new Person("周八", "男", 56),
new Person("吴七", "女", 21),
new Person("吴七", "女", 21)
);
List<Person> persons = new ArrayList<>(personList);
// 这里所有的操作都不会对源数据产生影响
personList.stream()
// 查看一下流,如peek的字面意思,这里不会对数据造成任何影响。
.peek(System.out::println)
.map(Person::getName)
//去重
.distinct()
// 打散流成为一个新的流
.flatMap(pName -> Stream.of("姓名是:" + pName))
.sorted(String::compareTo)
.forEach(System.out::println);
// 可以通过这个方法查看一下,personList并没有被修改
personList.forEach(System.out::println);
List<Integer> integers = List.of(1, 2, 3, 4, 5);
//takeWhile从一个流中获取满足特定条件的元素,直到遇到第一个不满足条件的元素为止。之后的元素将被忽略
integers.stream().takeWhile(integer -> integer % 2 == 0).forEach(System.out::println); //输出为空
integers.stream().takeWhile(integer -> integer < 5).forEach(System.out::println); //输出:1,2,3,4
Map<String, List<Person>> collect =
persons.stream().collect(Collectors.groupingBy(Person::getSex));
collect.forEach((k, v) -> System.out.println("key:" + k + ",value:" + v));
// ---------------------------------------------------------------------
List<String> words = Arrays.asList("apple", "banana", "orange", "grape", "avocado", "blueberry");
List<String> result = words.stream()
//过滤长度大于5的单词
.filter(word -> word.length() > 5)
//将单词转换为大写形式
.map(String::toUpperCase)
//将每个单词拆分为字符,并放入新的流中
.flatMap(word -> Arrays.stream(word.split("")))
//去重
.distinct()
//按照字母顺序排序
.sorted()
//在每个字符上执行某些操作,并打印调试信息
.peek(System.out::println)
//取前4个字符
.limit(4)
//跳过前两个字符
.skip(2)
//使用takeWhile,保留小于'F'的字符
.takeWhile(ch -> ch.charAt(0) < 'F')
//收集结果并转为列表
.toList();
System.out.println("Result:" + result);
}
}
class Person {
private String name;
private String sex;
private int age;
public Person(String name, String sex, int age) {
this.name = name;
this.sex = sex;
this.age = age;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getSex() {
return sex;
}
public void setSex(String sex) {
this.sex = sex;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
@Override
public String toString() {
return "Person{" +
"name='" + name + '\'' +
", sex='" + sex + '\'' +
", age=" + age +
'}';
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# 总结
在 Stream 中进行操作的时候,注意两个东西
- 接收的参数:就如上面讲的
Function
响应式编程一样,看清入参和返回,并且要看清当前函数式接口继承于什么,例如<R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper);
这种就说明要返回是一个 Stream 类型,而<R> Stream<R> map(Function<? super T, ? extends R> mapper);
这种就说明返回的是自定义的类型。 - 是中间操作还是终止操作:点开方法的源代码,要看清文档标注的是什么类型的操作,如果是一个中间操作,那么后续可以继续进行操作,如果是一个终止操作,这个方法写完后就无法再对这个流进行操作了。
# Reactive-Stream
Reactive Streams 是 JVM 面向流的库的标准和规范
- 处理可能无限数量的元素
- 有序
- 在组件之间异步传递元素
- 强制性非阻塞,背压模式
推荐阅读:响应式宣言:https://www.reactivemanifesto.org/zh-CNReactiveStream: https://github.com/reactive-streams/reactive-streams-jvm/blob/master/README.md
# 正压(Push)
正压指的是生产者(数据源)主动推动数据流向消费者。也就是说,数据在流动过程中是由生产者控制的,消费者无需主动请求数据,而是自动接受和处理流入的数据。正压是响应式编程的核心机制之一。
# 背压(Backpressure)
背压则是指当消费者处理数据的速度跟不上生产者生成数据的速度时,消费者向生产者发出请求,表示暂时无法继续接收更多的数据。背压机制能有效防止由于生产者过快产生数据导致消费者无法及时处理,进而避免内存溢出或系统崩溃等问题。
背压通常是在流数据中遇到的消费者处理能力不及生产者速度时进行的一种调节方式。
# 多线程
线程数绝对不是越多越好,当线程数量多于 CPU 核心数时,系统需要频繁地在线程之间进行切换,这种切换会带来额外的开销,包括上下文切换(保存和恢复线程状态)、内存开销(更多的线程占用更多的内存)以及时间开销(线程等待时间增加)。过多的线程会导致资源浪费,因为线程之间会竞争 CPU 时间片,并且频繁切换带来的开销会使系统整体性能下降。
最好的方法是基于核心数合理设置线程池大小,使每个线程保持繁忙,而不是创建大量线程去等待和切换。
# 实时的数据流系统
通过全异步 + 缓冲区的方式可以构建出来一个实时的数据流系统,其模式比较类似于 MQ。
MQ 通常是基于异步通信的模式,在生产者(发送者)和消费者(接收者)之间解耦。生产者发送消息时不会等待消费者处理完毕,而是直接将消息放入队列中,接着可以继续处理其他任务。消费者从队列中取出消息进行处理,这种异步方式提高了系统的吞吐量和响应性。
高可用:分片、复制、选领导。
高并发:缓存、异步、队排好。
// 分布式响应式系统,MQ就是类似这种做法,在传统的开发模式中,就需要引入MQ等等一些列的中间件完成无阻塞
public class NonBlockingDemo {
private static final String[] buffer = new String[10];
public static void main(String[] args) {
NonBlockingDemo nonBlockingDemo = new NonBlockingDemo();
new Thread(nonBlockingDemo::a).start();//发布数据
new Thread(nonBlockingDemo::b).start();//消费数据
}
private void a() {
String A = "A";
System.out.println("A发送了消息");
buffer[0] = A; // 放入缓冲区
// 执行其他的任务
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("A完成了任务");
}
private void b() {
// B的执行并不会等待A的执行完成,
String A = null;
while (A == null) {
// 重试去拿数据
A = buffer[0];
}
System.out.println("B完成了任务" + A);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# Flow API
# 核心组件
在 Java 9 引入的 Flow API 中,核心组件包括三个接口: Publisher
、 Subscriber
和 Subscription
。这些接口提供了一种标准的方式来处理流数据,并实现背压控制。它们是响应式编程的基础,允许数据在异步的流动中进行有效的传递与处理。
Publisher 接口
Publisher
是数据流的生产者,它负责发布数据流并将数据发送给订阅者( Subscriber
)。 Publisher
提供一个 subscribe
方法,允许订阅者订阅它,从而开始接收数据流。
主要方法:
void subscribe(Subscriber<? super T> subscriber);
- 用于将
Subscriber
订阅到Publisher
上。当订阅者调用subscribe
方法时,Publisher
会通知订阅者,开始向其传送数据。
- 用于将
说明:
Publisher
是流的源头,负责产生和发布数据。它没有直接控制消费端(Subscriber
)如何处理数据,但它提供了数据流的入口。
Subscriber 接口
Subscriber
是数据流的消费者,负责接收和处理从 Publisher
发出的数据流。一个 Subscriber
可以通过实现 onSubscribe
、 onNext
、 onError
和 onComplete
方法来处理流中的数据。
主要方法:
void onSubscribe(Subscription subscription);
- 该方法在订阅建立时由
Publisher
调用,用于接收一个Subscription
对象,进而控制数据流的请求。
- 该方法在订阅建立时由
void onNext(T item);
- 当
Publisher
发布新的数据项时,onNext
方法会被调用。item
是新发布的数据。
- 当
void onError(Throwable throwable);
- 当数据流中发生错误时(例如异常、连接中断等),
onError
会被调用,throwable
包含错误的详细信息。
- 当数据流中发生错误时(例如异常、连接中断等),
void onComplete();
- 当数据流结束时(即
Publisher
不再发布数据时),onComplete
方法会被调用。
- 当数据流结束时(即
说明:
Subscriber
接收数据并对其进行处理。它还可以向Publisher
请求更多的数据(通过Subscription
)。Subscriber
通过实现背压控制来避免在消费速度跟不上生产速度时被数据溢出。
Subscription 接口
Subscription
是 Publisher
和 Subscriber
之间的桥梁,负责在二者之间协调数据流的传递。 Subscription
提供了请求和取消数据传输的机制。
主要方法:
void request(long n);
- 该方法用于请求
Publisher
发送n
条数据。通常,Subscriber
通过request
来告诉Publisher
它准备好接收更多的数据。n
表示订阅者希望接收的数据数量。
- 该方法用于请求
void cancel();
- 该方法用于取消数据传输,通知
Publisher
停止发送数据。消费者如果不再需要接收数据或遇到错误时,可以调用该方法。
- 该方法用于取消数据传输,通知
说明:
Subscription
使得消费者能够控制数据流的速度(通过request
方法)。通过调用cancel
方法,订阅者可以中止接收数据流,从而释放资源。
# Flow API 核心组件交互
- Publisher 启动数据流,通过
subscribe
方法通知一个Subscriber
来订阅数据。 - 订阅后,
Publisher
会将数据流传递给Subscriber
,并通过Subscription
进行数据请求与流量控制。 Subscriber
在接收到数据时,调用onNext
方法进行处理。如果需要更多的数据,调用request
方法告诉Publisher
继续传送数据。Subscription
也允许Subscriber
在处理过程中取消数据流(通过cancel
方法)或控制数据请求量,避免被过多数据淹没。
public class FlowDemo {
/**
* 主方法演示了如何使用SubmissionPublisher和Flow.Subscriber来实现一个简单的响应式流。
* 该程序创建了一个发布者来发布字符串,并定义了一个订阅者来消费这些字符串。订阅者每次只请求一个项目,
* 处理后请求下一个项目,确保项目的顺序处理。
*
* @throws InterruptedException 如果在等待所有项目被处理时线程中断。
*/
public static void main(String[] args) throws InterruptedException {
// 创建一个SubmissionPublisher来发布字符串
try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
// 定义一个实现了Flow.Subscriber的订阅者
Flow.Subscriber<String> stringSubscriber = new Flow.Subscriber<>() {
private Flow.Subscription subscription;
/**
* 当订阅建立时调用。
*
* @param subscription 发布者到该订阅者的订阅。
*/
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
System.out.println(Thread.currentThread() + "订阅开始");
// 初始请求一条数据
subscription.request(1);
}
/**
* 当有新项目可用时调用,用于处理。
*
* @param item 发布者发布的下一个项目。
*/
@Override
public void onNext(String item) {
if (item.equals("p+5")) {
int y = 1 / 0;
throw new RuntimeException("p5");
}
System.out.println(Thread.currentThread() + "消费下一条:" + item);
subscription.request(1); // 处理后请求下一个项目
}
/**
* 当在流处理过程中发生错误时调用。
*
* @param throwable 发生的异常。
*/
@Override
public void onError(Throwable throwable) {
System.err.println("拿到了一个错误:" + throwable.getMessage());
}
/**
* 当所有项目成功处理且不会再有更多项目发布时调用。
*/
@Override
public void onComplete() {
System.out.println("完成");
}
};
// 先订阅,后发布数据
publisher.subscribe(stringSubscriber);
// 发布10个字符串
for (int i = 0; i < 10; i++) {
publisher.submit("p+" + i);
}
// 关闭发布者
publisher.close();
// 等待一段时间,确保所有项目都被处理
Thread.sleep(1000);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# 流的生命周期
- 订阅(Subscription):
Subscriber
通过调用Publisher
的subscribe
方法进行订阅。订阅时,Publisher
会给Subscriber
发送onSubscribe
回调。 - 数据流传递:订阅建立后,
Publisher
会向Subscriber
发送数据项(onNext
),并根据Subscriber
请求的数量决定发送数据的频率。 - 背压控制:
Subscriber
可以通过request
控制请求的数据数量,避免因数据传输过快而导致消费者处理不及。 - 完成与错误处理:当数据流结束时,
Publisher
会调用onComplete
;如果发生错误,Publisher
会调用onError
。
# 背压机制(Backpressure)
背压机制是 Flow API 的重要特性,它允许 Subscriber
控制从 Publisher
请求数据的数量。 Subscriber
通过 subscription.request(n)
来指定希望接收的数据量,而 Publisher
只能根据消费者的请求量来发送数据。这避免了消费者处理速度慢时,生产者发送过多数据导致的内存溢出。
# 和 mq 的区别
Reactive 和 MQ 的在核心功能上有一定的相似性。但是两者的用法和适用场景是完全不一致的。
相似性
- 异步通信:两者都支持异步通信,可以在不同的线程或进程中处理消息的发布和接收。
- 解耦:两者都能解耦生产者和消费者,使得系统更加灵活和可扩展。
区别
- 实现复杂度:
SubmissionPublisher
和Flow.Subscriber
是 Java 标准库中的一部分,不需要额外的依赖。而 MQ 通常需要依赖外部的消息中间件(如 RabbitMQ、Kafka 等),这可能会增加系统的复杂性和运维成本。 - 适用场景:MQ 通常用于跨进程、跨机器的通信,适用于需要高吞吐量、高可靠性的分布式系统。而
SubmissionPublisher
和Flow.Subscriber
更适合在 **同一 JVM 内** 进行异步任务处理。 - 性能:在同一个 JVM 内,
SubmissionPublisher
和Flow.Subscriber
的通信开销相对较小,因为它们不需要序列化 / 反序列化数据,也不需要网络传输。 - 灵活性:使用 Java 标准库中的组件,可以更容易地进行单元测试和调试,因为所有逻辑都在同一个 JVM 内运行。
# 加工
加工( Processor
)的概念其实就是一个中间处理器,而且是一个非常重要的概念。它既是数据的发布者( Publisher
),也是数据的订阅者( Subscriber
)。这意味着它可以接收数据并处理这些数据,然后将处理后的数据发布给其他订阅者。
在其源码中只有一个简单的声明,表示它可以发布和订阅数据
/**
* A component that acts as both a Subscriber and Publisher.
*
* @param <T> the subscribed item type
* @param <R> the published item type
*/
public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}
2
3
4
5
6
7
8
其主要作用就是
- 数据转换:Processor 可以用于将一种类型的数据转换为另一种类型的数据。例如,你可以将一个字符串流转换为一个整数流。
- 缓冲和过滤:Processor 可以缓冲或过滤传入的数据,然后再将其发布出去。例如,你可以过滤掉某些不需要的数据项或对数据进行批量处理后再发布。
- 错误处理:Processor 可以在接收到错误时进行特定的处理逻辑,然后再决定是否继续发布后续的数据项或终止流。
- 多级管道:多个 Processor 可以串联在一起形成一个多级的管道结构,每个处理器负责不同的任务(如转换、过滤、聚合等),最终将结果传递给最终的订阅者。
例如:加工收到的数据,。
public class FlowDemo {
static class MyProcessor extends SubmissionPublisher<String> implements Flow.Processor<String, String> {
private final int i;
private Flow.Subscription subscription;
public MyProcessor(int i) {
this.i = i;
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
System.out.println("Processor " + i + " 订阅绑定完成");
subscription.request(1); // 初始请求一条数据
}
@Override
public void onNext(String item) {
// 拿到数据后进行处理
item = "当前是:" + i + " 在加工 " + item;
submit(item); // 将处理后的数据提交给下游处理器
subscription.request(1); // 请求下一条数据
}
@Override
public void onError(Throwable throwable) {
System.err.println("Processor " + i + " 错误: " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("Processor " + i + " 完成");
}
}
public static void main(String[] args) throws InterruptedException {
try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
Flow.Subscriber<String> subscriber1 = new Flow.Subscriber<>() {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
System.out.println("订阅者1开始接收数据");
subscription.request(1); // 初始请求一条数据
}
@Override
public void onNext(String item) {
System.out.println("订阅者1消费数据: " + item);
subscription.request(1); // 请求下一条数据
}
@Override
public void onError(Throwable throwable) {
System.err.println("订阅者1 错误: " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("订阅者1 完成");
}
};
// 创建4个处理器,上面说了多个处理器和串联成一个管道
MyProcessor myProcessor1 = new MyProcessor(1);
MyProcessor myProcessor2 = new MyProcessor(2);
MyProcessor myProcessor3 = new MyProcessor(3);
MyProcessor myProcessor4 = new MyProcessor(4);
// 绑定处理器链,数据流从 myProcessor1 传递到 myProcessor2,再到 myProcessor3,最后到 myProcessor4
myProcessor1.subscribe(myProcessor2);
myProcessor2.subscribe(myProcessor3);
myProcessor3.subscribe(myProcessor4);
// 将处理器4的输出绑定到订阅者1,订阅者要的是最后处理好的数据
myProcessor4.subscribe(subscriber1);
// 发布者将数据交给processor1,让processor1从头开始处理
publisher.subscribe(myProcessor1);
// 发布数据
for (int i = 0; i < 10; i++) {
publisher.submit("p+" + i);
}
// 关闭发布者
publisher.close();
// 等待处理完成
Thread.sleep(1000);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# 总结
响应式编程
- 底层:基于数据缓冲队列 + 消息驱动模型 + 异步回调机制
- 编码:流式编程 + 链式调用 + 声明式 API
- 效果:优雅全异步 + 消息实时处理 + 高吞吐量 + 占用少量资源