Mono 和 Flux 详解

吴书松
吴书松
发布于 2026-04-07 / 3 阅读
0
0

Mono 和 Flux 详解

在响应式编程中,数据流 是核心抽象。Project Reactor 提供了两个核心类型:Mono<T>Flux<T>,它们都实现了 Reactive Streams 规范的 Publisher<T> 接口,代表一个异步的、可能包含零个或多个元素的序列。


一、基本概念

类型

语义

最大元素数

典型场景

Mono<T>

异步返回 0 或 1 个元素

1

单值结果(如 HTTP 响应、数据库单条查询、Optional 值)

Flux<T>

异步返回 0 到 N 个元素

无限

集合、流式数据、实时事件、SSE 等

两者都是 懒加载 的:在没有 subscribe() 订阅之前,什么都不会发生。它们支持 背压(下游控制上游发射速度)和 非阻塞


二、创建 Mono 和 Flux

2.1 创建 Mono

java

// 包含一个值
Mono<String> monoJust = Mono.just("Hello");

// 空 Mono(类似 Optional.empty())
Mono<String> monoEmpty = Mono.empty();

// 只包含错误
Mono<String> monoError = Mono.error(new RuntimeException("出错了"));

// 延迟执行(从 Callable 创建,适合包装阻塞代码)
Mono<String> monoFromCallable = Mono.fromCallable(() -> {
    Thread.sleep(1000);
    return "Result";
});

// 不产生值,只表示完成信号(类似 Runnable)
Mono<Void> monoVoid = Mono.fromRunnable(() -> System.out.println("完成"));

// 延迟创建(每次订阅才生成真正的 Mono)
Mono<String> monoDefer = Mono.defer(() -> Mono.just(LocalTime.now().toString()));

2.2 创建 Flux

java

// 多个已知值
Flux<String> fluxJust = Flux.just("A", "B", "C");

// 从集合/数组创建
Flux<Integer> fluxFromIterable = Flux.fromIterable(Arrays.asList(1, 2, 3));

// 范围
Flux<Integer> fluxRange = Flux.range(1, 5);   // 1,2,3,4,5

// 空 Flux
Flux<String> fluxEmpty = Flux.empty();

// 错误 Flux
Flux<String> fluxError = Flux.error(new RuntimeException("错误"));

// 无限流
Flux<Long> fluxInterval = Flux.interval(Duration.ofSeconds(1)); // 0,1,2,... 每秒发射

// 动态生成(同步)
Flux<String> fluxGenerate = Flux.generate(() -> 0, (state, sink) -> {
    sink.next("Value-" + state);
    if (state == 3) sink.complete();
    return state + 1;
});

// 动态生成(异步,多元素)
Flux<String> fluxCreate = Flux.create(sink -> {
    for (int i = 0; i < 5; i++) {
        sink.next("Data-" + i);
    }
    sink.complete();
});

三、订阅与触发执行

不订阅 = 不执行。订阅时传入消费者(Consumer)来处理每个元素、错误和完成信号。

java

// 最简单的订阅(触发执行,但忽略结果)
monoJust.subscribe();

// 订阅并消费元素
monoJust.subscribe(System.out::println);

// 处理元素、错误、完成
fluxJust.subscribe(
    value -> System.out.println("Value: " + value),
    error -> System.err.println("Error: " + error),
    () -> System.out.println("Completed")
);

// 还支持背压控制(请求数量)
fluxJust.subscribe(
    value -> System.out.println(value),
    error -> {},
    () -> {},
    subscription -> subscription.request(2)   // 只请求前2个元素
);

实际开发中通常不会直接调用 subscribe(),而是由框架(如 Spring WebFlux)自动订阅返回的 Mono/Flux。


四、常用操作符(Operators)

操作符将多个响应式流连接成处理链,返回新的 Mono/Flux。

4.1 转换操作符

操作符

作用

示例

map

同步转换每个元素

flux.map(i -> i * 2)

flatMap

异步转换,每个元素转为 Mono/Flux 并展平

mono.flatMap(user -> save(user))

