笔记 笔记
首页
  • 开发工具
  • Java Web
  • Java 进阶
  • 容器化技术
  • Java 专栏

    • Java 核心技术面试精讲
    • Java 业务开发常见错误 100 例
  • 数据库专栏

    • MySQL 实战 45 讲
    • Redis 核心技术与实战
  • 安全专栏

    • OAuth 2.0 实战课
  • 计算机系统
  • 程序设计语言
  • 数据结构
  • 知识产权
  • 数据库
  • 面向对象
  • UML
  • 设计模式
  • 操作系统
  • 结构化开发
  • 软件工程
  • 计算机网络
  • 上午题错题
在线工具 (opens new window)

EasT-Duan

Java 开发
首页
  • 开发工具
  • Java Web
  • Java 进阶
  • 容器化技术
  • Java 专栏

    • Java 核心技术面试精讲
    • Java 业务开发常见错误 100 例
  • 数据库专栏

    • MySQL 实战 45 讲
    • Redis 核心技术与实战
  • 安全专栏

    • OAuth 2.0 实战课
  • 计算机系统
  • 程序设计语言
  • 数据结构
  • 知识产权
  • 数据库
  • 面向对象
  • UML
  • 设计模式
  • 操作系统
  • 结构化开发
  • 软件工程
  • 计算机网络
  • 上午题错题
在线工具 (opens new window)

购买兑换码请添加

添加时候请写好备注,否则无法通过。

  • Maven

  • Bootstrap

  • Spring

  • Spring MVC

  • MyBatis

  • JUnit

  • GitFlow 工作流指南

  • SpringBoot

  • Reactor

    • 前置条件
      • 传统方式
      • Lambda
      • 函数式接口
        • 如何识别
      • Function
      • Stream API
        • 创建流
        • 中间操作(Intermediate Operations)
        • 终止操作(Terminal Operation)
        • 总结
      • Reactive-Stream
        • 正压(Push)
        • 背压(Backpressure)
        • 多线程
        • 实时的数据流系统
        • Flow API
        • 核心组件
        • Flow API 核心组件交互
        • 流的生命周期
        • 背压机制(Backpressure)
        • 和mq的区别
        • 加工
        • 总结
    • Reactor 核心
  • 微服务

  • Java Web
  • Reactor
EasT-Duan
2024-10-30
目录

前置条件

欢迎来到我的 ChatGPT 中转站,极具性价比,为付费不方便的朋友提供便利,有需求的可以添加左侧 QQ 二维码,另外,邀请新用户能获取余额哦!最后说一句,那啥:请自觉遵守《生成式人工智能服务管理暂行办法》。

先定义一个接口

public interface MyInterface {
    int sum(int i, int j);
}
1
2
3

# 传统方式

