笔记 笔记
首页
  • 开发工具
  • Java Web
  • Java 进阶
  • 容器化技术
  • Java 专栏

    • Java 核心技术面试精讲
    • Java 业务开发常见错误 100 例
  • 数据库专栏

    • MySQL 实战 45 讲
    • Redis 核心技术与实战
  • 安全专栏

    • OAuth 2.0 实战课
  • 计算机系统
  • 程序设计语言
  • 数据结构
  • 知识产权
  • 数据库
  • 面向对象
  • UML
  • 设计模式
  • 操作系统
  • 结构化开发
  • 软件工程
  • 计算机网络
  • 上午题错题
在线工具 (opens new window)

EasT-Duan

Java 开发
首页
  • 开发工具
  • Java Web
  • Java 进阶
  • 容器化技术
  • Java 专栏

    • Java 核心技术面试精讲
    • Java 业务开发常见错误 100 例
  • 数据库专栏

    • MySQL 实战 45 讲
    • Redis 核心技术与实战
  • 安全专栏

    • OAuth 2.0 实战课
  • 计算机系统
  • 程序设计语言
  • 数据结构
  • 知识产权
  • 数据库
  • 面向对象
  • UML
  • 设计模式
  • 操作系统
  • 结构化开发
  • 软件工程
  • 计算机网络
  • 上午题错题
在线工具 (opens new window)

购买兑换码请添加

添加时候请写好备注,否则无法通过。

  • Maven

  • Bootstrap

  • Spring

  • Spring MVC

  • MyBatis

  • JUnit

  • GitFlow 工作流指南

  • SpringBoot

  • Reactor

  • Spring WebFlux

    • Spring WebFlux
      • 一、核心概念与架构设计
        • 1.1 什么是 Spring WebFlux
        • 1.2 响应式编程基础
        • 核心规范
        • 与 Spring MVC 的本质区别
      • 二、技术栈与依赖配置
        • 2.1 核心依赖(非 Spring Boot 方式)
        • 2.2 支持的运行时
      • 三、核心组件与处理流程
        • 3.1 WebFlux 请求处理链
        • 3.2 核心组件详解
        • HandlerMapping
        • HandlerAdapter
        • WebHandler API
      • 四、注解式编程模型
        • 4.1 核心注解(完全向下兼容Spring MVC)
        • 4.2 请求方法参数支持
        • 4.3 返回值类型支持
      • 五、函数式编程模型
        • 5.1 核心概念
        • 5.2 RouterFunction 高级用法
      • 六、响应式类型深度解析
        • 6.1 Mono 与 Flux 操作
        • 6.2 背压策略
      • 七、WebClient 响应式 HTTP 客户端
        • 7.1 核心特性
        • 7.2 高级用法
      • 八、服务器推送事件(SSE)
        • 8.1 SSE 实现
      • 九、WebSocket 支持
        • 9.1 响应式 WebSocket API
        • 9.2 STOMP over WebSocket
      • 十、测试支持
        • 10.1 WebTestClient
      • 十一、高级主题
        • 11.1 错误处理
        • 11.2 CORS 配置
        • 11.3 内容协商
        • 11.4 静态资源处理
      • 十二、性能调优与最佳实践
        • 12.1 线程模型
        • 12.2 背压配置
        • 12.3 最佳实践
  • 微服务

  • Java Web
  • Spring WebFlux
EasT-Duan
2025-11-18
目录

Spring WebFlux

欢迎来到我的 ChatGPT 中转站,极具性价比,为付费不方便的朋友提供便利,有需求的可以添加左侧 QQ 二维码,另外,邀请新用户能获取余额哦!最后说一句,那啥:请自觉遵守《生成式人工智能服务管理暂行办法》。

提示

重要说明:本笔记聚焦于 Spring Framework 核心模块中的 WebFlux,而非 Spring Boot 的自动配置。所有内容均对应 Spring Framework 官方文档原始章节。在学习本篇内容之前,建议先完成对 Reactor 篇 (opens new window)的内容学习。


# 一、核心概念与架构设计

# 1.1 什么是 Spring WebFlux

Spring WebFlux 是 Spring Framework 5.0 引入的响应式 Web 框架,与 Spring MVC 并列,但基于完全不同的技术栈:

  • 编程模型:支持注解式(@Controller)和函数式(RouterFunction)两种开发模式
  • 运行时环境:无需 Servlet API,基于 Reactive Streams 规范
  • 并发模型:使用少量线程处理大量请求(事件驱动,非阻塞 I/O)
  • 依赖基础:基于 Reactor 项目(Flux/Mono)和 Netty 运行时