concatMap

类似 flatMap,但保持顺序

同上

cast

类型转换

flux.cast(User.class)

ofType

过滤并转换类型

flux.ofType(String.class)

java

Flux.just(1,2,3)
    .map(i -> "Number-" + i)          // String: Number-1,Number-2,Number-3
    .flatMap(s -> Mono.just(s.length())) // 转为长度: 8,8,8
    .subscribe(System.out::println);

4.2 过滤操作符

操作符

作用

示例

filter

过滤元素

flux.filter(i -> i > 0)

take

取前 N 个

flux.take(3)

skip

跳过前 N 个

flux.skip(2)

distinct

去重

flux.distinct()

elementAt

取指定索引的元素(返回 Mono)

flux.elementAt(2)

java

Flux.range(1, 10)
    .filter(i -> i % 2 == 0)   // 偶数
    .take(2)                   // 2,4
    .subscribe();

4.3 合并与组合

操作符

作用

示例

concat / concatWith

先后连接(先完后再下一个)

flux1.concatWith(flux2)

merge / mergeWith

交错合并(按发射时间)

flux1.mergeWith(flux2)

zip / zipWith

组合成 Tuple 或自定义对象

Mono.zip(userMono, orderMono)

combineLatest

任一最新值组合

-

java

Flux<Integer> a = Flux.just(1,2);
Flux<Integer> b = Flux.just(10,20);
Flux.zip(a, b, (x, y) -> x + y)  // 输出 11,22
    .subscribe(System.out::println);

4.4 副作用操作符(Do 系列)

用于日志、埋点、监控,不修改数据流:

java

flux.doOnSubscribe(s -> log.info("订阅开始"))
    .doOnNext(item -> log.info("收到元素: {}", item))
    .doOnError(e -> log.error("错误", e))
    .doOnComplete(() -> log.info("完成"))
    .doFinally(signal -> log.info("最终结束: {}", signal))
    .subscribe();

4.5 错误处理操作符

操作符

作用

onErrorReturn

发生错误时返回一个默认值

onErrorResume

发生错误时切换到一个备用的 Publisher

onErrorMap

转换异常类型

retry

出错后重试(有限次数)

retryWhen

更灵活的重试策略(配合 Retry 规范)

java

Mono.just("123")
    .map(Integer::parseInt)
    .onErrorReturn(0)                      // 解析失败返回0
    .onErrorResume(e -> Mono.just(-1))     // 或者返回备用 Mono
    .retry(3)                              // 出错重试3次
    .subscribe();

4.6 条件与默认值

java

Mono.empty().defaultIfEmpty("默认值");           // 空时返回默认值
Mono.just("A").switchIfEmpty(Mono.just("B"));   // 空时切换备选
flux.hasElement(5);                             // 返回 Mono<Boolean>
flux.any(i -> i > 0);                           // 任一满足条件

4.7 聚合与收集

java

flux.count();                    // Mono<Long> 元素个数
flux.reduce((a,b) -> a+b);       // Mono<T> 归约
flux.collectList();              // Mono<List<T>> 收集为列表
flux.collectMap(i -> i % 2);     // Mono<Map>

4.8 阻塞操作(仅测试或非响应式边界)

java

String result = mono.block();           // 阻塞等待(不推荐在生产使用)
User user = mono.block(Duration.ofSeconds(1));
List<Integer> list = flux.collectList().block();

五、背压(Backpressure)支持

Mono/Flux 天然支持背压。下游可以通过 subscription.request(n) 告知上游需要多少数据。

java

Flux.range(1, 1000)
    .subscribe(new Subscriber<Integer>() {
        private Subscription subscription;
        @Override
        public void onSubscribe(Subscription s) {
            subscription = s;
            s.request(5);   // 只请求5个
        }
        @Override
        public void onNext(Integer i) { System.out.println(i); }
        @Override
        public void onError(Throwable t) {}
        @Override
        public void onComplete() {}
    });

使用操作符简化背压控制:

java

flux.limitRate(10);           // 每次请求10个
flux.limitRequest(100);       // 最多只取100个

