在响应式编程中,数据流 是核心抽象。Project Reactor 提供了两个核心类型:Mono<T> 和 Flux<T>,它们都实现了 Reactive Streams 规范的 Publisher<T> 接口,代表一个异步的、可能包含零个或多个元素的序列。
一、基本概念
两者都是 懒加载 的:在没有 subscribe() 订阅之前,什么都不会发生。它们支持 背压(下游控制上游发射速度)和 非阻塞。
二、创建 Mono 和 Flux
2.1 创建 Mono
java
// 包含一个值
Mono<String> monoJust = Mono.just("Hello");
// 空 Mono(类似 Optional.empty())
Mono<String> monoEmpty = Mono.empty();
// 只包含错误
Mono<String> monoError = Mono.error(new RuntimeException("出错了"));
// 延迟执行(从 Callable 创建,适合包装阻塞代码)
Mono<String> monoFromCallable = Mono.fromCallable(() -> {
Thread.sleep(1000);
return "Result";
});
// 不产生值,只表示完成信号(类似 Runnable)
Mono<Void> monoVoid = Mono.fromRunnable(() -> System.out.println("完成"));
// 延迟创建(每次订阅才生成真正的 Mono)
Mono<String> monoDefer = Mono.defer(() -> Mono.just(LocalTime.now().toString()));2.2 创建 Flux
java
// 多个已知值
Flux<String> fluxJust = Flux.just("A", "B", "C");
// 从集合/数组创建
Flux<Integer> fluxFromIterable = Flux.fromIterable(Arrays.asList(1, 2, 3));
// 范围
Flux<Integer> fluxRange = Flux.range(1, 5); // 1,2,3,4,5
// 空 Flux
Flux<String> fluxEmpty = Flux.empty();
// 错误 Flux
Flux<String> fluxError = Flux.error(new RuntimeException("错误"));
// 无限流
Flux<Long> fluxInterval = Flux.interval(Duration.ofSeconds(1)); // 0,1,2,... 每秒发射
// 动态生成(同步)
Flux<String> fluxGenerate = Flux.generate(() -> 0, (state, sink) -> {
sink.next("Value-" + state);
if (state == 3) sink.complete();
return state + 1;
});
// 动态生成(异步,多元素)
Flux<String> fluxCreate = Flux.create(sink -> {
for (int i = 0; i < 5; i++) {
sink.next("Data-" + i);
}
sink.complete();
});三、订阅与触发执行
不订阅 = 不执行。订阅时传入消费者(Consumer)来处理每个元素、错误和完成信号。
java
// 最简单的订阅(触发执行,但忽略结果)
monoJust.subscribe();
// 订阅并消费元素
monoJust.subscribe(System.out::println);
// 处理元素、错误、完成
fluxJust.subscribe(
value -> System.out.println("Value: " + value),
error -> System.err.println("Error: " + error),
() -> System.out.println("Completed")
);
// 还支持背压控制(请求数量)
fluxJust.subscribe(
value -> System.out.println(value),
error -> {},
() -> {},
subscription -> subscription.request(2) // 只请求前2个元素
);实际开发中通常不会直接调用
subscribe(),而是由框架(如 Spring WebFlux)自动订阅返回的 Mono/Flux。
四、常用操作符(Operators)
操作符将多个响应式流连接成处理链,返回新的 Mono/Flux。
4.1 转换操作符
java
Flux.just(1,2,3)
.map(i -> "Number-" + i) // String: Number-1,Number-2,Number-3
.flatMap(s -> Mono.just(s.length())) // 转为长度: 8,8,8
.subscribe(System.out::println);4.2 过滤操作符
java
Flux.range(1, 10)
.filter(i -> i % 2 == 0) // 偶数
.take(2) // 2,4
.subscribe();4.3 合并与组合
java
Flux<Integer> a = Flux.just(1,2);
Flux<Integer> b = Flux.just(10,20);
Flux.zip(a, b, (x, y) -> x + y) // 输出 11,22
.subscribe(System.out::println);4.4 副作用操作符(Do 系列)
用于日志、埋点、监控,不修改数据流:
java
flux.doOnSubscribe(s -> log.info("订阅开始"))
.doOnNext(item -> log.info("收到元素: {}", item))
.doOnError(e -> log.error("错误", e))
.doOnComplete(() -> log.info("完成"))
.doFinally(signal -> log.info("最终结束: {}", signal))
.subscribe();4.5 错误处理操作符
java
Mono.just("123")
.map(Integer::parseInt)
.onErrorReturn(0) // 解析失败返回0
.onErrorResume(e -> Mono.just(-1)) // 或者返回备用 Mono
.retry(3) // 出错重试3次
.subscribe();4.6 条件与默认值
java
Mono.empty().defaultIfEmpty("默认值"); // 空时返回默认值
Mono.just("A").switchIfEmpty(Mono.just("B")); // 空时切换备选
flux.hasElement(5); // 返回 Mono<Boolean>
flux.any(i -> i > 0); // 任一满足条件4.7 聚合与收集
java
flux.count(); // Mono<Long> 元素个数
flux.reduce((a,b) -> a+b); // Mono<T> 归约
flux.collectList(); // Mono<List<T>> 收集为列表
flux.collectMap(i -> i % 2); // Mono<Map>4.8 阻塞操作(仅测试或非响应式边界)
java
String result = mono.block(); // 阻塞等待(不推荐在生产使用)
User user = mono.block(Duration.ofSeconds(1));
List<Integer> list = flux.collectList().block();五、背压(Backpressure)支持
Mono/Flux 天然支持背压。下游可以通过 subscription.request(n) 告知上游需要多少数据。
java
Flux.range(1, 1000)
.subscribe(new Subscriber<Integer>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
subscription = s;
s.request(5); // 只请求5个
}
@Override
public void onNext(Integer i) { System.out.println(i); }
@Override
public void onError(Throwable t) {}
@Override
public void onComplete() {}
});使用操作符简化背压控制:
java
flux.limitRate(10); // 每次请求10个
flux.limitRequest(100); // 最多只取100个六、线程调度(Schedulers)
默认情况下,操作链在 subscribe() 调用的线程上执行。可通过 subscribeOn 和 publishOn 切换线程池。
java
Mono.fromCallable(() -> blockingOp())
.subscribeOn(Schedulers.boundedElastic()) // 源头在弹性线程执行
.publishOn(Schedulers.parallel()) // 后续操作切换到并行线程
.map(x -> x + " processed")
.subscribe();
subscribeOn影响上游(直到遇到publishOn),publishOn影响下游。
七、测试响应式流(StepVerifier)
Project Reactor 提供 StepVerifier 用于测试。
java
@Test
public void testFlux() {
Flux<Integer> flux = Flux.just(1,2,3).map(i -> i * 10);
StepVerifier.create(flux)
.expectNext(10, 20, 30)
.verifyComplete();
}
@Test
public void testError() {
Mono<String> mono = Mono.error(new RuntimeException("Boom"));
StepVerifier.create(mono)
.expectErrorMatches(e -> e.getMessage().equals("Boom"))
.verify();
}八、Mono 与 Flux 的对比总结
九、与 Future / CompletableFuture 对比
十、最佳实践提示
避免在响应式链中调用阻塞方法(如
Thread.sleep()、ResultSet.next())。必须阻塞时用subscribeOn(Schedulers.boundedElastic())。不要共享可变状态,除非使用
Synchronized或原子类,但最好用不可变对象。善用
defer确保每次订阅都重新计算源。始终处理错误,至少在最外层使用
onErrorResume或全局处理。使用
checkpoint()或log()辅助调试栈追踪。
通过掌握 Mono 和 Flux 的创建、操作符、背压、调度和测试,你就能在 Spring WebFlux 等响应式框架中高效开发非阻塞、高吞吐量的应用。
fromCallable 与 subscribeOn 的关系:不会冲突,不会重复创建线程
先给出结论:不会冲突,也不会重复创建线程。
原因在于:fromCallable 本身 并不创建任何线程,它只是将一个 Callable 包装成一个响应式 Mono;真正决定在哪个线程上执行 Callable 逻辑的是 subscribeOn(以及最终的 subscribe)。
一、Mono.fromCallable 做了什么?
java
Mono<String> mono = Mono.fromCallable(() -> {
// 这里是你的业务逻辑
return "Hello";
});fromCallable接收一个Callable,返回一个Mono。此时没有线程被创建,
Callable内的代码 不会执行(Mono 是惰性的)。它仅仅是一个声明:“当有人订阅这个 Mono 时,请在某个线程上执行这个 Callable,并将其结果作为数据元素发射”。
二、subscribeOn 做了什么?
java
mono.subscribeOn(Schedulers.boundedElastic())
.subscribe(...);subscribeOn是一个 调度操作符,它告诉 Reactor:“当订阅发生时,请将整个上游(包括fromCallable中的 Callable)的执行,切换到指定的调度器(线程池)上运行”。它不会创建新的线程池(你传入的
Schedulers.boundedElastic()本身就是一个已存在的共享线程池),也不会在subscribeOn调用时立即创建任何线程。
三、完整执行流程(无重复线程)
java
Mono.fromCallable(() -> {
System.out.println("执行线程: " + Thread.currentThread().getName());
return "result";
})
.subscribeOn(Schedulers.boundedElastic())
.subscribe(System.out::println);执行步骤:
代码执行到
subscribe(...)时,订阅动作开始。subscribeOn拦截订阅信号,将整个上游(fromCallable部分)的任务提交给boundedElastic线程池。boundedElastic从它的线程池中 取出一个空闲线程(例如名为boundedElastic-1),在该线程上执行Callable中的逻辑。打印:
执行线程: boundedElastic-1。结果
"result"被发射,并在同一线程(或下游可能切换)上传递给subscribe的消费者。
整个过程只使用了 boundedElastic 线程池中的一个线程,fromCallable 没有自己创建任何线程。所以不存在“重复创建线程”的问题。
四、误解来源:认为 Callable 会自带线程
很多初学者误以为 Callable 本身就关联了线程(比如 ExecutorService.submit(Callable) 会创建/分配线程)。但 Mono.fromCallable 只是将 Callable 包装成声明式的数据源,并不提交给任何执行器。它仅仅是一个“待执行的代码块”,必须由后续的调度器(或默认的立即执行)来驱动。
五、如果不用 subscribeOn 会怎样?
java
Mono.fromCallable(() -> {
System.out.println(Thread.currentThread().getName());
return "result";
}).subscribe();没有
subscribeOn,那么Callable会在 调用subscribe()的线程 上执行。例如在 WebFlux 请求处理线程(Netty 事件循环线程)中执行,如果
Callable内有阻塞操作,会阻塞事件循环。
六、fromCallable + subscribeOn 的最佳实践
不要 在
fromCallable内部再手动创建线程或提交到其他线程池,那才是真正的重复线程。只需 用
subscribeOn指定一个合适的调度器(如boundedElastic用于阻塞操作)。如果你既在
Callable内部用了ExecutorService,又在外面用subscribeOn,才会造成线程冗余,但那属于编码错误,不是框架的冲突。
正确示例(仅使用 Reactor 调度器):
java
Mono.fromCallable(() -> {
// 阻塞 IO 操作
return jdbcTemplate.queryForList("SELECT * FROM user");
})
.subscribeOn(Schedulers.boundedElastic())
.map(...)
.subscribe(...);错误示例(手动多线程,画蛇添足):
java
Mono.fromCallable(() -> {
ExecutorService es = Executors.newSingleThreadExecutor();
Future<String> future = es.submit(() -> "bad");
return future.get(); // 又在外面 subscribeOn,浪费线程
})
.subscribeOn(Schedulers.boundedElastic()) // 外面的线程池已经足够
.subscribe(...);七、总结
一句话总结:fromCallable 负责提供任务逻辑,subscribeOn 负责指定执行任务的线程池,两者各司其职,完美配合,没有重复线程。