官方文档对应:Spring Framework Docs → Web on Reactive Stack → 1. Spring WebFlux (opens new window)

# 1.2 响应式编程基础

# 核心规范

  • Reactive Streams:定义了 Publisher / Subscriber / Subscription / Processor 四个核心接口
  • Reactor 项目:Spring 官方实现的响应式库,提供 Mono (0-1 个元素)和 Flux (0-N 个元素)类型
  • 背压(Backpressure):下游控制上游数据流速率的机制

# 与 Spring MVC 的本质区别

特性 Spring MVC Spring WebFlux
IO 模型 阻塞式 Servlet API 非阻塞 Reactive Streams
线程模型 每请求一线程(Thread-per-request) 事件循环(Event Loop)
并发能力 依赖线程池大小 少量线程处理海量连接
数据访问 阻塞 JDBC/JPA 响应式 R2DBC、MongoDB Reactive
适用场景 传统 CRUD、CPU 密集型 高并发、IO 密集型、流式处理

# 二、技术栈与依赖配置

# 2.1 核心依赖(非 Spring Boot 方式)

<!-- Maven 依赖 -->
<dependencies>
    <!-- Spring WebFlux 核心模块 -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-webflux</artifactId>
        <version>6.1.4</version> <!-- 对应 Spring Framework 版本 -->
    </dependency>
  
    <!-- Reactor 核心 -->
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId>
        <version>2023.0.3</version>
    </dependency>
  
    <!-- 运行时选择:Netty(默认)或 Servlet 3.1+ 容器 -->
    <dependency>
        <groupId>io.projectreactor.netty</groupId>
        <artifactId>reactor-netty-http</artifactId>
        <version>1.1.15</version>
    </dependency>
</dependencies>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

# 2.2 支持的运行时

WebFlux 可在两种运行时上部署:

  1. Netty(推荐): reactor-netty ,完全非阻塞
  2. Servlet 3.1+ 容器:Tomcat 9.0+, Jetty 9.4+, Undertow 2.0+(需支持非阻塞 IO)

手动启动服务器:

HttpHandler handler = (request, response) -> {
    System.out.println("收到了请求,URI=" + request.getURI());
    DataBufferFactory dataBufferFactory = response.bufferFactory();
    DataBuffer wrap = dataBufferFactory.wrap("Hello World".getBytes());
    response.writeWith(Mono.just(wrap)).subscribe();
    return response.setComplete();
};

ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(httpHandler);
HttpServer.create()
    .host("localhost")
    .port(8080)
    .handle(adapter)
    .bindNow()
    .onDispose()
    .block();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# 三、核心组件与处理流程

# 3.1 WebFlux 请求处理链

核心入口: DispatcherHandler (对应 MVC 的 DispatcherServlet)

public class DispatcherHandler implements WebHandler {
    // 核心组件
    private List<HandlerMapping> handlerMappings;
    private List<HandlerAdapter> handlerAdapters;
    private List<HandlerResultHandler> resultHandlers;
  