六、线程调度(Schedulers)

默认情况下,操作链在 subscribe() 调用的线程上执行。可通过 subscribeOnpublishOn 切换线程池。

调度器

用途

Schedulers.immediate()

当前线程

Schedulers.single()

单一线程(可复用)

Schedulers.parallel()

固定大小(=CPU核数),适合 CPU 密集型

Schedulers.boundedElastic()

弹性线程池(有界),适合包装阻塞 IO

Schedulers.newParallel("my")

自定义

java

Mono.fromCallable(() -> blockingOp())
    .subscribeOn(Schedulers.boundedElastic())   // 源头在弹性线程执行
    .publishOn(Schedulers.parallel())           // 后续操作切换到并行线程
    .map(x -> x + " processed")
    .subscribe();

subscribeOn 影响上游(直到遇到 publishOn),publishOn 影响下游。


七、测试响应式流(StepVerifier)

Project Reactor 提供 StepVerifier 用于测试。

java

@Test
public void testFlux() {
    Flux<Integer> flux = Flux.just(1,2,3).map(i -> i * 10);
    
    StepVerifier.create(flux)
        .expectNext(10, 20, 30)
        .verifyComplete();
}

@Test
public void testError() {
    Mono<String> mono = Mono.error(new RuntimeException("Boom"));
    
    StepVerifier.create(mono)
        .expectErrorMatches(e -> e.getMessage().equals("Boom"))
        .verify();
}

八、Mono 与 Flux 的对比总结

特性

Mono<T>

Flux<T>

元素数量

0 或 1

0 到 N(包括无限)

是否可为空

是(empty)

是(empty)

常见返回场景

单条查询结果、REST 响应

列表、流式数据、SSE

特有操作符

defaultIfEmpty, switchIfEmpty, then, and

concatMap, merge, window, buffer

从 Flux 转换

flux.single(), flux.next()

mono.flux() 或直接使用


九、与 Future / CompletableFuture 对比

方面

CompletableFuture<T>

Mono<T> / Flux<T>

元素数量

单值

0/1 或多个

背压

不支持

完全支持

惰性

急切(提交后立即执行)

惰性(只有 subscribe 才执行)

操作符丰富度

有限(thenApply, thenCompose 等)

极其丰富(500+ 操作符)

错误处理

exceptionally, handle

onErrorResume, retry, 更细粒度

流式处理

困难

原生支持(flatMap, concatMap, buffer...)


十、最佳实践提示

  1. 避免在响应式链中调用阻塞方法(如 Thread.sleep()ResultSet.next())。必须阻塞时用 subscribeOn(Schedulers.boundedElastic())

  2. 不要共享可变状态,除非使用 Synchronized 或原子类,但最好用不可变对象。

  3. 善用 defer 确保每次订阅都重新计算源。

  4. 始终处理错误,至少在最外层使用 onErrorResume 或全局处理。

  5. 使用 checkpoint()log() 辅助调试栈追踪。


通过掌握 MonoFlux 的创建、操作符、背压、调度和测试,你就能在 Spring WebFlux 等响应式框架中高效开发非阻塞、高吞吐量的应用。

fromCallablesubscribeOn 的关系:不会冲突,不会重复创建线程