//实现类的方式
public class MyInterfaceImpl implements MyInterface {
    @Override
    public int sum(int i, int j) {
        return i + j;
    }
}
1
2
3
4
5
6
7
public class Main {
    public static void main(String[] args) {
        // 传统方式
        // 方式 1
        MyInterface myInterface = new MyInterfaceImpl();
        int res = myInterface.sum(1, 2);
        System.out.println(res);
        // 方式 2 内部类实现
        MyInterface myInterface1 = new MyInterface() {
            @Override
            public int sum(int i, int j) {
                return i + j;
            }
        };
        res = myInterface1.sum(1, 2);
        System.out.println(res);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# Lambda

public class Main {
    public static void main(String[] args) {
        MyInterface myInterface = (i, j) -> i + j;
        // 更加简洁的写法
        // MyInterface myInterface = Integer::sum;
        int res = myInterface.sum(1, 2);
        System.out.println(res);
    }
}
1
2
3
4
5
6
7
8
9

Lambda 表达式的语法糖: 参数列表 + 箭头 + 方法体 。函数式接口是 Lambda 表达式的目标类型。

# 函数式接口

  • 单一抽象方法(Single Abstract Method, SAM):函数式接口只能有一个抽象方法。这个方法也被称为函数式接口的函数描述符。
  • 可以包含默认方法和静态方法:函数式接口可以包含默认方法(使用 default 关键字定义的方法)和静态方法,这些方法可以有具体的实现,不违反函数式接口的定义。
  • 可以包含 Object 类中的公共方法:函数式接口可以声明 equals , hashCode 和 toString 等方法,因为这些方法是从 Object 类继承而来的,不是新增的抽象方法。

简单来说:

  1. 只要是函数式接口就可以用 Lambda 表达式简化
  2. 接口中有且只有一个未实现的方法,这个接口就叫函数式接口
//在Lambda中的例子其实就是一个函数式接口的实现
//参数类型可以不写,只写(参数名),参数变量名随意定义;
interface MyHaha {
    int haha();
}
MyHaha myHaha = () -> 2;
res = myHaha.haha();
System.out.println(res);

interface MyHehe {
    int hehe(int i);
}
MyHehe myHehe = x -> x + 1;
res = myHehe.hehe(1);
System.out.println(res);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# 如何识别

通过 @FunctionalInterface 注解可以检测当前这个接口是否符合规范,例如

@FunctionalInterface //检查注解,帮我们快速检查我们写的接口是否函数式接口
interface MyHehe {
    int hehe(int i);
    // 如定义中所描述,有默认实现的也可以是个函数式接口
    default int heihei() {
        return hehe(10);
    }
    // 但如果是个抽象的定义,这种就会发生错误
    // int lala(int i);
}
1
2
3
4
5
6
7
8
9
10

# Function

在 Java 中,函数式接口是只包含一个抽象方法的接口。它们是支持 Lambda 表达式的基础,因为 Lambda 表达式需要一个目标类型,这个目标类型必须是一个函数式接口。

函数式接口的出入参定义:

  1. 有入参,无出参【消费者】: function.accept
BiConsumer<String,String> function = (a,b)->{ //能接受两个入参
    System.out.println("哈哈:"+a+";呵呵:"+b);
};
function.accept("1","2");
1
2
3
4

2、有入参,有出参【多功能函数】: function.apply

Function<String,Integer> function = (String x) -> Integer.parseInt(x);
System.out.println(function.apply("2"));
1
2

3、无入参,无出参【普通函数】:

Runnable runnable = () -> System.out.println("aaa");
new Thread(runnable).start();
1
2

4、无入参 ,有出参【提供者】: supplier.get ()

Supplier<String> supplier = ()-> UUID.randomUUID().toString();
String s = supplier.get();
System.out.println(s);
1
2
3

java.util.function 包下的所有 function 定义:

  • Consumer: 消费者
  • Supplier: 提供者
  • Predicate: 断言

get/test/apply/accept 调用的函数方法;

private static void demo() {
    Supplier<String> supplier = () -> "3";
    Predicate<String> predicate = str -> str.matches("-?\\d+(\\.\\d+)?");
    Function<String, Integer> function = Integer::parseInt;
    Consumer<Integer> consumer = num -> {
        if (num % 2 == 0) {
            System.out.println(num + "是偶数");
        } else {
            System.out.println(num + "是奇数");
        }
    };
    deal(supplier, predicate, function, consumer);
}

private static void deal(Supplier<String> supplier, Predicate<String> predicate, Function<String, Integer> function, Consumer<Integer> consumer) {
    boolean test = predicate.test(supplier.get());
    if (test) {
        consumer.accept(function.apply(supplier.get()));
    } else {
        System.out.println("不是数字");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

# Stream API

Stream 所有数据和操作被组合成流管道流管道组成:

  • 一个数据源(可以是一个数组、集合、生成器函数、I/O 管道)
  • 零或多个中间操作(将一个流变形成另一个流)
  • 一个终止操作(产生最终结果)

流是惰性的,只有在启动最终操作时才会对源数据进行计算,而且只在需要时才会消耗源元素

这句话的意思是:它们不会立即处理源数据,而是等到遇到 “终止操作” 时才会开始计算并处理数据。并且,在处理数据时,它们不会一次性加载和处理所有数据,而是只处理当前需要的数据部分。

# 创建流

of、builder、empty、ofNullable、generate、concat、集合.stream

# 中间操作(Intermediate Operations)

  • filter:过滤;挑出我们用的元素
  • map: 映射:一一映射,a 变成 b
  • mapToInt、mapToLong、mapToDouble
  • flatMap:打散、散列、展开、扩维:一对多映射
更多方法

filter、map、mapToInt、mapToLong、mapToDouble、flatMap、flatMapToInt、flatMapToLong、flatMapToDouble、mapMulti、mapMultiToInt、mapMultiToLong、mapMultiToDouble、parallel、unordered、onClose、sequential、distinct、sorted、peek、limit、skip、takeWhile、dropWhile。

# 终止操作(Terminal Operation)

  • forEach:对此流的每个元素执行一个操作。
  • toList:将此流的元素累积到 List 中。
  • collect:对此流的元素执行可变归约操作,比如转成一个 Map、Set、List 等等,还支持分组等等一系列的操作。
更多方法

forEach、forEachOrdered、toArray、reduce、collect、toList、min、max、count、anyMatch、allMatch、noneMatch、findFirst、findAny、iterator。

public class StreamInOp {
    public static void main(String[] args) {
        List<Person> personList = List.of(
                new Person("张三", "男", 20),
                new Person("李四", "男", 22),
                new Person("王五", "女", 20),
                new Person("赵六", "女", 33),
                new Person("钱七", "男", 44),
                new Person("孙八", "女", 12),
                new Person("周八", "男", 56),
                new Person("吴七", "女", 21),
                new Person("吴七", "女", 21)
        );
        List<Person> persons = new ArrayList<>(personList);

        // 这里所有的操作都不会对源数据产生影响
        personList.stream()
                // 查看一下流,如peek的字面意思,这里不会对数据造成任何影响。
                .peek(System.out::println)
                .map(Person::getName)
                //去重
                .distinct()
                // 打散流成为一个新的流
                .flatMap(pName -> Stream.of("姓名是:" + pName))
                .sorted(String::compareTo)
                .forEach(System.out::println);
        // 可以通过这个方法查看一下,personList并没有被修改
        personList.forEach(System.out::println);

        List<Integer> integers = List.of(1, 2, 3, 4, 5);
        //takeWhile从一个流中获取满足特定条件的元素,直到遇到第一个不满足条件的元素为止。之后的元素将被忽略
        integers.stream().takeWhile(integer -> integer % 2 == 0).forEach(System.out::println); //输出为空
        integers.stream().takeWhile(integer -> integer < 5).forEach(System.out::println); //输出:1,2,3,4

        Map<String, List<Person>> collect =
                persons.stream().collect(Collectors.groupingBy(Person::getSex));
        collect.forEach((k, v) -> System.out.println("key:" + k + ",value:" + v));
        // ---------------------------------------------------------------------
        List<String> words = Arrays.asList("apple", "banana", "orange", "grape", "avocado", "blueberry");
        List<String> result = words.stream()
                //过滤长度大于5的单词
                .filter(word -> word.length() > 5)
                //将单词转换为大写形式
                .map(String::toUpperCase)
                //将每个单词拆分为字符,并放入新的流中
                .flatMap(word -> Arrays.stream(word.split("")))
                //去重
                .distinct()
                //按照字母顺序排序
                .sorted()
                //在每个字符上执行某些操作,并打印调试信息
                .peek(System.out::println)
                //取前4个字符
                .limit(4)
                //跳过前两个字符
                .skip(2)
                //使用takeWhile,保留小于'F'的字符
                .takeWhile(ch -> ch.charAt(0) < 'F')
                //收集结果并转为列表
                .toList();
        System.out.println("Result:" + result);
    }
}


class Person {
    private String name;
    private String sex;
    private int age;

    public Person(String name, String sex, int age) {
        this.name = name;
        this.sex = sex;
        this.age = age;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getSex() {
        return sex;
    }

    public void setSex(String sex) {
        this.sex = sex;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "Person{" +
                "name='" + name + '\'' +
                ", sex='" + sex + '\'' +
                ", age=" + age +
                '}';
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109

# 总结

在 Stream 中进行操作的时候,注意两个东西

  • 接收的参数:就如上面讲的 Function 响应式编程一样,看清入参和返回,并且要看清当前函数式接口继承于什么,例如 <R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper); 这种就说明要返回是一个 Stream 类型,而 <R> Stream<R> map(Function<? super T, ? extends R> mapper); 这种就说明返回的是自定义的类型。
  • 是中间操作还是终止操作:点开方法的源代码,要看清文档标注的是什么类型的操作,如果是一个中间操作,那么后续可以继续进行操作,如果是一个终止操作,这个方法写完后就无法再对这个流进行操作了。

# Reactive-Stream

Reactive Streams 是 JVM 面向流的库的标准和规范

  1. 处理可能无限数量的元素
  2. 有序
  3. 在组件之间异步传递元素
  4. 强制性非阻塞,背压模式

推荐阅读:响应式宣言:https://www.reactivemanifesto.org/zh-CNReactiveStream: https://github.com/reactive-streams/reactive-streams-jvm/blob/master/README.md

# 正压(Push)

正压指的是生产者(数据源)主动推动数据流向消费者。也就是说,数据在流动过程中是由生产者控制的,消费者无需主动请求数据,而是自动接受和处理流入的数据。正压是响应式编程的核心机制之一。

# 背压(Backpressure)

背压则是指当消费者处理数据的速度跟不上生产者生成数据的速度时,消费者向生产者发出请求,表示暂时无法继续接收更多的数据。背压机制能有效防止由于生产者过快产生数据导致消费者无法及时处理,进而避免内存溢出或系统崩溃等问题。

背压通常是在流数据中遇到的消费者处理能力不及生产者速度时进行的一种调节方式。

# 多线程

线程数绝对不是越多越好,当线程数量多于 CPU 核心数时,系统需要频繁地在线程之间进行切换,这种切换会带来额外的开销,包括上下文切换(保存和恢复线程状态)、内存开销(更多的线程占用更多的内存)以及时间开销(线程等待时间增加)。过多的线程会导致资源浪费,因为线程之间会竞争 CPU 时间片,并且频繁切换带来的开销会使系统整体性能下降。

最好的方法是基于核心数合理设置线程池大小,使每个线程保持繁忙,而不是创建大量线程去等待和切换。

# 实时的数据流系统

通过全异步 + 缓冲区的方式可以构建出来一个实时的数据流系统,其模式比较类似于 MQ。

MQ 通常是基于异步通信的模式,在生产者(发送者)和消费者(接收者)之间解耦。生产者发送消息时不会等待消费者处理完毕,而是直接将消息放入队列中,接着可以继续处理其他任务。消费者从队列中取出消息进行处理,这种异步方式提高了系统的吞吐量和响应性。

高可用:分片、复制、选领导。

高并发:缓存、异步、队排好。

// 分布式响应式系统,MQ就是类似这种做法,在传统的开发模式中,就需要引入MQ等等一些列的中间件完成无阻塞
public class NonBlockingDemo {
    private static final String[] buffer = new String[10];

    public static void main(String[] args) {
        NonBlockingDemo nonBlockingDemo = new NonBlockingDemo();
        new Thread(nonBlockingDemo::a).start();//发布数据
        new Thread(nonBlockingDemo::b).start();//消费数据
    }
    
    private void a() {
        String A = "A";
        System.out.println("A发送了消息");
        buffer[0] = A;      // 放入缓冲区
        // 执行其他的任务
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        System.out.println("A完成了任务");
    }
    
    private void b() {
        // B的执行并不会等待A的执行完成,
        String A = null;
        while (A == null) {
            // 重试去拿数据
            A = buffer[0];
        }
        System.out.println("B完成了任务" + A);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

# Flow API

# 核心组件

在 Java 9 引入的 Flow API 中,核心组件包括三个接口: Publisher 、 Subscriber 和 Subscription 。这些接口提供了一种标准的方式来处理流数据,并实现背压控制。它们是响应式编程的基础,允许数据在异步的流动中进行有效的传递与处理。

Publisher 接口

Publisher 是数据流的生产者,它负责发布数据流并将数据发送给订阅者( Subscriber )。 Publisher 提供一个 subscribe 方法,允许订阅者订阅它,从而开始接收数据流。

  • 主要方法:

    • void subscribe(Subscriber<? super T> subscriber);
      • 用于将 Subscriber 订阅到 Publisher 上。当订阅者调用 subscribe 方法时, Publisher 会通知订阅者,开始向其传送数据。
  • 说明: Publisher 是流的源头,负责产生和发布数据。它没有直接控制消费端( Subscriber )如何处理数据,但它提供了数据流的入口。

Subscriber 接口

Subscriber 是数据流的消费者,负责接收和处理从 Publisher 发出的数据流。一个 Subscriber 可以通过实现 onSubscribe 、 onNext 、 onError 和 onComplete 方法来处理流中的数据。

  • 主要方法:

    • void onSubscribe(Subscription subscription);
      • 该方法在订阅建立时由 Publisher 调用,用于接收一个 Subscription 对象,进而控制数据流的请求。
    • void onNext(T item);
      • 当 Publisher 发布新的数据项时, onNext 方法会被调用。 item 是新发布的数据。
    • void onError(Throwable throwable);
      • 当数据流中发生错误时(例如异常、连接中断等), onError 会被调用, throwable 包含错误的详细信息。
    • void onComplete();
      • 当数据流结束时(即 Publisher 不再发布数据时), onComplete 方法会被调用。
  • 说明: Subscriber 接收数据并对其进行处理。它还可以向 Publisher 请求更多的数据(通过 Subscription )。 Subscriber 通过实现背压控制来避免在消费速度跟不上生产速度时被数据溢出。

Subscription 接口

Subscription 是 Publisher 和 Subscriber 之间的桥梁,负责在二者之间协调数据流的传递。 Subscription 提供了请求和取消数据传输的机制。

  • 主要方法:

    • void request(long n);
      • 该方法用于请求 Publisher 发送 n 条数据。通常, Subscriber 通过 request 来告诉 Publisher 它准备好接收更多的数据。 n 表示订阅者希望接收的数据数量。
    • void cancel();
      • 该方法用于取消数据传输,通知 Publisher 停止发送数据。消费者如果不再需要接收数据或遇到错误时,可以调用该方法。
  • 说明: Subscription 使得消费者能够控制数据流的速度(通过 request 方法)。通过调用 cancel 方法,订阅者可以中止接收数据流,从而释放资源。

# Flow API 核心组件交互

  • Publisher 启动数据流,通过 subscribe 方法通知一个 Subscriber 来订阅数据。
  • 订阅后, Publisher 会将数据流传递给 Subscriber ,并通过 Subscription 进行数据请求与流量控制。
  • Subscriber 在接收到数据时,调用 onNext 方法进行处理。如果需要更多的数据,调用 request 方法告诉 Publisher 继续传送数据。
  • Subscription 也允许 Subscriber 在处理过程中取消数据流(通过 cancel 方法)或控制数据请求量,避免被过多数据淹没。
public class FlowDemo {
    /**
     * 主方法演示了如何使用SubmissionPublisher和Flow.Subscriber来实现一个简单的响应式流。
     * 该程序创建了一个发布者来发布字符串,并定义了一个订阅者来消费这些字符串。订阅者每次只请求一个项目,
     * 处理后请求下一个项目,确保项目的顺序处理。
     *
     * @throws InterruptedException 如果在等待所有项目被处理时线程中断。
     */
    public static void main(String[] args) throws InterruptedException {
        // 创建一个SubmissionPublisher来发布字符串
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            // 定义一个实现了Flow.Subscriber的订阅者
            Flow.Subscriber<String> stringSubscriber = new Flow.Subscriber<>() {
                private Flow.Subscription subscription;

                /**
                 * 当订阅建立时调用。
                 *
                 * @param subscription 发布者到该订阅者的订阅。
                 */
                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    this.subscription = subscription;
                    System.out.println(Thread.currentThread() + "订阅开始");
                    // 初始请求一条数据
                    subscription.request(1);
                }

                /**
                 * 当有新项目可用时调用,用于处理。
                 *
                 * @param item 发布者发布的下一个项目。
                 */
                @Override
                public void onNext(String item) {
                    if (item.equals("p+5")) {
                        int y = 1 / 0;
                        throw new RuntimeException("p5");
                    }
                    System.out.println(Thread.currentThread() + "消费下一条:" + item);
                    subscription.request(1);  // 处理后请求下一个项目
                }

                /**
                 * 当在流处理过程中发生错误时调用。
                 *
                 * @param throwable 发生的异常。
                 */
                @Override
                public void onError(Throwable throwable) {
                    System.err.println("拿到了一个错误:" + throwable.getMessage());
                }

                /**
                 * 当所有项目成功处理且不会再有更多项目发布时调用。
                 */
                @Override
                public void onComplete() {
                    System.out.println("完成");
                }
            };

            // 先订阅,后发布数据
            publisher.subscribe(stringSubscriber);

            // 发布10个字符串
            for (int i = 0; i < 10; i++) {
                publisher.submit("p+" + i);
            }

            // 关闭发布者
            publisher.close();

            // 等待一段时间,确保所有项目都被处理
            Thread.sleep(1000);
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78

# 流的生命周期

  • 订阅(Subscription): Subscriber 通过调用 Publisher 的 subscribe 方法进行订阅。订阅时, Publisher 会给 Subscriber 发送 onSubscribe 回调。
  • 数据流传递:订阅建立后, Publisher 会向 Subscriber 发送数据项( onNext ),并根据 Subscriber 请求的数量决定发送数据的频率。
  • 背压控制: Subscriber 可以通过 request 控制请求的数据数量,避免因数据传输过快而导致消费者处理不及。
  • 完成与错误处理:当数据流结束时, Publisher 会调用 onComplete ;如果发生错误, Publisher 会调用 onError 。

# 背压机制(Backpressure)

背压机制是 Flow API 的重要特性,它允许 Subscriber 控制从 Publisher 请求数据的数量。 Subscriber 通过 subscription.request(n) 来指定希望接收的数据量,而 Publisher 只能根据消费者的请求量来发送数据。这避免了消费者处理速度慢时,生产者发送过多数据导致的内存溢出。

# 和 mq 的区别

Reactive 和 MQ 的在核心功能上有一定的相似性。但是两者的用法和适用场景是完全不一致的。

相似性

  1. 异步通信:两者都支持异步通信,可以在不同的线程或进程中处理消息的发布和接收。
  2. 解耦:两者都能解耦生产者和消费者,使得系统更加灵活和可扩展。

区别

  1. 实现复杂度: SubmissionPublisher 和 Flow.Subscriber 是 Java 标准库中的一部分,不需要额外的依赖。而 MQ 通常需要依赖外部的消息中间件(如 RabbitMQ、Kafka 等),这可能会增加系统的复杂性和运维成本。
  2. 适用场景:MQ 通常用于跨进程、跨机器的通信,适用于需要高吞吐量、高可靠性的分布式系统。而 SubmissionPublisher 和 Flow.Subscriber 更适合在 **同一 JVM 内** 进行异步任务处理。
  3. 性能:在同一个 JVM 内, SubmissionPublisher 和 Flow.Subscriber 的通信开销相对较小,因为它们不需要序列化 / 反序列化数据,也不需要网络传输。
  4. 灵活性:使用 Java 标准库中的组件,可以更容易地进行单元测试和调试,因为所有逻辑都在同一个 JVM 内运行。

# 加工

加工( Processor )的概念其实就是一个中间处理器,而且是一个非常重要的概念。它既是数据的发布者( Publisher ),也是数据的订阅者( Subscriber )。这意味着它可以接收数据并处理这些数据,然后将处理后的数据发布给其他订阅者。

在其源码中只有一个简单的声明,表示它可以发布和订阅数据

    /**
     * A component that acts as both a Subscriber and Publisher.
     *
     * @param <T> the subscribed item type
     * @param <R> the published item type
     */
    public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
    }
1
2
3
4
5
6
7
8

其主要作用就是

  • 数据转换:Processor 可以用于将一种类型的数据转换为另一种类型的数据。例如,你可以将一个字符串流转换为一个整数流。
  • 缓冲和过滤:Processor 可以缓冲或过滤传入的数据,然后再将其发布出去。例如,你可以过滤掉某些不需要的数据项或对数据进行批量处理后再发布。
  • 错误处理:Processor 可以在接收到错误时进行特定的处理逻辑,然后再决定是否继续发布后续的数据项或终止流。
  • 多级管道:多个 Processor 可以串联在一起形成一个多级的管道结构,每个处理器负责不同的任务(如转换、过滤、聚合等),最终将结果传递给最终的订阅者。

例如:加工收到的数据,。

public class FlowDemo {

    static class MyProcessor extends SubmissionPublisher<String> implements Flow.Processor<String, String> {
        private final int i;
        private Flow.Subscription subscription;

        public MyProcessor(int i) {
            this.i = i;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            System.out.println("Processor " + i + " 订阅绑定完成");
            subscription.request(1); // 初始请求一条数据
        }

        @Override
        public void onNext(String item) {
            // 拿到数据后进行处理
            item = "当前是:" + i + " 在加工 " + item;
            submit(item); // 将处理后的数据提交给下游处理器
            subscription.request(1);  // 请求下一条数据
        }

        @Override
        public void onError(Throwable throwable) {
            System.err.println("Processor " + i + " 错误: " + throwable.getMessage());
        }

        @Override
        public void onComplete() {
            System.out.println("Processor " + i + " 完成");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {

            Flow.Subscriber<String> subscriber1 = new Flow.Subscriber<>() {
                private Flow.Subscription subscription;

                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    this.subscription = subscription;
                    System.out.println("订阅者1开始接收数据");
                    subscription.request(1);  // 初始请求一条数据
                }

                @Override
                public void onNext(String item) {
                    System.out.println("订阅者1消费数据: " + item);
                    subscription.request(1);  // 请求下一条数据
                }

                @Override
                public void onError(Throwable throwable) {
                    System.err.println("订阅者1 错误: " + throwable.getMessage());
                }

                @Override
                public void onComplete() {
                    System.out.println("订阅者1 完成");
                }
            };

            // 创建4个处理器,上面说了多个处理器和串联成一个管道
            MyProcessor myProcessor1 = new MyProcessor(1);
            MyProcessor myProcessor2 = new MyProcessor(2);
            MyProcessor myProcessor3 = new MyProcessor(3);
            MyProcessor myProcessor4 = new MyProcessor(4);

            // 绑定处理器链,数据流从 myProcessor1 传递到 myProcessor2,再到 myProcessor3,最后到 myProcessor4
            myProcessor1.subscribe(myProcessor2);
            myProcessor2.subscribe(myProcessor3);
            myProcessor3.subscribe(myProcessor4);

            // 将处理器4的输出绑定到订阅者1,订阅者要的是最后处理好的数据
            myProcessor4.subscribe(subscriber1);

            // 发布者将数据交给processor1,让processor1从头开始处理
            publisher.subscribe(myProcessor1);

            // 发布数据
            for (int i = 0; i < 10; i++) {
                publisher.submit("p+" + i);
            }

            // 关闭发布者
            publisher.close();

            // 等待处理完成
            Thread.sleep(1000);
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96

# 总结

响应式编程

  1. 底层:基于数据缓冲队列 + 消息驱动模型 + 异步回调机制
  2. 编码:流式编程 + 链式调用 + 声明式 API
  3. 效果:优雅全异步 + 消息实时处理 + 高吞吐量 + 占用少量资源
#响应式编程
上次更新: 2025/04/12, 05:37:39
SpringBoot3-场景集成
Reactor 核心

← SpringBoot3-场景集成 Reactor 核心→

最近更新
01
Reactor 核心
02-24
02
计算机网络
09-13
03
软件工程
09-13
更多文章>
Theme by Vdoing | Copyright © 2019-2025 powered by Vdoing
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式