    // 处理流程
    public Mono<Void> handle(ServerWebExchange exchange) {
        return Flux.fromIterable(handlerMappings)
            .concatMap(mapping -> mapping.getHandler(exchange))
            .next()
            .flatMap(handler -> invokeHandler(exchange, handler))
            .flatMap(result -> handleResult(exchange, result));
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# 3.2 核心组件详解

# HandlerMapping

  • 将请求映射到处理器(类似 MVC 的 HandlerMapping)
  • 内置实现: RequestMappingHandlerMapping (注解式)、 RouterFunctionMapping (函数式)

# HandlerAdapter

  • 适配不同类型的处理器
  • 内置实现: RequestMappingHandlerAdapter (@Controller)、 HandlerFunctionAdapter (函数式)

# WebHandler API

WebFlux 的底层 API,所有请求处理都基于 WebHandler 接口:

public interface WebHandler {
    Mono<Void> handle(ServerWebExchange exchange);
}
1
2
3

处理链装饰器:

  • ExceptionHandlingWebHandler :异常处理
  • FilteringWebHandler :WebFilter 链
  • HttpWebHandlerAdapter :适配 HttpHandler

# 四、注解式编程模型

# 4.1 核心注解(完全向下兼容 Spring MVC)

@RestController
@RequestMapping("/users")
public class UserController {

    @GetMapping("/{id}")
    public Mono<User> getUser(@PathVariable Long id) {
        return userRepository.findById(id);
    }
  
    @GetMapping
    public Flux<User> listUsers() {
        return userRepository.findAll();
    }
  
    @PostMapping
    public Mono<ResponseEntity<User>> createUser(@RequestBody User user) {
        return userRepository.save(user)
            .map(saved -> ResponseEntity.created(
                URI.create("/users/" + saved.getId())
            ).body(saved));
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

# 4.2 请求方法参数支持

参数类型 说明 示例
ServerWebExchange 完整交换对象 exchange.getRequest().getHeaders()
@PathVariable 路径变量 @PathVariable("id") Long id
@RequestParam 请求参数 @RequestParam(defaultValue="1") int page
@RequestBody 请求体(支持 Reactive) @RequestBody Mono<User> userMono
@RequestHeader 请求头 @RequestHeader("Authorization") String auth
@CookieValue Cookie @CookieValue("sessionId") String session
Principal 认证主体 Mono<Principal> principal
@MatrixVariable 矩阵变量 @MatrixVariable(pathVar="cars") List<String> colors

响应式参数绑定:

@PostMapping
public Mono<Void> create(@RequestBody Flux<User> userStream) {
    return userRepository.saveAll(userStream).then();
}
1
2
3
4

# 4.3 返回值类型支持

返回值类型 说明
Mono<T> 异步单个结果
Flux<T> 异步多个结果(流)
ResponseEntity<Mono<T>> 带状态的响应
ServerResponse 函数式响应对象
Mono<Void> 无返回值,仅状态
Mono<Rendering> 视图渲染(模板引擎)

SSE 流式返回:

@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> streamEvents() {
    return Flux.interval(Duration.ofSeconds(1))
        .map(seq -> ServerSentEvent.builder("event-" + seq).build());
}
1
2
3
4
5

# 五、函数式编程模型

# 5.1 核心概念

RouterFunction:路由函数,将请求路由到对应的 HandlerFunction

HandlerFunction:处理器函数,直接处理请求

// 定义处理器
public class UserHandler {
    public Mono<ServerResponse> getUser(ServerRequest request) {
        Long id = Long.valueOf(request.pathVariable("id"));
        return userRepository.findById(id)
            .flatMap(user -> ServerResponse.ok().bodyValue(user))
            .switchIfEmpty(ServerResponse.notFound().build());
    }
}

// 配置路由
@Configuration
public class UserRouter {
    @Bean
    public RouterFunction<ServerResponse> userRoutes(UserHandler handler) {
        return RouterFunctions
            .route(GET("/users/{id}").and(accept(APPLICATION_JSON)), handler::getUser)
            .andRoute(GET("/users").and(accept(APPLICATION_JSON)), handler::listUsers)
            .andRoute(POST("/users").and(contentType(APPLICATION_JSON)), handler::createUser);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

# 5.2 RouterFunction 高级用法

嵌套路由:

@Bean
public RouterFunction<ServerResponse> nestedRoute() {
    return RouterFunctions.route()
        .path("/api", builder -> builder
            .nest(accept(APPLICATION_JSON), nested -> nested
                .GET("/users", request -> ServerResponse.ok().build())
                .POST("/users", request -> ServerResponse.ok().build())
            )
            .GET("/health", request -> ServerResponse.ok().bodyValue("UP"))
        )
        .build();
}
1
2
3
4
5
6
7
8
9
10
11
12

过滤器:

@Bean
public RouterFunction<ServerResponse> filteredRoute() {
    return RouterFunctions.route()
        .GET("/protected", request -> ServerResponse.ok().build())
        .filter((request, next) -> {
            String token = request.headers().firstHeader("Authorization");
            if (isValid(token)) {
                return next.handle(request);
            }
            return ServerResponse.status(HttpStatus.UNAUTHORIZED).build();
        })
        .build();
}
1
2
3
4
5
6
7
8
9
10
11
12
13

# 六、响应式类型深度解析

# 6.1 Mono 与 Flux 操作

Mono(0-1 个元素):

Mono.just("Hello")
    .map(String::toUpperCase)
    .flatMap(this::saveToDatabase)
    .onErrorResume(e -> Mono.error(new CustomException(e)))
    .doOnNext(System.out::println);
1
2
3
4
5

Flux(0-N 个元素):

Flux.range(1, 100)
    .delayElements(Duration.ofMillis(100))
    .buffer(10) // 每10个分一组
    .flatMap(this::batchProcess)
    .onBackpressureBuffer(100) // 背压策略
    .subscribe();
1
2
3
4
5
6

# 6.2 背压策略

flux.onBackpressureBuffer(100)  // 缓冲
    .onBackpressureDrop()        // 丢弃
    .onBackpressureLatest()      // 只保留最新
    .onBackpressureError()       // 抛出异常
1
2
3
4

# 七、WebClient 响应式 HTTP 客户端

# 7.1 核心特性

WebClient 是 WebFlux 提供的非阻塞、响应式 HTTP 客户端,用于替代 RestTemplate。

// 创建 WebClient
WebClient client = WebClient.builder()
    .baseUrl("https://api.example.com")
    .defaultHeader("Authorization", "Bearer token")
    .codecs(config -> config.defaultCodecs().maxInMemorySize(2 * 1024 * 1024))
    .build();

// 发起请求
Mono<User> userMono = client.get()
    .uri("/users/{id}", 123)
    .accept(MediaType.APPLICATION_JSON)
    .retrieve()
    .onStatus(HttpStatus::is4xxClientError, response -> Mono.error(new NotFoundException()))
    .bodyToMono(User.class);

// 流式处理
Flux<Event> eventStream = client.get()
    .uri("/events")
    .accept(MediaType.TEXT_EVENT_STREAM)
    .retrieve()
    .bodyToFlux(Event.class);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

# 7.2 高级用法

请求体流式上传:

Flux<DataBuffer> dataBufferFlux = ...;
client.post()
    .uri("/upload")
    .contentType(MediaType.APPLICATION_OCTET_STREAM)
    .body(BodyInserters.fromDataBuffers(dataBufferFlux))
    .retrieve()
    .bodyToMono(Void.class);
1
2
3
4
5
6
7

表单提交:

MultiValueMap<String, String> formData = new LinkedMultiValueMap<>();
formData.add("username", "admin");
formData.add("password", "secret");

client.post()
    .uri("/login")
    .contentType(MediaType.APPLICATION_FORM_URLENCODED)
    .body(BodyInserters.fromFormData(formData))
    .retrieve()
    .bodyToMono(Token.class);
1
2
3
4
5
6
7
8
9
10

# 八、服务器推送事件(SSE)

# 8.1 SSE 实现

@RestController
public class SseController {
  
    @GetMapping(value = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<String>> stream() {
        return Flux.interval(Duration.ofSeconds(1))
            .map(sequence -> ServerSentEvent.<String>builder()
                .id(String.valueOf(sequence))
                .event("periodic-event")
                .data("SSE Message - " + LocalTime.now())
                .comment("comment")
                .build());
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

客户端接收:

webClient.get()
    .uri("/sse")
    .retrieve()
    .bodyToFlux(String.class)
    .subscribe(System.out::println);
1
2
3
4
5

# 九、WebSocket 支持

# 9.1 响应式 WebSocket API

@Configuration
public class WebSocketConfig {
  
    @Bean
    public HandlerMapping webSocketMapping() {
        Map<String, WebSocketHandler> map = new HashMap<>();
        map.put("/ws/echo", new EchoWebSocketHandler());
      
        SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
        mapping.setUrlMap(map);
        mapping.setOrder(Ordered.HIGHEST_PRECEDENCE);
        return mapping;
    }
  
    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}

public class EchoWebSocketHandler implements WebSocketHandler {
    @Override
    public Mono<Void> handle(WebSocketSession session) {
        Flux<WebSocketMessage> output = session.receive()
            .map(msg -> "Echo: " + msg.getPayloadAsText())
            .map(session::textMessage);
          
        return session.send(output);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

# 9.2 STOMP over WebSocket

@Configuration
@EnableWebFluxMessageBroker
public class StompConfig implements WebFluxMessageBrokerConfigurer {
  
    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableSimpleBroker("/topic");
        config.setApplicationDestinationPrefixes("/app");
    }
  
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/stomp").setAllowedOrigins("*").withSockJS();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# 十、测试支持

# 10.1 WebTestClient

WebTestClient 是 WebFlux 的测试客户端,可绑定到真实服务器或模拟环境。

@SpringJUnitConfig(WebConfig.class)
public class UserControllerTest {
  
    @Autowired
    private ApplicationContext context;
  
    private WebTestClient client;
  
    @BeforeEach
    void setUp() {
        client = WebTestClient.bindToApplicationContext(context).build();
    }
  
    @Test
    void testGetUser() {
        client.get()
            .uri("/users/123")
            .exchange()
            .expectStatus().isOk()
            .expectHeader().contentType(MediaType.APPLICATION_JSON)
            .expectBody(User.class)
            .value(user -> assertEquals("John", user.getName()));
    }
  
    @Test
    void testStream() {
        client.get()
            .uri("/stream")
            .exchange()
            .expectStatus().isOk()
            .returnResult(String.class)
            .getResponseBody()
            .take(5)
            .as(StepVerifier::create)
            .expectNextCount(5)
            .verifyComplete();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

# 十一、高级主题

# 11.1 错误处理

全局异常处理:

@ControllerAdvice
public class GlobalExceptionHandler {
  
    @ExceptionHandler(NotFoundException.class)
    public Mono<ServerResponse> handleNotFound(NotFoundException ex) {
        return ServerResponse.notFound().build();
    }
  
    @ExceptionHandler(Exception.class)
    public Mono<ServerResponse> handleGenericError(Exception ex) {
        return ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR)
            .bodyValue(Map.of("error", ex.getMessage()));
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

函数式错误处理:

@Bean
public RouterFunction<ServerResponse> routes() {
    return RouterFunctions.route()
        .GET("/users/{id}", this::getUser)
        .onError(NotFoundException.class, 
            (e, request) -> ServerResponse.notFound().build())
        .build();
}
1
2
3
4
5
6
7
8

# 11.2 CORS 配置

@Configuration
public class CorsConfig implements WebFluxConfigurer {
  
    @Override
    public void addCorsMappings(CorsRegistry registry) {
        registry.addMapping("/api/**")
            .allowedOrigins("https://example.com")
            .allowedMethods("GET", "POST", "PUT", "DELETE")
            .allowedHeaders("*")
            .allowCredentials(true)
            .maxAge(3600);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13

# 11.3 内容协商

@Configuration
public class ContentNegotiationConfig implements WebFluxConfigurer {
  
    @Override
    public void configureContentTypeResolver(RequestedContentTypeResolverBuilder builder) {
        builder.fixedResolver(MediaType.APPLICATION_JSON);
    }
}
1
2
3
4
5
6
7
8

# 11.4 静态资源处理

@Configuration
public class StaticResourceConfig implements WebFluxConfigurer {
  
    @Override
    public void addResourceHandlers(ResourceHandlerRegistry registry) {
        registry.addResourceHandler("/static/**")
            .addResourceLocations("classpath:/static/")
            .setCacheControl(CacheControl.maxAge(365, TimeUnit.DAYS));
    }
}
1
2
3
4
5
6
7
8
9
10

# 十二、性能调优与最佳实践

# 12.1 线程模型

WebFlux 默认使用 Netty 的 EventLoop:

  • Boss Group:接受连接(通常 1 个线程)
  • Worker Group:处理 I/O(默认 CPU 核心数 * 2)

自定义线程池:

@Bean
public ReactorResourceFactory resourceFactory() {
    ReactorResourceFactory factory = new ReactorResourceFactory();
    factory.setLoopResourcesSupplier(() -> 
        LoopResources.create("my-http", 2, 4, true)
    );
    return factory;
}
1
2
3
4
5
6
7
8

# 12.2 背压配置

@Bean
public WebFluxConfigurer webFluxConfigurer() {
    return new WebFluxConfigurer() {
        @Override
        public void configureHttpMessageCodecs(ServerCodecConfigurer configurer) {
            configurer.defaultCodecs()
                .maxInMemorySize(2 * 1024 * 1024) // 2MB
                .enableLoggingRequestDetails(true);
        }
    };
}
1
2
3
4
5
6
7
8
9
10
11

# 12.3 最佳实践

  1. 避免阻塞操作:所有 I/O 操作必须使用响应式 API
  2. 不要混用阻塞代码:在 Netty 线程中调用 block() 会导致死锁
  3. 使用 publishOn 切换线程:
    flux.publishOn(Schedulers.boundedElastic()) // CPU 密集型任务
    
    1
  4. 合理处理背压:根据业务场景选择合适的背压策略
  5. 资源清理:使用 doFinally 或 using 操作符释放资源

上次更新: 2025/11/18, 02:03:50
Reactor 核心
微服务架构的概念

← Reactor 核心 微服务架构的概念→

最近更新
01
Reactor 核心
02-24
02
前置条件
10-30
03
计算机网络
09-13
更多文章>
Theme by Vdoing | Copyright © 2019-2025 powered by Vdoing
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式