先给出结论:不会冲突,也不会重复创建线程
原因在于:fromCallable 本身 并不创建任何线程,它只是将一个 Callable 包装成一个响应式 Mono;真正决定在哪个线程上执行 Callable 逻辑的是 subscribeOn(以及最终的 subscribe


一、Mono.fromCallable 做了什么?

java

Mono<String> mono = Mono.fromCallable(() -> {
    // 这里是你的业务逻辑
    return "Hello";
});
  • fromCallable 接收一个 Callable,返回一个 Mono

  • 此时没有线程被创建Callable 内的代码 不会执行(Mono 是惰性的)。

  • 它仅仅是一个声明:“当有人订阅这个 Mono 时,请在某个线程上执行这个 Callable,并将其结果作为数据元素发射”


二、subscribeOn 做了什么?

java

mono.subscribeOn(Schedulers.boundedElastic())
    .subscribe(...);
  • subscribeOn 是一个 调度操作符,它告诉 Reactor:“当订阅发生时,请将整个上游(包括 fromCallable 中的 Callable)的执行,切换到指定的调度器(线程池)上运行”

  • 它不会创建新的线程池(你传入的 Schedulers.boundedElastic() 本身就是一个已存在的共享线程池),也不会在 subscribeOn 调用时立即创建任何线程。


三、完整执行流程(无重复线程)

java

Mono.fromCallable(() -> {
        System.out.println("执行线程: " + Thread.currentThread().getName());
        return "result";
    })
    .subscribeOn(Schedulers.boundedElastic())
    .subscribe(System.out::println);

执行步骤

  1. 代码执行到 subscribe(...) 时,订阅动作开始。

  2. subscribeOn 拦截订阅信号,将整个上游(fromCallable 部分)的任务提交给 boundedElastic 线程池。

  3. boundedElastic 从它的线程池中 取出一个空闲线程(例如名为 boundedElastic-1),在该线程上执行 Callable 中的逻辑。

  4. 打印:执行线程: boundedElastic-1

  5. 结果 "result" 被发射,并在同一线程(或下游可能切换)上传递给 subscribe 的消费者。

整个过程只使用了 boundedElastic 线程池中的一个线程fromCallable 没有自己创建任何线程。所以不存在“重复创建线程”的问题。


四、误解来源:认为 Callable 会自带线程

很多初学者误以为 Callable 本身就关联了线程(比如 ExecutorService.submit(Callable) 会创建/分配线程)。但 Mono.fromCallable 只是将 Callable 包装成声明式的数据源,并不提交给任何执行器。它仅仅是一个“待执行的代码块”,必须由后续的调度器(或默认的立即执行)来驱动。


五、如果不用 subscribeOn 会怎样?

java

Mono.fromCallable(() -> {
    System.out.println(Thread.currentThread().getName());
    return "result";
}).subscribe();
  • 没有 subscribeOn,那么 Callable 会在 调用 subscribe() 的线程 上执行。

  • 例如在 WebFlux 请求处理线程(Netty 事件循环线程)中执行,如果 Callable 内有阻塞操作,会阻塞事件循环。


六、fromCallable + subscribeOn 的最佳实践

  • 不要fromCallable 内部再手动创建线程或提交到其他线程池,那才是真正的重复线程。

  • 只需subscribeOn 指定一个合适的调度器(如 boundedElastic 用于阻塞操作)。

  • 如果你既在 Callable 内部用了 ExecutorService,又在外面用 subscribeOn,才会造成线程冗余,但那属于编码错误,不是框架的冲突。

正确示例(仅使用 Reactor 调度器):

java

Mono.fromCallable(() -> {
    // 阻塞 IO 操作
    return jdbcTemplate.queryForList("SELECT * FROM user");
})
.subscribeOn(Schedulers.boundedElastic())
.map(...)
.subscribe(...);

错误示例(手动多线程,画蛇添足):

java

Mono.fromCallable(() -> {
    ExecutorService es = Executors.newSingleThreadExecutor();
    Future<String> future = es.submit(() -> "bad");
    return future.get();   // 又在外面 subscribeOn,浪费线程
})
.subscribeOn(Schedulers.boundedElastic())  // 外面的线程池已经足够
.subscribe(...);

七、总结

问题

答案

fromCallable 会自己创建线程吗?

不会,它只是包装代码,不执行。

subscribeOn 会重复创建线程吗?

不会,它只是将执行切换到指定的线程池(复用池中线程)。

两者一起用会冲突吗?

不会subscribeOn 正好控制了 fromCallable 逻辑的执行线程,是推荐的组合用法。

会不会创建两个线程?

不会,只会使用 subscribeOn 指定的线程池中的一个线程。

一句话总结:fromCallable 负责提供任务逻辑,subscribeOn 负责指定执行任务的线程池,两者各司其职,完美配合,没有重复线程。


评论