Spring WebFlux
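Tip
Important: These notes focus on WebFlux as a module of the core Spring Framework, not on Spring Boot auto-configuration. Everything here maps to the corresponding chapters of the official Spring Framework documentation. Before reading on, it is recommended to work through the Reactor chapter first.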
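# 1. Core Concepts and Architecture
# 1.1 What Is Spring WebFlux
Spring WebFlux is the reactive web framework introduced in Spring Framework 5.0. It sits alongside Spring MVC but is built on a completely different technology stack:
- Programming model: supports both the annotated (@Controller) and the functional (RouterFunction) style
- Runtime environment: does not require the Servlet API; built on the Reactive Streams specification
- Concurrency model: serves a large number of requests with a small number of threads (event-driven, non-blocking I/O)
- Foundations: built on Project Reactor (Flux/Mono), with Netty as the default runtime
Official documentation: Spring Framework Docs → Web on Reactive Stack → 1. Spring WebFlux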
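# 1.2 Reactive Programming Basics
# Core Specifications
- Reactive Streams: defines four core interfaces, Publisher, Subscriber, Subscription, and Processor
- Project Reactor: the Reactive Streams implementation that Spring builds on, providing the Mono (0-1 elements) and Flux (0-N elements) types
- Backpressure: the mechanism by which the downstream controls the rate at which the upstream emits data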
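The request/emit handshake behind backpressure can be sketched without any Reactor dependency, because the JDK's java.util.concurrent.Flow interfaces mirror the four Reactive Streams interfaces. The sketch below is a simplified, single-threaded illustration, not a spec-complete publisher; the class names RangePublisher and BatchingSubscriber are made up for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class BackpressureDemo {

    /** Hypothetical publisher: emits 1..count, but never more than the subscriber has requested. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;

        RangePublisher(int count) {
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private long demand = 0;
                private boolean emitting = false;
                private boolean done = false;

                @Override
                public void request(long n) {
                    if (done) {
                        return;
                    }
                    demand += n;
                    if (emitting) {
                        return; // re-entrant call from onNext: the running loop picks up the new demand
                    }
                    emitting = true;
                    while (demand > 0 && next <= count && !done) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    emitting = false;
                    if (next > count && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Hypothetical subscriber: pulls items in batches of batchSize and logs every signal. */
    static final class BatchingSubscriber implements Flow.Subscriber<Integer> {
        final List<String> log = new ArrayList<>();
        private final int batchSize;
        private Flow.Subscription subscription;
        private int receivedInBatch = 0;

        BatchingSubscriber(int batchSize) {
            this.batchSize = batchSize;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            log.add("request(" + batchSize + ")");
            s.request(batchSize);
        }

        @Override
        public void onNext(Integer item) {
            log.add("onNext(" + item + ")");
            if (++receivedInBatch == batchSize) {
                receivedInBatch = 0;
                log.add("request(" + batchSize + ")");
                subscription.request(batchSize); // ask for more only after the batch was handled
            }
        }

        @Override
        public void onError(Throwable t) {
            log.add("onError");
        }

        @Override
        public void onComplete() {
            log.add("onComplete");
        }
    }

    static List<String> run(int count, int batchSize) {
        BatchingSubscriber subscriber = new BatchingSubscriber(batchSize);
        new RangePublisher(count).subscribe(subscriber);
        return subscriber.log;
    }

    public static void main(String[] args) {
        run(5, 2).forEach(System.out::println);
    }
}
```

With run(5, 2) the log alternates between a request(2) entry and at most two onNext entries, which is the point of the contract: the publisher is paced by the subscriber's demand rather than by its own speed.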
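# Fundamental Differences from Spring MVC
| Feature | Spring MVC | Spring WebFlux |
|---|---|---|
| I/O model | Blocking Servlet API | Non-blocking Reactive Streams |
| Thread model | Thread-per-request | Event loop |
| Concurrency | Bounded by the thread pool size | A few threads serve a very large number of connections |
| Data access | Blocking JDBC/JPA | Reactive R2DBC, reactive MongoDB driver |
| Typical use cases | Traditional CRUD, CPU-bound work | High concurrency, I/O-bound work, streaming |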
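The thread-model row of the table can be illustrated with a JDK-only analogy. This is not how Netty is implemented; it only shows the idea that one thread can serve many in-flight requests when waiting for I/O is expressed as a callback instead of a blocked thread. The class name EventLoopSketch and the timer standing in for I/O are assumptions of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class EventLoopSketch {

    /**
     * Serves the given number of simulated requests on ONE thread. Each request "waits for I/O"
     * through a timer, so the thread stays free to accept other requests in the meantime.
     * Returns how many requests each thread completed.
     */
    static Map<String, Integer> serve(int requests, long ioDelayMillis) {
        ScheduledExecutorService loop = Executors.newSingleThreadScheduledExecutor(task -> {
            Thread thread = new Thread(task, "event-loop-1");
            thread.setDaemon(true);
            return thread;
        });
        Map<String, Integer> completedPerThread = new ConcurrentHashMap<>();
        List<CompletableFuture<Void>> responses = new ArrayList<>();
        try {
            for (int i = 0; i < requests; i++) {
                CompletableFuture<Void> response = new CompletableFuture<>();
                responses.add(response);
                // Register a callback instead of parking a thread while the "I/O" is pending.
                loop.schedule(() -> {
                    completedPerThread.merge(Thread.currentThread().getName(), 1, Integer::sum);
                    response.complete(null);
                }, ioDelayMillis, TimeUnit.MILLISECONDS);
            }
            // Only the caller blocks here, to wait for the demo to finish.
            CompletableFuture.allOf(responses.toArray(new CompletableFuture[0])).join();
            return completedPerThread;
        } finally {
            loop.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(serve(1000, 50));
    }
}
```

A thread-per-request design that sleeps for the same 50 ms would need roughly one thread per concurrent request to reach the same latency; here a single thread completes all of them.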
# 2. Technology Stack and Dependencies
# 2.1 Core Dependencies (Without Spring Boot)
<!-- Maven dependencies -->
<dependencies>
    <!-- Spring WebFlux core module -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-webflux</artifactId>
        <version>6.1.4</version> <!-- matches the Spring Framework version -->
    </dependency>
    <!-- Reactor core -->
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId>
        <!-- 2023.0.x is the version scheme of reactor-bom; reactor-core itself uses 3.x versions -->
        <version>3.6.3</version>
    </dependency>
    <!-- Runtime: Netty (default) or a Servlet container -->
    <dependency>
        <groupId>io.projectreactor.netty</groupId>
        <artifactId>reactor-netty-http</artifactId>
        <version>1.1.15</version>
    </dependency>
</dependencies>
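# 2.2 Supported Runtimes
WebFlux can be deployed on two kinds of runtime:
- Netty (recommended): reactor-netty, fully non-blocking
- Servlet 3.1+ containers with non-blocking I/O support: Tomcat 9.0+, Jetty 9.4+ (these versions apply to Spring Framework 5.x; Spring Framework 6.x is based on Jakarta EE 9+ and needs Tomcat 10.1+ or Jetty 11+). Undertow is also supported, through its native API rather than the Servlet API.
Starting a server manually:
// Needs: org.springframework.http.server.reactive.HttpHandler and ReactorHttpHandlerAdapter,
// org.springframework.core.io.buffer.DataBuffer, reactor.core.publisher.Mono,
// reactor.netty.http.server.HttpServer, java.nio.charset.StandardCharsets
HttpHandler handler = (request, response) -> {
    System.out.println("Request received, URI=" + request.getURI());
    DataBuffer buffer = response.bufferFactory()
            .wrap("Hello World".getBytes(StandardCharsets.UTF_8));
    // Return the write Mono itself: the server subscribes to it and completes the response.
    // Calling subscribe() inside the handler would detach the write from the request lifecycle.
    return response.writeWith(Mono.just(buffer));
};
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(handler);
HttpServer.create()
        .host("localhost")
        .port(8080)
        .handle(adapter)
        .bindNow()      // start the server and wait until it is bound
        .onDispose()
        .block();       // keep the main thread alive until the server shuts down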
# 3. Core Components and Request Processing
# 3.1 The WebFlux Request Processing Chain
Central entry point: DispatcherHandler (the counterpart of DispatcherServlet in Spring MVC)
// Simplified view of DispatcherHandler; the real class also handles "no handler found" and errors
public class DispatcherHandler implements WebHandler {
    // Core collaborators
    private List<HandlerMapping> handlerMappings;
    private List<HandlerAdapter> handlerAdapters;
    private List<HandlerResultHandler> resultHandlers;

    // Processing flow
    public Mono<Void> handle(ServerWebExchange exchange) {
        return Flux.fromIterable(handlerMappings)
                .concatMap(mapping -> mapping.getHandler(exchange)) // ask each mapping in order
                .next()                                             // take the first handler found
                .flatMap(handler -> invokeHandler(exchange, handler))
                .flatMap(result -> handleResult(exchange, result));
    }
}
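The "ask every mapping in order, take the first handler" step of the flow above can be mimicked with plain JDK streams. This is only an analogy for the concatMap(...).next() combination; the mapping names, path prefixes, and handler names are invented for the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

class FirstMatchDispatchSketch {

    /** Hypothetical stand-in for HandlerMapping: yields a handler name if it can serve the path. */
    static Function<String, Optional<String>> mapping(String name, String prefix, List<String> consulted) {
        return path -> {
            consulted.add(name); // record that this mapping was asked
            return path.startsWith(prefix) ? Optional.of(name + "-handler") : Optional.empty();
        };
    }

    /** Asks the mappings in order and stops at the first one that yields a handler. */
    static Optional<String> dispatch(List<Function<String, Optional<String>>> mappings, String path) {
        return mappings.stream()
                .map(m -> m.apply(path))
                .flatMap(Optional::stream) // skip mappings that returned nothing
                .findFirst();              // short-circuits, comparable to Flux#next()
    }

    public static void main(String[] args) {
        List<String> consulted = new ArrayList<>();
        List<Function<String, Optional<String>>> mappings = List.of(
                mapping("router", "/fn/", consulted),
                mapping("annotated", "/users", consulted),
                mapping("resources", "/", consulted));
        System.out.println(dispatch(mappings, "/users/42") + " consulted=" + consulted);
    }
}
```

For "/users/42" the third mapping is never consulted, because the second one already produced a handler. This is why the order of HandlerMapping beans matters.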
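# 3.2 Core Components in Detail
# HandlerMapping
- Maps a request to a handler (same role as HandlerMapping in Spring MVC)
- Built-in implementations: RequestMappingHandlerMapping (annotated), RouterFunctionMapping (functional)
# HandlerAdapter
- Adapts the different handler types so that DispatcherHandler can invoke them
- Built-in implementations: RequestMappingHandlerAdapter (@Controller), HandlerFunctionAdapter (functional)
# WebHandler API
The WebHandler API is the low-level API of WebFlux; all request processing builds on the WebHandler interface:
public interface WebHandler {
    Mono<Void> handle(ServerWebExchange exchange);
}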
Decorators in the processing chain:
- ExceptionHandlingWebHandler: exception handling
- FilteringWebHandler: runs the WebFilter chain
- HttpWebHandlerAdapter: adapts the WebHandler chain to HttpHandler
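The way these decorators wrap each other can be modeled with a few lines of plain Java. The sketch is synchronous and uses invented Handler and Filter interfaces as stand-ins for WebHandler and WebFilter, so it shows only the nesting order (exception handling outermost, then filters, then the target handler), not the reactive types.

```java
import java.util.ArrayList;
import java.util.List;

class HandlerChainSketch {

    /** Hypothetical stand-in for WebHandler: takes a request path, returns a response line. */
    interface Handler {
        String handle(String path);
    }

    /** Hypothetical stand-in for WebFilter: may act before and after delegating to the chain. */
    interface Filter {
        String filter(String path, Handler next);
    }

    /** Wraps the target so that the first filter in the list runs first (FilteringWebHandler's role). */
    static Handler withFilters(Handler target, List<Filter> filters) {
        Handler chain = target;
        for (int i = filters.size() - 1; i >= 0; i--) {
            Filter filter = filters.get(i);
            Handler next = chain;
            chain = path -> filter.filter(path, next);
        }
        return chain;
    }

    /** Outermost decorator: turns exceptions into an error response (ExceptionHandlingWebHandler's role). */
    static Handler withExceptionHandling(Handler delegate) {
        return path -> {
            try {
                return delegate.handle(path);
            } catch (RuntimeException ex) {
                return "500 " + ex.getMessage();
            }
        };
    }

    /** Assembles target handler, two filters, and exception handling; trace records the call order. */
    static Handler build(List<String> trace) {
        Handler target = path -> {
            if (path.equals("/boom")) {
                throw new IllegalStateException("handler failed");
            }
            trace.add("handler");
            return "200 " + path;
        };
        Filter logging = (path, next) -> {
            trace.add("logging:before");
            String response = next.handle(path);
            trace.add("logging:after");
            return response;
        };
        Filter auth = (path, next) -> {
            trace.add("auth");
            return next.handle(path);
        };
        return withExceptionHandling(withFilters(target, List.of(logging, auth)));
    }

    public static void main(String[] args) {
        List<String> trace = new ArrayList<>();
        System.out.println(build(trace).handle("/users") + " " + trace);
        System.out.println(build(new ArrayList<>()).handle("/boom"));
    }
}
```

In Spring itself this assembly is done by WebHttpHandlerBuilder, which collects the WebFilter and WebExceptionHandler beans and produces the final HttpHandler.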
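# 4. Annotated Programming Model
# 4.1 Core Annotations (Shared with Spring MVC)
@RestController
@RequestMapping("/users")
public class UserController {
    // UserRepository is assumed to be a reactive repository, e.g. ReactiveCrudRepository<User, Long>
    private final UserRepository userRepository;

    public UserController(UserRepository userRepository) {
        this.userRepository = userRepository;
    }

    @GetMapping("/{id}")
    public Mono<User> getUser(@PathVariable Long id) {
        return userRepository.findById(id);
    }

    @GetMapping
    public Flux<User> listUsers() {
        return userRepository.findAll();
    }

    @PostMapping
    public Mono<ResponseEntity<User>> createUser(@RequestBody User user) {
        return userRepository.save(user)
                .map(saved -> ResponseEntity.created(
                        URI.create("/users/" + saved.getId())
                ).body(saved));
    }
}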
# 4.2 Supported Handler Method Arguments
| Argument type | Description | Example |
|---|---|---|
| ServerWebExchange | The complete exchange object | exchange.getRequest().getHeaders() |
| @PathVariable | Path variable | @PathVariable("id") Long id |
| @RequestParam | Query parameter | @RequestParam(defaultValue="1") int page |
| @RequestBody | Request body (reactive types supported) | @RequestBody Mono<User> userMono |
| @RequestHeader | Request header | @RequestHeader("Authorization") String auth |
| @CookieValue | Cookie | @CookieValue("sessionId") String session |
| Principal | Authenticated principal | Mono<Principal> principal |
| @MatrixVariable | Matrix variable | @MatrixVariable(pathVar="cars") List<String> colors |
Reactive argument binding:
@PostMapping
public Mono<Void> create(@RequestBody Flux<User> userStream) {
    return userRepository.saveAll(userStream).then();
}
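# 4.3 Supported Return Types
| Return type | Description |
|---|---|
| Mono<T> | Asynchronous single result |
| Flux<T> | Asynchronous multiple results (stream) |
| ResponseEntity<Mono<T>> | Response with status and headers |
| ServerResponse | Response object of the functional model (returned by a HandlerFunction rather than by annotated controller methods) |
| Mono<Void> | No body, status only |
| Mono<Rendering> | View rendering (template engines) |
Streaming with SSE:
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> streamEvents() {
    return Flux.interval(Duration.ofSeconds(1))
            .map(seq -> ServerSentEvent.builder("event-" + seq).build());
}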
# 5. Functional Programming Model
# 5.1 Core Concepts
RouterFunction: the routing function; routes a request to the matching HandlerFunction
HandlerFunction: the handler function; processes the request directly
// Handler definition (userRepository is an assumed reactive repository field;
// listUsers and createUser follow the same pattern and are omitted)
public class UserHandler {
    public Mono<ServerResponse> getUser(ServerRequest request) {
        Long id = Long.valueOf(request.pathVariable("id"));
        return userRepository.findById(id)
                .flatMap(user -> ServerResponse.ok().bodyValue(user))
                .switchIfEmpty(ServerResponse.notFound().build());
    }
}
// Route configuration
// Static imports: RequestPredicates.GET/POST/accept/contentType and MediaType.APPLICATION_JSON
@Configuration
public class UserRouter {
    @Bean
    public RouterFunction<ServerResponse> userRoutes(UserHandler handler) {
        return RouterFunctions
                .route(GET("/users/{id}").and(accept(APPLICATION_JSON)), handler::getUser)
                .andRoute(GET("/users").and(accept(APPLICATION_JSON)), handler::listUsers)
                .andRoute(POST("/users").and(contentType(APPLICATION_JSON)), handler::createUser);
    }
}
# 5.2 Advanced RouterFunction Usage
Nested routes:
@Bean
public RouterFunction<ServerResponse> nestedRoute() {
    return RouterFunctions.route()
            .path("/api", builder -> builder
                    .nest(accept(APPLICATION_JSON), nested -> nested
                            .GET("/users", request -> ServerResponse.ok().build())
                            .POST("/users", request -> ServerResponse.ok().build())
                    )
                    .GET("/health", request -> ServerResponse.ok().bodyValue("UP"))
            )
            .build();
}
Filters:
@Bean
public RouterFunction<ServerResponse> filteredRoute() {
    return RouterFunctions.route()
            .GET("/protected", request -> ServerResponse.ok().build())
            .filter((request, next) -> {
                String token = request.headers().firstHeader("Authorization");
                if (isValid(token)) {
                    return next.handle(request);
                }
                return ServerResponse.status(HttpStatus.UNAUTHORIZED).build();
            })
            .build();
}
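# 6. Reactive Types in Depth
# 6.1 Mono and Flux Operators
Mono (0-1 elements):
Mono.just("Hello")
    .map(String::toUpperCase)
    .flatMap(this::saveToDatabase)
    .onErrorMap(CustomException::new)   // wrap any failure in a domain exception
    .doOnNext(System.out::println)
    .subscribe();                       // nothing runs until the pipeline is subscribed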
Flux (0-N elements):
Flux.range(1, 100)
    .delayElements(Duration.ofMillis(100))
    .buffer(10)                  // group into lists of 10
    .flatMap(this::batchProcess)
    .onBackpressureBuffer(100)   // backpressure strategy
    .subscribe();
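# 6.2 Backpressure Strategies
// Pick one strategy per pipeline; these are alternatives, not a chain.
flux.onBackpressureBuffer(100);  // buffer up to 100 items, then signal an overflow error
flux.onBackpressureDrop();       // drop items emitted while there is no downstream demand
flux.onBackpressureLatest();     // keep only the most recent item until demand arrives
flux.onBackpressureError();      // signal an error as soon as the downstream cannot keep up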
# 7. WebClient: The Reactive HTTP Client
# 7.1 Core Features
WebClient is the non-blocking, reactive HTTP client provided by WebFlux and is intended as the replacement for RestTemplate.
// Create a WebClient
WebClient client = WebClient.builder()
        .baseUrl("https://api.example.com")
        .defaultHeader("Authorization", "Bearer token")
        .codecs(config -> config.defaultCodecs().maxInMemorySize(2 * 1024 * 1024))
        .build();
// Send a request
Mono<User> userMono = client.get()
        .uri("/users/{id}", 123)
        .accept(MediaType.APPLICATION_JSON)
        .retrieve()
        // Spring Framework 6: the predicate receives an HttpStatusCode, not an HttpStatus
        .onStatus(HttpStatusCode::is4xxClientError, response -> Mono.error(new NotFoundException()))
        .bodyToMono(User.class);
// Consume a stream
Flux<Event> eventStream = client.get()
        .uri("/events")
        .accept(MediaType.TEXT_EVENT_STREAM)
        .retrieve()
        .bodyToFlux(Event.class);
# 7.2 Advanced Usage
Streaming a request body upload:
Flux<DataBuffer> dataBufferFlux = ...;
client.post()
        .uri("/upload")
        .contentType(MediaType.APPLICATION_OCTET_STREAM)
        .body(BodyInserters.fromDataBuffers(dataBufferFlux))
        .retrieve()
        .bodyToMono(Void.class);
Form submission:
MultiValueMap<String, String> formData = new LinkedMultiValueMap<>();
formData.add("username", "admin");
formData.add("password", "secret");
client.post()
        .uri("/login")
        .contentType(MediaType.APPLICATION_FORM_URLENCODED)
        .body(BodyInserters.fromFormData(formData))
        .retrieve()
        .bodyToMono(Token.class);
# 8. Server-Sent Events (SSE)
# 8.1 Implementing SSE
@RestController
public class SseController {
    @GetMapping(value = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<String>> stream() {
        return Flux.interval(Duration.ofSeconds(1))
                .map(sequence -> ServerSentEvent.<String>builder()
                        .id(String.valueOf(sequence))
                        .event("periodic-event")
                        .data("SSE Message - " + LocalTime.now())
                        .comment("comment")
                        .build());
    }
}
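On the wire, every event of a text/event-stream response is a small block of "field:value" lines terminated by a blank line. The formatter below is a hand-written illustration of that text format, not Spring's encoder; the class name SseWireFormat is made up, and field order within an event is not significant.

```java
class SseWireFormat {

    /**
     * Serializes one event in text/event-stream syntax. Null fields are skipped,
     * a comment line starts with ':' and multi-line data becomes one "data:" line per line.
     */
    static String format(String id, String event, String data, String comment) {
        StringBuilder sb = new StringBuilder();
        if (id != null) {
            sb.append("id:").append(id).append('\n');
        }
        if (event != null) {
            sb.append("event:").append(event).append('\n');
        }
        if (comment != null) {
            sb.append(':').append(comment).append('\n');
        }
        if (data != null) {
            for (String line : data.split("\n", -1)) {
                sb.append("data:").append(line).append('\n');
            }
        }
        return sb.append('\n').toString(); // the blank line ends the event
    }

    public static void main(String[] args) {
        System.out.print(format("0", "periodic-event", "SSE Message - 12:00:00", "comment"));
    }
}
```

A client that reads the stream as plain strings, such as bodyToFlux(String.class), only sees the data field of each event; the id, event name, and comment are available when decoding to ServerSentEvent instead.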
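Receiving on the client:
webClient.get()
        .uri("/sse")
        .accept(MediaType.TEXT_EVENT_STREAM)   // ask for an event stream explicitly
        .retrieve()
        .bodyToFlux(String.class)              // each element is the data field of one event
        .subscribe(System.out::println);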
# 9. WebSocket Support
# 9.1 Reactive WebSocket API
@Configuration
public class WebSocketConfig {
    @Bean
    public HandlerMapping webSocketMapping() {
        Map<String, WebSocketHandler> map = new HashMap<>();
        map.put("/ws/echo", new EchoWebSocketHandler());
        SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
        mapping.setUrlMap(map);
        mapping.setOrder(Ordered.HIGHEST_PRECEDENCE);
        return mapping;
    }

    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}
public class EchoWebSocketHandler implements WebSocketHandler {
    @Override
    public Mono<Void> handle(WebSocketSession session) {
        Flux<WebSocketMessage> output = session.receive()
                .map(msg -> "Echo: " + msg.getPayloadAsText())
                .map(session::textMessage);
        return session.send(output);
    }
}
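# 9.2 STOMP over WebSocket
Spring WebFlux does not ship STOMP message-broker support. STOMP over WebSocket is provided by the spring-websocket and spring-messaging modules on the Servlet (Spring MVC) stack, where it is configured as follows:
@Configuration
@EnableWebSocketMessageBroker
public class StompConfig implements WebSocketMessageBrokerConfigurer {
    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableSimpleBroker("/topic");
        config.setApplicationDestinationPrefixes("/app");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // Origin patterns are used because "*" as a plain allowed origin is rejected when credentials are allowed
        registry.addEndpoint("/stomp").setAllowedOriginPatterns("*").withSockJS();
    }
}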
# 10. Testing Support
# 10.1 WebTestClient
WebTestClient is the test client for WebFlux; it can be bound to a real server or to a mock environment without a running server.
@SpringJUnitConfig(WebConfig.class)
public class UserControllerTest {
    @Autowired
    private ApplicationContext context;
    private WebTestClient client;

    @BeforeEach
    void setUp() {
        client = WebTestClient.bindToApplicationContext(context).build();
    }

    @Test
    void testGetUser() {
        client.get()
                .uri("/users/123")
                .exchange()
                .expectStatus().isOk()
                .expectHeader().contentType(MediaType.APPLICATION_JSON)
                .expectBody(User.class)
                .value(user -> assertEquals("John", user.getName()));
    }

    @Test
    void testStream() {
        client.get()
                .uri("/stream")
                .exchange()
                .expectStatus().isOk()
                .returnResult(String.class)
                .getResponseBody()
                .take(5)   // the stream is infinite, so limit it before verifying
                .as(StepVerifier::create)
                .expectNextCount(5)
                .verifyComplete();
    }
}
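# 11. Advanced Topics
# 11.1 Error Handling
Global exception handling:
@RestControllerAdvice
public class GlobalExceptionHandler {
    // Annotated handlers return ResponseEntity; ServerResponse belongs to the functional model.
    @ExceptionHandler(NotFoundException.class)
    public ResponseEntity<Void> handleNotFound(NotFoundException ex) {
        return ResponseEntity.notFound().build();
    }

    @ExceptionHandler(Exception.class)
    public ResponseEntity<Map<String, String>> handleGenericError(Exception ex) {
        // String.valueOf guards against a null message, which Map.of would reject
        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                .body(Map.of("error", String.valueOf(ex.getMessage())));
    }
}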
Functional error handling:
@Bean
public RouterFunction<ServerResponse> routes() {
    return RouterFunctions.route()
            .GET("/users/{id}", this::getUser)
            .onError(NotFoundException.class,
                    (e, request) -> ServerResponse.notFound().build())
            .build();
}
# 11.2 CORS Configuration
@Configuration
public class CorsConfig implements WebFluxConfigurer {
    @Override
    public void addCorsMappings(CorsRegistry registry) {
        registry.addMapping("/api/**")
                .allowedOrigins("https://example.com")
                .allowedMethods("GET", "POST", "PUT", "DELETE")
                .allowedHeaders("*")
                .allowCredentials(true)
                .maxAge(3600);
    }
}
# 11.3 Content Negotiation
@Configuration
public class ContentNegotiationConfig implements WebFluxConfigurer {
    @Override
    public void configureContentTypeResolver(RequestedContentTypeResolverBuilder builder) {
        builder.fixedResolver(MediaType.APPLICATION_JSON);
    }
}
# 11.4 Static Resource Handling
@Configuration
public class StaticResourceConfig implements WebFluxConfigurer {
    @Override
    public void addResourceHandlers(ResourceHandlerRegistry registry) {
        registry.addResourceHandler("/static/**")
                .addResourceLocations("classpath:/static/")
                .setCacheControl(CacheControl.maxAge(365, TimeUnit.DAYS));
    }
}
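# 12. Performance Tuning and Best Practices
# 12.1 Thread Model
On the default Reactor Netty runtime, WebFlux runs on Netty event loops:
- Boss (selector) group: accepts connections, usually 1 thread. Reactor Netty does not create a separate selector group by default, so the worker loops accept connections as well.
- Worker group: handles I/O. Plain Netty defaults to CPU cores * 2 threads, whereas Reactor Netty defaults to max(CPU cores, 4), tunable through the reactor.netty.ioWorkerCount system property.
Custom event loop threads:
@Bean
public ReactorResourceFactory resourceFactory() {
    ReactorResourceFactory factory = new ReactorResourceFactory();
    // Custom suppliers only take effect when the global Reactor Netty resources are not used
    factory.setUseGlobalResources(false);
    factory.setLoopResourcesSupplier(() ->
            // arguments: thread name prefix, selector threads, worker threads, daemon
            LoopResources.create("my-http", 2, 4, true)
    );
    return factory;
}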
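# 12.2 Backpressure-Related Configuration (Codec Buffer Limits)
@Bean
public WebFluxConfigurer webFluxConfigurer() {
    return new WebFluxConfigurer() {
        @Override
        public void configureHttpMessageCodecs(ServerCodecConfigurer configurer) {
            // Both setters return void, so they cannot be chained
            configurer.defaultCodecs().maxInMemorySize(2 * 1024 * 1024); // 2 MB in-memory buffer limit
            configurer.defaultCodecs().enableLoggingRequestDetails(true);
        }
    };
}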
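# 12.3 Best Practices
- Avoid blocking operations: all I/O should go through reactive APIs
- Do not mix in blocking code: calling block() on a Netty event loop thread is rejected with an IllegalStateException, and any other blocking call there stalls every connection served by that thread
- Switch threads with publishOn: use flux.publishOn(Schedulers.boundedElastic()) for blocking calls that cannot be avoided, and Schedulers.parallel() for CPU-intensive work
- Handle backpressure deliberately: choose the backpressure strategy that fits the business scenario
- Clean up resources: release them with the doFinally or using operators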