场景:数据库1000万条数据,可以根据批次分组成4000个小组,然后通过多线程,去处理4000个小组,每个小组处理一批数据,将数据同步到redis
线程池配置:
* 1、corePoolSize=10
* 2、maxPoolSize=128
* 3、queueCapacity=1000// 4000个分组
List<String> groupList = new ArrayList<>();
List<Future<Integer>> futureList = new ArrayList<>();
for (int groupId = 0; groupId < 4000; groupId++) {
Future<Integer> submit = ThreadUtils.POOL_EXECUTOR.submit(new SyncData2RedisCallable<>(groupId));
futureList.add(submit);
}
//SyncData2RedisCallable 内容
int idLast = 0;
List<DbEntity> dbList = new ArrayList<>();
while (true) {
//根据id分页
dbList = dbService.queryList(idLast, 4000);
if (CollUtil.isEmpty(dbList)) {
break;
}
idLast = dbList.get(dbList.size() - 1).getId();
//4000 分批 -》 1000*4
List<List<DbEntity>> dbListList = ListUtil.partition(dbList, 1000);
//方案0 不用线程
for (List<IotCardBaseInfoEntity> batch : dbListList) {
this.iBloomService.doSyncData(batch, this.interCode, this.overFlog);
//已处理数量
result.setNum(result.getNum() + batch.size());
}
//方案1 自定义线程池
// ExecutorService executor = Executors.newFixedThreadPool(4);
// int doNum = processData(dbListList,executor);
// result.setNum(result.getNum() + doNum);
//方案2、异步非堵塞Flux
// int doNum = processReactive(dbListList);
// result.setNum(result.getNum() + doNum);
//方案3、并行流处理
// int subTotal = dbListList.parallelStream()
// .mapToInt(batch -> {
// if(CollUtil.isEmpty(batch)){
// return 0;
// }
// this.iBloomService.doSyncData(batch, this.interCode, this.overFlog);
// return batch.size();
// })
// .sum();
// result.setNum(result.getNum() + subTotal);
if (dbList.size() < NumFilterConstant.DB_MAX_BATCH_NUM) {
dbList.clear();
break;
} else{
dbList.clear();
}
}
public int processData(List<List<DbEntity>> dbListList, ExecutorService executor) {
List<CompletableFuture<Integer>> futures = dbListList.stream()
.map(batch -> CompletableFuture.supplyAsync(() -> {
try {
if(CollUtil.isEmpty(batch)){
return 0;
}
this.iBloomService.doSyncData(batch, this.interCode, this.overFlog);
return batch.size();
} catch (Exception e) {
log.error("处理批次数据失败", e);
return 0;
}
}, null == executor?ThreadUtils.POOL_EXECUTOR:executor))
.collect(Collectors.toList());
// 使用allOf等待所有任务完成,然后汇总结果
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0]));
return allFutures.thenApply(v ->
futures.stream()
.map(CompletableFuture::join)
.mapToInt(Integer::intValue)
.sum()
).join();
}
public int processReactive(List<List<DbEntity>> dbListList) {
Integer num = Flux.fromIterable(dbListList)
.flatMap(batch -> Mono.fromCallable(() -> {
if(CollUtil.isEmpty(batch)){
return 0;
}
this.iBloomService.doSyncData(batch, this.interCode, this.overFlog);
return batch.size();
})
// 专为阻塞操作设计的调度器
.subscribeOn(Schedulers.boundedElastic())
// 控制并发度
, 4)
.reduce(0, Integer::sum)
.onErrorReturn(0)
.block();
return num;
}上面四种方式存在问题:
1、不使用线程,循环同步,耗时
2、使用自定义线程池,需要注意线程池的配置,用完后要销毁,线程池大小配置,因为是在外层循环中,所以线程池大小配置,不能太大,否则容易导致资源耗尽
3、使用异步非堵塞Flux,内部其实用的也是一个内部线程池,所以线程池配置,不能太大,否则容易导致资源耗尽
Flux内部线程池配置: .subscribeOn(Schedulers.boundedElastic()):
Schedulers.immediate()无线程的线程池,类似no-op的一个封装类,执行Runnable时会立即用当前线程执行,只是为了某种占位时用。也就是使用当前主线程来执行
Schedulers.single()单线程的线程池,只会有一个线程在执行,可以用于一些低优先级场景的情况下。If you want a per-call dedicated thread, use Schedulers.newSingle() for each call.
Schedulers.elastic()无界的线程池,可以创建无数多的线程,只要有任务需要执行且没有空闲的线程时,就会创建新线程,类似于jdk中的Executors.newCachedThreadPool()方式。由于,可创建的线程数不受控制,当任务量大的时候,会导致系统资源被耗尽,进而引起崩溃。
Schedulers.boundedElastic()有界的线程池,是对于Schedulers.elastic()的改进,在保证能尽可能创建线程的同时,又限定了总数,减少系统崩溃的可能性。默认最多可创建CPU核数的10倍,同时当线程空闲的时候会被回收(默认60s),在线程数达到极限后,单个线程最多会让(默认)100000个任务进入队列,所以总共可容纳的任务数是线程数100000,基本上已经可以容纳足够的任务数了,效果基本等同于elastic。因此,boundedElastic()相对于elastic()更可控,所以通常推荐使用boundedElastic()。
Schedulers.parallel()固定的线程池,用于执行无阻塞的任务,因此线程数默认等同于CPU核数,类似于类似于jdk中的Executors.newFixedThreadPool()。4、并行流处理
parallelStream和completablefuture 并行流,内部使用的是一个固定大小的公用线程池,ForkJoinPool.commonPool(); 因为是公用的,所以容易耗尽,如果这个线程池耗尽,则会阻塞,导致程序卡顿。
注意: 外层也是一个多线程处理任务,线程内部使用parallelStream,其内部使用一个公用的线程池,如果这个线程池耗尽,则会导致外层整个循环阻塞,导致程序卡顿。可以使用自定义线程池,来处理parallelStream,避免线程池耗尽,导致外层循环阻塞。但是自定义线程池,仍旧需要注意方式2存在的问题。
本次问题:
问题1:外层线程池配置不合理,queueCapacity=5000,配置过大,因为总共4000个分组,达不到5000个堵塞队列,线程池不会创建新的线程去处理任务,仍旧使用的是corePoolSize=10个线程去处理任务,maxPoolSize=128没生效,线程池不会扩容。解决:将queueCapacity=1000,超过1000,就创建新线程处理任务,线程池会扩容。
问题2:外层多线程循环处理,线程池内部使用了parallelStream,内部使用一个公用的线程池,如果这个线程池耗尽,则会导致外层整个循环阻塞,导致程序卡顿。解决:线程内部,使用单线程处理问题(方式1),或者使用自定义线程池,或者使用Flux(仍旧需要注意线程池配置,不能太大,否则容易导致资源耗尽)